In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
from datetime import datetime
import matplotlib.pyplot as plt
import os
import pickle

from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

import category_encoders as ce

In [None]:
pd.set_option('display.max_columns', None)

In [None]:
#Cargar el csv
df = pd.read_csv("../bin/data_preprocess.csv")

In [None]:
#Calcular en porcentaje de nulos
stats = []
for col in df.columns:
    stats.append((col, df[col].nunique(), df[col].isnull().sum() * 100 / df.shape[0], df[col].value_counts(normalize=True, dropna=False).values[0] * 100, df[col].dtype))
    stats_df = pd.DataFrame(stats, columns=['Columna', 'Valores únicos', '% nulos', '% of values in the biggest category', 'tipo'])
stats_df.sort_values('% nulos', ascending=False)

In [None]:
df = df.dropna(subset=['precio_contado'])

In [None]:
# Eliminamos las columnas con mayoría de nulos y potencia_kw porque es un cálculo de potencia_cv

df = df.drop(columns=['co2', 'consumo_carretera', 'consumo_urbano', 'consumo_medio', 'id_sobrealimentacion', 'potencia_kw'])

In [None]:
columnas_categoricas = ['id_provincia', 'id_concesionario', 'id_distintivo_ambiental','combustible','tipo_cambio', 'id_traccion', 'id_marca', 'id_modelo', 'num_plazas', 'num_puertas']
columnas_numericas = []

for columna in df.columns: 
    if columna not in columnas_categoricas: 
        columnas_numericas.append(columna)

for columna in columnas_categoricas:
    df[columna] = df[columna].astype('category')

## Eliminación outliers

In [None]:
df_sin_outliers = df.copy()

In [None]:
def prueba_outliers_tukey(col, lado_outliers, k=1.5):
    if col not in columnas_numericas:
        print(f'La columna {col} no es numérica o no existe')
        return
    
    q1 = df[col].quantile(0.25)
    q3 = df[col].quantile(0.75)
    ric = q3 - q1

    if lado_outliers == 'derecho':
        lim_l = df[col].min()
        lim_r = q3 + k * ric

    elif lado_outliers == 'izquierdo':
        lim_l = q1 - k * ric
        lim_r = df[col].max()

    elif lado_outliers == 'ambos':
        lim_l = q1 - k * ric
        lim_r = q3 + k * ric

    else: 
        print(f'No has indicado un lado válido para los outliers. Usa "derecho", "izquierdo", o "ambos".')
        return
    
    porcentaje = df[df[col].between(lim_l, lim_r) | df[col].isna()].shape[0] / df.shape[0] * 100
    print(lim_l, lim_r)
    print(f'Se mantienen el {porcentaje:.2f}% de los datos en la columna {col} después de eliminar los outliers.')
    
    fig = plt.figure(figsize=(14, 6))
    gs = fig.add_gridspec(1, 2)
        
    ax1 = fig.add_subplot(gs[0, 0])
    sns.histplot(df[col], kde=True, ax=ax1)
    plt.axvline(lim_l, color='red')
    plt.axvline(lim_r, color='red')
    ax1.set_title(f'Histograma de {col}')
    ax1.set_xlabel(col)
        
    ax2 = fig.add_subplot(gs[0, 1])
    sns.boxplot(x=df[col], ax=ax2)
    plt.axvline(lim_l, color='red')
    plt.axvline(lim_r, color='red')
    ax2.set_title(f'Boxplot de {col}')
    ax2.set_xlabel(col)
        
    plt.tight_layout()
    plt.show()

In [None]:
def prueba_outliers_z(col, z=3):
    
    if col not in columnas_numericas:
        print(f'La columna {col} no es numérica o no existe')
        
    mean = df[col].mean()
    std = df[col].std()
    
    lim_l = mean - z*std
    lim_r = mean + z*std
    
    porcentaje = df[df[col].between(lim_l, lim_r) | df[col].isna()].shape[0] / df.shape[0] * 100
    print(f'Se mantienen el {porcentaje:.2f}% de los datos en la columna {col} después de eliminar los outliers.')
    
    fig = plt.figure(figsize=(14, 6))
    gs = fig.add_gridspec(1, 2)
        
    ax1 = fig.add_subplot(gs[0, 0])
    sns.histplot(df[col], kde=True, ax=ax1)
    plt.axvline(lim_l, color='red')
    plt.axvline(lim_r, color='red')
    ax1.set_title(f'Histograma de {col}')
    ax1.set_xlabel(col)
        
    ax2 = fig.add_subplot(gs[0, 1])
    sns.boxplot(x=df[col], ax=ax2)
    plt.axvline(lim_l, color='red')
    plt.axvline(lim_r, color='red')
    ax2.set_title(f'Boxplot de {col}')
    ax2.set_xlabel(col)
        
    plt.tight_layout()
    plt.show()

