# Proceso GLUE Eventos Amplitude

## 1. Cargamos las librerías

In [1]:
%%capture
!pip install -q awswrangler

In [5]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import boto3
import ast
from datetime import datetime, timedelta
import awswrangler as wr
from itertools import chain
import gc
import sys
import time
from sklearn import preprocessing

# Raise the pandas display limits so wide/long frames are not truncated in cell output.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
# AWS handles for the notebook session. `glue` starts and polls the job run and
# `s3` uploads the script; `ssm` and `lakeformation` are not referenced in the
# cells visible here.
glue = boto3.client('glue')
s3 = boto3.resource('s3')
ssm = boto3.client('ssm') 
lakeformation = boto3.client('lakeformation')

## 2. Armamos el proceso en "GLUE LTV-RFM AR"

In [72]:
%%writefile get_data_amplitude.py

import sys
import pyspark.sql.functions as func
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.types import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
import json
import boto3
import ast
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import gc
import sys
from pyspark.conf import SparkConf
import pandas as pd

# ---- Job setup: read job arguments, build the Spark/Glue context, create S3 handles ----
print('Lectura de parámetros')

# ----------------------------------------------------------------------------------
print('NOW:', datetime.now())

# Resolve the four required job arguments (passed as --<name> on the job run).
args = getResolvedOptions(sys.argv,
                          ['bucket_amplitude_data', 
                           'today', 
                           'kms_key_arn', 
                           'recommendations_bucket'])

bucket_amplitude_data = args['bucket_amplitude_data']    # bucket scanned for the input parquet files
recommendations_bucket = args['recommendations_bucket']  # bucket that receives the aggregated output
kms_key_id = args['kms_key_arn']                         # used below as the S3 server-side-encryption key id
today = args['today']                                    # 'YYYY-MM-DD' string; selects the month to process


#--------------------------------------------------------------------------------------------------------------

print('Spark Configuración')

