In [15]:
from typing import Dict, List, Any, Optional, Tuple
import os
import json
import glob
import random
from datasets import Dataset, DatasetDict, Features, Value, Image as HFImage, ClassLabel, Sequence
from dotenv import load_dotenv

In [18]:
def extract_image_id(blob_url: str) -> str:
    """Return the image ID embedded in a scan.idena.io blob URL.

    Args:
        blob_url: Blob URL of the form ``blob:https://scan.idena.io/<image-id>``.

    Returns:
        The text following the blob prefix, or ``blob_url`` unchanged when it
        does not start with that prefix.
    """
    # removeprefix only strips a leading match, so any other input passes through as-is.
    return blob_url.removeprefix("blob:https://scan.idena.io/")

def process_task_data(task_data: dict[str, Any]) -> dict[str, Any]:
    """Convert a raw task record into the dataset's task format.

    Every distinct image referenced by ``image_lst1``/``image_lst2`` is
    assigned a randomly ordered string key ("0", "1", ...), and both stacks
    are rewritten as lists of those keys.

    Args:
        task_data: Raw task record. ``image_lst1`` and ``image_lst2`` are
            expected to be dicts mapping integer-like position strings to
            blob URLs; a missing or non-dict value is treated as an empty
            stack.

    Returns:
        A dict with ``task_id`` (the ``name`` field with "/" replaced by
        "_"), ``images`` (key -> image ID), ``left_stack`` and
        ``right_stack`` (image keys in position order), plus
        ``agreed_answer``, ``votes`` and ``details`` copied from the input
        (defaulting to ``[]``, ``{}`` and ``{}``).

    Raises:
        ValueError: If a position key in an image list is not integer-like.

    Note:
        Shuffling uses the module-level ``random`` state, so call
        ``random.seed`` beforehand to get reproducible keys.
    """
    task_id = task_data.get("name", "").replace("/", "_")

    # Resolve each stack once into its position-ordered list of image IDs.
    ordered_ids = {}
    for list_name in ("image_lst1", "image_lst2"):
        raw_list = task_data.get(list_name)
        if isinstance(raw_list, dict):
            by_position = sorted(raw_list.items(), key=lambda item: int(item[0]))
            ordered_ids[list_name] = [extract_image_id(url) for _, url in by_position]
        else:
            ordered_ids[list_name] = []

    # De-duplicate in first-seen order rather than through a set: string set
    # order changes with per-process hash randomization, which would make the
    # shuffle below irreproducible even under a fixed seed.
    image_ids_list = list(
        dict.fromkeys(ordered_ids["image_lst1"] + ordered_ids["image_lst2"])
    )
    random.shuffle(image_ids_list)

    # Shuffled position -> image ID, and the inverse for rewriting the stacks.
    images_dict = {str(idx): img_id for idx, img_id in enumerate(image_ids_list)}
    img_id_to_key = {img_id: key for key, img_id in images_dict.items()}

    return {
        "task_id": task_id,
        "images": images_dict,
        "left_stack": [img_id_to_key[i] for i in ordered_ids["image_lst1"]],
        "right_stack": [img_id_to_key[i] for i in ordered_ids["image_lst2"]],
        "agreed_answer": task_data.get("agreed_answer", []),
        "votes": task_data.get("votes", {}),
        "details": task_data.get("details", {}),
    }


