In [6]:
# init values
def create_spark_session():
    """
    Crea una sesión de Spark y devuelve el objeto SparkSession.
    """
    spark = SparkSession.builder \
        .appName("Inicialización de datos") \
        .getOrCreate()
    return spark

def init_dataframes(spark, input_directory):
    """
    Inicializa los dataframes con los datos de entrada y devuelve los dataframes.
    """
    # Ejemplo de carga de datos desde archivos CSV y JSON
    df_csv = spark.read.csv(input_directory + "/pays.csv", header=True, inferSchema=True)
    df_json1 = spark.read.json(input_directory + "/prints.json")
    df_json2 = spark.read.json(input_directory + "/taps.json")

    return df_csv, df_json1, df_json2

In [8]:
from pyspark.sql import SparkSession

# Crea una sesión de Spark
spark = create_spark_session()

# Directorio de entrada
input_directory = "./data"
output_directory = "./output"

# Inicializa los dataframes con los datos de entrada
df_csv, df_json1, df_json2 = init_dataframes(spark, input_directory)

                                                                                

In [13]:
from pyspark.sql.functions import *

df_csv.show()

+----------+------+-------+------------------+
|  pay_date| total|user_id|        value_prop|
+----------+------+-------+------------------+
|2020-11-01|  7.04|  35994|        link_cobro|
|2020-11-01| 37.36|  79066|cellphone_recharge|
|2020-11-01| 15.84|  19321|cellphone_recharge|
|2020-11-01| 26.26|  19321|        send_money|
|2020-11-01| 35.35|  38438|        send_money|
|2020-11-01| 20.95|  85939|         transport|
|2020-11-01| 74.48|  14372|           prepaid|
|2020-11-01| 31.52|  14372|        link_cobro|
|2020-11-01| 83.76|  65274|         transport|
|2020-11-01| 93.54|  65274|           prepaid|
|2020-11-01| 37.84|  97428|        link_cobro|
|2020-11-01| 26.77|  82163|        link_cobro|
|2020-11-01| 92.56|   9816|        send_money|
|2020-11-01|122.03|   9816|           prepaid|
|2020-11-01| 83.66|  28929|           prepaid|
|2020-11-01|136.78|  97275|        link_cobro|
|2020-11-01| 17.34|  85001|cellphone_recharge|
|2020-11-01| 41.93|  85001|        link_cobro|
|2020-11-01| 

In [60]:
#df_json1.select("day").distinct().orderBy(col("day").asc()).show(truncate=False)
df_json1.show(truncate=False)

+----------+-----------------------+-------+
|day       |event_data             |user_id|
+----------+-----------------------+-------+
|2020-11-01|{0, cellphone_recharge}|98702  |
|2020-11-01|{1, prepaid}           |98702  |
|2020-11-01|{0, prepaid}           |63252  |
|2020-11-01|{0, cellphone_recharge}|24728  |
|2020-11-01|{1, link_cobro}        |24728  |
|2020-11-01|{2, credits_consumer}  |24728  |
|2020-11-01|{3, point}             |24728  |
|2020-11-01|{0, point}             |25517  |
|2020-11-01|{1, credits_consumer}  |25517  |
|2020-11-01|{2, transport}         |25517  |
|2020-11-01|{0, point}             |57587  |
|2020-11-01|{0, transport}         |13609  |
|2020-11-01|{0, cellphone_recharge}|3708   |
|2020-11-01|{1, prepaid}           |3708   |
|2020-11-01|{2, point}             |3708   |
|2020-11-01|{3, send_money}        |3708   |
|2020-11-01|{0, send_money}        |99571  |
|2020-11-01|{1, point}             |99571  |
|2020-11-01|{2, link_cobro}        |99571  |
|2020-11-0

In [61]:
df_json2.select("day").distinct().orderBy(col("day").asc()).show(truncate=False)

+----------+
|day       |
+----------+
|2020-11-01|
|2020-11-02|
|2020-11-03|
|2020-11-04|
|2020-11-05|
|2020-11-06|
|2020-11-07|
|2020-11-08|
|2020-11-09|
|2020-11-10|
|2020-11-11|
|2020-11-12|
|2020-11-13|
|2020-11-14|
|2020-11-15|
|2020-11-16|
|2020-11-17|
|2020-11-18|
|2020-11-19|
|2020-11-20|
+----------+
only showing top 20 rows



