## 0. Introdução

## 1. Importando dados

### 1.1. Importando dados do S3 para pandas

In [1]:
import boto3

# criando sessão do s3
session = boto3.Session(
    aws_access_key_id = 'AKIAQ7W6N2BJJAHR5AKF',
    aws_secret_access_key='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'
)

#criando objeto s3, que vai criar a conexão
s3 = session.resource('s3')

#criando o arquivo que vamos importar do s3
s3object = s3.Object('beatriz-yaginuma','trabalho-computacao-nuvem/US_Accidents_Dec20_updated.csv')
s3object.get()

#fazendo chamada e jogando dados num objeto
file = s3object.get()['Body']

In [2]:
%%time 

#importando no pandas

import pandas as pd

pandas_df = pd.read_csv(file)

CPU times: user 8.02 s, sys: 958 ms, total: 8.98 s
Wall time: 9.47 s


In [3]:
pandas_df.shape

(1516064, 47)

### 1.2. Importando do Pandas para Spark

Ao invés de criar um contexto para cada interação específica do Spark, vamos criar uma sessão, que é uma combinação de todos os contextos diferentes do Spark.

In [4]:
from pyspark.sql.session import SparkSession

spark = SparkSession.builder \
                    .appName('trabalho') \
                    .getOrCreate()

21/09/29 05:08:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
#criando dataframe
spark_df = spark.createDataFrame(pandas_df)

Problema: algumas colunas na base tem valores ausentes, o que leva o Pandas a representá-los como tipos mistos (string para não ausentes, NaN para valores ausentes).

In [None]:
list(pandas_df.dtypes)

Para resolver isso, precisamos fornecer um schema para a função createDataFrame.
Vamos usar algumas funções para automatizar a construção do schema:

In [None]:
from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    elif f == 'bool': return BooleanType()
    else: return StringType()

def define_structure(string, pandas_dtype):
    try: spark_dtype = equivalent_type(pandas_dtype)
    except: spark_dtype = StringType()
    return StructField(string, spark_dtype)

