In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [2]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/Users/amey/google/credentials/google_credentials.json")

In [3]:
sc = SparkContext(conf=conf)

sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile", "/Users/amey/google/credentials/google_credentials.json")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")

In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [5]:
#Bringing in only necessary fields for Crashes dataset
crashes_columns = ['CRASH_RECORD_ID','RD_NO','CRASH_DATE','POSTED_SPEED_LIMIT','WEATHER_CONDITION','LIGHTING_CONDITION','FIRST_CRASH_TYPE','STREET_NO','STREET_DIRECTION','STREET_NAME','INJURIES_TOTAL','INJURIES_FATAL']
df_crashes = spark.read.parquet('gs://dtc_capstone_data_quiet-rigging-347402/raw/crash/*.parquet').select(crashes_columns)

Row(CRASH_RECORD_ID='000cd9ff5c7aa5448f9ed6058c7b3f6b8504d234fa5cb7c93e0da7d408536077785beffa17785adc270a477de2f8bad33427895b27e7e22f3c537f290c9195eb', RD_NO='JF153597', CRASH_DATE='02/23/2022 12:15:00 PM', POSTED_SPEED_LIMIT=30, WEATHER_CONDITION='CLEAR', LIGHTING_CONDITION='DAYLIGHT', FIRST_CRASH_TYPE='TURNING', INJURIES_TOTAL=0, INJURIES_FATAL=0, LATITUDE=41.876038171, LONGITUDE=-87.701103023, LOCATION='POINT (-87.701103023191 41.876038171488)')

In [42]:
#df_people = spark.read.parquet('gs://dtc_capstone_data_quiet-rigging-347402/raw/people/*.parquet') \

vehicles_columns = ['CRASH_UNIT_ID','CRASH_RECORD_ID','RD_NO','CRASH_DATE','UNIT_NO','UNIT_TYPE','NUM_PASSENGERS','VEHICLE_ID','CMRC_VEH_I','MAKE','MODEL']
df_vehicles_parquet = spark.read.parquet('gs://dtc_capstone_data_quiet-rigging-347402/raw/vehicles/*.parquet').select(vehicles_columns)

In [25]:
df_vehicles.head()

Row(CRASH_UNIT_ID=829999, CRASH_RECORD_ID='24ddf9fd8542199d832e1c223cc474e5601b356f1d77a648bb792285d3438a8d5bd177787c5cc51d347da7b1effd5920cb5ff39ce97615a184266b0a7cb615ce', RD_NO='JD124535', CRASH_DATE='01/22/2020 06:25:00 AM', UNIT_NO=1, UNIT_TYPE='DRIVER', NUM_PASSENGERS=None, VEHICLE_ID=796949, CMRC_VEH_I='', MAKE='INFINITI', MODEL='UNKNOWN')

In [34]:
people_cloumns = ['PERSON_ID','PERSON_TYPE','CRASH_RECORD_ID','RD_NO','VEHICLE_ID','CRASH_DATE','SEX','AGE','BAC_RESULT']
df_people_parquet = spark.read.parquet('gs://dtc_capstone_data_quiet-rigging-347402/raw/people/*.parquet').select(people_cloumns)
df_people_parquet.head()

AnalysisException:  Column name "BAC_RESULT VALUE" contains invalid character(s). Please use alias to rename it.        

In [66]:

windowSpec  = Window.partitionBy("CRASH_RECORD_ID").orderBy("CRASH_UNIT_ID")

df_vehicles_parquet= df_vehicles_parquet.withColumn("veh_seq_nbr",row_number().over(windowSpec))
    

In [38]:
file = "gs://dtc_capstone_data_quiet-rigging-347402/raw/people/*.parquet"
df_people_parquet = spark.read.parquet(file)
for c in df_people_parquet.columns:
    df_people_parquet = df_people_parquet.withColumnRenamed(c, c.replace(" ", "_"))

df_people_parquetfin = spark.read.schema(df_people_parquet.schema).parquet(file)

