# Notebook 01: Ingestion from HDFS using Spark

**TerraFlow Analytics - Big Data Assessment**

This notebook demonstrates distributed data processing using PySpark to load raw GTFS data from HDFS, inspect it using Spark DataFrames, and save a bronze layer for further processing.

**Requirements Addressed:**
1. **Distributed data processing with PySpark**: Loading and parsing large GTFS files.
2. **Scalable storage with HDFS**: Reading from and writing to HDFS.
3. **Use of Spark DataFrames and RDDs**: Handling temporal and spatial attributes.

**FAST VERSION** - Optimized for quick execution with robust fallback

In [1]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType
)

# ------------------------------------------------------------------
# Configuration: HDFS NameNode URI and the input / output locations.
# Both dataset paths are built on top of HDFS_NAMENODE.
# ------------------------------------------------------------------
HDFS_NAMENODE = "hdfs://namenode:9000"

RAW_DATA_PATH = HDFS_NAMENODE + "/terraflow/data/raw/gtfs_data.csv"
BRONZE_OUTPUT_PATH = HDFS_NAMENODE + "/terraflow/data/processed/gtfs_bronze.parquet"

# Echo the effective configuration so the cell output documents the run.
print("‚úÖ Config loaded")
print(
    f"NameNode: {HDFS_NAMENODE}",
    f"Raw Path: {RAW_DATA_PATH}",
    f"Bronze  : {BRONZE_OUTPUT_PATH}",
    sep="\n",
)

✅ Config loaded
NameNode: hdfs://namenode:9000
Raw Path: hdfs://namenode:9000/terraflow/data/raw/gtfs_data.csv
Bronze  : hdfs://namenode:9000/terraflow/data/processed/gtfs_bronze.parquet


In [2]:
# Initialize Spark Session (LOCAL MODE - Fast & Professional)
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

print("Initializing Spark session...")

# Stop a session left over from an earlier run of this cell. getOrCreate()
# below would otherwise hand back that existing session instead of building
# one from the configuration in this cell.
try:
    if 'spark' in globals() and spark is not None:
        spark.stop()
except Exception as stop_error:
    # Best effort only: a broken old session must not block creating a new
    # one, but the failure is reported instead of being silently discarded.
    print(f"Warning: could not stop previous Spark session: {stop_error}")

# Clear PySpark's cached session reference (a private attribute) in case the
# stop() above did not run or failed part-way, so getOrCreate() starts fresh.
SparkSession._instantiatedSession = None


spark = (
    SparkSession.builder
    .appName("TerraFlow_Ingestion_Bronze")
    # Local mode: a single driver process running 4 worker threads.
    .master("local[4]")
    # Reuse the NameNode URI from the configuration cell so the default
    # filesystem can never drift from the paths built on HDFS_NAMENODE.
    .config("spark.hadoop.fs.defaultFS", HDFS_NAMENODE)

    # Performance settings sized to the 4 local threads
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.default.parallelism", "4")

    # HDFS connection settings: address DataNodes by hostname
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")

    .getOrCreate()
)

# Keep cell output readable: only errors from Spark's own logging.
spark.sparkContext.setLogLevel("ERROR")

print("=" * 70)
print("SPARK SESSION INITIALIZED")
print("=" * 70)
print("Spark Version :", spark.version)
# Read the value back from the live Hadoop configuration to confirm it applied.
print("defaultFS     :", spark._jsc.hadoopConfiguration().get("fs.defaultFS"))
print("Parallelism   :", spark.sparkContext.defaultParallelism)
print("=" * 70)

Initializing Spark session...
SPARK SESSION INITIALIZED
Spark Version : 3.3.2
defaultFS     : hdfs://namenode:9000
Parallelism   : 4


