In [None]:
import boto3.session
import fsspec
import h5py
import pandas as pd

from anndata._io.specs import read_elem
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path

from lattice import (
    Connection, 
    get_object,
    get_report, 
    parse_ids, 
)

In [2]:
FRAGMENT_DIR = Path("matrix_files/atac_fragments")

In [3]:
if not FRAGMENT_DIR.is_dir():
    FRAGMENT_DIR.mkdir()

In [None]:
@dataclass
class URIPath:
    """
    Dataclass to hold various parsed information for each S3 URI
    Only needs full uri to create the various attributes
    """

    full_uri: str

    def __post_init__(self):
        self.bucket_name: str = self.full_uri.split("/")[2]
        self.file_path = self.full_uri.replace("s3://{}/".format(self.bucket_name), "")
        self.file_name = self.full_uri.split("/")[-1]
        self.parent_path = self.file_path.replace(self.file_name, "")
        self.full_parent_path = self.full_uri.replace(self.file_name, "")

In [None]:
@dataclass
class FragmentFileMeta:
    cell_label_location: str
    label: str
    accession: str
    uri: URIPath
    barcodes: pd.Series

    def __post_init__(self):
        self.download_file_name = self.accession + "_" + self.uri.file_name
        # probably better to add this logic somewhere else
        self.is_file_local = (FRAGMENT_DIR / self.download_file_name).is_file()

In [5]:
fs = fsspec.filesystem('s3')

In [82]:
for item in fs.ls(uri.full_parent_path):
    if fs.isdir(item):
        possible_fragment_file = item + "/CellRanger_outputs" + "/atac_fragments.tsv.gz"
        if fs.isfile(possible_fragment_file):
            print(item)

submissions-czi115cns/Galvao_2023/PhaseI/G120_D_FL
submissions-czi115cns/Galvao_2023/PhaseI/G120_D_TL
submissions-czi115cns/Galvao_2023/PhaseI/G120_F1_N
submissions-czi115cns/Galvao_2023/PhaseI/G129_D
submissions-czi115cns/Galvao_2023/PhaseI/G133_D_FL
submissions-czi115cns/Galvao_2023/PhaseI/G133_N_FL
submissions-czi115cns/Galvao_2023/PhaseI/G150_D
submissions-czi115cns/Galvao_2023/PhaseI/G159_D
submissions-czi115cns/Galvao_2023/PhaseI/G171_D
submissions-czi115cns/Galvao_2023/PhaseI/G187_D
submissions-czi115cns/Galvao_2023/PhaseI/G210_D


In [6]:
connection = Connection("prod")

In [17]:
parse_ids(['/processed-matrix-files/LATDF393MGJ/'])

('ProcessedMatrixFile', '&@id=/processed-matrix-files/LATDF393MGJ/')

In [8]:
PROCESSED_MATRIX_ACCESSION = 'LATDF393MGJ'
field_list = [
    'accession',
    's3_uri',
    'cell_label_location',
    'cell_label_mappings',
]

In [9]:
processed_matrix_report = get_report(
    'ProcessedMatrixFile',
    f'&@id=/processed-matrix-files/{PROCESSED_MATRIX_ACCESSION}/',
    field_list,
    connection
)[0]
processed_matrix_report

