# CS328 Assignment 2

* Main task: Spark Data Processing
    
* Student Name: 余坤屹 Yu Kunyi

* Student ID: 12013027


In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, desc, round, explode, expr
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from datetime import datetime
import time


In [2]:
file_path = "./parking_data_sz.csv"
output_path = "./out/"
result_path = "./result/"

spark = SparkSession.builder.appName("MySpark").getOrCreate()

df = spark.read.csv(file_path, header=True)
df.show(5)
df.summary().show()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/05 15:46:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-------------------+------------+-------------------+--------+----------------+
|           out_time|admin_region|            in_time|berthage|         section|
+-------------------+------------+-------------------+--------+----------------+
|2018-09-01 12:00:00|      南山区|2018-09-01 10:10:00|  201091|荔园路(蛇口西段)|
|2018-09-01 14:29:35|      南山区|2018-09-01 13:43:35|  201091|荔园路(蛇口西段)|
|2018-09-01 16:08:54|      南山区|2018-09-01 15:10:54|  201091|荔园路(蛇口西段)|
|2018-09-01 17:56:03|      南山区|2018-09-01 16:34:03|  201091|荔园路(蛇口西段)|
|2018-09-01 20:00:20|      南山区|2018-09-01 18:40:20|  201091|荔园路(蛇口西段)|
+-------------------+------------+-------------------+--------+----------------+


23/12/05 15:46:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-------------------+------------+-------------------+------------------+----------------+
|summary|           out_time|admin_region|            in_time|          berthage|         section|
+-------+-------------------+------------+-------------------+------------------+----------------+
|  count|            1048001|     1048001|            1048001|           1048001|         1048001|
|   mean|               NULL|        NULL|               NULL|207145.52988212797|            NULL|
| stddev|               NULL|        NULL|               NULL|2891.4981480978363|            NULL|
|    min|2018-09-01 10:10:00|      南山区|2018-09-01 10:10:00|            201091|中心路(后海湾段)|
|    25%|               NULL|        NULL|               NULL|          205127.0|            NULL|
|    50%|               NULL|        NULL|               NULL|          208244.0|            NULL|
|    75%|               NULL|        NULL|               NULL|          208676.0|            NULL|
|    max|2019-03-01 

                                                                                

## Task 0: Preprocessing

Filter out invalid data (time error, duplicates, N/A, non-integer berthage)
add column (parking_time)

In [3]:
df = df \
    .filter(col("in_time") < col("out_time")) \
    .dropDuplicates(["out_time", "in_time", "berthage"]) \
    .na.drop()

