## Modelagem preditiva com dados Out-of-Core utilizando Dask

Alunas: Ana Beatriz Parra Ferreira, Bruna Meinberg, Luana Abramoff 


### Dados Out-of-Core 

Dados out-of-core são conjuntos de dados que ultrapassam a capacidade de memória RAM disponível em uma máquina. Este termo é frequentemente utilizado no contexto de bancos de dados, sistemas de arquivos e computação científica, onde o manuseio eficiente de grandes volumes de dados é crítico. 

O principal desafio ao trabalhar com dados out-of-core é o gerenciamento eficiente do I/O (entrada e saída) de dados, uma vez que o acesso contínuo a um disco rígido ou a uma rede pode se tornar um gargalo significativo. Além disso, é fundamental manter a eficiência no processamento para evitar a deterioração do desempenho devido ao constante carregamento e descarregamento de dados.

A capacidade de processar dados out-of-core é crucial para muitas aplicações de ciência de dados, especialmente em um mundo onde o volume de dados continua a crescer exponencialmente. As ferramentas que suportam esses processos são essenciais para permitir análises complexas e insights que de outra forma não seriam possíveis.

### Estratégias de Processamento

Para processar dados out-of-core de maneira eficaz, as estratégias a seguir são comumente adotadas:

1. **Streaming de dados**: Os dados são processados sequencialmente em pequenos blocos, permitindo que operações sejam realizadas em cada bloco à medida que passam pela memória.
2. **Indexação eficiente**: Criar índices que permitem acessar rapidamente partes específicas dos dados sem a necessidade de carregar conjuntos de dados inteiros na memória.
3. **Algoritmos otimizados para out-of-core**: Utilizar ou desenvolver algoritmos que minimizam a necessidade de movimentação de dados entre o disco e a RAM.
4. **Parallel processing**: Paralelizar o processamento de dados tanto quanto possível para maximizar o uso de todos os núcleos disponíveis, tratando diferentes partes dos dados simultaneamente.

### Ferramentas e Tecnologias

Várias ferramentas e tecnologias foram desenvolvidas para facilitar o manuseio de dados out-of-core:

- **Dask**: Fornece estruturas de dados e uma programação paralela que permite trabalhar eficientemente com grandes conjuntos de dados de maneira simples e integrada.
- **Pandas com chunksize**: Pandas pode processar arquivos grandes em pedaços menores com o parâmetro `chunksize`, permitindo operações em partes do arquivo de cada vez.
- **Bibliotecas de aprendizado de máquina adaptadas**: Algumas bibliotecas de machine learning, como o `scikit-learn`, têm opções ou versões que suportam aprendizado incrementativo para dados grandes.


O seguinte notebook tem como objetivo mostrar de forma detalhada como fazer um modelo preditivo no seu computador para dados que nem cabem na RAM faazendo uso do Dask. 

-------





# Introdução ao Dask

Dask é uma biblioteca de computação paralela e distribuída que integra de maneira harmoniosa com o ecossistema do Python, principalmente com as bibliotecas Pandas, NumPy e Scikit-Learn. Desenvolvido para operar com conjuntos de dados que excedem a memória RAM disponível, o Dask permite análises complexas e modelos de aprendizado de máquina em grandes volumes de dados.

O Dask tornou-se uma ferramenta essencial no arsenal de cientistas de dados que enfrentam desafios associados ao manejo de grandes conjuntos de dados. Sua habilidade em se integrar de forma suave ao ambiente Python, junto com sua simplicidade de uso e poderoso desempenho, o destaca como uma escolha primordial para computação de dados em grande escala.

### Por que usar Dask?

A crescente disponibilidade de grandes conjuntos de dados pode ser um desafio considerável para os cientistas de dados. A limitação da memória RAM significa que dados extensos não podem ser processados diretamente. A solução tradicional para isso seria usar ferramentas como Spark ou Hadoop, que, embora eficazes, podem ser complexas e pesadas para configurar e operar. O Dask, por outro lado, oferece uma interface simples e flexível, que reduz a complexidade e se integra facilmente ao fluxo de trabalho Python existente.

## Principais Características do Dask

