<a href="https://colab.research.google.com/github/bigarcia/pece_usp_especializacao/blob/main/Exerc%C3%ADcio_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install libs

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=d2e1be33f2c85c071bc86551718ef9b4d028d52fd5b577dd1d895959b821e24a
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


Install the Hadoop AWS library to enable Spark to interact with S3:

## Functions

In [35]:
from pyspark.sql import SparkSession

def read_files(spark,source_path,delimiter, format="csv"):

  # Read data from csv file
  # df = spark.read.csv("path_to_your_file.csv", header=True, inferSchema=True)
  print(f"Reading file/folder: {source_path} using delimiter {delimiter}")
  df = spark.read.format(format).options(delimiter=delimiter, header=True, inferSchema=True).load(source_path)


  df.show(3)
  print("Total rows:",df.count())

  return df

In [36]:
# When you write a DataFrame to a Parquet file using PySpark, multiple files are often created by default. This happens because Spark writes out data in parallel and uses multiple partitions to distribute the workload. Each partition of your DataFrame is written to a separate Parquet file.
def write_parquet_files(spark,df,destination_folder):
  df.write.mode('overwrite').parquet(destination_folder)

In [37]:
def transformation_trusted(folder_name, df):
  from pyspark.sql.functions import regexp_replace, col, row_number, when, isnull
  from pyspark.sql.window import Window

  if folder_name == "Bancos":
    # A solução segue o mesmo principio do windows function row_number() no SQL
    # Create a window partitioned by col1, ordered by row_number
    # window = Window.partitionBy("CNPJ").orderBy(row_number().over(Window.partitionBy("CNPJ").orderBy("Nome")))
    window = Window.partitionBy("CNPJ").orderBy(row_number().over(Window.partitionBy("CNPJ").orderBy("Nome")))

    # Calculate row number for each partition
    df = df.withColumn("row_num", row_number().over(window))

    # Filter to keep the first row for each partition
    df = df.filter(col("row_num") == 1)

    df.show(3)
    print("Total rows after transformation:",df.count())

    # Remove "- PRUDENTIAL" dos nomes
    df = df.withColumn("Nome Original", col("Nome"))
    df = df.withColumn("Nome", regexp_replace(col("Nome"), "- PRUDENCIAL", ""))

    print("Limpeza realizada com sucesso: dados deduplicados e remoção do sufixo '- PRUDENCIAL' da coluna Nome")
  # if folder_name == "Empregados":
  #   df = df.withColumn("CNPJ_Segmento", when(isnull(col("CNPJ")), col("Segmento")).otherwise(col("CNPJ")))

  df.show()
  return df

In [30]:
#TO DO: Colocar dados para conexão
def load_dataframe_to_postgres(df, table_name, schema_name):
  # PostgreSQL connection details

  host = "your_host"
  port = "your_port"
  database = "your_database"
  user = "your_username"
  password = "your_password"

  # Construct JDBC URL with schema
  jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}?currentSchema={schema_name}"

  # Write DataFrame to PostgreSQL
  df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "your_table") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", user) \
    .option("password", password) \
    .mode("overwrite") \
    .save()

In [31]:
def special_character_transforming(column):

  corrections = {
            "CR�DITO": "CRÉDITO",
            "MUNIC�PIO": "MUNICÍPIO",
            "UB�": "UBÁ",
            "S�O": "SÃO",
            "M�TUO": "MÚTUO",
            "CONFEDERA��O": "CONFEDERAÇÃO",
            "REGI�O": "REGIÃO",
            "AMAZ�NIA": "AMAZÔNIA",
            "ADMISS�O": " ADMISSÃO",
            "TRANSAMAZ�NICA": "TRANSAMAZÔNICA",
            "GOI�S": "GOIÁS",
            "INTERA��O": "INTERAÇÃO",
            "SOLID�RIA": "SOLIDÁRIA",
            "AG�NCIA": "AGÊNCIA",
            "RIBEIR�O": "RIBEIR˜AO",
            "TOP�ZIO": "TOPÁZIO",
            "PIAU�": "PIAUÍ"


        }
  for val in column:
    if val in corrections[0]:
      val_new = corrections[val]

1. Lemos o arquivo csv e tsv, salvamos no format parquet em camadas (raw, trusted e delivery), e depois fizemos load para postgre para cada camada

## Raw + Trusted

In [38]:
import os

# Create a Spark session
spark = SparkSession.builder \
    .appName("Exercício 3") \
    .getOrCreate()

