In [1]:
!pip install pyspark



In [11]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, expr, year, month
import pandas as pd
import matplotlib.pyplot as plt

# ==================================================
# Assignment: Airline Customer Data Analysis with PySpark
# ==================================================

# ----------------------------------------
# 1. Extract Data to DataFrame
# ----------------------------------------
# 1.1 Setup SparkSession
# Configure a local session that uses all available cores ("local[*]");
# getOrCreate() hands back an already-running session when one exists.
_session_builder = (
    SparkSession.builder
    .appName("AirlineCustomerAnalysis")
    .master("local[*]")
)
spark = _session_builder.getOrCreate()

# 1.2 Create output directories
def ensure_dir(path):
    """Create directory `path` (including parents); do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)


# Results (CSV/Parquet/JSON) and graphs (PNG) are written to separate folders.
for output_dir in ("output/results", "output/graphs"):
    ensure_dir(output_dir)

# 1.3 Download datasets
# Each CSV is fetched with wget into the working directory under its own name.
# The exit status is checked because a failed `wget -O` can still leave an
# empty/partial output file behind, which would otherwise only surface later
# as a confusing Spark read or schema error.
DATA_BASE_URL = "https://raw.githubusercontent.com/RafliFa31/Assignment_21/main"
for csv_name in (
    "calendar.csv",
    "customer_flight_activity.csv",
    "customer_loyalty_history.csv",
):
    wget_status = os.system(f"wget -q {DATA_BASE_URL}/{csv_name} -O {csv_name}")
    if wget_status != 0:
        raise RuntimeError(
            f"Download of {csv_name} failed (wget exit status {wget_status})"
        )

# 1.4 Read CSV into DataFrames
def _read_csv(csv_path):
    """Load one CSV with a header row, letting Spark infer the column types."""
    return spark.read.csv(csv_path, header=True, inferSchema=True)


df_calendar = _read_csv("calendar.csv")
df_flights = _read_csv("customer_flight_activity.csv")
df_loyalty = _read_csv("customer_loyalty_history.csv")

# 1.5 Display schema for verification (calendar, flights, loyalty - in that order)
for loaded_frame in (df_calendar, df_flights, df_loyalty):
    loaded_frame.printSchema()

# ----------------------------------------
# 2. Data Cleaning
# ----------------------------------------
# 2.1 Remove duplicate records
# Every CSV has an unnamed leading column that Spark loads as `_c0`. It looks
# like an exported row index (TODO confirm against the raw files); if so, it
# makes every row unique and dropDuplicates() would never remove anything.
# Drop it first so duplicates are detected on the real data columns. This also
# keeps the later join from carrying two ambiguous `_c0` columns.
# DataFrame.drop() is a no-op when the column is absent.
df_calendar = df_calendar.drop("_c0").dropDuplicates()
df_flights  = df_flights.drop("_c0").dropDuplicates()
df_loyalty  = df_loyalty.drop("_c0").dropDuplicates()

# 2.2 Handle missing values
# Flight rows need the customer key and the period (year, month) to be usable;
# loyalty rows need the customer key (loyalty_number) to be joinable.
df_flights = df_flights.dropna(subset=["loyalty_number", "year", "month"])
df_loyalty = df_loyalty.dropna(subset=["loyalty_number"])

# 2.3 Cast year and month to integer for consistency
df_flights = (
    df_flights
    .withColumn("year", col("year").cast("int"))
    .withColumn("month", col("month").cast("int"))
)

# ----------------------------------------
# 3. Data Transformation
# ----------------------------------------
# 3.1-3.4 Build the enriched dataset in a single chained expression:
#   3.1 inner-join flight activity with loyalty history on loyalty_number
#   3.2 NetPoints = points_accumulated - points_redeemed
#   3.3 (skipped) calendar enrichment with a month name: calendar.csv has no
#       MonthName column, so no calendar join is performed
#   3.4 YearMonth = "<year>-<month zero-padded to 2 digits>" for time series
joined = (
    df_flights
    .join(df_loyalty, on="loyalty_number", how="inner")
    .withColumn("NetPoints", col("points_accumulated") - col("points_redeemed"))
    .withColumn(
        "YearMonth",
        concat_ws("-", col("year"), expr("lpad(month,2,'0')")),
    )
)

# 3.5 Register DataFrame as SQL view so the analyses below can query `all_data`
joined.createOrReplaceTempView("all_data")

# ----------------------------------------
# 4. Analysis with PySpark SQL
# ----------------------------------------
# 4.1 Average flights per customer per year
# `all_data` holds one row per customer per month, so a plain AVG(total_flights)
# would be the average per customer-month record. To get flights per customer
# per year, divide the yearly flight total by the number of distinct customers
# that have activity rows in that year.
avg_flights = spark.sql(
    "SELECT year, "
    "ROUND(SUM(`total_flights`) / COUNT(DISTINCT loyalty_number), 2) AS AvgFlightsPerCust "
    "FROM all_data GROUP BY year ORDER BY year"
)

# 4.2 Distribution of net points by loyalty card status
# Mean NetPoints over all rows of each loyalty_card value.
_points_by_card_sql = (
    "SELECT `loyalty_card` AS Card, ROUND(AVG(NetPoints),2) AS AvgNetPoints "
    "FROM all_data GROUP BY `loyalty_card`"
)
points_by_card = spark.sql(_points_by_card_sql)

# 4.3 Relationship between education level and flights
# Mean total_flights over all rows of each education level.
_flights_by_edu_sql = (
    "SELECT education, ROUND(AVG(`total_flights`),2) AS AvgFlights "
    "FROM all_data GROUP BY education"
)
flights_by_edu = spark.sql(_flights_by_edu_sql)

# 4.4 Trend of total flights over time (per month)
# Summed flights per YearMonth, ordered chronologically (zero-padded months sort correctly).
_trends_sql = (
    "SELECT YearMonth, SUM(`total_flights`) AS TotalFlights "
    "FROM all_data GROUP BY YearMonth ORDER BY YearMonth"
)
trends = spark.sql(_trends_sql)

# 4.5 Export SQL results to CSV
# Output name -> result DataFrame. Each result is collapsed to one partition
# before writing and replaces any previous export at the same path.
_sql_exports = {
    "avg_flights_per_year.csv": avg_flights,
    "avg_points_by_card.csv": points_by_card,
    "avg_flights_by_education.csv": flights_by_edu,
    "trend_flights_per_month.csv": trends,
}
for export_name, result_frame in _sql_exports.items():
    export_path = f"output/results/{export_name}"
    result_frame.coalesce(1).write.csv(export_path, header=True, mode="overwrite")

# ----------------------------------------
# 5. Visualization
# ----------------------------------------
# Convert Spark DataFrames to Pandas for plotting.
# `df_list` is a dict keyed by analysis name; each value is the collected
# pandas copy of the corresponding SQL result.
_results_to_collect = (
    ("avg_flights", avg_flights),
    ("points_by_card", points_by_card),
    ("flights_by_edu", flights_by_edu),
    ("trends", trends),
)
df_list = {
    result_key: spark_frame.toPandas()
    for result_key, spark_frame in _results_to_collect
}

def _save_chart(kind, x, y, series_label, title, xlabel, ylabel, out_path,
                figsize, rotate_xticks=False):
    """Draw a single-series chart and save it to `out_path` as an image.

    Parameters:
        kind: "line" (plotted with 'o' markers) or "bar".
        x, y: values for the horizontal and vertical axes.
        series_label: legend entry for the series.
        title, xlabel, ylabel: figure title and axis labels.
        out_path: destination file path passed to savefig.
        figsize: (width, height) of the figure in inches.
        rotate_xticks: when True, x tick labels are rotated by 45 degrees.

    Raises:
        ValueError: if `kind` is neither "line" nor "bar".
    """
    if kind not in ("line", "bar"):
        raise ValueError(f"Unsupported chart kind: {kind!r}")

    fig, ax = plt.subplots(figsize=figsize)
    if kind == "line":
        ax.plot(x, y, marker='o', label=series_label)
    else:
        ax.bar(x, y, label=series_label)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if rotate_xticks:
        ax.tick_params(axis="x", labelrotation=45)
    ax.legend()
    fig.tight_layout()
    fig.savefig(out_path)
    # Close explicitly so figures do not accumulate in memory across charts.
    plt.close(fig)


# 5.1 Trend Total Flights per Month
_save_chart(
    "line",
    df_list['trends']['YearMonth'],
    df_list['trends']['TotalFlights'],
    series_label='Total Flights',
    title='Trend Total Flights per Month',
    xlabel='Year-Month',
    ylabel='Total Flights',
    out_path='output/graphs/trend_flights_per_month.png',
    figsize=(10, 6),
    rotate_xticks=True,
)

# 5.2 Average Flights per Customer per Year
# Years are converted to strings so the bars are treated as categories.
_save_chart(
    "bar",
    df_list['avg_flights']['year'].astype(str),
    df_list['avg_flights']['AvgFlightsPerCust'],
    series_label='Avg Flights',
    title='Average Flights per Customer per Year',
    xlabel='Year',
    ylabel='Avg Flights per Customer',
    out_path='output/graphs/avg_flights_per_year.png',
    figsize=(8, 5),
)

# 5.3 Average Net Points by Loyalty Card Status
_save_chart(
    "bar",
    df_list['points_by_card']['Card'],
    df_list['points_by_card']['AvgNetPoints'],
    series_label='Avg Net Points',
    title='Average Net Points by Loyalty Card Status',
    xlabel='Loyalty Card',
    ylabel='Avg Net Points',
    out_path='output/graphs/avg_points_by_card.png',
    figsize=(8, 5),
)

# 5.4 Average Flights by Education Level (extra)
_save_chart(
    "bar",
    df_list['flights_by_edu']['education'],
    df_list['flights_by_edu']['AvgFlights'],
    series_label='Avg Flights',
    title='Average Flights by Education Level',
    xlabel='Education',
    ylabel='Avg Flights',
    out_path='output/graphs/avg_flights_by_education.png',
    figsize=(10, 6),
    rotate_xticks=True,
)

# ----------------------------------------
# 6. Save Full Dataset Outputs
# ----------------------------------------
# Save enriched full dataset for reference.
# The extra `_c0` column is removed first, then the same frame is persisted
# twice (Parquet and JSON), each write replacing any previous output.
joined = joined.drop("_c0")

_full_parquet_path = "output/results/full_customer_data.parquet"
_full_json_path = "output/results/full_customer_data.json"
joined.write.mode("overwrite").parquet(_full_parquet_path)
joined.write.mode("overwrite").json(_full_json_path)

# ----------------------------------------
# 7. Documentation Placeholder
# ----------------------------------------
# Documentation of the ETL steps, SQL queries, and insights can be written in README.md or report.md
# Example:
# - 1. Extract Data: calendar.csv, ...
# - 2. Clean: dropDuplicates, dropna
# - 3. Transform: join, NetPoints, YearMonth
# - 4. SQL Analysis: avg_flights, ...
# - 5. Visual: trend_flights_per_month.png, ...
# - 6. Save: CSV, Parquet, JSON

# Final status message (in Indonesian): ETL, SQL analysis, and visualization
# are finished; outputs are in the output/ folder.
print("ETL, SQL analysis, dan visualisasi selesai. Output di folder output/")

root
 |-- _c0: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- start_of_the_year: date (nullable = true)
 |-- start_of_the_quarter: date (nullable = true)
 |-- start_of_the_month: date (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- loyalty_number: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- total_flights: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- points_accumulated: double (nullable = true)
 |-- points_redeemed: integer (nullable = true)
 |-- dollar_cost_points_redeemed: integer (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- loyalty_number: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- education: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- ma