In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Obtain the SparkContext for this kernel (created on first use).
sc = SparkContext.getOrCreate()

# Wrap the SparkContext in a GlueContext; the Spark session and the Glue Job
# handle used by the rest of the notebook are both derived from it.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


In [27]:
# Read the vehicles CSV from S3 into a Glue DynamicFrame.
# "withHeader" tells the reader that the first row holds the column names;
# "optimizePerformance" is passed through unchanged to Glue's CSV reader.
vehicles_df_raw = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://cloudprojectteam4/Traffic_Crashes_-_Vehicles.csv/"],
    },
    format_options={
        "withHeader": True,
        "optimizePerformance": True,
    },
)





In [28]:
# Print the column names and types Glue assigned to the vehicles DynamicFrame.
# In the output below, every listed column was read as a string.
vehicles_df_raw.printSchema()

root
|-- CRASH_UNIT_ID: string
|-- RD_NO: string
|-- CRASH_DATE: string
|-- UNIT_NO: string
|-- UNIT_TYPE: string
|-- NUM_PASSENGERS: string
|-- VEHICLE_ID: string
|-- CMRC_VEH_I: string
|-- MAKE: string
|-- MODEL: string
|-- LIC_PLATE_STATE: string
|-- VEHICLE_YEAR: string
|-- VEHICLE_DEFECT: string
|-- VEHICLE_TYPE: string
|-- VEHICLE_USE: string
|-- TRAVEL_DIRECTION: string
|-- MANEUVER: string
|-- TOWED_I: string
|-- FIRE_I: string
|-- OCCUPANT_CNT: string
|-- EXCEED_SPEED_LIMIT_I: string
|-- TOWED_BY: string
|-- TOWED_TO: string
|-- AREA_00_I: string
|-- AREA_01_I: string
|-- AREA_02_I: string
|-- AREA_03_I: string
|-- AREA_04_I: string
|-- AREA_05_I: string
|-- AREA_06_I: string
|-- AREA_07_I: string
|-- AREA_08_I: string
|-- AREA_09_I: string
|-- AREA_10_I: string
|-- AREA_11_I: string
|-- AREA_12_I: string
|-- AREA_99_I: string
|-- FIRST_CONTACT_POINT: string
|-- CMV_ID: string
|-- USDOT_NO: string
|-- CCMC_NO: string
|-- ILCC_NO: string
|-- COMMERCIAL_SRC: string
|-- GVWR: str

In [31]:
# Read the crashes CSV from S3 into a Glue DynamicFrame.
# "withHeader" tells the reader that the first row holds the column names;
# "optimizePerformance" is passed through unchanged to Glue's CSV reader.
crashes_df_raw = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://cloudprojectteam4/Traffic_Crashes_-_Crashes.csv/"],
    },
    format_options={
        "withHeader": True,
        "optimizePerformance": True,
    },
)





In [32]:
# Print the column names and types Glue assigned to the crashes DynamicFrame.
# NOTE(review): in the output below, the first column name appears to carry a
# leading byte-order mark (U+FEFF) ahead of "RD_NO" — confirm before selecting
# or joining on that column through the Glue/Spark frame.
crashes_df_raw.printSchema()

root
|-- ﻿RD_NO: string
|-- CRASH_DATE_EST_I: string
|-- CRASH_DATE: string
|-- POSTED_SPEED_LIMIT: string
|-- TRAFFIC_CONTROL_DEVICE: string
|-- DEVICE_CONDITION: string
|-- WEATHER_CONDITION: string
|-- LIGHTING_CONDITION: string
|-- FIRST_CRASH_TYPE: string
|-- TRAFFICWAY_TYPE: string
|-- LANE_CNT: string
|-- ALIGNMENT: string
|-- ROADWAY_SURFACE_COND: string
|-- ROAD_DEFECT: string
|-- REPORT_TYPE: string
|-- CRASH_TYPE: string
|-- INTERSECTION_RELATED_I: string
|-- NOT_RIGHT_OF_WAY_I: string
|-- HIT_AND_RUN_I: string
|-- DAMAGE: string
|-- DATE_POLICE_NOTIFIED: string
|-- PRIM_CONTRIBUTORY_CAUSE: string
|-- SEC_CONTRIBUTORY_CAUSE: string
|-- STREET_NO: string
|-- STREET_DIRECTION: string
|-- STREET_NAME: string
|-- BEAT_OF_OCCURRENCE: string
|-- PHOTOS_TAKEN_I: string
|-- STATEMENTS_TAKEN_I: string
|-- DOORING_I: string
|-- WORK_ZONE_I: string
|-- WORK_ZONE_TYPE: string
|-- WORKERS_PRESENT_I: string
|-- NUM_UNITS: string
|-- MOST_SEVERE_INJURY: string
|-- INJURIES_TOTAL: string
|-

In [50]:
# Convert the vehicles Glue DynamicFrame into a Spark DataFrame.
# NOTE(review): vehicles_df_spark is not referenced by any later cell visible
# in this notebook; the merge below works on a pandas frame read separately.
vehicles_df_spark = vehicles_df_raw.toDF()





In [55]:
import pandas as pd

# Load only the vehicle columns needed downstream, reading the CSV straight
# from S3 into a pandas DataFrame.
s3_file_path = "s3://cloudprojectteam4/Traffic_Crashes_-_Vehicles.csv/"
vehicles_df = pd.read_csv(
    s3_file_path,
    usecols=[
        "RD_NO",
        "UNIT_TYPE",
        "MAKE",
        "MODEL",
        "VEHICLE_YEAR",
    ],
)





