<a href="https://colab.research.google.com/github/danielreinaux/ETL_Spark/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Projeto de ETL e Estruturação de Dados com Apache Spark


## 1. Introdução:

Esse projeto foi desenvolvido como parte de um desafio em um processo de ETL. O foco do projeto é o tratamento de dados históricos de precipitação disponibilizados pelo Instituto Nacional de Meteorologia (INMET) no site https://portal.inmet.gov.br/dadoshistoricos.

### Demandas e Metodologia:

1. Extração Automatizada: A extração dos dados foi realizada de forma automatizada utilizando a linguagem Python e bibliotecas adequadas para manipulação de arquivos ZIP e CSV.

2. Transformação e Carga com PySpark: As transformações e cargas dos dados foram realizadas utilizando o PySpark, com a criação de tabelas virtuais (spark.sql). Foi utilizada a arquitetura de medalhões, onde:
  - Camada Bronze: Armazena os dados brutos extraídos dos arquivos ZIP.
  - Camada Prata: Contém os dados tratados e prontos para consumo.

## 2. Inicializando o Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## 3. Camada de Bronze

1. Coleta Automatizada de Dados: Acessamos o site do INMET para buscar e fazer download dos arquivos ZIP correspondentes aos anos 2000-2004, 2010-2014, e 2020-2024
2. Extração dos Arquivos: Os arquivos ZIP são extraídos para diretórios temporários, onde o conteúdo será organizado
3. Renomeação e Organização: Os arquivos CSV extraídos são renomeados de acordo com um padrão que facilita a identificação dos dados (Estado, Cidade, Ano) e então movidos para um diretório final, mantendo a estrutura de organização para as próximas etapas

In [None]:
import zipfile
import re
import shutil
import requests
from bs4 import BeautifulSoup

BASE_URL = "https://portal.inmet.gov.br/dadoshistoricos"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, como Gecko) Chrome/92.0.4515.131 Safari/537.36"
}
# Diretórios dentro do Colab
ZIP_DIR = "/content/zips"
EXTRACT_DIR_BASE = os.path.join(ZIP_DIR, "extracted")
TEMP_DIR = os.path.join(ZIP_DIR, "temp")

# Anos desejados
ANOS_DESEJADOS = [str(ano) for ano in range(2000, 2005)] + \
                 [str(ano) for ano in range(2010, 2015)] + \
                 [str(ano) for ano in range(2020, 2025)]

