<a href="https://colab.research.google.com/github/amandamayra/Ingestao-de-dados/blob/main/data_ingestion_spark_sql_etl_ATIVIDADE_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# # Instalar dependências
# !pip install \
#     pyspark \
#     dbt-core \
#     dbt-postgres

# python -m pip install \
#   dbt-core \
#   dbt-postgres \
#   dbt-redshift \
#   dbt-snowflake \
#   dbt-bigquery \
# ----------------------- #
import subprocess
import sys

def install_or_update(package):
    # Atualiza o instalador de pacotes
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pip"])
    # Tenta importar o pacote. Se falhar, instala-o.
    try:
        __import__(package)
        # Se o pacote já estiver instalado, tenta atualizá-lo
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", package])
    except ImportError:
        # Se o pacote não estiver instalado, instala-o
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Lista de pacotes que você deseja verificar/instalar/atualizar
required_packages = ['cchardet', 'pandas', 'ydata-profiling', 'pyspark']

for package in required_packages:
    install_or_update(package)


In [None]:
# Importar bibliotecas necessárias
import os
# import dbt.main as dbt
from google.colab import drive
from pyspark import SparkContext as SC
from pyspark.sql import SparkSession as SS, DataFrame as DF
from pyspark.sql.functions import col

In [None]:
# Montar o Google Drive
drive.mount('/content/drive/', force_remount=True)
# Definir diretório de trabalho
os.chdir('/content/drive/MyDrive/Colab Notebooks/')

# Configurar o SparkSession
spark = SS.builder \
    .master('local') \
    .appName("AtividadeETL_4") \
    .config("spark.driver.memory", "6g") \
    .config("spark.logConf", True) \
    .getOrCreate()
SC.setLogLevel(spark, "ERROR")

# Definir caminhos (ajuste conforme sua estrutura de pastas no Google Drive)
# full_path = "/content/drive/MyDrive/Colab Notebooks/AtividadeETL-4"
base_path = "./AtividadeETL-4/data"
raw_path = f"{base_path}/raw"
trusted_path = f"{base_path}/trusted"
delivery_path = f"{base_path}/delivery"

Mounted at /content/drive/


In [None]:
import cchardet
import pandas as pd

# Função para detectar o encoding do arquivo
def detect_encoding_cchardet(file_path):
    with open(file_path, 'rb') as file:
        result = cchardet.detect(file.read())
    return result

path = f'{base_path}/Bancos/EnquadramentoInicia_v2.tsv'

result = detect_encoding_cchardet(path)
print(f"The most likely encoding is: {result['encoding']}, with confidence {(result['confidence']*100) // 1} %.", '\n')

# Lê o arquivo usando o encoding detectado
df = pd.read_csv(path, sep='\t', encoding=result['encoding'])

# Exibe as primeiras linhas do DataFrame com a nova codificação de caracteres
print(df.head())


The most likely encoding is: UTF-8, with confidence 99.0 %. 

  Segmento      CNPJ                                  Nome
0       S1         0          BANCO DO BRASIL - PRUDENCIAL
1       S1  60746948                 BRADESCO - PRUDENCIAL
2       S1  30306294              BTG PACTUAL - PRUDENCIAL
3       S1    360305  CAIXA ECONOMICA FEDERAL - PRUDENCIAL
4       S1  60872504                     ITAU - PRUDENCIAL


In [None]:
enc_character = 'cp1252' # utf8
bancos_file = f'{base_path}/Bancos/EnquadramentoInicia_v2.tsv'
empregados_files = [f'{base_path}/Empregados/glassdoor_consolidado_join_{suffix}_v2.csv' for suffix in ['match_less', 'match']]
reclamacoes_folder = f'{base_path}/Reclamações'

def read_raw_data(file_path, separator):
    '''
    Função para ler arquivos de dados CSV,
    TSV de acordo com os separadores.
    '''
    return spark.read.csv( \
        file_path, \
        sep=separator, \
        header=True, \
        encoding=enc_character \
        )

def treat_data(df):
    '''
    Função para tratar dados.
    ### # Remove duplicatas e valores nulos #
    '''
    df = df.dropDuplicates().na.drop()
    return df

# Leitura e tratamento dos dados dos Bancos
df_bancos = read_tsv(bancos_file)
df_bancos = treat_data(df_bancos)
df_bancos.createOrReplaceTempView("bancos_view")
spark.sql("SELECT * FROM bancos_view LIMIT 50").show(truncate=False)

# Leitura e tratamento dos dados de Empregados
df_empregados = read_csv(empregados_files)
df_empregados = treat_data(df_empregados)

# Leitura e tratamento dos dados de Reclamações
reclamacoes_files = [os.path.join(reclamacoes_folder, f) for f in os.listdir(reclamacoes_folder) if f.endswith('.csv')]
df_reclamacoes = read_csv(reclamacoes_files)
df_reclamacoes = treat_data(df_reclamacoes)

# Exibição de informações sobre os DataFrames
print("Informações sobre o DataFrame de Bancos:")
df_bancos.show(truncate=False)
df_bancos.printSchema()

