In [1]:
import pandas as pd
from dataclasses import dataclass
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, Embedding, Flatten, Concatenate, BatchNormalization

In [3]:
numerical_columns_names = [
            'vehicle_mileage', 'vehicle_year', 'vehicle_doors',
            'vehicle_length', 'vehicle_trunk_volume', 'vehicle_power_din',
            'vehicle_rated_horse_power', 'vehicle_max_power', 'vehicle_consumption',
            'vehicle_co2', 'price', 'initial_price', 'vehicle_price_new'
        ]
categorical_columns_names = [
            'customer_type', 'vehicle_category', 'vehicle_make', 'vehicle_model',
            'vehicle_version', 'vehicle_gearbox', 'vehicle_energy', 'vehicle_origin', 'vehicle_external_color', 'vehicle_internal_color',
            'vehicle_four_wheel_drive', 'vehicle_pollution_norm',
            'vehicle_condition', 'vehicle_motorization', 'vehicle_trim_level', "vehicle_commercial_name"
        ]

dataframe = pd.read_csv("../data/classified_23_06_2025.csv", nrows=100)
prediction_df = dataframe.sample(n=1)

In [4]:
from abc import ABC, abstractmethod
import dataclasses
import json
from typing import Dict, List


class Column(ABC):
    @abstractmethod
    def transform(self, series: pd.Series) -> pd.Series:
        pass


@dataclass
class CategoricalColumn (Column):
    name: str
    vocabulary: Dict[str, int]
    value_used_to_fill_na: str = "UNK"  
    embedding_dim: int = 32

    def __post_init__(self):
        self.vocabulary = {**self.vocabulary, self.value_used_to_fill_na: len(self.vocabulary)}

    @classmethod
    def from_vocabulary(cls, name: str, vocabulary: Dict[str, int]) -> "CategoricalColumn":
        return cls(
            name=name,
            vocabulary=vocabulary,
            embedding_dim=cls.infer_embedding_dim(vocabulary)
        )
    
    @classmethod
    def from_series(cls, series: pd.Series) -> "CategoricalColumn":
        series.fillna(cls.value_used_to_fill_na, inplace=True)
        vocabulary = {value: index for index, value in enumerate(series.unique())}
        return cls(
            name=series.name,
            vocabulary=vocabulary,
            embedding_dim=cls.infer_embedding_dim(vocabulary)
        )

    @staticmethod
    def infer_embedding_dim(vocabulary: Dict[str, int]) -> int:
        if len(vocabulary) < 30:
            return 10
        else:
            return 32

    def transform(self, series: pd.Series) -> pd.Series:
        series.fillna(self.value_used_to_fill_na, inplace=True)
        return series.map(self.vocabulary)

    def save(self, path: str) -> None:
        dict_to_save = dataclasses.asdict(self)
        with open(path, "w") as f:
            json.dump(dict_to_save, f)

    @classmethod
    def load(cls, path: str) -> "CategoricalColumn":
        with open(path, "r") as f:
            return cls(**json.load(f))

@dataclass
class CategoricalColumns:
    columns: Dict[str, CategoricalColumn]

    @classmethod
    def from_names(cls, dataframe: pd.DataFrame, columns: List[str]) -> "CategoricalColumns":
        return cls(
            columns={column: CategoricalColumn.from_series(dataframe[column]) for column in columns}
        )
    
    @classmethod
    def from_categorical_columns(cls, categorical_columns: List[CategoricalColumn]) -> "CategoricalColumns":
        return cls(
            columns={column.name: column for column in categorical_columns}
        )

@dataclass
class NumericalColumn(Column):
    name: str
    value_used_to_fill_na: float | int
    mean: float
    std: float = 1.0

    @classmethod
    def from_precomputed(cls, name: str, mean: float, std: float) -> "NumericalColumn":
        return cls(
            name=name,
            value_used_to_fill_na=mean,
            mean=mean,
            std=std
        )

    @classmethod
    def from_series(cls, series: pd.Series) -> "NumericalColumn":
        mean = series.mean() 
        series.fillna(mean, inplace=True)
        std = series.std()
        return NumericalColumn(
            name=series.name,
            value_used_to_fill_na=mean,
            mean=mean,
            std=std
        )
    
    def transform(self, series: pd.Series) -> pd.Series:
        series.fillna(self.value_used_to_fill_na, inplace=True)
        return (series - self.mean) / self.std

    def save(self, path: str) -> None:
        dict_to_save = dataclasses.asdict(self)
        with open(path, "w") as f:
            json.dump(dict_to_save, f)

    @classmethod
    def load(cls, path: str) -> "NumericalColumn":
        with open(path, "r") as f:
            return NumericalColumn(**json.load(f))