# list_folders = [
#   {"folder_name":"Bancos", "delimiter": "\t"},
#   {"folder_name":"Reclamações", "delimiter": ";"},
#   {"folder_name":"Empregados", "delimiter": "|"},
# ]
list_folders = [
   {"folder_name":"Empregados", "delimiter": "|"},
]
root = '/content/sample_data'
for folder in list_folders:
    print("\nProcessing folder folder...")
    folder_name = folder["folder_name"]
    delimiter = folder["delimiter"]
    table_name = folder["folder_name"].lower()
    folder_path = os.path.join(root,folder_name)
    file_list = os.listdir(folder_path)


    # TODO: Definir o schema previamente da tabela de reclamações para corrigir a coluna de CNPJ +  Segmento? Na função read_files to usando o infer_schema

    #Verifica qual é o tipo do arquivo, e só aceita se for csv ou tsv
    if file_list[0].endswith('.csv') or file_list[0].endswith('.tsv'):
      df_raw = read_files(spark,os.path.join(folder_path,file_list[0]),delimiter, "csv")
      if folder_name == "Empregados":
        df_raw = df_raw.withColumnRenamed("CNPJ", "CNPJ_Segmento")
    for file_name in file_list[1:]:
        source_file_path = os.path.join(folder_path,file_name)
        if file_name.endswith('.csv') or file_name.endswith('.tsv'):
          temp_df = read_files(spark,source_file_path,delimiter, "csv")
          df_raw = df_raw.union(temp_df)
          print(f"Total rows table {table_name}:",df_raw.count())
    df_raw.show(5)
    print(f"Total rows table {table_name}:",df_raw.count())


    ### Camada raw ###
    raw_folder = os.path.join(root, "raw", table_name)
    print(f"\n Escrevendo arquivos de {folder_name} em parquet...")
    print(f"\n Dataframe resultado da camada raw:")
    df_raw.show()
    write_parquet_files(spark, df_raw, raw_folder)
    # load_dataframe_to_postgres(df_raw, table_name, "raw")


    ### Camada trusted ###

    #Ler da camada raw
    print("\n Lendo arquivos parquet da camada raw")
    read_raw = read_files(spark,raw_folder,delimiter, "parquet")
    read_raw.show(5)



    #TODO:fazer trativa decode. Uma solução; pegar o nome oficial da base de bancos (aquele git externo)
    # ===== Sugestão =====
    # Outra sugestão: Passa um for identificando os nomes que contém <?>, se existir ele irá buscar em um dict o valor correspondente aquela palavra (palavra correta).
    # Se não existir, cria um arquivo .txt com o nome errado para que possamos verificar e acrescentar o nome correto.
    # Toda vez que executar o código, o arquivo será deletado e só será criado um novo caso aconteça novamente de ter uma nova palavra
    print(f"Limpando os dados da tabela {folder_name}...")
    df_trusted = transformation_trusted(folder_name, df_raw)
    # Escrever na camada trusted
    trusted_folder = os.path.join(root, "trusted", table_name)
    print("\n Escrevendo em arquivo parquet...")
    write_parquet_files(spark, df_trusted, trusted_folder)
    print("\n Dataframe resultado da camada trusted:")
    df_trusted.show()
    #Escrever no banco de dados
    # load_dataframe_to_postgres(df_trusted, table_name, "trusted")



Processing folder folder...
Reading file/folder: /content/sample_data/Empregados/glassdoor_consolidado_join_match_less_v2.csv using delimiter |
+--------------------+-------------+-------------+--------------+--------------+--------------------+---------------------+----------------+--------------------+--------------------+--------------------+-----+-----------------+----------------------+-----------------+--------------+------------------------+-------------------------+---------------------------------+----------------------------------+--------+--------------------+-------------+
|       employer_name|reviews_count|culture_count|salaries_count|benefits_count|    employer-website|employer-headquarters|employer-founded|   employer-industry|    employer-revenue|                 url|Geral|Cultura e valores|Diversidade e inclusão|Qualidade de vida|Alta liderança|Remuneração e benefícios|Oportunidades de carreira|Recomendam para outras pessoas(%)|Perspectiva positiva da empresa(%)|    

## Delivery

In [None]:
### Camada Delivery ###
#Ler da camada trusted
print("\n Lendo arquivos parquet da camada trustred")
read_trusted = read_files(spark,trusted_folder,delimiter, "parquet")
read_trusted.show(5)



#TODO:fazer tratativas da camada delivery

df_delivery = transformation_delivery(df_trusted)
# Escrever na camada delivery
delivered_folder = os.path.join(root, "delivery", table_name)
print("\n Escrevendo em arquivo parquet...")
write_parquet_files(spark, df_delivery, delivered_folder)
load_dataframe_to_postgres(df, table_name, "delivery")

