# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 8543e01a-ddc5-4d1d-9fed-16dbc1b5b57b
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 8543e01a-ddc5-4d1d-9fed-16dbc1b5b57b to get into ready status...
Session 8543e01a-ddc5-4d1d-9fed-16dbc1b5b57b ha

In [2]:
from pyspark.sql.functions import col,regexp_replace,lit





In [26]:
df_bancos = spark.read.csv("s3://atividade6/data/raw/Bancos", header=True, sep="\t", encoding="iso-8859-1")
df_bancos.write.parquet("s3://atividade6/data/raw/bancos_parquet", mode="overwrite")





In [28]:
df_empregados_1 = spark.read.csv("s3://atividade6/data/raw/Empregados/glassdoor_consolidado_join_match_less_v2.csv", header=True, sep="|")
df_empregados_2 = spark.read.csv("s3://atividade6/data/raw/Empregados/glassdoor_consolidado_join_match_v2.csv", header=True, sep="|")

df_empregados_1 = df_empregados_1.withColumn("Segmento", lit(""))
df_empregados_2 = df_empregados_2.withColumn("CNPJ", lit(""))

df_empregados = df_empregados_1.union(df_empregados_2)
df_empregados.write.parquet("s3://atividade6/data/raw/empregados_parquet", mode="overwrite")


root
 |-- employer_name: string (nullable = true)
 |-- reviews_count: string (nullable = true)
 |-- culture_count: string (nullable = true)
 |-- salaries_count: string (nullable = true)
 |-- benefits_count: string (nullable = true)
 |-- employer-website: string (nullable = true)
 |-- employer-headquarters: string (nullable = true)
 |-- employer-founded: string (nullable = true)
 |-- employer-industry: string (nullable = true)
 |-- employer-revenue: string (nullable = true)
 |-- url: string (nullable = true)
 |-- Geral: string (nullable = true)
 |-- Cultura e valores: string (nullable = true)
 |-- Diversidade e inclusão: string (nullable = true)
 |-- Qualidade de vida: string (nullable = true)
 |-- Alta liderança: string (nullable = true)
 |-- Remuneração e benefícios: string (nullable = true)
 |-- Oportunidades de carreira: string (nullable = true)
 |-- Recomendam para outras pessoas(%): string (nullable = true)
 |-- Perspectiva positiva da empresa(%): string (nullable = true)
 |-- CNP

In [35]:
s3_client = boto3.client("s3")
fileList = s3_client.list_objects_v2(Bucket="atividade6", Prefix="data/raw/Reclamacoes/")

reclamacoes_list = []

if 'Contents' in fileList:
    for obj in fileList['Contents']:
        filename = obj['Key']
        try:
            s3_path = f's3a://atividade6/{filename}'
            df = spark.read.csv(s3_path, sep=';', header=True, encoding='iso-8859-1')

            reclamacoes_list.append(df)
        except Exception as e:
            print(f'Não foi possível abrir o arquivo {filename}: {str(e)}')
else:
    print("Nenhum arquivo encontrado no bucket.")
    
if reclamacoes_list:
    reclamacoes_df = reclamacoes_list[0]
    for df in reclamacoes_list[1:]:
        reclamacoes_df = reclamacoes_df.union(df)

    reclamacoes_df.write.parquet("s3://atividade6/data/raw/reclamacoes_parquet")
else:
    print("Array vazio")





In [66]:
df_bancos_parquet = spark.read.parquet("s3://atividade6/data/raw/bancos_parquet")
df_bancos_parquet = df_bancos_parquet.withColumn(
    "CNPJ",
    col("CNPJ")
    .cast("string")
    )
df_bancos_parquet = df_bancos_parquet.withColumn(
    'Nome',
    regexp_replace(
        col('Nome'),
        ' - PRUDENCIAL',
        '')
    )
df_bancos_parquet = df_bancos_parquet.withColumnRenamed("CNPJ", "cnpj").withColumnRenamed("Nome", "nome").withColumnRenamed("Segmento", "segmento")
df_bancos_parquet.write.parquet("s3://atividade6/data/trusted/bancos_trusted_parquet", mode="overwrite")




In [30]:
df_empregados_parquet = spark.read.parquet("s3://atividade6/data/raw/empregados_parquet")
df_empregados_parquet = df_empregados_parquet.withColumn(
    "CNPJ",
    col("CNPJ")
    .cast("string")
    )
