# PIPELINE DE DADOS - A1 - APLICANDO CONHECIMENTO
## Importação das bibliotecas

In [113]:
import os
import requests
import pandas as pd
import glob
import psycopg2
from io import StringIO
import csv
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta

## Criação de funções para inserir dados no Banco de Dados

In [114]:
def get_lista_url(dias):
    # Data de hoje
    data_atual = datetime.now()

    # Lista para armazenar os nomes dos arquivos
    lista = []

    # Gerar uma lista com x dias a partir da variável dias
    for i in range(dias):
        # Calcular a data correspondente
        data = data_atual - timedelta(days=i)
        # Formatando no estilo 'YYYYMMDD'
        data_formatada = data.strftime('%Y%m%d')
        # Criar o nome do arquivo
        nome_arquivo = f'https://dataserver-coids.inpe.br/queimadas/queimadas/focos/csv/diario/Brasil/focos_diario_br_{data_formatada}.csv'
        # Adicionar à lista
        lista.append(nome_arquivo)

    return lista

In [115]:
def download_files(urls, storage):
    
    # Cria a pasta 'storage' se ela não existir
    if not os.path.exists(storage):
        os.makedirs(storage)
    
    # Itera sobre as URLs para fazer o download dos arquivos
    for url in urls:
        try:
            # Extrai o nome do arquivo da URL
            file_name = url.split('/')[-1]
            
            # Caminho completo para salvar o arquivo
            file_path = os.path.join(storage, file_name)
            
            # Faz o download do arquivo
            response = requests.get(url)
            response.raise_for_status()  # Verifica se houve erros na solicitação
            
            # Salva o arquivo na pasta 'storage'
            with open(file_path, 'wb') as arquivo:
                arquivo.write(response.content)
                
            print(f"Arquivo {file_name} salvo com sucesso em 'storage'.")
        except Exception as e:
            print(f"Erro ao baixar {url}: {e}")

In [116]:
def csv_to_df(storage):
    # Criação de uma lista com os arquivos csv
    csv_files = glob.glob(f'{storage}/*.csv')

    # Definição do schema do arquivo csv
    schema = {
        'id': 'object',
        'lat': 'object',
        'lon': 'object',
        'data_hora_gmt': 'object',
        'satelite' : 'object',
        'municipio': 'object',
        'estado': 'object',
        'pais': 'object',
        'municipio_id': 'object',
        'estado_id' : 'object',
        'pais_id': 'object',
        'numero_dias_sem_chuva': 'object',
        'precipitacao': 'object',
        'risco_fogo': 'object',
        'bioma': 'object',
        'frp': 'object'
    }
    # Criação de um dataframe vazio
    df = pd.DataFrame()
    
    # Percorre todos os arquivos na pasta
    for csv_file in csv_files:
        df_csv = pd.read_csv(csv_file, dtype=schema, delimiter=',')
        df = pd.concat([df, df_csv])
    
    # Trata o dataframe para substituir os NAN por null
    df = df.replace({pd.NA: None})
    
    return df

In [117]:
def get_connection_postgres():
    # Definir os parâmetros de conexão com o PostgreSQL
    conn = psycopg2.connect(
        host="localhost",
        port="15432",
        database="mack", 
        user="mack",
        password="mack"
    )

    return conn

In [118]:
def create_table_db():
    # Variável para se conectar ao postgres
    conn = get_connection_postgres()

    # Criação de um cursor para executar comandos SQL
    cursor = conn.cursor()
    
    # Definir o comando SQL para criar a tabela
    criar_tabela_sql = """
    CREATE TABLE IF NOT EXISTS focos_queimadas (
        id TEXT,
        lat TEXT,
        lon TEXT,
        data_hora_gmt TEXT,
        satelite TEXT,
        municipio TEXT,
        estado TEXT,
        pais TEXT,
        municipio_id TEXT,
        estado_id TEXT,
        pais_id TEXT,
        numero_dias_sem_chuva TEXT,
        precipitacao TEXT,
        risco_fogo TEXT,
        bioma TEXT,
        frp TEXT
    );
    """
    
    try:
        # Executar o comando SQL
        cursor.execute(criar_tabela_sql)
        conn.commit()  # Confirma a transação no banco de dados
        print("Tabela criada com sucesso!")
    except Exception as e:
        print(f"Erro ao criar a tabela: {e}")
        conn.rollback()  # Reverte a transação em caso de erro
    finally:
        # Fechar a conexão com o banco de dados
        cursor.close()
        conn.close()