In [9]:
import datetime

current_date = datetime.datetime.now().date()
three_weeks_ago = current_date - datetime.timedelta(days=21)
one_week_ago = current_date - datetime.timedelta(days=7)

print("Fecha actual:", current_date)
print("Hace tres semanas:", three_weeks_ago)
print("Hace una semana:", one_week_ago)

Fecha actual: 2024-03-20
Hace tres semanas: 2024-02-28
Hace una semana: 2024-03-13


In [10]:
fecha_str = "2020-11-21"

# Convertir la cadena a un objeto de fecha
current_date = datetime.datetime.strptime(fecha_str, "%Y-%m-%d").date()
three_weeks_ago = current_date - datetime.timedelta(days=21)
one_week_ago = current_date - datetime.timedelta(days=7)

print("Fecha actual:", current_date)
print("Hace tres semanas:", three_weeks_ago)
print("Hace una semana:", one_week_ago)


Fecha actual: 2020-11-21
Hace tres semanas: 2020-10-31
Hace una semana: 2020-11-14


In [64]:
print("Cantidad de registros prints: ", df_json1.count())
print("Cantidad de registros taps: ", df_json2.count())

Cantidad de registros prints:  508617
Cantidad de registros taps:  50859


In [14]:
# Convertir la columna 'day' a tipo fecha en los DataFrames df_prints, df_taps
df_prints = df_json1.withColumn("day", col("day").cast("date")).selectExpr("user_id", "day", "event_data.value_prop as value_prop")
df_taps = df_json2.withColumn("day", col("day").cast("date")).selectExpr("user_id", "day", "event_data.value_prop as value_prop")

# Filtrar los eventos de la última semana para cada "print"
df_last_week_prints = df_prints.filter(col("day") > one_week_ago)
df_last_week_taps = df_taps.filter(col("day") > one_week_ago)

df_3weeks_prints = df_prints.filter((col("day") > three_weeks_ago) & (col("day") < one_week_ago))
df_3weeks_taps = df_taps.filter((col("day") > three_weeks_ago) & (col("day") < one_week_ago))
df_3weeks_pays = df_csv.withColumn("pay_date", col("pay_date").cast("date")).filter((col("pay_date") > three_weeks_ago) & (col("pay_date") < one_week_ago))


In [66]:
#df_last_week_prints.show(truncate=False)
#df_last_week_prints.count()

# Verificar el número de filas antes de eliminar duplicados
num_rows_before = df_last_week_prints.count()

# Eliminar filas duplicadas
df_no_duplicates = df_last_week_prints.dropDuplicates()

# Verificar el número de filas después de eliminar duplicados
num_rows_after = df_no_duplicates.count()

# Calcular el número de filas duplicadas
num_duplicates = num_rows_before - num_rows_after

print("Número de filas antes de eliminar duplicados:", num_rows_before)
print("Número de filas después de eliminar duplicados:", num_rows_after)
print("Número de filas duplicadas eliminadas:", num_duplicates)




Número de filas antes de eliminar duplicados: 266518
Número de filas después de eliminar duplicados: 266518
Número de filas duplicadas eliminadas: 0


                                                                                

In [67]:
#df_last_week_taps.show(truncate=False)
#df_last_week_taps.count()

# Verificar el número de filas antes de eliminar duplicados
num_rows_before = df_last_week_taps.count()

# Eliminar filas duplicadas
df_no_duplicates = df_last_week_taps.dropDuplicates()

# Verificar el número de filas después de eliminar duplicados
num_rows_after = df_no_duplicates.count()

# Calcular el número de filas duplicadas
num_duplicates = num_rows_before - num_rows_after

print("Número de filas antes de eliminar duplicados:", num_rows_before)
print("Número de filas después de eliminar duplicados:", num_rows_after)
print("Número de filas duplicadas eliminadas:", num_duplicates)

Número de filas antes de eliminar duplicados: 26643
Número de filas después de eliminar duplicados: 26643
Número de filas duplicadas eliminadas: 0


In [76]:
df_prints_with_taps = df_last_week_prints.alias("prints")\
            .join(df_last_week_taps.alias("taps"), 
                  on = ["user_id", "value_prop", "day"], 
                  how = "left_outer")\
            .select("prints.*", when(col("taps.day").isNull(), False).otherwise(True).alias("clicked"))