In [None]:
def outliers_tukey(df_sin_outliers, col, lado_outliers, k=1.5):
    
    if col not in columnas_numericas:
        print(f'La columna {col} no es numérica o no existe')

    q1 = df_sin_outliers[col].quantile(0.25)
    q3 = df_sin_outliers[col].quantile(0.75)
    ric = q3 - q1

    if lado_outliers == 'derecho':
        lim_l = df_sin_outliers[col].min()
        lim_r = q3 + k * ric

    elif lado_outliers == 'izquierdo':
        lim_l = q1 - k * ric
        lim_r = df_sin_outliers[col].max()

    elif lado_outliers == 'ambos':
        lim_l = q1 - k * ric
        lim_r = q3 + k * ric

    else:
        print(f'Lado especificado incorrectamente. Usa "derecho", "izquierdo" o "ambos".')

    df_sin_outliers = df_sin_outliers[df_sin_outliers[col].between(lim_l, lim_r) | df_sin_outliers[col].isna()]
    porcentaje = df_sin_outliers.shape[0] / df.shape[0] * 100
    print(f'Se mantienen el {porcentaje:.2f}% de los datos en la columna {col} después de eliminar los outliers.')

    return df_sin_outliers

In [None]:
def outliers_z_score(df_sin_outliers, col, z = 3):
    if col not in columnas_numericas:
        print(f'La columna {col} no es numérica o no existe')
        
    mean = df_sin_outliers[col].mean()
    std = df_sin_outliers[col].std()
    
    lim_l = mean - z*std
    lim_r = mean + z*std
    
    df_sin_outliers = df_sin_outliers[df_sin_outliers[col].between(lim_l, lim_r) | df_sin_outliers[col].isna()]
    porcentaje = df_sin_outliers.shape[0] / df.shape[0] * 100
    print(f'Se mantienen el {porcentaje:.2f}% de los datos en la columna {col} después de eliminar los outliers.')
    
    return df_sin_outliers

In [None]:
prueba_outliers_tukey('cilindrada', 'derecho')

In [None]:
prueba_outliers_z('peso')

In [None]:
df_sin_outliers = outliers_tukey(df_sin_outliers,'kilometraje', 'derecho')
df_sin_outliers = outliers_tukey(df_sin_outliers,'garantia', 'derecho')
df_sin_outliers = outliers_tukey(df_sin_outliers,"potencia_cv", 'derecho', 2.5)
df_sin_outliers = outliers_tukey(df_sin_outliers,"cilindrada", 'derecho')

In [None]:
df_sin_outliers = outliers_z_score(df_sin_outliers,'precio_contado',3)
df_sin_outliers = outliers_z_score(df_sin_outliers,'precio_nuevo',3)
df_sin_outliers = outliers_z_score(df_sin_outliers,'batalla',3)
df_sin_outliers = outliers_z_score(df_sin_outliers,'capacidad_maletero',3)
df_sin_outliers = outliers_z_score(df_sin_outliers,'velocidad_max',3)
df_sin_outliers = outliers_z_score(df_sin_outliers,'peso',3)

## Imputación de nulos

In [None]:
#Calcular en porcentaje de nulos
stats = []
for col in df_sin_outliers.columns:
    stats.append((col, df_sin_outliers[col].nunique(), df_sin_outliers[col].isnull().sum() * 100 / df_sin_outliers.shape[0], df_sin_outliers[col].value_counts(normalize=True, dropna=False).values[0] * 100, df_sin_outliers[col].dtype))
    stats_df = pd.DataFrame(stats, columns=['Columna', 'Valores únicos', '% nulos', '% of values in the biggest category', 'tipo'])
stats_df.sort_values('% nulos', ascending=False)

In [None]:
#Funcion Imputacion de datos

