In [None]:
import pandas as pd
import numpy as np
from google.colab import drive
import gc
import math
import random

In [None]:
# Mount Google Drive at /content/drive (Colab only); run this cell before any
# cell that opens a path under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def read_fasta(file_path):
    """Parse a FASTA file into a ``{header: sequence}`` dictionary.

    Args:
        file_path: Path of the FASTA file to read.

    Returns:
        dict mapping each header line (without the leading ``>``) to its
        sequence, with all of the record's lines joined together. Records
        that have no sequence lines are omitted. If a header occurs more than
        once, the last record with that header wins.

    Notes:
        Text that appears before the first ``>`` header does not belong to any
        record and is ignored (previously it caused an UnboundLocalError).
    """
    sequences = {}
    header = None   # Header of the record being read; None until the first '>'
    chunks = []     # Sequence lines of the current record, joined once at the end

    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if line.startswith('>'):
                # A new record starts: store the previous one if it has data
                if header is not None and chunks:
                    sequences[header] = ''.join(chunks)
                header = line[1:]
                chunks = []
            elif header is not None and line:
                # Skip blank lines so that `chunks` is non-empty only when
                # the record really has sequence data
                chunks.append(line)

    # Store the last record of the file
    if header is not None and chunks:
        sequences[header] = ''.join(chunks)
    return sequences

In [None]:
def clear_variable(var_list):
    """Delete the named module-level variables and run the garbage collector.

    Args:
        var_list: Iterable of variable names (strings). Names that are not
            defined at module level are skipped silently.

    Note:
        Only the global namespace of the module defining this function is
        touched; local variables of a calling function are not affected.
    """
    module_namespace = globals()
    for name in var_list:
        if name in module_namespace:
            del module_namespace[name]  # Drop the global reference
    gc.collect()  # Reclaim whatever became unreachable

In [None]:
def find_all_data_size(file_list):
  """Sum the dataset sizes encoded in the given file names.

  The size of a dataset is the integer found between the last '-' of its file
  name and the first '.' that follows it.

  Args:
    file_list: Iterable of file names such as 'Cov-Alpha-US-13207.fasta'.

  Returns:
    int: Total of all sizes (0 for an empty list).
  """
  return sum(
      int(name.rsplit('-', 1)[-1].partition('.')[0])  # e.g. '13207.fasta' -> 13207
      for name in file_list
  )

In [None]:
def nuc_to_df_nuc(data_nuc, num_cols):
  """Spread nucleotide sequences over a fixed-width, one-character-per-cell DataFrame.

  Args:
    data_nuc: List of sequences (strings); row ``r`` of the result holds
      ``data_nuc[r]``.
    num_cols: Number of columns of the result. Shorter sequences leave their
      trailing cells missing (None/NaN); longer sequences are truncated to
      ``num_cols`` characters.

  Returns:
    pandas.DataFrame of shape ``(len(data_nuc), num_cols)`` with a default
    integer index and columns ``0 .. num_cols - 1``.
  """
  # Cut every sequence to the target width up front so the frame never grows
  # wider than num_cols
  char_rows = [list(seq[:num_cols]) for seq in data_nuc]

  # Build the frame as object dtype directly rather than writing characters
  # into a float NaN frame, which depends on an implicit dtype upcast
  chars_df = pd.DataFrame(char_rows, dtype=object)

  # Add the columns that no sequence reached, so the width is always num_cols
  return chars_df.reindex(columns=range(num_cols))

In [None]:
def one_hot_encode(samples_df):
    """One-hot encode a DataFrame that holds one nucleotide per cell.

    Every input column position ``j`` produces the four output columns
    ``Nuc_A{j}``, ``Nuc_T{j}``, ``Nuc_C{j}`` and ``Nuc_G{j}``, in that order.
    The column of the matching base is 1 and the others are 0; a cell that is
    none of 'A', 'T', 'C', 'G' yields four zeros.

    Args:
        samples_df: DataFrame whose rows are samples and whose cells are
            single characters (or filler values).

    Returns:
        pandas.DataFrame with one row per input row and a default integer index.
    """
    bases = ('A', 'T', 'C', 'G')  # Fixes the order of the output columns
    encoded_rows = []
    for _, sample in samples_df.iterrows():
        encoded = {}
        for position, symbol in enumerate(sample):
            for base in bases:
                encoded[f'Nuc_{base}{position}'] = 1 if symbol == base else 0
        encoded_rows.append(encoded)

    # One dictionary per sample becomes one row of the result
    return pd.DataFrame(encoded_rows)

