In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, broadcast, when
from project.business.func import cleaningFunctions

In [2]:
# Create (or reuse) the Spark session shared by every cell below.
# NOTE(review): "spark.some.config.option" / "config-value" looks like a
# placeholder rather than a real Spark setting — confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("cleaning")
    .config("spark.some.config.option", "config-value")
    .getOrCreate()
)

## Análisis general de la raw data

### events data

In [3]:
# Load the raw events CSV, using the first line of the file as column names.
# No schema is supplied, so every column is loaded as a string.
events_df = spark.read.csv(
    path="/home/jovyan/work/data/raw/events.csv",
    header=True,
)

In [4]:
# Preview the first 5 rows of the raw events.
events_df.show(5)

+----+-----+-------+---+--------+------+---------+--------------+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|
+----+-----+-------+---+--------+------+---------+--------------+
|  11|    1|     24|  0|20211001|     5|      BF3|           374|
|   1|    1|     51|  0|20211001|     4|      9F5|           374|
|  11|    1|      3|  0|20211001|     6|      025|           374|
|  10|    1|     36|  0|20211001|     5|      FB6|           D52|
|  23|    4|    137|  0|20211001|     8|      4BB|           861|
+----+-----+-------+---+--------+------+---------+--------------+
only showing top 5 rows



In [5]:
# Print the column names and types of the raw events.
events_df.printSchema()

root
 |-- hour: string (nullable = true)
 |-- calls: string (nullable = true)
 |-- seconds: string (nullable = true)
 |-- sms: string (nullable = true)
 |-- date: string (nullable = true)
 |-- region: string (nullable = true)
 |-- id_source: string (nullable = true)
 |-- id_destination: string (nullable = true)



In [6]:
# Count how many different id_source values appear in the events.
source_ids = events_df.select("id_source")
source_ids.distinct().count()

4097

In [7]:
# Columns that are checked for missing values.
columns_no_nulls = ["id_source", "id_destination"]
# count_nulls is a project helper whose code is not shown here. Judging by the
# output it yields one row with a count per requested column — presumably the
# number of nulls in each; verify against its implementation.
null_counts = cleaningFunctions.count_nulls(events_df, columns_no_nulls)
null_counts.show()

+---------+--------------+
|id_source|id_destination|
+---------+--------------+
|       18|            15|
+---------+--------------+



#### Los registros con id_source o id_destination nulo deben ser descartados.

### free sms dest

In [8]:
# Load the list of free destination ids, using the first line of the file as
# the column name. No schema is supplied, so the single "id" column is a string.
free_sms = spark.read.csv(
    path="/home/jovyan/work/data/raw/free_destinations.csv",
    header=True,
)

In [9]:
# Preview the first 5 free destination ids.
free_sms.show(5)

+---+
| id|
+---+
|374|
|D52|
|861|
|5B0|
|4CA|
+---+
only showing top 5 rows



In [10]:
# Print the column names and types of the free-destinations list.
free_sms.printSchema()

root
 |-- id: string (nullable = true)



In [11]:
# Number of distinct rows in the free-destinations list.
free_sms.distinct().count()

200

In [12]:
# Run the project's count_nulls helper on the only column of the
# free-destinations list and display the result.
count_nulls = cleaningFunctions.count_nulls(free_sms, ["id"])
count_nulls.show()

+---+
| id|
+---+
|  0|
+---+



## Limpieza de datos
### Requerimientos
* registros con id_source o id_destination nulo deben ser descartados.

In [13]:
# Columns in which a null value causes the whole record to be dropped.
columns_no_nulls = ["id_source", "id_destination"]

In [14]:
# Validation: null counts per key column before the drop.
null_counts = cleaningFunctions.count_nulls(events_df, columns_no_nulls)
null_counts.show()

+---------+--------------+
|id_source|id_destination|
+---------+--------------+
|       18|            15|
+---------+--------------+



In [15]:
# New events DataFrame keeping only the rows where none of the key columns is null.
events_withNoNulls = events_df.dropna(how="any", subset=columns_no_nulls)
# NOTE(review): no cache()/persist() on events_df is visible in this notebook,
# so this call probably has nothing to release — confirm and consider removing it.
events_df.unpersist()

