## Process filtering

In [1]:
import numpy as np
import pandas as pd
# Show every column and every row when a DataFrame is displayed.
# The fully qualified "display.*" keys are required: abbreviated keys are
# matched as regex patterns, and in recent pandas versions 'max_columns' /
# 'max_rows' also match the "styler.render.*" options, which raises OptionError.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [2]:
import s3fs
import pyarrow as pa
import pyarrow.parquet as pq

def spark_read_parquet(s3_url: str, **args):
    """Load a parquet dataset stored on S3 into a pandas DataFrame.

    :param s3_url: location of the parquet dataset
    :param args: accepted for call compatibility; not used by this function
    :return: the whole dataset as a pandas DataFrame
    """
    # Read the dataset through a fresh S3 filesystem handle and convert the
    # resulting table straight to pandas.
    s3_filesystem = s3fs.S3FileSystem()
    return pq.ParquetDataset(s3_url, filesystem=s3_filesystem).read().to_pandas()

# Module-level S3 filesystem handle. None of the functions visible in this
# section reference it (spark_read_parquet creates its own handle).
fs = s3fs.S3FileSystem()

In [3]:
"""
MODULE: processing_filtering
This script extracts activos information from the activos file
Steps:
1. Get activos file
2. Get the needed columns and rows from activos file including the needed historical period
3. Create additional columns
"""

def processing_filtering(input_paths_activos, path_demog=None):
    """Build the filtered client table with identification data attached.

    Steps:
    1. Build the table of client ids to keep (``get_filtering_file``).
    2. Load the demographic table (``get_demog_file``).
    3. Left-join both on ``id_cliente`` so every filtered id is kept, with
       ``id_numero_cliente`` and ``id_tp_cd`` filled in where a match exists.

    :param input_paths_activos: location of the activos parquet dataset
    :param path_demog: location of the demographic parquet dataset. When
        omitted, the module-level variable ``path`` is used, which keeps
        existing single-argument calls working.
    :return: DataFrame with the columns produced by ``get_filtering_file``
        plus ``id_numero_cliente`` and ``id_tp_cd``
    """
    if path_demog is None:
        # Backward compatibility: earlier versions read the demographic path
        # from the global `path` instead of receiving it as an argument.
        path_demog = path

    # 1. Get filter file
    ids_final = get_filtering_file(input_paths_activos)
    print('get_filtering_file')
    # 2. Get demog file
    data_demog = get_demog_file(path_demog)
    print('get_demog_file')
    # 3. Left join between the filtered ids and the identification columns.
    # NOTE(review): if the demographic table holds several rows for the same
    # id_cliente, this join repeats that id in the output - confirm whether
    # the key is unique upstream.
    demog_columns = ['id_cliente', 'id_numero_cliente', 'id_tp_cd']
    ids_final2 = pd.merge(ids_final, data_demog[demog_columns], on='id_cliente', how='left')
    print('ids_final_df')
    print('filtering_df created successfully')

    return ids_final2


def get_filtering_file(input_path, anio='2021', mes='01'):
    """Build the table of client ids to keep for a campaign period.

    Reads the parquet dataset at ``input_path``, lower-cases its column names,
    ignores rows without ``id_cliente`` and keeps the distinct ids of the rows
    whose ``ds_estado_actual`` is neither 'Cancelada' nor 'Castigado'.

    :param input_path: location of the activos parquet dataset
    :param anio: campaign year (default '2021', the value previously
        hard-coded in the function body)
    :param mes: campaign month (default '01', the value previously hard-coded
        in the function body); zero-padded to two digits
    :return: DataFrame with one row per kept id and the columns
        ``id_cliente`` (int64) and ``fecha_camp`` (integer ``anio`` + ``mes``)
    """
    filtering = spark_read_parquet(input_path)

    # Normalise column names without mutating the frame in place.
    filtering = filtering.rename(columns=lambda name: name.lower())

    # Select the rows to keep with boolean masks instead of assigning into a
    # filtered slice (which triggers SettingWithCopyWarning).
    has_id = filtering['id_cliente'].notnull()
    is_excluded = filtering['ds_estado_actual'].isin(['Cancelada', 'Castigado'])

    # Cast explicitly to int64: plain 'int' is platform-dependent, and the
    # demographic table casts the same join key with np.int64.
    # NOTE(review): if id_cliente arrives as float, very large ids may already
    # have lost precision upstream - confirm the source dtype.
    ids = filtering.loc[has_id & ~is_excluded, 'id_cliente'].astype(np.int64).unique()

    ids_final = pd.DataFrame({'id_cliente': ids})
    ids_final['fecha_camp'] = int(str(anio) + str(mes).zfill(2))

    return ids_final


