Script de actualización de datos de Cassandra en un clúster multidominio

In [None]:
# NOTE(review): none of these lines installs the `cassandra` or `psutil`
# modules that are imported further down in this notebook — confirm they are
# already available in the environment.
!pip install mysql-connector==2.1.7
!pip install pandas
!pip install sqlalchemy
# Requires additional installation steps, see https://github.com/PyMySQL/mysqlclient
!pip install mysqlclient
!pip install numpy
!pip install pymysql



In [None]:
import pandas as pd
import numpy as np
import os
import json
import random
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import time
from pprint import pprint
import psutil
import uuid
from cassandra.query import tuple_factory
from cassandra.query import dict_factory
from cassandra.query import BatchStatement, SimpleStatement
from cassandra.policies import RetryPolicy

In [None]:
# Timing measurements are averaged over repeated runs.
# Number of repetitions of the benchmark loop.
repeats = 100

In [None]:
# Output file template; the {} placeholder is filled in with a timestamp
# when the results are saved.
resultados_etl_update = '../Results/Cassandra/CassandraUpdate_test_{}.csv'

In [None]:
def save_results_to_csv(results,file):
    """Write benchmark measurements to a timestamped CSV file.

    Args:
        results: Iterable of 4-item rows, stored under the columns
            ``Registros``, ``Tiempo``, ``CPU`` and ``Memoria``.
        file: Path template with one ``{}`` placeholder, which is replaced
            by the current local time formatted as ``ddmmYYYY_HH_MM_SS``.

    Returns:
        The path of the CSV file that was written.
    """
    import os
    from datetime import datetime

    csv_df = pd.DataFrame(results, columns=['Registros', 'Tiempo', 'CPU', 'Memoria'])

    # Build the path once so the printed name and the written file always match.
    timestamp = datetime.now().strftime("%d%m%Y_%H_%M_%S")
    output_path = file.format(timestamp)
    print(output_path)

    # to_csv does not create missing folders; create the target directory so
    # a finished benchmark run is not lost at the very last step.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    csv_df.to_csv(output_path)
    return output_path

In [None]:
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy, DowngradingConsistencyRetryPolicy
from cassandra.query import tuple_factory
from cassandra import ConsistencyLevel

# Execution profile registered below as the cluster's default profile.
profile = ExecutionProfile(
    # Restrict the hosts used for requests to the whitelisted local address.
    load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']),
    retry_policy=DowngradingConsistencyRetryPolicy(),
    consistency_level=ConsistencyLevel.ALL,
    serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
    # Very generous timeout (presumably seconds — confirm against the driver docs).
    request_timeout=3600,
    # Rows are returned as plain tuples, so columns are read by position.
    row_factory=tuple_factory
)
# No contact points are passed, so the driver's default is used.
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()
# Quick connectivity check: print the server's release version.
print(session.execute("SELECT release_version FROM system.local").one())

session.execute('USE currentaccountkeyspace')

# Update test multidomain

In [None]:
# Possible country values; one of them is picked at random and passed to the
# update function when the benchmark is launched.
session.execute('USE positionkeepingkeyspace')
country_list= ['SPA', 'GBP', 'IND']
print(len(country_list))

In [None]:
# Sample one value from country_list; the result is not stored.
random.choice(country_list)

In [None]:
# NOTE(review): SQL UPDATE with INNER JOINs; nothing in the visible source
# references update_query_sql, and the trailing .format(...) call is commented
# out, so the '{}' placeholder is never filled in.
update_query_sql = """UPDATE PositionKeepingDomainSchema.Amount a 
INNER JOIN PositionKeepingDomainSchema.PositionKeeping pk ON pk.AmountId = a.AmountId
INNER JOIN CurrentAccountDomainSchema.CurrentAccount ca ON ca.AccountId = pk.AccountId
INNER JOIN CurrentAccountDomainSchema.AccountInfo ai ON ai.AccountId = ca.AccountId
SET a.CurrencyId = (SELECT CurrencyId FROM PositionKeepingDomainSchema.Currency WHERE Code = '{}')
WHERE ca.Status = 'Enabled' AND ai.SchemeName LIKE 'UK.%'"""#.format(random.choice(country_list))

