In [1]:
import luigi

from src.utils import general as gral
from src.pipeline import ingesta_almacenamiento as ing
import src.utils.constants as cte

from datetime import date, timedelta, datetime
import pickle

In [None]:
%%file LuigiIngestionTasks.py

import luigi

from src.utils import general as gral
from src.pipeline import ingesta_almacenamiento as ing
import src.utils.constants as cte

from datetime import date, timedelta, datetime
import pickle


class IngestionTask(luigi.Task):
    """Luigi task that ingests the data set and pickles the result locally.

    Parameters
    ----------
    path_cred : str
        Path to the credentials file handed to ``gral.get_api_token``.
    initial : bool
        ``True`` runs ``ing.ingesta_inicial`` (historical load); ``False``
        runs ``ing.ingesta_consecutiva`` (consecutive load).
    limit : int
        Forwarded as ``limit`` to the ingestion function.
    date : datetime.date or None
        Formatted as ``YYYY-MM-DD`` and forwarded as ``fecha`` to
        ``ing.ingesta_consecutiva``. Required when ``initial`` is ``False``;
        not used by the historical load.
    """

    # NOTE: keep this declaration order; luigi maps positional constructor
    # arguments to parameters in the order they are declared.
    path_cred = luigi.Parameter(default = 'credentials.yaml')
    initial = luigi.BoolParameter(default=True, parsing = luigi.BoolParameter.EXPLICIT_PARSING)
    limit = luigi.IntParameter(default = 300000)
    date = luigi.DateParameter(default = None)

    def output(self):
        """Return the local pickle target for this run.

        The file name is built from *today's* date (not from ``self.date``)
        and from the historical/consecutive prefix selected by ``initial``.
        """
        hoy = datetime.today().strftime('%Y-%m-%d')

        # Historical and consecutive ingestions use different path prefixes.
        prefix = cte.BUCKET_PATH_HIST if self.initial else cte.BUCKET_PATH_CONS
        file_name = prefix + '{}.pkl'.format(hoy)

        # Nop format: the pickle is written as raw bytes.
        return luigi.local_target.LocalTarget(file_name, format = luigi.format.Nop)

    def run(self):
        """Fetch the data through the API client and pickle it to ``output()``.

        Raises
        ------
        ValueError
            If ``initial`` is ``False`` and no ``date`` was supplied.
        """
        # Fail early, before any network call, instead of crashing later
        # with an AttributeError on ``None.strftime``.
        if not self.initial and self.date is None:
            raise ValueError(
                "IngestionTask requires --date when --initial is false"
            )

        my_token = gral.get_api_token(self.path_cred)
        cliente = ing.get_client(data_url = cte.DATA_URL, token = my_token)

        if self.initial:
            datos = ing.ingesta_inicial(
                cliente,
                data_set = cte.DATA_SET,
                limit = self.limit
            )
        else:
            datos = ing.ingesta_consecutiva(
                client = cliente,
                data_set = cte.DATA_SET,
                fecha = self.date.strftime('%Y-%m-%d'),
                limit = self.limit
            )

        with self.output().open('wb') as f:
            pickle.dump(datos, f)
        

            
if __name__ == "__main__":
    # Executed as a script: hand control to luigi's command-line runner.
    luigi.run()

In [None]:
%%bash
# Historical (initial) ingestion with the local scheduler and --limit 100.
PYTHONPATH=. luigi \
    --module src.pipeline.LuigiIngestionTasks IngestionTask \
    --local-scheduler \
    --path-cred ./conf/local/credentials.yaml \
    --initial true \
    --limit 100 \
    --date 2021-03-15

In [None]:
%%bash
# Consecutive ingestion for 2021-01-15 with the local scheduler and --limit 1000.
PYTHONPATH=. luigi \
    --module src.pipeline.LuigiIngestionTasks IngestionTask \
    --local-scheduler \
    --path-cred ./conf/local/credentials.yaml \
    --initial false \
    --limit 1000 \
    --date 2021-01-15

In [12]:
%%file LuigiTasks2.py

import luigi
import luigi.contrib.s3
from src.utils import general as gral
from src.pipeline import ingesta_almacenamiento as ing
from src.pipeline.LuigiIngestionTasks import IngestionTask
import src.utils.constants as cte
from datetime import date, timedelta, datetime
import pickle

