In [1]:
import pandas as pd
import numpy as np
import polars as pl

from recommenders.utils import fix_dtypes

### Merging candidates

In [2]:
# Candidate sources for the validation split. The first file seeds the merge;
# every following file is outer-joined onto the running result in list order.
paths_candidates_val = [
    './data/candidates/als_candidates_300_val.parquet.gzip',
    './data/candidates/bm25_candidates_2mz_200_val.parquet.gzip',
    './data/candidates/content_candidates_2mz_150_val.parquet.gzip',
    './data/candidates/popular_candidates_200_val.parquet.gzip',
]

first_path_val, *other_paths_val = paths_candidates_val

# Each source is de-duplicated on (user_id, item_id) before it takes part in a join.
merged_candidates_val = fix_dtypes(pl.read_parquet(first_path_val))
merged_candidates_val = merged_candidates_val.unique(subset=['user_id', 'item_id'])
print(f'Merged: {first_path_val}, Shape: {merged_candidates_val.shape}')

for path_cand_val in other_paths_val:
    to_merge = fix_dtypes(pl.read_parquet(path_cand_val))
    to_merge = to_merge.unique(subset=['user_id', 'item_id'])

    # Join on whichever columns the two frames have in common.
    shared_columns = list(set(merged_candidates_val.columns).intersection(to_merge.columns))
    # NOTE(review): newer polars releases rename how='outer' to 'full' and no
    # longer coalesce the join keys by default -- confirm the installed version
    # before re-running this cell.
    merged_candidates_val = merged_candidates_val.join(to_merge, on=shared_columns, how='outer')
    print(f'Merged: {path_cand_val}, Shape: {merged_candidates_val.shape}, Columns: {shared_columns}')

# Pairs that a source did not propose get 0 in that source's columns.
# NOTE(review): the *_rank columns look 0-based, so a filled 0 cannot be told
# apart from a top rank -- confirm nothing relies on these filled values.
merged_candidates_val = merged_candidates_val.fill_null(0)
merged_candidates_val = merged_candidates_val.sort(['user_id'])

Merged: ./data/candidates/als_candidates_300_val.parquet.gzip, Shape: (60000000, 4)
Merged: ./data/candidates/bm25_candidates_2mz_200_val.parquet.gzip, Shape: (85407936, 6), Columns: ['user_id', 'item_id']
Merged: ./data/candidates/content_candidates_2mz_150_val.parquet.gzip, Shape: (110494683, 8), Columns: ['user_id', 'item_id']
Merged: ./data/candidates/popular_candidates_200_val.parquet.gzip, Shape: (149810474, 9), Columns: ['user_id', 'item_id']


In [3]:
# Peek at the first three merged validation candidates.
merged_candidates_val.head(n=3)

user_id,item_id,als_sim_score,als_sim_rank,bm25_sim_score,bm25_sim_rank,content_sim_score,content_sim_rank,mean_timespent
i32,i32,f32,i64,f32,i64,f32,i64,f32
4,93615,0.514802,0,0.0,0,0.0,0,0.0
4,87797,0.234164,1,0.0,0,0.0,0,0.0
4,159229,0.224879,2,0.0,0,0.0,0,0.0


In [5]:
# Persist only the (user_id, item_id) keys; the per-source score/rank columns are not written.
merged_candidates_val.select(['user_id', 'item_id']).write_parquet(
    './data/candidates/merged_candidates_val.parquet.gzip',
    compression='gzip',
)

In [6]:
# Candidate sources for the test split. The first file seeds the merge;
# every following file is outer-joined onto the running result in list order.
paths_candidates_test = [
    './data/candidates/als_candidates_300_test.parquet.gzip',
    './data/candidates/bm25_candidates_2mz_200_test.parquet.gzip',
    './data/candidates/content_candidates_2mz_150_test.parquet.gzip',
    './data/candidates/popular_candidates_200_test.parquet.gzip',
]

first_path_test, *other_paths_test = paths_candidates_test

# Each source is de-duplicated on (user_id, item_id) before it takes part in a join.
merged_candidates_test = fix_dtypes(pl.read_parquet(first_path_test))
merged_candidates_test = merged_candidates_test.unique(subset=['user_id', 'item_id'])
print(f'Merged: {first_path_test}, Shape: {merged_candidates_test.shape}')

for path_cand_test in other_paths_test:
    to_merge = fix_dtypes(pl.read_parquet(path_cand_test))
    to_merge = to_merge.unique(subset=['user_id', 'item_id'])

    # Join on whichever columns the two frames have in common.
    shared_columns = list(set(merged_candidates_test.columns).intersection(to_merge.columns))
    # NOTE(review): newer polars releases rename how='outer' to 'full' and no
    # longer coalesce the join keys by default -- confirm the installed version
    # before re-running this cell.
    merged_candidates_test = merged_candidates_test.join(to_merge, on=shared_columns, how='outer')
    print(f'Merged: {path_cand_test}, Shape: {merged_candidates_test.shape}, Columns: {shared_columns}')

