# FLOW-DC: Flexible Large-scale Orchestrated Workflow for Data Collection

This notebook demonstrates the complete FLOW-DC pipeline for distributed image downloading:

1. **Estimate** - Calculate the expected download size
2. **Partition** - Split the dataset manifest into partitions grouped by URL host
3. **Download** - Run distributed downloads with TaskVine

## Prerequisites

```bash
conda activate FLOW-DC
```

In [None]:
import os
import json
import polars as pl

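# Make sure the working directory is the repository root (the one containing bin/).
# If the notebook was launched from a subdirectory, step up one level.
if not os.path.exists('bin/download_batch.py') and os.path.exists('../bin/download_batch.py'):
    os.chdir('..')

print(f"Working directory: {os.getcwd()}")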

## Step 1: Check Input Data

Verify that your input parquet file exists and has the expected columns.

In [None]:
# Configure your input file
INPUT_PARQUET = "files/input/dataset.parquet"  # Change this to your input file
URL_COLUMN = "url"  # Change this to your URL column name
LABEL_COLUMN = "species"  # Change this to your label column (or None)

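# Load and inspect with Polars
df = pl.read_parquet(INPUT_PARQUET)
print(f"Loaded {df.height:,} rows")
print(f"\nColumns: {df.columns}")

# Fail early if a configured column is missing
expected = [c for c in (URL_COLUMN, LABEL_COLUMN) if c is not None]
missing = [c for c in expected if c not in df.columns]
assert not missing, f"Missing expected columns: {missing}"

print("\nSample URLs:")
df.select(URL_COLUMN).head()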

## Step 2: Estimate Download Size

Use `bin/CalcDatasetSize.py` to estimate how much storage the download will need. Sampling a subset of URLs keeps the estimate quick.
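
As a rough illustration of how a sample-based estimate can work, the sketch below scales the mean size of a sample up to the full row count. It is an assumption about the general approach, not the logic inside `bin/CalcDatasetSize.py`; the function name `extrapolate_total_bytes` and the sample values are hypothetical.

```python
from statistics import mean


def extrapolate_total_bytes(sample_sizes, total_rows):
    """Scale the mean size of a sample up to the full dataset."""
    # Ignore failed probes (None) so they do not distort the mean.
    known = [s for s in sample_sizes if s is not None]
    if not known:
        raise ValueError("no successful size probes in the sample")
    return mean(known) * total_rows


# Hypothetical sample: sizes in bytes for four probed URLs, one probe failed.
sample = [200_000, 300_000, None, 250_000]
estimate = extrapolate_total_bytes(sample, total_rows=1_000_000)
print(f"Estimated total: {estimate / 1e9:.1f} GB")
```

The estimate is only as good as the sample: if a few hosts serve much larger images than the rest, a larger `--sample` value gives a more stable figure.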

In [None]:
%%bash -s "$INPUT_PARQUET" "$URL_COLUMN"
# Sample 1000 URLs for quick estimation
python bin/CalcDatasetSize.py --input "$1" --url_column "$2" --sample 1000

## Step 3: Partition Dataset by Host

Split the dataset into partitions grouped by URL host. This optimizes distributed downloading by:
- Keeping URLs from the same host in the same partition
- Balancing partition sizes using greedy bin-packing
- Enabling per-host rate limiting with PAARC (Policy-Aware Adaptive Request Controller)
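
The grouping and balancing described above can be sketched as follows. This is a minimal illustration of greedy bin-packing (largest host first, always into the currently lightest partition), not the code in `bin/SplitParquet.py`; the function name `partition_hosts` and the example URLs are hypothetical.

```python
from collections import Counter
from urllib.parse import urlparse


def partition_hosts(urls, num_partitions):
    """Assign each host to exactly one partition, largest hosts first."""
    host_counts = Counter(urlparse(u).netloc for u in urls)
    loads = [0] * num_partitions  # number of URLs assigned to each partition
    assignment = {}
    # Sort by descending URL count; break ties by host name for determinism.
    for host, count in sorted(host_counts.items(), key=lambda kv: (-kv[1], kv[0])):
        target = loads.index(min(loads))  # lightest partition so far
        assignment[host] = target
        loads[target] += count
    return assignment, loads


urls = (
    [f"https://a.example/img{i}.jpg" for i in range(5)]
    + [f"https://b.example/img{i}.jpg" for i in range(3)]
    + [f"https://c.example/img{i}.jpg" for i in range(2)]
)
assignment, loads = partition_hosts(urls, num_partitions=2)
print(assignment)
print(loads)
```

Because a host is never split across partitions, a single very large host sets a lower bound on the size of the biggest partition, no matter how many partitions are requested.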

In [None]:
# Configure partitioning
NUM_PARTITIONS = 10  # Number of worker partitions
OUTPUT_FOLDER = "files/input/partitions"

# Create output directory
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

In [None]:
%%bash -s "$INPUT_PARQUET" "$URL_COLUMN" "$NUM_PARTITIONS" "$OUTPUT_FOLDER"
python bin/SplitParquet.py \
    --parquet "$1" \
    --url_col "$2" \
    --groups "$3" \
    --output_folder "$4" \
    --method host \
    --add_host_column

In [None]:
# Verify partitions were created
partitions = sorted([f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.parquet')])
print(f"Created {len(partitions)} partitions:")
for p in partitions:
    pf = pl.read_parquet(os.path.join(OUTPUT_FOLDER, p))
    hosts = pf.select('host').n_unique() if 'host' in pf.columns else 'N/A'
    print(f"  {p}: {pf.height:,} rows, {hosts} unique hosts")

## Step 4: Configure TaskVine

Create or update the TaskVine configuration file.

In [None]:
# TaskVine configuration with PAARC rate control
config = {
    "port_number": 9123,
    "parquets_directory": OUTPUT_FOLDER,
    "output_directory": "files/output/images",
    
    "url_col": URL_COLUMN,
    "label_col": LABEL_COLUMN,
    
    # Download settings
    "concurrent_downloads": 1000,
    "timeout_sec": 30,
    
    # PAARC toggle
    "enable_paarc": True,
    
    # PAARC concurrency bounds
    "C_init": 8,
    "C_min": 2,
    "C_max": 2000,
    
    # PAARC utilization and backoff
    "mu": 1.0,
    "beta": 0.7,
    
    # PAARC latency thresholds
    "theta_50": 1.5,
    "theta_95": 2.0,
    "startup_theta_50": 3.0,
    "startup_theta_95": 4.0,
    
    # PAARC timing
    "probe_rtt_period": 30.0,
    "rtprop_window": 35.0,
    "cooldown_floor": 2.0,
    
    # PAARC smoothing
    "alpha_ema": 0.3,
    
    # Retry settings
    "max_retry_attempts": 3,
    "retry_backoff_sec": 2.0,
    
    # Task settings
    "timeout_minutes": 60,
    "task_cores": 4,
    "task_memory_mb": 8000,
    
    # Output settings
    "create_tar": True,
    "output_format": "imagefolder"
}

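# Save configuration (create the config directory if it does not exist yet)
config_path = "files/config/taskvine_flowdc.json"
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, 'w') as f:
    json.dump(config, f, indent=2)

print(f"Saved configuration to {config_path}")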
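
Several of the PAARC settings are related to each other, so a quick sanity check before launching can catch typos. The sketch below encodes assumptions that this notebook does not state explicitly: that the initial concurrency should lie between the bounds, that `beta` is a multiplicative backoff factor between 0 and 1, and that the median threshold should not exceed the 95th-percentile threshold. The helper `check_paarc_config` is hypothetical and not part of FLOW-DC.

```python
def check_paarc_config(cfg):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    # Concurrency bounds (assumed relationship: C_min <= C_init <= C_max).
    if all(k in cfg for k in ("C_min", "C_init", "C_max")):
        if not cfg["C_min"] <= cfg["C_init"] <= cfg["C_max"]:
            problems.append("expected C_min <= C_init <= C_max")
    # Backoff factor (assumed to be multiplicative, so strictly between 0 and 1).
    if "beta" in cfg and not 0 < cfg["beta"] < 1:
        problems.append("expected 0 < beta < 1")
    # Latency thresholds (assumed ordering: median threshold <= p95 threshold).
    if "theta_50" in cfg and "theta_95" in cfg:
        if cfg["theta_50"] > cfg["theta_95"]:
            problems.append("expected theta_50 <= theta_95")
    return problems


good = {"C_init": 8, "C_min": 2, "C_max": 2000, "beta": 0.7,
        "theta_50": 1.5, "theta_95": 2.0}
bad = dict(good, C_init=1, beta=1.2)
print(check_paarc_config(good))
print(check_paarc_config(bad))
```

Keys that are absent are skipped, so the same check can be applied to a configuration that sets only a subset of the PAARC parameters.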

## Step 5: Start TaskVine Manager

Run the TaskVine manager to distribute download tasks to workers.

**Note:** TaskVine workers must be running and connected to the manager port (`port_number` in the configuration, 9123 here).

In [None]:
# Dry run to preview tasks
!python bin/TaskvineFLOWDC.py --config files/config/taskvine_flowdc.json --dry_run

In [None]:
# Start the actual TaskVine manager (uncomment to run)
# !python bin/TaskvineFLOWDC.py --config files/config/taskvine_flowdc.json

## Alternative: Cloud Upload Mode

For large datasets, use the cloud variant, `bin/TaskvineFLOWDCCloud.py`, which uploads downloaded data directly to cloud storage.

In [None]:
# Cloud configuration with PAARC rate control
cloud_config = {
    "port_number": 9123,
    "parquets_directory": OUTPUT_FOLDER,
    
    "cloud_destination": "AIIRA_Dataset",  # Change to your destination
    "cloud_tool": "gocmd",  # Options: gocmd, aws, gsutil, rclone
    
    "url_col": URL_COLUMN,
    "label_col": LABEL_COLUMN,
    
    # Download settings
    "concurrent_downloads": 1000,
    "timeout_sec": 30,
    
    # PAARC toggle
    "enable_paarc": True,
    
    # PAARC concurrency bounds
    "C_init": 8,
    "C_min": 2,
    "C_max": 2000,
    
    # PAARC utilization and backoff
    "mu": 1.0,
    "beta": 0.7,
    
    # Retry settings
    "max_retry_attempts": 3,
    "retry_backoff_sec": 2.0,
    
    # Task settings
    "timeout_minutes": 120,
    "task_cores": 8,
    "task_memory_mb": 16000,
}

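# Save cloud configuration (create the config directory if it does not exist yet)
cloud_config_path = "files/config/taskvine_flowdc_cloud.json"
os.makedirs(os.path.dirname(cloud_config_path), exist_ok=True)
with open(cloud_config_path, 'w') as f:
    json.dump(cloud_config, f, indent=2)

print(f"Saved cloud configuration to {cloud_config_path}")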

In [None]:
# Dry run cloud tasks
!python bin/TaskvineFLOWDCCloud.py --config files/config/taskvine_flowdc_cloud.json --dry_run

## Step 6: Verify Results

After the downloads complete, check the local output directory for the tar archives produced when `create_tar` is enabled.

In [None]:
# Check output directory
output_dir = config["output_directory"]
if os.path.exists(output_dir):
    tar_files = [f for f in os.listdir(output_dir) if f.endswith('.tar.gz')]
    print(f"Output tar files in {output_dir}:")
    for f in sorted(tar_files):
        size_mb = os.path.getsize(os.path.join(output_dir, f)) / (1024 * 1024)
        print(f"  {f}: {size_mb:.1f} MB")
else:
    print(f"Output directory {output_dir} does not exist yet.")
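
File sizes alone do not show whether every image arrived. One further check is to count the files inside each archive and compare the totals with the partition row counts. The sketch below uses only the standard-library `tarfile` module; it assumes each archive holds one regular file per downloaded image, which this notebook does not confirm, and the helper `count_tar_files` is hypothetical.

```python
import io
import os
import tarfile
import tempfile


def count_tar_files(tar_path):
    """Count regular files (not directories) inside a tar archive."""
    # "r:*" lets tarfile detect the compression (none, gzip, bz2, xz).
    with tarfile.open(tar_path, "r:*") as tar:
        return sum(1 for member in tar if member.isfile())


# Demonstration on a throwaway archive with three placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.tar.gz")
    with tarfile.open(demo_path, "w:gz") as tar:
        for name in ("cat/1.jpg", "cat/2.jpg", "dog/1.jpg"):
            data = b"placeholder bytes, not a real image"
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    demo_count = count_tar_files(demo_path)

print(f"Files in demo archive: {demo_count}")
```

A count lower than the partition's row count is expected when some URLs fail after all retries, so treat a mismatch as a prompt to inspect the logs rather than as an error in itself.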

## Alternative: Single-Machine Download

For smaller datasets or testing, you can run downloads on a single machine without TaskVine.
This mode still uses PAARC for per-host rate limiting.
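
To give a feel for what adaptive per-host concurrency control means, here is a generic additive-increase, multiplicative-decrease step. It is explicitly not the PAARC algorithm, whose internals this notebook does not describe. The function `next_concurrency` and the `latency_ratio` input (observed latency divided by a baseline latency) are hypothetical; the default values merely borrow `C_min`, `C_max`, `beta`, and `theta_50` from the configuration above.

```python
def next_concurrency(current, latency_ratio, *, c_min=2, c_max=2000,
                     beta=0.7, theta=1.5):
    """Return the concurrency limit to use after one control step."""
    if latency_ratio > theta:
        # Latency has inflated past the threshold: back off multiplicatively.
        proposed = int(current * beta)
    else:
        # Latency looks healthy: probe for more capacity, one slot at a time.
        proposed = current + 1
    # Keep the result inside the configured bounds.
    return max(c_min, min(c_max, proposed))


# Hypothetical latency ratios observed for one host over three steps.
limit = 8
trace = []
for ratio in [1.0, 2.5, 1.0]:
    limit = next_concurrency(limit, ratio)
    trace.append(limit)
print(trace)
```

Tracking one such limit per host lets a slow or throttling server reduce its own request rate without slowing downloads from other hosts.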

In [None]:
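# Single-machine download (for testing or small datasets); uncomment to run.
# Uses PAARC rate control for adaptive per-host concurrency management.
# Note: files/config/gbif.json is not created by this notebook and must already exist.
# !python bin/download_batch.py --config files/config/gbif.json --enable_paarc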