{'@id': '/processed-matrix-files/LATDF393MGJ/',
 '@type': ['ProcessedMatrixFile', 'AnalysisFile', 'DataFile', 'File', 'Item'],
 'accession': 'LATDF393MGJ',
 'cell_label_location': 'suffix',
 'cell_label_mappings': [{'raw_matrix': '/raw-matrix-files/LATDF665HUO/',
   'label': '_1'},
  {'raw_matrix': '/raw-matrix-files/LATDF565IPN/', 'label': '_2'},
  {'raw_matrix': '/raw-matrix-files/LATDF070QKM/', 'label': '_3'},
  {'raw_matrix': '/raw-matrix-files/LATDF384WBZ/', 'label': '_4'},
  {'raw_matrix': '/raw-matrix-files/LATDF796UKR/', 'label': '_5'},
  {'raw_matrix': '/raw-matrix-files/LATDF533UDH/', 'label': '_6'},
  {'raw_matrix': '/raw-matrix-files/LATDF678UZL/', 'label': '_7'},
  {'raw_matrix': '/raw-matrix-files/LATDF719TNR/', 'label': '_8'},
  {'raw_matrix': '/raw-matrix-files/LATDF939FYS/', 'label': '_9'},
  {'raw_matrix': '/raw-matrix-files/LATDF565CDU/', 'label': '_10'},
  {'raw_matrix': '/raw-matrix-files/LATDF435SJY/', 'label': '_11'}],
 's3_uri': 's3://submissions-czi115cns/Galva

In [11]:
h5ad_uri = URIPath(processed_matrix_report['s3_uri'])
with h5py.File(fs.open(h5ad_uri.full_uri)) as f:
    barcodes = read_elem(f['obs']).index.to_series()

barcodes

AAACCGCGTGCATTAG-1_1      AAACCGCGTGCATTAG-1_1
AAAGGAGCAATCCTAG-1_1      AAAGGAGCAATCCTAG-1_1
AAAGGCTCATCCCGCT-1_1      AAAGGCTCATCCCGCT-1_1
AAATGCCTCTTGTTCG-1_1      AAATGCCTCTTGTTCG-1_1
AACAAAGGTTGTTCAC-1_1      AACAAAGGTTGTTCAC-1_1
                                 ...          
TTTGTGTTCTCCATGC-1_11    TTTGTGTTCTCCATGC-1_11
TTTGTTGGTCTAACAG-1_11    TTTGTTGGTCTAACAG-1_11
TTTGTTGGTTGGCCGA-1_11    TTTGTTGGTTGGCCGA-1_11
TTTGTTGGTTTATCGC-1_11    TTTGTTGGTTTATCGC-1_11
TTTGTTGGTTTGACCT-1_11    TTTGTTGGTTTGACCT-1_11
Length: 61525, dtype: object

In [12]:
master_fragment_file_meta = []
for raw_matrix_meta in processed_matrix_report['cell_label_mappings']:
    obj_type, filter_url = parse_ids([raw_matrix_meta['raw_matrix']])
    raw_matrix_report = get_report(
        obj_type, 
        filter_url, 
        field_lst=[
            'accession', 
            's3_uri', 
            'fragment_file_s3_uri'
        ], 
        connection=connection
    )[0]
    fragment_uri = raw_matrix_report['s3_uri'].replace("filtered_feature_bc_matrix.h5", "atac_fragments.tsv.gz")
    accession = raw_matrix_report['accession']
    raw_matrix_meta['fragment_file_s3_uri'] = fragment_uri
    raw_matrix_meta['accession'] = raw_matrix_report['accession']
    assert fs.isfile(raw_matrix_meta['fragment_file_s3_uri']), f"raw matrix{accession} does not have fragment file s3"
    master_fragment_file_meta.append(
        FragmentFileMeta(
            cell_label_location=processed_matrix_report['cell_label_location'],
            label=raw_matrix_meta['label'],
            accession=accession,
            uri=URIPath(fragment_uri),
            barcodes=barcodes
        )
    )

for meta in master_fragment_file_meta:
    print(meta.accession)
    print(meta.label)
    print(meta.cell_label_location)
    print(meta.uri.file_name)
    print(meta.download_file_name)
    print(meta.is_file_local)
    print("=" * 40)
for matrix in processed_matrix_report['cell_label_mappings']:
    print(matrix)

LATDF665HUO
_1
suffix
atac_fragments.tsv.gz
LATDF665HUO_atac_fragments.tsv.gz
False
LATDF565IPN
_2
suffix
atac_fragments.tsv.gz
LATDF565IPN_atac_fragments.tsv.gz
False
LATDF070QKM
_3
suffix
atac_fragments.tsv.gz
LATDF070QKM_atac_fragments.tsv.gz
False
LATDF384WBZ
_4
suffix
atac_fragments.tsv.gz
LATDF384WBZ_atac_fragments.tsv.gz
False
LATDF796UKR
_5
suffix
atac_fragments.tsv.gz
LATDF796UKR_atac_fragments.tsv.gz
False
LATDF533UDH
_6
suffix
atac_fragments.tsv.gz
LATDF533UDH_atac_fragments.tsv.gz
False
LATDF678UZL
_7
suffix
atac_fragments.tsv.gz
LATDF678UZL_atac_fragments.tsv.gz
False
LATDF719TNR
_8
suffix
atac_fragments.tsv.gz
LATDF719TNR_atac_fragments.tsv.gz
False
LATDF939FYS
_9
suffix
atac_fragments.tsv.gz
LATDF939FYS_atac_fragments.tsv.gz
False
LATDF565CDU
_10
suffix
atac_fragments.tsv.gz
LATDF565CDU_atac_fragments.tsv.gz
False
LATDF435SJY
_11
suffix
atac_fragments.tsv.gz
LATDF435SJY_atac_fragments.tsv.gz
False
{'raw_matrix': '/raw-matrix-files/LATDF665HUO/', 'label': '_1', 'fragment_

In [101]:
for matrix in processed_matrix_report['cell_label_mappings']:
    assert fs.isfile(matrix['fragment_file_s3_uri']), f"raw matrix{matrix} does not have fragment file s3"
    print(matrix['label'])
    print(matrix['fragment_file_s3_uri'])

_1
s3://submissions-czi115cns/Galvao_2023/PhaseI/G120_F1_N/CellRanger_outputs/atac_fragments.tsv.gz
_2
s3://submissions-czi115cns/Galvao_2023/PhaseI/G120_D_TL/CellRanger_outputs/atac_fragments.tsv.gz
_3
s3://submissions-czi115cns/Galvao_2023/PhaseI/G120_D_FL/CellRanger_outputs/atac_fragments.tsv.gz
_4
s3://submissions-czi115cns/Galvao_2023/PhaseI/G133_D_FL/CellRanger_outputs/atac_fragments.tsv.gz
_5
s3://submissions-czi115cns/Galvao_2023/PhaseI/G133_N_FL/CellRanger_outputs/atac_fragments.tsv.gz
_6
s3://submissions-czi115cns/Galvao_2023/PhaseI/G129_D/CellRanger_outputs/atac_fragments.tsv.gz
_7
s3://submissions-czi115cns/Galvao_2023/PhaseI/G150_D/CellRanger_outputs/atac_fragments.tsv.gz
_8
s3://submissions-czi115cns/Galvao_2023/PhaseI/G159_D/CellRanger_outputs/atac_fragments.tsv.gz
_9
s3://submissions-czi115cns/Galvao_2023/PhaseI/G171_D/CellRanger_outputs/atac_fragments.tsv.gz
_10
s3://submissions-czi115cns/Galvao_2023/PhaseI/G187_D/CellRanger_outputs/atac_fragments.tsv.gz
_11
s3://submi

In [13]:
[parse_ids([raw_matrix_meta['raw_matrix']]) for raw_matrix_meta in processed_matrix_report['cell_label_mappings']]

[('RawMatrixFile', '&@id=/raw-matrix-files/LATDF665HUO/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF565IPN/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF070QKM/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF384WBZ/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF796UKR/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF533UDH/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF678UZL/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF719TNR/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF939FYS/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF565CDU/'),
 ('RawMatrixFile', '&@id=/raw-matrix-files/LATDF435SJY/')]

In [13]:
processed_matrix_file_json = get_object('LATDF393MGJ', connection)
processed_matrix_file_json

{'lab': {'name': 'diogo-troggian-veiga',
  'title': 'Diogo Troggian Veiga, UNICAMP',
  'status': 'current',
  'schema_version': '3',
  'institute_name': 'State University of Campinas',
  '@id': '/labs/diogo-troggian-veiga/',
  '@type': ['Lab', 'Item'],
  'uuid': '252a818f-5d43-49fb-bded-302bce6cab1a'},
 's3_uri': 's3://submissions-czi115cns/Galvao_2023/PhaseI/Anndata_RNA_CELLxGENE.h5ad',
 'status': 'in progress',
 'aliases': ['diogo-troggian-veiga:RNA_processed_matrix'],
 'dataset': '/datasets/LATDS089SAH/',
 'accession': 'LATDF393MGJ',
 'validated': False,
 'description': 'snRNA-seq data from eight focal cortical dysplasia donors',
 'file_format': 'hdf5',
 'X_normalized': True,
 'date_created': '2024-06-27T20:46:20.687543+00:00',
 'derived_from': ['/raw-matrix-files/LATDF070QKM/',
  '/raw-matrix-files/LATDF565IPN/',
  '/raw-matrix-files/LATDF665HUO/',
  '/raw-matrix-files/LATDF533UDH/',
  '/raw-matrix-files/LATDF384WBZ/',
  '/raw-matrix-files/LATDF796UKR/',
  '/raw-matrix-files/LATDF6

In [14]:
barcodes.str.split("_", expand=True)

Unnamed: 0,0,1
AAACCGCGTGCATTAG-1_1,AAACCGCGTGCATTAG-1,1
AAAGGAGCAATCCTAG-1_1,AAAGGAGCAATCCTAG-1,1
AAAGGCTCATCCCGCT-1_1,AAAGGCTCATCCCGCT-1,1
AAATGCCTCTTGTTCG-1_1,AAATGCCTCTTGTTCG-1,1
AACAAAGGTTGTTCAC-1_1,AACAAAGGTTGTTCAC-1,1
...,...,...
TTTGTGTTCTCCATGC-1_11,TTTGTGTTCTCCATGC-1,11
TTTGTTGGTCTAACAG-1_11,TTTGTTGGTCTAACAG-1,11
TTTGTTGGTTGGCCGA-1_11,TTTGTTGGTTGGCCGA-1,11
TTTGTTGGTTTATCGC-1_11,TTTGTTGGTTTATCGC-1,11


In [None]:
FILES_TO_DOWNLOAD = master_fragment_file_meta # all the files that you want to download

def download_object(s3_client, fragment_meta: FragmentFileMeta):
    download_path = Path(FRAGMENT_DIR) / fragment_meta.download_file_name
    print(f"Downloading {fragment_meta.download_file_name} to {download_path}")
    s3_client.download_file(
        fragment_meta.uri.bucket_name,
        fragment_meta.uri.file_path,
        str(download_path)
    )
    return "Success"

def download_parallel_multithreading():
    # Create a session and use it to make our client
    session = boto3.session.Session()
    s3_client = session.client("s3")

    # Dispatch work tasks with our s3_client
    with ThreadPoolExecutor(max_workers=8) as executor:
        future_to_key = {
            executor.submit(download_object, s3_client, key): key.download_file_name 
            for key in FILES_TO_DOWNLOAD
            if not key.is_file_local
        }

        if not future_to_key:
            print("All files local, no downloading needed")

        for future in futures.as_completed(future_to_key):
            key = future_to_key[future]
            exception = future.exception()

            if not exception:
                yield key, future.result()
            else:
                yield key, exception


for key, result in download_parallel_multithreading():
    print(f"{key} result: {result}")

All files local, no downloading needed
