In [1]:
import dask.dataframe as dd
import dask.delayed
import yaml
import numpy as np
from scripts.data_processor_dask import DaskDataProcessor
from dask.distributed import Client


 ### <span style="background-color: lightyellow;"> Initial Data Preprocessing</span>

In [2]:

@dask.delayed
def get_config(file_name):
    """Parse a YAML configuration file.

    The function is wrapped with ``dask.delayed``, so calling it does not read
    the file immediately; the caller has to invoke ``.compute()`` on the result
    to obtain the parsed content.

    Args:
        file_name: Path of the YAML file to read (opened as UTF-8 text).

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content.
    """
    with open(file_name, mode='r', encoding="UTF-8") as yaml_stream:
        return yaml.safe_load(yaml_stream)


def retrieve_data():
    """Load the two input tables named in ``../config.yaml`` and join them.

    The ``lung3`` entry is read lazily as a Dask DataFrame. The ``gene`` entry
    is read as a tab-separated file (lines starting with ``!`` are treated as
    comments), materialised in memory, indexed by ``ID_REF``, transposed and
    given a fresh positional index. Both tables are then merged on their
    indexes.

    Returns:
        The result of merging the ``lung3`` Dask DataFrame with the transposed
        gene table on index.
    """
    settings = get_config('../config.yaml').compute()
    clinical_path = settings['lung3']
    expression_path = settings['gene']

    clinical_ddf = dd.read_csv(clinical_path)

    # The gene table is pulled fully into memory because it has to be transposed.
    # NOTE(review): presumably the transpose turns one-column-per-sample into
    # one-row-per-sample so that rows line up with the lung3 table — confirm.
    expression_pdf = (
        dd.read_csv(expression_path, sep='\t', comment="!")
        .compute()
        .set_index('ID_REF')
        .T
        .reset_index(drop=True)
    )

    return clinical_ddf.merge(expression_pdf, left_index=True, right_index=True)


In [3]:

def setup_dask_client(n_workers=4, threads_per_worker=2, memory_limit='8GB'):
    """Create a ``dask.distributed.Client`` and return it.

    All three arguments are forwarded unchanged to ``Client`` as keyword
    arguments of the same name.

    Args:
        n_workers: Value passed as ``n_workers``.
        threads_per_worker: Value passed as ``threads_per_worker``.
        memory_limit: Value passed as ``memory_limit``.

    Returns:
        The newly created ``Client`` instance.
    """
    return Client(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
    )
# Start the Dask client used by the rest of the notebook.
# NOTE(review): the recorded output of this cell shows a port-conflict warning
# ("Perhaps you already have a cluster running?"), which suggests an earlier
# client was still alive when the cell was run — confirm and close it first.
client = setup_dask_client(n_workers=4, threads_per_worker=2, memory_limit='16GB')

def sub_classification(histology):
    """Map a histology description to one of three coarse classes.

    Matching is a case-sensitive substring test, checked in this order:
    ``"Carcinoma"`` first, then ``"Adenocarcinoma"``. Because the test is
    case-sensitive, a plain ``"Adenocarcinoma ..."`` string (lower-case ``c``)
    does not hit the first branch.

    Args:
        histology: The histology text. Any non-string value (for example the
            ``NaN`` or ``None`` that stands in for a missing entry) is
            classified as ``'Others'`` instead of raising ``TypeError``.

    Returns:
        ``'Carcinoma'``, ``'Adenocarcinoma'`` or ``'Others'``.
    """
    # A missing value is a float NaN / None, on which the `in` test would raise.
    if not isinstance(histology, str):
        return 'Others'
    if "Carcinoma" in histology:
        return 'Carcinoma'
    if "Adenocarcinoma" in histology:
        return 'Adenocarcinoma'
    return 'Others'



Perhaps you already have a cluster running?
Hosting the HTTP server on port 45823 instead