# Given pandas dataframe, it will return a spark's dataframe.
def define_schema(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    return StructType(struct_list)

- A primeira função tem como input o tipo de dados do pandas, e retorna o tipo equivalente do Spark.
- A segunda função tem como input o nome de uma variável e seu tipo de dados do pandas, e aplica a primeira função para retornar o StructField da variável + o tipo de dados do Spark
- A terceira função tem como input o dataframe do pandas, e constro listas de todas as columas e seus respectivos tipos do dataframe para aplicar a segunda função e criar uma terceira lista de todos os struct types

In [7]:
# Aplicando a função
p_schema = define_schema(pandas_df)
p_schema

StructType(List(StructField(ID,StringType,true),StructField(Severity,LongType,true),StructField(Start_Time,StringType,true),StructField(End_Time,StringType,true),StructField(Start_Lat,FloatType,true),StructField(Start_Lng,FloatType,true),StructField(End_Lat,FloatType,true),StructField(End_Lng,FloatType,true),StructField(Distance(mi),FloatType,true),StructField(Description,StringType,true),StructField(Number,FloatType,true),StructField(Street,StringType,true),StructField(Side,StringType,true),StructField(City,StringType,true),StructField(County,StringType,true),StructField(State,StringType,true),StructField(Zipcode,StringType,true),StructField(Country,StringType,true),StructField(Timezone,StringType,true),StructField(Airport_Code,StringType,true),StructField(Weather_Timestamp,StringType,true),StructField(Temperature(F),FloatType,true),StructField(Wind_Chill(F),FloatType,true),StructField(Humidity(%),FloatType,true),StructField(Pressure(in),FloatType,true),StructField(Visibility(mi),Floa

In [None]:
%%time

# Importando os dados para o Spark, agora definindo o Schema
spark_df = spark.createDataFrame(pandas_df, p_schema)

## 2. Pré-processamento

### 2.1. Descrição da base

In [8]:
# Analisando os dados
spark_df.printSchema()

root
 |-- Severity: long (nullable = true)
 |-- Start_Time: string (nullable = true)
 |-- Start_Lat: float (nullable = true)
 |-- Start_Lng: float (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Temperature(F): float (nullable = true)
 |-- Wind_Chill(F): float (nullable = true)
 |-- Humidity(%): float (nullable = true)
 |-- Pressure(in): float (nullable = true)
 |-- Visibility(mi): float (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph): float (nullable = true)
 |-- Precipitation(in): float (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: boolean (nullable = true)
 |-- Bump: boolean (nullable = true)
 |-- Crossing: boolean (nullable = true)
 |-- Give_Way: boolean (nullable = true)
 |-- Junction: boolean (nullable = true)
 |-- No_Exit: boolean (nullable = true)
 |-- Railway: boolean (nullable = true)
 |-- Roundabout: boolean (nulla

Traffic Attributes (4):

- ID: This is a unique identifier of the accident record.
Source: Indicates source of the accident report (i.e. the API which reported the accident.).
- TMC: A traffic accident may have a Traffic Message Channel (TMC) code which provides more detailed description of the event.
- Severity: Shows the severity of the accident, a number between 1 and 4, where 1 indicates the least impact on traffic (i.e., short delay as a result of the accident) and 4 indicates a significant impact on traffic (i.e., long delay).
- Start_Time: Shows start time of the accident in local time zone.
- End_Time: Shows end time of the accident in local time zone.
- Start_Lat: Shows latitude in GPS coordinate of the start point.
- Start_Lng: Shows longitude in GPS coordinate of the start point.
- End_Lat: Shows latitude in GPS coordinate of the end point.
- End_Lng: Shows longitude in GPS coordinate of the end point.
- Distance(mi): The length of the road extent affected by the accident.
- Description: Shows natural language description of the accident.

Address Attributes (3):

- State: Shows the state in address field.
- Country: Shows the country in address field.
- Timezone: Shows timezone based on the location of the accident (eastern, central, etc.).

Weather Attributes (9):

- Temperature(F): Shows the temperature (in Fahrenheit).
- Wind_Chill(F): Shows the wind chill (in Fahrenheit).
- Humidity(%): Shows the humidity (in percentage).
- Pressure(in): Shows the air pressure (in inches).
- Visibility(mi): Shows visibility (in miles).
- Wind_Direction: Shows wind direction.
- Wind_Speed(mph): Shows wind speed (in miles per hour).
- Precipitation(in): Shows precipitation amount in inches, if there is any.
- Weather_Condition: Shows the weather condition (rain, snow, thunderstorm, fog, etc.).

POI Attributes (13):

- Amenity: A Point-Of-Interest (POI) annotation which indicates presence of amenity in a nearby location.
- Bump: A POI annotation which indicates presence of speed bump or hump in a nearby location.
- Crossing: A POI annotation which indicates presence of crossing in a nearby location.
- Give_Way: A POI annotation which indicates presence of give_way sign in a nearby location.
- Junction: A POI annotation which indicates presence of junction in a nearby location.
- No_Exit: A POI annotation which indicates presence of no_exit sign in a nearby location.
- Railway: A POI annotation which indicates presence of railway in a nearby location.
- Roundabout: A POI annotation which indicates presence of roundabout in a nearby location.
- Station: A POI annotation which indicates presence of station (bus, train, etc.) in a nearby location.
- Stop: A POI annotation which indicates presence of stop sign in a nearby location.
- Traffic_Calming: A POI annotation which indicates presence of traffic_calming means in a nearby location.
- Traffic_Signal: A POI annotation which indicates presence of traffic_signal in a nearby location.
- Turning_Loop: A POI annotation which indicates presence of turning_loop in a nearby location.

Period-of-Day (4):

- Sunrise_Sunset: Shows the period of day (i.e. day or night) based on sunrise/sunset.
- Civil_Twilight: Shows the period of day (i.e. day or night) based on civil twilight.
- Nautical_Twilight: Shows the period of day (i.e. day or night) based on nautical twilight.
- Astronomical_Twilight: Shows the period of day (i.e. day or night) based on astronomical twilight.

### 2.2. Pré-processamento

## Rascunho

In [None]:
import matplotlib.pyplot as plt


In [20]:
#importando no spark
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

In [34]:
spark = SparkSession.builder \
            .appName("my_app") \
            .config('spark.sql.codegen.wholeStage', False) \
            .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "AKIAQ7W6N2BJJAHR5AKF")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "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")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "eu-west-3.amazonaws.com")

In [35]:
data = spark.read.csv("s3a://beatriz-yaginuma/trabalho-computacao-nuvem/US_Accidents_Dec20_updated.csv")

Py4JJavaError: An error occurred while calling o129.csv.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2595)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3269)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:377)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:795)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2499)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2593)
	... 25 more


