# PySpark ETL: Pipeline & Logistics Data Integration

## Objective
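Load, join, and cleanse data from PostgreSQL (Inspection Logs) and MongoDB (Asset Telemetry).
Aggregate sensor data to match inspection windows and persist as Parquet.

> **Note:** In this notebook both sources are mocked with randomly generated in-memory data, and telemetry is aggregated per pipeline segment rather than per inspection window.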

In [1]:
import os
import sys
# Point PySpark's worker and driver Python at the interpreter running this kernel
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Set HADOOP_HOME to the local hadoop directory
# ('../hadoop' is resolved against the working directory at the time this cell runs)
os.environ['HADOOP_HOME'] = os.path.abspath('../hadoop')
# Expose HADOOP_HOME/bin on PATH. Append only when it is not already listed, so
# re-running this cell does not keep growing PATH, and tolerate PATH being unset.
hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin')
current_path = os.environ.get('PATH', '')
if hadoop_bin not in current_path.split(os.pathsep):
    os.environ['PATH'] = current_path + os.pathsep + hadoop_bin if current_path else hadoop_bin

# PySpark imports

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, window, avg, max, min

# Build (or reuse) the Spark session used by every later cell.
# Note: In a real environment, the Postgres and MongoDB connector JARs would have to be supplied, e.g.
# .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.postgresql:postgresql:42.2.18")
session_builder = SparkSession.builder.appName("PTT_Pipeline_Logistics_ETL")
spark = session_builder.getOrCreate()

print("Spark Session Created")

Spark Session Created


In [2]:
# 1. Load Data from PostgreSQL (Mocked)
# No database is contacted here: the inspection log is generated in-process and
# handed to Spark as a local DataFrame.
import random
from datetime import datetime, timedelta

# Dedicated, seeded generator: re-running this cell (or Restart & Run All)
# reproduces the same mock inspections instead of drawing fresh random values.
inspection_rng = random.Random(42)

inspection_data = []
for i in range(1, 21):  # Generate 20 segments
    seg_id = f"SEG-{100+i}"
    # Inspection day = 2023-10-01 plus 0..30 days (randint includes both ends)
    insp_date = (datetime(2023, 10, 1) + timedelta(days=inspection_rng.randint(0, 30))).strftime("%Y-%m-%d")
    inspector = f"INS-{inspection_rng.randint(1, 5):02d}"
    crack = inspection_rng.choice([True, False])
    corrosion = round(inspection_rng.uniform(0, 20), 1)
    # Maintenance is flagged when a crack was found or corrosion exceeds 10
    maint_req = crack or (corrosion > 10)
    inspection_data.append((i, seg_id, insp_date, inspector, crack, corrosion, maint_req))

# Column order must match the tuple layout appended above
columns = ["inspection_id", "pipeline_segment_id", "inspection_date", "inspector_id", "crack_detected", "corrosion_level", "maintenance_required"]

df_inspections = spark.createDataFrame(inspection_data, columns)
df_inspections.show(5)

+-------------+-------------------+---------------+------------+--------------+---------------+--------------------+
|inspection_id|pipeline_segment_id|inspection_date|inspector_id|crack_detected|corrosion_level|maintenance_required|
+-------------+-------------------+---------------+------------+--------------+---------------+--------------------+
|            1|            SEG-101|     2023-10-14|      INS-05|          true|           19.6|                true|
|            2|            SEG-102|     2023-10-09|      INS-04|         false|           19.1|                true|
|            3|            SEG-103|     2023-10-17|      INS-04|          true|           11.1|                true|
|            4|            SEG-104|     2023-10-17|      INS-01|         false|           11.6|                true|
|            5|            SEG-105|     2023-10-29|      INS-05|         false|           17.2|                true|
+-------------+-------------------+---------------+------------+--------------+---------------+--------------------+

In [3]:
# 2. Load Data from MongoDB (Mocked)
# No MongoDB is contacted: telemetry readings are generated in-process.
# A dedicated, seeded generator makes the readings identical on every run of
# this cell, so downstream aggregates are reproducible.
telemetry_rng = random.Random(2023)
# First reading of every segment; later readings follow at one-hour steps
first_reading_at = datetime(2023, 10, 1, 10, 0)

telemetry_data = []
for i in range(1, 21):  # Generate telemetry for ALL 20 segments
    seg_id = f"SEG-{100+i}"
    # Generate 5 readings per segment
    for j in range(5):
        ts = (first_reading_at + timedelta(hours=j)).strftime("%Y-%m-%d %H:%M:%S")
        pressure = round(telemetry_rng.uniform(800, 1200), 1)
        temp = round(telemetry_rng.uniform(25, 35), 1)
        vibration = round(telemetry_rng.uniform(0, 5), 2)
        telemetry_data.append((seg_id, ts, pressure, temp, vibration))

# Column order must match the tuple layout appended above
telemetry_cols = ["pipeline_segment_id", "timestamp", "pressure_psi", "temperature_c", "vibration_level"]

df_telemetry = spark.createDataFrame(telemetry_data, telemetry_cols)
# The timestamp was built as a formatted string; cast it to a Spark timestamp
df_telemetry = df_telemetry.withColumn("timestamp", col("timestamp").cast("timestamp"))
df_telemetry.show(5)

+-------------------+-------------------+------------+-------------+---------------+
|pipeline_segment_id|          timestamp|pressure_psi|temperature_c|vibration_level|
+-------------------+-------------------+------------+-------------+---------------+
|            SEG-101|2023-10-01 10:00:00|       815.7|         29.9|           0.53|
|            SEG-101|2023-10-01 11:00:00|       917.9|         31.3|           1.56|
|            SEG-101|2023-10-01 12:00:00|       989.3|         27.8|           3.89|
|            SEG-101|2023-10-01 13:00:00|      1115.4|         27.3|           2.71|
|            SEG-101|2023-10-01 14:00:00|      1023.1|         33.3|           1.17|
+-------------------+-------------------+------------+-------------+---------------+
only showing top 5 rows


In [4]:
# 3. Aggregation & Join
# Collapse telemetry to one row per pipeline segment, across every reading of
# that segment (simplified: there is no per-day or per-inspection windowing).
# Aggregates are referenced through the `F` namespace so this cell does not
# depend on the bare `max` name, which the import cell rebinds from Python's
# builtin to the Spark column function.

df_telemetry_agg = (
    df_telemetry.groupBy("pipeline_segment_id")
    .agg(
        F.avg("pressure_psi").alias("avg_pressure"),
        F.avg("temperature_c").alias("avg_temp"),
        F.max("vibration_level").alias("max_vibration"),
    )
)

# Join with Inspections
# Left join keeps every inspection row; a segment without telemetry would get
# nulls in the three aggregate columns.
df_final = df_inspections.join(df_telemetry_agg, "pipeline_segment_id", "left")
df_final.show()

+-------------------+-------------+---------------+------------+--------------+---------------+--------------------+------------------+------------------+-------------+
|pipeline_segment_id|inspection_id|inspection_date|inspector_id|crack_detected|corrosion_level|maintenance_required|      avg_pressure|          avg_temp|max_vibration|
+-------------------+-------------+---------------+------------+--------------+---------------+--------------------+------------------+------------------+-------------+
|            SEG-101|            1|     2023-10-14|      INS-05|          true|           19.6|                true|            972.28|29.919999999999998|         3.89|
|            SEG-103|            3|     2023-10-17|      INS-04|          true|           11.1|                true|1015.5600000000001|28.839999999999996|         3.65|
|            SEG-102|            2|     2023-10-09|      INS-04|         false|           19.1|                true| 976.9799999999999|              28.6| 

In [5]:
# 4. Persist to Parquet
# Write the joined frame, replacing any output left by an earlier run.
output_path = "../data/processed_pipeline_data.parquet"
parquet_writer = df_final.write.mode("overwrite")
parquet_writer.parquet(output_path)
print(f"Data saved to {output_path}")

Data saved to ../data/processed_pipeline_data.parquet
