In [None]:
import logging
import time
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
from google.cloud import bigquery
from tabulate import tabulate
import requests
from urllib.parse import urlparse
from pyspark.sql import SparkSession
from dotenv import load_dotenv

load_dotenv("url_empresas")

spark = (
        SparkSession.builder
        .appName("ETL_Empresas")
        .config("spark.jars", os.getenv("bigquery_jar"))
        .config("spark.driver.memory", "8g")
        .config("spark.sql.catalogImplementation", "in-memory")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .getOrCreate()
    )
    spark.conf.set("spark.sql.debug.maxToStringFields", 1000)


In [18]:
url_empresas = "https://engenheiro-dados-dados-teste.s3.sa-east-1.amazonaws.com/empresas.csv"

logging.info(f"--> Baixando arquivo de: {url_empresas}")
resposta = requests.get(url_empresas, stream=True)
resposta.raise_for_status()
arquivo_csv = spark.read.csv(resposta)

schema = StructType([StructField(f"_c{i}", StringType(), True) for i in range(30)])

df = (spark.read.option("sep", ";")
                .option("quote", '"') 
                .option("escape", '"')
                .option("multiLine", True) 
                .option("header", "false")
                .option("encoding", "latin1")
                .schema(schema)
                .csv(arquivo_csv)
    )


PySparkTypeError: [NOT_STR_OR_LIST_OF_RDD] Argument `path` should be a str or list[RDD], got Response.

In [None]:
df = spark.read.parquet(arquivo_parquet)
print(tabulate(df_municipios.head(10), headers='keys', tablefmt="psql"))

In [None]:
# Bibliotecas 
import pandas as pd
import requests
import logging
import time
from tabulate import tabulate
from pandas_gbq import to_gbq
from google.cloud import bigquery
import os
import tqdm

# Configuração do logs
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# URLs
url_empresas = "https://engenheiro-dados-dados-teste.s3.sa-east-1.amazonaws.com/empresas.csv"
# url_municipios = ""

# Funções do pipeline
def carregar_dados():
    logging.info("Carregando dados das empresas...")
    inicio_carga = time.time()

    df_empresas = pd.read_csv(url_empresas, sep=';', encoding='latin1', dtype=str, header=None)
    fim_carga = time.time()

    tempo_execucao = fim_carga - inicio_carga
    logging.info(f"Carregamento concluido! Tempo execução: {tempo_execucao:.2f} segundos.")

    return df_empresas

def tratamento_dados(df_empresas):
    logging.info("Iniciando o tratamento dos dados...")
    inicio_tratamento = time.time()

    # Renomeando as colunas
    df_empresas.columns = [
        "basico", "ordem", "dv", "identificador", "empresa", "sit_cadastral",
        "dt_sit_cadastral", "mot_sit_cadastral", "cidade_ext", "pais", "dt_inicio",
        "cnae_principal", "cnae_secundario", "tp_logradouro", "logradouro", "numero",
        "complemento", "bairro", "cep", "uf", "municipio", "ddd_telefone_1",
        "telefone_1", "ddd_telefone_2", "telefone_2", "ddd_fax", "telefone_fax",
        "email", "sit_especial", "dt_sit_especial"
    ]

    # Formatar coluna CNPJ
    df_empresas["cnpj"] = (
        df_empresas["basico"].str.zfill(8) +
        df_empresas["ordem"].str.zfill(4) +
        df_empresas["dv"].str.zfill(2)
    )
    df_empresas["cnpj"] = df_empresas["cnpj"].apply(
        lambda cnpj: f"{cnpj[:2]}.{cnpj[2:5]}.{cnpj[5:8]}/{cnpj[8:12]}-{cnpj[12:]}"
    )

    df_empresas.drop(["basico", "ordem", "dv"], axis=1, inplace=True)

    # Colunas em string
    colunas_texto = [
        "empresa", "email", "complemento", "tp_logradouro", "logradouro",
        "bairro", "cnae_principal", "cnae_secundario", "telefone_1", "telefone_2",
        "telefone_fax", "ddd_telefone_1", "ddd_telefone_2", "ddd_fax"
    ]
    for col in colunas_texto:
        if col in df_empresas.columns:
            df_empresas[col] = df_empresas[col].fillna("não informado").str.lower().str.strip()

    # Tratar colunas de data
    colunas_datas = ["dt_sit_cadastral", "dt_inicio", "dt_sit_especial"]
    for col in colunas_datas:
        if col in df_empresas.columns:
            df_empresas[col] = pd.to_datetime(df_empresas[col], errors="coerce").dt.date
    
    # Tratar colunas Matriz/Filial
    df_empresas["identificador"] = df_empresas["identificador"].replace({
        "1": "matriz",
        "2": "filial"
    })
    
    # Tratar situação cadastrar
    df_empresas["sit_cadastral"] = df_empresas["sit_cadastral"].replace({
        "01": "nula",
        "02": "ativa",
        "03": "suspensa",
        "04": "inapta",
        "08": "baixada"
    })

    # Tratar numero
    df_empresas["numero"] = df_empresas["numero"].fillna("s/n")
    
    # Convetendo colunas NaN
    df_empresas = df_empresas.where(pd.notnull(df_empresas), None)
    
    fim_tratamento = time.time()
    tempo_execucao = fim_tratamento - inicio_tratamento
    logging.info(f"Colunas tratadas! Tempo execução: {tempo_execucao:.2f} segundos.")

    return df_empresas

if __name__ == "__main__":
    df_empresas = carregar_dados()
    df_empresas = tratamento_dados(df_empresas)
    print(tabulate(df_empresas.head(20), headers='keys', tablefmt="psql"))
    # print(df_empresas_tratado.head(10))


KeyboardInterrupt: 