In [1]:
import pandas as pd
import numpy as np

# Bases de Dados

Nesta etapa vamos carregas as bases disponíveis para trabalharmos.

In [2]:
df_transacoes = pd.read_json('data/transactions.json')
df_users = pd.read_json('data/users.json')
df_customers = pd.read_json('data/customers.json')

In [3]:
df_transacoes.head()

Unnamed: 0,_id,tenantId,userId,createdAt,updatedAt,favoriteFruit,isFraud,document
0,641336efc5c272ff5d82eb76,64132ef6526eeb8b998e2d3c,64132e0f48ad8ae93e504d33,2021-01-08T02:24:02 +03:00,2023-02-08T01:52:32 +03:00,strawberry,True,"{'documentType': 'CNH', 'documentUF': 'PA'}"
1,641336efa086e60c7a834713,64132ef6526eeb8b998e2d3c,64132e0f1d87f017f64a3c20,2022-02-03T11:17:50 +03:00,2023-01-22T04:27:37 +03:00,apple,False,"{'documentType': 'CTPS', 'documentUF': 'PA'}"
2,641336ef5291e28207a6db05,64132ef6f991a3a1884f2cd0,64132e0fb4fade825fdbeed4,2021-09-03T01:40:07 +03:00,2023-02-10T06:17:25 +03:00,strawberry,False,"{'documentType': 'RG', 'documentUF': 'PA'}"
3,641336efbaf638cc19f96b71,64132ef6c9c22c95893e388b,64132e0fac889c7cbd7ebda3,2021-07-25T10:02:11 +03:00,2023-03-11T02:34:20 +03:00,strawberry,True,"{'documentType': 'CTPS', 'documentUF': 'BA'}"
4,641336ef24f561f394623973,64132ef68637da8c371cdc09,64132e0f16e8266d4823236c,2022-01-26T11:01:27 +03:00,2023-01-14T06:52:42 +03:00,strawberry,True,"{'documentType': 'CNH', 'documentUF': 'RJ'}"


In [4]:
df_users.head()

Unnamed: 0,_id,name,email,createdAt,birthdate
0,64132e0fa1b98612a1366a0a,Lucy Small,gallagherfowler@slax.com,2020-10-15T08:50:36 +03:00,1990-11-09T04:23:11 +02:00
1,64132e0f90329937be6d5b65,Alvarado Tanner,kathiekirk@ecratic.com,2021-03-31T04:35:35 +03:00,1989-07-29T03:39:13 +03:00
2,64132e0f429adbbe0334ad00,Tucker Wright,mckeebooth@deminimum.com,2021-01-12T06:28:25 +03:00,1992-10-11T06:36:20 +03:00
3,64132e0f05e9b1d120893ffe,Oneill Molina,banksgoodwin@fleetmix.com,2021-03-21T10:40:58 +03:00,1981-01-27T07:38:46 +03:00
4,64132e0f483bdb42b75e860c,Roberson Miller,ricewebb@biohab.com,2021-12-31T08:11:40 +03:00,1981-05-16T08:02:29 +03:00


In [5]:
df_customers.head()

Unnamed: 0,_id,fantasyName,cnpj,status,segment
0,64132ef6526eeb8b998e2d3c,Ecratic,38792916000102,Ativo,Banco
1,64132ef6f991a3a1884f2cd0,Stockpost,87715692000132,Inativo,Banco
2,64132ef6dca9067d51ca1b30,Eschoir,71365876000180,Inativo,Ecommerce
3,64132ef6eea665cc154f614a,Verton,62782441000140,Ativo,Banco
4,64132ef662f97d5858f87aba,Earthmark,46522418000160,Ativo,Ecommerce


## Qualidade dos Dados

Faça uma análise (simples) de qualidade da informação que você recebeu. Não assuma nada nos datasets!

In [6]:
# Implementação

In [7]:
df_transacoes.dtypes

_id              object
tenantId         object
userId           object
createdAt        object
updatedAt        object
favoriteFruit    object
isFraud            bool
document         object
dtype: object

Aqui nós vemos que algumas das colunas apresentam tipos "incorretos" de dados, por exemplo, as colunas "createdAt" e "updatedAt" poderiam ser convertidas para um tipo de dados de "datetime" com fuso horário, pois isso pode facilitar algumas das futuras análises. Além disso, a coluna "document" contém dentro dela dicionários, o que talvez não seja a melhor forma de armazenar e acessar tais informações, por isso vamos mudar a sua organização em seguida.

In [8]:
df_transacoes.isna().sum()

_id              0
tenantId         0
userId           0
createdAt        0
updatedAt        0
favoriteFruit    0
isFraud          0
document         0
dtype: int64

Por outro lado, a base não apresenta "missing values" em nenhuma das colunas, o que é positivo.

In [9]:
print('numero total de transacoes = ', len(df_transacoes))

numero total de transacoes =  10002


In [10]:
print(df_users.dtypes)
print('\n',df_users.isna().sum())
print('\n numero total de usuarios = ', len(df_users))

_id          object
name         object
email        object
createdAt    object
birthdate    object
dtype: object

 _id          0
name         0
email        0
createdAt    0
birthdate    0
dtype: int64

 numero total de usuarios =  100


