In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import pandas as pd

from EDA.ValidationData import Validation_data, DataTypeAnalysis
from EDA.StatisticalAnalysis import StatisticalAnalysis
from EDA.PlotGeometryAnalysis import PlotGeometryAnalysis

# Preprocesing Data

In [0]:
#1. Leemos los datos de PROCEESED la tabla Delta
df_delta = spark.read.format("delta").load("/mnt/datalakemlopsd4m/presentation/proyectocongestion_presentation/tablacaracteristicas_congestion_tabladelta")
datos = df_delta.toPandas()
datos.head()

In [0]:
mask = (datos['x'] == 0) & (datos['y'] == 0) & (datos['z'] == 0)
datos[mask]

In [0]:
#Hacer el Balanceo de datos import pandas as pd
import numpy as np
from imblearn.over_sampling import SMOTE
 
# Convertimos a index la fecha
datos = datos.set_index('instant_date_t')

# Eliminamos registros con coordenadas (0,0,0)
mask = (datos['x']>0) & (datos['y']>0) & (datos['z']>0)
datos = datos[mask]

# Eliminacion de variables tipo object
datos = datos.drop(columns=['nombre', 'nombre_equipo'])


# Filtrar las fechas donde congestion == 1
fechas_congestion_1 = datos[datos['congestion'] == 1].index
 
# Generar un rango de fechas completo dentro del rango del DataFrame
fecha_min = datos.index.min()
fecha_max = datos.index.max()
rango_completo = pd.date_range(start=fecha_min, end=fecha_max, freq='S')
 
# Excluir las fechas de congestion == 1 del rango completo
fechas_disponibles = rango_completo.difference(fechas_congestion_1)
 
# Aplicar SMOTE para generar nuevas muestras de la clase minoritaria (congestion == 0)
smote = SMOTE(sampling_strategy='auto', random_state=42)
 
# Usar todas las columnas como características para SMOTE excepto 'congestion'
X = datos.drop(columns=['congestion'])
y = datos['congestion']
 
# Convertir las fechas del índice a números para usarlas con SMOTE
X['instant_date_num'] = X.index.astype(int) / 10**9  # Convertir a segundos
 
# Aplicar SMOTE
X_res, y_res = smote.fit_resample(X, y)
 
# Convertir de nuevo a DataFrame
X_res = pd.DataFrame(X_res, columns=X.columns)
y_res = pd.Series(y_res, name='congestion')
 
# Filtrar las nuevas muestras generadas por SMOTE (la diferencia entre las originales y las nuevas)
nuevas_muestras = X_res[len(datos):]
nuevas_congestion = y_res[len(datos):]
 
# Asignar nuevas fechas secuenciales de fechas_disponibles a las nuevas muestras
nuevas_fechas_asignadas = fechas_disponibles[:len(nuevas_muestras)]
nuevas_muestras['instant_date'] = nuevas_fechas_asignadas
 
# Crear el DataFrame final combinando los datos originales y las nuevas muestras
nuevas_muestras = nuevas_muestras.set_index('instant_date')
nuevas_muestras = nuevas_muestras.drop(columns=['instant_date_num'])
nuevas_muestras['congestion'] = nuevas_congestion.values
 
# Combinar los datos originales y las nuevas muestras
datos_cong_balanceado = pd.concat([datos, nuevas_muestras]).sort_index()

datos_cong_balanceado.index.name = 'instant_date_t'
#datos_cong_balanceado = datos_cong_balanceado.reset_index()

In [0]:
mask = (datos_cong_balanceado['x'] == 0) & (datos_cong_balanceado['y'] == 0) & (datos_cong_balanceado['z'] == 0)
datos_cong_balanceado[mask]

In [0]:
# 3. Convertir el DataFrame de Pandas a un DataFrame de Spark
spark_datos = spark.createDataFrame(datos_cong_balanceado)
 
# 4. Guardar los datos preprocesados en una tabla Delta en el Azure Storage
# Asegurar que las columnas de fecha y hora mantengan sus tipos de datos
#spark_datos = spark_datos.withColumn("Event_Date", col("Event_Date").cast("timestamp"))
#spark_datos = spark_datos.withColumn("instant_date_t", col("instant_date_t").cast("timestamp"))
 
# Nombre de la tabla Delta a guardar
nombre_tabla_delta = "dbproyectocongestion_presentation.tablacaracteristicas_congestion_tabladelta_2"
 
# 4.1 Verificar si ya existe la tabla Delta
if spark.catalog.tableExists(nombre_tabla_delta):
    # Eliminar la tabla Delta existente
    spark.sql("DROP TABLE IF EXISTS " + nombre_tabla_delta)
 
# 4.2 Guardar los datos preprocesados en una tabla Delta
spark_datos.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(nombre_tabla_delta)

Leer Datos Sin Balancear

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import pandas as pd

from EDA.ValidationData import Validation_data, DataTypeAnalysis
from EDA.StatisticalAnalysis import StatisticalAnalysis
from EDA.PlotGeometryAnalysis import PlotGeometryAnalysis

