# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [2]:
import os
import glob
import psycopg2
import re

import pandas as pd
from sql_queries import *

In [2]:
conn = psycopg2.connect("host=127.0.0.1 user=postgres password=nubank")
conn.set_session(autocommit=True)
cur = conn.cursor()

In [3]:
cur.execute("DROP DATABASE IF EXISTS nubank")
cur.execute("CREATE DATABASE nubank WITH ENCODING 'utf8' TEMPLATE template0")

# close connection to default database
conn.close()

# connect to the nubank database
conn = psycopg2.connect("host=127.0.0.1 dbname=nubank user=postgres password=nubank")
cur = conn.cursor()

ObjectInUse: database "nubank" is being accessed by other users
DETAIL:  There are 3 other sessions using the database.
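
The `ObjectInUse` error above means other sessions are still connected to `nubank`, so PostgreSQL refuses the `DROP DATABASE`. One possible workaround, assuming you are allowed to disconnect those sessions, is to terminate them first with `pg_terminate_backend`. The helper name `terminate_other_sessions` is hypothetical; it expects a cursor such as the `cur` opened on the default database.

```python
def terminate_other_sessions(cur, dbname):
    """Ask PostgreSQL to close every other session connected to `dbname`.

    `cur` is assumed to be a DB-API cursor (for example from psycopg2) on a
    connection with enough privileges to terminate those backends.
    """
    cur.execute(
        "SELECT pg_terminate_backend(pid) "
        "FROM pg_stat_activity "
        "WHERE datname = %s AND pid <> pg_backend_pid()",
        (dbname,),
    )


# Intended usage before dropping the database (not executed here):
# terminate_other_sessions(cur, "nubank")
# cur.execute("DROP DATABASE IF EXISTS nubank")
```

On PostgreSQL 13 or later, `DROP DATABASE nubank WITH (FORCE)` may achieve the same effect in a single statement.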


In [2]:
conn = psycopg2.connect("host=127.0.0.1 dbname=nubank user=postgres password=nubank")
cur = conn.cursor()

In [6]:
for query in drop_table_queries:
    cur.execute(query)
    conn.commit()

for query in create_table_queries:
    cur.execute(query)
    conn.commit()

In [7]:
# Verify the DB tables
cur.execute("select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';")
print(cur.fetchall())

[('accounts_table',), ('city_table',), ('costumers_table',), ('country_table',), ('d_month_table',), ('d_time_table',), ('d_week_table',), ('d_weekday_table',), ('d_year_table',), ('pix_movements_table',), ('state_table',), ('transfer_ins_table',), ('transfer_outs_table',)]


In [3]:
def insert_function(data, query):
    """Insert every row of the DataFrame `data` using `query`, then commit once."""
    for _, row in data.iterrows():
        cur.execute(query, list(row))
    conn.commit()


# Process table data
In this first part, you'll perform ETL on each of the tables.

### Insert data in accounts_table

In [9]:
accounts_df = pd.read_csv("./tables/tables/accounts_table.csv")
insert_function(accounts_df, accounts_table_insert)

### Insert data in city_table

In [10]:
city_df = pd.read_csv("./tables/tables/city_table.csv")
city_df = city_df[['city_id','city','state_id']]
insert_function(city_df, city_table_insert)

### Insert data in costumers_table

In [12]:
costumers_df = pd.read_csv("./tables/tables/costumers_table.csv")
costumers_df.head()
insert_function(costumers_df, costumers_table_insert)

### Insert data in country_table

In [13]:
country_df = pd.read_csv("./tables/tables/country_table.csv")
country_df = country_df[["country_id", "country"]]
insert_function(country_df, country_table_insert)

### Insert data in d_month_table

In [14]:
d_month_table_df = pd.read_csv("./tables/tables/d_month_table.csv")
insert_function(d_month_table_df, d_month_table_insert)

### Insert data in d_time_table

In [15]:
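d_time_table_df = pd.read_csv("./tables/tables/d_time_table.csv")
# write a header-less copy so COPY receives only data rows
tmp_path = "./tables/tables/tmp.csv"
d_time_table_df.to_csv(tmp_path, index=False, header=False)
# the context manager closes the file once the copy finishes
with open(tmp_path, "r") as f:
    cur.copy_from(f, "d_time_table", sep=",")
conn.commit()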
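
The temporary CSV file is not strictly required. As a sketch, the same rows can be streamed from an in-memory buffer. psycopg2's `copy_expert` with `FORMAT csv` also lets PostgreSQL parse quoted fields, which the text format used by `copy_from` does not. The helper name `copy_rows` is hypothetical, and it assumes `rows` is an iterable of tuples in the table's column order (for a DataFrame, `df.itertuples(index=False, name=None)` yields such tuples).

```python
import csv
import io


def copy_rows(cur, table, rows):
    """Stream `rows` into `table` through COPY using an in-memory CSV buffer."""
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)  # quotes fields that contain commas
    buffer.seek(0)                      # rewind so COPY reads from the start
    # `table` is interpolated directly, so it must be a trusted identifier.
    cur.copy_expert(f"COPY {table} FROM STDIN WITH (FORMAT csv)", buffer)


# Intended usage (not executed here):
# copy_rows(cur, "d_time_table", d_time_table_df.itertuples(index=False, name=None))
# conn.commit()
```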

### Insert data in d_week_table

In [17]:
d_week_table_df = pd.read_csv("./tables/tables/d_week_table.csv")
insert_function(d_week_table_df, d_week_table_insert)

UniqueViolation: duplicate key value violates unique constraint "d_week_table_pkey"
DETAIL:  Key (weekday_id)=(102414000) already exists.
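
The `UniqueViolation` above indicates that a `weekday_id` value either appears more than once among the rows being inserted or is already in the table from an earlier run. For the first case, one option is to drop repeated keys before inserting; in pandas this would be `d_week_table_df.drop_duplicates(subset=["weekday_id"])`. The sketch below shows the same idea on plain dictionaries. `dedupe_by_key` is a hypothetical helper, and the `label` column is made up for the example.

```python
def dedupe_by_key(rows, key):
    """Keep the first row seen for each value of `key`, preserving order."""
    seen = set()
    unique = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique.append(row)
    return unique


# Illustrative rows; only the key value 102414000 comes from the error above.
rows = [
    {"weekday_id": 102414000, "label": "first"},
    {"weekday_id": 102414001, "label": "other"},
    {"weekday_id": 102414000, "label": "repeat"},
]
unique_rows = dedupe_by_key(rows, "weekday_id")
print([r["label"] for r in unique_rows])  # ['first', 'other']
```

If the duplicates come from re-running the cell instead, adding `ON CONFLICT DO NOTHING` to the insert statement in `sql_queries` is another option.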


### Insert data in d_weekday_table

In [4]:
d_weekday_table_df = pd.read_csv("./tables/tables/d_weekday_table.csv")
insert_function(d_weekday_table_df, d_weekday_table_insert)

### Insert data in d_year_table

In [5]:
d_year_table_df = pd.read_csv("./tables/tables/d_year_table.csv")
insert_function(d_year_table_df, d_year_table_insert)

### Insert data in state_table

In [6]:
state_table_df = pd.read_csv("./tables/tables/state_table.csv")
state_table_df = state_table_df[["state_id", "state", "country_id"]]
insert_function(state_table_df, state_table_insert)

### Insert data in pix_movements_table

In [7]:
pix_movements_table_df = pd.read_csv("./tables/tables/pix_movements_table.csv")

# write a header-less copy so COPY receives only data rows
tmp_path = "./tables/tables/tmp.csv"
pix_movements_table_df.to_csv(tmp_path, index=False, header=False)
with open(tmp_path, "r") as f:
    cur.copy_from(f, "pix_movements_table", sep=",")
conn.commit()

### Insert data in transfer_ins_table

In [8]:
transfer_ins_table_df = pd.read_csv("./tables/tables/transfer_ins_table.csv")
# write a header-less copy so COPY receives only data rows
tmp_path = "./tables/tables/tmp.csv"
transfer_ins_table_df.to_csv(tmp_path, index=False, header=False)
with open(tmp_path, "r") as f:
    cur.copy_from(f, "transfer_ins_table", sep=",")
conn.commit()

### Insert data in transfer_outs_table

In [9]:
transfer_outs_table_df = pd.read_csv("./tables/tables/transfer_outs_table.csv")
# write a header-less copy so COPY receives only data rows
tmp_path = "./tables/tables/tmp.csv"
transfer_outs_table_df.to_csv(tmp_path, index=False, header=False)
with open(tmp_path, "r") as f:
    cur.copy_from(f, "transfer_outs_table", sep=",")
conn.commit()

### Close connection

In [10]:
conn.close()

# Answering the questions: 

## 1. Create a SQL query to help Jane retrieve the monthly balance of all accounts (this query should use the warehouse structure as it is before the changes you propose in 2.)

Your colleague Jane Hopper, the analyst in charge of analysing customer behaviour, who directly
consumes data from the Data Warehouse Environment, needs to get all the accounts' monthly
balances between Jan/2020 and Dec/2020. She wasn't able to do it alone and asked for your help.
Add to your resolution the SQL query used to retrieve the data needed (the necessary tables in csv
format were sent along with this pdf, in the folder tables/). Feel free to use the dialect of your choice,
but please specify the SQL engine.


## Resolution:

### Developing the query to Account Monthly Balance

The original schema has three tables that record movements in customers' accounts, so to help Jane I need to combine them and produce the columns shown in the figure below.

![](./images/balance_table.jpg)

Note: The image above includes a customer column, but Jane wants the balance per account, so the `account_id` column is used instead of the customer column.


1. `UNION ALL` appends the `SELECT` results from the three tables into a single subquery.
2. The subquery needs separate columns for inflows (INs) and outflows (OUTs). The PIX table stores both kinds of movement, so I used `CASE` on its `in_or_out` column to tell them apart:


```sql 
    CASE in_or_out WHEN 'pix_out' then pix_amount ELSE 0 END as SAIDA,
    CASE in_or_out WHEN  'pix_in' then pix_amount ELSE 0 END as ENTRADA
``` 
In the regular transfer tables this distinction is unnecessary, because each table holds a single kind of transaction. I return the amount in the column for that kind and 0 (zero) in the other column, as shown below:

In table:
```sql
        SELECT account_id,
        0 AS SAIDA, 
        amount as ENTRADA,
```

Out table:
```sql
        SELECT account_id,
        amount as SAIDA,
        0 AS ENTRADA,
```

3. To get the month column, I first convert the stored completion time (an epoch value, divided by 1000 to get seconds) into a timestamp with `to_timestamp`, then extract the month with `DATE_PART`, specifying `'month'` as the part of interest:

```sql
        DATE_PART('month', to_timestamp(CAST(transaction_completed_at as numeric)/1000)) MES
```

4. The `WHERE` clause keeps only the transactions that were completed and occurred between Jan/2020 and Dec/2020:

```sql
        WHERE (status = 'completed') AND (DATE_PART('year', to_timestamp(CAST(pix_completed_at as numeric)/1000))  = 2020)
```

5. From the rows returned by the `transactions` subquery, I sum the inflows and the outflows separately, grouping them by `account_id` and month. The monthly balance is then the inflows minus the outflows for each account and month. All these columns are rounded to two decimal places, which is enough for monetary values.


Select statement:
```sql
        SELECT CAST(mes AS INT), account_id,
        ROUND(CAST(sum(entrada) AS NUMERIC),2) AS entradas_total,
        ROUND(CAST(SUM(saida) AS NUMERIC),2) as saidas_total,
        ROUND(CAST((sum(entrada) - SUM(saida))AS NUMERIC),2) as balanco

```

Group by and Order by statement:

```sql
        GROUP BY mes, account_id
        ORDER BY account_id, mes

```
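
To see the five steps working together, the sketch below runs a query of the same shape on a few made-up rows. It uses Python's built-in `sqlite3` instead of PostgreSQL so it needs no server, which means `strftime(..., 'unixepoch')` stands in for `DATE_PART` and `to_timestamp`, and months are computed in UTC. Table and column names follow the notebook; the data is invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE pix_movements_table (account_id INTEGER, in_or_out TEXT,
    pix_amount REAL, pix_completed_at INTEGER, status TEXT);
CREATE TABLE transfer_ins_table (account_id INTEGER, amount REAL,
    transaction_completed_at INTEGER, status TEXT);
CREATE TABLE transfer_outs_table (account_id INTEGER, amount REAL,
    transaction_completed_at INTEGER, status TEXT);
""")

JAN_2020 = 1577836800000  # 2020-01-01 00:00 UTC in epoch milliseconds
FEB_2020 = 1580515200000  # 2020-02-01 00:00 UTC
JAN_2019 = 1546300800000  # 2019-01-01 00:00 UTC, outside the 2020 window

conn.executemany("INSERT INTO pix_movements_table VALUES (?, ?, ?, ?, ?)", [
    (1, "pix_in", 100.0, JAN_2020, "completed"),
    (1, "pix_out", 30.0, JAN_2020, "completed"),
    (1, "pix_in", 999.0, JAN_2020, "failed"),   # ignored: not completed
])
conn.executemany("INSERT INTO transfer_ins_table VALUES (?, ?, ?, ?)", [
    (1, 50.0, FEB_2020, "completed"),
])
conn.executemany("INSERT INTO transfer_outs_table VALUES (?, ?, ?, ?)", [
    (1, 20.0, FEB_2020, "completed"),
    (1, 500.0, JAN_2019, "completed"),          # ignored: wrong year
])

query = """
SELECT mes, account_id,
       ROUND(SUM(entrada), 2) AS entradas_total,
       ROUND(SUM(saida), 2) AS saidas_total,
       ROUND(SUM(entrada) - SUM(saida), 2) AS balanco
FROM (
    SELECT account_id,
           CASE in_or_out WHEN 'pix_out' THEN pix_amount ELSE 0 END AS saida,
           CASE in_or_out WHEN 'pix_in' THEN pix_amount ELSE 0 END AS entrada,
           CAST(strftime('%m', pix_completed_at / 1000, 'unixepoch') AS INTEGER) AS mes
    FROM pix_movements_table
    WHERE status = 'completed'
      AND strftime('%Y', pix_completed_at / 1000, 'unixepoch') = '2020'

    UNION ALL

    SELECT account_id, 0 AS saida, amount AS entrada,
           CAST(strftime('%m', transaction_completed_at / 1000, 'unixepoch') AS INTEGER) AS mes
    FROM transfer_ins_table
    WHERE status = 'completed'
      AND strftime('%Y', transaction_completed_at / 1000, 'unixepoch') = '2020'

    UNION ALL

    SELECT account_id, amount AS saida, 0 AS entrada,
           CAST(strftime('%m', transaction_completed_at / 1000, 'unixepoch') AS INTEGER) AS mes
    FROM transfer_outs_table
    WHERE status = 'completed'
      AND strftime('%Y', transaction_completed_at / 1000, 'unixepoch') = '2020'
) AS transactions
GROUP BY mes, account_id
ORDER BY account_id, mes
"""

# Each tuple is (mes, account_id, entradas_total, saidas_total, balanco).
monthly = conn.execute(query).fetchall()
print(monthly)  # [(1, 1, 100.0, 30.0, 70.0), (2, 1, 50.0, 20.0, 30.0)]
```

The failed PIX movement and the 2019 transfer are filtered out by the `WHERE` clauses, so only the four completed 2020 rows contribute to the totals.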

### Importing `create_engine` from `sqlalchemy` to create the connection to our Postgres database

In [4]:
from sqlalchemy import create_engine

In [None]:
engine = create_engine('postgresql://postgres:nubank@127.0.0.1/nubank')
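
The connection URL above hard-codes the credentials. As one possible alternative, the sketch below assembles the same URL from environment variables. It reuses the libpq variable names (`PGUSER`, `PGPASSWORD`, `PGHOST`, `PGDATABASE`) as an assumption about how the environment is configured; `build_postgres_url` is a hypothetical helper, and its defaults mirror the values used in this notebook.

```python
import os
from urllib.parse import quote_plus


def build_postgres_url(env=os.environ):
    """Build a SQLAlchemy-style PostgreSQL URL from environment variables."""
    user = env.get("PGUSER", "postgres")
    password = env.get("PGPASSWORD", "")
    host = env.get("PGHOST", "127.0.0.1")
    dbname = env.get("PGDATABASE", "nubank")
    # quote_plus escapes characters such as "@" or ":" inside the credentials
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}/{dbname}"


# Intended usage (not executed here):
# engine = create_engine(build_postgres_url())
```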

### Using `pd.read_sql` and the previously created `engine` to execute our SQL query and create a pandas dataframe with the result

In [30]:
balance = pd.read_sql('''
SELECT CAST(mes AS INT), account_id,
ROUND(CAST(sum(entrada) AS NUMERIC),2) AS entradas_total,
ROUND(CAST(SUM(saida) AS NUMERIC),2) as saidas_total,
ROUND(CAST((sum(entrada) - SUM(saida))AS NUMERIC),2) as balanco
FROM (  SELECT account_id, 
        CASE in_or_out WHEN 'pix_out' then pix_amount ELSE 0 END as SAIDA, 
        CASE in_or_out WHEN  'pix_in' then pix_amount ELSE 0 END as ENTRADA,
        DATE_PART('month', to_timestamp(CAST(pix_completed_at as numeric)/1000)) MES
        FROM pix_movements_table
        WHERE (status = 'completed') AND (DATE_PART('year', to_timestamp(CAST(pix_completed_at as numeric)/1000))  = 2020)

        UNION ALL

        SELECT account_id,
        0 AS SAIDA, 
        amount as ENTRADA,
        DATE_PART('month', to_timestamp(CAST(transaction_completed_at as numeric)/1000)) MES
        FROM transfer_ins_table
        WHERE (status = 'completed') AND (DATE_PART('year', to_timestamp(CAST(transaction_completed_at as numeric)/1000)) = 2020)

        UNION ALL

        SELECT account_id,
        amount as SAIDA,
        0 AS ENTRADA,
        DATE_PART('month', to_timestamp(CAST(transaction_completed_at as numeric)/1000)) MES
        FROM transfer_outs_table
        WHERE (status = 'completed') AND (DATE_PART('year', to_timestamp(CAST(transaction_completed_at as numeric)/1000)) = 2020)) as transactions
GROUP BY mes, account_id
ORDER BY account_id, mes
''', engine)

### Getting information about the created pandas dataframe

In [27]:
balance.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 45557 entries, 0 to 45556
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   mes             45557 non-null  int64  
 1   account_id      45557 non-null  int64  
 2   entradas_total  45557 non-null  float64
 3   saidas_total    45557 non-null  float64
 4   balanco         45557 non-null  float64
dtypes: float64(3), int64(2)
memory usage: 1.7 MB


### Looking into the `balance` dataframe to see the result of our query

In [28]:
balance.head(20)

| | mes | account_id | entradas_total | saidas_total | balanco |
|---|---|---|---|---|---|
| 0 | 1 | 2569200459575096 | 6622.15 | 1826.3 | 4795.85 |
| 1 | 2 | 2569200459575096 | 2510.59 | 4934.91 | -2424.32 |
| 2 | 3 | 2569200459575096 | 2221.55 | 3205.17 | -983.62 |
| 3 | 4 | 2569200459575096 | 4492.74 | 540.4 | 3952.34 |
| 4 | 5 | 2569200459575096 | 1825.52 | 2429.55 | -604.03 |
| 5 | 6 | 2569200459575096 | 2322.68 | 3059.74 | -737.06 |
| 6 | 7 | 2569200459575096 | 3003.19 | 8062.59 | -5059.4 |
| 7 | 8 | 2569200459575096 | 80.6 | 2333.27 | -2252.67 |
| 8 | 9 | 2569200459575096 | 2401.67 | 3796.14 | -1394.47 |
| 9 | 10 | 2569200459575096 | 1542.31 | 457.52 | 1084.79 |
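
As a quick sanity check on the output above, `balanco` should equal `entradas_total` minus `saidas_total`. The sketch below verifies this for the first three rows displayed, using `Decimal` so the comparison is not affected by binary floating-point error. Because the query rounds the difference of the unrounded sums, a one-cent tolerance is allowed; that tolerance is an assumption, and `balance_matches` is a hypothetical helper.

```python
from decimal import Decimal

# (mes, entradas_total, saidas_total, balanco) copied from the output above.
rows = [
    (1, "6622.15", "1826.3", "4795.85"),
    (2, "2510.59", "4934.91", "-2424.32"),
    (3, "2221.55", "3205.17", "-983.62"),
]


def balance_matches(entradas, saidas, balanco, tolerance=Decimal("0.01")):
    """Return True when balanco equals entradas - saidas within `tolerance`."""
    difference = Decimal(entradas) - Decimal(saidas)
    return abs(difference - Decimal(balanco)) <= tolerance


checks = [balance_matches(e, s, b) for _, e, s, b in rows]
print(all(checks))  # True
```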


## 2. Improve the data warehouse architecture and justify your changes

Imagine now that you could remodel the data warehouse environment freely, keeping in mind that
Nubank is always evolving with new products (WhatsApp Payments, PIX, phone recharge, etc.) and it
is also expanding to new countries, so our data warehouse needs to accommodate all these
incoming changes. Which modifications would you propose and why? Remember to consider that
other analysts will be using the same structure, so it should be as clear as possible. Feel free to
change, remove or add tables/fields to generate a better database design.