# Turn on S3 server-side encryption and point it at the KMS key received as an argument.
spark_conf = SparkConf().setAll([
  ("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
  ("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])

sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()  # not used in the part of the script visible here



print('Crear objetos S3-ssm')
# ----------------------------------------------------------------------------------
# `s3` is read as a global by retrieve_files(); `s3_client` performs the batch delete
# at the end of the script; `ssm` is not used in the part of the script visible here.
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
ssm = boto3.client('ssm')

#--------------------------------------------------------------------------------------------------------------
print('Parámetros:')
# Substring that an object key must contain to be treated as an Amplitude events file.
path_key_amplitude = 'ar/amplitude/tb_ar_amplitude_events_stage/'

## FECHAS INTERVALO
#print('1. CALCULO DE FECHAS')
##Today llevado al primero del mes menos 1 día
#today = datetime.strptime(today, '%Y-%m-%d').date().replace(day=1)
#last_day=(today-pd.offsets.DateOffset(days=1)).date()
##
#first_day=(last_day-pd.offsets.DateOffset(days=365)).date()
#
#print('2. Intevalo de fechas analizada: ',first_day,'y',last_day)

def first_and_last(today):
    """Return the first and last calendar day of the month that contains ``today``.

    Args:
        today: Date string in ``'%Y-%m-%d'`` format.

    Returns:
        A ``(first_day, last_day)`` tuple of ``datetime.date`` objects.

    Raises:
        ValueError: If ``today`` does not match ``'%Y-%m-%d'``.
    """
    parsed = datetime.strptime(today, '%Y-%m-%d').date()
    # Day 28 exists in every month, and 28 + 4 days always falls in the next month.
    spill_over = parsed.replace(day=28) + timedelta(days=4)
    # Going back by that date's day-of-month lands on the last day of the original month.
    month_end = spill_over - timedelta(days=spill_over.day)
    month_start = parsed.replace(day=1)
    return month_start, month_end

print('Declaración de funciones')
def list_objects_function(buckets_, first_day, last_day, keys_, retrieve_last=False):
    """List the parquet objects of a bucket whose ``dt=`` partition falls in a date range.

    Every object in ``buckets_`` is listed; an object is kept when its key
    contains ``keys_`` and ``'.parquet'`` (substring match, as before) and the
    ten characters following ``dt=`` parse to a date between ``first_day`` and
    ``last_day`` (both inclusive).

    Objects that match ``keys_`` but have no ``dt=`` segment are skipped. The
    previous implementation sliced such paths at a bogus offset and handed the
    resulting garbage to ``pd.to_datetime``, which aborted the whole listing.

    Args:
        buckets_: Name of the S3 bucket to scan.
        first_day: Lower bound of the range (anything whose ``str()`` is a date).
        last_day: Upper bound of the range (anything whose ``str()`` is a date).
        keys_: Substring an object key must contain.
        retrieve_last: Unused; kept so existing callers keep working.

    Returns:
        A list of ``s3://bucket/key`` strings, in listing order.
    """
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(buckets_)

    paths = []
    partition_dates = []
    # NOTE(review): this walks the whole bucket. `objects.filter(Prefix=keys_)` would be
    # cheaper, but is only equivalent if `keys_` is always a true key prefix — confirm.
    for obj in bucket.objects.all():
        key = obj.key
        if keys_ not in key or '.parquet' not in key:
            continue
        path = f"s3://{buckets_}/" + key
        dt_pos = path.find('dt=')
        if dt_pos < 0:
            # No partition segment, so the file cannot be dated: leave it out.
            continue
        paths.append(path)
        # 'dt=' is followed by a 10-character YYYY-MM-DD value.
        partition_dates.append(path[dt_pos + 3:dt_pos + 13])

    df_bucket_files = pd.DataFrame({
        'path': paths,
        'date': pd.to_datetime(partition_dates),
    })
    in_range = df_bucket_files['date'].between(str(first_day), str(last_day))
    files = list(df_bucket_files.loc[in_range, 'path'].values)
    return files



# Maps each raw `event_type` value kept by the job to the category name that replaces
# it. Several raw events share one category, so their counts end up in the same
# pivoted column.
map_events = {
    "cuoti_selecciona_elegircuotas" : "cuotificaciones",
    "cuoti_sigue_seleccion_consumos" : "cuotificaciones",
    "prestamos_selecciona_simular_prestamo": "prestamos",
    "prestamos_espera": "prestamos",
    "general_ingresa_promociones": "promociones",
    "recargas_click_empezar": "recargas",
    "recargas_click_repetir": "recargas",
    "transferencia_selecciona_tieneuala": "transferencia_c2c",
    "transferencia_selecciona_notieneuala": "transferencia_cvu",
    "general_ingresa_cobros": "cobros",
    "cobros_acepta_tyc" : "cobros",
    "cobros_elige_link": "cobros",
    "cobros_elige_mpos": "cobros",
    "pagos_empezar": "pago_servicios",
    "click_inversiones":"inversiones"
}

# Raw event names used to filter the input rows before the mapping is applied.
eventos_recommendations = list(map_events.keys())
#-----------------------------------------------------------------------------------------------------------------
# ---- Main pipeline: read one month of events, count them per user/category, write parquet ----

# Bounds of the calendar month that contains the `today` job argument.
first_day, last_day = first_and_last(today)
print('Primer dia', first_day)
print('Ultimo dia', last_day)

files_objets_amplitude = list_objects_function(bucket_amplitude_data, first_day, last_day, path_key_amplitude)

print(f'Hay {len(files_objets_amplitude)} archivos de survival en la carpeta')
if not files_objets_amplitude:
    # spark.read.parquet() with no paths fails with an unrelated schema-inference
    # error; stop here with a message that names the real cause.
    raise ValueError(
        f'No se encontraron archivos parquet entre {first_day} y {last_day} '
        f'en s3://{bucket_amplitude_data} (clave: {path_key_amplitude})'
    )

df_amplitude = (
    spark.read.parquet(*files_objets_amplitude)
    .select(['user_id', "os_name", "event_type", "event_time"])
)
# Keep only the raw events that have a category in `map_events`.
df_amplitude = df_amplitude.filter(df_amplitude.event_type.isin(eventos_recommendations))

# 'yyyy' is the calendar year. The previous pattern 'YYYY' is the *week-based* year,
# which labels days around New Year with the wrong year (and is rejected by Spark 3).
df_amplitude = df_amplitude.withColumn('year_month', F.date_format(df_amplitude.event_time, 'yyyy-MM'))

df_amplitude = df_amplitude.drop("event_time")

# Replace raw event names by their category; with a dict as first argument the
# second positional value (1) is ignored by Spark.
df_amplitude = df_amplitude.na.replace(map_events, 1, "event_type")

# Step 1: events per (user, category, month), carrying max(os_name) of that group.
# Step 2: pivot categories into columns, one row per (user, month, os_name).
# NOTE(review): because os_name is a grouping key of the pivot, a user whose
# max(os_name) differs between categories gets more than one row per month —
# confirm this is intended.
df_amplitude = (
    df_amplitude
    .groupBy(['user_id', 'event_type', 'year_month'])
    .agg(F.count('event_type').alias('cant'),
         F.max('os_name').alias('os_name'))
    .groupBy(['user_id', 'year_month', 'os_name'])
    .pivot("event_type")
    .agg(F.sum('cant'))
    .na.fill(0)
)

# The output partition is named after the first day of the processed month;
# an existing partition is overwritten.
df_amplitude.write\
     .format('parquet')\
     .save(f's3://{recommendations_bucket}/data/raw/amplitude/dt={str(first_day)}', mode='overwrite')

print('Ubicación files', f's3://{recommendations_bucket}/data/raw/amplitude/dt={str(first_day)}')

#DELETE $FOLDER$

def retrieve_files(path, file_type, list_dates):
    """Return the ``s3://`` URIs under ``path`` that match a type and any of several tokens.

    The bucket is taken from the third ``/``-separated component of ``path`` and
    the remainder is used as a key substring. The whole bucket is listed through
    the module-level ``s3`` resource.

    Args:
        path: Location of the form ``s3://<bucket>/<prefix>``.
        file_type: Substring the key must contain.
        list_dates: Tokens; the lower-cased key must contain at least one of them.

    Returns:
        A list of ``s3://<bucket>/<key>`` strings for the matching objects.
    """
    parts = path.split('/')
    bucket = parts[2]
    prefix = '/'.join(parts[3:])

    matches = []
    for obj in s3.Bucket(bucket).objects.all():
        key = obj.key
        under_prefix = key.find(prefix) >= 0
        has_token = any(x in key.lower() for x in list_dates)
        has_type = key.find(file_type) >= 0
        if under_prefix & has_token & has_type:
            matches.append(f's3://{bucket}/{key}')
    return matches


# ---- Clean-up: remove the `$folder$` objects under data/ in the output bucket ----
# (presumably the marker objects left behind by the S3 writer — confirm)
delete_files = retrieve_files(path=f's3://{recommendations_bucket}/data/', file_type='$folder$', list_dates=['$folder$'])
print('Files to delete', delete_files)

# Strip the leading 's3://<bucket>/' from every URI to obtain the object key.
files_keys = [{'Key': '/'.join(uri.split('/')[3:])} for uri in delete_files]

# S3 DeleteObjects accepts at most 1000 keys per request, so send the keys in batches.
MAX_KEYS_PER_DELETE = 1000
for start in range(0, len(files_keys), MAX_KEYS_PER_DELETE):
    s3_client.delete_objects(Bucket=recommendations_bucket,
                             Delete={'Objects': files_keys[start:start + MAX_KEYS_PER_DELETE]})
del delete_files
gc.collect()

# show() prints the preview itself and returns None; wrapping it in print() only
# added a stray 'None' line to the job log.
df_amplitude.show()
print((df_amplitude.count(), len(df_amplitude.columns)))

Overwriting get_data_amplitude.py


In [73]:
job_name='test-job_recommendations_amplitude'

In [74]:
# borrar job
#glue.delete_job(
#    JobName=job_name
#)

## 3. Generamos los parametros

In [75]:
# Values passed as arguments to the Glue job run started below.
today = '2021-01-10'
bucket_amplitude_data='churn-ds-stage'  ## source bucket (AFIP, GP, etc.)
recommendations_bucket='test-uala-arg-datalake-aiml-recommendations'  # bucket for outputs
kms_key_arn='arn:aws:kms:us-east-1:322149183112:key/9cc44b23-c5e9-46cb-9987-0982d21f8d00' ## key used for decryption; the job also sets it as its S3 server-side-encryption key id

In [76]:
s3 = boto3.resource('s3')

# Upload the .py file written by the %%writefile cell to the output bucket
s3.meta.client.upload_file('get_data_amplitude.py', 
                           recommendations_bucket, #bucket
                           'artifacts/code/amplitude/get_data_amplitude.py' #key+filename
)
print('.py uploaded')

.py uploaded


## 4. Creamos el job de GLUE

In [77]:
#job = glue.create_job(Name=job_name, 
#                      GlueVersion='2.0',
#                      Role='ML_AWSGlueServiceRole',
#                      Command={'Name': 'glueetl',
#                               'ScriptLocation': f's3://{recommendations_bucket}/artifacts/code/amplitude/get_data_amplitude.py'},
#                      DefaultArguments={
#                        '--additional-python-modules': 'dateutil==2.8.1'},
#                      MaxCapacity=3
#                      )

In [78]:
# Start a run of the existing Glue job. Each key carries the `--` prefix and matches
# one of the names the script resolves with getResolvedOptions.
job_run = glue.start_job_run(
    JobName = job_name,
    Arguments = {
        '--today':today,
        '--bucket_amplitude_data': bucket_amplitude_data,
        '--recommendations_bucket': recommendations_bucket,
        '--kms_key_arn': kms_key_arn
    } 
)

In [79]:
print(job_run)

{'JobRunId': 'jr_cf2978c1ed6c989cac356896b755adc916606f4449ff8bf16a36c86932e7beca', 'ResponseMetadata': {'RequestId': 'ded488e5-e827-4452-bb2d-5e053957841f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Thu, 20 May 2021 19:56:31 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '82', 'connection': 'keep-alive', 'x-amzn-requestid': 'ded488e5-e827-4452-bb2d-5e053957841f'}, 'RetryAttempts': 0}}


In [80]:
# Poll the Glue job run until it reaches a terminal state or the wait budget runs out.
#
# MAX_WAIT_TIME is a duration in seconds. It used to be computed as
# `time.time() + 60*10` and then added to `time.time()` again, which pushed the
# deadline decades into the future, so the loop never timed out.
MAX_WAIT_TIME = 60 * 60  # 1 hour
POLL_INTERVAL = 45       # seconds between status checks
# States after which the run will not change any more; stopping only on
# SUCCEEDED/FAILED kept polling a stopped, timed-out or errored run.
TERMINAL_STATES = {'SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR', 'EXPIRED'}

max_time = time.time() + MAX_WAIT_TIME
while time.time() < max_time:
    response = glue.get_job_run(JobName=job_name, RunId=job_run['JobRunId'])
    status = response['JobRun']['JobRunState']
    print('Job run: {}'.format(status))

    if status in TERMINAL_STATES:
        break

    time.sleep(POLL_INTERVAL)
else:
    # Reached only when the deadline passes without a terminal state.
    print('Job run: stopped waiting after {} seconds'.format(MAX_WAIT_TIME))

Job run: RUNNING
Job run: RUNNING
Job run: RUNNING
Job run: RUNNING
Job run: RUNNING
Job run: RUNNING
Job run: SUCCEEDED


## 5. Controlamos la importación de datos

In [105]:
#pd.set_option('display.max_columns', None)

In [14]:
# Rebinds `today` from the 'YYYY-MM-DD' string to a date on the first day of that
# month, so this cell cannot be re-run without first resetting `today` to a string.
today = datetime.strptime(today, '%Y-%m-%d').date().replace(day=1)
# One day before the first of the month, i.e. the last day of the previous month.
last_day=(today-pd.offsets.DateOffset(days=1)).date()
# NOTE(review): the Glue script in this notebook writes to
# data/raw/amplitude/dt=<first_day>; these dt=<last_day>/full and /train locations
# are not written by it — presumably the output of a different job; confirm.
full_path = f's3://{recommendations_bucket}/data/dt={str(last_day)}/full'
train_path = f's3://{recommendations_bucket}/data/dt={str(last_day)}/train'

In [15]:
# Load the two parquet locations defined in the previous cell into pandas frames.
df_train=wr.s3.read_parquet(train_path)
df_full=wr.s3.read_parquet(full_path)

In [16]:
df_train.shape, df_full.shape

((152191, 10), (193545, 5))

In [17]:
df_train

Unnamed: 0,cuenta,frequency_cal,recency_cal,T_cal,importe_dolar_cal,frequency_holdout,recency_holdout,T_holdout,importe_dolar_holdout,duration_holdout
0,1009908,6,124,157,14.025951,11,23,24,30.281554,30
1,1015208,38,157,206,13.938252,5,19,20,9.678834,30
2,1015941,11,164,206,4.206839,,,,,30
3,1015973,15,149,189,25.578332,,,,,30
4,1017613,12,146,189,24.595988,0,0,27,64.322470,30
...,...,...,...,...,...,...,...,...,...,...
6293,9993893,8,72,117,9.133972,1,4,30,15.608866,30
6294,9998071,7,50,158,8.829365,0,0,19,21.440823,30
6295,9998945,10,40,73,28.421985,8,19,29,30.284401,30
6296,9999517,4,14,49,7.375704,9,18,19,10.176126,30


In [3]:
print(datetime.today().replace(day=1))

2021-05-01 17:51:44.086588


31


In [11]:

# The notebook imports the `datetime` *class* (`from datetime import datetime, timedelta`),
# so `datetime.timedelta` and `datetime.datetime` raise AttributeError. Import the
# module under a private alias so this cell works whichever binding `datetime` has.
import datetime as _dt


def last_day_month(fecha):
    """Return the last day of the month that contains ``fecha``.

    Args:
        fecha: A ``datetime.date`` or ``datetime.datetime``.

    Returns:
        An object of the same type as ``fecha`` moved to the last day of its
        month; for a ``datetime`` the time of day is left unchanged.
    """
    # Day 28 exists in every month, and 28 + 4 days always falls in the next month.
    next_month = fecha.replace(day=28) + _dt.timedelta(days=4)
    # Going back by that date's day-of-month lands on the last day of the original month.
    last_day_of_month = next_month - _dt.timedelta(days=next_month.day)
    return last_day_of_month


print(last_day_month(_dt.date.today()))

2021-05-31


In [39]:
today='2021-01-10'
def first_and_last(today):
    """Compute the bounds of the calendar month that contains ``today``.

    Args:
        today: Date string in ``'%Y-%m-%d'`` format.

    Returns:
        ``(first_day, last_day)`` as ``datetime.date`` objects.

    Raises:
        ValueError: If ``today`` does not match ``'%Y-%m-%d'``.
    """
    day_in_month = datetime.strptime(today, '%Y-%m-%d').date()
    # Jump from day 28 (present in every month) into the following month...
    overshoot = day_in_month.replace(day=28) + timedelta(days=4)
    # ...then step back by its day number to reach the end of the original month.
    month_bounds = (
        day_in_month.replace(day=1),
        overshoot - timedelta(days=overshoot.day),
    )
    return month_bounds

In [40]:
first_and_last(today)

(datetime.date(2021, 1, 1), datetime.date(2021, 1, 31))

In [3]:
# Read the partitioned dataset, keeping only partitions whose `dt` value contains
# '2020-05-31' (substring test, not equality).
df=wr.s3.read_parquet(f's3://uala-arg-datalake-aiml-survival-dev/data/raw/GP_7001',dataset=True,partition_filter=lambda x: '2020-05-31' in x["dt"])

In [7]:
df[df.accountgp==4026497].head(5)

Unnamed: 0,accountgp,nu_tcc_r_aprob,nu_tcc_t_aprob,nu_tcc_z_aprob,vl_tcc_r_aprob,vl_tcc_t_aprob,vl_tcc_z_aprob,nu_mode_digital_qty_0_aprob,nu_mode_digital_qty_1_aprob,nu_automatic_debit_aprob,nu_cash_out_cvu_aprob,nu_consumption_pos_aprob,nu_investments_withdraw_aprob,nu_telerecargas_carga_aprob,nu_user_to_user_aprob,nu_withdraw_atm_aprob,vl_automatic_debit_aprob,vl_cash_out_cvu_aprob,vl_consumption_pos_aprob,vl_investments_withdraw_aprob,vl_telerecargas_carga_aprob,vl_user_to_user_aprob,vl_withdraw_atm_aprob,nu_sin_categoria_aprob,nu_compras_aprob,nu_entretenimiento_aprob,nu_servicios_débitos_automaticos_aprob,nu_supermercados_alimentos_aprob,nu_transferencias_retiros_aprob,vl_sin_categoria_aprob,vl_compras_aprob,vl_entretenimiento_aprob,vl_servicios_débitos_automaticos_aprob,vl_supermercados_alimentos_aprob,vl_transferencias_retiros_aprob,nu_tcc_r_rech,nu_tcc_t_rech,nu_tcc_z_rech,vl_tcc_r_rech,vl_tcc_t_rech,vl_tcc_z_rech,nu_mode_digital_qty_0_rech,nu_mode_digital_qty_1_rech,nu_automatic_debit_rech,nu_cash_out_cvu_rech,nu_consumption_pos_rech,nu_investments_withdraw_rech,nu_telerecargas_carga_rech,nu_user_to_user_rech,nu_withdraw_atm_rech,vl_automatic_debit_rech,vl_cash_out_cvu_rech,vl_consumption_pos_rech,vl_investments_withdraw_rech,vl_telerecargas_carga_rech,vl_user_to_user_rech,vl_withdraw_atm_rech,nu_sin_categoria_rech,nu_compras_rech,nu_entretenimiento_rech,nu_servicios_débitos_automaticos_rech,nu_supermercados_alimentos_rech,nu_transferencias_retiros_rech,vl_sin_categoria_rech,vl_compras_rech,vl_entretenimiento_rech,vl_servicios_débitos_automaticos_rech,vl_supermercados_alimentos_rech,vl_transferencias_retiros_rech,dt
8649,4026497,0,24,0,0.0,135.1,0.0,6,23,1,3,24,1,0,0,0,0.82,130.96,141.7,137.85,0.0,0.0,0.0,5,18,2,4,0,0,276.23,104.07,12.85,18.18,0.0,0.0,0,0,0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,2020-05-31