- **Escalabilidade**: Dask é escalável verticalmente e horizontalmente, ou seja, pode processar dados em um único computador usando seus múltiplos núcleos ou em um cluster de máquinas.
- **Flexibilidade**: Suporta diversas operações como agregações, joins, leitura de dados em diferentes formatos, além de algoritmos de aprendizado de máquina.
- **Desempenho**: Utiliza computação paralela e otimizações inteligentes para acelerar a execução.
- **Lazy Execution**: O Dask adia a execução das operações até que seja explicitamente solicitado. Isso permite a otimização e paralelização eficientes dos cálculos.
- **Integração**: Trabalha de maneira nativa com as bibliotecas de ciência de dados Python, como Pandas, NumPy e Scikit-Learn, facilitando a adoção por parte de quem já está familiarizado com esses pacotes.

## Aplicações do Dask

O Dask é ideal para cenários onde o volume de dados é grande demais para a memória de uma máquina única, mas pequeno demais para justificar a complexidade de um sistema de processamento de dados distribuídos, como Hadoop ou Spark. É comumente utilizado em:

- Análises exploratórias de dados
- Processamento e limpeza de grandes conjuntos de dados
- Treinamento de modelos de machine learning em grandes volumes de dados
- Simulações e modelagens que exigem alta capacidade computacional


### Documentação

Para mais informações, acessar a documentação da biblioteca disponível em: https://docs.dask.org/en/stable/dataframe.html

-------


# Modelo Preditivo 

Os dados utilizados para o seguinte modelo são do Registro Nacional de Acidentes e Estatísticas de Trânsito (Renaest), sob a
coordenação do Departamento Nacional de Trânsito (Denatran),que organiza e junta os dados dos Detrans de
cada unidade federativa. Os dados desta base são alimentados pelos boletins de ocorrência da polícia.


## 1. Instalação e configuração 

Para instalar o Dask e outras bibliotecas necessárias, é necessário rodar o seguinte comando: 

```

!pip install dask[complete] 

```

O Dask tem um comportamento muito similar ao Pandas. De mesmo modo, o Dask.Array é analogo á biblioteca Numpy. 
Para utilizá-los após a instalação, basta importar as bibliotecas necessárias: 

In [1]:
import dask.dataframe as dd # import pandas as pd
import dask.array as da # import numpy as np
import matplotlib.pyplot as plt

import numpy as np

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.dummy import DummyRegressor



Importanto os dataframes que serão utilizados neste projeto: 

In [2]:
acidentes = dd.read_csv('dados/Acidentes_DadosAbertos_20230812.csv', blocksize=10e6, delimiter=';', assume_missing=True) # 10MB chunks  [manter suas partições com tamanho inferior a 100 MB]
localidade = dd.read_csv('dados/Localidade_DadosAbertos_20230812.csv', blocksize=10e6, delimiter=';', assume_missing=True) # 10MB chunks  [manter suas partições com tamanho inferior a 100 MB]
veiculo = dd.read_csv('dados/TipoVeiculo_DadosAbertos_20230812.csv', blocksize=10e6, delimiter=';', assume_missing=True) # 10MB chunks  [manter suas partições com tamanho inferior a 100 MB]

##### Realizando o merge das bases.
Para juntar a base 'acidentes' com 'localidade' a chave 'chv_localidade' será utilizada. Para concatenar com as informações de veículos, a chave 'num_acidente', que se refere ao número do boletim de ocorrência, será utilizada.

(Aviso: vai demorar um pouco)

In [3]:
#merge dos dfs 
result = dd.merge(acidentes, localidade, on='chv_localidade', how='inner')
result = dd.merge(result, veiculo, on='num_acidente', how='inner')
df_final = result.compute()


  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)
  df = reader(bio, **kwargs)


In [4]:
df_final.head(10)