@dataclass
class NumericalColumns:
    columns: Dict[str, NumericalColumn]
    numerical_dimensions: int

    @classmethod
    def from_names(cls, dataframe: pd.DataFrame, columns: List[str]) -> "NumericalColumns":
        return cls(
            columns={column: NumericalColumn.from_series(dataframe[column]) for column in columns},
            numerical_dimensions=len(columns)
        )
    
    @classmethod
    def from_numerical_columns(cls, numerical_columns: List[NumericalColumn]) -> "NumericalColumns":
        return cls(
            columns={column.name: column for column in numerical_columns},
            numerical_dimensions=len(numerical_columns)
        )



In [5]:
import os
import pandas as pd

@dataclass
class DatasetAnalysis:
    numerical_columns: NumericalColumns
    categorical_columns: CategoricalColumns

    def get_analysis(self) -> Dict[str, float]:
        return {
            "numerical_columns_names": self.numerical_columns,
            "categorical_columns_names": self.categorical_columns
        }

class DatasetPreprocessor:
    def __init__(self, numerical_columns_names: List[str], categorical_columns_names: List[str]):
        self.numerical_columns_names = numerical_columns_names
        self.categorical_columns_names = categorical_columns_names

    def fit(self, dataframe: pd.DataFrame) -> None:
        print("Fitting dataset preprocessor")
        self.numerical_columns = NumericalColumns.from_names(dataframe, columns=self.numerical_columns_names)
        self.categorical_columns = CategoricalColumns.from_names(dataframe, columns=self.categorical_columns_names)

    def get_analysis(self) -> DatasetAnalysis:
        return DatasetAnalysis(self.numerical_columns, self.categorical_columns)

    def preprocess(self, dataframe: pd.DataFrame) -> Dict[str, pd.Series]:
        transformed_data = dataframe[self.numerical_columns_names + self.categorical_columns_names].copy()

        for column in self.numerical_columns.columns:
            transformed_data[column] = self.numerical_columns.columns[column].transform(transformed_data[column])

        for column in self.categorical_columns.columns:
            transformed_data[column] = self.categorical_columns.columns[column].transform(transformed_data[column])

        return {
            "numerical_inputs_features": transformed_data[self.numerical_columns_names].values,
            **{
                feature_name: transformed_data[feature_name].values
                for feature_name in self.categorical_columns_names
            }
        }
    
    def preprocess_target(self, dataframe: pd.DataFrame) -> Dict[str, pd.Series]:
        transformed_data = dataframe[self.numerical_columns_names + self.categorical_columns_names].copy()

        for column in self.numerical_columns.columns:
            transformed_data[column] = self.numerical_columns.columns[column].transform(transformed_data[column])

        for column in self.categorical_columns.columns:
            transformed_data[column] = self.categorical_columns.columns[column].transform(transformed_data[column])

        return {
            "numerical_outputs": transformed_data[self.numerical_columns_names].values,
            **{
                feature_name + "_outputs": transformed_data[feature_name].values
                for feature_name in self.categorical_columns_names
            }
        }

    # Move to dedicated infrastructure class
    def save(self, directory: str) -> None:
        os.makedirs(directory, exist_ok=True)
        
        for column_name, column in self.numerical_columns.columns.items():
            column.save(f"{directory}/{column_name}.json")

        for column_name, column in self.categorical_columns.columns.items():
            column.save(f"{directory}/{column_name}.json")

    @classmethod
    def from_columns(cls, numerical_columns: List[NumericalColumn], categorical_columns: List[CategoricalColumn]) -> "DatasetPreprocessor":
        return cls(numerical_columns, categorical_columns)

In [6]:
dataset_preprocessor = DatasetPreprocessor(numerical_columns_names, categorical_columns_names)
dataset_preprocessor.fit(dataframe)

Fitting dataset preprocessor


KeyError: 'vehicle_trim_level'

In [30]:
numerical_columns = NumericalColumns.from_names(dataframe, columns=numerical_columns_names)
categorical_columns = CategoricalColumns.from_names(dataframe, columns=categorical_columns_names)

In [7]:
inputs = {}
embeddings = []

FINAL_BOTTLENECK_DIM = 32
HIDDEN_LAYERS_DIM = [128, 64, 32]

# Encoding part
numerical_inputs_layer = Input(shape=(len(numerical_columns.columns),), name="numerical_inputs_features")
numerical_inputs_layer = BatchNormalization()(numerical_inputs_layer)
inputs["numerical_inputs_features"] = numerical_inputs_layer
embeddings.append(numerical_inputs_layer)

