In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("limpiar_datos") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2023-11-18T22:04:19,019 WARN [Thread-4] org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Carga y limpieza de Usuarios

In [214]:
df = spark.read.csv('/datalake/raw/Usuarios.csv', header=True, sep=',')

In [215]:
df.printSchema()

root
 |-- id_usuario: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- apellido: string (nullable = true)
 |-- email: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- actividad: string (nullable = true)
 |-- subscripcion: string (nullable = true)
 |-- fecha_subs: string (nullable = true)
 |-- fecha_renov: string (nullable = true)
 |-- ultimo_pago: string (nullable = true)



In [216]:
df.show()

+--------------------+-----------+-------------+--------------------+----------+--------------+-------------+------------+----------+-----------+-----------+
|          id_usuario|     nombre|     apellido|               email|      sexo|          pais|    actividad|subscripcion|fecha_subs|fecha_renov|ultimo_pago|
+--------------------+-----------+-------------+--------------------+----------+--------------+-------------+------------+----------+-----------+-----------+
|9efa1289-5256-49c...|      Elise|       Austen|  eausten1@unblog.fr|    Female|        Brazil|     natacion|      basica|12/13/2022| 10/18/2023| 12/25/2022|
|b4d30551-59cb-45b...|   Rochella|      Davidof|rdavidof2@timeson...|Non-binary|        Russia|personalizado|      basica|04/22/2023| 08/31/2023| 07/20/2023|
|effbcd55-e394-487...|Worthington|  Sherringham|wsherringham5@tut...|      Male|     Indonesia|personalizado|       anual|03/26/2023| 11/07/2023| 10/27/2023|
|fc621f46-e0ef-4ad...|    Camella|       Crooke|ccro

In [220]:
df_clean = df.withColumn("fecha_subs", to_date(df["fecha_subs"], "MM/dd/yyyy"))
df_clean = df_clean.withColumn("fecha_renov", to_date(df["fecha_renov"], "MM/dd/yyyy"))
df_clean = df_clean.withColumn("ultimo_pago", to_date(df["ultimo_pago"], "MM/dd/yyyy"))

In [221]:
df_clean.printSchema()

root
 |-- id_usuario: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- apellido: string (nullable = true)
 |-- email: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- actividad: string (nullable = true)
 |-- subscripcion: string (nullable = true)
 |-- fecha_subs: date (nullable = true)
 |-- fecha_renov: date (nullable = true)
 |-- ultimo_pago: date (nullable = true)



In [222]:
null_counts = df_clean.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_clean.columns])
null_counts.show()

+----------+------+--------+-----+----+----+---------+------------+----------+-----------+-----------+
|id_usuario|nombre|apellido|email|sexo|pais|actividad|subscripcion|fecha_subs|fecha_renov|ultimo_pago|
+----------+------+--------+-----+----+----+---------+------------+----------+-----------+-----------+
|         0|     0|       0|    0|  63|   0|       60|           0|         0|          0|          0|
+----------+------+--------+-----+----+----+---------+------------+----------+-----------+-----------+



- En el caso de los datos de usuario se pueden apreciar que se tienen 63 filas donde en la columna "sexo" se tiene el valor null y lo mismo sucede en 60 filas de la columna "actividad". Dado que no son columnas que se considere relevantes para análisis posteriores se decide mantener las filas con estos valores.

In [223]:
df_clean.write.mode("overwrite").parquet("/datalake/clean/usuarios")

### Carga y limpieza de Actividades

In [205]:
df_act = spark.read.csv('/datalake/raw/Actividades.csv', header=True, sep=',')

In [206]:
df_act.printSchema()

root
 |-- id_actividad: string (nullable = true)
 |-- id_usuario: string (nullable = true)
 |-- id_dispositivo: string (nullable = true)
 |-- id_plan: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- Longitud: string (nullable = true)
 |-- Latitud: string (nullable = true)
 |-- duracion: string (nullable = true)
 |-- fecha_hora: string (nullable = true)