Unnamed: 0,num_acidente,chv_localidade,data_acidente,uf_acidente,ano_acidente,mes_acidente,mes_ano_acidente,codigo_ibge_x,dia_semana,fase_dia,...,uf,codigo_ibge_y,municipio,regiao_metropolitana,qtde_habitantes,frota_total,frota_circulante,tipo_veiculo,ind_veic_estrangeiro,qtde_veiculos
0,2785821.0,GO5213756202112,2021-12-17,GO,2021.0,12.0,122021.0,5213756.0,SEXTA-FEIRA,MANHA,...,GO,5213756.0,MONTIVIDIU,nao,13935.0,8373.0,5785.0,NAO INFORMADO,NAO INFORMADO,2.0
1,2717519.0,GO5201405202201,2022-01-07,GO,2022.0,1.0,12022.0,5201405.0,SEXTA-FEIRA,NOITE,...,GO,5201405.0,APARECIDA DE GOIANIA,nao,601844.0,340791.0,250274.0,AUTOMOVEL,NAO INFORMADO,1.0
2,3343175.0,GO5201405202201,2022-01-07,GO,2022.0,1.0,12022.0,5201405.0,SEXTA-FEIRA,NOITE,...,GO,5201405.0,APARECIDA DE GOIANIA,nao,601844.0,340791.0,250274.0,CAMINHAO,NAO INFORMADO,2.0
3,3781899.0,GO5201405202201,2022-01-31,GO,2022.0,1.0,12022.0,5201405.0,SEGUNDA-FEIRA,NOITE,...,GO,5201405.0,APARECIDA DE GOIANIA,nao,601844.0,340791.0,250274.0,MOTOCICLETA,NAO INFORMADO,1.0
4,3781899.0,GO5201405202201,2022-01-31,GO,2022.0,1.0,12022.0,5201405.0,SEGUNDA-FEIRA,NOITE,...,GO,5201405.0,APARECIDA DE GOIANIA,nao,601844.0,340791.0,250274.0,AUTOMOVEL,NAO INFORMADO,1.0
5,59764.0,PE2611606201804,2018-04-28,PE,2018.0,4.0,42018.0,2611606.0,SABADO,NOITE,...,PE,2611606.0,RECIFE,sim,1634163.0,526762.0,347943.0,NAO INFORMADO,NAO INFORMADO,1.0
6,1605072.0,PE2611606201804,2018-04-26,PE,2018.0,4.0,42018.0,2611606.0,QUINTA-FEIRA,NOITE,...,PE,2611606.0,RECIFE,sim,1634163.0,526762.0,347943.0,NAO INFORMADO,NAO INFORMADO,1.0
7,833975.0,PE2611606201804,2018-04-04,PE,2018.0,4.0,42018.0,2611606.0,QUARTA-FEIRA,NAO INFORMADO,...,PE,2611606.0,RECIFE,sim,1634163.0,526762.0,347943.0,NAO INFORMADO,NAO INFORMADO,1.0
8,4085434.0,PE2611606201804,2018-04-02,PE,2018.0,4.0,42018.0,2611606.0,SEGUNDA-FEIRA,NAO INFORMADO,...,PE,2611606.0,RECIFE,sim,1634163.0,526762.0,347943.0,BICICLETA,NAO INFORMADO,1.0
9,221252.0,PE2616407201812,2018-12-09,PE,2018.0,12.0,122018.0,2616407.0,DOMINGO,NAO INFORMADO,...,PE,2616407.0,VITORIA DE SANTO ANTAO,nao,137915.0,55785.0,37514.0,NAO INFORMADO,NAO INFORMADO,1.0


In [5]:
# # Verificando a quantidade de valores faltantes por coluna
# df_final.isnull().sum()

Neste projeto, a variável de interesse será uma numérica de quantidade de envolvidos no acidente. A partir da varíavel quantidade de envolvidos (qtde_envolvidos), uma coluna Dummy dessas quantidades. 

Para isso, a função dask.bag.Bag.map_partitions da bibliteca Dask será utilizada:


In [6]:
df_final['qtde_envolvidos'].value_counts()

qtde_envolvidos
2.0     2399598
1.0     1917472
3.0      495192
0.0      165333
4.0      112493
5.0       32780
6.0       11577
7.0        4587
8.0        2170
9.0        1058
10.0        561
11.0        278
12.0        236
13.0        162
14.0        141
16.0         72
15.0         66
17.0         62
19.0         59
20.0         40
18.0         36
21.0         27
25.0         21
24.0         20
22.0         19
27.0         18
29.0         14
30.0         13
23.0         13
38.0         11
31.0          9
28.0          7
35.0          7
33.0          6
41.0          6
34.0          5
39.0          5
37.0          5
26.0          5
32.0          5
56.0          4
46.0          4
42.0          4
40.0          4
53.0          3
50.0          2
45.0          2
43.0          2
66.0          1
44.0          1
36.0          1
48.0          1
51.0          1
61.0          1
55.0          1
Name: count, dtype: int64

In [7]:
# Filtrando apenas colunas numéricas
numeric_cols = df_final.select_dtypes(include=['float64', 'int64'])
# Calculando correlação
corr_matrix = numeric_cols.corr() 
corr_matrix