print("\nInformações sobre o DataFrame de Empregados:")
df_empregados.show(truncate=False)
df_empregados.printSchema()

print("\nInformações sobre o DataFrame de Reclamações:")
df_reclamacoes.show(truncate=False)
df_reclamacoes.printSchema()

# Merge dos DataFrames
# Assumindo que há colunas comuns para fazer o merge. Ajuste conforme necessário.
merged_df = df_bancos.join(df_empregados, df_bancos["Nome"] == df_empregados["employer_name"], "left")
merged_df = merged_df.join(df_reclamacoes, merged_df["CNPJ"] == df_reclamacoes["CNPJ"], "left")

# Tratamento final do DataFrame mesclado
merged_df = treat_data(merged_df)

# Exibição de informações sobre o DataFrame final
print("\nInformações sobre o DataFrame Mesclado Final:")
merged_df.show(truncate=False)
merged_df.printSchema()

# Salvando o resultado final
merged_df.write.csv('resultado_merge_final.csv', header=True, mode='overwrite')
print("\nArquivo 'resultado_merge_final.csv' salvo com sucesso.")


+--------+--------+---------------------------------------------------------------------------------------------------------+
|Segmento|CNPJ    |Nome                                                                                                     |
+--------+--------+---------------------------------------------------------------------------------------------------------+
|S4      |16657210|CHLOE DTVM LTDA.                                                                                         |
|S5      |66788142|COOPERATIVA DE CREDITO DE LIVRE ADMISSï¿½O DE Cï¿½NDIDO MOTA E REGIï¿½O.                                 |
|S5      |1601342 |COOPERATIVA DE CRï¿½DITO DE LIVRE ADMISSï¿½O DE MICRO REGIï¿½ES DE GOIï¿½NIA E ADJACENTES LTDA.          |
|S5      |5745533 |COOPERATIVA DE CRï¿½DITO RURAL COM INTERAï¿½ï¿½O SOLIDï¿½RIA DE ITATIBA DO SUL - CRESOL ITATIBA DO SUL   |
|S5      |4223451 |CREDILON SCM REG. LONDRINA LTD                                                                     

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `employer_name` cannot be resolved. Did you mean one of the following? [`employer_name|reviews_count|culture_count|salaries_count|benefits_count|employer-website|employer-headquarters|employer-founded|employer-industry|employer-revenue|url|Geral|Cultura e valores|Diversidade e inclusÃ£o|Qualidade de vida|Alta lideranÃ§a|RemuneraÃ§Ã£o e benefÃ­cios|Oportunidades de carreira|Recomendam para outras pessoas(%)|Perspectiva positiva da empresa(%)|Segmento|Nome|match_percent`].

In [None]:
import pandas as pd
from ydata_profiling import ydata-profiling

df = pd.read_csv("data/Train_Dataset.csv")
profile = ydata-profiling(df, title="Pandas Profiling Report")
profile

In [None]:
def load_dataframe(file_path: str) -> DF:
    '''
    Carrega um arquivo CSV, selecione colunas e retorna um DataFrame.

    Args:
        file_path: O caminho do arquivo CSV a ser carregado.

    Returns:
        O DataFrame contendo os dados do arquivo
            com colunas selecionadas conforme especificar.
    '''
    df = (spark.read \
          .options(index=False, \
                   inferSchema=True, \
                   header=True, \
                   delimiter=",") \
          .csv(file_path))
    return df

In [None]:
def remove_long_rows(df: DF) -> DF:
    '''
    Remove as linhas do DataFrame que contenham algum valor acima de 256 caracteres.

    Args:
        df: O DataFrame a ser filtrado.

    Returns:
        O DataFrame resultante após a remoção das linhas.
    '''
    selected_columns_list = df.columns
    df_filtered = df
    for column in selected_columns_list:
        df_filtered = (
            df_filtered.withColumn
                (
                column, \
                regexp_replace(df_filtered[column], \
                "\\n{1,2}", "") \
                ) \
            .filter(length(df_filtered[column]) <= 256)
        )
    return df_filtered

In [None]:
def filtering_df_languages(df_filter_language: DF) -> DF:
    '''
    Filtra um DataFrame com base nos idiomas detectados nas duas primeiras colunas.

    Args:
        df_filter_language: DataFrame a ser filtrado.

    Returns:
        DataFrame resultante após a filtragem.
    '''

    asian_languages = ['zh', 'ja', 'ko']
    cyrillic_languages = ['ru', 'uk', 'bg']
    middle_east_languages = ['ar', 'fa']
    languages_filter_list = asian_languages + cyrillic_languages + middle_east_languages

    # Função para detectar o idioma do texto
    def detect_language(texts: List[str]) -> List[str]:
        languages = [langid.classify(text)[0] for text in texts]
        return languages

    # Função para filtrar idiomas
    def filter_languages(languages: List[str]) -> bool:
        return all(language not in languages_filter_list for language in languages)

    detect_lang_udf = udf(detect_language, ArrayType(StringType()))
    filter_lang_udf = udf(filter_languages, BooleanType())

    # DataFrame resultante
    filtered_df = df_filter_language

    for column in df_filter_language.columns[:2]:
        language_array = detect_lang_udf(filtered_df[column])  # Array de idiomas detectados
        filtered_df = filtered_df.withColumn("languages", language_array) \
            .filter(filter_lang_udf(col("languages"))) \
            .drop("languages")

    return filtered_df

