In [None]:
import json
import os

import numpy as np
import pandas as pd
from resolve_path import ajuste_path, read_input
from unidecode import unidecode

In [None]:
df = read_input("BD Acid_fechamento_Ago.csv")
pathInput = "data/input/"
pathInput = ajuste_path(pathInput)

# Filtrar apenas arquivos Excel que contenham "acid" no nome e tenham extensão .xlsx ou .xls
excel_files = [f for f in os.listdir(pathInput) if (
    "acid" in f.lower()) and (f.endswith('.xlsx') or f.endswith('.xls'))]

# Pegar o caminho completo dos arquivos
full_paths = [os.path.join(pathInput, f) for f in excel_files]

# Verificar se há arquivos que correspondam ao critério
if full_paths:
    # Identificar o arquivo mais recente
    latest_file = max(full_paths, key=os.path.getmtime)
    print(f"Arquivo mais recente com 'acid' no nome: {latest_file}")
else:
    print("Nenhum arquivo com 'acid' no nome foi encontrado.")

# Identificar o arquivo mais recente
latest_file = max(full_paths, key=os.path.getmtime)

latest_file_csv = latest_file.replace(".xlsx", ".csv")

df = pd.read_csv(latest_file_csv, sep="#", encoding="utf-8")

## Pré processamento das colunas:

In [None]:
df.columns = df.columns.str.lower()
df.columns = df.columns.map(lambda x: unidecode(str(x)))
df.columns = df.columns.str.replace(" ", "_")
df.info()

## Pré processamento das linhas

In [None]:
def formata_elemento(elemento):
    # # print(elemento)
    if isinstance(elemento, str):
        elemento = elemento.lower()
        elemento = unidecode(elemento)
        elemento = elemento.replace("\n", " ")
        elemento = elemento.replace("\r", " ")
        elemento = elemento.replace("\t", " ")

    return elemento

for column in df.columns:
    df[column] = df[column].apply(formata_elemento)

### Utilitários

In [None]:
nan_dict = {
    "na": np.nan,
    "nan": np.nan,
    "n.a": np.nan,
    "n.a.": np.nan,
    "n/a": np.nan,
    "-": np.nan,
}

### Empregado

In [None]:
df["empregado"].unique()

### Classificação

In [None]:
df["classificacao"].unique()

### Empresa

In [None]:
df["empresa"].unique()

### Fornecedor

In [None]:
df["fornecedor"] = df["fornecedor"].replace(nan_dict)
df["fornecedor"].unique()

### Colocando próprios com fornecedor como terceiro

In [None]:
df.loc[df["fornecedor"].notna(), "empregado"] = "terceiro"
print(df["empregado"].unique())

### ID

In [None]:
df["id"] = df["id"].str.lstrip("\t")

df["id"] = df["id"].replace(nan_dict)

indices_com_repeticao = []
print("Número de linhas antes da separação de ids agrupados: ", len(df))
for index, row in df.iterrows():
    if isinstance(row['id'], str) and ' e ' in row['id']:

        matriculas = row['id'].replace(' e ', '|')
        matriculas = matriculas.replace(', ', '|')
        matriculas = matriculas.strip()

        nomes = row['nome'].replace(' e ', '|')
        nomes = nomes.replace(', ', '|')

        lista_matriculas = matriculas.split('|')
        lista_nomes = nomes.split('|')

        for i in range(len(lista_matriculas)):
            nova_linha = row.copy()
            nova_linha['id'] = lista_matriculas[i]
            nova_linha['nome'] = lista_nomes[i]
            df = pd.concat([df, pd.DataFrame([nova_linha])], ignore_index=True)

        indices_com_repeticao.append(index)

print("Número de linhas após a separação de ids agrupados: ", len(df))
df = df.drop(indices_com_repeticao)
print("Número de linhas após a remoção de ids agrupados: ", len(df))

### Dia

In [None]:
df["dia"].unique()

### Mes

In [None]:
df["mes"].unique()

### Ano

In [None]:
df["ano"].unique()

### Criando a coluna data

In [None]:
# preenchendo com 12 horas o que está nan para não perder a singularidade da hora
df["hora"] = df["hora"].fillna(12)

valores = df["hora"].unique()

subs = {}
for hora in valores:
    try:
        subs[hora] = hora[:2]
    except:
        # subs['Hora'][np.nan] = 'nao informado'
        pass

print(json.dumps(subs, indent=4))

df["hora"] = df["hora"].replace(
    subs)

# Criando coluna "Data" a partir das colunas 'day', 'month' e 'year'
df['data'] = pd.to_datetime(
    df[['ano', 'mes', 'dia', 'hora']].rename(
        columns={'ano': 'year', 'mes': 'month', 'dia': 'day', 'hora': 'hour'}
    ))

# Renomeando as colunas de volta
df.rename(
    columns={'day': 'dia', 'month': 'mes', 'year': 'ano'}, inplace=True)

### Descrição

In [None]:
df["descricao"] = df["descricao"].replace(nan_dict)

### Coordenadas

In [None]:
def processa_coordenada(value):
    if isinstance(value, str):
        value = value.replace(",", ".")  # Substitui vírgula por ponto
        value = value.replace("°", "")  # Remove "°"
        return float(value)

    return value


def corrige_coordenada(df):
    # Corrigir longitude digitada errada (número muito grande)
    df["longitude"] = df["longitude"].apply(
        lambda x: x / 100000 if pd.notna(x) and x < -100 else x
    )
    # Corrigir sinal das coordenadas trocado
    df["longitude"] = df["longitude"].apply(
        lambda x: x * (-1) if pd.notna(x) and x > 0 else x
    )
    # Corrigir vírgulas das coordenadas digitadas errado
    df["longitude"] = df["longitude"].apply(
        lambda x: x * 10 if pd.notna(x) and x > -10 else x
    )
    # Desconsiderar pontos na Antártica
    df = df[df["latitude"].apply(lambda x: pd.notna(
        x) and x > -40 if pd.notna(x) else True)]

    return df

Latitude

In [None]:
print(df[["latitude", "longitude"]].isna().sum())

print(df["latitude"].unique())
df["latitude"] = df["latitude"].apply(processa_coordenada)
print(df["latitude"].unique())

Longitude

In [None]:
print(df["longitude"].unique())
df["longitude"] = df["longitude"].apply(processa_coordenada)
print(df["longitude"].unique())

In [None]:
df = corrige_coordenada(df)
print(df[["latitude", "longitude"]].isna().sum())

### Potencial

In [None]:
print(df["potencial"].unique())

### Exportar o dataset

In [None]:
pathUtil = "data/util/"
pathUtil = ajuste_path(pathUtil)

df.to_csv(pathUtil + "acidentes/" + "acidentes_preprocessado.csv",
          sep="#", encoding="utf-8", index=False)