Download the dataset from a terminal (do not execute the two lines below in this .ipynb Jupyter notebook; run them in your terminal instead)

In [None]:
git lfs install
git clone https://huggingface.co/datasets/pranavmr/MM-IMDb mmimdb_hf

In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# MM-IMDb (HuggingFace pranavmr/MM-IMDb, local Parquet) → JPGs + CSV + federated client splits

from pathlib import Path
import io, json, os, pandas as pd
from tqdm import tqdm
from PIL import Image
from datasets import load_dataset
from sklearn.model_selection import train_test_split

# ------------------------------
# Paths (edit if needed)
# ------------------------------
# where you cloned pranavmr/MM-IMDb
# The shard glob can be overridden through the MMIMDB_PARQUET_GLOB environment
# variable, so the notebook can run on another machine without editing this cell.
# When the variable is unset, the original location below is used unchanged.
HF_LOCAL_DATA = Path(os.environ.get(
    "MMIMDB_PARQUET_GLOB",
    "/home/ccl/Desktop/akeel_folder/MMFL_Flower/FedOps/silo/examples/torch/new_hateful_memes_classification/new_hateful_memes/FedMAP/IMDB_usecase/mmimdb_hf/data/train-*.parquet",
))

# your project layout (same style you used before)
BASE_DIR = Path("./all_in_one_dataset")
POSTERS_DIR = BASE_DIR / "mmimdb_posters"  # exported JPG posters are written here
CSV_PATH = BASE_DIR / "mmimdb.csv"         # one CSV row per exported poster

# federated output
OUTPUT_DIR = Path("dataset")
SERVER_DIR = OUTPUT_DIR / "server"

# ------------------------------
# Split / client config
# ------------------------------
SEED = 42
NUM_CLIENTS = 5
PROPORTIONS = [0.35, 0.10, 0.15, 0.2, 0.2]  # must sum to 1.0
SERVER_SAMPLE_LIMIT = 100  # rows for server eval csv

# ------------------------------
# Step 1: Load local Parquet shards
# ------------------------------
print("🔎 Loading local Parquet shards ...")
parquet_files = {"train": str(HF_LOCAL_DATA)}
ds = load_dataset("parquet", data_files=parquet_files, split="train")
print("Columns:", ds.column_names)

# Fail fast: the export step below reads exactly these three columns.
for required in ("image", "text", "labels"):
    assert required in ds.column_names, f"Expected column '{required}' not found in dataset."

# ------------------------------
# Step 2: Export images to JPG + build rows
# ------------------------------
POSTERS_DIR.mkdir(parents=True, exist_ok=True)
rows = []

print("🖼️  Exporting posters and building csv rows ...")
for i, ex in enumerate(tqdm(ds, total=len(ds))):
    # The image cell may be an already-decoded PIL image, a dict carrying raw
    # "bytes" (or only a "path" when the bytes are absent), or a path-like value.
    img = ex["image"]
    if isinstance(img, Image.Image):
        pil = img.convert("RGB")
    elif isinstance(img, dict) and img.get("bytes") is not None:
        pil = Image.open(io.BytesIO(img["bytes"])).convert("RGB")
    elif isinstance(img, dict):
        # dict without inline bytes: fall back to the file path it carries
        pil = Image.open(img["path"]).convert("RGB")
    else:
        # path-like
        pil = Image.open(img).convert("RGB")

    # the zero-padded row index is the file name, so each row gets its own JPG
    img_name = f"{i:07d}.jpg"
    pil.save(POSTERS_DIR / img_name, "JPEG")

    # text: a missing or None value becomes an empty string
    text = ex.get("text", "") or ""

    # labels: a plain string is kept verbatim; any other iterable is
    # de-duplicated, sorted and pipe-joined. None is treated as "no labels"
    # (mirrors the None handling of text) instead of crashing the comprehension.
    labs = ex.get("labels", [])
    if labs is None:
        labs = []
    if isinstance(labs, str):
        label_str = labs
    else:
        label_str = "|".join(sorted({str(x) for x in labs}))

    rows.append({"img_name": img_name, "text": text, "labels": label_str})

df = pd.DataFrame(rows)
CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(CSV_PATH, index=False)
print(f"✅ Wrote CSV: {CSV_PATH}  (rows={len(df)})")
print(f"✅ Saved posters to: {POSTERS_DIR}")