class ExportFileTask(luigi.Task):
    """Luigi task that sends the pickle produced by ``IngestionTask`` to S3.

    Parameters
    ----------
    path_cred : str
        Path to the credentials file (used for the S3 client and forwarded to
        ``ing.guardar_ingesta``).
    initial : bool
        Selects the historical (``True``) or consecutive (``False``) prefix
        and is forwarded to ``IngestionTask``.
    limit : int
        Forwarded to ``IngestionTask``.
    date : datetime.date or None
        Forwarded to ``IngestionTask``.
    bucket_path : str
        Name of the destination bucket; defaults to ``cte.BUCKET``.
    """

    path_cred = luigi.Parameter(default = 'credentials.yaml')
    initial = luigi.BoolParameter(default=True, parsing = luigi.BoolParameter.EXPLICIT_PARSING)
    limit = luigi.IntParameter(default = 300000)
    date = luigi.DateParameter(default = None)
    bucket_path = luigi.Parameter(default = cte.BUCKET)  # bucket name; default lives in the constants module

    def _bucket_prefix(self):
        """Return the path prefix for the current mode (historical or consecutive)."""
        return cte.BUCKET_PATH_HIST if self.initial else cte.BUCKET_PATH_CONS

    def requires(self):
        """Depend on the ingestion task configured with the same parameters."""
        # Keyword arguments keep the mapping correct even if IngestionTask's
        # parameter declaration order ever changes.
        return IngestionTask(
            path_cred = self.path_cred,
            initial = self.initial,
            limit = self.limit,
            date = self.date,
        )

    def input(self):
        """Load and return the object pickled by the upstream ``IngestionTask``.

        NOTE(review): this overrides ``luigi.Task.input``, which normally
        returns the upstream *targets*; it is kept returning the loaded data
        so existing callers keep working.
        """
        # Ask the upstream task for its file instead of re-deriving the name.
        file_name = self.requires().output().path

        # pickle executes arbitrary code on load; only read files produced by
        # this pipeline.
        with open(file_name, 'rb') as f:
            data = pickle.load(f)

        return data

    def output(self):
        """Return the S3 target whose existence marks this task as complete.

        NOTE(review): assumes ``ing.guardar_ingesta`` writes to
        ``<bucket>/<prefix><today>.pkl`` -- confirm against its implementation.
        """
        s3_c = gral.get_s3_credentials(self.path_cred)
        client_s3 = luigi.contrib.s3.S3Client(aws_access_key_id = s3_c["aws_access_key_id"],
                             aws_secret_access_key = s3_c["aws_secret_access_key"])

        hoy = datetime.today().strftime('%Y-%m-%d')

        # Use the same bucket that run() uploads to, so a custom
        # --bucket-path is checked in the right place.
        output_path = "s3://{}/{}{}.pkl".format(self.bucket_path, self._bucket_prefix(), hoy)

        return luigi.contrib.s3.S3Target(path = output_path, client = client_s3)

    def run(self):
        """Load the ingested data and hand it to ``ing.guardar_ingesta``."""
        file_type = self._bucket_prefix()

        data = self.input()

        ing.guardar_ingesta(
            path_cred = self.path_cred,
            bucket = self.bucket_path,
            bucket_path = file_type,
            data = data
        )

    
            
if __name__ == "__main__":
    # Executed as a script: hand control to luigi's command-line runner.
    luigi.run()

Overwriting LuigiTasks2.py


In [None]:
%%bash
# Consecutive export for 2021-03-15 using the local scheduler.
# To target a different bucket, append:
#   --bucket-path 'data-product-architecture-equipo-n'
# (The last option must not end with a backslash, otherwise the command
# would continue into whatever line follows it.)
PYTHONPATH='.' luigi \
--module src.pipeline.LuigiTasks2 ExportFileTask \
--path-cred ./conf/local/credentials.yaml \
--initial false \
--limit 1000 \
--date '2021-03-15' \
--local-scheduler

In [None]:
%%bash
# Historical (initial) export using the local scheduler.
# To target a different bucket, append:
#   --bucket-path 'data-product-architecture-equipo-n'
# (The last option must not end with a backslash, otherwise the command
# would continue into whatever line follows it.)
PYTHONPATH='.' luigi \
--module src.pipeline.LuigiTasks2 ExportFileTask \
--path-cred ./conf/local/credentials.yaml \
--initial true \
--limit 300000 \
--date '2021-03-15' \
--local-scheduler

In [None]:
%%bash
# Consecutive export for 2021-03-15. No --local-scheduler flag is passed here,
# so luigi will try to reach its central scheduler (luigid must be running).
# To target a different bucket, append:
#   --bucket-path 'data-product-architecture-equipo-n'
PYTHONPATH=. luigi \
    --module src.pipeline.LuigiTasks2 ExportFileTask \
    --path-cred ./conf/local/credentials.yaml \
    --initial false \
    --limit 1000 \
    --date 2021-03-15