# S3 Connection

The **S3Connection** class defined in this section is responsible for connecting to an **S3 Bucket**.

## S3 FileSystem

**S3FileSystem** builds on **aiobotocore** to provide a convenient Python filesystem interface for **S3**. In other words, this package exposes a bucket as a folder structure and handles the required **S3 API** calls, so we can work with remote objects much as we do with files in a local directory.

In [1]:
import s3fs
import json

with open(r'../connections/connex.json', 'r') as f:
    connex = json.load(f)

By creating an **S3FileSystem** object, we can open files on **S3** just as we do locally, and both read from and write to them.

In [2]:
fs = s3fs.S3FileSystem(key = connex['DEV']['s3']['key'], secret = connex['DEV']['s3']['secret'])

# List the contents of the bucket
fs.ls('lake-s3-dev')

# Write a small CSV through the public open() method; closing the file commits the upload
with fs.open('lake-s3-dev/landing/first_s3fs_load/test.csv', mode = 'wb') as f:
    f.write(b'test,test2,test3\n1,2,3\na,b,c')

In [3]:
class S3Connection:
    
    def __init__(self, bucket_name: str, key: str, secret: str) -> None:
        self.bucket_name = bucket_name
        self.s3 = s3fs.S3FileSystem(key = key, secret = secret)
        
    def get(self, source_path: str):
        # Returns an open binary file object; the caller is responsible for closing it
        return self.s3.open(source_path, mode = 'rb')
        
    def upload(self, source_file: bytes, final_path: str) -> None:
        # The context manager closes the file, which is what commits the write to S3
        with self.s3.open(final_path, mode = 'wb') as target:
            target.write(source_file)
        
    def move(self, source_path: str, final_path: str) -> None:
        # Copies the bytes to final_path; the source object is left in place
        with self.s3.open(source_path, mode = 'rb') as source, self.s3.open(final_path, mode = 'wb') as target:
            target.write(source.read())
    
    def delete(self, source_path: str) -> None:
        # rm is the public, synchronous call; _rm_file is a coroutine in s3fs
        self.s3.rm(source_path)
        

# Executors and Tasks

This notebook briefly explains how we are going to create generic **Classes** capable of handling important Data Lake jobs, *e.g.*, transferring data between different storages, creating tables on **AWS Athena** and **AWS Redshift**, managing **Glue Jobs**, *etc*.

## Staging Data with an optional transformation

The class we are creating below is capable of taking data from a parent directory and moving it to a dump directory. Furthermore, given the path to a folder containing a Python module named *py_exec*, it will run that module's *main* function with its respective arguments. This is a good option for cleaning the data before staging.

In [4]:
##Local StagingExecutor for testing

class StagingExecutor_alpha:
    
    def __init__(self, parent_directory: str, dump_directory: str, archive_or_delete: str = "archive", py_exec_path: str = None, py_exec_args: dict = None) -> None:
        self.parent = parent_directory
        self.dump = dump_directory
        self.archive_or_delete = archive_or_delete
        self.py_exec_path = py_exec_path
        self.py_exec_args = py_exec_args

    def transfer(self) -> None:
        
        # Without a transformation script there is nothing to run here
        if not self.py_exec_path:
            print('No transformation required. Moving file using only parent and dump')
            return
        
        # Make the folder holding py_exec.py importable, then load the module
        import sys
        sys.path.insert(1, self.py_exec_path)
        import py_exec
        
        if not self.py_exec_args:
            print('Python Executor does not require arguments')
            py_exec.main(self.parent, self.dump)
        
        else:
            print(f'Python Executor is running with the following parameters:\n{self.py_exec_args}')
            py_exec.main(self.parent, self.dump, **self.py_exec_args)
                
    def post_staging(self) -> None:
                
        if self.archive_or_delete == 'archive':
            print('File from landing will be moved to archive folder')
        
        elif self.archive_or_delete == 'delete':
            print('File will be deleted from landing')

In [5]:
class StagingExecutor:
    
    def __init__(self, parent_directory: str, dump_directory: str, archive_or_delete: str = "archive", py_exec_path: str = None, py_exec_args: dict = None) -> None:
        self.parent = parent_directory
        self.dump = dump_directory
        self.archive_or_delete = archive_or_delete
        self.py_exec_path = py_exec_path
        self.py_exec_args = py_exec_args

    def transfer(self) -> None:
        
        # Without a transformation script there is nothing to run here
        if not self.py_exec_path:
            print('No transformation required. Moving file using only parent and dump')
            return
        
        # Make the folder holding py_exec.py importable, then load the module
        import sys
        sys.path.insert(1, self.py_exec_path)
        import py_exec
        
        if not self.py_exec_args:
            print('Python Executor does not require arguments')
            py_exec.main(self.parent, self.dump)
        
        else:
            print(f'Python Executor is running with the following parameters:\n{self.py_exec_args}')
            py_exec.main(self.parent, self.dump, **self.py_exec_args)
                
    def post_staging(self) -> None:
                
        if self.archive_or_delete == 'archive':
            print('File from landing will be moved to archive folder')
        
        elif self.archive_or_delete == 'delete':
            print('File will be deleted from landing')
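
The executors above import a module named *py_exec* from the configured path and call its *main* function with the parent location, the dump location and any extra keyword arguments. A minimal sketch of what such a script might contain, assuming that both locations are local CSV file paths and that the script accepts a hypothetical *drop_columns* argument (neither assumption comes from the original notebook):

```python
import csv
import os
import tempfile


def main(parent: str, dump: str, drop_columns: tuple = ()) -> None:
    """Copy a CSV from `parent` to `dump`, dropping the named columns.

    `drop_columns` is a hypothetical keyword argument used for illustration.
    """
    with open(parent, newline="") as src, open(dump, "w", newline="") as dst:
        reader = csv.DictReader(src)
        kept = [name for name in reader.fieldnames if name not in drop_columns]
        # extrasaction="ignore" silently skips the dropped columns in each row
        writer = csv.DictWriter(dst, fieldnames=kept, extrasaction="ignore")
        writer.writeheader()
        for row in reader:
            writer.writerow(row)


# Usage on throwaway local files
workdir = tempfile.mkdtemp()
parent_file = os.path.join(workdir, "landing.csv")
dump_file = os.path.join(workdir, "staging.csv")
with open(parent_file, "w", newline="") as f:
    f.write("id,name,debug\n1,a,x\n2,b,y\n")

main(parent_file, dump_file, drop_columns=("debug",))
with open(dump_file, newline="") as f:
    staged = f.read()
print(staged)
```

With a script like this, the extra arguments would be supplied as a dictionary, for example a *py_exec_args* value of {"drop_columns": ("debug",)}.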

## Downloader with unzip functionality

This executor takes the arguments for a *requests* call and a landing path as its main arguments, and downloads the response into a landing folder (local or on **S3**). In addition to downloading, it can be set to unzip the response and organize raw data for a cleaner staging task.

In [10]:
import shutil
import requests
import zipfile
import io
import os


class DownloaderExecutor:
    def __init__(self, requests_arguments: dict, landing_directory: str, unzip: str = None) -> None:
        self.requests_arguments = requests_arguments  # A dictionary such as: {"url": URL, "params": PARAMS, ...}
        self.landing_directory = landing_directory
        self.unzip = unzip
        self.res = None
        # This attribute should be set up by the DataLake/DataWarehouse class
        self.s3 = S3Connection(bucket_name = 'lake-s3-dev', key = connex['DEV']['s3']['key'], secret = connex['DEV']['s3']['secret'])
        
        
    def download(self) -> None:
        self.res = requests.get(**self.requests_arguments)
        
        if self.unzip == "unzip":
            print('Will have to unzip')
            # Temporary folder named after the dataset folder in the landing path
            temp_dir = os.path.join('../data/temp', self.landing_directory.split("/")[-2])
            with zipfile.ZipFile(io.BytesIO(self.res.content)) as archive:
                archive.extractall(temp_dir)
            # Only the first extracted file is uploaded
            extracted_file = os.listdir(temp_dir)[0]
            with open(os.path.join(temp_dir, extracted_file), 'rb') as extracted_data:
                self.s3.upload(extracted_data.read(), self.landing_directory)
            shutil.rmtree(temp_dir)
            
        elif not self.unzip:
            print('Does not unzip')
            # Upload the raw response body using the connection created in __init__
            self.s3.upload(self.res.content, self.landing_directory)
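
The unzip branch above extracts the archive to a temporary folder before uploading. As an alternative sketch, the archive can be read entirely in memory with *zipfile* and *io*, which avoids the temporary folder. The helper name *first_member_bytes* is hypothetical, and the archive is built locally here so the example runs without a network connection:

```python
import io
import zipfile


def first_member_bytes(zip_bytes: bytes) -> tuple:
    """Return (name, content) of the first file stored in a zip archive."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        # Skip directory entries, which carry no data
        names = [n for n in archive.namelist() if not n.endswith("/")]
        if not names:
            raise ValueError("archive contains no files")
        return names[0], archive.read(names[0])


# Build a small archive in memory to stand in for a downloaded response body
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("survey.csv", "a,b\n1,2\n")

name, content = first_member_bytes(buffer.getvalue())
print(name, len(content))
```

The returned bytes could then be passed to the *upload* method of **S3Connection** in place of the contents read from the temporary file.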

To test the unzip functionality, we are using the following dataset: https://www.stats.govt.nz/assets/Uploads/Retail-trade-survey/Retail-trade-survey-September-2020-quarter/Download-data/retail-trade-survey-september-2020-quarter-csv.zip

In [11]:
downexec = {"requests_arguments": {"url": "https://www.stats.govt.nz/assets/Uploads/Retail-trade-survey/Retail-trade-survey-September-2020-quarter/Download-data/retail-trade-survey-september-2020-quarter-csv.zip"}, "landing_directory": "lake-s3-dev/landing/retail_trade_survey/retail_trade_survey.csv", "unzip": "unzip"}
klass = DownloaderExecutor(**downexec)
klass.download()

Will have to unzip


# Orchestrator

Data orchestration is a relatively new concept to describe the set of technologies that abstracts data access across storage systems, virtualizes all the data, and presents the data via standardized APIs with a global namespace to data-driven applications. There is a clear need for data orchestration because of the increasing complexity of the data ecosystem due to new frameworks, cloud adoption/migration, as well as the rise of data-driven applications. [[Data Orchestrator]](https://dzone.com/articles/data-orchestration-its-open-source-but-what-is-it)

The Orchestrator is a class that follows the routine_config.json, in which one declares the executors to run and their respective tasks. The model for our routine_config is:

```
{
    "routine_name": <ROUTINE_NAME>,
    "executors": {
        <EXECUTOR_CLASS>:  {
            "params":<__init__ PARAMETERS>,
            "tasks": <LIST_SELECTED_TASKS_FROM_EXECUTOR>
        }
    }
}   
```
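
As an illustration, the template above could be filled in for the **DownloaderExecutor** and the dataset used in the unzip test. The routine name is hypothetical; the parameters are the ones passed to the executor in that test:

```python
routine_config = {
    "routine_name": "retail_trade_survey",  # hypothetical name
    "executors": {
        "DownloaderExecutor": {
            # Keyword arguments passed to DownloaderExecutor.__init__
            "params": {
                "requests_arguments": {
                    "url": "https://www.stats.govt.nz/assets/Uploads/Retail-trade-survey/Retail-trade-survey-September-2020-quarter/Download-data/retail-trade-survey-september-2020-quarter-csv.zip"
                },
                "landing_directory": "lake-s3-dev/landing/retail_trade_survey/retail_trade_survey.csv",
                "unzip": "unzip",
            },
            # Method names of the executor, called in order without arguments
            "tasks": ["download"],
        }
    },
}
```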

In [9]:
class Orchestrator:
    
    def __init__(self, routine_config: dict) -> None:
        self.routine_config = routine_config
    
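    def run_executors(self):
        # Instantiate each executor named in the config, then run its tasks
        for executor in self.routine_config['executors']:
            self.executor_name = executor
            # Executor classes are looked up by name in the global namespace
            klass = globals()[self.executor_name]
            self.executor = klass(**self.routine_config['executors'][self.executor_name]['params'])
            self.run_tasks()
            
    def run_tasks(self):
        # Call each listed task as a no-argument method of the current executor
        for task in self.routine_config['executors'][self.executor_name]['tasks']:
            getattr(self.executor, task)()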
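
The class above looks executors up by name in the global namespace, so every executor must be defined in the notebook before the routine runs. A sketch of one alternative, assuming an explicit registry dictionary is acceptable; *EchoExecutor*, *REGISTRY* and *run_routine* are hypothetical names used only for this example:

```python
class EchoExecutor:
    """Stand-in executor that records which tasks were called."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.calls = []

    def first(self) -> None:
        self.calls.append(f"{self.label}:first")

    def second(self) -> None:
        self.calls.append(f"{self.label}:second")


# Explicit mapping from the names used in the config to executor classes
REGISTRY = {"EchoExecutor": EchoExecutor}


def run_routine(routine_config: dict, registry: dict) -> dict:
    """Instantiate each configured executor and run its tasks in order."""
    executors = {}
    for name, spec in routine_config["executors"].items():
        if name not in registry:
            raise KeyError(f"unknown executor: {name}")
        executor = registry[name](**spec["params"])
        for task in spec["tasks"]:
            # Each task is a no-argument method on the executor instance
            getattr(executor, task)()
        executors[name] = executor
    return executors


config = {
    "routine_name": "demo",
    "executors": {
        "EchoExecutor": {"params": {"label": "x"}, "tasks": ["second", "first"]}
    },
}
ran = run_routine(config, REGISTRY)
print(ran["EchoExecutor"].calls)
```

An unknown executor name fails with an explicit error here, and the tasks run in the order listed in the config.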