In [1]:
#pysftp==0.2.9
#Imports presentacion
from IPython.display import clear_output

In [2]:
# Imports generales
import pysftp
import logging
import sys
import os
import pandas as pd

# Cargar directorio CDF y Configuraciones
sys.path.append('../')
from dotenv import load_dotenv

load_dotenv()

# Importar CDF
from centraal_dataframework.resources import datalake
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from centraal_dataframework.tasks import task_dq, task
from centraal_dataframework.excepciones import ErrorTareaCalidadDatos
from centraal_dataframework.resources import GreatExpectationsToolKit
from centraal_dataframework.runner import Runner

# Preparación de ambiente
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [4]:
runner = Runner()

In [None]:
CONTENEDOR = ""
CONTENEDOR_DESITNO = os.environ['datalake_cleandir']
WD = os.environ['datalake_workdir']

In [7]:
# Traer Archivos SFTP
@task
def get_scrapping_file(datalake, logger):
    # Variables
    last_file_date = 0
    last_file_name = ''
    scrapping_file = None
    csv_output_dir = f"{CONTENEDOR}/{WD}"
    # Abrir conexión
    cnopts = pysftp.CnOpts()
    cnopts.hostkeys = None
    sftp = pysftp.Connection(
        host=os.environ['sftp_servidor'],
        port=int(os.environ['sftp_port']),
        username=os.environ['sftp_usuario'],
        password=os.environ['sftp_clave'],
        cnopts=cnopts,
    )
    sftp.cwd(os.environ['sftp_raiz'])
    # Buscar el último archivo
    archivos = sftp.listdir_attr()
    for archivo in archivos:
        if archivo.longname[0] != 'd':
            if archivo.st_atime > last_file_date:
                last_file_date = archivo.st_atime
                last_file_name = archivo.filename
    # Cargar el archivo al DataFrame
    with sftp.open(last_file_name) as sfile:
        scrapping_file = pd.read_csv(sfile, sep=',', encoding='latin1')
        sftp.close()
    # Escribimos el DataFrame en nuestro raw-zone
    datalake.write_csv(scrapping_file, f"{csv_output_dir}/scrapping.csv", sep='|', index=False, encoding='latin1')

