## Antes de começarmos...

Algumas considerações sobre esta demonstração:
- Para simplificar os steps desse notebook, utilizarei Materialized Views para as queries do Cassandra. Essa feature ainda é experimental, mas evita a necessidade de mais boilerplate para atualizar várias tabelas ao mesmo tempo.
- Campos como buyerId, sellerId e outros Id's foram reduzidos para dois digitos "##", assim conseguimos uma boa amostragem de disputas (varias disputas para o mesmo seller, buyer, item, etc) sem precisar popular os bd's com muitas disputas
- Algumas demonstrações contam com código assíncrono, como as estatísticas do Redis. Dependendo do ambiente Binder, isso pode quebrar o kernel.

### Inicializando e checando o ecossistema de databases

In [41]:
#mongodb
!mongo --quiet --eval 'rs.initiate();'
!mongo --quiet --eval 'rs.isMaster().ismaster'

#redis
!redis-stack-server --daemonize yes
!redis-cli ping

#cassandra
!cqlsh --execute="SELECT host_id, bootstrapped FROM system.local;"
!cqlsh --file=./cassandra/initialize.cql
!cqlsh --execute='DESC keyspaces;'

{
	"operationTime" : Timestamp(1689825681, 1),
	"ok" : 0,
	"errmsg" : "already initialized",
	"code" : 23,
	"codeName" : "AlreadyInitialized",
	"$clusterTime" : {
		"clusterTime" : Timestamp(1689825681, 1),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}
true
820:C 20 Jul 2023 04:01:24.480 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
820:C 20 Jul 2023 04:01:24.480 # Redis version=6.0.5, bits=64, commit=660000e3, modified=1, pid=820, just started
820:C 20 Jul 2023 04:01:24.480 # Configuration loaded
PONG

 [0;1;35mhost_id[0m                              | [0;1;35mbootstrapped[0m
--------------------------------------+--------------
 [0;1;32m0960adc7-07c9-40f7-a4f6-945f61b635c4[0m |    [0;1;33mCOMPLETED[0m

(1 rows)
./cassandra/initialize.cql:6:AlreadyExists: Keyspace 'disputes' already exists
./cassandra/initialize.cql:13:InvalidRequest: Error from server: code=2200 [Invalid query] message="A user type with name 'at

### Inicializando modulos, libs e variaveis

In [43]:
from pymongo import MongoClient
from pprintpp import pprint
import warnings
import json
from cassandra.cluster import Cluster
from libs.dispute_faker import generate_dispute
import redis

warnings.filterwarnings('ignore')


# Decoder personalizado para Cassandra
def dict_cleaner(items):
    result = {}
    for key, value in items:
        if value is None and key not in ['lastRound', 'counterProposal']:
            value = ''
        if 'gmt' in key:
            try:
                value = str(value*1000)
            except TypeError:
                value = None
        result[key] = value
    return result


mongoclient = MongoClient('localhost', 27017)
dispute_db = mongoclient["disputes"]
print(dispute_db)

cluster = Cluster()
cassandra = cluster.connect('disputes')
pprint(cassandra.execute('DESC tables;').all())

redispute = redis.Redis(host='localhost', port=6379, decode_responses=True)
redispute.ping()

Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'disputes')
[
    Row(keyspace_name='disputes', type='table', name='disputes'),
    Row(keyspace_name='disputes', type='table', name='disputes_changelog'),
]


True

### Populando os BD's com faker
Todas as disputas inseridas seguem uma lógica próxima da real graças ao uso do faker. A forma como as disputas estão sendo geradas pode ser consultada aqui [dispute_faker.py](./libs/dispute_faker.py)

In [None]:
inserted = dispute_db['disputes'].insert_many([generate_dispute() for _ in range(2000)])
pprint(len(inserted.inserted_ids))

In [None]:
query = dispute_db['disputes'].find({})
for dispute in query:
    dispute['id'] = str(dispute['_id'])
    dispute['arbitratorId'] = str(dispute['resolution']['arbitratorId']) if not None else ''
    dispute['decision'] = str(dispute['resolution']['decision']) if not None else ''
    dispute['status'] = str(dispute['resolution']['status'])
    dispute['itemId'] = str(dispute['item']['itemId'])
    dispute['skuId'] = str(dispute['item']['skuId'])
    dispute['id'] = str(dispute['_id'])
    dispute.pop('_id')
    dispute = json.dumps(dispute)
    cleaned = json.dumps(json.loads(dispute, object_pairs_hook=dict_cleaner))
    cassandra.execute(f"INSERT INTO disputes JSON'{cleaned}'").all()
pprint(cassandra.execute(f"SELECT COUNT(*) from disputes")[0].count)

### Cassandra - Exemplo 1 - Controle de qualidade
Após uma longa e complicada disputa envolvendo o produto '23', a equipe de controle de qualidade foi acionada para analisar se o produto vem gerando disputas corriqueiramente e quais sao as alegações dos compradores. A equipe reuniu relatos de todos os compradores que abriram uma disputa para o item '23', para montar um dossiê.

In [56]:
cql_query = '''
    SELECT
      toDate(gmtCreated) as date, buyerId, disputeReason as reason, resolution.firstround.initialproposal.richTextReport as claimings
    FROM
      disputes.disputes_by_item_and_date
    WHERE
      gmtCreated >= '2021-01-01'
    AND 
      gmtCreated <= '2023-12-31'
    AND itemId = '23'
    ALLOW FILTERING
'''

!cqlsh --execute="{cql_query}"


 [0;1;35mdate[0m       | [0;1;35mbuyerid[0m | [0;1;35mreason[0m                     | [0;1;35mclaimings[0m
------------+---------+----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [0;1;32m2022-08-09[0m |      [0;1;33m83[0m |           [0;1;33mMISSING_QUANTITY[0m |                                                                         [0;1;33mStart carry for air determine opportunity condition fight officer cell cell important catch oil exist.[0m
 [0;1;32m2022-07-25[0m |      [0;1;33m98[0m | [0;1;33mITEM_NOT_MATCH_DESCRIPTION[0m |                                                                                   [0;1;33mNew ago capital process fact drug each commercial magazine himself phone draw they politics.[0m
 [0;1;32m2022-04-28[0m |      [0;1;33m83[0m |           [0;1;33mMISSING_QUANTITY[0m |    

### Cassandra - Exemplo 2 - Análise de fraude
A empresa PegaLadrao Inc., contratada para análise de fraude, recebeu uma denúncia de que o usuário 48 abre disputas de forma fraudulenta. Com acesso ao banco analítico, a empresa consultou todas as disputas abertas pelo usuário no ano passado.

PS: Como os dados são gerados com uso de faker, é possível que o usuário 48 não possua disputas registradas, nesse caso, escolher um número entre 01 e 99.

In [54]:
cql_query = '''
    SELECT
      toDate(gmtCreated) as date, disputeReason, resolution.firstround.initialProposal.proposedSolution as proposedSolution, status, decision, sellerId
    FROM
      disputes.disputes_by_buyer
    WHERE
      gmtCreated >= '2022-01-01'
    AND 
      gmtCreated <= '2022-12-31'
    AND buyerId = '48'
    ALLOW FILTERING
'''

!cqlsh --execute="{cql_query}"


 [0;1;35mdate[0m       | [0;1;35mdisputereason[0m              | [0;1;35mproposedsolution[0m      | [0;1;35mstatus[0m             | [0;1;35mdecision[0m              | [0;1;35msellerid[0m
------------+----------------------------+-----------------------+--------------------+-----------------------+----------
 [0;1;32m2022-07-27[0m |                      [0;1;33mOTHER[0m |        [0;1;33mPARTIAL_REFUND[0m |         [0;1;33mARBITRATED[0m | [0;1;33mSHIP_MISSING_QUANTITY[0m |       [0;1;33m52[0m
 [0;1;32m2022-06-03[0m |           [0;1;33mMISSING_QUANTITY[0m |           [0;1;33mFULL_REFUND[0m | [0;1;33mWAITING_FOR_SELLER[0m |                  [0;1;33mNone[0m |       [0;1;33m58[0m
 [0;1;32m2022-12-07[0m |          [0;1;33mITEM_NOT_RECEIVED[0m |         [0;1;33mSHIP_NEW_ITEM[0m |  [0;1;33mREACHED_AGREEMENT[0m | [0;1;33mSHIP_MISSING_QUANTITY[0m |       [0;1;33m91[0m
 [0;1;32m2022-07-02[0m |                    [0;1;33mDAMAGED[0m |          [0

In [None]:
# Esta cell irá "spawnar" outro interpretador CPython, nem sempre isso será estável
import multiprocessing, time

def background_watcher():
    while True:
        dispute_db['disputes'].watch()
        feed_cassandra()
        feed_redis_statistics()
        
def feed_cassandra():
    pass

def feed_redis_statistics():
    pass
    
multiprocessing.Process(target=network_call).start()