# ------------------------------
# Step 3: Create global train/dev/test (80/10/10)
# ------------------------------
print("✂️  Creating global train/dev/test splits ...")
# Shuffle the rows, hold out 10% as test, then take 0.1111 of the remaining
# 90% (≈10% of all rows) as dev.
df = df.sample(frac=1.0, random_state=SEED).reset_index(drop=True)
train_all, test_all = train_test_split(df, test_size=0.1, random_state=SEED)
train_all, dev_all  = train_test_split(train_all, test_size=0.1111, random_state=SEED)  # ≈10% total

# ------------------------------
# Step 4: Shard train_all to clients (non-IID sizes by PROPORTIONS)
# ------------------------------
# One proportion per client is required: with too few the slicing below raises
# IndexError, with too many the surplus rows would be silently dropped.
assert len(PROPORTIONS) == NUM_CLIENTS, "PROPORTIONS must contain exactly NUM_CLIENTS entries"
assert abs(sum(PROPORTIONS) - 1.0) < 1e-6, "PROPORTIONS must sum to 1.0"
n = len(train_all)
# cuts[k]..cuts[k+1] is the row range of client k; the last client absorbs the
# rows lost to int() truncation because the final cut is pinned to n.
cuts = [0]
cum = 0
for p in PROPORTIONS[:-1]:
    cum += int(n * p)
    cuts.append(cum)
cuts.append(n)

client_slices = [
    train_all.iloc[cuts[i]:cuts[i + 1]].reset_index(drop=True)
    for i in range(NUM_CLIENTS)
]

# optional modality flags (all multimodal here)
client_modalities = {i: {"use_text": 1, "use_image": 1} for i in range(NUM_CLIENTS)}

# ------------------------------
# Step 5: Save per-client CSVs + server eval CSV
# ------------------------------
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
for i, cdf in enumerate(client_slices):
    cdir = OUTPUT_DIR / f"client_{i}"
    cdir.mkdir(parents=True, exist_ok=True)

    # Shards with fewer than 10 rows are kept whole for training and get an
    # empty validation frame; larger shards give up 10% for validation.
    if len(cdf) < 10:
        c_train, c_val = cdf, cdf.iloc[0:0]
    else:
        c_train, c_val = train_test_split(cdf, test_size=0.1, random_state=SEED + i)

    # every client receives the same global test split
    for file_name, frame in (("train.csv", c_train), ("val.csv", c_val), ("test.csv", test_all)):
        frame.to_csv(cdir / file_name, index=False)

    with open(cdir / "modality.json", "w") as f:
        json.dump(client_modalities[i], f)

    print(f"📂 client_{i}: train={len(c_train)}, val={len(c_val)}, test={len(test_all)}")

# The server evaluation file is a capped random sample of the dev split.
SERVER_DIR.mkdir(parents=True, exist_ok=True)
server_rows = min(SERVER_SAMPLE_LIMIT, len(dev_all))
serv_df = dev_all.sample(n=server_rows, random_state=SEED).reset_index(drop=True)
serv_df.to_csv(SERVER_DIR / "server_test.csv", index=False)
print(f"🖥️  Server eval CSV → {SERVER_DIR / 'server_test.csv'}  (rows={len(serv_df)})")

print("🎉 Done. You can now point your loaders to:")
print(f"   POSTERS_DIR = {POSTERS_DIR}")
print(f"   dataset/client_*/train.csv|val.csv|test.csv  (columns: img_name, text, labels)")


🔎 Loading local Parquet shards ...
Columns: ['image', 'text', 'labels']
🖼️  Exporting posters and building csv rows ...


100%|██████████| 25959/25959 [00:41<00:00, 618.23it/s]


✅ Wrote CSV: all_in_one_dataset/mmimdb.csv  (rows=25959)
✅ Saved posters to: all_in_one_dataset/mmimdb_posters
✂️  Creating global train/dev/test splits ...
📂 client_0: train=6541, val=727, test=2596
📂 client_1: train=1868, val=208, test=2596
📂 client_2: train=2803, val=312, test=2596
📂 client_3: train=3737, val=416, test=2596
📂 client_4: train=3739, val=416, test=2596
🖥️  Server eval CSV → dataset/server/server_test.csv  (rows=100)
🎉 Done. You can now point your loaders to:
   POSTERS_DIR = all_in_one_dataset/mmimdb_posters
   dataset/client_*/train.csv|val.csv|test.csv  (columns: img_name, text, labels)


Modality heterogeneity

In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# MM-IMDb (HuggingFace pranavmr/MM-IMDb, local Parquet) → JPGs + CSV + federated client splits
# with PER-CLIENT MODALITY CONTROL (text-only / image-only / both)

