In [None]:
import os
import zipfile
from pathlib import Path
from typing import Iterable, Optional

import polars as pl
from tqdm import tqdm

#Logging
import logging

# Acquire the notebook-level logger and let every level through.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# logging.getLogger returns the same object on every call, so re-running this
# cell would otherwise attach a second file/stream handler pair and every
# record would be emitted twice. Detach and close handlers left over from a
# previous run before attaching fresh ones.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

# Shared record layout: [timestamp][LEVEL] message
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')

# Persist records to logs.log in the current working directory.
file_handler = logging.FileHandler('logs.log')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Second handler so the records also show up in the notebook output.
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

In [None]:
# Root directory that holds the test-set parquet files.
TEST_DIR = Path('/home/data/ebnerd_testset')

test_behaviors = pl.read_parquet(TEST_DIR / 'test' / 'behaviors.parquet')
test_history = pl.read_parquet(TEST_DIR / 'test' / 'history_extended.parquet')

# Built from TEST_DIR (same file as before) so the data root is defined once.
articles = pl.read_parquet(TEST_DIR / 'articles.parquet')

# (impression_id, user_id) has to identify a row uniquely because the
# predictions are joined back onto test_behaviors using this key pair.
# is_unique() is used instead of DataFrame.groupby, which polars deprecated
# and later removed, so the check does not depend on the installed version.
key_cols = ['impression_id', 'user_id']
assert test_behaviors.select(key_cols).is_unique().all(), (
    'test_behaviors contains duplicated (impression_id, user_id) rows'
)

# Key columns only, in the row order of test_behaviors.
sub_impression = test_behaviors.select(key_cols)

In [None]:
# Directory holding the per-chunk prediction files and how many are expected.
BASE_DIR = "./test_predictions/sub_df_orgs/"
N_CHUNKS = 100

# List the directory once; the listing does not change between chunks.
chunk_dir_listing = os.listdir(BASE_DIR)

chunks = []
for i in range(N_CHUNKS):
    # Substring match on purpose: file names may carry a prefix in front of
    # "sub_df_org_chunk{i}.parquet". The ".parquet" tail keeps chunk 1 from
    # matching chunk 10, 11, ...
    chunk_name = f"sub_df_org_chunk{i}.parquet"
    matches = [p for p in chunk_dir_listing if chunk_name in p]
    if not matches:
        # Name the missing chunk instead of failing with a bare IndexError.
        raise FileNotFoundError(
            f"no file containing '{chunk_name}' found in {BASE_DIR}"
        )
    path = os.path.join(BASE_DIR, matches[0])
    print(path)
    chunks.append(pl.read_parquet(path))

# Stack all chunks into a single frame.
sub_df = pl.concat(chunks)

In [None]:
# Prediction columns that take part in the blend.
PRED_COLS = [f'y_pred{k}' for k in range(1, 9)]
# Equal weighting; with eight columns this is exactly the 0.125 used before.
BLEND_WEIGHT = 1 / len(PRED_COLS)

# Weighted sum of the (already ranked) prediction columns, accumulated left to
# right so the expression is the same as writing the eight terms out by hand.
blend_expr = pl.col(PRED_COLS[0]) * BLEND_WEIGHT
for pred_col in PRED_COLS[1:]:
    blend_expr = blend_expr + pl.col(pred_col) * BLEND_WEIGHT

# Step 1: replace every raw score by its rank inside the impression.
# Step 2: average those ranks into the single column `pred`.
# NOTE(review): the ranks are partitioned by impression_id only, while the
# following cell groups by (impression_id, user_id). If one impression_id can
# occur for several users, these ranks are computed across users - confirm
# that this is intended.
sub_df = sub_df.with_columns([
    pl.col(pred_col).rank().over('impression_id').alias(pred_col)
    for pred_col in PRED_COLS
]).with_columns([
    blend_expr.alias('pred')
])

In [None]:
# Turn `pred` into a ranking per (impression_id, user_id): descending=True
# gives rank 1 to the largest blended value and method='ordinal' hands out
# distinct ranks even when values tie. The ranks of one group are collected
# into the list column `prediction_scores`.
#
# polars renamed DataFrame.groupby to group_by and later removed the old name,
# so use whichever spelling the installed version provides.
group_by_name = 'group_by' if hasattr(pl.DataFrame, 'group_by') else 'groupby'
sub_df = getattr(sub_df, group_by_name)(['impression_id', 'user_id']).agg(
    pl.col('pred').rank(method='ordinal', descending=True).alias('prediction_scores')
)

In [None]:
# Keep a parquet copy of the per-impression rankings.
sub_df.write_parquet('sub.parquet')

# Bring the predictions into the same row order as sub_impression.
all_sub_df = sub_impression.join(sub_df, on=['impression_id', 'user_id'], how='left')

