In [5]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by this notebook (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("ReadFromHDFS")
    .getOrCreate()
)

# Load every CSV under the HDFS input directory; the first row of each file
# supplies the column names.
df = (
    spark.read
    .option("header", "true")
    .csv("hdfs://localhost:9000/input/bigdataproject/*.csv")
)

# Preview the first five rows.
df.show(5)


ERROR:root:KeyboardInterrupt while sending command.                (0 + 0) / 60]
Traceback (most recent call last):
  File "/home/aishanee/anaconda3/envs/vespa/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/aishanee/anaconda3/envs/vespa/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/aishanee/anaconda3/envs/vespa/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, month, when, col
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

# Spark session for the clustering job (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("FlightDelayFullCluster")
    .getOrCreate()
)

# Raw flight records: all CSV files in the HDFS input directory, with the
# first row of each file taken as the header.
df = (
    spark.read
    .option("header", "true")
    .csv("hdfs://localhost:9000/input/bigdataproject/*.csv")
)

# Step 4: Filter out cancelled flights (rows with no Cancelled value are kept).
df = df.filter("Cancelled = 0 OR Cancelled IS NULL")

# Delay columns to cast. The CSV is read without a schema, so every column
# arrives as a string; the delay metrics must be numeric to be averaged.
delay_cols = [
    "DepDelayMinutes", "ArrDelayMinutes", "WeatherDelay",
    "CarrierDelay", "NASDelay", "LateAircraftDelay"
]
for colname in delay_cols:
    df = df.withColumn(colname, col(colname).cast("float"))

# Select columns. DayOfWeek is cast to int here because it is later fed
# directly into VectorAssembler, which only accepts numeric, boolean and
# vector columns and raises on a string column.
df = df.select(
    "Origin", "Dest", "Reporting_Airline", "FlightDate",
    col("DayOfWeek").cast("int").alias("DayOfWeek"),
    *delay_cols
)

# Add season column. Rows whose FlightDate cannot be parsed get a null Month
# and therefore fall through to the "Fall" default.
df = df.withColumn("Month", month("FlightDate").cast("int"))
df = df.withColumn("Season", when(col("Month").isin([12, 1, 2]), "Winter")
                              .when(col("Month").isin([3, 4, 5]), "Spring")
                              .when(col("Month").isin([6, 7, 8]), "Summer")
                              .otherwise("Fall"))

# Fill missing delay values with 0, as a null delay implies no delay.
# The fill is restricted to the delay columns so that a missing DayOfWeek
# (now numeric) is not silently turned into a fake weekday 0.
df = df.fillna(0, subset=delay_cols)

# Drop rows without a usable DayOfWeek: VectorAssembler rejects nulls by default.
df = df.dropna(subset=["DayOfWeek"])
# Average every delay metric within each (route, airline, season, weekday) group.
group_keys = ["Origin", "Dest", "Reporting_Airline", "Season", "DayOfWeek"]
# Source delay column -> name of its averaged output column (order is preserved).
avg_aliases = {
    "DepDelayMinutes": "AvgDepDelay",
    "ArrDelayMinutes": "AvgArrDelay",
    "WeatherDelay": "AvgWeatherDelay",
    "CarrierDelay": "AvgCarrierDelay",
    "NASDelay": "AvgNASDelay",
    "LateAircraftDelay": "AvgLateAircraftDelay",
}
df_grouped = df.groupBy(*group_keys).agg(
    *[avg(source).alias(alias) for source, alias in avg_aliases.items()]
)

# Turn each categorical column into a numeric index column.
# Pairs are (input column, output index column).
index_columns = [
    ("Origin", "OriginIdx"),
    ("Dest", "DestIdx"),
    ("Reporting_Airline", "AirlineIdx"),
    ("Season", "SeasonIdx"),
]
indexers = [
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in index_columns
]
# Fit and apply the indexers one after another; each pass adds one index column.
for indexer in indexers:
    fitted_indexer = indexer.fit(df_grouped)
    df_grouped = fitted_indexer.transform(df_grouped)

# Combine the encoded categoricals and the averaged delays into a single
# "features" vector column (categoricals first, then delay averages).
categorical_inputs = ["OriginIdx", "DestIdx", "AirlineIdx", "SeasonIdx", "DayOfWeek"]
delay_inputs = [
    "AvgDepDelay", "AvgArrDelay", "AvgWeatherDelay",
    "AvgCarrierDelay", "AvgNASDelay", "AvgLateAircraftDelay",
]
assembler = VectorAssembler(
    inputCols=categorical_inputs + delay_inputs,
    outputCol="features",
)
df_vector = assembler.transform(df_grouped)

# Standardise the feature vector (centred and scaled to unit standard deviation).
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="scaledFeatures",
)
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

# Fit k-means with 5 clusters on the scaled features (fixed seed) and label
# every row with its cluster id in the "cluster" column.
kmeans = KMeans(
    featuresCol="scaledFeatures",
    predictionCol="cluster",
    k=5,
    seed=42,
)
kmeans_model = kmeans.fit(df_scaled)
result = kmeans_model.transform(df_scaled)

# Write the group keys, averaged delays and cluster label back to HDFS as CSV
# with a header row, replacing any previous output at that path.
export_columns = [
    "Origin", "Dest", "Reporting_Airline", "Season", "DayOfWeek",
    "AvgDepDelay", "AvgArrDelay", "AvgWeatherDelay",
    "AvgCarrierDelay", "AvgNASDelay", "AvgLateAircraftDelay", "cluster",
]
output_path = "hdfs://localhost:9000/user/aishanee/flight_clusters_all_delays"
result.select(*export_columns).write.csv(output_path, header=True, mode="overwrite")


In [6]:
# Shut down the Spark session bound to `spark`.
spark.stop()