from pathlib import Path
import io, json, os, pandas as pd
from tqdm import tqdm
from PIL import Image
from datasets import load_dataset
from sklearn.model_selection import train_test_split

# ------------------------------
# Paths (edit if needed)
# ------------------------------
# The shard glob can be overridden through the MMIMDB_PARQUET_GLOB environment
# variable, so the notebook can run on another machine without editing this cell.
# When the variable is unset, the original location below is used unchanged.
HF_LOCAL_DATA = Path(os.environ.get(
    "MMIMDB_PARQUET_GLOB",
    "/home/ccl/Desktop/akeel_folder/MMFL_Flower/FedOps/silo/examples/torch/new_hateful_memes_classification/new_hateful_memes/FedMAP/IMDB_usecase/mmimdb_hf/data/train-*.parquet",
))

BASE_DIR = Path("./all_in_one_dataset")
POSTERS_DIR = BASE_DIR / "mmimdb_posters"   # if your loader expects "img", change to BASE_DIR / "img"
CSV_PATH = BASE_DIR / "mmimdb.csv"

OUTPUT_DIR = Path("dataset")
SERVER_DIR = OUTPUT_DIR / "server"

# ------------------------------
# Split / client config
# ------------------------------
SEED = 42
NUM_CLIENTS = 5
PROPORTIONS = [0.35, 0.10, 0.15, 0.20, 0.20]  # must sum to 1.0
SERVER_SAMPLE_LIMIT = 100

# ------------------------------
# MODALITY POLICY (edit this)
# 1 = keep modality, 0 = remove modality for that client
# Examples:
#  - text-only: {"use_text": 1, "use_image": 0}
#  - image-only: {"use_text": 0, "use_image": 1}
#  - both: {"use_text": 1, "use_image": 1}
# Keys are client indices (0 .. NUM_CLIENTS-1).
# ------------------------------
client_modalities = {
    0: {"use_text": 1, "use_image": 1},  # both
    1: {"use_text": 1, "use_image": 0},  # text-only
    2: {"use_text": 0, "use_image": 1},  # image-only
    3: {"use_text": 1, "use_image": 1},  # both
    4: {"use_text": 0, "use_image": 1},  # image-only
}

# ------------------------------
# Step 1: Load local Parquet shards
# ------------------------------
print("🔎 Loading local Parquet shards ...")
parquet_files = {"train": str(HF_LOCAL_DATA)}
ds = load_dataset("parquet", data_files=parquet_files, split="train")
print("Columns:", ds.column_names)

# Fail fast: the export step below reads exactly these three columns.
for required in ("image", "text", "labels"):
    assert required in ds.column_names, f"Expected column '{required}' not found in dataset."

# ------------------------------
# Step 2: Export images to JPG + build rows
# ------------------------------
POSTERS_DIR.mkdir(parents=True, exist_ok=True)
rows = []

print("🖼️  Exporting posters and building csv rows ...")
for i, ex in enumerate(tqdm(ds, total=len(ds))):
    # The image cell may be an already-decoded PIL image, a dict carrying raw
    # "bytes" (or only a "path" when the bytes are absent), or a path-like value.
    img = ex["image"]
    if isinstance(img, Image.Image):
        pil = img.convert("RGB")
    elif isinstance(img, dict) and img.get("bytes") is not None:
        pil = Image.open(io.BytesIO(img["bytes"])).convert("RGB")
    elif isinstance(img, dict):
        # dict without inline bytes: fall back to the file path it carries
        pil = Image.open(img["path"]).convert("RGB")
    else:
        pil = Image.open(img).convert("RGB")

    # the zero-padded row index is the file name, so each row gets its own JPG
    img_name = f"{i:07d}.jpg"
    pil.save(POSTERS_DIR / img_name, "JPEG")

    # a missing or None text becomes an empty string
    text = ex.get("text", "") or ""

    # A plain string is kept verbatim; any other iterable is de-duplicated,
    # sorted and pipe-joined. None is treated as "no labels" (mirrors the None
    # handling of text) instead of crashing the comprehension.
    labs = ex.get("labels", [])
    if labs is None:
        labs = []
    if isinstance(labs, str):
        label_str = labs
    else:
        label_str = "|".join(sorted({str(x) for x in labs}))

    rows.append({"img_name": img_name, "text": text, "labels": label_str})

df = pd.DataFrame(rows)
CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(CSV_PATH, index=False)
print(f"✅ Wrote CSV: {CSV_PATH}  (rows={len(df)})")
print(f"✅ Saved posters to: {POSTERS_DIR}")