Nessa tabela temos o mesmo problema relativo às datas mas novamente não temos valores não encontrados.

In [11]:
print(df_customers.dtypes)
print('\n',df_customers.isna().sum())
print('\n numero total de customers =  ', len(df_customers))

_id            object
fantasyName    object
cnpj            int64
status         object
segment        object
dtype: object

 _id            0
fantasyName    0
cnpj           0
status         0
segment        0
dtype: int64

 numero total de customers =   10


Finalmente, nessa tabela parece que temos uma boa qualidade no geral, porém talvez fosse melhor renomear a coluna "status" para "ativo" e mudar o tipo de dados para booleanos, isso pode ser mais eficiente para análises de larga escala. Por fim, dependendo da aplicação, por examplo se fosse para utilizar esse dados em modelos de machine learning, a coluna "segment" poderia ser convertida em identificadores númericos que pudessem ser inputados no modelo.

# Modelagem
Implemente os cruzamentos e modelagem correta nas bases, de acordo com o `README.md` disponível.

Aqui, será verificada sua capacidade de entender, modelar e disponibilizar dados de modo que seja simples, limpo e eficiente. Você pode escolher qualquer estratégia que desejar (normalização, dimensionamento, fato, etc).

In [12]:
# Implementação

In [13]:
# Primeiro faremos alguns dos tratamentos citados na etapa anterior.

df_transacoes['createdAt'] = pd.to_datetime(df_transacoes['createdAt'])
df_transacoes['updatedAt'] = pd.to_datetime(df_transacoes['updatedAt'])

# Fazemos a mudança na coluna "document" também como descrita, removendo os dicionários
df_transacoes = df_transacoes.drop('document', axis = 1).assign(**df_transacoes['document'].dropna().apply(pd.Series))
df_transacoes.head()

df_transacoes

Unnamed: 0,_id,tenantId,userId,createdAt,updatedAt,favoriteFruit,isFraud,documentType,documentUF
0,641336efc5c272ff5d82eb76,64132ef6526eeb8b998e2d3c,64132e0f48ad8ae93e504d33,2021-01-08 02:24:02+03:00,2023-02-08 01:52:32+03:00,strawberry,True,CNH,PA
1,641336efa086e60c7a834713,64132ef6526eeb8b998e2d3c,64132e0f1d87f017f64a3c20,2022-02-03 11:17:50+03:00,2023-01-22 04:27:37+03:00,apple,False,CTPS,PA
2,641336ef5291e28207a6db05,64132ef6f991a3a1884f2cd0,64132e0fb4fade825fdbeed4,2021-09-03 01:40:07+03:00,2023-02-10 06:17:25+03:00,strawberry,False,RG,PA
3,641336efbaf638cc19f96b71,64132ef6c9c22c95893e388b,64132e0fac889c7cbd7ebda3,2021-07-25 10:02:11+03:00,2023-03-11 02:34:20+03:00,strawberry,True,CTPS,BA
4,641336ef24f561f394623973,64132ef68637da8c371cdc09,64132e0f16e8266d4823236c,2022-01-26 11:01:27+03:00,2023-01-14 06:52:42+03:00,strawberry,True,CNH,RJ
...,...,...,...,...,...,...,...,...,...
9997,641336ef434f4da45e00379f,64132ef6ccd2d35a83c3544a,64132e0f9647ed73a005c271,2022-01-04 01:26:08+03:00,2023-02-16 11:58:18+03:00,banana,True,CNH,BA
9998,641336efa0febbc18ae452f3,64132ef6526eeb8b998e2d3c,64132e0f9417d076f85b8f61,2022-10-28 05:01:20+03:00,2023-03-15 06:59:41+03:00,strawberry,False,CNH,MT
9999,641336efecd20d71ae58208f,64132ef68637da8c371cdc09,64132e0f29d8b74c6b1cb514,2022-12-30 08:19:19+03:00,2023-02-11 10:13:33+03:00,banana,False,RG,RJ
10000,641336ef7cc5c59939e34f34,64132ef6eea665cc154f614a,64132e0fe8e5688a81c56b8a,2022-11-02 09:01:38+03:00,2023-01-12 10:05:32+03:00,apple,False,RG,PA


In [14]:
# Agora que fizemos essa transformação precisamos verificar novamente se existem valores perdidos
df_transacoes.isna().sum()

# E vemos que agora existem valores perdidos

_id              0
tenantId         0
userId           0
createdAt        0
updatedAt        0
favoriteFruit    0
isFraud          0
documentType     3
documentUF       2
dtype: int64

In [15]:
df_users['createdAt'] = pd.to_datetime(df_users['createdAt'])
df_users['birthdate'] = pd.to_datetime(df_users['birthdate'], utc = True)
df_users.dtypes

_id                                         object
name                                        object
email                                       object
createdAt    datetime64[ns, pytz.FixedOffset(180)]
birthdate                      datetime64[ns, UTC]
dtype: object

In [16]:
df_customers['status'] = (df_customers['status'] == 'Ativo')
df_customers.rename(columns = {'status': 'ativo'})

