In [225]:
import pandas as pd
import numpy as np
import import_ipynb
import utils_cdt as cdtsql
from datetime import datetime
from datetime import date
from sklearn import metrics

import matplotlib.pyplot as plt
import time
import pickle
from pyspark.sql import SparkSession, Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.storagelevel import StorageLevel 

In [445]:
datetime.today().strftime('%Y-%m-%d')

'2020-04-16'

In [290]:
infoDataBases = {"ip":"","base":""}
dia_corte = 21

In [227]:
def cache(self): 
    """ 
    Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}). 
    """ 
    self.is_cached = True 
    self.persist(StorageLevel.MEMORY_ONLY_SER) 
    return self 
spark = SparkSession \
    .builder \
    .appName("Modelo - Spark") \
    .config("spark.executor.memory", "4gb") \
    .config("spark.driver.host","127.0.0.1") \
    .getOrCreate()

sc = spark.sparkContext
sc = SQLContext(sc)

In [448]:
arquivos = ["../data/eventosexternos/202001/eventosexternos.parquet",
                                                      "../data/eventosexternos/202002/eventosexternos.parquet",
                                                       "../data/eventosexternos/202003/eventosexternos.parquet"]


df_eventosexternos = pd.concat([pd.read_parquet(f) for f in arquivos], ignore_index = True)

In [449]:
df_eventosexternos.head(2)

Unnamed: 0,NomeOperacao,Id_Operacao,Id_Conta,DataOrigem,DataEntrada,DataMovimento,ValorParcela,ResponsavelInclusao,Status,Complemento,Id_Autorizacao,DataVencimentoPadrao,Valor
0,Debito Bolsa Protegida,10536,10085344,2020-01-20 22:15:00,2020-01-20 22:37:00,2020-01-20,6.99,SSA,2,007/060,,11/02/2020,6.99
1,Debito Proteção Financeira,10538,11732055,2020-01-20 22:15:00,2020-01-20 22:37:00,2020-01-20,6.99,SSA,2,007/120,,25/02/2020,6.99


In [450]:
dados_eventos = df_eventosexternos.groupby(['Id_Operacao','NomeOperacao','DataVencimentoPadrao']).agg({'Valor':['sum','mean','std']})

dados_eventos = dados_eventos.reset_index()

dados_eventos = dados_eventos.rename(columns={"sum": "valortotal", "mean": "media", "std": "desviopadrao"})
dados_eventos['DataVencimentoPadrao'] = pd.to_datetime(dados_eventos.DataVencimentoPadrao, format="%d/%m/%Y")
dados_eventos['dia'] = dados_eventos.DataVencimentoPadrao.dt.day
dados_eventos['mes'] = dados_eventos.DataVencimentoPadrao.dt.month
dados_eventos['ano'] = dados_eventos.DataVencimentoPadrao.dt.year
dados_eventos = dados_eventos.loc[dados_eventos.dia == dia_corte]
dados_eventos.columns = dados_eventos.columns.droplevel()
dados_eventos.columns = ['Id_Operacao','NomeOperacao','DataVencimentoPadrao','ValorTotal','Media','DesvioPadrao','dia','mes','ano']

In [383]:
dados_eventos.head(2)

Unnamed: 0,Id_Operacao,NomeOperacao,DataVencimentoPadrao,ValorTotal,Media,DesvioPadrao,dia,mes,ano
16,10415,Debito Seguro Novo Cartao Protegido,2020-01-21,37115.8,2.557594,1.241152,21,1,2020
17,10415,Debito Seguro Novo Cartao Protegido,2020-02-21,36222.4,2.555192,1.239519,21,2,2020


In [417]:
colunas = ['Id_Operacao','NomeOperacao','ValorTotal']
total_eventos = dados_eventos[colunas]
total_eventos = total_eventos.groupby(['Id_Operacao','NomeOperacao']).agg({'ValorTotal':['sum','mean','std']})
total_eventos = total_eventos.reset_index()
total_eventos.columns = total_eventos.columns.droplevel()
colunas = ['Id_Operacao','NomeOperacao','ValorOperacao','mediaOperacao','desviopadraoOperacao']
total_eventos.columns = colunas
total_eventos.head(2)

Unnamed: 0,Id_Operacao,NomeOperacao,ValorOperacao,mediaOperacao,desviopadraoOperacao
0,10415,Debito Seguro Novo Cartao Protegido,108756.1,27189.025,18116.636243
1,10416,Debito Seguro Cartao Protegido,13025.4,3256.35,2170.650155


In [432]:
totalgeral = dados_eventos.merge(total_eventos, left_on='Id_Operacao', right_on='Id_Operacao')
totalgeral['zscore'] = (totalgeral['ValorTotal'] - totalgeral['mediaOperacao']) / totalgeral['desviopadraoOperacao']
colunas = list(totalgeral.columns[0:9])
colunas.append('zscore')
totalgeral = totalgeral[colunas]
totalgeral = totalgeral.loc[totalgeral.zscore <= -1] & (totalgeral.mes == vencimento.month)]

In [438]:
#obtem as operações que estão com baixo score. 
operacoes = totalgeral.Id_Operacao.values

In [447]:
# Lista todos os dados dessas operações de baixo score
totalgeral = totalgeral[totalgeral.Id_Operacao.isin(operacoes)].sort_values(['Id_Operacao','mes'])

Unnamed: 0,Id_Operacao,NomeOperacao,DataVencimentoPadrao,ValorTotal,Media,DesvioPadrao,dia,mes,ano
16,10415,Debito Seguro Novo Cartao Protegido,2020-01-21,37115.8,2.557594,1.241152,21,1,2020
17,10415,Debito Seguro Novo Cartao Protegido,2020-02-21,36222.4,2.555192,1.239519,21,2,2020
18,10415,Debito Seguro Novo Cartao Protegido,2020-03-21,35383.1,2.547563,1.234285,21,3,2020


In [None]:
# Compara total de diferença entre vencimento atual com as oeprações de zscore baixo e vencimento anterior. 
valorbaixo = dfzscorebaixo[['Id_Operacao','DataVencimentoPadrao','ValorTotal',"zscore"]]
valoranterior = totalgeral[['Id_Operacao','DataVencimentoPadrao','ValorTotal',"zscore"]].loc[totalgeral.DataVencimentoPadrao == str(lessmonthvenc)]
diff_atual_anterior = valorbaixo.join(valoranterior.set_index("Id_Operacao"), on="Id_Operacao", lsuffix='_Atual', rsuffix='_Anterior')
diff_atual_anterior['diferenca'] = diff_atual_anterior['ValorTotal_Atual'] - diff_atual_anterior['ValorTotal_Anterior']