##Teste deduplicar registro

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder.appName("DropDuplicates").getOrCreate()

# Sample data
data = [("A", "PRUDENCIAL", 1),
        ("A", "OTHER", 2),
        ("B", "OTHER", 3),
        ("B", "PRUDENCIAL", 4),
        ("C", "PRUDENCIAL", 5)]
df = spark.createDataFrame(data, ["col1", "col2", "value"])

# Create a window partitioned by col1, ordered by row_number
window = Window.partitionBy("col1").orderBy(row_number().over(Window.partitionBy("col1").orderBy("col2")))

# Calculate row number for each partition
df = df.withColumn("row_num", row_number().over(window))

# Filter to keep the first row for each partition
result_df = df.filter(col("row_num") == 1)

result_df.show()


+----+----------+-----+-------+
|col1|      col2|value|row_num|
+----+----------+-----+-------+
|   A|     OTHER|    2|      1|
|   B|     OTHER|    3|      1|
|   C|PRUDENCIAL|    5|      1|
+----+----------+-----+-------+



## Testes

In [None]:
from pyspark.sql import SparkSession

# Step 2: Create a Spark session
spark = SparkSession.builder \
    .appName("Exercício 3") \
    .getOrCreate()

# Step 3: Read data from a CSV file
# df = spark.read.csv("path_to_your_file.csv", header=True, inferSchema=True)
df = spark.read.options(delimiter='\t', header=True, inferSchema=True).csv("/content/sample_data/EnquadramentoInicia_v2.tsv")

# Step 4: Show the first few rows of the DataFrame
df.show()


+--------+--------+--------------------+
|Segmento|    CNPJ|                Nome|
+--------+--------+--------------------+
|      S1|       0|BANCO DO BRASIL -...|
|      S1|60746948|BRADESCO - PRUDEN...|
|      S1|30306294|BTG PACTUAL - PRU...|
|      S1|  360305|CAIXA ECONOMICA F...|
|      S1|60872504|   ITAU - PRUDENCIAL|
|      S1|90400888|SANTANDER - PRUDE...|
|      S2|92702067|BANRISUL - PRUDEN...|
|      S2| 7237373|BANCO DO NORDESTE...|
|      S2|33657248|  BNDES - PRUDENCIAL|
|      S2|33479023|CITIBANK - PRUDEN...|
|      S2|33987793|CREDIT SUISSE - P...|
|      S2|58160789|  SAFRA - PRUDENCIAL|
|      S2|59588111|VOTORANTIM - PRUD...|
|      S3|28195667|ABC-BRASIL - PRUD...|
|      S3|60770336|   ALFA - PRUDENCIAL|
|      S3|  655522|          APE POUPEX|
|      S3| 2992446|BANCO CNH INDUSTR...|
|      S3| 2038232|BANCOOB - PRUDENCIAL|
|      S3|28127603|BANESTES - PRUDEN...|
|      S3|31597552|BANCO CLASSICO S....|
+--------+--------+--------------------+
only showing top

In [None]:
import pandas as pd

file_path = "C:\\Users\\lucas\\OneDrive\\Área de Trabalho\\USP\\C3_Ingestão de Dados\\ETL com Python\\Dados\\Bancos\\EnquadramentoInicia_v2.csv"
dest_file = "C:\\Users\\lucas\\OneDrive\\Área de Trabalho\\USP\\C3_Ingestão de Dados\\ETL com Python\\Dados\\Bancos\\banco corrigido.xlsx"

