In [1]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [2]:
!pip install polars

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
from collections import defaultdict, Counter
from typing import List, Dict, Union

from tqdm import tqdm
import polars as pl

In [4]:
TOP_N = 100
LOCALES = ["IT", "FR", "ES"]
VER = "40"
DIR = "/gdrive/MyDrive/amazon_kdd_2023/"

In [5]:
def preprocess(df:pl.DataFrame) -> pl.DataFrame:
    df = df.explode(["prev_items"])
    df = df.with_columns(
        df.select(pl.col("session_id").cumcount().over("session_id").alias("sequence_num"))
    )
    return df

In [6]:
def generate_co_visit_matrix(df:pl.DataFrame) -> pl.DataFrame:
    # 共起ペアの作成
    df = df[["session_id", "prev_items", "sequence_num"]].join(df[["session_id", "prev_items", "sequence_num"]], on="session_id")

    # 共起した間隔を計算し、絞り込み
    df = df.with_columns(
        (pl.col("sequence_num_right").cast(pl.Int64) - pl.col("sequence_num").cast(pl.Int64)).alias("diff_sequence_num")
    )
    df = df.filter((pl.col("diff_sequence_num") > 0)&(pl.col("diff_sequence_num") <= 5))
    df = df.filter(pl.col("prev_items") != pl.col("prev_items_right"))

    # weightを計算し、共起ペアごとに和を計算
    df = df.with_columns(
        pl.lit(1).alias("consective_5_weight")
    )
    df = df.groupby(["prev_items", "prev_items_right"]).sum()
    df = df.rename({"prev_items":"item", "prev_items_right":"candidate_item"})[["item", "candidate_item", "consective_5_weight"]]

    return df

In [7]:
def filter_by_locale_availability(co_visit_matrix:pl.DataFrame, product:pl.DataFrame):
    product = product.unique(subset=["id"])
    product = product[["id", "available_locales"]]
    co_visit_matrix = co_visit_matrix.join(product, left_on="item", right_on="id", how="left").rename({"available_locales":"item_locales"})
    co_visit_matrix = co_visit_matrix.join(product, left_on="candidate_item", right_on="id", how="left").rename({"available_locales":"candidate_item_locales"})
    dfs = []
    for locale in LOCALES:
        df = co_visit_matrix.filter(pl.lit(locale).is_in(pl.col("item_locales")) & pl.lit(locale).is_in(pl.col("candidate_item_locales"))) 
        df = df.with_columns(pl.lit(locale).alias("locale"))
        df = df[["item", "candidate_item", "consective_5_weight", "locale"]]
        df = df.sort(["item", "consective_5_weight"], descending=[False, True])
        df = df.groupby("item", maintain_order=True).head(TOP_N)
        df = df.with_columns(
            pl.col("consective_5_weight").rank(descending=True, method="min").over("item").alias("consective_5_rank")
        )
        dfs.append(df)
    co_visit_matrix = pl.concat(dfs)
    return co_visit_matrix

# For local train/eval

In [8]:
train1 = pl.read_parquet(DIR + "data/preprocessed/task1/train_task1.parquet")
train2 = pl.read_parquet(DIR + "data/preprocessed/task2/train_task2_augmented.parquet")
train1 = train1.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
train = pl.concat([train1, train2])

test1_1 = pl.read_parquet(DIR + "data/preprocessed/task1/test_task1_phase1.parquet")
test1_2 = pl.read_parquet(DIR + "data/preprocessed/task1/test_task1_phase2.parquet")
test = pl.read_parquet(DIR + "data/preprocessed/task2/test_task2_leftover.parquet")
test1_1 = test1_1.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
test1_2 = test1_2.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
test = pl.concat([test1_1, test1_2, test])

In [9]:
train = preprocess(train)
test = preprocess(test)
session_df = pl.concat([
    train["prev_items", "locale", "session_id", "sequence_num"],
    test["prev_items", "locale", "session_id", "sequence_num"],
])

In [10]:
product = pl.read_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/preprocessed/common/product_03.parquet")

In [11]:
co_visit_matrix = generate_co_visit_matrix(session_df)

