In [2]:
from pyspark.sql import SparkSession
import zipfile
import glob
import os


def load_data():
    """
    Loads all CSV files from ZIP archives in the data folder into a list of Spark DataFrames.
    """
    spark = SparkSession.builder.appName("Exercise6").enableHiveSupport().getOrCreate()

    base_dir = os.getcwd()
    zip_folder = os.path.join(base_dir, "data", "*.zip")
    report_dir = os.path.join(base_dir, "reports")

    os.makedirs(report_dir, exist_ok=True)
    zip_files = glob.glob(zip_folder)

    all_dfs = []

    for zip_path in zip_files:
        with zipfile.ZipFile(zip_path, 'r') as z:
            for file_name in z.namelist():

                # skip Mac system files
                if file_name.startswith("__MACOSX") or file_name.startswith("._"):
                    continue

                if file_name.endswith(".csv"):
                    print(f"Reading {file_name} from {zip_path}")
                    with z.open(file_name) as f:
                        data = f.read().decode("utf-8")
                        rdd = spark.sparkContext.parallelize(data.splitlines())
                        df = spark.read.csv(rdd, header=True, inferSchema=True)
                        all_dfs.append(df)

    print(f"\nLoaded {len(all_dfs)} CSV files from ZIPs.")
    return all_dfs


# def main():
all_dfs = load_data()
df_2019_Q4 = all_dfs[0]
df_2020_Q1 = all_dfs[1]

# print("First ZIP DataFrame:")
# df_2019_Q4.show(5, truncate=False)

# print("Second ZIP DataFrame:")
# df_2020_Q1.show(5, truncate=False)
# return all_dfs


# if __name__ == "__main__":
#     main()

25/11/11 12:09:57 WARN Utils: Your hostname, developer resolves to a loopback address: 127.0.1.1; using 192.168.29.61 instead (on interface enp3s0)
25/11/11 12:09:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/11 12:09:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Reading Divvy_Trips_2020_Q1.csv from /home/developer/Downloads/data_engineering_practices_1/Exercises/Exercise-6/data/Divvy_Trips_2020_Q1.zip


