# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [8]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.2X
%number_of_workers 10

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session ecb84d43-c817-4847-aa7f-3e664d0d1bdd.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session ecb84d43-c817-4847-aa7f-3e664d0d1bdd.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 5.0


You are already connected to a glueetl session ecb84d43-c817-4847-aa7f-3e664d0d1bdd.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: None
Setting new worker type to: G.2X


You are already connected to a glueetl session ecb84d43-c817-4847-aa7f-3e664d0d1bdd.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: None
Setting new number of workers to: 10



In [None]:
# Reading the CSV file from S3 bucket and displaying the data

In [9]:

s3_path = "s3://accidentdatasetbucket/Stagging/US_Accidents_March23.csv"

# Read CSV as Glue DynamicFrame
dyf = glueContext.create_dynamic_frame.from_options(
    format_options={"withHeader": True, "separator": ","}, 
    connection_type="s3",
    format="csv",
    connection_options={"paths": [s3_path]},
)




In [None]:
# Show schema

In [10]:
dyf.printSchema()

root
|-- ID: string
|-- Source: string
|-- Severity: string
|-- Start_Time: string
|-- End_Time: string
|-- Start_Lat: string
|-- Start_Lng: string
|-- End_Lat: string
|-- End_Lng: string
|-- Distance(mi): string
|-- Description: string
|-- Street: string
|-- City: string
|-- County: string
|-- State: string
|-- Zipcode: string
|-- Country: string
|-- Timezone: string
|-- Airport_Code: string
|-- Weather_Timestamp: string
|-- Temperature(F): string
|-- Wind_Chill(F): string
|-- Humidity(%): string
|-- Pressure(in): string
|-- Visibility(mi): string
|-- Wind_Direction: string
|-- Wind_Speed(mph): string
|-- Precipitation(in): string
|-- Weather_Condition: string
|-- Amenity: string
|-- Bump: string
|-- Crossing: string
|-- Give_Way: string
|-- Junction: string
|-- No_Exit: string
|-- Railway: string
|-- Roundabout: string
|-- Station: string
|-- Stop: string
|-- Traffic_Calming: string
|-- Traffic_Signal: string
|-- Turning_Loop: string
|-- Sunrise_Sunset: string
|-- Civil_Twilight: str

In [14]:
from pyspark.sql import SparkSession
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F




In [15]:
# Convert DynamicFrame to Spark DataFrame
df = dyf.toDF()



In [16]:
# Show first 5 rows to verify data
df.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Ameni

In [None]:
#Working with Missing, incoreect, and invalid data

In [None]:
# Check for missing (NaN) values

In [17]:
missing_values = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])




In [18]:
missing_values = df.select([F.count(F.when((F.col(c).isNull()) | (F.col(c) == 0), c)).alias(c) for c in df.columns])




In [19]:
# Show missing values per column
missing_values.show()

+---+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twil

In [20]:
total_rows = df.count()




In [21]:
# Count NaN and 0 values per column
missing_values_count = df.select([F.count(F.when((F.col(c).isNull()) | (F.col(c) == 0), c)).alias(c) for c in df.columns])




In [22]:
# Calculate the percentage of missing and 0 values
perc_missing = missing_values_count.select(
    [(F.col(c) / total_rows * 100).alias(c) for c in missing_values_count.columns]
)




In [23]:
# Show percentage of NaN and 0 values per column
perc_missing.show()

+---+------+--------+----------+--------+---------+---------+-------+-------+-----------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------------+-------------------+-----------+--------------------+-----------------+--------------+------------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|     Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|      Temperature(F)|      Wind_Chill(F)|Humidity(%)|        Pressure(in)|   Visibility(mi)|Wind_Direction|   Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_S

In [24]:
# Filter columns with 20% or more missing (NaN) or 0 values
columns_to_drop = perc_missing.select(
    [F.when(F.col(c) >= 20, c).alias(c) for c in perc_missing.columns]
).dropna(how='all')  # Drop columns that are entirely NaN (if no columns meet the threshold)




In [25]:
# Show the list of columns to be dropped
columns_to_drop.show()

+----+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|  ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Tw

In [26]:
# Drop the identified columns from the original DataFrame
columns_to_drop_list = [col for col in columns_to_drop.columns if columns_to_drop.select(col).first()[0] is not None]
df_cleaned = df.drop(*columns_to_drop_list)




