# Case - Machine Learning Engineering


**O Problema**

O **Minhas Finanças** é um produto no PicPay para organizar e comparar os gastos financeiros dos nossos usuários. Desta forma, o time de MLOPS do PicPay tem o objetivo de disponibilizar uma base de dados tratada para ser analisada pelos Cientistas de dados.

O **objetivo** desse exercício é realizar o tratamento de dados para analise das das transações que foram categorizadas.

De forma objetiva, o seu output final deve ser uma tabela conforme a imagem abaixo.


![DataFrame Esperado](resultado_esperado.png)


**Importante!**
  - Você deverá disponibilizar o código em um repositório no GitHub.
  - Recomendamos o uso de Pandas ou PySpark para tratamento dos dados.

**Ambiente de desenvolvimento**
  - Recomendamos o uso do Google Colab ou do Jupyter Notebook Localmente.

**Onde estão os dados?**
 - Dentro da pasta dataset existem dois arquivos `.csv` com os seguintes nomes: `bank_dim` e `transations`.
  

**Schema dos dados de entrada**

- **`transactions`:**
  - **transaction_id**: Chave de identificação da transação
  - **user_id**: Chave de identificação do usuário no sistema.
  - **transaction_name_raw**: Nome da transação em seu formato original
  - **transaction_name_treated**: Nome da transação com tratamento aplicado
  - **transaction_amount**: Valor da transação
  - **bank_id**: Chave de Identificação do banco de origem.
  - **year**: Ano da transação
  - **month**: Mês da transação
  - **day**: Dia da transação

- **`bank_dim`**
  - **bank_name**: Nome do banco.
  - **bank_id**: Identificador do banco.


# Passos para montar a tabela 

1. Leitura dos dados brutos
2. Verificação de valores nulos 
3. Tratamento de strings: 
   - Remover os espaços do inicio e final das palavras (método do trim)
   - Deixar as letras das variáveis minusculas `transaction_name_raw` e `transaction_name_treated`.
   - Substituir os valores de `,` para `.` da variável `transaction_amount`.
   - Concatenar as informações de `year`e `month`.
   - Recolher as informações de `bank_name` através do join de `transaction.csv` com `bank_dim.csv`.

4. Mudando dtypes das variáveis:
    - BIGINT: `transaction_id`
    - INT: `user_id`
    - NUMERIC(20,14): `transaction_amount`
    - DATE: `year_month`
    - STRINGS: `transaction_name_raw` e `transaction_name_treated`.


5. Montar a tabela
   - Agrupar os dados de `user_id`, `bank_name` e `yaer_month`.
   - Pivotar a tabela com a coluna `transaction_name_treated`.
   - Somar os valores dos dados da coluna `transaction_amount`.


## 1 - Reading data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

23/07/31 11:10:55 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.47.27 instead (on interface wlp3s0)
23/07/31 11:10:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/31 11:10:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_transactions_raw = spark.read \
                .csv('./dataset/raw/transactions.csv', header=True)
df_bank = spark.read \
               .csv('./dataset/raw/bank_dim.csv',header=True)

                                                                                

In [3]:
print('num of samples: ',df_transactions_raw.count())
print('num of samples: ', df_bank.count())

num of samples:  209
num of samples:  3


In [4]:
df_transactions_raw.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- transaction_name_raw: string (nullable = true)
 |-- transaction_name_treated: string (nullable = true)
 |-- transaction_amount: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- bank_id: string (nullable = true)



In [5]:
df_bank.printSchema()

root
 |-- bank_name: string (nullable = true)
 |-- bank_id: string (nullable = true)



In [6]:
df_transactions_raw.show()

+--------------+-------+--------------------+------------------------+------------------+----+-----+---+-------+
|transaction_id|user_id|transaction_name_raw|transaction_name_treated|transaction_amount|year|month|day|bank_id|
+--------------+-------+--------------------+------------------------+------------------+----+-----+---+-------+
|    1098589746|   1006| MERCADOLIVRE*HADASH|                 MERCADO|              -450|2022|   12| 12|   7002|
|     923448688|   1005|MERCADOLIVRE*JURE...|                 MERCADO|               -52|2022|   12| 12|   6001|
|     535521958|   1006|MERCADOPAGO*MERCE...|                 MERCADO|             -14,5|2022|   10| 24|   7002|
|     550852764|   1005|Uber   *UBER   *TRIP|                    UBER|            -33,55|2022|   12| 12|   6001|
|    1187027961|   1005|DEB MAESTRO - DRO...|                DROGARIA|            -29,99|2022|   12| 12|   6001|
|     446972585|   1005|COMPRA C/CARTAO 1...|                    uber|            -12,31|2022|  

In [7]:
df_transactions_raw.createOrReplaceTempView('df_transactions_raw')

In [8]:
df_bank.show()

+---------+-------+
|bank_name|bank_id|
+---------+-------+
|   Nubank|   6001|
| Bradesco|   7002|
|Santander|   5003|
+---------+-------+



In [9]:
df_bank.createOrReplaceTempView('df_bank')

## 2 - Checking for null values

