In [1]:
import warnings
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import polars as pl
from tqdm import tqdm 
import math 
import glob
from pathlib import Path
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder
warnings.filterwarnings("ignore")
%matplotlib inline

### 1. Обработка таблицы логов

Сначала для каждого файла .parquet с логами выполняется агрегация по парам (user_id, item_id), и сгруппированный результат сохраняется отдельным файлом .parquet в папку `output_dir`.

In [None]:
# Paths: glob patterns for the raw inputs and the directory that receives the
# per-chunk aggregates.
orders_path = "./data/ml_ozon_recsys_train_final_apparel_orders_data/*.parquet"
tracker_path = "./data/ml_ozon_recsys_train_final_apparel_tracker_data/*.parquet"
output_dir = Path("./data/train_data")
output_dir.mkdir(parents=True, exist_ok=True)

# Lazy view of the orders table, reduced to exactly ONE row per
# (user_id, item_id) pair.
#
# Why the reduction matters: this frame is left-joined onto the tracker events.
# If a pair had several order rows, every tracker event of that pair would be
# repeated once per order row and the summed action counters would be inflated.
# A pair is marked "delivered_orders" when at least one of its orders was
# delivered; otherwise it keeps one of its observed (non-delivered) statuses.
df_orders = (
    pl.scan_parquet(orders_path)
    .select(["user_id", "item_id", "last_status"])
    .with_columns([
        pl.col("user_id").cast(pl.Int64),
        pl.col("item_id").cast(pl.Int64),
    ])
    # NOTE(review): "proccesed_orders" is kept exactly as originally written;
    # presumably it mirrors the spelling used in the data — confirm before
    # "correcting" it, otherwise the filter would silently stop matching.
    .filter(pl.col("last_status") != "proccesed_orders")
    .group_by(["user_id", "item_id"])
    .agg([
        (pl.col("last_status") == "delivered_orders").any().alias("was_delivered"),
        pl.col("last_status").first().alias("other_status"),
    ])
    .select([
        "user_id",
        "item_id",
        pl.when(pl.col("was_delivered"))
        .then(pl.lit("delivered_orders"))
        .otherwise(pl.col("other_status"))
        .alias("last_status"),
    ])
)


tracker_files = sorted(glob.glob(tracker_path))
chunk_size = 1  # number of tracker files aggregated into one output chunk

def encode_actions(df):
    """Turn ``action_type`` into indicator columns and binarise ``last_status``.

    For each known action name an Int32 column ``action_type_<name>`` is added
    that is 1 where ``action_type`` equals that name and 0 otherwise.
    ``last_status`` is replaced by an Int8 flag: 1 for ``"delivered_orders"``,
    0 for everything else. The original ``action_type`` column is dropped.

    Args:
        df: a polars frame exposing ``action_type`` and ``last_status`` columns.

    Returns:
        The frame with the indicator columns appended, ``last_status`` encoded
        and ``action_type`` removed.
    """
    # Order matters: it fixes the order of the appended columns.
    action_names = (
        "view_description",
        "to_cart",
        "page_view",
        "favorite",
        "unfavorite",
        "review_view",
        "remove",
    )
    indicator_exprs = [
        (pl.col("action_type") == name).cast(pl.Int32).alias(f"action_type_{name}")
        for name in action_names
    ]
    # Only a delivered order counts as a positive label; canceled, unknown and
    # missing statuses all end up as 0.
    delivered_flag = (
        pl.when(pl.col("last_status") == "delivered_orders")
        .then(1)
        .otherwise(0)
        .cast(pl.Int8)
        .alias("last_status")
    )
    return df.with_columns(indicator_exprs + [delivered_flag]).drop("action_type")

# Stage 1: aggregate tracker events per (user_id, item_id), one output parquet
# per chunk of tracker files.
if not tracker_files:
    # Fail early with a clear message rather than with an unrelated error in a
    # later statement or cell.
    raise FileNotFoundError(f"No tracker parquet files match {tracker_path!r}")

