In [1]:
import pandas as pd
from datasets import load_dataset

In [2]:
metadata_raw = load_dataset("McAuley-Lab/Amazon-Reviews-2023", "raw_meta_Books", trust_remote_code=True)

Downloading data:   0%|          | 0.00/14.7G [00:00<?, ?B/s]

Generating full split: 0 examples [00:00, ? examples/s]

Loading dataset shards:   0%|          | 0/28 [00:00<?, ?it/s]

In [None]:
metadata_raw_df = metadata_raw['full'].to_pandas()

In [None]:
metadata_raw_df

In [None]:
with pd.option_context('display.max_colwidth', None):
    display(metadata_raw_df.sample(10))

# Build Feature Transformations

In [None]:
train_df = pd.read_parquet("../data/train.parquet")
val_df = pd.read_parquet("../data/val.parquet")

In [None]:
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import FunctionTransformer
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.pipeline import Pipeline

In [None]:
# Custom function to reshape data from 1D to 2D
def reshape_2d_to_1d(X):
    return np.array(X).reshape(-1)

def flatten_string_array_col(X):
    return np.array(['\n'.join(x) for x in X])

def tfidf_pipeline_steps():
    steps = [
        ('impute', SimpleImputer(strategy='constant', fill_value='')),
        ('reshape', FunctionTransformer(reshape_2d_to_1d, validate=False)),
        ('tfidf', TfidfVectorizer(min_df=5, max_features=1000, ngram_range=(1, 2)))
    ]
    return steps

def description_pipeline_steps():
    steps = [
        ('flatten_string_array_col', FunctionTransformer(flatten_string_array_col, validate=False)),
        ('tfidf', TfidfVectorizer(min_df=5, max_features=1000, ngram_range=(1, 2)))
    ]
    return steps

def categories_pipeline_steps():
    def tokenizer(s):
        return s.split('\n')
    steps = [
        ('flatten_string_array_col', FunctionTransformer(flatten_string_array_col, validate=False)),
        ('tfidf', CountVectorizer(tokenizer=tokenizer, token_pattern=None))
    ]
    return steps

data = pd.Series(["from 14.99", "14.99", "price: 9.99", "20 dollars", "none"])
price_pattern = r'\b((?:\d+\.\d*)|(?:\d+))\b'
display(data.str.extract(price_pattern))

def price_parse_dtype(series, pattern):
    return series.str.extract(pattern).astype(float)

def price_pipeline_steps(price_pattern):
    steps = [
        ('extract_price', FunctionTransformer(price_parse_dtype, kw_args=dict(pattern=price_pattern), validate=False)),
        ('impute', SimpleImputer(strategy='constant', fill_value=0)),
        ('min_max_scale', MinMaxScaler())
    ]
    return steps

In [None]:
tfm = [
    ('main_category', OneHotEncoder(handle_unknown='ignore'), ['main_category']),  # One-hot encoding for categorical field
    ('title', Pipeline(tfidf_pipeline_steps()), ['title']),  # TF-IDF vectorizer for text field
    ('description', Pipeline(description_pipeline_steps()), 'description'),  # TF-IDF vectorizer for another text field
    ('categories', Pipeline(categories_pipeline_steps()), 'categories'),  # Count Vectorizer for multi-label categorical
    ('price', Pipeline(price_pipeline_steps(price_pattern)), 'price')  # Normalizing price
]
cols = [x[0] for x in tfm]
cols

In [None]:
train_full_df = pd.merge(train_df, metadata_raw_df[['parent_asin'] + cols], how='left', on='parent_asin')
# Drop duplicated item so that the Pipeline only fit the unique item features
fit_df = train_full_df.drop_duplicates(subset=['parent_asin'])
val_full_df = pd.merge(val_df, metadata_raw_df[['parent_asin'] + cols], how='left', on='parent_asin')
train_full_df

In [None]:
# Define preprocessing steps for each column
preprocessing_pipeline = ColumnTransformer(
    transformers=tfm,
    remainder='drop'  # Drop any columns not specified in transformers
)

# Create a pipeline object
item_metadata_pipeline = Pipeline(steps=[
    ('preprocessing', preprocessing_pipeline)
])

# Fit the pipeline
item_metadata_pipeline.fit(fit_df)

# Transform the data (useful for training)
transformed_item_metadata = item_metadata_pipeline.transform(train_full_df)

# For demonstration, print the shape of the transformed features and a few rows
print(f"Transformed Item Metadata Shape: {transformed_item_metadata.shape}")

# Persist

In [None]:
import joblib
import pickle

In [None]:
train_full_df.to_parquet("../data/train_item_features.parquet")
val_full_df.to_parquet("../data/val_item_features.parquet")

In [None]:
joblib.dump(item_metadata_pipeline, '../data/item_metadata_pipeline.joblib')