In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MiAplicacion").getOrCreate()

In [2]:
import os
csv_files = [os.path.join("archive", f) for f in os.listdir("archive") if f.endswith('.csv')]

df = spark.read.options(header='True', inferSchema='True').csv(csv_files)

In [3]:
from pyspark.sql import functions as F

# Nulos

In [4]:
# Muestra conteo de nulos por columna
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     20339246|8757117|    0|      0|        4598|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



In [4]:
# Imputamos los valores nulos ya que las columnas en cuestion no son críticas.
df_clean = df.fillna({
    "category_code": "unknown",
    "brand": "unknown"
})

# Al ser pocas filas, optamos por eliminarlas. 
df_clean = df_clean.filter(F.col("user_session").isNotNull())

In [6]:
# Verificación

df_clean.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_clean.columns]).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|            0|    0|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



# Outliers

In [7]:
# Estadísticas básicas de precio
df_clean.select(
    F.mean("price").alias("avg_price"),
    F.stddev("price").alias("std_price"),
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price")
).show()

# Filtrar precios "razonables" (ej: entre 0.01 y 10,000)
df_clean = df_clean.filter((F.col("price") > 0.01) & (F.col("price") < 10000))

+----------------+------------------+---------+---------+
|       avg_price|         std_price|min_price|max_price|
+----------------+------------------+---------+---------+
|8.53489224700748|19.382060565203542|   -79.37|   327.78|
+----------------+------------------+---------+---------+



In [5]:
# Eliminamos precios negativos, podrían ser reembolsos, cupones etc. Pero deberia estar especificado de otra forma.

df_clean = df_clean.filter(F.col("price") >= 0)

In [10]:
# Estadísticas básicas de precio
df_clean.select(
    F.mean("price").alias("avg_price"),
    F.stddev("price").alias("std_price"),
    F.min("price").alias("min_price"),
    F.max("price").alias("max_price")
).show()

# Filtrar precios "razonables" (ej: entre 0.01 y 10,000)
df_clean = df_clean.filter((F.col("price") > 0.01) & (F.col("price") < 10000))

+-----------------+------------------+---------+---------+
|        avg_price|         std_price|min_price|max_price|
+-----------------+------------------+---------+---------+
|9.377097434957705|20.792348796755153|     0.05|   327.78|
+-----------------+------------------+---------+---------+



In [11]:
quantiles = df_clean.approxQuantile("price", [0.25, 0.75, 0.99], 0.01)
Q1, Q3, P99 = quantiles[0], quantiles[1], quantiles[2]
IQR = Q3 - Q1

upper_limit = Q3 + 1.5 * IQR
print(f"Límite superior (IQR): {upper_limit}, Percentil 99%: {P99}")

Límite superior (IQR): 15.75, Percentil 99%: 327.78


A pesar de que puede ser extraño, comprendemos que puede haber cosmeticos de hasta mas de 300 dolares. Por lo que no tomamos medidas respecto a valores atipicos.

In [17]:
df_clean.filter(F.col("price")>300).filter((F.col("brand") != "strong")  & (F.col("brand") != "unknown")).show()

+-------------------+----------+----------+-------------------+-------------+------+-----+---------+--------------------+
|         event_time|event_type|product_id|        category_id|category_code| brand|price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+-------------+------+-----+---------+--------------------+
|2019-11-14 16:54:04|      view|   5635474|1487580008984608779|      unknown|entity|307.6|571598796|18af287e-6279-41c...|
|2019-10-07 10:00:58|      view|   5635474|1487580008984608779|      unknown|entity|307.6|440589741|726655f1-fa53-42e...|
|2019-10-09 14:50:11|      view|   5635474|1487580008984608779|      unknown|entity|307.6|558545066|464127c4-1cfb-423...|
|2019-10-09 18:43:02|      view|   5635474|1487580008984608779|      unknown|entity|307.6|480150376|72b5f64a-618c-4a8...|
|2019-10-15 09:50:40|      view|   5635474|1487580008984608779|      unknown|entity|307.6|560482622|583ea02a-fd3f-44d...|
|2019-10-19 15:10:14|   

In [21]:
from pyspark.sql.window import Window

# Agregar columna de eventos por día
user_daily_events = df_clean.withColumn(
    "events_per_day", 
    F.count("*").over(Window.partitionBy("user_id", F.to_date("event_time")))
)

# Filtrar usuarios con >100 eventos/día y contar ocurrencias
high_activity_users = user_daily_events.filter(F.col("events_per_day") > 100)
print(f"Usuarios con >100 eventos/día: {high_activity_users.select('user_id').distinct().count()}")
high_activity_users.select("user_id", "events_per_day").distinct().show(10)