In [207]:
df_act.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------+--------+--------+----------------+
|        id_actividad|          id_usuario|      id_dispositivo|             id_plan|          pais|Longitud| Latitud|duracion|      fecha_hora|
+--------------------+--------------------+--------------------+--------------------+--------------+--------+--------+--------+----------------+
|d2b9f5a4-e457-406...|9efa1289-5256-49c...|5f6742cf-0cc3-499...|1a257404-2741-4d5...|        Brazil|-47.9247|-38.5258|    1.47|07/08/2023 06:10|
|742ceebe-7cbd-42a...|b4d30551-59cb-45b...|9c9d7ede-4915-4fd...|296f1010-fd8b-4fe...|        Russia|100.6432| 60.0272|    2.78|03/01/2023 02:16|
|f079c279-1249-40e...|effbcd55-e394-487...|af1e4b22-9836-4ff...|d3c2f38b-249f-40c...|     Indonesia|112.6225| -7.7611|    3.43|09/17/2023 07:09|
|b8db20aa-9915-426...|fc621f46-e0ef-4ad...|5d5aa38b-9e5e-412...|b206f2a3-5ddd-47a...|        Russia|100.6432| 60.0272|     2.7|05/

In [208]:
df_act = df_act.withColumnRenamed("longitud", "Longitud")
df_act = df_act.withColumnRenamed("latitud", "Latitud")

In [209]:
df_act = df_act.withColumn('duracion', col('duracion').cast(IntegerType()))
df_act = df_act.withColumn('fecha_hora', to_timestamp(df_act['fecha_hora'], "MM/dd/yyyy HH:mm"))
df_act = df_act.withColumn('latitud', col('latitud').cast(FloatType()))
df_act = df_act.withColumn('longitud', col('longitud').cast(FloatType()))

In [210]:
df_act.printSchema()

root
 |-- id_actividad: string (nullable = true)
 |-- id_usuario: string (nullable = true)
 |-- id_dispositivo: string (nullable = true)
 |-- id_plan: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- longitud: float (nullable = true)
 |-- latitud: float (nullable = true)
 |-- duracion: integer (nullable = true)
 |-- fecha_hora: timestamp (nullable = true)



In [211]:
df_act.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------+--------+--------+-------------------+
|        id_actividad|          id_usuario|      id_dispositivo|             id_plan|          pais|longitud| latitud|duracion|         fecha_hora|
+--------------------+--------------------+--------------------+--------------------+--------------+--------+--------+--------+-------------------+
|d2b9f5a4-e457-406...|9efa1289-5256-49c...|5f6742cf-0cc3-499...|1a257404-2741-4d5...|        Brazil|-47.9247|-38.5258|       1|2023-07-08 06:10:00|
|742ceebe-7cbd-42a...|b4d30551-59cb-45b...|9c9d7ede-4915-4fd...|296f1010-fd8b-4fe...|        Russia|100.6432| 60.0272|       2|2023-03-01 02:16:00|
|f079c279-1249-40e...|effbcd55-e394-487...|af1e4b22-9836-4ff...|d3c2f38b-249f-40c...|     Indonesia|112.6225| -7.7611|       3|2023-09-17 07:09:00|
|b8db20aa-9915-426...|fc621f46-e0ef-4ad...|5d5aa38b-9e5e-412...|b206f2a3-5ddd-47a...|        Russia|100.6432| 60

In [212]:
null_counts = df_act.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_act.columns])
null_counts.show()

+------------+----------+--------------+-------+----+--------+-------+--------+----------+
|id_actividad|id_usuario|id_dispositivo|id_plan|pais|longitud|latitud|duracion|fecha_hora|
+------------+----------+--------------+-------+----+--------+-------+--------+----------+
|           0|         0|             0|      0|   0|     216|    216|      62|         0|
+------------+----------+--------------+-------+----+--------+-------+--------+----------+



- En el caso de los datos de actividades se pueden apreciar que se tienen 216 filas donde en la columna "latitud" y "longitud" se tiene valores nulos. Dado que no son columnas que se considere relevantes para análisis posteriores se decide mantener las filas con estos valores.

In [213]:
df_act.write.mode("overwrite").parquet("/datalake/clean/actividades")

### Carga y limpieza de Planes de Entretenimiento

In [190]:
df_plan = spark.read.csv('/datalake/raw/Plan_de_entrenamiento.csv', header=True, sep=',', quote='"')

In [191]:
df_plan = spark.read.option("header", "true") \
               .option("quote", "\"") \
               .option("escape", "\"") \
               .option("multiline", "true") \
               .csv('/datalake/raw/Plan_de_entrenamiento.csv')

In [192]:
df_plan.printSchema()