Unnamed: 0,_id,fantasyName,cnpj,ativo,segment
0,64132ef6526eeb8b998e2d3c,Ecratic,38792916000102,True,Banco
1,64132ef6f991a3a1884f2cd0,Stockpost,87715692000132,False,Banco
2,64132ef6dca9067d51ca1b30,Eschoir,71365876000180,False,Ecommerce
3,64132ef6eea665cc154f614a,Verton,62782441000140,True,Banco
4,64132ef662f97d5858f87aba,Earthmark,46522418000160,True,Ecommerce
5,64132ef6f54b06e1ffb65fba,Tingles,84854134000160,True,Banco
6,64132ef64f72ae4d32dc3535,Zytrax,67804098000100,False,Serviços
7,64132ef6ccd2d35a83c3544a,Exospace,35354484000151,True,Banco
8,64132ef6c9c22c95893e388b,Bisba,35166986000159,True,Banco
9,64132ef68637da8c371cdc09,Caxt,23114879000106,True,Banco


In [17]:
# Agora vamos usar a estratégia de dimensionamento-fato (levemente snowflake) para modelar a base de dados.
# Para isso, teremos 4 tabelas dimensão (data, user, customer e document) e uma tabela fato (transações).

# Data
start_date = np.min([np.min(df_users['birthdate']), np.min(df_users['createdAt']), np.min(df_transacoes['createdAt']), np.min(df_transacoes['updatedAt'])])
end_date = np.max([np.max(df_users['birthdate']), np.max(df_users['createdAt']), np.max(df_transacoes['createdAt']), np.max(df_transacoes['updatedAt'])])

dim_date = pd.DataFrame()
date_range = pd.date_range(start_date.replace(tzinfo=None).date(), end_date.replace(tzinfo=None).date())#.to_series()
dim_date['date'] = date_range
dim_date['year'] = dim_date['date'].dt.year
dim_date['month'] = dim_date['date'].dt.month
dim_date['day'] = dim_date['date'].dt.day
dim_date = dim_date.reset_index().rename(columns={'index':'dateId'})
dim_date

Unnamed: 0,dateId,date,year,month,day
0,0,1980-05-19,1980,5,19
1,1,1980-05-20,1980,5,20
2,2,1980-05-21,1980,5,21
3,3,1980-05-22,1980,5,22
4,4,1980-05-23,1980,5,23
...,...,...,...,...,...
15637,15637,2023-03-12,2023,3,12
15638,15638,2023-03-13,2023,3,13
15639,15639,2023-03-14,2023,3,14
15640,15640,2023-03-15,2023,3,15


É importante notar que nós removemos as informações com granularidade menor que dia nessa tabela,
como horas e fuso horário. Isso foi feito porque senão a tabela seria muito grande e ela tem como objetivo fazer queries mais granulosas como as analises mensais que faremos a seguir, e então não precisamos de granularidade de segundos, por exemplo.

Além disso, para garantir que as informações não são perdidas para sempre, manteremos essas informações nas tabelas originais. Isso pode ser ineficiente em termos de espaço de armazenagem e deve ser discutido, mas sem mais informações dos objetivos dessas tabelas e como elas serão usadas pelos analistas, é importante manter essa informação disponível para analises mais finas.

In [18]:
# E criar a coluna de id na tabela fato

fato_transacoes = df_transacoes.copy()

fato_transacoes['tmpCreatedAt'] = fato_transacoes['createdAt'].dt.floor('d').dt.tz_localize(None)
fato_transacoes = pd.merge(fato_transacoes, dim_date[['dateId', 'date']], left_on = 'tmpCreatedAt', right_on = 'date', how = 'left')
fato_transacoes.drop(columns = ['date', 'tmpCreatedAt'], inplace = True)
fato_transacoes.rename(columns = {'dateId': 'dateCreatedId'}, inplace = True)


fato_transacoes['tmpUpdatedAt'] = fato_transacoes['updatedAt'].dt.floor('d').dt.tz_localize(None)
fato_transacoes = pd.merge(fato_transacoes, dim_date[['dateId', 'date']], left_on = 'tmpUpdatedAt', right_on = 'date', how = 'left')
fato_transacoes.drop(columns = ['date', 'tmpUpdatedAt'], inplace = True)
fato_transacoes.rename(columns = {'dateId': 'dateUpdatedId'}, inplace = True)

fato_transacoes