In [None]:
def save_as_clean_csv(df: DF, file_name: str, output_folder: str) -> None:
    '''
    Salva o DataFrame em um arquivo CSV, renomeia e move para uma pasta especificada.

    Args:
        df: O DataFrame a ser salvo.
        file_name: O nome do arquivo CSV de saída (sem o sufixo "_limpos.csv").
        output_folder: O nome da pasta de saída em que o arquivo será movido.

    '''
    csv_folder = output_folder + file_name
    df.repartition(1) \
        .write \
        .options(header=True, single=True) \
        .mode('overwrite') \
        .csv(csv_folder)

    old_file_path = glob.glob(os.path.join(csv_folder, "*.csv"))
    clean_file_path = os.path.join(cleaned_path, file_name + '_limpos.csv')
    if not os.path.exists(cleaned_path):
        os.mkdir(cleaned_path)
    shutil.move(old_file_path[0], clean_file_path)

---

In [None]:
# Função para ingestão de dados (camada RAW)
def ingest_raw_data():
    # Ingerir dados de reclamações
    df_reclamacoes = spark.read.csv(f"{base_path}/dados_originais/reclamacoes.csv", header=True, sep="|")
    df_reclamacoes.write.mode("overwrite").parquet(f"{raw_path}/reclamacoes")

    # Ingerir dados de empregados
    df_empregados = spark.read.csv(f"{base_path}/dados_originais/empregados.csv", header=True, sep="|")
    df_empregados.write.mode("overwrite").parquet(f"{raw_path}/empregados")

    # Ingerir dados de bancos
    df_bancos = spark.read.csv(f"{base_path}/dados_originais/bancos.csv", header=True, sep="\t")
    df_bancos.write.mode("overwrite").parquet(f"{raw_path}/bancos")


In [None]:
# Função para transformação inicial (camada Trusted)
def transform_to_trusted():
    # Transformar reclamações
    df_reclamacoes = spark.read.parquet(f"{raw_path}/reclamacoes")
    df_reclamacoes_trusted = df_reclamacoes.select(
        col("Ano").alias("ano"),
        col("Trimestre").alias("trimestre"),
        col("CNPJ IF").alias("cnpj"),
        col("Instituição financeira").alias("instituicao"),
        col("Quantidade total de reclamações").cast("int").alias("total_reclamacoes")
    )
    df_reclamacoes_trusted.write.mode("overwrite").parquet(f"{trusted_path}/reclamacoes")

    # Transformar empregados
    df_empregados = spark.read.parquet(f"{raw_path}/empregados")
    df_empregados_trusted = df_empregados.select(
        col("Nome").alias("instituicao"),
        col("Geral").cast("float").alias("avaliacao_geral"),
        col("Recomendam para outras pessoas(%)").cast("float").alias("recomendacao")
    )
    df_empregados_trusted.write.mode("overwrite").parquet(f"{trusted_path}/empregados")

    # Transformar bancos
    df_bancos = spark.read.parquet(f"{raw_path}/bancos")
    df_bancos_trusted = df_bancos.select(
        col("Segmento").alias("segmento"),
        col("CNPJ").alias("cnpj"),
        col("Nome").alias("instituicao")
    )
    df_bancos_trusted.write.mode("overwrite").parquet(f"{trusted_path}/bancos")


In [None]:
# Função para transformação final (camada Delivery)
def transform_to_delivery():
    # Aqui usaremos SQL via Spark para simular o que seria feito no DBT
    spark.read.parquet(f"{trusted_path}/reclamacoes").createOrReplaceTempView("reclamacoes")
    spark.read.parquet(f"{trusted_path}/empregados").createOrReplaceTempView("empregados")
    spark.read.parquet(f"{trusted_path}/bancos").createOrReplaceTempView("bancos")

    df_final = spark.sql("""
        SELECT
            b.segmento,
            b.cnpj,
            b.instituicao,
            r.ano,
            r.trimestre,
            r.total_reclamacoes,
            e.avaliacao_geral,
            e.recomendacao
        FROM bancos b
        LEFT JOIN reclamacoes r ON b.cnpj = r.cnpj
        LEFT JOIN empregados e ON b.instituicao = e.instituicao
    """)

    df_final.write.mode("overwrite").parquet(f"{delivery_path}/tabela_final")


In [None]:
# Executar o pipeline
ingest_raw_data()
transform_to_trusted()
transform_to_delivery()

# Visualizar os resultados finais
df_final = spark.read.parquet(f"{delivery_path}/tabela_final")
df_final.show()

# Parar a sessão Spark
spark.stop()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/drive/MyDrive/Colab Notebooks/AtividadeETL-4/dados_originais/reclamacoes.csv.