In [1]:
#fazendo a instaçação de pacotes que precisamos para importar os bancos do postgres e passar pro mongo e pro cassandra

In [2]:
! pip install cassandra-driver
! pip install SQLAlchemy
! pip install pymongo
! pip install psycopg2-binary
! python -m pip install "pymongo[srv]"



In [3]:
#instalando pacotes para manipulaçao de dados
! pip install numpy pandas



In [4]:
#importando todas as bibliotecas que precisamos usar nessa atividade:
from cassandra.cluster import Cluster
from pymongo import MongoClient
from pymongo import InsertOne, DeleteOne, ReplaceOne
from functools import reduce
import psycopg2
import pymongo
import numpy as np
import pandas as pd
import random
import time

## Conectando nos 3 bancos e criando suas classes

In [5]:
#Postgres
con = psycopg2.connect(host='dindin_pgdatabase', database='user',
user='user', password='admin')
cur = con.cursor()

In [24]:
#Cassandra
cluster = Cluster([('dindin_cassandra','9042')])
session = cluster.connect()
session.default_timeout = 60

In [19]:
#MongoDB
# CONNECTION_STRING = "mongodb+srv://root_dindin:admin@dindin_mongo.mongodb.net/dindin_agora?retryWrites=true&w=majority"
# CONNECTION_STRING = "dindin_mongo:8081"
CONNECTION_STRING = "mongodb://root_dindin:admin@mongo:27017/"
client = MongoClient(CONNECTION_STRING)

## Consultando tabelas que estão no Postgres

In [8]:
cur.execute('select * from dindinagora.cliente')
recset = cur.fetchall()

In [9]:
#trazendo as primeiras 5 linhas
for rec in recset[0:5]:
    print(rec)

(1, 'João Miguel  Queirós Aparecido', '715120359', '00.930.834.9-84', 69)
(2, 'Maria Julia  Brasil  Barros', '687075338', '11.464.906.1-05', 67)
(3, 'Theo  Magalhães   Barros', '579802040', '66.279.889.1-42', 28)
(4, 'Marcela  Costa Calado', '519537378', '22.029.821.1-05', 141)
(5, 'Vanessa  Fontes  Rodrigues', '142555651', '57.444.588.8-55', 79)


In [10]:
#visualizando as colunas
[desc[0] for desc in cur.description]

['id', 'nome', 'RG', 'CPF', 'endereco_fk']

## Caso de teste
Neste cenário podemos simular requisições de uma plataforma através do JupyterNotebook, fazendo 2 cenários:

1. Fazer 5000 requisições de tipos de empréstimo no Postgres

2. Fazer uma requisição que lê o volume de transações, depósitos e empréstimos por dia dentro das bases transacionais

Para ambas requisições, podemos gravar o tempo que levou para realizar estes cenários simulados para a pergunta 1 e 2 do enunciado.

Assim sendo, verificamos os cenários primeiramente no Postgres, e posteriormente, transferimos os datasets necessários para os bancos que respectivamente seriam mais eficientes (MongoDB e Cassandra) e verificamos a eficiência de tempos de processamento desses em cada cenário. Para isso, geramos um índice aleatório que pode ser replicado em todos os cenários (para o caso 1).

Obs: Em todos os casos, eu não armazeno todos os resultados por conta da atividade ter somente a necessidade de cronometrar o tempo.

In [11]:
#Gerando índice aleatório
random.seed(1994)
cur_emp = con.cursor()
cur_emp.execute('select * from dindinagora.dim_emprestimo;')
recset_emp = cur_emp.fetchall()
index_geral_emprestimos = [idd[0] for idd in recset_emp]
rand_index = random.choices(index_geral_emprestimos, k=5000)
#puxando os nomes da coluna dos tipos de emprestimo
nomes_colunas_emp = [desc[0] for desc in cur_emp.description]

### Cenário 1: Postgres

In [12]:
print("select * from dindinagora.dim_emprestimo where id = {0};".format(36))

select * from dindinagora.dim_emprestimo where id = 36;


In [14]:
cur = con.cursor()
start_time = time.time()
val_emprestimos_postgres=[]

for idx in rand_index:
    cur.execute("select * from dindinagora.dim_emprestimo where id = {0};".format(idx))
    val_emprestimos_postgres.append(dict(zip(tuple(nomes_colunas_emp),cur.fetchall()[0])))    
tp_1_postgres = (time.time() - start_time)
#imprimindo os 5 primeiros registros
val_emprestimos_postgres[1:5]

