In [None]:
!pip3 install -r requirements.txt

In [None]:
%run schema.py

In [None]:
%run create_table.ipynb

In [None]:
# Importando as bibliotecas
import requests
import zipfile
import io
import os
import logging
import pandas as pd
from typing import Optional, Dict
from dotenv import load_dotenv
from snowflake.connector.pandas_tools import write_pandas

In [None]:
load_dotenv()

In [None]:
class Logger():
    def __init__(self):
        self.format = '%(asctime)s - %(levelname)s - %(message)s'


    def set_logger(self):
        logging.basicConfig(format=self.format, level=logging.INFO)
        logger = logging.getLogger()

        return logger   

In [None]:
class SnowflakeHandler:
    """
    Classe para gerenciar a conexão e operações com Snowflake.

    Atributos:
    ----------
    user: str
        Nome do usuário para a conexão com Snowflake.
    password: str
        Senha do usuário para a conexão com Snowflake.
    account: str
        Identificador da conta Snowflake.
    warehouse: str
        Nome do warehouse a ser usado na conexão.
    database: str
        Nome do banco de dados a ser usado na conexão.
    schema: str
        Nome do schema a ser usado na conexão.
    """

    def __init__(self):
        """
        Inicializa a classe SnowflakeHandler carregando as variáveis de ambiente.
        """
        self.session = None
        self.user = os.getenv("SNOWFLAKE_USER")
        self.password = os.getenv("SNOWFLAKE_PASSWORD")
        self.account = os.getenv("SNOWFLAKE_ACCOUNT_NAME")
        self.warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
        self.database = os.getenv("SNOWFLAKE_DATABASE")
        self.schema = os.getenv("SNOWFLAKE_SCHEMA")
        self.logger_class = Logger()
        self.logger = self.logger_class.set_logger()        

    def connect(self):
        """
        Estabelece uma conexão com Snowflake usando a configuração fornecida.
        
        Retorna:
        --------
        None
        """

        SNOWFLAKE_CONFIG = {
            'user': self.user,
            'password': self.password,
            'account': self.account,
            'warehouse': self.warehouse,
            'database': self.database,
            'schema': self.schema
        }         
          
        self.session = Session.builder.configs(SNOWFLAKE_CONFIG).create()

    def close_connection(self):
        """
        Fecha a conexão com Snowflake.
        
        Retorna:
        --------
        None
        """
        if self.session is not None:
            self.session.close()

    def save_dataframe(self, df: pd.DataFrame, table_name: str, column_names: list) -> None:
        """
        Salva um DataFrame no Snowflake, sobrescrevendo o conteúdo da tabela.

        Parâmetros:
        -----------
        df: pd.DataFrame
            O DataFrame que será salvo no Snowflake.
        table_name: str
            O nome da tabela onde o DataFrame será salvo.
        column_names: list
            Lista de nomes das colunas que devem ser usadas no DataFrame.
        
        Retorna:
        --------
        None
        """
        if self.session is None:
            raise ConnectionError("A conexão com Snowflake não está estabelecida.")

        df.columns = column_names

        # Limpar aspas e espaços desnecessários dos nomes das colunas
        df.columns = df.columns.str.replace(r'^"|"$', '', regex=True).str.strip()

        # Apagar o conteúdo existente na tabela
        try:
            self.session.sql(f"DELETE FROM {table_name}").collect()
            self.logger.info(f"Conteúdo da tabela '{table_name}' deletado com sucesso.")
        except Exception as e:
            self.logger.error(f"Erro ao deletar o conteúdo da tabela '{table_name}': {e}")
            raise

        # Inserir o novo DataFrame na tabela
        success, nchunks, nrows, _ = write_pandas(self.session.connection, df, table_name)

        if success:
            self.logger.info(f"DataFrame salvo com sucesso na tabela '{table_name}'. Total de linhas: {nrows}.")
        else:
            self.logger.error(f"Falha ao salvar o DataFrame na tabela '{table_name}'.")


In [None]:
class UrbanMobilityData:
    def __init__(self, url: str):
        """
        Inicializa a classe com a URL do arquivo zip.

        :param url: URL do arquivo zipado do Kaggle.
        """
        self.url = url
        self.file_paths = {
            "Household": "Household.csv",
            "Person": "Person.csv",
            "Stage": "Stage.csv",
            "Trip": "Trip.csv"
        }
        self.logger_class = Logger()
        self.logger = self.logger_class.set_logger()            

    def download_zip(self) -> Optional[bytes]:
        """
        Faz o download do arquivo zipado da URL fornecida.

        :return: Conteúdo do arquivo zipado em bytes, ou None em caso de erro.
        """
        try:
            response = requests.get(self.url)
            if response.status_code == 200:
                return response.content
            else:
                self.logger.error(f"Falha ao fazer o download dos arquivos. Status code: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"Erro ao baixar o arquivo: {e}")
            return None

    def extract_zip_to_dataframes(self, zip_content: bytes) -> Dict[str, Optional[pd.DataFrame]]:
        """
        Extrai o conteúdo do arquivo zipado e carrega os arquivos CSV em DataFrames.

        :param zip_content: Conteúdo do arquivo zipado em bytes.
        :return: Dicionário onde as chaves são os nomes dos arquivos e os valores são os DataFrames ou None.
        """
        dataframes = {}
        try:
            with zipfile.ZipFile(io.BytesIO(zip_content), 'r') as zip_ref:
                for file_name in zip_ref.namelist():
                    if file_name in self.file_paths.values():
                        with zip_ref.open(file_name) as file:
                            df = pd.read_csv(file, sep=';', on_bad_lines='skip')
                            key = [k for k, v in self.file_paths.items() if v == file_name][0]
                            dataframes[key] = df
                            self.logger.info(f"DataFrame {key} carregado com sucesso")
        except Exception as e:
            self.logger.error(f"Erro ao processar o arquivo zipado: {e}")
        return dataframes

In [None]:
if __name__ == "__main__":
    url = os.getenv("URL")
    logger_class = Logger()
    logger = logger_class.set_logger()    
    data_handler = UrbanMobilityData(url)
    create_table = SnowflakeTableCreator()


    # Fazendo o download do arquivo zip
    zip_content = data_handler.download_zip()
    if zip_content:
        # Extraindo o conteúdo do arquivo zip e carregando os DataFrames
        dataframes = data_handler.extract_zip_to_dataframes(zip_content)

        for name, df in dataframes.items():
            if df is not None:
                logger.info(name)

                handler = SnowflakeHandler()
                handler.connect()
                create_table.connect()

                target_table_location = name.upper()

                match name.upper():
                    case 'PERSON':
                        create_table.create_table(target_table_location, table_person)
                    case 'HOUSEHOLD':
                        create_table.create_table(target_table_location, table_household)
                    case 'STAGE':
                        create_table.create_table(target_table_location, table_stage)
                    case 'TRIP':
                        create_table.create_table(target_table_location, table_trip)

                create_table.disconnect()

                expected_columns = [c.upper() for c in df.columns]

                handler.save_dataframe(df, target_table_location, expected_columns)
        
                handler.close_connection()