In [3]:
import dask.dataframe as dd
from pathlib import Path
import dask
from dask.distributed import Client

In [4]:
# --- Конфигурация путей ---
DATA_DIR_PUBCH = Path("../data/pubchem_compounds.parquet.dask")
DATA_DIR_DRUGS = Path("../data/drugspacex.parquet.dask")


print("config:")
print(f"  pubchem Parquet: \t{DATA_DIR_PUBCH.name}")
print(f"  drugspacex Parquet: \t{DATA_DIR_DRUGS.name}")

config:
  pubchem Parquet: 	pubchem_compounds.parquet.dask
  drugspacex Parquet: 	drugspacex.parquet.dask


In [5]:
# Загружаем датафреймы
ddf_pubch = dd.read_parquet(DATA_DIR_PUBCH)
ddf_drugs = dd.read_parquet(DATA_DIR_DRUGS)

# Вычисляем и выводим количество строк
len_pubch, len_drugs = dask.compute(len(ddf_pubch), len(ddf_drugs))
# или
# len_pubch = ddf_pubch.shape[0].compute()
# len_drugs = ddf_drugs.shape[0].compute()


print(f"Количество строк в PubChem: {len_pubch}")
print(f"Количество строк в DrugsSpaceX: {len_drugs}")

Количество строк в PubChem: 121458159
Количество строк в DrugsSpaceX: 100946534


In [6]:
# Предположим, нас интересует колонка 'smiles' в PubChem и 'smiles' в DrugsSpaceX
unique_pubch = ddf_pubch['smiles'].nunique().compute()
unique_drugs = ddf_drugs['smiles'].nunique().compute()

print(f"Количество уникальных 'smiles' в PubChem: {unique_pubch}")
print(f"Количество уникальных 'smiles' в DrugsSpaceX: {unique_drugs}")

Количество уникальных 'smiles' в PubChem: 121401187
Количество уникальных 'smiles' в DrugsSpaceX: 100946534


In [7]:
len_pubch - unique_pubch

56972

In [8]:
len_drugs - unique_drugs

0

In [11]:
ddf_pubch.head()

Unnamed: 0,cid,smiles
0,1,CC(=O)OC(CC(=O)[O-])C[N+](C)(C)C
1,2,CC(=O)OC(CC(=O)O)C[N+](C)(C)C
2,3,C1=CC(C(C(=C1)C(=O)O)O)O
3,4,CC(CN)O
4,5,C(C(=O)COP(=O)(O)O)N


In [12]:
ddf_drugs.head()

