# Configuracion del pipeline

In [4]:
%pip install azure-storage-blob


Note: you may need to restart the kernel to use updated packages.


In [None]:

from azure.storage.blob import BlobServiceClient
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, when, lit
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import os

In [20]:
import os
from azure.storage.blob import BlobServiceClient

# Configurar conexión a Azure Blob Storage
CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=etlbasesavanzadas;AccountKey=Q43XAlHcXaXgyet05FvwLBntN8EoP2Dx59g8jwmLD+Ox345nFPLAJDanZb5c3M+R0JnKt8Crg7S5+AStYU9JAw==;EndpointSuffix=core.windows.net"

blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
landing_zone_container = blob_service_client.get_container_client("landing-zone")

# Crear el contenedor si no existe
if not landing_zone_container.exists():
    print("El contenedor 'landing-zone' no existe. Creándolo...")
    blob_service_client.create_container("landing-zone")
    print("Contenedor 'landing-zone' creado exitosamente.")

# Ruta local donde están los archivos
local_folder_path = r"C:\Users\diego\DataspellProjects\air-quality-etl\data\landing-zone"

# Subir cada archivo de la carpeta local al contenedor
for file_name in os.listdir(local_folder_path):
    file_path = os.path.join(local_folder_path, file_name)
    
    if os.path.isfile(file_path):  # Asegúrate de que sea un archivo
        blob_client = landing_zone_container.get_blob_client(file_name)
        print(f"Subiendo {file_name} a Azure Blob Storage...")
        
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)
        
        print(f"{file_name} subido exitosamente.")

print("Todos los archivos han sido subidos.")


El contenedor 'landing-zone' no existe. Creándolo...
Contenedor 'landing-zone' creado exitosamente.
Subiendo city_day.csv a Azure Blob Storage...
city_day.csv subido exitosamente.
Subiendo city_hour.csv a Azure Blob Storage...
city_hour.csv subido exitosamente.
Subiendo stations.csv a Azure Blob Storage...
stations.csv subido exitosamente.
Subiendo station_day.csv a Azure Blob Storage...
station_day.csv subido exitosamente.
Subiendo station_hour.csv a Azure Blob Storage...
station_hour.csv subido exitosamente.
Todos los archivos han sido subidos.


In [6]:
# Crear sesión de Spark
spark = SparkSession.builder.appName("Air Quality ETL").getOrCreate()

# Definir directorios
landing_zone_path = r"C:\Users\diego\DataspellProjects\air-quality-etl\data\landing-zone"
raw_zone_path = r"C:\Users\diego\DataspellProjects\air-quality-etl\data\raw-zone"


In [7]:
# Listar todos los archivos CSV en el landing zone
file_paths = [os.path.join(landing_zone_path, f) for f in os.listdir(landing_zone_path) if f.endswith('.csv')]
print(file_paths)

['C:\\Users\\diego\\DataspellProjects\\air-quality-etl\\data\\landing-zone\\city_day.csv', 'C:\\Users\\diego\\DataspellProjects\\air-quality-etl\\data\\landing-zone\\city_hour.csv', 'C:\\Users\\diego\\DataspellProjects\\air-quality-etl\\data\\landing-zone\\stations.csv', 'C:\\Users\\diego\\DataspellProjects\\air-quality-etl\\data\\landing-zone\\station_day.csv', 'C:\\Users\\diego\\DataspellProjects\\air-quality-etl\\data\\landing-zone\\station_hour.csv']


