In [1]:
import sys
from pathlib import Path
from pyspark.sql import functions as F

# Put the directory two levels above the current working directory on the
# import path (presumably the repository root that holds `session_start`).
project_root = Path().resolve().parents[1]
# Guard the append so that re-running this cell does not accumulate duplicate
# sys.path entries.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from session_start import get_spark

spark = get_spark("us-accidents-eda")

# Load the sampled accidents CSV with a header row and inferred column types,
# then add `Start_Time_ts`, the timestamp column used by the later aggregation
# cells. (The printed schema shows `Start_Time` is already inferred as a
# timestamp, so this is effectively a typed copy.)
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("s3a://us-accidents-dashboard-1445/US_Accidents_March23_sampled_500k.csv")
    .withColumn("Start_Time_ts", F.to_timestamp("Start_Time"))
)

def write_parquet(
    df_out,
    table_name: str,
    coalesce_one: bool = True,
    base_path: str = "s3a://us-accidents-dashboard-1445/processed",
) -> str:
    """Write ``df_out`` as Parquet to ``{base_path}/{table_name}`` in overwrite mode.

    Args:
        df_out: DataFrame to persist (must expose ``coalesce`` and ``write``).
        table_name: Folder name of the table below ``base_path``. Must contain
            at least one character other than whitespace or ``/``.
        coalesce_one: When true, ``coalesce(1)`` is applied before writing.
        base_path: Destination prefix. Defaults to the processed area of the
            project bucket, so existing two/three-argument calls are unchanged.

    Returns:
        The full output path that was written.

    Raises:
        ValueError: If ``table_name`` is empty, blank, or only slashes.
    """
    # An empty name would make the overwrite target the whole base prefix and
    # replace every table stored under it, so refuse it before touching storage.
    if not str(table_name).strip("/ \t\r\n"):
        raise ValueError(
            "table_name must be a non-empty name; refusing to overwrite the base path"
        )

    out_path = f"{base_path.rstrip('/')}/{table_name}"
    writer = df_out.coalesce(1) if coalesce_one else df_out
    writer.write.mode("overwrite").parquet(out_path)
    return out_path


def validate_parquet(path: str, n: int = 20):
    """Read a Parquet dataset back and print a quick sanity report.

    Uses the notebook-level ``spark`` session to load ``path``, shows the first
    ``n`` rows without truncation, prints the schema and the total row count,
    and returns the loaded DataFrame.
    """
    loaded = spark.read.parquet(path)

    loaded.show(n, truncate=False)
    loaded.printSchema()

    row_total = loaded.count()
    print("rows:", row_total)

    return loaded

# Print the inferred schema of the loaded accidents DataFrame as a quick sanity check.
df.printSchema()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/13 12:31:30 WARN Utils: Your hostname, shousenjotekimac-pro.local, resolves to a loopback address: 127.0.0.1; using 10.16.87.40 instead (on interface en0)
26/01/13 12:31:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/josiejiang/anaconda3/envs/us_accidents_dashboard/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/josiejiang/.ivy2_spark/cache
The jars for the packages stored in: /Users/josiejiang/.ivy2_spark/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-af9015d4-2c67-4dbb-afcd-daea544f10d6;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.4.2 in central
	found software.amazon.awssdk#bundle;2.29.52 in central

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

                                                                                

In [None]:
# NOTE(review): un-executed scratch cell; a bare function name only displays its repr — likely safe to delete.
write_parquet

In [2]:
# Accident counts per state and calendar quarter (year/quarter taken from
# Start_Time_ts); rows without a State are excluded.
state_quarter_counts = (
    df
    .where(F.col("State").isNotNull())
    .select(
        "State",
        F.year("Start_Time_ts").alias("Year"),
        F.quarter("Start_Time_ts").alias("Quarter"),
    )
    .groupBy("State", "Year", "Quarter")
    .count()
    .withColumnRenamed("count", "accident_count")
    .sort("State", "Year", "Quarter")
)

# Preview the result and its schema before persisting it.
state_quarter_counts.show(n=20, truncate=False)
state_quarter_counts.printSchema()

# Last expression: the returned output path is displayed as the cell result.
write_parquet(state_quarter_counts, "state_quarter_counts")


                                                                                

+-----+----+-------+--------------+
|State|Year|Quarter|accident_count|
+-----+----+-------+--------------+
|AL   |2016|2      |1             |
|AL   |2016|3      |6             |
|AL   |2016|4      |4             |
|AL   |2017|1      |3             |
|AL   |2017|2      |13            |
|AL   |2017|3      |76            |
|AL   |2017|4      |92            |
|AL   |2018|1      |190           |
|AL   |2018|2      |187           |
|AL   |2018|3      |217           |
|AL   |2018|4      |332           |
|AL   |2019|1      |306           |
|AL   |2019|2      |321           |
|AL   |2019|3      |270           |
|AL   |2019|4      |344           |
|AL   |2020|1      |276           |
|AL   |2020|2      |246           |
|AL   |2020|3      |223           |
|AL   |2020|4      |553           |
|AL   |2021|1      |414           |
+-----+----+-------+--------------+
only showing top 20 rows
root
 |-- State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable 

                                                                                

's3a://us-accidents-dashboard-1445/processed/state_quarter_counts'

In [3]:

# Read the state/quarter table back from storage to confirm the write above.
validate_parquet(
    "s3a://us-accidents-dashboard-1445/processed/state_quarter_counts"
)


+-----+----+-------+--------------+
|State|Year|Quarter|accident_count|
+-----+----+-------+--------------+
|AL   |2016|2      |1             |
|AL   |2016|3      |6             |
|AL   |2016|4      |4             |
|AL   |2017|1      |3             |
|AL   |2017|2      |13            |
|AL   |2017|3      |76            |
|AL   |2017|4      |92            |
|AL   |2018|1      |190           |
|AL   |2018|2      |187           |
|AL   |2018|3      |217           |
|AL   |2018|4      |332           |
|AL   |2019|1      |306           |
|AL   |2019|2      |321           |
|AL   |2019|3      |270           |
|AL   |2019|4      |344           |
|AL   |2020|1      |276           |
|AL   |2020|2      |246           |
|AL   |2020|3      |223           |
|AL   |2020|4      |553           |
|AL   |2021|1      |414           |
+-----+----+-------+--------------+
only showing top 20 rows
root
 |-- State: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable 

DataFrame[State: string, Year: int, Quarter: int, accident_count: bigint]