AnalysisException:  Column name "BAC_RESULT VALUE" contains invalid character(s). Please use alias to rename it.        

In [31]:
vehiclesCustomSchema = types.StructType([
    types.StructField("CRASH_UNIT_ID", types.IntegerType(), True),
    types.StructField("CRASH_RECORD_ID", types.StringType(), True),
    types.StructField("RD_NO", types.StringType(), True),
    types.StructField("CRASH_DATE", types.TimestampType(), True),
    types.StructField("UNIT_NO", types.StringType(), True),
    types.StructField("UNIT_TYPE", types.StringType(), True),
    types.StructField("NUM_PASSENGERS", types.StringType(), True),
    types.StructField("VEHICLE_ID", types.IntegerType(), True),
    types.StructField("CMRC_VEH_I", types.StringType(), True),
    types.StructField("MAKE", types.StringType(), True),
    types.StructField("MODEL", types.IntegerType(), True)  
    ])

def read_parquet_(path, schema) : 
    return spark.read.format("parquet")\
                             .schema(schema)\
                             .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                             .load(path)

vehicles_path = 'gs://dtc_capstone_data_quiet-rigging-347402/raw/vehicles/*.parquet'
df_vehicles = read_parquet_(vehicles_path, vehiclesCustomSchema)

In [67]:
df_vehicles_parquet.registerTempTable('vehicles')
df_vehicle_raw = spark.sql("""
SELECT 
    CRASH_UNIT_ID, 
    CRASH_RECORD_ID,
    RD_NO,
    CRASH_DATE, 
    UNIT_NO, 
    UNIT_TYPE,
    NUM_PASSENGERS,
    -- Revenue calculation 
    VEHICLE_ID,
    CMRC_VEH_I,
    MAKE,
    MODEL,
    veh_seq_nbr
FROM
    vehicles
    where CRASH_RECORD_ID = '24ddf9fd8542199d832e1c223cc474e5601b356f1d77a648bb792285d3438a8d5bd177787c5cc51d347da7b1effd5920cb5ff39ce97615a184266b0a7cb615ce'
""")

df_vehicle_raw.head()

Row(CRASH_UNIT_ID=829999, CRASH_RECORD_ID='24ddf9fd8542199d832e1c223cc474e5601b356f1d77a648bb792285d3438a8d5bd177787c5cc51d347da7b1effd5920cb5ff39ce97615a184266b0a7cb615ce', RD_NO='JD124535', CRASH_DATE='01/22/2020 06:25:00 AM', UNIT_NO=1, UNIT_TYPE='DRIVER', NUM_PASSENGERS=None, VEHICLE_ID=796949, CMRC_VEH_I='', MAKE='INFINITI', MODEL='UNKNOWN', veh_seq_nbr=1)

In [50]:
df_vehicle_1 = spark.sql("""
SELECT 
    CRASH_RECORD_ID,
    count(CRASH_UNIT_ID) as num_vehicles
FROM
    vehicles
    group by 1
    having num_vehicles >=2
""")

In [51]:
df_vehicle_1.head(20)

