In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [28]:
spark = SparkSession.builder \
.appName("Projeto") \
.getOrCreate()

ConnectionRefusedError: [Errno 111] Connection refused

In [9]:
df_sample = spark.read.csv("UK_Accident.csv", header=True, inferSchema=False)
df_sample.show(5)

+---+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+------------------+-----------+--------------------+--------------+---------------+---------------------------------+---------------------------------------+--------------------+--------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+----+
|_c0|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|         Road_Type|Speed_limit|    Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Cr

In [12]:
accidents = spark.read.csv("UK_Accident.csv", header=True, inferSchema=True)
traffic = spark.read.csv("local_authority_traffic.csv", header=True, inferSchema=True)

accidents.write.parquet("UK_Accident.parquet", mode="overwrite")
traffic.write.parquet("local_authority_traffic.parquet", mode="overwrite")

df_accidents = spark.read.parquet("UK_Accident.parquet")
df_traffic = spark.read.parquet("local_authority_traffic.parquet")

In [18]:
df_accidents = df_accidents.withColumnRenamed("Accident_Index", "Accident_ID") \
                     .withColumnRenamed("Local_Authority_(District)", "Authority_ID") \
                    .filter(col("Year").between(2005,2014))
df_traffic = df_traffic.withColumnRenamed("local_authority_id", "Authority_ID") \
                 .withColumnRenamed("local_authority_name", "Authority_Name") \
                 .withColumnRenamed("year", "Year")\
                .filter(col("Year").between(2005,2014))

df_accidents = df_accidents.drop("Date")

df_accidents = df_accidents.dropDuplicates()
df_traffic = df_traffic.dropDuplicates()

combined = df_accidents.join(df_traffic, on=["Authority_ID", "Year"], how="inner")

combined = combined.fillna({"Weather_Conditions": "Unknown", "Speed_limit": 0})

from pyspark.sql.functions import when
combined = combined.withColumn("Period_of_Day", 
                                when(col("Time") < "12:00", "Morning")
                                .when((col("Time") >= "12:00") & (col("Time") < "18:00"), "Afternoon")
                                .otherwise("Evening"))

combined.show(5)

combined.printSchema()

print(f"Número de registos combinados: {combined.count()}")

+------------+----+------+-----------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+-----------+-------------------+-------------------------+--------------+---------------+------------------+-----------+--------------------+--------------+---------------+---------------------------------+---------------------------------------+--------------------+--------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+--------------+--------------+-----------------+--------------+------------------+-------------+
|Authority_ID|Year|   _c0|Accident_ID|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Day_of_Week|               Time|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|         Road_Typ

In [24]:
# Estatísticas descritivas para o dataset combinado
combined.describe(["Number_of_Vehicles", "Number_of_Casualties", "cars_and_taxis", "all_motor_vehicles"]).show()

# Contagem de acidentes por gravidade
combined.groupBy("Accident_Severity").count().show()

# Contagem por condições meteorológicas
combined.groupBy("Weather_Conditions").count().show()

# Número médio de veículos envolvidos em acidentes por ano
combined.groupBy("Year").agg({"Number_of_Vehicles": "avg"}).show()

# Volume total de tráfego por ano
combined.groupBy("Year").agg({"all_motor_vehicles": "sum"}).show()

# Correlação entre o número de veículos e o número de vítimas
from pyspark.sql.functions import corr
combined.select(corr("Number_of_Vehicles", "Number_of_Casualties")).show()

# Encontrar registros com tráfego anormalmente alto
combined.filter(combined["all_motor_vehicles"] > 1e9).show()

ConnectionRefusedError: [Errno 111] Connection refused