df = df \
    .withColumn("berthage", col("berthage").cast("int")) \
    .withColumn("in_time_unix", unix_timestamp("in_time", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("out_time_unix", unix_timestamp("out_time", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("parking_time", col("out_time_unix") - col("in_time_unix"))
df = df.drop("out_time_unix").drop("in_time_unix")

df.summary().show()




+-------+-------------------+------------+-------------------+-----------------+----------------+------------------+
|summary|           out_time|admin_region|            in_time|         berthage|         section|      parking_time|
+-------+-------------------+------------+-------------------+-----------------+----------------+------------------+
|  count|             970925|      970925|             970925|           970925|          970925|            970925|
|   mean|               NULL|        NULL|               NULL|207133.8137744934|            NULL|3464.0354919278006|
| stddev|               NULL|        NULL|               NULL|2880.969804832561|            NULL|  3525.50275486409|
|    min|2018-09-01 10:11:00|      南山区|2018-09-01 10:10:00|           201091|中心路(后海湾段)|                60|
|    25%|               NULL|        NULL|               NULL|           205127|            NULL|              1140|
|    50%|               NULL|        NULL|               NULL|           2

                                                                                

In [4]:
# util
def storeCSV(new_name, path=output_path, result=result_path):
    f = [f for f in os.listdir(path) if f.startswith("part")][0]
    old_path = os.path.join(path, f)
    new_path = os.path.join(result, new_name)
    os.rename(old_path, new_path)


## Task 1: total number of berthages in each section

Output the total number of berthages in each section. The output file should have two columns, with the headers being section and count.

In [5]:
result_df = df \
    .groupBy("section") \
    .agg({"berthage": "count"}) \
    .withColumnRenamed("count(berthage)", "count")

result_df.show(5)
result_df.write.csv(output_path, header=True, mode="overwrite")
storeCSV(new_name="r1.csv")
# time.sleep(60)


+--------------+-----+
|       section|count|
+--------------+-----+
|    科技南一路|11480|
|    高新南九道| 9176|
|创业路(南油段)|22573|
|      文心四路|30072|
|    高新南七道|13613|
+--------------+-----+


## Task 2: all unique ids (berthages) with their sections

Output all unique ids (berthages), associated with their sections. The output file should have two columns, with the headers being berthage and section.

In [6]:
result_df = df \
    .select("berthage", "section") \
    .distinct()

result_df.show(5)
result_df.write.csv(output_path, header=True, mode="overwrite")
storeCSV(new_name="r2.csv")


+--------+----------+
|berthage|   section|
+--------+----------+
|  211147|科技南十路|
|  208475|  海德一道|
|  203372|  工业九路|
|  203023|    四海路|
|  206058|    常兴路|
+--------+----------+


## Task 3: Average parking time per section

Output for each section:the average parking time of a car in that section.The output file should have two columns, with the headers being section and avg_parking_time. The average parking time should be counted in seconds as an integer.

In [7]:
result_df = df \
    .groupBy("section") \
    .agg(F.avg("parking_time").cast("int").alias("avg_parking_time"))

result_df.show(5)
result_df.write.csv(output_path, header=True, mode="overwrite")
storeCSV(new_name="r3.csv")


                                                                                

+--------------+----------------+
|       section|avg_parking_time|
+--------------+----------------+
|    科技南一路|            3727|
|    高新南九道|            3843|
|创业路(南油段)|            4091|
|      文心四路|            2895|
|    高新南七道|            4256|
+--------------+----------------+


## Task 4: Average parking time per berthage, in descending order

Output the average parking time for each berthage, sorted in descending order. The output file should have two columns, with the headers being berthage and avg_parking_time. The average parking time should be counted in seconds as an integer.

In [8]:
result_df = df \
    .groupby("berthage") \
    .agg(F.avg("parking_time").cast("int").alias("avg_parking_time"))
result_df = result_df.orderBy(desc(result_df.avg_parking_time))

result_df.show(5)
result_df.write.csv(output_path, header=True, mode="overwrite")
storeCSV(new_name="r4.csv")

+--------+----------------+
|berthage|avg_parking_time|
+--------+----------------+
|  207171|           13020|
|  210459|           11220|
|  211087|           10230|
|  210034|            9780|
|  202232|            7440|
+--------+----------------+


## Task 5: Berthage usage per section, hour

Output for each section: the total number of berthages in use (“in use” means there is at least one car in that berthage) and the percentage out of the total number of berthages in that section, in a one-hour interval (e.g. during 09:00:00-10:00:00). The output file should have five columns, with the headers being start_time, end_time, section, count and percentage. The percentage value should be rounded to one decimal place (e.g. 67.8%). The data format of start_time and end_time should be “YYYY-MM-DD HH:MM:SS”, e.g. 2018-09-01 12:00:00

In [9]:
def _format_time_range(a):
    b = a + 1
    start_time = datetime.strptime(f"{a}:00:00", "%H:%M:%S")
    end_time = datetime.strptime(f"{b}:00:00", "%H:%M:%S")
    formatted_range = f"{start_time.strftime('%H:%M:%S')}-{end_time.strftime('%H:%M:%S')}"
    return formatted_range


def _map_time_scope(start_time, end_time) -> list:
    start_hour = int(start_time.split(" ")[1].split(":")[0])
    end_hour = int(end_time.split(" ")[1].split(":")[0])
    end_hour += 1 if not end_time.endswith("00:00") else 0
    return [_format_time_range(i) for i in range(start_hour, end_hour)]


spark.udf.register("map_time_scope_udf", _map_time_scope, ArrayType(StringType()))


<function __main__._map_time_scope(start_time, end_time) -> list>

In [10]:
count_df = df.groupBy("section").agg(F.countDistinct("berthage").alias("berthage_count"))

tmp = df.select("section",
                explode(expr("map_time_scope_udf(in_time, out_time)")).alias("time_scope"),
                "berthage")
result_df = tmp \
    .groupby("section", "time_scope") \
    .agg(F.countDistinct("berthage").alias("count")) \
    .join(count_df, "section")

result_df = result_df \
    .withColumn("percentage", round((col("count") / col("berthage_count")) * 100, 1)) \
    .drop("berthage_count") \
    .withColumn("start_time", col("time_scope").substr(0, 8)) \
    .withColumn("end_time", col("time_scope").substr(10, 8)) \
    .drop("time_scope")

result_df.show(5)
result_df.write.csv(output_path, header=True, mode="overwrite")
storeCSV(new_name="r5.csv")

                                                                                

+--------------------+-----+----------+----------+--------+
|             section|count|percentage|start_time|end_time|
+--------------------+-----+----------+----------+--------+
|    中心路(后海湾段)|   63|     100.0|  19:00:00|20:00:00|
|后海大道辅道(后海段)|   80|     100.0|  20:00:00|21:00:00|
|      创业路(南油段)|   71|      98.6|  17:00:00|18:00:00|
|        科技南十二路|   10|     100.0|  17:00:00|18:00:00|
|          高新南十道|   69|      94.5|  16:00:00|17:00:00|
+--------------------+-----+----------+----------+--------+


                                                                                

-----done-----