<a href="https://colab.research.google.com/github/andrkech/GENERATIVE-METHODS-IN-GENOMICS/blob/main/FASTQ_DataLoader.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Load the Dataset.

### Connect with Google Drive.

In [1]:
# Mount Google Drive at /content/drive; the dataset path used later in this
# notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Import Libraries.

In [2]:
import os
import shutil
import zipfile
import tarfile
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

### Unzip the dataset folder and import it into the Colab notebook.

In [3]:
def extract_or_create_zip(source_path, extract_path):
    """Zip a directory, or unzip a ZIP file, depending on what ``source_path`` is.

    Args:
        source_path: Path to an existing directory or to a file whose name
            ends in ``.zip`` (checked case-insensitively).
        extract_path: For a directory source, the base name of the archive to
            create (``shutil.make_archive`` appends ``.zip``). For a ZIP
            source, the directory the archive is extracted into.

    Returns:
        None. A status or error message is printed in every case.
    """
    # Directory source: pack its contents into "<extract_path>.zip".
    if os.path.isdir(source_path):
        shutil.make_archive(extract_path, "zip", source_path)
        print(f"Created ZIP file from directory: {source_path}")
        return

    # Anything that is not an existing *.zip file is rejected with a message.
    is_zip_file = os.path.isfile(source_path) and source_path.lower().endswith('.zip')
    if not is_zip_file:
        print(f"Error: Invalid source path '{source_path}'. Must be a directory or a ZIP file.")
        return

    # ZIP source: unpack everything into extract_path.
    with zipfile.ZipFile(source_path, 'r') as archive:
        archive.extractall(extract_path)
        print(f"Extraction completed successfully: {source_path}")

In [4]:
# Zipped FASTQ dataset on the mounted Google Drive, and the local Colab
# directory that receives its contents.
source_path = "/content/drive/MyDrive/BIOINFORMATICS/THESIS_KECHAGIAS/DATA/DATASET/FASTQ_FILES.zip"
extract_path = "/content/fastq_dataset"

# source_path ends in ".zip", so when that file exists this call extracts it
# into extract_path (a directory source would be zipped instead).
extract_or_create_zip(source_path, extract_path)

Extraction completed successfully: /content/drive/MyDrive/BIOINFORMATICS/THESIS_KECHAGIAS/DATA/DATASET/FASTQ_FILES.zip


### Extract any nested archives (ZIP/TAR), if needed.

In [19]:
def extract_compressed_files(source_path, extract_dir):
    """Extract every ZIP/TAR archive found directly inside a directory.

    Only the top level of ``source_path`` is scanned (no recursion). Each
    regular file whose name ends in ``.zip`` or ``.tar`` -- compared
    case-insensitively -- is handed to ``extract_file``.

    Args:
        source_path: Directory to scan for archives.
        extract_dir: Directory the archives are extracted into; created if it
            does not exist.

    Returns:
        None. If ``source_path`` is missing or is not a directory, an error
        message is printed and nothing is extracted or created.
    """
    # Check if the source path exists
    if not os.path.exists(source_path):
        print(f"Error: Source path '{source_path}' not found.")
        return

    # os.listdir() would raise on a regular file; report it instead.
    if not os.path.isdir(source_path):
        print(f"Error: Source path '{source_path}' is not a directory.")
        return

    # Create the extraction directory if it doesn't exist
    os.makedirs(extract_dir, exist_ok=True)

    archive_suffixes = ('.zip', '.tar')

    # Sorted so archives are processed in a reproducible order.
    for file in sorted(os.listdir(source_path)):
        file_path = os.path.join(source_path, file)

        # Compare in lower case so the filter agrees with extract_file, which
        # lowercases the extension; skip directories that merely look like
        # archives by name.
        if os.path.isfile(file_path) and file.lower().endswith(archive_suffixes):
            extract_file(file_path, extract_dir)

