# producer

In [2]:
import os
import json

import boto3
import awswrangler as wr
import pandas as pd

# Raw-data bucket and producer queue; each may be overridden through an environment variable.
BUCKET = os.getenv("raw_bucket_name", "atividade7-058264162662")
SQS_URL = os.getenv("sqs_url", "https://sqs.us-east-1.amazonaws.com/058264162662/atividade7-producer-queue")

In [3]:
# Local AWS credentials profile used by every S3/SQS call in this section.
PRODUCER_PROFILE = "mba"
session = boto3.Session(profile_name=PRODUCER_PROFILE)

In [4]:
print("starting the lambda")

# Prefix holding the raw complaint CSVs, and the queue the rows are published to.
s3_folder = f"s3://{BUCKET}/reclamacoes/"
queue_url = str(SQS_URL)

sqs = session.client("sqs")
print(f"s3: {s3_folder} and sqs: {queue_url}")

starting the lambda
s3: s3://atividade7-058264162662/reclamacoes/ and sqs: https://sqs.us-east-1.amazonaws.com/058264162662/atividade7-producer-queue


In [5]:
# List all CSV files in the folder
files = wr.s3.list_objects(s3_folder, suffix=".csv", boto3_session=session)
files[:1]

['s3://atividade7-058264162662/reclamacoes/2021_tri_01.csv']

In [6]:
# Fail fast with a clear message when the prefix holds no CSVs. Otherwise keep only
# the first file while developing, as a one-element list (wr.s3.read_csv accepts a list).
if not files:
    raise FileNotFoundError(f"no .csv objects found under {s3_folder}")
file_path = files[:1]

In [7]:
print(f"reading the data from {file_path}")

# Read the semicolon-separated, UTF-8 export into a DataFrame.
df = wr.s3.read_csv(file_path, sep=";", encoding="utf-8", boto3_session=session)
print(f"data read {df.shape}")

reading the data from ['s3://atividade7-058264162662/reclamacoes/2021_tri_01.csv']
date read (105, 15)


In [8]:
# Inspect the column dtypes and null counts of the raw CSV before renaming.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 105 entries, 0 to 104
Data columns (total 15 columns):
 #   Column                                           Non-Null Count  Dtype  
---  ------                                           --------------  -----  
 0   Ano                                              105 non-null    int64  
 1   Trimestre                                        105 non-null    object 
 2   Categoria                                        105 non-null    object 
 3   Tipo                                             105 non-null    object 
 4   CNPJ IF                                          105 non-null    object 
 5   Instituição financeira                           105 non-null    object 
 6   Índice                                           105 non-null    object 
 7   Quantidade de reclamações reguladas procedentes  105 non-null    int64  
 8   Quantidade de reclamações reguladas - outras     105 non-null    int64  
 9   Quantidade de reclamações não re

In [9]:
# Source (Portuguese CSV header) -> target (English snake_case) column names.
# The dict order also fixes the column order of the selected frame.
column_rename = {
    "CNPJ IF": "cnpj",
    "Instituição financeira": "name",
    "Categoria": "category",
    "Tipo": "type",
    "Trimestre": "quarter",
    "Ano": "year",
    "Índice": "complaint_index",
    "Quantidade de reclamações reguladas procedentes": "regulated_complaints_upheld",
    "Quantidade de reclamações reguladas - outras": "regulated_complaints_other",
    "Quantidade de reclamações não reguladas": "unregulated_complaints",
    "Quantidade total de reclamações": "total_complaints",
    "Quantidade total de clientes – CCS e SCR": "total_clients_ccs_scr",
    "Quantidade de clientes – CCS": "clients_ccs",
    "Quantidade de clientes – SCR": "clients_scr",
}
# Every target column is cast to pandas' nullable string dtype.
reclamacoes_schema = {target: "string" for target in column_rename.values()}

target_columns = list(column_rename.values())
reclamacoes_treated = df.rename(columns=column_rename)[target_columns]

In [10]:
# Check the dtypes of the 14 renamed/selected columns before the string cast.
reclamacoes_treated.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 105 entries, 0 to 104
Data columns (total 14 columns):
 #   Column                       Non-Null Count  Dtype 
