In [None]:
import numpy as np
import pandas as pd
import re
import torch
import lightgbm as lgbm

from datasets import Dataset
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import (
    OneHotEncoder,
    StandardScaler,
    LabelEncoder,
)
from transformers import AutoTokenizer, AutoModel

# -------------------------------------
# Display
# -------------------------------------
from IPython.core.interactiveshell import InteractiveShell
# Never truncate long cell values when a DataFrame is rendered.
pd.options.display.max_colwidth = None
# Echo the value of every bare expression statement in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# Load the data and split into a training and test dataset

In [None]:
# Bin edges for the review score and the label given to each bin
# (three bins: up to 87, up to 94, and everything above 94).
bins = [0, 87, 94, np.inf]
names = ["neutral", "good", "excellent"]

wine_df = pd.read_csv("data/wine_data.csv")
wine_df["rating"] = pd.cut(x=wine_df["points"], bins=bins, labels=names)

In [None]:
# Column roles used throughout the notebook.
TEXT_FEATURE = "description"
NUMERICAL_FEATURE = "price"
CATEGORICAL_FEATURE = "variety"
TARGET = "rating"
FEATURES = [TEXT_FEATURE, NUMERICAL_FEATURE, CATEGORICAL_FEATURE]

# Keep only the model inputs and the target; every other column is dropped.
wine_df = wine_df.loc[:, [*FEATURES, TARGET]]

In [None]:
# Fix the seed so the split, and every number derived from it, is identical on each
# Restart & Run All.
SPLIT_SEED = 42
train_df, test_df = train_test_split(wine_df, test_size=0.2, random_state=SPLIT_SEED)

# Preprocessing
- Preprocess the numerical and categorical variables
- Tokenize the text data
- Extract a vector representation of the text

In [None]:
def preprocess_number():
    """Build the numerical pipeline: median imputation followed by standard scaling."""
    imputer = SimpleImputer(strategy="median")
    scaler = StandardScaler()
    return make_pipeline(imputer, scaler)

def preprocess_categories():
    """Build the categorical pipeline: constant imputation followed by one-hot encoding.

    NaN values are replaced by the constant "other". The encoder returns a dense
    array and is configured with handle_unknown="ignore" for categories that were
    not present when it was fitted.
    """
    fill_missing = SimpleImputer(
        missing_values=np.nan,
        strategy="constant",
        fill_value="other",
    )
    one_hot = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
    return make_pipeline(fill_missing, one_hot)

def create_preprocessor():
    """Assemble the ColumnTransformer for the numerical and categorical features.

    NUMERICAL_FEATURE goes through preprocess_number() and CATEGORICAL_FEATURE
    through preprocess_categories(); any other input column is dropped.
    """
    return ColumnTransformer(
        transformers=[
            ("num_preprocessor", preprocess_number(), [NUMERICAL_FEATURE]),
            ("cat_preprocessor", preprocess_categories(), [CATEGORICAL_FEATURE]),
        ],
        remainder="drop",
    )


# Fit the preprocessor on the training rows only and ask for DataFrame output so the
# generated column names are kept.
column_transformer = create_preprocessor()
column_transformer.set_output(transform="pandas")
preprocessed_num_cat_features_df = column_transformer.fit_transform(
    X=train_df.loc[:, [NUMERICAL_FEATURE, CATEGORICAL_FEATURE]]
)


In [None]:
MODEL_NAME = "distilbert-base-uncased"