[{'id': 322, 'montante': 2200.0, 'taxa_aa': 0.35, 'parcelas': 18},
 {'id': 354, 'montante': 5400.0, 'taxa_aa': 0.35, 'parcelas': 18},
 {'id': 187, 'montante': 8700.0, 'taxa_aa': 0.17, 'parcelas': 6},
 {'id': 79, 'montante': 7900.0, 'taxa_aa': 0.12, 'parcelas': 2}]

In [15]:
#tempo para o postgres fazer as "requisições"
tp_1_postgres

2.5434505939483643

### Cenário 2: Postgres

In [16]:
cur = con.cursor()
start_time = time.time()

cur.execute (""" select coalesce(coalesce(data_transferencia,data_deposito),data_emprestimo) as data
                ,a.sum_transferencia
                ,b.sum_deposito
                ,c.sum_emprestimos
                from
                (
                    select date(data_hora_transacao) as data_transferencia
                           ,sum(montante) as sum_transferencia
                    from dindinagora.tr_transferencia a
                    group by date(data_hora_transacao)
                ) a
                full join
                (
                    select date(data_hora_transacao) as data_deposito
                           ,sum(montante) as sum_deposito
                    from dindinagora.tr_deposito a
                    group by date(data_hora_transacao)
                ) b
                on a.data_transferencia = b.data_deposito
                full join
                (
                    select date(a.data_hora_emprestimo) as data_emprestimo
                           ,sum(b.montante) as sum_emprestimos
                    from dindinagora.tr_emprestimo a
                    left join dindinagora.dim_emprestimo b
                    on a.emprestimo_fk = b.id
                    group by date(data_hora_emprestimo)
                ) c
                on a.data_transferencia = c.data_emprestimo
            """)

result_post = cur.fetchall()
tp_2_postgres = (time.time() - start_time)


In [17]:
tp_2_postgres

1.4607887268066406

Agora, importamos os dados de tipos de empréstimos no MongoDB e empréstimos, depósitos e transferências no cassandra: 

### Para o MongoDB

In [34]:
#criando coleção (para colocar dentro os json)
mydb = client['dindin_agora']
emprestimos=mydb['emprestimos']

In [35]:
#puxando os nomes da coluna dos tipos de emprestimo
dict(zip(tuple(nomes_colunas_emp),recset_emp[0]))

{'id': 1, 'montante': 100.0, 'taxa_aa': 0.12, 'parcelas': 2}

In [36]:
#fazendo para cada linha um dict que ira ser importado na colecao de emprestimos:
lista_requests = []
for i in range(0,max(index_geral_emprestimos)):
    t = dict(zip(tuple(nomes_colunas_emp),recset_emp[i]))
    lista_requests.append(InsertOne(t))


In [37]:
result = emprestimos.bulk_write(lista_requests)

In [38]:
#verificando se foi gravado mesmo
result.inserted_count

500

In [41]:
#agora testamos a consulta dos 5000 registros pelo mongodb:
start_time = time.time()
val_emprestimos_mongodb=[]
for idx in rand_index:
    doc = dict(emprestimos.find_one({'id':idx}))
    doc.pop('_id')
    val_emprestimos_mongodb.append(doc)
tp_1_mongodb = (time.time() - start_time)
#verificand as 5 primeiras linhas
val_emprestimos_mongodb[1:5]

[{'id': 322, 'montante': 2200.0, 'taxa_aa': 0.35, 'parcelas': 18},
 {'id': 354, 'montante': 5400.0, 'taxa_aa': 0.35, 'parcelas': 18},
 {'id': 187, 'montante': 8700.0, 'taxa_aa': 0.17, 'parcelas': 6},
 {'id': 79, 'montante': 7900.0, 'taxa_aa': 0.12, 'parcelas': 2}]

In [42]:
tp_1_mongodb

8.858848810195923

### Fazendo agora para o Cassandra:

