# GOALS
- DESCRIBE:
    - What is the input
    - What is the output

- Overview—: to the readme
    - Include, what is a bucket! --> done!(ish)

- Rules: 
    - How to construct new rules
    - How to run them
    - How to understand the output

- In progress is a valid flag bc this work is being done

- Inventory autogenerates ¿once a week? --> Yes! but the index is generated manually

- Explain the example case for downloading files

- README describing what it does
    - Tutorial and or examples

## Description
`cpgdata` is a CLI toolset for navigating and exploring the Cell Painting Gallery.


## Context

The CellPainting Gallery (CPG) is hosted in a cloud object storage, AWS's S3.

Within this system, all files within the CPG live in a single 'bucket'. Because it is an object storage, this bucket does not contain a folder structure, but rather all objects live together. In order to identify them, each object is assigned a unique 'key' that consists of a string of characters, akin to the directory path in a regular file structure with folders.

For example, `s3://cellpainting-gallery/cpg0016-jump/source_4/workspace/analysis/2021_06_21_Batch7/BR00125168/analysis/BR00125168-G17-5/Cells.csv` is the key to a specific .csv file that lives within the CPG S3 bucket.

The [**aws inventory**]( https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html) is a list of all the objects contained in a bucket and their associated metadata (i.e. object size, date of upload, last modified date, etc). This inventory is updated automatically on a weekly basis for the CPG.

However, the format in which this inventory is structured is not friendly for exploring it. 

That is why, we use `cpgdata` to parse the inventory, retireving and organizing useful information about all objects in a dataframe called the **Index**. In order to do this, `cpgdata` uses `cpgparser`, a Python library written in Rust.

The `cpgdata` package also provides tools to **navigate and filter the Index e** which can then be used to selectively download certain files from the CPG or explore its contents.