In [None]:
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra import concurrent

# Module-level accumulator for the (index, elapsed seconds, CPU %, memory %)
# tuples appended by the update function defined below.
registers = []
# NOTE(review): batch and account_id_list are not used anywhere in the visible source.
batch = BatchStatement(consistency_level=ConsistencyLevel.ALL)
account_id_list = []

# Benchmark: one UPDATE per account that is both enabled and on a UK scheme.
def updateAmountCurrencyWhenAccountIsEnabledAndSchemeNameIsFromUK(currency_code):
    """Run the multi-keyspace update benchmark and collect per-update metrics.

    For each of ``repeats`` rounds the function:

    1. selects the account ids whose status is ``'Enabled'``,
    2. selects the account ids whose scheme name starts with ``'UK.'``,
    3. intersects both results on the client side, and
    4. issues one UPDATE per matching account that sets
       ``creditLine_included`` to ``True`` on the row whose
       ``amount_currency_code`` and ``credit_line_currency_code`` both equal
       ``currency_code``.

    After every UPDATE a tuple ``(index, elapsed_seconds, cpu_percent,
    memory_percent)`` is appended to the module-level ``registers`` list and
    printed. The timer starts before the two SELECTs, so the first
    measurement of each round also includes the SELECTs and the
    intersection; later measurements in the round cover a single UPDATE.

    Args:
        currency_code: Value bound to both currency-code columns in the
            UPDATE's WHERE clause.

    Returns:
        The module-level ``registers`` list. Because that list is shared,
        measurements accumulate across calls unless it is reset first.
    """
    UPDATE_STMT = """UPDATE positionkeepingkeyspace.positionkeeping set creditLine_included = %s where accountid = %s and amount_currency_code = %s and credit_line_currency_code = %s"""

    # Accounts that are enabled, and accounts registered under a UK scheme name.
    SELECT_CURRENTACCOUNT_STMT = """SELECT accountid FROM currentaccountkeyspace.currentaccount WHERE status = 'Enabled' ALLOW FILTERING;"""
    SELECT_SCHEMENAME_STMT = """SELECT accountid FROM CurrentAccountKeySpace.CurrentAccountbyschemename WHERE schemename LIKE 'UK.%';"""

    # Every statement is fully qualified, so the session keyspace only acts
    # as a default. The session is private to this call and is shut down in
    # the finally block so repeated calls do not leak sessions.
    session = cluster.connect('customerprofilekeyspace')
    update_count = 0

    try:
        for _ in range(repeats):
            time_inicial = time.time()

            # Rows are read by position: the first column is the account id.
            enabled_accounts = {
                row[0] for row in session.execute(SELECT_CURRENTACCOUNT_STMT)
            }
            uk_scheme_accounts = [
                row[0] for row in session.execute(SELECT_SCHEMENAME_STMT)
            ]

            # Keep only the accounts present in both result sets.
            account_to_update = enabled_accounts.intersection(uk_scheme_accounts)

            for accountid in account_to_update:
                session.execute(
                    UPDATE_STMT,
                    (True, accountid, currency_code, currency_code),
                )
                data_time_collection = round(time.time() - time_inicial, 3)
                used_cpu = psutil.cpu_percent()
                mem_used = psutil.virtual_memory().percent

                measurement = (update_count, data_time_collection, used_cpu, mem_used)
                registers.append(measurement)
                print(measurement)

                update_count += 1
                # Restart the timer so the next sample covers one UPDATE only.
                time_inicial = time.time()
    finally:
        session.shutdown()

    return registers

In [None]:
# Run the benchmark with one randomly chosen value from country_list and keep
# the returned measurements.
registers = updateAmountCurrencyWhenAccountIsEnabledAndSchemeNameIsFromUK(random.choice(country_list))

In [None]:
# Save the results to CSV.
# NOTE(review): the original comment labelled these "Customer Profile" results,
# although the benchmark updates the positionkeeping table — confirm the label.
save_results_to_csv(registers,resultados_etl_update)

In [None]:
# Shut down the cluster connection and report it.
cluster.shutdown()
print('Conexion cerrada')