In [None]:
import os
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col

# from pyspark_dist_explore import hist
from pyspark.sql.functions import udf
from pyspark.sql.functions import array
from pyspark.sql.types import FloatType

In [None]:
# Directory that holds the input file and receives the prepared output.
WORK_DIRECTORY = "data"
# Tab-separated input table inside WORK_DIRECTORY.
DATA_FILE_NAME = "cancer_methylation_v1.txt"
# Name of the output directory (inside WORK_DIRECTORY) for the prepared CSV.
DATA_FILE_NAME_NEW = "leukemia_met3"

# Number of leading rows that carry sample annotations (cancer type, tissue)
# rather than measurements.
N_STRINGS_INFO = 3

In [None]:
# Create (or reuse) a local Spark session for the notebook.
spark = (
    SparkSession.builder.master("local")
    .appName("Colab")
    .config("spark.driver.memory", "32g")
    # NOTE(review): with master("local") these two cluster settings likely
    # have no effect — confirm before relying on them.
    .config("spark.executor.cores", "1")
    .config("spark.cores.max", "1")
    # Off-heap memory is only used when explicitly enabled.
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "100g")
    .getOrCreate()
)
# "spark.sparkContext.setLogLevel" is not a Spark configuration key, so passing
# it to .config() did nothing; set the log level on the SparkContext instead.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Load the tab-separated methylation table into a Spark DataFrame, taking
# column names from the header row and letting Spark infer column types.
df = (
    spark.read.options(header=True, inferSchema=True, sep="\t")
    .csv(os.path.join(WORK_DIRECTORY, DATA_FILE_NAME))
)

In [None]:
class GetInfo:
    """
    Collect per-sample annotations (cancer type, tissue) from the dataset.

    The first few rows of the dataset hold sample annotations instead of
    measurements. ``information_strings`` takes those rows, transposes them so
    that every sample becomes one row, and stores the result in
    ``self.information``. That table has a ``sample`` column plus one column
    per annotation row (the methods below use ``cancer`` and ``tissue``).

    Methods:
    1. information_strings: load the first few annotation rows into
                            ``self.information``.
    2. cancer_types: print the number of samples for each cancer type.
    3. cancer_tissue_types: print the number of samples for each cancer and
                            tissue type.
    4. filter_type: keep only the samples of the necessary cancer type.
    5. column_names: return the sample (column) names that are left.

    Methods 2-5 call ``information_strings`` with its default arguments if it
    has not been called yet.
    """

    def __init__(self, data):
        self.data = data
        # Filled by information_strings(); None until then.
        self.information = None

    def _ensure_information(self):
        """Load the annotation rows with default settings if not loaded yet."""
        # getattr also covers instances created without running __init__.
        if getattr(self, "information", None) is None:
            self.information_strings()

    def information_strings(self, n_strings=3):
        """
        Load the first ``n_strings`` rows and reshape them to one row per sample.

        Parameters:
        n_strings: number of leading annotation rows to read.
        """
        head = self.data.limit(n_strings)
        # pandas_api() replaced the deprecated to_pandas_on_spark(); keep the
        # old call for Spark versions that lack the new one. Check the class so
        # a column that happens to be named "pandas_api" cannot interfere.
        if hasattr(type(head), "pandas_api"):
            table = head.pandas_api()
        else:
            table = head.to_pandas_on_spark()
        table = table.set_index("sample_id").transpose().reset_index()
        # After the transpose the former column names (samples) sit in "index".
        self.information = table.rename(columns={"index": "sample"})

    def cancer_types(self, n_strings=10):
        """Print sample counts per cancer type (first ``n_strings`` rows)."""
        self._ensure_information()
        print(
            self.information.groupby(by=["cancer"], as_index=False)
            .count()
            .sort_values(by=["cancer"])
            .head(n_strings)
        )

    def cancer_tissue_types(self, n_strings=10):
        """Print sample counts per (cancer, tissue) pair (first ``n_strings`` rows)."""
        self._ensure_information()
        print(
            self.information.groupby(by=["cancer", "tissue"], as_index=False)
            .count()
            .sort_values(by=["cancer", "tissue"])
            .head(n_strings)
        )

    def column_names(self):
        """Return the unique sample names currently in ``self.information``."""
        self._ensure_information()
        return self.information["sample"].unique().to_list()

    def filter_type(self, cancer_filter):
        """Keep only the samples whose ``cancer`` value equals ``cancer_filter``."""
        self._ensure_information()
        self.information = self.information[
            self.information["cancer"] == cancer_filter
        ]

In [None]:
# get a dataset with information about cancer and tissue types
info = GetInfo(df)
# Use the configured number of leading annotation rows instead of relying on
# the method's default.
info.information_strings(n_strings=N_STRINGS_INFO)

In [None]:
# Print how many samples belong to each cancer type (first 10 rows).
info.cancer_types(n_strings=10)

In [None]:
# Restrict the annotations to the cancer type under study, then collect the
# names of the sample columns that belong to it.
info.filter_type(cancer_filter="acute myeloid leukemia")
columns_list = info.column_names()

In [None]:
# Print sample counts per tissue type for the filtered cancer type (first 10 rows).
info.cancer_tissue_types(n_strings=10)