In [3]:
@task_dq
def scrapping_validate_prerequisites(datalake, gx_toolkit: GreatExpectationsToolKit, logger):
    """Valida los pre-requisitos básicos del archivo de Scrapping"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['scrapping_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "scrapping.csv", sep="|", encoding='latin1')
    logger.info("Validando prerrequisitos del archivo Scrapping...")

    # Nombres de columnas
    scrapping_column_names = ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": [
                "date",
                "canal",
                "category",
                "subcategory",
                "subcategory2",
                "subcategory3",
                "marca",
                "modelo",
                "sku",
                "upc",
                "item",
                "item characteristics",
                "url sku",
                "image",
                "price",
                "sale price",
                "shipment cost",
                "sales flag",
                "store id",
                "store name",
                "store address",
                "stock",
                "upc wm",
                "final price",
                "upc wm2",
                "comp",
                "composition",
                "homogenized_clothing",
                "homogenized_subcategory",
                "homogenized_category",
                "homogenized_color",
                "made_in",
            ],
            "exact_match": True,
            "result_format": "SUMMARY",
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Las columnas del archivo de Scrapping no concuerdan con las esperadas",
            }
        },
    )

    # Price Not Null
    scrapping_price_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "price", "result_format": "SUMMARY"},
        meta={
            "notes": {"format": "markdown", "content": "Algunos precios del archivo de Scrapping no están presentes"}
        },
    )

    # EXECUTE EXPECTATIONS
    url, result = gx_toolkit.run_expectations_on_df(
        source, "SCRAPPING_MANDATORY", [scrapping_column_names, scrapping_price_notnull]
    )
    clear_output(wait=True)
    if result.success:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
        logger.info(result['url'])
        csv_output_dir = f"{CONTENEDOR_DESITNO}/{WD}/scrapping.csv"
        datalake.write_csv(source, csv_output_dir, sep="|", encoding='latin1')
    else:
        logger.info('ERROR: Validación fallida, se detiene el proceso.')
        logger.info(result['url'])
        raise ErrorTareaCalidadDatos(result)

In [5]:
@task_dq
def scrapping_validate_column_contents(datalake, gx_toolkit, logger):
    """Identifica inconsistencias en el contenido de las columnas del archivo Scrapping"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['scrapping_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir+"scrapping.csv", sep="|", encoding = 'latin1')
    logger.info(source.head(1))
    logger.info("Validando contenido de columnas del archivo Scrapping...")
    # creaciones de expectativas

    scrapping_clothing_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "clothing",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'clothing' no están presentes"
            }
        }
    )
    #homogenized_category NOT NULL
    scrapping_homogenized_category_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "homogenized_category",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'homogenized_category' no están presentes"
            }
        }
    )

    #homogenized_subcategory NOT NULL
    scrapping_homogenized_subcategory_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "homogenized_subcategory",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'homogenized_subcategory' no están presentes"
            }
        }
    )
    #homogenized_color NOT NULL
    scrapping_homogenized_color_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "homogenized_color",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'homogenized_color' no están presentes"
            }
        }
    )
    #marca 9 values (up to 11) <- PARAMETER
    scrapping_marca_unique = ExpectationConfiguration(
        expectation_type="expect_column_unique_value_count_to_be_between",
        kwargs={
            "column":        "marca",
            "min_value":     9,
            "max_value":     11,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'homogenized_color' no están presentes"
            }
        }
    )
    
    result = gx_toolkit.run_expectations_on_df(source, "SCRAPPING_CONSISTENCE", [scrapping_clothing_notnull, scrapping_homogenized_category_notnull,
                                                                              scrapping_homogenized_subcategory_notnull, scrapping_homogenized_color_notnull,
                                                                              scrapping_marca_unique])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['scrapping_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'scrapping.csv', sep="|", encoding = 'latin1')
    

In [7]:
@task_dq
def scrapping_validate_prices(datalake, gx_toolkit, logger):
    """Valida el rango de precios del archivo de Scrapping
       Según el lote Zara y de las demás marcas"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['scrapping_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "scrapping.csv", sep="|", encoding = 'latin1')
    logger.info(source.head(1))
    logger.info("Validando contenido de columnas del archivo Scrapping...")
    # creaciones de expectativas
    #Final Price
    #Precio Final Between
    ##Zara 12.000 - 4.000.000
    scrapping_zara_price_range = ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column":        "final price",
            "min_value":     12000,
            "max_value":     4000000,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Para la marca Zara, algunos precios no se encuentran en el rango esperado."
            }
        }
    )

    ##Otras - 12.000 - 1.300.000
    scrapping_otras_price_range = ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column":        "final price",
            "min_value":     12000,
            "max_value":     1300000,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos precios no se encuentran en el rango esperado."
            }
        }
    )
    
    result_zara  = gx_toolkit.run_expectations_on_df(source[source['canal'] == 'Zara Colombia'], "SCRAPPING_PRECIOS_ZARA", [scrapping_zara_price_range])
    result_otras = gx_toolkit.run_expectations_on_df(source[source['canal'] != 'Zara Colombia'], "SCRAPPING_PRECIOS", [scrapping_otras_price_range])

    clear_output(wait=True)
    if result_zara['status'] and result_otras['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info('ZARA:  '+result_zara['url'])
        logger.info('OTRAS: '+result_otras['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['scrapping_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'scrapping.csv', sep="|", encoding = 'latin1')

In [9]:
@task_dq
def marcaspropias_validate_column_contents(datalake, gx_toolkit, logger):
    """Validar el contenido de las columnas de marcas propias"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['datasvcs_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "marcas_propias.csv", sep=",", encoding='utf8')
    logger.info(source.head(1))
    logger.info("Validando contenido de columnas del archivo Marcas Propias...")
    # creaciones de expectativas
    marcprop_categoria_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "categoria",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'categoria' no están presentes"
            }
        }
    )

    #Uso NOT NULL
    marcprop_use_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "use",
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'use' no están presentes"
            }
        }
    )

    #Tipo Prenda NOT NULL
    marcprop_tipo_prenda_notnull = ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={
            "column":        "prendasGenerales", #??????
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'prendasGenerales' no están presentes"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "MARCAS_PROPIAS_CONSISTENCY", [marcprop_categoria_notnull, marcprop_use_notnull,
                                                                                   marcprop_tipo_prenda_notnull])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['datasvcs_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'marcas_propias.csv', sep="|", encoding='utf8')
    


In [11]:
@task_dq
def homologaciones_validar_cantidad(datalake, gx_toolkit, logger):
    """Valida la cantidad de marcas que existen en el archivo de homologaciones"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['homologa_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "homologaciones.csv", sep=",")
    logger.info(source.head(1))
    logger.info("Validando cantidad de registros en homologaciones..")
    homologacion_marca_join = ExpectationConfiguration(
        expectation_type="expect_column_unique_value_count_to_be_between",
        kwargs={
            "column":        "Marca",
            "min_value":     9,
            "max_value":     11,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'marca' no están presentes"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "CANTIDAD_MARCAS_HOMOLOGACION", [homologacion_marca_join])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['homologa_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'homologaciones.csv', sep="|", encoding='utf8')
    

In [13]:
@task_dq
def ordentallas_validar_join(datalake, gx_toolkit, logger):
    """Validación de Orden Tallas para Join"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['homologa_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "orden_tallas.csv", sep=",")
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['scrapping_workdir'] + '/'
    validation_set = datalake.read_csv(csv_input_dir + "scrapping.csv", sep="|", encoding='latin1')
    tallas_set = validation_set.stock.unique().tolist()
    logger.info(source.head(1))
    logger.info("Validando JOIN Orden Tallas...")

    ordentallas_marca_join = ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column":        "Talla",
            "value_set":      tallas_set,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'marca' no están presentes en el archivo de Orden Tallas"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "ORDENTALLA_MARCAS_JOIN", [ordentallas_marca_join])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['homologa_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'orden_tallas.csv', sep="|")



In [14]:
ordentallas_validar_join()

TAREA: ordentallas_validar_join--2023-11-30 08:54:40,978-INFO-Se genera un archivo privado: /calidad-datos\validations/ordentallas_validar_join_expectation_suite/__none__/20231130T135407.552650Z/ordentallas_validar_join_source-ordentallas_validar_join_source_ORDENTALLA_MARCAS_JOIN.html. Visitar portal azure para acceder resultados.


In [15]:
@task_dq
def ean_validar_cantidad_registros(datalake, gx_toolkit, logger):
    """Valida la canitdad de EANs"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['homologa_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "ean.csv", sep="|")
    logger.info(source.head(1))
    logger.info("Validando Cantidad EAN...")
    ean_unique = ExpectationConfiguration(
        expectation_type="expect_column_unique_value_count_to_be_between",
        kwargs={
            "column":        "EAN",
            "min_value":     3000,
            "max_value":     4000,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "La cantidad de productos presentes excede la esperada"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "CANTIDAD_EAN", [ean_unique])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['homologa_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'ean.csv', sep="|")
    
    

In [16]:
ean_validar_cantidad_registros()

TAREA: ean_validar_cantidad_registros--2023-11-30 08:55:19,701-INFO-Se genera un archivo privado: /calidad-datos\validations/ean_validar_cantidad_registros_expectation_suite/__none__/20231130T135445.252623Z/ean_validar_cantidad_registros_source-ean_validar_cantidad_registros_source_CANTIDAD_EAN.html. Visitar portal azure para acceder resultados.


In [17]:
@task_dq
def atributos_validar_referencias(datalake, gx_toolkit, logger):
    """Valida la existencia de las referencias para los atributos"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['homologa_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir + "atributos.csv", sep="|")
    validation_set = datalake.read_csv(csv_input_dir + "ean.csv", sep="|")
    referencia_set = validation_set.REFERENCIA.unique().tolist()
    logger.info(source.head(1))
    logger.info("Validando Cantidad EAN...")
    ean_unique = ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column":        "REFERENCIA",
            "value_set":      referencia_set,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunas referencias del archivo de Atributos no están definidas en Marcas Propias"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "ATRIBUTOS_REFERENCIAS", [ean_unique])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['homologa_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'atributos.csv', sep="|")


In [18]:
atributos_validar_referencias()

TAREA: atributos_validar_referencias--2023-11-30 08:56:03,799-INFO-Se genera un archivo privado: /calidad-datos\validations/atributos_validar_referencias_expectation_suite/__none__/20231130T135526.261477Z/atributos_validar_referencias_source-atributos_validar_referencias_source_ATRIBUTOS_REFERENCIAS.html. Visitar portal azure para acceder resultados.


In [19]:
@task_dq
def tallasagotadas_validar_join(datalake, gx_toolkit, logger):
    """Veriifca las referencias de las tallas agotadas"""
    csv_input_dir = os.environ['datalake_workdir'] + '/' + os.environ['homologa_workdir'] + '/'
    source = datalake.read_csv(csv_input_dir+'tallas_agotadas.csv', sep='|')
    validation_set = datalake.read_csv(csv_input_dir+'ean.csv', sep='|')
    ean_set = validation_set.EAN.unique().tolist()
    logger.info(source.head(1))
    logger.info("Validando JOIN Orden Tallas...")

    tallasagotadas_marca_join = ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column":        "PARTNUMBER",
            "value_set":      ean_set,
            "result_format": "SUMMARY"
        },
        meta={
            "notes": {
                "format": "markdown",
                "content": "Algunos elementos de la columna 'PARTNUMBER' no están presentes en el archivo de EAN"
            }
        }
    )
    result = gx_toolkit.run_expectations_on_df(source, "test", [tallasagotadas_marca_join])
    clear_output(wait=True)
    if result['status']:
        logger.info('Validación exitosa, se promueve a cleansed-zone')
    else:
        logger.info('WARNING: Validación fallida, continúa.')
        logger.info(result['url'])
    csv_output_dir = os.environ['datalake_cleandir'] + '/' + os.environ['homologa_workdir'] + '/'
    datalake.write_csv(source, csv_output_dir+'tallas_agotadas.csv', sep="|")

In [20]:
tallasagotadas_validar_join()

TAREA: tallasagotadas_validar_join--2023-11-30 08:56:56,809-INFO-Se genera un archivo privado: /calidad-datos\validations/tallasagotadas_validar_join_expectation_suite/__none__/20231130T135609.111781Z/tallasagotadas_validar_join_source-tallasagotadas_validar_join_source_test.html. Visitar portal azure para acceder resultados.
