In [None]:
import os
import sys
import math
import shutil
import tarfile
from pathlib import Path

from huggingface_hub import snapshot_download

# Fetch the DataBack dataset snapshot into the local ./full_data cache and
# keep the returned location as a Path for the cells below.
data_path = Path(
    snapshot_download(
        repo_id="neuroback/DataBack",
        repo_type="dataset",
        cache_dir="./full_data",
    )
)

Fetching 10 files: 100%|██████████| 10/10 [00:00<00:00, 206615.96it/s]


In [None]:
# Unzip dataset files
#
# Extracts every *.tar.gz found under the "original" and "dual" folders of the
# downloaded snapshot (`data_path`, set by the download cell) into TARGET_PATH.

DATA_CACHE_PATH = Path("./full_data")
TARGET_PATH = Path("./processed_data")

DATA_CACHE_PATH.mkdir(parents=True, exist_ok=True)
TARGET_PATH.mkdir(parents=True, exist_ok=True)

datasets = ["original", "dual"]

for d_name in datasets:
    search_dir = data_path / d_name
    targz_files = list(search_dir.glob("*.tar.gz"))

    for targz in targz_files:
        print(f"Sorting {targz.name}...")

        try:
            # The context manager closes the archive even if extraction
            # fails, so no file handle is leaked per archive.
            # NOTE(review): extractall() runs without a `filter`; the archives
            # are trusted to come from the dataset repo. Consider passing
            # filter="data" on Python versions that support it.
            with tarfile.open(targz, "r:gz") as archive:
                archive.extractall(path=TARGET_PATH)
        except tarfile.ReadError:
            # Best effort: report the unreadable archive and move on to the next.
            print(f"Could not extract {targz.name}", file=sys.stderr)


Sorting bb_pt.tar.gz...
Sorting cnf_pt.tar.gz...
Sorting bb_ft.tar.gz...
Sorting cnf_ft.tar.gz...
Sorting d_cnf_pt.tar.gz...
Sorting d_bb_pt.tar.gz...
Sorting d_cnf_ft.tar.gz...
Sorting d_bb_ft.tar.gz...


In [None]:
# Split dataset into train, validation and test and create symlinks
#
# Each source directory is split independently, by sorted file name, and the
# files are symlinked into DATASET_PATH/<kind>/<split>/.

TARGET_PATH = Path("./processed_data")
DATASET_PATH = Path("./data")

TRAIN_RATIO = 0.7
VALIDATION_RATIO = 0.2
TEST_RATIO = 0.1  # not read below: the test split is whatever is left over

# Start from an empty output tree on every run.
if DATASET_PATH.exists():
    shutil.rmtree(DATASET_PATH)

DATASET_PATH.mkdir(parents=True, exist_ok=True)

features_source_cnf = TARGET_PATH / "cnf_pt"
y_source_bb = TARGET_PATH / "bb_pt"

features_source_d_cnf = TARGET_PATH / "d_cnf_pt"
y_source_d_bb = TARGET_PATH / "d_bb_pt"

sources = [features_source_cnf, y_source_bb, features_source_d_cnf, y_source_d_bb]
sources_path = ["cnf", "backbone", "cnf", "backbone"]

for source, s_path in zip(sources, sources_path):
    # Regular entries only, in sorted order (could be shuffled randomly instead).
    items = sorted(entry for entry in source.iterdir() if not entry.is_dir())
    n_total = len(items)

    train_end = math.floor(n_total * TRAIN_RATIO)
    val_end = train_end + math.floor(n_total * VALIDATION_RATIO)

    # Split name -> files; the train split is stored under "pretrain".
    splits = {
        "pretrain": items[:train_end],
        "validation": items[train_end:val_end],
        "test": items[val_end:],
    }

    for split_path, split_items in splits.items():
        split_dir = DATASET_PATH / s_path / split_path
        split_dir.mkdir(parents=True, exist_ok=True)

        for item in split_items:
            # Link to the resolved (absolute) location of the extracted file.
            os.symlink(item.resolve(), split_dir / item.name)


In [None]:
# Create finetune dataset symlinks
#
# Every file of the finetune sources is symlinked into
# DATASET_PATH/<kind>/finetune/. The cell can be re-run safely: only the
# finetune directories are reset, so any other content of DATASET_PATH
# (e.g. the pretrain/validation/test splits) is left untouched.

TARGET_PATH = Path("./processed_data")
DATASET_PATH = Path("./data")

cnf_ft = TARGET_PATH / "cnf_ft"
bb_ft = TARGET_PATH / "bb_ft"

d_cnf_ft = TARGET_PATH / "d_cnf_ft"
d_bb_ft = TARGET_PATH / "d_bb_ft"

sources = [cnf_ft, bb_ft, d_cnf_ft, d_bb_ft]
sources_path = ["cnf", "backbone", "cnf", "backbone"]

# Reset each finetune directory exactly once before linking. Two sources share
# every destination, so clearing inside the linking loop would drop the links
# created for the first source. Without this reset a second run of the cell
# fails with FileExistsError in os.symlink.
for s_path in sorted(set(sources_path)):
    finetune_dir = DATASET_PATH / s_path / "finetune"
    if finetune_dir.exists():
        shutil.rmtree(finetune_dir)
    finetune_dir.mkdir(parents=True, exist_ok=True)

for source, s_path in zip(sources, sources_path):
    items = [item for item in source.iterdir() if not item.is_dir()]
    items.sort()  # Deterministic order; could be randomized instead

    finetune_dir = DATASET_PATH / s_path / "finetune"

    for item in items:
        sym_path = finetune_dir / item.name
        # Link to the resolved (absolute) location of the extracted file.
        item_path = item.resolve()

        os.symlink(item_path, sym_path)