# Pairs that a source did not propose get 0 in that source's columns.
# NOTE(review): the *_rank columns look 0-based, so a filled 0 cannot be told
# apart from a top rank -- confirm nothing relies on these filled values.
merged_candidates_test = merged_candidates_test.fill_null(0)
merged_candidates_test = merged_candidates_test.sort(['user_id'])

Merged: ./data/candidates/als_candidates_300_test.parquet.gzip, Shape: (60000000, 4)
Merged: ./data/candidates/bm25_candidates_2mz_200_test.parquet.gzip, Shape: (83765289, 6), Columns: ['user_id', 'item_id']
Merged: ./data/candidates/content_candidates_2mz_150_test.parquet.gzip, Shape: (108129730, 8), Columns: ['user_id', 'item_id']
Merged: ./data/candidates/popular_candidates_200_test.parquet.gzip, Shape: (146573625, 9), Columns: ['user_id', 'item_id']


In [7]:
# Peek at the first three merged test candidates.
merged_candidates_test.head(n=3)

user_id,item_id,als_sim_score,als_sim_rank,bm25_sim_score,bm25_sim_rank,content_sim_score,content_sim_rank,mean_timespent
i32,i32,f32,i64,f32,i64,f32,i64,f32
7,7286,0.4333,0,0.0,0,0.0,0,0.0
7,225411,0.3778,1,0.0,0,0.0,0,0.0
7,117495,0.359515,2,0.0,0,0.0,0,0.0


In [9]:
# Persist only the (user_id, item_id) keys; the per-source score/rank columns are not written.
merged_candidates_test.select(['user_id', 'item_id']).write_parquet(
    './data/candidates/merged_candidates_test.parquet.gzip',
    compression='gzip',
)

### Merging candidates with features

In [2]:
# Tables that are left-joined, in list order, onto the validation candidate
# pairs. The first entry is the (user_id, item_id) candidate set written
# earlier in this notebook; the last entry is presumably the validation target
# table (verify its schema against the split-building code).
paths_features_val = [
    './data/candidates/merged_candidates_val.parquet.gzip',
    './data/features/als_features_val.parquet.gzip',
    './data/features/bm25_features_val.parquet.gzip',
    './data/features/content_features_val.parquet.gzip',
    './data/features/items_features_val.parquet.gzip',
    './data/features/source_item_features_val.parquet.gzip',
    './data/features/sources_features_val.parquet.gzip',
    './data/features/user_features_val.parquet.gzip',
    './data/features/user_source_features_val.parquet.gzip',
    './data/splits/val_targets.parquet.gzip'
]

merged_features_val = fix_dtypes(pl.read_parquet(paths_features_val[0])).unique(subset=['user_id', 'item_id'])
print(f'Merged: {paths_features_val[0]}, Shape: {merged_features_val.shape}')

for path_feat_val in paths_features_val[1:]:
    to_merge = fix_dtypes(pl.read_parquet(path_feat_val))

    # Pair-level tables are de-duplicated so a candidate pair cannot match twice.
    if 'user_id' in to_merge.columns and 'item_id' in to_merge.columns:
        to_merge = to_merge.unique(subset=['user_id', 'item_id'])
    # 'timestamp' and 'reaction' are not carried into the merged frame.
    for unwanted_column in ('timestamp', 'reaction'):
        if unwanted_column in to_merge.columns:
            to_merge = to_merge.drop([unwanted_column])

    # Join on every column the two frames share. sorted() keeps the key order
    # (and the log line below) stable across runs; plain set order is not.
    shared_columns = sorted(set(merged_features_val.columns).intersection(to_merge.columns))
    assert shared_columns, f'No shared join column with {path_feat_val}'

    rows_before = merged_features_val.shape[0]
    merged_features_val = merged_features_val.join(to_merge, on=shared_columns, how='left')
    # A left join must keep exactly one row per candidate pair. Extra rows mean
    # the feature table has duplicate keys and is fanning out the candidates.
    assert merged_features_val.shape[0] == rows_before, (
        f'{path_feat_val} has duplicate keys on {shared_columns}: '
        f'{rows_before} rows became {merged_features_val.shape[0]}'
    )
    print(f'Merged: {path_feat_val}, Shape: {merged_features_val.shape}, Columns: {shared_columns}')

# Candidates with no match in a table get 0 in that table's columns.
merged_features_val = merged_features_val.fill_null(0)