In [12]:
co_visit_matrix = filter_by_locale_availability(co_visit_matrix, product)

In [13]:
file_name = f"co_visit_matrix_{VER}_for_train_or_eval.parquet"
co_visit_matrix.write_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/interim/candidates/task2/" + file_name)

In [14]:
co_visit_matrix.head()

item,candidate_item,consective_5_weight,locale,consective_5_rank
str,str,i32,str,u32
"""0007477155""","""0008402787""",25,"""IT""",1
"""0007477155""","""880475172X""",3,"""IT""",2
"""0007477155""","""B07CKHS8J1""",3,"""IT""",2
"""0007477155""","""0008307733""",2,"""IT""",4
"""0007477155""","""B00007KQF8""",2,"""IT""",4


## MRR@100

In [15]:
train = pl.read_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/preprocessed/task2/train_task2.parquet")

In [16]:
# last_itemの抽出
last_item_list = []
prev_items_list = train["prev_items"].to_list()
for prev_items in prev_items_list:
    last_item_list.append(prev_items[-1])
train = train.with_columns(pl.Series(name="last_item", values=last_item_list))

In [17]:
train = train[["session_id", "locale", "last_item", "next_item"]]

In [18]:
co_visit_matrix = pl.read_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/interim/candidates/task2/" + file_name)

In [19]:
# candidateの結合とlabelの付与
df = train.join(co_visit_matrix, left_on=["locale", "last_item"], right_on=["locale", "item"], how="left")
df = df.sort(["session_id", "consective_5_weight"], descending=[False, True])
df = df.with_columns((pl.col("candidate_item") == pl.col("next_item")).cast(pl.Int8).alias("label"))
label_lists = df.groupby("session_id", maintain_order=True).all()["label"].to_list()

In [20]:
# MRRの計算
rr = 0
for labels in label_lists:
    labels = labels[:100]
    for i, label in enumerate(labels):
        if label == 1:
            rr += 1 / (i+1)
            break
mrr = rr / len(label_lists)
print("MRR:", round(mrr, 5))

MRR: 0.27622


# For test inference

In [21]:
train1 = pl.read_parquet(DIR + "data/preprocessed/task1/train_task1.parquet")
train2 = pl.read_parquet(DIR + "data/preprocessed/task2/train_task2_augmented.parquet")
train1 = train1.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
train = pl.concat([train1, train2])

test1_1 = pl.read_parquet(DIR + "data/preprocessed/task1/test_task1_phase1.parquet")
test1_2 = pl.read_parquet(DIR + "data/preprocessed/task1/test_task1_phase2.parquet")
test = pl.read_parquet(DIR + "data/preprocessed/task2/test_task2_leftover.parquet")
test1_1 = test1_1.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
test1_2 = test1_2.with_columns(
    (pl.col("session_id") + "_from_task1").alias("session_id")
)
test = pl.concat([test1_1, test1_2, test])

In [22]:
# trainのnext_itemをprev_itemsにappendする
prev_items_list = train["prev_items"].to_list()
next_item_list = train["next_item"].to_list()
prev_items_list_updated = []
for prev_items, next_item in zip(prev_items_list, next_item_list):
    prev_items.append(next_item)
    prev_items_list_updated.append(prev_items)

train = train.with_columns(
    pl.Series(name="prev_items", values=prev_items_list_updated)
)

In [23]:
train = preprocess(train)
test = preprocess(test)
session_df = pl.concat([
    train["prev_items", "locale", "session_id", "sequence_num"],
    test["prev_items", "locale", "session_id", "sequence_num"],
])

In [24]:
product = pl.read_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/preprocessed/common/product_03.parquet")

In [25]:
co_visit_matrix = generate_co_visit_matrix(session_df)

In [None]:
co_visit_matrix = filter_by_locale_availability(co_visit_matrix, product)

In [None]:
file_name = f"co_visit_matrix_{VER}_for_inference.parquet"
co_visit_matrix.write_parquet("/gdrive/MyDrive/amazon_kdd_2023/data/interim/candidates/task2/" + file_name)

In [None]:
co_visit_matrix.head()