In [56]:
# Preview the first five rows of the selected vehicle columns.
vehicles_df.head()

      RD_NO UNIT_TYPE        MAKE          MODEL  VEHICLE_YEAR
0  JB278428    DRIVER       DODGE        CHARGER        2008.0
1  JB278428    PARKED     PONTIAC             G6        2005.0
2  JB281663    DRIVER  VOLKSWAGEN          JETTA           NaN
3  JB281663    DRIVER      NISSAN  NISSAN MAXIMA        2015.0
4  JB283806    DRIVER     UNKNOWN        UNKNOWN           NaN


In [57]:
# Convert the crashes Glue DynamicFrame into a Spark DataFrame.
# NOTE(review): crashes_df_spark is not referenced by any later cell visible
# in this notebook; the merge below works on a pandas frame read separately.
crashes_df_spark = crashes_df_raw.toDF()




In [58]:
import pandas as pd

# Load only the crash columns needed downstream, reading the CSV straight
# from S3 into a pandas DataFrame.
s3_file_path = "s3://cloudprojectteam4/Traffic_Crashes_-_Crashes.csv/"
crashes_df = pd.read_csv(
    s3_file_path,
    usecols=[
        "RD_NO",
        "CRASH_DATE",
        "POSTED_SPEED_LIMIT",
        "TRAFFIC_CONTROL_DEVICE",
        "DEVICE_CONDITION",
        "WEATHER_CONDITION",
        "LIGHTING_CONDITION",
        "TRAFFICWAY_TYPE",
        "ALIGNMENT",
        "ROADWAY_SURFACE_COND",
        "ROAD_DEFECT",
        "CRASH_TYPE",
        "DAMAGE",
        "PRIM_CONTRIBUTORY_CAUSE",
        "MOST_SEVERE_INJURY",
        "CRASH_HOUR",
        "CRASH_DAY_OF_WEEK",
        "CRASH_MONTH",
        "LATITUDE",
        "LONGITUDE",
    ],
)




In [59]:
# Preview the first five rows of the selected crash columns.
crashes_df.head()

      RD_NO              CRASH_DATE  ...   LATITUDE  LONGITUDE
0  JC123097  01/19/2019 11:45:00 PM  ...  41.859566 -87.659525
1  JC123084  01/19/2019 11:30:00 PM  ...  41.928335 -87.699915
2  JC123076  01/19/2019 11:16:00 PM  ...  41.870048 -87.715673
3  JC123075  01/19/2019 11:15:00 PM  ...  41.876569 -87.686450
4  JC123050  01/19/2019 10:52:00 PM  ...  41.946837 -87.690697

[5 rows x 20 columns]


In [60]:
# Inner-join each vehicle row to its crash record on the shared 'RD_NO' key;
# rows whose RD_NO appears in only one of the two frames are left out.
traffic_crashes_df = vehicles_df.merge(crashes_df, how="inner", on="RD_NO")




In [61]:
# Print the first five rows of the merged vehicles + crashes DataFrame
print(traffic_crashes_df.head())

      RD_NO UNIT_TYPE        MAKE  ... CRASH_MONTH   LATITUDE  LONGITUDE
0  JB278428    DRIVER       DODGE  ...           5  41.820475 -87.608080
1  JB278428    PARKED     PONTIAC  ...           5  41.820475 -87.608080
2  JB281663    DRIVER  VOLKSWAGEN  ...           5  41.968739 -87.672098
3  JB281663    DRIVER      NISSAN  ...           5  41.968739 -87.672098
4  JB283806    DRIVER     UNKNOWN  ...           5  41.736007 -87.645342

[5 rows x 24 columns]


In [62]:
# Show the (rows, columns) dimensions of the merged dataset before any cleaning
print(traffic_crashes_df.shape)

(527452, 24)


In [63]:
# Discard every row that has a missing value in at least one column
traffic_crashes_df = traffic_crashes_df.dropna(axis="index", how="any")




In [64]:
# Show the (rows, columns) dimensions of the dataset after dropping rows with nulls
print(traffic_crashes_df.shape)

(429986, 24)


In [65]:
# Remove rows that are exact duplicates across all columns, keeping the first occurrence
traffic_crashes_df = traffic_crashes_df.drop_duplicates(subset=None, keep="first")




In [66]:
# Show the (rows, columns) dimensions of the dataset after dropping duplicates
print(traffic_crashes_df.shape)

(429661, 24)


In [75]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from awsglue.context import GlueContext

# Initialize Spark and Glue contexts; getOrCreate() returns the ones already
# running in this kernel when they exist.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = SparkSession.builder.getOrCreate()

# Convert the cleaned pandas DataFrame to a Spark DataFrame so it can be
# written with Spark's DataFrameWriter. This assignment used to be commented
# out (and named a non-existent `pandas_df`), which left `spark_df` undefined
# and made this cell fail with NameError on a fresh kernel.
spark_df = spark.createDataFrame(traffic_crashes_df)

# Define the S3 output directory
output_dir = "s3://cloudprojectteam4/Cleaned Data/"

# Write the DataFrame to the S3 bucket in CSV format with a header row.
# mode("overwrite") replaces whatever already exists under output_dir.
spark_df.write.mode("overwrite").option("header", "true").csv(output_dir)