# ------------------------------
# Step 3: Create global train/dev/test (80/10/10)
# ------------------------------
print("✂️  Creating global train/dev/test splits ...")
# Shuffle the rows, hold out 10% as test, then take 0.1111 of the remaining
# 90% (≈10% of all rows) as dev.
df = df.sample(frac=1.0, random_state=SEED).reset_index(drop=True)
train_all, test_all = train_test_split(df, test_size=0.1, random_state=SEED)
train_all, dev_all  = train_test_split(train_all, test_size=0.1111, random_state=SEED)  # ≈10% total

# ------------------------------
# Step 4: Shard train_all to clients (non-IID sizes by PROPORTIONS)
# ------------------------------
# One proportion per client is required: with too few the slicing below raises
# IndexError, with too many the surplus rows would be silently dropped.
assert len(PROPORTIONS) == NUM_CLIENTS, "PROPORTIONS must contain exactly NUM_CLIENTS entries"
assert abs(sum(PROPORTIONS) - 1.0) < 1e-6, "PROPORTIONS must sum to 1.0"
n = len(train_all)
# cuts[k]..cuts[k+1] is the row range of client k; the last client absorbs the
# rows lost to int() truncation because the final cut is pinned to n.
cuts = [0]
cum = 0
for p in PROPORTIONS[:-1]:
    cum += int(n * p)
    cuts.append(cum)
cuts.append(n)

client_slices = [
    train_all.iloc[cuts[i]:cuts[i + 1]].reset_index(drop=True)
    for i in range(NUM_CLIENTS)
]

# ------------------------------
# Helper: apply per-client modality mask BEFORE saving CSVs
# ------------------------------
# A single black 224x224 JPG stands in for every masked poster; it is written
# only when the file is not already present in POSTERS_DIR.
blank_img_path = POSTERS_DIR / "__BLANK__.jpg"
blank_already_written = blank_img_path.exists()
if not blank_already_written:
    black_canvas = Image.new("RGB", (224, 224), (0, 0, 0))
    black_canvas.save(blank_img_path, "JPEG")

def apply_modality_mask(df_in: pd.DataFrame, m: dict) -> pd.DataFrame:
    """Return a copy of ``df_in`` with the modalities disabled in ``m`` blanked out.

    Args:
        df_in: frame to mask; it is copied, never modified.
        m: policy dict. A value of 0 under ``"use_text"`` sets every ``text``
            cell to ``""``; a value of 0 under ``"use_image"`` sets every
            ``img_name`` cell to ``"__BLANK__.jpg"``. A missing key counts as 1.

    Returns:
        The masked copy.
    """
    masked = df_in.copy()
    # (policy flag, column to overwrite, filler value)
    overrides = (
        ("use_text", "text", ""),
        ("use_image", "img_name", "__BLANK__.jpg"),
    )
    for flag, column, filler in overrides:
        if m.get(flag, 1) == 0:
            masked[column] = filler
    return masked

# ------------------------------
# Step 5: Save per-client CSVs + server eval CSV (with enforced modality)
# ------------------------------
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
for i, cdf in enumerate(client_slices):
    cdir = OUTPUT_DIR / f"client_{i}"
    cdir.mkdir(parents=True, exist_ok=True)

    # Shards with fewer than 10 rows are kept whole for training and get an
    # empty validation frame; larger shards give up 10% for validation.
    if len(cdf) < 10:
        c_train, c_val = cdf, cdf.iloc[0:0]
    else:
        c_train, c_val = train_test_split(cdf, test_size=0.1, random_state=SEED + i)

    # Clients without an explicit policy keep both modalities. The same mask is
    # applied to the client's train/val rows and to its copy of the global test split.
    m = client_modalities.get(i, {"use_text": 1, "use_image": 1})
    c_train_masked, c_val_masked, test_masked = (
        apply_modality_mask(part, m) for part in (c_train, c_val, test_all)
    )

    for file_name, frame in (
        ("train.csv", c_train_masked),
        ("val.csv", c_val_masked),
        ("test.csv", test_masked),
    ):
        frame.to_csv(cdir / file_name, index=False)

    with open(cdir / "modality.json", "w") as f:
        json.dump(m, f)

    print(f"📂 client_{i}: train={len(c_train_masked)}, val={len(c_val_masked)}, test={len(test_masked)} → modality={m}")