def test_task_processing() -> None:
    """Test the task processing with a sample task."""
    # Sample task data
    sample_task = {
        "name": "/flip/bafkreia2ftormk5ydiy4rb4wamzvuxr5g6bzmzu3lxdzxs4rncwsmcakne",
        "image_lst1": {
            "0": "blob:https://scan.idena.io/c3ac47de-429d-4cec-8242-139ac6a61bdf",
            "1": "blob:https://scan.idena.io/1c642f13-e1d5-44a1-ba6d-152a80121328",
            "2": "blob:https://scan.idena.io/582e5ff0-ca73-498e-aecf-2d6ea6e6e8bd",
            "3": "blob:https://scan.idena.io/b3e5851e-2eb3-4a23-bd1a-c111311ebcd1"
        },
        "image_lst2": {
            "0": "blob:https://scan.idena.io/c3ac47de-429d-4cec-8242-139ac6a61bdf",
            "1": "blob:https://scan.idena.io/b3e5851e-2eb3-4a23-bd1a-c111311ebcd1",
            "2": "blob:https://scan.idena.io/582e5ff0-ca73-498e-aecf-2d6ea6e6e8bd",
            "3": "blob:https://scan.idena.io/1c642f13-e1d5-44a1-ba6d-152a80121328"
        },
        "agreed_answer": [
            "Right",
            "Strong"
        ],
        "votes": {
            "Left": "2",
            "Right": "10",
            "Reported": "0"
        },
        "details": {
            "Author:": "0xC0FF90dE1a01b42345762bF77708F1255cB2d071",
            "Epoch:": "#0011",
            "Size:": "106224 bytes",
            "Created:": "10/9/2019 15:12:47",
            "Block:": "322755",
            "Tx:": "0x3194b518172d61f16878050a5ccd83758037080efe458a92fb71e99c161c284f"
        }
    }
    
    # Set seed for reproducibility
    random.seed(42)
    
    # Process the task
    processed_task = process_task_data(sample_task)
    
    # Print the result
    print(json.dumps(processed_task, indent=2))
    
    # Verify the structure
    print("\nVerification:")
    print(f"Task ID: {processed_task['task_id']}")
    print(f"Number of images: {len(processed_task['images'])}")
    print(f"Left stack: {processed_task['left_stack']}")
    print(f"Right stack: {processed_task['right_stack']}")
    
    # Verify that stacks contain the same image indices but in different orders
    left_set = set(processed_task['left_stack'])
    right_set = set(processed_task['right_stack'])
    
    print(f"Left and right stacks contain the same images: {left_set == right_set}")


# Run the sample-task check defined above and display its output.
test_task_processing()

{
  "task_id": "_flip_bafkreia2ftormk5ydiy4rb4wamzvuxr5g6bzmzu3lxdzxs4rncwsmcakne",
  "images": {
    "0": "b3e5851e-2eb3-4a23-bd1a-c111311ebcd1",
    "1": "582e5ff0-ca73-498e-aecf-2d6ea6e6e8bd",
    "2": "1c642f13-e1d5-44a1-ba6d-152a80121328",
    "3": "c3ac47de-429d-4cec-8242-139ac6a61bdf"
  },
  "left_stack": [
    "3",
    "2",
    "1",
    "0"
  ],
  "right_stack": [
    "3",
    "0",
    "1",
    "2"
  ],
  "agreed_answer": [
    "Right",
    "Strong"
  ],
  "votes": {
    "Left": "2",
    "Right": "10",
    "Reported": "0"
  },
  "details": {
    "Author:": "0xC0FF90dE1a01b42345762bF77708F1255cB2d071",
    "Epoch:": "#0011",
    "Size:": "106224 bytes",
    "Created:": "10/9/2019 15:12:47",
    "Block:": "322755",
    "Tx:": "0x3194b518172d61f16878050a5ccd83758037080efe458a92fb71e99c161c284f"
  }
}

Verification:
Task ID: _flip_bafkreia2ftormk5ydiy4rb4wamzvuxr5g6bzmzu3lxdzxs4rncwsmcakne
Number of images: 4
Left stack: ['3', '2', '1', '0']
Right stack: ['3', '0', '1', '2']
Left and right stacks contain the same images: True

In [22]:

