
# Enhancing Data Handling Efficiency and Memory Management

The dataset is wide: roughly 300,000 columns and 535 rows, which makes it expensive to process in one piece. To keep memory usage manageable, we read the raw file once, drop the columns that are not needed (FID, IID, PAT, MAT), and split the remaining columns into many smaller files. Each file holds 100 columns and is written as Data__0, Data__1, and so on.

### Optimizing CPU Resources with Spark Session Initialization
First, we import the necessary libraries. Next, we start a Spark session and configure the driver with 7 GB of memory and 3 cores so that it uses most of the resources available on the PC.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, ChiSqSelector
from pyspark.ml.feature import *
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from functools import reduce
from pyspark.sql import DataFrame
import gc, psutil

# Create a Spark session
spark = (SparkSession.builder
        .appName("DementiaData")
        .config("spark.driver.memory", "7g")
        .config("spark.driver.cores", "3")
        .getOrCreate()
    )
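
When Spark runs in local mode on a single PC, the number of worker threads is usually set through the master URL (`local[N]` or `local[*]`) rather than through driver settings. The sketch below, assuming local mode, derives such a URL from the machine's core count. The function `local_master_url` and its `reserve` parameter are hypothetical names introduced here; they are not part of the original notebook.

```python
import os

def local_master_url(reserve: int = 1) -> str:
    """Build a Spark local-mode master URL that leaves `reserve` cores free."""
    total = os.cpu_count() or 1          # cpu_count() can return None
    usable = max(1, total - reserve)     # always keep at least one thread
    return f"local[{usable}]"

master = local_master_url()
print(master)
```

The resulting string could be passed to `SparkSession.builder.master(master)` before calling `getOrCreate()`.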

### Data Preprocessing Script for Dementia Dataset

We'll gather all the necessary files for processing into a single list.

In [None]:
# File locations
data_read_locations = ["hdfs://localhost:9000/user/hdfs/dimentia_model/JanBDRcount.raw", # Raw input file
                       "Distributed/", # Output directory for the 100-column batch files
                       "combined.csv" # Combined data after feature selection
                      ]

During preprocessing, we read the data as plain text. The steps are: set the file location, read the header line to get the column names, replace colons in the names with underscores, then use Spark's `read.text` to load the file as a single string column and `withColumn` with `split` to turn each line into an array of integer values.

In [3]:
print("Let's start with file reading process.")

# Read the header line (the column names) through Spark, because Python's
# built-in open() cannot read an hdfs:// URL
header_line = spark.read.text(data_read_locations[0]).first()["value"]
column_names = header_line.strip().split(" ")

# Replace colons in the column names to avoid issues with Spark column names
column_names = [col_name.replace(':', '_') for col_name in column_names]

# Read the text file as a single column
read_raw_file = spark.read.text(data_read_locations[0])

# Split each line into individual columns based on space delimiter
read_raw_file = read_raw_file.withColumn("columns", split(read_raw_file["value"], " ").cast("array<int>"))

print("Completed with reading the file.")

Let's start with file reading process.
Completed with reading the file.
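
To make the parsing step concrete, here is a plain-Python sketch of the same idea on a tiny, made-up sample. The sample lines and the helper `to_int_or_none` are illustrative assumptions and are not taken from the dataset. The sketch splits each line on spaces and converts the values to integers, with `None` standing in for a value that cannot be converted (roughly what a non-ANSI Spark cast does with a non-numeric string).

```python
# Hypothetical three-column sample in the same space-delimited layout
sample_lines = [
    "FID IID rs1:A",
    "1 101 2",
    "1 102 NA",
]

def to_int_or_none(token):
    """Return the token as an int, or None when it is not numeric."""
    try:
        return int(token)
    except ValueError:
        return None

# Header: split on spaces and replace colons with underscores
column_names = [name.replace(":", "_") for name in sample_lines[0].split(" ")]

# Data rows: split on spaces and convert every token
rows = [[to_int_or_none(tok) for tok in line.split(" ")]
        for line in sample_lines[1:]]

print(column_names)
print(rows)
```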


We then build the dataframe, naming each array element with the column names extracted earlier and dropping the FID, IID, PAT, and MAT columns.

In [None]:
print("Let's create the dataframe and change the column names.")
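# Skip the excluded columns and alias each array element with its column name.
# Backticks keep names that start with a digit valid in the SQL expression.
excluded_columns = {"FID", "IID", "PAT", "MAT"}
dementia_data = read_raw_file.selectExpr(
    [f"columns[{i}] as `{col_name}`"
     for i, col_name in enumerate(column_names)
     if col_name not in excluded_columns]
)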
print("Completed with dataframe creation.")
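
The list comprehension in this cell can be checked without Spark. The sketch below builds expression strings of the form passed to `selectExpr` from a short, hypothetical header (the names in `column_names` are made up for illustration). The point to notice is that `enumerate` runs over the full header, so each kept column still refers to its original position in the array even though some names are skipped.

```python
column_names = ["FID", "IID", "PAT", "MAT", "SEX", "rs1_A"]  # hypothetical header
excluded = {"FID", "IID", "PAT", "MAT"}

# The index i comes from the full header, so skipped names do not shift it
expressions = [
    f"columns[{i}] as {name}"
    for i, name in enumerate(column_names)
    if name not in excluded
]
print(expressions)
```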

### Streamlined Batch Processing of Dementia Dataset Columns
After creating the dataframe, we split its column names into batches of 100. Each batch is written as a separate CSV file with 100 columns and 535 rows, giving 2977 files in total. Working with many small files keeps each later step within memory limits and lets the files be processed independently. A progress message is printed after each batch so the export can be monitored as it runs.

In [None]:
print("Let's set the batch size and create different files, each holding one batch of columns.")

column_names = dementia_data.columns
# Create a batch size of 100 columns with each file
batch_size = 100

# Copying column names in chunks of batch size
column_names_batches = [column_names[i:i+batch_size] for i in range(0, len(column_names), batch_size)]

# Select each batch of 100 columns and export it as CSV;
# coalesce(1) makes Spark write a single part file inside each output directory
for index, batch in enumerate(column_names_batches):
    dementia_data.select(*batch).coalesce(1).write.csv(f"{data_read_locations[1]}/Data__{index}", header=True, mode="overwrite")
    print(f"Completed with {index} chunk")

print("Completed with chunks creation.")
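
The slicing used above yields ceil(n / batch_size) batches, with the last one possibly shorter than the rest. Here is a small self-contained check on made-up column names; `make_batches` and the `col_i` names are hypothetical and only serve the illustration.

```python
import math

def make_batches(names, batch_size):
    """Split a list into consecutive chunks of at most batch_size items."""
    return [names[i:i + batch_size] for i in range(0, len(names), batch_size)]

names = [f"col_{i}" for i in range(250)]   # hypothetical column names
batches = make_batches(names, 100)

print(len(batches))               # number of files that would be written
print([len(b) for b in batches])  # size of each batch
```

By the same arithmetic, 2977 files with a batch size of 100 correspond to between 297,601 and 297,700 columns after the excluded ones are dropped.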