root
 |-- id_plan: string (nullable = true)
 |-- id_usuario: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- plan_duracion: string (nullable = true)
 |-- instructions: string (nullable = true)
 |-- objectivo: string (nullable = true)



In [193]:
df_plan = df_plan.withColumn('plan_duracion', col('plan_duracion').cast(FloatType()))

In [194]:
df_plan.printSchema()

root
 |-- id_plan: string (nullable = true)
 |-- id_usuario: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- plan_duracion: float (nullable = true)
 |-- instructions: string (nullable = true)
 |-- objectivo: string (nullable = true)



In [195]:
df_plan.show()

+--------------------+--------------------+----------------+-------------+--------------------+--------------------+
|             id_plan|          id_usuario|            tipo|plan_duracion|        instructions|           objectivo|
+--------------------+--------------------+----------------+-------------+--------------------+--------------------+
|1a257404-2741-4d5...|9efa1289-5256-49c...|   media-maraton|         1.63|Lorem ipsum dolor...|Maecenas leo odio...|
|296f1010-fd8b-4fe...|b4d30551-59cb-45b...|            hiit|         1.73|Nulla ut erat id ...|Duis bibendum. Mo...|
|d3c2f38b-249f-40c...|effbcd55-e394-487...|condicionamiento|          1.3|Proin leo odio po...|Donec diam neque,...|
|b206f2a3-5ddd-47a...|fc621f46-e0ef-4ad...|            otro|         1.25|Cum sociis natoqu...|Quisque id justo ...|
|147cd3b7-c8c6-4b6...|5cc10861-3ddd-467...|            hiit|         1.67|Duis consequat du...|Maecenas leo odio...|
|0c384ba2-f2c6-4ca...|1ee35bd7-2a17-4a8...|        triatlon|    

In [196]:
null_counts = df_plan.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_plan.columns])
null_counts.show()

+-------+----------+----+-------------+------------+---------+
|id_plan|id_usuario|tipo|plan_duracion|instructions|objectivo|
+-------+----------+----+-------------+------------+---------+
|      0|         0|   0|            0|           0|        0|
+-------+----------+----+-------------+------------+---------+



In [197]:
df_plan.write.mode("overwrite").parquet("/datalake/clean/planes_de_entrenamiento")

### Carga y limpieza de Actividades-Eventos

In [167]:
df_event = spark.read.csv('/datalake/raw/Actividades-Eventos.csv', header=True, sep=',')

In [168]:
df_event.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- pulse_rate: string (nullable = true)
 |-- id_actividad: string (nullable = true)
 |-- Longitud: string (nullable = true)
 |-- Latitud: string (nullable = true)
 |-- fecha_hora: string (nullable = true)



In [169]:
df_event.show()

+--------------------+----------+--------------------+--------+--------+----------------+
|           id_sensor|pulse_rate|        id_actividad|Longitud| Latitud|      fecha_hora|
+--------------------+----------+--------------------+--------+--------+----------------+
|853dc9e4-6094-451...|       173|d2b9f5a4-e457-406...|-47.9247|-38.5258|07/08/2023 06:10|
|278b2f82-087a-469...|       128|742ceebe-7cbd-42a...|100.6432| 60.0272|03/01/2023 02:16|
|ea0b5fad-ff1d-491...|       148|f079c279-1249-40e...|112.6225| -7.7611|09/17/2023 07:09|
|f6f4ad28-30e2-4c8...|       110|b8db20aa-9915-426...|100.6432| 60.0272|05/27/2023 10:45|
|e63edcf3-eca8-412...|       170|59b61169-ecde-498...|-78.4163| 22.2258|08/24/2023 06:49|
|bab58192-fc85-49c...|        42|8ea971d8-bc3e-44c...|106.3456| 35.3371|05/18/2023 00:35|
|cafd29e6-3782-4fa...|       197|f0c4dc60-3cf3-4f9...|-47.9247|-38.5258|05/14/2023 09:20|
|ce9ed08c-ccdc-48d...|       198|32bf0c7b-f21a-44b...|-63.2369|-17.4052|12/08/2022 01:13|
|999d5a48-

In [170]:
df_event = df_event.withColumnRenamed("longitud", "Longitud")
df_event = df_event.withColumnRenamed("latitud", "Latitud")