If you prefer, you can [manually browse the Index contents online using Quilt](https://open.quiltdata.com/b/cellpainting-gallery/tree/).

Moreover, `cpgdata` tools can also be used to create rules to **validate the structure and completeness** of new data before uploading it to the CPG bucket, to ensure that it complies with the CPG requirements.


The **Inventory** for the CPG lives in <s3://cellpainting-gallery-inventory/cellpainting-gallery/whole_bucket/>
```
s3://cellpainting-gallery-inventory/
    └── cellpainting-gallery/
        └──index/
            └── [all the index chunks in .parquet format]
        └── whole_bucket/
            ├──2024-03-31T01-00Z/
            ├── 2024-04-07T01-00Z/
            ├── data/
            └──hive/
```

The **Index** file lives in <s3://cellpainting-gallery/index.html>

If you want to get an idea of the expected file structure of the CPG you do so [HERE](https://broadinstitute.github.io/cellpainting-gallery/data_structure.html).


### In its current version, `cpgdata` runs on Python 3.10 so the first step will be to create an environment to run it

In [None]:
!conda create --name cpgdata python==3.10
!conda activate cpgdata

### Next, you need to install the <cpgparser> and <cpgdata> packages (only once)

In [None]:
!pip install cpgparser
!pip install cpgdata

### Import the necessary libraries and packages

In [1]:
from pathlib import Path

# We use polars to read and explore the index 
import polars as pl
from cpgdata.utils import parallel, download_s3_files

#These were included in the example but not necessary for the code so far
# from typing import List
# from pathlib import Path
# from pprint import pprint
# import os
# from tqdm import tqdm

### The generation of the **Index** file is a very time-consuming process, but you don't need to do it yourself! 
### You can easily download a pre-generated **Index** file using the following command:

In [2]:
# Select a local diretory in which to download the Index

index_dir = Path("/Users/emigliet/Documents/CPG-index")
# index_dir = Path("Your/Local/Destination/Directory")

# Note that the Index file is fairly large (over 20gb) so it's divided into several .parquet files.
# !cpg sync index {index_dir}

# Load the index using polars (pl)
index_files = [file for file in index_dir.glob("*.parquet")]
index = pl.scan_parquet(index_files)


## THIS DESCRIPTION IS INCOMPLETE AND CATEGORIES ARE LIKELY TO HAVE CHANGED AFTER REESTRUCTURING OF THE TOOLS
## Columns included in the Index file:

- `key` : object key identifier, useful for downloading files
- `root_dir`: ## if col("worskpace')=="workspace, from 'workspace' to leaf node, 
- `images_root_dir`: path after "{dataset_id}/{source_id}/{batch_id}/images/" to the object. Is 'null' is object is not within that path.
- `images_batch_root_dir`:  path after "{dataset_id}/{source_id}/{batch_id}/" to the object. Is 'null' if object is no within that path.
- `images_illum_root_dir`: path after "{dataset_id}/{source_id}/{batch_id}/images/illum/" to the object. If "is_dir"==True, "images_illum_root_dir"=="plate_id". If "is_dir"==False, "images_illum_root_dir"=="plate_id/illumFile".
- `images_images_root_dir`: path after "{dataset_id}/{source_id}/{batch_id}/images/{plate_id}/images/" to the object. Is 'null' if object is no within that path. 
- `images_images_aligned_root_dir`: 
- `images_images_corrected_root_dir`: 
- `images_images_corrected_cropped_root_dir`: 
- `workspace_root_dir`: path after "{dataset_id}/{source_id}/workspace/" to the object. Is 'null' is object is not within that path.
- `analysis_root_dir`: 
- `backend_root_dir`: 
- `load_data_csv_root_dir`: path from "load_data_csv" to leaf node (?)
- `metadata_root_dir`: 
- `profiles_root_dir`: 
- `assaydev_root_dir`: 
- `embeddings_root_dir`: 
- `pipelines_root_dir`: 
- `qc_root_dir`: 
- `segmentation_root_dir`: 
- `software_root_dir`: 
- `workspace_dl_root_dir`: 
- `collated_root_dir`: 
- `consensus_root_dir`: 
- `dl_embeddings_root_dir`: 
- `dl_profiles_root_dir`: 
- `sep`: 
- `images`: is "images" if "images" is part of the key. Is "null" otherwise.
- `workspace`: is "workspace" if "workspace" is part of the key. Is "null" otherwise.
- `workspace_dl`: 
- `dataset_id`: name of the dataset (e.g. "cpg0016-jump", "cpg0021-periscope", etc.)
- `source_id`: code for the source of the images (the institution who produced them)
- `batch_id`: batch number
- `plate_id`:  unique plate identification code
- `well_id`: well position
- `site_id`: site number (sites are each of the fields of view imaged in a well)
- `well_site_id`: 
- `plate_well_site_id`: 
- `ml_model_id`: 
- `leaf_node`: if the object is a file ("is_dir"==False), the name of the file. Otherwise, 'null'.
- `filename`: leaf node filename, without extension
- `extension`: leaf node extension
- `software_hash`: 
- `software`: 
- `hash`: 
- `allowed_names`: 
- `bucket`: 
- `obj_key`: 
- `size`: 
- `last_modified_date`: 
- `e_tag`: 
- `storage_class`: 
- `is_multipart_uploaded`: 
- `replication_status`: 
- `encryption_status`: 
- `object_lock_retain_until_date`: 
- `object_lock_mode`: 
- `object_lock_legal_hold_status`: 
- `intelligent_tiering_access_tier`: 
- `bucket_key_status`: 
- `checksum_algorithm`: 
- `object_access_control_list`: 
- `object_owner`: 
- `is_parsing_error`: 
- `errors`: 
- `is_dir`: 
- `key_parts`: 
- `workspace_dir`: if within "workspace", which workspace dir is object related to ('profiles', 'load_data_csv', 'software', 'metadata', 'backend', 'quality_control', 'assaydev', 'analysis', 'pipelines'). Is 'None' if the object does not contain "workspace" in it's key or is in  
 
You can see all the Index column names and the type of data stored in each using `df.schema`.

**Refer to CPG schema and use those same keys! check matching and come up with useful key maybe**

In [None]:
# dictionary of index col names and their respective data types
index.schema

# Example: use the index to download just a specific subset of files from the CPG

We will use **Polars** tools in the following examples to explore and filter the index.
You can find more info on the context and expressions in:
https://docs.pola.rs/user-guide/concepts/contexts/
https://docs.pola.rs/user-guide/concepts/expressions/

In [3]:
# Setting the maximum length of displayed strings to 500 helps to visualize the complete keys
pl.Config(fmt_str_lengths=500)
pl.Config.set_fmt_table_cell_list_len(500)


polars.config.Config

In [None]:
#Pull the keys (file location within the bucket) of all 'Cells.csv' files from Source 4 in the JUMP dataset (cpg0016-jump)

df = (
    index
    #Use filtering to get to the failing rows and not the other way around
    .filter(pl.col("dataset_id").eq("cpg0016-jump"))  
    .filter(pl.col("source_id").eq("source_4"))
    .filter(pl.col("leaf_node").str.contains("Cells.csv"))
     
     # Always add a `select` at the end of the chain and ONLY select for keys
    .select(pl.col("key","load_data_csv_root_dir"))   
    
    # Materialize this polars LazyFrame into a DataFrame.
    .collect(streaming=True)  # the streaming option prevents out of memory errors when loading big dataframes
)

# print first 10 results
print(df.to_dicts()[0:10])

# List the keys of the files to download
download_keys = list(df.to_dict()["key"])

# Choose a destination directory for the files
dest_dir = "Path/To/Save/Your/files"

# Run a parallel command to dowload all files specified in the list of keys
parallel(download_keys, download_s3_files, ["cellpainting-gallery", Path(dest_dir)], jobs=20)


# Further Tests and issues

In [None]:
## some useful structures for filtering and selecting

    # .filter(pl.col("images_images_root_dir").is_in(["2020_11_04_CPJUMP1", "2020_11_19_TimepointDay4", "2020_12_08_CPJUMP1_Bleaching"])) 
    # .filter(pl.col("leaf_node").str.contains("^.*(.tiff)"))
    # .filter(pl.col("well_id").eq("E7")) 

    # .select(pl.col("well_id").unique())
    # .select(pl.col("*").exclude(["size", "is_multipart_uploaded"]))


In [None]:
### There appear to be some issues when parsing well_id, particularly in the embedding.parquet files from sources 1, 2 and 7 of the JUMP dataset.

df = (
    index
    .unique(subset="well_id")
    .filter(pl.col("is_parsing_error").eq(False)) 
    .select("well_id", "key", "dataset_id", "source_id", "leaf_node")
    .unique(subset=["dataset_id","leaf_node","source_id"])
    .collect(streaming=True)
    )
df

# Challenge: write this validations:
 - Is there a folder with illum files for every plate within raw images?
 - Is there a load_data.csv for every plate, is there a load_data csv?

### Idea: compare total number of unique plates in raw images with number of unique load.csv files and illum folders

Works fine fo just source 4 of JUMP but fails when applied bucket-wide

In [None]:
# get total number of distinct plates within the raw images folder
df1 = (
    index
    .filter(pl.col("dataset_id").eq("cpg0016-jump"))
    .filter(pl.col("source_id").eq("source_4"))
    
    .filter(pl.col("is_dir").eq(True))
    .filter(pl.col("images").eq("images"))
    .filter(pl.col("images_images_root_dir").is_not_null())
    .filter(pl.col("dataset_id").eq("jump").not_()) #this dataset_id has parsing errors
    .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
    .unique(subset=["dataset_id", "source_id", "batch_id", "plate_id"]) # gives me 1150 unique plates
    # .unique(subset=["plate_id"]) # gives me 1150 unique plates
    .collect(streaming=True)
    )


# get total number of load_data.csv files
df2 = (
    index
    .filter(pl.col("dataset_id").eq("cpg0016-jump"))
    .filter(pl.col("source_id").eq("source_4"))
    
    .filter(pl.col("workspace").eq("workspace"))
    .filter(pl.col("leaf_node").eq("load_data.csv"))
    .filter(pl.col("dataset_id").eq("jump").not_()) #this dataset_id has parsing errors
    .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
    # .unique(subset=["dataset_id", "source_id", "batch_id", "plate_id"]) #gives me 3478 unique combinations
    .unique(subset=["plate_id"]) # gives me 3652 unique plates if I don't filter the "jump" dataset_id because of parsing shenanigans
    .collect(streaming=True)
    )

# get total number of illum/ folders
df3 = (
    index
    .filter(pl.col("dataset_id").eq("cpg0016-jump"))
    .filter(pl.col("source_id").eq("source_4"))
    
    .filter(pl.col("is_dir").eq(True))
    .filter(pl.col("images").eq("images"))
    .filter(pl.col("images_illum_root_dir").is_not_null())
    .filter(pl.col("dataset_id").eq("jump").not_()) #this dataset_id has parsing errors
    .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
    .unique(subset=["dataset_id", "source_id", "batch_id", "plate_id"]) # gives me 2386 unique plates
    # .unique(subset=["plate_id"]) # gives me 2386 unique plates
    .collect(streaming=True)
    )

print(f"df1: {df1.shape}")
print(f"df2: {df2.shape}")
print(f"df3: {df3.shape}")

In [4]:
# Try to check within each dataset and source to find where there are missmatches between plates and load_data.csv or illum files
import pandas as pd

result = pd.DataFrame(columns=['dataset_id', 'source_id', '#plates','#load_data_csvs','#illum_folders','plates-and-load_data_csvs-match', 'plates-and-illum_folders-match'])

df0 =(
    index
    .unique(subset=["dataset_id","source_id"])
    .filter(pl.col("dataset_id").is_not_null())
    .select(pl.col("dataset_id","source_id"))
    .collect(streaming=True)
)

for dataset,source in zip(df0["dataset_id"],df0["source_id"]):
    # get total number of distinct plates within the raw images folder
    df1 = (
        index
        .filter(pl.col("dataset_id").eq(dataset))
        .filter(pl.col("source_id").eq(source))
        
        .filter(pl.col("is_dir").eq(True))
        .filter(pl.col("images").eq("images"))
        .filter(pl.col("images_images_root_dir").is_not_null())
        .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
        .unique(subset=["plate_id"])
        .collect(streaming=True)
        )


    # get total number of load_data.csv files
    df2 = (
        index
        .filter(pl.col("dataset_id").eq(dataset))
        .filter(pl.col("source_id").eq(source))
        
        .filter(pl.col("workspace").eq("workspace"))
        .filter(pl.col("leaf_node").eq("load_data.csv"))
        .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
        .unique(subset=["plate_id"]) 
        .collect(streaming=True)
        )

    # get total number of illum/ folders
    df3 = (
        index
        .filter(pl.col("dataset_id").eq(dataset))
        .filter(pl.col("source_id").eq(source))
        
        .filter(pl.col("is_dir").eq(True))
        .filter(pl.col("images").eq("images"))
        .filter(pl.col("images_illum_root_dir").is_not_null())
        .select("key", "dataset_id", "source_id", "batch_id", "plate_id")
        .unique(subset=["plate_id"]) 
        .collect(streaming=True)
        )

    # Evaluate if number of plates matches with number of load_data_csv and illum folders
    matchLoadData= df1.shape[0]==df2.shape[0]
    matchIllum = df1.shape[0]==df3.shape[0]

    # Collect results in the results df
    new_row_data = {'dataset_id': dataset, 
                    'source_id': source, 
                    '#plates': len(df1),
                    '#load_data_csvs': len(df2),
                    '#illum_folders': len(df3),
                    'plates-and-load_data_csvs-match': matchLoadData, 
                    'plates-and-illum_folders-match': matchIllum
                    }
    result = pd.concat([result, pd.DataFrame([new_row_data])], ignore_index=True)

result

Unnamed: 0,dataset_id,source_id,#plates,#load_data_csvs,#illum_folders,plates-and-load_data_csvs-match,plates-and-illum_folders-match
0,cpg0003-rosetta,,0,0,0,True,True
1,cpg0028-kelley-resistance,broad,0,0,0,True,True
2,dev-cpg0016-jump,deflaux-workflow-tests,0,0,0,True,True
3,cpg0016-jump,source_1,55,55,56,True,False
4,dev-cpg0016-jump,deflaux-workflow-test-BR00117012,0,0,0,True,True
5,cpg0016-jump,source_7,0,128,128,False,False
6,cpg0018-singh-seedseq,broad,0,0,0,True,True
7,cpg0016-jump-fixed,source_4,0,0,0,True,True
8,cpg0002-jump-scope,source_4,1,44,5,False,False
9,cpg0016-jump,source_11,182,182,182,True,True


In [84]:
# There appear to be some issues when parsing well_id, particularly in the embedding.parquet files from sources 1, 2 and 7 of the JUMP dataset. 
# The well_id listed in the index corresponds to the previous "segment" of the key.

df = (
    index
    .unique(subset="well_id")
    .filter(pl.col("is_parsing_error").eq(False)) 
    .select("well_id", "key", "dataset_id", "source_id", "leaf_node")
    .unique(subset=["dataset_id","leaf_node","source_id"])
    .collect(streaming=True)
    )
df

well_id,key,dataset_id,source_id,leaf_node
str,str,str,str,str
"""UL001673""","""cpg0016-jump/source_1/workspace_dl/embeddings/efficientnet_v2_imagenet21k_s_feature_vector_2_0260bc96/Batch2_20221006/UL001673/UL001673/A02/embedding.parquet""","""cpg0016-jump""","""source_1""","""embedding.parquet"""
"""CP3-SC1-07""","""cpg0016-jump/source_7/workspace_dl/embeddings/efficientnet_v2_imagenet21k_s_feature_vector_2_0260bc96/20210727_Run3/CP3-SC1-07/CP3-SC1-07/A01/embedding.parquet""","""cpg0016-jump""","""source_7""","""embedding.parquet"""
"""M07""","""cpg0019-moshkov-deepprofiler/broad/workspace_dl/embeddings/105281_zenodo7114558/BBBC022/20585/M07/1/embedding.npz""","""cpg0019-moshkov-deepprofiler""","""broad""","""embedding.npz"""
,"""cpg0016-jump/source_8/""","""cpg0016-jump""","""source_8""",
"""1086292259""","""cpg0016-jump/source_2/workspace_dl/embeddings/efficientnet_v2_imagenet21k_s_feature_vector_2_0260bc96/20210816_Batch_9/1086292259/1086292259/A01/embedding.parquet""","""cpg0016-jump""","""source_2""","""embedding.parquet"""


In [110]:
#24 plates of the source 15 in JUMP have a load_data.csv without having a load_data_csv_root_dir
# also, the dataset_id is 'jump' and not 'cpg0016-jump'
df = (index
      .unique(subset=["dataset_id","plate_id"])
      .filter(pl.col("leaf_node").eq("load_data.csv"))
      .select(pl.col(["dataset_id","source_id","plate_id","load_data_csv_root_dir"]))
      .filter(pl.col("load_data_csv_root_dir").is_null())  #https://docs.pola.rs/py-polars/html/reference/dataframe/api/polars.DataFrame.drop_nulls.html
      .collect(streaming=True)
      )
df

dataset_id,source_id,plate_id,load_data_csv_root_dir
str,str,str,str


In [111]:
# There are 460 plate_ids for source 15 in JUMP, are there really 460 plates? 
# also, plate_id varies in structure!
df = (index
      .filter(pl.col("dataset_id").eq("jump"))
      .filter(pl.col("source_id").eq("source_15"))
      .unique(subset=["plate_id"])
      .select(pl.col(["plate_id"]))
      .collect(streaming=True)
      )
df

plate_id
str
"""PEC00001815__2022-02-23T18_03_17-Measurement3"""
"""PEP00004102__2021-12-02T12_05_47-Measurement1"""
"""PEP00004137__2021-12-01T16_50_57-Measurement1"""
"""PEP00004139__2021-12-03T05_02_12-Measurement1"""
"""PEC00001790"""
"""PEP00004092"""
"""PEP00004102"""
"""PEC00001785__2021-12-07T01_54_26-Measurement1"""
"""PEP00004041__2021-12-16T04_45_56-Measurement1"""
"""PEC00001854__2021-12-17T13_35_15-Measurement1"""


In [112]:
# For source 15 in JUMP, are there really 460 plates? 
# There are only 183 unique ones matching the regex for the plate name structure
df = (index
      .filter(pl.col("dataset_id").eq("jump"))
      .filter(pl.col("source_id").eq("source_15"))
      # .unique(subset=["plate_id"])
      .filter(pl.col("plate_id").str.contains("^PE(P|C)[0-9]{8}$"))
      .select(pl.col(["plate_id"]).unique().sort())
      .collect(streaming=True)
      )
df

plate_id
str
"""PEC00001782"""
"""PEC00001783"""
"""PEC00001784"""
"""PEC00001785"""
"""PEC00001786"""
"""PEC00001787"""
"""PEC00001788"""
"""PEC00001789"""
"""PEC00001790"""
"""PEC00001791"""