Unnamed: 0,smiles,de_id
0,CC[C@H](C)[C@H](NC(=O)[C@H](CCC(=O)O)NC(=O)[C@...,DE1
1,CC(C)C[C@H](NC(=O)[C@@H](COC(C)(C)C)NC(=O)[C@H...,DE2
2,CC(C)C[C@@H](NC(=O)CNC(=O)[C@@H](NC=O)C(C)C)C(...,DE3
3,N=C(N)NCCC[C@H](NC(=O)[C@@H]1CCCN1C(=O)[C@@H]1...,DE4
4,CC(=O)N[C@H](Cc1ccc2ccccc2c1)C(=O)N[C@H](Cc1cc...,DE5


In [2]:
import dask.dataframe as dd
import dask
from dask.distributed import Client
from pathlib import Path

# --- Конфигурация, как у вас ---
DATA_DIR_PUBCH = Path("../data/pubchem_compounds.parquet.dask")
DATA_DIR_DRUGS = Path("../data/drugspacex.parquet.dask")
client = Client() # Запускаем клиент
print(f"Dask dashboard link: {client.dashboard_link}")


Dask dashboard link: http://127.0.0.1:8787/status


Task exception was never retrieved
future: <Task finished name='Task-394784' coro=<Client._gather.<locals>.wait() done, defined at /app/.venv/lib/python3.11/site-packages/distributed/client.py:2377> exception=AllExit()>
Traceback (most recent call last):
  File "/app/.venv/lib/python3.11/site-packages/distributed/client.py", line 2386, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-476641' coro=<Client._gather.<locals>.wait() done, defined at /app/.venv/lib/python3.11/site-packages/distributed/client.py:2377> exception=AllExit()>
Traceback (most recent call last):
  File "/app/.venv/lib/python3.11/site-packages/distributed/client.py", line 2386, in wait
    raise AllExit()
distributed.client.AllExit


In [3]:

# Загружаем данные
ddf_pubch = dd.read_parquet(DATA_DIR_PUBCH)
ddf_drugs = dd.read_parquet(DATA_DIR_DRUGS)

# --- Шаг 1 и 2: Извлекаем и находим уникальные идентификаторы ---
# Работаем только с одной колонкой, что сильно экономит память
unique_smiles_pubch = ddf_pubch[['smiles']].drop_duplicates()
unique_smiles_drugs = ddf_drugs[['smiles']].drop_duplicates()


In [4]:

# --- Шаг 3: Находим пересечение уникальных идентификаторов ---
# Это самый быстрый способ найти общие молекулы
intersecting_smiles = dd.merge(
    unique_smiles_pubch,
    unique_smiles_drugs,
    on='smiles',  # Колонка для сравнения
    how='inner'  # 'inner' означает "пересечение"
)

# --- Шаг 4: Готовим вычисления ---
# Мы хотим посчитать 3 значения:
# 1. Количество общих молекул (размер пересечения)
# 2. Общее количество уникальных молекул в PubChem
# 3. Общее количество уникальных молекул в DrugsSpaceX
count_intersection = len(intersecting_smiles)
count_unique_pubch = len(unique_smiles_pubch)
count_unique_drugs = len(unique_smiles_drugs)

# --- Шаг 5: Запускаем все вычисления одной командой ---
# Dask оптимизирует и выполнит их параллельно
print("Начинаем вычисление пересечения и общего количества...")
num_intersection, total_unique_pubch, total_unique_drugs = dask.compute(
    count_intersection,
    count_unique_pubch,
    count_unique_drugs
)
print("Вычисления завершены.")

# --- Рассчитываем и выводим результаты ---
print(f"\nОбщее число уникальных молекул в PubChem: {total_unique_pubch}")
print(f"Общее число уникальных молекул в DrugsSpaceX: {total_unique_drugs}")
print(f"Найдено общих уникальных молекул между датасетами: {num_intersection}")

# Рассчитываем проценты
if total_unique_pubch > 0:
    percent_vs_pubchem = (num_intersection / total_unique_pubch) * 100
    print(f"Общие молекулы составляют {percent_vs_pubchem:.2f}% от уникальных молекул в PubChem.")

if total_unique_drugs > 0:
    percent_vs_drugs = (num_intersection / total_unique_drugs) * 100
    print(f"Общие молекулы составляют {percent_vs_drugs:.2f}% от уникальных молекул в DrugsSpaceX.")

Начинаем вычисление пересечения и общего количества...
Вычисления завершены.

Общее число уникальных молекул в PubChem: 121401187
Общее число уникальных молекул в DrugsSpaceX: 100946534
Найдено общих уникальных молекул между датасетами: 37942
Общие молекулы составляют 0.03% от уникальных молекул в PubChem.
Общие молекулы составляют 0.04% от уникальных молекул в DrugsSpaceX.


---

In [4]:
import os
import time
from pathlib import Path
import pandas as pd
from tqdm import tqdm
import dask.dataframe as dd

import torch
from torch.utils.data import Dataset, DataLoader

# --- ШАГ 1: КОНФИГУРАЦИЯ И ПРОВЕРКА ПУТЕЙ ---
DATA_DIR = Path("../data")
CSV_PATH = DATA_DIR / "CID-SMILES"
# Dask сохраняет паркет в директорию, pandas/pyarrow умеют читать ее напрямую
PARQUET_PATH = DATA_DIR / "pubchem_compounds.parquet.dask"

print("--- Проверка наличия файлов для теста ---")

# Проверяем, что оба файла/директории существуют
if not CSV_PATH.exists():
    print(f"ОШИБКА: CSV файл не найден по пути: {CSV_PATH}")
    exit()
else:
    print(f"✔️  CSV файл найден: {CSV_PATH}")

if not PARQUET_PATH.exists():
    print(f"ОШИБКА: Директория Parquet не найдена по пути: {PARQUET_PATH}")
    exit()
else:
    print(f"✔️  Parquet директория найдена: {PARQUET_PATH}")


--- Проверка наличия файлов для теста ---
✔️  CSV файл найден: ../data/CID-SMILES
✔️  Parquet директория найдена: ../data/pubchem_compounds.parquet.dask


In [5]:
ddf = dd.read_csv(
    CSV_PATH, 
    sep='\t', 
    header=None,                  
    names=['cid', 'smiles'],      
    dtype={'cid': 'str', 'smiles': 'str'}, 
    blocksize='64MB'
)
display(ddf.head())

Unnamed: 0,cid,smiles
0,1,CC(=O)OC(CC(=O)[O-])C[N+](C)(C)C
1,2,CC(=O)OC(CC(=O)O)C[N+](C)(C)C
2,3,C1=CC(C(C(=C1)C(=O)O)O)O
3,4,CC(CN)O
4,5,C(C(=O)COP(=O)(O)O)N


In [7]:
df = pd.read_csv(
    CSV_PATH, 
    sep='\t', 
    header=None,                  
    names=['cid', 'smiles'],      
    dtype={'cid': 'str', 'smiles': 'str'}
)
display(df.head())

Unnamed: 0,cid,smiles
0,1,CC(=O)OC(CC(=O)[O-])C[N+](C)(C)C
1,2,CC(=O)OC(CC(=O)O)C[N+](C)(C)C
2,3,C1=CC(C(C(=C1)C(=O)O)O)O
3,4,CC(CN)O
4,5,C(C(=O)COP(=O)(O)O)N


In [2]:

# --- ШАГ 2: СОЗДАНИЕ КЛАССОВ DATASET ---

class CsvSmilesDataset(Dataset):
    """
    Читает колонку 'smiles' из большого CSV файла БЕЗ ЗАГОЛОВКА и с табуляцией.
    """
    def __init__(self, path):
        # --- ИЗМЕНЕНИЕ ЗДЕСЬ ---
        # Применяем предоставленные вами, правильные параметры для чтения.
        # usecols=['smiles'] все еще важен - он говорит pandas загрузить в память
        # только данные из колонки, которую мы назвали 'smiles'.
        self.smiles = pd.read_csv(
            path,
            sep='\t',
            header=None,
            names=['cid', 'smiles'],
            usecols=['smiles']  # Указываем имя, которое мы только что присвоили
        )['smiles'].head(1000)


class ParquetSmilesDataset(Dataset):
    """
    Читает колонку 'smiles' из директории с Parquet файлами.
    КОД НЕ ИЗМЕНИЛСЯ - Parquet хранит свою схему и не нуждается во внешних подсказках.
    """
    def __init__(self, path):
        self.smiles = pd.read_parquet(path, columns=['smiles'])['smiles']

    def __len__(self):
        return len(self.smiles)

    def __getitem__(self, idx):
        return self.smiles.iloc[idx]


# --- ШАГ 3: ФУНКЦИЯ ДЛЯ БЕНЧМАРКА (БЕЗ ИЗМЕНЕНИЙ) ---

def run_benchmark(dataset_class, path, format_name):
    print(f"\n--- Тестирование производительности для формата: {format_name} ---")
    print("1. Загрузка данных в память...")
    start_load_time = time.time()
    dataset = dataset_class(path)
    load_duration = time.time() - start_load_time
    print(f"   ...завершено за {load_duration:.2f} секунд.")

    print("2. Итерация по данным (1 эпоха)...")
    dataloader = DataLoader(dataset, batch_size=512, num_workers=4, shuffle=False)
    
    start_iter_time = time.time()
    for _ in tqdm(dataloader, desc=f"Прогон {format_name}"):
        pass
    iter_duration = time.time() - start_iter_time
    print(f"   ...завершено за {iter_duration:.2f} секунд.")

    return load_duration, iter_duration


# --- ШАГ 4: ЗАПУСК И ВЫВОД РЕЗУЛЬТАТОВ (БЕЗ ИЗМЕНЕНИЙ) ---

csv_load_time, csv_iter_time = run_benchmark(CsvSmilesDataset, CSV_PATH, "CSV")
parquet_load_time, parquet_iter_time = run_benchmark(ParquetSmilesDataset, PARQUET_PATH, "Parquet")

csv_size_gb = os.path.getsize(CSV_PATH) / (1024**3)
parquet_size_gb = sum(f.stat().st_size for f in PARQUET_PATH.glob('*.parquet')) / (1024**3)

print("\n" + "="*50)
print(" " * 15 + "ИТОГОВЫЕ РЕЗУЛЬТАТЫ")
print("="*50)
print(f"{'Параметр':<25} {'CSV':>10} {'Parquet':>12}")
print("-"*50)
print(f"{'Размер файла (GB)':<25} {csv_size_gb:>9.2f} G {parquet_size_gb:>11.2f} G")
print(f"{'Время загрузки (сек)':<25} {csv_load_time:>9.2f} с {parquet_load_time:>11.2f} с")
print(f"{'Время эпохи (сек)':<25} {csv_iter_time:>9.2f} с {parquet_iter_time:>11.2f} с")
print("-"*50)

if parquet_load_time > 0:
    print(f"\n✔️  Загрузка данных из Parquet оказалась быстрее в {csv_load_time / parquet_load_time:.1f} раз.")
if csv_size_gb > 0:
    print(f"✔️  Parquet занимает на диске на {100 * (1 - parquet_size_gb/csv_size_gb):.1f}% меньше места.")
print("="*50)


--- Тестирование производительности для формата: CSV ---
1. Загрузка данных в память...


ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.