# Certifique-se de criar os diretórios
os.makedirs(ZIP_DIR, exist_ok=True)
os.makedirs(EXTRACT_DIR_BASE, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

def fetch_links(base_url, headers, anos_desejados):
    """Busca os links dos arquivos ZIP para os anos desejados."""
    response = requests.get(base_url, headers=headers)
    soup = BeautifulSoup(response.text, 'html.parser')
    links = []
    for ano in anos_desejados:
        link = soup.find('a', string=re.compile(f"ANO {ano}.*AUTOMÁTICA"))
        if link:
            links.append(link['href'])
    return links

def download_zip(link, zip_dir):
    """Faz o download do arquivo ZIP a partir de um link e retorna o caminho local."""
    filename = os.path.basename(link)
    zip_path = os.path.join(zip_dir, filename)

    response = requests.get(link)
    with open(zip_path, 'wb') as f:
        f.write(response.content)

    return zip_path

def extract_zip(zip_path, extract_dir, temp_dir):
    """Extrai o conteúdo do ZIP para um diretório temporário e retorna o diretório extraído."""
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(temp_dir)

    return os.listdir(temp_dir)

def renomear_e_mover_csv(file, source_folder, target_folder):
    """Renomeia o arquivo CSV e move para o diretório final."""
    if file.endswith('.CSV'):
        state = file.split('_')[2]
        city = file.split('_')[4]
        year = file.split('_')[5][-4:]
        new_name = f"{state}_{city}_{year}.csv"
        final_path = os.path.join(target_folder, new_name)
        shutil.move(os.path.join(source_folder, file), final_path)

def process_zip(zip_path, extract_dir, temp_dir):
    """Processa um arquivo ZIP baixado: extrai, renomeia e move arquivos CSV."""
    extracted_items = extract_zip(zip_path, extract_dir, temp_dir)

    if len(extracted_items) == 1 and os.path.isdir(os.path.join(temp_dir, extracted_items[0])):
        main_folder = os.path.join(temp_dir, extracted_items[0])
        for item in os.listdir(main_folder):
            renomear_e_mover_csv(item, main_folder, extract_dir)
    else:
        for item in extracted_items:
            renomear_e_mover_csv(item, temp_dir, extract_dir)

    shutil.rmtree(temp_dir)

def main():
    """Função principal para orquestrar o processo de ETL."""
    links = fetch_links(BASE_URL, HEADERS, ANOS_DESEJADOS)

    for link in links:
        try:
            zip_path = download_zip(link, ZIP_DIR)
            ano = os.path.basename(zip_path).split('.')[0]
            extract_dir = os.path.join(EXTRACT_DIR_BASE, ano)
            if not os.path.exists(extract_dir):
                os.makedirs(extract_dir, exist_ok=True)

            process_zip(zip_path, extract_dir, TEMP_DIR)
        except Exception as e:
            print(f"Erro ao processar {link}: {e}")

if __name__ == "__main__":
    main()


##  4. Camada de prata

Continuei aqui o processo de ETL avançando para a Camada de Prata, onde os dados brutos, já extraídos e organizados na Camada de Bronze, serão transformados e preparados para consumo final. A primeira parte da camanda de prata envolveu:

1. Normalização da colunas: Os nomes das colunas são normalizados para remover espaços, pontuações, e caracteres especiais, garantindo uniformidade e compatibilidade

2. Extração de Metadados e Colunas: A partir das primeiras linhas de cada arquivo CSV, extraímos os metadados e os nomes das colunas de dados, que serão usados para estruturar o DataFrame final.

3. Criação de DataFrames: Com os dados processados, criamos DataFrames estruturados, que incluem tanto os dados variáveis quanto as colunas fixas extraídas dos metadados.

4. Salvamento dos Dados: Os DataFrames são consolidados e salvos em arquivos CSV

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, col
import glob
import time
from functools import reduce

def initialize_output_directory(output_dir):
    "Função que cria o diretório de saída"
    os.makedirs(output_dir, exist_ok=True)

def normalizar_colunas(nome):
    "Função para normalizar os nomes das colunas"
    return nome.replace(" ", "_").replace(":", "").replace(".", "").replace(",", "").strip()

def initialize_spark_session(app_name="Processamento CSV"):
    "Função para inicializar a sessão do Spark"
    return SparkSession.builder.appName(app_name).getOrCreate()

def extract_metadata(rdd):
    "Função para extrair metadados das 8 primeiras linhas dos arquivos CSVs"
    metadados = rdd.take(8)
    colunas_fixas = [normalizar_colunas(linha.split(";")[0].strip()) for linha in metadados]
    valores_fixos = [linha.split(";")[1].strip() for linha in metadados]
    return colunas_fixas, valores_fixos

def extract_data_columns(rdd):
    "Função que busca o nome do resto das colunas, presentes na linha 9"
    linha_9 = rdd.take(9)[8]
    colunas_dados = [normalizar_colunas(col.strip()) for col in linha_9.split(";") if col.strip()]
    return colunas_dados

def process_csv(file_path, spark):
    "Função para processar um único arquivo CSV"
    rdd = spark.sparkContext.textFile(file_path)
    colunas_fixas, valores_fixos = extract_metadata(rdd)
    colunas_dados = extract_data_columns(rdd)
    rdd_dados = rdd.zipWithIndex().filter(lambda x: x[1] >= 9).keys()
    df_dados = rdd_dados.map(lambda line: line.split(";")).toDF(colunas_dados)

    for i, col in enumerate(colunas_fixas):
        df_dados = df_dados.withColumn(col, lit(valores_fixos[i]))

    df_dados = df_dados.select(colunas_fixas + colunas_dados)

    return df_dados

def save_dataframe_to_csv(df, output_dir, ano):
    "Função para salvar o DataFrame processado em um arquivo CSV"
    output_file_dir = os.path.join(output_dir, f"{ano}_prata")
    df.coalesce(1).write.option("header", "true") \
        .option("delimiter", ";") \
        .option("encoding", "UTF-8") \
        .csv(output_file_dir)

    generated_csv = glob.glob(output_file_dir + "/part-*.csv")[0]
    final_csv = os.path.join(output_dir, f"{ano}_prata.csv")
    shutil.move(generated_csv, final_csv)

    shutil.rmtree(output_file_dir)

def process_all_csvs_by_year(base_dir, output_base_dir, spark):
    "Função para processar todos os arquivos CSV por ano"
    initialize_output_directory(output_base_dir)
    start_time_total = time.time()
    anos = sorted([d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))])

    for ano in anos:
        start_time = time.time()
        bronze_dir = os.path.join(base_dir, ano)
        all_files = glob.glob(os.path.join(bronze_dir, "*.csv"))

        dataframes = []

        print(f'Iniciando o processamento para o ano: {ano}')

        for file_path in all_files:
            df_dados = process_csv(file_path, spark)
            dataframes.append(df_dados)

        df_final = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
        save_dataframe_to_csv(df_final, output_base_dir, ano)

        total_time = time.time() - start_time
        print(f"Processamento do ano {ano} concluído em {total_time:.2f} segundos.")

    total_time_total = time.time() - start_time_total
    print(f"Tempo total para processar todos os anos: {total_time_total:.2f} segundos")