def tokenized_pytorch_tensors(
        df: pd.DataFrame,
        column_list: list,
        max_length: int = 120,
        batch_size: int = 128,
    ) -> Dataset:
    """Tokenize the TEXT_FEATURE column of ``df`` and return a torch-formatted Dataset.

    Args:
        df: frame containing the TEXT_FEATURE column.
        column_list: tokenizer output columns to keep, e.g.
            ``["input_ids", "attention_mask"]``; every other column is removed.
        max_length: each text is truncated and padded to exactly this many tokens.
        batch_size: number of rows handed to the tokenizer per ``map`` call.

    Returns:
        A Dataset that holds only ``column_list``, formatted as torch tensors.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    transformers_dataset = Dataset.from_pandas(df)

    def tokenize(model_inputs_batch):
        # In batched mode ``map`` passes a mapping of column name -> list of values.
        # padding="max_length" (rather than padding=True, which pads to the longest
        # text of the current batch only) gives every row the same length, so later
        # steps can re-batch the rows with any batch size and still get one tensor.
        return tokenizer(
            model_inputs_batch[TEXT_FEATURE],
            padding="max_length",
            max_length=max_length,
            truncation=True,
        )

    tokenized_dataset = transformers_dataset.map(
        tokenize,
        batched=True,
        batch_size=batch_size,
    )

    tokenized_dataset.set_format(
        "torch",
        columns=column_list
    )

    # Drop everything that was not requested (raw text, stored index, ...).
    columns_to_remove = [
        name for name in tokenized_dataset.column_names if name not in column_list
    ]

    tokenized_dataset = tokenized_dataset.remove_columns(columns_to_remove)

    return tokenized_dataset

print("Tokenize text in Dataset of Pytorch tensors")
# Missing descriptions become empty strings before the text is tokenized.
train_df[TEXT_FEATURE] = train_df[TEXT_FEATURE].fillna(value="")
tokenized_df = tokenized_pytorch_tensors(
    df=train_df[[TEXT_FEATURE]],
    column_list=["input_ids", "attention_mask"],
)


In [None]:
def hidden_state_from_text_inputs(df, batch_size: int = 128) -> pd.DataFrame:
    """Run the pretrained model over a tokenized Dataset and return the [CLS] vectors.

    Args:
        df: tokenized Dataset in torch format that exposes the model inputs
            (``input_ids`` and ``attention_mask``).
        batch_size: rows per forward pass. The rows of one batch must share the same
            token length so that each input arrives as a single tensor.

    Returns:
        DataFrame with one row per input row (default RangeIndex, same order as
        ``df``) and one ``feature_<n>`` column per hidden dimension, numbered from 1.
    """
    # Load the tokenizer and the model once instead of once per batch, and place the
    # model on the same device the inputs are moved to; leaving the model on the CPU
    # while the inputs go to CUDA makes the forward pass fail.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModel.from_pretrained(MODEL_NAME).to(device).eval()

    def extract_hidden_states(batch):
        # keys are input_ids and attention_mask
        # values are both tensor[batch_size, number_of_tokens]
        inputs = {
            k: v.to(device)
            for k, v in batch.items()
            # this avoids us trying to pass in other features in the dataset
            if k in tokenizer.model_input_names
        }

        # turn off gradient calculation as we don't need it
        with torch.no_grad():
            # final output of the model, the representation of the text tokens
            # use ** as the model takes input_ids and attention_mask arguments
            last_hidden_state = model(**inputs).last_hidden_state

        # the CLS token is the first one: [:, 0] keeps its vector for every row
        return {"cls_hidden_state": last_hidden_state[:, 0].cpu().numpy()}

    cls_dataset = df.map(extract_hidden_states, batched=True, batch_size=batch_size)
    cls_dataset.set_format(type="pandas")

    hidden_states = pd.DataFrame(cls_dataset["cls_hidden_state"].to_list())
    # Name the columns from the actual vector width rather than a hard-coded 768, so
    # a model with a different hidden size does not break the frame construction.
    hidden_states.columns = [
        f"feature_{n}" for n in range(1, hidden_states.shape[1] + 1)
    ]
    return hidden_states

print("Extract text feature hidden state")
# One row per tokenized training text, one column per hidden dimension.
hidden_states_df = hidden_state_from_text_inputs(df=tokenized_df)
print(f"Data with hidden state shape: {hidden_states_df.shape}")


In [None]:
print("Saving preprocessed features and targets")
# NOTE: despite the message above, nothing is written to disk in this cell.
#
# pd.concat(axis=1) aligns rows on the index. hidden_states_df carries a fresh
# 0..n-1 index, while the other two pieces still carry the shuffled row labels that
# train_test_split kept from wine_df. Concatenating them as-is would pair rows by
# label, not by position, and pad the mismatches with NaN. All three pieces were
# built from train_df in the same row order, so they are re-indexed by position.
preprocessed_data = pd.concat(
    [
        preprocessed_num_cat_features_df.reset_index(drop=True),
        hidden_states_df.reset_index(drop=True),
        train_df[TARGET].reset_index(drop=True),
    ],
    axis=1,
)
assert len(preprocessed_data) == len(train_df), "feature blocks differ in row count"


# Train the LightGBM model

In [None]:
# Encode the target and rename the features.
#
# The encoder must know exactly the labels stored in the rating column, i.e. the
# labels passed to pd.cut when the column was created; LabelEncoder.transform raises
# a ValueError for any label it was not fitted on.
TARGET_CATEGORIES = ["neutral", "good", "excellent"]
le = LabelEncoder().fit(TARGET_CATEGORIES)
preprocessed_data["encoded_target"] = le.transform(preprocessed_data[TARGET])

# Strip every character outside [A-Za-z0-9_] from the column names. The evaluation
# frame gets the same clean-up before prediction, so the feature names derived from
# this frame have to be cleaned too or they will not be found there.
preprocessed_data = preprocessed_data.rename(
    columns=lambda x: re.sub("[^A-Za-z0-9_]+", "", x)
)

In [None]:
import lightgbm as lgbm

# Every column except the raw and the encoded target is a model input.
features = [
    col
    for col in preprocessed_data.columns
    if col != TARGET and col != "encoded_target"
]

# create the model
lgbm_clf = lgbm.LGBMClassifier(
    objective="multiclass",
    n_estimators=100,
    num_leaves=10,
    max_depth=10,
)

lgbm_clf.fit(X=preprocessed_data[features], y=preprocessed_data["encoded_target"])


# Evaluate the model

In [None]:
# Reuse the transformer fitted on the training rows (transform only, no refit).
preprocessed_num_cat_features_test_df = column_transformer.transform(
    test_df[[NUMERICAL_FEATURE, CATEGORICAL_FEATURE]]
)

# preprocess the text column
test_df[TEXT_FEATURE] = test_df[TEXT_FEATURE].fillna("")
tokenized_test_text_df = tokenized_pytorch_tensors(
    test_df[[TEXT_FEATURE]],
    column_list=["input_ids", "attention_mask"]
)

#  extract last hidden state
hidden_states_test_df = hidden_state_from_text_inputs(tokenized_test_text_df)

# pd.concat(axis=1) aligns rows on the index. hidden_states_test_df has a fresh
# 0..n-1 index while the other two pieces keep the shuffled row labels of test_df,
# so concatenating them as-is would pair rows by label and fill the gaps with NaN.
# All three pieces follow the row order of test_df; re-index them by position.
preprocessed_eval_df = pd.concat(
    [
        preprocessed_num_cat_features_test_df.reset_index(drop=True),
        hidden_states_test_df.reset_index(drop=True),
        test_df[TARGET].reset_index(drop=True),
    ],
    axis=1,
)
assert len(preprocessed_eval_df) == len(test_df), "feature blocks differ in row count"


In [None]:
# generate predictions
# Strip every character outside [A-Za-z0-9_] from the evaluation column names.
preprocessed_eval_df = preprocessed_eval_df.rename(
    columns=lambda x: re.sub("[^A-Za-z0-9_]+", "", x)
)

# `features` was built from the training frame. Clean those names the same way
# before using them as a selector, so that a name containing spaces or punctuation
# is still found among the renamed columns above. This is a no-op for names that are
# already clean.
eval_features = [re.sub("[^A-Za-z0-9_]+", "", col) for col in features]

actual = preprocessed_eval_df[TARGET].values
predictions = lgbm_clf.predict(preprocessed_eval_df[eval_features])
# map the integer class ids back to the rating labels
decoded_predictions = le.inverse_transform(predictions)

accuracy_score(actual, decoded_predictions)
