72 changes: 40 additions & 32 deletions README.md
@@ -18,6 +18,7 @@ in [issue #1](https://github.com/Imageomics/distributed-downloader/issues/1)).

1. Install [Miniconda](https://docs.conda.io/en/latest/miniconda.html)
2. Create a new conda environment:

```commandline
conda env create -f environment.yaml --solver=libmamba -y
```
@@ -31,14 +32,21 @@ in [issue #1](https://github.com/Imageomics/distributed-downloader/issues/1)).
- [IntelMPI](https://www.intel.com/content/www/us/en/docs/mpi-library/developer-guide-linux/2021-6/installation.html)
3. Install the required package:
- For general use:

```commandline
pip install distributed-downloader
```

- For development:

```commandline
pip install -e .[dev]
```

### Scripts creation

After installation, you need to create the scripts for the downloader and tools. Follow the instructions [here](./docs/scripts_README.md).

## How to Use

`distributed-downloader` utilizes multiple nodes on a High Performance Computing (HPC) system (specifically, an HPC
@@ -187,9 +195,9 @@ You can also add your own tool by creating 3 classes and registering them with t

- Each tool's output will be saved in a separate folder in `{config.output_structure.tools_folder}/{tool_name}`
- There are 3 steps in the tool pipeline: `filter`, `scheduler` and `runner`.
- `filter` - filters the images that should be processed by the tool and creates csv files with them
- `scheduler` - creates a schedule for processing the images for MPI
- `runner` - processes the images using MPI
- Each step should be implemented in a separate class.
- The tool name should be the same across all classes.
- Each tool should inherit from the `ToolsBase` class (a hypothetical sketch follows this list).
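
A minimal sketch of that structure is below. The registration helper, the import location of `ToolsBase`, and the `run` entry point are assumptions for illustration only; the actual API should be taken from the package source.

```python
# Hypothetical sketch: `register_tool` and the import location of ToolsBase are
# assumed names, not the package's confirmed API.
from distributed_downloader.tools import ToolsBase, register_tool

TOOL_NAME = "size_filter"  # must be identical across all three classes


@register_tool(TOOL_NAME, step="filter")
class SizeFilter(ToolsBase):
    """Selects the images this tool should process and writes them to CSV files."""

    def run(self):
        ...


@register_tool(TOOL_NAME, step="scheduler")
class SizeScheduler(ToolsBase):
    """Groups the filtered images into per-rank work lists for the MPI run."""

    def run(self):
        ...


@register_tool(TOOL_NAME, step="runner")
class SizeRunner(ToolsBase):
    """Processes its assigned images inside the MPI job."""

    def run(self):
        ...
```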
@@ -229,38 +237,38 @@ All scripts can expect to have the following custom environment variables, specified when the respective tool is called (a minimal usage sketch follows the list):

- General parameters
- `CONFIG_PATH`
    - `DISTRIBUTED_DOWNLOADER_PATH` - path to the package directory, so that Python files can be called from outside scripts
- `ACCOUNT`
- `PATH_TO_INPUT`
- `PATH_TO_OUTPUT`
- `OUTPUT_URLS_FOLDER`
- `OUTPUT_LOGS_FOLDER`
- `OUTPUT_IMAGES_FOLDER`
- `OUTPUT_SCHEDULES_FOLDER`
- `OUTPUT_PROFILES_TABLE`
- `OUTPUT_IGNORED_TABLE`
- `OUTPUT_INNER_CHECKPOINT_FILE`
- `OUTPUT_TOOLS_FOLDER`
- Specific for downloader
- `DOWNLOADER_NUM_DOWNLOADS`
- `DOWNLOADER_MAX_NODES`
- `DOWNLOADER_WORKERS_PER_NODE`
- `DOWNLOADER_CPU_PER_WORKER`
- `DOWNLOADER_HEADER`
- `DOWNLOADER_IMAGE_SIZE`
- `DOWNLOADER_LOGGER_LEVEL`
- `DOWNLOADER_BATCH_SIZE`
- `DOWNLOADER_RATE_MULTIPLIER`
- `DOWNLOADER_DEFAULT_RATE_LIMIT`
- Specific for tools
- `TOOLS_NUM_WORKERS`
- `TOOLS_MAX_NODES`
- `TOOLS_WORKERS_PER_NODE`
- `TOOLS_CPU_PER_WORKER`
- `TOOLS_THRESHOLD_SIZE`
- `TOOLS_NEW_RESIZE_SIZE`
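
As a minimal illustration of how a script might consume these variables (everything here beyond the variable names themselves is illustrative):

```python
import os
from pathlib import Path

# All values arrive as strings from the submission scripts; cast where needed.
config_path = Path(os.environ["CONFIG_PATH"])
images_folder = Path(os.environ["OUTPUT_IMAGES_FOLDER"])
max_nodes = int(os.environ["TOOLS_MAX_NODES"])
workers_per_node = int(os.environ["TOOLS_WORKERS_PER_NODE"])

# A fallback keeps the sketch usable when an optional variable is not exported.
logger_level = os.environ.get("DOWNLOADER_LOGGER_LEVEL", "INFO")

print(f"config={config_path}, images={images_folder}, "
      f"workers={max_nodes * workers_per_node}, log level={logger_level}")
```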

## Working with downloaded data

29 changes: 14 additions & 15 deletions docs/scripts_README.md
@@ -5,7 +5,7 @@ package. These scripts are used to submit jobs to Slurm and execute various task

## Submission Scripts

## general_submitter (general_submit.sh)

### Purpose

@@ -64,7 +64,7 @@ resource allocation.
- The `afterok` dependency ensures the job only runs if all dependencies completed successfully (unlike `afterany`, which
  runs regardless of the exit status of the dependencies)

## schedule_creator_submitter (submit_schedule_creator.sh)

### Purpose

@@ -116,7 +116,7 @@ additional environment variables needed for schedule creation.
- Unlike `general_submit.sh`, this script always specifies resource requirements to ensure proper operation of the
schedule creation process

## mpi_submitter (submit_mpi_download.sh)

### Purpose

@@ -185,7 +185,7 @@ underlying scripts.
- The nested log directory structure helps organize output by schedule and iteration number
- Unlike `general_submit.sh`, this script expects specific arguments for the downloader process

## tools_submitter (tools_submit.sh)

### Purpose

@@ -247,8 +247,7 @@ scripts are environment-specific and are currently configured for the Ohio Super
> ### Job Output Format Requirement
>
> All Slurm scripts must output the job ID of the submitted job in a specific format. The job ID must be the last item
> on the line and separated by a space:
>
> ```
> {anything} {id}
@@ -262,7 +261,7 @@ scripts are environment-specific and are currently configured for the Ohio Super
>
> This format is essential as the submission scripts parse this output to extract the job ID for dependency tracking.
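
As an illustration, a submitter that follows this contract could extract the ID and chain the next job on it roughly like this (the helper below is a sketch, not the package's actual code):

```python
import subprocess
from typing import Optional


def submit_and_get_id(script: str, dependency: Optional[str] = None) -> str:
    """Run sbatch and return the job ID, i.e. the last whitespace-separated token."""
    cmd = ["sbatch"]
    if dependency is not None:
        # afterok: start only after the listed job finished successfully
        cmd.append(f"--dependency=afterok:{dependency}")
    cmd.append(script)
    out = subprocess.run(cmd, check=True, capture_output=True, text=True).stdout
    return out.strip().splitlines()[-1].split()[-1]  # "{anything} {id}" -> "{id}"


init_id = submit_and_get_id("initialization.slurm")
profiling_id = submit_and_get_id("profiling.slurm", dependency=init_id)
```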

### initialization_script (initialization.slurm)

**Purpose**: Sets up the initial environment and partitions the input file using Spark.

@@ -273,7 +272,7 @@
- Typical run time is 30 minutes
- Requires 4 nodes for optimal performance

### profiling_script (profiling.slurm)

**Purpose**: Profiles the servers to determine download rates and capabilities.

@@ -284,7 +283,7 @@
- Typical run time is 5 minutes
- Creates a server profiles CSV file

### schedule_creation_script (schedule_creation.slurm)

**Purpose**: Creates download schedules based on server profiles.

@@ -295,7 +294,7 @@
- Typical run time is 5 minutes
- Moves logs to a structured directory after completion

### download_script (server_downloading.slurm)

**Purpose**: Executes the actual downloading process using MPI.

@@ -309,7 +308,7 @@
- Configures memory settings for optimal performance
- Typical run time is 3 hours

### verify_script (server_verifying.slurm)

**Purpose**: Verifies the completion status of downloads.

@@ -356,7 +355,7 @@ pipeline. These scripts are designed to work with the tools framework which proc
>
> This format is essential for job dependency tracking.

### tools_filter_script (tools_filter.slurm)

**Purpose**: Performs the first step in the tool pipeline, filtering images based on specific criteria.

@@ -372,7 +371,7 @@
For a size-based filter tool, this script would identify all images smaller than a threshold size and write their UUIDs,
server names, and partition IDs to CSV files.
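
A hedged PySpark sketch of such a filter step (the size column name, the exact input layout, and the output location are assumptions; only `uuid`, `server_name`, `partition_id`, and the environment variables come from this documentation):

```python
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("size_filter_sketch").getOrCreate()

threshold = int(os.environ["TOOLS_THRESHOLD_SIZE"])              # set by the tools submitter
images = spark.read.parquet(os.environ["OUTPUT_IMAGES_FOLDER"])  # assumed parquet layout

# "original_size" is an assumed column name for the stored image size.
small = images.filter(images["original_size"] < threshold).select(
    "uuid", "server_name", "partition_id"
)

# CSV output that the scheduling step could pick up.
small.write.mode("overwrite").csv(
    os.path.join(os.environ["OUTPUT_TOOLS_FOLDER"], "size_filter", "filter_table"),
    header=True,
)
```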

### tools_scheduling_script (tools_scheduler.slurm)

**Purpose**: Creates execution schedules for the tool workers based on filtered data.

@@ -389,7 +388,7 @@
For a size-based filter tool, the scheduler might group images by server name and partition ID (which corresponds to a
single parquet file) and assign these groups to different MPI ranks (e.g., worker 1 processes partitions 1,2,3,4).
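
A minimal round-robin version of that idea in pandas (file names, the schedule format, and the balancing strategy are assumptions):

```python
import glob
import os

import pandas as pd

num_workers = int(os.environ["TOOLS_MAX_NODES"]) * int(os.environ["TOOLS_WORKERS_PER_NODE"])
tool_dir = os.path.join(os.environ["OUTPUT_TOOLS_FOLDER"], "size_filter")

# Read every CSV produced by the filter step.
filtered = pd.concat(
    pd.read_csv(path) for path in glob.glob(os.path.join(tool_dir, "filter_table", "*.csv"))
)

# One work unit per (server_name, partition_id) pair, i.e. per parquet file.
groups = filtered[["server_name", "partition_id"]].drop_duplicates().reset_index(drop=True)

# Round-robin assignment of work units to MPI ranks.
groups["rank"] = groups.index % num_workers
groups.to_csv(os.path.join(tool_dir, "schedule.csv"), index=False)
```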

### tools_worker_script (tools_worker.slurm)

**Purpose**: Executes the actual tool processing using MPI parallelism.

@@ -407,7 +406,7 @@
For an image resizing tool, each worker would load the images assigned to it from the schedule, resize them to the
specified dimensions, and save the results to the output location.
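
Sketched with mpi4py and the schedule format from the previous example (paths, column names, and the resize itself are placeholders):

```python
import os

import pandas as pd
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
new_size = int(os.environ["TOOLS_NEW_RESIZE_SIZE"])
tool_dir = os.path.join(os.environ["OUTPUT_TOOLS_FOLDER"], "resize")

# Each rank processes only the (server_name, partition_id) groups assigned to it.
schedule = pd.read_csv(os.path.join(tool_dir, "schedule.csv"))
my_work = schedule[schedule["rank"] == rank]

for _, row in my_work.iterrows():
    partition_path = os.path.join(
        os.environ["OUTPUT_IMAGES_FOLDER"],
        f"server_name={row.server_name}",
        f"partition_id={row.partition_id}",
    )
    # Loading the images, resizing them to new_size, and writing the output is
    # omitted here; this only shows how the schedule drives each worker.
    print(f"rank {rank}: would resize partition {partition_path} to {new_size}px")
```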

### tools_verification_script (tools_verifier.slurm)

**Purpose**: Verifies the completion of the tool processing and updates status flags.

10 changes: 10 additions & 0 deletions src/distributed_downloader/core/initialization.py
@@ -8,6 +8,12 @@
)
from distributed_downloader.core.initializers.gbif_initializer import GBIFInitializer
from distributed_downloader.core.initializers.lila_initializer import LilaInitializer
from distributed_downloader.core.initializers.tol_fathomNet_initializer import (
TolFathomNetInitializer,
)
from distributed_downloader.core.initializers.tol_general_initializer import (
TolGeneralInitializer,
)
from distributed_downloader.tools import Config
from distributed_downloader.tools.utils import (
truncate_paths,
@@ -18,6 +24,8 @@
"fathom_net": FathomNetInitializer,
"lila": LilaInitializer,
"eol": EoLInitializer,
"tol200m_general": TolGeneralInitializer,
"tol200m_fathomNet": TolFathomNetInitializer,
}

__all__ = [
@@ -27,6 +35,8 @@
"FathomNetInitializer",
"LilaInitializer",
"EoLInitializer",
"TolGeneralInitializer",
"TolFathomNetInitializer",
"initializer",
]

@@ -53,8 +53,7 @@ def save_results(self, resul_df: DataFrame) -> None:
None
"""
(
-resul_df.repartition("server_name", "partition_id")
-.write.partitionBy("server_name", "partition_id")
+resul_df.write.partitionBy("server_name", "partition_id")
.mode("overwrite")
.format("parquet")
.save(self.output_path)
56 changes: 56 additions & 0 deletions src/distributed_downloader/core/initializers/tol_fathomNet_initializer.py
@@ -0,0 +1,56 @@
import os

from distributed_downloader.core.initializers.base_initializer import BaseInitializer


class TolFathomNetInitializer(BaseInitializer):
"""
Initializer for processing FathomNet multimedia data for the Tree of Life project.

This class filters, deduplicates, and processes multimedia data from FathomNet,
generates UUIDs, joins with bounding box information, and writes the results to parquet files.
"""

def run(self):
"""
Executes the initialization process:
- Reads multimedia and bounding box data.
- Filters multimedia data by included sources.
- Deduplicates entries by source URL.
- Generates UUIDs for deduplicated entries.
- Joins multimedia data with bounding box information.
- Extracts server names and partitions the dataframe.
- Saves the processed data and UUID reference to parquet files.
"""
multimedia_df = self.spark.read.parquet(self.input_path)
included_sources = self.config["included_sources"]
bbox_information_df = self.spark.read.parquet(self.config["bounding_box_information_path"])

multimedia_df = multimedia_df.filter(
multimedia_df["data_source"].isin(included_sources)
)
multimedia_df_dedup = multimedia_df.dropDuplicates(["source_url"])
multimedia_df_dedup = self.generate_uuid(multimedia_df_dedup)
uuid_ref_df = (
multimedia_df_dedup.select("uuid", "source_url")
.withColumnRenamed("uuid", "image_uuid")
.join(
multimedia_df.select("uuid", "source_id", "source_url"),
on="source_url",
how="inner",
)
.drop("source_url")
.join(bbox_information_df, on="uuid", how="inner")
)

master_df = self.extract_server_name(
multimedia_df_dedup.withColumnRenamed("source_url", "identifier")
)
master_df_filtered = self.partition_dataframe(master_df)

self.logger.info("Writing to parquet")
self.save_results(master_df_filtered)
uuid_ref_df.write.parquet(
os.path.join(self.config.get_folder("tools_folder"), "uuid_ref")
)
self.logger.info("Finished batching")
41 changes: 41 additions & 0 deletions src/distributed_downloader/core/initializers/tol_general_initializer.py
@@ -0,0 +1,41 @@
from distributed_downloader.core.initializers.base_initializer import BaseInitializer


class TolGeneralInitializer(BaseInitializer):
"""
Initializer for TOL general data processing.
Reads multimedia data, filters by included sources and excluded servers,
partitions the dataframe, and saves the results.
"""

def run(self):
"""
Executes the initialization process:
- Reads multimedia parquet data.
- Reads excluded servers from CSV.
- Extracts server names from multimedia data.
- Filters multimedia data by included sources and excluded servers.
- Partitions the filtered dataframe.
- Saves the results to parquet.
"""
multimedia_df = self.spark.read.parquet(self.input_path)
included_sources = self.config["included_sources"]
if self.config.get("excluded_servers_path") in ["", None]:
excluded_servers = self.spark.createDataFrame([], "server_name: string")
else:
excluded_servers = self.spark.read.csv(
self.config["excluded_servers_path"], header=True
)

multimedia_df = self.extract_server_name(
multimedia_df.withColumnRenamed("source_url", "identifier")
)
multimedia_df = multimedia_df.filter(
multimedia_df["data_source"].isin(included_sources)
).join(excluded_servers, on="server_name", how="left_anti")

master_df_filtered = self.partition_dataframe(multimedia_df)

self.logger.info("Writing to parquet")
self.save_results(master_df_filtered)
self.logger.info("Finished batching")
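
Note that in `TolGeneralInitializer` the excluded-servers CSV is joined on `server_name` with a `left_anti` join, so a minimal file only needs that one column; for example (the entries are made up):

```python
# Build a minimal excluded-servers CSV that spark.read.csv(..., header=True) can consume.
import csv

with open("excluded_servers.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["server_name"])           # column the left_anti join uses
    writer.writerow(["example-server-1.org"])  # made-up entries; replace with real hosts
    writer.writerow(["example-server-2.net"])
```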