[Row(CRASH_RECORD_ID='26201e24dadcfecc3cfe69f10b7224a4df15f0baecf2e955ef8f8a161eedf196888c395ce9dd5fac43cd1d06f2111cd516400b90912744bd7bfaa0cb67ae3641', num_vehicles=2),
 Row(CRASH_RECORD_ID='cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0', num_vehicles=4),
 Row(CRASH_RECORD_ID='dddf911ff30ead410b91b853eeea6950051afad37014ac3ea029729b6e8af868b904db194c3fa16a1a918b882772e60166d5c26196211597b006bcc9fb7d67a4', num_vehicles=2),
 Row(CRASH_RECORD_ID='c3b6d574ef99fd6240597d05f803a89dc0b202619d5b788ddf99eed81073ca2f6cada9d8359efacaac664afcd11598339ddd7b134e7cadb0c5810f826ae5f740', num_vehicles=2),
 Row(CRASH_RECORD_ID='07cf72f5dc21e8251606dcebc1fd7ff511d6a1808f97f9d260203c2fe4428ec0c27e73cafb17c2addd3b35af40129524d046d71754c1ecab7318746841f56667', num_vehicles=2),
 Row(CRASH_RECORD_ID='71cf510bf81b49034a82a35b8156c79afbbc2011a1233f334ac71456d57767bd424f1ae151f6121edf164223c55b8c06829d52417bd8feca0985bfcaa40de680

In [68]:
df_vehicle_2 = spark.sql("""
SELECT 
    CRASH_UNIT_ID, 
    CRASH_RECORD_ID,
    RD_NO,
    CRASH_DATE, 
    UNIT_NO, 
    UNIT_TYPE,
    NUM_PASSENGERS,
    -- Revenue calculation 
    VEHICLE_ID,
    CMRC_VEH_I,
    MAKE,
    MODEL,
    veh_seq_nbr
FROM
    vehicles
    where CRASH_RECORD_ID = 'cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0'
""")

df_vehicle_2.head(20)

[Row(CRASH_UNIT_ID=940084, CRASH_RECORD_ID='cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0', RD_NO='JD334931', CRASH_DATE='08/16/2020 06:30:00 PM', UNIT_NO=1, UNIT_TYPE='BICYCLE', NUM_PASSENGERS=None, VEHICLE_ID=None, CMRC_VEH_I='', MAKE='', MODEL='', veh_seq_nbr=1),
 Row(CRASH_UNIT_ID=940086, CRASH_RECORD_ID='cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0', RD_NO='JD334931', CRASH_DATE='08/16/2020 06:30:00 PM', UNIT_NO=2, UNIT_TYPE='DRIVER', NUM_PASSENGERS=None, VEHICLE_ID=891177, CMRC_VEH_I='', MAKE='FORD', MODEL='MUSTANG', veh_seq_nbr=2),
 Row(CRASH_UNIT_ID=957468, CRASH_RECORD_ID='cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0', RD_NO='JD334931', CRASH_DATE='08/16/2020 06:30:00 PM', UNIT_NO=2, UNIT_TYPE='BICYCLE', NUM_PASSENGERS=None, VEHIC

In [75]:
df_vehicle_summ = spark.sql("""
SELECT B.CRASH_RECORD_ID,
    B.RD_NO,
    B.CRASH_DATE, 
    
    max(VEHICLE1_ID) as VEHICLE1_ID,
    max(VEHICLE1_MAKE) as VEHICLE1_MAKE,
    max(VEHICLE1_MODEL) as VEHICLE1_MODEL,
    
    max(VEHICLE2_ID) as VEHICLE2_ID,
    max(VEHICLE2_MAKE) as VEHICLE2_MAKE,
    max(VEHICLE2_MODEL) as VEHICLE2_MODEL,
    
    max(VEHICLE3_ID) as VEHICLE3_ID ,
    max(VEHICLE3_MAKE) as VEHICLE3_MAKE,
    max(VEHICLE3_MODEL) as VEHICLE3_MODEL ,
    
    max(VEHICLE4_ID) as VEHICLE4_ID ,
    max(VEHICLE4_MAKE) as VEHICLE4_MAKE,
    max(VEHICLE4_MODEL) as VEHICLE4_MODEL
    FROM 
(SELECT 
    a.CRASH_RECORD_ID,
    a.RD_NO,
    a.CRASH_DATE, 
    
    b1.CRASH_UNIT_ID as VEHICLE1_ID,
    case when b1.MAKE is null then b1.UNIT_TYPE else b1.MAKE end as VEHICLE1_MAKE,
    case when b1.MODEL is null then b1.UNIT_TYPE else b1.MODEL end as VEHICLE1_MODEL,
    
    b2.CRASH_UNIT_ID as VEHICLE2_ID,
    case when b2.MAKE is null then b2.UNIT_TYPE else b2.MAKE end as VEHICLE2_MAKE,
    case when b2.MODEL is null then b2.UNIT_TYPE else b2.MODEL end as VEHICLE2_MODEL,
    
    b3.CRASH_UNIT_ID as VEHICLE3_ID,
    case when b3.MAKE is null then b3.UNIT_TYPE else b3.MAKE end as VEHICLE3_MAKE,
    case when b3.MODEL is null then b3.UNIT_TYPE else b3.MODEL end as VEHICLE3_MODEL,
    
    b4.CRASH_UNIT_ID as VEHICLE4_ID,
    case when b4.MAKE is null then b4.UNIT_TYPE else b4.MAKE end as VEHICLE4_MAKE,
    case when b4.MODEL is null then b4.UNIT_TYPE else b4.MODEL end as VEHICLE4_MODEL
    
FROM
    vehicles a
    
    left join (select CRASH_RECORD_ID, CRASH_DATE, CRASH_UNIT_ID,VEHICLE_ID,MAKE,MODEL,UNIT_TYPE
                from vehicles where veh_seq_nbr = 1 ) b1
    on a.CRASH_RECORD_ID = b1.CRASH_RECORD_ID
    and a.CRASH_DATE = b1.CRASH_DATE
    and a.CRASH_UNIT_ID = b1.CRASH_UNIT_ID
    and a.VEHICLE_ID = b1.VEHICLE_ID
    
    
    left join (select CRASH_RECORD_ID, CRASH_DATE, CRASH_UNIT_ID,VEHICLE_ID,MAKE,MODEL,UNIT_TYPE
                from vehicles where  veh_seq_nbr = 2) b2
    on a.CRASH_RECORD_ID = b2.CRASH_RECORD_ID
    and a.CRASH_DATE = b2.CRASH_DATE
    and a.CRASH_UNIT_ID = b2.CRASH_UNIT_ID
    and a.VEHICLE_ID = b2.VEHICLE_ID
    
    left join (select CRASH_RECORD_ID, CRASH_DATE, CRASH_UNIT_ID,VEHICLE_ID,MAKE,MODEL,UNIT_TYPE
                from vehicles where  veh_seq_nbr = 3) b3
    on a.CRASH_RECORD_ID = b3.CRASH_RECORD_ID
    and a.CRASH_DATE = b3.CRASH_DATE
    and a.CRASH_UNIT_ID = b3.CRASH_UNIT_ID
    and a.VEHICLE_ID = b3.VEHICLE_ID
    
    left join (select CRASH_RECORD_ID, CRASH_DATE, CRASH_UNIT_ID,VEHICLE_ID,MAKE,MODEL ,UNIT_TYPE
                from vehicles where  veh_seq_nbr = 4) b4
    on a.CRASH_RECORD_ID = b4.CRASH_RECORD_ID
    and a.CRASH_DATE = b4.CRASH_DATE
    and a.CRASH_UNIT_ID = b4.CRASH_UNIT_ID
    and a.VEHICLE_ID = b4.VEHICLE_ID
    
--where a.CRASH_RECORD_ID = 'cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0'
group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15) B
group by 1,2,3
""")

In [76]:
df_vehicle_2.head(20)

[Row(CRASH_RECORD_ID='cb4362a84a3105a0b6b758fb5272e58c85a58f01ee2e180765100255ca8f845e36f024605e97ab9dd35c7bd6b53169f48b2702da1c62289aa5cf047a507428e0', RD_NO='JD334931', CRASH_DATE='08/16/2020 06:30:00 PM', VEHICLE1_ID=None, VEHICLE1_MAKE=None, VEHICLE1_MODEL=None, VEHICLE2_ID=940086, VEHICLE2_MAKE='FORD', VEHICLE2_MODEL='MUSTANG', VEHICLE3_ID=None, VEHICLE3_MAKE=None, VEHICLE3_MODEL=None, VEHICLE4_ID=957469, VEHICLE4_MAKE='FORD', VEHICLE4_MODEL='MUSTANG')]