def read_json_file(file_path: str) -> Dict[str, Any]:
    """Read a JSON file and return its parsed contents.

    Args:
        file_path: Path to the JSON file.

    Returns:
        The parsed JSON document (a dictionary for the task files read here).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Open in binary mode so json.load detects the encoding itself (UTF-8,
    # UTF-16 or UTF-32, with or without a BOM) instead of depending on the
    # platform's locale default, which is not UTF-8 everywhere.
    with open(file_path, "rb") as f:
        return json.load(f)

def find_image_file(image_id: str, images_dir: str, extensions: Optional[List[str]] = None) -> Optional[str]:
    """Find the image file for a given image ID.

    Args:
        image_id: ID of the image; the file is expected at
            ``<images_dir>/<image_id>.<ext>``.
        images_dir: Directory containing the images.
        extensions: Extensions to try, in priority order. When None they are
            read from the comma-separated ``IMAGE_EXTENSIONS`` environment
            variable, defaulting to ``png,jpg,jpeg``. Surrounding whitespace
            and leading dots are ignored, so ``"png, .jpg"`` works.

    Returns:
        The path of the first matching regular file, or None if none exists.
    """
    if extensions is None:
        extensions = os.getenv("IMAGE_EXTENSIONS", "png,jpg,jpeg").split(",")

    for raw_ext in extensions:
        # Normalize so "png, jpg" or ".jpg" do not silently miss every file.
        ext = raw_ext.strip().lstrip(".")
        if not ext:
            continue
        image_path = os.path.join(images_dir, f"{image_id}.{ext}")
        # isfile rather than exists: a directory with a matching name is not an image.
        if os.path.isfile(image_path):
            return image_path

    return None



def load_split_data(split_dir: str) -> Dict[str, List]:
    """Load data for a single split (train, test, or validation).

    Reads every ``*.json`` file in ``<split_dir>/tasks``, processes it with
    ``process_task_data`` and emits one row per task image whose file is
    found in ``<split_dir>/images``.

    Args:
        split_dir: Directory containing the ``tasks`` and ``images`` folders.

    Returns:
        A dictionary of equal-length lists: ``task_id``, ``task_data`` (the
        processed task serialized as a JSON string), ``image_id`` and
        ``image`` (path to the image file).

    Note:
        This is best-effort: a task file that fails to load, process or
        serialize is reported on stdout and skipped, and so is any image
        whose file is missing.
    """
    tasks_dir = os.path.join(split_dir, "tasks")
    images_dir = os.path.join(split_dir, "images")

    # glob returns files in arbitrary, filesystem-dependent order. Sorting
    # keeps the row order stable and, because task processing draws from the
    # global RNG, keeps the seeded shuffles reproducible as well.
    task_files = sorted(glob.glob(os.path.join(tasks_dir, "*.json")))

    # Parallel columns; each appended row adds one entry to every list.
    task_ids = []
    task_data_list = []
    image_paths = []
    image_ids = []

    for task_file in task_files:
        try:
            processed_data = process_task_data(read_json_file(task_file))
            task_id = processed_data["task_id"]
            # Serialize once per task and before any append, so a failure
            # here cannot leave the columns with different lengths.
            task_json = json.dumps(processed_data)

            # One dataset row per image of the task that exists on disk.
            for img_id in processed_data["images"].values():
                img_path = find_image_file(img_id, images_dir)
                if img_path:
                    task_ids.append(task_id)
                    task_data_list.append(task_json)
                    image_paths.append(img_path)
                    image_ids.append(img_id)
                else:
                    print(f"Warning: Image file not found for ID: {img_id} in {images_dir}")
        except Exception as e:
            print(f"Error processing task file {task_file}: {e}")

    return {
        "task_id": task_ids,
        "task_data": task_data_list,
        "image_id": image_ids,
        "image": image_paths,
    }

def create_hf_dataset(base_dir: str) -> DatasetDict:
    """Build a Hugging Face ``DatasetDict`` from the on-disk split folders.

    Args:
        base_dir: Directory expected to contain ``train``, ``test`` and
            ``validation`` sub-directories.

    Returns:
        A ``DatasetDict`` holding one ``Dataset`` per split directory that
        exists; missing splits are reported on stdout and left out.
    """
    built_splits = {}

    for split in ("train", "test", "validation"):
        print(f"Processing split: {split}")
        split_dir = os.path.join(base_dir, split)

        if os.path.exists(split_dir):
            # Gather the row columns for this split.
            columns = load_split_data(split_dir)

            # Three plain string columns plus the image, which is supplied as a file path.
            schema = Features({
                "task_id": Value("string"),
                "task_data": Value("string"),
                "image_id": Value("string"),
                "image": HFImage(),
            })

            built_splits[split] = Dataset.from_dict(columns, features=schema)
        else:
            print(f"Warning: Split directory {split_dir} does not exist. Skipping.")

    return DatasetDict(built_splits)

def push_dataset_to_hf(dataset: DatasetDict, dataset_name: str, token: Optional[str] = None) -> None:
    """Upload a dataset to the Hugging Face Hub.

    Args:
        dataset: The dataset to upload.
        dataset_name: Name of the dataset on Hugging Face.
        token: Hugging Face API token. If None, the token from the Hugging
            Face CLI is used.
    """
    # Thin wrapper: the whole upload is delegated to DatasetDict.push_to_hub.
    dataset.push_to_hub(dataset_name, token=token)

In [25]:
# Load the env file first so HF_TOKEN is visible to os.getenv below.
load_dotenv("../access_tokens.env")
token = os.getenv("HF_TOKEN")

# Where the splits live on disk, and the dataset name used on the Hub.
base_dir = "flip_dataset/"
dataset_name = "FLIP-Challenge"

print(f"Creating dataset from {base_dir}...")

# Fix the global RNG state right before building, since task processing shuffles image IDs.
random.seed(42)
dataset = create_hf_dataset(base_dir)

Creating dataset from flip_dataset/...
Processing split: train
Processing split: test
Processing split: validation


In [26]:
# Upload the dataset built in the previous cell, then report where it was published.
print(f"Pushing dataset to Hugging Face as {dataset_name}...")
push_dataset_to_hf(dataset, dataset_name, token)

# NOTE(review): dataset_name carries no "<user>/" namespace, so the URL printed
# below is probably not the actual repository page — confirm against the Hub.
print(f"Dataset successfully created and pushed to Hugging Face: https://huggingface.co/datasets/{dataset_name}")

Pushing dataset to Hugging Face as FLIP-Challenge...


Map: 100%|██████████| 11404/11404 [00:01<00:00, 9155.60 examples/s]
Creating parquet from Arrow format: 100%|██████████| 115/115 [00:00<00:00, 174.03ba/s]
Map: 100%|██████████| 11403/11403 [00:01<00:00, 8755.01 examples/s].20s/it]
Creating parquet from Arrow format: 100%|██████████| 115/115 [00:00<00:00, 180.53ba/s]
Map: 100%|██████████| 11403/11403 [00:01<00:00, 8549.14 examples/s].68s/it]
Creating parquet from Arrow format: 100%|██████████| 115/115 [00:00<00:00, 176.09ba/s]
Uploading the dataset shards: 100%|██████████| 3/3 [01:00<00:00, 20.20s/it]
Map: 100%|██████████| 7354/7354 [00:01<00:00, 5649.92 examples/s]s]
Creating parquet from Arrow format: 100%|██████████| 74/74 [00:00<00:00, 181.72ba/s]
Uploading the dataset shards: 100%|██████████| 1/1 [00:09<00:00,  9.13s/it]
Map: 100%|██████████| 7317/7317 [00:01<00:00, 7191.50 examples/s]s]
Creating parquet from Arrow format: 100%|██████████| 74/74 [00:00<00:00, 180.59ba/s]
Uploading the dataset shards: 100%|██████████| 1/1 [00:14<00:

Dataset successfully created and pushed to Hugging Face: https://huggingface.co/datasets/FLIP-Challenge