df_empregados_parquet = df_empregados_parquet \
    .withColumnRenamed("employer-website", "employer_website") \
    .withColumnRenamed("employer-headquarters", "employer_headquarters") \
    .withColumnRenamed("employer-founded", "employer_founded") \
    .withColumnRenamed("employer-industry", "employer_industry") \
    .withColumnRenamed("employer-revenue", "employer_revenue") \
    .withColumnRenamed("Geral", "geral") \
    .withColumnRenamed("Cultura e valores", "cultura_e_valores") \
    .withColumnRenamed("Diversidade e inclusão", "diversidade_e_inclusao") \
    .withColumnRenamed("Qualidade de vida", "qualidade_de_vida") \
    .withColumnRenamed("Alta liderança", "alta_lideranca") \
    .withColumnRenamed("Remuneração e benefícios", "remuneracao_e_beneficios") \
    .withColumnRenamed("Oportunidades de carreira", "oportunidades_de_carreira") \
    .withColumnRenamed("Recomendam para outras pessoas(%)", "recomendam_para_outras_pessoas") \
    .withColumnRenamed("Perspectiva positiva da empresa(%)", "perspectiva_positiva_da_empresa") \
    .withColumnRenamed("CNPJ", "cnpj") \
    .withColumnRenamed("Nome", "nome") \
    .withColumnRenamed("Segmento", "segmento")

df_empregados_parquet.write.parquet("s3://atividade6/data/trusted/empregados_trusted_parquet", mode="overwrite")





In [38]:
df_reclamacoes_parquet = spark.read.parquet("s3://atividade6/data/raw/reclamacoes_parquet")
df_reclamacoes_parquet = df_reclamacoes_parquet \
    .withColumnRenamed("Ano", "ano") \
    .withColumnRenamed("Trimestre", "trimestre") \
    .withColumnRenamed("Categoria", "categoria") \
    .withColumnRenamed("Tipo", "tipo") \
    .withColumnRenamed("CNPJ IF", "cnpj") \
    .withColumnRenamed("Instituição financeira", "nome") \
    .withColumnRenamed("Índice", "indice") \
    .withColumnRenamed("Quantidade de reclamações reguladas procedentes", "quantidade_de_reclamacoes_reguladas_procedentes") \
    .withColumnRenamed("Quantidade de reclamações reguladas - outras", "quantidade_de_reclamacoes_reguladas_outras") \
    .withColumnRenamed("Quantidade de reclamações não reguladas", "quantidade_de_reclamacoes_nao_reguladas") \
    .withColumnRenamed("Quantidade total de reclamações", "quantidade_total_de_reclamacoes") \
    .withColumnRenamed("Quantidade total de clientes \x96 CCS e SCR", "quantidade_total_de_clientes_ccs_e_scr") \
    .withColumnRenamed("Quantidade de clientes \x96 CCS", "quantidade_de_clientes_ccs") \
    .withColumnRenamed("Quantidade de clientes \x96 SCR", "quantidade_de_clientes_scr")
df_reclamacoes_parquet = df_reclamacoes_parquet.withColumn(
    'nome',
    regexp_replace(
        col('nome'),
        ' (conglomerado)',
        '')
    )
df_reclamacoes_parquet.write.parquet("s3://atividade6/data/trusted/reclamacoes_trusted_parquet", mode="overwrite")


+----+---------+--------------------+----------------+--------+--------------------+------+-----------------------------------------------+------------------------------------------+---------------------------------------+-------------------------------+--------------------------------------+--------------------------+--------------------------+----+
| ano|trimestre|           categoria|            tipo|    cnpj|                nome|indice|quantidade_de_reclamacoes_reguladas_procedentes|quantidade_de_reclamacoes_reguladas_outras|quantidade_de_reclamacoes_nao_reguladas|quantidade_total_de_reclamacoes|quantidade_total_de_clientes_ccs_e_scr|quantidade_de_clientes_ccs|quantidade_de_clientes_scr|_c14|
+----+---------+--------------------+----------------+--------+--------------------+------+-----------------------------------------------+------------------------------------------+---------------------------------------+-------------------------------+--------------------------------------+-

In [40]:
df_bancos_trusted = spark.read.parquet("s3://atividade6/data/trusted/bancos_trusted_parquet")
df_empregados_trusted = spark.read.parquet("s3://atividade6/data/trusted/empregados_trusted_parquet")
df_reclamacoes_trusted = spark.read.parquet("s3://atividade6/data/trusted/reclamacoes_trusted_parquet")

df_bancos_empregados = df_bancos_trusted.join(
    df_empregados_trusted, 
    on=['cnpj', 'nome', 'segmento'], 
    how='left'
).dropDuplicates(['cnpj'])
                                            
df_reclamacoes_trusted.write.csv("s3://atividade6/data/delivery/reclamacoes_delivery_csv", mode="overwrite")                                         
                                            

df_final = df_reclamacoes_trusted.join(
    df_bancos_empregados, 
    on=['cnpj', 'nome'], 
    how='inner'
)

df_final.write.parquet("s3://atividade6/data/delivery/final_delivery_parquet", mode="overwrite")





In [41]:
df_final = spark.read.parquet("s3://atividade6/data/delivery/final_delivery_parquet")
df_final.write.format("jdbc").options(
    url="jdbc:mysql://arn:3306/atividade6",
    driver="com.mysql.cj.jdbc.Driver",
    user="atividade6",
    password="atividade6",
    dbtable="final_delivery").mode("overwrite").save()