In [None]:
# Função principal para executar o processo
def main():
    "Função principal"
    bronze_dir = os.path.join(ZIP_DIR, "extracted")
    prata_dir = "/content/prata"
    spark = initialize_spark_session("Processamento CSV")
    process_all_csvs_by_year(bronze_dir, prata_dir, spark)
    spark.stop()

if __name__ == "__main__":
    main()

Iniciando o processamento para o ano: 2000
Processamento do ano 2000 concluído em 4.47 segundos.
Iniciando o processamento para o ano: 2001
Processamento do ano 2001 concluído em 15.63 segundos.
Iniciando o processamento para o ano: 2002
Processamento do ano 2002 concluído em 34.47 segundos.
Iniciando o processamento para o ano: 2003
Processamento do ano 2003 concluído em 62.29 segundos.
Iniciando o processamento para o ano: 2004
Processamento do ano 2004 concluído em 67.36 segundos.
Iniciando o processamento para o ano: 2010
Processamento do ano 2010 concluído em 617.94 segundos.
Iniciando o processamento para o ano: 2011
Processamento do ano 2011 concluído em 693.65 segundos.
Iniciando o processamento para o ano: 2012
Processamento do ano 2012 concluído em 754.75 segundos.
Iniciando o processamento para o ano: 2013
Processamento do ano 2013 concluído em 793.35 segundos.
Iniciando o processamento para o ano: 2014
Processamento do ano 2014 concluído em 823.86 segundos.
Iniciando o proc

## 5. Continuação da Camada de Prata

Na segunda parte da Camada de Prata, realizamos o refinamento final dos dados processados. Nesta etapa, nos concentramos em três principais ações:

1. Normalização de Cabeçalhos e Valores, removendo colunas especiais geradas pela não decodificação
2. Tratamento de Valores Nulos: Substituindo valores -999 por valores nulos de fato ('null')
3. Combinação de DataFrames: Consolidar os dados dos arquivos separados em anos, em um único CSV, facilitando a manipulação e análise subsequente.

In [None]:
# Definindo o cabeçalho desejado fora do fluxo principal do código
new_header = [
    'REGIAO', 'UF', 'ESTACAO', 'CODIGO_(WMO)', 'LATITUDE', 'LONGITUDE',
    'ALTITUDE', 'DATA_DE_FUNDACAO_(YYYY-MM-DD)', 'DATA_(YYYY-MM-DD)', 'HORA_(UTC)', 'PRECIPITACAO_TOTAL',
    'HORARIO_(mm)', 'PRESSAO_ATMOSFERICA_AO_NIVEL_DA_ESTACAO_HORARIA_(mB)',
    'PRESSAO_ATMOSFERICA_MAX_NA_HORA_ANT_(AUT)_(mB)', 'PRESSAO_ATMOSFERICA_MIN_NA_HORA_ANT_(AUT)_(mB)',
    'RADIACAO_GLOBAL_(KJ/m_2)', 'TEMPERATURA_DO_AR_-_BULBO_SECO_HORARIA_(°C)', 'TEMPERATURA_DO_PONTO_DE_ORVALHO_(°C)',
    'TEMPERATURA_MAXIMA_NA_HORA_ANT_(AUT)_(°C)', 'TEMPERATURA_MINIMA_NA_HORA_ANT_(AUT)_(°C)',
    'TEMPERATURA_ORVALHO_MAX_NA_HORA_ANT_(AUT)_(°C)', 'TEMPERATURA_ORVALHO_MIN_NA_HORA_ANT_(AUT)_(°C)',
    'UMIDADE_REL_MAX_NA_HORA_ANT_(AUT)_(%)', 'UMIDADE_REL_MIN_NA_HORA_ANT_(AUT)_(%)',
    'UMIDADE_RELATIVA_DO_AR_HORARIA_(%)', 'VENTO_DIRECAO_HORARIA_(gr)_(°_(gr))',
    'VENTO_RAJADA_MAXIMA_(m/s)', 'VENTO_VELOCIDADE_HORARIA_(m/s)'
]