In [10]:
df_transactions_raw.filter('''user_id is NULL 
                 or transaction_id is NULL
                 or bank_id is NULL
                 or transaction_name_raw is NULL
                 or transaction_name_treated is NULL
                 or transaction_amount is NULL 
                 or year is NULL
                 or month is NULL 
                 or day is NULL''')\
                .count()

0

## 3 - Treating strings

In [11]:
df_transactions_clean = spark.sql('''
     SELECT 
        trim(BOTH from transaction_id) as transaction_id,
        trim(BOTH from user_id) as user_id,
        trim(BOTH from df_bank.bank_name) as bank_name,
        lower(trim(BOTH from transaction_name_raw)) as transaction_name_raw,
        lower(trim(BOTH from transaction_name_treated)) as transaction_name_treated,
        regexp_replace(trim(BOTH from transaction_amount),r"[,]",".") as transaction_amount,
        trim(BOTH from year) || '-' || trim(BOTH from month) as year_month
     FROM df_transactions_raw
     JOIN df_bank
     ON df_bank.bank_id=df_transactions_raw.bank_id
''')

In [12]:
df_transactions_clean.createOrReplaceTempView('df_transactions_clean')

In [13]:
df_transactions_clean.show()

+--------------+-------+---------+--------------------+------------------------+------------------+----------+
|transaction_id|user_id|bank_name|transaction_name_raw|transaction_name_treated|transaction_amount|year_month|
+--------------+-------+---------+--------------------+------------------------+------------------+----------+
|    1098589746|   1006| Bradesco| mercadolivre*hadash|                 mercado|              -450|   2022-12|
|     923448688|   1005|   Nubank|mercadolivre*jure...|                 mercado|               -52|   2022-12|
|     535521958|   1006| Bradesco|mercadopago*merce...|                 mercado|             -14.5|   2022-10|
|     550852764|   1005|   Nubank|uber   *uber   *trip|                    uber|            -33.55|   2022-12|
|    1187027961|   1005|   Nubank|deb maestro - dro...|                drogaria|            -29.99|   2022-12|
|     446972585|   1005|   Nubank|compra c/cartao 1...|                    uber|            -12.31|   2022-12|
|

## 4 - Check for cardinality for strings

In [14]:
spark.sql('''SELECT DISTINCT transaction_name_treated FROM df_transactions_clean''').show()

+------------------------+
|transaction_name_treated|
+------------------------+
|                drogaria|
|                 spotify|
|                 netflix|
|                   ifood|
|                   luiza|
|              auto posto|
|                farmacia|
|                    uber|
|                  subway|
|                   posto|
|                 mercado|
+------------------------+



## 5 - Change dtypes

In [15]:
df_transactions_treated = spark.sql('''
     SELECT 
        CAST(transaction_id as BIGINT) as transaction_id,
        CAST(user_id as INT) as user_id,
        bank_name,
        transaction_name_raw,
        transaction_name_treated,
        CAST(transaction_amount as NUMERIC(20,14)) as transaction_amount,
        DATE_FORMAT(CAST(year_month as DATE),'y-M') as year_month
     FROM df_transactions_clean
''')

In [16]:
df_transactions_treated.createOrReplaceTempView('df_transactions_treated')

In [17]:
df_transactions_treated.show()

+--------------+-------+---------+--------------------+------------------------+-------------------+----------+
|transaction_id|user_id|bank_name|transaction_name_raw|transaction_name_treated| transaction_amount|year_month|
+--------------+-------+---------+--------------------+------------------------+-------------------+----------+
|    1098589746|   1006| Bradesco| mercadolivre*hadash|                 mercado|-450.00000000000000|   2022-12|
|     923448688|   1005|   Nubank|mercadolivre*jure...|                 mercado| -52.00000000000000|   2022-12|
|     535521958|   1006| Bradesco|mercadopago*merce...|                 mercado| -14.50000000000000|   2022-10|
|     550852764|   1005|   Nubank|uber   *uber   *trip|                    uber| -33.55000000000000|   2022-12|
|    1187027961|   1005|   Nubank|deb maestro - dro...|                drogaria| -29.99000000000000|   2022-12|
|     446972585|   1005|   Nubank|compra c/cartao 1...|                    uber| -12.31000000000000|   2

## 6 - Grouping info about transactions

In [18]:
from pyspark.sql.column import cast

df_transactions_final=df_transactions_treated.groupBy(['user_id','year_month','bank_name'])\
                             .pivot('transaction_name_treated') \
                             .sum('transaction_amount')

In [19]:
df_transactions_final.show()

+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------------+-------------------+-------------------+
|user_id|year_month|bank_name|         auto posto|           drogaria|           farmacia|              ifood|              luiza|             mercado|            netflix|              posto|           spotify|             subway|               uber|
+-------+----------+---------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+------------------+-------------------+-------------------+
|   1006|   2022-10| Bradesco|               null|               null|               null| -73.00000000000000|               null|-1165.89000000000000|               null|               null|              null|               null|               nu

In [20]:
df_transactions_final.createOrReplaceTempView('user_transactions_by_name')

In [21]:
final_df = df_transactions_final.toPandas()
final_df.shape

(14, 14)

# 7 - Saving dataset

In [22]:
final_df.to_csv('./dataset/processed/user_transactions_by_name.csv',index=False)