def get_demog_file(path):
    """Load the demographic table and prepare it for joining on client id.

    Reads the parquet dataset at ``path``, keeps the rows whose ``ref_num`` is
    made only of digits, renames ``cont_id`` to ``id_cliente`` and ``ref_num``
    to ``id_numero_cliente``, casts ``id_tp_cd`` to string, drops repeated
    (``id_tp_cd``, ``id_numero_cliente``) pairs keeping the first one, and
    casts ``id_cliente`` to int64 (missing values become -99).

    :param path: location of the demographic parquet dataset
    :return: DataFrame with the original columns, renamed and cast as above
    """
    # read the new table
    data_demog = spark_read_parquet(path)

    # `.str.isdigit()` yields NaN for a missing ref_num, and a mask holding NaN
    # cannot be used for boolean indexing; `.eq(True)` maps those rows to False.
    has_numeric_ref = data_demog['ref_num'].str.isdigit().eq(True)
    data_demog = data_demog[has_numeric_ref]

    # Rename the key columns and cast the id type. `assign` returns a new
    # frame, so no column is written into a filtered slice.
    data_demog = data_demog.rename(columns={'cont_id': 'id_cliente',
                                            'ref_num': 'id_numero_cliente'})
    data_demog = data_demog.assign(id_tp_cd=data_demog['id_tp_cd'].astype('str'))

    # Drop duplicates and cast the join key
    data_demog = data_demog.drop_duplicates(subset=['id_tp_cd', 'id_numero_cliente'], keep='first')
    data_demog = data_demog.assign(
        id_cliente=data_demog['id_cliente'].fillna('-99').astype(np.int64)
    )

    return data_demog

In [4]:
# Period used to build the activos dataset path below.
mes = '01'
anio = '2021'
input_paths_activos = 's3://data-bpop-dev-sandbox/estandarizado/productos/libranzas/productos_libranzas_dwh_M'+anio+mes
# Demographic dataset location. processing_filtering reads this module-level
# name `path` when it loads the demographic file, so it must be defined
# before the call below.
path = 's3://data-bpop-dev-sandbox/estandarizado/clientes/identificacion/clientes_identificacion_mdm_D20210218'

# Build the filtered client table with identification data attached.
ids_final2 = processing_filtering(input_paths_activos)

# Quick sanity check: table size and first rows.
print(ids_final2.shape)
ids_final2.head()

  labels, = index.labels


get_filtering_file
get_demog_file
ids_final_df
filtering_df created successfully
(305273, 4)


Unnamed: 0,id_cliente,fecha_camp,id_numero_cliente,id_tp_cd
0,102652295880533801,202101,13544804,1000003
1,102652294446682301,202101,2485304,1000003
2,102652295151150201,202101,6880226,1000003
3,102652314595088801,202101,41311613,1000003
4,102652308842687701,202101,36695801,1000003


In [18]:
import boto3

In [16]:
def list_files(s3_path):
    """
    Author: ADL
    Email: ADL
    Description: list the elements found directly under a given S3 path.
    Parameters: s3_path - text with the path of interest, in the form
        '<scheme>//<bucket>/<prefix>'
    Returns: list with the distinct names found at the first level below the
        prefix, in the order the objects are returned by the bucket listing.
    """
    s3 = boto3.resource('s3')

    # Split only on the first '//' so a path containing another '//' further
    # on is not truncated, then separate the bucket name from the key prefix.
    bucket_name, _, prefix = s3_path.split('//', 1)[1].partition('/')
    my_bucket = s3.Bucket(bucket_name)

    # Position, within a '/'-split key, of the element directly below the prefix.
    level = len(prefix.split('/')) - 1

    # dict.fromkeys de-duplicates while preserving first-seen order, avoiding
    # a linear list scan for every listed object.
    names = dict.fromkeys(
        object_summary.key.split('/')[level]
        for object_summary in my_bucket.objects.filter(Prefix=prefix)
    )
    return list(names)

In [21]:
list_files('s3://data-bpop-dev-sandbox/estandarizado/productos/activo-tarjeta-credito-nueva/')

['productos_activo-tarjeta-credito-nueva_masterfile_D20200511',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200512',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200513',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200514',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200518',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200519',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200520',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200521',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200522',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200526',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200527',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200528',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200529',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200601',
 'productos_activo-tarjeta-credito-nueva_masterfile_D20200602',
 'productos_activo-tarjeta-credito-nueva