In [None]:
class DataPreparing:
    """
    Prepare the dataset for analysis.

    The steps build on each other and are meant to be called in this order:
    info_remove -> get_samples -> col_rename -> na_string_replace ->
    na_strings_remove -> data_to_float -> quartile_range -> save_dataset.
    For example, col_rename requires the column set produced by get_samples,
    and quartile_range expects ``sample_id`` to be the first column.

    Methods:
    info_remove: remove the first rows that hold sample information.
    get_samples: keep only ``sample_id`` and the samples of the cancer type.
    col_rename: remove '-' from column names (necessary for Spark functions).
    na_string_replace: replace the string 'NA' with null.
    na_strings_remove: remove rows with more than max_na_count null values.
    data_to_float: cast the sample columns to float.
    quartile_range: keep rows whose interquartile range (75% quantile minus
                    25% quantile) across samples is greater than delta.
    save_dataset: save the dataset to a single ';'-separated CSV file.
    """

    def __init__(self, data, columns):
        self.data = data
        # Copy so the preparation steps never modify the caller's list.
        self.columns = list(columns)

    def info_remove(self, n_strings=3):
        """
        Drop the first ``n_strings`` rows, which hold sample information.

        monotonically_increasing_id() numbers the records of the first
        partition 0, 1, 2, ...; this relies on the information rows being the
        first ``n_strings`` records of that partition.
        """
        self.data = (
            self.data.withColumn("index", monotonically_increasing_id())
            .filter(col("index") >= n_strings)
            # The helper column is not part of the dataset.
            .drop("index")
        )

    def col_rename(self):
        """Remove '-' from every column name (Spark functions cannot handle it)."""
        self.columns = [name.replace("-", "") for name in self.columns]
        # toDF renames positionally, so self.columns must match the data's columns.
        self.data = self.data.toDF(*self.columns)

    def get_samples(self):
        """Keep only the ``sample_id`` column followed by the sample columns."""
        # Build a new list (instead of insert(0, ...)) so that calling this
        # twice does not add "sample_id" a second time.
        self.columns = ["sample_id"] + [
            name for name in self.columns if name != "sample_id"
        ]
        self.data = self.data.select(self.columns)

    def na_string_replace(self):
        """Replace the literal string 'NA' with null."""
        self.data = self.data.na.replace("NA", None)

    def na_strings_remove(self, max_na_count=50):
        """Drop rows that have more than ``max_na_count`` null values."""
        null_count = sum(col(name).isNull().cast("int") for name in self.columns)
        self.data = (
            self.data.withColumn("numNulls", null_count)
            .filter(col("numNulls") <= max_na_count)
            # The helper column is not part of the dataset.
            .drop("numNulls")
        )

    def data_to_float(self):
        """Cast every column except ``sample_id`` to float."""
        self.data = self.data.select(
            *(
                col(name).alias(name)
                if name == "sample_id"
                else col(name).cast("float").alias(name)
                for name in self.columns
            )
        )

    @staticmethod
    def _iqr(values):
        """Return the interquartile range (75th minus 25th percentile) of ``values``."""
        q75, q25 = np.percentile(values, [75, 25])
        return float(q75 - q25)

    def quartile_range(self, delta=0.1):
        """
        Keep rows whose interquartile range across samples is greater than ``delta``.

        Null values are replaced with 0 first; they therefore count as 0 in the
        interquartile range and remain 0 in the saved dataset. The range is
        stored in a column named "mean" (name kept for compatibility with
        earlier outputs, although it is not a mean).
        """
        self.data = self.data.fillna(0, subset=None)

        # Sample columns follow "sample_id", which get_samples puts first.
        sample_values = array(*(col(name) for name in self.columns[1:]))
        iqr_udf = udf(DataPreparing._iqr, FloatType())
        self.data = self.data.withColumn("mean", iqr_udf(sample_values))

        self.data = self.data.filter(col("mean") > delta)

    def save_dataset(self, directory, mode=None):
        """
        Write the data with a header as one ';'-separated CSV part file.

        Parameters:
        directory: output directory; Spark writes the part file into it.
        mode: optional Spark save mode such as "overwrite". None keeps Spark's
              default, which raises an error if ``directory`` already exists.
        """
        writer = self.data.repartition(1).write.option("header", True)
        writer.csv(directory, sep=";", mode=mode)

In [None]:
# Pass a copy so that the preparation steps cannot modify columns_list in place.
data_prep = DataPreparing(df, list(columns_list))

In [None]:
# get subset with necessary cancer type
# Strip the configured number of leading annotation rows instead of relying on
# the method's default.
data_prep.info_remove(n_strings=N_STRINGS_INFO)
data_prep.get_samples()
data_prep.col_rename()

In [None]:
# Drop unreliable rows: turn 'NA' strings into nulls, discard rows with too
# many nulls, cast the measurements to float, and keep only rows whose
# interquartile range exceeds the threshold.
data_prep.na_string_replace()
data_prep.na_strings_remove(max_na_count=50)
data_prep.data_to_float()
data_prep.quartile_range(delta=0.1)

In [None]:
# Write the prepared dataset as CSV into WORK_DIRECTORY/DATA_FILE_NAME_NEW.
data_prep.save_dataset(directory=os.path.join(WORK_DIRECTORY, DATA_FILE_NAME_NEW))