In [0]:
# Verificando la versión de Spark para el portafolio
print(f"Spark Version: {spark.version}")

# Prueba de creación de un pequeño DataFrame
data = [("Pipeline_01", "Pressure_High"), ("Pipeline_02", "Normal")]
df_test = spark.createDataFrame(data, ["Pipe_ID", "Status"])
display(df_test)

In [0]:
# 1. Definir la ruta que copiaste
csv_path = "/Volumes/workspace/default/raw_data/market_pipe_thickness_loss_dataset.csv"

# 2. Leer el dataset con Spark
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_path)

# 3. Mostrar los primeros resultados para el portafolio
print("Dataset de Tuberías cargado exitosamente:")
display(df.limit(10))

In [0]:
# Un pequeño vistazo para el estadista (Marcel)
print("Resumen de variables críticas para integridad de tuberías:")
df.select("Thickness_mm", "Max_Pressure_psi", "Temperature_C", "Corrosion_Impact_Percent").summary().show()

# Verificación de datos nulos (Calidad de Datos)
from pyspark.sql.functions import col, count, when
print("Reporte de Calidad de Datos (Nulos):")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [0]:
from pyspark.sql import functions as F

# Renombrando columnas para estandarizar el dataset
df_clean = df.withColumnRenamed("Thickness_mm", "thickness_mm") \
             .withColumnRenamed("Max_Pressure_psi", "max_pressure_psi") \
             .withColumnRenamed("Temperature_C", "temperature_c") \
             .withColumnRenamed("Corrosion_Impact_Percent", "corrosion_impact_pct") \
             .withColumnRenamed("Thickness_Loss_mm", "thickness_loss_mm")

# Crear una columna calculada de ejemplo: Porcentaje de vida útil restante
df_final = df_clean.withColumn("remaining_life_pct", 100 - F.col("corrosion_impact_pct"))

print("Columnas actualizadas exitosamente:")
df_final.select("thickness_mm", "max_pressure_psi", "remaining_life_pct").show(5)