Unnamed: 0,num_acidente,ano_acidente,mes_acidente,mes_ano_acidente,codigo_ibge_x,num_end_acidente,cep_acidente,km_via_acidente,latitude_acidente,longitude_acidente,...,qtde_feridosilesos,qtde_obitos,ano_referencia,mes_referencia,mes_ano_referencia,codigo_ibge_y,qtde_habitantes,frota_total,frota_circulante,qtde_veiculos
num_acidente,1.0,0.001005,0.000916,0.000916,0.000221,-0.000424,0.00047,-0.000256,0.002254,0.001539,...,-0.001011,-0.000149,0.001005,0.000916,0.000916,0.000221,-0.001432,-0.00108,-0.001005,-0.000135
ano_acidente,0.001005,1.0,-0.105405,-0.105363,0.122885,0.101994,0.094125,0.020654,0.069463,0.050934,...,0.044712,-0.027138,1.0,-0.105405,-0.105363,0.122885,-0.011516,0.058086,0.102024,0.050882
mes_acidente,0.000916,-0.105405,1.0,1.0,0.02065,-0.008861,0.000891,-0.014775,-0.011952,-0.009259,...,-0.015921,0.002567,-0.105405,1.0,1.0,0.02065,-0.034197,-0.030581,-0.022469,-0.000473
mes_ano_acidente,0.000916,-0.105363,1.0,1.0,0.020655,-0.008857,0.000896,-0.014774,-0.01195,-0.009257,...,-0.015919,0.002566,-0.105363,1.0,1.0,0.020655,-0.034197,-0.030579,-0.022465,-0.000471
codigo_ibge_x,0.000221,0.122885,0.02065,0.020655,1.0,0.198092,-0.026387,-0.030664,0.128447,0.094188,...,0.038863,-0.056783,0.122885,0.02065,0.020655,1.0,0.053857,0.131957,0.14316,-0.03146
num_end_acidente,-0.000424,0.101994,-0.008861,-0.008857,0.198092,1.0,-0.010037,-0.011275,0.01778,0.013038,...,0.020084,-0.019461,0.101994,-0.008861,-0.008857,0.198092,0.241054,0.246989,0.290485,0.046965
cep_acidente,0.00047,0.094125,0.000891,0.000896,-0.026387,-0.010037,1.0,-0.003063,0.019784,0.014509,...,0.01065,-0.006622,0.094125,0.000891,0.000896,-0.026387,0.064591,0.027585,0.039533,0.016982
km_via_acidente,-0.000256,0.020654,-0.014775,-0.014774,-0.030664,-0.011275,-0.003063,1.0,0.006376,0.004676,...,0.005177,0.014165,0.020654,-0.014775,-0.014774,-0.030664,0.009268,0.013028,0.015884,0.011336
latitude_acidente,0.002254,0.069463,-0.011952,-0.01195,0.128447,0.01778,0.019784,0.006376,1.0,0.743322,...,-0.016299,-0.014219,0.069463,-0.011952,-0.01195,0.128447,0.043618,0.048195,0.04766,0.015038
longitude_acidente,0.001539,0.050934,-0.009259,-0.009257,0.094188,0.013038,0.014509,0.004676,0.743322,1.0,...,-0.011462,-0.008142,0.050934,-0.009259,-0.009257,0.094188,0.027929,0.031954,0.031567,0.011993


## Limpeza dos dados e transformação de variáveis

In [8]:
df_final.shape

(5144222, 49)

In [9]:
# Pegandos todas as colunas
# print(df_final.dtypes[:20])
# print(df_final.dtypes[20:40])
# print(df_final.dtypes[40:60])

In [10]:
categorical_columns = [
'chv_localidade',
'data_acidente',
'uf_acidente',
'dia_semana',
'fase_dia',
'tp_acidente',
'cond_meteorologica',
'end_acidente',
'bairro_acidente',
'tp_rodovia',
'cond_pista',
'tp_cruzamento',
'tp_pavimento',
'tp_curva',
'lim_velocidade',
'tp_pista',
'ind_guardrail',
'ind_cantcentral',
'ind_acostamento',
'regiao',
'uf',
'municipio',
'regiao_metropolitana',
'tipo_veiculo',
'ind_veic_estrangeiro',
'tp_acidente',
]

numerical_columns = [  
'num_acidente',
'ano_acidente',
'mes_acidente',
'mes_ano_acidente',
'codigo_ibge_x',
'num_end_acidente',
'cep_acidente',
'km_via_acidente',
'latitude_acidente',
'longitude_acidente',
'hora_acidente',
'qtde_acidente',
'qtde_acid_com_obitos',
'qtde_envolvidos',
'qtde_feridosilesos',
'qtde_obitos',
'ano_referencia',
'mes_referencia',
'mes_ano_referencia',
'codigo_ibge_y',
'qtde_habitantes',
'frota_total',
'frota_circulante',
'qtde_veiculos',
]