In [0]:
import pandas as pd
#1. Leemos los datos de PROCEESED la tabla Delta 
df_delta = spark.read.format("delta").load("/mnt/datalakemlopsd4m/processed/proyectocongestion_processed/datapreprocessed_congestion_tabladelta")
datos_sin_balanceo = df_delta.toPandas()

In [0]:
# 2. Limpieza de variables No prioritarias
columnas_a_eliminar = ['nombre_equipo','nombre']

# 2.1 Filtrar las columnas que existen en el DataFrame
columnas_existentes = [col for col in columnas_a_eliminar if col in datos_sin_balanceo.columns]

# 2.2 Verificar si hay columnas para eliminar
if columnas_existentes:
    datos_sin_balanceo.drop(columnas_existentes, axis=1, inplace=True)

In [0]:
# Filtramos los valores de x, y, z que son 0s
mask = (datos_sin_balanceo['x'] == 0) & (datos_sin_balanceo['y'] == 0) & (datos_sin_balanceo['z'] == 0)
datos_sin_balanceo = datos_sin_balanceo[~mask]

In [0]:
datos_sin_balanceo['congestion'].value_counts()

In [0]:
# 2. Limpieza de variables No prioritarias
columnas_a_eliminar = ['nombre_equipo','nombre','start_time_alert','end_time_alert','Event_Date']

# 2.1 Filtrar las columnas que existen en el DataFrame
columnas_existentes = [col for col in columnas_a_eliminar if col in datos_sin_balanceo.columns]

# 2.2 Verificar si hay columnas para eliminar
if columnas_existentes:
    datos_sin_balanceo.drop(columnas_existentes, axis=1, inplace=True)

In [0]:
datos = datos_sin_balanceo.copy()

Hacemos el Balanceo de la serie temporal

In [0]:
import pandas as pd
import numpy as np
from imblearn.over_sampling import SMOTE

# Convertimos a index la fecha
datos = datos.set_index('instant_date_t')

# Filtrar las fechas donde congestion == 1
fechas_congestion_1 = datos[datos['congestion'] == 1].index

# Generar un rango de fechas completo dentro del rango del DataFrame
fecha_min = datos.index.min()
fecha_max = datos.index.max()
rango_completo = pd.date_range(start=fecha_min, end=fecha_max, freq='S')

# Excluir las fechas de congestion == 1 del rango completo
fechas_disponibles = rango_completo.difference(fechas_congestion_1)

# Aplicar SMOTE para generar nuevas muestras de la clase minoritaria (congestion == 0)
smote = SMOTE(sampling_strategy='auto', random_state=42)

# Usar todas las columnas como características para SMOTE excepto 'congestion'
X = datos.drop(columns=['congestion'])
y = datos['congestion']

# Convertir las fechas del índice a números para usarlas con SMOTE
X['instant_date_num'] = X.index.astype(int) / 10**9  # Convertir a segundos

# Aplicar SMOTE
X_res, y_res = smote.fit_resample(X, y)

# Convertir de nuevo a DataFrame
X_res = pd.DataFrame(X_res, columns=X.columns)
y_res = pd.Series(y_res, name='congestion')

# Filtrar las nuevas muestras generadas por SMOTE (la diferencia entre las originales y las nuevas)
nuevas_muestras = X_res[len(datos):]
nuevas_congestion = y_res[len(datos):]

# Asignar nuevas fechas secuenciales de fechas_disponibles a las nuevas muestras
nuevas_fechas_asignadas = fechas_disponibles[:len(nuevas_muestras)]
nuevas_muestras['instant_date'] = nuevas_fechas_asignadas

# Crear el DataFrame final combinando los datos originales y las nuevas muestras
nuevas_muestras = nuevas_muestras.set_index('instant_date')
nuevas_muestras = nuevas_muestras.drop(columns=['instant_date_num'])
nuevas_muestras['congestion'] = nuevas_congestion.values

# Combinar los datos originales y las nuevas muestras
datos_balanceados = pd.concat([datos, nuevas_muestras]).sort_index()

In [0]:
datos_balanceados = datos_balanceados.reset_index(names='instant_date_t')

# 3. Convertir el DataFrame de Pandas a un DataFrame de Spark
spark_datos = spark.createDataFrame(datos_cong_balanceado)
 
# 4. Guardar los datos preprocesados en una tabla Delta en el Azure Storage
# Asegurar que las columnas de fecha y hora mantengan sus tipos de datos
#spark_datos = spark_datos.withColumn("Event_Date", col("Event_Date").cast("timestamp"))
#spark_datos = spark_datos.withColumn("instant_date_t", col("instant_date_t").cast("timestamp"))
 
# Nombre de la tabla Delta a guardar
nombre_tabla_delta = "dbproyectocongestion_presentation.tablacaracteristicas_congestion_tabladelta_v3"
 
# 4.1 Verificar si ya existe la tabla Delta
if spark.catalog.tableExists(nombre_tabla_delta):
    # Eliminar la tabla Delta existente
    spark.sql("DROP TABLE IF EXISTS " + nombre_tabla_delta)
 
# 4.2 Guardar los datos preprocesados en una tabla Delta
spark_datos.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(nombre_tabla_delta)