In [27]:
# Show the cleaned DataFrame schema
df_cleaned.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- End_Time: string (nullable = true)
 |-- Start_Lat: string (nullable = true)
 |-- Start_Lng: string (nullable = true)
 |-- End_Lat: string (nullable = true)
 |-- End_Lng: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: string (nullable = true)
 |-- Temperature(F): string (nullable = true)
 |-- Wind_Chill(F): string (nullable = true)
 |-- Humidity(%): string (nullable = true)
 |-- Pressure(in): string (nullable = true)
 |-- Visibility(mi): string (nullable = true)
 |-- Wind_Dire

In [27]:
# Unrealistic temperature values (outside -30°F to 130°F)
df_cleaned = df_cleaned.withColumn(
    "Temperature(F)",
    F.when((F.col("Temperature(F)") < -30) | (F.col("Temperature(F)") > 130), F.lit(None))
    .otherwise(F.col("Temperature(F)"))
)




In [28]:
# Show cleaned data with unrealistic temperature values handled
df_cleaned.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout

In [29]:
# Replace 'Visibility(mi)' values that are 0 or greater than 100 with NaN
df_cleaned = df_cleaned.withColumn(
    "Visibility(mi)",
    F.when((F.col("Visibility(mi)") == 0) | (F.col("Visibility(mi)") > 100), F.lit(None))
    .otherwise(F.col("Visibility(mi)"))
)




In [30]:
# Show the cleaned data with replaced 'Visibility(mi)' values
df_cleaned.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout

In [31]:
# Display rows where Wind_Speed(mph) is greater than 100
extreme_wind_speed = df_cleaned.filter(F.col("Wind_Speed(mph)") > 100)




In [32]:
# Show the rows with extreme wind speed
extreme_wind_speed.show()

+--------+-------+--------+-------------------+-------------------+------------------+-------------------+-------+-------+--------------------+-------------------+-------------+-----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|      ID| Source|Severity|         Start_Time|           End_Time|         Start_Lat|          Start_Lng|End_Lat|End_Lng|         Description|             Street|         City|     County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Ra

In [33]:
# Replace extreme wind speed values with NaN
df_cleaned = df_cleaned.withColumn(
    "Wind_Speed(mph)",
    F.when(F.col("Wind_Speed(mph)") > 100, F.lit(None))
    .otherwise(F.col("Wind_Speed(mph)"))
)




In [34]:
# Show the cleaned data with replaced 'Wind_Speed(mph)' values
df_cleaned.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout

In [None]:
# Handle Humidity(%) outliers (clip values outside the 10%-90% range)

In [35]:
df_cleaned = df_cleaned.withColumn(
    "Humidity(%)",
    F.when(F.col("Humidity(%)") < 10, F.lit(10))   # Set values less than 10 to 10
    .when(F.col("Humidity(%)") > 90, F.lit(90))    # Set values greater than 90 to 90
    .otherwise(F.col("Humidity(%)"))               # Leave other values unchanged
)




In [36]:
# Handle extreme Pressure(in) values (< 28 or > 32)
df_cleaned = df_cleaned.withColumn(
    "Pressure(in)",
    F.when((F.col("Pressure(in)") < 28) | (F.col("Pressure(in)") > 32), F.lit(None))
    .otherwise(F.col("Pressure(in)"))
)




In [37]:
# show rows with extreme Pressure(in) values (< 28 or > 32)
extreme_Pressure = df_cleaned.filter((F.col("Pressure(in)") < 28) | (F.col("Pressure(in)") > 32))




In [38]:
extreme_Pressure.show()

+---+------+--------+----------+--------+---------+---------+-------+-------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|
+---+------+--------+----------+--

In [39]:
# Show the cleaned data with the replaced Pressure(in) values
df_cleaned.show(5)

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout

In [None]:
#Keep only the selected columns

In [45]:
useful_columns = ['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat',  
                  'Start_Lng', 'Distance(mi)', 'Street', 'City', 'County', 'State', 
                  'Timezone', 'Temperature(F)', 'Humidity(%)', 'Pressure(in)',
                  'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 
                  'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 
                  'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop',
                  'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset']





In [46]:
df_cleaned = df.select(useful_columns)




In [48]:
num_columns = len(df_cleaned.columns)




In [53]:
print(num_columns)

33


In [65]:
# Define the new S3 path for the transformed data
transformed_s3_path = "s3://accidentdatasetbucket/Transformed Data/"





In [67]:
# Coalesce the DataFrame to a single partition (single file) and write it to S3
df_cleaned.coalesce(1).write.format("csv").option("header", "true").mode("overwrite").save(transformed_s3_path)




#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


#### Example: Visualize data with matplotlib


#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)