# Deep Mailing - Preparação de Dados

In [None]:
import pandas as pd
import dateutil.parser as parser
import os.path
import math
import logging
import numpy as np
from multiprocessing import Pool
from datetime import datetime

log_location = "../logs/"
arquivo_chamadas = "../data/mailing_completo.txt.full"
arquivo_df_pickled = "../intermediate/df.pickle"
arquivo_df_pickled_norm = "../intermediate/df.norm.pickle"
arquivo_df_pickled_norm_train = "../intermediate/df.norm.train.pickle"
arquivo_df_pickled_norm_test = "../intermediate/df.norm.test.pickle"

arquivo_df_pickled_norm_train_x = "../intermediate/df.norm.train.x.pickle"
arquivo_df_pickled_norm_train_y = "../intermediate/df.norm.train.y.pickle"
arquivo_df_pickled_norm_test_x = "../intermediate/df.norm.test.x.pickle"
arquivo_df_pickled_norm_test_y = "../intermediate/df.norm.test.y.pickle"

pd.options.display.max_columns = 50
num_partitions = 8
num_cores = 8
Normalizado = False


#Delete Jupyter notebook root logger handler
logger = logging.getLogger()
logger.handlers = []
logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)-15s %(message)s",
                    level=logging.DEBUG,
                    filename=os.path.join(log_location,'prepare_data.log.' + \
                                          datetime.now().strftime("%Y%m%d%H%M%S.%f") + '.log'))



In [None]:
logging.debug("Declarando as funcoes globais")

def IsInt(s):
    try: 
        int(s)
        return True
    except ValueError:
        return False

def IsIntAndGreaterZero(s):
    return IsInt(s) and int(s)>0
    
def IsFloat(s):
    try: 
        float(s)
        return True
    except ValueError:
        return False
    
    
def IsDatetime(s):
    try: 
        parser.parse(s)
        return True
    except ValueError:
        return False
    
def ConverterInt(val):
    val = val.replace(",",".")
    if IsInt(val):
        return int(val)
    else:
        return 0

def ConverterFloat(val):
    val = val.replace(",",".")
    if IsFloat(val):
        return float(val)
    else:
        return 0
    
    
def ConverterData(val):
    if IsDatetime(val):
        return parser.parse(val)
    else:
        return val
    
def func_str(x):
    return str(x)

def func_strip(x):
    return str(x).strip()

def func_start_ALTA(x):
    return str(x).startswith('ALTA')

def IsCSVDataAvailable():
    return os.path.isfile(arquivo_df_pickled)

def IsNormDataAvailable():
    return os.path.isfile(arquivo_df_pickled_norm)

def IsTrainDataAvailable():
    return os.path.isfile(arquivo_df_pickled_norm_train)
    
def IsNumpyArrayDataAvailable():
    return os.path.isfile(arquivo_df_pickled_norm_train_x + ".npy")

def limpar_df(chamadas):
    del chamadas["CPF_CNPJ"]
    del chamadas["PRODUTO"]
    del chamadas["FILA"]
    del chamadas["STATUS_CONTRATO"]
    del chamadas["DETALHE_ORIGEM"]
    del chamadas["TELEFONE"]
    del chamadas["TELRUIM_RENITENCIA"]
    del chamadas["TELRUIM_DISCADOR"]
    del chamadas["OPERADORA"]
    del chamadas["ORIGEM_ULTIMA_ATUALIZACAO"]
    del chamadas["PRIMEIRA_ORIGEM"]
    del chamadas["ATRASO"] 
    del chamadas["VALOR"]
    del chamadas["DT_ENTRADA" ]
    del chamadas["NLOC"]
    del chamadas["SCORE_C"]
    del chamadas["SCORE_E"]
    del chamadas["RENDA"]
    del chamadas["DT_DEVOLUCAO"]
    del chamadas["VLRISCO"]
    del chamadas["SCORE_ZANC_C"]
    del chamadas["SCORE_ZANC_E"]
    del chamadas["SCORE_ZANC"]
    del chamadas["DATA_PRIMEIRA_ORIGEM"]
    del chamadas["DATA_ULTIMA_ATUALIZACAO"]
    del chamadas["ULT_ARQ_BUREAU"]
  
    return chamadas

