In [18]:
# Shared configuration dictionary. The 'bank' entry holds the canonical bank
# names that ShoppersChurn.clean_shopper_data passes to choose_correct as the
# list of accepted spellings.
main_dict = dict(
    bank=[
        'BBVA BANCOMER', 'BANORTE', 'SANTANDER',
        'BANCO NACIONAL DE MEXICO', 'BANREGIO', 'BANAMEX',
        'HSBC', 'BANCO AFIRME', 'BANCO AZTECA',
        'BANCOPPEL', 'SCOTIABANK', 'BANCO REGIONAL DE MONTERREY',
        'BANCO DEL BAJIO',
    ],
)

In [30]:
from typing import Dict
from pathlib import Path

# Ingeniería de variables
from re import sub, UNICODE
from numpy import nan, array
from unicodedata import normalize
from datetime import datetime, date
from difflib import get_close_matches
from pandas import DataFrame, read_csv, to_datetime, qcut

# Modelos
from sklearn.pipeline import Pipeline
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import RobustScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from category_encoders.cat_boost import CatBoostEncoder

class ShoppersChurn:
    '''
    Loads a shoppers CSV, cleans the shopper-level columns, derives model
    features plus the `is_churn` target, and trains a classification pipeline.

    Attributes set along the way:
        main_dict: configuration dictionary; later steps add keys to it
                   (transport list, column lists, bin edges, trained model).
        sh:        shopper-level DataFrame (set by get_files, then overwritten by
                   clean_shopper_data and vars_shopper).
        df:        shopper id plus every column located after the
                   'end_of_shoppers_data' marker column (set by get_files).
    '''
    def __init__(self, file_name: str='raw_data_MID', main_dict: Dict=main_dict) -> None:
        '''
        Resolve the data paths and warn when the expected CSV is missing.

        Args:
            file_name: CSV file name without extension, looked up in `<cwd>/../data`.
            main_dict: configuration dictionary; must contain the 'bank' list used by
                       clean_shopper_data.
        '''
        # Shallow copy: later steps add keys to self.main_dict, and without the copy
        # those writes would leak into the shared default argument between instances.
        self.main_dict = dict(main_dict)
        self.base_dir = Path.cwd().parent
        self.data_dir = self.base_dir.joinpath('data')
        self.file_name = file_name
        self.file_path = self.data_dir.joinpath(f'{self.file_name}.csv')
        # Only a warning: the failure itself surfaces later, when get_files reads the file
        if not self.file_path.is_file():
            print(f'There should be a file called "{self.file_name}" at:\n{self.data_dir}\n\nAdd it and try again!')


    def get_files(self, shopper_id_col: str='shopper_id') -> None:
        '''
        Read the CSV and split it at the 'end_of_shoppers_data' marker column.

        Columns before the marker become `self.sh` (one row per `shopper_id_col`);
        `shopper_id_col` joined with the columns after the marker become `self.df`.

        Raises:
            IndexError: if the marker column is not present in the file.
        '''
        data = read_csv(self.file_path, low_memory=False)
        marker = 'end_of_shoppers_data'
        marker_positions = [pos for pos, name in enumerate(data.columns) if name == marker]
        if not marker_positions:
            raise IndexError(f'Column "{marker}" was not found in {self.file_path}')
        split_at = marker_positions[0]
        self.sh = data.iloc[:, :split_at].drop_duplicates(shopper_id_col)
        self.df = data[[shopper_id_col]].join(data.iloc[:, split_at+1:])


    def clean_text(self, text: str, pattern: str=r"[^a-zA-Z0-9\s]", lower: bool=False) -> str:
        '''
        Normalize a piece of text.

        Args:
            text:    value to clean; it is converted with str() first.
            pattern: regular expression of the characters replaced by a space.
            lower:   when True the result is lower-cased.

        Returns:
            The cleaned string, or numpy nan when nothing is left (empty or 'nan').
        '''
        # Strip accents: decompose (NFD) and drop every non-ASCII code point
        ascii_bytes = normalize('NFD', str(text).replace('\n', ' \n ')).encode('ascii', 'ignore')
        # Replace special characters such as !"#$%&/()= with a space
        clean = sub(pattern, ' ', ascii_bytes.decode('utf-8'), flags=UNICODE)
        # Collapse runs of whitespace into a single space
        clean = sub(r'\s{2,}', ' ', clean.strip())
        if lower: clean = clean.lower()
        # An empty record is reported as null
        if clean in ('', 'nan'): clean = nan
        return clean


    def choose_correct(self, df: DataFrame, col: str, correct_list: list, fill_value: str='Otro', keep_nan: bool=True, replace_col: bool=True, **kwargs) -> DataFrame:
        '''
        Map every value of `df[col]` to the closest entry of `correct_list`.

        Both sides are compared after `clean_text(..., lower=True)`; the match is
        found with difflib.get_close_matches, which receives `**kwargs`
        (for example `n` and `cutoff`).

        Args:
            df:           DataFrame to process; the column `{col}_correct` is added to it in place.
            col:          name of the column to check.
            correct_list: accepted values.
            fill_value:   value used when nothing is close enough.
            keep_nan:     when True missing values stay nan; when False they are left
                          as the literal string 'nan'.
            replace_col:  when True `col` is dropped and `{col}_correct` is renamed to `col`
                          (the column therefore moves to the last position).

        Returns:
            The DataFrame with the corrected column.
        '''
        # Cleaned possibility --> original possibility. Entries that clean to nothing
        # are skipped because get_close_matches can only compare strings.
        originals = list(correct_list)
        correct_dict = {}
        for original in originals:
            cleaned = self.clean_text(original, lower=True)
            if isinstance(cleaned, str): correct_dict[cleaned] = original
        # 'nan' is a valid target so that missing values match themselves
        correct_dict['nan'] = 'nan'
        correct_clean = list(correct_dict)

        def best_match(value):
            candidates = get_close_matches(str(self.clean_text(value, lower=True)), correct_clean, **kwargs)
            # The first candidate is the most similar one; no candidate --> fill_value
            result = correct_dict[candidates[0]] if candidates else fill_value
            if keep_nan and str(result) == 'nan': return nan
            return result

        df[f'{col}_correct'] = df[col].map(best_match)
        if replace_col: df = df.drop(col, axis=1).rename({f'{col}_correct': col}, axis=1)
        return df


    def _parse_insurance_date(self, raw):
        '''
        Parse one insurance date trying, in order, dd/mm/yy on the full value,
        YYYY-mm-dd on the first 10 characters and dd-Mon-YYYY on the first 11.
        Returns nan for missing values and for values that match no format.
        '''
        if str(raw) == 'nan': return nan
        attempts = (
            (lambda value: value, r'%d/%m/%y'),
            (lambda value: value[:10], r'%Y-%m-%d'),
            (lambda value: value[:11], r'%d-%b-%Y'),
        )
        for trim, date_format in attempts:
            # Best effort on purpose, but Exception (not a bare except) so that
            # KeyboardInterrupt / SystemExit are not swallowed
            try: return to_datetime(trim(raw), format=date_format)
            except Exception: continue
        print(f'Date: "{raw}" was not converted successfully')
        return nan


    def clean_shopper_data(self, marital_col: str='marital_status', insurance_col: str='insurance', bank_col: str='bank', transport_col: str='transport', min_transport_share: float=0.02) -> None:
        '''
        Clean the shopper-level table stored in `self.sh` and store the result back.

        Steps: remove spaces from the marital status, parse the insurance dates,
        match banks against `self.main_dict[bank_col]`, and keep only the first word
        of the transport, matching it against the values whose relative frequency is
        at least `min_transport_share` (that list is saved in `self.main_dict[transport_col]`).
        '''
        df = self.sh.copy()
        df[marital_col] = df[marital_col].map(lambda x: nan if str(x)=='nan' else x.replace(' ',''))

        df[insurance_col] = [self._parse_insurance_date(x) for x in df[insurance_col]]

        df = self.choose_correct(df, bank_col, self.main_dict[bank_col], n=1, cutoff=0.7)

        def first_word(value):
            if str(value) == 'nan': return nan
            words = value.split()
            # A whitespace-only value has no words; treat it as missing
            return words[0].title() if words else nan

        df[transport_col] = df[transport_col].map(first_word)
        # Relative frequency per value. The Series is used directly because the name of
        # the column produced by value_counts().to_frame() differs between pandas versions.
        shares = df[transport_col].value_counts(normalize=True)
        self.main_dict[transport_col] = shares[shares >= min_transport_share].index.tolist()
        df = self.choose_correct(df, transport_col, self.main_dict[transport_col], n=1, cutoff=0.7)

        self.sh = df.copy()


    def two_char(self, n):
        '''
        Format a number with two integer digits and two decimals, 1.0 --> '01.00'
        '''
        return f'{n:05.2f}'

    def get_bins(self, df: DataFrame, col: str, bin_dict: Dict, replace_col: bool=False) -> DataFrame:
        '''
        Bin `df[col]` into (at most) 5 quantile ranges with pandas.qcut.

        The text label of each range is stored in `rango_{col}` and the bin edges in
        `bin_dict[f'{col}_bins']`; both `df` and `bin_dict` are modified in place.
        When `replace_col` is True the original column is replaced by the labels.
        '''
        # Find the bin every value belongs to
        df[f'rango_{col}'], bin_dict[f'{col}_bins'] = qcut(df[col], q=5, retbins=True, duplicates='drop')
        # Convert it to text: (1.0, 5.0] --> '01.00 to 05.00'
        df[f'rango_{col}'] = df[f'rango_{col}'].map(lambda x: nan if str(x)=='nan' else self.two_char(x.left)+' to '+self.two_char(x.right))
        if replace_col: df = df.drop(col, axis=1).rename({f'rango_{col}': col}, axis=1)
        return df


    def vars_shopper(self, id_col: str='shopper_id', official_id_col: str='official_id', insurance_col: str='insurance', last_date_col: str='last_date', churn_weeks: int=4) -> None:
        '''
        Build the model features and the `is_churn` target from `self.sh`.

        Derived columns: `age_in_days` and `genre` (taken from `official_id_col`),
        `is_churn` (at least `churn_weeks` full weeks since `last_date_col`) and
        `days_for_insurance_exp` (insurance date minus last date). Numeric columns are
        binned with get_bins and every missing value becomes 'Unknown'.
        Adds 'shop_num_cols', 'shop_cat_cols' and 'shop_bin_dict' to `self.main_dict`
        and stores the result back in `self.sh`, indexed by `id_col`.
        '''
        df = self.sh.set_index(id_col)
        today = datetime.today()

        # Characters 4-9 of the id are read as yymmdd (presumably a CURP-style id, TODO confirm)
        df['birthday'] = to_datetime(df[official_id_col].str[4:10], format=r'%y%m%d')
        # A two-digit year parsed into the future belongs to the previous century.
        # Timestamp.replace keeps the column as datetime64; building datetime.date objects
        # here would turn it into a mixed object column and break the subtraction below.
        df['birthday'] = df['birthday'].map(lambda x: x.replace(year=x.year-100) if x.year > today.year else x)
        df['age_in_days'] = (today - df['birthday']).dt.days

        df['genre'] = df[official_id_col].str[10:11]

        df[last_date_col] = to_datetime(df[last_date_col])
        df['is_churn'] = (today - df[last_date_col]).dt.days//7 >= churn_weeks
        # Vectorized difference; rows where either date is missing end up as NaN
        df['days_for_insurance_exp'] = (to_datetime(df[insurance_col]) - df[last_date_col]).dt.days

        df = df.drop([official_id_col, 'birthday', insurance_col, last_date_col], axis=1)

        # Numeric columns are identified by dtype, deterministically and without sampling
        num_cols = df.select_dtypes(include='number').columns.tolist()
        self.main_dict['shop_num_cols'] = num_cols
        self.main_dict['shop_cat_cols'] = [x for x in df.columns if x not in num_cols and x != 'is_churn']

        self.main_dict['shop_bin_dict'] = {}
        for col in num_cols:
            df = self.get_bins(df, col, self.main_dict['shop_bin_dict'], replace_col=True)
            # 'Unknown' has to be a declared category before it can be used to fill nulls
            df[col] = df[col].astype('category').cat.add_categories('Unknown')

        df = df.fillna('Unknown')
        self.sh = df.copy()


    def train_model(self, X: DataFrame, y: array, encoder=CatBoostEncoder, scaler=RobustScaler, model=LogisticRegression, **kwargs) -> tuple:
        '''
        Encode, scale and train a model inside a single Pipeline.

        Args:
            X:        feature table.
            y:        target values.
            encoder:  encoder class, instantiated without arguments.
            scaler:   scaler class, instantiated without arguments.
            model:    estimator class, instantiated with `**kwargs`.

        Returns:
            (pipeline, (test_score, train_score), relevance) where `relevance` is a
            DataFrame with the column name in column 0 and its weight in column 1,
            sorted descending by weight.
        '''
        # Train and test sets
        X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.85, random_state=7, shuffle=True)

        # Steps of the flow
        pipe_obj = Pipeline(steps=[
            ('encoder', encoder()),
            ('scaler', scaler()),
            ('model', model(**kwargs))
        ])

        # Fit and keep the score on test
        test_score = pipe_obj.fit(X_train, y_train).score(X_test, y_test)
        # Score on train as well, to check for overfitting
        train_score = pipe_obj.score(X_train, y_train)

        print(f"Score: {test_score:.2%}\nTraining score: {train_score:.2%}")

        # Relevance of every variable: linear coefficients, tree-based importances,
        # or a vector of zeros when the estimator exposes neither
        estimator = pipe_obj.named_steps['model']
        if hasattr(estimator, 'coef_'):
            weights = array(estimator.coef_)
            if weights.ndim > 1:
                # coef_ is 2-D (one row per class): use the single row of a binary
                # model, or the mean absolute coefficient across classes otherwise
                weights = weights[0] if weights.shape[0] == 1 else abs(weights).mean(axis=0)
        elif hasattr(estimator, 'feature_importances_'):
            weights = estimator.feature_importances_
        else:
            weights = [0]*len(X.columns)

        # Sort them in descending order
        coef_var = DataFrame(list(zip(X.columns, weights))).sort_values(1, ascending=False).reset_index(drop=True)

        return pipe_obj, (test_score, train_score), coef_var

    def shoppers_model(self) -> DataFrame:
        '''
        Train the churn model on `self.sh` (every column except `is_churn` is a feature).

        Stores the fitted pipeline in `self.main_dict['shopper_model']`, prints the
        (test, train) scores and returns the relevance table produced by train_model.
        '''
        # Use the instance's own data, not a module-level variable
        df = self.sh.copy()
        X = df[[x for x in df.columns if x not in ['is_churn']]].copy()
        y = df['is_churn'].values
        model, scores, coefs = self.train_model(X, y)
        self.main_dict['shopper_model'] = model
        print(scores)
        return coefs


# Run the shopper pipeline step by step; every call relies on the state
# (sc.sh / sc.main_dict) left by the previous one, so the order matters.
sc = ShoppersChurn()
# Read the CSV and split it into shopper-level data (sc.sh) and the remaining columns (sc.df)
sc.get_files()
# Normalize marital status, insurance dates, bank and transport in sc.sh
sc.clean_shopper_data()
# Derive the features and the is_churn target, then bin the numeric columns
sc.vars_shopper()
# Train the pipeline; the call returns the feature-relevance table
sc.shoppers_model()

ValueError: could not convert string to float: 'CAR'