In [0]:
# install cspray if you have not already
#%pip install cspray
#%restart_python

In [0]:
from cspray.data import SprayData
import cspray as cs
import pandas as pd

# (optionally) prevent MLflow autologging of the PCA and k-means fits
import mlflow
mlflow.autolog(disable=True)

# Unity Catalog location. Raw .h5ad inputs are read from
# /Volumes/{CATALOG}/{SCHEMA}/raw_h5ad/ and output tables are written under
# {CATALOG}.{SCHEMA}. Both must be filled in before running.
CATALOG = ''
SCHEMA = ''
if not CATALOG or not SCHEMA:
    # Fail fast: empty values produce paths like "/Volumes///raw_h5ad/" that
    # only error out after the reference table has already been built.
    raise ValueError("Set CATALOG and SCHEMA to an existing Unity Catalog catalog/schema before running this notebook.")

# RAM on each worker (presumably GB). Only used by the read stage, where it
# scales the h5ad read chunk size.
WORKER_RAM: int = 32
TOTAL_CORES = spark.sparkContext.defaultParallelism
# The executor memory status map also contains the driver's block manager,
# hence the "- 1". On a single-node cluster that leaves 0, so clamp to 1 to
# avoid a ZeroDivisionError below. NOTE: relies on the private `_jsc` handle.
NUM_WORKERS = max(1, spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size() - 1)
# Not currently referenced elsewhere in this notebook; kept for tuning/reference.
CORES_PER_WORKER = TOTAL_CORES // NUM_WORKERS
MAX_PARTITIONS = 200

In [0]:
# Dataset UUIDs to ingest. Each is expected to exist as "<uuid>.h5ad" in the
# raw_h5ad volume of the configured catalog/schema.
datasets = [
 'b252b015-b488-4d5c-b16e-968c13e48a2c',
 '0ba636a1-4754-4786-a8be-7ab3cf760fd6',
 '350237e0-9f48-4cbd-9140-3b44495549f3',
 'fd072bc3-2dfb-46f8-b4e3-467cb3223182',
 '55003f67-c494-46f1-83fb-902745646379',
 'ae4f8ddd-cac9-4172-9681-2175da462f2e',
 '1b9d8702-5af8-4142-85ed-020eb06ec4f6',
 '80a2c5b6-02e7-4fc0-9f12-179f5247c1bc',
 'cd2f23c1-aef1-48ae-8eb4-0bcf124e567d',
 '21d3e683-80a4-4d9b-bc89-ebb2df513dde',
 '18e2a8c5-33f7-455e-a58a-b2ba6921db27',
 '242c6e7f-9016-4048-af70-d631f5eea188',
 'ed5d841d-6346-47d4-ab2f-7119ad7e3a35',
 '576f193c-75d0-4a11-bd25-8676587e6dc2',
 '7f7faf6b-f11d-4f07-bc1c-188a4472748d',
 '7b55fe5c-d5c8-48e0-a1a3-54c5b9074f3f',
 '76150f40-1989-4977-9e23-696e72d59d9e',
 '43245158-5ae1-4e71-a9a6-67eef49c26bc',
 '9fcb0b73-c734-40a5-be9c-ace7eea401c9',
 '518d9049-2a76-44f8-8abc-1e2b59ab5ba1'
]
root_path = f"/Volumes/{CATALOG}/{SCHEMA}/raw_h5ad/"

# One absolute volume path per dataset id, in the same order as `datasets`.
path = [f"{root_path}{dataset_id}.h5ad" for dataset_id in datasets]

# cspray's gene table as a Spark DataFrame; handed to the h5ad reader as
# `ensembl_reference_df` in the pipeline cell.
ensembl_reference_df = spark.createDataFrame(cs.utils.get_gene_table())

In [0]:
import re

# Prefix inserted into every output table name (e.g. "<prefix>_bronze_...").
# Exposed as a Databricks widget so jobs can override it; defaults to 'test'.
dbutils.widgets.text('BASE_prefix', 'test', 'BASE prefix')
BASE_prefix = dbutils.widgets.get('BASE_prefix')
# The value is interpolated directly into table names, so reject anything that
# is not a plain identifier: a '.', space or quote would retarget or break writes.
if not re.fullmatch(r'[A-Za-z0-9_]+', BASE_prefix):
    raise ValueError(f"BASE_prefix must contain only letters, digits and underscores, got {BASE_prefix!r}")

In [0]:
import time

# All stages are persisted under the same Unity Catalog schema; only the
# table-name infix ("<BASE_prefix>_<stage>_") changes between stages.
TABLE_BASE = f'{CATALOG}.{SCHEMA}'


def _save_stage(sdata, stage, **kwargs):
    """Persist `sdata` for one pipeline stage via `to_tables_and_reset`.

    Args:
        sdata: the SprayData object being processed.
        stage: stage label used in the join string, e.g. 'bronze' or 'silver_pp'.
        **kwargs: forwarded unchanged (e.g. ``subset=['obs']``).
    """
    sdata.to_tables_and_reset(
        spark,
        table_base=TABLE_BASE,
        join_char=f'.{BASE_prefix}_{stage}_',
        **kwargs,
    )


# perf_counter is monotonic, so the measured duration is immune to wall-clock jumps.
t0 = time.perf_counter()

# Bronze: raw counts read from the .h5ad files.
sdata = SprayData.from_h5ads(
    spark,
    path=path,
    force_partitioning=2 * TOTAL_CORES,
    # Read chunk size scales linearly with worker RAM (6,000,000 at WORKER_RAM=16).
    chunk_size=int(6_000_000 * (WORKER_RAM / 16)),
    from_raw=True,
    fallback_default=False,
    broadcast_genes=True,
    ensembl_reference_df=ensembl_reference_df,
)
_save_stage(sdata, 'bronze')

# Silver: QC, cell/gene filtering, mitochondrial filtering, normalisation, log1p.
cs.pp.calculate_qc_metrics(sdata)
cs.pp.filter_cells(sdata)
cs.pp.filter_genes(sdata)
cs.pp.apply_samplewise_mt_statistic(sdata)
cs.pp.filter_cells_on_mt(sdata)
cs.pp.normalize(sdata)
cs.pp.log1p_counts(sdata)
_save_stage(sdata, 'silver_pp')

cs.pp.calculate_hvg(sdata, n_hvg=1000)
_save_stage(sdata, 'silver_hvg')

cs.pp.pca(sdata)
_save_stage(sdata, 'silver_pca', subset=['obs'])

scores_pdf = cs.tl.kmeans(sdata, ks=[2, 3, 4, 5])
sdf_rank = cs.tl.rank_marker_genes(sdata, fc_cutoff=0.15)
_save_stage(sdata, 'silver_end')

# Gold mart step intentionally skipped (not required when benchmarking vs scanpy):
# cs.tl.as_gold_mart_data(sdata)
# sdata.to_tables_and_reset(spark, table_base=TABLE_BASE, join_char='.gold_')

t1 = time.perf_counter()
elapsed = t1 - t0
print(f"Total time: {elapsed} seconds")
print(f"Total time: {elapsed / 60.} minutes")
print(f"Total time: {elapsed / 3600.} hours")

In [0]:
# Elapsed seconds of the pipeline cell, published as a job task value and
# returned as the notebook's exit string (formatted to 3 decimals).
ex_time = t1 - t0
dbutils.jobs.taskValues.set(key="execution_time", value=ex_time)
dbutils.notebook.exit(f"{ex_time:.3f}")