for column_group, column_type in (
    (categorical_columns, 'category'),
    (numerical_columns, 'float64'),
    #(ordinal_columns, 'int64'),
):
    for column in column_group:
        df_final[column] = df_final[column].astype(column_type)

# df_final.dtypes

In [11]:
# Filtrando apenas as colunas com valor nulo
null_counts = df_final.isnull().sum()
print(null_counts[null_counts > 0])

fase_dia                      1
tp_acidente                   1
cond_meteorologica            1
end_acidente             205870
num_end_acidente              1
cep_acidente                  1
bairro_acidente         2402109
km_via_acidente               1
latitude_acidente       3932695
longitude_acidente      3932830
hora_acidente                 1
tp_rodovia                    1
cond_pista                    1
tp_cruzamento                 1
tp_pavimento                  1
tp_curva                      1
lim_velocidade                1
tp_pista                      1
ind_guardrail                 1
ind_cantcentral               1
ind_acostamento               1
qtde_acidente                 1
qtde_acid_com_obitos          1
qtde_envolvidos               1
qtde_feridosilesos            1
qtde_obitos                   1
dtype: int64


Avaliando a quantidade de valores nulos, percebe-se que as colunas end_acidente, bairro_acidente, latitude_acidente, longitude_acidente possuem uma grande quantidade ausente. 

| Variável     | qtd nulo (%)  
|--------------|--------------|
| end_acidente| 0.036 |
| bairro_acidente| 0.43 | 
| latitude_acidente | 0.77 | 
| longitude_acidente |  0.77|

Por mais que valores como longitude e latidude sejam muito importantes para uma avaliação sobre acidentes, a proporção de valores nulos de 77% é muito grande. Assim, os valores serão excluídos da análise, juntamente com bairro_acidente. Felizmente, a base do RENAEST possue outras variáveis de localidade que proporcionarão uma boa análise do local, compensando, portanto, a exclusão dos dados de longitude e latitude. 


In [12]:
#Como os a quantidade de nulos em end_acidente é 3% da base e possuimos muitos (!) dados, realizaremos um dropna() 
df_final = df_final.dropna(subset=['end_acidente'])

In [13]:
df_final.shape

(4938352, 49)

Fazendo uma limpeza nas colunas:

In [14]:
lista_drop = [
'latitude_acidente', #muitos valores nulos
'longitude_acidente', #muitos valores nulos
'bairro_acidente', #muitos valores nulos
'frota_total', # frota circulante foi considerado mais relevante para a análise
'codigo_ibge_x',
'codigo_ibge_y', 
'chv_localidade',
'data_acidente', # vamos usar o mes, dia da semana e período do dia
'tp_acidente', # fonte da nossa variavel de interesse
'uf_acidente', #igual a 'uf' que será utilizada
'tp_cruzamento', # muitos 'NAO INFORMADO'
'tp_curva', # muitos 'NAO INFORMADO'
'lim_velocidade', # muitos 'NAO INFORMADO'
'tp_pista', # muitos 'NAO INFORMADO'
'ind_veic_estrangeiro',
'regiao',  # ja usando informações sobre ufs
'ind_acostamento', # muitos 'NAO INFORMADO'
'ind_cantcentral', # muitos 'NAO INFORMADO'
'ind_guardrail', # muitos 'NAO INFORMADO'
'mes_ano_referencia', # muitos 'NAO INFORMADO'
'qtde_acidente', # só valores 1 
'fase_dia',   # vamos usar hora do acidente
'num_end_acidente',
'cep_acidente', 
'km_via_acidente', 
'num_acidente'  #boletim de ocorrencia
]

df_final.drop(columns=lista_drop, inplace=True)

## Separação treino-teste e modelagem inicial

A biblioteca Dask disponibiliza uma função de split treino-teste muito semelhante ao scikit-learn.

In [15]:
# Split features and target, and return.
X = df_final.drop(columns=['qtde_envolvidos']).copy()
y = df_final['qtde_envolvidos'].copy()

In [16]:
# no sklearn: from sklearn.model_selection import train_test_split 
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, 
    y, 
    test_size=0.2, 
    random_state=42,
    shuffle=True
    )

## Definição do modelo 