In [43]:
#Para importar os dados transacionais, temos 3 datasets diferentes que podemos colocar dentro
#do mesmo keyspace (dindin_agora):

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS dindin_agora 
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} 
    """)

<cassandra.cluster.ResultSet at 0x7f2e15a82b00>

In [44]:
#Gravando o schema das atbelas usadas num dict (só pra facilitar)
tabelas= {'tr_transferencia':{'id':'int','montante':'float','tipo':'text','data_hora_transacao':'timestamp','origem_fk':'int','destino_fk':'int'}
          ,'tr_deposito':{'id':'int','montante':'float','conta_fk':'int','data_hora_transacao':'timestamp'}
          ,'tr_emprestimo':{'id':'int','conta_fk':'int','emprestimo_fk':'int','data_hora_emprestimo':'timestamp'}
          ,'dim_emprestimo':{'id':'int','montante':'float','taxa_aa':'float','parcelas':'int'}
         }

In [45]:
#criando tabelas para fazer a importação depois (do schema do postgres)
session.set_keyspace("dindin_agora")
for indx in tabelas:
    
    print('\n para '+indx)
    
    dic = tabelas.get(indx)
    lista_vals = []
    for elmt in dic:
        lista_vals.append(elmt+' '+dic.get(elmt)+',')

    string_colunas_tip = reduce(lambda x, y: x + y, lista_vals)
    string_colunas = reduce(lambda x, y: x +','+ y, dic)
    string_imput = reduce(lambda x,y: x+','+y,['?' for i in range(0,len(dic))])

    session.execute("CREATE TABLE IF NOT EXISTS {0} (".format(indx)+string_colunas_tip+" PRIMARY KEY(id))")
    prepared = session.prepare("INSERT INTO {0} (".format(indx)+string_colunas+") VALUES ("+string_imput+")")

    cur = con.cursor()
    start_time = time.time()
    cur.execute("select * from dindinagora.{0}".format(indx))
    linhas_import = cur.fetchall()

    #laço que le linha a linha da tabela selecionada:
    for linn in linhas_import:
        session.execute(prepared, linn)



 para tr_transferencia

 para tr_deposito

 para tr_emprestimo

 para dim_emprestimo


In [46]:
#verificando as primeiras 10 linhas por tabela:
for indx in tabelas:
    result = session.execute("select * from {0} ".format(indx)+"limit 5")
    for ress in result:
        print(ress)
    print('----------------')

Row(id=4317, data_hora_transacao=datetime.datetime(2022, 3, 24, 17, 44, 20), destino_fk=538, montante=459.3399963378906, origem_fk=814, tipo='DOC')
Row(id=3372, data_hora_transacao=datetime.datetime(2022, 4, 22, 22, 14, 48), destino_fk=305, montante=103.48999786376953, origem_fk=659, tipo='DOC')
Row(id=1584, data_hora_transacao=datetime.datetime(2021, 10, 31, 16, 3, 46), destino_fk=421, montante=61.619998931884766, origem_fk=19, tipo='TED')
Row(id=4830, data_hora_transacao=datetime.datetime(2021, 5, 30, 3, 6, 18), destino_fk=112, montante=388.4100036621094, origem_fk=392, tipo='DOC')
Row(id=2731, data_hora_transacao=datetime.datetime(2021, 2, 7, 13, 27, 47), destino_fk=512, montante=143.72000122070312, origem_fk=847, tipo='PIX')
----------------
Row(id=23, conta_fk=852, data_hora_transacao=datetime.datetime(2022, 1, 18, 21, 40, 32), montante=474.1775817871094)
Row(id=114, conta_fk=558, data_hora_transacao=datetime.datetime(2021, 5, 1, 6, 12, 58), montante=582.2879638671875)
Row(id=53, 

In [47]:
# Agora, fazendo a consulta completa dos valores de série temporal:
# Como o cassandra não suporta joins, temos que puxar tudo para o python e trabalhar com data 
# frames aqui:
session.set_keyspace("dindin_agora")
start_time = time.time()

lista_transf =[]
result = session.execute("""
                            select montante,data_hora_transacao
                            from tr_transferencia
                         """)
    
for ress in result:
    lista_transf.append(tuple(ress))

#para depósitos
lista_depositos =[]
result = session.execute("""
                            select montante,data_hora_transacao
                            from tr_deposito
                         """)
    
for ress in result:
    lista_depositos.append(tuple(ress))

#para emprestimos
lista_emprestimos =[]
result = session.execute("""
                            select emprestimo_fk,data_hora_emprestimo
                            from tr_emprestimo
                         """)
    
for ress in result:
    lista_emprestimos.append(tuple(ress))
    
#para emprestimos
lista_dim_emprestimos =[]
result = session.execute("""
                            select id,montante
                            from dim_emprestimo
                         """)
    
for ress in result:
    lista_dim_emprestimos.append(tuple(ress))

time_cassandra_1 = time.time()

df_dim_emprestimos = pd.DataFrame(lista_dim_emprestimos)
df_dim_emprestimos.columns =['id','montante_emp']

df_emprestimos = pd.DataFrame(lista_emprestimos)
df_emprestimos.columns =['emprestimo_fk','data_hora_emprestimo']

df_depositos = pd.DataFrame(lista_depositos)
df_depositos.columns =['montante_dep','data_hora_transacao']

df_transf = pd.DataFrame(lista_transf)
df_transf.columns =['montante_trsf','data_hora_transacao']

# Fazendo as agregações que precisa antes do merge (ou join)
#criando uma coluna de data:
df_depositos['data_transacao'] = df_depositos['data_hora_transacao'].apply(lambda x: x.date())
df_transf['data_transacao'] = df_transf['data_hora_transacao'].apply(lambda x: x.date())
df_emprestimos['data_emprestimo'] = df_emprestimos['data_hora_emprestimo'].apply(lambda x: x.date())

#agora realizando o agrupamento por dia e fazendo os joins
df_depositos_g = df_depositos.groupby('data_transacao',as_index=False)['montante_dep'].sum()
df_transf_g = df_transf.groupby('data_transacao',as_index=False)['montante_trsf'].sum()

#e para a de emprestimos:
df_emprestimos_g = pd.merge(df_emprestimos
        ,df_dim_emprestimos
        ,left_on= ['emprestimo_fk']
        ,right_on= ['id'] 
         ,how = 'left').groupby('data_emprestimo',as_index=False)['montante_emp'].sum()

#e por fim juntando tudo num df:
df_cruz = pd.merge(df_depositos_g,df_transf_g,on= 'data_transacao',how='outer').merge(df_emprestimos_g
                                                                                      ,left_on='data_transacao'
                                                                                      ,right_on = 'data_emprestimo',how='outer')
df_cruz['data_transacao']=df_cruz[['data_transacao','data_emprestimo']].bfill(axis=1).iloc[:, 0]
df_cruz.drop(['data_emprestimo'],axis=1,inplace=True)
time_cassandra_2 = time.time()

#agora usando pandas para fazer as agregações dentro do jupyter notebook

In [48]:
df_cruz

Unnamed: 0,data_transacao,montante_dep,montante_trsf,montante_emp
0,2021-01-02,300.361237,10218.060028,7700.0
1,2021-01-03,519.614990,3158.029996,
2,2021-01-04,632.922546,5870.800079,
3,2021-01-07,311.113983,1449.199978,
4,2021-01-09,201.752380,5598.689995,
...,...,...,...,...
511,2022-05-23,,3116.699970,
512,2022-05-25,,6487.669968,
513,2022-05-29,,1718.989967,
514,2022-05-30,,5135.519938,


In [49]:
session.shutdown()
cur.close()
client.close()

Agora analisando os tempos que obtivemos para cada situação:

In [50]:
print('Tempo para situação 1 com Postgres:{:3.4f}'.format(tp_1_postgres))  
print('Tempo para situação 2 com Postgres:{:3.4f}'.format(tp_2_postgres))
print('Tempo para situação 1 com MongoDb:{:3.4f}'.format(tp_1_mongodb))
print('Tempo para situação 2 com Cassandra (só o fetch):{:3.4f}'.format(time_cassandra_1-start_time))
print('Tempo para situação 2 após consultar Cassandra e manipulação com pandas:{:3.4f}'.format(time_cassandra_2-time_cassandra_1))
print('Tempo para situação 2 com Cassandra total:{:3.4f}'.format(time_cassandra_2-start_time))


Tempo para situação 1 com Postgres:2.5435
Tempo para situação 2 com Postgres:1.4608
Tempo para situação 1 com MongoDb:8.8588
Tempo para situação 2 com Cassandra (só o fetch):0.8213
Tempo para situação 2 após consultar Cassandra e manipulação com pandas:0.2674
Tempo para situação 2 com Cassandra total:1.0887


Por algum motivo as soluções que escolhemos performou um pouco mais devagar comparado com o postgres, mas provavelmente é por conta do dataset ser pequeno e, não estarmos considerando um cenário em que tivesse muitas requisições de vários clientes. Isso mostra que essa simulação poderia ser aprimorada para um cenário mais real se tivessemos trabalhado com um paralelismo sobre as requisições, mostrando que em um workload pesado os bancos do MongoDB e do Cassandra sairiam melhor. Em particular, o Cassandra foi mais rápido na consulta do que o Postgres, mesmo considerando o tempo total de manipulação com os dataframes.