In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f


In [2]:
# Create (or reuse) the notebook's Spark session, running in local mode with
# the 'local[*]' master setting.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('mySpark')
    .getOrCreate()
)
spark

In [3]:
# Load the raw accidents CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('US_Accidents_March23.csv')
)

In [4]:
# Print the column names and inferred types of the raw accidents DataFrame.
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

In [5]:
# Number of rows in the raw accidents DataFrame.
df.count()

7728394

In [6]:
# Disabled audit: counts the null values in every column of the raw DataFrame.
# The table shown below this cell is output left over from an earlier run.
#null = [f.sum(f.when(f.col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]
#df_null = df.agg(*null)
#df_null.show()

+---+------+--------+----------+--------+---------+---------+-------+-------+------------+-----------+------+----+------+-----+-------+-------+--------+------------+-----------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID|Source|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|End_Lat|End_Lng|Distance(mi)|Description|Street|City|County|State|Zipcode|Country|Timezone|Airport_Code|Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Amenity|Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station|Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twil

In [7]:
# Discard the source, coordinate, distance, zip-code and airport-code columns;
# none of them is referenced by the later cells of this notebook.
colunas_descartadas = (
    'Source',
    'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng',
    'Distance(mi)',
    'Zipcode', 'Airport_Code',
)
df = df.drop(*colunas_descartadas)

In [8]:
# Derive the calendar date of each accident from its start timestamp.
dia_ocorrencia = f.to_date('Start_Time')
df = df.withColumn('Occurrence_day', dia_ocorrencia)


In [9]:
# Reduce the weather observation timestamp to a calendar date so that the
# readings can be grouped per day.
df = df.withColumn('Weather_Timestamp', f.to_date('Weather_Timestamp'))

# Daily mean of each numeric weather measurement. The grouping key is the date
# alone, so each mean covers every row recorded on that date regardless of
# location.
colunas_clima = (
    'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)',
    'Visibility(mi)', 'Wind_Speed(mph)', 'Precipitation(in)',
)
clima_por_dia = df.groupBy('Weather_Timestamp')
media_clima = (
    clima_por_dia
    .agg({coluna_clima: 'avg' for coluna_clima in colunas_clima})
    .orderBy('Weather_Timestamp')
)

In [10]:
# Round every aggregated column (name starting with 'avg') to two decimals,
# leaving the other columns untouched and keeping the original column order.
# A single select() is used rather than withColumn() inside a loop because
# each withColumn() call adds one more projection to the query plan.
media_clima = media_clima.select(
    *[
        f.round(f.col(coluna), 2).alias(coluna) if coluna.startswith('avg')
        else f.col(coluna)
        for coluna in media_clima.columns
    ]
)

In [11]:
# Count the accidents for every combination of day, location, severity,
# weather description, road-feature flags and daylight indicators.
# No sort is applied to this intermediate frame: the join cell that follows
# orders its result by 'Occurrence_day', and a join does not preserve the
# order of its inputs, so sorting here would only cost an extra shuffle.
chaves_agrupamento = [
    'Occurrence_day', 'State', 'County', 'City', 'Street', 'Severity',
    'Wind_Direction', 'Weather_Condition',
    'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit',
    'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming',
    'Traffic_Signal', 'Turning_Loop',
    'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
    'Astronomical_Twilight',
]
acidentes = df.groupBy(*chaves_agrupamento).count()

In [12]:
# Attach the daily weather averages to each accident group by matching the
# weather date to the accident date, then order the result by day.
mesmo_dia = acidentes['Occurrence_day'] == media_clima['Weather_Timestamp']
acidentes = (
    acidentes
    .join(media_clima, on=mesmo_dia, how='inner')
    .orderBy('Occurrence_day')
)

In [13]:
# After the equality join, 'Weather_Timestamp' holds the same date as
# 'Occurrence_day' on every row, so the duplicate key column is removed.
acidentes = acidentes.drop('Weather_Timestamp')
#acidentes.show(10)


In [14]:
# Disabled audit: counts the null values in every column of the joined
# accidents DataFrame.
#null = [f.sum(f.when(f.col(c).isNull(), 1).otherwise(0)).alias(c) for c in acidentes.columns]
#df_null = acidentes.agg(*null)
#df_null.show()

In [15]:
# Keep only the rows in which every column has a value.
acidentes = acidentes.na.drop(how='any')

In [16]:
# Print the column names and types of the final accidents DataFrame.
acidentes.printSchema()

root
 |-- Occurrence_day: date (nullable = true)
 |-- State: string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: boolean (nullable = true)
 |-- Bump: boolean (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Give_Way: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- No_Exit: boolean (nullable = true)
 |-- Railway: boolean (nullable = true)
 |-- Roundabout: boolean (nullable = true)
 |-- Station: boolean (nullable = true)
 |-- Stop: boolean (nullable = true)
 |-- Traffic_Calming: boolean (nullable = true)
 |-- Traffic_Signal: boolean (nullable = true)
 |-- Turning_Loop: boolean (nullable = true)
 |-- Sunrise_Sunset: string (nullable = true)
 |-- Civil_Twilight: string (nullable = true)
 |-- Nautical_Twilight: string (nu

In [17]:
# Persist the cleaned result as Parquet. The default save mode raises an error
# when the target path already exists, which breaks a second run of the
# notebook; 'overwrite' replaces the previous output instead.
acidentes.write.mode('overwrite').parquet('Data/Acidentes.parquet')

In [37]:
# Read the Parquet output back as a sanity check and display the first rows.
# - The path uses a forward slash, matching the path used for the write and
#   avoiding a backslash that Python would treat as the start of an escape
#   sequence and that is only a path separator on Windows.
# - No 'header' / 'inferSchema' options are passed: they are CSV reader
#   settings, and Parquet files carry their own schema.
teste_parquet = spark.read.parquet('Data/Acidentes.parquet')
teste_parquet.show()

+--------------+-----+----------+--------------------+--------------------+--------+--------------+--------------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+-----+-------------------+----------------+----------------------+--------------------+------------------+-----------------+-------------------+
|Occurrence_day|State|    County|                City|              Street|Severity|Wind_Direction|   Weather_Condition|Amenity| Bump|Crossing|Give_Way|Junction|No_Exit|Railway|Roundabout|Station| Stop|Traffic_Calming|Traffic_Signal|Turning_Loop|Sunrise_Sunset|Civil_Twilight|Nautical_Twilight|Astronomical_Twilight|count|avg(Temperature(F))|avg(Humidity(%))|avg(Precipitation(in))|avg(Wind_Speed(mph))|avg(Wind_Chill(F))|avg(Pressure(in))|avg(Visibility(mi))|
+--------------+-----+----------+--------------------+--------------------+---