25/11/11 12:10:07 WARN TaskSetManager: Stage 0 contains a task of very large size (17517 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:10:13 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
25/11/11 12:10:14 WARN TaskSetManager: Stage 1 contains a task of very large size (17517 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Reading Divvy_Trips_2019_Q4.csv from /home/developer/Downloads/data_engineering_practices_1/Exercises/Exercise-6/data/Divvy_Trips_2019_Q4.zip


25/11/11 12:10:20 WARN TaskSetManager: Stage 2 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:10:24 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 5): Attempting to kill Python Worker
25/11/11 12:10:24 WARN TaskSetManager: Stage 3 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
[Stage 3:>                                                          (0 + 4) / 4]


Loaded 2 CSV files from ZIPs.


                                                                                

## Question 1

In [3]:
from pyspark.sql import functions as F
import pandas as pd

df = df_2020_Q1.withColumn("date", F.to_date(F.col("start_time")))

avg_trip_duration_per_day = (
    df.groupBy("date")
      .agg(F.avg("tripduration").alias("avg_trip_duration"))
      .orderBy("date")
)
avg_trip_duration_per_day.toPandas().to_csv("reports/avg_trip_per_day.csv", index=False)
avg_trip_duration_per_day.show(10, truncate=False)


25/11/11 12:10:36 WARN TaskSetManager: Stage 4 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:10:43 WARN TaskSetManager: Stage 12 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+----------+------------------+
|date      |avg_trip_duration |
+----------+------------------+
|2019-10-01|519.1034563470391 |
|2019-10-02|489.1325637447672 |
|2019-10-03|507.1099269445638 |
|2019-10-04|502.3697794462694 |
|2019-10-05|529.6975832789027 |
|2019-10-06|540.2795057520239 |
|2019-10-07|515.5357023690357 |
|2019-10-08|515.4543694020819 |
|2019-10-09|512.1497682738434 |
|2019-10-10|505.07017693819984|
+----------+------------------+
only showing top 10 rows



                                                                                

## Question 2

In [4]:

df_with_date = df_2020_Q1.withColumn("date", F.to_date(F.col("start_time")))

trips_per_day = (
    df_with_date.groupBy("date")
    .agg(F.count("*").alias("total_trips"))
    .orderBy("date")
)
trips_per_day.toPandas().to_csv("reports/trips_per_day.csv", index=False)
trips_per_day.show(10, truncate=False)

25/11/11 12:11:01 WARN TaskSetManager: Stage 15 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:11:05 WARN TaskSetManager: Stage 23 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+----------+-----------+
|date      |total_trips|
+----------+-----------+
|2019-10-01|18425      |
|2019-10-02|9882       |
|2019-10-03|15647      |
|2019-10-04|14570      |
|2019-10-05|10452      |
|2019-10-06|13396      |
|2019-10-07|17256      |
|2019-10-08|17537      |
|2019-10-09|17226      |
|2019-10-10|15795      |
+----------+-----------+
only showing top 10 rows



                                                                                

## Question 3

In [5]:
from pyspark.sql.window import Window

df_with_month = df_2020_Q1.withColumn("month", F.month(F.col("start_time")))

station_count = df_with_month.groupby("month", "from_station_name").agg(F.count("*").alias("trip_count"))

window_spec = Window.partitionBy("month").orderBy(F.col("trip_count").desc())
ranked_stations = (
    station_count
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") == 1)
    .orderBy("month")
)
ranked_stations.toPandas().to_csv("reports/most_popular_start_station.csv", index=False)
ranked_stations.show(10, truncate=False)



25/11/11 12:11:50 WARN TaskSetManager: Stage 26 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:11:54 WARN TaskSetManager: Stage 39 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+-----+-------------------+----------+----+
|month|from_station_name  |trip_count|rank|
+-----+-------------------+----------+----+
|10   |Canal St & Adams St|6564      |1   |
|11   |Canal St & Adams St|3445      |1   |
|12   |Canal St & Adams St|2928      |1   |
+-----+-------------------+----------+----+



                                                                                

## Question 4

In [6]:
df = df_2020_Q1.withColumn("start_date", F.to_date(F.col("start_time")))
max_date = df.select(F.max("start_date")).collect()[0][0]

two_weeks_ago = F.date_sub(F.lit(max_date),14)

df_last_two_weeks = df.filter(F.col("start_date") >= two_weeks_ago).alias("df_last_two_weeks")

station_counts = (
    df_last_two_weeks.groupBy("start_date", "from_station_name")
    .agg(F.count("*").alias("trip_count"))
)

window_spec = Window.partitionBy("start_date").orderBy(F.desc("trip_count"))

top_3_stations_per_day = (
    station_counts
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") <= 3)
    .orderBy("start_date", "rank")
)

top_3_stations_per_day.coalesce(1).write.mode("overwrite").option("header", True).csv("reports/top_3_trip_stations_last_two_weeks.csv")

top_3_stations_per_day.show(truncate=False)


25/11/11 12:12:10 WARN TaskSetManager: Stage 45 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:13 WARN TaskSetManager: Stage 48 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:17 WARN TaskSetManager: Stage 61 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+----------+----------------------------+----------+----+
|start_date|from_station_name           |trip_count|rank|
+----------+----------------------------+----------+----+
|2019-12-17|Canal St & Adams St         |153       |1   |
|2019-12-17|Clinton St & Madison St     |144       |2   |
|2019-12-17|Clinton St & Washington Blvd|124       |3   |
|2019-12-18|Canal St & Adams St         |123       |1   |
|2019-12-18|Clinton St & Madison St     |115       |2   |
|2019-12-18|Clinton St & Washington Blvd|94        |3   |
|2019-12-19|Canal St & Adams St         |133       |1   |
|2019-12-19|Clinton St & Madison St     |123       |2   |
|2019-12-19|Clinton St & Washington Blvd|95        |3   |
|2019-12-20|Canal St & Adams St         |131       |1   |
|2019-12-20|Clinton St & Washington Blvd|109       |2   |
|2019-12-20|Clinton St & Madison St     |94        |3   |
|2019-12-21|Streeter Dr & Grand Ave     |63        |1   |
|2019-12-21|Kingsbury St & Kinzie St    |47        |2   |
|2019-12-21|We

                                                                                

## Question 5

In [7]:
total_avg = df_2020_Q1.filter(F.isnotnull(F.col("gender"))).groupBy("gender").agg(F.avg("tripduration").alias("avg_trip_duration"))

total_avg.toPandas().to_csv("reports/male_or_female.csv", header=True, index=False)
total_avg.show()

25/11/11 12:12:31 WARN TaskSetManager: Stage 67 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:35 WARN TaskSetManager: Stage 70 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+------+-----------------+
|gender|avg_trip_duration|
+------+-----------------+
|Female|509.8082474784837|
|  Male|478.6205057415161|
+------+-----------------+



                                                                                

## Question 6

In [8]:
age = df_2020_Q1.withColumn("age", F.lit(2020) - F.col("birthyear"))
age = age.filter((F.col("age") > 0) & (F.col("age") < 100))

longest_trip = age.orderBy(F.desc("tripduration")).select("age", "tripduration").limit(10).withColumn("trip_type", F.lit("Longest"))

shortest_trip = age.orderBy(F.asc("tripduration")).select("age", "tripduration").limit(10).withColumn("trip_type", F.lit("Shortest"))

combined = longest_trip.union(shortest_trip)
combined.toPandas().to_csv("reports/top_10_ages.csv", index=False)

combined.show(truncate=False)

25/11/11 12:12:51 WARN TaskSetManager: Stage 73 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:53 WARN TaskSetManager: Stage 74 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:57 WARN TaskSetManager: Stage 76 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.
25/11/11 12:12:59 WARN TaskSetManager: Stage 77 contains a task of very large size (24035 KiB). The maximum recommended task size is 1000 KiB.

+---+------------+---------+
|age|tripduration|trip_type|
+---+------------+---------+
|35 |999.0       |Longest  |
|33 |999.0       |Longest  |
|58 |999.0       |Longest  |
|29 |999.0       |Longest  |
|28 |999.0       |Longest  |
|25 |999.0       |Longest  |
|30 |999.0       |Longest  |
|69 |999.0       |Longest  |
|31 |999.0       |Longest  |
|28 |999.0       |Longest  |
|58 |1,000.0     |Shortest |
|28 |1,000.0     |Shortest |
|44 |1,000.0     |Shortest |
|32 |1,000.0     |Shortest |
|63 |1,000.0     |Shortest |
|32 |1,000.0     |Shortest |
|23 |1,000.0     |Shortest |
|40 |1,000.0     |Shortest |
|37 |1,000.0     |Shortest |
|26 |1,000.0     |Shortest |
+---+------------+---------+



                                                                                