In [58]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType

### Iniciamos una sesión de Spark y leemos el csv

In [2]:
spark = SparkSession.builder.getOrCreate()
csv_path = "file:///home/jovyan/work/Sleep_health_and_lifestyle_dataset.csv"
data = spark.read.csv(csv_path, header=True, inferSchema=True)
data.printSchema()

root
 |-- Person ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Sleep Duration: double (nullable = true)
 |-- Quality of Sleep: integer (nullable = true)
 |-- Physical Activity Level: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- BMI Category: string (nullable = true)
 |-- Blood Pressure: string (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Daily Steps: integer (nullable = true)
 |-- Sleep Disorder: string (nullable = true)



In [59]:
data.createOrReplaceTempView("temp_table")

### Definimos las funciones que realizaran la limpieza de datos

In [4]:
def numerar_filas(df, column, distinct=True):
    window = Window.orderBy(column)
    column_with_numbers = df.withColumn("id "+column, F.dense_rank().over(window))
    if distinct:
        return column_with_numbers.distinct() 
    else: 
        return column_with_numbers

def check_duplicates(data, column):
    duplicates = data.groupBy(column).count().filter(F.col("count") > 1)
    if duplicates.count() != 0:
        print("Hay duplicados")
        print(f"Nº de duplicados: {duplicates.count()}")
        data = data.withColumn(column, F.col(column)).dropDuplicates() 
    return data

def limpiado(data, id_column, columnas_float, columnas_union, columnas_num): 
    data = data.withColumn('Sleep Duration', F.col('Sleep Duration').cast('float'))
    
    data = data.withColumn(columnas_union, F.when(F.col(columnas_union).isin(['Normal', 'Normal Weight']), 'Normal Weight') \
                           .otherwise(F.col(columnas_union))) 
    
    data = data.withColumn('Sistolic pressure', F.split(F.col(columnas_float), '/').getItem(0).cast('float')) 
    data = data.withColumn('Diastolic pressure', F.split(F.col(columnas_float), '/').getItem(1).cast('float'))

    for col in columnas_num:
        data = numerar_filas(data, col, distinct = False)
        
    data = check_duplicates(data, id_column)
    
    return data

### Definimos las funciones que verificaran pruebas de:
- Unicidad
- Integridad
- Consistencia
- Validez
- Completitud

In [55]:
def verificar_tipado(df):
    tipos_correctos = {
        'Person ID': IntegerType(),               'Gender': StringType(),
        'Age': IntegerType(),                     'Occupation': StringType(),
        'Sleep Duration': FloatType(),            'Quality of Sleep': IntegerType(),
        'Physical Activity Level': IntegerType(), 'Stress Level': IntegerType(),
        'BMI Category': StringType(),             'Blood Pressure': StringType(),
        'Heart Rate': IntegerType(),              'Daily Steps': IntegerType(),
        'Sleep Disorder': StringType(),           'Sistolic pressure': FloatType(),
        'Diastolic pressure': FloatType(),        'id Sleep Disorder': IntegerType(), 
        'id Gender': IntegerType(),               'id Occupation': IntegerType(), 
        'id BMI Category': IntegerType()
    }
    for columna, tipo in tipos_correctos.items():
        if df.schema[columna].dataType != tipo:
            print(f'La columna {columna} tiene tipado incorrecto: \
                 {tipo} en lugar de {df.schema[columna].dataType}')
    return True

def verificar_numero_registros(df, tabla_bd):
    registros_df = df.count()
    sleep_table = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", tabla_bd) \
    .options(**properties) \
    .load()
    
    registros_bd = sleep_table.count()
    if registros_df == registros_bd:
        print(f'Coinciden el nº de registros en df {registros_df} con registros en bd {registros_bd}')
    else: 
        print(f'No coinciden el nº de {registros_df} con {registros_bd}')

def verificar_nulos(df):
    for column in data.columns:
        if df.filter(F.col(column).isNull()).count() > 0: 
            print(f"Hay {df.filter(F.col(column).isNull()).count()} nulos en la columna {column}")
        else:
            print("No hay nulos")

def verificar_duplicados(df):
    return df.dropDuplicates().count() == df.count()

In [6]:
id_column = "Person ID"
columnas_float = "Blood Pressure"
columnas_union = "BMI Category"
columnas_num = ["Sleep Disorder", "Gender", "Occupation", "BMI Category"]
data = limpiado(data, id_column, columnas_float, columnas_union, columnas_num)
data.printSchema()

root
 |-- Person ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Sleep Duration: float (nullable = true)
 |-- Quality of Sleep: integer (nullable = true)
 |-- Physical Activity Level: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- BMI Category: string (nullable = true)
 |-- Blood Pressure: string (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Daily Steps: integer (nullable = true)
 |-- Sleep Disorder: string (nullable = true)
 |-- Sistolic pressure: float (nullable = true)
 |-- Diastolic pressure: float (nullable = true)
 |-- id Sleep Disorder: integer (nullable = false)
 |-- id Gender: integer (nullable = false)
 |-- id Occupation: integer (nullable = false)
 |-- id BMI Category: integer (nullable = false)



In [56]:
verificar_nulos(data)

No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos
No hay nulos


In [9]:
verificar_tipado(data)

True

In [10]:
verificar_duplicados(data)

True

In [12]:
sel_col = data.select("Gender")
df_gender = numerar_filas(sel_col, "Gender")
df_gender.show()

+------+---------+
|Gender|id Gender|
+------+---------+
|Female|        1|
|  Male|        2|
+------+---------+



In [13]:
sel_col = data.select("Sleep Disorder")
df_sleep_disorder = numerar_filas(sel_col, "Sleep Disorder")
df_sleep_disorder.show()

+--------------+-----------------+
|Sleep Disorder|id Sleep Disorder|
+--------------+-----------------+
|      Insomnia|                1|
|          None|                2|
|   Sleep Apnea|                3|
+--------------+-----------------+



In [14]:
sel_col = data.select("BMI category")
df_bmi_category = numerar_filas(sel_col, "BMI category")
df_bmi_category.show()

+-------------+---------------+
| BMI category|id BMI category|
+-------------+---------------+
|Normal Weight|              1|
|        Obese|              2|
|   Overweight|              3|
+-------------+---------------+



In [15]:
sel_col = data.select("Occupation")
df_occupation = numerar_filas(sel_col, "Occupation")
df_occupation.show()

+--------------------+-------------+
|          Occupation|id Occupation|
+--------------------+-------------+
|          Accountant|            1|
|              Doctor|            2|
|            Engineer|            3|
|              Lawyer|            4|
|             Manager|            5|
|               Nurse|            6|
|Sales Representative|            7|
|         Salesperson|            8|
|           Scientist|            9|
|   Software Engineer|           10|
|             Teacher|           11|
+--------------------+-------------+



In [16]:
tabla_hechos         = 'facts'
tabla_dim_occupation = 'occupation'
tabla_dim_gender     = 'gender'
tabla_dim_bmi        = 'bmi'
tabla_dim_disorder   = 'disorder'

In [17]:
url = "jdbc:mysql://172.29.192.1:3306/sleep"
properties = {
    "driver"   : "com.mysql.jdbc.Driver",
    "user"     : "jose",
    "password" : "abc1234"
    }

In [22]:
cols = ["Person ID", "id Gender", "id Occupation", "Age", "Sleep Duration", "Quality of Sleep", "Physical Activity Level", "Stress Level",
        "Heart Rate", "Daily Steps", "id Sleep Disorder", "id BMI Category", "Sistolic pressure", "Diastolic Pressure"]
data.select(cols).write.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tabla_hechos) \
    .options(**properties) \
    .save()

In [23]:
dict_tb = {
    tabla_dim_gender    : df_gender,
    tabla_dim_disorder  : df_sleep_disorder,
    tabla_dim_bmi       : df_bmi_category,
    tabla_dim_occupation: df_occupation
}
for tb, df in dict_tb.items():
    df.select("*").write.format("jdbc") \
    .option("url", url) \
    .option("dbtable", tb) \
    .options(**properties) \
    .save()

In [44]:
def verificar_relaciones(tabla_pk, pk, tabla_fk, fk):
    upperbound_dim = data.agg(F.max(fk)).collect()[0][0]
    lowerbound_dim = data.agg(F.min(fk)).collect()[0][0]
    upperbound_hechos = data.agg(F.max(pk)).collect()[0][0]
    lowerbound_hechos = data.agg(F.min(pk)).collect()[0][0]
    
    hechos_id = spark.read.format("jdbc") \
                .option("url", url) \
                .option("dbtable", tabla_pk) \
                .option("partitionColumn", pk) \
                .option("lowerBound", lowerbound_hechos) \
                .option("upperBound", upperbound_hechos) \
                .option("numPartitions", 1) \
                .options(**properties) \
                .load()
    dim_id = spark.read.format("jdbc") \
             .option("url", url) \
             .option("dbtable", tabla_fk) \
             .option("partitionColumn", fk) \
             .option("lowerBound", lowerbound_dim) \
             .option("upperBound", upperbound_dim) \
             .option("numPartitions", 1) \
             .options(**properties) \
             .load()

    # Verificar la consistencia entre PK y FK
    consistencia    = dim_id.join(hechos_id, dim_id[fk] == hechos_id[pk], "leftanti")

    # Mostrar los registros inconsistentes
    if consistencia.count() > 0:
        return print(f"No hay consistencias entre las claves: {cosistencia}")
    else:
        return "Hay consistencia entre las claves"

In [45]:
verificar_relaciones(tabla_hechos, "Person ID", tabla_dim_disorder, "id Sleep Disorder")

'Hay consistencia entre las claves'

In [54]:
verificar_numero_registros(data, 'facts')

Coinciden el nº de registros en df 374 con registros en bd 374