In [12]:
def apply_imputation_rules(df):
    from pyspark.sql.functions import mean, when, col, lit
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression
    from pyspark.ml.clustering import KMeans
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # 1. Imputar PM2.5 con la media
    pm25_mean = df.select(mean(col("`PM2.5`"))).collect()[0][0]
    df = df.withColumn("PM2.5", when(col("`PM2.5`").isNull(), lit(pm25_mean)).otherwise(col("`PM2.5`")))

    # 2. Imputar PM10 con Regresión Lineal usando PM2.5
    complete_rows_pm10 = df.filter(col("`PM2.5`").isNotNull() & col("`PM10`").isNotNull())
    missing_rows_pm10 = df.filter(col("`PM10`").isNull() & col("`PM2.5`").isNotNull())

    if not missing_rows_pm10.rdd.isEmpty():
        assembler_pm10 = VectorAssembler(inputCols=["PM2.5"], outputCol="features")
        train_data_pm10 = assembler_pm10.transform(complete_rows_pm10).select("features", "PM10")
        
        lr_pm10 = LinearRegression(featuresCol="features", labelCol="PM10")
        lr_model_pm10 = lr_pm10.fit(train_data_pm10)
        
        test_data_pm10 = assembler_pm10.transform(missing_rows_pm10)
        predictions_pm10 = lr_model_pm10.transform(test_data_pm10).select("prediction").rdd.map(lambda row: row[0]).collect()

        for i, row in enumerate(missing_rows_pm10.collect()):
            df = df.withColumn("PM10", when(col("PM10").isNull(), lit(predictions_pm10[i])).otherwise(col("PM10")))

    # 3. Imputar AQI con Regresión Lineal usando PM2.5 y PM10
    complete_rows_aqi = df.filter(col("`PM2.5`").isNotNull() & col("`PM10`").isNotNull() & col("`AQI`").isNotNull())
    missing_rows_aqi = df.filter(col("`AQI`").isNull() & col("`PM2.5`").isNotNull() & col("`PM10`").isNotNull())

    if not missing_rows_aqi.rdd.isEmpty():
        assembler_aqi = VectorAssembler(inputCols=["PM2.5", "PM10"], outputCol="features")
        train_data_aqi = assembler_aqi.transform(complete_rows_aqi).select("features", "AQI")
        
        lr_aqi = LinearRegression(featuresCol="features", labelCol="AQI")
        lr_model_aqi = lr_aqi.fit(train_data_aqi)
        
        test_data_aqi = assembler_aqi.transform(missing_rows_aqi)
        predictions_aqi = lr_model_aqi.transform(test_data_aqi).select("prediction").rdd.map(lambda row: row[0]).collect()

        for i, row in enumerate(missing_rows_aqi.collect()):
            df = df.withColumn("AQI", when(col("AQI").isNull(), lit(predictions_aqi[i])).otherwise(col("AQI")))

    # 4. Imputar AQI_Bucket basándose en AQI
    def calculate_bucket(aqi):
        if aqi <= 50:
            return "Good"
        elif aqi <= 100:
            return "Satisfactory"
        elif aqi <= 200:
            return "Moderate"
        elif aqi <= 300:
            return "Poor"
        elif aqi <= 400:
            return "Very Poor"
        else:
            return "Severe"

    bucket_udf = udf(calculate_bucket, StringType())
    df = df.withColumn("AQI_Bucket", when(col("AQI_Bucket").isNull(), bucket_udf(col("AQI"))).otherwise(col("AQI_Bucket")))

    # 5. Imputar otros valores con KMeans
    columns_for_clustering = ["PM2.5", "PM10", "AQI"]
    assembler_cluster = VectorAssembler(inputCols=columns_for_clustering, outputCol="features")
    cluster_data = assembler_cluster.transform(df.dropna(subset=columns_for_clustering))
    
    kmeans = KMeans(k=3, seed=1)
    kmeans_model = kmeans.fit(cluster_data)
    cluster_centers = kmeans_model.clusterCenters()

    # Asignar valores faltantes con centroides del cluster más cercano
    df = df.withColumn("cluster", kmeans_model.transform(df).select("prediction"))
    
    return df


In [16]:
spark = SparkSession.builder \
    .appName("ETL Processing") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()


In [17]:
spark.sparkContext.setLogLevel("DEBUG")

for file_path in file_paths:
    print(f"Procesando archivo: {file_path}")

    # Leer archivo CSV
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Verificar el esquema del archivo cargado
    print(f"Esquema del archivo {file_path}:")
    df.printSchema()

    # Aplicar reglas de imputación
    try:
        df = apply_imputation_rules(df)
    except Exception as e:
        print(f"Error al aplicar reglas de imputación en {file_path}: {e}")
        continue  # Saltar al siguiente archivo si ocurre un error

    # Guardar archivo procesado en formato Parquet
    output_file_path = os.path.join(raw_zone_path, os.path.basename(file_path).replace(".csv", ".parquet"))
    try:
        df.write.parquet(output_file_path, mode="overwrite")
        print(f"Archivo procesado y guardado en: {output_file_path}")
    except Exception as e:
        print(f"Error al guardar archivo en formato Parquet: {e}")
.0


Procesando archivo: C:\Users\diego\DataspellProjects\air-quality-etl\data\landing-zone\city_day.csv
Esquema del archivo C:\Users\diego\DataspellProjects\air-quality-etl\data\landing-zone\city_day.csv:
root
 |-- City: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- PM2.5: double (nullable = true)
 |-- PM10: double (nullable = true)
 |-- NO: double (nullable = true)
 |-- NO2: double (nullable = true)
 |-- NOx: double (nullable = true)
 |-- NH3: double (nullable = true)
 |-- CO: double (nullable = true)
 |-- SO2: double (nullable = true)
 |-- O3: double (nullable = true)
 |-- Benzene: double (nullable = true)
 |-- Toluene: double (nullable = true)
 |-- Xylene: double (nullable = true)
 |-- AQI: double (nullable = true)
 |-- AQI_Bucket: string (nullable = true)

Error al aplicar reglas de imputación en C:\Users\diego\DataspellProjects\air-quality-etl\data\landing-zone\city_day.csv: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.

0.0

In [10]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

PySparkTypeError: [NOT_ITERABLE] Column is not iterable.