Merged: ./data/candidates/merged_candidates_val.parquet.gzip, Shape: (149810474, 2)
Merged: ./data/features/als_features_val.parquet.gzip, Shape: (149810474, 8), Columns: ['item_id', 'user_id']
Merged: ./data/features/bm25_features_val.parquet.gzip, Shape: (149810474, 12), Columns: ['item_id', 'user_id']
Merged: ./data/features/content_features_val.parquet.gzip, Shape: (149810474, 16), Columns: ['item_id', 'user_id']
Merged: ./data/features/items_features_val.parquet.gzip, Shape: (149810474, 20), Columns: ['item_id']
Merged: ./data/features/source_item_features_val.parquet.gzip, Shape: (149810474, 27), Columns: ['item_id']
Merged: ./data/features/sources_features_val.parquet.gzip, Shape: (149810474, 31), Columns: ['source_id']
Merged: ./data/features/user_features_val.parquet.gzip, Shape: (149810474, 34), Columns: ['user_id']
Merged: ./data/features/user_source_features_val.parquet.gzip, Shape: (149810474, 35), Columns: ['user_id', 'source_id']
Merged: ./data/splits/val_targets.parquet

In [3]:
# Shape of the subset of validation rows whose `timespent` is non-zero.
nonzero_timespent_val = merged_features_val.filter(pl.col('timespent') != 0)
nonzero_timespent_val.shape

(612262, 36)

In [5]:
# Persist the merged validation feature frame as gzip-compressed parquet.
merged_features_val.write_parquet(
    './data/features/merged_features_val.parquet.gzip',
    compression='gzip',
)

In [6]:
# Tables that are left-joined, in list order, onto the test candidate pairs.
# The first entry is the (user_id, item_id) candidate set written earlier in
# this notebook. Unlike the validation list, no targets file is included here.
paths_features_test = [
    './data/candidates/merged_candidates_test.parquet.gzip',
    './data/features/als_features_test.parquet.gzip',
    './data/features/bm25_features_test.parquet.gzip',
    './data/features/content_features_test.parquet.gzip',
    './data/features/items_features_test.parquet.gzip',
    './data/features/source_item_features_test.parquet.gzip',
    './data/features/sources_features_test.parquet.gzip',
    './data/features/user_features_test.parquet.gzip',
    './data/features/user_source_features_test.parquet.gzip',
]

merged_features_test = fix_dtypes(pl.read_parquet(paths_features_test[0])).unique(subset=['user_id', 'item_id'])
print(f'Merged: {paths_features_test[0]}, Shape: {merged_features_test.shape}')

for path_feat_test in paths_features_test[1:]:
    to_merge = fix_dtypes(pl.read_parquet(path_feat_test))

    # Pair-level tables are de-duplicated so a candidate pair cannot match twice.
    if 'user_id' in to_merge.columns and 'item_id' in to_merge.columns:
        to_merge = to_merge.unique(subset=['user_id', 'item_id'])
    # 'timestamp' and 'reaction' are not carried into the merged frame.
    for unwanted_column in ('timestamp', 'reaction'):
        if unwanted_column in to_merge.columns:
            to_merge = to_merge.drop([unwanted_column])

    # Join on every column the two frames share. sorted() keeps the key order
    # (and the log line below) stable across runs; plain set order is not.
    shared_columns = sorted(set(merged_features_test.columns).intersection(to_merge.columns))
    assert shared_columns, f'No shared join column with {path_feat_test}'

    rows_before = merged_features_test.shape[0]
    merged_features_test = merged_features_test.join(to_merge, on=shared_columns, how='left')
    # A left join must keep exactly one row per candidate pair. Extra rows mean
    # the feature table has duplicate keys and is fanning out the candidates.
    assert merged_features_test.shape[0] == rows_before, (
        f'{path_feat_test} has duplicate keys on {shared_columns}: '
        f'{rows_before} rows became {merged_features_test.shape[0]}'
    )
    print(f'Merged: {path_feat_test}, Shape: {merged_features_test.shape}, Columns: {shared_columns}')

# Candidates with no match in a table get 0 in that table's columns.
merged_features_test = merged_features_test.fill_null(0)

Merged: ./data/candidates/merged_candidates_test.parquet.gzip, Shape: (146573625, 2)
Merged: ./data/features/als_features_test.parquet.gzip, Shape: (146573625, 8), Columns: ['item_id', 'user_id']
Merged: ./data/features/bm25_features_test.parquet.gzip, Shape: (146573625, 12), Columns: ['item_id', 'user_id']
Merged: ./data/features/content_features_test.parquet.gzip, Shape: (146573625, 16), Columns: ['item_id', 'user_id']
Merged: ./data/features/items_features_test.parquet.gzip, Shape: (146573625, 20), Columns: ['item_id']
Merged: ./data/features/source_item_features_test.parquet.gzip, Shape: (146573625, 27), Columns: ['item_id']
Merged: ./data/features/sources_features_test.parquet.gzip, Shape: (146573625, 31), Columns: ['source_id']
Merged: ./data/features/user_features_test.parquet.gzip, Shape: (146573625, 34), Columns: ['user_id']
Merged: ./data/features/user_source_features_test.parquet.gzip, Shape: (146573625, 35), Columns: ['user_id', 'source_id']


In [7]:
# Persist the merged test feature frame as gzip-compressed parquet.
merged_features_test.write_parquet(
    './data/features/merged_features_test.parquet.gzip',
    compression='gzip',
)