def imputar_datos(df):
    
    #Inicializar Knn para las columnas numericas
    knn_imputer = KNNImputer(n_neighbors=10)
    
    #Aplicar knn imputer a las columnas numericas
    df_numericas = df[columnas_numericas]
    df_imputado_numericas = knn_imputer.fit_transform(df_numericas)
    
    #Convertir los datos imputados a df
    df_imputado_numericas = pd.DataFrame(df_imputado_numericas, columns=columnas_numericas)
    
    #Inicializar SimpleImputer para las columnas categoricas
    simple_imputer = SimpleImputer(strategy='most_frequent')
    
    #Aplicar SimpleImputer para las columnas categoricas
    df_categoricas = df[columnas_categoricas]
    df_imputado_categoricas = simple_imputer.fit_transform(df_categoricas)
    
    #Convertir las columnas categoricas imputadas a df
    df_imputado_categoricas = pd.DataFrame(df_imputado_categoricas,columns=columnas_categoricas)
    
    #Combinamos los datos imputados
    df_imputado = pd.concat([df_imputado_numericas, df_imputado_categoricas], axis =1)
    
    #verificamos que no hay valores nulos
    print(df_imputado.isnull().sum())
    
    return df_imputado

In [None]:
#Imputacion de datos
df_sin_nulos = imputar_datos(df_sin_outliers)

#Guardamos el df si es necesario
#df_imputado.to_csv('Coches-segunda-mano/bin/df_imputado.csv', index=False)

## Transformaciones

In [None]:
df_transformado = df_sin_nulos

In [None]:
plt.figure(figsize=(12, 6))
sns.histplot(df_sin_nulos['precio_contado'], kde=True)
plt.title('Histograma de precio_contado')
plt.xlabel('Precio Contado')
plt.xticks(rotation=45, ha='right')
plt.show()

In [None]:
# Aplicamos transformación logarítmica a las siguientes columnas para que no afecte tanto la varianza de los datos

df_transformado['log_precio_contado'] = np.log(df_transformado['precio_contado'])
df_transformado['log_kilometraje'] = np.log(df_transformado['kilometraje'])
df_transformado['log_precio_nuevo'] = np.log(df_transformado['precio_nuevo'])

In [None]:
# Creamos métricas derivadas de los datos que ya tenemos para reducir el número de columnas

df_transformado['volumen'] = df_transformado['largo'] * df_transformado['ancho'] * df_transformado['alto']

In [None]:
# Creamos la columna fecha_matriculacion para calcular la antigüedad del coche en años

df_transformado['fecha_matriculacion'] = (
    '01/' + df_transformado['mes_matriculacion'].astype(int).astype(str) + 
    '/' + df_transformado['ano_matriculacion'].astype(int).astype(str)
)
df_transformado['fecha_matriculacion'] = pd.to_datetime(df_transformado['fecha_matriculacion'], format='%d/%m/%Y')
current_date = pd.to_datetime(datetime.now())

df_transformado['antiguedad_coche'] = ((current_date - df_transformado['fecha_matriculacion']).dt.days / 365.25).round(2)

In [None]:
df_transformado.head()

In [None]:
df_transformado = df_transformado.drop(columns=['largo', 'ancho', 'alto', 'mes_matriculacion', 'fecha_matriculacion', 'precio_contado', 'kilometraje', 'precio_nuevo'])

df['original_precio_contado'] = np.exp(df['log_precio_contado'])

In [None]:
df_transformado.head()

## Encoding

In [None]:
#Calcular en porcentaje de nulos
stats = []
for col in df_transformado.columns:
    stats.append((col, df_transformado[col].nunique(), df_transformado[col].isnull().sum() * 100 / df_transformado.shape[0], df_transformado[col].value_counts(normalize=True, dropna=False).values[0] * 100, df_transformado[col].dtype))
    stats_df = pd.DataFrame(stats, columns=['Columna', 'Valores únicos', '% nulos', '% of values in the biggest category', 'tipo'])
stats_df.sort_values('% nulos', ascending=False)

