# Parking Analysis

## Initialize

### Import Packages

In [23]:
import os
import csv
import time

from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

### Global Variables

In [24]:
# When set, every task result is exported to CSV right after the analysis is built.
NEED_SAVE: bool = True
# When set, the final cell sleeps for 1000 seconds before stopping the Spark context.
NEED_HOLD: bool = False

### Define the Worker Class

In [25]:
class ParkingDb:
    """Load the parking CSV into Spark and expose five analysis results.

    The input is read from ``<parent of cwd>/data/parking_data_sz.csv`` and each
    task result is a lazily evaluated Spark ``DataFrame``:

    1. number of distinct berthages per section;
    2. the distinct (berthage, section) pairs;
    3. average parking time per section (seconds, cast to integer);
    4. average parking time per berthage, longest first;
    5. hourly berthage usage per section, as a percentage of its berthages.
    """

    # Resolved once, when the class is defined: data files live in
    # "<parent of the current working directory>/data".
    __parent_path = os.path.dirname(os.getcwd())

    # Schema applied to the input CSV. NOTE(review): presumably this matches the
    # file's column order, since Spark applies a user-supplied schema by position
    # when enforceSchema is left at its default — confirm against the data file.
    schema = StructType([
        StructField(name="out_time", dataType=TimestampType()),
        StructField(name="admin_region", dataType=StringType()),
        StructField(name="in_time", dataType=TimestampType()),
        StructField(name="berthage", dataType=StringType()),
        StructField(name="section", dataType=StringType()),
    ])

    @classmethod
    def __get_data_file_path(cls, filename: str) -> str:
        """Return the path of ``filename`` inside the project's ``data`` directory."""
        return os.path.join(cls.__parent_path, "data", filename)

    @classmethod
    def __dataframe_to_csv(cls, df: DataFrame, output_name: str) -> None:
        """Write ``df`` to ``data/<output_name>`` as one UTF-8 CSV file with a header row.

        The whole result is collected to the driver, so this is only suitable for
        result frames that fit in driver memory.
        """
        rows = df.collect()
        output_file_path = cls.__get_data_file_path(output_name)

        # newline="" lets the csv module control line endings itself.
        with open(output_file_path, "w", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)
            writer.writerow(df.columns)
            writer.writerows(rows)

    def __init__(self):
        """Start Spark, load the data and prepare all task frames."""
        self.sc: SparkContext
        self.spark: SparkSession
        self.df: DataFrame
        self.__tasks: tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]
        self.__init_data()

    def __init_data(self):
        """Initialize Spark, load and clean the CSV, and build the task frames.

        Rows are kept only when both timestamps are present and ``out_time`` is
        strictly after ``in_time``. The parking duration in seconds is stored in
        ``parking_time_length``, after which ``out_time`` is dropped.
        """
        # Build the session first and take its context from it. Calling
        # SparkContext.getOrCreate() beforehand would start a context with the
        # default application name, so the builder's appName would not apply.
        spark: SparkSession = (
            SparkSession.builder
            .appName("Parking Data Analysis")
            .getOrCreate()
        )
        sc: SparkContext = spark.sparkContext

        df: DataFrame = spark.read.csv(
            path=ParkingDb.__get_data_file_path("parking_data_sz.csv"),
            header=True,
            schema=ParkingDb.schema,
        )

        # Drop records whose timestamps are missing or not in chronological order.
        cleaned_data: DataFrame = df.filter(
            (f.col("out_time").isNotNull())
            & (f.col("in_time").isNotNull())
            & (f.col("out_time") > f.col("in_time"))
        )

        # NOTE(review): some berthage ids are float-formatted (e.g. "211271.0"
        # in the task 4 output) — confirm whether they should be normalized.
        df_final: DataFrame = cleaned_data.withColumn(
            colName="parking_time_length",
            col=f.unix_timestamp("out_time") - f.unix_timestamp("in_time"),
        ).drop("out_time")
        # Not cached: every action on a task frame re-reads the CSV. Append
        # `.cache()` to df_final if memory allows and repeated reads become slow.

        self.sc = sc
        self.spark = spark
        self.df = df_final
        self.__tasks = self.__execute()

    def __task1(self) -> DataFrame:
        """Task 1: count the distinct berthages of each section."""
        return (
            self.df.groupBy("section")
            .agg(f.countDistinct("berthage").alias("count"))
        )

    def __task2(self) -> DataFrame:
        """Task 2: list the distinct (berthage, section) pairs."""
        return self.df.select("berthage", "section").distinct()

    def __task3(self) -> DataFrame:
        """Task 3: average parking time (seconds, cast to int) of each section."""
        return (
            self.df.groupBy("section")
            .agg(f.avg("parking_time_length").cast("integer").alias("avg_parking_time"))
        )

    def __task4(self) -> DataFrame:
        """Task 4: average parking time of each berthage, longest first."""
        return (
            self.df.groupBy("berthage")
            .agg(f.avg("parking_time_length").cast("integer").alias("avg_parking_time"))
            .orderBy(f.col("avg_parking_time").desc())
        )

    def __task5(self) -> DataFrame:
        """Task 5: hourly berthage usage of each section.

        For every (hour, section) pair, ``count`` is the number of distinct
        berthages with an ``in_time`` inside that hour. ``percentage`` is that
        count relative to the section's total distinct berthages, formatted by
        ``format_number`` with one decimal place (a string column).

        NOTE(review): only ``in_time`` decides the hour, so a vehicle parked across
        several hours is counted only in the hour it arrived (``out_time`` has been
        dropped) — confirm this matches the intended definition of usage.
        """
        hour_start = f.date_trunc("hour", f.col("in_time"))
        df_hours: DataFrame = (
            self.df
            .withColumn("start_time", hour_start)
            .withColumn("end_time", hour_start + f.expr("INTERVAL 1 HOUR"))
        )

        # Total number of distinct berthages in each section.
        total_berthages: DataFrame = (
            self.df.select("section", "berthage").distinct()
            .groupBy("section").count()
            .withColumnRenamed(existing="count", new="total_berthages")
        )

        # Distinct berthages used per section and hour.
        hourly_usage: DataFrame = (
            df_hours.groupBy("start_time", "end_time", "section")
            .agg(f.countDistinct("berthage").alias("count"))
        )

        return (
            hourly_usage.join(total_berthages, "section")
            .withColumn(
                colName="percentage",
                col=f.format_number(f.col("count") / f.col("total_berthages") * 100, 1),
            )
            .select("start_time", "end_time", "section", "count", "percentage")
            .orderBy("section", "start_time")
        )

    def __execute(self) -> tuple[DataFrame, DataFrame, DataFrame, DataFrame, DataFrame]:
        """Build (lazily) the frames of tasks 1 to 5, in order."""
        return self.__task1(), self.__task2(), self.__task3(), self.__task4(), self.__task5()

    def __get_task_frame(self, task_id: int) -> DataFrame:
        """Return the result frame of task ``task_id`` (1-based).

        Raises:
            IndexError: if ``task_id`` is outside ``1 .. number of tasks``. Without
                this check, 0 or negative ids would silently select a frame from
                the end of the tuple.
        """
        task_count = len(self.__tasks)
        if not 1 <= task_id <= task_count:
            raise IndexError(f"task_id must be between 1 and {task_count}, got {task_id}")
        return self.__tasks[task_id - 1]

    def show_task_frame(self, task_id: int):
        """Print the first 20 rows of task ``task_id``, truncating cells to 20 chars."""
        self.__get_task_frame(task_id).show(n=20, truncate=20)

    def export_all_frame_to_csv(self):
        """Write every task result to ``data/r<task_id>.csv``."""
        for task_id, frame in enumerate(self.__tasks, start=1):
            ParkingDb.__dataframe_to_csv(frame, f"r{task_id}.csv")