In [4]:
# Load the merged table produced by retrieve_data().
dataset = retrieve_data()
# Build the target labels by mapping every histology value through
# sub_classification; `meta` declares the name and dtype of the resulting series.
y = dataset['characteristics.tag.histology'].map(sub_classification, meta=('output_column', 'str'))
# The raw histology column is removed from the feature table once the target is derived.
dataset  = dataset.drop(columns='characteristics.tag.histology')
print("dataset loaded")

# Preprocessing is delegated to the project's DaskDataProcessor. The exact
# behaviour of each call lives in scripts.data_processor_dask and is not
# visible here; the notes below are hedged accordingly.
data_processor = DaskDataProcessor(dataset)
data_processor.remove_non_related_columns()
data_processor.impute_not_available_values('characteristics.tag.grade')
# NOTE(review): 35 is presumably a missing-value threshold — confirm its unit.
data_processor.drop_nan_columns(35)
# NOTE(review): presumably a Cramér's V association test against `y` with a
# threshold of 0, filling `covarianced_columns` — confirm.
data_processor.cramerV(y, 0)
covarrianced_columns = data_processor.covarianced_columns
# Columns returned by find_cols_on_type('object') that are NOT listed in
# `covarianced_columns` are dropped.
removed_catagorical_features = set(data_processor.find_cols_on_type('object')) - set(covarrianced_columns)
data_processor.drop_columns(column_list = list(removed_catagorical_features))
# NOTE(review): 95 is presumably a variance percentile cut-off — confirm.
data_processor.selecting_high_variance_gene_expression(95)
dataset = data_processor.dataframe
# Attach the target labels to the processed table as the 'classes' column.
dataset['classes'] = y

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(


dataset loaded


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


### <span style="background-color: lightyellow;">EDA Task</span>

- Calculate summary statistics (e.g., mean, median, standard deviation) for clinical variables and gene expression data.
- Identify the top 10 most variable genes across patients.
- Group patients by clinical variables and compute the average expression of selected genes.
- Visualize the distribution of the target variable (TumorSubtype).

In [5]:

def calculate_statistics(df):
    """Return mean, median and standard deviation for every numeric column.

    Args:
        df: A Dask DataFrame, or any other object accepted by
            ``dd.from_pandas`` (it is then converted using 4 partitions).

    Returns:
        A pandas DataFrame with one row per numeric column and the columns
        ``mean``, ``median`` and ``std``. The median is taken from
        ``quantile(0.5)`` on the Dask frame.
    """
    frame = df if isinstance(df, dd.DataFrame) else dd.from_pandas(df, npartitions=4)
    numeric_part = frame.select_dtypes(include=[np.number])

    # describe() yields statistics as rows; transpose so each input column is a row.
    summary = numeric_part.describe().compute().T
    summary['median'] = numeric_part.quantile(0.5).compute()

    return summary[['mean', 'median', 'std']]

In [6]:
# Mean / median / std of every numeric column of the preprocessed dataset.
stats_df = calculate_statistics(dataset)

In [None]:
# Identify the top 10 most variable genes across patients.
import re

# Gene-expression columns are the ones whose name ends in "_at".
gens = [col for col in dataset.columns if re.match(r'.*_at$',col)]

# Materialise all gene columns with ONE compute() call. Calling compute() once
# per column inside a loop re-executes the Dask graph for every gene.
data = dataset[gens].compute()

# log(1 + x) transform, then the per-column variance on the in-memory frame.
log_normalized = np.log1p(data)
variances = sorted(log_normalized.var().items(), key=lambda item: item[1], reverse=True)

# `variances` is a list of (column, variance) pairs, largest variance first.
top_var_genes = [gene for gene, _ in variances[:10]]
top_var_genes

In [None]:
# - Group patients by clinical variables and compute the average expression of selected genes
# Only the grouping column and the selected genes are pulled into memory,
# instead of materialising every column of the dataset.
(
    dataset[["characteristics.tag.gender", *top_var_genes]]
    .compute()
    .groupby("characteristics.tag.gender")[top_var_genes]
    .mean()
)

In [None]:
# - Visualize the distribution of the target variable (TumorSubtype)
# Only the 'classes' column is computed; the rest of the dataset is not needed for this plot.
dataset['classes'].compute().value_counts().plot.bar()