In [39]:
import dask_ml
from dask_ml.model_selection import GridSearchCV , ShuffleSplit
from dask_ml.linear_model import LinearRegression
from dask_ml.preprocessing import StandardScaler
from dask_ml.impute import SimpleImputer
from dask_ml.preprocessing import OneHotEncoder
from dask.distributed import Client
#teste
import joblib


In [40]:
# Verificação explícita para garantir que dask e distributed estão instalados
try:
    import dask
    import distributed
except ImportError as e:
    print(f"Erro: {e}. Certifique-se de que dask e distributed estão instalados.")
    raise

In [41]:
# Iniciar o cliente Dask
client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB', ip='127.0.0.1:0')
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 14.90 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:58576,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 14.90 GiB

0,1
Comm: tcp://127.0.0.1:58595,Total threads: 2
Dashboard: http://127.0.0.1:58596/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:58579,
Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-7s_8dp13,Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-7s_8dp13

0,1
Comm: tcp://127.0.0.1:58598,Total threads: 2
Dashboard: http://127.0.0.1:58600/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:58581,
Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-zny27bdg,Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-zny27bdg

0,1
Comm: tcp://127.0.0.1:58599,Total threads: 2
Dashboard: http://127.0.0.1:58601/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:58583,
Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-j02qo3xr,Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-j02qo3xr

0,1
Comm: tcp://127.0.0.1:58604,Total threads: 2
Dashboard: http://127.0.0.1:58605/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:58585,
Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-i2gpy0m3,Local directory: C:\Users\bruna\AppData\Local\Temp\dask-scratch-space\worker-i2gpy0m3


Acessando o link do dashboard do cliente, é possível visualizar informações importantes do processamento
![alt text](img/image.png)

In [42]:


categorical_features = X.select_dtypes(include=['category']).columns #variáveis categóricas
numerical_features = X.select_dtypes(include=['float64', 'int64']).columns #variáveis numéricas

num_pipeline = Pipeline([ 
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])

cat_pipeline = Pipeline([
    ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore')),    #drop='first')),
])

preprocessing_pipeline = ColumnTransformer(
    transformers=[
        ('num', num_pipeline, numerical_features),
        ('cat', cat_pipeline, categorical_features),
    ],
    remainder='passthrough',
)

pipe = Pipeline([
    ('preprocessor', preprocessing_pipeline),
    ('lasso', LinearRegression(penalty='l1')),
])

pipe

Com a nossa pipeline construída, incluindo o tratamento das colunas categóricas e numéricas, assim como o modelo de regressão linear, é possível treinarmos nosso modelo:

Parte importante do processo de machine learning é testar os parâmetros do nosso modelo. Nesse caso, realizamos um GridSearch com diferentes parÂmetros para nossa regressão linear com penalidade l1 (isso é semelhante a um Lasso)

In [44]:
# Convertendo os dados para dask dataframe
X_train_dask = dd.from_pandas(X_train, npartitions=4)
y_train_dask = dd.from_pandas(y_train, npartitions=4)

# Grid de parâmetros
param_grid = {
    'lasso__alpha': [0.1, 1, 10, 100]
}


# Configuração do GridSearchCV
num_splits = 5
test_fraction = 0.2
num_samples_total = len(y_train)
num_samples_test = int(test_fraction * num_samples_total)
num_samples_total = len(y_train)
num_samples_train = num_samples_total - num_samples_test

grid = GridSearchCV(
    estimator = pipe,
    param_grid=param_grid,
    cv=num_splits,  # Aqui, ShuffleSplit não é necessário já que o Dask GridSearchCV utiliza sua própria metodologia
    scoring='neg_root_mean_squared_error'
)


# Verificar a configuração do pipeline
print(pipe)

try:
    grid.fit(X_train_dask, y_train_dask)
except Exception as e:
    print(f"Erro durante o ajuste do modelo: {e}")
finally:
    # O cliente Dask pode ser fechado após o uso
    client.close()


