# Construção da ABT
_ABT - Analytical Base Table_

---

## Sumário

1. **Importação da bibliotecas**
2. **Criação e iniciação de uma sessão PySpark**
3. **Criação dos datasets a partir da leitura dos arquivos \*.csv e \*.json**

<br>

---

<br>

## 1. Importação de bibliotecas

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, expr, count
import json
import csv

## 2. Criação e iniciação de uma sessão PySpark

In [23]:
appName = 'ABT - Transações de clientes'

# O objeto SparkSession configurado com configurações específicas
spark = SparkSession.builder \
    .appName(appName) \
    .config('spark.driver.memory', '8g') \
    .config('spark.executor.memory', '8g') \
    .config('spark.master', 'local[*]') \
    .getOrCreate()
    
spark.sparkContext.setLogLevel('ERROR')

spark

## 3. Criação dos datasets a partir da leitura dos arquivos *.csv e *.json

In [24]:
# Caminho dos arquivos *.csv e *.json
caminho = 'dados/'

### 3.1. Arquivos CSV

- cards_data.csv

In [25]:
# Dados de cartão de crédito
df_cards = spark.read.csv(caminho + 'cards_data.csv', header=True, inferSchema=True)

# Exibir o esquema do dataframe
df_cards.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: string (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- acct_open_date: string (nullable = true)
 |-- year_pin_last_changed: integer (nullable = true)
 |-- card_on_dark_web: string (nullable = true)



- transactions_data.csv

In [26]:
# Dados de transações
df_transactions = spark.read.csv(caminho + 'transactions_data.csv', header=True, inferSchema=True)

# Exibir o esquema do dataframe
df_transactions.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)



- users_data.csv

In [27]:
# Dados de clientes
df_clients = spark.read.csv(caminho + 'users_data.csv', header=True, inferSchema=True)

# Exibir o esquema do dataframe
df_clients.printSchema()

root
 |-- id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: string (nullable = true)
 |-- yearly_income: string (nullable = true)
 |-- total_debt: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)



### 3.2. Arquivos JSON