def extract_file(source_path, extract_dir):
    """Extract a single ZIP or TAR archive into ``extract_dir``.

    The archive type is chosen from the file name, compared in lower case:
    ``.zip`` is opened with ``zipfile``; ``.tar`` and the common compressed
    tarball suffixes are opened with ``tarfile`` in mode ``'r'``, which
    detects the compression transparently.

    Args:
        source_path: Path to the archive file.
        extract_dir: Directory the archive members are written to.

    Returns:
        None. A success message is printed only when an archive was actually
        extracted; unsupported file types and extraction failures print an
        error message instead of raising.
    """
    tar_suffixes = ('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tbz2', '.tar.xz', '.txz')

    try:
        # Decide on the lower-cased file name so "DATA.ZIP" is recognised too.
        file_name = os.path.basename(source_path).lower()

        # Extract based on file type
        if file_name.endswith('.zip'):
            with zipfile.ZipFile(source_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

        elif file_name.endswith(tar_suffixes):
            # NOTE(review): extractall() writes whatever member paths the
            # archive contains -- only use this on archives you trust.
            with tarfile.open(source_path, 'r') as tar_ref:
                tar_ref.extractall(extract_dir)

        else:
            # Previously this case fell through to the success message even
            # though nothing had been extracted.
            print(f"Error: Unsupported archive type: {source_path}")
            return

        print(f"Extraction completed successfully: {source_path}")

    except Exception as e:
        # Best effort: report the failure and let the caller carry on with
        # the remaining archives.
        print(f"Error extracting {source_path}: {e}")

Extraction completed successfully: /content/fastq_dataset/FASTQ_FILES.zip


In [18]:
# Scan extract_path for .zip/.tar archives and unpack them into that same
# directory (source and destination are deliberately identical here).
extract_compressed_files(extract_path, extract_path)

Extraction completed successfully: /content/fastq_dataset/FASTQ_FILES.zip


## Build the DataLoader.

### Import Libraries.

In [None]:
from torch.utils.data import Dataset
!pip install Bio
from Bio import SeqIO

### Initialize the Dataset.

In [21]:
# Collect the full path of every entry in the dataset directory whose path
# ends in "fastq".
dataset_path = "/content/fastq_dataset"
directory_paths = []

for dir_name in os.listdir(dataset_path):
    dir_path = os.path.join(dataset_path, dir_name)

    # Anything that is not a FASTQ path (e.g. the leftover archive) is skipped.
    if not dir_path.endswith('fastq'):
        continue

    directory_paths.append(dir_path)

print(directory_paths)

['/content/fastq_dataset/ERR5885031_2.fastq', '/content/fastq_dataset/ERR5885026_2.fastq', '/content/fastq_dataset/ERR5960500_1.fastq', '/content/fastq_dataset/ERR5885026_1.fastq', '/content/fastq_dataset/ERR6053338_2.fastq', '/content/fastq_dataset/ERR6053339_2.fastq', '/content/fastq_dataset/ERR6053337_1.fastq', '/content/fastq_dataset/ERR6608944_1.fastq', '/content/fastq_dataset/ERR5885027_2.fastq', '/content/fastq_dataset/ERR5885033_2.fastq', '/content/fastq_dataset/ERR5960499_2.fastq', '/content/fastq_dataset/ERR6155194_2.fastq', '/content/fastq_dataset/ERR6155194_1.fastq', '/content/fastq_dataset/ERR5885024_1.fastq', '/content/fastq_dataset/ERR6608942_1.fastq', '/content/fastq_dataset/ERR5885030_1.fastq', '/content/fastq_dataset/ERR5885031_1.fastq', '/content/fastq_dataset/ERR5885028_1.fastq', '/content/fastq_dataset/ERR5960500_2.fastq', '/content/fastq_dataset/ERR5885025_1.fastq', '/content/fastq_dataset/ERR5885032_2.fastq', '/content/fastq_dataset/ERR5885025_2.fastq', '/content

### Create a Dataset Class.

In [8]:
class FASTQDataset(Dataset):
    """Dataset that holds every read of the given FASTQ files in memory.

    All files are parsed when the object is constructed. Each item is a dict
    with two keys: ``'sequence'`` (the record's sequence converted with
    ``str``) and ``'quality_scores'`` (the record's ``"phred_quality"``
    letter annotation).
    """

    def __init__(self, file_paths):
        """Store ``file_paths`` and immediately parse all of them."""
        self.file_paths = file_paths
        self.data = []
        self._load_data()

    def _load_data(self):
        """Append one entry to ``self.data`` per record, file by file."""
        for fastq_path in self.file_paths:
            self.data.extend(
                {
                    'sequence': str(read.seq),
                    'quality_scores': read.letter_annotations["phred_quality"],
                }
                for read in SeqIO.parse(fastq_path, "fastq")
            )

    def __len__(self):
        """Return the number of reads loaded."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the entry stored at position ``idx``."""
        return self.data[idx]

### Create a DataLoader class.

In [9]:
class FASTQ_DataLoader:
    """Load random batches of (sequence, quality scores) pairs from FASTQ files.

    Each batch samples ``batch_size`` files from ``data_dir`` and takes ONE
    read from each sampled file: the first record of that file. Sequences and
    quality scores are both padded or truncated to ``seq_length`` so the
    returned arrays are rectangular.

    Assumptions (confirm against your data):
        * Files use the four-line FASTQ layout (header, sequence, ``+``,
          quality); multi-line records are not supported.
        * Qualities are Phred+33 encoded.

    Attributes:
        data_dir: Directory that contains the FASTQ files.
        batch_size: Number of files (and therefore reads) per batch.
        seq_length: Length every sequence / quality vector is forced to.
        fastq_files: Sorted names of the FASTQ files found in ``data_dir``.
    """

    # File-name suffixes (compared in lower case) recognised as FASTQ files.
    FASTQ_SUFFIXES = ('.fastq', '.fq')
    # ASCII offset subtracted from each quality character (Phred+33).
    PHRED_OFFSET = 33
    # Fillers used when a read is shorter than the requested length.
    PAD_BASE = 'N'
    PAD_QUALITY = 0

    def __init__(self, data_dir, batch_size, seq_length):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.seq_length = seq_length

        # Keep FASTQ files only: the directory may also hold the original
        # archive or sub-directories, which cannot be parsed as reads.
        self.fastq_files = sorted(
            f for f in os.listdir(data_dir)
            if f.lower().endswith(self.FASTQ_SUFFIXES)
            and os.path.isfile(os.path.join(data_dir, f))
        )

    def load_batch(self):
        """Sample ``batch_size`` files and return one padded read from each.

        Returns:
            A tuple ``(sequences, quality_scores)`` of NumPy arrays:
            ``sequences`` has shape ``(batch_size,)`` and holds strings of
            length ``seq_length``; ``quality_scores`` has shape
            ``(batch_size, seq_length)`` and holds integers.

        Raises:
            ValueError: If ``data_dir`` contains no FASTQ files, or a sampled
                file does not start with a well-formed record.
        """
        if not self.fastq_files:
            raise ValueError(f"No FASTQ files found in '{self.data_dir}'.")

        # Sample without replacement when enough files exist; otherwise fall
        # back to sampling with replacement so the batch size is still met.
        batch_files = np.random.choice(
            self.fastq_files,
            size=self.batch_size,
            replace=len(self.fastq_files) < self.batch_size,
        )
        batch_sequences = []
        batch_quality_scores = []

        for file in batch_files:
            file_path = os.path.join(self.data_dir, file)
            with open(file_path, 'r') as f:
                seq, qs = self.extract_seq_and_qs(f)

            # Pad/truncate both so every row of the batch has the same length.
            batch_sequences.append(self.pad_or_truncate(seq, self.seq_length))
            batch_quality_scores.append(self.pad_or_truncate(qs, self.seq_length))

        return np.array(batch_sequences), np.array(batch_quality_scores)

    def extract_seq_and_qs(self, fastq_file):
        """Read the first FASTQ record from an open text file handle.

        Args:
            fastq_file: Open text-mode file object positioned at the start of
                a four-line FASTQ record.

        Returns:
            ``(seq, qs)`` where ``seq`` is the sequence string and ``qs`` is a
            list of integer Phred scores, one per base. An empty file yields
            ``("", [])``.

        Raises:
            ValueError: If the record is malformed (missing ``@`` header or
                ``+`` separator, or quality/sequence length mismatch).
        """
        header = fastq_file.readline()
        # Tolerate leading blank lines.
        while header and not header.strip():
            header = fastq_file.readline()

        if not header:
            return "", []

        if not header.startswith('@'):
            raise ValueError(f"Malformed FASTQ record: header does not start with '@': {header!r}")

        seq = fastq_file.readline().strip()
        separator = fastq_file.readline()
        quality = fastq_file.readline().strip()

        if not separator.startswith('+'):
            raise ValueError("Malformed FASTQ record: missing '+' separator line.")
        if len(quality) != len(seq):
            raise ValueError("Malformed FASTQ record: sequence and quality lengths differ.")

        qs = [ord(symbol) - self.PHRED_OFFSET for symbol in quality]
        return seq, qs

    def pad_or_truncate(self, seq, target_length):
        """Force ``seq`` to exactly ``target_length`` items.

        Args:
            seq: A sequence string, or a list of quality scores.
            target_length: Desired length.

        Returns:
            A string right-padded with ``'N'`` when ``seq`` is a string,
            otherwise a list right-padded with ``0``; longer inputs are cut
            to their first ``target_length`` items.
        """
        clipped = seq[:target_length]
        missing = target_length - len(clipped)

        if isinstance(clipped, str):
            return clipped + self.PAD_BASE * max(missing, 0)
        return list(clipped) + [self.PAD_QUALITY] * max(missing, 0)

### Create a DataLoader object and load a batch of sequences and quality scores.

In [None]:
# Loader configuration: directory holding the extracted FASTQ files, number of
# files sampled per batch, and the target length handed to the loader.
DATA_DIR = "/content/fastq_dataset/"
BATCH_SIZE = 16
SEQ_LENGTH = 100  # Replace with your desired sequence length
dna_loader = FASTQ_DataLoader(DATA_DIR, BATCH_SIZE, SEQ_LENGTH)

# Draw one batch; load_batch returns a pair of NumPy arrays
# (sequences, quality scores).
batch_sequences, batch_quality_scores = dna_loader.load_batch()

print("Batches loaded successfully.")