Unnamed: 0,_id,tenantId,userId,createdAt,updatedAt,favoriteFruit,isFraud,documentType,documentUF,dateCreatedId,dateUpdatedId
0,641336efc5c272ff5d82eb76,64132ef6526eeb8b998e2d3c,64132e0f48ad8ae93e504d33,2021-01-08 02:24:02+03:00,2023-02-08 01:52:32+03:00,strawberry,True,CNH,PA,14844,15605
1,641336efa086e60c7a834713,64132ef6526eeb8b998e2d3c,64132e0f1d87f017f64a3c20,2022-02-03 11:17:50+03:00,2023-01-22 04:27:37+03:00,apple,False,CTPS,PA,15235,15588
2,641336ef5291e28207a6db05,64132ef6f991a3a1884f2cd0,64132e0fb4fade825fdbeed4,2021-09-03 01:40:07+03:00,2023-02-10 06:17:25+03:00,strawberry,False,RG,PA,15082,15607
3,641336efbaf638cc19f96b71,64132ef6c9c22c95893e388b,64132e0fac889c7cbd7ebda3,2021-07-25 10:02:11+03:00,2023-03-11 02:34:20+03:00,strawberry,True,CTPS,BA,15042,15636
4,641336ef24f561f394623973,64132ef68637da8c371cdc09,64132e0f16e8266d4823236c,2022-01-26 11:01:27+03:00,2023-01-14 06:52:42+03:00,strawberry,True,CNH,RJ,15227,15580
...,...,...,...,...,...,...,...,...,...,...,...
9997,641336ef434f4da45e00379f,64132ef6ccd2d35a83c3544a,64132e0f9647ed73a005c271,2022-01-04 01:26:08+03:00,2023-02-16 11:58:18+03:00,banana,True,CNH,BA,15205,15613
9998,641336efa0febbc18ae452f3,64132ef6526eeb8b998e2d3c,64132e0f9417d076f85b8f61,2022-10-28 05:01:20+03:00,2023-03-15 06:59:41+03:00,strawberry,False,CNH,MT,15502,15640
9999,641336efecd20d71ae58208f,64132ef68637da8c371cdc09,64132e0f29d8b74c6b1cb514,2022-12-30 08:19:19+03:00,2023-02-11 10:13:33+03:00,banana,False,RG,RJ,15565,15608
10000,641336ef7cc5c59939e34f34,64132ef6eea665cc154f614a,64132e0fe8e5688a81c56b8a,2022-11-02 09:01:38+03:00,2023-01-12 10:05:32+03:00,apple,False,RG,PA,15507,15578


In [19]:
# E na tabela users
dim_users = df_users.copy() 
dim_users['tmpCreatedAt'] = dim_users['createdAt'].dt.floor('d').dt.tz_localize(None)
dim_users = pd.merge(dim_users, dim_date[['dateId', 'date']], left_on = 'tmpCreatedAt', right_on = 'date', how = 'left')
dim_users.drop(columns = ['date', 'tmpCreatedAt'], inplace = True)
dim_users.rename(columns = {'dateId': 'dateCreatedId'}, inplace = True)


dim_users['tmpBirthdate'] = dim_users['birthdate'].dt.floor('d').dt.tz_localize(None)
dim_users = pd.merge(dim_users, dim_date[['dateId', 'date']], left_on = 'tmpBirthdate', right_on = 'date', how = 'left')
dim_users.drop(columns = ['date', 'tmpBirthdate'], inplace = True)
dim_users.rename(columns = {'dateId': 'dateUpdatedId'}, inplace = True)

cols = dim_users.columns.tolist()
cols = cols[-2:] + cols[:-2]
dim_users = dim_users[cols]

dim_users

Unnamed: 0,dateCreatedId,dateUpdatedId,_id,name,email,createdAt,birthdate
0,14759,3826,64132e0fa1b98612a1366a0a,Lucy Small,gallagherfowler@slax.com,2020-10-15 08:50:36+03:00,1990-11-09 02:23:11+00:00
1,14926,3358,64132e0f90329937be6d5b65,Alvarado Tanner,kathiekirk@ecratic.com,2021-03-31 04:35:35+03:00,1989-07-29 00:39:13+00:00
2,14848,4528,64132e0f429adbbe0334ad00,Tucker Wright,mckeebooth@deminimum.com,2021-01-12 06:28:25+03:00,1992-10-11 03:36:20+00:00
3,14916,253,64132e0f05e9b1d120893ffe,Oneill Molina,banksgoodwin@fleetmix.com,2021-03-21 10:40:58+03:00,1981-01-27 04:38:46+00:00
4,15201,362,64132e0f483bdb42b75e860c,Roberson Miller,ricewebb@biohab.com,2021-12-31 08:11:40+03:00,1981-05-16 05:02:29+00:00
...,...,...,...,...,...,...,...
95,15567,2390,64132e0fd0fd03a2161eade3,Henrietta Bradford,nonaharding@ezentia.com,2023-01-01 08:46:30+03:00,1986-12-04 05:23:46+00:00
96,14683,1860,64132e0f5db21511f29bd2c3,Kristen Beasley,mariastephenson@slambda.com,2020-07-31 09:28:58+03:00,1985-06-22 05:54:52+00:00
97,14664,6358,64132e0f49625262850d71ab,Katrina Welch,tishaclark@isosure.com,2020-07-12 01:34:15+03:00,1997-10-15 08:23:49+00:00
98,14795,4583,64132e0f1eeec2acd297c0e9,Mayer Spencer,montgomerydennis@buzzness.com,2020-11-20 12:08:03+03:00,1992-12-05 04:40:09+00:00


In [20]:
# Criamos a tabela dimensão de documentos
dim_document = df_transacoes[['documentType', 'documentUF']]
dim_document = dim_document.drop_duplicates().reset_index().rename(columns={'index':'documentId'})

dim_document

Unnamed: 0,documentId,documentType,documentUF
0,0,CNH,PA
1,1,CTPS,PA
2,2,RG,PA
3,3,CTPS,BA
4,4,CNH,RJ
5,5,CNH,RS
6,6,CNH,SP
7,9,CTPS,RJ
8,10,CNH,BA
9,13,CTPS,RS