print("Cantidad de registros: ", df_prints_with_taps.count())
print("Cantidad de registros: ", df_last_week_prints.count())
df_prints_with_taps.show()

                                                                                

Cantidad de registros:  266518
Cantidad de registros:  266518
+-------+------------------+----------+-------+
|user_id|        value_prop|       day|clicked|
+-------+------------------+----------+-------+
|  30069|        send_money|2020-11-15|  false|
|  47604|           prepaid|2020-11-15|  false|
|  43158|             point|2020-11-15|  false|
|  43158|cellphone_recharge|2020-11-15|  false|
|  43158|           prepaid|2020-11-15|  false|
|  48531|        send_money|2020-11-15|  false|
|  48531|        link_cobro|2020-11-15|  false|
|  52959|         transport|2020-11-15|  false|
|  89193|cellphone_recharge|2020-11-15|   true|
|  89193|         transport|2020-11-15|  false|
|  89193|           prepaid|2020-11-15|  false|
|  89193|  credits_consumer|2020-11-15|  false|
|  42567|        send_money|2020-11-15|  false|
|  39495|             point|2020-11-15|   true|
|   4974|        send_money|2020-11-15|  false|
|   4974|        link_cobro|2020-11-15|  false|
|  60208|cellphone_recharg

In [77]:
df_json1_3weeks_grouped = df_3weeks_prints.groupBy("user_id", "value_prop").count().withColumnRenamed("count", "print_count_3weeks")
df_json2_3weeks_grouped = df_3weeks_taps.groupBy("user_id", "value_prop").count().withColumnRenamed("count", "tap_count_3weeks")
df_csv_3weeks_grouped = df_3weeks_pays.groupBy("user_id", "value_prop")\
    .agg(count("total").alias("payment_count_3weeks"), sum("total").alias("total_spent_3weeks"))


In [78]:
df_csv_3weeks_grouped.show()



+-------+------------------+--------------------+------------------+
|user_id|        value_prop|payment_count_3weeks|total_spent_3weeks|
+-------+------------------+--------------------+------------------+
|  85418|             point|                   1|            109.64|
|  82670|        link_cobro|                   1|              5.03|
|  91597|         transport|                   1|              0.69|
|  43625|        send_money|                   1|            165.56|
|  33567|cellphone_recharge|                   1|             22.24|
|  30669|  credits_consumer|                   2|11.200000000000001|
|  44230|        send_money|                   1|             40.48|
|   1594|        send_money|                   1|             71.05|
|   3000|cellphone_recharge|                   1|            153.31|
|  19103|        link_cobro|                   2|24.299999999999997|
|  57926|  credits_consumer|                   1|             65.61|
|  99924|           prepaid|      

                                                                                

In [79]:
df_final = df_prints_with_taps.join(df_json1_3weeks_grouped, ["user_id", "value_prop"], "left")\
            .join(df_json2_3weeks_grouped, ["user_id", "value_prop"], "left")\
            .join(df_csv_3weeks_grouped, ["user_id", "value_prop"], "left") 

In [80]:
df_final.count()

                                                                                

266518

In [81]:
df_final.show()

                                                                                

+-------+------------------+----------+-------+------------------+----------------+--------------------+------------------+
|user_id|        value_prop|       day|clicked|print_count_3weeks|tap_count_3weeks|payment_count_3weeks|total_spent_3weeks|
+-------+------------------+----------+-------+------------------+----------------+--------------------+------------------+
|   3047|  credits_consumer|2020-11-15|  false|                 1|            NULL|                NULL|              NULL|
|   4335|  credits_consumer|2020-11-17|  false|                 1|            NULL|                   1|              23.5|
|   8408|        link_cobro|2020-11-29|  false|              NULL|            NULL|                NULL|              NULL|
|  14003|             point|2020-11-29|   true|              NULL|            NULL|                NULL|              NULL|
|  20018|cellphone_recharge|2020-11-17|  false|              NULL|            NULL|                NULL|              NULL|
|  24793

In [91]:
compresor = 'snappy'

df_final.coalesce(1).write.mode("overwrite").parquet(output_directory, compression=compresor)


                                                                                

In [2]:
import os
os.path.join(os.getcwd(), "config", "config.conf")

'/Users/andrescarrillo/projects/ml_pipeline/config/config.conf'

In [4]:
config_file = os.path.join(os.getcwd(), "config", "config.conf")
config.read(config_file)

NameError: name 'config' is not defined