def parallelize_dataframe(func, data):
    df = data['df']
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    items = list(data.items())
    chunksize = len(data.items())
    chunks = [items[i:i + chunksize ] for i in range(0, len(items), chunksize)]
    df = pd.concat(pool.map(func, chunks))
    pool.close()
    pool.join()
    return df


def func_to_execute_column_str(data):
    data = dict(item for item in data)  # Convert back to a dict
    logging.debug("Creating Binary Column:{} in {}".format(data["col"],  data["source_col"]))
    df = data['df']
    df['NORM_' + data["source_col"] + "_" + data["col"]] = df.apply(lambda row: 1 if func_str(row[data["source_col"]]) == data["col"] else 0, axis=1)
    return df
    
def CreateColumnStr(cols, df, source_col):
    for col in cols:
        df = parallelize_dataframe(func_to_execute_column_str, { "df" : df, "source_col" : source_col, "col" : col})
    return df    





def func_to_execute_column_strip(data):
    data = dict(item for item in data)  # Convert back to a dict
    logging.debug("Creating Binary Column:{} in {}".format(data["col"],  data["source_col"]))
    df = data['df']
    df['NORM_' + data["source_col"] + "_" + data["col"]] = df.apply(lambda row: 1 if func_strip(row[data["source_col"]]) == data["col"] else 0, axis=1)
    return df

def CreateColumnStrip(cols, df, source_col):
    for col in cols:
        df = parallelize_dataframe(func_to_execute_column_strip, { "df" : df, "source_col" : source_col, "col" : col})
    return df    




def func_to_execute_column_ALTA(data):
    data = dict(item for item in data)  # Convert back to a dict
    logging.debug("Creating Binary Column:{} in {}".format(data["col"],  data["source_col"]))
    df = data['df']
    df['NORM_' + data["source_col"] + "_" + data["col"]] = df.apply(lambda row: 1 if func_start_ALTA(row[data["source_col"]]) == data["col"] else 0, axis=1)
    return df
    
def CreateColumnALTA(cols, df, source_col):
    for col in cols:
        df = parallelize_dataframe(func_to_execute_column_ALTA, { "df" : df, "source_col" : source_col, "col" : col})
    return df    




def func_to_execute_numeric(data):
    data = dict(item for item in data)  # Convert back to a dict
    logging.debug("Creating Numeric Column:{} ".format(data["source_col"]))
    df = data['df']
    df['NORM_' + data["source_col"]] = df.apply(lambda row: 1 if IsIntAndGreaterZero(row[data["source_col"]]) else 0, axis=1)
    return df

def CreateLogColumn(df, source_col):
    df = parallelize_dataframe(func_to_execute_numeric, { "df" : df, "source_col" : source_col})
    return df

In [None]:
logging.debug("Declarando os tipos de dados no dataframe")

df_dtypes = {
    "CPF_CNPJ": "object",
    "CARTEIRA": "object",
    "SEGMENTO": "object",
    "PRODUTO": "object",
    "FILA": "object",
    "STATUS_CONTRATO": "object",
    "PROPENSAO": "object",
    "ORIGEM": "object",
    "DETALHE_ORIGEM": "object",
    "STATUS_BUREAU": "object",
    "STATUS_INTERNA": "object",
    "DDD": "object",
    "TELEFONE": "object",
    "TELRUIM_RENITENCIA": "object",
    "TELRUIM_DISCADOR": "object",
    "STATUS_TELEFONE": "object",
    "OPERADORA": "object",
    "ORIGEM_ULTIMA_ATUALIZACAO": "object",
    "PRIMEIRA_ORIGEM": "object"
}

converters = {
    "ATRASO":  ConverterInt,
    "VALOR": ConverterFloat,
    "DT_ENTRADA" : ConverterData,
    "NLOC": ConverterInt,
    "SCORE_C": ConverterInt,
    "SCORE_E": ConverterInt,
    "RENDA": ConverterFloat,
    "DT_DEVOLUCAO": ConverterData,
    "VLRISCO": ConverterFloat,
    "SCORE_ZANC_C": ConverterInt,
    "SCORE_ZANC_E": ConverterInt,
    "SCORE_ZANC": ConverterInt,
    "DATA_PRIMEIRA_ORIGEM": ConverterData,
    "DATA_ULTIMA_ATUALIZACAO": ConverterData,
    "TENTATIVAS": ConverterInt,
    "ULT_ARQ_BUREAU": ConverterData,
    "DATA_MAILING": ConverterData,
    "LIGACOES": ConverterInt,
    "CUP": ConverterInt
}

