In [1]:
#EXPLORACION - SMS FACTURACION
#Este notebook solo contiene un breve analisis exploratorio de datos.

In [2]:
#Configuración del entorno

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, count, when, isnan

# Get the active Spark session if one already exists, otherwise create one
# registered under this application name.
constructor_sesion = SparkSession.builder.appName("SMS Facturacion - Exploracion")
spark = constructor_sesion.getOrCreate()

In [3]:
# Load the datasets

path_eventos = "/home/jovyan/data_in/events.csv.gz"
path_free_sms = "/home/jovyan/data_in/free_sms_destinations.csv.gz"

# Same reader options for both files: the first line is read as the header
# and Spark infers the column types from the data.
opciones_csv = {"header": True, "inferSchema": True}

eventos_df = spark.read.csv(path_eventos, **opciones_csv)
free_sms_df = spark.read.csv(path_free_sms, **opciones_csv)

In [4]:
# Quick look at how both datasets are structured (first 5 rows of each)
for df_vista in (eventos_df, free_sms_df):
    df_vista.show(5)

+----+-----+-------+---+--------+------+---------+--------------+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|
+----+-----+-------+---+--------+------+---------+--------------+
|  11|    1|     24|  0|20211001|     5|      BF3|           374|
|   1|    1|     51|  0|20211001|     4|      9F5|           374|
|  11|    1|      3|  0|20211001|     6|      025|           374|
|  10|    1|     36|  0|20211001|     5|      FB6|           D52|
|  23|    4|    137|  0|20211001|     8|      4BB|           861|
+----+-----+-------+---+--------+------+---------+--------------+
only showing top 5 rows

+---+
| id|
+---+
|374|
|D52|
|861|
|5B0|
|4CA|
+---+
only showing top 5 rows



In [5]:
# Print the column schema of each dataset, preceded by a title line
esquemas = (
    ("Esquema de eventos_df:", eventos_df),
    ("Esquema de free_sms_df:", free_sms_df),
)

for titulo, df_esquema in esquemas:
    print(titulo)
    df_esquema.printSchema()

