In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [2]:
# Create (or reuse) the Spark session for this notebook.
# spark.debug.maxToStringFields is raised to 10000 so that plans/schemas of
# wide DataFrames are printed in full instead of being truncated.
spark = (
    SparkSession.builder
    .appName("Vehicle_Crash_Case_Study")
    .config("spark.debug.maxToStringFields", "10000")
    .getOrCreate()
)

In [3]:
# Keep the Spark application id of the current session in `appId`.
appId = spark.sparkContext.applicationId
# Bare last expression: the notebook displays the id as the cell output.
appId

'local-1661942970737'

In [4]:
# Input settings for the person-level data set:
#   read_format - source format of the input file
#   file_path   - directory (relative to the notebook) that holds the data
#   file_name   - name of the file inside file_path
read_format, file_path, file_name = (
    'csv',
    '../Data/',
    'Primary_Person_use.csv',
)

In [5]:
# Load the person-level file into a DataFrame.
# The source format comes from the `read_format` setting declared in the
# configuration cell instead of being hard-coded in the reader call, so the
# setting actually takes effect; for 'csv' the result is the same as
# spark.read.csv(...).
# header=True: the first line of the file supplies the column names.
# No schema is given and schema inference is not enabled, so every column is
# loaded as a string.
persons_df = spark.read.load(
    f'{file_path}{file_name}',
    format=read_format,
    header=True,
)

In [6]:
# Register the persons DataFrame as the temp view "persons" so it can be
# queried through spark.sql(). createOrReplace* overwrites an existing view of
# the same name, so this cell can be re-run.
# Debug aid (disabled): uncomment to preview the rows.
#persons_df.show(truncate=False)
persons_df.createOrReplaceTempView("persons")

In [7]:
# SQL version: number of distinct crashes in which a male person was killed
# (DEATH_CNT = 1) and in which no person with gender FEMALE / UNKNOWN / NA is
# recorded for the same crash.
#
# The exclusion is written as a correlated NOT EXISTS rather than
# `crash_id NOT IN (subquery)`: with NOT IN, a single NULL crash_id returned by
# the subquery makes the predicate evaluate to NULL for every row and the count
# silently drops to 0. NOT EXISTS has no such NULL trap and matches the
# DataFrame version of this analysis, which uses subtract().
# The inner query uses its own alias (`o`) so it no longer shadows the outer
# `p`, and string literals use standard single quotes.
# DEATH_CNT is loaded as a string column (no schema inference on read), so the
# comparison with 1 relies on Spark's implicit cast.
spark.sql('''
    select 'Crashes [Male Killed]' as Description,
           count(distinct p.crash_id) as Count
    from persons p
    where p.prsn_gndr_id = 'MALE'
      and p.DEATH_CNT = 1
      and not exists (
          select 1
          from persons o
          where o.crash_id = p.crash_id
            and o.prsn_gndr_id in ('FEMALE', 'UNKNOWN', 'NA')
      )
''').show(truncate=False)

+---------------------+-----+
|Description          |Count|
+---------------------+-----+
|Crashes [Male Killed]|122  |
+---------------------+-----+



In [8]:
# NOTE(review): itertools.groupby is not used by any cell visible here (the
# DataFrame code only calls DataFrame methods) - looks like an accidental
# auto-import; confirm nothing else relies on it before removing.
from itertools import groupby

# Distinct CRASH_IDs of crashes that have at least one person whose gender is
# recorded as FEMALE, UNKNOWN or NA. Used as the exclusion set for the
# "male killed" count.
non_male = (
    persons_df
    .where(F.col("PRSN_GNDR_ID").isin("FEMALE", "UNKNOWN", "NA"))
    .select("CRASH_ID")
    .distinct()
)

In [15]:
# DataFrame version of the "male killed" count:
#   1. keep rows for male persons with DEATH_CNT == 1,
#   2. reduce them to distinct CRASH_IDs,
#   3. remove every crash that appears in `non_male` (set difference),
#   4. count what is left and label the single result row.
# Nothing is executed here; `male_killed` is a lazy one-row DataFrame with the
# columns (description, crash_count).
male_killed = (
    persons_df
    .where((F.col("PRSN_GNDR_ID") == "MALE") & (F.col("DEATH_CNT") == 1))
    .select("CRASH_ID")
    .distinct()
    .subtract(non_male)
    .agg(F.count("CRASH_ID").alias("crash_count"))
    .select(F.lit("Crashes [male killed]").alias("description"), "crash_count")
)

In [17]:
# Print the physical plan Spark builds for `non_male` (filter on PRSN_GNDR_ID
# followed by de-duplication of CRASH_ID) to inspect how it will be executed.
non_male.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[CRASH_ID#17], functions=[])
   +- Exchange hashpartitioning(CRASH_ID#17, 200), ENSURE_REQUIREMENTS, [id=#1117]
      +- HashAggregate(keys=[CRASH_ID#17], functions=[])
         +- Project [CRASH_ID#17]
            +- Filter PRSN_GNDR_ID#25 IN (FEMALE,UNKNOWN,NA)
               +- FileScan csv [CRASH_ID#17,PRSN_GNDR_ID#25] Batched: false, DataFilters: [PRSN_GNDR_ID#25 IN (FEMALE,UNKNOWN,NA)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/MyGitHubRepo/data-labs/data_engineering/Case_Study_Vehicle_Cr..., PartitionFilters: [], PushedFilters: [In(PRSN_GNDR_ID, [FEMALE,NA,UNKNOWN])], ReadSchema: struct<CRASH_ID:string,PRSN_GNDR_ID:string>