---  ------                       --------------  ----- 
 0   cnpj                         105 non-null    object
 1   name                         105 non-null    object
 2   category                     105 non-null    object
 3   type                         105 non-null    object
 4   quarter                      105 non-null    object
 5   year                         105 non-null    int64 
 6   complaint_index              105 non-null    object
 7   regulated_complaints_upheld  105 non-null    int64 
 8   regulated_complaints_other   105 non-null    int64 
 9   unregulated_complaints       105 non-null    int64 
 10  total_complaints             105 non-null    int64 
 11  total_clients_ccs_scr        105 non-null    int64 
 12  clients_ccs                  105 non-null    object
 13  clients_scr                  105 no

In [11]:
# Normalise the identifier columns, then cast every column to the string schema.
# The name pattern removes:
#   - a trailing "(conglomerado)";
#   - any "s.a"/"s.a." or "s/a" marker;
#   - a trailing "ltda"/"ltda.".
reclamacoes_treated = reclamacoes_treated.assign(
    cnpj=lambda frame: frame["cnpj"].str.strip().str.lower(),
    name=lambda frame: (
        frame["name"]
        .str.strip()
        .str.lower()
        .str.replace(r"\s*\(conglomerado\)$|\s*s\.a\.?|\s*s\/a|ltda\.?$", "", regex=True)
        .str.strip()
    ),
).astype(reclamacoes_schema)
print(f"data amount {reclamacoes_treated.shape}")

data amount (105, 14)


In [12]:
# After the strip/cast step, a blank CNPJ is an empty string. Turn it into a real
# missing value (pd.NA) so it is treated as missing downstream.
reclamacoes_treated["cnpj"] = reclamacoes_treated["cnpj"].replace("", pd.NA)

In [13]:
# Confirm the string cast and how many CNPJs remain non-null once blanks became missing.
reclamacoes_treated.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 105 entries, 0 to 104
Data columns (total 14 columns):
 #   Column                       Non-Null Count  Dtype 
---  ------                       --------------  ----- 
 0   cnpj                         45 non-null     string
 1   name                         105 non-null    string
 2   category                     105 non-null    string
 3   type                         105 non-null    string
 4   quarter                      105 non-null    string
 5   year                         105 non-null    string
 6   complaint_index              105 non-null    string
 7   regulated_complaints_upheld  105 non-null    string
 8   regulated_complaints_other   105 non-null    string
 9   unregulated_complaints       105 non-null    string
 10  total_complaints             105 non-null    string
 11  total_clients_ccs_scr        105 non-null    string
 12  clients_ccs                  105 non-null    string
 13  clients_scr                  105 no

In [14]:
# Spot-check the last rows of the cleaned frame.
reclamacoes_treated.tail(5)

Unnamed: 0,cnpj,name,category,type,quarter,year,complaint_index,regulated_complaints_upheld,regulated_complaints_other,unregulated_complaints,total_complaints,total_clients_ccs_scr,clients_ccs,clients_scr
100,,volvo,Grupo Secundário,Conglomerado,1º,2021,,0,0,1,1,7736,88,7727.0
101,,votorantim,Top 10,Conglomerado,1º,2021,457.0,69,428,206,703,15074861,11512143,4668540.0
102,,western union,Grupo Secundário,Conglomerado,1º,2021,,3,0,0,3,1,1,
103,23862762.0,"will financeira crédito, financiamento e inves...",Grupo Secundário,Banco/financeira,1º,2021,,16,15,17,48,1548186,1546806,314656.0
104,,yamaha motor,Grupo Secundário,Conglomerado,1º,2021,,6,4,7,17,197837,61,197782.0


In [15]:
# Send each row as a JSON message to SQS
# Build the JSON payload for only the first row, to inspect it before sending anything.
first_entry = next(reclamacoes_treated.iterrows(), None)
if first_entry is not None:
    _, row = first_entry
    message_body = json.dumps(row.to_dict(), default=str)

In [16]:
# Decode the sample payload to inspect the fields that would be sent to SQS.
json.loads(message_body)