def normalizar_cabecalhos_e_valores(df):
    """
    Normaliza os cabeçalhos e valores das colunas, removendo caracteres especiais.
    """
    for col in df.columns:
        df = df.withColumnRenamed(col, col.encode('ascii', 'ignore').decode('ascii'))
    return df

def tratar_valores_nulos(df):
    """
    Substitui valores -999 por null no DataFrame.
    """
    df = df.select([when(col(c) == -999, None).otherwise(col(c)).alias(c) for c in df.columns])
    return df

def carregar_e_normalizar_csvs(prata_dir):
    """
    Carrega todos os arquivos CSV do diretório fornecido, normaliza os cabeçalhos e os valores,
    substitui valores -999 por null, e retorna uma lista de DataFrames.
    """
    all_csvs = glob.glob(os.path.join(prata_dir, "*.csv"))
    dataframes = []

    for csv_file in all_csvs:
        df = spark.read.option("header", "true").option("sep", ";").option("encoding", "UTF-8").csv(csv_file)
        df = normalizar_cabecalhos_e_valores(df)
        df = tratar_valores_nulos(df)
        dataframes.append(df)

    return dataframes

def combinar_dataframes(dataframes):
    """
    Combina todos os DataFrames em um único DataFrame usando a operação de união (union).
    """
    df_final = dataframes[0]
    for df in dataframes[1:]:
        df_final = df_final.union(df)

    return df_final

def renomear_cabecalhos(df_final, new_header):
    """
    Renomeia os cabeçalhos do DataFrame final com base na lista fornecida em new_header.
    """
    for old_col, new_col in zip(df_final.columns, new_header):
        df_final = df_final.withColumnRenamed(old_col, new_col)
    return df_final

def salvar_dataframe(df_final, output_dir, output_filename):
    """
    Salva o DataFrame final em um arquivo CSV, movendo-o para fora do diretório temporário
    e removendo o diretório temporário após a operação.
    """
    output_temp_dir = os.path.join(output_dir, "temp_output")
    df_final.coalesce(1).write.option("header", "true").option("sep", ";").option("encoding", "UTF-8").csv(output_temp_dir)

    generated_csv = glob.glob(output_temp_dir + "/part-*.csv")[0]
    final_csv = os.path.join(output_dir, output_filename)
    shutil.move(generated_csv, final_csv)

    shutil.rmtree(output_temp_dir)

    return final_csv

def processar_csvs(prata_dir, new_header, output_filename="dataset_final.csv"):
    """
    Processa todos os arquivos CSV no diretório fornecido, combinando-os em um único CSV final,
    substituindo valores -999 por null e renomeando os cabeçalhos das colunas com base na lista new_header.
    """
    # Carregando e normalizando os CSVs
    dataframes = carregar_e_normalizar_csvs(prata_dir)

    # Combinando os DataFrames em um DataFrame final
    df_final = combinar_dataframes(dataframes)

    # Fazendo ajustes na coluna
    df_final = renomear_cabecalhos(df_final, new_header)

    # Salvando o DataFrame final
    final_csv = salvar_dataframe(df_final, prata_dir, output_filename)

    print(f"Todos os arquivos CSV foram combinados e salvos em um único arquivo CSV: {final_csv}")

spark = SparkSession.builder.appName("Juntar CSVs").getOrCreate()

prata_dir = "/content/prata"

processar_csvs(prata_dir, new_header)


Todos os arquivos CSV foram combinados e salvos em um único arquivo CSV: /content/prata/dataset_final.csv


### Exemplo do DataFrame final

In [None]:
spark = SparkSession.builder.appName("Visualizar Dataset").getOrCreate()
csv_path = "/content/prata/dataset_final.csv"
df = spark.read.option("header", "true").option("sep", ";").option("encoding", "UTF-8").csv(csv_path)
df.show(15)
spark.stop()


+------+---+---------------+------------+-----------+------------+--------+-----------------------------+-----------------+----------+------------------+------------+----------------------------------------------------+----------------------------------------------+----------------------------------------------+------------------------+-------------------------------------------+------------------------------------+-----------------------------------------+-----------------------------------------+----------------------------------------------+----------------------------------------------+-------------------------------------+-------------------------------------+----------------------------------+-----------------------------------+-------------------------+
|REGIAO| UF|        ESTACAO|CODIGO_(WMO)|   LATITUDE|   LONGITUDE|ALTITUDE|DATA_DE_FUNDACAO_(YYYY-MM-DD)|DATA_(YYYY-MM-DD)|HORA_(UTC)|PRECIPITACAO_TOTAL|HORARIO_(mm)|PRESSAO_ATMOSFERICA_AO_NIVEL_DA_ESTACAO_HORARIA_(mB)|PRESSAO_AT