In [28]:
def process_json_to_csv(json_path, csv_path, list_columns, one_column=None):
    '''
    Processa um arquivo JSON e converte em CSV.

    :param json_path: path + filename.json
        Caminho do arquivo JSON de entrada.
    :param csv_path: path + filename.csv
        Caminho do arquivo CSV de saída.
    :param list_columns: list
        Lista com os nomes das colunas do CSV.
    :param one_column: string (default=None)
        Nome da chave no JSON para acessar dados aninhados (opcional).
    '''
    # Carregar o arquivo JSON
    with open(json_path, 'r', encoding='utf-8') as json_file:
        data = json.load(json_file)
        
    if not data:
        print('Arquivo JSON está vazio.')
        return
    
    # Processar dados aninhados, se necessário
    if one_column:
        data = data.get(one_column, {})
        if not data:
            print(f'A chave \'{one_column}\' não foi encontrada ou está vazia no JSON.')
            return
        print(f'Dados extraídos da chave \'{one_column}\'.')
    else:
        print('Processando JSON sem dados aninhados.')

    # Escrever o CSV
    with open(csv_path, 'w', encoding='utf-8', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(list_columns)  # Escrevendo o cabeçalho
        
        for key, value in data.items():
            writer.writerow([key, value])  # Escrevendo as linhas
    
    print(f'Arquivo CSV gerado com sucesso em {csv_path}.')

- mcc_codes.json

In [29]:
# Processar mcc_codes.json
json_path = caminho + 'mcc_codes.json'
csv_path = caminho + 'mcc_codes.csv'
list_columns = ['code', 'description']
process_json_to_csv(json_path, csv_path, list_columns)

Processando JSON sem dados aninhados.
Arquivo CSV gerado com sucesso em dados/mcc_codes.csv.


In [30]:
# Dados de códigos de categoria de mercadorias
df_mcc = spark.read.csv(caminho + 'mcc_codes.csv', header=True, inferSchema=True)

# Exibir o esquema do dataframe
df_mcc.printSchema()

root
 |-- code: integer (nullable = true)
 |-- description: string (nullable = true)



- train_fraud_labels.json

In [31]:
# Processar train_fraud_labels.json
json_path = caminho + 'train_fraud_labels.json'
csv_path = caminho + 'train_fraud_labels.csv'
list_columns = ['transaction_id', 'is_fraud']
one_column = 'target'
process_json_to_csv(json_path, csv_path, list_columns, one_column)

Dados extraídos da chave 'target'.
Arquivo CSV gerado com sucesso em dados/train_fraud_labels.csv.


In [32]:
# Dados de rótulos de fraude
df_train_fraud = spark.read.csv(caminho + 'train_fraud_labels.csv', header=True, inferSchema=True)

# Exibir o esquema do dataframe
df_train_fraud.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- is_fraud: string (nullable = true)



## 4. Análise dos dados para construção da ABT

- Volumetria dos dataframes

In [33]:
def dataframe_volumetry(df, df_name):
    '''
    Exibe a quantidade de linhas e colunas de um dataframe.
    
    :param df: DataFrame
        O dataframe a ser analisado.
    :param df_name: string
        Nome do dataframe a ser analisado.
    '''
    print(f'Volumetria do \033[3m{df_name}\033[0m')
    print(f'Linhas: {df.count()}')
    print(f'Colunas: {len(df.columns)}\n')

In [34]:
dataframe_volumetry(df_cards, 'df_cards')
dataframe_volumetry(df_transactions, 'df_transactions')
dataframe_volumetry(df_clients, 'df_clients')
dataframe_volumetry(df_mcc, 'df_mcc')
dataframe_volumetry(df_train_fraud, 'df_train_fraud')

Volumetria do [3mdf_cards[0m
Linhas: 6146
Colunas: 13

Volumetria do [3mdf_transactions[0m
Linhas: 13305915
Colunas: 12

Volumetria do [3mdf_clients[0m
Linhas: 2000
Colunas: 14

Volumetria do [3mdf_mcc[0m
Linhas: 109
Colunas: 2

Volumetria do [3mdf_train_fraud[0m
Linhas: 8914963
Colunas: 2



- Criação de Views temporárias para cada dataframe

In [35]:
df_cards.createOrReplaceTempView('tb_cards')
df_clients.createOrReplaceTempView('tb_clients')
df_mcc.createOrReplaceTempView('tb_mcc')
df_train_fraud.createOrReplaceTempView('tb_train_fraud')
df_transactions.createOrReplaceTempView('tb_transactions')

- Visualizando os dados da tb_cards

In [36]:
df_cards.show(10)

+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|  id|client_id|card_brand|      card_type|     card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|4524|      825|      Visa|          Debit|4344676511950444|12/2022|623|     YES|               2|      $24295|       09/2002|                 2008|              No|
|2731|      825|      Visa|          Debit|4956965974959986|12/2020|393|     YES|               2|      $21968|       04/2014|                 2014|              No|
|3701|      825|      Visa|          Debit|4582313478255491|02/2024|719|     YES|               2|      $46414|       07/2003|                 2004|              No|
|  4

- Visualizando os dados da tb_clients

In [37]:
df_clients.show(10)

+----+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|  id|current_age|retirement_age|birth_year|birth_month|gender|             address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+----+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
| 825|         53|            66|      1966|         11|Female|       462 Rose Lane|   34.15|  -117.76|           $29278|       $59696|   $127613|         787|               5|
|1746|         53|            68|      1966|         12|Female|3606 Federal Boul...|   40.76|   -73.74|           $37891|       $77254|   $191349|         701|               5|
|1718|         81|            67|      1938|         11|Female|     766 Third Drive|   34.02|  -117.89|           $

- Visualizando os dados da tb_mcc

In [38]:
df_mcc.show(10)

+----+--------------------+
|code|         description|
+----+--------------------+
|5812|Eating Places and...|
|5541|    Service Stations|
|7996|Amusement Parks, ...|
|5411|Grocery Stores, S...|
|4784|Tolls and Bridge ...|
|4900|Utilities - Elect...|
|5942|         Book Stores|
|5814|Fast Food Restaur...|
|4829|      Money Transfer|
|5311|   Department Stores|
+----+--------------------+
only showing top 10 rows



- Visualizando os dados da tb_train_fraud

In [39]:
df_train_fraud.show(10)

+--------------+--------+
|transaction_id|is_fraud|
+--------------+--------+
|      10649266|      No|
|      23410063|      No|
|       9316588|      No|
|      12478022|      No|
|       9558530|      No|
|      12532830|      No|
|      19526714|      No|
|       9906964|      No|
|      13224888|      No|
|      13749094|      No|
+--------------+--------+
only showing top 10 rows



- Visualzando os dados da tb_transactions

In [40]:
df_transactions.show(10)

+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|       Beulah|            ND|58523.0|5499|  null|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|   Bettendorf|            IA|52722.0|5311|  null|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|        Vista|            CA|92084.0|4829|  null|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|  Crown Point|            IN|46307.0|4829|  null|
|7475332|2010-01-01 00:06:00|      848|   3915| 

In [41]:
import sys

try:
    sys.exit()
except SystemExit:
    print('Encerrando o script, mas mantendo o ambiente interativo ativo.')


Encerrando o script, mas mantendo o ambiente interativo ativo.


In [42]:
# 

vw_cards = spark.sql('''
    SELECT
        id,
        client_id,
        card_brand,
        card_type,
        card_number,
        expires,
        cvv,
        has_chip,
        num_cards_issued,
        credit_limit,
        acct_open_date,
        year_pin_last_changed,
        card_on_dark_web
    FROM 
        tb_cards;
''')

vw_cards.show(10, truncate=False)

Py4JError: An error occurred while calling o29.sql. Trace:
py4j.Py4JException: Method sql([class java.lang.String, class [Ljava.lang.Object;]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)