# A left join yields null for impressions that have no prediction and could
# add rows if the right side had duplicate keys. Either problem would only
# surface later inside the submission writer, so verify the result here.
assert len(all_sub_df) == len(sub_impression), (
    'join changed the number of rows; sub_df has duplicated keys'
)
assert all_sub_df['prediction_scores'].null_count() == 0, (
    'some impressions have no prediction after the join'
)
assert (all_sub_df['impression_id'] == sub_impression['impression_id']).all(), (
    'row order differs from sub_impression (impression_id)'
)
assert (all_sub_df['user_id'] == sub_impression['user_id']).all(), (
    'row order differs from sub_impression (user_id)'
)

In [None]:
def write_submission_file(
    impression_ids: Iterable[int],
    prediction_scores: Iterable[Iterable[float]],
    path: Path = Path("predictions.txt"),
    rm_file: bool = True,
    filename_zip: Optional[str] = None,
) -> None:
    """
    Write one line per impression to a text file and zip the result.

    We align the submission file similar to MIND-format for users who are familiar.

    Reference:
        https://github.com/recommenders-team/recommenders/blob/main/examples/00_quick_start/nrms_MIND.ipynb

    Args:
        impression_ids: Identifier written at the start of each line.
        prediction_scores: For every impression, the values written inside
            the brackets. Paired with impression_ids position by position;
            pairing stops at the shorter of the two.
        path: Text file to write. Missing parent directories are created.
        rm_file: Forwarded to zip_submission_file.
        filename_zip: Forwarded to zip_submission_file.

    Example:
    >>> impression_ids = [237, 291, 320]
    >>> prediction_scores = [[0.2, 0.1, 0.3], [0.1, 0.2], [0.4, 0.2, 0.1, 0.3]]
    >>> write_submission_file(impression_ids, prediction_scores, path="predictions.txt", rm_file=False)
    ## Output file:
        237 [0.2,0.1,0.3]
        291 [0.1,0.2]
        320 [0.4,0.2,0.1,0.3]
    """
    path = Path(path)
    # open() does not create directories, so make sure the target exists.
    path.parent.mkdir(parents=True, exist_ok=True)

    # tqdm cannot know the length of a zip object; hand it over when both
    # inputs are sized so a real progress bar is drawn. Generators fall back
    # to the plain counter.
    try:
        total = min(len(impression_ids), len(prediction_scores))
    except TypeError:
        total = None

    with open(path, "w") as f:
        for impr_index, preds in tqdm(zip(impression_ids, prediction_scores), total=total):
            scores = "[" + ",".join(str(score) for score in preds) + "]"
            f.write(str(impr_index) + " " + scores + "\n")

    # The text file is complete and closed at this point; compress it.
    zip_submission_file(path=path, rm_file=rm_file, filename_zip=filename_zip)


def zip_submission_file(
    path: Path,
    filename_zip: Optional[str] = None,
    verbose: bool = True,
    rm_file: bool = True,
) -> None:
    """
    Compress a single file into a ZIP archive.

    Args:
        path (Path): The file to compress. It is stored in the archive under
            its base name (no directory components).
        filename_zip (str, optional): Name or relative path of the archive,
            resolved against the directory of `path`. When omitted, the
            archive is `path` with its suffix replaced by ".zip".
        verbose (bool, optional): If True, print the source and target paths. Defaults to True.
        rm_file (bool, optional): If True, delete `path` after the archive
            has been written. Defaults to True.

    Raises:
        ValueError: If the archive name does not end in ".zip", or if the
            archive path is the same as `path`.

    Returns:
        None: This function does not return any value.
    """
    path = Path(path)
    if filename_zip:
        path_zip = path.parent.joinpath(filename_zip)
    else:
        path_zip = path.with_suffix(".zip")

    if path_zip.suffix != ".zip":
        raise ValueError(f"suffix for {path_zip.name} has to be '.zip'")
    if path_zip == path:
        # Opening the archive in "w" mode would truncate the very file that
        # is about to be compressed.
        raise ValueError(f"{path} cannot be zipped onto itself")
    if verbose:
        print(f"Zipping {path} to {path_zip}")

    # filename_zip may point into a sub-directory that does not exist yet;
    # ZipFile does not create it on its own.
    path_zip.parent.mkdir(parents=True, exist_ok=True)

    # The context manager closes the archive even if writing fails.
    with zipfile.ZipFile(path_zip, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.write(path, arcname=path.name)

    if rm_file:
        path.unlink()

In [None]:
# Archive location, resolved by zip_submission_file relative to the directory
# of `path` below (the current working directory).
SUBMISSION_ZIP = "./test_predictions_final2/predictions.zip"
# Make sure the target directory exists before the long write starts, so the
# cell cannot fail at the very end because of a missing folder.
Path(SUBMISSION_ZIP).parent.mkdir(parents=True, exist_ok=True)

# Plain Python lists, in the row order of all_sub_df.
sub_impression_id = all_sub_df['impression_id'].to_list()
sub_prediction_scores = all_sub_df['prediction_scores'].to_list()
write_submission_file(
    impression_ids=sub_impression_id,
    prediction_scores=sub_prediction_scores,
    path="predictions.txt",
    filename_zip=SUBMISSION_ZIP,
)

logger.info('finish')