action_count_cols = [
    "action_type_view_description",
    "action_type_to_cart",
    "action_type_page_view",
    "action_type_favorite",
    "action_type_unfavorite",
    "action_type_review_view",
    "action_type_remove",
]
# Shared aggregation spec: action indicators are summed into counters.
# last_status is a 0/1 flag after encode_actions, so max() means "delivered at
# least once". first() depended on row order whenever a pair carried mixed
# statuses, which made the label arbitrary.
pair_aggs = [
    pl.col(action_count_cols).sum(),
    pl.col("last_status").max(),
]

n_chunks = (len(tracker_files) + chunk_size - 1) // chunk_size

for i in range(0, len(tracker_files), chunk_size):
    files_chunk = tracker_files[i:i + chunk_size]
    print(f"Processing chunk {i//chunk_size + 1}/{n_chunks}")

    chunk_df = (
        pl.scan_parquet(files_chunk)
        .with_columns([
            pl.col("user_id").cast(pl.Int64),
            pl.col("item_id").cast(pl.Int64),
        ])
    )

    # NOTE(review): a left join repeats each tracker row once per matching
    # row of df_orders. If df_orders is not unique on (user_id, item_id) the
    # counters below are inflated — verify uniqueness of the order keys.
    temp = chunk_df.join(df_orders, on=["item_id", "user_id"], how="left")
    temp = encode_actions(temp)

    temp_agg = temp.group_by(["user_id", "item_id"]).agg(pair_aggs)

    output_file = output_dir / f"agg_chunk_{i//chunk_size}.parquet"
    temp_agg.collect(streaming=True).write_parquet(output_file)
    print(f"Saved {output_file}")


# Lazy re-aggregation over every chunk file written above (a pair can occur in
# several chunks). NOTE(review): this only builds a query plan; nothing in the
# visible notebook collects or sinks final_result, so no data is produced here.
# The glob also picks up any stale parquet files already present in output_dir.
all_chunks = pl.scan_parquet(str(output_dir / "*.parquet"))

final_result = all_chunks.group_by(["user_id", "item_id"]).agg(pair_aggs)

In [None]:
# Remove the stage-1 names from the notebook namespace.
# pop(..., None) keeps the cell idempotent: a bare `del` raises NameError when
# the cell is run a second time, or when the chunk loop never executed and
# some of these names were therefore never bound.
for _name in (
    "final_result",
    "temp_agg",
    "files_chunk",
    "chunk_df",
    "df_orders",
    "all_chunks",
):
    globals().pop(_name, None)
del _name

Затем объединяем parquet-файлы пакетами: каждые `n` файлов повторно агрегируются и сохраняются в один файл.

In [None]:
# Stage 2: merge the aggregated parquet files in batches of `n`, re-aggregating
# each batch so a (user_id, item_id) pair occurs once per output file.
# NOTE(review): the input directory is ./data/grouped3, whereas the stage-1
# cell writes its chunks to ./data/train_data — confirm this is the intended
# input and that it exists on a fresh run.
chunks_dir = Path("./data/grouped3")

all_files = sorted(glob.glob(str(chunks_dir / "*.parquet")))


out_path = Path("./data/grouped4")
out_path.mkdir(parents=True, exist_ok=True)
n = 2  # number of input files merged into one output file

for i in tqdm(range(0, len(all_files), n)):
    batch = all_files[i:i + n]
    out_file = out_path / f"grouped_batch_{i//n + 1}.parquet"

    # Scan all parquet files of the batch at once and stream the aggregate
    # straight to disk; the query stays lazy until sink_parquet runs it, so
    # there are no intermediate frames to delete.
    (
        pl.scan_parquet(batch)
        .group_by(["user_id", "item_id"])
        .agg([
            pl.col([
                "action_type_view_description",
                "action_type_to_cart",
                "action_type_page_view",
                "action_type_favorite",
                "action_type_unfavorite",
                "action_type_review_view",
                "action_type_remove",
            ]).sum(),
            # max() instead of first(): first() depends on row/file order when
            # a pair carries different statuses in different inputs. Assumes
            # last_status is the 0/1 delivered flag — TODO confirm for the
            # files in chunks_dir.
            pl.col("last_status").max(),
        ])
        .sink_parquet(out_file)
    )