# Enterprise Data Cube

This notebook builds the enterprise data cube. Results are written to the "/data/sandboxes/sfma/data/cma/edc" folder (PROJECT_FOLDER). Two subfolders matter:

* PROJECT_FOLDER/<TAG_DATA>: where the final results of the process are persisted.
* PROJECT_FOLDER/TMP/<TAG_DATA>: where the temporary results are persisted.
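
As a quick illustration of this layout, the sketch below derives both folders from a project folder and a tag. The helper name `output_folders` is hypothetical, and the lowercase `tmp` segment is an assumption taken from the `TMP_PATH` definition in the configuration cells.

```python
import posixpath


def output_folders(project_folder: str, tag_data: str) -> dict:
    """Return the final and temporary result folders for a given tag.

    Hypothetical helper, for illustration only; it is not part of the notebook.
    """
    return {
        "final": posixpath.join(project_folder, tag_data),
        # Assumption: the temporary folder segment is the lowercase "tmp"
        "tmp": posixpath.join(project_folder, "tmp", tag_data),
    }


folders = output_folders("/data/sandboxes/sfma/data/cma/edc", "test7")
print(folders["final"])
print(folders["tmp"])
```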

# TODO

* Check the values of 'risk_priority_id' in class RiesgosBecDatioData. When they are not all '0', replace class 'RiesgosBecData' with 'RiesgosBecDatioData'. Also check the values of 'risk_hat_id' (risk_qualification).

# Variable Definition

In order to run this process, **YOU MUST DEFINE THE FOLLOWING VARIABLES:**

In [180]:
FIRST_CLOSING_DATE = '2022-12-31'
LAST_CLOSING_DATE = "2022-12-31"
VERSION = 7
TAG_DATA = "test" + str(VERSION)
UPDATE_SCHEMA = 0
OVERWRITE_CUBE = 1
CHECK_TMP = 0

where:

* FIRST_CLOSING_DATE: first closing date considered in the process.
* LAST_CLOSING_DATE: last closing date considered in the process.
* VERSION: version number of the cube.
* TAG_DATA: name of the subfolders where the data is stored.
* UPDATE_SCHEMA: set to 1 to generate a new schema for this cube version.
* OVERWRITE_CUBE: set to 1 to overwrite the cube if it already exists for these dates.
* CHECK_TMP: set to 1 to check the temporary files and delete some of them according to the configuration.

The process computes every partition between the first and the last closing date that does not exist yet (with OVERWRITE_CUBE set to 1, all partitions in the range are recomputed).

First, these partitions are identified. Then the dependencies for those partitions are checked. If all dependencies are available, the process continues; otherwise it stops.
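
The selection of partitions can be sketched in plain Python. This is a minimal stand-in, not the notebook's implementation: it assumes closing dates are month-end dates, and the names `month_end_dates` and `pending_partitions` are hypothetical (the notebook itself relies on `cmautils` helpers whose internals are not shown here).

```python
import calendar
from datetime import date


def month_end_dates(first: str, last: str) -> list:
    """List month-end dates (ISO strings) from `first` to `last`, inclusive."""
    start, end = date.fromisoformat(first), date.fromisoformat(last)
    year, month = start.year, start.month
    result = []
    while (year, month) <= (end.year, end.month):
        # monthrange returns (weekday of first day, number of days in month)
        last_day = calendar.monthrange(year, month)[1]
        result.append(date(year, month, last_day).isoformat())
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return result


def pending_partitions(first: str, last: str, existing: set) -> list:
    """Keep only the closing dates that have no partition yet."""
    return [d for d in month_end_dates(first, last) if d not in existing]


pending = pending_partitions("2022-10-31", "2022-12-31", {"2022-11-30"})
print(pending)
```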

# Delete previous temporary data

In [2]:
del_temp = True
# Set to False to keep the previous temporary paths

In [3]:
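def check_path(path):
    """Return True if the given HDFS path exists."""
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))


def delete_hdfs_path(path):
    """Recursively delete the given HDFS path if it exists."""
    if check_path(path):
        fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
        # The single-argument delete(Path) is deprecated; pass recursive=True explicitly
        fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)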

In [4]:
if del_temp:
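    # NOTE: hard-coded path. It uses "temp", whereas TMP_PATH below uses "tmp"; check that it matches the TAG_DATA in use.
    del_path = "/data/sandboxes/sfma/data/cma/edc/temp/test7"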
    delete_hdfs_path(del_path)

# Configuration

Internal configuration of the process.

In [5]:
%load_ext autoreload
%autoreload 2

In [6]:
# info about autoreload
# import autoreload
# ?autoreload

In [7]:
import os
import sys

sys.path.insert(0, os.getcwd().replace("Processes/edc", "Utils"))

In [8]:
import cmautils
from cmautils import read_table_v2, get_closing_dates_v2, get_normalize_partition
from cmautils import get_closest_partition
from cmautils import add_months, replace_col_vals, format_id_col
from cmautils import TABLES_ROOTPATH_TRANSLATION, TABLES_NAMES_TRANSLATION
# from tables import TABLES as t
from cmautils_v2 import HdfsUtils, DateUtils, ExpressionUtils
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
import os
import numpy as np
import pandas as pd
from functools import reduce
# `datetime` here is the class, not the module
from datetime import date, datetime, timedelta

In [9]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Utilities suggested by the training team
# import tools
# tools.main(spark)

# import catalogs
# catalogs.main(spark)

In [10]:
#sc.addPyFile(os.getcwd().replace("Processes/edc", "Utils/cmautils.py"))

In [11]:
SANDBOX_PATH = "/data/sandboxes/sfma/data"
PROJECT_PATH = os.path.join(SANDBOX_PATH, "cma/edc")
TMP_PATH = os.path.join(PROJECT_PATH, "tmp")
path_out = "/data/sandboxes/sfma/data/cma/engines/edc/stable/"

In [12]:
new_fields = {'test7':[]}

In [13]:
hu = HdfsUtils(spark)
du = DateUtils()
eu = ExpressionUtils()

## Required tables
Data sources required to build the cube

In [14]:
# Tables to be checked
TABLES_TO_CHECK = ["vsfmabjc", "vsfmabjd", "vsfmaiii",
                   "vsfmacgm", "vsfmahom", "vsfmartk",
                   "vsfmahca", "vsfmapvv", "vsfmapra", 
                   "vsfmapoa", "becriesgos", #"vsfmahc2","vsfmahmp",
                    "cirbe",#"segmult_datio",  #"app",
#                    "net3de3", "net1de3", #"plan1", 
#                    "app_tag", "web", "web_tag",
#                   'sessapp','sessweb','sessatm','sessagent',
#                   'tagsapp','tagsweb','tagsatm',
#                    "atm", "atm_tag", "branch", "branch_tag", 
                   "bal_ext", 'vsfmahss', 'vsfmapob',
                   'kdconcm', 'kdcouge', 
                   #'numprods', removed: it does not seem to be used
                   "eclcscore", "vsfmapsb", "oneview", 
                   "altasbajas", "vsfmahtr", "vsfmahar",
                   'ecegprods', #'bibasic',  ecegprods added; bibasic removed, pending change
                   #'transf_ext',
                   'vsfmapxc','vsfmacgr','universo_ind',
                   # 'ecrkrisk', lookup removed: it does not seem to be used
                   'eclccustunit','eclccustm','po_bi',#"vsfmanfc",
                   'kdcomta','kdcotpt', 'emoltransfext',
                   'emolexch','digi_metrics']

In [15]:
daily_tables = []
# for d in hu.get_tables_by_type('d'):
for d in hu.get_tables_daily():
    if (d in TABLES_TO_CHECK):
        daily_tables.append(d)

In [16]:
# Tables for which the closest available partition may be used when the exact closing_date is missing.
# Earlier definitions, superseded by the final assignment below:
# TABLES_CLOSEST_DATE = daily_tables + ['bal_ext', 'ecegprods']
# TABLES_CLOSEST_DATE = daily_tables + ['bal_ext']  # ecegprods now loads correctly
# Production table and fields pending update. ecrkrisk is included because it never seems to arrive on time.
TABLES_CLOSEST_DATE = daily_tables + ['bal_ext', 'bibasic', 'ecrkrisk','segmult_datio', 'ecegprods']
TABLES_EXCEPTIONS = ['bal_ext']
TABLES_CLOSEST_DATE

['emoltransfext',
 'emolexch',
 'kdcotpt',
 'bal_ext',
 'bibasic',
 'ecrkrisk',
 'segmult_datio',
 'ecegprods']

In [17]:
def get_path (tmp: str, date:str, key:str)->str:
    return os.path.join(tmp, key+"/closing_date={}".format(date))

Mapping of each dataframe to its temporary path, class, and data sources

In [18]:
dict_class = {'base_ini':{'tmp':'base_ind',
                          'class':'BaseData',
                          'tables':['vsfmacgm']},
              
              'margen':{'tmp':'margen',
                          'class':'MargenData',
                          'tables':['vsfmartk']},
              
              'ingresos_data':{'tmp':'ingresos',
                          'class':'IngresosData',
                             'tables':['vsfmapoa']},
              
              'riesgos_data':{'tmp':'riesgos',
                          'class':'RiesgosData',
                          'tables':['ecrkrisk']},
              
              'cirbe_data':{'tmp':'cirbe',
                          'class':'CirbeData',
                            'tables':['cirbe']},
              
              'bec_riesgos_data':{'tmp':'bec_riesgos',
                          'class':'RiesgosBecData',
                          'tables':['becriesgos']},
                            
              'sessions_data':{'tmp':'sessions',
                          'class':'Sessionization',
                            'tables':["sessapp","sessatm",
                                        "sessagent","sessweb",
                                        "tagsapp","tagsweb",
                                        "tagsatm","tagsagent"]},
                            
              'digi_data':{'tmp':'digi',
                          'class':['DigiData'],
                              'tables':['segmult_datio','digi_metrics']},
              
              'digi_data_tact':{'tmp':'digi',
                          'class':['DigiData_biupload'],
                              'tables':['net1de3','net3de3',
                                        'vsfmanfc','segmult']},
                            
              'plan1_data':{'tmp':'plan1',
                          'class':['PlanUno'],
                              'tables':['vsfmadigi', "vsfmapob", "vsfmaiii",
                                        "vsfmabjc", 'vsfmacgm','vsfmardc',"vsfmabjd",
                                        "plan1", "vsfmahar", "vsfmapem",
                                        'ecog_hierarchy']},
              
             'plan1_data_tact':{'tmp':'plan1',
                          'class':['PlanUnoTactico'],
                              'tables':['plan1_tactico']},
                            
              'balance_data':{'tmp':'balances',
                          'class':'BalanceData',
                              'tables':['bal_ext']},
                            
              'segsociales_data':{'tmp':'segsociales',
                          'class':'CesionSegurosSociales',
                              'tables':["vsfmahss","vsfmapob"]},
                            
              'lisboa_data':{'tmp':'lisboa',
                          'class':'LisboaInd',
                              'tables':["kdcomta","kdcotpt",
                                        "kdconcm","kdcouge",
                                        'po_bi','eclccustm']},
                            
              'numprods_data':{'tmp':'numprods',
                          'class':'BiNumProds',
                              'tables':['ecegprods']},
              
               'cnae_data':{'tmp':'cnae',
                          'class':'CNAE',
                              'tables':['eclccustm','eclccustunit',
                                        'po_bi']},
              
              'clickpay_data':{'tmp':'clickpay',
                          'class':'ClickPayData',
                              'tables':['vsfmaiii']},
              
              'rating_data':{'tmp':'rating',
                          'class':'RatingData',
                              'tables':['eclcscore']},
              
              'comex_data':{'tmp':'comex',
                          'class':'ComexData',
                              'tables':['emoltransfext','eclccustm',
                                        'emolexch','bibasic']},
              
              'altas_data':{'tmp':'altas',
                          'class':'AltasData',
                              'tables':['altasbajas']},
              
              'rar_data':{'tmp':'rar12',
                          'class':'RarData',
                              'tables':['vsfmartk']},
              
              'mora_data':{'tmp':'mora',
                          'class':'MoraData',
                              'tables':['vsfmacgt','vsfmapsb', 'vsfmaiii']},
              
              'oneview_data':{'tmp':'oneview',
                          'class':'OneViewData',
                              'tables':['oneview']},
              
              'becnprods_data':{'tmp':'becnprods',
                          'class':'NumProdsData',
                              'tables':['vsfmaiii', "vsfmahtr", "vsfmahar"]},
              
              'firma_data':{'tmp':'riesgo_firma',
                          'class':'FirmaData',
                              'tables':['vsfmacgt']},
              
              'broker_data':{'tmp':'broker',
                          'class':'BrokerData',
                              'tables':['vsfmapxc']},
              
              'group_data':{'tmp':'business_group',
                          'class':'GroupData',
                              'tables':['vsfmacgr']},
              
              'universo_data':{'tmp':'universo_ind',
                          'class':'UniversoData',
                              'tables':['universo_ind']},
              
              't_debito_part':{'tmp':'deb_part',
                          'class':'Tarjetas',
                              'tables':["bjd", "bjc"]},
              
              't_debito_empr':{'tmp':'deb_empr',
                          'class':'Tarjetas',
                              'tables':["bjd", "bjc"]},
                      
             't_credito_part':{'tmp':'cre_part',
                          'class':'Tarjetas',
                              'tables':["bjd", "bjc"]},
              
             't_credito_empr':{'tmp':'cre_empr',
                          'class':'Tarjetas',
                              'tables':["bjd", "bjc"]},
              
              'evol':{'tmp':'evol',
                          'class':'EvolData',
                              'tables':[]},
              
              'valores':{'tmp':'valores',
                          'class':'RentaValores',
                              'tables':['vsfmapvv']},
              
              'means':{'tmp':'means',
                          'class':'MeansData',
                              'tables':[]},
              
              'cgm_imp':{'tmp':'cgm_imp',
                          'class':'CGMData',
                              'tables':['vsfmacgm']},
              
              'base_ind':{'tmp':'base_ind',
                          'class':'GenTenData',
                              'tables':['vsfmaiii']},
              
              'dis':{'tmp':'dis',
                          'class':'DescComercial',
                              'tables':['vsfmaiii','vsfmahca']},
              
              'ins':{'tmp':'ins',
                          'class':'SegurosData',
                              'tables':['bibasic']},
              
              'pra':{'tmp':'pra',
                          'class':'PraData',
                              'tables':['vsfmapra','vsfmaiii']},
              
              'cgm_ind':{'tmp':'cgm_ind',
                          'class':'CGMData',
                              'tables':['vsfmacgm']},
              
              'tpv':{'tmp':'tpv',
                          'class':'BalanceTPV',
                              'tables':['vsfmahom']},
             }

In [19]:
def get_edc_clases() -> list:
    clases = []
    for cl in dict_class:
        clase = dict_class[cl]['class']
        if(type(clase)==list):
            for c in clase:
                if(c not in clases):            
                    clases.append(c)
        else:
            if(clase not in clases):            
                clases.append(clase)
    return clases    

## Temporary files to delete
When the output or the calculation of the variables changes, temporary data persisted by earlier versions is no longer valid

In [20]:
DEL_FILES = ['balance_data','ins','sessions_data','cgm_ind',
             'cgm_imp','base_ind']
DEL_FILES_EXCEPT = ['lisboa_data','digi_data','plan1_data']

dict_deleted= {'2018-12-31':{ 'mayor o igual': DEL_FILES + DEL_FILES_EXCEPT,
                             'menor': DEL_FILES}
              }               

In [21]:
def get_deleted_files(pr_tmp:str,tag:str,date:str)->list:
    paths=[]
    tmp_path = os.path.join(PROJECT_PATH, "tmp/{0}".format(TAG_DATA))
    fecha_lim = list(dict_deleted.keys())[0]
    if(date<fecha_lim):
        clases =  dict_deleted[fecha_lim]['menor']
    else:
        clases =  dict_deleted[fecha_lim]['mayor o igual'] 
    return [get_path(tmp_path,date,dict_class[cl]['tmp']) for cl in clases]
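
The comparison `date<fecha_lim` works on plain strings because ISO dates (YYYY-MM-DD) sort lexicographically in chronological order. A minimal sketch of that selection rule follows; the function name `select_group` and the sample entries are hypothetical, while the dictionary keys mirror those of `dict_deleted`.

```python
def select_group(closing_date: str, threshold: str, rules: dict) -> list:
    """Pick the entries to delete depending on the side of the threshold."""
    # ISO-8601 dates compare correctly as strings, so no parsing is needed
    key = "menor" if closing_date < threshold else "mayor o igual"
    return rules[key]


# Illustrative rules only; the real lists are DEL_FILES and DEL_FILES_EXCEPT
rules = {"menor": ["a", "b"], "mayor o igual": ["a", "b", "c"]}

before = select_group("2018-11-30", "2018-12-31", rules)
on_or_after = select_group("2018-12-31", "2018-12-31", rules)
print(before, on_or_after)
```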

In [22]:
def check_deleted_files (pr_tmp:str,tag:str,dates:list):
    if(CHECK_TMP==1):
        paths=[]
        l_clases = []
        tmp_path = os.path.join(PROJECT_PATH, "tmp/{0}".format(TAG_DATA))
        fecha_lim = list(dict_deleted.keys())[0]
        for date in dates:
            if(date<fecha_lim):
                clases =  dict_deleted[fecha_lim]['menor']
            else:
                clases =  dict_deleted[fecha_lim]['mayor o igual'] 
            for c in clases:
                if c not in l_clases:
                    l_clases.append(c)
        if(len(l_clases)>0):
            print('TMP files to delete for dates ',dates,': ')
            print('***** PATH_TMP: '+tmp_path)
            for c in l_clases:
                print('/'+dict_class[c]['tmp'])
#                 print(c,': /',dict_class[c]['tmp'])
        else:
            print('No temporary files are deleted')
    else:
        print('Temporary-file deletion check (CHECK_TMP) is not enabled')
    

# Definition of Auxiliary Classes

In [23]:
class Filters:
    """
    Common filters used in different processes.
    """
    
    def __init__(self):
        
        # general filters
        # This filter changed in January 2022; the previous version is commented out below:
        self.pymes_empresas = (F.trim(F.col("global_segment_id"))
                               .isin(["44", "45", "46", "47",
                                      "48", "49", "38", "63",
                                      "57", "58", "59", "26",
                                      "27", "28", "29", "64",
                                      "65", "66", "67", "68",
                                      "77", "78", "79"]))
#                               .isin(["01", "02", "03", "04",
#                                      "05", "06", "07", "08", 
#                                      "09", "31", "32", "33", 
#                                      "34", "35", "38", "63",
#                                      "80", "81", "82", "83", 
#                                      "84", "85", "86", "87", 
#                                      "88", "89"]))
        
        
        self.activo = (F.trim(F.col("status_type")) == "A")
        self.activo_glo = (F.trim(F.col("ent_cust_status_type")) == "A")
        self.customer_id = (F.col("customer_id") != "0")
        self.entity_id = (F.trim(F.col("entity_id")) == "0182")
        self.con_entity_id = (F.trim(F.col("contract_entity_id")) == "0182")
        self.country_id = (F.trim(F.col("country_id")) == "ES")
        self.con_country_id = (F.trim(F.col("contract_country_id")) == "ES")
        
#         self.busi_area = (F.col("business_area_id").isin(['0013', '0058', '0023', 
#                                                           '0088', '6248', '8004']))
        self.busi_area = (F.col("business_area_id").isNotNull())        
        self.legal_entity = (F.trim(F.col("document_type")).isin(["02", "03", "09"]))
        self.cus_country = (F.trim(F.col("customer_country_id"))== "ES")
        self.hip_bal = (F.col("pending_payment_amount") > 0)
        self.tit = (F.trim(F.col("party_type_id")) == "TIT")
        self.pos_tit = (F.col("party_order_number") == 1)
        self.cus_entity = (F.trim(F.col("customer_entity_id")) == "0182")
        self.pos_balance = (F.col("end_m_bal_amount") > 0)

In [24]:
class Tables():
    """
    Auxiliary class to read the different tables. For each table, you have a 
    get_<table_name> method. 
    
    Args:
        - spark (SparkSession): SparkSession object.
        - closing_date (String): Closing date in ISO format.
        
    Returns:
        - DataFrame with the result
        
    Example:
        >>> iii_table = Tables(spark, "2018-01-31").get_iii()
        >>> hcl_table = Tables(spark, "2018-01-31").get_hcl()
    """
    
    def __init__(self, spark, closing_date):
        self.spark = spark
        self.closing_date = closing_date
        
    def get_hcl(self):
        return hu.read_table("vsfmahcl", self.closing_date)
        
    def get_bjc(self):
        return hu.read_table( "vsfmabjc", self.closing_date)
        
    def get_bjd(self):
        return hu.read_table( "vsfmabjd", self.closing_date)
    
    def get_iii(self):
        return hu.read_table( "vsfmaiii", self.closing_date)
    
    def get_cgm(self):
        return hu.read_table( "vsfmacgm", self.closing_date)
    
    def get_hom(self):
        return hu.read_table( "vsfmahom", self.closing_date)
    
    def get_rtk(self):
        return hu.read_table( "vsfmartk", self.closing_date)
    
    def get_hca(self):
        return hu.read_table( "vsfmahca", self.closing_date)
    
    def get_pvv(self):
        return hu.read_table( "vsfmapvv", self.closing_date)
    
    def get_pra(self):
        return hu.read_table( "vsfmapra", self.closing_date)
    
    def get_poa(self):
        return hu.read_table( "vsfmapoa", self.closing_date)
    
    def get_becriesgos(self):
        return hu.read_table( "becriesgos", self.closing_date)
    
    def get_hc2(self):
        return hu.read_table( "vsfmahc2", self.closing_date)
    
    def get_hmp(self):
        return hu.read_table( "vsfmahmp", self.closing_date)
    
    def get_cirbe(self):
        return hu.read_table( "cirbe", self.closing_date)
    
    def get_digi1de3(self):
        return hu.read_table( "net1de3", self.closing_date)
    
    def get_digi3de3(self):
        return hu.read_table( "net3de3", self.closing_date)
    
    def get_nfc(self):
        return hu.read_table( "vsfmanfc", self.closing_date)
    
    def get_segmulti(self):
        return hu.read_table( "segmult_datio", self.closing_date)
    
    def get_planuno(self):
        return hu.read_table( "plan1", self.closing_date)
    
    def get_balance_acc(self):
        return hu.read_table( "tsfixiec", self.closing_date)
    
    def get_balance_key(self):
        return hu.read_table( "tsfixerc", self.closing_date)
    
    def get_balance_acc_ex(self):
        return hu.read_table( "tsfixiec_ex", self.closing_date)
    
    def get_balance_key_ex(self):
        return hu.read_table( "tsfixerc_ex", self.closing_date)
    
    def get_hss(self):
        return hu.read_table( "vsfmahss", self.closing_date)
    
    def get_pob(self):
        return hu.read_table( "vsfmapob", self.closing_date)
    
    def get_eclcust(self):
        return hu.read_table('eclccustm',self.closing_date)
    
    def get_eclcustunit(self):
        return hu.read_table('eclccustunit',self.closing_date)
    
    def get_po(self):
        return hu.read_table('po_bi',self.closing_date)
    
    def get_digi(self):
        return hu.read_table('vsfmadigi',self.closing_date)
    
    def get_pem(self):
        return hu.read_table('vsfmapem',self.closing_date)
    
    def get_har(self):
        return hu.read_table('vsfmahar',self.closing_date)
        
    def get_hierarchy(self):
        return hu.read_table('ecog_hierarchy',self.closing_date)
    
    def get_pjc(self):
        return hu.read_table('vsfmapjc',self.closing_date)
    
    def get_rdc(self):
        return hu.read_table('kdcordc',self.closing_date)
      

In [25]:
class GetData():
    """
    Base class for the data-extraction classes defined later in the notebook
    """
    
    def __init__(self, spark, closing_date):
        
        self.spark = spark
        self.closing_date = closing_date
        self.filters = Filters()
        self.tables = Tables(spark, closing_date)
        self.sfma_path = "/data/master/sfma/data/general/general"
#         self.hu = hdfsutils
        
    def _get_closest_partition(self, hdfs_path, 
                               regex_expr=r'^.*closing_date=(\d{4}\-\d{2}\-\d{2}).*$'):
        available_partitions = cmautils.get_closing_dates_v2(self.spark, hdfs_path, regex_expr)
        return cmautils.get_closest_partition(self.closing_date, available_partitions)
    
    
    def _extract(self):
        raise NotImplementedError
        
    def _transform(self):
        raise NotImplementedError
        
    def _load(self, df):
        return df
        
    def get_data(self):
        raise NotImplementedError  
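
The hooks above follow an extract/transform/load pattern. The sketch below shows one way a subclass might fill them in, using plain Python lists instead of Spark DataFrames. It is an assumption-laden illustration: the class names `GetDataSketch` and `ActiveClients` are hypothetical, `_transform` takes the extracted rows as an argument here, and chaining the hooks inside `get_data` is a choice made for the example (the notebook's base class leaves `get_data` abstract).

```python
class GetDataSketch:
    """Simplified stand-in for the base class, without Spark."""

    def __init__(self, closing_date):
        self.closing_date = closing_date

    def _extract(self):
        raise NotImplementedError

    def _transform(self, rows):
        raise NotImplementedError

    def _load(self, rows):
        # Default load step: return the data unchanged
        return rows

    def get_data(self):
        # Assumption: the hooks are chained in this order
        return self._load(self._transform(self._extract()))


class ActiveClients(GetDataSketch):
    """Hypothetical subclass that keeps only active clients."""

    def _extract(self):
        # Illustrative rows; a real subclass would read a table instead
        return [
            {"customer_id": "1", "status_type": "A"},
            {"customer_id": "2", "status_type": "M"},
        ]

    def _transform(self, rows):
        return [r for r in rows if r["status_type"] == "A"]


result = ActiveClients("2022-12-31").get_data()
print(result)
```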

# Extract Months to be Computed

In [26]:
def get_months_to_compute(spark, 
                          first_month, 
                          last_month, 
                          project_path, 
                          tag_data,
                          overwrite_cube):
    
    """
    Get the months to be computed as a list. 
    
    Args:
        - spark (SparkSession): SparkSession object.
        - first_month (String): Initial closing date in ISO format.
        - last_month (String): Last closing date in ISO format.
        - project_path (String): Main path to the project.
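        - tag_data (String): Indicates the subfolders where the results will be stored.
        - overwrite_cube (int or bool): If truthy, every month in the range is returned,
          even when its partition already exists.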
        
    Returns:
        - List containing the months to be computed.
    """
    
    final_path = os.path.join(project_path, tag_data)
    
    history_months = list(cmautils.iterate_months(first_month, last_month, use_first=True))

    if(overwrite_cube):
        return history_months
    else:
        existing_partitions = cmautils.get_closing_dates_v2(spark, final_path)    
        months_to_compute = [x for x in history_months if x not in existing_partitions]    
        return months_to_compute

# Check Dependencies

Process to check the dependencies. If any are missing, the process stops.

In [27]:
def tables_first_date (p_tables: dict) -> dict:
    first_load = {}
    tables_checked = []
    
    for fecha, tables in p_tables.items():
        [tables_checked.append(table) for table in tables if table not in tables_checked]

    for table in tables_checked:
        fdate = hu.get_first_loaded(table)
        if fdate is not None:
            first_load[table]=fdate  
            
    return first_load    

In [28]:
def check_tables() -> (list, dict, dict):
    fechas_ok = []
    # Must be a dict: it is iterated with .items() below even if the check raises
    missed_tables = {}
    default_values = {}
    try:
        missed_tables = hu.check_dependencies(TABLES_TO_CHECK, MONTHS_TO_COMPUTE)
    except ValueError as e:
        print('Some tables have no data for the evaluated month ',MONTHS_TO_COMPUTE)

    first_load = tables_first_date(missed_tables) 
    fechas = list(missed_tables.keys())
    for fecha in fechas:
        tables = list(missed_tables[fecha])
        
        for table in tables:
            if(table in TABLES_EXCEPTIONS and hu.check_path(hu.get_table_details(table)['path'])):
                missed_tables[fecha].remove(table)
                
            elif(first_load[table] > fecha):
                missed_tables[fecha].remove(table)
                path = hu.get_full_path(hu.get_table_details(table),fecha)
                if((table not in TABLES_CLOSEST_DATE) and (not hu.check_path(path))):
                    if(fecha not in default_values.keys()):
                        default_values[fecha] = [table]
                    else:
                        default_values[fecha].append(table)

        if(len(missed_tables[fecha])==0):
            del missed_tables[fecha]
            fechas_ok.append(fecha)

    if(len(fechas_ok) == len (MONTHS_TO_COMPUTE)):
        print('OK: the data required to build the cube is available') 
    else:
        if(len(fechas_ok) == 0):
            print('ERROR: the cube cannot be built because data is missing. \nTables: ',missed_tables)
        else:
            print('OK: the cube can be built for dates',fechas_ok)
            print('ERROR: the cube cannot be built for some of the given dates. \n',missed_tables)
    if(default_values):
        print('WARNING: the following tables are filled with default values. \n',default_values)
    return (fechas_ok,missed_tables,default_values)
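
The core of this check can be sketched without Spark or HDFS. The version below is a simplification under stated assumptions: the name `resolve_missing` is hypothetical, the sample dates and first-load values are invented for illustration, and it skips the HDFS path checks and the default-value bookkeeping done by `check_tables`.

```python
def resolve_missing(missing: dict, first_load: dict, exceptions: set):
    """Split closing dates into usable ones and ones still blocked.

    A missing table stops blocking a date when it is an accepted exception
    or when its first load is later than that date (no data can exist yet).
    """
    ok_dates, still_missing = [], {}
    for closing_date, tables in missing.items():
        remaining = [
            t for t in tables
            if t not in exceptions
            and not (t in first_load and first_load[t] > closing_date)
        ]
        if remaining:
            still_missing[closing_date] = remaining
        else:
            ok_dates.append(closing_date)
    return ok_dates, still_missing


# Invented sample data, for illustration only
missing = {"2018-01-31": ["bal_ext", "oneview"], "2022-12-31": ["cirbe"]}
first_load = {"oneview": "2019-06-30", "cirbe": "2015-01-31"}

ok_dates, still_missing = resolve_missing(missing, first_load, {"bal_ext"})
print(ok_dates, still_missing)
```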

In [29]:
def get_clases_by_data (table:str)->list:
    clases = []
    for cl in dict_class:
        if table in dict_class[cl]['tables']:
            if(dict_class[cl]['class'] not in clases):
                clases.append(dict_class[cl]['class'])
    return clases

Check for temporary data that covers missing data sources

In [30]:
def get_tmp_by_data (table:str)->list:
    tmps = []
    tmp_path = os.path.join(PROJECT_PATH, "tmp/{0}".format(TAG_DATA))
    for cl in dict_class:
        if table in dict_class[cl]['tables']:
            # Compare full paths: comparing the bare folder name never matched, so duplicates slipped through
            path = os.path.join(tmp_path,dict_class[cl]['tmp'])
            if(path not in tmps):
                tmps.append(path)
    return tmps

In [31]:
def check_exist_info (table:str, dates:list) -> bool:    
    tmps = get_tmp_by_data(table)
    for tmp in tmps:
        if(type(dates)!=list):
            dates = [dates]
        for date in dates:
            path_tmp = tmp + "/closing_date={}".format(date)
            if(not hu.check_path(path_tmp)):
                return False            
    return True        

In [32]:
def check_tables_by_tmp (t_nodata:dict, printed:bool=True)-> list:
    dates = []
    for date, tables in t_nodata.items():
#         if(printed):
#             print ('Comprobación de temporales para fecha: ',date)
        existe = True
        for table in tables:
            if(not check_exist_info(table,date)):
                if(printed):
                    print('ERROR: no temporary data is available to build the cube')
                existe=False
                break
        if(existe):
            if(printed):
                print('OK: the cube can be built for date: ',date)
            dates.append(date)   
    return dates

In [33]:
def tmp_by_class (cl:str,date:str):
    tmps=[dict_class[key]['tmp'] for key in dict_class if dict_class[key]['class']==cl]
    tmp_path = os.path.join(PROJECT_PATH, "tmp/{0}".format(TAG_DATA))
    # A class can map to several temporary folders (e.g. 'Tarjetas'), so each one
    # is checked on its own instead of being joined into a single nested path.
    return bool(tmps) and all(hu.check_path(get_path(tmp_path, date, t)) for t in tmps)

In [34]:
# def get_class_noexec(dtables:dict)->dict:
#     clases_noexec = {}
#     for cdate in dtables:
#         l=[]
#         for table in dtables[cdate]:
#             for c in get_clases_by_data(table):
#                 if(type(c)==list):
#                     for cl in c:
#                         if(cl not in l):
#                             if(tmp_by_class(cl,cdate)==False):
#                                 l.append(cl)
#                 else:
#                     if(c not in l): 
#                         if(tmp_by_class(c,cdate)==False):
#                             l.append(c)
#         clases_noexec[cdate] = l
#         print(cdate, 'no ejecutar clases:', l)
#     return clases_noexec

In [35]:
def get_class_noexec(dtables:dict)->dict:
    clases_noexec = {}
    for cdate in dtables:
        l=[]
        for table in dtables[cdate]:
            for c in get_clases_by_data(table):
                if(c not in l): 
                    if(tmp_by_class(c,cdate)==False):
                        l.append(c)
        clases_noexec[cdate] = l
        print(cdate, 'classes not to run:', l)
    return clases_noexec

# Schema

In [36]:
class Schema(GetData):

    def __init__(self, spark):
        self.spark = spark
        if(UPDATE_SCHEMA==1):
            self.schema = spark.read.parquet(os.path.join(PROJECT_PATH, "schema", "test"+str(VERSION-1))).schema
        else:
            self.schema = spark.read.parquet(os.path.join(PROJECT_PATH, "schema", TAG_DATA)).schema

            
    def getSchema_version(self,version):
        return spark.read.parquet(os.path.join(PROJECT_PATH, "schema", "test"+str(version))).schema
        
    
    def getSchema(self):
        return self.schema
    
    
    def check_fields(self, df)-> dict:
        if(UPDATE_SCHEMA==1):
            version = "test"+str(VERSION-1)
        else:
            version = "test"+str(VERSION)
        df_schema = spark.read.parquet(os.path.join(PROJECT_PATH, "schema", version))
        del_fields = [col for col in df_schema.columns if not col in df.columns]
        new_fields = [col for col in df.columns if not col in df_schema.columns]
        fields = {'new':new_fields,
                  'del':del_fields
                 }
        return fields
        
         
    def updateSchema(self,new_fields):
        [self.schema.add(x) for x in new_fields]
        
    def writeSchema(self):
        df_schema = spark.createDataFrame(sc.parallelize([]), self.schema)
        df_schema.write.option("schema",self.schema).mode("overwrite").parquet(os.path.join(PROJECT_PATH, "schema", TAG_DATA)) 
     
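    def correct_schema(self, df, project_path, tag_data):
        """
        Aligns the input df with the stored schema: missing fields are added
        filled with 0, existing fields are cast to the schema type, and the
        columns are selected in schema order.
        """    
        for field in self.schema:
            if field.name not in df.columns:
                df = df.withColumn(field.name, F.lit("0").cast(field.dataType))
                print('field filled with 0: ',field.name)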

            else:
                df = df.withColumn(field.name, F.col(field.name).cast(field.dataType))

        return df.select(self.schema.names)
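
The field comparison in `check_fields` does not depend on Spark and can be tried on plain column lists. A minimal sketch, assuming a hypothetical helper `diff_fields` and made-up column lists (neither appears in the notebook):

```python
def diff_fields(schema_columns, df_columns):
    """Return the columns that are new in df and those missing from it."""
    new_fields = [c for c in df_columns if c not in schema_columns]
    del_fields = [c for c in schema_columns if c not in df_columns]
    return {'new': new_fields, 'del': del_fields}


# Hypothetical column lists, for illustration only
reference = ["customer_id", "edad", "seg_global"]
incoming = ["customer_id", "seg_global", "ind_mora"]

fields = diff_fields(reference, incoming)
print(fields)
```

Fields reported under `'new'` are the candidates to pass to `updateSchema`, while those under `'del'` are the ones `correct_schema` would fill with 0.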

In [37]:
# schema = Schema(spark)
# schema.updateSchema(comex[:1])

# Base: BEC and Pymes Clients

First, the client base (BEC and Pymes) is extracted with the BaseData class, which returns one row per customer.

In [38]:
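class BaseData(GetData):
    """
    Builds the client base (BEC and Pymes): one row per customer_id with
    basic attributes aggregated from the cgm table.
    """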
        
    def get_fils_base(self):
        
        return (self.filters.pymes_empresas & 
                self.filters.customer_id & 
                self.filters.entity_id & 
                self.filters.con_entity_id & 
                self.filters.country_id & 
                self.filters.busi_area & 
                self.filters.activo_glo)
    
    def get_data(self):
        """
        Get the clients base dataset
        """


        exclude_ids = self.tables.get_cgm()\
            .filter(self.get_fils_base()).filter(F.col("status_type")=="M")\
            .select("customer_id").distinct()

        include_ids = self.tables.get_cgm()\
            .filter(self.get_fils_base()).filter(F.col("status_type")=="A")\
            .select("customer_id").distinct()

        cgm = self.tables.get_cgm().filter(self.get_fils_base()).filter(self.filters.activo)

        cgm = cgm.join(exclude_ids, "customer_id", "left_anti")\
            .join(include_ids, "customer_id", "left")\
            .withColumn("xsn_primtit", F.col("key_pm_act_holder_type").cast(T.IntegerType()))\
            .withColumn("education_level_id", F.trim(F.col("education_level_id")))\
            .withColumn("education_level_id", F.when(F.col("education_level_id")=="", -1)
                .otherwise(F.col("education_level_id").cast(T.IntegerType())))\
            .groupBy("customer_id")\
            .agg(F.min("document_type").alias("xti_idefisco"), 
                 F.max("age_number").alias("edad"), 
                 F.max("xsn_primtit").alias("ind_primer_tit"),
                 F.first("gender_type").alias("seg_sexo"),
                 F.max("education_level_id").alias("estudios"),
                 F.max("global_segment_id").alias("seg_global"),
                 F.max("seniority_months_number").alias("antiguedad_cliente"))

        # cgm = cgm.drop("closing_date").withColumn("closing_date", F.lit(self.closing_date))

        return cgm

In [39]:
# CLOSING_DATE = "2018-12-31"

In [40]:
# get_base = BaseData(spark, CLOSING_DATE)
# base_data = get_base.get_data()

In [41]:
# base_data.cache()
# base_data.count()

# Mora (Arrears)

In [42]:
class MoraData(GetData):
    
    # cus_country, entity_id, con_entity_id, cus_country, cus_entity
    
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        self.final_cols = ["customer_id", "ind_mora", "ind_mora_sinfall", 
                           "imp_mora", "imp_mora_sinfall"]
        
        # nominal_pending_end_m_amount
        
        
        self.mora_conds = {"all": ((F.col("status_type").isin(["M"])) & 
                                   (F.col("nominal_pending_end_m_amount") > 0) & 
                                   (F.col("cgt_mora_flag") == 1)),
                           "sinfall": (F.col("status_type").isin(["I", "H", "D", "E"]))}
        
        self.data_fils = {
            
            "cgt": (self.filters.entity_id & self.filters.country_id & 
                    (F.col("business_area_id").isin(["0013", "0058", "0023", 
                                                     "0088", "6248", "8004"])) & 
                    self.filters.customer_id & (F.col("status_type").isin(["M"]))),
            
            "iii": (self.filters.entity_id & self.filters.country_id & 
                    self.filters.con_entity_id & self.filters.con_country_id & 
                    self.filters.customer_id & 
                    (F.col("business_area_id").isin(["0013", "0058", "0023", 
                                                     "0088", "6248", "8004"]))), 
            "psb": (self.filters.entity_id & self.filters.country_id)}
    
    
    def _extract(self):
        
#         cgt = cmautils.read_table_v2(spark, "vsfmacgt", self.closing_date)
#         psb = cmautils.read_table_v2(spark, "vsfmapsb", self.closing_date)
#         iii = cmautils.read_table_v2(spark, "vsfmaiii", self.closing_date)
        cgt = hu.read_table("vsfmacgt", self.closing_date)
        psb = hu.read_table("vsfmapsb", self.closing_date)
        iii = hu.read_table("vsfmaiii", self.closing_date)
        
        dep = {"cgt": cgt, 
               "psb": psb, 
               "iii": iii}
        
        return dep
    
    
    def _transform(self, dep):
        
        cgt_t = (dep["cgt"]
                 .filter(self.data_fils["cgt"])
                 .select("customer_id")
                 .distinct()
                 .withColumn("cgt_mora_flag", F.lit(1)))
        
        psb_t = (dep["psb"]
                 .filter(self.data_fils["psb"])
                 .select("contract_id", "nominal_pending_end_m_amount"))
        
        iii_t = (dep["iii"]
                 .filter(self.data_fils["iii"])
                 .select("customer_id", "contract_id", "status_type"))
        
        df_t = (psb_t
                .join(iii_t, "contract_id")
                .join(cgt_t, "customer_id", "left")
                .na.fill({"cgt_mora_flag": 0})
                .withColumn("mora_type", 
                            F
                            .when(self.mora_conds["all"], "all")
                            .when(self.mora_conds["sinfall"], "sinfall"))
                .filter(F.col("mora_type").isNotNull())
                .withColumn("ind_mora", F.lit(1))
                .groupBy("customer_id")
                # Explicit pivot values keep both column groups even if one type is absent
                .pivot("mora_type", ["all", "sinfall"])
                .agg(F.max("ind_mora").alias("ind_mora"), 
                     F.sum("nominal_pending_end_m_amount").alias("imp_mora"))
                .withColumnRenamed("all_ind_mora", "ind_mora")
                .withColumnRenamed("all_imp_mora", "imp_mora")
                .withColumnRenamed("sinfall_ind_mora", "ind_mora_sinfall")
                .withColumnRenamed("sinfall_imp_mora", "imp_mora_sinfall")
                .select(self.final_cols))
        
        return df_t
        
        
    def get_data(self):
        
        dep = self._extract()
        df = self._transform(dep)
        
        return self._load(df)
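
A note on the `business_area_id` lists passed to `isin`: Python concatenates adjacent string literals, so a missing comma between two codes silently produces one merged code that matches neither of them. A small illustration in plain Python:

```python
# Missing comma between the last two codes: the literals are concatenated
with_typo = ["0088", "6248" "8004"]
# Intended list, with every code as a separate element
intended = ["0088", "6248", "8004"]

print(with_typo)
print(len(with_typo), len(intended))
```

No error or warning is raised in the first case, so this kind of typo only shows up as missing rows after the filter.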

In [43]:
# CLOSING_DATE = "2019-12-31"
# mora = MoraData(spark, CLOSING_DATE).get_data()

In [44]:
# mora.cache()
# mora.count()

# Riesgo de Firma (Off-Balance-Sheet Risk): Penetration and Balances

In [45]:
class FirmaData(GetData):
    
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        
        self.table = {'vsfmacgt': {'key':['customer_id'], 
                                   'cols_in':['off_bal_exp_end_m_mh_amount'],
                                   'cols_out':['off_bal_exp_end_m_mh_amount',
                                               'off_bal_exp_end_m_mh_flag']
                                  },
                     }

        
    def get_riesgoFirma(self):
        
        df_cgt = (hu.read_table("vsfmacgt", self.closing_date)
                  .select(*self.table['vsfmacgt']['key'], *self.table['vsfmacgt']['cols_in']))
        
        return (df_cgt
                .groupBy('customer_id')
                .agg(F.sum('off_bal_exp_end_m_mh_amount').alias('off_bal_exp_end_m_mh_amount'))
                .withColumn('off_bal_exp_end_m_mh_flag',
                            F.when(F.col('off_bal_exp_end_m_mh_amount') > 0, 1).otherwise(0)))
       

In [46]:
# CLOSING_DATE = "2019-12-31"
# fd =FirmaData(spark,CLOSING_DATE).get_riesgoFirma() #.where(F.col('off_bal_exp_end_m_mh_amount')>0).show(5)

# AvalBox

In [47]:
# Path: /data/master/egug/data/t_egug_guarantee_request_mp/monthly/partitioned

# OneView

In [48]:
class OneViewData(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        self.final_cols = ["customer_id", "one_view_flag"]
        self.limit_closing_date = "2019-05-31"
        
    
    def _extract(self):
        
#         dep = {"oneview": cmautils.read_table_v2(spark, "oneview", self.closing_date, True)}
        dep = {"oneview": hu.read_table("oneview", self.closing_date,'closest')}
        
        return dep
    
    
    def _transform(self, dep):
        
        df_t = (cmautils
                .format_id_col(dep["oneview"])
                .withColumn("one_view_flag", F.lit(1).cast(T.IntegerType())))
        
        return df_t
    
    
    def get_data(self):
        
        if self.closing_date >= self.limit_closing_date:
            dep = self._extract()
            df_t = self._transform(dep)
            
        else:
            schema = T.StructType([T.StructField("customer_id", T.StringType(), True), 
                                   T.StructField("one_view_flag", T.IntegerType(), True)])
            df_t = self.spark.createDataFrame([], schema)
        
        return self._load(df_t.select(self.final_cols))
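
`get_data` compares `closing_date` and `limit_closing_date` as strings. This is safe as long as both are zero-padded `YYYY-MM-DD` strings, because lexicographic order then matches chronological order. A quick check in plain Python (the sample closing dates are illustrative):

```python
from datetime import date

limit_closing_date = "2019-05-31"   # same value as in OneViewData

results = []
for closing_date in ["2018-12-31", "2019-05-31", "2020-01-31"]:
    # Comparison as done in get_data (plain strings)
    as_strings = closing_date >= limit_closing_date
    # Same comparison on real date objects
    as_dates = date.fromisoformat(closing_date) >= date.fromisoformat(limit_closing_date)
    results.append((closing_date, as_strings, as_dates))
    print(closing_date, as_strings, as_dates)
```

A date that is not zero-padded, such as `2019-5-31`, would not compare reliably this way.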

In [49]:
#df_1 = OneViewData(spark, "2019-12-31").get_data()
# df_2 = OneViewData(spark, "2020-01-29").get_data()

In [50]:
# df_1.cache(), df_1.count()
# df_2.cache(), df_2.count()

In [51]:
# df_1.count(), df_2.count()

# Num. Prods

In [52]:
class NumProdsData(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.datafils = {
            "iii": (self.filters.country_id & self.filters.con_country_id & 
                    self.filters.entity_id & self.filters.con_entity_id &
                    self.filters.customer_id & self.filters.tit & self.filters.activo & 
                    (F.col("business_area_id").isin(["0013", "0058", "0023", "0088",
                                                     "8004", "6248", "6057"]))),
            "trans_prods": (F.col("com_category_id").isin([442, 63, 38, 593])),
            "htr": (self.filters.country_id & self.filters.entity_id & 
                    self.filters.cus_country & self.filters.cus_entity & 
                    self.filters.activo & self.filters.customer_id & 
                    (F.col("com_category_id") == 74)), 
            "har": (self.filters.country_id & self.filters.entity_id & 
                    self.filters.cus_country & self.filters.cus_entity & 
                    (F.col("account_trans_id").isin([26, 126, 226, 326, 426, 526, 
                                                     626, 733, 953, 928, 813])) & 
                    (F.col("closing_date").between(cmautils.add_months(self.closing_date, -5), 
                                                   self.closing_date))),
            "iii_har": (self.filters.activo & self.filters.tit)
        }
    
    
    def _extract(self):
        
#         iii = cmautils.read_table_v2(spark, "vsfmaiii", self.closing_date)
#         htr = cmautils.read_table_v2(spark, "vsfmahtr", self.closing_date)
#         har = cmautils.read_table(spark, "vsfmahar", "all")
        iii = hu.read_table("vsfmaiii", self.closing_date)
        htr = hu.read_table("vsfmahtr", self.closing_date)
        har = hu.read_table("vsfmahar", "all")
        
        dep = {"iii": iii, 
               "htr": htr, 
               "har": har}
        
        return dep
    
    
    def _transform(self, dep):
        
        num_trans = (dep["iii"]
                     .filter(self.datafils["iii"])
                     .filter(self.datafils["trans_prods"])
                     .select("customer_id", "com_grouping_id")
                     .distinct()
                     .groupBy("customer_id")
                     .agg(F.count("customer_id").alias("bec_trans_num_prods"))
                     .select("customer_id", "bec_trans_num_prods"))
        
        ces_nom = (dep["htr"]
                   .filter(self.datafils["htr"])
                   .select("customer_id", "com_grouping_id")
                   .distinct()
                   .groupBy("customer_id")
                   .agg(F.count("customer_id").alias("bec_cnom_num_prods"))
                   .select("customer_id", "bec_cnom_num_prods"))
        
        # cmautils.add_months("2019-12-31", -5)
        har = (dep["har"]
               .filter(self.datafils["har"])
               .join(dep["iii"]
                     .filter(self.datafils["iii_har"])
                     .select("contract_id", "country_id", "entity_id"), 
                     [dep["har"]["contract_id"] == dep["iii"]["contract_id"], 
                      dep["har"]["country_id"] == dep["iii"]["country_id"],
                      dep["har"]["entity_id"] == dep["iii"]["entity_id"]])
               .groupBy("customer_id")
               .agg(F.sum(F.col("payable_1m_amount") + F.col("receivable_1m_amount"))
                    .alias("ces_imp_amount_6m"))
               .filter(F.col("ces_imp_amount_6m") > 0)
               .withColumn("ces_imp_flag", F.lit(1))
               .select("customer_id", "ces_imp_flag"))
        
        
        df_t = (num_trans
                .join(ces_nom, "customer_id", "outer")
                .join(har, "customer_id", "outer")
                .na.fill(0)
                .withColumn("bec_num_prods", (F.col("bec_trans_num_prods")+
                                              F.col("bec_cnom_num_prods")+
                                              F.col("ces_imp_flag")))
                .select("customer_id", "bec_num_prods"))
        
        return df_t
    
    
    def get_data(self):
        
        dep = self._extract()
        df_t = self._transform(dep)
        
        return self._load(df_t)
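
The `har` filter keeps the closing dates between `cmautils.add_months(self.closing_date, -5)` and `self.closing_date`, that is, six monthly closings when the dates are month ends. `cmautils` is not shown in this notebook, so the helper below is a hypothetical stand-in that assumes `add_months` maps a month-end date to the month end of the shifted month:

```python
import calendar
from datetime import date


def add_months_month_end(closing_date: str, months: int) -> str:
    """Shift a YYYY-MM-DD date by `months` and return the last day of that month."""
    d = date.fromisoformat(closing_date)
    # Work on a zero-based month index so negative shifts cross years cleanly
    total = d.year * 12 + (d.month - 1) + months
    year, month_index = divmod(total, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, last_day).isoformat()


window_start = add_months_month_end("2019-12-31", -5)
print(window_start, "to", "2019-12-31")
```

Under that assumption, a closing date of 2019-12-31 gives a window that starts at the July 2019 closing and therefore covers six month-end partitions.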

In [53]:
# CLOSING_DATE = "2019-12-31"
# num_prods = NumProdsData(spark, CLOSING_DATE).get_data()

In [54]:
# num_prods.cache()

In [55]:
# num_prods.limit(10).toPandas()

# Altas (New Clients)

In [56]:
class AltasData(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        self.table_name = "altasbajas"
    
    
    def _extract(self):
        
#         df = cmautils.read_table_v2(self.spark, self.table_name, self.closing_date, True)
        df = hu.read_table(self.table_name, self.closing_date,'closest')
        
        return df
    
    
    def _transform(self, df):
        
        read_part = cmautils.get_closest_partition_table(self.spark, self.table_name, self.closing_date)
        prev_year = int(read_part[:4])-1
        prev_part = "{0}-12-31".format(prev_year)
        
        df_t = (df
                .filter(F.col("pure_cust_type") == 1)
                .filter(F.col("start_date") == prev_part)
                .filter(F.col("movement_type") == "Alta")
                .select("customer_id")
                .distinct()
                .withColumn("new_client_flag", F.lit(1))
               )
        
        return df_t
    
    
    def get_data(self):
        
        df = self._extract()
        df_t = self._transform(df)
        
        return self._load(df_t)
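
`_transform` keeps the clients whose `start_date` equals the last day of the year before the partition that was read. The date arithmetic can be sketched in plain Python; `previous_year_end` is a hypothetical name, and its input stands in for the value returned by `cmautils.get_closest_partition_table`, assumed here to be a `YYYY-MM-DD` string:

```python
def previous_year_end(read_part: str) -> str:
    """Return December 31 of the year before the given partition date."""
    prev_year = int(read_part[:4]) - 1
    return "{0}-12-31".format(prev_year)


prev_part = previous_year_end("2022-12-31")
print(prev_part)
```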

In [57]:
# df_altas = AltasData(spark, LAST_CLOSING_DATE).get_data()

In [58]:
# df_altas.select('customer')

In [59]:
# CLOSING_DATE = "2019-12-31"
# df_altas = AltasData(spark, CLOSING_DATE).get_data()
# df_altas.cache()
# df_altas.count()

# Add Plan Uno Variables

In [60]:
class PlanUno(GetData):
    
    def __init__(self, spark, closing_date, dfs):       
        super().__init__(spark, closing_date)
        self.means = dfs[0]
    
    def _extract(self):
        
        periodos = du.calculate_fechas_periodos(self.closing_date, 4, str)
        print(periodos)
        periodos_har = du.calculate_fechas_periodos(self.closing_date, 6, str)
        
        digi = self.tables.get_digi()
        
        pob = hu.read_table('vsfmapob',periodos)
        
        bjc = hu.read_table('vsfmabjc',periodos)
#         bjc = spark.read.parquet('/data/master/sfma/data/general/general/t_sfma_credit_card_m/closing_date=2022-02-28',
#                                  '/data/master/sfma/data/general/general/t_sfma_credit_card_m/closing_date=2022-01-31',
#                                  '/data/master/sfma/data/general/general/t_sfma_credit_card_m/closing_date=2021-12-31',
#                                  '/data/master/sfma/data/general/general/t_sfma_credit_card_m/closing_date=2020-11-30'
#                                 )
        
        bjd = hu.read_table('vsfmabjd',periodos)
        
        iii = self.tables.get_iii()
        
        planuno = self.tables.get_planuno()
        

        har = hu.read_table('vsfmahar',periodos_har)
        
        pem = self.tables.get_pem()
        
        
        dep = {"digi": digi, 
               "pob": pob, 
               "iii": iii, 
               "bjc": bjc, 
               "bjd": bjd,
               "planuno": planuno, 
               "har": har, 
               "pem": pem}
        
        return dep
    
    
    def _get_mamta(self):
    
        df = self.tables.get_hierarchy().filter(
            F.col('entity_hier_lev1_id')=='0182')

        df2 = (df
               .where(F.col('entity_hier_lev1_id')=='0182')
               .withColumn(
                   "nivel_20",F.lit('0000')).withColumn(
                   "nivel_20_desc",F.lit(' ')).withColumn(
                   "nivel_30",F.lit('0000')).withColumn(
                   "nivel_30_desc",F.lit(' ')).withColumn(
                   "nivel_50",F.lit('0000')).withColumn(
                   "nivel_50_desc",F.lit(' ')).withColumn(
                   "nivel_60",F.lit('0000')).withColumn(
                   "nivel_60_desc",F.lit(' ')).withColumn(
                   "nivel_40",F.lit('0000')).withColumn(
                   "nivel_55",F.lit('0000')).withColumn(
                   "nivel_65",F.lit('0000')))

        for campo in df.columns:
            if campo.startswith('branch_lev') and campo.endswith('type'):
                valor=campo[10:]
                valor=valor[:-5]
                oficina='branch_hier_lev'+valor+'_id'
                descripcion='branch_hier_lev'+valor+'_desc'
                df2=df2.withColumn(
                    "nivel_20",F.when((F.col(campo))=='20',F.col(oficina)).otherwise(F.col('nivel_20'))).withColumn(
                    "nivel_20_desc",F.when((F.col(campo))=='20',F.col(descripcion)).otherwise(F.col('nivel_20_desc'))).withColumn(
                    "nivel_30",F.when((F.col(campo))=='30',F.col(oficina)).otherwise(F.col('nivel_30'))).withColumn(
                    "nivel_30_desc",F.when((F.col(campo))=='30',F.col(descripcion)).otherwise(F.col('nivel_30_desc'))).withColumn(
                    "nivel_50",F.when((F.col(campo))=='50',F.col(oficina)).otherwise(F.col('nivel_50'))).withColumn(
                    "nivel_50_desc",F.when((F.col(campo))=='50',F.col(descripcion)).otherwise(F.col('nivel_50_desc'))).withColumn(
                    "nivel_60",F.when((F.col(campo))=='60',F.col(oficina)).otherwise(F.col('nivel_60'))).withColumn(
                    "nivel_60_desc",F.when((F.col(campo))=='60',F.col(descripcion)).otherwise(F.col('nivel_60_desc'))).withColumn(
                    "nivel_40",F.when((F.col(campo))=='40',F.col(oficina)).otherwise(F.col('nivel_40'))).withColumn(
                    "nivel_55",F.when((F.col(campo))=='55',F.col(oficina)).otherwise(F.col('nivel_55'))).withColumn(
                    "nivel_65",F.when((F.col(campo))=='65',F.col(oficina)).otherwise(F.col('nivel_65')))


        return df2.withColumn('branch_parent_desc',F.concat(F.col('branch_hier_lev1_id'), F.lit(';'),
                                                            F.col('nivel_20') , F.lit(';'),
                                                            F.col('nivel_30') , F.lit(';'),
                                                            F.col('nivel_40') , F.lit(';'),
                                                            F.col('nivel_50') , F.lit(';'),
                                                            F.col('nivel_55') , F.lit(';'),
                                                            F.col('nivel_60') , F.lit(';'),
                                                            F.col('nivel_65') , F.lit(';'),
                                                            F.col('entity_hier_lev1_id'))
                             ).select(F.col('branch_hier_lev1_id').alias('main_branch_id'),
                                      F.col('branch_hier_lev1_desc').alias('branch_full_name'),
                                      F.col('nivel_20').alias('level20_adjuntiadag_id'),
                                      F.col('nivel_20_desc').alias('desc_cbc'),
                                      F.col('nivel_30').alias('level30_zone_id'),
                                      F.col('nivel_30_desc').alias('desc_zone'),
                                      F.col('nivel_50').alias('level50_territorial_id'),
                                      F.col('nivel_50_desc').alias('territ_desc'),
                                      F.col('nivel_60').alias('level60_operarea_id'),
                                      F.col('nivel_60_desc').alias('desc_business_area'),
                                      'branch_parent_desc')
    
    def _get_terri_data(self, dep):
        
        mta = self._get_mamta().select("main_branch_id", "level50_territorial_id")
        
        pem = (dep["pem"]
               .filter(F.col("business_area_id").astype("int").isin([13, 6248]))
               .dropDuplicates(subset=["customer_id"])
               .select("customer_id", "main_branch_id"))
        
        df = pem.join(mta, "main_branch_id").select("customer_id", F.col("level50_territorial_id").cast("int")
                                                    .alias("plan_uno_cod_territ"))
        
        return df
    
    
    def _get_impu_data(self, dep):
        
        har_cols = ["contract_id", "business_area_id", "account_trans_id", 
                    "payable_1m_amount", "receivable_1m_amount", "entity_id", 
                    "closing_date"]
        
        har_iii_cols = ["customer_id", "entity_id", "contract_id", "com_category_id", 
                        "party_type_id"]
        
        
        har_t = (dep["har"].select(har_cols)
                 .join(dep["iii"].select(har_iii_cols), ["contract_id", "entity_id"])
                 .filter((~F.col("business_area_id").cast("int").isin([6057])) & 
                         (F.col("account_trans_id").cast("int").isin([26, 126, 226, 326, 426, 
                                                                      526, 626, 733, 953, 928, 
                                                                      813])) &
                         (F.col("entity_id").cast("int") == 182) & 
                         (F.col("party_type_id") == "TIT") & 
                         (F.col("com_category_id").cast("int").isin([36, 39, 41, 436, 439])))
                 .groupBy("customer_id", "closing_date")
                 .agg(F.sum("payable_1m_amount").alias("payable_1m_amount"), 
                      F.sum("receivable_1m_amount").alias("receivable_1m_amount"))
                 .withColumn("ces_imp_id", 
                             F.when(((F.abs(F.col("payable_1m_amount")) > 100) | 
                                    (F.abs(F.col("receivable_1m_amount")) > 100)), 1).otherwise(0))
                 .groupBy("customer_id")
                 .agg(F.sum("ces_imp_id").alias("ces_imp_id_6m"))
                 .withColumn("plan_uno_criterio_ces_imp", 
                             F.when(F.col("ces_imp_id_6m") >= 2, 1).otherwise(0))
                 .select("customer_id", "plan_uno_criterio_ces_imp"))
        
        return har_t
    
    
    
    def _get_ingr_nif_data(self, dep):
        
        har_cols = ["contract_id", "business_area_id", "account_trans_id", 
            "receivable_1m_amount", "entity_id", 
            "closing_date"]
        
        har_iii_cols = ["customer_id", "entity_id", "contract_id", 
                        "com_category_id", "party_type_id"]

        har_t = (dep["har"].select(har_cols)
                 .join(dep["iii"].select(har_iii_cols), ["contract_id", "entity_id"])
                 .filter((~F.col("business_area_id").cast("int").isin([6057])) & 
                         (F.col("account_trans_id").cast("int").isin([7, 107, 163, 263, 430, 707, 9, 
                                                                      153, 597, 587, 687, 18, 172, 772, 173, 
                                                                      186, 517, 391, 591])) &
                         (F.col("entity_id").cast("int") == 182) & 
                         (F.col("party_type_id") == "TIT") & 
                         (F.col("com_category_id").cast("int").isin([36, 39, 41, 436, 439])))
                 .groupBy("customer_id", "closing_date")
                 .agg(F.sum("receivable_1m_amount").alias("receivable_1m_amount"))
                 .withColumn("ingr_nif_id", 
                             F.when((F.abs(F.col("receivable_1m_amount")) >=600), 1).otherwise(0))
                 .groupBy("customer_id")
                 .agg(F.sum("ingr_nif_id").alias("ingr_nif_id_6m"))
                 .withColumn("plan_uno_criterio_ingresos_nif", 
                             F.when(F.col("ingr_nif_id_6m") >= 4, 1).otherwise(0))
                 .select("customer_id", "plan_uno_criterio_ingresos_nif"))
        
        return har_t
    
    
    
    def _get_ingr_cif_data(self, dep):
        
        har_cols = ["contract_id", "business_area_id", "account_trans_id", 
                    "receivable_1m_amount", "entity_id", 
                    "closing_date"]
        
        har_iii_cols = ["customer_id", "entity_id", "contract_id", 
                        "com_category_id", "party_type_id"]
        
        har_t = (dep["har"].select(har_cols)
                 .filter(F.col("closing_date").between(cmautils.add_months(self.closing_date, -3), 
                                                       self.closing_date))
                 .join(dep["iii"].select(har_iii_cols), ["contract_id", "entity_id"])
                 .filter((~F.col("business_area_id").cast("int").isin([6057])) & 
                         (F.col("account_trans_id").cast("int").isin([143, 7, 9, 172, 186, 10, 210, 
                                                                      772, 12, 17, 391, 80, 591, 39, 
                                                                      707, 239, 112, 572, 0, 517, 5, 
                                                                      719, 417, 86, 468, 153, 687, 191, 
                                                                      919, 118, 190, 220, 954, 918, 121, 
                                                                      916, 90, 817, 163, 107, 341, 263, 
                                                                      566, 261, 8])) &
                         (F.col("entity_id").cast("int") == 182) & 
                         (F.col("party_type_id") == "TIT") & 
                         (F.col("com_category_id").cast("int").isin([36, 39, 41, 436, 439])))
                 .groupBy("customer_id")
                 .agg(F.sum("receivable_1m_amount").alias("receivable_4m_amount"))
                 .withColumn("plan_uno_criterio_ingresos_cif", 
                             F.when(F.col("receivable_4m_amount") > 8000, 1).otherwise(0))
                 .select("customer_id", "plan_uno_criterio_ingresos_cif"))
        
        return har_t        
    
    
    def _get_tarj_data(self, dep):
        
        w = (Window().partitionBy("customer_id"))
        
        bjd_t = (dep["bjd"].where((F.col("business_area_id").astype("int").isin([13, 6248])) &
                                  (F.col("customer_id").isNotNull()) &
                                  (F.col("customer_id").astype("int") != 0) &
                                  (F.col("com_category_id").astype("int").isin([401, 703])) &
                                  (F.col("status_type").isin(['A', 'I'])) &
                                  ((F.col("card_block_id").isNull()) | (F.col("card_block_id") != "A")))
                 .join(dep["iii"],
                       on=(dep["bjd"].country_id == dep["iii"].contract_country_id) &
                       (dep["bjd"].customer_country_id == dep["iii"].contract_country_id) &
                       (dep["bjd"].entity_id == dep["iii"].contract_entity_id) &
                       (dep["bjd"].contract_id == dep["iii"].contract_id) &
                       (dep["bjd"].customer_entity_id == dep["iii"].entity_id) &
                       (dep["bjd"].business_area_id == dep["iii"].business_area_id))
                 .where((F.col("contract_country_id") == "ES") &
                        (F.col("customer_country_id") == "ES") &
                        (F.col("contract_entity_id") == "0182") &
                        (F.col("customer_entity_id") == "0182") &
                        (dep["iii"]["com_category_id"].astype("int").isin([401, 703])))
                    .groupBy(dep["iii"].customer_id, dep["iii"].contract_id)
                    .agg(F.sum(F.col("retail_trans_1m_number") + F.col("trans_atm_1m_number"))
                         .alias("plan_uno_sum_num_mov_4meses")))
        
        bjc_t = (dep["bjc"].where((F.col("business_area_id").astype("int").isin([13, 6248])) &
                           (F.col("customer_id").isNotNull()) &
                           (F.col("customer_id").astype("int") != 0) &
                           (F.col("com_category_id").astype("int").isin([371, 373, 701, 702])) &
                           (F.col("status_type").isin(['A', 'I'])) &
                           ((F.col("card_block_id").isNull()) | (F.col("card_block_id") != "A")))
                 .join(dep["iii"],
                       on=(dep["bjc"].country_id == dep["iii"].contract_country_id) &
                       (dep["bjc"].customer_country_id == dep["iii"].contract_country_id) &
                       (dep["bjc"].entity_id == dep["iii"].contract_entity_id) &
                       (dep["bjc"].contract_id == dep["iii"].contract_id) &
                       (dep["bjc"].customer_entity_id == dep["iii"].entity_id) &
                       (dep["bjc"].business_area_id == dep["iii"].business_area_id))
                 .where((F.col("contract_country_id") == "ES") &
                        (F.col("customer_country_id") == "ES") &
                        (F.col("contract_entity_id") == "0182") &
                        (F.col("customer_entity_id") == "0182") &
                        (dep["iii"]["com_category_id"].astype("int").isin([371, 373, 701, 702])))
                    .groupBy(dep["iii"].customer_id, dep["iii"].contract_id)
                    .agg(F.sum(F.col("retail_deb_trans_1m_number") + 
                               F.col("retail_cred_trans_1m_number") + 
                               F.col("cred_trans_nown_atm_1m_number") + 
                               F.col("deb_trans_nown_atm_1m_number") + 
                               F.col("cred_trans_own_atm_1m_number") + 
                               F.col("deb_trans_own_atm_1m_number"))
                         .alias("plan_uno_sum_num_mov_4meses")))
        
        
        tarj = (bjd_t
                .union(bjc_t)
                .withColumn("plan_uno_sum_num_mov_4meses_max", 
                            F.max("plan_uno_sum_num_mov_4meses").over(w))
                .withColumn("plan_uno_sum_num_mov_4meses_sum", 
                            F.sum("plan_uno_sum_num_mov_4meses").over(w))
                .select("customer_id", "plan_uno_sum_num_mov_4meses_max", 
                        "plan_uno_sum_num_mov_4meses_sum")
                .dropDuplicates(subset=["customer_id"])
                .withColumn("plan_uno_criterio_plasticos", 
                            F.when(F.col("plan_uno_sum_num_mov_4meses_sum") >= 7, 1).otherwise(0))
                .withColumn("plan_uno_criterio_tarjetas", 
                            F.when(F.col("plan_uno_sum_num_mov_4meses_max") >= 7, 1).otherwise(0))
                .select("customer_id", 
                        F.col("plan_uno_sum_num_mov_4meses_sum").alias("plan_uno_sum_num_mov_4meses"),
                        "plan_uno_criterio_plasticos", "plan_uno_criterio_tarjetas"))
        
        return tarj
    
    
    def _get_p1_data(self, dep):
        
        df_t = (dep["planuno"]
                .select("customer_id",
                        F.col("insurance_criteria_type").alias("plan_uno_criterio_previsional_nif"),
                        F.col("insurance_criteria_type").alias("plan_uno_criterio_previsional_cif"),
                        F.col("unemployment_type").alias("plan_uno_cumple_inem"),
                        F.col("pension_type").alias("plan_uno_cumple_pension"),
                        F.col("bbva_equity_number").cast("int").alias("plan_uno_num_accionesbbva"),
                        F.col("planuno_segment_id").alias("plan_uno_segmento"),
                        F.col("recu_inc_individual_type").alias("plan_uno_cumple_ingrrec"),
                        F.greatest(F.col("payroll_type"), 
                                   F.col("pension_type"),
                                   F.col("unemployment_type"),
                                   F.col("recu_inc_individual_type"))
                        .alias("plan_uno_criterio_ingresos_par"),
                        F.col("payroll_type").alias("plan_uno_cumple_nomina"),
                        F.col("soc_insur_asgn_fulflt_type").alias("plan_uno_criterio_ces_ss_nif"),
                        F.col("soc_insur_asgn_fulflt_type").alias("plan_uno_criterio_ces_ss_cif"),
                        F.col("payroll_asgn_crit_type").alias("plan_uno_criterio_ces_nom_nif"),
                        F.col("payroll_asgn_crit_type").alias("plan_uno_criterio_ces_nom_cif"),
                        F.greatest("payroll_type", "pension_type", "recu_inc_individual_type")
                        .alias("plan_uno_cumple_ingrrec_agr")))
        
        return df_t
    
    
    def _transform(self, dep):
        
        # Digitalization
        digi_t = dep["digi"].select("customer_id", 
                                    F.col("cell_phone_validated_type").alias("plan_uno_movil_informado"))
        
        # Bills
        iii_pob_t = (dep["iii"].filter((F.col("business_area_id").astype("int").isin([13, 6248])) & 
                                (self.filters.entity_id & self.filters.tit))
                     .select("customer_id", "contract_id")
                     .distinct())
        
        pob_t = (dep["pob"].filter(self.filters.country_id & 
                                   self.filters.cus_country & 
                                   self.filters.entity_id & 
                                   self.filters.cus_country &  # NOTE: repeats the filter two lines above; a customer-entity filter may have been intended
                                   self.filters.customer_id & 
                                   (F.col("business_area_id").astype("int").isin([13, 6248])))
                 .drop("customer_id")
                 .join(iii_pob_t, ["contract_id"])
                 .groupBy("customer_id")
                 .agg(F.sum(F.col("paid_bills_1m_number") - F.col("rej_bills_1m_number"))
                      .alias("plan_uno_total_recibos"))
                 .withColumn("plan_uno_criterio_recibos", 
                             F.when(F.col("plan_uno_total_recibos") >= 5, 1).otherwise(0)))
        
        # Cards
        tarj_t = self._get_tarj_data(dep)
        
        # Plan Uno direct
        p1_t = self._get_p1_data(dep)
        
        # Tax assignment
        imp_t = self._get_impu_data(dep)
        
        # Income NIF & CIF
        ingr_nif_t = self._get_ingr_nif_data(dep)
        ingr_cif_t = self._get_ingr_cif_data(dep)
        
        # Territorial
        terr_t = self._get_terri_data(dep)
        
        
        # Join all the data
        df_t = (digi_t
                .join(pob_t, "customer_id", "outer")
                .join(tarj_t, "customer_id", "outer")
                .join(p1_t, "customer_id", "outer")
                .join(imp_t, "customer_id", "outer")
                .join(ingr_nif_t, "customer_id", "outer")
                .join(ingr_cif_t, "customer_id", "outer")
                .join(terr_t, "customer_id", "outer")
                .na.fill(0))
#                 .withColumnRenamed("customer_id", "cod_persona"))
        
        return df_t
    
    def get_aux_fields(self, df):
        filter_gms = (self.filters.customer_id &
                      self.filters.entity_id & self.filters.con_entity_id &
                      self.filters.country_id & self.filters.busi_area &
                      self.filters.activo_glo)
        aux_f = fields_edc_needed(self.tables, filter_gms, self.closing_date, self.spark, self.means)
        return get_new_fields(self.closing_date, self.tables, aux_f, df)
        
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        dep = self._extract()
        df = self._transform(dep)
        df_res = self.get_aux_fields(df)
        
        return self._load(df_res)

### Plan Uno auxiliary columns

In [61]:
def fields_edc_needed (tables,filter_gms,closing_date,spark,means):
    cgm = tables.get_cgm().filter(filter_gms)
    
    # imp_activo # ind_prest_hipoteca # imp_pasivo #imp_val_rentavble #imp_val_rentafija
    # media_mens_cta_personal # xti_idefisco #edad
    cgm_fields = cgm.withColumn('pasivoactivo',(F.col("loan_book_end_m_mh_amount") +  # loan balance amount (minimum holder) + off-balance exposure amount (minimum holder)
                          F.col("off_bal_exp_end_m_mh_amount"))
                    ).withColumn('ind_prest_hipoteca',
                                 F.when(F.col("ind_mg_loan_id") == 'A', 1).otherwise(0)
                    ).withColumn('media_mens_cta_personal',
                                 F.col("personal_acc_avg_m_bal_amount") + 
                                 F.col("fc_per_acc_avg_m_bal_amount")
                    ).withColumn('imp_activo',
                                 (F.col("loan_book_end_m_mh_amount") + 
                                  F.col("off_bal_exp_end_m_mh_amount"))                                                                                    
                    ).groupBy("customer_id"
                        ).agg(F.sum('imp_activo').alias('imp_activo'),
                            F.max('ind_prest_hipoteca').alias('ind_prest_hipoteca'),
                            F.sum('tot_res_end_m_mh_amount').alias('imp_pasivo'), # total resources (minimum holder)
                            F.sum('securt_dom_eq_emb_amount').astype("double"
                                            ).alias('imp_val_rentavble'),
                            F.sum('securt_dom_fix_emb_amount').astype("double"
                                            ).alias('imp_val_rentafija'),
                            F.sum('media_mens_cta_personal').astype("double"
                                            ).alias('media_mens_cta_personal'),
                            F.min("document_type").alias("xti_idefisco"), 
                            F.max("age_number").alias("edad")
                             )
    
    # imp_pendiente_credito
    periodos_3m = du.calculate_fechas_periodos(closing_date, 3, str)
    imp_pendiente_credito = hu.read_table('vsfmapjc',periodos_3m).where(
                (F.col("business_area_id").astype("int").isin([13, 6248])) &
                (F.col("status_type").isin(['A', 'I'])) &
                (F.col("country_id") == "ES") &
                (F.col("customer_country_id") == "ES") &
                (F.col("entity_id") == "0182") &
                (F.col("customer_entity_id") == "0182") &
                (F.col("customer_id").isNotNull()) &
                (F.col("customer_id").astype("int") != 0) &
                (F.col("com_category_id").astype("int").isin([371, 373, 701, 702])) &
                (~F.col("com_grouping_id").astype("int").isin([701010, 371010])) &
                (F.col("credit_limit_amount") != 1.0
                )).groupby(F.col("customer_id")).agg(
                (-1*F.sum("pending_payment_amount")).alias("imp_pendiente_credito"))
     

    # ind_tarj_credito # ind_tarj_debito
    iii_fields = tables.get_iii().where(
            F.col("business_area_id").astype("int").isin([13, 6248]) &
            (F.col("country_id") == "ES") &
            (F.col("contract_country_id") == "ES") &
            (F.col("entity_id") == "0182") &
            (F.col("contract_entity_id") == "0182") &
            (F.col("status_type").isin(['A', 'I'])) &
            (F.col("customer_id").isNotNull()) &
            (F.col("customer_id").astype("int") != 0) &
            (F.col("party_type_id") == "TIT") &
            (F.col("party_order_number") == 1)
        ).select(
            "status_type",
            'contract_id',
            F.col("customer_id"),
            F.col("com_category_id").astype("int").alias("cod_ctgcom"),
            F.col("com_grouping_id").astype("int").alias("cod_agrcom"),
            F.least("start_date", "contract_regist_date").alias("fh_max")
        ).where(F.col("fh_max") <= closing_date)   
        
    tm_tarjetas = iii_fields.where(F.col("cod_ctgcom").isin([401, 703, 371, 373, 701, 702]))\
                       .withColumn("num_months",
                            F.months_between(F.lit(closing_date), F.col("fh_max")).astype("int"))\
                       .withColumn("tipo",
                                   F.when( F.col("cod_ctgcom").isin([401, 703]) | 
                                           F.col("cod_agrcom").isin([701010, 371010]), 
                                           "n_meses_ult_tarj_debito")\
                                     .when( F.col("cod_ctgcom").isin([371, 373, 701, 702]) &
                                            ~F.col("cod_agrcom").isin([701010, 371010]),
                                           "n_meses_ult_tarj_credito")\
                                     .otherwise("otra"))\
                       .where(F.col("tipo") != "otra")\
                       .groupBy("customer_id")\
                       .pivot("tipo", values=["n_meses_ult_tarj_debito",
                                             "n_meses_ult_tarj_credito"])\
                       .agg(F.min("num_months"))\
                       .withColumn("ind_tarj_debito",
                                  F.when(F.col("n_meses_ult_tarj_debito").isNotNull(), 1).otherwise(0))\
                       .withColumn("ind_tarj_credito",
                                  F.when(F.col("n_meses_ult_tarj_credito").isNotNull(), 1).otherwise(0)
                                  ).select('customer_id','ind_tarj_credito','ind_tarj_debito')
    
    # ind_tarj_rev 
    REVOLVING = 1
    CREDITO = 2
    TARJETA_10 = 3
    contrato_plastico = tables.get_rdc().withColumnRenamed('dest_contract_id','contract_id'
                                ).join(tables.get_bjc(), 
                                 on=["contract_id"],
                                 how="inner")\
                           .groupBy(
                                F.col("source_contract_id").alias("cod_iucori"),
                                F.col("contract_id"),
                                F.col("customer_id"),
                                F.col("payment_method_id").alias("formapago"),
                                F.col("com_grouping_id"),
                                F.col("com_subgrouping_id").alias("cod_subagr"))\
                            .agg(
                                F.sum(F.col("retail_deb_trans_1m_amount") + 
                                      F.col("retail_cred_trans_1m_amount")).astype("double")\
                                 .alias("imp_tarj_cre_com"),
                                F.sum(F.col("cred_trans_nown_atm_1m_amount") +
                                      F.col("deb_trans_nown_atm_1m_amount") +
                                      F.col("cred_trans_own_atm_1m_amount")   +
                                      F.col("deb_trans_own_atm_1m_amount")).astype("double")\
                                 .alias("imp_tarj_cre_caj"),
                                F.sum(F.col("retail_deb_trans_1m_number") +
                                      F.col("retail_cred_trans_1m_number")).alias("n_ope_tarj_cre_com"),
                                F.sum(F.col("cred_trans_nown_atm_1m_number") +
                                      F.col("deb_trans_nown_atm_1m_number") +
                                      F.col("cred_trans_own_atm_1m_number")  +
                                      F.col("deb_trans_own_atm_1m_number")).alias("n_ope_tarj_cre_caj"))\
                            .join(tables.get_pjc(), on=["contract_id",'customer_id'], how="inner")\
                            .join(iii_fields,
                                 on=["contract_id",'customer_id'],
                                how="inner")\
                            .withColumn("tipo",
                                       F.when(F.col("cod_agrcom").astype("int").isin([701010, 371010]) #& (F.col("imp_limcre") == 1)
                                              , TARJETA_10)
                                        .when(F.col("cod_agrcom").astype("int").isin([371014, 371015, 
                                                                                      371016, 371018,
                                                                                      371046, 371049]) |
                                              F.col("cod_subagr").astype("int").isin([371017001, 371017002, 
                                                                                     371017003, 371017004, 
                                                                                     371037005]) |
                                             
                                              F.col("formapago").isin(['C', 'F', 'P']), 
                                              REVOLVING)\
                                         .otherwise(CREDITO))\
                            .groupBy("customer_id").agg(
                                    F.sum(F.when(F.col("tipo") == REVOLVING, 
                                                 F.col("imp_tarj_cre_com")).otherwise(0)).astype("double")\
                                     .alias("imp_tarj_rev_com"),
                                    F.sum(F.when(F.col("tipo") == REVOLVING,
                                                 F.col("imp_tarj_cre_caj")).otherwise(0)).astype("double")\
                                     .alias("imp_tarj_rev_caj"),
                                    F.sum(F.when(F.col("tipo") == REVOLVING,
                                                 F.col("n_ope_tarj_cre_com")).otherwise(0)).alias("n_ope_tarj_rev_com"),
                                    F.sum(F.when(F.col("tipo") == REVOLVING,
                                                 F.col("n_ope_tarj_cre_caj")).otherwise(0)).alias("n_ope_tarj_rev_caj"),
                                    F.sum(F.when(F.col("tipo") == CREDITO,
                                                 F.col("imp_tarj_cre_com")).otherwise(0)).astype("double")\
                                     .alias("imp_tarj_cre_com"),
                                    F.sum(F.when(F.col("tipo") == CREDITO,
                                                 F.col("imp_tarj_cre_caj")).otherwise(0)).astype("double")\
                                     .alias("imp_tarj_cre_caj"),
                                    F.sum(F.when(F.col("tipo") == CREDITO,
                                                 F.col("n_ope_tarj_cre_com")).otherwise(0)).alias("n_ope_tarj_cre_com"),
                                    F.sum(F.when(F.col("tipo") == CREDITO,
                                                 F.col("n_ope_tarj_cre_caj")).otherwise(0)).alias("n_ope_tarj_cre_caj"))\
                            .withColumn("ind_tarj_rev",
                                        F.when((F.col("n_ope_tarj_rev_com") > 0) | (F.col("n_ope_tarj_rev_caj") > 0), 1)\
                                         .otherwise(0))\
                            .fillna(0).select('customer_id','ind_tarj_rev')


    # imp_cta_personal_mean# imp_pasivo_mean
    fields_means = means.select('customer_id','imp_cta_personal_mean','imp_pasivo_mean')
    
    return cgm_fields.join(imp_pendiente_credito,['customer_id'],how='outer'
                          ).join(tm_tarjetas,['customer_id'],how='outer'
                          ).join(contrato_plastico,['customer_id'],how='outer'
                          ).join(fields_means,['customer_id'],how='outer')
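
The card-type assignment inside `fields_edc_needed` relies on the first-match semantics of a chained `F.when(...).when(...).otherwise(...)`. The following is a minimal pure-Python sketch of that rule, not the notebook's implementation: `classify_card` is a hypothetical helper, and the subgrouping codes and the payment method `"T"` in the examples are made-up values chosen only to exercise each branch.

```python
# Pure-Python mirror of the "tipo" rule: conditions are checked in order
# and the first one that matches decides the card type.
REVOLVING = 1
CREDITO = 2
TARJETA_10 = 3

TARJETA_10_GROUPINGS = {701010, 371010}
REVOLVING_GROUPINGS = {371014, 371015, 371016, 371018, 371046, 371049}
REVOLVING_SUBGROUPINGS = {371017001, 371017002, 371017003, 371017004, 371037005}
REVOLVING_PAYMENT_METHODS = {"C", "F", "P"}


def classify_card(cod_agrcom, cod_subagr, formapago):
    """Return the card type for one contract (hypothetical helper)."""
    if cod_agrcom in TARJETA_10_GROUPINGS:
        return TARJETA_10
    if (cod_agrcom in REVOLVING_GROUPINGS
            or cod_subagr in REVOLVING_SUBGROUPINGS
            or formapago in REVOLVING_PAYMENT_METHODS):
        return REVOLVING
    return CREDITO


# Illustrative inputs (assumed values, not taken from real data).
examples = [
    (701010, 701010001, "C"),  # the grouping is checked before the payment method
    (371014, 371014001, "T"),  # revolving by grouping
    (371020, 371017002, "T"),  # revolving by subgrouping
    (371020, 371020001, "F"),  # revolving by payment method
    (371020, 371020001, "T"),  # nothing matches: plain credit card
]
result = [classify_card(*e) for e in examples]
print(result)
```

Because the `TARJETA_10` branch comes first, a contract in groupings 701010 or 371010 is never counted as revolving, whatever its payment method.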

In [62]:
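def get_new_fields(closing_date,tables,df,pl1):
    # df: auxiliary customer-level fields (output of fields_edc_needed)
    # pl1: Plan Uno DataFrame built by PlanUno._transform
    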
    cols = ['customer_id','plan_uno_cod_territ','plan_uno_criterio_activo_cif','plan_uno_criterio_activo_nif','plan_uno_criterio_ces_imp',
        'plan_uno_criterio_ces_nom_cif','plan_uno_criterio_ces_nom_nif','plan_uno_criterio_ces_ss_cif','plan_uno_criterio_ces_ss_nif',
        'plan_uno_criterio_estables','plan_uno_criterio_hipoteca','plan_uno_criterio_ingresos_cif','plan_uno_criterio_ingresos_nif'
        ,'plan_uno_criterio_ingresos_par','plan_uno_criterio_liquidos','plan_uno_criterio_plasticos',
        'plan_uno_criterio_previsional_cif','plan_uno_criterio_previsional_nif','plan_uno_criterio_recibos',
        'plan_uno_criterio_recursos','plan_uno_criterio_tarjetas','plan_uno_criterio_trans_joven','plan_uno_cumple_inem',
        'plan_uno_cumple_ingrrec','plan_uno_cumple_ingrrec_agr','plan_uno_cumple_nomina',
        'plan_uno_cumple_pension','plan_uno_movil_informado','plan_uno_num_accionesbbva','plan_uno_segmento',
        'plan_uno_sum_num_mov_4meses','plan_uno_tarjeta_activa','plan_uno_total_recibos']
    
    # ind_bbvanet, edad, plan_uno_tarjeta_activa, plan_uno_movil_informado, xti_idefisco
    df= pl1.join(df,['customer_id'],how='left')
    df_t = (df.join(tables.get_pjc(), "customer_id", "left").na.fill(0)
            .withColumn("imp_activo_mod", F.col("imp_activo") - F.abs(F.col("imp_pendiente_credito")))
            .withColumn("plan_uno_criterio_activo_cif", 
                       F.when((F.col("ind_prest_hipoteca") == 0) & 
                              (F.col("imp_activo_mod") > 15000), 1)
                       .otherwise(0))
            .withColumn("plan_uno_criterio_activo_nif", 
                       F.when((F.col("ind_prest_hipoteca") == 0) & 
                              (F.col("imp_activo_mod") > 5000), 1)
                       .otherwise(0))
            .withColumn("plan_uno_criterio_estables",  
                        F.when(((F.col("imp_pasivo") - 
                               F.col("imp_val_rentavble") - 
                               F.col("imp_val_rentafija") - 
                               F.col("media_mens_cta_personal")) > 10000) | 
                               (F.col("plan_uno_num_accionesbbva") > 500), 1)
                        .otherwise(0))
            .withColumn("plan_uno_tarjeta_activa", F.greatest(F.col("ind_tarj_credito"),
                                                             F.col("ind_tarj_debito"),
                                                             F.col("ind_tarj_rev")))
            .withColumn("plan_uno_criterio_trans_joven",
                       F.when((~(F.col("xti_idefisco").cast("int").isin([2, 3, 9])) & 
                              (F.col("edad").between(18, 29))& 
                              (F.col("plan_uno_movil_informado") == 1) & 
                              (F.col("plan_uno_tarjeta_activa") == 1)), 1)
                        .otherwise(0))
            .withColumnRenamed('ind_prest_hipoteca',"plan_uno_criterio_hipoteca")
            .withColumn('plan_uno_criterio_liquidos',F.when(F.col("imp_cta_personal_mean") > 10000, 1)
                                                     .otherwise(0))
            .withColumn('plan_uno_criterio_recursos',F.when(F.col("imp_pasivo_mean") > 50000, 1)
                                                    .otherwise(0))
            .select(*cols))
    
    return df_t
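
To make the thresholds in `get_new_fields` easier to check by hand, here is a rough pure-Python sketch that evaluates a few of the criteria for a single customer record. It assumes missing values were already replaced by 0 (as `.na.fill(0)` does for numeric columns); `plan_uno_criteria` and the sample record are hypothetical.

```python
def plan_uno_criteria(row):
    """Evaluate some Plan Uno criteria for one customer (a plain dict)."""
    # Assets net of the pending credit-card balance.
    imp_activo_mod = row["imp_activo"] - abs(row["imp_pendiente_credito"])
    no_mortgage = row["ind_prest_hipoteca"] == 0
    # Equivalent of F.greatest over the three card indicators.
    tarjeta_activa = max(row["ind_tarj_credito"],
                         row["ind_tarj_debito"],
                         row["ind_tarj_rev"])
    trans_joven = (
        row["xti_idefisco"] not in (2, 3, 9)
        and 18 <= row["edad"] <= 29          # between() is inclusive on both ends
        and row["plan_uno_movil_informado"] == 1
        and tarjeta_activa == 1
    )
    return {
        "plan_uno_criterio_activo_cif": int(no_mortgage and imp_activo_mod > 15000),
        "plan_uno_criterio_activo_nif": int(no_mortgage and imp_activo_mod > 5000),
        "plan_uno_tarjeta_activa": tarjeta_activa,
        "plan_uno_criterio_trans_joven": int(trans_joven),
    }


# Made-up customer used only for illustration.
customer = {
    "imp_activo": 12000, "imp_pendiente_credito": -2000, "ind_prest_hipoteca": 0,
    "ind_tarj_credito": 0, "ind_tarj_debito": 1, "ind_tarj_rev": 0,
    "xti_idefisco": 1, "edad": 25, "plan_uno_movil_informado": 1,
}
criteria = plan_uno_criteria(customer)
print(criteria)
```

With net assets of 10000 and no mortgage, this customer meets the NIF asset threshold (5000) but not the CIF one (15000).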

In [63]:
# means_data = MeansData(spark, '2019-12-31').get_data()
# plaunoData = PlanUno(spark, '2019-12-31',[means_data])

In [64]:
# campos = plaunoData.get_data()

In [65]:
# len(campos.columns)

In [66]:
class PlanUnoTactico(GetData):

    def _get_plan1_cols(self):

        return ["customer_id", F.col("bbva_equity_number").astype("int").alias("plan_uno_num_accionesbbva"),
                F.when(F.substring("plan_uno_typology_id", 2, 1) == "P",
                       "VINCULADO")
                .when(F.substring("plan_uno_typology_id", 2, 1) == "T",
                      "TRANSACCIONAL")
                .when(F.substring("plan_uno_typology_id", 2, 1) == "V",
                      "PREVINCULADO")
                .when(F.substring("plan_uno_typology_id", 2, 1) == "R",
                      "BASICO")
                .otherwise("OTRO").alias("plan_uno_segmento"),
                F.col("individual_income_id").astype("int")
                .alias("plan_uno_criterio_ingresos_par"),
                F.col("payroll_id").astype("int").alias(
                    "plan_uno_cumple_nomina"),
                F.col("pension_id").astype("int").alias(
                    "plan_uno_cumple_pension"),
                F.col("unemployment_id").astype("int")
                .alias("plan_uno_cumple_inem"),
                F.col("recurrent_inc_indiv_id").astype("int")
                .alias("plan_uno_cumple_ingrrec"),
                F.col("recurrent_inc_indiv_group_id").astype("int")
                .alias("plan_uno_cumple_ingrrec_agr"),
                F.col("freelance_income_id").astype("int")
                .alias("plan_uno_criterio_ingresos_nif"),
                F.col("card_inidvidual_id").astype("int")
                .alias("plan_uno_criterio_plasticos"),
                F.col("drawn_bills_id").astype("int")
                .alias("plan_uno_criterio_recibos"),
                F.col("active_card_id").astype("int")
                .alias("plan_uno_tarjeta_activa"),
                F.col("cell_phone_criteria_id").astype("int")
                .alias("plan_uno_movil_informado"),
                F.col("stable_criteria_id").astype("int")
                .alias("plan_uno_criterio_estables"),
                F.col("personal_acc_criteria_id").astype("int")
                .alias("plan_uno_criterio_liquidos"),
                F.col("resources_criteria_id").astype("int")
                .alias("plan_uno_criterio_recursos"),
                F.col("card_business_id").astype("int")
                .alias("plan_uno_criterio_tarjetas"),
                F.col("transacc_young_criteria_id").astype("int")
                .alias("plan_uno_criterio_trans_joven"),
                F.col("soc_insurance_freelance_id").astype("int")
                .alias("plan_uno_criterio_ces_ss_nif"),
                F.col("cession_tax_id").astype("int")
                .alias("plan_uno_criterio_ces_imp"),
                F.col("card_trans_sum_4m_number").astype("int")
                .alias("plan_uno_sum_num_mov_4meses"),
                F.col("drawn_bills_4m_number").astype("int")
                .alias("plan_uno_total_recibos"),
                F.col("asset_criteria_business_id").astype("int")
                .alias("plan_uno_criterio_activo_cif"),
                F.col("asset_criteria_freelance_id").astype("int")
                .alias("plan_uno_criterio_activo_nif"),
                F.col("business_income_id").astype("int")
                .alias("plan_uno_criterio_ingresos_cif"),
                F.col("mortgage_id").astype("int")
                .alias("plan_uno_criterio_hipoteca"),
                F.col("soc_insurance_business_id").astype("int")
                .alias("plan_uno_criterio_ces_ss_cif"),
                F.col("cession_payroll_business_id").astype("int")
                .alias("plan_uno_criterio_ces_nom_cif"),
                F.col("cession_payroll_freelance_id").astype("int")
                .alias("plan_uno_criterio_ces_nom_nif"),
                F.col("insurance_criteria_business_id").astype("int")
                .alias("plan_uno_criterio_previsional_cif"),
                F.col("insurance_criteria_particular_id").astype("int")
                .alias("plan_uno_criterio_previsional_nif"),
                F.col("level50_territorial_id").astype("int")
                .alias("plan_uno_cod_territ")
                ]

    def get_data(self):

#         df_plan1 = cmautils.read_table_v2(
#             spark, "planuno", self.closing_date.replace("-", "")[:6])
        df_plan1 = hu.read_table("plan1_tactico", self.closing_date)
        
        df_plan1 = cmautils.format_id_col(df_plan1)\
                        .select(*self._get_plan1_cols())
        
        return df_plan1
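
One detail of `_get_plan1_cols` that is easy to misread: `F.substring(col, 2, 1)` uses 1-based positions, so the segment letter is the second character of `plan_uno_typology_id`. A small pure-Python sketch of the same mapping follows; `plan_uno_segment` is a hypothetical helper and the sample typology values are invented.

```python
SEGMENT_BY_CODE = {
    "P": "VINCULADO",
    "T": "TRANSACCIONAL",
    "V": "PREVINCULADO",
    "R": "BASICO",
}


def plan_uno_segment(typology_id):
    """Map the second character of the typology id to a segment label."""
    # Spark position 2 (1-based) corresponds to Python index 1 (0-based).
    if typology_id is None or len(typology_id) < 2:
        return "OTRO"
    return SEGMENT_BY_CODE.get(typology_id[1], "OTRO")


# Invented typology ids, one per branch plus the fallback cases.
samples = ["1P", "2T", "XV", "0R", "9Z", None, "P"]
segments = [plan_uno_segment(s) for s in samples]
print(segments)
```

Null or too-short ids fall through to `"OTRO"`, matching the `.otherwise("OTRO")` branch.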

In [67]:
# df_old = PlanUnoTactico(spark,'2019-12-31').get_data()

# sorted(df_old.columns)

# General Holdings Calculation

In [68]:
class GenTenData(GetData):
        
    def get_fils_iii(self):
        
        return (self.filters.activo & self.filters.tit & 
               self.filters.pos_tit & self.filters.busi_area & 
               self.filters.country_id & self.filters.entity_id & 
               self.filters.customer_id)
    
    
    def get_gen_dict(self):
        
        return  {#"ind_seg_keyman": (F.col("com_grouping_id") == 593002), 
                 "ind_seg_cesce": ((F.col("com_category_id") == 593) & 
                                   (F.col("com_subgrouping_id") == 593003004)), 
                 "ind_finan_import": (F.col("com_category_id") == 705)}
    
    
    def get_data(self):
        """
        Get general products indicators
        """

        # Read general iii
        iii = self.tables.get_iii().filter(self.get_fils_iii()).drop("closing_date")

        # Generate one flag for each product
        for flag_name, cond in self.get_gen_dict().items():
            iii = iii.withColumn(flag_name, F.when(cond, 1).otherwise(0))


        # Aggregation of all the data by customer
        max_list = [F.max(key).alias(key) for key in self.get_gen_dict().keys()]
        iii = iii.groupBy("customer_id").agg(*max_list)

        return iii
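
The pattern in `GenTenData.get_data` (one 0/1 flag per contract, then `max` per customer) means a customer holds a product as soon as any of their contracts matches. A minimal pure-Python sketch of that idea, assuming numeric codes; `general_holdings` and the sample contracts are hypothetical.

```python
from collections import defaultdict

# Same conditions as get_gen_dict, written as plain predicates.
FLAG_RULES = {
    "ind_seg_cesce": lambda c: (c["com_category_id"] == 593
                                and c["com_subgrouping_id"] == 593003004),
    "ind_finan_import": lambda c: c["com_category_id"] == 705,
}


def general_holdings(contracts):
    """Return {customer_id: {flag_name: 0 or 1}} using max over contracts."""
    flags = defaultdict(lambda: dict.fromkeys(FLAG_RULES, 0))
    for c in contracts:
        row = flags[c["customer_id"]]
        for name, rule in FLAG_RULES.items():
            row[name] = max(row[name], int(rule(c)))
    return dict(flags)


# Invented contracts for three customers.
contracts = [
    {"customer_id": "A", "com_category_id": 593, "com_subgrouping_id": 593003004},
    {"customer_id": "A", "com_category_id": 100, "com_subgrouping_id": 100001001},
    {"customer_id": "B", "com_category_id": 705, "com_subgrouping_id": 705001001},
    {"customer_id": "C", "com_category_id": 593, "com_subgrouping_id": 593001001},
]
holdings = general_holdings(contracts)
print(holdings)
```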

In [69]:
# gen_tend_data = GenTenData(spark, "2019-01-31").get_data()
# gen_tend_data.cache()
# gen_tend_data.count()

In [70]:
# (gen_tend_data.groupBy("ind_finan_import").count().show())

# CGM Data

In [71]:
class CGMData(GetData):
        
    def get_cgm_fils(self):
        
        return(self.filters.customer_id & 
              self.filters.entity_id & self.filters.con_entity_id & 
              self.filters.country_id & self.filters.busi_area & 
              self.filters.activo_glo)   
    
    def get_imp_dict(self):
        
        return {"imp_comercio_exterior": "foreign_trade_emb_amount", 
                "imp_pp_ben": "ben_pp_emb_amount",
                "imp_pp_par": "part_pp_emb_amount",
                "imp_cta_personal": (F.col("personal_acc_emb_amount")
                                 + F.col("fc_per_acc_end_m_bal_amount")),
                "imp_fondos": (F.col("mng_mfund_emb_amount") + 
                          F.col("n_mng_mfund_emb_amount")),
                "imp_plazo": (F.col("term_liability_emb_amount") + 
                         F.col("fc_term_liabilities_emb_amount")),
                "imp_avales": F.col("endorsem_emb_amount")*-1, 
                "imp_factoring_deudor": "debt_fact_emb_amount", 
                "imp_factoring": "factoring_emb_amount",
                "imp_leasing": F.col("leasing_emb_amount")*-1,
                "imp_renting": F.col("renting_emb_amount")*-1, 
                "imp_cta_credito": (F.col("credit_acc_emb_amount") + 
                               F.col("fc_credit_acc_emb_amount")),
                "imp_confirming": "rev_fact_emb_amount",
                "imp_confirming_proveedor": "sup_rev_fact_emb_amount",
                "imp_activo": (F.col("loan_book_end_m_mh_amount") + 
                          F.col("off_bal_exp_end_m_mh_amount")),
                "imp_pasivo": "tot_res_end_m_mh_amount"}
    
    def get_ind_data(self):
        """
        Get the holding flags of several products from the cgm table.
        """

        cgm = self.tables.get_cgm().filter(self.get_cgm_fils()).drop("closing_date")

        cgm = cgm.withColumn("ind_pp_ben", F.when(F.col("ben_pp_id") == "A", 1).otherwise(0))\
        .withColumn("ind_pp_par", F.when(F.col("part_pp_id") == "A", 1).otherwise(0))\
        .withColumn("ind_cta_personal", F.when(((F.col("personal_acc_id") == "A") | (F.col("fc_per_acc_id") == "A")), 1).otherwise(0))\
        .withColumn("ind_fondos", F.when(((F.col("mng_mfund_id") == "A") | (F.col("n_mng_mfund_id") == "A")), 1).otherwise(0))\
        .withColumn("ind_plazo", F.when(((F.col("term_liability_id") == "A") | (F.col("fc_term_liabilities_id") == "A")), 1).otherwise(0))\
        .withColumn("ind_seg_salud", F.when((F.col("health_ins_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_seg_vida", F.when((F.col("life_insurc_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_seg_hogar", F.when((F.col("hou_insurc_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_avales", F.when((F.col("endorsem_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_factoring_deudor", F.when((F.col("debt_fact_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_factoring", F.when((F.col("factoring_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_leasing", F.when((F.col("leasing_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_renting", F.when((F.col("renting_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_cta_credito", F.when((F.col("credit_acc_id") == "A") | (F.col("fc_credit_acc_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_confirming", F.when((F.col("rev_fact_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_confirming_proveedor", F.when((F.col("sup_rev_fact_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_tpv", F.when((F.col("retail_bill_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_cesion_impuestos", F.when((F.col("tax_assig_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_cesion_nompen", F.when((F.col("payr_pens_assig_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_emision_recibos", F.when((F.col("bill_iss_id") == "A"), 1).otherwise(0))\
        .withColumn("ind_comercio_exterior", F.when((F.col("foreign_trade_id") == "A"), 1).otherwise(0))
        #         .withColumn("ind_seg_coche", F.when((F.col("car_insurc_id") == "A"), 1).otherwise(0))\

        cgm = cgm.groupBy("customer_id")\
            .agg(F.max("ind_pp_ben").alias("ind_pp_ben"), 
                 F.max("ind_pp_par").alias("ind_pp_par"),
                 F.max("ind_cta_personal").alias("ind_cta_personal"), 
                 F.max("ind_fondos").alias("ind_fondos"),
                 F.max("ind_plazo").alias("ind_plazo"), 
                 F.max("ind_seg_salud").alias("ind_seg_salud"),
                 F.max("ind_seg_hogar").alias("ind_seg_hogar"),
                 F.max("ind_seg_vida").alias("ind_seg_vida"),
                 F.max("ind_avales").alias("ind_avales"),
                F.max("ind_factoring_deudor").alias("ind_factoring_deudor"),
                F.max("ind_factoring").alias("ind_factoring"),
                F.max("ind_leasing").alias("ind_leasing"),
                F.max("ind_renting").alias("ind_renting"),
                F.max("ind_cta_credito").alias("ind_cta_credito"),
                F.max("ind_confirming").alias("ind_confirming"),
                F.max("ind_confirming_proveedor").alias("ind_confirming_proveedor"),
                F.max("ind_tpv").alias("ind_tpv"),
                F.max("ind_cesion_impuestos").alias("ind_cesion_impuestos"),
                F.max("ind_cesion_nompen").alias("ind_cesion_nompen"),
                F.max("ind_emision_recibos").alias("ind_emision_recibos"),
                F.max("ind_comercio_exterior").alias("ind_comercio_exterior"))
        #                  F.max("ind_seg_coche").alias("ind_seg_coche"),

        return cgm
    
    
    def get_imp_data(self):
        """
        Get the balances of several products from the cgm table.
        """
        # Read cgm data
        cgm = self.tables.get_cgm().filter(self.get_cgm_fils()).drop("closing_date")
        # List that contains all the balances to sum
        sum_list = [F.sum(value).alias(key) for key, value in self.get_imp_dict().items()]
        # Aggregation of data
        cgm = cgm.groupBy("customer_id").agg(*sum_list)
        
        return cgm

In [72]:
# CLOSING_DATE = '2019-12-31'
# cgm_ind = CGMData(spark, CLOSING_DATE).get_ind_data()
# cgm_imp = CGMData(spark, CLOSING_DATE).get_imp_data()

In [73]:
# sorted(cgm_imp.columns)

In [74]:
# cgm_ind.cache(), cgm_ind.count()
# cgm_imp.cache(), cgm_imp.count()
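
The indicator logic in `get_ind_data` (status `"A"` becomes 1, then `F.max` per customer) amounts to "at least one row is active". A minimal plain-Python sketch of that idea follows; `customer_indicators` and the sample rows are hypothetical, and only two of the flag/column pairs from the class are reproduced.

```python
def customer_indicators(rows, flag_cols):
    """rows: list of dicts with 'customer_id' plus status columns ('A' = active)."""
    out = {}
    for row in rows:
        ind = out.setdefault(row["customer_id"], {name: 0 for name in flag_cols})
        for name, col in flag_cols.items():
            if row.get(col) == "A":
                ind[name] = 1  # max over 0/1 flags == "any row is active"
    return out


# Two of the pairs used in the class; the status value "B" is made up
flag_cols = {"ind_tpv": "retail_bill_id", "ind_confirming": "rev_fact_id"}
rows = [
    {"customer_id": "c1", "retail_bill_id": "A", "rev_fact_id": "B"},
    {"customer_id": "c1", "retail_bill_id": None, "rev_fact_id": "B"},
    {"customer_id": "c2", "retail_bill_id": "B", "rev_fact_id": "A"},
]
indicators = customer_indicators(rows, flag_cols)
print(indicators)
```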

# Fixed Income and Securities

In [75]:
class RentaValores(GetData):
    
    def get_pvv_fils(self):
        
        return (self.filters.customer_id & self.filters.entity_id & 
               self.filters.country_id & self.filters.busi_area & 
               (F.col("cash_bal_end_m_amount") > 0) & 
                (F.col("contract_cancel_date") > self.closing_date))
    
    
    def get_pvv_cols(self):
        
        return ["customer_id", "dependent_id", "cash_bal_end_m_amount", 
                "com_category_id", "com_grouping_id", "com_subgrouping_id", 
                "catalogsf_product_id"]
    
    def get_pvv_fill_na(self):
        
        return {"dependent_id": "0", 
              "cash_bal_end_m_amount": 0, 
              "com_category_id": 0,
              "com_grouping_id": 0,
              "com_subgrouping_id": 0, 
              "catalogsf_product_id": "0"}
    

    
    def get_data(self):
        """
        Get the data for the products in the pvv table
        """
        
        def get_cod_epigsf4(x):
            """
            Get the cod_epigsf4 code from the lowest level code
            """
            if x and len(x) >= 12:
                return x[:12]
            else:
                return 0
        
        def get_tipo_pasivo(cod_prodcat, cod_ctgcom, cod_epigsf4):
            """
            Get the type of liability (pasivo) product
            """
            if cod_ctgcom == 655:
                tipo_pasivo = "rf"
            elif cod_epigsf4 == "656001001010":
                tipo_pasivo = "valores_bbva"
            elif cod_ctgcom == 656:
                tipo_pasivo = "valores_no_bbva"
            else:
                tipo_pasivo = "resto_valores"
            return tipo_pasivo
        
        get_cod_epigsf4_udf = F.udf(get_cod_epigsf4)
        get_pasivo_udf = F.udf(get_tipo_pasivo)

        
        df_pvv = self.tables.get_pvv().filter(self.get_pvv_fils()).select(self.get_pvv_cols())

        df_pvv = df_pvv.withColumn("cod_epigsf4", get_cod_epigsf4_udf(F.col("catalogsf_product_id")))
        df_pvv = df_pvv.withColumn("tipo_pasivo", get_pasivo_udf(F.col("catalogsf_product_id"), 
                                                                F.col("com_category_id"), 
                                                                F.col("cod_epigsf4")))
        # Add imp_val_rentafija, imp_valores_bbva, imp_valores_no_bbva, imp_resto_valores
        df_pvv = df_pvv\
        .withColumn("imp_val_rentafija", 
                    F.when(F.col("tipo_pasivo") == "rf", 
                           F.col("cash_bal_end_m_amount")).otherwise(0))\
        .withColumn("imp_valores_bbva", 
                    F.when(F.col("tipo_pasivo") == "valores_bbva", 
                           F.col("cash_bal_end_m_amount")).otherwise(0))\
        .withColumn("imp_valores_no_bbva", 
                    F.when(F.col("tipo_pasivo") == "valores_no_bbva", 
                           F.col("cash_bal_end_m_amount")).otherwise(0))\
        .withColumn("imp_resto_valores", 
                    F.when(F.col("tipo_pasivo") == "resto_valores", 
                           F.col("cash_bal_end_m_amount")).otherwise(0))

        df_pvv = df_pvv.groupBy("customer_id")\
            .agg(F.sum("imp_val_rentafija").alias("imp_val_rentafija"),
                 F.sum("imp_valores_bbva").alias("imp_valores_bbva"),
                 F.sum("imp_valores_no_bbva").alias("imp_valores_no_bbva"),
                 F.sum("imp_resto_valores").alias("imp_resto_valores"))


        df_pvv = df_pvv\
            .withColumn("ind_val_rentafija", F.when(F.col('imp_val_rentafija') > 0, 1).otherwise(0))\
            .withColumn("ind_valores_bbva", F.when(F.col('imp_valores_bbva') > 0, 1).otherwise(0))\
            .withColumn("ind_valores_no_bbva", F.when(F.col('imp_valores_no_bbva') > 0, 1).otherwise(0))\
            .withColumn("ind_resto_valores", F.when(F.col('imp_resto_valores') > 0, 1).otherwise(0))

        return df_pvv

In [76]:
# pvv_data = RentaValores(spark, CLOSING_DATE).get_data()
# pvv_data.cache(), pvv_data.count()
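
The two helpers wrapped as UDFs in `RentaValores.get_data` are plain Python, so the classification rules can be checked without a Spark session. The sketch below copies their logic (dropping the unused `cod_prodcat` argument) and assumes `com_category_id` compares equal to integers. Apart from `656001001010`, the sample product codes are made up for illustration.

```python
def get_cod_epigsf4(x):
    """Return the first 12 characters of the product code, or 0 if it is too short."""
    if x and len(x) >= 12:
        return x[:12]
    return 0


def get_tipo_pasivo(cod_ctgcom, cod_epigsf4):
    """Classify a position; the first matching rule wins."""
    if cod_ctgcom == 655:
        return "rf"
    if cod_epigsf4 == "656001001010":
        return "valores_bbva"
    if cod_ctgcom == 656:
        return "valores_no_bbva"
    return "resto_valores"


# Hypothetical sample rows: (catalogsf_product_id, com_category_id)
rows = [
    ("655001001001001", 655),
    ("656001001010999", 656),
    ("656002001001001", 656),
    ("0", 0),  # the fill value for missing codes
]
tipos = [get_tipo_pasivo(cat, get_cod_epigsf4(prod)) for prod, cat in rows]
print(tipos)
```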

# Loans, Mortgages and Consumer Loans

In [77]:
class PraData(GetData):
    
    def get_fils_pra(self):
        
        return(self.filters.country_id & self.filters.entity_id & 
              self.filters.cus_country & self.filters.busi_area &
              self.filters.hip_bal)  
    
    def get_fils_iii(self):
        
        return(self.filters.country_id & self.filters.entity_id &
              self.filters.tit & self.filters.pos_tit & 
              self.filters.activo & self.filters.customer_id)
    
    def get_pra_ind_cols(self):
        
        return ["ind_hipoteca_particular", "ind_prestamo_empresa", 
                "ind_consumo", "ind_prestamo_credinegocio", "ind_hipoteca_empresa"]
    
    def get_conditions(self):
        
        cond_p1 = ((F.col("com_category_id") == 462) & (F.col("com_grouping_id") != 462013))
    
    
        cond_p2 = ((F.col("com_category_id") == 461) & 
                    (~F.col("com_grouping_id").isin([461003, 461004, 461008])) & 
                     (~F.col("com_subgrouping_id").isin([461001001, 461002001, 461005001, 461006001])))

        cond_p3 = (F.col("com_category_id") == 464)

        cond_p4_1 = (F.col("com_category_id") == 461)
        cond_p4_2 = ((F.col("com_grouping_id").isin([461003, 461004]) | 
                      (F.col("com_subgrouping_id").isin([461008002, 461008003]))))

        cond_p4 = (cond_p4_1 & cond_p4_2)

#         cond_p5 = ((F.col("com_category_id") == 461) & 
#                    (F.col("com_subgrouping_id").isin([461001001, 461002001, 461005001, 461006001, 461008001])))

        categ_hip_empresa = [461001000,461002000,461003001,461005000,
                         461006000,461008000,461022000,461024000]
        cond_p5 = ((F.col("com_category_id") == 461) & 
                   (F.col("com_subgrouping_id").isin(categ_hip_empresa)))
        
        return (cond_p1, cond_p2, cond_p3, cond_p4, cond_p5)
    
    def get_data(self):
        """
        Get the data of the products contained in the pra table
        """

        pra = self.tables.get_pra().filter(self.get_fils_pra()).select("customer_id", 
                                     "contract_id",
                                     "pending_payment_amount",
                                     "com_category_id", 
                                     "com_grouping_id",
                                     "com_subgrouping_id")
        
        (cond_p1, cond_p2, cond_p3, cond_p4, cond_p5) = self.get_conditions()



        pra = pra.withColumn("tipo_prod", 
                            F.when(cond_p1, "imp_hipoteca_particular").
                            when(cond_p2, "imp_prestamo_empresa").
                            when(cond_p3, "imp_prest_consumo").
                            when(cond_p4, "imp_prestamo_credinegocio").
                            when(cond_p5, "imp_hipoteca_empresa").
                            otherwise(None))

        pra = pra.filter(F.col("tipo_prod").isNotNull())

        iii = self.tables.get_iii()\
            .filter(self.get_fils_iii()).select("contract_id")

        pra_iii = pra.join(iii, "contract_id")

        pra_iii = pra_iii.groupBy(["customer_id"]).pivot("tipo_prod")\
        .agg(F.sum("pending_payment_amount")).na.fill(0)

        for prod in ["imp_hipoteca_particular", "imp_prestamo_empresa", 
                     "imp_prest_consumo", "imp_prestamo_credinegocio", "imp_hipoteca_empresa"]:

            if prod not in pra_iii.columns:
                pra_iii = pra_iii.withColumn(prod, F.lit(0))

        pra_iii = pra_iii\
            .withColumn("ind_hipoteca_particular", 
                        F.when(F.col("imp_hipoteca_particular") > 0, 1).otherwise(0))\
            .withColumn("ind_prestamo_empresa", 
                        F.when(F.col("imp_prestamo_empresa") > 0, 1).otherwise(0))\
            .withColumn("ind_prest_consumo", 
                        F.when(F.col("imp_prest_consumo") > 0, 1).otherwise(0))\
            .withColumn("ind_prestamo_credinegocio", 
                        F.when(F.col("imp_prestamo_credinegocio") > 0, 1).otherwise(0))\
            .withColumn("ind_hipoteca_empresa", 
                        F.when(F.col("imp_hipoteca_empresa") > 0, 1).otherwise(0))

        return pra_iii

In [78]:
# pra_data = PraData(spark, CLOSING_DATE).get_data()
# pra_data.cache(), pra_data.count()

# Card Data

In [79]:
class Tarjetas(GetData):    
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.filters.tarj_deb_part_cods = (F.col("com_category_id").astype("int").isin([401, 703]))
        self.filters.tarj_deb_empr_cods = (F.col("com_category_id").astype("int").isin([403, 704]))
        self.filters.tarj_cre_part_cods = (F.col("com_category_id").astype("int").isin([371, 701]))
        self.filters.tarj_cre_empr_cods = (F.col("com_category_id").astype("int").isin([373, 702]))
        
        self.filters.tarj_no_block = ((F.col("card_block_id").isNull()) | (F.col("card_block_id") != "A"))
        
        
        self.filters.tar_base = (self.filters.entity_id &
                         self.filters.cus_country & 
                         self.filters.cus_entity & 
                         self.filters.busi_area & self.filters.customer_id & 
                         self.filters.activo & self.filters.tarj_no_block)
        
        self.filters.tar_iii_base = (self.filters.con_country_id & 
                             self.filters.con_entity_id & 
                             self.filters.country_id & self.filters.entity_id & 
                             self.filters.tit & self.filters.pos_tit & self.filters.busi_area)
        
        self.filters.bjd_part = (self.filters.tar_base & self.filters.tarj_deb_part_cods)
        self.filters.bjd_empr = (self.filters.tar_base & self.filters.tarj_deb_empr_cods)
        self.filters.bjc_part = (self.filters.tar_base & self.filters.tarj_cre_part_cods)
        self.filters.bjc_empr = (self.filters.tar_base & self.filters.tarj_cre_empr_cods)

        self.filters.bjd_iii_part = (self.filters.tar_iii_base & self.filters.tarj_deb_part_cods)
        self.filters.bjd_iii_empr = (self.filters.tar_iii_base & self.filters.tarj_deb_empr_cods)
        self.filters.bjc_iii_part = (self.filters.tar_iii_base & self.filters.tarj_cre_part_cods)
        self.filters.bjc_iii_empr = (self.filters.tar_iii_base & self.filters.tarj_cre_empr_cods)
        
        self.ind_dict = {"bjd": "deb", "bjc": "cre"}
        
        
    def get_agg_tdeb(self, cust_type):
        
        agg_tdeb = [F.sum("retail_trans_1m_amount").astype("double")\
                        .alias("imp_tarj_deb_com_{0}".format(cust_type)),
                    F.sum("trans_atm_1m_amount").astype("double")\
                        .alias("imp_tarj_deb_caj_{0}".format(cust_type)),
                    F.sum("retail_trans_1m_number")\
                        .alias("n_ope_tarj_deb_com_{0}".format(cust_type)),
                    F.sum("trans_atm_1m_number")\
                        .alias("n_ope_tarj_deb_caj_{0}".format(cust_type))]
        return agg_tdeb
    
    
    def get_agg_tcre(self, cust_type):
    
        agg_tcre = [F.sum(F.col("retail_deb_trans_1m_amount") + 
                      F.col("retail_cred_trans_1m_amount")).astype("double")\
                            .alias("imp_tarj_cre_com_{0}".format(cust_type)),
                F.sum(F.col("cred_trans_nown_atm_1m_amount") +
                      F.col("deb_trans_nown_atm_1m_amount") +
                      F.col("cred_trans_own_atm_1m_amount")   +
                      F.col("deb_trans_own_atm_1m_amount")).astype("double")\
                            .alias("imp_tarj_cre_caj_{0}".format(cust_type)),
                F.sum(F.col("retail_deb_trans_1m_number") +
                      F.col("retail_cred_trans_1m_number"))
                            .alias("n_ope_tarj_cre_com_{0}".format(cust_type)),
                F.sum(F.col("cred_trans_nown_atm_1m_number") +
                      F.col("deb_trans_nown_atm_1m_number") +
                      F.col("cred_trans_own_atm_1m_number")  +
                      F.col("deb_trans_own_atm_1m_number"))
                            .alias("n_ope_tarj_cre_caj_{0}".format(cust_type))]
    
        return agg_tcre
    
    
    def get_data(self, card_type, cust_type):
        """
        Get a df with the following variables from cards data, where <kind> is
        "deb" for debit cards ("bjd") and "cre" for credit cards ("bjc"):
            - customer_id
            - imp_tarj_<kind>_com_<cust_type>
            - imp_tarj_<kind>_caj_<cust_type>
            - n_ope_tarj_<kind>_com_<cust_type>
            - n_ope_tarj_<kind>_caj_<cust_type>
            - ind_tarj_<kind>_<cust_type> (always 1)
        Where <cust_type> can be "part" (general public) or "empr" (enterprises).

        Args:
            - card_type: "bjc" -> credit cards, "bjd" -> debit cards
            - cust_type: "part" (general public) or "empr" (enterprises)

        Returns:
            - DF[customer_id, imp_tarj_<kind>_com_<cust_type>, imp_tarj_<kind>_caj_<cust_type>,
            n_ope_tarj_<kind>_com_<cust_type>, n_ope_tarj_<kind>_caj_<cust_type>,
            ind_tarj_<kind>_<cust_type>]
        """

        # Get data and filters
        (data_tarjeta, iii) = (getattr(self.tables, "get_{0}".format(card_type))(), self.tables.get_iii())
        (fils_tarjeta, fils_iii) = (getattr(self.filters, "{0}_{1}".format(card_type, cust_type)),
                                    getattr(self.filters, "{0}_iii_{1}".format(card_type, cust_type)))

        # Get the aggregation expressions for the card type
        if card_type == "bjd":
            agg_vars = self.get_agg_tdeb(cust_type)
        elif card_type == "bjc":
            agg_vars = self.get_agg_tcre(cust_type)


        # Define join conditions
        join_cond = ((F.trim(data_tarjeta.country_id) == F.trim(iii.contract_country_id)) &
                     (F.trim(data_tarjeta.customer_country_id) == F.trim(iii.contract_country_id)) &
                     (F.trim(data_tarjeta.entity_id) == F.trim(iii.contract_entity_id)) &
                     (F.trim(data_tarjeta.contract_id) == F.trim(iii.contract_id)) &
                     (F.trim(data_tarjeta.customer_entity_id) == F.trim(iii.entity_id)) &
                     (F.trim(data_tarjeta.business_area_id) == F.trim(iii.business_area_id)))

        # Data aggregation: we select the corresponding table for the card
        # type (debit: bjd, credit: bjc) and apply the corresponding filters.
        # Then we join with iii so that only first holders (tit) are kept. Finally, we
        # aggregate on customer id and get the corresponding variables
        
        t_gasto_opera = (data_tarjeta
                         .filter(fils_tarjeta)
                         .join(iii.filter(fils_iii), on=join_cond)
                         .groupBy(iii.customer_id).agg(*agg_vars)
                         .withColumn("ind_tarj_{0}_{1}".format(self.ind_dict.get(card_type), cust_type), 
                                     F.lit(1)))

        return t_gasto_opera

In [80]:
# CLOSING_DATE = "2018-02-28"

In [81]:
# tarjetas_data = Tarjetas(spark, CLOSING_DATE).get_data(card_type="bjd", cust_type="empr")
# tarjetas_data.cache()
# tarjetas_data.count()
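
`Tarjetas.get_data` resolves its table getter, its filters and its output column names from the `card_type` and `cust_type` strings. The sketch below only reproduces that naming scheme so the expected names can be listed up front. `card_names` is a hypothetical helper that is not part of the notebook, and the explicit validation is an addition.

```python
IND_DICT = {"bjd": "deb", "bjc": "cre"}  # same mapping as Tarjetas.ind_dict


def card_names(card_type, cust_type):
    """Return the names that Tarjetas.get_data looks up or produces."""
    if card_type not in IND_DICT:
        raise ValueError("card_type must be 'bjd' or 'bjc'")
    if cust_type not in ("part", "empr"):
        raise ValueError("cust_type must be 'part' or 'empr'")
    kind = IND_DICT[card_type]
    return {
        "table_getter": "get_{0}".format(card_type),
        "card_filter": "{0}_{1}".format(card_type, cust_type),
        "iii_filter": "{0}_iii_{1}".format(card_type, cust_type),
        "columns": [
            "imp_tarj_{0}_com_{1}".format(kind, cust_type),
            "imp_tarj_{0}_caj_{1}".format(kind, cust_type),
            "n_ope_tarj_{0}_com_{1}".format(kind, cust_type),
            "n_ope_tarj_{0}_caj_{1}".format(kind, cust_type),
            "ind_tarj_{0}_{1}".format(kind, cust_type),
        ],
    }


names = card_names("bjd", "empr")
print(names["card_filter"], names["columns"])
```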

# Commercial Discount

In [82]:
class DescComercial(GetData):
    
    def get_fils_hca(self):
        
        return (self.filters.cus_country & self.filters.entity_id & 
                self.filters.cus_entity 
                & self.filters.busi_area & self.filters.pos_balance)
    
    def get_fils_iii_hca(self):
        
        return(self.filters.busi_area & self.filters.tit &
              self.filters.pos_tit & self.filters.customer_id&
              (F.col("com_category_id") == 27))
    
    def get_data(self):
        
        hca = self.tables.get_hca()\
            .filter(self.get_fils_hca()).select("customer_id",
                                "com_grouping_id", 
                                "end_m_bal_amount",
                                "contract_id")
    
        iii = self.tables.get_iii()\
            .filter(self.get_fils_iii_hca()).select("contract_id")

        hca_iii = hca.join(iii, ["contract_id"])\
            .groupBy(["customer_id", "com_grouping_id"])\
            .agg(F.sum("end_m_bal_amount").alias("end_m_bal_amount"))
        
        cond = F.col("com_grouping_id").isin([27001, 27003])
    
        hca_iii = hca_iii.withColumn("imp_comercial_dis", 
            F.when(cond, F.col("end_m_bal_amount")).otherwise(0))

        hca_iii = hca_iii.withColumn("ind_comercial_dis", 
            F.when(cond, 1).otherwise(0))

        hca_iii = hca_iii.withColumn("imp_other_portfolio", 
            F.when(~cond, F.col("end_m_bal_amount")).otherwise(0))

        hca_iii = hca_iii.withColumn("ind_other_portfolio", 
            F.when(~cond, 1).otherwise(0))
        
        hca_iii = hca_iii.groupBy("customer_id").agg(
            F.max("ind_comercial_dis").alias("ind_comercial_dis"),
            F.max("ind_other_portfolio").alias("ind_other_portfolio"),
            F.sum("imp_comercial_dis").alias("imp_comercial_dis"),
            F.sum("imp_other_portfolio").alias("imp_other_portfolio"))
        
        return hca_iii

In [83]:
# desc_com_data = DescComercial(spark, CLOSING_DATE).get_data()
# desc_com_data.cache(), desc_com_data.count()

# Insurance

In [84]:
# CLOSING_DATE = '2019-10-31'
# seg_data = SegurosData_old(spark, CLOSING_DATE).get_data()


# seg_data.where(F.col('customer_id').isin(['000057312','009147874','007373279'])).toPandas()

In [85]:
class SegurosData(GetData):
    
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        
        self.table = {'vsfmacgm': {'key':['customer_id','country_id','business_area_id','entity_id'], 
                                   'cols_in':['stockpyme_type', 'agric_bus_insrc_type',
                                              'mrisk_bus_insrc_type','rcadmin_bus_insrc_type',
#                                              'rest_insurance_bussines_companies_premium_12m_type',
                                              'keyman_bus_insrc_type',
                                              'veh_bus_insrc_type', 'health_bus_insrc_type',
                                              'lse_rk_bus_insrc_type','credrk_bus_insrc_type',
                                              'bus_insurc_id', 'min_insurance_id'],
                                   'cols_out':['ind_stock_pymes', 'ind_seg_agra', 'ind_multi_riesgs', 'ind_dyo',# 'ind_resto_seguros_neg',
                                               'ind_seg_keyman', 'ind_seg_coche', 'ind_seg_health', 'ind_seg_leasing', 'ind_seg_credito',
                                               'ind_seg_negocio', 'ind_mini_seguros',]
                                  },
                     }
        self.relation_cols = {'ind_stock_pymes': 'stockpyme_type',
               'ind_seg_agra' : 'agric_bus_insrc_type',
               'ind_multi_riesgs': 'mrisk_bus_insrc_type',
               'ind_dyo': 'rcadmin_bus_insrc_type',
#               'ind_resto_seguros_neg': 'rest_insurance_bussines_companies_premium_12m_type',
               'ind_seg_keyman': 'keyman_bus_insrc_type',
               'ind_seg_coche': 'veh_bus_insrc_type',
               'ind_seg_health': 'health_bus_insrc_type',
               'ind_seg_leasing': 'lse_rk_bus_insrc_type',               
               'ind_seg_credito': 'credrk_bus_insrc_type',
               'ind_seg_negocio': 'bus_insurc_id',
               'ind_mini_seguros': 'min_insurance_id',
              }
    
    def get_insurances(self):
        bibasic = hu.read_table('vsfmacgm',self.closing_date).select('customer_id',*self.table['vsfmacgm']['cols_in'])
        
        return bibasic.groupBy('customer_id').agg(*[F.max(col).alias(alias) for alias,col in self.relation_cols.items()])

In [86]:
# df = SegurosData(spark, '2019-12-31').get_insurances()

In [87]:
# SegurosData(spark, CLOSING_DATE).get_insurances().select('customer_id',
#             'ind_dyo','ind_stock_pymes','ind_resto_seguros_neg','ind_seg_agra','ind_multi_riesgs'
#             ).where(F.col('customer_id').isin(['000057312','009147874','007373279'])).toPandas()

# POS (TPV) Amount

In [88]:
class BalanceTPV(GetData):
            
    def get_fils_hom(self):
        
        fil = self.filters
        
        return(fil.country_id & fil.entity_id & 
              fil.cus_country & fil.cus_entity & 
              fil.busi_area & fil.activo & fil.customer_id)
    
    def get_sum_expr(self):
        
        return F.col("intn_coll_billed_1m_amount") + F.col("dom_coll_billed_1m_amount")
    
    def get_data(self):

        tpv = self.tables.get_hom()\
            .filter(self.get_fils_hom())\
            .groupBy("customer_id")\
            .agg(F.sum(self.get_sum_expr()).alias("imp_tpv"))

        return tpv

In [89]:
# tpv = BalanceTPV(spark, CLOSING_DATE).get_data()
# tpv.cache(), tpv.count()

# Deferred Payment

In [90]:
# TODO

# Margin

In [91]:
class MargenData(GetData):
    
    def get_fil_margen(self):
        
        return ((F.col("gross_margin_12m_amount") > -10000000) & 
                (F.col("gross_margin_12m_amount") < 10000000))
    
    def get_fils_rtk(self):
        
        fil = self.filters
        
        return (fil.country_id & fil.entity_id & fil.con_entity_id & 
                fil.busi_area & self.get_fil_margen() & fil.customer_id)
    
    def get_data(self):
        """
        Get margin data
        """

        # Define imp_margen and imp_margen_opera
        margen_expr = F.col("gross_margin_12m_amount") - F.col("risk_premium_12m_amount")
        margen_opera_expr = F.col("gross_margin_12m_amount")
        
        margen_servi_expr = F.col("serv_margin_12m_amount")
        margen_inte_expr = F.col("int_margin_12m_amount")
        margen_inte_net_expr = F.col("net_int_margin_12m_amount")



        # Read rtk data
        margen = self.tables.get_rtk()\
                    .filter(self.get_fils_rtk())\
                    .groupBy("customer_id")\
                    .agg(F.sum(margen_expr).alias("imp_margen"), 
                         F.sum(margen_opera_expr).alias("imp_margen_opera"),
                         F.sum(margen_servi_expr).alias("imp_margen_servi"),
                         F.sum(margen_inte_expr).alias("imp_margen_inte"),
                         F.sum(margen_inte_net_expr).alias("imp_margen_inte_net"))

        return margen

In [92]:
# margen_data = MargenData(spark, "2019-01-31").get_data()
# margen_data.cache()
# margen_data.count()

# RAR

In [93]:
class RarData(GetData):
    
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        
        self.table = {'vsfmartk': {'key':['customer_id'], 
                                   'cols_in':['econ_profit_12m_amount','econ_risk_12m_amount'],
                                   'cols_out':['customer_id','rar_12m']
                                  },
                     }

        
    def get_rar12m(self):
        
        rentabilidad_ligera = hu.read_table( "vsfmartk", self.closing_date
                                           ).select(*self.table['vsfmartk']['key'],*self.table['vsfmartk']['cols_in'])
        
        rentabilidad_cliente = rentabilidad_ligera.groupBy('customer_id'
                           ).agg(F.sum('econ_profit_12m_amount').alias('beneficio_inf'),
                                 F.sum('econ_risk_12m_amount').alias('capital_riesgo')
                                )
        
        # RAR as a percentage (36500/365 == 100); 999 is a sentinel for zero capital at risk
        return rentabilidad_cliente.withColumn('rar_12m', F.when(F.col('capital_riesgo')==0,999
                            ).otherwise((F.col('beneficio_inf')/F.col('capital_riesgo'))*(36500/365)))
        
        

In [94]:
# CLOSING_DATE = "2019-10-31"
# rar12m = RarData(spark,CLOSING_DATE).get_rar12m()
# rar12m.where(F.col('customer_id').isin('052372840','001004882','029346794')).show()
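
The `rar_12m` column is the 12-month economic profit divided by the 12-month capital at risk, scaled by `36500/365` (that is, 100), with 999 used when the capital at risk is zero. The same arithmetic in plain Python, as a sketch that assumes non-null inputs:

```python
def rar_12m(beneficio_inf, capital_riesgo):
    """Profit over capital at risk as a percentage; 999 if capital at risk is 0."""
    if capital_riesgo == 0:
        return 999
    return (beneficio_inf / capital_riesgo) * (36500 / 365)


print(rar_12m(50.0, 200.0))
print(rar_12m(10.0, 0.0))
```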

# Income

In [95]:
class IngresosData(GetData):
    """
    Given a closing date, returns the following data:
        DF[customer_id, imp_nomina, ind_nomina, imp_pension, ind_pension, ind_inem, imp_inem]
        
    To get the data, the current month and the previous one are taken into account
    """
    
    
    def _get_poa_fils(self):
        
        fil = self.filters
        
        return (fil.country_id & fil.cus_country & 
                fil.entity_id & fil.cus_entity & 
                fil.customer_id & fil.busi_area & 
                (F.col("com_grouping_id").astype("int").isin([72004, 72001, 72002])) & 
                (F.col("total_credited_1m_amount") > 0) & 
                (~F.col("catalogsf_product_id").startswith("72001001001")) & 
                (~F.col("catalogsf_product_id").startswith("72001002001")))
    
    
    def _get_agg_dict(self):
        
        return {"imp_pension_": F.sum(F.when(F.col("com_grouping_id") == "72002", 
                                             F.col("imp_apuzm")).otherwise(0))\
                .astype("double"),
               "imp_nomina_": F.sum(F.when(F.col("com_grouping_id") == "72001", 
                                           F.col("imp_apuzm")).otherwise(0))\
                .astype("double"),
               "imp_inem": F.sum(F.when(F.col("com_grouping_id") == "72004", F.col("imp_apuzm"))\
                                 .otherwise(0)).astype("double")}
    
    
    def _get_final_field_dict(self):
        
        return {"imp_nomina": F.when(F.col("imp_nomina_") > 500, F.col("imp_nomina_")).otherwise(0),
                "ind_nomina": F.when(F.col("imp_nomina_") > 500, 1).otherwise(0),
                "imp_pension": F.when(F.col("imp_pension_") > 300, F.col("imp_pension_")).otherwise(0),
                "ind_pension": F.when(F.col("imp_pension_") > 300, 1).otherwise(0),
                "ind_inem": F.when(F.col("imp_inem") > 0, 1).otherwise(0)}
    
    def get_data(self):
        
        # Get the data for the current month and the previous one
        df_out = None
        for subindex, part in zip(["_0", "_1"], [self.closing_date, 
                                                 cmautils.add_months(self.closing_date, -1)]):
            
            poa = Tables(self.spark, part).get_poa().filter(self._get_poa_fils())\
                    .select("customer_id", "com_grouping_id", 
                            F.col("total_credited_1m_amount").alias("imp_apuzm{}".format(subindex)))
                        
            if df_out is None:
                df_out = poa
            else:
                df_out = df_out.join(poa, on=["customer_id", "com_grouping_id"], how="outer")\
                             .fillna(0)
        
        # Group data getting the max for the two months and getting "imp_pension_",
        # "imp_nomina_" and "imp_inem"
        df_out_gr = df_out.groupBy("customer_id", "com_grouping_id")\
                    .agg(F.sum("imp_apuzm_0").alias("imp_apuzm_0"), 
                         F.sum("imp_apuzm_1").alias("imp_apuzm_1"))\
                    .withColumn("imp_apuzm", F.greatest(F.col("imp_apuzm_0"),F.col("imp_apuzm_1")))\
                    .drop("imp_apuzm_0", "imp_apuzm_1")\
                    .groupBy("customer_id")\
                    .agg(*[expr.alias(name) for name, expr in self._get_agg_dict().items()])
        
        # Getting final fields: "customer_id", "imp_nomina", "ind_nomina", "imp_pension", 
        # "ind_pension", "ind_inem", "imp_inem"
        for name, expr in self._get_final_field_dict().items():
            df_out_gr = df_out_gr.withColumn(name, expr)
        
        return df_out_gr.drop("imp_nomina_", "imp_pension_")

In [96]:
# ingresos_data = IngresosData(spark, CLOSING_DATE).get_data()
# ingresos_data.cache(), ingresos_data.count()

# Risks

In [97]:
class RiesgosData(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        self.risk_var_out = "hermes_risk_group_type"
        self.risk_var_in = "risk_cust_group_type"
        

    def get_data(self):

        df_risk = hu.read_table("ecrkrisk", self.closing_date, criteria='closest'
                               ).select('customer_id','risk_cust_group_type')
        df_risk = df_risk.withColumn(self.risk_var_in, F.trim(F.col(self.risk_var_in)))
        df_risk = df_risk.withColumn(self.risk_var_in, F.when(F.col(self.risk_var_in
                                     ) == "", "-1").otherwise(F.col(self.risk_var_in)))
        df_risk_final = df_risk.na.fill({self.risk_var_in: "-1"})
        df_risk_final = df_risk_final.withColumn(self.risk_var_out, F.col(self.risk_var_in).cast(T.IntegerType()))

        return df_risk_final.select('customer_id','hermes_risk_group_type')


In [98]:
# CLOSING_DATE = "2019-11-30"
# riesgos_data = RiesgosData(spark, CLOSING_DATE).get_data()
# riesgos_data.columns
# riesgos_data.limit(10).toPandas()
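
`RiesgosData.get_data` cleans the risk group code in three steps: trim, replace blank or null values with `"-1"`, and cast to integer. The sketch below approximates those steps in plain Python. `normalize_risk_group` is a hypothetical helper, and returning `None` for non-numeric strings is an assumption that matches Spark's cast only when ANSI mode is off.

```python
def normalize_risk_group(value):
    """Trim, map null/blank to "-1", then convert to int (None if not numeric)."""
    if value is None:
        cleaned = "-1"
    else:
        cleaned = value.strip(" ")  # F.trim removes leading/trailing spaces
        if cleaned == "":
            cleaned = "-1"
    try:
        return int(cleaned)
    except ValueError:
        return None  # assumption: mirrors a null result from a failed cast


codes = [normalize_risk_group(v) for v in [" 3 ", "", None, "   ", "abc"]]
print(codes)
```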

# CIRBE Data

In [99]:
class CirbeData(GetData):
    
    def get_data(self):
        
        cirbe = self.tables.get_cirbe()\
                    .withColumnRenamed("cod_persona", "customer_id").drop('closing_date')
        
        return cirbe

In [100]:
# CLOSING_DATE = "2019-01-31"
# cirbe = CirbeData(spark, CLOSING_DATE).get_data()
# cirbe.cache(), cirbe.count()

# BEC Risks

In [101]:
class RiesgosBecDatioData(GetData):
    
    """
    DF[customer_id, risk_hat_id, risk_priority_id].
         - risk_hat_id: hat_cust_level_id
         - risk_priority_id: hat_cust_priority_id (all values = 0)
    """
    
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.hdfs_path = os.path.join(cmautils.TABLES_ROOTPATH_TRANSLATION.get("eclc"), 
                                      cmautils.TABLES_NAMES_TRANSLATION.get("eclcrisk").get("table_name"))
        self.partition_field = cmautils.PARTITION_FIELDS.get("eclc")
        
        
    def _extract(self):
        
        # Raw string so that the regex escapes (\d, \-) are not treated as string escapes
        closing_date = self._get_closest_partition(self.hdfs_path, 
                                                   regex_expr=r'^.*part_closing_date=(\d{4}\-\d{2}\-\d{2}).*$')
#         df = (cmautils.read_table_v2(self.spark, "eclcrisk", closing_date))
        df = (hu.read_table("eclcrisk", closing_date))
        
        return df
        
        
    def _transform(self, df):
        
        return (df
                .filter(F.col("hat_cust_level_id").isNotNull())
                .select("customer_id", 
                        F.col("hat_cust_level_id").alias("risk_hat_id"),
                        F.col("hat_cust_priority_id").alias("risk_priority_id")))
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        df = self._extract()
        df_t = self._transform(df)
        
        return self._load(df_t)

In [102]:
# CLOSING_DATE = "2019-01-31"
# bec_riesgos_datio_data = RiesgosBecDatioData(spark, CLOSING_DATE).get_data()
# bec_riesgos_datio_data.cache(), bec_riesgos_datio_data.count()

In [103]:
# bec_riesgos_datio_data.show()
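
The `regex_expr` passed to `_get_closest_partition` captures the closing date from a partition directory name. The following is a minimal standalone sketch of what that pattern extracts. The paths and the helper `extract_closing_dates` are hypothetical and used for illustration only; the real directory layout and the internals of `_get_closest_partition` are not shown in this notebook.

```python
import re

# Same pattern as in RiesgosBecDatioData._extract, written as a raw string
# so that the backslashes reach the regex engine unchanged.
PARTITION_RE = re.compile(r'^.*part_closing_date=(\d{4}\-\d{2}\-\d{2}).*$')

# Hypothetical HDFS paths, for illustration only.
paths = [
    "/some/root/table/part_closing_date=2019-01-31",
    "/some/root/table/part_closing_date=2019-02-28/part-00000.parquet",
    "/some/root/table/_SUCCESS",
]


def extract_closing_dates(paths):
    """Return the sorted, de-duplicated closing dates found in the paths."""
    dates = set()
    for path in paths:
        match = PARTITION_RE.match(path)
        if match:
            dates.add(match.group(1))
    return sorted(dates)


closing_dates = extract_closing_dates(paths)
print(closing_dates)
```

Paths without a `part_closing_date=` component, such as the `_SUCCESS` marker, do not match and are skipped.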

In [104]:
class RiesgosBecData(GetData):
    
    def __init__(self, spark, closing_date, date_criteria):
        
        super().__init__(spark, closing_date)
        
        self.date_criteria = date_criteria
        
    def get_data(self):
        
        if self.date_criteria == "hard":
        
            return self.tables.get_becriesgos()
            
        elif self.date_criteria == "soft":
            
#             riesgos_bec_path = "/data/sandboxes/sfma/data/ex_data/prioridad_grupos/stable"
            riesgos_bec_path = hu.get_table_details('becriesgos')['path']
            available_partitions = cmautils.get_closing_dates_v2(self.spark, riesgos_bec_path)
            new_closing_date = cmautils.get_closest_partition(self.closing_date, available_partitions)
            return Tables(self.spark, new_closing_date).get_becriesgos().drop('closing_date')
            
        else:
            
            raise ValueError("Invalid date_criteria. Must be 'hard' or 'soft'")

In [105]:
# CLOSING_DATE = "2019-07-31"
# bec_riesgos_data = RiesgosBecData(spark, CLOSING_DATE, "hard").get_data()
# bec_riesgos_data.cache(), bec_riesgos_data.count()

In [106]:
# bec_riesgos_data_fil = (bec_riesgos_data
#                         .filter((F.col("risk_qualification").isNotNull()) 
#                                 | (F.col("risk_priority").isNotNull())))

In [107]:
# df = bec_riesgos_data_fil.join(bec_riesgos_datio_data, "customer_id", "left")

In [108]:
# df.cache()
# df.count()
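
`RiesgosBecData` distinguishes a "hard" criterion (read exactly the requested closing date) from a "soft" one (fall back to the closest available partition). The sketch below illustrates that selection in plain Python. It assumes that "closest" means the most recent partition on or before the closing date; the actual rule inside `cmautils.get_closest_partition` is not shown here and may differ. The function names are hypothetical.

```python
from datetime import date


def closest_partition(closing_date, available_partitions):
    """Pick the latest available partition that is not after closing_date.

    Assumption: 'closest' means 'most recent on or before'; the real
    cmautils.get_closest_partition may use a different rule.
    """
    target = date.fromisoformat(closing_date)
    candidates = [p for p in available_partitions
                  if date.fromisoformat(p) <= target]
    if not candidates:
        raise ValueError("No partition on or before {0}".format(closing_date))
    return max(candidates, key=date.fromisoformat)


def resolve_partition(closing_date, available_partitions, date_criteria):
    """Return the partition to read for the given date criteria."""
    if date_criteria == "hard":
        # Hard: use the requested closing date as-is.
        return closing_date
    elif date_criteria == "soft":
        # Soft: fall back to the closest available partition.
        return closest_partition(closing_date, available_partitions)
    else:
        raise ValueError("Invalid date_criteria. Must be 'hard' or 'soft'")


partitions = ["2019-05-31", "2019-06-30", "2019-08-31"]
soft_choice = resolve_partition("2019-07-31", partitions, "soft")
hard_choice = resolve_partition("2019-07-31", partitions, "hard")
print(soft_choice, hard_choice)
```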

# Rating

In [109]:
class RatingData(GetData):
    """
    DF[customer_id, ext_rating_id]. 'ext_rating_id' corresponds to 'alerta_ant', 
    alertas.Alertas_<yyyymm> ('/IC/modelos/BEC/rating')    
    """
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.hdfs_path = os.path.join(cmautils.TABLES_ROOTPATH_TRANSLATION.get("eclc"), 
                                      cmautils.TABLES_NAMES_TRANSLATION.get("eclcscore").get("table_name"))
        self.partition_field = cmautils.PARTITION_FIELDS.get("eclc")
        
        
    def _extract(self):
        
        closing_date =  self._get_closest_partition(self.hdfs_path, 
                                                    regex_expr=r'^.*part_closing_date=(\d{4}\-\d{2}\-\d{2}).*$')
#         df = (cmautils.read_table_v2(self.spark, "eclcscore", closing_date))
        df = (hu.read_table("eclcscore", closing_date))
        
        return df
        
        
    def _transform(self, df):
        
        return (df
                .filter(F.col("cur_ind_rating_id") == "S")
                .select("customer_id", F.col("short_scale_id").alias("ext_rating_id")))
    
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        df = self._extract()
        df_t = self._transform(df)
        
        return self._load(df_t)

In [110]:
# df = RatingData(spark, "2019-07-31").get_data()
# df.count()
# df.show()

# Social Security Payment Assignment (Cesión Seguros Sociales)

In [111]:
class CesionSegurosSociales(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.sum_ssss_expr = [F.col("direct_debit_3m_amount") + 
                              F.col("elect_assig_3m_amount") + 
                              F.col("non_direct_debl_3m_amount") + 
                              F.col("trans_assig_3m_amount")]
        
        self.general_fils = (self.filters.country_id & 
                             self.filters.cus_country & 
                             self.filters.entity_id & 
                             self.filters.cus_entity & 
                             self.filters.busi_area & 
                             self.filters.activo & 
                             self.filters.customer_id)
        
    
    def _get_ss_data(self, hss):
        
        hss_t = (hss
               .filter(self.general_fils)
               .filter(F.col("com_category_id").cast(T.IntegerType())==38)
               .groupBy("customer_id")
               .agg(F.sum(*self.sum_ssss_expr).alias("imp_cesion_segsociales"))
               .withColumn("ind_cesion_segsociales", 
                           F.when(F.col("imp_cesion_segsociales") > 0.0, 1).otherwise(0)))
        
        return hss_t
    
    
    def _get_cole_data(self, pob):
        
        pob_t = (pob
               .filter(self.general_fils)
               .filter(F.col("bill_t_id").isin([530, 1507]))
               .groupBy("customer_id")
               .agg(F.sum("paid_bills_1m_amount").alias("imp_cesion_segsociales"))
               .withColumn("ind_cesion_segsociales", F.when(F.col("imp_cesion_segsociales") > 0.0, 1)
                           .otherwise(0)))
        
        return pob_t
    
    
    def _extract(self):
        
#         hss = cmautils.read_table_v2(self.spark, "vsfmahss", self.closing_date)
#         pob = cmautils.read_table_v2(self.spark, "vsfmapob", self.closing_date)
        hss = hu.read_table("vsfmahss", self.closing_date)
        pob = hu.read_table("vsfmapob", self.closing_date)
        
        return (hss, pob)
    
        
    def _transform(self, hss, pob):
        
        hss_t = self._get_ss_data(hss).select("customer_id", "ind_cesion_segsociales")
        pob_t = self._get_cole_data(pob).select("customer_id", "ind_cesion_segsociales")
        
        ssss = (hss_t
                .unionAll(pob_t).groupBy("customer_id")
                .agg(F.max("ind_cesion_segsociales")
                     .alias("ind_cesion_segsociales"))
                .filter(F.col("ind_cesion_segsociales") == 1))
        
        return ssss
        
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        (hss, pob) = self._extract()
        ssss = self._transform(hss, pob)
        
        return self._load(ssss)

In [112]:
# hss = CesionSegurosSociales(spark, "2019-07-31").get_data()

In [113]:
# hss.cache()
# hss.count()
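
As a plain-Python illustration of the aggregation in `CesionSegurosSociales._transform`: each source sums an amount per customer and flags totals above zero, then the two sources are unioned and only flagged customers are kept. This is a sketch on toy records, not the Spark implementation; the general filters are omitted and the helper names and sample rows are hypothetical.

```python
from collections import defaultdict


def indicator_by_customer(rows):
    """Sum amounts per customer and flag customers whose total is > 0."""
    totals = defaultdict(float)
    for customer_id, amount in rows:
        totals[customer_id] += amount
    return {cid: 1 if total > 0.0 else 0 for cid, total in totals.items()}


def combine_indicators(*indicator_maps):
    """Union the per-source flags, keep the max per customer, keep only 1s."""
    combined = defaultdict(int)
    for indicators in indicator_maps:
        for cid, flag in indicators.items():
            combined[cid] = max(combined[cid], flag)
    return sorted(cid for cid, flag in combined.items() if flag == 1)


# Toy (customer_id, amount) rows standing in for the two source tables.
hss_rows = [("c1", 100.0), ("c1", 50.0), ("c2", 0.0)]
pob_rows = [("c2", 25.0), ("c3", 0.0)]

flagged = combine_indicators(indicator_by_customer(hss_rows),
                             indicator_by_customer(pob_rows))
print(flagged)
```

A customer is flagged when at least one source reports a positive total, which mirrors taking the maximum of the indicator after the union.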

# Lisboa Criterion

In [114]:
# hu.get_first_loaded('ecog_hierarchy')

In [115]:
# hu.get_first_loaded('ecog_branch')

In [116]:
# hu.get_first_loaded("kdcomta")

In [117]:
# hu.get_first_loaded("kdcotpt")

In [118]:
class Branches(GetData):
    
    def __init__(self, spark, closing_date, branch_close_date=None):
        
        super().__init__(spark, closing_date)
        
        if closing_date == "2018-02-28":
            self.closing_date = "2018-01-31"
        else:
            self.closing_date = closing_date
    
        self.branch_close_date = branch_close_date
        
    
    def _extract(self):

        mta = hu.read_table("kdcomta", self.closing_date,'closest')
        tpt = hu.read_table("kdcotpt", self.closing_date,'closest')
        
        return (mta, tpt)
    
    def _transform(self, mta, tpt):
        
        hierarchy = (tpt.filter(self.filters.entity_id)
                     .select("branch_id","level20_adjuntiadag_id", 
                             "level30_zone_id", "level50_territorial_id", 
                             "level60_operarea_id", "level55_gen_manag_id"))
        
        
        branch = (mta.filter(self.filters.entity_id)
                      .select("branch_id", "group_level_id","complex_level_id",
                              "branch_close_date","branch_full_name"))
        
        if self.branch_close_date is not None:
            branch = branch.filter(F.col("branch_close_date") == self.branch_close_date)
            
            
        branch_hierarchies = (branch
                             .join(hierarchy, "branch_id")
                             .select("branch_id", "group_level_id","complex_level_id",
                                     "branch_close_date","branch_full_name",
                                     "level20_adjuntiadag_id", "level30_zone_id", 
                                     "level50_territorial_id", "level60_operarea_id",
                                     "level55_gen_manag_id"))
        
        offices = (branch_hierarchies
                   .where(F.col("group_level_id")== 10)
                   .select("branch_id","complex_level_id","branch_close_date",
                           "branch_full_name","level20_adjuntiadag_id",
                           "level30_zone_id", "level50_territorial_id", 
                           "level60_operarea_id", "level55_gen_manag_id"))
        
        units_hierarchy = (offices
                          .join(branch_hierarchies
                                .where(F.col("group_level_id")== 20)
                                .withColumnRenamed("branch_id", "cbc_code")
                                .withColumnRenamed("branch_full_name", "cbc_desc")
                                .withColumnRenamed("complex_level_id", "cbc_complex_lvl")
                                .drop("level30_zone_id", "level50_territorial_id",
                                      "level60_operarea_id", "branch_close_date",
                                      "group_level_id", "level55_gen_manag_id"), on="level20_adjuntiadag_id", 
                                how="left").na.fill(0)
                          .join(branch_hierarchies.where(F.col("group_level_id")== 30)
                                .withColumnRenamed("branch_id", "zone_code")
                                .withColumnRenamed("branch_full_name", "zone_desc")
                                .withColumnRenamed("complex_level_id", "zone_complex_lvl")
                                .drop("level20_adjuntiadag_id","level50_territorial_id", 
                                      "level60_operarea_id", "branch_close_date",
                                      "group_level_id", "level55_gen_manag_id"), 
                                on="level30_zone_id", how='left').na.fill(0)
                          .join(branch_hierarchies.where(F.col("group_level_id")== 50)
                                .withColumnRenamed("branch_id", "territorial_code")
                                .withColumnRenamed("branch_full_name", "territorial_desc")
                                .drop("level20_adjuntiadag_id","level30_zone_id", 
                                      "level60_operarea_id", "complex_level_id",
                                      "branch_close_date", "group_level_id", 
                                      "level55_gen_manag_id"), on="level50_territorial_id", how='left')
                          .na.fill(0)
                          .join(branch_hierarchies.where(F.col("group_level_id")== 55)
                                .withColumnRenamed("branch_id", "territorial_general_code")
                                .withColumnRenamed("branch_full_name", "territorial_general_desc")
                                .drop("level20_adjuntiadag_id","level30_zone_id", 
                                      "level60_operarea_id", "complex_level_id",
                                      "branch_close_date", "group_level_id", 
                                      "level50_territorial_id"), 
                                on="level55_gen_manag_id", how='left').na.fill(0)
                          .drop("cbc_code","territorial_code","zone_code"))
        
        return units_hierarchy
    
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        (mta, tpt) = self._extract()
        units_hierarchy = self._transform(mta, tpt)
        
        return self._load(units_hierarchy)

In [119]:
# branchs = Branches(spark,'2019-12-31').get_data()
# branchs.columns

In [120]:
class Carterization(GetData):
    
    def _extract(self):
        
#         ncm = cmautils.read_table_v2(self.spark, "kdconcm", self.closing_date, soft_criteria=True)
#         uge = cmautils.read_table_v2(self.spark, "kdcouge", self.closing_date, soft_criteria=True)
        ncm = hu.read_table("kdconcm", self.closing_date, 'closest')
        uge = hu.read_table("kdcouge", self.closing_date, 'closest')
        return (ncm, uge)
    
    def _transform(self, ncm, uge):
        
        ncm_t = (ncm.filter((F.col("entity_id") == "0182") &
                            (F.col("portfolio_remov_date") == "9999-12-31") & 
                            (F.col("cust_branch_allocation_type")==1))
                 .select('customer_id', 'employee_id','manager_branch_id')
                 .withColumnRenamed("manager_branch_id", "branch_id_cart"))
        
        uge_t = (uge.filter(F.col('entity_id') == '0182')
                 .select('employee_id','user_id', 'position_id', 'grade_id'))
#                 .withColumnRenamed("branch_id", "branch_id_cart"))
        
        cart = (ncm_t.join(uge_t, "employee_id", "left"))
        
        return cart
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        (ncm, uge) = self._extract()
        cart = self._transform(ncm, uge)
        
        return self._load(cart)

In [121]:
class LisboaInd(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.areas = ["8411", "8412", "8413", "8414", "8415", "8416", "8417"]
        self.exclude_groups = [["Caixabanc", "A08663619"], 
                               ["Bankia", "A14010342"], 
                               ["Evo", "A86373701"],
                               ["Altamira", "A86819596"], 
                               ["BBVA", "N0035575J"], 
                               ["BME Clearing", "A78973864"],
                               ["SGTyP", "S2826011E"], 
                               ["Santander", "A39000013"]]
        
        self.list_GR1 = ['E40', 'E44', 'E45', 'I30', 'I31', 'Y34', '092']
        self.list_GR2 = ['E46', 'E47', 'E48', 'I32', 'I68', '093', 'L72', 'E14']
        
    def _extract(self):
        
        custom = self.tables.get_po()
        basic = self.tables.get_eclcust()
        
# Replace 'basic' with the commented-out code below if the natural/legal person criterion is taken 
# from xti_idefisco (cgm) instead of client_type_id (eclccustm)
#         df_cgm = self.tables.get_cgm(
#                            ).select('customer_id','personal_id','document_type'
#                            ).where(~F.col('document_type').isNull())
    
#         basic = df_cgm.groupBy("customer_id",'personal_id')\
#             .agg(F.min("document_type").alias("xti_idefisco")
#             ).withColumn('client_type_id', F.when(F.col('xti_idefisco').isin('02','03','09'),'J'
#                         ).otherwise('F'))

        cart = Carterization(self.spark, self.closing_date).get_data()
        
        return (custom, basic, cart)
        
    
    def _transform(self, custom, basic, cart):
       
        cust_complete = custom.join(basic.select('customer_id','personal_id','client_type_id'
                                                ),['customer_id'],how='left')
        
        # Filter before selecting: 'client_type_id' is not among the selected columns.
        cust_basic = (cust_complete
                 .filter((F.col('client_type_id') == 'J') & 
                         (F.col("status_type") == "A"))
                 .select("customer_id", "status_type", "main_branch_id", "personal_id")
                 .dropDuplicates())

        
        cart_t = (cart.withColumn("tipo_gestor", F.when(F.col("position_id").isin(self.list_GR1), "GR1").
                                  when(F.col("position_id").isin(self.list_GR2), "GR2").otherwise(""))
                  .select("customer_id", "tipo_gestor", "branch_id_cart"))
        
        exclude_groups = (self.spark
                          .createDataFrame(pd.DataFrame(self.exclude_groups, 
                                                        columns = ['name', 
                                                                   'personal_id'])))
        
        exclude_groups = (exclude_groups
                          .join(cust_complete.select("personal_id", "customer_id"), 
                                "personal_id", "left")
                          .select("customer_id").filter(F.col("customer_id").isNotNull())
                          .dropDuplicates())
        
        
        cust_cart = (cust_basic.join(cart_t, "customer_id", "left")
                      .withColumn("final_branch_id", F.when(F.col("branch_id_cart").isNull(), 
                                                            F.col("main_branch_id"))
                                  .otherwise(F.col("branch_id_cart"))))
        
        branches = Branches(self.spark, self.closing_date).get_data()
        
        
        cust_branch = (cust_cart.join(branches, 
                                    cust_cart["final_branch_id"] == branches["branch_id"],
                                    "left"))
        
        df_lisboa = (cust_branch.filter((F.col("level60_operarea_id").isin("0023", "0058", "0088", "8484")) & 
                                      (F.col("territorial_general_code").isin(self.areas)))
                     .join(exclude_groups, "customer_id", "left_anti")
                     .dropDuplicates(subset=["customer_id"])
                     .select("customer_id", F.lit(1).alias("ind_lisboa"), "level60_operarea_id", "territorial_general_code"))
        
        return df_lisboa
            
    
    def _load(self, df):
        
        return df
    
    def get_data(self):
        
        (custom, basic, cart) = self._extract()
        df_lisboa = self._transform(custom, basic,cart)
        
        return self._load(df_lisboa)

In [122]:
# lisboa = LisboaInd(spark, "2019-08-31").get_data()
# lisboa.show(5)
# lisboa.columns
# lisboa.cache()
# lisboa.count()

In [123]:
# lisboa.dropDuplicates(subset=["customer_id"]).count()

In [124]:
# lisboa.groupBy("level60_operarea_id").count().orderBy("level60_operarea_id").show()

In [125]:
# lisboa.filter(F.col("level60_operarea_id") == "0058").groupBy("territorial_general_code").count().show()
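
Two small rules inside `LisboaInd._transform` are easy to check in isolation: the manager type derived from `position_id`, and the fallback from the portfolio branch to the customer's main branch. The sketch below restates them in plain Python. The position lists are copied from the class; the function names are hypothetical.

```python
# Position codes copied from LisboaInd.__init__.
LIST_GR1 = ['E40', 'E44', 'E45', 'I30', 'I31', 'Y34', '092']
LIST_GR2 = ['E46', 'E47', 'E48', 'I32', 'I68', '093', 'L72', 'E14']


def manager_type(position_id):
    """Return 'GR1', 'GR2' or '' depending on the manager's position code."""
    if position_id in LIST_GR1:
        return "GR1"
    if position_id in LIST_GR2:
        return "GR2"
    return ""


def final_branch_id(main_branch_id, branch_id_cart):
    """Use the portfolio branch when present, else the customer's main branch."""
    return main_branch_id if branch_id_cart is None else branch_id_cart


print(manager_type("E44"), manager_type("L72"), repr(manager_type("ZZZ")))
print(final_branch_id("0001", None), final_branch_id("0001", "0042"))
```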

# Click&Pay

In [126]:
class ClickPayData(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.iii_fils = (self.filters.tit & self.filters.pos_tit & 
                         self.filters.busi_area & self.filters.country_id & 
                         self.filters.entity_id & self.filters.customer_id & 
                         (F.col("com_grouping_id").isin([82021, 82025])) & 
                         (F.col("contract_cancel_date") == "2999-12-31"))
        
    def _extract(self):
        
#         iii = cmautils.read_table_v2(self.spark, "vsfmaiii", self.closing_date)
        iii = hu.read_table("vsfmaiii", self.closing_date)
        
        return iii
    
    
    def _transform(self, iii):
        
        iii_t = (iii
                 .filter(self.iii_fils).withColumn("ind_clickpay", F.lit(1))
                 .groupBy("customer_id").agg(F.max("ind_clickpay")
                                             .alias("ind_clickpay")))
        
        return iii_t
    
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        iii = self._extract()
        iii_t = self._transform(iii)
        
        return self._load(iii_t)

In [127]:
# clickpay_data = ClickPayData(spark, "2019-01-31").get_data()
# clickpay_data.cache()
# clickpay_data.count()

In [128]:
# clickpay_data.printSchema()

# Bi Num Products

In [129]:
class BiNumProds(GetData):
    
    def _extract(self):
        
#         bi_closing_date = self.closing_date[:7].replace("-", "")
#         num_prods = cmautils.read_table_v2(self.spark, "numprods", bi_closing_date)
        if('ecegprods' in TABLES_CLOSEST_DATE):
            num_prods = hu.read_table("ecegprods", self.closing_date, 'closest')
        else:
            num_prods = hu.read_table("ecegprods", self.closing_date)
        
        return num_prods
    
    
    def _transform(self, num_prods):
        
        num_prods_t = (cmautils.format_id_col(num_prods)
                       .withColumnRenamed("first_holder_products_number", "bi_num_productos"))
        
        return num_prods_t
    
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        num_prods = self._extract()
        num_prods_t = self._transform(num_prods)
        
        return self._load(num_prods_t)

In [130]:
# numprods = BiNumProds(spark, "2020-09-30").get_data()
# numprods.cache()
# numprods.count()

# CNAE

In [131]:
class CNAE(GetData):
    
    """
    Returns a DF with the following columns
        - customer_id
        - cnae_cod
    """
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        
        self.basic_fils = (self.filters.customer_id & self.filters.entity_id & 
                           self.filters.country_id & self.filters.busi_area & 
                           self.filters.con_entity_id)
        
    
    def _extract(self):
        
#         nfc_partition = self._get_closest_partition(self.nfc_path)
#         nfc = (cmautils.read_table_v2(self.spark, "vsfmanfc", nfc_partition))
#         nfc = (hu.read_table("vsfmanfc", nfc_partition))

        custom = self.tables.get_po()
        basic = self.tables.get_eclcust()
        unit = self.tables.get_eclcustunit().withColumnRenamed('entity_id','contract_entity_id')
        
        return custom,basic,unit
    
    
    def _transform(self, custom, basic, unit):
        
        basic_c = custom.join(unit.select('customer_id','contract_entity_id'),['customer_id'],how='left'
                             ).join(basic.select('customer_id','code_cnae_id'),['customer_id'],how='left')
        
        basic_t = (basic_c.filter(self.basic_fils)
                 .groupBy("customer_id")
                 .agg(F.first("code_cnae_id").alias("cnae_cod")))
        
        return basic_t
        
    
    def _load(self, df):
        
        return df
    
    
    def get_data(self):
        
        custom,basic,unit = self._extract()
        custom_t = self._transform(custom,basic,unit)
        
        return self._load(custom_t)

In [132]:
# cnae = CNAE(spark, "2019-01-31").get_data()
# cnae.columns
# cnae.cache()
# cnae.count()

# Digitalization

In [133]:
class DigiData_biupload(GetData):
    
    def __init__(self, spark, closing_date, nfc_date_criteria):
        
        super().__init__(spark, closing_date)
        self.nfc_date_criteria = nfc_date_criteria
        self.nfc_contact_cols = \
            ["c_email_desc", "c_email_ver_type", "email_desc", 
             "c_cell_phone_number", "c_cell_phone_ver_type", 
             "p_cell_phone_number", "w_cell_phone_number"]
        
        self.nfc_path = os.path.join(self.sfma_path, "t_sfma_customers_ba_d")
        
        self.fils_nfc = (self.filters.pymes_empresas & 
                         self.filters.customer_id & 
                         self.filters.entity_id & 
                         self.filters.con_entity_id & 
                         self.filters.country_id &
                         self.filters.busi_area)
        
        self.bad_emails = ["ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
                           "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
                           ""]
        
        self.bad_mvl = ["YYYYYYYYYYYYYYY", ""]
        
        
    
    def _get_1de3_data(self):
        
        df = self.tables.get_digi1de3()\
            .withColumn("movil_1de3", F.col("banca_mvl").cast(T.IntegerType()))\
            .withColumn("net_1de3", F.col("net").cast(T.IntegerType()))\
            .select("customer_id", "movil_1de3", "net_1de3").na.fill(0)
        
        return df
    
    def _get_3de3_data(self):
        
        df = self.tables.get_digi3de3()\
            .withColumn("movil_3de3", F.col("banca_mvl").cast(T.IntegerType()))\
            .withColumn("net_3de3", F.col("net").cast(T.IntegerType()))\
            .select("customer_id", "movil_3de3", "net_3de3").na.fill(0)
        
        return df
    
    def _get_nfc_data(self):
        
        if self.nfc_date_criteria == "hard":
            nfc = self.tables.get_nfc()
        
        elif self.nfc_date_criteria == "soft":
            nfc = Tables(self.spark, self._get_closest_partition(self.nfc_path)).get_nfc()
        
        else:
            raise ValueError("Invalid nfc_date_criteria. Must be 'hard' or 'soft'")
            
        
        cond_email_1 = ((~F.col("c_email_desc").isin(self.bad_emails)) & 
                   (F.length(F.col("c_email_desc")) > 1 ))
        cond_email_2 = ((~F.col("email_desc").isin(self.bad_emails)) & 
                       (F.length(F.col("email_desc")) > 1 ))
        cond_mvl_1 = ((~F.col("c_cell_phone_number").isin(self.bad_mvl)) & 
                       (F.length(F.col("c_cell_phone_number")) > 1 ))
        cond_mvl_2 = ((~F.col("p_cell_phone_number").isin(self.bad_mvl)) & 
                       (F.length(F.col("p_cell_phone_number")) > 1 ))
        cond_email_val = (F.col("c_email_ver_type") == "V")
        cond_mvl_val = (F.col("c_cell_phone_ver_type") == "V")


        nfc = nfc.withColumn("email", F.when((cond_email_1 | cond_email_2), 1)\
                                             .otherwise(0))\
                 .withColumn("email_val", F.when(cond_email_val, 1)\
                                                 .otherwise(0))\
                 .withColumn("movil", F.when((cond_mvl_1|cond_mvl_2), 1)\
                                                 .otherwise(0))\
                 .withColumn("movil_val", F.when(cond_mvl_val, 1)\
                                             .otherwise(0))
        
        nfc = nfc.groupBy("customer_id").agg(F.max("email").alias("email"), 
                                             F.max("email_val").alias("email_val"), 
                                             F.max("movil").alias("movil"), 
                                             F.max("movil_val").alias("movil_val"))
        
        return nfc
    
    
    def _get_diy_data(self):
                
        diy = hu.read_table( "segmult", self.closing_date)\
                .withColumn("diy", F.col("diy").cast(T.IntegerType()))
        
        return diy.select("customer_id", "diy")
        
        
    
    def get_data(self):
        
        digi_1de3 = self._get_1de3_data()
        digi_3de3 = self._get_3de3_data()
        nfc = self._get_nfc_data()
        diy = self._get_diy_data()
        
        df_total = digi_1de3.join(digi_3de3, "customer_id", "outer")\
                            .join(nfc, "customer_id", "outer")\
                            .join(diy, "customer_id", "outer")\
                            .na.fill(0)
        
        return df_total

In [134]:
class DigiData(GetData):
    
    def get_metrics(self):
        
        cols_out = ['customer_id','movil_3de3','net_3de3',
                    'movil_1de3','net_1de3','email',
                    'email_val','movil','movil_val']
        
        metrics = ['use_mobile_bank_type_3d3','use_net_type_3d3',
                   'use_mobile_bank_type_1d3','use_net_type_1d3',
                   'email_informed_type_1d1','email_validated_type_1d1',
                   'cell_phone_informed_type_1d1','cell_phone_validated_type_1d1']
        
        df_metrics = hu.read_table('digi_metrics',self.closing_date
                        ).select('customer_id',*metrics)

        # Rename each source metric to its output column name (same order in both lists).
        for metric, col in zip(metrics, cols_out[1:]):
            df_metrics = df_metrics.withColumnRenamed(metric, col)
        
        return df_metrics
 
    
    def _get_diy_data(self):
        
        diy = self.tables.get_segmulti()\
                .withColumn("diy", F.col("diy_customer_type").cast(T.IntegerType()))
        
        return diy.select("customer_id", "diy")
        
        
    
    def get_data(self):
        
        digi_metrics = self.get_metrics()
        diy = self._get_diy_data()
        
        df_total = digi_metrics.join(diy, "customer_id", "outer"
                                ).na.fill(0)
        
        return df_total

In [135]:
# digi = DigiData(spark, "2019-12-31").get_data()
# digi.columns
# digi.cache()
# digi.count()

In [136]:
# digi.where(F.col('customer_id').isin('000000645','000000799')).show()

# Sessionization

Column equivalence between the ex_data and master tables:
- tag_event - transaction_event_desc
- tag_action - transaction_action_desc
- t_intag_sum - elapsed_time_number
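
The equivalence listed above can be captured as a lookup table. The sketch below translates ex_data column names to their master names and leaves any other column untouched. The dictionary contents come from the list; the constant and function names, and the sample column list, are hypothetical.

```python
# ex_data column names mapped to their master-table equivalents.
EX_DATA_TO_MASTER = {
    "tag_event": "transaction_event_desc",
    "tag_action": "transaction_action_desc",
    "t_intag_sum": "elapsed_time_number",
}


def to_master_columns(columns):
    """Translate ex_data column names to master names; others pass through."""
    return [EX_DATA_TO_MASTER.get(name, name) for name in columns]


master_columns = to_master_columns(
    ["session_id", "tag_event", "tag_action", "t_intag_sum"])
print(master_columns)
```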

In [137]:
class Sessionization(GetData):
    
    def __init__(self, spark, closing_date):
        
        super().__init__(spark, closing_date)
        
        self.num_months_sessionization = 3
        self.part_name_sess = "partition_id"
        self.part_name_sess1 = "part_closing_date"
        self.session_dict = {'operate': 'OPERA',
                             'advising': 'CONSULTA',
                             'contract': 'CONTRATA',
                             'error': 'OTRO',
                             'simulator': 'CONSULTA',
                             'browse': 'CONSULTA',
                             'communication': 'CONSULTA',
                             'consult': 'CONSULTA',
                             'disconnect': 'OTRO',
                             'manage': 'CONSULTA',
                             "operaciones": "OPERA",
                             "automatico": "OPERA",
                             "consulta": "CONSULTA",
                             "contratacion": "CONTRATA",
                             "otros": "OTRO",
                             "0001": "CONSULTA",
                             "0002": "OPERA",
                             "0003": "CONTRATA",
                             "0010": "OTRO",
                             "0011": "OTRO"}
        
        self.session_id_name = {"web": "session_id", 
                                "app": "session_id",
                                "atm": "session_id",
                                "agent": "session_id"}
        
        self.time_col_name = {"web": "t_intag_sum", 
                              "app": "t_intag_sum", 
                              "atm": "t_intag_sum",
                              "agent": "t_intag_sum"}
                
        self.time_col_name1 = {"web": "elapsed_time_number", 
                              "app": "elapsed_time_number", 
                              "atm": "elapsed_time_number",
                              "agent": "elapsed_time_number"}
        
        self.customer_col_id = "customer_id"
        self.sandbox_path = "/data/sandboxes/sfma/data"
        self.channels = ["app", "web", "atm", "agent"]
        self.actions_short = ["consulta", "opera", "contrata", "otro"]
        self.main_sess_path = '/data/master/sfma/data/general/general/t_sfma_s_cust_sessions_d/'        
        self.main_tag_path = '/data/master/sfma/data/general/general/t_sfma_s_tag_sessions_d/'        
        self.channel_sess = ["sessapp","sessatm","sessweb", "sessagent"]
        self.channel_tag = ["tagsapp","tagsatm","tagsweb", "tagsagent"] 
        self.agent = ["prueba"]
        self.sess_paths = {"sessapp": 'part_session_channel_type=app',
                          "sessatm": 'part_session_channel_type=atm',
                          "sessagent": 'part_session_channel_type=agent',
                          "sessweb": 'part_session_channel_type=web',
                          "tagsapp": 'part_session_channel_type=app',
                          "tagsweb": 'part_session_channel_type=web',
                          "tagsatm": 'part_session_channel_type=atm',
                          "tagsagent": 'part_session_channel_type=agent'}
    
    def _get_diff_months(self, date_1, date_2):
        date_1_format = datetime.strptime(date_1, '%Y-%m-%d')
        date_2_format = datetime.strptime(date_2, '%Y-%m-%d')    
        return abs((date_1_format.year - date_2_format.year) * 12 + 
            date_1_format.month - date_2_format.month)

    def _get_available_partitions(self, channel, testing):
        if(channel in self.agent):
        
            hdfs_path = self._get_hdfs_path(channel, testing)
            available_partitions_raw = get_closing_dates_v2(self.spark, hdfs_path,
                                        regex_expr=r'^.*partition_id=(\d{4}\-\d{2}).*$')
            available_partitions = [get_normalize_partition(partition) for partition
                                    in available_partitions_raw]
        else:
            if(channel in self.channel_sess):
                available_partitions = hu.get_loaded_dates('sessionizado')
            else:
                available_partitions = hu.get_loaded_dates('tageado')
            
        
        return available_partitions
    
    def _get_closest_partition(self, channel, testing):      
        if(channel in self.channel_sess):                   
            return hu.get_closest_partition_table('sessionizado', self.closing_date)      
        else:
            return hu.get_closest_partition_table('tageado', self.closing_date) 
        
        
    def _get_rename_dict(self, channel):
        
        return {"CONSULTA": "t_consulta_{0}".format(channel),
                "CONTRATA": "t_contrata_{0}".format(channel),
                "OPERA": "t_opera_{0}".format(channel),
                "OTRO": "t_otro_{0}".format(channel)}
          
        
    def add_customer_id(self, df, channel, sel_date_expr, testing):
        if channel == 'agent':
            hdfs_path = self._get_hdfs_path("sess{0}".format(channel), testing)
            closing_date_channel = self._get_closest_partition("sess{0}".format(channel), testing)
            min_available_date = min(self._get_available_partitions("sess{0}".format(channel), testing))
            ini_date = add_months(closing_date_channel, -self.num_months_sessionization+1)
            fin_month = datetime.strptime(closing_date_channel, '%Y-%m-%d').strftime('%Y-%m')
            ini_month = datetime.strptime(ini_date, '%Y-%m-%d').strftime('%Y-%m')
            sel_date_expr = "{0} > '{1}' and {0} <= '{2}'".format(self.part_name_sess1, ini_month, fin_month)
            df_ids = (self.spark.read.parquet(hdfs_path)
                       .where(sel_date_expr)
                       .where(F.col('part_session_channel_type')==channel)
                      .select('customer_id','session_id'))
            df_ids = format_id_col(df_ids)
            df_ids = (df_ids
                      .dropDuplicates(subset=[self.session_id_name.get(channel), "customer_id"])
                      .select(self.session_id_name.get(channel), "customer_id"))
            df = df.join(df_ids, self.session_id_name.get(channel))
            
            
        else:
            # The customer id is the prefix of the session id, up to the first underscore
            df = df.withColumn(self.customer_col_id,
                               F.split(F.col(self.session_id_name.get(channel)), "_").getItem(0))
            df = format_id_col(df)
            
        return df

    
    def _get_hdfs_path(self, channel, testing):
        
        if testing:
            hdfs_path = os.path.join(self.sandbox_path, 
                                     "cma/utilidades/perfil_tipo/sesionizado/{0}_data.parquet".format(channel))
            return hdfs_path
        else:
            if(channel in self.channel_sess):
                return self.main_sess_path
            else:
                return self.main_tag_path
       
    def _get_corr_factor(self, channel, testing):
        closing_date_channel = self._get_closest_partition(channel, testing)
        min_available_date = min(self._get_available_partitions(channel, testing))
        ini_date = add_months(closing_date_channel, -self.num_months_sessionization+1)
        if min_available_date <= ini_date:
            corr_factor = 1.0
        else:
            # The 3-month window is hard-coded here; a gap of exactly 3 months raises ZeroDivisionError
            corr_factor = 3/(3-self._get_diff_months(ini_date, min_available_date))
        return corr_factor
    
    
    def get_sessions_data(self, channel, testing):
        hdfs_path = self._get_hdfs_path("tags{0}".format(channel), testing)
        
        if(channel in self.agent):
            closing_date_channel = self._get_closest_partition("tags{0}".format(channel), testing)
        else:
            closing_date_channel = self._get_closest_partition(channel, testing)
        
        corr_factor = self._get_corr_factor("tags{0}".format(channel), testing)
        ini_date = add_months(closing_date_channel, -self.num_months_sessionization)
        fin_month = datetime.strptime(closing_date_channel, '%Y-%m-%d').strftime('%Y-%m')
        ini_month = datetime.strptime(ini_date, '%Y-%m-%d').strftime('%Y-%m')
        
        if(channel in self.agent):
            sel_date_expr = "{0} > '{1}' and {0} <= '{2}'".format(self.part_name_sess, ini_month, fin_month)
        
            if testing:
                tags_data = self.spark.read.parquet(hdfs_path)
            else:
                tags_data = self.spark.read.parquet(hdfs_path).where(sel_date_expr)
        else:
            sel_date_expr = "{0} > '{1}' and {0} <= '{2}'".format(self.part_name_sess1, ini_month, fin_month)
            if testing:
                tags_data = self.spark.read.parquet(hdfs_path)
            else:
                tags_data = self.spark.read.parquet(hdfs_path).where(sel_date_expr
                                                             ).where(F.col('part_session_channel_type')==channel)
        if(channel in self.agent):
            col = "tag_action"
            cols_time = self.time_col_name
        else:
            col = 'transaction_action_desc'
            cols_time = self.time_col_name1
            
        tags_data = replace_col_vals(tags_data, col, self.session_dict, None)
        tags_data = tags_data.filter(F.col(col).isNotNull())
        tags_data = tags_data.filter(F.col(self.session_id_name.get(channel)).isNotNull())
        tags_data = self.add_customer_id(tags_data, channel, sel_date_expr, testing) 
        
        # Aggregate data
        time_data = tags_data.groupBy("customer_id")\
            .pivot(col).agg(F.sum(cols_time.get(channel)))\
            .na.fill(0)
        
        # Rename columns
        for old_name, new_name in self._get_rename_dict(channel).items():
            time_data = time_data.withColumnRenamed(old_name, new_name)
        # Apply correction factor
        for col_name in time_data.columns:
            if col_name != "customer_id":
                time_data = time_data.withColumn(col_name, (F.col(col_name)*corr_factor).cast(T.FloatType()))
        
        return time_data

    def _normalize_columns(self, df):
        col_names = ["t_{0}_{1}".format(action, channel) for 
            channel in self.channels for action in self.actions_short]
        for col_name in col_names:
            if col_name not in df.columns:
                df = df.withColumn(col_name, F.lit(0.0).cast(T.FloatType()))
        return df
    
    def get_data(self, testing=False):
        
        app_data = self.get_sessions_data("app", testing)
        web_data = self.get_sessions_data("web", testing)
        atm_data = self.get_sessions_data("atm", testing)
        agent_data = self.get_sessions_data("agent", testing)
        
        df_sessions = (app_data.join(web_data, "customer_id", "outer")
                       .join(atm_data, "customer_id", "outer")
                       .join(agent_data, "customer_id", "outer")
                       .na.fill(0))
        
        df_sessions = self._normalize_columns(df_sessions)
        
        return df_sessions
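
The correction factor used in `_get_corr_factor` scales the summed times up when the earliest available partition starts after the beginning of the window. The sketch below reproduces that arithmetic in plain Python, without Spark. The names `diff_months`, `corr_factor` and `window_months` are hypothetical, the default of 3 months mirrors the value hard-coded in the class, and the `ValueError` guard is an addition of this sketch, not part of the class.

```python
from datetime import datetime


def diff_months(date_1, date_2):
    """Absolute difference in calendar months between two 'YYYY-MM-DD' dates."""
    d1 = datetime.strptime(date_1, "%Y-%m-%d")
    d2 = datetime.strptime(date_2, "%Y-%m-%d")
    return abs((d1.year - d2.year) * 12 + d1.month - d2.month)


def corr_factor(ini_date, min_available_date, window_months=3):
    """Factor that extrapolates a partially covered window to its full length."""
    # ISO-formatted dates compare correctly as plain strings, as in the class.
    if min_available_date <= ini_date:
        return 1.0
    missing = diff_months(ini_date, min_available_date)
    if missing >= window_months:
        # Guard added for this sketch only: the class would divide by zero
        # (or return a negative factor) in this situation.
        raise ValueError("no data available inside the window")
    return window_months / (window_months - missing)


print(corr_factor("2022-10-01", "2022-09-01"))  # 1.0, the window is fully covered
print(corr_factor("2022-10-01", "2022-11-01"))  # 1.5, one of three months is missing
```

With two of the three months missing the factor grows to 3.0, so sparse channels are extrapolated aggressively; that may be worth keeping in mind when reading the resulting `t_*` columns.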

# Balances

Another possible data table for balances:
- /data/master/people/eclc/data/t_eclc_cust_finantial_infor/valid/monthly/
- with the field balance_amount

In [138]:
class BalanceData(GetData):
    
    def __init__(self, spark, closing_date):
        """
        acc_sum_test = (cmautils
                .BalanceData(spark, CLOSING_DATE)
                .get_data())
        """
        
        super().__init__(spark, closing_date)
        
        self.accounts_id = ["0200", "0202", "0203", 
                            "0201", "7111", "7110", 
                            "0001", "0044", "0050", 
                            "0099", "1000", "2000"]
        
        self.conds = {"type1": F.col("st_model_type").isin([6, 8, 9]),
                      "type2": F.col("st_model_type").isin([10, 11]),
                      "type3_v": ((F.col("st_model_type").isin([1,2,3,4,12,13])) & 
                                  (F.col("7111") != 0.0)),
                      "type4_v": ((F.col("st_model_type").isin([1,2,3,4,12,13])) & 
                                  (F.col("7111") == 0.0) & (F.col("7110") != 0.0)),
                      "type3_a": (F.col("st_model_type").isin([1,2,3,4,12,13]))
                     }
        
        self.sum_expr = {"type1_v": F.col("0200")-F.col("0202")-F.col("0203"),
                         "type2_v": F.col("0201")+F.col("0203"),
                         "type3_v": F.col("7111"),
                         "type4_v": F.col("7110"),
                         "type1_a": F.col("0001")+F.col("0044")+F.col("0050")+F.col("0099"),
                         "type2_a": F.col("0099")+F.col("0044"),
                         "type3_a": F.col("1000")+F.col("2000")}
#         self.acc_path = os.path.join(TABLES_ROOTPATH_TRANSLATION.get("ecrk"), 
#                                      TABLES_NAMES_TRANSLATION.get("tsfixiec").get("table_name"))
#         self.acc_ext_path = os.path.join(TABLES_ROOTPATH_TRANSLATION.get("ecrk"), 
#                                          TABLES_NAMES_TRANSLATION.get("tsfixiec_ex").get("table_name"))
#         self.extra_data_path = "/data/sandboxes/sfma/data/bi_uploads/bec/riesgos/balances/stable"
        self.acc_path = hu.get_table_details('tsfixiec')['path']
        self.acc_ext_path = hu.get_table_details('tsfixiec_ex')['path']
        self.extra_data_path  =  hu.get_table_details('bal_ext')['path']
#         print( self.extra_data_path)
#         print('path balances: ',self.acc_path,'\n',self.acc_ext_path,'\n',self.extra_data_path)
        
        self.extra_data_cast = ['clave_49200', 'clave_10000',
                                 'clave_12700', 'clave_40200',
                                 'clave_12000', 'clave_40400',
                                 'clave_40800', 'clave_40100',
                                 'clave_40600', 'clave_49500',
                                 'clave_49100', 'clave_40700',
                                 'clave_31220', 'clave_12500',
                                 'clave_21000', 'clave_30000',
                                 'clave_11000', 'clave_32320',
                                 'clave_41000', 'clave_12400',
                                 'clave_40300', 'clave_12370',
                                 'clave_31230', 'clave_32000', 
                                 'clave_32330']
        
        self.extra_data_calc = {"imp_ebitda": (F.col("clave_49100") + 
                                               F.col("clave_40800")*-1 + 
                                               F.col("clave_41000")), 
                               "per_ebitda_vtas": ((F.col("clave_49100") + 
                                                    F.col("clave_40800")*-1 + 
                                                    F.col("clave_41000"))/ F.col("clave_40100")), 
                                "per_activo_corr": (F.col("clave_12000")/F.col("clave_10000")), 
                                "per_activo_no_corr": (F.col("clave_11000")/F.col("clave_10000")),
                                "per_pasivo_corr": (F.col("clave_32000")/F.col("clave_30000")),
                                "per_result_finan_vtas": (F.col("clave_49200")/F.col("clave_40100")), 
                                "per_result_ejer_bai_vtas": (F.col("clave_49500")/F.col("clave_40100")), 
                                "per_result_explo_vtas": ((F.col("clave_40100") + 
                                                          F.col("clave_40200") + 
                                                          F.col("clave_40300") + 
                                                          F.col("clave_40600") + 
                                                          F.col("clave_40400") + 
                                                          F.col("clave_40700") + 
                                                          F.col("clave_40800")) / 
                                                          (F.col("clave_40100"))), 
                               "per_result_dimp_fprop": ((F.col("clave_49500")) / 
                                                         (F.col("clave_21000") - F.col("clave_12370"))), 
                               "per_activo_finan_cp_total": ((F.col("clave_12700") + 
                                                             F.col("clave_12400") + 
                                                             F.col("clave_12500")) / (F.col("clave_10000"))), 
                               "per_deudas_ent_dcp_activo": ((F.col("clave_31220") + 
                                                              F.col("clave_31230") + 
                                                              F.col("clave_32320") + 
                                                              F.col("clave_32330")) / 
                                                             (F.col("clave_10000"))), 
                               "per_deuda_finan_ebitda": ((F.col("clave_31220") + 
                                                          F.col("clave_31230") + 
                                                          F.col("clave_32320") + 
                                                          F.col("clave_32330") + 
                                                          F.col("clave_12700")) / 
                                                          (F.col("clave_49100") + 
                                                           F.col("clave_40800")*-1 + 
                                                           F.col("clave_41000"))),
                               "imp_cash_flow": (F.col("clave_49500") + 
                                                 F.col("clave_40800")*-1 + 
                                                 F.col("clave_41000"))}
        
        
        self.extra_data_dict_replace = {"clave_10000": "imp_activo_total",
                                        "clave_11000": "imp_activo_no_corr",
                                        "clave_12000": "imp_activo_corr",
                                        "clave_20000": "imp_patrimonio_neto",
                                        "clave_30000": "imp_pasivo_total",
                                        "clave_31000": "imp_pasivo_no_corr",
                                        "clave_32000": "imp_pasivo_corr",
                                        "clave_49500": "imp_resultado_ejer",
                                        "clave_11500": "imp_inver_finan_largo_plazo",
                                        "clave_11510": "imp_instr_patrimonio_ilp",
                                        "clave_11520": "imp_cre_terceros_ilp",
                                        "clave_11530": "imp_val_repre_deuda_ilp",
                                        "clave_11540": "imp_derivados_ilp",
                                        "clave_11550": "imp_otros_activos_ilp",
                                        "clave_11560": "imp_otras_inversiones_ilp",
                                        "clave_12500": "imp_inver_finan_corto_plazo",
                                        "clave_12510": "imp_instru_patrimonio_icp",
                                        "clave_12520": "imp_cred_empresas_icp",
                                        "clave_12530": "imp_valores_deuda_icp",
                                        "clave_12540": "imp_derivados_icp",
                                        "clave_12550": "imp_otros_activos_icp",
                                        "clave_12560": "imp_otras_inversiones_icp",
                                        "clave_12700": "imp_efectivo",
                                        "clave_12710": "imp_tesoreria",
                                        "clave_12720": "imp_otros_liq",
                                        "clave_31200": "imp_deudas_largo_plazo",
                                        "clave_31210": "imp_obligaciones_dlp",
                                        "clave_31220": "imp_deudas_ent_dlp",
                                        "clave_31230": "imp_acreedores_dlp",
                                        "clave_31240": "imp_derivados_dlp",
                                        "clave_31250": "imp_otros_pasivos_dlp",
                                        "clave_32300": "imp_deudas_corto_plazo",
                                        "clave_32310": "imp_obligaciones_dcp",
                                        "clave_32320": "imp_deudas_ent_dcp",
                                        "clave_32330": "imp_acreedores_dcp",
                                        "clave_32340": "imp_derivados_dcp",
                                        "clave_32350": "imp_otros_pasivos_dcp",
                                        "clave_41400": "imp_ingre_finan",
                                        "clave_41410": "imp_ingre_finan_part",
                                        "clave_41411": "imp_ingre_finan_part_empr",
                                        "clave_41412": "imp_ingre_finan_part_ter",
                                        "clave_41420": "imp_ingre_finan_valores",
                                        "clave_41421": "imp_ingre_finan_valores_empr",
                                        "clave_41422": "imp_ingre_finan_valores_ter",
                                        "clave_41430": "imp_subv_don_leg",
                                        "clave_41500": "imp_gastos_finan",
                                        "clave_41510": "imp_gastos_finan_empr",
                                        "clave_41520": "imp_gastos_finan_ter",
                                        "clave_41530": "imp_gastos_finan_prov",
                                        "clave_40100": "imp_neto_nego",
                                        "clave_49100": "imp_result_explo",
                                        "clave_40800": "imp_amort_inmov",
                                        "clave_40200": "imp_var_existen",
                                        "clave_40300": "imp_trabajo_activo",
                                        "clave_40400": "imp_aprovi",
                                        "clave_40500": "imp_otros_ingr_explo",
                                        "clave_40600": "imp_gasto_personal",
                                        "clave_40630": "imp_gasto_personal_prov",
                                        "clave_40700": "imp_otros_gast_explo",
                                        "clave_40730": "imp_otros_gast_explo_per",
                                        "clave_40900": "imp_subv_inmv",
                                        "clave_41000": "imp_exceso_provi",
                                        "clave_41100": "imp_deterioro_enaj",
                                        "clave_41110": "imp_deterioro",
                                        "clave_41120": "imp_enaj",
                                        "clave_41200": "imp_dif_nega_negocio",
                                        "clave_41300": "imp_otros_resultados",
                                        "clave_41600": "imp_var_inst_finan",
                                        "clave_41700": "imp_dif_cambio",
                                        "clave_41800": "imp_deterioro_enaj_finan",
                                        "clave_42100": "imp_otros_ingr_finan",
                                        "clave_49300": "imp_result_antes_imp"}
    def _get_ind_data(self, key):
#         print('paths: \n',self.acc_path,'\n',self.acc_ext_path) 
        
        if key == "interno":
#             closing_date = self._get_closest_partition(self.acc_path)
#             acc = Tables(self.spark, closing_date).get_balance_acc()
            acc = self.spark.read.parquet(self.acc_path)
            
            
        else:
#             closing_date = self._get_closest_partition(self.acc_ext_path)
#             acc = Tables(self.spark, closing_date).get_balance_acc_ex()
            acc = self.spark.read.parquet(self.acc_ext_path)
              
            
#         acc = acc.filter(F.col("year_exercise_number") < self.closing_date[:4])
        
        acc = acc.filter(F.col("part_closing_date") <= self.closing_date)
        
        acc_sum = (acc
                   .filter(F.col("bbv_acc_debit_id").isin(self.accounts_id))
                   .groupBy("customer_id", "year_exercise_number", 
                            "acc_balan_mo_number", "st_model_type")
                   .pivot("bbv_acc_debit_id")
                   .agg(F.sum("count_eur_amount"))
                   .na.fill(0.0))
        
        for col in self.accounts_id:
            if col not in acc_sum.columns:
                acc_sum = acc_sum.withColumn(col, F.lit(0.0))
                
        
        window_year = (Window
                       .partitionBy("customer_id", 
                                    "acc_balan_mo_number", 
                                    "st_model_type")
                       .orderBy(F.desc("year_exercise_number"))
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow))
        
        year_max_col = F.max(F.col("year_exercise_number")).over(window_year)
        
        acc_sum = (acc_sum
                   .select(F.expr("*"), 
                           year_max_col.alias("year_max"))
                   .filter(F.col("year_exercise_number") == F.col("year_max"))
                   .drop("year_max"))
        
        # type_balance value assigned by each condition:
        # cond_1 - partial consolidated: 1
        # cond_2 - full consolidated: 3
        # cond_3 - partial individual: 2
        # cond_4 - full individual: 4
        
        
        cond_1 = ((F.col("st_model_type").isin(9, 11)) & 
                  (F.col("acc_balan_mo_number") < 12))
        
        cond_2 = ((F.col("st_model_type").isin(9, 11)) & 
                  (F.col("acc_balan_mo_number") == 12))
        
        cond_3 = (~(F.col("st_model_type").isin(9, 11)) & 
                  (F.col("acc_balan_mo_number") < 12))
        
        cond_4 = (~(F.col("st_model_type").isin(9, 11)) & 
                  (F.col("acc_balan_mo_number") == 12))        
        
        acc_sum = (acc_sum.withColumn("type_balance", 
                                      F.when(cond_1, 1)
                                      .when(cond_2, 3)
                                      .when(cond_3, 2)
                                      .when(cond_4, 4)))
        
        acc_sum = (acc_sum
                   .withColumn("ventas", 
                               F.when(self.conds.get("type1"), self.sum_expr.get("type1_v"))
                               .when(self.conds.get("type2"), self.sum_expr.get("type2_v"))
                               .when(self.conds.get("type3_v"), self.sum_expr.get("type3_v"))
                               .when(self.conds.get("type4_v"), self.sum_expr.get("type4_v"))
                               .otherwise(0.0))
                   .withColumn("activo", 
                              F.when(self.conds.get("type1"), self.sum_expr.get("type1_a"))
                               .when(self.conds.get("type2"), self.sum_expr.get("type2_a"))
                               .when(self.conds.get("type3_a"), self.sum_expr.get("type3_a"))
                               .otherwise(0.0)))
        
        if key == "interno":
            interno_flag = 1
        else:
            interno_flag = 0
        
        acc_sum = (acc_sum.withColumn("dato_interno", F.lit(interno_flag)))
                   
        return acc_sum
    
    def _get_closest_year(self):
        
        partition_year = self.closing_date[0:4]
        available_years = cmautils.get_closing_dates_v2(self.spark, self.extra_data_path, 
                                                        r'^.*year=(\d{4}).*$')
        
        available_years_fin = list(filter(lambda x: x<=partition_year, available_years))
                                                            
        year = min(available_years_fin, key=lambda x:abs(int(x)-int(partition_year)))
        
        return year
    
    
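    def _cacl_evol_mean(self, df, cols_to_calc, ini_suf="_y2", mean_sufs=("_y1",), final_suf=""):
        # final_suf is the suffix of the most recent year's columns; the empty default
        # is an assumption that matches the unsuffixed columns used in expr_evol.
        from functools import reduce
        
        for col in cols_to_calc:

            ini_col = "{0}{1}".format(col, ini_suf)

            # Relative change between the oldest year and the most recent one
            col_evol = "{0}_evol".format(col)
            cond_evol = (F.col(ini_col) > 0)
            expr_evol = ((F.col(col) - F.col(ini_col))/F.col(ini_col))

            # Mean over the most recent, the oldest and the intermediate years
            cols_mean = ["{0}{1}".format(col, suf) for suf in [final_suf, ini_suf] + list(mean_sufs)]
            col_mean = "{0}_mean".format(col)
            expr_sum_mean = reduce(lambda x, y: x+y, [F.col(c) for c in cols_mean])
            expr_mean = expr_sum_mean /len(cols_mean)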

            df = (df
                  .withColumn(col_evol, F.when(cond_evol, expr_evol).otherwise(0.))
                  .withColumn(col_mean, expr_mean))
        
        cols_sel = (["customer_id"] + cols_to_calc + 
                    ["{0}_{1}".format(col, "evol") for col in cols_to_calc] + 
                    ["{0}_{1}".format(col, "mean") for col in cols_to_calc])
        
        return df.select(cols_sel)
    
    
    def _get_extra_bal_data_year(self, year_diff = 0, calc_ratios = True):
        
        path = os.path.join(self.extra_data_path, "year={0}".format(int(self._get_closest_year()) + year_diff))
#         print('path balance data externo: ',path)
        
        df_extra_bal = self.spark.read.parquet(path)
        if ('cod_cliente' in df_extra_bal.columns):
            df_extra_bal = df_extra_bal.withColumn("customer_id", F.lpad("cod_cliente", 9, "0")).drop('cod_cliente')
        
        
        for col in list(set(list(self.extra_data_dict_replace.keys()) + self.extra_data_cast)):
            df_extra_bal = df_extra_bal.withColumn(col, F.col(col).cast(T.FloatType()))
        
        if calc_ratios:
            for col_name, col_expr in self.extra_data_calc.items():
                df_extra_bal = df_extra_bal.withColumn(col_name, col_expr)
                                                   
        for col_old, col_new in self.extra_data_dict_replace.items():
            df_extra_bal = df_extra_bal.withColumnRenamed(col_old, col_new)
        
        if calc_ratios:
            sel_cols = (["customer_id"] + 
                        list(self.extra_data_dict_replace.values()) + 
                        list(self.extra_data_calc.keys()))
        else:
            sel_cols = (["customer_id"] + 
                        list(self.extra_data_dict_replace.values()))
            
        return  df_extra_bal.select(sel_cols).na.fill(0.)
    
    
    def _get_extra_bal_data(self):
        
        df_y0 = self._get_extra_bal_data_year()
#         df_y1 = cmautils.append_suffix_columns(self.spark, 
#                                                self._get_extra_bal_data_year(-1), 
#                                               "_y1", 
#                                               ["customer_id"])
        
#         df_y2 = cmautils.append_suffix_columns(self.spark, 
#                                                self._get_extra_bal_data_year(-2), 
#                                               "_y2", 
#                                               ["customer_id"])
        
#         df_y = (df_y0
#                 .join(df_y1, "customer_id", "outer")
#                 .join(df_y2, "customer_id", "outer")
#                 .na.fill(0.))
        
#         cols_to_calc = [col for col in df_y0.columns if col != "customer_id"]
#         df_evol_mean = self._cacl_evol_mean(df_y, cols_to_calc)
        
#         df_final = df_y0.join(df_evol_mean, "customer_id", "outer").na.fill(0.0)
        
        return df_y0
    
    def get_data(self):
        
        acc_int = self._get_ind_data("interno")
        acc_ext = self._get_ind_data("externo")
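        # unionByName aligns columns by name; the pivoted account columns
        # can end up in a different order in each DataFrame
        acc_all = acc_int.unionByName(acc_ext)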
        
        window_dato_interno = (Window
                               .partitionBy("customer_id", 
                                            "acc_balan_mo_number", 
                                            "st_model_type", 
                                            "year_exercise_number",
                                            "type_balance")
                               .orderBy(F.desc("dato_interno"))
                               .rowsBetween(Window.unboundedPreceding, Window.currentRow))
        
        dato_interno_max = F.max(F.col("dato_interno")).over(window_dato_interno)
        
        acc_all = (acc_all
                   .select(F.expr("*"), 
                           dato_interno_max.alias("dato_interno_max"))
                   .filter(F.col("dato_interno") == F.col("dato_interno_max"))
                   .drop("dato_interno_max"))
        
        acc_all = (acc_all
           .withColumn("rank", 
                       F.struct(F.col("type_balance").alias("type"),
                               F.col("year_exercise_number").alias("year"),
                               F.col("acc_balan_mo_number").alias("n_months"))))
        
        window_type_year = (Window
                            .partitionBy("customer_id")
                            .orderBy(F.desc("rank.type"), F.desc("rank.year"), 
                                     F.desc("rank.n_months"))
                            .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    
        type_max = F.max(F.col("rank")).over(window_type_year)
    
        acc_all = (acc_all
                   .select(F.expr("*"), 
                           type_max.alias("rank_max"))
                   .filter((F.col("type_balance") == F.col("rank_max.type")) & 
                           (F.col("year_exercise_number") == F.col("rank_max.year")) &
                           (F.col("acc_balan_mo_number") == F.col("rank_max.n_months")))
                   .dropDuplicates(subset=["customer_id", "st_model_type"])
                   .select("customer_id", "year_exercise_number", 
                           "acc_balan_mo_number", "st_model_type", "ventas", "activo")
                    .withColumnRenamed("ventas", "imp_ventas")
                    .withColumnRenamed("activo", "imp_activo_bal")
                   .select("customer_id", "imp_ventas", "imp_activo_bal", 
                           "year_exercise_number", 
                           "acc_balan_mo_number", "st_model_type"))
        
        extra_bal_data = self._get_extra_bal_data()
        
        if ('cod_cliente' in extra_bal_data.columns):
            extra_bal_data = extra_bal_data.withColumn("customer_id", F.lpad("cod_cliente", 9, "0")).drop('cod_cliente')
        
        acc_all = (acc_all.join(extra_bal_data, "customer_id", "outer")
                   .na.fill(0.0))
        
        return acc_all
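
The last ranking step in `BalanceData.get_data` keeps, for each customer, the balance with the highest `type_balance`, then the most recent `year_exercise_number`, then the largest `acc_balan_mo_number`. The sketch below shows that ordering in plain Python, where tuple comparison plays the role of the field-by-field struct comparison in Spark. The function `pick_best_balance` and the sample rows are hypothetical and only illustrate the ordering; they are not part of the notebook.

```python
def pick_best_balance(rows):
    """Keep, per customer, the row with the largest (type, year, months) tuple."""
    best = {}
    for row in rows:
        # Same field order as the "rank" struct: type, year, number of months.
        rank = (row["type_balance"],
                row["year_exercise_number"],
                row["acc_balan_mo_number"])
        current = best.get(row["customer_id"])
        if current is None or rank > current[0]:
            best[row["customer_id"]] = (rank, row)
    return {customer: row for customer, (rank, row) in best.items()}


# Made-up rows for illustration only.
rows = [
    {"customer_id": "000000001", "type_balance": 2,
     "year_exercise_number": 2022, "acc_balan_mo_number": 6},
    {"customer_id": "000000001", "type_balance": 4,
     "year_exercise_number": 2021, "acc_balan_mo_number": 12},
    {"customer_id": "000000002", "type_balance": 4,
     "year_exercise_number": 2020, "acc_balan_mo_number": 12},
    {"customer_id": "000000002", "type_balance": 4,
     "year_exercise_number": 2021, "acc_balan_mo_number": 12},
]

selected = pick_best_balance(rows)
print(selected["000000001"]["year_exercise_number"])  # 2021
print(selected["000000002"]["year_exercise_number"])  # 2021
```

For the first customer the full-year individual balance (type 4) from 2021 wins over the more recent partial one (type 2), because the balance type is compared before the year.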

# Comex (Foreign Trade)

In [139]:
def get_info_comex(closing_date: str) -> DataFrame:
    cols_t = ['settl_payment_id','cash_letter_check_id','issuance_type',
              'contract_regist_date','last_installment_payment_date',
              'personal_id','currency_id','counterv_check_oper_amount',
              'check_oper_amount']
    transf_ext = hu.read_table('emoltransfext',closing_date
                              ).select(*cols_t)

    cols_c = ['customer_id','personal_id', 'client_type_id',
              'customer_name','first_sur_name','second_sur_name']
    customer= hu.read_table('eclccustm',closing_date
                           ).select(*cols_c)

    cols_e = ['exchange_curr_amount','entity_id','exch_cur_date',
             'currency_id','exchange_rate_type']
    df_exchange = hu.read_table('emolexch',closing_date
                               ).select(*cols_e)

    # OPES (payment orders issued), OPRS (payment orders received), CHEE (international checks issued)
    # CHER (international checks received), PIES (immediate payments issued), PIRE (immediate payments received)

    not_pi = F.col('settl_payment_id') != 'PI'
    is_pi = F.col('settl_payment_id') == 'PI'
    # A check id counts as present only when it is neither null nor blank
    no_check = (F.col('cash_letter_check_id').isNull() |
                (F.trim(F.col('cash_letter_check_id')) == ''))
    has_check = (F.col('cash_letter_check_id').isNotNull() &
                 (F.trim(F.col('cash_letter_check_id')) != ''))
    issued = F.col('issuance_type') == '0'
    received = F.col('issuance_type') == '1'

    df_comex = transf_ext.withColumn(
        'PRODUCTO',
        F.when(not_pi & no_check & issued, 'OPES')
         .when(not_pi & no_check & received, 'OPRS')
         .when(not_pi & has_check & issued, 'CHEE')
         .when(not_pi & has_check & received, 'CHER')
         .when(is_pi & issued, 'PIES')
         .when(is_pi & received, 'PIRE')
         .otherwise('Otros'))

    # Reference date per product:
    # OPES, OPRS: contract_regist_date
    # CHEE, CHER: last_installment_payment_date
    # PIES, PIRE: contract_regist_date / last_installment_payment_date (contract_regist_date is used here)
    # Any other product ('Otros') gets a null FECHA

    df_comex = df_comex.withColumn(
        'FECHA',
        F.when(F.col('PRODUCTO').isin('OPES', 'OPRS', 'PIES', 'PIRE'),
               F.col('contract_regist_date'))
         .when(F.col('PRODUCTO').isin('CHEE', 'CHER'),
               F.col('last_installment_payment_date')))

    customer = customer.select(F.col('personal_id').alias('personal_id_eclc'), 
                        F.col('customer_id'),#.alias('customer_id_eclc'), 
                        F.col('client_type_id'),
                        F.col('customer_name'), F.col('first_sur_name'), F.col('second_sur_name')
                        ).dropDuplicates(subset=['personal_id_eclc'])

    # Add the customer's *customer_id* and client type *client_type_id* to the COMEX table

    df_comex = df_comex.join(customer, [F.trim(df_comex.personal_id) == F.trim(customer.personal_id_eclc)],
                             how = 'left').drop('personal_id_eclc')

    df_exchange = df_exchange.withColumnRenamed('entity_id','entity_id_cambio'
                            ).withColumnRenamed('currency_id','currency_id_cambio'
                            ).where((F.col('exchange_rate_type').isin([1])) &
                                    (F.col('entity_id_cambio') == '9000')
                            ).dropDuplicates(subset=['exch_cur_date','currency_id_cambio'])

    df_comex = df_comex.join(df_exchange, (df_comex.FECHA == df_exchange.exch_cur_date) &
                            (df_comex.currency_id == df_exchange.currency_id_cambio), how = 'left')
    df_comex = df_comex.drop('entity_id_cambio','exch_cur_date','currency_id_cambio')

    # The rate joined above is the currency fixing rate *(exchange_rate_type = 1)* published by the BdE
    # With the exchange rate in the dataframe, compute the operation amount counter-valued with our own exchange rate

    df_comex = df_comex.withColumn('IMPORTE_CALCULADO', F.when(F.col('currency_id') == 'EUR',
                                        F.col('counterv_check_oper_amount')
                                    ).otherwise(F.col('check_oper_amount') / F.col('exchange_curr_amount')))

    # Finally, format IMPORTE_CALCULADO as a two-decimal amount.

    return df_comex.withColumn('imp_operacion', F.col('IMPORTE_CALCULADO').cast('decimal(32,2)'))
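
The product rules in `get_info_comex` can be checked on plain Python values, without a Spark session. The sketch below is not part of the pipeline: `classify_product` is a hypothetical helper that mirrors the `PRODUCTO` rules, assuming that a null `settl_payment_id` or an `issuance_type` other than `'0'`/`'1'` falls through to `'Otros'`, as it does in the Spark expression.

```python
from typing import Optional


def classify_product(settl_payment_id: Optional[str],
                     cash_letter_check_id: Optional[str],
                     issuance_type: Optional[str]) -> str:
    """Hypothetical plain-Python mirror of the PRODUCTO classification."""
    direction = {'0': 'issued', '1': 'received'}.get(issuance_type)
    if settl_payment_id is None or direction is None:
        return 'Otros'
    if settl_payment_id == 'PI':
        # Immediate payments ignore the check id
        return 'PIES' if direction == 'issued' else 'PIRE'
    # A check id counts as present only when it is neither None nor blank
    has_check = bool(cash_letter_check_id and cash_letter_check_id.strip(' '))
    if has_check:
        return 'CHEE' if direction == 'issued' else 'CHER'
    return 'OPES' if direction == 'issued' else 'OPRS'
```

A blank check id (for example `'  '`) is treated the same as a missing one, so the operation is classified as a payment order rather than a check.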

In [140]:
class ComexData(GetData):
        
#     def __init__(self, spark, closing_date, hdfsutils):       
#         super().__init__(spark, closing_date, hdfsutils)
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        self.table = {'vsfmacgm': {'key':['customer_id','country_id','business_area_id','entity_id'], 
#                                 'cols_in':['comex_export_type','comex_export_amount',
#                                'comex_import_type','comex_import_amount'],
                                 'cols_in':['comex_s_export_remit_amount','comex_doc_export_remit_amount',
                                'comex_doc_credit_export_amount','comex_s_export_remit_type',
                                'comex_doc_export_remit_type','comex_doc_credit_export_type',
                                'comex_s_import_remit_amount','comex_doc_import_remit_amount',
                                'comex_doc_credit_import_amount','comex_s_import_remit_type',
                                'comex_doc_import_remit_type','comex_doc_credit_import_type'
                                           ],
                                  
                                  #                                             'transactional_comex_type','transactional_comex_amount'],
                                 'cols_out':['customer_id','ind_comex_export','imp_comex_export',
                                             'ind_comex_import','imp_comex_import'],
#                                              'transactional_comex_flag','transactional_comex_amount']
                                 }
                     }
#        self.renames = {'comex_export_type': 'ind_comex_export',
#                        'comex_export_amount': 'imp_comex_export',
#                        'comex_import_type': 'ind_comex_import',
#                        'comex_import_amount': 'imp_comex_import'
#                       }
                        
        
    def get_comex_trans(self)-> DataFrame:
        if(self.closing_date < '2019-06-01'): # earlier data has not been validated by governance
            return self.spark.createDataFrame([['000000000', 0, 0]],
                                        ['customer_id','ind_transactional_comex','imp_transactional_comex'])
        else:
            cols_in=['customer_id','imp_operacion','PRODUCTO']
            cols_out=['customer_id','ind_transactional_comex','imp_transactional_comex']
            comex_estr = get_info_comex(self.closing_date).select(*cols_in)
            res = comex_estr.where(~(F.col('customer_id').isNull())
                                  ).where(~F.col('PRODUCTO').isin('PIRE','PIES')) # governance has told us that this data is not validated
            return res.groupBy('customer_id').agg(F.sum('imp_operacion').alias('imp_transactional_comex')
                                               ).withColumn('ind_transactional_comex',F.lit(1)
                                               ).select(*cols_out)
    
    
    def get_comex_rest(self)-> DataFrame:        
        bibasic = hu.read_table('vsfmacgm',self.closing_date).where(F.col('key_pm_act_holder_type')==1
                                      ).select(*self.table['vsfmacgm']['key'],*self.table['vsfmacgm']['cols_in']
                                       )
#        for col, alias in self.renames.items():
#            bibasic = bibasic.withColumnRenamed(col, alias)
        # imp_*: amounts (summed per customer below); ind_*: 0/1 flags set when any type column equals 'A'
        bibasic = bibasic.withColumn('imp_comex_export',(F.col('comex_s_export_remit_amount') +
                                            F.col('comex_doc_export_remit_amount') +
                                            F.col('comex_doc_credit_export_amount'))) 
        bibasic = bibasic.withColumn('ind_comex_export',F.when((F.col('comex_s_export_remit_type')=='A') |
                                            (F.col('comex_doc_export_remit_type') == 'A') |
                                           (F.col('comex_doc_credit_export_type') == 'A'), 1).otherwise(0)) 
        bibasic = bibasic.withColumn('imp_comex_import',(F.col('comex_s_import_remit_amount') +
                                            F.col('comex_doc_import_remit_amount') +
                                            F.col('comex_doc_credit_import_amount')))
        bibasic = bibasic.withColumn('ind_comex_import',F.when((F.col('comex_s_import_remit_type')=='A')|
                                            (F.col('comex_doc_import_remit_type') == 'A') |
                                            (F.col('comex_doc_credit_import_type') == 'A'), 1).otherwise(0))
        bibasic = bibasic.drop('comex_s_export_remit_amount','comex_doc_export_remit_amount',
                                'comex_doc_credit_export_amount','comex_s_export_remit_type',
                                'comex_doc_export_remit_type','comex_doc_credit_export_type',
                                'comex_s_import_remit_amount','comex_doc_import_remit_amount',
                                'comex_doc_credit_import_amount','comex_s_import_remit_type',
                                'comex_doc_import_remit_type','comex_doc_credit_import_type')
                              
        cols_flag =  [x for x in bibasic.columns if x.startswith('ind_')]
        cols_amount =  [x for x in bibasic.columns if x.startswith('imp_')]
        
        return (bibasic
                .groupBy('customer_id')
                .agg(*[F.max(col).alias(col) for col in cols_flag],
                     *[F.sum(col).alias(col) for col in cols_amount])
                .select(self.table['vsfmacgm']['cols_out']))
    
    def get_comex(self)-> DataFrame:  
        return self.get_comex_trans().join(self.get_comex_rest(),['customer_id'],how='outer').fillna(0)
    

# Broker

In [141]:
class BrokerData(GetData):
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        self.table = {'vsfmapxc': {'key':['customer_id'], 
                                 'cols_in':['com_product_id'],
                                 'cols_out':['customer_id','ind_broker']
                                 }
                     }

    def get_broker(self):
        values = ['0000005010', '0000005030', '0000005040', '0000005041', '0000005042', '0000005043', 
                  '0000005050', '0000005060', '0000005065', '0000005066', '0000005080', '0000005081']
        pxc = hu.read_table('vsfmapxc',self.closing_date).select(*self.table['vsfmapxc']['key'],
                                                            *self.table['vsfmapxc']['cols_in'])
        return pxc.withColumn('ind_broker', F.when(F.col('com_product_id').isin(*values),'1'
                    ).otherwise('0')).groupBy('customer_id'
                    ).agg(F.max('ind_broker').alias('ind_broker'))  

In [142]:
# CLOSING_DATE = '2019-10-31'
# bd = BrokerData(spark,CLOSING_DATE).get_broker() #.where(F.col('ind_broker')=='1').count()

# Business Group

In [143]:
class GroupData(GetData):
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        self.table = {'vsfmacgr': {'key':['customer_id'], 
                                 'cols_in':['direct_pp_per','business_group_id'],
                                 'cols_out':['customer_id','business_group_id']
                                 }
                     }

    def get_business_group(self):
        cgr = hu.read_table('vsfmacgr',self.closing_date).select(*self.table['vsfmacgr']['key'],
                                                            *self.table['vsfmacgr']['cols_in'])
        
        window_per = Window.partitionBy('customer_id').orderBy(F.desc('direct_pp_per'))
        return cgr.withColumn('max_per',F.max('direct_pp_per').over(window_per)
                             ).filter(F.col('direct_pp_per')==F.col('max_per')
                                     ).dropDuplicates(subset=["customer_id"]
                                    ).select(*self.table['vsfmacgr']['cols_out'])
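
The selection in `get_business_group` keeps, for each customer, the group with the highest `direct_pp_per`. A minimal plain-Python sketch of that rule follows; `pick_business_group` is a hypothetical helper, not part of the notebook. It assumes rows with a null percentage are dropped (the equality filter against the maximum discards them in Spark) and it breaks ties by keeping the first row seen, whereas `dropDuplicates` in Spark keeps an arbitrary one.

```python
def pick_business_group(rows):
    """
    rows: iterable of (customer_id, direct_pp_per, business_group_id).
    Returns {customer_id: business_group_id} for the highest percentage.
    """
    best = {}
    for customer_id, pct, group_id in rows:
        if pct is None:
            # Null percentages never equal the maximum, so they are skipped
            continue
        if customer_id not in best or pct > best[customer_id][0]:
            best[customer_id] = (pct, group_id)
    return {customer: group for customer, (_, group) in best.items()}


sample = [('001', 30.0, 'G1'), ('001', 70.0, 'G2'),
          ('002', 100.0, 'G3'), ('003', None, 'G4')]
groups = pick_business_group(sample)
```

Because ties are resolved arbitrarily in the Spark version, two runs over the same partition may assign different groups to a customer with equal participation percentages.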

In [144]:
# CLOSING_DATE = '2019-10-31'
# ge = GroupData(spark,CLOSING_DATE).get_business_group()#.where(F.col('customer_id'
# #                                         ).isin('016927348','000951804','006686409')).show()

# Universe

In [145]:
class UniversoData(GetData):
    def __init__(self, spark, closing_date):       
        super().__init__(spark, closing_date)
        self.table = {'universo_ind': {'key':['customer_id'], 
                                 'cols_in':['situacion_contable','politica','opinion'],
                                 'cols_out':['situacion_contable_ind','politica_ind','opinion_ind']
                                 }
                     }

    def get_data(self):
        df = hu.read_table('universo_ind',self.closing_date)
        if ('cod_cliente' in df.columns):
            df = df.withColumn("customer_id", F.lpad("cod_cliente", 9, "0")).drop('cod_cliente')
        df = df.select(*self.table['universo_ind']['key'],*self.table['universo_ind']['cols_in'])
        
        return hu.rename_columns(df,self.table['universo_ind']['cols_in'],'ind')

In [146]:
# CLOSING_DATE = '2020-01-31'
# UniversoData(spark,CLOSING_DATE).get_data().show()

# Evolution Calculation

In [147]:
class BaseEvolMeans():
    
    def __init__(self, spark, closing_date):
        
        self.spark = spark
        self.closing_date = closing_date
        
    def get_pvv_ind_cols(self):
        
        return ["ind_val_rentafija", "ind_valores_bbva", 
                "ind_valores_no_bbva", "ind_resto_valores"]
    
    def get_pra_ind_cols(self):
        
        return ["ind_hipoteca_particular", "ind_prestamo_empresa", 
                "ind_prest_consumo", "ind_prestamo_credinegocio", 
                "ind_hipoteca_empresa"]
    
    def get_hca_ind_cols(self):
        
        return ["ind_comercial_dis", "ind_other_portfolio"]
        
    def get_data(self, suffix=None):

        """
        Get all the balance data for a particular partition
        """

        # Get all the data from cgm, pvv, pra, desc_com and margen
        df = CGMData(self.spark, self.closing_date).get_imp_data()
        
        df = df.join(RentaValores(self.spark, self.closing_date)\
                     .get_data().drop(*self.get_pvv_ind_cols()), 
                     on=["customer_id"], how="outer").na.fill(0.)
        
        df = df.join(PraData(self.spark, self.closing_date).get_data()\
                           .drop(*self.get_pra_ind_cols()),
                     on=["customer_id"], how="outer").na.fill(0.)
        
        df = df.join(DescComercial(self.spark, self.closing_date).get_data()
                           .drop(*self.get_hca_ind_cols()),
                           on=["customer_id"], how="outer").na.fill(0.)
        
        df = df.join(MargenData(self.spark, self.closing_date).get_data(),
                     on=["customer_id"], how="outer").na.fill(0.)
        
        
#         df = df.join(BalanceData(self.spark, self.closing_date).get_data(),
#                      on=["customer_id"], how="outer").na.fill(0.)
        

        # Append suffix to cols if needed
        if suffix:
            df = cmautils.append_suffix_columns(self.spark, df, suffix, 
                                                exclude_cols=["customer_id"])

        return df

In [148]:
# imp_data_m1 = BaseEvolMeans(spark, CLOSING_DATE).get_data()
# imp_data_m1.cache(), imp_data_m1.count()

In [149]:
class EvolData(GetData):
    
    def get_query_evol(self, temp_table_name, t_now):
        """
        Get the SQL query for the calculation of the evolution of the balance data
        """

        query_components = ["customer_id"]
        for y_cur in t_now.columns:
            if y_cur == "customer_id":
                continue
            y_prev = y_cur + "_m3"
            if y_cur.startswith("n_"):
                s = """ROUND(COALESCE(
                          (COALESCE({0}, 0) - {1}) / {1},
                           COALESCE({0}, 0) - {1},
                           0))
                       AS {0}_evol""".format(y_cur, y_prev)
            else:
                s = """COALESCE(
                          (COALESCE(CAST({0} as FLOAT), 0.0) - 
                                    CAST({1} as FLOAT)) / CAST({1} AS FLOAT),
                           COALESCE(CAST({0} as FLOAT), 0.0) - 
                               COALESCE(CAST({1} as FLOAT), 0.0),
                           0.0) 
                        AS {0}_evol""".format(y_cur, y_prev)
            query_components.append(s)

        query = "SELECT " + ",\n".join(query_components) +\
                " FROM {}".format(temp_table_name)

        return query
    
    
    def get_data(self):
        """
        Compute the evolutions for all the balances
        """

        partition_m3 = cmautils.add_months(self.closing_date, -3)
        

        t_now = BaseEvolMeans(self.spark, self.closing_date).get_data()
        t_before = BaseEvolMeans(self.spark, partition_m3).get_data("_m3")


        evol_df = t_now.join(t_before, on=["customer_id"], how="outer").fillna(0.)
        temp_table_name = "evol_components"
        evol_df.createOrReplaceTempView(temp_table_name)
        final_evol_df = self.spark.sql(self.get_query_evol(temp_table_name, t_now))

        return final_evol_df
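
The query built by `get_query_evol` computes a relative change against the value three months earlier and falls back to the absolute difference when that ratio is undefined. The sketch below restates the rule in plain Python for a single value. `evolution` is a hypothetical helper, and it assumes Spark's default (non-ANSI) behaviour, where dividing by zero yields NULL so that `COALESCE` moves on to the next argument. The extra `ROUND` applied to `n_` columns is not reproduced here.

```python
def evolution(current, previous):
    """Relative change vs. `previous`; absolute difference when `previous` is 0."""
    current = 0.0 if current is None else float(current)
    previous = 0.0 if previous is None else float(previous)
    if previous != 0.0:
        return (current - previous) / previous
    # Ratio undefined: fall back to the plain difference
    return current - previous
```

For example, a balance that goes from 100 to 150 has an evolution of 0.5, while a balance that appears from nothing (0 to 50) has an evolution of 50, so both cases end up on different scales in the same `_evol` column.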

In [150]:
# evol_data = EvolData(spark, CLOSING_DATE).get_data()
# evol_data.cache(), evol_data.count()

# Mean Calculation

In [151]:
class MeansData(GetData):
    
    def get_query_mean(self, temp_table_name, t_now):
        """
        Get the SQL query for the calculation of the means of the balance data
        """

        query_components = ["customer_id"]
        for y_cur in t_now.columns:
            # if y_cur in ("customer_id", "imp_factoring"):
            if y_cur == "customer_id":
                continue
            y_m1 = y_cur + "_m1"
            y_m2 = y_cur + "_m2"
            s = """(COALESCE({0}, 0) + 
                     COALESCE({1}, 0) +
                     COALESCE({2}, 0)) 
                     / ( CAST(isnotnull({0}) AS INT) +
                         CAST(isnotnull({1}) AS INT) + 
                         CAST(isnotnull({2}) AS INT) )
                    AS {0}_mean""".format(y_cur, y_m1, y_m2)
            query_components.append(s)
        query = "SELECT " + ",\n".join(query_components) +\
            " FROM {}".format(temp_table_name)

        return query
    
    def get_data(self):

        """
        Compute the means for all the balances
        """

        partition_m1 = cmautils.add_months(self.closing_date, -1)
        partition_m2 = cmautils.add_months(self.closing_date, -2)
        
        t_now = BaseEvolMeans(self.spark, self.closing_date).get_data()
        t_m1 = BaseEvolMeans(self.spark, partition_m1).get_data("_m1")
        t_m2 = BaseEvolMeans(self.spark, partition_m2).get_data("_m2")

        evol_mean = t_now.join(t_m1, on="customer_id", how="outer")\
                         .join(t_m2, on="customer_id", how="outer")

        temp_table_name = "mean_components"
        evol_mean.createOrReplaceTempView(temp_table_name)
        final_mean_df = self.spark.sql(self.get_query_mean(temp_table_name, t_now))

        return final_mean_df
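
The expression generated by `get_query_mean` averages only the months in which a customer actually has a value: nulls add 0 to the numerator and are not counted in the denominator. A plain-Python sketch of the same rule is shown below. `mean_available` is a hypothetical helper, and returning `None` when every month is missing assumes Spark's default (non-ANSI) behaviour, where 0 / 0 yields NULL.

```python
def mean_available(*values):
    """Mean of the non-missing values; None when all of them are missing."""
    present = [v for v in values if v is not None]
    if not present:
        return None
    return sum(present) / len(present)
```

A customer with values in only two of the three months is therefore averaged over two months, not three.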

In [152]:
# CLOSING_DATE = FIRST_CLOSING_DATE
# means_data = MeansData(spark, CLOSING_DATE).get_data()
# means_data.cache(), means_data.count()

In [153]:
# means_data.columns

# Aggregation of All Data

In [154]:
def call_method(o, name, kwargs):
    
    return getattr(o, name)(**kwargs)

In [155]:
def write_read_df(path, class_data, method="get_data", extra_args=None):
    """Compute and persist the dataframe if `path` does not exist, then read it back."""
    
    if not hu.check_path(path):
        df = call_method(class_data, method, extra_args or {})
        df.write.mode("overwrite").parquet(path)
        print("Calculating")
    else:
        print("Reading")
        
    return spark.read.parquet(path)
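
`write_read_df` follows a compute-once, read-back pattern: the expensive step runs only when the target path is missing, and the caller always receives what was persisted. The sketch below reproduces the idea on the local filesystem with JSON instead of HDFS and Parquet. `write_read_local` and `expensive` are hypothetical names used only for illustration.

```python
import json
import os
import tempfile


def write_read_local(path, compute):
    """Run `compute` and persist its result only if `path` is missing, then read it back."""
    if not os.path.exists(path):
        with open(path, "w") as fh:
            json.dump(compute(), fh)
    with open(path) as fh:
        return json.load(fh)


calls = []


def expensive():
    # Stand-in for a costly dataframe computation
    calls.append(1)
    return {"rows": 3}


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "result.json")
    first = write_read_local(target, expensive)   # computes and writes
    second = write_read_local(target, expensive)  # only reads
```

One consequence of this design is that an existing path is reused even if the inputs have changed, so the temporary path has to be deleted to force a recomputation.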

In [156]:
def get_cards_data(spark, closing_date, tmp_path, class_data_dict):
    """
    Get the data related to cards: 
        - debit cards, general public
        - credit cards, general public
        - debit cards, enterprises
        - credit cards, enterprises
    """
    
    # tmp paths
    deb_part_path = os.path.join(tmp_path, "deb_part/closing_date={}".format(closing_date))
    deb_empr_path = os.path.join(tmp_path, "deb_empr/closing_date={}".format(closing_date))
    cre_part_path = os.path.join(tmp_path, "cre_part/closing_date={}".format(closing_date))
    cre_empr_path = os.path.join(tmp_path, "cre_empr/closing_date={}".format(closing_date))
    
    t_debito_part = write_read_df(deb_part_path, class_data_dict["deb_part"], 
                                  extra_args={"card_type": "bjd",  "cust_type": "part"})
    
    t_debito_empr = write_read_df(deb_empr_path, class_data_dict["deb_empr"], 
                                 extra_args={"card_type": "bjd",  "cust_type": "empr"})
    
    t_credito_part = write_read_df(cre_part_path, class_data_dict["cre_part"], 
                                  extra_args={"card_type": "bjc",  "cust_type": "part"})
    
    t_credito_empr = write_read_df(cre_empr_path, class_data_dict["cre_empr"],
                                  extra_args={"card_type": "bjc",  "cust_type": "empr"})
    
    return (t_debito_part, t_debito_empr, t_credito_part, t_credito_empr)
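
Every temporary table is stored under the same layout, `<tmp_path>/<table>/closing_date=<date>`. A small helper could build these paths in one place. The sketch below is only an illustration: `partition_path` is a hypothetical function and `/tmp/edc` is a placeholder root. It uses `posixpath` so the result is the same on any platform, since HDFS paths are POSIX-style.

```python
import posixpath


def partition_path(tmp_path, table, closing_date):
    """Build '<tmp_path>/<table>/closing_date=<closing_date>'."""
    return posixpath.join(tmp_path, "{}/closing_date={}".format(table, closing_date))


card_tables = ["deb_part", "deb_empr", "cre_part", "cre_empr"]
paths = {table: partition_path("/tmp/edc", table, "2022-12-31")
         for table in card_tables}
```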

## Required Tables
Data sources required to build the cube.

In [157]:
#os.path.join(PROJECT_PATH, "tmp/{0}".format(TAG_DATA))

In [158]:
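def get_dfs(spark, closing_date, project_path, data_tag, no_exec) -> dict:
    """
    Compute (or read from the temporary path) the intermediate dataframes needed
    for the cube. Classes whose name appears in `no_exec` are skipped.
    """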

    # Define configuration
    
    sess_vars = ["t_consulta_app", "t_contrata_app", "t_opera_app", "t_otro_app",
                 "t_consulta_web", "t_contrata_web", "t_opera_web", "t_otro_web", 
                 "t_consulta_atm", "t_contrata_atm", "t_opera_atm", "t_otro_atm",
                 "t_consulta_agent", "t_contrata_agent", "t_opera_agent", 
                 "t_otro_agent"]
    
    dfs = {}
    
    # Define table paths
    tmp_path = os.path.join(project_path, "tmp/{0}".format(data_tag))
    stable_path = os.path.join(project_path, data_tag)
#   stable_path = '/data/sandboxes/sfma/data/cma/edc/tmp/test7/'

    base_ini_path = os.path.join(tmp_path, "base_ini/closing_date={}".format(closing_date))
    cgm_ind_path = os.path.join(tmp_path, "cgm_ind/closing_date={}".format(closing_date))
    cgm_imp_path = os.path.join(tmp_path, "cgm_imp/closing_date={}".format(closing_date))
    valores_path = os.path.join(tmp_path, "valores/closing_date={}".format(closing_date))
    pra_path = os.path.join(tmp_path, "pra/closing_date={}".format(closing_date))
    dis_path = os.path.join(tmp_path, "dis/closing_date={}".format(closing_date))
    evol_path = os.path.join(tmp_path, "evol/closing_date={}".format(closing_date))
    means_path = os.path.join(tmp_path, "means/closing_date={}".format(closing_date))
    ins_path = os.path.join(tmp_path, "ins/closing_date={}".format(closing_date))
    tpv_path = os.path.join(tmp_path, "tpv/closing_date={}".format(closing_date))
    base_ind_path = os.path.join(tmp_path, "base_ind/closing_date={}".format(closing_date))
    rtk_path = os.path.join(tmp_path, "rtk/closing_date={}".format(closing_date))
    margen_path = os.path.join(tmp_path, "margen/closing_date={}".format(closing_date))
    ingresos_path = os.path.join(tmp_path, "ingresos/closing_date={}".format(closing_date))
    riesgos_path = os.path.join(tmp_path, "riesgos/closing_date={}".format(closing_date))
    cirbe_path = os.path.join(tmp_path, "cirbe/closing_date={}".format(closing_date))
    bec_riesgos_path = os.path.join(tmp_path, "bec_riesgos/closing_date={}".format(closing_date))
    sessions_path = os.path.join(tmp_path, "sessions/closing_date={}".format(closing_date))
    digi_path = os.path.join(tmp_path, "digi/closing_date={}".format(closing_date))
    plan1_path = os.path.join(tmp_path, "plan1/closing_date={}".format(closing_date))
    balance_path = os.path.join(tmp_path, "balances/closing_date={}".format(closing_date))
    segsociales_path = os.path.join(tmp_path, "segsociales/closing_date={}".format(closing_date))
    lisboa_path = os.path.join(tmp_path, "lisboa/closing_date={}".format(closing_date))
    numprods_path = os.path.join(tmp_path, "numprods/closing_date={}".format(closing_date))
    cnae_path = os.path.join(tmp_path, "cnae/closing_date={}".format(closing_date))
    clickpay_path = os.path.join(tmp_path, "clickpay/closing_date={}".format(closing_date))
    rating_path = os.path.join(tmp_path, "rating/closing_date={}".format(closing_date))
    comex_path = os.path.join(tmp_path, "comex/closing_date={}".format(closing_date))
    altas_path = os.path.join(tmp_path, "altas/closing_date={}".format(closing_date))
    rar_path = os.path.join(tmp_path, "rar12/closing_date={}".format(closing_date))
    mora_path = os.path.join(tmp_path, "mora/closing_date={}".format(closing_date))
    oneview_path = os.path.join(tmp_path, "oneview/closing_date={}".format(closing_date))
    becnprods_path = os.path.join(tmp_path, "becnprods/closing_date={}".format(closing_date))
    firma_path = os.path.join(tmp_path, "riesgo_firma/closing_date={}".format(closing_date))
    broker_path = os.path.join(tmp_path, "broker/closing_date={}".format(closing_date))
    group_path = os.path.join(tmp_path, "business_group/closing_date={}".format(closing_date))
    universo_path = os.path.join(tmp_path, "universo_ind/closing_date={}".format(closing_date))
    print("Finish set-up")
    
    if('BaseData' not in no_exec):
        # base_ini
        base_ini = write_read_df(base_ini_path, BaseData(spark, closing_date))
        dfs['base_ini']= base_ini
        print("Finish: base_ini")

    if('AltasData' not in no_exec):
        # altas
        altas_data = write_read_df(altas_path, AltasData(spark, closing_date))
        dfs['altas_data']=altas_data
        print("Finish: Altas")

    if('ComexData' not in no_exec):
        # comex
        comex_data = write_read_df(comex_path, ComexData(spark, closing_date), "get_comex")
        dfs['comex_data']=comex_data
        print("Finish: Comex")
        
    if('BrokerData' not in no_exec):    
        # broker
        broker_data = write_read_df(broker_path, BrokerData(spark, closing_date), "get_broker")
        dfs['broker_data']=broker_data
        print("Finish: Broker")

    if('GroupData' not in no_exec):
        # business_group
        group_data = write_read_df(group_path, GroupData(spark, closing_date), "get_business_group")
        dfs['group_data']=group_data
        print("Finish: business group")
        
    if('UniversoData' not in no_exec):    
        # universo
        universo_data = write_read_df(universo_path, UniversoData(spark, closing_date), "get_data")
        dfs['universo_data']=universo_data
        print("Finish: universo")
        
    if('RarData' not in no_exec):     
        # rar
        rar_data = write_read_df(rar_path, RarData(spark, closing_date), "get_rar12m")
        dfs['rar_data']=rar_data
        print("Finish: rar")

    if('CGMData' not in no_exec): 
        # cgm_ind
        cgm_ind = write_read_df(cgm_ind_path, CGMData(spark, closing_date), "get_ind_data")
        dfs['cgm_ind']=cgm_ind  
        print("Finish: cgm_ind")

        # cgm_imp
        cgm_imp = write_read_df(cgm_imp_path, CGMData(spark, closing_date), "get_imp_data")
        dfs['cgm_imp']=cgm_imp
        print("Finish: cgm_imp")

    if('RentaValores' not in no_exec): 
        # valores
        valores = write_read_df(valores_path, RentaValores(spark, closing_date))
        dfs['valores']=valores
        print("Finish: valores")
        
    if('PraData' not in no_exec):     
        # activo I
        pra = write_read_df(pra_path, PraData(spark, closing_date))
        dfs['pra']=pra
        print("Finish: activo I")

    if('DescComercial' not in no_exec): 
        # commercial discount
        dis = write_read_df(dis_path, DescComercial(spark, closing_date))
        dfs['dis']=dis
        print("Finish: discount")

    if('GenTenData' not in no_exec): 
        # general product-holding indicators
        base_ind = write_read_df(base_ind_path, GenTenData(spark, closing_date))
        dfs['base_ind']=base_ind
        print("Finish: base_ind")

    if('SegurosData' not in no_exec): 
        # seguros
        ins = write_read_df(ins_path, SegurosData(spark, closing_date),'get_insurances')
        dfs['ins']=ins
        print("Finish: insurance")

    if('BalanceTPV' not in no_exec): 
        # tpv
        tpv = write_read_df(tpv_path, BalanceTPV(spark, closing_date))
        dfs['tpv']=tpv    
        print("Finish: tpv")
        
    if('MargenData' not in no_exec):    
        # margen
        margen = write_read_df(margen_path, MargenData(spark, closing_date))
        dfs['margen']=margen
        print("Finish: margen")

    if('Tarjetas' not in no_exec): 
        # cards
        cards_class_dict = {"deb_part": Tarjetas(spark, closing_date),
                            "deb_empr": Tarjetas(spark, closing_date),
                            "cre_part": Tarjetas(spark, closing_date),
                            "cre_empr": Tarjetas(spark, closing_date)}

        (t_debito_part, t_debito_empr, t_credito_part, t_credito_empr) = \
            get_cards_data(spark, closing_date, tmp_path, cards_class_dict)
        dfs['t_debito_part']=t_debito_part 
        dfs['t_debito_empr']=t_debito_empr
        dfs['t_credito_part']=t_credito_part
        dfs['t_credito_empr']=t_credito_empr 
        print("Finish: cards")
        
    if('IngresosData' not in no_exec):     
        # ingresos
        ingresos_data = write_read_df(ingresos_path, IngresosData(spark, closing_date))
        dfs['ingresos_data']=ingresos_data
        print("Finish: ingresos")

    if('RiesgosData' not in no_exec):  
        # riesgos
        riesgos_data = write_read_df(riesgos_path, RiesgosData(spark, closing_date))
        dfs['riesgos_data']=riesgos_data
        print("Finish: riesgos")

    if('CirbeData' not in no_exec):  
        # cirbe
        cirbe_data = write_read_df(cirbe_path, CirbeData(spark, closing_date))
        dfs['cirbe_data']=cirbe_data   
        print("Finish: cirbe")

    if('RiesgosBecData' not in no_exec):
        # bec riesgos
        bec_riesgos_data = write_read_df(bec_riesgos_path, RiesgosBecData(spark, 
                                                                          closing_date, 
                                                                          "soft"))
        dfs['bec_riesgos_data']= bec_riesgos_data
        print("Finish: bec riesgos")
    

    if('BiNumProds' not in no_exec):
        # Num prods
        numprods_data = write_read_df(numprods_path, BiNumProds(spark, closing_date))
        dfs['numprods_data']=numprods_data
        print("Finish: num prods")
    
    if(('DigiData_biupload' not in no_exec) or ('DigiData' not in no_exec)):    
        # digitalization: DigiData_biupload for closing dates before 2018-10-31 (unless excluded), DigiData otherwise
        digi_data=None
        if(('DigiData_biupload' not in no_exec) and (closing_date<'2018-10-31')):        
            digi_data = write_read_df(digi_path, DigiData_biupload(spark, closing_date, "soft"))
        elif('DigiData' not in no_exec):
            digi_data = write_read_df(digi_path, DigiData(spark, closing_date))
        if(digi_data is not None):
            dfs['digi_data']=digi_data
            print("Finish: digi")

    if('Sessionization' not in no_exec):
        # sesionizado
        sessions_data = write_read_df(sessions_path, 
                                      Sessionization(spark, closing_date))
        dfs['sessions_data']=sessions_data
        print("Finish: sessions")

    if('BalanceData' not in no_exec):    
        # balances
        balance_data = write_read_df(balance_path, BalanceData(spark, closing_date))
        dfs['balance_data']=balance_data
        print("Finish: balances")
        
    if('CesionSegurosSociales' not in no_exec):     
        # seguros sociales
        segsociales_data = write_read_df(segsociales_path, CesionSegurosSociales(spark, closing_date))
        dfs['segsociales_data']=segsociales_data
        print("Finish: segsociales")
    
    if('LisboaInd' not in no_exec):    
        # lisboa
        lisboa_data = write_read_df(lisboa_path, LisboaInd(spark, closing_date))
        dfs['lisboa_data']=lisboa_data
        print("Finish: lisboa")
        
    if('CNAE' not in no_exec):      
        # cnae
        cnae_data = write_read_df(cnae_path, CNAE(spark, closing_date))
        dfs['cnae_data']=cnae_data
        print("Finish: CNAE")

    if('ClickPayData' not in no_exec): 
        # click&pay
        clickpay_data = write_read_df(clickpay_path, ClickPayData(spark, closing_date))
        dfs['clickpay_data']=clickpay_data
        print("Finish: Click & Pay")
        
    if('RatingData' not in no_exec):     
        # rating
        rating_data = write_read_df(rating_path, RatingData(spark, closing_date))
        dfs['rating_data']=rating_data
        print("Finish: Rating")
    
    if('MoraData' not in no_exec):     
        # mora
        mora_data = write_read_df(mora_path, MoraData(spark, closing_date))
        dfs['mora_data']=mora_data
        print("Finish: mora")

    if('FirmaData' not in no_exec): 
        # riesgo firma
        firma_data = write_read_df(firma_path, FirmaData(spark, closing_date),'get_riesgoFirma')
        dfs['firma_data']=firma_data
        print("Finish: riesgo firma")
    
    if('OneViewData' not in no_exec):     
        # one view
        oneview_data = write_read_df(oneview_path, OneViewData(spark, closing_date))
        dfs['oneview_data']=oneview_data
        print("Finish: oneview")

    if('NumProdsData' not in no_exec):   
        # bec num prods
        becnprods_data = write_read_df(becnprods_path, NumProdsData(spark, closing_date))
        dfs['becnprods_data']=becnprods_data
        print("Finish: becnprods_data")

    if('EvolData' not in no_exec):  
        # evoluciones
        evol = write_read_df(evol_path, EvolData(spark, closing_date))
        dfs['evol']=evol
        print("Finish: evol")

    if('MeansData' not in no_exec):  
        # medias
        means = write_read_df(means_path, MeansData(spark, closing_date))
        dfs['means']=means
        print("Finish: means")

    if(('PlanUnoTactico' not in no_exec) or ('PlanUno' not in no_exec)):  
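        # plan uno: PlanUnoTactico for closing dates before 2018-12-31 (unless excluded), PlanUno otherwise
        plan1_data=None
        if(('PlanUnoTactico' not in no_exec) and (closing_date<'2018-12-31')):
            plan1_data = write_read_df(plan1_path, PlanUnoTactico(spark, closing_date))
        elif('PlanUno' not in no_exec):
            # PlanUno uses the `means` DataFrame, which is only defined when 'MeansData' was executed above
            plan1_data = write_read_df(plan1_path, PlanUno(spark, closing_date,[means])) 
        if(plan1_data is not None):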
            dfs['plan1_data']=plan1_data
            print("Finish: plan uno")
    
    return dfs
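
The loader above repeats the same pattern for every source: skip the class if it is listed in `no_exec`, otherwise load it and store the result under a key. A minimal sketch of a table-driven version of that pattern follows. The names `Step` and `run_steps` and the stand-in loaders are hypothetical and are not part of the notebook; in the real process each loader would wrap a call such as `write_read_df(path, SomeClass(spark, closing_date))`.

```python
from typing import Callable, Dict, Iterable, Tuple

# Hypothetical registry entry: (class name checked against no_exec, key in dfs, loader).
Step = Tuple[str, str, Callable[[], object]]


def run_steps(steps: Iterable[Step], no_exec: Iterable[str]) -> Dict[str, object]:
    """Run every step whose class name is not listed in no_exec."""
    skipped = set(no_exec)
    dfs = {}
    for class_name, key, loader in steps:
        if class_name in skipped:
            continue
        dfs[key] = loader()
        print("Finish: {0}".format(key))
    return dfs


# Stand-in loaders that return plain strings instead of Spark DataFrames.
steps = [
    ("IngresosData", "ingresos_data", lambda: "ingresos"),
    ("RiesgosData", "riesgos_data", lambda: "riesgos"),
    ("CirbeData", "cirbe_data", lambda: "cirbe"),
]
result = run_steps(steps, no_exec=["RiesgosData"])
```

Sources with date-dependent branches (such as the digitalization and plan uno steps) would still need their own logic inside the loader callable.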

In [159]:
def get_df_cube (dfs:dict,closing_date, project_path, data_tag)-> DataFrame:
    dfs_joins = []
    tmp_path = os.path.join(project_path, "tmp/{0}".format(data_tag))
    df_final = dfs['base_ini'].withColumn('customer_id',F.trim(F.col('customer_id')))
    dfs_cube = ['base_ini']
    count = 1
    keys = list(dfs.keys())
    keys.remove('base_ini')
    
    for key in keys:
        try:
            df_key = dfs[key]
            if df_key is not None:
                # Normalise the join key so every source exposes a trimmed 'customer_id'
                if ('cod_cliente' in df_key.columns):
                    df_key = df_key.withColumn("customer_id", F.lpad("cod_cliente", 9, "0")).drop('cod_cliente')
                df_key = df_key.withColumn('customer_id',F.trim(F.col('customer_id')))
                dfs[key] = df_key
            if(count==10):
                # Persist the joins accumulated so far and start a new batch
                count = 0
                join_path = os.path.join(tmp_path, "joins/"+str(len(dfs_joins))+"/closing_date={}".format(closing_date))
                df_final.write.parquet(join_path,mode='overwrite')
                dfs_joins.append(spark.read.parquet(join_path))
                if df_key is not None:
                    df_final = df_key
                    dfs_cube.append(key)
            else: 
                if df_key is not None:
                    df_final = df_final.join(df_key,["customer_id"], "outer")
                    dfs_cube.append(key)
            count+=1
        except Exception as e:
            print('Error with dataframe ', key, ':', e)
            return None
            
    if(count!=0 and df_final is not None):
        join_path = os.path.join(tmp_path, "joins/"+str(len(dfs_joins))+"/closing_date={}".format(closing_date))
        df_final.write.parquet(join_path,mode='overwrite')
        dfs_joins.append(spark.read.parquet(join_path))
        
    df_cube = dfs['base_ini'].select('customer_id').distinct()
    for df_join in dfs_joins:
        df_cube = df_cube.join(df_join,["customer_id"], "left")
        
    if ('risk_priority' in df_cube.columns):
        df_cube = df_cube.na.fill({"risk_priority": "00"})
    if ('risk_qualification' in df_cube.columns):
        df_cube = df_cube.na.fill({"risk_qualification": -1})
        
    return df_cube.na.fill(0).na.fill("00")
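
`get_df_cube` does not join every DataFrame in one long chain: each time its counter reaches 10 it writes the accumulated join to parquet, reads it back, and starts a new batch. The grouping idea alone can be sketched without Spark as below. The helper `batch_keys` is hypothetical, and it is a simplification: the notebook's counter starts at 1 because `base_ini` already occupies the first batch, so the real batch boundaries differ slightly.

```python
from typing import List, Sequence


def batch_keys(keys: Sequence[str], batch_size: int = 10) -> List[List[str]]:
    """Split keys into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    return [list(keys[i:i + batch_size]) for i in range(0, len(keys), batch_size)]


# 23 placeholder DataFrame names, grouped the way intermediate joins would be persisted.
keys = ["df_{0}".format(i) for i in range(23)]
batches = batch_keys(keys, batch_size=10)
for index, batch in enumerate(batches):
    print("joins/{0}: {1} dataframes".format(index, len(batch)))
```

Persisting each batch keeps the Spark query plan short, at the cost of extra writes to the temporary folder.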

In [160]:
def check_new_classes (spark,df_cube,dict_data):
    clases_new = []
    schema = Schema(spark)
    fields = schema.check_fields(df_cube)
    if(len(fields['del'])>0):
        print('Variables have been removed:',fields['del'])

    for f in fields['new']:
        # Collect each class once, preserving the order in which it was found
        for clase in hu.check_column_dfs(dict_data,f):
            if clase not in clases_new:
                clases_new.append(clase)
        
    return clases_new   

In [161]:
def calc_final_data(spark, closing_date, project_path, data_tag, dict_dfs, df_final):
    """
    Execution of the main process
    """    
    schema = Schema(spark)
    if(UPDATE_SCHEMA==1):
        new_fields = []
        if(VERSION==7):
            new_fields+= dict_dfs['comex_data'].schema[1:]
            new_fields+= dict_dfs['altas_data'].schema[1:]   
            new_fields+= dict_dfs['mora_data'].schema[1:] 
            new_fields+= dict_dfs['oneview_data'].schema[1:]   
            new_fields+= dict_dfs['becnprods_data'].schema[1:]  
            new_fields+= dict_dfs['firma_data'].schema[1:]   
            new_fields+= dict_dfs['rar_data'].schema[1:] 
            new_fields+= dict_dfs['broker_data'].schema[1:] 
            new_fields+= dict_dfs['group_data'].schema[1:]
            new_fields+= dict_dfs['universo_data'].schema[1:]
            # existing classes with new fields
            col_lisboa = ['level60_operarea_id', 'territorial_general_code']
            new_fields+= dict_dfs['lisboa_data'].select(*col_lisboa).schema
            col_bal = ['acc_balan_mo_number', 'st_model_type','year_exercise_number']
            new_fields+= dict_dfs['balance_data'].select(*col_bal).schema
            col_ins = ['ind_mini_seguros', 'ind_seg_credito','ind_seg_health',
                   'ind_seg_leasing','ind_seg_negocio']
            new_fields+= dict_dfs['ins'].select(*col_ins).schema
            schema.updateSchema(new_fields)
        schema.writeSchema()
    
    # Correct schema
    df_final = schema.correct_schema(df_final, project_path, data_tag)
    df_final = df_final.dropDuplicates(subset=["customer_id"])
    
    stable_path = os.path.join(project_path, data_tag)    
    final_path = os.path.join(stable_path, "closing_date={}".format(closing_date))
    
    print('Path de escritura del cubo: ',final_path)
    if (OVERWRITE_CUBE or not hu.check_path(final_path)):
        print("Persisting partition: {0}".format(closing_date))
        df_final.write.mode("overwrite").parquet(final_path)
    
    print("Finish: df_final")
    
    return df_final

# Orchestrator

### Dates: the dates given for the cube calculation are computed and checked

In [162]:
MONTHS_TO_COMPUTE = get_months_to_compute(spark, 
                                          FIRST_CLOSING_DATE, 
                                          LAST_CLOSING_DATE, 
                                          PROJECT_PATH, 
                                          TAG_DATA,
                                          OVERWRITE_CUBE)
print("Months to compute: {0}".format(MONTHS_TO_COMPUTE))

Months to compute: ['2022-12-31']
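
The implementation of `get_months_to_compute` is not shown in this section. As a rough illustration of the date range it works over, the sketch below lists the month-end closing dates between two ISO dates. The function name `month_end_dates` is hypothetical, and the sketch ignores the checks on existing partitions and `OVERWRITE_CUBE` that the real function receives as arguments.

```python
import calendar
from datetime import date
from typing import List


def month_end_dates(first: str, last: str) -> List[str]:
    """Return the last day of every month from first to last (inclusive), as ISO strings."""
    start = date.fromisoformat(first)
    end = date.fromisoformat(last)
    dates = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        # monthrange returns (weekday of the first day, number of days in the month)
        last_day = calendar.monthrange(year, month)[1]
        dates.append(date(year, month, last_day).isoformat())
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return dates


print(month_end_dates("2019-12-31", "2020-02-29"))
```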


#### CHECK: data sources

In [163]:
(dates_checked, tables_nodata, tables_fill0) = check_tables()

OK: Existen los datos necesarios para la generanción del cubo


#### DROP TMP: temporary files that must be removed before generating the cube

In [164]:
check_deleted_files(PROJECT_PATH,TAG_DATA,dates_checked)

No está activo el control de borrado de temporales CHECK_TMP


#### CHECK: temporary files for missing data sources

In [165]:
dates_no_del_tmp = []
if(bool(tables_nodata)):
    dates_need_tmp = check_tables_by_tmp(tables_nodata)
    dates_checked =  dates_checked + dates_need_tmp
    dates_no_del_tmp = dates_no_del_tmp + dates_need_tmp
if(bool(tables_fill0)):
    dates_tmp = check_tables_by_tmp(tables_fill0,False)
    for d in dates_tmp:
        del tables_fill0[d]
    dates_no_del_tmp = dates_no_del_tmp + dates_tmp
if (CHECK_TMP==1):
    print('Fechas no se borra tmp ',dates_no_del_tmp)

#### CHECK: Registered data sources whose variables are set to 0 when the source does not exist.
Classes that must not be executed and whose values are left at 0

In [166]:
clases_noexec = get_class_noexec(tables_fill0)

### Cube generation: the cube is computed and persisted for the available dates

In [167]:
print('***** FECHAS que se van a generar: ',sorted(dates_checked),' ******')

***** FECHAS que se van a generar:  ['2022-12-31']  ******


In [168]:
for closing_date in dates_checked:
    print("**** Computing Closing Date: {0}".format(closing_date)+" *******")
    if((CHECK_TMP==1) and (closing_date not in dates_no_del_tmp)):
        hu.delete_list_paths(get_deleted_files(PROJECT_PATH,TAG_DATA,closing_date),True)
        print('--- fin borrado de temporales ---')
        
    if(closing_date in clases_noexec.keys()):
        dict_data =  get_dfs(spark, closing_date, PROJECT_PATH, TAG_DATA,clases_noexec[closing_date])
    else:
        dict_data =  get_dfs(spark, closing_date, PROJECT_PATH, TAG_DATA,[])
    
    df_cube =  get_df_cube(dict_data,closing_date, PROJECT_PATH, TAG_DATA) 
    #new_classes = check_new_classes(spark, df_cube,dict_data)
    #if(len(new_classes)>0):
    #    print('nuevas clases: ',new_classes)
        
    #calc_final_data(spark, closing_date, PROJECT_PATH, TAG_DATA,dict_data,df_cube)
    df_cube = df_cube.dropDuplicates(subset=["customer_id"])
    #stable_path = os.path.join(PROJECT_PATH, TAG_DATA)    
    #final_path = os.path.join(stable_path, "closing_date={}".format(closing_date))
    final_path = os.path.join(path_out, "closing_date={}".format(closing_date))
    
    print('Path de escritura del cubo: ',final_path)
    if (OVERWRITE_CUBE or not hu.check_path(final_path)):
        print("Persisting partition: {0}".format(closing_date))
        df_cube.write.mode("overwrite").parquet(final_path)

**** Computing Closing Date: 2022-12-31 *******
Finish set-up
Calculating
Finish: base_ini
Calculating
Finish: Altas
Calculating
Finish: Comex
Calculating
Finish: Broker
Calculating
Finish: business group
Calculating
Finish: universo
Calculating
Finish: rar
Calculating
Finish: cgm_ind
Calculating
Finish: cgm_imp
Calculating
Finish: valores
Calculating
Finish: activo I
Calculating
Finish: discount
Calculating
Finish: base_ind
Calculating
Finish: insurance
Calculating
Finish: tpv
Calculating
Finish: margen
Calculating
Calculating
Calculating
Calculating
Finish: cards
Calculating
Finish: ingresos
Calculating
Finish: riesgos
Calculating
Finish: cirbe
Calculating
Finish: bec riesgos
Calculating
Finish: num prods
Calculating
Finish: digi
Calculating
Finish: sessions
Calculating
Finish: balances
Calculating
Finish: segsociales




Calculating
Finish: lisboa
Calculating
Finish: CNAE
Calculating
Finish: Click & Pay
Calculating
Finish: Rating
Calculating
Finish: mora
Calculating
Finish: riesgo firma
Calculating
Finish: oneview
Calculating
Finish: becnprods_data
Calculating
Finish: evol
Calculating
Finish: means
['2022-12-31', '2022-11-30', '2022-10-31', '2022-09-30']
Calculating
Finish: plan uno
Path de escritura del cubo:  /data/sandboxes/sfma/data/cma/engines/edc/stable/closing_date=2022-12-31
Persisting partition: 2022-12-31
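
The loop above writes each closing date into its own `closing_date=<date>` subfolder and only persists when overwriting is enabled or the partition does not exist yet. A minimal sketch of those two decisions, without Spark or HDFS, is shown below. The helpers `partition_path` and `should_persist` are hypothetical, the base path is a placeholder, and `posixpath` is used instead of `os.path` only so that the separators stay HDFS-style on any operating system.

```python
import posixpath


def partition_path(base_path: str, closing_date: str) -> str:
    """Build the Hive-style partition folder for one closing date."""
    return posixpath.join(base_path, "closing_date={}".format(closing_date))


def should_persist(overwrite_cube: int, partition_exists: bool) -> bool:
    """Mirror the notebook condition: OVERWRITE_CUBE or not check_path(final_path)."""
    return bool(overwrite_cube) or not partition_exists


final_path = partition_path("/tmp/edc/stable", "2022-12-31")
print(final_path, should_persist(overwrite_cube=0, partition_exists=True))
```

Because the folders follow the `column=value` convention, reading the parent folder with `spark.read.parquet` exposes `closing_date` as a column, which is what the record count by date relies on.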


#### Cube records by date

In [181]:
stable_path = path_out
print('Ruta del cubo: ',stable_path)
df_all = spark.read.parquet(stable_path)

df_edc = df_all.groupBy("closing_date").count()
df_edc.orderBy('closing_date').show(df_edc.count())

Ruta del cubo:  /data/sandboxes/sfma/data/cma/engines/edc/stable/
+------------+-------+
|closing_date|  count|
+------------+-------+
|  2019-03-31|1025810|
|  2019-09-30|1029102|
|  2019-10-31|1028889|
|  2019-11-30|1035362|
|  2019-12-31|1031673|
|  2020-01-31|1035669|
|  2020-02-29|1037487|
|  2020-03-31|1031628|
|  2020-04-30|1032903|
|  2020-05-31|1032929|
|  2020-06-30|1025218|
|  2020-07-31|1028390|
|  2020-08-31|1032433|
|  2020-09-30|1030660|
|  2020-10-31|1034820|
|  2020-11-30|1040973|
|  2020-12-31|1035225|
|  2021-01-31|1047906|
|  2021-02-28|1048247|
|  2021-03-31|1044351|
|  2021-04-30|1026706|
|  2021-05-31|1029788|
|  2021-06-30|1033709|
|  2021-07-31|1034774|
|  2021-08-31|1030714|
|  2021-09-30|1031411|
|  2021-10-31|1035209|
|  2021-11-30|1038470|
|  2021-12-31|1038058|
|  2022-01-31|1035741|
|  2022-02-28|1037473|
|  2022-03-31|1049464|
|  2022-04-30|1052680|
|  2022-05-31|1055388|
|  2022-06-30|1058515|
|  2022-07-31|1062653|
|  2022-08-31|1058171|
|  2022-09-30|

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 34910)
Traceback (most recent call last):
  File "/miniconda/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/miniconda/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/miniconda/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/miniconda/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/opt/spark/dist/python/lib/pyspark.zip/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/opt/spark/dist/python/lib/pyspark.zip/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/opt/spark/dist/python/lib/pyspark.zip/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(sel

In [170]:
prueba =  spark.read.parquet('/data/sandboxes/sfma/data/cma/engines/edc/stable/closing_date=2022-08-31')

In [171]:
prueba2 =  spark.read.parquet('/data/sandboxes/sfma/data/cma/engines/edc/stable/closing_date=2022-07-31')

In [172]:
prueba.select('imp_activo_total','imp_activo_bal','imp_ventas','imp_instr_patrimonio_ilp','imp_otros_activos_ilp','imp_efectivo','imp_ingre_finan_valores').groupBy().sum().show(truncate=False)

+---------------------+--------------------+-------------------+-----------------------------+--------------------------+--------------------+----------------------------+
|sum(imp_activo_total)|sum(imp_activo_bal) |sum(imp_ventas)    |sum(imp_instr_patrimonio_ilp)|sum(imp_otros_activos_ilp)|sum(imp_efectivo)   |sum(imp_ingre_finan_valores)|
+---------------------+--------------------+-------------------+-----------------------------+--------------------------+--------------------+----------------------------+
|2.235103246365418E12 |8.738395742339608E12|4.09388219865345E12|9.517390893209505E9          |1.2218396895870083E10     |9.324011043266374E10|5.954043781096898E9         |
+---------------------+--------------------+-------------------+-----------------------------+--------------------------+--------------------+----------------------------+



In [173]:
prueba2.select('imp_activo_total','imp_activo_bal','imp_ventas','imp_instr_patrimonio_ilp','imp_otros_activos_ilp','imp_efectivo','imp_ingre_finan_valores').groupBy().sum().show(truncate=False)

+---------------------+--------------------+--------------------+-----------------------------+--------------------------+---------------------+----------------------------+
|sum(imp_activo_total)|sum(imp_activo_bal) |sum(imp_ventas)     |sum(imp_instr_patrimonio_ilp)|sum(imp_otros_activos_ilp)|sum(imp_efectivo)    |sum(imp_ingre_finan_valores)|
+---------------------+--------------------+--------------------+-----------------------------+--------------------------+---------------------+----------------------------+
|2.4328580718416567E12|8.750128965394014E12|4.174178770517762E12|1.0313551879340752E10        |9.234546484387152E9       |1.3053357495986902E11|4.33370369146439E9          |
+---------------------+--------------------+--------------------+-----------------------------+--------------------------+---------------------+----------------------------+
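
The two aggregate tables (closing dates 2022-08-31 and 2022-07-31) are compared by eye in the notebook. A small sketch of how the month-over-month change could be quantified follows, using the `sum(imp_activo_total)` values printed above. The helper `relative_change` is hypothetical and not part of the notebook.

```python
def relative_change(current: float, previous: float) -> float:
    """Relative change of current with respect to previous, e.g. -0.05 for a 5% drop."""
    if previous == 0:
        raise ValueError("previous value must be non-zero")
    return (current - previous) / previous


# sum(imp_activo_total) as printed for each partition
activo_total_2022_08 = 2.235103246365418e12
activo_total_2022_07 = 2.4328580718416567e12

change = relative_change(activo_total_2022_08, activo_total_2022_07)
print("imp_activo_total change: {:.2%}".format(change))
```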



In [174]:
prueba.groupBy('seg_global').count().orderBy('seg_global').show(50)

+----------+------+
|seg_global| count|
+----------+------+
|        26| 12046|
|        27|  6987|
|        28| 19271|
|        29| 28998|
|        38|    17|
|        44|  1357|
|        45|   650|
|        46|   501|
|        47|   150|
|        48|   351|
|        49|  3314|
|        57|  2996|
|        58|  1363|
|        59|   223|
|        63|  3482|
|        64| 99163|
|        65|107571|
|        66|110327|
|        67|137841|
|        68|519623|
|        77|   204|
|        78|    30|
|        79|  1706|
+----------+------+



In [175]:
prueba2 =  spark.read.parquet('/data/sandboxes/sfma/data/cma/engines/edc/stable/closing_date=2021-12-31')

In [176]:
prueba2.groupBy('seg_global').count().orderBy('seg_global').show(50)

+----------+------+
|seg_global| count|
+----------+------+
|        01|  1372|
|        02|    40|
|        03|   943|
|        04|   827|
|        05|   128|
|        06|    31|
|        07|  1152|
|        08|   692|
|        09|  2151|
|        31|  6410|
|        32|  2504|
|        33| 17512|
|        34| 47993|
|        35|108066|
|        38|    18|
|        63|  2770|
|        80| 94715|
|        81|275809|
|        82| 75288|
|        83|243926|
|        84| 11475|
|        85| 57205|
|        86| 12115|
|        87| 35274|
|        88|  8431|
|        89| 31211|
+----------+------+



In [177]:
pruebahermes = spark.read.parquet('/data/master/risk/ecrk/data/t_ecrk_risk_hermes_gen_res_m/part_closing_date=2022-01-31')

In [178]:
pruebahermes.where(F.col('customer_id')=='000008264').show()

+-----------+--------------------+-------------------+---------------------+--------------------------+----------------------+------------------------+--------------------+----------------------+-----------------------+----------------------+-------------------+------------------------+------------------+-----------------------+-----------------------+----------------------+---------------------+-----------------------+-----------------------+--------------------+-------------------------+-------------------------+-------------------------+---------------------+------------------------+------------------------+-------------------------+------------------------+------------------------+------------------------+----------------------+--------------------------+--------------------------+--------------------------+--------------------+-----------------------+-----------------------+------------------------+-----------------------+-----------------------+-----------------------+-----------

In [179]:
pruebahermes.where(F.col('customer_id')=='000008264').select('risk_cust_group_type','risk_cust_d_pb_per').show()

+--------------------+------------------+
|risk_cust_group_type|risk_cust_d_pb_per|
+--------------------+------------------+
|                  7 |         0.5200000|
+--------------------+------------------+