In [3]:
# HDFS Helper Functions
def hdfs_fs():
    """Return the Hadoop ``FileSystem`` object for the active Spark session.

    The handle is obtained through the session's JVM gateway using the
    session's own Hadoop configuration.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()
    file_system_cls = spark._jvm.org.apache.hadoop.fs.FileSystem
    return file_system_cls.get(hadoop_conf)

def hdfs_exists(path: str) -> bool:
    """Report whether ``path`` exists on the filesystem returned by ``hdfs_fs()``.

    Args:
        path: Path (or full URI) to test.

    Returns:
        The result of the filesystem's ``exists`` check. Any exception raised
        while checking is printed and reported as ``False`` rather than
        propagated.
    """
    try:
        filesystem = hdfs_fs()
        target = spark._jvm.org.apache.hadoop.fs.Path(path)
        found = filesystem.exists(target)
    except Exception as err:
        print(f"  Error checking HDFS: {err}")
        found = False
    return found

def hdfs_list(path: str, limit: int = 20):
    """Print the entries found under ``path``, at most ``limit`` of them.

    Args:
        path: Path (or full URI) to list.
        limit: Number of entries printed before the listing is cut off with
            a truncation marker.

    Returns:
        None. The listing, a "not found" notice, or any error is reported
        via ``print``; exceptions are caught rather than raised.
    """
    try:
        filesystem = hdfs_fs()
        target = spark._jvm.org.apache.hadoop.fs.Path(path)
        if filesystem.exists(target):
            entries = filesystem.listStatus(target)
            print(f"‚úÖ HDFS listing for: {path}")
            for position, entry in enumerate(entries):
                if position < limit:
                    print(" -", entry.getPath().toString())
                    continue
                # Reached the cap: note the cut-off and stop iterating.
                print("... (truncated)")
                break
        else:
            print(f" HDFS path not found: {path}")
    except Exception as err:
        print(f"‚ö†Ô∏è  Error listing HDFS: {err}")

print(" Checking raw folder in HDFS:")
hdfs_list(f"{HDFS_NAMENODE}/terraflow/data/raw")

# ROBUST DATA LOADING STRATEGY
# Prefer the raw CSV in HDFS. If it is not there, fall back to the first
# copy that exists on the locally mounted volume and point RAW_DATA_PATH
# at it through a file:// URI.
print("\n Checking data availability...")
USE_LOCAL_FALLBACK = False

# Local copies to try, in priority order.
LOCAL_FALLBACK_PATHS = (
    "/home/jovyan/work/data/raw/CPS6005-Assessment 2_GTFS_Data.csv",
    "/home/jovyan/work/data/raw/gtfs_data.csv",
)

if hdfs_exists(RAW_DATA_PATH):
    print("✅ Raw CSV found in HDFS.")
else:
    print("⚠️ Raw file not found in HDFS (gtfs_data.csv). Checking local fallback...")
    local_path = next(
        (candidate for candidate in LOCAL_FALLBACK_PATHS if os.path.exists(candidate)),
        None,
    )
    if local_path is None:
        raise FileNotFoundError(" Data file not found in HDFS OR local volume! Please upload data.")
    RAW_DATA_PATH = f"file://{local_path}"
    USE_LOCAL_FALLBACK = True
    print(f"✅ Found local fallback: {local_path}")

 Checking raw folder in HDFS:
✅ HDFS listing for: hdfs://namenode:9000/terraflow/data/raw
 - hdfs://namenode:9000/terraflow/data/raw/gtfs_data.csv

 Checking data availability...
✅ Raw CSV found in HDFS.


In [4]:
# Explicit schema for the raw CSV so Spark can parse it faster.
# Every column is declared nullable.
schema = (
    StructType()
    .add("stop_id_from", IntegerType(), True)
    .add("stop_id_to", IntegerType(), True)
    .add("trip_id", StringType(), True)
    .add("arrival_time", StringType(), True)
    .add("time", DoubleType(), True)
    .add("speed", StringType(), True)
    .add("Number_of_trips", IntegerType(), True)
    .add("SRI", StringType(), True)
    .add("Degree_of_congestion", StringType(), True)
)

print(f"üì• Reading CSV from: {RAW_DATA_PATH}")
# The first line of the file is a header row, not data.
df = spark.read.schema(schema).option("header", True).csv(RAW_DATA_PATH)

print("‚úÖ DataFrame loaded")
print(f"Columns: {len(df.columns)}")
print(f"Partitions: {df.rdd.getNumPartitions()}")

📥 Reading CSV from: hdfs://namenode:9000/terraflow/data/raw/gtfs_data.csv
✅ DataFrame loaded
Columns: 9
Partitions: 2


In [5]:
# Show the structure applied to the CSV and a handful of rows.
print("\n‚úÖ Schema:")
df.printSchema()

print("\n‚úÖ Sample rows:")
df.show(n=5, truncate=False)

# Mark the DataFrame for caching; the count() below is the action that
# actually reads the data, and later cells reuse the cached rows.
df = df.cache()
total_rows = df.count()

print("\n‚úÖ Total rows: {:,}".format(total_rows))


✅ Schema:
root
 |-- stop_id_from: integer (nullable = true)
 |-- stop_id_to: integer (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- time: double (nullable = true)
 |-- speed: string (nullable = true)
 |-- Number_of_trips: integer (nullable = true)
 |-- SRI: string (nullable = true)
 |-- Degree_of_congestion: string (nullable = true)


✅ Sample rows:
+------------+----------+------------------------------------------------------------+------------+-----------+-----------+---------------+-----------+--------------------+
|stop_id_from|stop_id_to|trip_id                                                     |arrival_time|time       |speed      |Number_of_trips|SRI        |Degree_of_congestion|
+------------+----------+------------------------------------------------------------+------------+-----------+-----------+---------------+-----------+--------------------+
|36156       |38709     |NORMAL_333_Pune Station To  Hinjawadi Ma

In [6]:
# Count missing values in the columns listed in key_cols.
key_cols = ["speed", "SRI", "Degree_of_congestion"]

# One aggregate per column: isNull() cast to 0/1 and summed gives the number
# of nulls. All aggregates run in a single pass that returns one row.
null_count_exprs = [
    F.sum(F.col(name).isNull().cast("int")).alias(name)
    for name in key_cols
]
nulls = df.select(*null_count_exprs).collect()[0].asDict()

print("\n Null counts (key columns):")
for column_name, missing in nulls.items():
    print(f"{column_name}: {missing}")


 Null counts (key columns):
speed: 258
SRI: 313
Degree_of_congestion: 0


In [7]:
# ========================================
# RDD Operations
# ========================================
# Demonstrates the low-level RDD API on the same data: an action (take),
# a per-partition transformation (mapPartitions) and a row filter.
print("\n" + "=" * 70)
print("RDD OPERATIONS ")
print("=" * 70)

rdd = df.rdd
print("\n RDD created from DataFrame")
print("   RDD partitions:", rdd.getNumPartitions())

# RDD Operation 1: Take sample rows
sample_rows = rdd.take(3)
print("\n RDD Operation 1: take(3) - First 3 rows:")
for i, row in enumerate(sample_rows, 1):
    print(f"   Row {i}: {row}")

# RDD Operation 2: mapPartitions transformation
# Each partition is reduced to a single number (its row count), so only one
# small value per partition is collected to the driver.
print("\n RDD Operation 2: mapPartitions() - Count rows per partition:")
part_sizes = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
for i, size in enumerate(part_sizes):
    print(f"   Partition {i}: {size:,} rows")

# RDD Operation 3: Filter transformation
# The congestion labels seen in this dataset's sample rows are "Very smooth",
# "Smooth" and "Heavy congestion"; none is the bare word "High", so comparing
# against "High" would presumably never match. The most congested label is
# matched instead, ignoring case and surrounding whitespace. Null labels are
# treated as non-matching.
print("\n RDD Operation 3: filter() - Count high congestion records:")
high_congestion_count = rdd.filter(
    lambda row: (row.Degree_of_congestion or "").strip().lower() == "heavy congestion"
).count()
print(f"   High congestion records: {high_congestion_count:,}")

print("\n" + "=" * 70)
print(" RDD operations demonstrated successfully!")
print("=" * 70)


RDD OPERATIONS 

 RDD created from DataFrame
   RDD partitions: 2

 RDD Operation 1: take(3) - First 3 rows:
   Row 1: Row(stop_id_from=36156, stop_id_to=38709, trip_id='NORMAL_333_Pune Station To  Hinjawadi Maan Phase 3_Up-0855_0', arrival_time='09:13:54', time=0.027222222, speed='14.47956475', Number_of_trips=9, SRI='-0.40816322', Degree_of_congestion='Very smooth')
   Row 2: Row(stop_id_from=36156, stop_id_to=38709, trip_id='NORMAL_115P_Pune Station to Hinjawadi Phase 3_Up-0845_0', arrival_time='09:03:01', time=0.032222222, speed='12.23273572', Number_of_trips=9, SRI='1.2068965', Degree_of_congestion='Smooth')
   Row 3: Row(stop_id_from=36156, stop_id_to=38709, trip_id='NORMAL_100_Ma Na Pa to Hinjawadi Maan Phase 3_Up-0915_0', arrival_time='09:15:00', time=0.058333333, speed='6.7571302', Number_of_trips=9, SRI='5.142857', Degree_of_congestion='Heavy congestion')

 RDD Operation 2: mapPartitions() - Count rows per partition:
   Partition 0: 34,373 rows
   Partition 1: 32,540 rows

 

In [8]:
print("\n Writing Bronze dataset to HDFS:", BRONZE_OUTPUT_PATH)

# Persist the raw rows as Parquet, replacing any output from a previous run.
# coalesce(4) caps the DataFrame at 4 partitions before writing.
bronze_writer = df.coalesce(4).write.mode("overwrite")
bronze_writer.parquet(BRONZE_OUTPUT_PATH)

print(" Bronze write complete")

print("\n Listing processed folder:")
hdfs_list(f"{HDFS_NAMENODE}/terraflow/data/processed")

# Fail loudly if the output location is missing after the write.
bronze_present = hdfs_exists(BRONZE_OUTPUT_PATH)
if not bronze_present:
    raise RuntimeError(" Bronze output not found in HDFS after write")

# Read the Parquet output back to confirm it is usable.
df_bronze = spark.read.parquet(BRONZE_OUTPUT_PATH)

print("\n Bronze sample:")
df_bronze.show(n=5, truncate=False)

bronze_rows = df_bronze.count()
banner = "=" * 70
print("\n" + banner)
print("BRONZE LAYER VERIFICATION")
print(banner)
print("Raw rows   : {:,}".format(total_rows))
print("Bronze rows: {:,}".format(bronze_rows))

# The bronze layer is an unmodified copy, so both counts are expected to match.
verdict = (
    " SUCCESS: Bronze output matches raw row count."
    if bronze_rows == total_rows
    else " WARNING: Row counts differ"
)
print(verdict)
print(banner)


 Writing Bronze dataset to HDFS: hdfs://namenode:9000/terraflow/data/processed/gtfs_bronze.parquet
 Bronze write complete

 Listing processed folder:
✅ HDFS listing for: hdfs://namenode:9000/terraflow/data/processed
 - hdfs://namenode:9000/terraflow/data/processed/gtfs_bronze.parquet
 - hdfs://namenode:9000/terraflow/data/processed/gtfs_silver.parquet
 - hdfs://namenode:9000/terraflow/data/processed/route_stats.parquet

 Bronze sample:
+------------+----------+------------------------------------------------------------------+------------+-----------+-----------+---------------+------------+--------------------+
|stop_id_from|stop_id_to|trip_id                                                           |arrival_time|time       |speed      |Number_of_trips|SRI         |Degree_of_congestion|
+------------+----------+------------------------------------------------------------------+------------+-----------+-----------+---------------+------------+--------------------+
|34871       |337

In [9]:
# Report where the rows actually came from: the data-availability cell sets
# USE_LOCAL_FALLBACK when it had to read the CSV from the mounted volume
# instead of HDFS, so the summary must not claim HDFS unconditionally.
data_source = "the local fallback volume" if USE_LOCAL_FALLBACK else "HDFS"

print("\n" + "=" * 70)
print("✅ NOTEBOOK 01 COMPLETE!")
print("=" * 70)
print("\nSummary:")
print(f"  • Loaded {total_rows:,} records from {data_source}")
print("  • Created Bronze layer in Parquet format")
print("  • Demonstrated DataFrame operations")
print("  • Demonstrated RDD operations")
print("  • Used HDFS for scalable storage")
print("\n➡️  Proceed to Notebook 02: Data Cleaning")
print("=" * 70)



✅ NOTEBOOK 01 COMPLETE!

Summary:
  • Loaded 66,913 records from HDFS
  • Created Bronze layer in Parquet format
  • Demonstrated DataFrame operations
  • Demonstrated RDD operations
  • Used HDFS for scalable storage

➡️  Proceed to Notebook 02: Data Cleaning