In [21]:
# E criar a coluna de ID na tabela fato

fato_transacoes = pd.merge(fato_transacoes, dim_document, on = ['documentType', 'documentUF'], how = 'left')
fato_transacoes.drop(columns = ['documentType', 'documentUF'], inplace = True)
fato_transacoes.rename(columns = {'index': 'documentId'}, inplace = True)

cols = fato_transacoes.columns.tolist()
cols = cols[-3:] + cols[:-3]
fato_transacoes = fato_transacoes[cols]
fato_transacoes.fillna('-', inplace = True)
dim_document.fillna('-', inplace = True)

fato_transacoes

Unnamed: 0,dateCreatedId,dateUpdatedId,documentId,_id,tenantId,userId,createdAt,updatedAt,favoriteFruit,isFraud
0,14844,15605,0,641336efc5c272ff5d82eb76,64132ef6526eeb8b998e2d3c,64132e0f48ad8ae93e504d33,2021-01-08 02:24:02+03:00,2023-02-08 01:52:32+03:00,strawberry,True
1,15235,15588,1,641336efa086e60c7a834713,64132ef6526eeb8b998e2d3c,64132e0f1d87f017f64a3c20,2022-02-03 11:17:50+03:00,2023-01-22 04:27:37+03:00,apple,False
2,15082,15607,2,641336ef5291e28207a6db05,64132ef6f991a3a1884f2cd0,64132e0fb4fade825fdbeed4,2021-09-03 01:40:07+03:00,2023-02-10 06:17:25+03:00,strawberry,False
3,15042,15636,3,641336efbaf638cc19f96b71,64132ef6c9c22c95893e388b,64132e0fac889c7cbd7ebda3,2021-07-25 10:02:11+03:00,2023-03-11 02:34:20+03:00,strawberry,True
4,15227,15580,4,641336ef24f561f394623973,64132ef68637da8c371cdc09,64132e0f16e8266d4823236c,2022-01-26 11:01:27+03:00,2023-01-14 06:52:42+03:00,strawberry,True
...,...,...,...,...,...,...,...,...,...,...
9997,15205,15613,10,641336ef434f4da45e00379f,64132ef6ccd2d35a83c3544a,64132e0f9647ed73a005c271,2022-01-04 01:26:08+03:00,2023-02-16 11:58:18+03:00,banana,True
9998,15502,15640,15,641336efa0febbc18ae452f3,64132ef6526eeb8b998e2d3c,64132e0f9417d076f85b8f61,2022-10-28 05:01:20+03:00,2023-03-15 06:59:41+03:00,strawberry,False
9999,15565,15608,29,641336efecd20d71ae58208f,64132ef68637da8c371cdc09,64132e0f29d8b74c6b1cb514,2022-12-30 08:19:19+03:00,2023-02-11 10:13:33+03:00,banana,False
10000,15507,15578,2,641336ef7cc5c59939e34f34,64132ef6eea665cc154f614a,64132e0fe8e5688a81c56b8a,2022-11-02 09:01:38+03:00,2023-01-12 10:05:32+03:00,apple,False


In [22]:
# renomear as colunas das tabelas para padronizar as chaves primarias

dim_date.rename(columns = {'dateId': '_id'}, inplace = True)
dim_document.rename(columns = {'documentId': '_id'}, inplace = True)

In [23]:
import mysql.connector
from mysql.connector import errorcode

In [24]:
config = {
   'user': 'root',
   'password': 'Ninkelas2.',
   'host': 'localhost',
   'database': 'transactions',
   'raise_on_warnings': True
}

connection = mysql.connector.connect(
  host='localhost',
  user='root',
  password='#####'
)

database_name = 'transactions'
cursor = connection.cursor()

cursor.execute("DROP DATABASE IF EXISTS {} ".format(database_name))
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(database_name))

connection.commit()

In [25]:
TABLES = {}
TABLES['documents'] = """CREATE TABLE dimDocuments (
                _id INT NOT NULL ,
                documentType VARCHAR(10),
                documentUF VARCHAR(10),
                PRIMARY KEY (_id)
            )"""

TABLES['dates'] = """CREATE TABLE dimDate (
                _id INT NOT NULL ,
                date DATE,
                year INT,
                month INT,
                day INT,
                PRIMARY KEY (_id)
            )"""

TABLES['customers'] = """CREATE TABLE dimCustomers (
                _id VARCHAR(255),
                fantasyName VARCHAR(255),
                cnpj VARCHAR(14),
                status VARCHAR(255),
                segment VARCHAR(255),
                PRIMARY KEY (_id)
            )"""

TABLES['users'] = """CREATE TABLE dimUsers (
                dateCreatedId INT NOT NULL,
                dateUpdatedId INT NOT NULL,
                _id VARCHAR(255),
                name VARCHAR(255),
                email VARCHAR(255),
                createdAt TIMESTAMP,
                birthdate TIMESTAMP,
                PRIMARY KEY (_id),
                FOREIGN KEY (dateCreatedId) REFERENCES dimDate(_id),
                FOREIGN KEY (dateUpdatedId) REFERENCES dimDate(_id)
            )"""