{'cnpj': None,
 'name': 'abc-brasil',
 'category': 'Grupo Secundário',
 'type': 'Conglomerado',
 'quarter': '1º',
 'year': '2021',
 'complaint_index': ' ',
 'regulated_complaints_upheld': '2',
 'regulated_complaints_other': '3',
 'unregulated_complaints': '4',
 'total_complaints': '9',
 'total_clients_ccs_scr': '26230',
 'clients_ccs': '24698',
 'clients_scr': '3810'}

In [23]:
# Development cap on how many rows are published per run (rows 0-15 -> 16 messages).
# Set to None to publish the whole file.
MAX_MESSAGES = 16

rows_to_send = (
    reclamacoes_treated
    if MAX_MESSAGES is None
    else reclamacoes_treated.head(MAX_MESSAGES)
)

# Send each row as a JSON message to SQS; default=str stringifies any value
# json cannot encode natively.
for i, (_, row) in enumerate(rows_to_send.iterrows()):
    message_body = json.dumps(row.to_dict(), default=str)
    sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
    print(i)
print(f"data sent to {queue_url}")

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
data sent to https://sqs.us-east-1.amazonaws.com/058264162662/atividade7-producer-queue


In [19]:
# Preview the first 15 rows of the cleaned frame.
reclamacoes_treated.head(15)

Unnamed: 0,cnpj,name,category,type,quarter,year,complaint_index,regulated_complaints_upheld,regulated_complaints_other,unregulated_complaints,total_complaints,total_clients_ccs_scr,clients_ccs,clients_scr
0,,abc-brasil,Grupo Secundário,Conglomerado,1º,2021,,2,3,4,9,26230,24698.0,3810
1,,agibank,Grupo Secundário,Conglomerado,1º,2021,5479.0,58,140,73,271,1058431,790848.0,693843
2,36321990.0,"agoracred sociedade de crédito, financiamento ...",Grupo Secundário,Banco/financeira,1º,2021,,3,3,0,6,420692,129.0,420563
3,27214112.0,"al5 crédito, financiamento e investimento",Grupo Secundário,Banco/financeira,1º,2021,,1,1,0,2,12645,4979.0,10112
4,,alfa,Grupo Secundário,Conglomerado,1º,2021,,14,44,15,73,412135,268186.0,145105
5,,andbank,Grupo Secundário,Conglomerado,1º,2021,,0,2,1,3,4383,4382.0,52
6,3532415.0,banco abn amro,Grupo Secundário,Banco/financeira,1º,2021,,0,0,3,3,98,71.0,31
7,54403563.0,banco arbi,Grupo Secundário,Banco/financeira,1º,2021,,1,0,0,1,18087,9312.0,8838
8,2992446.0,banco cnh industrial capital,Grupo Secundário,Banco/financeira,1º,2021,,0,1,2,3,28352,1238.0,27399
9,8357240.0,banco csf,Grupo Secundário,Banco/financeira,1º,2021,2253.0,165,94,51,310,7322283,,7322283


# consumer

In [None]:
import json
import boto3
import awswrangler as wr
import pandas as pd
import os

# Athena database holding the `bancos` lookup and the `reclamacoes_augmented` table,
# plus the bucket whose athena-staging/ prefix serves as the Iceberg temp path.
DATABASE, OUTPUT_BUCKET = "atividade7", "athena-query-results-753251897225"

In [None]:
# Load a sample event from disk for local testing (presumably an SQS -> Lambda
# payload with Records[].body — confirm against the deployed trigger).
EVENT_PATH = "lambdas/event.json"
with open(EVENT_PATH, encoding="utf-8") as event_file:
    event = json.load(event_file)

In [None]:
# Show the first record's raw body with single quotes swapped for double quotes.
event["Records"][0]["body"].replace("'", '"')

In [None]:
# Decode the first record's body. Producer messages are built with json.dumps, so try
# them as-is first; fall back to swapping quotes only for hand-written, single-quoted
# bodies (swapping unconditionally would corrupt any value containing an apostrophe).
first_raw_body = event["Records"][0]["body"]
try:
    first_body = json.loads(first_raw_body)