In [119]:
def inserir_dados_db(df, table):
    df_inclusao = df.copy()
    
    # Verificar se o DataFrame tem exatamente 16 colunas
    expected_columns = 16
    if df_inclusao.shape[1] != expected_columns:
        raise ValueError(f"O DataFrame deve ter exatamente {expected_columns} colunas.")
    
    # Variável para se conectar ao postgres
    conn = get_connection_postgres()

    sio = StringIO()
    writer = csv.writer(sio)
    writer.writerows(df_inclusao.values)
    sio.seek(0)
    try:
        with conn.cursor() as c:
            c.copy_expert(
                sql=f"""
                COPY {table} (
                    id,
                    lat,
                    lon,
                    data_hora_gmt,
                    satelite,
                    municipio,
                    estado,
                    pais,
                    municipio_id,
                    estado_id,
                    pais_id,
                    numero_dias_sem_chuva,
                    precipitacao,
                    risco_fogo,
                    bioma,
                    frp
                ) FROM STDIN WITH CSV""",
                file=sio
            )
            conn.commit()
            print('Dados inseridos na tabela com sucesso!')
    except Exception as e:
        print(f"Erro ao inserir dados: {e}")
        conn.rollback()  # Reverte a transação em caso de erro
    finally:
        # Fechar a conexão com o banco de dados
        conn.close()

In [120]:
def excluir_linhas_db(df, table):
    df_datas = df.copy()

    # Extraindo as datas distintas da coluna 'data_hora_gmt' no formato YYYY-MM-DD
    df_datas['data'] = df_datas['data_hora_gmt'].str[:10]  # Mantém apenas a data (YYYY-MM-DD)
    datas_distintas = df_datas['data'].unique()  # Obtém os valores únicos de data

    # Conectar ao banco de dados PostgreSQL usando SQLAlchemy
    engine = create_engine('postgresql+psycopg2://mack:mack@localhost:15432/mack')

    try:
        with engine.connect() as conn:
            # Executar uma query DELETE para cada data distinta
            for data in datas_distintas:
                query_delete = text(f"DELETE FROM {table} WHERE data_hora_gmt::text LIKE '{data}%';")
                conn.execute(query_delete)
                print(f"Linha(s) com data {data} excluída(s) com sucesso.")
    except Exception as e:
        print(f"Erro ao excluir as linhas: {e}")

In [121]:
def excluir_arquivos(storage):
    """
    Remove todos os arquivos de um diretório especificado.

    Parameters:
    path (str): O caminho do diretório onde os arquivos serão excluídos.
    """
     # Verificar se o diretório existe
    if not os.path.isdir(storage):
        print(f"O caminho especificado não é um diretório válido: {storage}")
        return

    # Listar todos os arquivos e pastas no diretório
    for nome_arquivo in os.listdir(storage):
        caminho_arquivo = os.path.join(storage, nome_arquivo)
        
        try:
            os.remove(caminho_arquivo)  # Excluir o arquivo
            print(f"Arquivo {caminho_arquivo} excluído com sucesso.")
        except Exception as e:
            print(f"Erro ao excluir o arquivo {caminho_arquivo}: {e}")

## Chama as funções de forma ordenada para excluir os dados antigos e inserir novos dados

In [122]:
# Chamar a função para criar a tabela
create_table_db()

Tabela criada com sucesso!


In [123]:
# Gera a lista de urls
lista_urls = get_lista_url(10)

# Define o caminho completo da pasta 'storage'
storage = os.path.join(os.getcwd(), '..', 'storage')

# Chama a função para excluir os arquivos CSV já baixados
excluir_arquivos(storage)

# Chama a função para fazer os downloads do arquivos
download_files(lista_urls, storage)

Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240902.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240904.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240901.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240831.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240830.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240905.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240903.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240908.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240906.csv excluído com sucesso.
Arquivo /home/yuri/mack/src/../storage/focos_diario_br_20240907.csv excluído com sucesso.
Arquivo focos_diario_br_20240908.csv salvo com sucesso em 'storage'.
Arquivo focos_diario_br_2024090

In [124]:
# Gera um dataframe do pandas a partir dos arquivos CSV
df = csv_to_df(storage)

# Chama a função para excluir os dados já existes para não ter duplicados
excluir_linhas_db(df, 'focos_queimadas')

# Chama a função para inserir os dados no DB do Postgres
inserir_dados_db(df, 'focos_queimadas')

Linha(s) com data 2024-09-02 excluída(s) com sucesso.
Linha(s) com data 2024-09-04 excluída(s) com sucesso.
Linha(s) com data 2024-09-01 excluída(s) com sucesso.
Linha(s) com data 2024-08-31 excluída(s) com sucesso.
Linha(s) com data 2024-08-30 excluída(s) com sucesso.
Linha(s) com data 2024-09-05 excluída(s) com sucesso.
Linha(s) com data 2024-09-03 excluída(s) com sucesso.
Linha(s) com data 2024-09-08 excluída(s) com sucesso.
Linha(s) com data 2024-09-06 excluída(s) com sucesso.
Linha(s) com data 2024-09-07 excluída(s) com sucesso.
Dados inseridos na tabela com sucesso!