Usuarios con >100 eventos/día: 14795
+---------+--------------+
|  user_id|events_per_day|
+---------+--------------+
|343320891|           171|
|414453512|           102|
|438606371|           106|
|479888694|           143|
|483574352|           183|
|495786914|           123|
|527692709|           106|
|527739278|           201|
|550355871|           181|
|563408172|           266|
+---------+--------------+
only showing top 10 rows



In [27]:
# Filtrar usuarios con >100 eventos/día y contar ocurrencias
high_activity_users = user_daily_events.filter(F.col("events_per_day") < 100)
print(f"Usuarios con >100 eventos/día: {high_activity_users.select('user_id').distinct().count()}")
high_activity_users.select("user_id", "events_per_day").distinct().show(10)

Usuarios con >100 eventos/día: 1636982
+---------+--------------+
|  user_id|events_per_day|
+---------+--------------+
| 31156111|             1|
| 34915661|             1|
| 67944478|             1|
|152961343|             1|
|153316955|            17|
|196042408|             4|
|204142009|             1|
|204166748|             7|
|223293700|            28|
|228684512|             2|
+---------+--------------+
only showing top 10 rows



Vamos a tomar una decision sensata sin mayores complicaciones por el momento. Hay aproximadamente 1.6 millones de usuarios, de los cuales 15 mil usuarios tienen mas de 100 sesiones por dia. Vamos a eliminar los registros con mas de 150 sesiones, con el fin de preservar algun posible cliente extremo o revendedor pero limitando la posibilidad de bots obvios en la plataforma.

In [25]:
# 1. Calcular eventos diarios por usuario
df_clean = df_clean.withColumn(
    "events_per_day", 
    F.count("*").over(Window.partitionBy("user_id", F.to_date("event_time")))
)

# 2. Filtrar SOLO registros donde eventos_per_day < 150
df_clean = df_clean.filter(F.col("events_per_day") < 150).drop("events_per_day")

In [22]:
from pyspark.sql.functions import datediff

session_duration = df_clean.groupBy("user_session").agg(
    F.min("event_time").alias("start_time"),
    F.max("event_time").alias("end_time"),
    datediff(F.max("event_time"), F.min("event_time")).alias("duration_days")
)

# Top 10 sesiones más largas
session_duration.orderBy(F.desc("duration_days")).show(10)

+--------------------+-------------------+-------------------+-------------+
|        user_session|         start_time|           end_time|duration_days|
+--------------------+-------------------+-------------------+-------------+
|2c1569d4-8ab3-414...|2019-10-01 11:12:49|2020-02-29 10:50:25|          151|
|52b30a79-923b-461...|2019-10-01 06:07:13|2020-02-29 08:06:45|          151|
|ac7b1c23-10a7-485...|2019-10-01 17:32:33|2020-02-29 18:34:01|          151|
|38785db2-b8d6-4c3...|2019-10-01 15:05:11|2020-02-29 13:30:03|          151|
|ae74cec4-ae31-447...|2019-10-01 02:48:54|2020-02-29 17:34:50|          151|
|1345d1ab-4163-46c...|2019-10-01 18:40:21|2020-02-29 07:54:17|          151|
|099fefe4-a74c-4da...|2019-10-01 03:52:13|2020-02-29 14:31:51|          151|
|5b9bcf07-5c80-4f9...|2019-10-01 03:34:49|2020-02-29 13:15:36|          151|
|32d76835-fa23-4e5...|2019-10-01 16:06:58|2020-02-29 14:44:47|          151|
|b8ec8ea3-5fbd-40e...|2019-10-01 02:14:54|2020-02-29 02:08:35|          151|

Observamos una cantidad anomala de dias de duracion en algunas sesiones, algo que no es posible. Una sesion no puede durar 5 meses. Vamos a explorar la cantidad de sesiones por rangos de dias mas aceptables.

In [None]:
from pyspark.sql.functions import count, when

# Calcular la duración en días de cada sesión (si no lo has hecho)
session_duration = df_clean.groupBy("user_session").agg(
    (F.datediff(F.max("event_time"), F.min("event_time"))).alias("duration_days")
)

# Contar sesiones en cada rango
duration_bins = session_duration.select(
    F.count(F.when(F.col("duration_days") < 15, True)).alias("<15_days"),
    F.count(F.when(F.col("duration_days") > 15, True)).alias(">15_days"),
    F.count(F.when(F.col("duration_days") > 30, True)).alias(">30_days"),
    F.count(F.when(F.col("duration_days") > 50, True)).alias(">50_days"),
    F.count(F.when(F.col("duration_days") > 100, True)).alias(">100_days"),
    F.count("*").alias("total_sessions")
).show()

+--------+--------+--------+--------+---------+--------------+
|<15_days|>15_days|>30_days|>50_days|>100_days|total_sessions|
+--------+--------+--------+--------+---------+--------------+
| 4356772|    9911|    7265|    5697|     1904|       4366938|
+--------+--------+--------+--------+---------+--------------+