In [None]:
logging.debug("Carregando dos dados em CSV ou normalizados, dependendo da existencia deles...")
if not IsCSVDataAvailable():
    chamadas = pd.read_csv(arquivo_chamadas, sep="|", dtype=df_dtypes, converters = converters)
    chamadas = limpar_df(chamadas)
    chamadas.to_pickle(arquivo_df_pickled)
else:
    if not IsNormDataAvailable():    
        chamadas = pd.read_pickle(arquivo_df_pickled)
    else:
        chamadas = pd.read_pickle(arquivo_df_pickled_norm)
        Normalizado = True
        
logging.debug("Normalizado:{}".format(Normalizado))        

In [None]:
logging.debug("Calculando Datas minimas e maximas...")
data_maxima_mailing = chamadas.DATA_MAILING.max()
data_minima_mailing = chamadas.DATA_MAILING.min()

print("Max:{} Min:{}".format(data_maxima_mailing, data_minima_mailing))

In [None]:
logging.debug("Normalizando Carteiras...")
if not Normalizado:
    Carteiras = set([x for x in chamadas.CARTEIRA.unique()[:-1] if len(x) == 3])
    chamadas = CreateColumnStr(Carteiras,chamadas, 'CARTEIRA')

In [None]:
logging.debug("Normalizando Segmentos...")
if not Normalizado:
    Segmentos = set([x.strip() for x in chamadas.SEGMENTO.unique()[:-1] if len(x.strip()) == 2])
    chamadas = CreateColumnStrip(Segmentos,chamadas, 'SEGMENTO')

In [None]:
logging.debug("Normalizando Chamadas...")
if not Normalizado:
    Propensao = set([x[:4] for x in chamadas.PROPENSAO.unique() if str(x).startswith("ALTA")])
    chamadas = CreateColumnALTA(Propensao,chamadas, 'PROPENSAO')

In [None]:
logging.debug("Normalizando Origem...")
if not Normalizado:
    Origem = set([x for x in chamadas.ORIGEM.unique()[:-1]])
    chamadas = CreateColumnStr(Origem,chamadas, 'ORIGEM')

In [None]:
logging.debug("Normalizando StatusBureau...")
if not Normalizado:
    StatusBureau = set([str(x) for x in chamadas.STATUS_BUREAU.unique()])
    chamadas = CreateColumnStr(StatusBureau,chamadas, 'STATUS_BUREAU')

In [None]:
logging.debug("Normalizando StatusInterna...")
if not Normalizado:
    StatusInterna = set([str(x) for x in chamadas.STATUS_INTERNA.unique()[:-1]])
    chamadas = CreateColumnStr(StatusInterna,chamadas, 'STATUS_INTERNA')

In [None]:
logging.debug("Normalizando Telefone...")
if not Normalizado:
    StatusTelefone = set([str(x) for x in chamadas.STATUS_TELEFONE.unique()[:-1]])
    chamadas = CreateColumnStr(StatusTelefone,chamadas, 'STATUS_TELEFONE')

In [None]:
logging.debug("Normalizando DDD...")
if not Normalizado:
    DDD = set([str(x) for x in chamadas.DDD.unique()[:-1]])
    chamadas = CreateColumnStr(DDD,chamadas, 'DDD')

In [None]:
logging.debug("Normalizando LIGACOES E TAL...")
if not Normalizado:
    chamadas = CreateLogColumn(chamadas,'TENTATIVAS')
    chamadas = CreateLogColumn(chamadas,'LIGACOES')
    chamadas = CreateLogColumn(chamadas,'CUP')

In [None]:
logging.debug("Removendo Campos desnecessarios e pickling...")
if not Normalizado:
    del chamadas['NUMERO']
    del chamadas['TENTATIVAS']
    del chamadas['LIGACOES']
    del chamadas['CUP']
    del chamadas['DDD']
    del chamadas['STATUS_TELEFONE']
    del chamadas['STATUS_INTERNA']
    del chamadas['STATUS_BUREAU']
    del chamadas['ORIGEM']
    del chamadas['SEGMENTO']
    del chamadas['PROPENSAO']
    del chamadas['CARTEIRA']
    chamadas.to_pickle(arquivo_df_pickled_norm)