# The server evaluation file is a capped random sample of the dev split; no mask is applied to it.
SERVER_DIR.mkdir(parents=True, exist_ok=True)
server_rows = min(SERVER_SAMPLE_LIMIT, len(dev_all))
serv_df = dev_all.sample(n=server_rows, random_state=SEED).reset_index(drop=True)
serv_df.to_csv(SERVER_DIR / "server_test.csv", index=False)
print(f"🖥️  Server eval CSV → {SERVER_DIR / 'server_test.csv'}  (rows={len(serv_df)})")

print("🎉 Done. Point your loaders to:")
print(f"   POSTERS_DIR = {POSTERS_DIR}")
print(f"   dataset/client_*/train.csv|val.csv|test.csv  (columns: img_name, text, labels)")
print("   Each client has modality.json reflecting its enforced modality.")


🔎 Loading local Parquet shards ...
Columns: ['image', 'text', 'labels']
🖼️  Exporting posters and building csv rows ...


100%|██████████| 25959/25959 [00:41<00:00, 620.50it/s]


✅ Wrote CSV: all_in_one_dataset/mmimdb.csv  (rows=25959)
✅ Saved posters to: all_in_one_dataset/mmimdb_posters
✂️  Creating global train/dev/test splits ...
📂 client_0: train=6541, val=727, test=2596 → modality={'use_text': 1, 'use_image': 1}
📂 client_1: train=1868, val=208, test=2596 → modality={'use_text': 1, 'use_image': 0}
📂 client_2: train=2803, val=312, test=2596 → modality={'use_text': 0, 'use_image': 1}
📂 client_3: train=3737, val=416, test=2596 → modality={'use_text': 1, 'use_image': 1}
📂 client_4: train=3739, val=416, test=2596 → modality={'use_text': 0, 'use_image': 1}
🖥️  Server eval CSV → dataset/server/server_test.csv  (rows=100)
🎉 Done. Point your loaders to:
   POSTERS_DIR = all_in_one_dataset/mmimdb_posters
   dataset/client_*/train.csv|val.csv|test.csv  (columns: img_name, text, labels)
   Each client has modality.json reflecting its enforced modality.


In [7]:
# run once (e.g., python - <<'PY' ... PY)
import json, pandas as pd, os
CSV = "all_in_one_dataset/mmimdb.csv"
labs = set()
# dtype=str + keep_default_na=False keep an empty labels cell as "" instead of
# NaN, which astype(str) would have turned into a bogus "nan" label.
label_column = pd.read_csv(CSV, usecols=["labels"], dtype=str, keep_default_na=False)["labels"]
for s in label_column:
    # labels are pipe-separated; blank tokens are skipped
    labs.update(t for t in s.split("|") if t.strip() != "")
label_list = sorted(labs)
os.makedirs("all_in_one_dataset", exist_ok=True)
# the context manager closes (and flushes) the file even if json.dump fails
with open("all_in_one_dataset/labels.json", "w") as fh:
    json.dump(label_list, fh)
print("num_labels =", len(label_list))


num_labels = 23


In [2]:
pip install datasets

Collecting datasets
  Using cached datasets-4.3.0-py3-none-any.whl.metadata (18 kB)
Collecting pyarrow>=21.0.0 (from datasets)
  Using cached pyarrow-22.0.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.2 kB)
Collecting dill<0.4.1,>=0.3.0 (from datasets)
  Using cached dill-0.4.0-py3-none-any.whl.metadata (10 kB)
