In [None]:
# Ingestión y Preparación de Datos

from pyspark.sql import SparkSession

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("DataIngestion").getOrCreate()

# Cargar datos desde Azure Blob Storage
data_path = "<ruta_a_tu_almacenamiento_blob>"
raw_data = spark.read.csv(data_path, header=True, inferSchema=True)

# Mostrar una vista previa de los datos
raw_data.show()

# Realizar transformaciones básicas
data_cleaned = raw_data.dropna()
data_cleaned = data_cleaned.dropDuplicates()

# Guardar los datos limpiados en formato Parquet
cleaned_data_path = "<ruta_a_tu_almacenamiento_para_datos_limpiados>"
data_cleaned.write.parquet(cleaned_data_path)

# Finalizar la sesión de Spark
spark.stop()

In [None]:
# Notebook 2: Transformaciones y Acciones en Apache Spark

from pyspark.sql import SparkSession

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("DataTransformations").getOrCreate()

# Leer los datos limpiados
data_cleaned_path = "<ruta_a_tu_almacenamiento_para_datos_limpiados>"
data = spark.read.parquet(data_cleaned_path)

# Aplicar transformaciones
data_transformed = data.withColumn("new_column", data["existing_column"] * 2)

# Realizar acciones y mostrar resultados
data_transformed.show()

# Guardar los datos transformados
transformed_data_path = "<ruta_a_tu_almacenamiento_para_datos_transformados>"
data_transformed.write.parquet(transformed_data_path)

# Finalizar la sesión de Spark
spark.stop()

In [None]:
# Notebook 3: Entrenamiento y Evaluación de un Modelo de ML

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("MLModelTraining").getOrCreate()

# Leer los datos transformados
data_transformed_path = "<ruta_a_tu_almacenamiento_para_datos_transformados>"
data = spark.read.parquet(data_transformed_path)

# Preparar los datos para ML
feature_columns = ["feature1", "feature2"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data_ml = assembler.transform(data)

# Dividir los datos en conjuntos de entrenamiento y prueba
train_data, test_data = data_ml.randomSplit([0.8, 0.2], seed=42)

# Entrenar el modelo
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Evaluar el modelo
evaluation = model.evaluate(test_data)
print(f"RMSE: {evaluation.rootMeanSquaredError}")

# Finalizar la sesión de Spark
spark.stop()


In [None]:
# Notebook 4: Implementación de Delta Lake y Streaming Estructurado

from pyspark.sql import SparkSession

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("DeltaLakeStreaming").getOrCreate()

# Definir ruta de almacenamiento Delta
delta_path = "<ruta_a_tu_almacenamiento_delta_lake>"

# Lectura continua de datos en streaming
data_stream = spark.readStream.format("csv").option("header", "true").load("<ruta_a_tu_fuente_streaming>")

# Escribir los datos en Delta Lake
data_stream.writeStream.format("delta").option("checkpointLocation", "<ruta_a_tu_checkpoint>").start(delta_path)

# Leer y mostrar datos de Delta Lake
data_delta = spark.read.format("delta").load(delta_path)
data_delta.show()

# Finalizar la sesión de Spark
spark.stop()