In [None]:
# FASTA files to process, one per class. The position of a file in this list
# is used as its class label by the processing loop, and the integer between
# the last '-' and the extension is parsed by find_all_data_size() as the
# dataset's size.
file_list = [
    'Cov-Alpha-US-13207.fasta',
    'Cov-BA.2.12.1-usa-11331.fasta',
    'Cov-Delta-US-10117.fasta',
    'Cov-BQ.1.1-usa-9999.fasta',
    'Cov-BA.1.1-usa-6694.fasta',
    'Cov-Gama-US-4995.fasta',
    'Cov-BA.5.4-3631.fasta',
    'Cov-BA.4.6-2607.fasta'
]

In [None]:
# Build class-balanced, shuffled subsets from the per-class FASTA files.
#
# Stage 1 writes one temporary HDF5 file per (class, subset) pair, each holding
# `proportion` one-hot encoded sequences of that class. Stage 2 stacks the
# per-class files of every subset, shuffles the rows and saves the result.
subset_size = 1000            # Rows of every final subset
num_cols = 30900              # Nucleotide positions kept per sequence
num_classes = len(file_list)  # One class per FASTA file

# Number of sequences each class contributes to one subset
proportion = subset_size // num_classes

# Total number of sequences announced by the file names
all_data_size = find_all_data_size(file_list)

# Every subset takes `proportion` sequences from every class, so the smallest
# class limits how many balanced subsets exist. Stage 1 and stage 2 both use
# this count; previously stage 1 wrote a hard-coded 2500 / proportion subsets
# while stage 2 tried to read ceil(all_data_size / subset_size) of them.
smallest_class_size = min(find_all_data_size([name]) for name in file_list)
n_subsets = smallest_class_size // proportion

fasta_path_template = '/content/drive/MyDrive/ML_DL_Datasets/DNA_Datasets/{path}'
class_subset_template = '/content/drive/MyDrive/Datasets/temporary/class{i}_subset{file_counter}.h5'
shuffled_subset_template = '/content/drive/MyDrive/Datasets/Covid_Shuffled_Balanced_Train_Test/Shuffled_Subset{file_counter}.h5'

# Stage 1: encode every class in batches of `proportion` sequences
for class_index, file_name in enumerate(file_list):
    # Read the current dataset and keep only the sequences
    data = read_fasta(fasta_path_template.format(path=file_name))
    data_nuc = list(data.values())
    clear_variable(['data'])  # Clean up memory

    # Same seed for every class, so the shuffle is reproducible
    random.seed(101)
    random.shuffle(data_nuc)

    if len(data_nuc) < n_subsets * proportion:
        print('Warning: {name} holds {found} sequences, fewer than the {needed} needed; '
              'its last batches will be short.'.format(
                  name=file_name, found=len(data_nuc), needed=n_subsets * proportion))

    for file_counter in range(1, n_subsets + 1):
        start = (file_counter - 1) * proportion
        batch_data_nuc = data_nuc[start : start + proportion]

        # Convert the nucleotide list to a fixed-width dataframe
        df_nuc = nuc_to_df_nuc(batch_data_nuc, num_cols)
        clear_variable(['batch_data_nuc'])  # Clean up memory

        # Fill the empty cells
        df_nuc = df_nuc.fillna(value=0)

        # Convert the nucleotide dataframe to numerical format with one-hot encoding
        one_hot_encoded_df = one_hot_encode(df_nuc)
        clear_variable(['df_nuc'])  # Clean up memory

        # Label every row with the index of its class
        one_hot_encoded_df['Class'] = class_index

        # Every value is a small non-negative integer, so uint8 is enough
        final_df = one_hot_encoded_df.astype('uint8')
        clear_variable(['one_hot_encoded_df'])  # Clean up memory

        # Save the batch
        final_df.to_hdf(class_subset_template.format(i=class_index, file_counter=file_counter), key='data')
        clear_variable(['final_df'])  # Clean up memory

    clear_variable(['data_nuc'])  # Clean up memory

# Stage 2: read the per-class files of each subset, concatenate and shuffle them
for subset_number in range(1, n_subsets + 1):
    subsets_tmp = [
        pd.read_hdf(class_subset_template.format(i=class_index, file_counter=subset_number))
        for class_index in range(num_classes)
    ]

    subset = pd.concat(subsets_tmp)
    clear_variable(['subsets_tmp'])  # Clean up memory

    # Shuffle the subset
    shuffled_subset = subset.sample(frac=1, random_state=101).reset_index(drop=True)
    clear_variable(['subset'])  # Clean up memory

    shuffled_subset.to_hdf(shuffled_subset_template.format(file_counter=subset_number), key='data')
    clear_variable(['shuffled_subset'])  # Clean up memory