TABLES['fact'] = """CREATE TABLE factTransactions (
                dateCreatedId INT NOT NULL,
                dateUpdatedId INT NOT NULL,
                documentId INT NOT NULL,
                _id VARCHAR(255),
                tenantId VARCHAR(255),
                userId VARCHAR(255),
                createdAt TIMESTAMP,
                updatedAt TIMESTAMP,
                favoriteFruit VARCHAR(255),
                isFraud BOOLEAN,
                PRIMARY KEY (_id, updatedAt),
                FOREIGN KEY (dateCreatedId) REFERENCES dimDate(_id),
                FOREIGN KEY (dateUpdatedId) REFERENCES dimDate(_id),
                FOREIGN KEY (documentId) REFERENCES dimDocuments(_id),
                FOREIGN KEY (tenantId) REFERENCES dimCustomers(_id),
                FOREIGN KEY (userId) REFERENCES dimUsers(_id)
            )"""



In [26]:
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()

try:
    cnx.database = config['database']
except mysql.connector.Error as err:
    if err.errno == errorcode.ER_BAD_DB_ERROR:
        create_database(cursor)
        cnx.database = config['database']
    else:
        print(err)
        exit(1)

for table_name in TABLES:
    table_description = TABLES[table_name]
    try:
        print("Creating table {}: ".format(table_name), end='')
        cursor.execute(table_description)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
            print("already exists.")
        else:
            print(err.msg)
    else:
        print("OK")

cursor.close()
cnx.close()

Creating table documents: OK
Creating table dates: OK
Creating table customers: OK
Creating table users: OK
Creating table fact: OK


In [27]:
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()

# Insert data into dimDocuments table
add_data_query = ("INSERT INTO dimDocuments "
                  "(_id, documentType, documentUF) "
                  "VALUES (%s, %s, %s)")
data = [tuple(x) for x in dim_document.to_numpy()]
cursor.executemany(add_data_query, data)

In [28]:
add_data_query = ("INSERT INTO dimDate "
                  "(_id, date, year, month, day) "
                  "VALUES (%s, %s, %s, %s, %s)")
data = [tuple(x) for x in dim_date.to_numpy()]
cursor.executemany(add_data_query, data)

In [29]:
add_data_query = ("INSERT INTO dimUsers "
                  "(dateCreatedId, dateUpdatedId, _id, name, email, createdAt, birthdate) "
                  "VALUES (%s, %s, %s, %s, %s, %s, %s)")
data = [tuple(x) for x in dim_users.to_numpy()]
cursor.executemany(add_data_query, data)

In [30]:
add_data_query = ("INSERT INTO dimCustomers "
                  "(_id, fantasyName, cnpj, status, segment) "
                  "VALUES (%s, %s, %s, %s, %s)")
data = [tuple(x) for x in df_customers.to_numpy()]
cursor.executemany(add_data_query, data)