class correct_text:
    corrections = {
        "CR�DITO": "CRÉDITO",
        "MUNIC�PIO": "MUNICÍPIO",
        "UB�": "UBÁ",
        "S�O": "SÃO",
        "M�TUO": "MÚTUO",
        "CONFEDERA��O": "CONFEDERAÇÃO",
        "REGI�O": "REGIÃO",
        "AMAZ�NIA": "AMAZÔNIA",
        "ADMISS�O": " ADMISSÃO",
        "TRANSAMAZ�NICA": "TRANSAMAZÔNICA",
        "GOI�S": "GOIÁS",
        "INTERA��O": "INTERAÇÃO",
        "SOLID�RIA": "SOLIDÁRIA",
        "AG�NCIA": "AGÊNCIA",
        "RIBEIR�O": "RIBEIRÃO",
        "TOP�ZIO": "TOPÁZIO",
        "PIAU�": "PIAUÍ",
        'PARAN�': 'PARANÁ',
        'ALIAN�A': 'ALIANÇA',
        'JACU�': 'JACUÍ',
        'PRODU��O': 'PRODUÇÃO',
        'PAR�': 'PARÁ',
        'ITAJA�': 'ITAJAÍ',
        'INTEGRA��O': 'INTEGRAÇÃO',
        'GA�CHO': 'GAÚCHO',
        'UNI�O': 'UNIÃO',
        'CREDIGUA�U': 'CREDIGUAÇU',
        'EMPRES�RIOS': 'EMPRESÁRIOS',
        'IGUA�U': 'IGUAÇU',
        'PARAN�/SÃO': 'PARANÁ/SÃO',
        'PARA�BA': 'PARAÍBA',
        'M�DICOS': 'MÉDICOS',
        'SA�DE': 'SAÚDE',
        'SOLID�RIO': 'SOLIDÁRIO',
        'POUPAN�A': 'POUPANÇA',
        'ELETROBR�S': 'ELETROBRÁS',
        'NEG�CIOS': 'NEGÓCIOS',
        'SEGURAN�A': 'SEGURANÇA',
        'P�BLICA': 'PÚBLICA',
        'T�TULO': 'TÍTULO',
        'FAM�LIA': 'FAMÍLIA',
        'M�XIMA': 'MÁXIMA',
        'MONOP�LIO': 'MONOPÓLIO',
        'C�MBIO': 'CÂMBIO',
        '�REA': 'ÁREA',
        'TEND�NCIA': 'TENDÊNCIA',
        'M�DICOS,': 'MÉDICOS',
        'CI�NCIAS': 'CIÊNCIAS',
        'FARMAC�UTICA': 'FARMACÊUTICA',
        'ESP�RITO': 'ESPÍRITO',
        'PARTICIPA��ES': 'PARTICIPAÇÕES',
        'SERVI�OS': 'SERVIÇO',
        'EMPR�STIMO': 'EMPRÉSTIMO',
        'FUNCION�RIOS': 'FUNCIONÁRIOS',
        'GOI�NIA': 'GOIÂNIA',
        'ROND�NIA': 'RONDÔNIA',
        'DIVIN�POLIS': 'DIVINÓPOLIS',
        'ITA�NA': 'ITAÚNA',
        'SEBASTI�O': 'SEBASTIÃO',
    }

    def replace_charact(text, corrections_dict):
        for key, value in corrections_dict.items():
          text = text.replace(key, value)
        return text

    # def identificar_palavras_com_caractere(df, coluna, arquivo_saida):
    def identify_words_with_character(df, coluna, arquivo_saida):

        novas_palavras = []

        for bank in df[coluna]:
            words = bank.split()
            for word in words:
                if '�' in word and word not in (novas_palavras):
                    novas_palavras.append(word)

        with open(arquivo_saida, 'a',encoding='utf-8') as arquivo:
            for palavra in novas_palavras:
                # arquivo.write(palavra + '\n')
                arquivo.write(f'{str(palavra)} \n')
        arquivo.close()

        print(novas_palavras)


    def words_with_errors(df, coluna, arquivo_saida):

        novas_palavras = {}

        for bank in df[coluna]:
            words = bank.split()
            for word in words:
                if '�' in word and word not in (novas_palavras):
                    novas_palavras[word] = ''

        with open(arquivo_saida, 'a',encoding='utf-8') as arquivo:
            arquivo.write(str(novas_palavras))
            # for palavra in novas_palavras:
            #     # arquivo.write(palavra + '\n')
            #     arquivo.write(f'{str(palavra)} \n')
        arquivo.close()

        print(novas_palavras)


# Dicionário de correções
corrections = correct_text.corrections

# Banks dataframe
df = pd.read_csv(file_path, sep="\t")

# Aplicando a função a todas as linhas da coluna 'Nome'
df['Nome Corrigido'] = df['Nome'].apply(lambda x: correct_text.replace_charact(x, corrections))

# novas_palavras = []

# for bank in df['Nome Corrigido']:
#     words = bank.split()
#     for word in words:
#         if '�' in word and word not in (novas_palavras):
#             novas_palavras.append(word)

# print(novas_palavras)


# df.to_excel(dest_file)
log_palavras = "C:\\Users\\lucas\\OneDrive\\Área de Trabalho\\USP\\C3_Ingestão de Dados\\ETL com Python\\Dados\\Bancos\\new_words_with_special.txt"
log_palavras2 = "C:\\Users\\lucas\\OneDrive\\Área de Trabalho\\USP\\C3_Ingestão de Dados\\ETL com Python\\Dados\\Bancos\\words_with_errors.json"

# Identificando palavras com o caractere '�' e salvando em um arquivo
# correct_text.identify_words_with_character(df, 'Nome Corrigido', log_palavras)
correct_text.words_with_errors(df, 'Nome Corrigido', log_palavras2)



print(df)