In [171]:
df_event = df_event.withColumn('fecha_hora', to_timestamp(df_event['fecha_hora'], "MM/dd/yyyy HH:mm"))
df_event = df_event.withColumn('pulse_rate', col('pulse_rate').cast(IntegerType()))
df_event = df_event.withColumn('latitud', col('latitud').cast(FloatType()))
df_event = df_event.withColumn('longitud', col('longitud').cast(FloatType()))

In [172]:
df_event.show()

+--------------------+----------+--------------------+--------+--------+-------------------+
|           id_sensor|pulse_rate|        id_actividad|longitud| latitud|         fecha_hora|
+--------------------+----------+--------------------+--------+--------+-------------------+
|853dc9e4-6094-451...|       173|d2b9f5a4-e457-406...|-47.9247|-38.5258|2023-07-08 06:10:00|
|278b2f82-087a-469...|       128|742ceebe-7cbd-42a...|100.6432| 60.0272|2023-03-01 02:16:00|
|ea0b5fad-ff1d-491...|       148|f079c279-1249-40e...|112.6225| -7.7611|2023-09-17 07:09:00|
|f6f4ad28-30e2-4c8...|       110|b8db20aa-9915-426...|100.6432| 60.0272|2023-05-27 10:45:00|
|e63edcf3-eca8-412...|       170|59b61169-ecde-498...|-78.4163| 22.2258|2023-08-24 06:49:00|
|bab58192-fc85-49c...|        42|8ea971d8-bc3e-44c...|106.3456| 35.3371|2023-05-18 00:35:00|
|cafd29e6-3782-4fa...|       197|f0c4dc60-3cf3-4f9...|-47.9247|-38.5258|2023-05-14 09:20:00|
|ce9ed08c-ccdc-48d...|       198|32bf0c7b-f21a-44b...|-63.2369|-17.405

In [180]:
null_counts = df_event.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_event.columns])
null_counts.show()

+---------+----------+------------+--------+-------+----------+
|id_sensor|pulse_rate|id_actividad|longitud|latitud|fecha_hora|
+---------+----------+------------+--------+-------+----------+
|        0|         0|           0|     216|    216|         0|
+---------+----------+------------+--------+-------+----------+



- En el caso de los datos de actividades eventos se pueden apreciar que se tienen 216 filas donde en la columna "latitud" y "longitud" se tienen valores nulos. En este caso los valores de longitud y longitud son relevantes para análisis posteriores por lo que se decide eliminar las filas que tienen esta anomalia.

In [185]:
df_event = df_event.dropna(subset=["longitud"])

In [186]:
df_event.count()

784

In [189]:
df_event.write.mode("overwrite").parquet("/datalake/clean/actividades_eventos")

                                                                                

### Carga y limpieza de Dispositivos

In [174]:
df_disp = spark.read.csv('/datalake/raw/Dispositivos.csv', header=True, sep=',')

In [175]:
df_disp.printSchema()

root
 |-- id_dispositivo: string (nullable = true)
 |-- model: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- operating_system: string (nullable = true)



In [176]:
df_disp.show()

+--------------------+----------------+-------+----------------+
|      id_dispositivo|           model|  brand|operating_system|
+--------------------+----------------+-------+----------------+
|5f6742cf-0cc3-499...|   iPhone 12 Pro|  Apple|             iOS|
|9c9d7ede-4915-4fd...|         Pixel 5| Google|         Android|
|af1e4b22-9836-4ff...|    Xperia 1 III|   Sony|         Android|
|5d5aa38b-9e5e-412...|     Find X3 Pro|   Oppo|         Android|
|39e8db1f-0258-459...|          Velvet|     LG|         Android|
|6188ae01-946b-40e...|         X60 Pro|   Vivo|         Android|
|0b8d0acd-7391-449...|           9 Pro|OnePlus|         Android|
|cac63327-f1b8-4ba...|Galaxy S21 Ultra|Samsung|         Android|
|a8ec5dc7-b977-432...|           Mi 11| Xiaomi|         Android|
+--------------------+----------------+-------+----------------+



In [177]:
df_disp.write.mode("overwrite").parquet("/datalake/clean/dispositivos")

In [178]:
null_counts = df_disp.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_disp.columns])


In [179]:
null_counts.show()

+--------------+-----+-----+----------------+
|id_dispositivo|model|brand|operating_system|
+--------------+-----+-----+----------------+
|             0|    0|    0|               0|
+--------------+-----+-----+----------------+

