# Procesamiento de datos individuales

En este notebook lo que se hace es obtener para cada cluster predictivo los valores preparados para el forecasting. Se obtiene la suma del consumo de cada usuario del cluster por día.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import ArrayType, FloatType
import numpy as np
from pyspark.sql.functions import year, col


# Crear una sesión de Spark
# Configurar el tamaño de la memoria del driver y de los ejecutores
spark = SparkSession.builder \
    .appName("EDA SmartWater ") \
    .config("spark.driver.memory", "120g") \
    .config("spark.executor.memory", "120g") \
    .config("spark.driver.maxResultSize", "120g") \
    .getOrCreate()

spark.conf.set("spark.sql.pivotMaxValues", "200000")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/05 13:10:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import extraccion

Cargamos los datos con las predicciones de clustering asociadas:

In [None]:
data_preds = 'output/data/predictions_ALL_1_V3.parquet'

Filtramos por el cluster del cual queremos obtener los datos concretos:

In [20]:
predictions_df = extraccion.carga_datos(data_preds, 4, 'output/data/consumption.parquet')

Número total de predicciones:  0 para el cluster:  4


Realizamos un filtrado de outliers:

In [None]:
df_filtrado = extraccion.filtrado_outliers(predictions_df)


Seguidamente procesamos los datos para obtener por el acumulado del consumo de díario por usuario:

In [None]:
df = df_filtrado.withColumn("day", F.to_date("date"))

# Agrupar por 'day' y 'serial_number' y calcular la media de 'value' solo con valores no nulos
df_sum_day = df.groupBy("day", "serial_number") \
                .agg(F.sum("value").alias("sum_value")) \
                .orderBy("day", "serial_number")
df_sum_day.show()



+----------+-------------+-------------------+
|       day|serial_number|          sum_value|
+----------+-------------+-------------------+
|2021-01-01|   13LA062798| 0.4740000000000002|
|2021-01-01|   13LA062799| 0.5930000000000003|
|2021-01-01|   14CA016872|              0.728|
|2021-01-01|   14CA016873|0.34800000000000003|
|2021-01-01|   14FA084354|              0.384|
|2021-01-01|   14FA084371|0.36500000000000005|
|2021-01-01|   14FA084393|0.30300000000000005|
|2021-01-01|   14FA084415|0.34500000000000003|
|2021-01-01|   14FA084419|0.35400000000000004|
|2021-01-01|   14FA084427| 0.3380000000000001|
|2021-01-01|   14FA084443|0.31400000000000006|
|2021-01-01|   14FA084456|              0.334|
|2021-01-01|   14FA084463|0.28400000000000003|
|2021-01-01|   14FA084470|0.37300000000000005|
|2021-01-01|   14FA084500|0.37800000000000006|
|2021-01-01|   14FA084507|0.35300000000000004|
|2021-01-01|   14FA084540|0.44400000000000006|
|2021-01-01|   14KA079247|0.29000000000000004|
|2021-01-01| 

                                                                                

Finalmente guardamos los datos en un csv para poder utilizarlos para la predicción.

In [19]:
df_sum_day.write.csv('data_testing/cluster_sub1_3_sum_V3.csv', header=True, mode='overwrite')

                                                                                