except json.JSONDecodeError:
    first_body = json.loads(first_raw_body.replace("'", '"'))
first_body

In [None]:
# Credentials profile used for the Athena reads and the Iceberg write in this section.
CONSUMER_PROFILE = "mfc-admin"
session = boto3.Session(profile_name=CONSUMER_PROFILE)

In [None]:
def _parse_sqs_body(raw_body):
    """Decode one SQS message body into a dict.

    Producer messages are real JSON, so they are parsed as-is first. The quote-swapping
    fallback is kept only for hand-written test events that use single quotes;
    applying it unconditionally corrupts values that contain an apostrophe.

    Raises json.JSONDecodeError if neither form parses.
    """
    try:
        return json.loads(raw_body)
    except json.JSONDecodeError:
        return json.loads(raw_body.replace("'", '"'))


def _sql_string_literal(value):
    """Render `value` as a single-quoted SQL string literal, doubling embedded quotes.

    The CNPJ comes from the queue message, so it must not be spliced into SQL verbatim.
    """
    return "'" + str(value).replace("'", "''") + "'"


print(event)
results = []

for record in event["Records"]:
    body = _parse_sqs_body(record["body"])
    print(body)
    cnpj = body.get("cnpj")
    print(f"working with CNPJ {cnpj}")

    # `not cnpj` already covers None and "".
    if not cnpj:
        df_bancos = pd.DataFrame(columns=["name", "cnpj", "segment"])
        print("not retrieving from bancos")
    else:
        query = f"SELECT * FROM {DATABASE}.bancos WHERE cnpj = {_sql_string_literal(cnpj)}"
        # NOTE: s3_output would need an s3:// URI; OUTPUT_BUCKET is a bare bucket name.
        df_bancos = wr.athena.read_sql_query(
            sql=query,
            database=DATABASE,
            ctas_approach=False,
            boto3_session=session,
        )
        print("data retrieved from bancos info")
    print(f"banco record found for CNPJ {df_bancos.shape}")

    df_event = pd.DataFrame([body])
    # Keep at most one segment per CNPJ so the left join yields exactly one row per message.
    segments = df_bancos[["cnpj", "segment"]].drop_duplicates(subset="cnpj")
    df_joined = df_event.merge(segments, on="cnpj", how="left", suffixes=("", "_right"))
    results.append(df_joined)


In [None]:
# Stack the per-record joined frames into one table with a fresh 0..n-1 index.
final_df = pd.concat(objs=results, ignore_index=True)
final_df

In [None]:
# Check the dtypes of the joined result before any schema cast.
final_df.info()

In [None]:
# Expected columns (in order) of the joined result. Every field is cast to pandas'
# nullable string dtype.
schema = dict.fromkeys(
    [
        "cnpj",
        "name",
        "category",
        "type",
        "quarter",
        "year",
        "complaint_index",
        "regulated_complaints_upheld",
        "regulated_complaints_other",
        "unregulated_complaints",
        "total_complaints",
        "total_clients_ccs_scr",
        "clients_ccs",
        "clients_scr",
        "segment",
    ],
    "string",
)
# Preview only: the cast result is displayed, not assigned.
final_df.astype(schema).loc[:, list(schema)]

In [None]:
if results:
    # Cast to the declared schema (and column order) before writing. A column that is
    # entirely missing in a batch (e.g. cnpj when no record carries one) would otherwise
    # remain an untyped object column, which awswrangler may be unable to map to an
    # Athena type. Raises KeyError if a schema column is absent from the joined data.
    final_df = pd.concat(results, ignore_index=True).astype(schema)[list(schema)]

    wr.athena.to_iceberg(
        df=final_df,
        database=DATABASE,
        table="reclamacoes_augmented",
        schema_evolution=True,
        mode="append",
        temp_path=f"s3://{OUTPUT_BUCKET}/athena-staging/",
        boto3_session=session,
    )
    print(f"inserted {len(final_df)} records into reclamacoes_augmented")
else:
    print("no records to insert")