## Instance

### Create an instance

In [26]:
# Starting Spark and loading the data happen inside the constructor.
db = ParkingDb()

### Export CSV Files

In [27]:
# Export every task's result to its own CSV file, unless disabled by NEED_SAVE.
if NEED_SAVE:
    db.export_all_frame_to_csv()

                                                                                

## Show task results

### Task 1

In [28]:
# Distinct berthage count per section.
db.show_task_frame(task_id=1)

[Stage 39:>                                                         (0 + 4) / 4]

+----------------------+-----+
|               section|count|
+----------------------+-----+
|            科技南一路|   92|
|            高新南九道|   26|
|        创业路(南油段)|   72|
|              文心四路|   24|
|            高新南七道|   41|
|      招商路(蛇口西段)|   61|
|            科技南十路|   29|
|后海大道辅道(蛇口北段)|   58|
|                登良路|   29|
|        学府路(前海段)|   11|
|              工业九路|   50|
|              文心三路|   12|
|              文心一路|   13|
|      荔园路(蛇口西段)|   70|
|      荔园路(蛇口东段)|   54|
|            高新南环路|  240|
|                四海路|   55|
|              科技南路|   87|
|              文心五路|   14|
|              海德二道|   42|
+----------------------+-----+
only showing top 20 rows



                                                                                

### Task 2

In [29]:
# Distinct (berthage, section) pairs.
db.show_task_frame(task_id=2)