In [31]:
add_data_query = ("INSERT INTO factTransactions "
                  "(dateCreatedId, dateUpdatedId, documentId, _id, tenantId, userId, createdAt, UpdatedAt, favoriteFruit, isFraud) "
                  "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
data = [tuple(x) for x in fato_transacoes.to_numpy()]
cursor.executemany(add_data_query, data)

Usando a estratégia de dimensionamento-fato, temos 5 tabelas diferentes, 4 tabelas de dimensão e uma tabela fato. Isso nos permite criar facilmente queries eficientes que dependem, por exemplo, do tipo de documentação usada para realizar a transação ou do mês em que a transação foi realizada. 

É importante notar que usamos uma esquema de estrela na nossa estratégia, o que implica que nossas tabelas não necessariamente estão normalizadas, mas discutiremos essa escolha na próxima seção.

Por fim, implementamos um banco de dados de MySQL com as relações necessárias entre as tabelas considerando suas chaves primárias e estrangeiras.


# Disponibilização e Processamento

Agora que você já modelou as bases, discuta sobre as estratégias de armazenamento.

1. Como você disponibilizaria estas bases no data lake (em S3)? Não precisa enviar o arquivo, apenas discuta.
2. Qual formato de arquivo você escolheria para persistir estas bases? Por que?
3. Você prefere bases normalizadas ou desnormalizadas? Discuta os pontos positivos e negativos de cada um.

Persista sua modelagem final conforme suas opções e responda as próximas perguntas a partir dela. Não esqueça de desalocar os dataframes antigos!

In [32]:
# Implementação

1. Existem várias maneiras de disponibilizar as bases no data lake, como simplesmente salvar os arquivos localmente e usar a interface do S3 na web para disponibilizar os arquivos no data lake. Porém essa não é a maneira mais eficiente, na minha opinião, eu usaria a SDK da Amazon para Python, chamada boto3 e enviaria os arquivos dessa forma.

In [33]:
# O código poderia ser algo dessa natureza:

'''def uploadDirectory(path,bucketname):
    for root,dirs,files in os.walk(path):
        for file in files:
            s3C.upload_file(os.path.join(root,file),bucketname,file)'''

# Considerando que as tabelas finais após a modelagem se encontram todas em uma pasta específica

'def uploadDirectory(path,bucketname):\n    for root,dirs,files in os.walk(path):\n        for file in files:\n            s3C.upload_file(os.path.join(root,file),bucketname,file)'

2 - A escolha de que formato de arquivo deve ser usado depende bastante de qual a será o uso dessas tabelas após o upload para o S3. Por exemplo, se for para essas tabelas serem usadas para analistas que utilizam excel, seria ideal salvar esses dados com um formato .csv para facilitar o acesso desses arquivos aos usuários finais. 

Por outro lado, se esses dados forem ser utilizados para popular tabelas do Redshift, então outras possibilidades se abrem, por exemplo podemos salvar os dados em json ou parquet e além disso podemos zippar os arquivos, dado que o Redshift é capaz de abrir esses tipos de arquivo com facilidade além dos tipos character-separated values e .csv. O parquet seria superior nesse caso por causa da maior velocidade de leitura e da maior compressão dos dados.

Considerando isso, e meu entendimento dos objetivos atuais da caf, eu optaria por salvar as tabelas em Parquet.

3 - Novamente, a escolha de bases normalizadas ou desnormalizadas é dependente do tipo de uso que a base terá. Por exemplo, bases normalizadas têm maior velocidade na inclusão, atualização e deleção de dados, porém bases desnormalizadas têm maior velocidade na leitura dos dados. Isso significa que se a base for usada de forma que um grande número de leituras seja executado, pode ser melhor que usemos uma base desnormalizada. Por outro lado, se essa base for lida infrequentemente mas atualizada frequentemente, uma base normalizada deve ser melhor. 

Além disso, bases normalizadas tem maior eficiência em termos de memória, mesmo apresentando um número maior de tabelas, devido à redução da redundância nos dados. Porém isso também implica que as queries em bases normalizadas tendem a ser mais complexas do que àquelas feitas em bases desnormalizadas.

A minha preferência é por bases desnormalizadas pois eu considero que é mais intuitivo acessar e realizar queries nesse tipo de base, facilitando o processo de levantamento de hipóteses e de solução de problemas, agilizando o desenvolvimento. Além disso, temos maior velocidade de resposta das queries de leitura. Porém, em casos específicos, a base normalizada deve ser considerada e apresenta vantagens significativas, como redução de custos por maior eficiência no armazenamento.

In [34]:
# Salvando as tabelas finais usando parquet e zip
dim_date.to_parquet('PersistentData/dimDate.parquet.gzip', compression='gzip')
dim_users.to_parquet('PersistentData/dimUsers.parquet.gzip', compression='gzip')
df_customers.to_parquet('PersistentData/dimCustomers.parquet.gzip', compression='gzip')
dim_document.to_parquet('PersistentData/dimDocument.parquet.gzip', compression='gzip')
fato_transacoes.to_parquet('PersistentData/FactTransactions.parquet.gzip', compression='gzip')

del df_customers
del df_transacoes
del df_users
del dim_users
del dim_date
del dim_document
del fato_transacoes

# Perguntas

Agora com a sua modelagem pronta, responda as seguintes perguntas utilizando SQL:

### Obrigatórias
1. Qual a média de tentativas de fraudes por mês (quantidade)?
2. Quais são as UFs com o maior percentual de tentativa de fraude?
3. Em quais meses do ano o percentual de tentativa de fraude foi maior e em quais foram menores?
4. Quais são os 10 clientes com maior quantidade de transações?
5. Qual a idade média dos usuários? 
6. Qual documento é mais utilizado pelos usuários?


### Desafio Extra
1. Há na base transações iguais mas com data de atualização diferente (houve modificação na transação e por isso ela ganhou um novo registro). Faça um select que retorne sempre a versão mais recente das transações, permitindo assim a visualização das informações mais recentes

In [35]:
# 1 usando SQL
query = ("""SELECT dimDate.month, SUM(factTransactions.isFraud) AS sum
            FROM factTransactions
            JOIN dimDate ON factTransactions.dateCreatedId = dimDate._id
            GROUP BY dimDate.month
            ORDER BY dimDate.month""")

cursor.execute(query)
query = cursor.fetchall()
query

[(1, Decimal('440')),
 (2, Decimal('387')),
 (3, Decimal('413')),
 (4, Decimal('435')),
 (5, Decimal('434')),
 (6, Decimal('428')),
 (7, Decimal('398')),
 (8, Decimal('448')),
 (9, Decimal('423')),
 (10, Decimal('418')),
 (11, Decimal('415')),
 (12, Decimal('405'))]

In [36]:
# 2 usando sql
query = ("""SELECT dimDocuments.documentUF, AVG(factTransactions.isFraud) AS mean_value
            FROM factTransactions
            JOIN dimDocuments ON factTransactions.documentId = dimDocuments._id
            GROUP BY dimDocuments.documentUF
            """)

cursor.execute(query)
query = cursor.fetchall()
query

[('PA', Decimal('0.5066')),
 ('BA', Decimal('0.5065')),
 ('RJ', Decimal('0.5108')),
 ('RS', Decimal('0.4834')),
 ('SP', Decimal('0.5117')),
 ('MT', Decimal('0.5060')),
 ('-', Decimal('1.0000'))]

In [37]:
# 3 usando sql
query = ("""SELECT dimDate.month, AVG(factTransactions.isFraud) AS mean_month
            FROM factTransactions
            JOIN dimDate ON factTransactions.dateCreatedId = dimDate._id
            GROUP BY dimDate.month
            ORDER BY mean_month DESC""")

cursor.execute(query)
query = cursor.fetchall()
query

[(10, Decimal('0.5218')),
 (8, Decimal('0.5209')),
 (4, Decimal('0.5160')),
 (2, Decimal('0.5146')),
 (9, Decimal('0.5084')),
 (6, Decimal('0.5065')),
 (5, Decimal('0.5041')),
 (3, Decimal('0.5018')),
 (11, Decimal('0.5018')),
 (1, Decimal('0.4911')),
 (12, Decimal('0.4874')),
 (7, Decimal('0.4789'))]

In [38]:
# 4 usando SQL. Assumindo que o cliente é o usuário apesar de a tradução mais precisa ser o customer, isso
# porque só existem apenas 10 customers então determinar o top 10 seria equivalente a determinar tudo.
query = ("""SELECT dimUsers.name, COUNT(*) AS count
            FROM factTransactions
            JOIN dimUsers ON factTransactions.userId = dimUsers._id
            GROUP BY dimUsers.name
            ORDER BY count DESC limit 10""")

cursor.execute(query)
query = cursor.fetchall()
query

[('Morin Nolan', 127),
 ('Harrington Wilkerson', 125),
 ('Audrey Maynard', 119),
 ('Horn Larsen', 118),
 ('Nell Dyer', 118),
 ('Mari Willis', 118),
 ('Susanna Padilla', 118),
 ('Huber Hernandez', 117),
 ('Chelsea Ortiz', 116),
 ('Bernadine Vargas', 116)]

In [39]:
# 4 usando SQL. E então fazendo para os customers.
query = ("""SELECT dimCustomers.fantasyName, COUNT(*) AS count
            FROM factTransactions
            JOIN dimCustomers ON factTransactions.tenantId = dimCustomers._id
            GROUP BY dimCustomers.fantasyName
            ORDER BY count DESC limit 10""")

cursor.execute(query)
query = cursor.fetchall()
query

[('Tingles', 1062),
 ('Caxt', 1018),
 ('Earthmark', 1017),
 ('Stockpost', 1015),
 ('Ecratic', 1009),
 ('Eschoir', 1000),
 ('Verton', 981),
 ('Bisba', 972),
 ('Zytrax', 964),
 ('Exospace', 964)]

In [40]:
# 5 usando SQL.
query = ("""SELECT AVG(DATEDIFF(CURRENT_DATE, birthdate) / 365) AS average_age
            FROM dimUsers""")

cursor.execute(query)
query = cursor.fetchall()
query

[(Decimal('34.02183562'),)]

In [41]:
# 6 usando SQL.
query = ("""SELECT dimDocuments.documentType, COUNT(*) AS count
            FROM factTransactions
            JOIN dimDocuments ON factTransactions.documentId = dimDocuments._id
            GROUP BY dimDocuments.documentType
            ORDER BY count DESC""")

cursor.execute(query)
query = cursor.fetchall()
query

[('CTPS', 3377), ('CNH', 3330), ('RG', 3292), ('-', 3)]

In [42]:
# questão extra usando SQL.
query = ("""SELECT t1.*
            FROM factTransactions t1
            LEFT JOIN factTransactions t2 ON t1._id = t2._id AND t1.updatedAt < t2.updatedAt
            WHERE t2._id IS NULL""")

cursor.execute(query)
query = cursor.fetchall()
query

[(14944,
  15610,
  1,
  '641336ef000b2ce307b655c6',
  '64132ef662f97d5858f87aba',
  '64132e0ffe16938ae5708fb3',
  datetime.datetime(2021, 4, 18, 2, 56, 39),
  datetime.datetime(2023, 2, 13, 1, 11, 3),
  'banana',
  0),
 (14874,
  15608,
  9,
  '641336ef0012d7b5dfccf81d',
  '64132ef64f72ae4d32dc3535',
  '64132e0f72148c806c32b064',
  datetime.datetime(2021, 2, 7, 10, 58, 22),
  datetime.datetime(2023, 2, 11, 11, 42, 20),
  'strawberry',
  0),
 (15206,
  15568,
  33,
  '641336ef00224417c62cfdf5',
  '64132ef6f991a3a1884f2cd0',
  '64132e0fc7534e992bafa834',
  datetime.datetime(2022, 1, 5, 3, 49, 30),
  datetime.datetime(2023, 1, 2, 1, 28, 25),
  'apple',
  1),
 (15217,
  15570,
  63,
  '641336ef00337c3444c99a78',
  '64132ef68637da8c371cdc09',
  '64132e0f555673bb72251b1d',
  datetime.datetime(2022, 1, 16, 11, 42, 35),
  datetime.datetime(2023, 1, 4, 12, 17, 18),
  'apple',
  0),
 (15437,
  15581,
  22,
  '641336ef00338af0a965428b',
  '64132ef662f97d5858f87aba',
  '64132e0f29d8b74c6b1cb514',