Esquema de eventos_df:
root
 |-- hour: integer (nullable = true)
 |-- calls: integer (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- sms: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- region: integer (nullable = true)
 |-- id_source: string (nullable = true)
 |-- id_destination: string (nullable = true)

Esquema de free_sms_df:
root
 |-- id: string (nullable = true)



In [6]:
# Count the missing values (null, or NaN where applicable) in every column.
#
# isnan() is only meaningful for float/double columns. Applying it to every
# column makes Spark implicitly cast string columns (id_source,
# id_destination) to double: a literal "NaN" string would then be counted as
# missing, and under ANSI SQL mode the cast of a non-numeric id may fail
# instead of yielding null. So NaN is only checked on floating-point columns.
columnas_flotantes = {
    nombre for nombre, tipo in eventos_df.dtypes if tipo in ("float", "double")
}


def _es_faltante(nombre):
    """Return the boolean Column that is true when column `nombre` is missing.

    A value is missing when it is null, or, for float/double columns only,
    when it is NaN.
    """
    condicion = col(nombre).isNull()
    if nombre in columnas_flotantes:
        condicion = condicion | isnan(col(nombre))
    return condicion


# count() ignores nulls, and when() without otherwise() yields null for rows
# that do not match, so each aggregate counts only the missing rows.
eventos_df.select([
    count(when(_es_faltante(c), c)).alias(c)
    for c in eventos_df.columns
]).show()

+----+-----+-------+---+----+------+---------+--------------+
|hour|calls|seconds|sms|date|region|id_source|id_destination|
+----+-----+-------+---+----+------+---------+--------------+
|   0|    0|      0|  0|   0|     0|       18|            15|
+----+-----+-------+---+----+------+---------+--------------+



In [7]:
# Show the events where the source id, the destination id, or both are null
falta_origen = col("id_source").isNull()
falta_destino = col("id_destination").isNull()

eventos_df.where(falta_origen | falta_destino).show(truncate=False)

+----+-----+-------+---+--------+------+---------+--------------+
|hour|calls|seconds|sms|date    |region|id_source|id_destination|
+----+-----+-------+---+--------+------+---------+--------------+
|18  |1    |1299   |0  |20211001|9     |250      |NULL          |
|19  |1    |52     |0  |20211001|7     |NULL     |392           |
|16  |2    |75     |0  |20211001|7     |CC5      |NULL          |
|12  |1    |320    |0  |20211001|7     |NULL     |6E7           |
|15  |1    |612    |0  |20211001|7     |3E4      |NULL          |
|19  |1    |63     |0  |20211001|7     |NULL     |E21           |
|20  |1    |116    |0  |20211001|7     |A84      |NULL          |
|12  |1    |59     |0  |20211001|7     |NULL     |6DE           |
|19  |1    |50     |0  |20211001|2     |8DE      |NULL          |
|18  |2    |146    |0  |20211001|9     |NULL     |NULL          |
|13  |1    |23     |0  |20211001|4     |F06      |NULL          |
|10  |1    |12     |0  |20211001|4     |NULL     |FC5           |
|14  |1   

In [8]:
# Group by hour and add up the calls, only to cross-check the chart.
# NOTE: `sum` here is pyspark.sql.functions.sum (imported at the top of the
# notebook), not the Python builtin.
total_llamadas = sum(col("calls")).alias("total_llamadas")
eventos_por_hora = eventos_df.groupBy("hour")

df_llamadas_por_hora = eventos_por_hora.agg(total_llamadas).orderBy(col("hour").asc())
df_llamadas_por_hora.show()

+----+--------------+
|hour|total_llamadas|
+----+--------------+
|   0|          6440|
|   1|          3222|
|   2|          1853|
|   3|          1514|
|   4|          1955|
|   5|          4090|
|   6|          9655|
|   7|         23034|
|   8|         43609|
|   9|         62837|
|  10|         74799|
|  11|         78794|
|  12|         80129|
|  13|         81543|
|  14|         77990|
|  15|         75306|
|  16|         72230|
|  17|         71419|
|  18|         70747|
|  19|         70060|
+----+--------------+
only showing top 20 rows



In [17]:

# Read back the Parquet dataset stored under data_out/top_usuarios_sms
salida_df = spark.read.format("parquet").load("/home/jovyan/data_out/top_usuarios_sms/")

In [19]:
# Display the first 20 rows (the show() defaults, spelled out)
salida_df.show(n=20, truncate=True)

+---------+-----------+--------------------+
|id_source|monto_total|             id_hash|
+---------+-----------+--------------------+
|      4D3|    23403.0|911914c7729eedbdf...|
|      76D|    13664.0|bd180b7811395cbce...|
|      07E|     4226.0|14a0660ae2f5d1868...|
|      541|     2526.0|16c222aa19898e505...|
|      C25|     2019.0|0bfa0b57d99985aa1...|
|      17D|     1209.0|7521526054bb89ba2...|
|      3AE|     1087.5|6a57072949dbc409c...|
|      B86|     1054.5|bc97b32ee2abb9c18...|
|      E89|      979.5|5135cc35322269f2f...|
|      B71|      972.0|1088a10d026eae0ac...|
|      162|      887.5|82aa4b0af34c2313a...|
|      068|      854.5|fb8490a9504a36c21...|
|      A2C|      691.5|d7dcc6703e425a6e3...|
|      335|      681.0|f9b902fc3289af4dd...|
|      1BD|      679.0|f5a45e33602ea62f9...|
|      3E0|      661.5|e9d076deb3451cc49...|
|      76F|      644.0|1b9204fd05eed7af9...|
|      6FB|      598.5|dbcec12ccfc780516...|
|      5D7|      596.5|d688a78f3638bed0c...|
|      84C

In [26]:
# Left-join every event against the free-SMS destination list. After the
# join, `id` is non-null only for events whose id_destination appears in
# that list.
#
# The lookup is de-duplicated first: a left join emits one output row per
# matching lookup row, so a repeated id in free_sms_df would duplicate event
# rows and inflate every count/sum computed downstream.
destinos_gratuitos = free_sms_df.dropDuplicates(["id"])

df_joined = eventos_df.join(
    destinos_gratuitos,
    eventos_df.id_destination == destinos_gratuitos.id,
    "left",
)

In [30]:
# Display the first 20 joined rows (the show() defaults, spelled out)
df_joined.show(n=20, truncate=True)

+----+-----+-------+---+--------+------+---------+--------------+----+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|  id|
+----+-----+-------+---+--------+------+---------+--------------+----+
|  11|    1|     24|  0|20211001|     5|      BF3|           374| 374|
|   1|    1|     51|  0|20211001|     4|      9F5|           374| 374|
|  11|    1|      3|  0|20211001|     6|      025|           374| 374|
|  10|    1|     36|  0|20211001|     5|      FB6|           D52| D52|
|  23|    4|    137|  0|20211001|     8|      4BB|           861| 861|
|  18|    0|      0|  1|20211001|     4|      90C|           5B0| 5B0|
|  13|    1|    618|  0|20211001|     9|      7AB|           4CA| 4CA|
|  16|    1|    172|  0|20211001|     9|      7AB|           4CA| 4CA|
|   6|    1|    208|  0|20211001|     9|      7AB|           4CA| 4CA|
|   5|    1|     66|  0|20211001|     9|      7AB|           4CA| 4CA|
|  18|    1|    135|  0|20211001|     9|      7AB|           4CA| 4CA|
|  12|

In [31]:
from pyspark.sql import functions as F

# Per-SMS rate. The conditions are evaluated in order and the first match wins:
#   - `id` is not null (free destination) -> 0.0
#   - region 1-5                           -> 1.5
#   - region 6-9                           -> 2.0
#   - anything else                        -> 0.0
es_destino_gratuito = F.col("id").isNotNull()
region = F.col("region")

tarifa_sms = (
    F.when(es_destino_gratuito, F.lit(0.0))
    .when(region.between(1, 5), F.lit(1.5))
    .when(region.between(6, 9), F.lit(2.0))
    .otherwise(F.lit(0.0))
)

# monto_sms is the rate applied to the row; total_sms is the number of SMS
# in the row multiplied by that rate.
df_fact = (
    df_joined
    .withColumn("monto_sms", tarifa_sms)
    .withColumn("total_sms", F.col("sms") * F.col("monto_sms"))
)

# Show the results
df_fact.show(10)

+----+-----+-------+---+--------+------+---------+--------------+---+---------+---------+
|hour|calls|seconds|sms|    date|region|id_source|id_destination| id|monto_sms|total_sms|
+----+-----+-------+---+--------+------+---------+--------------+---+---------+---------+
|  11|    1|     24|  0|20211001|     5|      BF3|           374|374|      0.0|      0.0|
|   1|    1|     51|  0|20211001|     4|      9F5|           374|374|      0.0|      0.0|
|  11|    1|      3|  0|20211001|     6|      025|           374|374|      0.0|      0.0|
|  10|    1|     36|  0|20211001|     5|      FB6|           D52|D52|      0.0|      0.0|
|  23|    4|    137|  0|20211001|     8|      4BB|           861|861|      0.0|      0.0|
|  18|    0|      0|  1|20211001|     4|      90C|           5B0|5B0|      0.0|      0.0|
|  13|    1|    618|  0|20211001|     9|      7AB|           4CA|4CA|      0.0|      0.0|
|  16|    1|    172|  0|20211001|     9|      7AB|           4CA|4CA|      0.0|      0.0|
|   6|    

In [38]:
# Sort the billed rows from the highest to the lowest total_sms
df_fact_sorted = df_fact.sort(F.desc("total_sms"))


In [39]:
# Display the 20 rows with the highest total_sms (the show() defaults, spelled out)
df_fact_sorted.show(n=20, truncate=True)

+----+-----+-------+---+--------+------+---------+--------------+----+---------+---------+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|  id|monto_sms|total_sms|
+----+-----+-------+---+--------+------+---------+--------------+----+---------+---------+
|  18|    0|      0|180|20211001|     9|      A2C|           A4B|NULL|      2.0|    360.0|
|  15|    0|      0|165|20211001|     8|      17D|           AFD|NULL|      2.0|    330.0|
|  15|    0|      0|160|20211001|     7|      CF6|           FBE|NULL|      2.0|    320.0|
|  23|    0|      0|158|20211001|     7|      1DF|           264|NULL|      2.0|    316.0|
|  15|    1|     13|206|20211001|     4|      162|           FB6|NULL|      1.5|    309.0|
|  20|    0|      0|188|20211001|     4|      CEE|           63C|NULL|      1.5|    282.0|
|   0|    0|      0|124|20211001|     9|      068|           A66|NULL|      2.0|    248.0|
|  13|    0|      0|117|20211001|     9|      FCA|           700|NULL|      2.0|    234.0|

In [40]:
# Grand total of total_sms over all rows (a global aggregate with no grouping keys).
# Uses the namespaced F.sum rather than the bare `sum` that shadows the Python builtin.
suma_total_sms = df_fact.groupBy().agg(F.sum(F.col("total_sms")).alias("suma_total"))

In [41]:
# Display the single-row total (the show() defaults, spelled out)
suma_total_sms.show(n=20, truncate=True)

+----------+
|suma_total|
+----------+
|  391375.0|
+----------+