Pipeline(steps=[('preprocessor',
                 ColumnTransformer(remainder='passthrough',
                                   transformers=[('num',
                                                  Pipeline(steps=[('imputer',
                                                                   SimpleImputer()),
                                                                  ('scaler',
                                                                   StandardScaler())]),
                                                  Index(['ano_acidente', 'mes_acidente', 'mes_ano_acidente', 'hora_acidente',
       'qtde_acid_com_obitos', 'qtde_feridosilesos', 'qtde_obitos',
       'ano_referencia', 'mes_referencia', 'qtde_habitantes',
       'frota_circulante', 'qtde_veiculos'],
      dtype='object')),
                                                 ('cat',
                                                  Pipeline(steps=[('encoder',
                                                              

KeyboardInterrupt: 

Observando no dashboard, é visível que o "cliente" criado está trabalhando
![alt text](img/image-2.png)

Por algum motivo, este código não roda, no computador de uma das intergantes do grupo, foi apresentado um erro como:
```
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
cv-n-samples-f5f29692aa0e3a56f1b3c60999bf87b1 has failed... retrying
Erro durante o ajuste do modelo: 'cv-split-f5f29692aa0e3a56f1b3c60999bf87b1'
´´´

Já no computador de outra integrante, houve um problema no qual o dask não conseguia "desserializar". Por isso, para o teste do porque esse mal funcionamento, selecionamos 10% da base de teste para testar com Sk Learn, para tentar ntender se o problema está ligado aos dados ou a construção do processamento

Agora, aparentemente, ele apresenta o comportamento correto, porém demora muito para rodar e pode ser que seja pouco prático para a correção, por isso, para rodar uma célula mais rápida para teste, vamos analisar uma parcela menor dos dados com Lasso, ou seja, usando sk-learn. Assim, podemos ver o comportamento também de datasets menores

## Para a análise dos resultados

In [65]:
import numpy as np

from scipy.stats import t


def corrected_std(differences, n_train, n_test):
    """Corrects standard deviation using Nadeau and Bengio's approach.

    Parameters
    ----------
    differences : ndarray of shape (n_samples,)
        Vector containing the differences in the score metrics of two models.
    n_train : int
        Number of samples in the training set.
    n_test : int
        Number of samples in the testing set.

    Returns
    -------
    corrected_std : float
        Variance-corrected standard deviation of the set of differences.
    """
    # kr = k times r, r times repeated k-fold crossvalidation,
    # kr equals the number of times the model was evaluated
    kr = len(differences)
    corrected_var = np.var(differences, ddof=1) * (1 / kr + n_test / n_train)
    corrected_std = np.sqrt(corrected_var)
    return corrected_std


def compute_corrected_ttest(differences, df, n_train, n_test):
    """Computes right-tailed paired t-test with corrected variance.

    Parameters
    ----------
    differences : array-like of shape (n_samples,)
        Vector containing the differences in the score metrics of two models.
    df : int
        Degrees of freedom.
    n_train : int
        Number of samples in the training set.
    n_test : int
        Number of samples in the testing set.

    Returns
    -------
    t_stat : float
        Variance-corrected t-statistic.
    p_val : float
        Variance-corrected p-value.
    """
    mean = np.mean(differences)
    std = corrected_std(differences, n_train, n_test)
    t_stat = mean / std
    p_val = t.sf(np.abs(t_stat), df)  # right-tailed t-test
    return t_stat, p_val

In [None]:
results_df = pd.DataFrame(grid.cv_results_) \
    .sort_values(by='rank_test_score')

results_df = results_df \
    .set_index(
        results_df["params"] \
            .apply(lambda x: "_".join(str(val) for val in x.values()))
    ) \
    .rename_axis("model")

model_scores = results_df.filter(regex=r"split\d*_test_score")

O melhor parâmetro utilizado foi:

In [None]:
model_scores.index[0]

In [None]:
model_1_scores = model_scores.iloc[0].values  # scores of the best model
model_2_scores = model_scores.iloc[1].values  # scores of the second-best model

differences = model_1_scores - model_2_scores

n = differences.shape[0]  # number of test sets
dof = n - 1

t_stat, p_val = compute_corrected_ttest(
    differences,
    dof,
    num_samples_train,
    num_samples_test,
)
print(f"Corrected t-statistic: {t_stat:.3f}")
print(f"Corrected p-value: {p_val:.3f}")

### Fazendo a regressão e análise de parâmetros com sklearn em uma pequena parte do dataset

Importando as bibliotecas necessárias:

In [50]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.dummy import DummyRegressor
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Lasso

In [57]:
import pandas as pd

# Concatenar X_train e y_train para garantir que os índices estejam alinhados
train_data = pd.concat([X_train, y_train], axis=1)

# Pegando apenas uma porção dos dados para testar com sklearn
train_sample = train_data.sample(frac=0.001, random_state=42)

# Separando novamente X_train_sample e y_train_sample após a amostragem
X_train_sample = train_sample.drop(columns=['qtde_envolvidos'])  # Substitua 'target_column_name' pelo nome da coluna alvo
y_train_sample = train_sample['qtde_envolvidos']  # Substitua 'target_column_name' pelo nome da coluna alvo

# Verificando os shapes para garantir que estão correspondentes
print("Shape de X_train_sample:", X_train_sample.shape)
print("Shape de y_train_sample:", y_train_sample.shape)


Shape de X_train_sample: (3951, 22)
Shape de y_train_sample: (3951,)


In [58]:
categorical_features = X_train_sample.select_dtypes(include=['category']).columns #variáveis categóricas
numerical_features = X_train_sample.select_dtypes(include=['float64', 'int64']).columns #variáveis numéricas

num_pipeline = Pipeline([ 
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])

cat_pipeline = Pipeline([
    ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore')),    #drop='first')),
])

preprocessing_pipeline = ColumnTransformer(
    transformers=[
        ('num', num_pipeline, numerical_features),
        ('cat', cat_pipeline, categorical_features),
    ],
    remainder='passthrough',
)

pipe = Pipeline([
    ('preprocessor', preprocessing_pipeline),
    ('lasso', Lasso()),
])

pipe

In [61]:
from sklearn.model_selection import GridSearchCV, ShuffleSplit

# Grid de parâmetros
param_grid = {
    'lasso__alpha': [0.1, 1, 10, 100]
}

# Configuração do GridSearchCV
num_splits = 5
test_fraction = 0.2
num_samples_total = len(y_train_sample)
num_samples_test = int(test_fraction * num_samples_total)
num_samples_train = num_samples_total - num_samples_test

grid = GridSearchCV(
    estimator = pipe,
    param_grid=param_grid,
    cv=num_splits,  # Aqui, ShuffleSplit não é necessário já que o Dask GridSearchCV utiliza sua própria metodologia
    scoring='neg_root_mean_squared_error'
)


grid.fit(X_train_sample, y_train_sample)

In [62]:
results_df = pd.DataFrame(grid.cv_results_) \
    .sort_values(by='rank_test_score')

results_df = results_df \
    .set_index(
        results_df["params"] \
            .apply(lambda x: "_".join(str(val) for val in x.values()))
    ) \
    .rename_axis("model")

model_scores = results_df.filter(regex=r"split\d*_test_score")

In [63]:
model_scores.index[0]

'0.1'

In [64]:
mean_perf = model_scores.agg(['mean', 'std'], axis=1)
mean_perf['std'] = mean_perf['std'] / np.sqrt(num_splits)
mean_perf = mean_perf.sort_values('mean', ascending=False)
mean_perf

Unnamed: 0_level_0,mean,std
model,Unnamed: 1_level_1,Unnamed: 2_level_1
0.1,-0.151676,0.008326
1.0,-0.870417,0.010837
10.0,-0.870417,0.010837
100.0,-0.870417,0.010837


In [67]:
model_1_scores = model_scores.iloc[0].values  # scores of the best model
model_2_scores = model_scores.iloc[1].values  # scores of the second-best model

differences = model_1_scores - model_2_scores

n = differences.shape[0]  # number of test sets
dof = n - 1

t_stat, p_val = compute_corrected_ttest(
    differences,
    dof,
    num_samples_train,
    num_samples_test,
)
print(f"Corrected t-statistic: {t_stat:.3f}")
print(f"Corrected p-value: {p_val:.3f}")

Corrected t-statistic: 151.398
Corrected p-value: 0.000


## Concluindo

Trabalhar com dados grandes é um trabalho mais complexo que parece. Por mais que no código mude apenas a biblioteca, o trabalho por trás é mais complexo. Pelo fato da nossa base contem uma quantidade enorme de dados, este projeto exigiu muito mais do hardware que qualquer outro projeto menor. O dask foi uma ferramenta importantissima para a contrução da nossa análise e modelagem, afinal a sua forma de processamento facilita o trabalho com grandes bases. Mesmo assim, rodar a pipeline foi um processo extremamente demorado. No geral, independentemente, o modelo teve um bom desempenho. O único modelo utilizado foi a regressão linear, com penalidade l1 e essa penalidade tem como objetivo minimizar os valores observados e os valores previstos. O melhor parâmetro "alpha" a ser utilizado foi o 0.1, com um valor-p que se mostrou coerente. Em suma, esse foi um projeto de muito aprendizado e enriquecedor, possibilitando aplicar conhecimentos adiquiridos ao longo da matéria e enfrentar desafios relacionados a trabalhar com grandes dados