In [None]:
from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="t-tech/T-ECD",
    repo_type="dataset",
    allow_patterns="dataset/small/**/*.pq",  
    local_dir="t_ecd_small",
    token="ваш токен HF" 
)

In [15]:
!python -c "from huggingface_hub import dataset_info; print(dataset_info('t-tech/T-ECD'))"

DatasetInfo(id='t-tech/T-ECD', author='t-tech', sha='cabdd9bbb53ec2aaa79075a15f0904f9cadf2484', created_at=datetime.datetime(2025, 9, 22, 17, 44, 13, tzinfo=datetime.timezone.utc), last_modified=datetime.datetime(2025, 9, 26, 14, 29, 44, tzinfo=datetime.timezone.utc), private=False, gated=False, disabled=False, downloads=19044, downloads_all_time=None, likes=26, paperswithcode_id=None, tags=['language:ru', 'language:en', 'license:cc-by-nc-sa-4.0', 'size_categories:100B<n<1T', 'region:us', 'recsys', 'e-commerce', 'retrieval', 'dataset', 'ranking', 'cross-domain'], trending_score=None, card_data={'annotations_creators': None, 'language_creators': None, 'language': ['ru', 'en'], 'license': 'cc-by-nc-sa-4.0', 'multilinguality': None, 'size_categories': ['100B<n<1T'], 'source_datasets': None, 'task_categories': None, 'task_ids': None, 'paperswithcode_id': None, 'pretty_name': 'T-ECD', 'config_names': None, 'train_eval_index': None, 'tags': ['recsys', 'e-commerce', 'retrieval', 'dataset', 'r

In [1]:
import polars as pl
base_path = "t_ecd_small/dataset"

In [2]:
# Используйте этот код для надежной загрузки
from pathlib import Path

def safe_data_loader():
    base_path = Path("t_ecd_small/dataset")
    
    # 1. Статические файлы (users, brands, items)
    static_data = {}
    static_files = {
        'users': 'users.pq',
        'brands': 'brands.pq', 
        'marketplace_items': 'marketplace/items.pq',
        'retail_items': 'retail/items.pq',
        'offers_items': 'offers/items.pq'
    }
    
    for name, rel_path in static_files.items():
        try:
            static_data[name] = pl.read_parquet(base_path / rel_path)
            print(f"✅ {name}: {static_data[name].shape}")
        except Exception as e:
            print(f"❌ {name}: {e}")
    
    # 2. События: для предпросмотра берём несколько дней (не все!)
    # ВАЖНО: Это только для быстрого предпросмотра в ноутбуке!
    # Основная обработка (ячейка 8) работает со ВСЕМИ данными без сэмплирования.
    events_data = {}
    events_config = {
        'marketplace': ('marketplace/events', 3),  # Берём последние 3 дня для предпросмотра
        'retail': ('retail/events', 3), 
        'offers': ('offers/events', 3),
        'reviews': ('reviews', 3)                  # Без папки events!
    }
    
    for domain, (rel_dir, num_days) in events_config.items():
        try:
            domain_dir = base_path / rel_dir
            files = sorted(domain_dir.glob("*.pq"))
            if not files:
                print(f"❌ {domain}: нет файлов в {domain_dir}")
                continue

            # Для предпросмотра берём только последние N дней
            preview_files = files[-num_days:] if len(files) > num_days else files
            print(f"📊 {domain}: загружаем {len(preview_files)} из {len(files)} дней для предпросмотра")
            
            # Ленивая загрузка выбранных дней
            lazy_df = pl.scan_parquet([str(p) for p in preview_files])
            events = lazy_df.collect()  # Загружаем все выбранные дни (без сэмплирования строк)
            events_data[domain] = events
            print(f"✅ {domain}: {events.shape}")
        except Exception as e:
            print(f"❌ {domain}: {e}")
    
    return static_data, events_data

# Запускаем
static, events = safe_data_loader()

✅ users: (3500000, 3)
✅ brands: (24513, 2)
✅ marketplace_items: (2325409, 6)
✅ retail_items: (250171, 6)
✅ offers_items: (22368, 3)
📊 marketplace: загружаем 3 из 227 дней для предпросмотра
✅ marketplace: (1604537, 6)
📊 retail: загружаем 3 из 227 дней для предпросмотра
✅ retail: (4911093, 6)
📊 offers: загружаем 3 из 227 дней для предпросмотра
✅ offers: (9348815, 4)
📊 reviews: загружаем 3 из 227 дней для предпросмотра
✅ reviews: (1624, 5)


In [20]:
# Комплексный анализ null значений во ВСЕХ файлах датасета
import polars as pl
from pathlib import Path
from IPython.display import display
import numpy as np

def analyze_all_nulls():
    """Комплексный анализ null значений во всех файлах датасета"""
    
    base_path = Path("t_ecd_small/dataset")
    results = {}
    
    print("="*80)
    print("🔍 КОМПЛЕКСНЫЙ АНАЛИЗ NULL ЗНАЧЕНИЙ ВО ВСЕХ ФАЙЛАХ")
    print("="*80)
    
    # 1. Анализ brands.pq
    print("\n" + "="*80)
    print("📦 BRANDS.PQ")
    print("="*80)
    
    try:
        brands_path = base_path / "brands.pq"
        brands_df = pl.read_parquet(brands_path)
        
        total = brands_df.height
        null_embedding = brands_df["embedding"].null_count()
        null_pct = (null_embedding / total * 100) if total > 0 else 0
        
        results["brands"] = {
            "file": "brands.pq",
            "total_rows": total,
            "null_embedding": null_embedding,
            "null_percentage": null_pct,
            "columns": list(brands_df.columns)
        }
        
        print(f"✅ Всего брендов: {total:,}")
        print(f"   Null в embedding: {null_embedding:,} ({null_pct:.2f}%)")
        print(f"   С embedding: {total - null_embedding:,} ({(100-null_pct):.2f}%)")
        
    except Exception as e:
        print(f"❌ Ошибка: {e}")
        results["brands"] = {"error": str(e)}
    
    # 2. Анализ items файлов (marketplace, retail, offers)
    print("\n" + "="*80)
    print("📦 ITEMS ФАЙЛЫ (marketplace, retail, offers)")
    print("="*80)
    
    for domain in ["marketplace", "retail", "offers"]:
        try:
            items_path = base_path / domain / "items.pq"
            items_df = pl.read_parquet(items_path)
            
            total = items_df.height
            null_stats = {}
            
            # Проверяем все колонки на null
            for col in items_df.columns:
                null_count = items_df[col].null_count()
                null_pct = (null_count / total * 100) if total > 0 else 0
                null_stats[col] = {"count": null_count, "percentage": null_pct}
            
            results[f"{domain}_items"] = {
                "file": f"{domain}/items.pq",
                "total_rows": total,
                "null_stats": null_stats,
                "columns": list(items_df.columns)
            }
            
            print(f"\n📄 {domain}/items.pq:")
            print(f"   Всего строк: {total:,}")
            for col, stats in null_stats.items():
                if stats["count"] > 0:
                    print(f"   ⚠️  {col}: {stats['count']:,} null ({stats['percentage']:.2f}%)")
                else:
                    print(f"   ✅ {col}: нет null")
                    
        except Exception as e:
            print(f"❌ {domain}/items.pq: ошибка - {e}")
            results[f"{domain}_items"] = {"error": str(e)}
    
    # 3. Анализ events файлов (берём несколько дней для анализа)
    print("\n" + "="*80)
    print("📦 EVENTS ФАЙЛЫ (анализ последних 3 дней)")
    print("="*80)
    
    for domain in ["marketplace", "retail", "offers"]:
        try:
            events_dir = base_path / domain / "events"
            files = sorted(events_dir.glob("*.pq"))
            
            if not files:
                print(f"❌ {domain}/events: нет файлов")
                continue
            
            # Берём последние 3 дня для анализа
            sample_files = files[-3:]
            print(f"\n📄 {domain}/events (анализ {len(sample_files)} из {len(files)} дней):")
            
            # Ленивая загрузка и анализ
            lazy_df = pl.scan_parquet([str(f) for f in sample_files])
            schema = lazy_df.collect_schema()
            
            # Подсчитываем null для всех колонок
            null_stats = {}
            for col in schema.keys():
                null_count = lazy_df.select(pl.col(col).null_count()).collect().item()
                total_rows = lazy_df.select(pl.len()).collect().item()
                null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
                null_stats[col] = {"count": null_count, "percentage": null_pct}
            
            results[f"{domain}_events"] = {
                "file": f"{domain}/events/*.pq",
                "files_analyzed": len(sample_files),
                "total_files": len(files),
                "null_stats": null_stats,
                "columns": list(schema.keys())
            }
            
            print(f"   Всего строк в выборке: {total_rows:,}")
            for col, stats in null_stats.items():
                if stats["count"] > 0:
                    print(f"   ⚠️  {col}: {stats['count']:,} null ({stats['percentage']:.2f}%)")
                else:
                    print(f"   ✅ {col}: нет null")
                    
        except Exception as e:
            print(f"❌ {domain}/events: ошибка - {e}")
            results[f"{domain}_events"] = {"error": str(e)}
    
    # 4. Анализ reviews
    print("\n" + "="*80)
    print("📦 REVIEWS ФАЙЛЫ")
    print("="*80)
    
    try:
        reviews_dir = base_path / "reviews"
        files = sorted(reviews_dir.glob("*.pq"))
        
        if not files:
            print("❌ reviews: нет файлов")
        else:
            # Берём последние 3 дня для анализа
            sample_files = files[-3:]
            print(f"📄 reviews (анализ {len(sample_files)} из {len(files)} дней):")
            
            lazy_df = pl.scan_parquet([str(f) for f in sample_files])
            schema = lazy_df.collect_schema()
            
            null_stats = {}
            for col in schema.keys():
                null_count = lazy_df.select(pl.col(col).null_count()).collect().item()
                total_rows = lazy_df.select(pl.len()).collect().item()
                null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
                null_stats[col] = {"count": null_count, "percentage": null_pct}
            
            results["reviews"] = {
                "file": "reviews/*.pq",
                "files_analyzed": len(sample_files),
                "total_files": len(files),
                "null_stats": null_stats,
                "columns": list(schema.keys())
            }
            
            print(f"   Всего строк в выборке: {total_rows:,}")
            for col, stats in null_stats.items():
                if stats["count"] > 0:
                    print(f"   ⚠️  {col}: {stats['count']:,} null ({stats['percentage']:.2f}%)")
                else:
                    print(f"   ✅ {col}: нет null")
                    
    except Exception as e:
        print(f"❌ reviews: ошибка - {e}")
        results["reviews"] = {"error": str(e)}
    
    # 5. Сводная таблица
    print("\n" + "="*80)
    print("📊 СВОДНАЯ ТАБЛИЦА NULL ЗНАЧЕНИЙ")
    print("="*80)
    
    summary_data = []
    for key, data in results.items():
        if "error" in data:
            continue
        
        if "null_stats" in data:
            # Для файлов с несколькими колонками
            for col, stats in data["null_stats"].items():
                if stats["count"] > 0:
                    summary_data.append({
                        "Файл": data.get("file", key),
                        "Колонка": col,
                        "Null значений": f"{stats['count']:,}",
                        "Процент": f"{stats['percentage']:.2f}%",
                        "Всего строк": f"{data.get('total_rows', 'N/A')}"
                    })
        elif "null_embedding" in data:
            # Для brands
            summary_data.append({
                "Файл": data["file"],
                "Колонка": "embedding",
                "Null значений": f"{data['null_embedding']:,}",
                "Процент": f"{data['null_percentage']:.2f}%",
                "Всего строк": f"{data['total_rows']:,}"
            })
    
    if summary_data:
        summary_df = pl.DataFrame(summary_data)
        display(summary_df)
    else:
        print("✅ Null значений не найдено!")
    
    # 6. Рекомендации для ML
    print("\n" + "="*80)
    print("💡 РЕКОМЕНДАЦИИ ДЛЯ ML МОДЕЛИ")
    print("="*80)
    
    critical_nulls = []
    for key, data in results.items():
        if "error" in data:
            continue
        
        if "null_stats" in data:
            for col, stats in data["null_stats"].items():
                if stats["percentage"] > 5:  # Больше 5% null
                    critical_nulls.append({
                        "file": data.get("file", key),
                        "column": col,
                        "percentage": stats["percentage"]
                    })
        elif "null_embedding" in data and data["null_percentage"] > 5:
            critical_nulls.append({
                "file": data["file"],
                "column": "embedding",
                "percentage": data["null_percentage"]
            })
    
    if critical_nulls:
        print("\n⚠️  КРИТИЧЕСКИЕ NULL (>5%):")
        for item in critical_nulls:
            print(f"   • {item['file']}.{item['column']}: {item['percentage']:.2f}%")
        
        print("\n🔧 СТРАТЕГИИ ОБРАБОТКИ:")
        print("   1. Для embedding колонок:")
        print("      • Использовать специальный 'UNK' embedding (нулевой вектор или средний)")
        print("      • Или фильтровать записи с null embedding")
        print("      • Или использовать learned embedding для null")
        
        print("\n   2. Для других колонок:")
        print("      • Если null < 10%: можно удалить строки")
        print("      • Если null > 10%: использовать imputation (среднее, медиана, мода)")
        print("      • Или создать отдельную категорию 'missing'")
        
        print("\n   3. Для последовательностей:")
        print("      • При построении vocab: пропускать записи с null")
        print("      • Или использовать специальный токен [MISSING]")
        print("      • При обучении: маскировать null значения")
    else:
        print("\n✅ Критических null значений не обнаружено (<5%)")
    
    return results

# Запускаем комплексный анализ
null_results = analyze_all_nulls()


🔍 КОМПЛЕКСНЫЙ АНАЛИЗ NULL ЗНАЧЕНИЙ ВО ВСЕХ ФАЙЛАХ

📦 BRANDS.PQ
✅ Всего брендов: 24,513
   Null в embedding: 17,954 (73.24%)
   С embedding: 6,559 (26.76%)

📦 ITEMS ФАЙЛЫ (marketplace, retail, offers)

📄 marketplace/items.pq:
   Всего строк: 2,325,409
   ✅ item_id: нет null
   ✅ brand_id: нет null
   ⚠️  category: 966,395 null (41.56%)
   ⚠️  subcategory: 1,233,023 null (53.02%)
   ⚠️  price: 2,882 null (0.12%)
   ⚠️  embedding: 73 null (0.00%)

📄 retail/items.pq:
   Всего строк: 250,171
   ✅ item_id: нет null
   ✅ brand_id: нет null
   ⚠️  category: 9,585 null (3.83%)
   ⚠️  subcategory: 9,585 null (3.83%)
   ⚠️  price: 26,489 null (10.59%)
   ✅ embedding: нет null

📄 offers/items.pq:
   Всего строк: 22,368
   ✅ item_id: нет null
   ⚠️  brand_id: 542 null (2.42%)
   ✅ embedding: нет null

📦 EVENTS ФАЙЛЫ (анализ последних 3 дней)

📄 marketplace/events (анализ 3 из 227 дней):
   Всего строк в выборке: 1,604,537
   ✅ timestamp: нет null
   ✅ user_id: нет null
   ✅ item_id: нет null
   ⚠️ 

Файл,Колонка,Null значений,Процент,Всего строк
str,str,str,str,str
"""brands.pq""","""embedding""","""17,954""","""73.24%""","""24,513"""
"""marketplace/items.pq""","""category""","""966,395""","""41.56%""","""2325409"""
"""marketplace/items.pq""","""subcategory""","""1,233,023""","""53.02%""","""2325409"""
"""marketplace/items.pq""","""price""","""2,882""","""0.12%""","""2325409"""
"""marketplace/items.pq""","""embedding""","""73""","""0.00%""","""2325409"""
…,…,…,…,…
"""retail/items.pq""","""subcategory""","""9,585""","""3.83%""","""250171"""
"""retail/items.pq""","""price""","""26,489""","""10.59%""","""250171"""
"""offers/items.pq""","""brand_id""","""542""","""2.42%""","""22368"""
"""marketplace/events/*.pq""","""subdomain""","""202""","""0.01%""","""N/A"""



💡 РЕКОМЕНДАЦИИ ДЛЯ ML МОДЕЛИ

⚠️  КРИТИЧЕСКИЕ NULL (>5%):
   • brands.pq.embedding: 73.24%
   • marketplace/items.pq.category: 41.56%
   • marketplace/items.pq.subcategory: 53.02%
   • retail/items.pq.price: 10.59%
   • reviews/*.pq.embedding: 88.98%

🔧 СТРАТЕГИИ ОБРАБОТКИ:
   1. Для embedding колонок:
      • Использовать специальный 'UNK' embedding (нулевой вектор или средний)
      • Или фильтровать записи с null embedding
      • Или использовать learned embedding для null

   2. Для других колонок:
      • Если null < 10%: можно удалить строки
      • Если null > 10%: использовать imputation (среднее, медиана, мода)
      • Или создать отдельную категорию 'missing'

   3. Для последовательностей:
      • При построении vocab: пропускать записи с null
      • Или использовать специальный токен [MISSING]
      • При обучении: маскировать null значения


In [5]:
# Верификация null-значений через PyArrow (исключаем ошибки Polars)
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

from pathlib import Path

def verify_nulls_with_pyarrow():
    base_path = Path("t_ecd_small/dataset")
    targets = [
        {
            "file": base_path / "brands.pq",
            "columns": ["embedding"],
            "label": "brands.pq.embedding"
        },
        {
            "file": base_path / "marketplace" / "items.pq",
            "columns": ["category", "subcategory"],
            "label": "marketplace/items.pq"
        },
        {
            "file": base_path / "retail" / "items.pq",
            "columns": ["price"],
            "label": "retail/items.pq"
        }
    ]
    
    # Добавляем один файл reviews для проверки (последний день)
    reviews_files = sorted((base_path / "reviews").glob("*.pq"))
    if reviews_files:
        targets.append({
            "file": reviews_files[-1],
            "columns": ["embedding"],
            "label": f"reviews/{reviews_files[-1].name}"
        })
    
    print("="*80)
    print("🔬 ВЕРИФИКАЦИЯ NULL ЧЕРЕЗ PYARROW")
    print("="*80)
    
    results = []
    
    for target in targets:
        file_path = target["file"]
        if not file_path.exists():
            print(f"❌ {target['label']}: файл не найден ({file_path})")
            continue
        
        print(f"\n📄 {target['label']}")
        
        try:
            table = pq.read_table(file_path, columns=target["columns"])
            backend = "pyarrow"
        except pa.ArrowInvalid as e:
            print(f"   ⚠️ PyArrow не смог прочитать колонку напрямую ({e}). Переключаемся на Polars.")
            polars_df = pl.read_parquet(file_path)
            table = polars_df.to_arrow()
            backend = "polars"
        
        total_rows = table.num_rows
        print(f"   Всего строк: {total_rows:,} (backend: {backend})")
        
        for col in target["columns"]:
            column = table.column(col)
            null_count = column.null_count
            null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0
            print(f"   • {col}: null = {null_count:,} ({null_pct:.2f}%)")
            
            results.append({
                "file": target["label"],
                "column": col,
                "null_count": null_count,
                "null_pct": null_pct,
                "total_rows": total_rows,
                "backend": backend
            })
    
    return results

pyarrow_null_results = verify_nulls_with_pyarrow()


🔬 ВЕРИФИКАЦИЯ NULL ЧЕРЕЗ PYARROW

📄 brands.pq.embedding
   ⚠️ PyArrow не смог прочитать колонку напрямую (Expected all lists to be of size=300 but index 1 had size=0). Переключаемся на Polars.
   Всего строк: 24,513 (backend: polars)
   • embedding: null = 17,954 (73.24%)

📄 marketplace/items.pq
   Всего строк: 2,325,409 (backend: pyarrow)
   • category: null = 966,395 (41.56%)
   • subcategory: null = 1,233,023 (53.02%)

📄 retail/items.pq
   Всего строк: 250,171 (backend: pyarrow)
   • price: null = 26,489 (10.59%)

📄 reviews/01308.pq
   ⚠️ PyArrow не смог прочитать колонку напрямую (Expected all lists to be of size=312 but index 1 had size=0). Переключаемся на Polars.
   Всего строк: 73 (backend: polars)
   • embedding: null = 68 (93.15%)


In [14]:
import polars as pl
import os
from pathlib import Path

# Конфигурация
RAW_DATA_PATH = Path("t_ecd_small/dataset")
PROCESSED_PATH = Path("t_ecd_small/dataset/processed")
VOCAB_PATH = PROCESSED_PATH / "vocab.parquet"
TEMP_SHARDS_PATH = PROCESSED_PATH / "shards"

# Создаем папки (parents=True создает все родительские директории)
PROCESSED_PATH.mkdir(parents=True, exist_ok=True)
TEMP_SHARDS_PATH.mkdir(parents=True, exist_ok=True)

def step_1_build_vocab():
    """
    Сканирует каталоги товаров и брендов, создает единый маппинг:
    domain_entity_id (str) -> token_id (int)
    """
    print("🏗  Building Vocabulary...")
    
    # Пример для Marketplace Items
    # Используем Lazy API (scan_parquet)
    q_market = (
        pl.scan_parquet(RAW_DATA_PATH / "marketplace/items.pq")
        .select(pl.col("item_id").cast(pl.Utf8))
        .select(pl.format("MARKET_ITEM_{}", pl.col("item_id")).alias("global_id"))
    )
    
    # ... (аналогично для Retail items, Offers items, Brands)
    # q_retail = ...
    
    # Объединяем (Union) все потоки - это все еще лениво!
    # q_full = pl.concat([q_market, q_retail, ...]).unique()
    
    # В реальном коде здесь будет collect()
    # vocab_df = q_full.with_row_index("token_id", offset=4).collect() 
    # offset=4, т.к. 0-3 зарезервированы под PAD, BOS, EOS, MASK
    
    # Сохраняем
    # vocab_df.write_parquet(VOCAB_PATH)
    print("✅ Vocab built")

def step_2_sharding_events():
    """
    Читает события по дням, мапит их в токены и раскидывает по шардам (бакетам) юзеров.
    """
    print("🔄 Sharding Events...")
    
    # Пример списка файлов (в реальности используйте glob)
    event_files = sorted(list((RAW_DATA_PATH / "marketplace/events").glob("*.pq")))
    
    for file_path in event_files:
        print(f"Processing {file_path.name}...")
        
        # 1. Читаем файл (он небольшой, поместится в память)
        df = pl.read_parquet(file_path)
        
        # 2. Трансформация (Vectorized)
        # Нужно заджойнить с vocab (который уже в памяти) чтобы получить token_id
        # Оставить только user_id, timestamp, token_id
        
        # 3. Вычисляем шард
        # N_SHARDS = 50. Если датасет огромный, файлов будет 50.
        df = df.with_columns(
            (pl.col("user_id").hash() % 50).alias("shard_id")
        )
        
        # 4. Partitioned Write
        # Polars умеет писать партиции, но для надежности часто пишут в цикле
        # или используют Hive-style partitioning write_parquet(..., partition_by="shard_id")
        df.write_parquet(
            TEMP_SHARDS_PATH,
            use_pyarrow=True,
            pyarrow_options={"partition_cols": ["shard_id"], "existing_data_behavior": "overwrite_or_ignore"} 
            # Внимание: Polars overwrite пока не умеет append в parquet легко.
            # Для "дописывания" в parquet эффективнее всего писать маленькие файлы:
            # shards/shard_id=1/part_day_01307.parquet
        )

def step_3_compile_sequences():
    """
    Читает каждый шард (историю куска юзеров), сортирует и собирает списки.
    """
    print("📦 Compiling User Sequences...")
    
    # Итерируемся по папкам шардов
    shard_folders = sorted(list(TEMP_SHARDS_PATH.glob("shard_id=*")))
    
    for shard_dir in shard_folders:
        # Читаем все кусочки истории для этого набора юзеров
        q_shard = pl.scan_parquet(shard_dir / "*.parquet")
        
        user_histories = (
            q_shard
            .sort("timestamp") # Важно! Сортировка по времени
            .group_by("user_id")
            .agg([
                pl.col("token_id").alias("history") # Собираем список
            ])
            .collect()
        )
        
        # Сохраняем готовый кусок для обучения
        output_name = f"train_{shard_dir.name}.parquet"
        user_histories.write_parquet(PROCESSED_PATH / output_name)

if __name__ == "__main__":
    # step_1_build_vocab()
    # step_2_sharding_events()
    pass

In [17]:
import polars as pl
import os
from pathlib import Path
from IPython.display import display, HTML

# Настройка отображения Polars для лучшей читаемости
pl.Config.set_tbl_rows(10)  # Показывать до 10 строк
pl.Config.set_tbl_cols(20)  # Показывать до 20 колонок
pl.Config.set_fmt_str_lengths(50)  # Длина строк до 50 символов

# Базовый путь к данным
base_path = Path("t_ecd_small/dataset")

def analyze_all_files():
    """Анализируем структуру всех файлов в датасете"""
    
    # Автоматически находим первый и последний день для каждого домена
    def find_day_range(domain_dir):
        """Находит минимальный и максимальный день в домене"""
        if domain_dir.name == "reviews":
            files = list(domain_dir.glob("*.pq"))
        else:
            files = list((domain_dir / "events").glob("*.pq"))
        
        if not files:
            return None, None
        
        days = sorted([int(f.stem) for f in files])
        return days[0], days[-1]
    
    # Список всех файлов для анализа
    files_to_analyze = []
    
    # Статические файлы
    files_to_analyze.extend([
        ("brands.pq", "brands.pq", "Статические данные"),
        ("users.pq", "users.pq", "Статические данные"),
    ])
    
    # Items файлы
    for domain in ["marketplace", "retail", "offers"]:
        files_to_analyze.append(
            (f"{domain}/items.pq", f"{domain}/items.pq", "Items каталоги")
        )
    
    # События: находим первый и последний день автоматически
    for domain in ["marketplace", "retail", "offers"]:
        domain_dir = base_path / domain
        first_day, last_day = find_day_range(domain_dir)
        if first_day and last_day:
            first_str = str(first_day).zfill(5)
            last_str = str(last_day).zfill(5)
            files_to_analyze.extend([
                (f"{domain}/events/{first_str}.pq", f"{domain} events (первый день {first_day})", "События"),
                (f"{domain}/events/{last_str}.pq", f"{domain} events (последний день {last_day})", "События"),
            ])
    
    # Reviews: находим первый и последний день
    reviews_dir = base_path / "reviews"
    first_day, last_day = find_day_range(reviews_dir)
    if first_day and last_day:
        first_str = str(first_day).zfill(5)
        last_str = str(last_day).zfill(5)
        files_to_analyze.extend([
            (f"reviews/{first_str}.pq", f"reviews (первый день {first_day})", "Reviews"),
            (f"reviews/{last_str}.pq", f"reviews (последний день {last_day})", "Reviews"),
        ])
    
    results = {}
    
    # Группируем по категориям для красивого вывода
    categories = {}
    for file_path, description, category in files_to_analyze:
        if category not in categories:
            categories[category] = []
        categories[category].append((file_path, description))
    
    # Анализируем файлы по категориям
    for category, file_list in categories.items():
        print(f"\n{'='*80}")
        print(f"📁 {category.upper()}")
        print(f"{'='*80}\n")
        
        for file_path, description in file_list:
            full_path = base_path / file_path
            
            # Проверяем существование файла
            if not full_path.exists():
                print(f"❌ {description}: файл не найден")
                results[description] = {"error": "File not found", "category": category}
                continue
                
            try:
                # Получаем размер файла
                size_mb = full_path.stat().st_size / (1024 * 1024)
                size_gb = size_mb / 1024
                
                # Используем LazyFrame для безопасного анализа
                lazy_df = pl.scan_parquet(str(full_path))
                schema = lazy_df.collect_schema()
                
                # Получаем количество строк
                count = lazy_df.select(pl.len()).collect().item()
                
                # Получаем первые 5 строк
                sample_df = lazy_df.limit(5).collect()
                
                # Анализ уникальных значений для ключевых колонок
                unique_counts = {}
                key_columns = ['user_id', 'item_id', 'brand_id', 'action_type', 'timestamp']
                for col in key_columns:
                    if col in schema:
                        unique_counts[col] = lazy_df.select(pl.col(col).n_unique()).collect().item()
                
                # Сохраняем результаты
                results[description] = {
                    "schema": {k: str(v) for k, v in schema.items()},
                    "row_count": count,
                    "file_size_mb": size_mb,
                    "file_size_gb": size_gb,
                    "sample_data": sample_df,
                    "unique_counts": unique_counts,
                    "category": category,
                    "columns": list(schema.keys())
                }
                
                # Красивый вывод информации о файле
                print(f"📄 {description}")
                print(f"   📏 Размер: {size_mb:.1f} MB ({size_gb:.3f} GB)")
                print(f"   📊 Строк: {count:,}")
                print(f"   📋 Колонки ({len(schema)}): {', '.join(schema.keys())}")
                
                if unique_counts:
                    print(f"   🔑 Уникальные значения:")
                    for col, count in unique_counts.items():
                        print(f"      • {col}: {count:,}")
                
                print()
                
            except Exception as e:
                print(f"❌ {description}: ошибка - {e}")
                results[description] = {"error": str(e), "category": category}
    
    return results

def create_summary_table(results):
    """Создаем сводную таблицу по всем файлам с красивым форматированием"""
    
    print(f"\n{'='*80}")
    print(f"📈 СВОДНАЯ ТАБЛИЦА ПО ВСЕМ ФАЙЛАМ")
    print(f"{'='*80}\n")
    
    summary_data = []
    
    for file_desc, data in results.items():
        if "error" in data:
            summary_data.append({
                "Файл": file_desc,
                "Категория": data.get("category", "Unknown"),
                "Строк": "❌ ERROR",
                "Размер (MB)": "❌ ERROR", 
                "Колонок": "❌ ERROR",
                "Колонки": "❌ ERROR"
            })
        else:
            summary_data.append({
                "Файл": file_desc,
                "Категория": data.get("category", "Unknown"),
                "Строк": f"{data['row_count']:,}",
                "Размер (MB)": f"{data['file_size_mb']:.1f}",
                "Колонок": len(data['columns']),
                "Колонки": ", ".join(data['columns'][:5]) + ("..." if len(data['columns']) > 5 else "")
            })
    
    # Создаем DataFrame для красивого вывода
    summary_df = pl.DataFrame(summary_data)
    
    # Выводим с правильным форматированием
    display(summary_df)
    
    return summary_df

def create_schema_comparison_table(results):
    """Создаем таблицу сравнения схем данных"""
    
    print(f"\n{'='*80}")
    print(f"📋 СРАВНЕНИЕ СХЕМ ДАННЫХ")
    print(f"{'='*80}\n")
    
    # Собираем все уникальные колонки
    all_columns = set()
    for data in results.values():
        if "columns" in data:
            all_columns.update(data["columns"])
    
    # Создаем таблицу: колонка -> в каких файлах присутствует
    schema_data = []
    for col in sorted(all_columns):
        files_with_col = []
        for file_desc, data in results.items():
            if "columns" in data and col in data["columns"]:
                files_with_col.append(file_desc)
        
        schema_data.append({
            "Колонка": col,
            "Количество файлов": len(files_with_col),
            "Файлы": ", ".join(files_with_col[:3]) + ("..." if len(files_with_col) > 3 else "")
        })
    
    schema_df = pl.DataFrame(schema_data)
    display(schema_df)
    
    return schema_df

def create_sample_data_tables(results):
    """Выводит примеры данных из каждого файла в читаемом виде"""
    
    print(f"\n{'='*80}")
    print(f"👀 ПРИМЕРЫ ДАННЫХ")
    print(f"{'='*80}\n")
    
    for file_desc, data in results.items():
        if "error" in data or "sample_data" not in data:
            continue
        
        print(f"\n📄 {file_desc}")
        print(f"{'─'*80}")
        sample_df = data["sample_data"]
        
        # Выводим таблицу с правильным форматированием
        display(sample_df)
        print()

# Запускаем анализ всех файлов
print("🚀 ЗАПУСКАЕМ ПОЛНЫЙ АНАЛИЗ ДАТАСЕТА...")
all_results = analyze_all_files()

# Выводим сводную таблицу
summary_df = create_summary_table(all_results)

# Выводим сравнение схем
schema_df = create_schema_comparison_table(all_results)

# Выводим примеры данных
create_sample_data_tables(all_results)

# Дополнительный анализ: общие ключи между файлами
print(f"\n{'='*80}")
print(f"🔗 АНАЛИЗ ОБЩИХ КЛЮЧЕВЫХ ПОЛЕЙ")
print(f"{'='*80}\n")

# Собираем все схемы
all_schemas = {}
for file_desc, data in all_results.items():
    if "columns" in data:
        all_schemas[file_desc] = set(data["columns"])

# Находим общие колонки
if all_schemas:
    common_columns = set.intersection(*[schema for schema in all_schemas.values() if schema])
    if common_columns:
        print("📌 Колонки, присутствующие во ВСЕХ файлах:")
        for col in sorted(common_columns):
            print(f"   ✅ {col}")
    else:
        print("⚠️  Нет колонок, присутствующих во всех файлах")
    
    print("\n📌 Ключевые колонки для маппинга:")
    key_mapping_cols = ['user_id', 'item_id', 'brand_id']
    for col in key_mapping_cols:
        files_with_col = [desc for desc, schema in all_schemas.items() if col in schema]
        if files_with_col:
            print(f"   🔑 {col}: присутствует в {len(files_with_col)} файлах")
            print(f"      Файлы: {', '.join(files_with_col[:5])}{'...' if len(files_with_col) > 5 else ''}")

print(f"\n{'='*80}")
print(f"🎉 АНАЛИЗ ЗАВЕРШЕН! Теперь у нас есть полное понимание структуры данных.")
print(f"{'='*80}")

🚀 ЗАПУСКАЕМ ПОЛНЫЙ АНАЛИЗ ДАТАСЕТА...

📁 СТАТИЧЕСКИЕ ДАННЫЕ

📄 brands.pq
   📏 Размер: 7.0 MB (0.007 GB)
   📊 Строк: 24,513
   📋 Колонки (2): brand_id, embedding
   🔑 Уникальные значения:
      • brand_id: 24,467

📄 users.pq
   📏 Размер: 18.6 MB (0.018 GB)
   📊 Строк: 3,500,000
   📋 Колонки (3): user_id, socdem_cluster, region
   🔑 Уникальные значения:
      • user_id: 3,500,000


📁 ITEMS КАТАЛОГИ

📄 marketplace/items.pq
   📏 Размер: 2480.5 MB (2.422 GB)
   📊 Строк: 2,325,409
   📋 Колонки (6): item_id, brand_id, category, subcategory, price, embedding
   🔑 Уникальные значения:
      • item_id: 2,325,409
      • brand_id: 231

📄 retail/items.pq
   📏 Размер: 267.4 MB (0.261 GB)
   📊 Строк: 250,171
   📋 Колонки (6): item_id, brand_id, category, subcategory, price, embedding
   🔑 Уникальные значения:
      • item_id: 250,171
      • brand_id: 8

📄 offers/items.pq
   📏 Размер: 22.0 MB (0.021 GB)
   📊 Строк: 22,368
   📋 Колонки (3): item_id, brand_id, embedding
   🔑 Уникальные значения:
     

Файл,Категория,Строк,Размер (MB),Колонок,Колонки
str,str,str,str,i64,str
"""brands.pq""","""Статические данные""","""24,513""","""7.0""",2,"""brand_id, embedding"""
"""users.pq""","""Статические данные""","""3,500,000""","""18.6""",3,"""user_id, socdem_cluster, region"""
"""marketplace/items.pq""","""Items каталоги""","""2,325,409""","""2480.5""",6,"""item_id, brand_id, category, subcategory, price..."""
"""retail/items.pq""","""Items каталоги""","""250,171""","""267.4""",6,"""item_id, brand_id, category, subcategory, price..."""
"""offers/items.pq""","""Items каталоги""","""22,368""","""22.0""",3,"""item_id, brand_id, embedding"""
…,…,…,…,…,…
"""retail events (последний день 1308)""","""События""","""1,238,054""","""8.0""",6,"""timestamp, user_id, item_id, subdomain, action_typ…"
"""offers events (первый день 1082)""","""События""","""2,572,392""","""17.1""",4,"""timestamp, user_id, item_id, action_type"""
"""offers events (последний день 1308)""","""События""","""3,383,483""","""22.4""",4,"""timestamp, user_id, item_id, action_type"""
"""reviews (первый день 1082)""","""Reviews""","""2,439""","""0.5""",5,"""timestamp, user_id, brand_id, rating, embedding"""



📋 СРАВНЕНИЕ СХЕМ ДАННЫХ



Колонка,Количество файлов,Файлы
str,i64,str
"""action_type""",6,"""marketplace events (первый день 1082), marketplace…"
"""brand_id""",6,"""brands.pq, marketplace/items.pq, retail/items.pq..…"
"""category""",2,"""marketplace/items.pq, retail/items.pq"""
"""embedding""",6,"""brands.pq, marketplace/items.pq, retail/items.pq..…"
"""item_id""",9,"""marketplace/items.pq, retail/items.pq, offers/item…"
…,…,…
"""socdem_cluster""",1,"""users.pq"""
"""subcategory""",2,"""marketplace/items.pq, retail/items.pq"""
"""subdomain""",4,"""marketplace events (первый день 1082), marketplace…"
"""timestamp""",8,"""marketplace events (первый день 1082), marketplace…"



👀 ПРИМЕРЫ ДАННЫХ


📄 brands.pq
────────────────────────────────────────────────────────────────────────────────


brand_id,embedding
u64,"array[f32, 300]"
4,
34,
45,
46,
51,




📄 users.pq
────────────────────────────────────────────────────────────────────────────────


user_id,socdem_cluster,region
u64,u8,u8
77309558,21,2
72517894,10,90
86699708,9,9
54241043,17,58
23591057,17,4




📄 marketplace/items.pq
────────────────────────────────────────────────────────────────────────────────


item_id,brand_id,category,subcategory,price,embedding
str,u64,str,str,f64,"array[f32, 300]"
"""nfmcg_10000001""",137356,,,6.06364,"[-0.070853, 0.03246, … 0.082178]"
"""nfmcg_10000012""",53389,,,0.960436,"[-0.091099, 0.043168, … 0.060287]"
"""nfmcg_1000002""",34153,"""Fashion Accessories, Tech Add-ons, and Style Enhan…","""Hats, Scarves, and Shawls""",-1.793113,"[-0.056533, 0.082878, … 0.037475]"
"""nfmcg_10000034""",39543,,,3.416755,"[-0.112588, 0.043333, … -0.001615]"
"""nfmcg_10000039""",82320,"""Fashion Accessories, Tech Add-ons, and Style Enhan…","""Jewelry and Costume Jewelry""",4.293433,"[-0.157201, 0.026234, … 0.013904]"




📄 retail/items.pq
────────────────────────────────────────────────────────────────────────────────


item_id,brand_id,category,subcategory,price,embedding
str,u64,str,str,f64,"array[f32, 300]"
"""fmcg_10""",65693,"""Office Supplies and Educational Materials""","""Stationery Paper""",-4.406018,"[-0.135573, 0.049446, … 0.023161]"
"""fmcg_10000""",146468,"""Cleaning Supplies and Everyday Household Items""","""Cleaning and Detergent Products""",-4.205185,"[-0.043275, -0.035317, … 0.068158]"
"""fmcg_1000006""",37799,,,-4.46366,"[-0.083411, 0.049153, … 0.013355]"
"""fmcg_1000008""",240838,"""Children's Products and Childcare Items""","""Food Products""",-5.138377,"[-0.055184, 0.023423, … 0.060593]"
"""fmcg_1000017""",240838,"""Foodstuffs and Beverages""","""Sweet Desserts and Confectionery""",-3.980383,"[-0.052521, -0.00639, … -0.012626]"




📄 offers/items.pq
────────────────────────────────────────────────────────────────────────────────


item_id,brand_id,embedding
str,u64,"array[f32, 312]"
"""offer_1000""",25974,"[0.012451, -0.006554, … -0.104263]"
"""offer_10001""",233535,"[0.084303, 0.004278, … -0.109984]"
"""offer_10010""",237081,"[0.02631, 0.07163, … -0.004321]"
"""offer_10011""",139056,"[0.044292, 0.028968, … -0.112416]"
"""offer_10019""",11463,"[0.073994, 0.055167, … -0.103118]"




📄 marketplace events (первый день 1082)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,subdomain,action_type,os
duration[μs],u64,str,str,str,str
1082d 89901µs,59328774,"""nfmcg_14777696""","""u2i""","""view""","""android"""
1082d 163483µs,66295302,"""nfmcg_26098539""","""catalog""","""view""","""android"""
1082d 864625µs,37542104,"""nfmcg_10840247""","""u2i""","""view""","""android"""
1082d 889192µs,35193548,"""nfmcg_8040572""","""catalog""","""view""","""android"""
1082d 936008µs,27256137,"""nfmcg_6770239""","""catalog""","""view""","""android"""




📄 marketplace events (последний день 1308)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,subdomain,action_type,os
duration[μs],u64,str,str,str,str
1308d 183108µs,73774100,"""nfmcg_18200186""","""other""","""view""","""android"""
1308d 333142µs,38161973,"""nfmcg_14129567""","""other""","""view""","""android"""
1308d 570986µs,52431976,"""nfmcg_4396804""","""search""","""view""","""android"""
1308d 656523µs,75059116,"""nfmcg_27122678""","""i2i""","""view""","""other"""
1308d 923703µs,10628666,"""nfmcg_6136132""","""other""","""view""","""android"""




📄 retail events (первый день 1082)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,subdomain,action_type,os
duration[μs],u64,str,str,str,str
1082d 326593µs,65070494,"""fmcg_607128""","""catalog""","""view""","""android"""
1082d 334152µs,25095614,"""fmcg_40462""","""catalog""","""view""","""android"""
1082d 370722µs,38969384,"""fmcg_327866""","""catalog""","""view""","""android"""
1082d 613473µs,2266567,"""fmcg_965886""","""search""","""view""","""android"""
1082d 1s 152587µs,25095614,"""fmcg_576561""","""catalog""","""view""","""android"""




📄 retail events (последний день 1308)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,subdomain,action_type,os
duration[μs],u64,str,str,str,str
1308d 6078µs,8923243,"""fmcg_208146""","""catalog""","""view""","""android"""
1308d 32272µs,12124655,"""fmcg_1034096""","""catalog""","""view""","""android"""
1308d 77336µs,20658160,"""fmcg_112697""","""catalog""","""view""","""android"""
1308d 112253µs,85296647,"""fmcg_618508""","""catalog""","""view""","""android"""
1308d 250868µs,85296647,"""fmcg_509248""","""catalog""","""view""","""android"""




📄 offers events (первый день 1082)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,action_type
duration[μs],u64,str,str
1082d 126754µs,20110833,"""offer_83661""","""seen"""
1082d 143308µs,36541803,"""offer_81670""","""seen"""
1082d 157553µs,34286733,"""offer_42637""","""seen"""
1082d 160743µs,70030166,"""offer_787""","""seen"""
1082d 165194µs,40343780,"""offer_32005""","""seen"""




📄 offers events (последний день 1308)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,item_id,action_type
duration[μs],u64,str,str
1308d 4073µs,19604709,"""offer_36276""","""seen"""
1308d 7900µs,26619747,"""offer_14844""","""seen"""
1308d 18551µs,55695634,"""offer_68845""","""seen"""
1308d 36397µs,17967513,"""offer_60889""","""seen"""
1308d 78679µs,1903041,"""offer_39233""","""seen"""




📄 reviews (первый день 1082)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,brand_id,rating,embedding
duration[μs],u64,u64,u8,"array[f32, 312]"
1082d 1m 5s 711723µs,25741061,141226,5,
1082d 1m 12s 26501µs,71011848,65693,3,
1082d 2m 15s 540704µs,26101012,72285,5,
1082d 2m 25s 126055µs,67977146,184380,3,
1082d 2m 36s 993924µs,4055428,216938,5,




📄 reviews (последний день 1308)
────────────────────────────────────────────────────────────────────────────────


timestamp,user_id,brand_id,rating,embedding
duration[μs],u64,u64,u8,"array[f32, 312]"
1308d 44s 88783µs,84382219,90322,5,
1308d 3m 21s 983860µs,36202686,242134,3,"[0.077505, 0.000087, … -0.089244]"
1308d 4m 1s 141813µs,61622957,183768,1,
1308d 4m 15s 910776µs,82539268,111192,5,
1308d 4m 47s 791092µs,31206673,23634,5,




🔗 АНАЛИЗ ОБЩИХ КЛЮЧЕВЫХ ПОЛЕЙ

⚠️  Нет колонок, присутствующих во всех файлах

📌 Ключевые колонки для маппинга:
   🔑 user_id: присутствует в 9 файлах
      Файлы: users.pq, marketplace events (первый день 1082), marketplace events (последний день 1308), retail events (первый день 1082), retail events (последний день 1308)...
   🔑 item_id: присутствует в 9 файлах
      Файлы: marketplace/items.pq, retail/items.pq, offers/items.pq, marketplace events (первый день 1082), marketplace events (последний день 1308)...
   🔑 brand_id: присутствует в 6 файлах
      Файлы: brands.pq, marketplace/items.pq, retail/items.pq, offers/items.pq, reviews (первый день 1082)...

🎉 АНАЛИЗ ЗАВЕРШЕН! Теперь у нас есть полное понимание структуры данных.


In [9]:
import polars as pl
import json
from pathlib import Path
from typing import List, Dict, Union

class RecSysTokenizer:
    """
    Управляет маппингом Raw ID <-> Token ID.
    Поддерживает спец-токены для Seq2Seq.
    """
    PAD_TOKEN = "[PAD]"
    BOS_TOKEN = "[BOS]"
    EOS_TOKEN = "[EOS]"
    MASK_TOKEN = "[MASK]"
    UNK_TOKEN = "[UNK]" # Для новых товаров, которых не было в трейне

    def __init__(self):
        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: Dict[int, str] = {}
        self.vocab_size = 0
        
        # Инициализируем спец-токенами
        specials = [self.PAD_TOKEN, self.BOS_TOKEN, self.EOS_TOKEN, self.MASK_TOKEN, self.UNK_TOKEN]
        for i, token in enumerate(specials):
            self.token_to_id[token] = i
            self.id_to_token[i] = token
        self.vocab_size = len(specials)

    def load_vocab(self, path: Union[str, Path]):
        """Загружает словарь из Parquet файла (token_id, raw_token)"""
        df = pl.read_parquet(path)
        
        # Ожидаем колонки: ['token_str', 'token_id']
        # Конвертируем в dict для быстрого доступа (O(1))
        vocab_dict = dict(zip(df["token_str"], df["token_id"]))
        
        # Мержим с текущими (спец-токены могут быть перезаписаны, если они есть в файле, 
        # но лучше держать логику спец-токенов внутри класса)
        self.token_to_id.update(vocab_dict)
        self.id_to_token = {v: k for k, v in self.token_to_id.items()}
        self.vocab_size = len(self.token_to_id)
        print(f"✅ Tokenizer loaded. Vocab size: {self.vocab_size}")

    def encode(self, domain: str, raw_id: str) -> int:
        """
        Превращает (MP, nfmcg_0) -> 435
        Если не найдено -> возвращает UNK_TOKEN ID
        """
        token_str = f"{domain}_{raw_id}"
        return self.token_to_id.get(token_str, self.token_to_id[self.UNK_TOKEN])

    def encode_sequence(self, history: List[str]) -> List[int]:
        """
        Принимает список строк вида 'MP_nfmcg_0' или тюплов?
        Для простоты предположим, что входные данные уже имеют префикс.
        """
        return [self.token_to_id.get(x, self.token_to_id[self.UNK_TOKEN]) for x in history]

    def decode(self, token_ids: List[int]) -> List[str]:
        return [self.id_to_token.get(tid, self.UNK_TOKEN) for tid in token_ids]
    
    @property
    def pad_token_id(self): return self.token_to_id[self.PAD_TOKEN]
    
    @property
    def bos_token_id(self): return self.token_to_id[self.BOS_TOKEN]
    
    @property
    def eos_token_id(self): return self.token_to_id[self.EOS_TOKEN]

In [10]:
import polars as pl
from pathlib import Path
import sys

# Добавляем корень проекта в путь, чтобы видеть модули, если нужно
sys.path.append(".") 

RAW_DIR = Path("t_ecd_small/dataset")
OUTPUT_DIR = Path("t_ecd_small/dataset/processed")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

def build_vocab():
    print("🏗  Starting Vocabulary Build...")

    # 1. MARKETPLACE ITEMS (String ID)
    # Читаем только item_id, лениво!
    print("... Scanning Marketplace")
    q_mp = (
        pl.scan_parquet(RAW_DIR / "marketplace/items.pq")
        .select(pl.col("item_id"))
        .select(
            pl.format("MP_{}", pl.col("item_id")).alias("token_str")
        )
    )

    # 2. RETAIL ITEMS (String ID)
    print("... Scanning Retail")
    q_rt = (
        pl.scan_parquet(RAW_DIR / "retail/items.pq")
        .select(pl.col("item_id"))
        .select(
            pl.format("RT_{}", pl.col("item_id")).alias("token_str")
        )
    )

    # 3. OFFERS ITEMS (String ID)
    print("... Scanning Offers")
    q_of = (
        pl.scan_parquet(RAW_DIR / "offers/items.pq")
        .select(pl.col("item_id"))
        .select(
            pl.format("OF_{}", pl.col("item_id")).alias("token_str")
        )
    )

    # 4. BRANDS (UInt64 ID -> String)
    # Бренды есть во всех доменах, но сущность одна.
    print("... Scanning Brands")
    q_br = (
        pl.scan_parquet(RAW_DIR / "brands.pq")
        .select(pl.col("brand_id"))
        .select(
            pl.format("BR_{}", pl.col("brand_id").cast(pl.Utf8)).alias("token_str")
        )
    )

    # 5. Объединение и дедупликация
    print("⚡ Collecting and merging unique tokens...")
    # concat - ленивый, collect выполнит реальное чтение и объединение
    all_tokens = pl.concat([q_mp, q_rt, q_of, q_br]).unique().sort("token_str")
    
    # 6. Добавляем индексы
    # Резервируем 0-4 под спец токены ([PAD], [BOS], [EOS], [MASK], [UNK])
    SPECIALS_OFFSET = 5 
    
    vocab_df = (
        all_tokens
        .collect() # Trigger computation
        .with_row_index("idx", offset=SPECIALS_OFFSET)
        .select([
            pl.col("idx").alias("token_id").cast(pl.UInt32),
            pl.col("token_str")
        ])
    )
    
    # Добавим спецтокены в начало датафрейма для полноты картины (опционально, но полезно для дебага)
    specials_df = pl.DataFrame({
        "token_id": [0, 1, 2, 3, 4],
        "token_str": ["[PAD]", "[BOS]", "[EOS]", "[MASK]", "[UNK]"]
    }).select([pl.col("token_id").cast(pl.UInt32), pl.col("token_str")])
    
    final_vocab = pl.concat([specials_df, vocab_df])

    print(f"✅ Vocab built! Total size: {final_vocab.height}")
    print(f"Examples:\n{final_vocab.sample(5)}")

    output_path = OUTPUT_DIR / "vocab.parquet"
    final_vocab.write_parquet(output_path)
    print(f"💾 Saved to {output_path}")

if __name__ == "__main__":
    build_vocab()

🏗  Starting Vocabulary Build...
... Scanning Marketplace
... Scanning Retail
... Scanning Offers
... Scanning Brands
⚡ Collecting and merging unique tokens...
✅ Vocab built! Total size: 2622420
Examples:
shape: (5, 2)
┌──────────┬───────────────────┐
│ token_id ┆ token_str         │
│ ---      ┆ ---               │
│ u32      ┆ str               │
╞══════════╪═══════════════════╡
│ 2341717  ┆ MP_nfmcg_9908823  │
│ 943913   ┆ MP_nfmcg_20187247 │
│ 1324783  ┆ MP_nfmcg_2440717  │
│ 1097373  ┆ MP_nfmcg_21889621 │
│ 1657231  ┆ MP_nfmcg_28073300 │
└──────────┴───────────────────┘
💾 Saved to t_ecd_small\dataset\processed\vocab.parquet


In [11]:
import polars as pl
import gc
import logging
from pathlib import Path
from datetime import timedelta


logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)

 
RAW_DIR = Path("t_ecd_small/dataset")
PROCESSED_DIR = Path("t_ecd_small/dataset/processed")
VOCAB_PATH = PROCESSED_DIR / "vocab.parquet"
OUTPUT_DIR = PROCESSED_DIR / "shards"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


NUM_SHARDS = 50 


TEST_DAYS_CUTOFF = 2 

def load_vocab_lazy():
    """
    Загружаем словарь как LazyFrame для эффективного join.
    Для больших словарей (30M+) это эффективнее чем dict.
    """
    logger.info(" Loading Vocabulary...")
    vocab_lazy = pl.scan_parquet(VOCAB_PATH)
    return vocab_lazy

def get_domain_plan(domain_folder: Path, domain_prefix: str, vocab_lazy):
    """
    Создает план чтения для конкретного домена.
    Превращает item_id -> token_id через join с vocab.
    """
    # Для reviews файлы лежат прямо в папке, без подпапки events
    if "reviews" in str(domain_folder):
        file_paths = list(domain_folder.glob("*.pq"))
    else:
        file_paths = list((domain_folder / "events").glob("*.pq"))
    
    if not file_paths:
        return None

    # Сканируем все файлы и объединяем
    lazy_frames = [pl.scan_parquet(str(p)) for p in file_paths]
    q = pl.concat(lazy_frames)

    # Определяем колонку с ID сущности
    entity_col = "brand_id" if "reviews" in str(domain_folder) else "item_id"
    
    # Проверяем наличие колонки
    schema = q.collect_schema()
    if entity_col not in schema:
        logger.warning(f"Column {entity_col} not found in {domain_folder}, skipping...")
        return None

    # Создаем token_key для join
    q = q.select([
        pl.col("user_id"),
        pl.col("timestamp"),
        (pl.lit(domain_prefix) + pl.col(entity_col).cast(pl.Utf8)).alias("token_str")
    ])

    # Join с vocab для получения token_id
    # Используем left join, чтобы не потерять данные, если токена нет в словаре
    q = q.join(
        vocab_lazy.select(["token_str", "token_id"]),
        on="token_str",
        how="left"
    ).with_columns(
        # Заменяем null на 4 (UNK token)
        pl.col("token_id").fill_null(4).cast(pl.UInt32)
    ).select([
        pl.col("user_id"),
        pl.col("timestamp"),
        pl.col("token_id")
    ])

    return q

def process_shards():
    """
    Главный цикл обработки.
    """
    
    vocab_lazy = load_vocab_lazy()
    

    domains = {
        RAW_DIR / "marketplace": "MP_",
        RAW_DIR / "retail": "RT_",
        RAW_DIR / "offers": "OF_",
        RAW_DIR / "reviews": "BR_" 
    }

    for shard_id in range(NUM_SHARDS):
        logger.info(f" Processing Shard {shard_id + 1}/{NUM_SHARDS}...")

        plans = []
        for domain_path, prefix in domains.items():
            lazy_df = get_domain_plan(domain_path, prefix, vocab_lazy)
            
            if lazy_df is not None:
                sharded_df = lazy_df.filter(
                    (pl.col("user_id").hash() % NUM_SHARDS) == shard_id
                )
                plans.append(sharded_df)

        if not plans:
            continue

        combined_lazy = pl.concat(plans)

        df_shard = combined_lazy.collect()

        if df_shard.height == 0:
            continue

        df_shard = df_shard.sort(["user_id", "timestamp"])

        sequences = df_shard.group_by("user_id").agg([
            pl.col("token_id").alias("sequence"),
            pl.col("timestamp").alias("timestamps") 
        ])

        max_time = df_shard["timestamp"].max()
        cutoff_time = max_time - timedelta(days=TEST_DAYS_CUTOFF)

        
        output_file = OUTPUT_DIR / f"shard_{shard_id}.parquet"
        sequences.write_parquet(output_file)
        
        logger.info(f" Saved shard {shard_id} to {output_file} (Users: {sequences.height})")

        del df_shard
        del sequences
        gc.collect()

if __name__ == "__main__":
    process_shards()

2025-11-26 20:42:17,266 -  Loading Vocabulary...
2025-11-26 20:42:17,266 -  Processing Shard 1/50...
2025-11-26 20:42:25,601 -  Saved shard 0 to t_ecd_small\dataset\processed\shards\shard_0.parquet (Users: 49436)
2025-11-26 20:42:25,625 -  Processing Shard 2/50...
2025-11-26 20:42:29,696 -  Saved shard 1 to t_ecd_small\dataset\processed\shards\shard_1.parquet (Users: 49741)
2025-11-26 20:42:29,714 -  Processing Shard 3/50...
2025-11-26 20:42:33,714 -  Saved shard 2 to t_ecd_small\dataset\processed\shards\shard_2.parquet (Users: 49379)
2025-11-26 20:42:33,733 -  Processing Shard 4/50...
2025-11-26 20:42:37,767 -  Saved shard 3 to t_ecd_small\dataset\processed\shards\shard_3.parquet (Users: 50150)
2025-11-26 20:42:37,787 -  Processing Shard 5/50...
2025-11-26 20:42:41,958 -  Saved shard 4 to t_ecd_small\dataset\processed\shards\shard_4.parquet (Users: 49593)
2025-11-26 20:42:41,975 -  Processing Shard 6/50...
2025-11-26 20:42:46,235 -  Saved shard 5 to t_ecd_small\dataset\processed\shard