In [None]:
#Aplica los encoders y los guarda en archivos pickle.
def aplicar_encoders(df_transformado,col, encoder):
    
    # Crear la carpeta si no existe 
    os.makedirs('encoders/', exist_ok=True)  
    
    if encoder == 'one hot encoder':
        
        encoder = OneHotEncoder(sparse_output=False, drop='first')
        encoded = encoder.fit_transform(df_transformado[[col]])
        one_hot_df = pd.DataFrame(encoded, columns=encoder.get_feature_names_out([col]), index=df_transformado.index)
        
        df_transformado = pd.concat([df_transformado, one_hot_df], axis=1)
        df_transformado.drop(columns=[col], inplace=True)
        
    elif encoder == 'label encoder':
        
        encoder = LabelEncoder()
        df_transformado[col] = encoder.fit_transform(df_transformado[col].values)
        
    elif encoder == 'target encoder':
        encoder = ce.TargetEncoder()
        df_transformado[col] = encoder.fit_transform(df_transformado[col], df_transformado['log_precio_contado'])
        
    elif encoder == 'frequency encoder':
        frequency_encoding = df_transformado[col].value_counts().to_dict()
        df_transformado[col] = df_transformado[col].map(frequency_encoding)
        
    else: 
        print('La columna no existe o no has seleccionado el encoder correctamente. Las opciones son:\n'
              '  - one hot encoder\n'
              '  - label encoder\n'
              '  - target encoder\n'
              '  - frequency encoder')
        
        # Guardar el encoder en un archivo pickle
    with open(os.path.join('encoders/', f"{col}_encoder.pickle"), "wb") as file:
        pickle.dump(encoder, file)
        
    print(f'Encoder aplicado a {col}')
    
    return df_transformado

In [None]:
df_encoded = aplicar_encoders(df_transformado, 'id_provincia', 'frequency encoder')

df_encoded = aplicar_encoders(df_transformado, 'id_concesionario', 'target encoder')

df_encoded = aplicar_encoders(df_transformado, 'tipo_cambio', 'label encoder')

df_encoded = aplicar_encoders(df_transformado, 'id_marca', 'target encoder')

df_encoded = aplicar_encoders(df_transformado, 'id_modelo', 'target encoder')

In [None]:
encoder = OneHotEncoder(sparse_output=False, drop='first')

In [None]:
encoded_combustible = encoder.fit_transform(df_encoded[['combustible']])
encoded_combustible_df = pd.DataFrame(encoded_combustible, columns=encoder.get_feature_names_out(['combustible']), index=df_encoded.index)

df_encoded = pd.concat([df_encoded, encoded_combustible_df], axis=1)

In [None]:
one_hot_columns = list(encoded_combustible_df.columns)

In [None]:
encoded_traccion = encoder.fit_transform(df_encoded[['id_traccion']])
encoded_traccion_df = pd.DataFrame(encoded_traccion, columns=encoder.get_feature_names_out(['id_traccion']), index=df_encoded.index)

df_encoded = pd.concat([df_encoded, encoded_traccion_df], axis=1)

In [None]:
one_hot_columns.extend(list(encoded_traccion_df.columns))

In [None]:
encoded_dist = encoder.fit_transform(df_encoded[['id_distintivo_ambiental']])
encoded_dist_df = pd.DataFrame(encoded_dist, columns=encoder.get_feature_names_out(['id_distintivo_ambiental']), index=df_encoded.index)

df_encoded = pd.concat([df_encoded, encoded_dist_df], axis=1)

In [None]:
one_hot_columns.extend(list(encoded_dist_df.columns))

In [None]:
df_encoded = df_encoded.drop(columns=['id_traccion', 'combustible', 'id_distintivo_ambiental'])

## Escalar datos

In [None]:
#Crear una carpeta para guardar el objeto escalador
os.makedirs("escaladores", exist_ok=True)

In [None]:
x = df_encoded.drop('log_precio_contado', axis=1)  
y = df_encoded['log_precio_contado']

In [None]:
cols_x = list(x.columns)


In [None]:
# Creamos los scalers, escalamos los datos y guardamos el scaler en un archivo .pickle

x_scaler = StandardScaler()
x = x_scaler.fit_transform(x)
nombre_pickle_x = "x_scaler.pickle"
ruta_completa_x = os.path.join("escaladores/", nombre_pickle_x)
with open(ruta_completa_x, "wb") as file:
    pickle.dump(x_scaler, file)

y_scaler = StandardScaler()
y = y_scaler.fit_transform(y.values.reshape(-1, 1))
nombre_pickle_y = "y_scaler.pickle"
ruta_completa_y = os.path.join("escaladores/", nombre_pickle_y)
with open(ruta_completa_y, "wb") as file:
    pickle.dump(y_scaler, file)


In [None]:
x_df = pd.DataFrame(x, columns=cols_x) 
y_df = pd.DataFrame(y, columns=['log_precio_contado'])

df_escalado = pd.concat([x_df, y_df], axis=1)

In [None]:
df_escalado.head()

In [None]:
#Guardar el DataFrame con las columnas transformadas
df_escalado.to_csv('../bin/data_process.csv', index=False)