Collecting httpx<1.0.0 (from datasets)
  Using cached httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting xxhash (from datasets)
  Using cached xxhash-3.6.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (13 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Using cached multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting anyio (from httpx<1.0.0->datasets)
  Using cached anyio-4.11.0-py3-none-any.whl.metadata (4.1 kB)
Collecting httpcore==1.* (from httpx<1.0.0->datasets)
  Using cached httpcore-1.0.9-py3-none-any.whl.metadata (21 kB)
Collecting h11>=0.16 (from httpcore==1.*->httpx<1.0.0->datasets)
  Usin

Server data download: build the minimal server package (server_test.csv + its poster images)

In [9]:
# JUPYTER CELL — build a minimal server package from your existing server_test.csv

import os, csv, shutil, zipfile
from pathlib import Path

# --- paths (edit if needed) ---
# input: the server evaluation CSV and the folder holding every exported poster
SERVER_CSV = Path("dataset") / "server" / "server_test.csv"
SRC_IMG_DIR = Path("all_in_one_dataset") / "mmimdb_posters"

# output: root of the new minimal package and its image sub-folder
DEST_ROOT = Path("server_data")
DEST_IMG_DIR = DEST_ROOT.joinpath("mmimdb_posters")

# switches
MAKE_ZIP = True  # set to False to skip creating the zip archive
MODE = "copy"    # "copy" or "symlink"

# --- helpers ---
def ensure_dirs():
    """Create the package image folder and the package root if they are missing."""
    for folder in (DEST_IMG_DIR, DEST_ROOT):
        folder.mkdir(parents=True, exist_ok=True)

def gather_img_names(csv_path: Path):
    """Return the non-empty ``img_name`` values of a CSV file, in file order.

    Args:
        csv_path: path of a UTF-8 CSV file with a header row.

    Returns:
        A list of the stripped ``img_name`` cells; blank cells and rows too
        short to reach the column are skipped.

    Raises:
        ValueError: if the file has no ``img_name`` column (this includes a
            completely empty file, which has no header at all).
    """
    with csv_path.open("r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # fieldnames is None for an empty file; treat that as "no columns"
        # so the caller gets the documented ValueError rather than a TypeError.
        columns = reader.fieldnames or []
        if "img_name" not in columns:
            raise ValueError(f"'img_name' column not found in {csv_path} (columns={reader.fieldnames})")
        # DictReader fills the missing cells of a short row with None.
        cleaned = ((row["img_name"] or "").strip() for row in reader)
        return [name for name in cleaned if name]

def copy_or_link(src: Path, dst: Path, mode: str):
    """Place ``src`` at ``dst`` either as a copy or as a symbolic link.

    Args:
        src: existing source file.
        dst: destination path; an existing file or link there is replaced.
        mode: ``"symlink"`` to create a link to the absolute path of ``src``
            (falling back to a copy when the link cannot be created); any
            other value copies the file with ``shutil.copy2``.
    """
    # A link left by an earlier run must go first. If it points at src,
    # copy2 would raise SameFileError; if it dangles, exists() reports False,
    # os.symlink fails and the fallback copy would write through the link.
    if dst.is_symlink():
        dst.unlink()

    if mode == "symlink":
        try:
            if dst.exists():
                dst.unlink()
            os.symlink(os.path.abspath(src), os.path.abspath(dst))
            return
        except (OSError, NotImplementedError):
            # links unavailable or not permitted here: fall back to a plain copy
            pass
    shutil.copy2(src, dst)

# --- main ---
assert SERVER_CSV.exists(), f"CSV not found: {SERVER_CSV}"
assert SRC_IMG_DIR.is_dir(), f"SRC_IMG_DIR not found: {SRC_IMG_DIR}"

ensure_dirs()
img_names = gather_img_names(SERVER_CSV)

print(f"Found {len(img_names)} rows in {SERVER_CSV}")
copied = 0
missing = []

# Place every referenced poster in the package; names with no source file are
# only collected for the report below and do not stop the run.
for name in img_names:
    src = SRC_IMG_DIR / name
    if not src.exists():
        missing.append(name)
        continue
    copy_or_link(src, DEST_IMG_DIR / name, MODE)
    copied += 1

# the CSV travels with the images
shutil.copy2(SERVER_CSV, DEST_ROOT / "server_test.csv")

print(f"✅ Images placed in: {DEST_IMG_DIR}")
print(f"✅ CSV copied to:    {DEST_ROOT / 'server_test.csv'}")
print(f"   Copied/Symlinked: {copied}")
if missing:
    print(f"   Missing ({len(missing)}): e.g. {missing[:5]} ...")

if MAKE_ZIP:
    zip_path = DEST_ROOT.with_suffix(".zip")  # 'server_data.zip'
    print(f"📦 Creating zip: {zip_path}")
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(DEST_ROOT / "server_test.csv", arcname="server_test.csv")
        # NOTE(review): every file currently in DEST_IMG_DIR is archived, including
        # leftovers from earlier runs, and the archive folder is "img/" while the
        # on-disk folder is DEST_IMG_DIR.name — confirm both are what the server expects.
        for p in filter(Path.is_file, DEST_IMG_DIR.iterdir()):
            zf.write(p, arcname=f"img/{p.name}")
    print(f"🎉 Done: {zip_path}")


Found 100 rows in dataset/server/server_test.csv
✅ Images placed in: server_data/mmimdb_posters
✅ CSV copied to:    server_data/server_test.csv
   Copied/Symlinked: 100
📦 Creating zip: server_data.zip
🎉 Done: server_data.zip