for feature_name, feature in categorical_columns.columns.items():
    categorical_input_layer = Input(shape=(1,), name=feature_name)
    inputs[feature_name] = categorical_input_layer

    embedding_layer = Embedding(
        input_dim=len(feature.vocabulary) + 1,
        output_dim=feature.embedding_dim,
        name=f"{feature_name}_embedding"
    )(categorical_input_layer)

    embedding_layer = Flatten(name=f"{feature_name}_embedding_flatten")(embedding_layer)
    embeddings.append(embedding_layer)

all_features_layer = Concatenate()(embeddings)

for index, hidden_layer_dim in enumerate(HIDDEN_LAYERS_DIM):
    all_features_layer = Dense(units=hidden_layer_dim, activation="relu", name=f"hidden_layer_{index}")(all_features_layer)

 
bottleneck_layer = Dense(units=FINAL_BOTTLENECK_DIM, activation="tanh", name="bottleneck_layer")(all_features_layer)

# Decoding part

first_decoding_layer = Dense(units=64, activation="relu", name="first_decoding_layer")(bottleneck_layer)

for index, hidden_layer_dim in enumerate(reversed(HIDDEN_LAYERS_DIM)):
    first_decoding_layer = Dense(units=hidden_layer_dim, activation="relu", name=f"decoding_layer_{index}")(first_decoding_layer)
    first_decoding_layer = Dropout(0.2)(first_decoding_layer)

outputs = {}

numerical_outputs = Dense(units=len(numerical_columns.columns), name="numerical_outputs")(first_decoding_layer)
outputs["numerical_outputs"] = numerical_outputs

for feature_name, feature in categorical_columns.columns.items():
    categorical_output_layer = Dense(units=len(feature.vocabulary) + 1, name=f"{feature_name}_outputs", activation="softmax")(first_decoding_layer)
    outputs[f"{feature_name}_outputs"] = categorical_output_layer

autoencoder = Model(inputs=inputs, outputs=outputs)
encoder = Model(inputs=inputs, outputs=bottleneck_layer)


losses = {}
loss_weights = {}

losses["numerical_outputs"] = "mse"
loss_weights["numerical_outputs"] = 1.0

for feature_name in categorical_columns.columns:
    losses[f"{feature_name}_outputs"] = "sparse_categorical_crossentropy"
    loss_weights[f"{feature_name}_outputs"] = 1.0

autoencoder.compile(optimizer="adam", loss=losses, loss_weights=loss_weights)
autoencoder.save("autoencoder.keras")
model = load_model("autoencoder.keras")

NameError: name 'numerical_columns' is not defined

In [8]:
x = dataset_preprocessor.preprocess(dataframe)
y = dataset_preprocessor.preprocess_target(dataframe)

KeyError: "['vehicle_trim_level'] not in index"

In [9]:
autoencoder.fit(x=x, y=y, epochs=10, batch_size=100, validation_split=0.2)

NameError: name 'autoencoder' is not defined

In [297]:
encoder.predict(x)

[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step


array([[ 8.9397067e-01,  9.6709681e-01,  7.9613245e-01, ...,
        -1.7263100e-01,  6.7366630e-01, -1.4135163e-02],
       [ 9.3868989e-01,  9.8516482e-01,  9.1622311e-01, ...,
        -2.9181448e-01,  8.3701688e-01, -1.4009577e-01],
       [ 8.8373524e-01,  9.5905167e-01,  7.8330857e-01, ...,
        -1.5059380e-01,  6.3525021e-01,  1.7593242e-04],
       ...,
       [ 9.3370491e-01,  9.7682655e-01,  8.9947945e-01, ...,
        -2.0524356e-01,  8.1432486e-01, -1.5843923e-01],
       [ 7.3241997e-01,  8.7240243e-01,  6.4386541e-01, ...,
        -1.5077646e-02,  5.8866918e-01,  1.6981578e-02],
       [ 9.7714382e-01,  9.9614549e-01,  9.7372222e-01, ...,
        -1.9887161e-01,  9.2300165e-01, -3.5595196e-01]], dtype=float32)

In [43]:
encoder.predict(dataset_preprocessor.preprocess(prediction_df))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 129ms/step


array([[-0.09129161, -0.3003128 ,  0.14279275, -0.0788637 , -0.20313236,
         0.16591106,  0.23145911, -0.4382897 , -0.11219118, -0.11739425,
        -0.17356685, -0.3678083 , -0.12830657,  0.19102648, -0.2664427 ,
        -0.06730663,  0.41341332,  0.37312272, -0.09601668, -0.01168428,
         0.1544762 , -0.08641091, -0.00380521,  0.13412751, -0.07791837,
         0.5365318 ,  0.531186  , -0.08955383,  0.05663909,  0.40029883,
        -0.31981412,  0.22963767]], dtype=float32)