# Accessing Files on FTPs/SFTPs with Airflow.

## Agenda:
- Basic Airflow architecture.

- Simple FTP requests

- PythonOperator

- Hook+Python Operator

- Scheduling

## Airflow words.

- DAG: Directed Acyclic Graph, the structure Airflow uses for its workflows. Each DAG has an ordering (one task can depend on another, hence "directed") and contains no cycles (hence "acyclic"). A DAG is made up of separate tasks and only configures the workflow's structure; all the heavy lifting is done in the hooks and operators.

- Hooks: Interfaces Airflow uses to interact with external systems (databases, APIs, FTPs, etc.).

- Operators: The atomic unit of logic in Airflow. Each task is an instance of an operator, and the operator determines how the work gets done.
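The "directed" and "acyclic" rules can be illustrated without Airflow at all. This is a minimal sketch using Python's standard-library `graphlib` (Python 3.9+); the task names mirror the ones used later in this notebook, but the dictionary is only a toy model, not how Airflow stores DAGs internally.

```python
from graphlib import CycleError, TopologicalSorter

# Toy model: each task maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    'kick_off_dag': set(),
    'download_file': {'kick_off_dag'},
    'upload_file': {'download_file'},
}

# "Directed": edges point from upstream to downstream, which yields a run order.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['kick_off_dag', 'download_file', 'upload_file']

# "Acyclic": making kick_off_dag depend on upload_file creates a cycle,
# so no valid run order exists and graphlib refuses to sort it.
cyclic = dict(dependencies, kick_off_dag={'upload_file'})
has_cycle = False
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError:
    has_cycle = True
print('cycle detected:', has_cycle)  # cycle detected: True
```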

# Simple FTP requests:

#### There are plenty of different modules for dealing with FTPs in Python. For SFTPs, _paramiko_ is a popular choice.
#### This unsecured example uses ftplib.

_Note_: For SFTPs, all commands used here have a paramiko equivalent.

#### _Suppose you have a workflow that downloads data off of an FTP, does some transformation, and uploads it on a cron schedule._

In [None]:
from ftplib import FTP


def download_file(ftp, file_name):
    """
    Downloads a file from the FTP as binary.
    """
    with open(file_name, 'wb') as localfile:
        ftp.retrbinary('RETR ' + file_name, localfile.write, 1024)

    # Do some manipulations to the local file.


def upload_file(ftp, file_name):
    """
    Uploads a file as binary to the FTP.
    """
    with open(file_name, 'rb') as localfile:
        ftp.storbinary('STOR ' + file_name, localfile)


host = ''
username = ''
password = ''
port = 21

# Connect and log in once, then reuse the same connection for both transfers.
ftp = FTP()
ftp.connect(host, port)
ftp.login(username, password)

download_file(ftp, 'sample_data.csv')
upload_file(ftp, 'sample_data.csv')

# Close the connection only after both transfers are done.
ftp.quit()
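`retrbinary` and `storbinary` work in blocks: the download hands each received block to a callback (here `localfile.write`), and the upload reads a file object until EOF. A minimal sketch of that contract using a hypothetical in-memory `FakeFTP` class (not part of `ftplib`), so it runs without a server:

```python
import io


class FakeFTP:
    """In-memory stand-in for ftplib.FTP (hypothetical, illustration only).

    Only the two calls used above are implemented: retrbinary and storbinary.
    """

    def __init__(self, files):
        self.files = dict(files)  # remote file name -> contents as bytes

    def retrbinary(self, cmd, callback, blocksize=8192):
        # ftplib sends "RETR <name>" and passes each received block to callback.
        name = cmd.split(' ', 1)[1]
        data = self.files[name]
        for start in range(0, len(data), blocksize):
            callback(data[start:start + blocksize])
        return '226 Transfer complete'

    def storbinary(self, cmd, fp, blocksize=8192):
        # ftplib sends "STOR <name>" and reads fp block by block until EOF.
        name = cmd.split(' ', 1)[1]
        chunks = []
        while True:
            block = fp.read(blocksize)
            if not block:
                break
            chunks.append(block)
        self.files[name] = b''.join(chunks)
        return '226 Transfer complete'


fake_ftp = FakeFTP({'sample_data.csv': b'id,value\n1,10\n2,20\n'})

# Download: with blocksize=4, received.write is called once per 4-byte block.
received = io.BytesIO()
fake_ftp.retrbinary('RETR sample_data.csv', received.write, 4)

# "Transform" step, then upload the result under a new name.
transformed = received.getvalue().replace(b'id,value', b'ID,VALUE')
fake_ftp.storbinary('STOR sample_data_out.csv', io.BytesIO(transformed))

print(fake_ftp.files['sample_data_out.csv'])  # b'ID,VALUE\n1,10\n2,20\n'
```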


## Doing this in Airflow.

The fastest way to do this in a DAG is to use the PythonOperator:

In [None]:
# Import Airflow specific dependencies. 
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Import the module for the FTP.
from ftplib import FTP


# Define functions
def upload_file(**kwargs):
    """
    Uploads file as binary to FTP.
    """

    credentials = kwargs.get('templates_dict').get('credentials', None)
    host = credentials['host']
    username = credentials['username']
    password = credentials['password']

    ftp = FTP(host)
    ftp.login(username, password)

    filename = 'sample_data.csv'
    with open(filename, 'rb') as localfile:
        ftp.storbinary('STOR ' + filename, localfile)
    ftp.quit()


def download_file(**kwargs):
    """
    Downloads file from FTP.
    """

    credentials = kwargs.get('templates_dict').get('credentials', None)
    host = credentials['host']
    username = credentials['username']
    password = credentials['password']

    ftp = FTP(host)
    ftp.login(username, password)

    filename = 'sample_data.csv'
    with open(filename, 'wb') as localfile:
        ftp.retrbinary('RETR ' + filename, localfile.write, 1024)
    ftp.quit()


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}


# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='Manipulating FTPs with PythonOperators',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)

# FTP creds
credentials = {
    'host': '',
    'username': '',
    'password': '',
    'port': 21
}
with dag:
    # Dummy start task.
    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    # Call the functions.
    download_task = PythonOperator(
        task_id='download_file',
        python_callable=download_file,
        # This passes the credentials into the function as a dictionary.
        templates_dict={'credentials': credentials},
        provide_context=True
    )

    upload_task = PythonOperator(
        task_id='upload_file',
        python_callable=upload_file,  # function name
        # This passes the credentials into the function as a dictionary.
        templates_dict={'credentials': credentials},
        provide_context=True
    )

    # Set dependencies: first the kickoff, then the download, and finally the upload.
    # A task won't start until the one before it finishes,
    # e.g. the upload won't start until the download task finishes.
    kick_off_dag >> download_task >> upload_task
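To see how `provide_context=True` and `templates_dict` reach the callables above, here is a simplified stand-in for the call PythonOperator makes in Airflow 1.x: the task context and `templates_dict` are handed to the function as keyword arguments. `run_python_callable` is a hypothetical helper, the context is reduced to a single `ds` key, and Jinja rendering of `templates_dict` is skipped.

```python
def run_python_callable(python_callable, templates_dict, context):
    # Hypothetical, simplified version of what PythonOperator does.
    kwargs = dict(context)                     # task context (execution date, etc.)
    kwargs['templates_dict'] = templates_dict  # added alongside the context
    return python_callable(**kwargs)


def show_credentials(**kwargs):
    # Same lookup pattern as download_file/upload_file above.
    credentials = kwargs.get('templates_dict').get('credentials', None)
    return credentials['host'], kwargs['ds']


example_credentials = {'host': 'ftp.example.com', 'username': 'user',
                       'password': 'secret', 'port': 21}
result = run_python_callable(
    show_credentials,
    templates_dict={'credentials': example_credentials},
    context={'ds': '2017-12-18'},  # 'ds' is the execution date string in Airflow's context
)
print(result)  # ('ftp.example.com', '2017-12-18')
```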

### Why use the PythonOperator?

Airflow is made up of 3 core components: the webserver, the scheduler, and the executor.
    
- Webserver: Responsible for the UI in the browser.
- Scheduler: Handles the scheduling and state of tasks.
- Executor: Handles executing the underlying code.

The scheduler "heartbeats" DAG files, re-parsing them every few seconds before sending anything to the executor.
Each heartbeat executes **all** top-level code. Any code that isn't wrapped in an operator is executed
on every heartbeat, which can make it very expensive.

**Airflow Best Practice: Minimize top-level code.**
    
The PythonOperator is a quick and dirty way around this - just throw your function in a PythonOperator and you 
can leverage Airflow's scheduling and dependency capabilities. 
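A toy simulation of that cost, assuming only that "parsing" a DAG file means executing it as a Python module (which is how Airflow loads DAG files; the real scheduler loop is far more involved). `DAG_FILE_SOURCE` and `parse_dag_file` are hypothetical names.

```python
# Top-level statements run on every parse; function bodies only run when called.
DAG_FILE_SOURCE = '''
CALLS.append('top-level')      # e.g. an expensive query at module level

def task_callable():
    CALLS.append('task body')  # only runs when the task executes
'''


def parse_dag_file(source, calls):
    namespace = {'CALLS': calls}
    exec(source, namespace)
    return namespace


calls = []
for _ in range(3):  # three scheduler "heartbeats"
    namespace = parse_dag_file(DAG_FILE_SOURCE, calls)

print(calls)  # ['top-level', 'top-level', 'top-level']

namespace['task_callable']()  # the executor actually running the task once
print(calls[-1])  # task body
```

Wrapping work in a function handed to a PythonOperator moves it from the first category to the second.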

### Lots of repeated code.

PythonOperators make it easy to take existing scripts and use Airflow to schedule them and manage dependencies, but they lead to a fair deal of repeated code and aren't any more modular than regular Python functions. Furthermore, they clutter the DAG file itself.

**Airflow Best Practice: The DAG file should be as close to a "config" file as possible.**


### Using the FTP Hook.

Using a hook to handle the connection to the FTP can clean this code up a ton.
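The idea behind a hook can be sketched in plain Python: the caller hands over a connection ID, the hook looks up the credentials, connects lazily on first use, and closes the connection when done. Everything here (`CONNECTIONS`, `FakeClient`, `ToyFTPHook`) is hypothetical; Airflow's real `FTPHook` looks up the connection through Airflow's connection store and wraps `ftplib.FTP`, and the fake client only exists so the sketch runs without a server.

```python
# Hypothetical credential store keyed by connection ID.
CONNECTIONS = {
    'astro_ftp': {'host': 'ftp.example.com', 'login': 'user', 'password': 'secret'},
}


class FakeClient:
    """Stands in for ftplib.FTP so the sketch runs without a server."""

    def __init__(self, host, login, password):
        self.host = host
        self.closed = False

    def quit(self):
        self.closed = True


class ToyFTPHook:
    def __init__(self, ftp_conn_id, client_factory=FakeClient):
        self.ftp_conn_id = ftp_conn_id
        self.client_factory = client_factory
        self.conn = None

    def get_conn(self):
        # Connect lazily, once, using credentials looked up by conn_id.
        if self.conn is None:
            params = CONNECTIONS[self.ftp_conn_id]
            self.conn = self.client_factory(params['host'], params['login'], params['password'])
        return self.conn

    def close_conn(self):
        if self.conn is not None:
            self.conn.quit()
            self.conn = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close_conn()


with ToyFTPHook('astro_ftp') as hook:
    client = hook.get_conn()
    same_client = hook.get_conn()  # reused, not reconnected

print(client is same_client, client.host, client.closed)  # True ftp.example.com True
```

The task code never sees a password; it only names the connection, which is what keeps the functions short.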

https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/contrib/hooks/ftp_hook.py

In [None]:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.contrib.hooks.ftp_hook import FTPHook


def upload_file(**kwargs):
    """
    Uploads file as binary to FTP.
    """
    hook = FTPHook(ftp_conn_id='astro_ftp')

    local_path = 'sample_data.csv'
    remote_path = '/astro_test/sample_data.csv'

    # FTPHook methods take the remote path first, then the local path.
    hook.store_file(remote_path, local_path)
    hook.close_conn()


def download_file(**kwargs):
    """
    Downloads file from FTP.
    """
    hook = FTPHook(ftp_conn_id='astro_ftp')

    local_path = 'sample_data.csv'
    remote_path = '/astro_test/sample_data.csv'

    hook.retrieve_file(remote_path, local_path)
    hook.close_conn()


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}


# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='Manipulating FTPs with PythonOperators+Hooks',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)

with dag:

    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    download_task = PythonOperator(
        task_id='download_file',
        python_callable=download_file,
        # This passes the task context into the function as keyword arguments.
        provide_context=True
    )

    upload_task = PythonOperator(
        task_id='upload_file',
        python_callable=upload_file,
        # This passes the task context into the function as keyword arguments.
        provide_context=True
    )

    # Set dependencies: download first, then upload.
    kick_off_dag >> download_task >> upload_task
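The `>>` chains in these DAGs are ordinary Python operator overloading. A minimal sketch with a hypothetical `ToyTask` class: Airflow's `BaseOperator` defines `__rshift__` in a similar spirit (record the downstream task, then return it), and returning the right-hand task is what lets `a >> b >> c` evaluate as `(a >> b) >> c`.

```python
class ToyTask:
    """Hypothetical task object, only for illustrating `>>` chaining."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other  # returning `other` is what lets the expression keep chaining


kick_off = ToyTask('kick_off_dag')
download = ToyTask('download_file')
upload = ToyTask('upload_file')

kick_off >> download >> upload

print([t.task_id for t in kick_off.downstream])  # ['download_file']
print([t.task_id for t in download.downstream])  # ['upload_file']
```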

### Handling Connections.

Notice that when the hook was instantiated, it was simply passed the name of the connection instead of the actual credentials. All hooks inherit from BaseHook, which has access to the Airflow database where connections are stored.

![connections](img/airflow_connections.png)

The Connections panel can be accessed in the UI under Admin -> Connections.

Connection credentials are encrypted with a Fernet key once they're entered (provided a Fernet key is configured for the instance). Connections keep credentials out of DAG files, 
and they can be reused by other DAGs in the same instance.

## Not "Airflowic" Enough

Using the hook with the PythonOperator cut down on repeated code, 
but the DAG file doesn't read like a config file yet.

To polish it off, we'll write a custom FTPToS3Operator.

In [None]:
import logging
import os
import posixpath

from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks.ftp_hook import FTPHook
from airflow.utils.decorators import apply_defaults


class FTPToS3Operator(BaseOperator):
    """
    FTP To S3 Operator
    :param ftp_conn_id:     The source FTP conn_id.
    :type ftp_conn_id:      string
    :param ftp_directory:   The directory containing the file on the FTP server.
    :type ftp_directory:    string
    :param local_path:      The path the file is downloaded to on the worker.
    :type local_path:       string
    :param s3_conn_id:      The S3 connection id.
    :type s3_conn_id:       string
    :param s3_bucket:       The destination S3 bucket.
    :type s3_bucket:        string
    :param s3_key:          The destination S3 key.
    :type s3_key:           string
    """

    template_fields = ('s3_key',)

    @apply_defaults
    def __init__(self,
                 ftp_conn_id,
                 ftp_directory,
                 local_path,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.ftp_conn_id = ftp_conn_id
        self.ftp_directory = ftp_directory
        self.local_path = local_path
        # Assumes the file on the FTP has the same name as the local file.
        self.remote_path = posixpath.join(ftp_directory, os.path.basename(local_path))
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

    def execute(self, context):

        # In an operator, everything is driven from the execute method.

        # Operators use hooks for external connections.
        ftp = FTPHook(ftp_conn_id=self.ftp_conn_id)
        s3_hook = S3Hook(self.s3_conn_id)

        # Log the directory the file is read from.
        logging.info(self.ftp_directory)

        self.download_file(ftp)
        ftp.close_conn()
        self.upload_to_s3(s3_hook)

    def download_file(self, ftp):
        # FTPHook takes the remote path first, then the local path.
        ftp.retrieve_file(self.remote_path, self.local_path)
        logging.info("Downloaded file!")

    def upload_to_s3(self, s3_hook):
        # Upload the file that was just downloaded to the worker.
        s3_hook.load_file(
            filename=self.local_path,
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True
        )
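The operator derives the remote path from `ftp_directory` and the local file name, and uploads `local_path` rather than a name sliced out of `s3_key`. A small check of that path logic; `remote_path_for` is a hypothetical helper that mirrors the assumption stated in the operator's comment (the remote file has the same name as the local one). FTP paths are `/`-separated, so `posixpath` is used regardless of the worker's OS.

```python
import posixpath


def remote_path_for(ftp_directory, local_path):
    # Hypothetical helper: same file name as the local file, inside ftp_directory.
    return posixpath.join(ftp_directory, posixpath.basename(local_path))


print(remote_path_for('', '/tmp/test_data.csv'))              # test_data.csv
print(remote_path_for('/astro_test', '/tmp/test_data.csv'))   # /astro_test/test_data.csv
print(remote_path_for('/astro_test/', '/tmp/test_data.csv'))  # /astro_test/test_data.csv

# Why slicing the S3 key is fragile: a key with no '/' has no second element.
split_failed = False
try:
    'test_data.csv'.split('/')[1]
except IndexError:
    split_failed = True
print('split raised IndexError:', split_failed)  # split raised IndexError: True
```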


Now that all the logic is offloaded to the operator, the DAG file itself is a lot cleaner:

In [None]:
# Import Airflow specific dependencies. 
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from plugins.ftp_plugin.operators import FTPToS3Operator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}

# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='FTPs with FTPOperator',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)


with dag:
    # Dummy start task.
    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    # Transfer the file from the FTP to S3.

    ftp_to_s3 = FTPToS3Operator(
        task_id='download_file',
        ftp_conn_id='astro_ftp',
        ftp_directory='',
        local_path='/tmp/test_data.csv',
        s3_conn_id='astronomer_s3',
        s3_bucket='astronomer-worflows-dev',
        s3_key='test_data.csv',
    )

    # Set dependencies: first the kickoff, then the FTP-to-S3 transfer.
    # A task won't start until the one before it finishes.
    kick_off_dag >> ftp_to_s3

All custom logic should go inside the operator class. 
For example, suppose we wanted to add an option to delete the file off of the FTP after downloading it:

In [None]:
import logging
import os
import posixpath

from airflow.models import BaseOperator
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks.ftp_hook import FTPHook
from airflow.utils.decorators import apply_defaults


class FTPToS3Operator(BaseOperator):
    """
    FTP To S3 Operator
    :param ftp_conn_id:     The source FTP conn_id.
    :type ftp_conn_id:      string
    :param ftp_directory:   The directory containing the file on the FTP server.
    :type ftp_directory:    string
    :param local_path:      The path the file is downloaded to on the worker.
    :type local_path:       string
    :param s3_conn_id:      The S3 connection id.
    :type s3_conn_id:       string
    :param s3_bucket:       The destination S3 bucket.
    :type s3_bucket:        string
    :param s3_key:          The destination S3 key.
    :type s3_key:           string
    :param delete:          Whether to delete the file from the FTP after downloading it.
    :type delete:           bool
    """

    template_fields = ('s3_key',)

    @apply_defaults
    def __init__(self,
                 ftp_conn_id,
                 ftp_directory,
                 local_path,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 delete=False,  # Add another property here
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.ftp_conn_id = ftp_conn_id
        self.ftp_directory = ftp_directory
        self.local_path = local_path
        # Assumes the file on the FTP has the same name as the local file.
        self.remote_path = posixpath.join(ftp_directory, os.path.basename(local_path))
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.delete = delete

    def execute(self, context):

        # In an operator, everything is driven from the execute method.

        # Operators use hooks for external connections.
        ftp = FTPHook(ftp_conn_id=self.ftp_conn_id)
        s3_hook = S3Hook(self.s3_conn_id)

        # Log the directory the file is read from.
        logging.info(self.ftp_directory)

        self.download_file(ftp)
        ftp.close_conn()
        self.upload_to_s3(s3_hook)

    def download_file(self, ftp):
        # FTPHook takes the remote path first, then the local path.
        ftp.retrieve_file(self.remote_path, self.local_path)
        if self.delete:
            ftp.delete_file(self.remote_path)

        logging.info("Downloaded file!")

        # Maybe add some custom transforms here.

    def upload_to_s3(self, s3_hook):
        # Upload the file that was just downloaded to the worker.
        s3_hook.load_file(
            filename=self.local_path,
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True
        )
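Because the operator only talks to its hooks, its control flow can be checked with recording fakes instead of a real FTP server and S3. This sketch is a variant, not the operator above: the hypothetical `transfer` function deletes the FTP file only after the S3 upload succeeds, so a failed upload leaves the source file in place. The fake classes are hypothetical; their method names match the hook calls used in the operator (`retrieve_file`, `delete_file`, `load_file`).

```python
class RecordingFTPHook:
    """Hypothetical fake that records calls instead of talking to an FTP server."""

    def __init__(self, log):
        self.log = log

    def retrieve_file(self, remote_full_path, local_full_path):
        self.log.append(('retrieve', remote_full_path, local_full_path))

    def delete_file(self, path):
        self.log.append(('delete', path))


class RecordingS3Hook:
    """Hypothetical fake that records uploads instead of calling S3."""

    def __init__(self, log):
        self.log = log

    def load_file(self, filename, key, bucket_name=None, replace=False):
        self.log.append(('load', filename, key, bucket_name))


class FailingS3Hook(RecordingS3Hook):
    def load_file(self, *args, **kwargs):
        raise IOError('simulated S3 failure')


def transfer(ftp, s3, remote_path, local_path, bucket, key, delete=False):
    # Variant ordering: download, upload, and only then delete.
    ftp.retrieve_file(remote_path, local_path)
    s3.load_file(filename=local_path, key=key, bucket_name=bucket, replace=True)
    if delete:
        ftp.delete_file(remote_path)


log = []
transfer(RecordingFTPHook(log), RecordingS3Hook(log),
         'test_data.csv', '/tmp/test_data.csv', 'my-bucket', 'test_data.csv', delete=True)
print([entry[0] for entry in log])  # ['retrieve', 'load', 'delete']

failed_log = []
try:
    transfer(RecordingFTPHook(failed_log), FailingS3Hook(failed_log),
             'test_data.csv', '/tmp/test_data.csv', 'my-bucket', 'test_data.csv', delete=True)
except IOError:
    pass
print([entry[0] for entry in failed_log])  # ['retrieve']
```

Deleting right after the download, as the operator above does, is simpler; deleting after the upload trades that simplicity for not losing the file when S3 is unavailable.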


This results in a one-line change to the DAG file.

In [None]:
# Import Airflow specific dependencies. 
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from plugins.ftp_plugin.operators import FTPToS3Operator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}

# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='FTPs with FTPOperator',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)


with dag:
    # Dummy start task.
    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    # Transfer the file from the FTP to S3, then delete it from the FTP.

    ftp_to_s3 = FTPToS3Operator(
        task_id='download_file',
        ftp_conn_id='astro_ftp',
        ftp_directory='',
        local_path='/tmp/test_data.csv',
        s3_conn_id='astronomer_s3',
        s3_bucket='astronomer-worflows-dev',
        s3_key='test_data.csv',
        delete=True
    )

    # Set dependencies: first the kickoff, then the FTP-to-S3 transfer.
    # A task won't start until the one before it finishes.
    kick_off_dag >> ftp_to_s3

With all the heavy lifting done in the hook and operator files, the DAG file itself can mirror a config file:

In [None]:
# Import Airflow specific dependencies. 
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from plugins.ftp_plugin.operators import FTPToS3Operator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}

# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='FTPs with FTPOperator',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)

files = [
    {
        'name' : 'sample_one.csv',
        'delete': True,
    },
    
    {
        'name': 'sample_two.csv',
        'delete': False
    },
    
    {
        'name' : 'sample_three.csv',
        'delete':True
        
    }
]

with dag:
    # Dummy start task.
    kick_off_dag = DummyOperator(task_id='kick_off_dag')
    for file in files:

        ftp_to_s3 = FTPToS3Operator(
            task_id='download_file_{0}'.format(file['name']),
            ftp_conn_id='astro_ftp',
            ftp_directory='',
            local_path='/tmp/{0}'.format(file['name']),
            s3_conn_id='astronomer_s3',
            s3_bucket='astronomer-worflows-dev',
            s3_key=file['name'],
            delete=file['delete']
        )

        # Set the dependency inside the loop so every generated task runs after the kickoff.
        # A task won't start until the one before it finishes.
        kick_off_dag >> ftp_to_s3
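Where the `>>` line sits relative to the `for` loop matters. A toy check in plain Python (edges are just tuples here, not Airflow objects): wiring inside the loop gives every generated task the kickoff as its upstream, while wiring after the loop only reaches the last task, because the loop variable holds just the final value once the loop ends.

```python
file_specs = [
    {'name': 'sample_one.csv', 'delete': True},
    {'name': 'sample_two.csv', 'delete': False},
    {'name': 'sample_three.csv', 'delete': True},
]

# Dependency set inside the loop: one edge per generated task.
edges_inside_loop = []
for file_spec in file_specs:
    task_id = 'download_file_{0}'.format(file_spec['name'])
    edges_inside_loop.append(('kick_off_dag', task_id))

# Dependency set after the loop: only the last task_id is still in scope.
edges_after_loop = []
for file_spec in file_specs:
    task_id = 'download_file_{0}'.format(file_spec['name'])
edges_after_loop.append(('kick_off_dag', task_id))

print(len(edges_inside_loop), len(edges_after_loop))  # 3 1
print(edges_after_loop)  # [('kick_off_dag', 'download_file_sample_three.csv')]
```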