DataFrame[hour: string, calls: string, seconds: string, sms: string, date: string, region: string, id_source: string, id_destination: string]

In [16]:
# Left-join the null-free events with the free SMS destination ids, hinting
# Spark to broadcast the free_sms side. Every event is kept; events whose
# id_destination has no match end up with a null "id".
join_condition = events_withNoNulls["id_destination"] == free_sms["id"]
consolidate_df = events_withNoNulls.join(
    broadcast(free_sms),
    on=join_condition,
    how="left",
)
# NOTE(review): no cache()/persist() on free_sms is visible in this notebook,
# so this call probably has nothing to release — confirm and consider removing it.
free_sms.unpersist()

DataFrame[id: string]

In [17]:
# Inspect the first rows: in this output every id_destination has a matching id.
# NOTE(review): the original note stated that Spark sorts the data so that the
# matches come first; no explicit sort is applied in this notebook, so that
# ordering should not be relied upon — confirm.
consolidate_df.head(5)

[Row(hour='11', calls='1', seconds='24', sms='0', date='20211001', region='5', id_source='BF3', id_destination='374', id='374'),
 Row(hour='1', calls='1', seconds='51', sms='0', date='20211001', region='4', id_source='9F5', id_destination='374', id='374'),
 Row(hour='11', calls='1', seconds='3', sms='0', date='20211001', region='6', id_source='025', id_destination='374', id='374'),
 Row(hour='10', calls='1', seconds='36', sms='0', date='20211001', region='5', id_source='FB6', id_destination='D52', id='D52'),
 Row(hour='23', calls='4', seconds='137', sms='0', date='20211001', region='8', id_source='4BB', id_destination='861', id='861')]

In [18]:
# Inspect the last rows: "id" is expected to be null here because these
# id_destination values do not exist in the free-destinations DataFrame.
consolidate_df.tail(5)

[Row(hour='17', calls='1', seconds='19', sms='0', date='20211001', region='6', id_source='4E0', id_destination='650', id=None),
 Row(hour='21', calls='1', seconds='43', sms='0', date='20211001', region='6', id_source='EE5', id_destination='885', id=None),
 Row(hour='14', calls='1', seconds='26', sms='0', date='20211001', region='6', id_source='21F', id_destination='F9A', id=None),
 Row(hour='15', calls='1', seconds='95', sms='3', date='20211001', region='6', id_source='37B', id_destination='203', id=None),
 Row(hour='18', calls='1', seconds='71', sms='0', date='20211001', region='6', id_source='3DC', id_destination='203', id=None)]

## Validación de datos

In [19]:
# Validation: after the drop, the key columns must not contain any null.
null_counts = cleaningFunctions.count_nulls(events_withNoNulls, columns_no_nulls)
null_counts.show()

# Fail loudly instead of relying on someone reading the table above: the
# helper's result is displayed as a single row with one count per column, so
# any non-zero count means the cleaning step did not work and the data must
# not be written out.
remaining_nulls = null_counts.first()
assert remaining_nulls is not None, "count_nulls returned no rows"
columns_with_nulls = {
    name: count for name, count in remaining_nulls.asDict().items() if count
}
assert not columns_with_nulls, f"null values remain after the drop: {columns_with_nulls}"

# NOTE(review): no cache()/persist() on these DataFrames is visible in this
# notebook, so these calls probably have nothing to release — confirm.
events_withNoNulls.unpersist()
null_counts.unpersist()

+---------+--------------+
|id_source|id_destination|
+---------+--------------+
|        0|             0|
+---------+--------------+



DataFrame[id_source: bigint, id_destination: bigint]

## Procesando data limpia

In [21]:
# Write the joined events as Parquet, partitioned by the "date" column.
# "overwrite" mode replaces whatever already exists at the target path.
consolidate_df.write.parquet(
    "/home/jovyan/work/data/preprocessed/",
    mode="overwrite",
    partitionBy="date",
)

In [22]:
# Shut down the Spark session; nothing below this cell uses it.
spark.stop()