
# Enhancing Data Handling Efficiency and Memory Management

To keep memory usage manageable, we process the dataset in pieces. The raw file has roughly 300,000 columns and 535 rows, which is too wide to handle comfortably as a single DataFrame on one machine. We therefore read the raw file once, drop the identifier columns we do not need (FID, IID, PAT, MAT), and split the remaining columns into files of 100 columns each, named Data__0, Data__1, ..., Data__2976. Feature selection then runs on each file separately.

### Optimizing CPU Resources with Spark Session Initialization
First, we import the required libraries. Then we start a Spark session configured with 7 GB of driver memory and 3 driver cores, so that it can use most of the resources available on the PC.

In [1]:
from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python built-ins such as max and sum
from pyspark.sql.functions import col, monotonically_increasing_id, split
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, ChiSqSelector
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from functools import reduce
from pyspark.sql import DataFrame
import gc, psutil

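# Create a Spark session with 7 GB of driver memory.
# Note: spark.driver.cores applies in cluster mode; in local mode the number
# of worker threads comes from the master URL (for example local[3]).
spark = (
    SparkSession.builder
    .appName("DementiaData")
    .config("spark.driver.memory", "7g")
    .config("spark.driver.cores", "3")
    .getOrCreate()
)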
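
As a hedged illustration of the note on cores: when Spark runs in local mode, the thread count is normally chosen through the master URL rather than through `spark.driver.cores`. The helper below is hypothetical (the name `local_master_url` and the idea of reserving one core for the operating system are assumptions, not part of the original notebook). It only builds the URL string, so it runs without Spark installed.

```python
import os


def local_master_url(reserved_cores: int = 1) -> str:
    """Return a local-mode master URL such as 'local[3]' (hypothetical helper)."""
    available = os.cpu_count() or 1      # cpu_count() can return None
    workers = available - reserved_cores
    if workers < 1:
        workers = 1                      # always keep at least one worker thread
    return f"local[{workers}]"


master_url = local_master_url()
print(master_url)

# Possible use with the session builder (requires pyspark):
#   SparkSession.builder.master(master_url).appName("DementiaData").getOrCreate()
```

On a 4-core machine this would request three worker threads, which matches the intent of the `3` used in the session configuration.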

### Data Preprocessing Script for Dementia Dataset

We collect the paths of all input and output files in a single list.

In [2]:
# File location
data_read_locations = ["JanBDRcount.raw", # Raw input data
                       "Distributed/", # Output folder for the chunk files (100 columns each)
                       "combined.csv" # Combined data after feature selection
                      ]

During preprocessing, we read the raw data as plain text. The key steps are: read the header line with plain Python to get the column names, replace colons in the names with underscores, load the file with Spark's `read.text` method, and split each line into an array of integer values with `withColumn`.

In [3]:
print("Let's start with file reading process.")

# Read the header line (the first row) with plain Python to get the column names
with open(data_read_locations[0], "r") as file:
    column_names = file.readline().strip().split(" ")

# Replace colons in the column names, which would otherwise cause problems in Spark expressions
column_names = [col_name.replace(':', '_') for col_name in column_names]

# Read the text file as a single column
read_raw_file = spark.read.text(data_read_locations[0])

# Split each line into individual columns based on space delimiter
read_raw_file = read_raw_file.withColumn("columns", split(read_raw_file["value"], " ").cast("array<int>"))

print("Completed with reading the file.")

Let's start with file reading process.
Completed with reading the file.
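
The parsing step can be pictured in plain Python. The sketch below is only an approximation of what `split(...).cast("array<int>")` does under Spark's default (non-ANSI) cast behaviour, where tokens that are not integers become null. The header and data line are made up for illustration, and `parse_int_row` is a hypothetical helper. Because `read.text` returns every line of the file, the header line is also read as a row, and the last call shows what happens to it.

```python
def parse_int_row(line: str, sep: str = " "):
    """Split one text line and convert each token to int, or None if it is not an integer."""
    values = []
    for token in line.strip().split(sep):
        try:
            values.append(int(token))
        except ValueError:
            values.append(None)  # e.g. "NA" or a column name from the header line
    return values


# Made-up header in the same style as the raw file
header = "FID IID PAT MAT SEX PHENOTYPE rs1:A rs2:G"
clean_names = [name.replace(":", "_") for name in header.split(" ")]

print(clean_names)
print(parse_int_row("1 1 0 0 2 1 0 NA"))
print(parse_int_row(header))  # every token is text, so every value is None
```

If the all-null header row is not wanted downstream, it has to be filtered out before further processing.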


We now expand the array into individual columns and name them with the column names extracted earlier using Python, skipping the identifier columns FID, IID, PAT, and MAT.

In [None]:
print("Let's create the dataframe and change the column names.")
id_columns = {"FID", "IID", "PAT", "MAT"}  # identifier columns that are not needed
dementia_data = read_raw_file.selectExpr(
    [f"columns[{i}] as {col_name}"
     for i, col_name in enumerate(column_names)
     if col_name not in id_columns]
)
print("Completed with dataframe creation.")
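
To make the index mapping explicit, here is a small standalone sketch of how the `selectExpr` strings are built. Each kept column retains its original position in the array, so skipping the identifier columns does not shift the indices. The backticks around the alias are an added precaution for names with unusual characters and are an assumption, not something the original cell uses. `build_select_exprs` is a hypothetical helper name.

```python
ID_COLUMNS = {"FID", "IID", "PAT", "MAT"}


def build_select_exprs(column_names, skip=ID_COLUMNS):
    """Build 'columns[i] as `name`' expressions, keeping each column's original index."""
    return [
        f"columns[{i}] as `{name}`"
        for i, name in enumerate(column_names)
        if name not in skip
    ]


exprs = build_select_exprs(["FID", "IID", "PAT", "MAT", "SEX", "PHENOTYPE", "rs1_A"])
for expr in exprs:
    print(expr)
```

The first kept expression refers to `columns[4]`, not `columns[0]`, because the four identifier columns still occupy the first four array slots.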

### Streamlined Batch Processing of Dementia Dataset Columns
After obtaining the column names from the dementia dataset, we split them into batches of 100 columns. Each batch becomes a separate DataFrame of up to 100 columns and 535 rows and is written to its own CSV output, for a total of 2977 files. Working with narrow files keeps each later processing step small, and the progress message printed after each batch makes it easy to monitor the export.

In [None]:
print("Let's split the columns into batches and write one file per batch.")

column_names = dementia_data.columns
# Create a batch size of 100 columns with each file
batch_size = 100

# Copying column names in chunks of batch size
column_names_batches = [column_names[i:i+batch_size] for i in range(0, len(column_names), batch_size)]

# Select each batch of columns and export it as CSV (Spark writes one output directory per batch)
for index, batch in enumerate(column_names_batches):
    dementia_data.select(*batch).coalesce(1).write.csv(f"{data_read_locations[1]}/Data__{index}", header=True, mode="overwrite")
    print(f"Completed chunk {index}")

print("Completed with chunks creation.")
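
The batching logic itself is ordinary list slicing and can be checked without Spark. The sketch below uses made-up column names and a hypothetical helper `make_batches`. The number of batches is the column count divided by the batch size, rounded up, so the last batch may be shorter than the rest.

```python
import math


def make_batches(names, batch_size):
    """Split a list of column names into consecutive batches of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [names[i:i + batch_size] for i in range(0, len(names), batch_size)]


names = [f"c{i}" for i in range(250)]   # made-up column names
batches = make_batches(names, 100)

print(len(batches), [len(batch) for batch in batches])
print(len(batches) == math.ceil(len(names) / 100))
```

By the same arithmetic, 2977 files of at most 100 columns correspond to between 297,601 and 297,700 columns after the identifier columns are removed, which is consistent with the "roughly 300,000" stated at the start.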

### Feature Selection and Encoding Pipeline
We first create placeholders for the target data and the selected features. We then iterate over the chunk files. For each file, we drop columns with more than 10 null values, fill the remaining missing values with the placeholder 3, cast all columns to integers, and assemble them into a feature vector. Next, we use ChiSqSelector to pick the top features of the chunk and append the selected columns to the list of selected-features DataFrames. OneHotEncoder is imported for encoding the selected features, but the code shown here does not apply it.
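
For intuition about what ChiSqSelector ranks on: it relies on a chi-squared test of independence between each categorical feature and the label. The sketch below computes the Pearson chi-squared statistic for one feature in plain Python. It is an illustration under simplifying assumptions, not Spark's implementation, and the toy values are made up.

```python
from collections import Counter


def chi_square_statistic(feature_values, labels):
    """Pearson chi-squared statistic for one categorical feature against the label."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))   # counts per (feature, label) cell
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    statistic = 0.0
    for f_value, f_total in feature_totals.items():
        for l_value, l_total in label_totals.items():
            expected = f_total * l_total / n          # count expected under independence
            statistic += (observed[(f_value, l_value)] - expected) ** 2 / expected
    return statistic


labels = [1, 1, 2, 2]
informative = [0, 0, 1, 1]      # perfectly aligned with the label
uninformative = [0, 1, 0, 1]    # independent of the label

print(chi_square_statistic(informative, labels))
print(chi_square_statistic(uninformative, labels))
```

A feature that tracks the label closely gets a large statistic, while a feature that is independent of the label gets a statistic near zero.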

In [None]:
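# Placeholders for the per-chunk selected features and the label column
selected_features_data = []
target_data = None

def count_nulls_in_partition(iterator):
  # Count nulls per column within one partition.
  # The counter is sized from the first row, and that row is counted as well.
  null_counts = None
  for row in iterator:
      if null_counts is None:
          null_counts = [0] * len(row)
      for i, value in enumerate(row):
          if value is None:
              null_counts[i] += 1
  # An empty partition yields nothing, so it does not affect the reduce step
  if null_counts is not None:
      yield null_counts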

# Iterate through each chunk index
for file_index in range(2977):
  # Read the chunk written by the batching step (Spark wrote each chunk as a directory named Data__<index>)
  print("Started processing file:", file_index)
  file_path = f"{data_read_locations[1]}/Data__{file_index}"
  data_frame = spark.read.csv(file_path, header=True, sep=",", nullValue="NA")

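  # Store the label column. With the batching above, PHENOTYPE exists only in the
  # first chunk, so later chunks need it joined back in before ChiSqSelector
  # (labelCol="PHENOTYPE") can be fitted on them.
  if file_index == 0:
    target_data = data_frame.select("PHENOTYPE")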

  # Convert DataFrame to RDD for parallel processing
  rdd = data_frame.rdd.map(lambda row: tuple(row))

  # Calculate null counts in parallel
  null_counts = rdd.mapPartitions(count_nulls_in_partition).reduce(lambda x, y: [a + b for a, b in zip(x, y)])

  # Create a dictionary to map column names to their null counts
  column_null_counts = dict(zip(data_frame.columns, null_counts))
  
  # Filter the dictionary to include only entries with null counts greater than 10
  filtered_null_counts = {column: count for column, count in column_null_counts.items() if count > 10}
  print("Dropping these columns",filtered_null_counts.keys())

  # Drop columns with null counts greater than 10
  data_frame = data_frame.drop(*filtered_null_counts.keys())

  # Fill missing values with the placeholder "3" (columns are still strings at this point)
  data_frame = data_frame.fillna("3")

  # Cast all columns to integer type in a single projection
  data_frame = data_frame.select([col(column_name).cast("int").alias(column_name) for column_name in data_frame.columns])

  # Vectorize column values
  assembler = VectorAssembler(inputCols=[col_name for col_name in data_frame.columns if col_name != 'PHENOTYPE'], outputCol="features")

  # Transform data with VectorAssembler
  assembled_data = assembler.transform(data_frame)

  # Select features using ChiSqSelector
  selector = ChiSqSelector(numTopFeatures=2, featuresCol="features", outputCol="selectedFeatures", labelCol="PHENOTYPE")

  # Fit ChiSqSelector to data
  selector_model = selector.fit(assembled_data)

  # Transform data with ChiSqSelector
  selected_data = selector_model.transform(assembled_data)

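  # Map the selected vector indices back to the assembler's input column names
  feature_columns = assembler.getInputCols()
  selected_features = [feature_columns[index] for index in selector_model.selectedFeatures]

  # Append this chunk's selected columns (kept as plain integer columns, assumed
  # intent, so that they can be joined and written to CSV later)
  selected_features_data.append(data_frame.select(*selected_features))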
  print("Completed File", file_index)
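
One subtlety when counting nulls per partition is that the counter has to be sized from a row without skipping that row, and empty partitions have to be tolerated when partial results are merged. The following plain-Python sketch mimics the map-then-reduce pattern on made-up partitions. The helper names and the toy threshold are assumptions for illustration.

```python
from functools import reduce


def count_nulls(rows):
    """Count None values per column for one partition (an iterable of tuples)."""
    counts = None
    for row in rows:
        if counts is None:
            counts = [0] * len(row)      # size the counter from the first row
        for i, value in enumerate(row):
            if value is None:
                counts[i] += 1
    return [] if counts is None else counts


def merge_counts(left, right):
    """Add two per-column count lists, treating an empty list as 'no rows seen'."""
    if not left:
        return right
    if not right:
        return left
    return [a + b for a, b in zip(left, right)]


# Three made-up partitions, the second one empty
partitions = [[(1, None), (None, None)], [], [(3, 4)]]
totals = reduce(merge_counts, [count_nulls(p) for p in partitions])

columns = ["rs1_A", "rs2_G"]
threshold = 1                            # the notebook uses 10; 1 keeps the toy example small
to_drop = [name for name, count in zip(columns, totals) if count > threshold]

print(totals, to_drop)
```

Here the first column has one null and the second has two, so only the second exceeds the toy threshold.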

### Combining DataFrames and Adding Unique Identifier Column
We add an identifier column to the target DataFrame and to every DataFrame in the list of selected features using `monotonically_increasing_id()`, which gives each row within a DataFrame a distinct ID. All DataFrames are then joined horizontally on this identifier, and the identifier column is dropped from the combined result before it is exported. Note that the join lines rows up correctly only if every DataFrame produces the same IDs in the same row order, which depends on all of them being partitioned identically.

In [None]:
# Function to join two DataFrames on a common key
def join_dataframes(df1: DataFrame, df2: DataFrame):
    return df1.join(df2, "id", "inner")

# Add unique identifier column for target data using monotonically_increasing_id function
target_data_with_id = target_data.withColumn("id", monotonically_increasing_id())

# Add unique identifier column to each DataFrame in selected_features_data
selected_features_data_with_id = [df.withColumn("id", monotonically_increasing_id()) for df in selected_features_data]

# Combine all DataFrames into a single list
all_dataframes = [target_data_with_id] + selected_features_data_with_id

# Perform join operations using reduce function
combined_data = reduce(join_dataframes, all_dataframes)

# Drop the identifier column from the final DataFrame
combined_data = combined_data.drop("id")

# Export selected features into a single csv file
combined_data.coalesce(1).write.csv(data_read_locations[2], header=True, mode="overwrite")

print("Combining of selected features completed")
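
Why partitioning matters for the join key can be seen from how the IDs are laid out. According to the Spark documentation, the current implementation of `monotonically_increasing_id()` puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below reproduces that layout in plain Python; `monotonic_id` is a hypothetical helper, not a Spark function.

```python
def monotonic_id(partition_index: int, row_in_partition: int) -> int:
    """Mimic the documented layout: partition ID in the upper bits, row number in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


single_partition = [monotonic_id(0, i) for i in range(3)]
two_partitions = [monotonic_id(0, 0), monotonic_id(0, 1), monotonic_id(1, 0)]

print(single_partition)
print(two_partitions)
```

The same three rows receive different IDs depending on how they are split across partitions. The join on `id` therefore relies on each chunk being read back with the same partitioning, which is plausible here because every chunk was written as a single small file, but it is an assumption rather than a guarantee.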

### Customization with Vectorization

We read the combined CSV file once to obtain its column names, then build a custom schema that declares every column as a nullable integer. Reading the file again with this schema gives consistent integer types across all columns. Next, `VectorAssembler` combines every column except 'PHENOTYPE' into a single 'features' vector column. Finally, we keep only 'PHENOTYPE' and 'features' for modeling.

In [4]:
# Read the CSV header to get the column names
first_row = spark.read.csv(data_read_locations[2], header=True).limit(1)

# Build a schema that declares every column as a nullable integer
custom_schema = StructType([StructField(name, IntegerType(), True) for name in first_row.columns])

# Read the CSV file with the custom schema
data_frame_combined = spark.read.csv(data_read_locations[2], schema=custom_schema, header=True)

# Vectorize column values
assembler = VectorAssembler(inputCols=[col_name for col_name in data_frame_combined.columns if col_name != 'PHENOTYPE'], outputCol="features")

# Transform data with VectorAssembler
combined_df = assembler.transform(data_frame_combined)
# Keep only the label and the assembled feature vector
combined_df = combined_df.select("PHENOTYPE", "features")

##### Display data_frame_combined
Let's display the selected feature columns of the combined DataFrame before vectorization.

In [10]:
data_frame_combined.show()

[Output omitted: wide table with one column per selected feature]

##### Display combined_df
Let's display the selected features after applying `VectorAssembler`.

In [11]:
combined_df.show()

+---------+--------------------+
|PHENOTYPE|            features|
+---------+--------------------+
|        1|(2977,[3,4,5,9,11...|
|        2|(2977,[2,3,5,8,9,...|
|        2|(2977,[2,5,11,14,...|
|        1|(2977,[3,4,8,9,11...|
|        1|(2977,[2,5,8,15,1...|
|        1|(2977,[3,9,15,20,...|
|        2|(2977,[1,2,3,5,11...|
|        2|(2977,[0,2,3,4,5,...|
|        1|(2977,[5,6,9,11,1...|
|        2|(2977,[2,4,5,8,9,...|
|        1|(2977,[5,8,11,14,...|
|        2|(2977,[3,5,6,8,9,...|
|        2|(2977,[2,8,9,11,2...|
|        2|(2977,[2,4,9,11,1...|
|        2|(2977,[2,5,11,20,...|
|        1|(2977,[2,3,5,8,11...|
|        2|(2977,[2,6,9,14,1...|
|        1|(2977,[5,8,14,16,...|
|        2|(2977,[4,8,16,18,...|
|        2|(2977,[8,9,16,18,...|
+---------+--------------------+
only showing top 20 rows
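
The `features` values above are printed in sparse form: the vector size first, then the indices of the non-zero entries, then (truncated in this display) the values at those indices. A minimal plain-Python sketch of that representation follows. `to_sparse` is a hypothetical helper and the input row is made up.

```python
def to_sparse(values):
    """Return (size, indices, values) for the non-zero entries of a dense list."""
    indices = [i for i, value in enumerate(values) if value != 0]
    return (len(values), indices, [float(values[i]) for i in indices])


dense_row = [0, 0, 1, 2, 0, 1]
print(to_sparse(dense_row))
```

Read this way, the first row of the table is a vector of length 2977 whose non-zero entries start at positions 3, 4, 5, 9, and 11.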



### Model Training and Evaluation

We split the combined dataset into training and test sets with an 80:20 weighting. Logistic Regression, Random Forest, and Decision Tree models are defined for each feature column, which here is only the single 'features' vector. Each model is wrapped in a pipeline, fitted on the training data, and then used to make predictions on the test data.

In [5]:
# Split the data into train and test sets
train_data, test_data = combined_df.randomSplit([0.8, 0.2], seed=42)

print("Train Rows:", train_data.count())
print("Test Rows:", test_data.count())

# Define models for Logistic Regression, Random Forest, and Decision Tree
log_reg = [LogisticRegression(featuresCol=col_name, labelCol="PHENOTYPE",
                              predictionCol="prediction_reg_" + col_name,
                              rawPredictionCol="rawPrediction_reg_" + col_name,
                              probabilityCol="probability_reg_" + col_name)
           for col_name in combined_df.columns if col_name != "PHENOTYPE"]

rf = [RandomForestClassifier(featuresCol=col_name, labelCol="PHENOTYPE",
                             predictionCol="prediction_rf_" + col_name,
                             rawPredictionCol="rawPrediction_rf_" + col_name,
                             probabilityCol="probability_rf_" + col_name)
      for col_name in combined_df.columns if col_name != "PHENOTYPE"]

dt = [DecisionTreeClassifier(featuresCol=col_name, labelCol="PHENOTYPE",
                             predictionCol="prediction_dt_" + col_name,
                             rawPredictionCol="rawPrediction_dt_" + col_name,
                             probabilityCol="probability_dt_" + col_name)
      for col_name in combined_df.columns if col_name != "PHENOTYPE"]

# Define pipelines
pipeline_log_reg = Pipeline(stages=log_reg)
pipeline_rf = Pipeline(stages=rf)
pipeline_dt = Pipeline(stages=dt)

# Fit the pipelines to training data
model_log_reg = pipeline_log_reg.fit(train_data)
print("Logistic Regression Train Done")
model_rf = pipeline_rf.fit(train_data)
print("Random Forest Train Done")
model_dt = pipeline_dt.fit(train_data)
print("Decision Tree Classifier Train Done")

# Make predictions on test data
predictions_reg = model_log_reg.transform(test_data)
print("Logistic Regression Test Done")
predictions_rf = model_rf.transform(test_data)
print("Random Forest Test Done")
predictions_dt = model_dt.transform(test_data)
print("Decision Tree Classifier Test Done")
print("Test Done")

Train Rows: 452
Test Rows: 82
Logistic Regression Train Done
Random Forest Train Done
Decision Tree Classifier Train Done
Logistic Regression Test Done
Random Forest Test Done
Decision Tree Classifier Test Done
Test Done
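
The split above gives 452 training rows and 82 test rows, about 84.6% and 15.4% of the 534 rows, rather than an exact 80:20 division. That is expected, because `randomSplit` assigns rows at random according to the weights. The sketch below shows the idea with a per-row random draw in plain Python. It is not Spark's implementation, so it will not reproduce the same counts, and `bernoulli_split` is a hypothetical helper.

```python
import random


def bernoulli_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)            # fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


rows = list(range(534))
train, test = bernoulli_split(rows, 0.8, seed=42)
print(len(train), len(test))
```

With only a few hundred rows, deviations of a few percentage points from the nominal weights are normal. Fixing the seed makes the split repeatable, not exact.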


### Dropping Redundant Columns from Predictions Data
We collect in `columns_to_drop` every column of the combined DataFrame other than "PHENOTYPE" that also appears in the predictions DataFrames, printing a message for each one. We then use `drop` to remove those columns from `predictions_reg`, `predictions_rf`, and `predictions_dt`.

In [6]:
# List to store columns to be dropped
columns_to_drop = []

# Iterate through each column in the combined DataFrame
for col_name in combined_df.columns:
    if col_name != "PHENOTYPE" and col_name in predictions_reg.columns:
        columns_to_drop.append(col_name)
        print(col_name, "Dropped from predictions dataframes.")

# Drop the selected columns from the predictions dataframes
predictions_reg=predictions_reg.drop(*columns_to_drop)
predictions_rf=predictions_rf.drop(*columns_to_drop)
predictions_dt=predictions_dt.drop(*columns_to_drop)

features Dropped from predictions dataframes.


### Evaluating Model Accuracy
We evaluate the accuracy of the logistic regression, random forest, and decision tree models. An `accuracy_dict` stores the accuracy for each prediction column. We loop over the columns of `predictions_reg`, `predictions_rf`, and `predictions_dt` in parallel, skipping "PHENOTYPE" and any column whose name contains "rawPrediction" or "probability". For each remaining prediction column, we build a `MulticlassClassificationEvaluator` with the accuracy metric, evaluate the model, and store the result in `accuracy_dict`. Finally, we print the accuracy for each prediction column.

In [7]:
# Initialize accuracy dictionary to store accuracies for each prediction column
accuracy_dict = {}

# Loop through each prediction column in predictions_reg, predictions_rf, predictions_dt
for (predictions_reg_col, predictions_rf_col, predictions_dt_col) in zip(predictions_reg.columns, predictions_rf.columns, predictions_dt.columns):
    # Skip the label column and the rawPrediction and probability columns
    if ("PHENOTYPE" != predictions_reg_col and "rawPrediction" not in predictions_reg_col and "probability" not in predictions_reg_col):
        # Define the evaluator for accuracy using the current prediction column
        evaluator_reg_acc = MulticlassClassificationEvaluator(labelCol="PHENOTYPE", metricName="accuracy", predictionCol=predictions_reg_col)
        evaluator_rf_acc = MulticlassClassificationEvaluator(labelCol="PHENOTYPE", metricName="accuracy", predictionCol=predictions_rf_col)
        evaluator_dt_acc = MulticlassClassificationEvaluator(labelCol="PHENOTYPE", metricName="accuracy", predictionCol=predictions_dt_col)
        
        # Evaluate the model for accuracy using the current prediction column
        accuracy_reg = evaluator_reg_acc.evaluate(predictions_reg)
        accuracy_rf = evaluator_rf_acc.evaluate(predictions_rf)
        accuracy_dt = evaluator_dt_acc.evaluate(predictions_dt)
        
        # Store the accuracy in the accuracy dictionary with the prediction column name as the key
        accuracy_dict[predictions_reg_col] = accuracy_reg
        accuracy_dict[predictions_rf_col] = accuracy_rf
        accuracy_dict[predictions_dt_col] = accuracy_dt

# Print accuracies for each prediction column
for predictionColumn, accuracy in accuracy_dict.items():
    print("Accuracy for {}: {}".format(predictionColumn, accuracy))

Accuracy for prediction_reg_features: 0.9878048780487805
Accuracy for prediction_rf_features: 0.7195121951219512
Accuracy for prediction_dt_features: 0.7439024390243902
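
Accuracy here is the number of correct predictions divided by the number of test rows. With 82 test rows, the three reported values are consistent with 81, 59, and 61 correct predictions respectively. The short sketch below checks that arithmetic and shows the metric on a made-up example. `accuracy` is a hypothetical helper, not the Spark evaluator.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the corresponding label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = 0
    for label, prediction in zip(labels, predictions):
        if label == prediction:
            correct += 1
    return correct / len(labels)


# Made-up example: 3 of 4 predictions match the label
print(accuracy([1, 2, 2, 1], [1, 2, 1, 1]))

# Correct-prediction counts that reproduce the reported accuracies on 82 test rows
for name, correct in [("logistic regression", 81), ("random forest", 59), ("decision tree", 61)]:
    print(name, correct / 82)
```

On a test set this small, a single misclassified row changes the accuracy by about 1.2 percentage points.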


In [8]:
def custom_max(iterable):
    if not iterable:
        raise ValueError("max() arg is an empty sequence")

    max_value = iterable[0]

    for item in iterable:
        if item > max_value:
            max_value = item

    return max_value

# Convert dictionary values to a list and find the maximum value
max_value = custom_max(list(accuracy_dict.values()))

# Find the key corresponding to the maximum value
max_key = [key for key, value in accuracy_dict.items() if value == max_value][0]
if "prediction_reg" in max_key:
    max_key = "Logistic Regression"
elif "prediction_rf" in max_key:
    max_key = "Random Forest Classifier"
elif "prediction_dt" in max_key:
    max_key = "Decision Tree Classifier"
    
print("Selected Model:", max_key)
print("Best Accuracy:", max_value)

Selected Model: Logistic Regression
Best Accuracy: 0.9878048780487805
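
A more compact way to pick the best entry is Python's built-in `max` with a `key` function. One caveat: if a wildcard import from `pyspark.sql.functions` is in effect, the name `max` refers to Spark's column aggregate instead of the built-in, which may be why a hand-written maximum is used above. The sketch below reaches the built-in explicitly through the `builtins` module. The dictionary repeats the accuracies reported earlier, and the label mapping is an assumption that mirrors the `if`/`elif` chain.

```python
import builtins

accuracy_by_column = {
    "prediction_reg_features": 0.9878048780487805,
    "prediction_rf_features": 0.7195121951219512,
    "prediction_dt_features": 0.7439024390243902,
}

# Human-readable names keyed by the prefix of the prediction column
model_names = {
    "prediction_reg": "Logistic Regression",
    "prediction_rf": "Random Forest Classifier",
    "prediction_dt": "Decision Tree Classifier",
}

# builtins.max is the Python built-in even if the name max has been rebound
best_column = builtins.max(accuracy_by_column, key=accuracy_by_column.get)
best_model = next(name for prefix, name in model_names.items() if best_column.startswith(prefix))

print(best_model, accuracy_by_column[best_column])
```

Importing only the needed Spark functions by name avoids the shadowing problem altogether.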