+--------+--------------------+
|berthage|             section|
+--------+--------------------+
|  201124|    荔园路(蛇口西段)|
|  204078|      创业路(南油段)|
|  204129|      创业路(南油段)|
|  204183|  海德二道（南油段）|
|  205098|后海大道辅道(后海段)|
|  201198|    招商路(蛇口西段)|
|  202304|        海月路(东段)|
|  203291|    荔园路(蛇口东段)|
|  201159|    荔园路(蛇口西段)|
|  201168|    荔园路(蛇口西段)|
|  203290|    荔园路(蛇口东段)|
|  203296|    荔园路(蛇口东段)|
|  201183|    招商路(蛇口西段)|
|  201246|    招商路(蛇口西段)|
|  202207|            金世纪路|
|  203258|    荔园路(蛇口东段)|
|  205032|后海大道辅道(后海段)|
|  205044|后海大道辅道(后海段)|
|  201201|    招商路(蛇口西段)|
|  203350|            工业九路|
+--------+--------------------+
only showing top 20 rows



                                                                                

### Task 3

In [30]:
# Average parking time per section.
db.show_task_frame(task_id=3)



+----------------------+----------------+
|               section|avg_parking_time|
+----------------------+----------------+
|        创业路(南油段)|            4091|
|      招商路(蛇口西段)|            2956|
|后海大道辅道(蛇口北段)|            4003|
|              工业九路|            3014|
|      荔园路(蛇口西段)|            3300|
|      荔园路(蛇口东段)|            3442|
|                四海路|            3562|
|              金世纪路|            3613|
|          海月路(东段)|            3370|
|                南新路|            3361|
|    海德二道（南油段）|            3123|
|  后海大道辅道(后海段)|            3731|
|              文心四路|            2895|
|                登良路|            3171|
|        学府路(前海段)|            2957|
|              文心三路|            3551|
|              文心一路|            4064|
|      中心路(后海湾段)|            4472|
|                龙城路|            3814|
|                南头街|            3638|
+----------------------+----------------+
only showing top 20 rows



                                                                                

### Task 4

In [31]:
# Average parking time per berthage, longest first.
db.show_task_frame(task_id=4)

[Stage 51:>                                                         (0 + 4) / 4]

+--------+----------------+
|berthage|avg_parking_time|
+--------+----------------+
|  207171|           13020|
|  210459|           11220|
|  211087|           10230|
|  210034|            9780|
|211271.0|            7740|
|  202232|            7440|
|  210461|            7160|
|  211085|            6924|
|  207181|            6846|
|  211022|            6685|
|  211109|            6517|
|  210035|            6432|
|  207179|            6394|
|  203118|            6368|
|  207176|            6360|
|  207182|            6187|
|  207152|            6185|
|  207143|            6120|
|  210457|            6120|
|  207153|            5970|
+--------+----------------+
only showing top 20 rows



                                                                                

### Task 5

In [32]:
# Hourly berthage usage per section.
db.show_task_frame(task_id=5)

                                                                                

+-------------------+-------------------+----------------+-----+----------+
|         start_time|           end_time|         section|count|percentage|
+-------------------+-------------------+----------------+-----+----------+
|2018-09-01 10:00:00|2018-09-01 11:00:00|中心路(后海湾段)|   22|      34.9|
|2018-09-01 11:00:00|2018-09-01 12:00:00|中心路(后海湾段)|   15|      23.8|
|2018-09-01 12:00:00|2018-09-01 13:00:00|中心路(后海湾段)|   13|      20.6|
|2018-09-01 13:00:00|2018-09-01 14:00:00|中心路(后海湾段)|   11|      17.5|
|2018-09-01 14:00:00|2018-09-01 15:00:00|中心路(后海湾段)|   18|      28.6|
|2018-09-01 15:00:00|2018-09-01 16:00:00|中心路(后海湾段)|   18|      28.6|
|2018-09-01 16:00:00|2018-09-01 17:00:00|中心路(后海湾段)|   18|      28.6|
|2018-09-01 17:00:00|2018-09-01 18:00:00|中心路(后海湾段)|   20|      31.7|
|2018-09-01 18:00:00|2018-09-01 19:00:00|中心路(后海湾段)|   24|      38.1|
|2018-09-01 19:00:00|2018-09-01 20:00:00|中心路(后海湾段)|   17|      27.0|
|2018-09-02 10:00:00|2018-09-02 11:00:00|中心路(后海湾段)|   12|      19.0|
|2018-09-02 1

### Optionally Hold, Then Stop the Spark Context

In [33]:
# Optional pause before shutdown, presumably so the running Spark application
# can still be inspected — confirm intent.
if NEED_HOLD:
    time.sleep(1_000)  # seconds

# Shut down the Spark context held by the analysis object.
db.sc.stop()