In [14]:
# Criando sessão do Spark
from pyspark import SparkContext

sc = SparkContext("local","car_accidents")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=car_accidents, master=local) created by __init__ at /tmp/ipykernel_15/1799683984.py:5 

In [15]:
#criando um novo contexto SQL em cima do contexto existente
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [1]:
sc.stop()

NameError: name 'sc' is not defined

In [None]:
#Olhando apenas para um subconjunto dos dados
pandas_df = pandas_df[['Severity','Start_Time','Start_Lat','Start_Lng','State','Country','Timezone',
                       'Temperature(F)','Wind_Chill(F)','Humidity(%)','Pressure(in)','Visibility(mi)',
                       'Wind_Direction','Wind_Speed(mph)','Precipitation(in)', 'Weather_Condition',
                       'Amenity','Bump','Crossing','Give_Way','Junction','No_Exit','Railway','Roundabout','Station',
                       'Stop','Traffic_Calming','Traffic_Signal','Turning_Loop',
                       'Sunrise_Sunset','Civil_Twilight','Nautical_Twilight','Astronomical_Twilight']]

In [9]:
list(df_pandas.columns)

['Severity',
 'Start_Time',
 'Start_Lat',
 'Start_Lng',
 'Side',
 'State',
 'Country',
 'Timezone',
 'Temperature(F)',
 'Wind_Chill(F)',
 'Humidity(%)',
 'Pressure(in)',
 'Visibility(mi)',
 'Wind_Direction',
 'Wind_Speed(mph)',
 'Precipitation(in)',
 'Weather_Condition',
 'Amenity',
 'Bump',
 'Crossing',
 'Give_Way',
 'Junction',
 'No_Exit',
 'Railway',
 'Roundabout',
 'Station',
 'Stop',
 'Traffic_Calming',
 'Traffic_Signal',
 'Turning_Loop',
 'Sunrise_Sunset',
 'Civil_Twilight',
 'Nautical_Twilight',
 'Astronomical_Twilight']

In [11]:
list(df_pandas.dtypes)

[dtype('int64'),
 dtype('O'),
 dtype('float64'),
 dtype('float64'),
 dtype('O'),
 dtype('O'),
 dtype('O'),
 dtype('O'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('O'),
 dtype('float64'),
 dtype('float64'),
 dtype('O'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('bool'),
 dtype('O'),
 dtype('O'),
 dtype('O'),
 dtype('O')]

In [None]:
#definindo schema 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

mySchema = StructType([ StructField('Severity', StringType(), True)\
                       ,StructField("Quantidade vinculos ativos", IntegerType(), True)\
                       ,StructField("UF", IntegerType(), True)\
                       ,StructField("Ind Simples", IntegerType(), True)])

In [None]:
from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    elif f == 'bool': return BooleanType()
    else: return StringType()

def define_structure(string, pandas_dtype):
    try: spark_dtype = equivalent_type(pandas_dtype)
    except: spark_dtype = StringType()
    return StructField(string, spark_dtype)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)