In [45]:
import sys
import os
import pandas as pd
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as f
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from awsglue.context import GlueContext
from awsglue.job import Job

# Data Lake Ingestion for the Most Common Bioinformatics Formats

When storing bioinformatics data in a data lake, it's common to use Parquet or Delta Lake for formats that allow tabular or hierarchical structuring, taking advantage of the compression and efficient data access these formats offer. For unstructured formats, you can consider keeping them in their original form in the data lake or transforming them as needed for specific analyses.

Dividing bioinformatics processing into phases according to the file formats involved is an effective way to organize and understand the workflow. Below is a general outline of how the processing phases can be categorized by the file formats they use:

| **Phase**                              | **Input Formats**                   | **Typical Processes**                                                                                                                                       |
|----------------------------------------|-------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Preprocessing and Data Cleaning**    | FASTQ, FASTA, BAM, CRAM             | - Quality Filtering: Removing low-quality reads (FASTQ).<br>- Adapter Trimming: Removing adapter sequences (FASTQ).<br>- Sequence Alignment (FASTQ to BAM/CRAM).<br>- Format Conversion: e.g., FASTQ to BAM/CRAM. |
| **Alignment and Mapping**              | FASTA, FASTQ, BAM, CRAM, GFF/GTF    | - Sequence Alignment to a Reference Genome: Generating BAM/CRAM from FASTQ.<br>- Read Mapping to Specific Regions: Using GFF/GTF to assign aligned reads to genes or other regions (BAM/CRAM + GFF/GTF).               |
| **Analysis and Quantification**        | BAM, GTF, BED, VCF                  | - Gene Expression Quantification: Calculating gene expression from aligned reads (BAM to gene counts).<br>- Variant Detection: Identifying SNPs, INDELs, etc. (BAM to VCF).<br>- RNA-RNA/DNA Interaction Analysis: Using alignment and annotation files (e.g., BAM to interaction files).        |
| **Post-Processing and Annotation**     | VCF, GFF/GTF, BED, TSV/CSV          | - Variant Annotation: Adding functional information to identified variants (VCF to annotated files).<br>- Variant Filtering and Prioritization (VCF/TSV/CSV).<br>- Multi-Omics Data Integration (GFF/GTF, VCF, TSV/CSV).        |
| **Visualization and Reporting**        | VCF, TSV/CSV, BED, PNG/SVG, PDF     | - Genomic Data Visualization: Using BED files for visualizing genomic regions.<br>- Graph and Report Generation: Creating figures and tables (TSV/CSV to PNG/SVG, PDF).<br>- Custom Output Files (VCF, TSV/CSV).        |
| **Data Storage and Management**        | BAM, CRAM, VCF, FASTA, TSV/CSV      | - Long-Term Storage: Archiving processed data in efficient formats (CRAM).<br>- Data Management: Organizing and accessing large volumes of data.         |


This outline can vary depending on the specific bioinformatics analysis being performed, but generally, file formats largely dictate what tools and steps are needed at each phase of processing.

Another way to categorize bioinformatics file formats is as structured or unstructured, based on how the data is organized within them. This classification is useful for deciding which formats are most suitable for storage in a data lake using formats like Parquet or Delta Lake, which are designed to efficiently handle large volumes of structured and semi-structured data.

| **Category**          | **Format**        | **Description**                                                                                         | **Suitability for Data Lake**                               | **Typical Use**                                               |
|-----------------------|-------------------|---------------------------------------------------------------------------------------------------------|-------------------------------------------------------------|---------------------------------------------------------------|
| **Structured Formats** | CSV/TSV           | Tabular data with values separated by commas or tabs.                                                    | Suitable for Parquet, Delta Lake                             | Storing gene expression data, quantification results, annotations. |
|                       | VCF               | Structured format for storing genomic variants.                                                          | Suitable for Parquet, Delta Lake                             | Storing SNPs, INDELs, and other genetic variants.               |
|                       | HDF5              | Hierarchical file format for storing large amounts of complex data.                                      | Suitable for Parquet, Delta Lake (after conversion or direct storage if supported) | Storing single-cell RNA-seq data, gene expression matrices.     |
|                       | BAM/CRAM          | Binary structured files that store sequence alignments.                                                  | Suitable for Parquet, Delta Lake (after conversion if needed) | Storing sequence alignment data to reference genomes.           |
|                       | GTF/GFF           | Tabular formats for genomic annotations.                                                                 | Suitable for Parquet, Delta Lake                             | Annotation of genomic features like genes and exons.            |
| **Unstructured Formats** | FASTQ           | Text format that stores nucleotide sequences and their per-base quality scores.                          | Suitable for direct storage; conversion needed for analysis  | Storing raw sequencing reads.                                   |
|                       | FASTA             | Text format that stores nucleotide or protein sequences.                                                 | Suitable for direct storage; conversion may be necessary     | Storing DNA, RNA, or protein sequences.                         |
|                       | SAM               | Text file that stores sequence alignments.                                                               | Suitable for direct storage; conversion may be necessary     | DNA/RNA sequence alignment.                                     |
|                       | Plain Text Files  | Unstructured text files.                                                                                 | Suitable for direct storage, but not ideal for structured data lakes | Logs, simple reports.                                           |
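An ingestion job can act on this classification with a small routing table keyed by file extension. The sketch below is a hypothetical helper. The extension list, the `route_file` name and the handling of compression suffixes are assumptions, not part of any library. It returns `"structured"` for formats the table marks as suitable for Parquet/Delta Lake and `"unstructured"` for formats better kept as-is.

```python
from pathlib import PurePosixPath

# Hypothetical routing table derived from the classification table above
FORMAT_ROUTES = {
    ".csv": "structured", ".tsv": "structured", ".vcf": "structured",
    ".h5": "structured", ".hdf5": "structured",
    ".bam": "structured", ".cram": "structured",
    ".gff": "structured", ".gff3": "structured", ".gtf": "structured",
    ".fastq": "unstructured", ".fq": "unstructured",
    ".fasta": "unstructured", ".fa": "unstructured",
    ".sam": "unstructured", ".txt": "unstructured", ".log": "unstructured",
}
# Compression suffixes are ignored so that e.g. "sample.vcf.gz" routes like ".vcf"
COMPRESSION_SUFFIXES = {".gz", ".bgz", ".bz2"}


def route_file(file_name):
    """Return "structured" or "unstructured" for a file name (unknown -> unstructured)."""
    suffixes = [s.lower() for s in PurePosixPath(file_name).suffixes]
    while suffixes and suffixes[-1] in COMPRESSION_SUFFIXES:
        suffixes.pop()
    if not suffixes:
        return "unstructured"
    return FORMAT_ROUTES.get(suffixes[-1], "unstructured")


print(route_file("sample.vcf.gz"), route_file("reads_R1.fastq.gz"))
```

Defaulting unknown extensions to `"unstructured"` means an unrecognized file is stored untouched instead of being pushed through a conversion that might fail.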


## Ingesting Genotype and Genomic Structure Data Formats into a Data Lake Using Glow

Bioinformatics often involves the manipulation and analysis of large-scale genomic data, which is typically stored in specialized file formats such as Variant Call Format (VCF) and General Feature Format Version 3 (GFF3). These formats are crucial for representing genetic variants and annotations, respectively, and are commonly used in various genomic studies.

VCF files store information about genetic variants, including details such as the chromosome, position, reference, and alternative alleles, as well as associated metadata. GFF3 files, on the other hand, are used to describe gene and other genomic feature annotations, providing a structured way to represent the location and attributes of various genomic elements.

To efficiently manage and analyze such vast datasets, especially in distributed computing environments, Apache Spark has emerged as a powerful tool. The Glow library extends Spark’s capabilities, allowing bioinformaticians to process VCF and GFF3 files at scale with Spark DataFrames. This integration facilitates parallel processing, making it easier to handle large genomic datasets.

Moreover, integrating with Delta Lake is highly advantageous for storing the data in a scalable, reliable, and queryable way. Delta Lake stores tables as Parquet files together with a transaction log, which provides ACID transactions, scalable metadata handling, and table versioning. By converting VCF and GFF3 files into Delta format, researchers can take advantage of these features for efficient storage, versioning (time travel), and querying.

This approach allows bioinformatics workflows to be more robust, with better data management and faster processing times, especially when dealing with large datasets. Below, we explore how to manipulate these file formats using Glow and how to store them in Delta Lake for optimal performance and scalability.     

### Glow library
#### Description
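Glow is an open-source library, built on Apache Spark, for working with genomic data at scale. Once its package is on the Spark classpath, it provides Spark data sources for common genomic formats, including VCF (`vcf`), BGEN (`bgen`), PLINK (`plink`) and GFF3 (`gff`), so these files can be loaded into DataFrames with `spark.read.format(...)`. Glow also provides SQL functions for genomic transformations. In Python these are enabled by calling `spark = glow.register(spark)`.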

In [2]:
# Create a SparkSession with the configuration required by Glow and Delta Lake
spark = SparkSession.builder \
    .appName("Glow with PySpark") \
    .config("spark.jars.packages", "io.projectglow:glow-spark3_2.12:1.2.1,io.delta:delta-core_2.12:2.1.0") \
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

### VCF Format

#### Description
The VCF (Variant Call Format) is used to store genomic variants such as SNPs, indels, and other structural variants.
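Glow's `vcf` data source does this parsing for you, and much more, such as genotypes. To make the layout of the fixed columns concrete, here is a minimal stdlib-only sketch that splits one VCF data line. The function name and the sample line are made up for illustration. The per-sample columns are left unparsed.

```python
def parse_vcf_line(line):
    """Split one tab-delimited VCF data line into its eight fixed fields (sketch)."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) < 8:
        raise ValueError(f"expected at least 8 columns, got {len(fields)}")
    chrom, pos, variant_id, ref, alt, qual, filter_, info = fields[:8]
    info_dict = {}
    for entry in info.split(";"):
        if entry in ("", "."):
            continue
        key, sep, value = entry.partition("=")
        # INFO flags (e.g. DB) carry no "=value" part
        info_dict[key] = value if sep else True
    return {
        "CHROM": chrom,
        "POS": int(pos),  # VCF positions are 1-based
        "ID": None if variant_id == "." else variant_id,
        "REF": ref,
        "ALT": [] if alt == "." else alt.split(","),
        "QUAL": None if qual == "." else float(qual),
        "FILTER": filter_,
        "INFO": info_dict,
        "SAMPLES": fields[8:],  # FORMAT + per-sample columns, not parsed here
    }


# A made-up data line: two alternate alleles, a numeric INFO field and a flag
record = parse_vcf_line("22\t100\t.\tA\tG,T\t50\tPASS\tAF=0.5,0.1;DB")
print(record["ALT"], record["INFO"])
```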

Now that we have seen the structure of the most basic non-binary formats, we can start exploring good architectural practices for storing and processing them.

Depending on the kind of project we are working on, we will need to store and process different genomic formats. Let's suppose we have a client who needs to store every kind of bioinformatics format: raw base calls (BCL) and processed files (FASTA, FASTQ, GFF3, SAM, VCF).

Raw read files can be stored in an S3-based data lake, where they can be indexed and partitioned for easier access using a key pattern such as `tenant/year/month/day/file/`. These files are then processed to run the secondary analyses that generate the other formats, from which we extract the information of interest and produce biological insights.
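The sketch below shows one way to build object keys for that `tenant/year/month/day/file` layout. The `raw_object_key` helper, the `raw` prefix, the bucket name and the file name are all hypothetical. The optional Hive-style form (`key=value` directories) is a common variation: Spark's partition discovery turns such directories into named partition columns when the data is read back.

```python
from datetime import date


def raw_object_key(tenant, run_date, file_name, prefix="raw", hive_style=False):
    """Build an object key following a tenant/year/month/day/file layout."""
    partitions = [
        ("tenant", tenant),
        ("year", f"{run_date.year:04d}"),
        ("month", f"{run_date.month:02d}"),
        ("day", f"{run_date.day:02d}"),
    ]
    if hive_style:
        # key=value directories let Spark infer partition columns on read
        directories = [f"{name}={value}" for name, value in partitions]
    else:
        directories = [value for _, value in partitions]
    return "/".join([prefix, *directories, file_name])


key = raw_object_key("acme-lab", date(2024, 8, 14), "run_001.tar.gz")
print(f"s3://example-bucket/{key}")
```

Zero-padding the month and day keeps keys lexicographically sortable, so listing a prefix returns runs in date order.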

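The first processing step for raw reads is converting BCL files to FASTQ (FASTA can then be derived when quality scores are not needed).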

In [3]:
df = spark.read.format('vcf').load("../data/example1000g_chr22_MAF05.vcf")

                                                                                

In [4]:
df.select("contigName", "start").head()

Row(contigName='22', start=16051248)

In [5]:
df.write.format("delta").mode("overwrite").save("../data/delta/variants_delta")

                                                                                

In [31]:
spark.read.format('delta').load("../data/delta/variants_delta").count()

107658

### GFF3 Format

#### Description
GFF3 (General Feature Format Version 3) is a standard file format used to describe genes and other features of DNA, RNA, and protein sequences. GFF3 files are widely used in genomics for storing annotation data, which includes information about genomic features such as exons, introns, regulatory regions, and more. The format is highly structured and allows for hierarchical relationships between features, making it suitable for complex annotations.
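To illustrate those hierarchical relationships, the hypothetical helper below follows `Parent` links until it reaches a feature with no parent. The links are given as a plain dictionary from feature `ID` to its parent IDs, mirroring the gene, mRNA, exon and CDS records in the example file read later. GFF3 allows a feature to have several parents (for example, an exon shared by two transcripts). This sketch simply follows the first one.

```python
def root_feature(feature_id, parents):
    """Follow Parent links from feature_id up to a feature with no parent."""
    seen = set()
    current = feature_id
    while parents.get(current):
        if current in seen:
            raise ValueError(f"cycle in Parent links at {current}")
        seen.add(current)
        current = parents[current][0]  # follow the first parent only
    return current


# ID -> list of Parent IDs, modeled on the example annotation
parents = {
    "gene1": [],
    "mRNA1": ["gene1"],
    "exon1": ["mRNA1"],
    "cds1": ["mRNA1"],
}
print(root_feature("exon1", parents))
```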

#### Structure of GFF3
Each feature line in a GFF3 file consists of nine tab-separated columns:

- Seqid: The name of the sequence (e.g., chromosome name).
- Source: The source of the annotation (e.g., the name of the program that generated this feature).
- Type: The type of feature (e.g., gene, exon, CDS).
- Start: The start position of the feature (1-based).
- End: The end position of the feature (1-based, inclusive).
- Score: A floating-point score associated with the feature (`.` if undefined).
- Strand: The strand of the feature (+ or -).
- Phase: The phase of the feature (0, 1, or 2; relevant for CDS features).
- Attributes: A semicolon-separated list of tag=value pairs providing additional information (e.g., `ID`, `Name`, `Parent`).
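Glow's `gff` reader does this parsing for you. To make the column layout concrete, here is a minimal stdlib-only sketch that splits a single feature line and decodes its attributes. The function name and the sample line are illustrative assumptions. The sample line is modeled on the `exon1` record used later in this notebook. The decoding follows the GFF3 conventions: commas separate multiple values for a tag, and reserved characters are percent-encoded.

```python
from urllib.parse import unquote

GFF3_COLUMNS = ["seqid", "source", "type", "start", "end",
                "score", "strand", "phase", "attributes"]


def parse_gff3_line(line):
    """Parse one GFF3 feature line into a dict (sketch, minimal validation)."""
    values = line.rstrip("\n").split("\t")
    if len(values) != 9:
        raise ValueError(f"expected 9 columns, got {len(values)}")
    row = dict(zip(GFF3_COLUMNS, values))
    # "." marks an empty value in the optional columns
    for column in ("source", "score", "strand", "phase"):
        if row[column] == ".":
            row[column] = None
    row["start"] = int(row["start"])  # 1-based
    row["end"] = int(row["end"])      # 1-based, inclusive
    if row["score"] is not None:
        row["score"] = float(row["score"])
    if row["phase"] is not None:
        row["phase"] = int(row["phase"])
    attributes = {}
    for pair in row["attributes"].split(";"):
        if not pair or pair == ".":
            continue
        tag, _, value = pair.partition("=")
        attributes[unquote(tag)] = [unquote(v) for v in value.split(",")]
    row["attributes"] = attributes
    return row


feature = parse_gff3_line(
    "seq1\t.\texon\t1050\t1200\t.\t+\t.\tID=exon1;Name=Exon1;Parent=mRNA1"
)
print(feature["type"], feature["start"], feature["attributes"]["Parent"])
```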

#### Uses of GFF3
GFF3 files are used for:

- **Genomic Annotation:** Storing the locations and characteristics of genes and other features on genomes.
- **Comparative Genomics:** Comparing gene annotations across different species.
- **Bioinformatics Pipelines:** Serving as input for various bioinformatics tools that analyze genomic data.

#### Manipulating GFF3 with Glow and Delta Lake
Glow is an open-source toolkit for large-scale genomic data analysis on Apache Spark. It extends Spark's capabilities with genomic data types and functions, allowing seamless integration with standard formats like GFF3.

Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions, scalable metadata handling, and unified streaming and batch data processing.

#### Reading GFF3 with Glow

To read a GFF3 file using Glow and Apache Spark:

In [17]:
gff3_df = spark.read.format("gff").load("../data/example.gff3")

In [18]:
gff3_df.show()

+-----+------+----+-----+----+-----+------+-----+-----+-----------+-------+
|seqId|source|type|start| end|score|strand|phase|   ID|       Name| Parent|
+-----+------+----+-----+----+-----+------+-----+-----+-----------+-------+
| seq1|  null|gene|  999|2000| null|     +| null|gene1|      Gene1|   null|
| seq1|  null|mRNA| 1049|1800| null|     +| null|mRNA1|Transcript1|[gene1]|
| seq1|  null|exon| 1049|1200| null|     +| null|exon1|      Exon1|[mRNA1]|
| seq1|  null|exon| 1249|1500| null|     +| null|exon2|      Exon2|[mRNA1]|
| seq1|  null|exon| 1549|1800| null|     +| null|exon3|      Exon3|[mRNA1]|
| seq1|  null| CDS| 1059|1190| null|     +|    0| cds1|       null|[mRNA1]|
| seq1|  null| CDS| 1259|1490| null|     +|    0| cds2|       null|[mRNA1]|
| seq1|  null| CDS| 1559|1790| null|     +|    0| cds3|       null|[mRNA1]|
+-----+------+----+-----+----+-----+------+-----+-----+-----------+-------+
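In the output above, every `start` (999, 1049, 1249, ...) is one less than a multiple of ten, while the matching `end` values (2000, 1800, 1200, ...) are not. This pattern suggests that Glow's `gff` reader converts the 1-based GFF3 start into a 0-based, half-open coordinate and leaves `end` unchanged. Its `vcf` reader follows the same convention, with `start` = POS - 1. Assuming that convention, converting between the two systems is a one-line adjustment. The helper names below are hypothetical.

```python
def gff3_to_zero_based(start, end):
    """1-based closed [start, end] -> 0-based half-open [start - 1, end)."""
    return start - 1, end


def zero_based_to_gff3(start, end):
    """0-based half-open [start, end) -> 1-based closed [start + 1, end]."""
    return start + 1, end


def interval_length(start, end):
    """Number of bases in a 0-based half-open interval."""
    return end - start


print(gff3_to_zero_based(1000, 2000))
```

Half-open intervals make lengths a plain subtraction: a feature spanning bases 1000 to 2000 inclusive covers `2000 - 999 = 1001` bases.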



In [22]:
print(f"Number of records: {gff3_df.count()}")
display(gff3_df)

Number of records: 8


DataFrame[seqId: string, source: string, type: string, start: bigint, end: bigint, score: double, strand: string, phase: int, ID: string, Name: string, Parent: array<string>]

In [25]:
etl_original_gff_df = gff3_df.select(['start', 'end', 'type', 'seqId', 'source', 'strand', 'phase', 'ID', 'Name', 'Parent'])

In [28]:
etl_original_gff_df.show()

+-----+----+----+-----+------+------+-----+-----+-----------+-------+
|start| end|type|seqId|source|strand|phase|   ID|       Name| Parent|
+-----+----+----+-----+------+------+-----+-----+-----------+-------+
|  999|2000|gene| seq1|  null|     +| null|gene1|      Gene1|   null|
| 1049|1800|mRNA| seq1|  null|     +| null|mRNA1|Transcript1|[gene1]|
| 1049|1200|exon| seq1|  null|     +| null|exon1|      Exon1|[mRNA1]|
| 1249|1500|exon| seq1|  null|     +| null|exon2|      Exon2|[mRNA1]|
| 1549|1800|exon| seq1|  null|     +| null|exon3|      Exon3|[mRNA1]|
| 1059|1190| CDS| seq1|  null|     +|    0| cds1|       null|[mRNA1]|
| 1259|1490| CDS| seq1|  null|     +|    0| cds2|       null|[mRNA1]|
| 1559|1790| CDS| seq1|  null|     +|    0| cds3|       null|[mRNA1]|
+-----+----+----+-----+------+------+-----+-----+-----------+-------+



#### Annotate chromosome (contigName) to the GFF3 DataFrame
The first step is to select the regions of interest (here, the exon named `Exon1`) so that they can be joined back to the original DataFrame:

In [43]:
regions_df = gff3_df.where((f.col("type") == "exon") & (f.col("Name") == "Exon1")). \
                             select("seqId", "ID")
regions_df.show()

+-----+-----+
|seqId|   ID|
+-----+-----+
| seq1|exon1|
+-----+-----+



In [44]:
regions_df.count()

1

In [46]:
gff3_df.write.mode("overwrite").format("delta").save("../data/delta/gff3_delta")

In [47]:
spark.read.format('delta').load("../data/delta/gff3_delta").count()

8



## Sequence and Alignment Data: Unstructured File Formats

### Storing Sequence Data (e.g., FASTA) and Managing Metadata

Sequence data, such as those in FASTA format, can be stored in two primary ways:

1. Raw Storage with Metadata Management
2. Preprocessed and Converted into Structured Formats (e.g., CSV)

Each approach has its own advantages depending on the specific needs of the analysis and the computational environment.

#### 1. Raw Storage with Metadata Management

In this scenario, the sequence data is stored in its original raw format, such as FASTA or FASTQ. This approach preserves the integrity of the original data, which is crucial for reproducibility and for applications that require the raw sequence information, such as de novo assembly or certain types of variant calling.

Metadata associated with the sequences, such as sample identifiers, sequencing run details, and quality metrics, is frequently stored in formats like GFF3 (General Feature Format version 3) or in separate tabular files (e.g., CSV or TSV). These metadata files are essential for tracking data provenance and for later analysis steps. They can be ingested with the same process described above for structured data. They can also be registered in a data catalog, which helps organize, search, and access the data across systems and links each metadata record to its corresponding FASTA file.
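Below is a minimal sketch of such a catalog record, assuming the catalog accepts plain dictionaries. The field names, the `build_catalog_entry` helper and the S3 URI are hypothetical. Storing a SHA-256 checksum next to the storage location is one way to tie a metadata record to the exact contents of its FASTA file. If the raw file is later altered or replaced, the checksum no longer matches.

```python
import hashlib
from pathlib import Path


def sha256_of_file(path, chunk_size=1 << 20):
    """Hash a file in chunks so large FASTA files never load fully into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_catalog_entry(fasta_path, storage_uri, sample_id, run_id):
    """Return a metadata record pointing at a raw FASTA object (hypothetical schema)."""
    return {
        "sample_id": sample_id,
        "run_id": run_id,
        "format": "FASTA",
        "storage_uri": storage_uri,              # where the raw file lives in the lake
        "sha256": sha256_of_file(fasta_path),    # ties the record to the file contents
        "size_bytes": Path(fasta_path).stat().st_size,
    }
```

For example, `build_catalog_entry("sample.fasta", "s3://example-bucket/raw/acme-lab/2024/08/14/sample.fasta", "S1", "RUN1")` would return a record that can be written to the catalog alongside the other metadata.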

#### 2. Preprocessed and Converted into Structured Formats (e.g., CSV)

In this scenario, the sequence data is preprocessed, possibly using tools like Apache Spark, to extract relevant features or metrics (e.g., sequence length, GC content, etc.) and then stored in a structured format such as CSV. This approach is useful when performing large-scale data analysis, machine learning, or integration with other structured datasets. The preprocessing might involve filtering, transforming, or summarizing the sequence data, and the resulting structured data is easier to query, analyze, and integrate with other data sources.
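The feature extraction can be sketched on a single machine with the standard library before scaling out with Spark, as the cell below does. The helpers here are hypothetical. They parse FASTA records (supporting sequences wrapped over several lines), compute sequence length and GC content, and write the results as CSV. GC content is the fraction of G and C bases over the full sequence length, including any `N`s.

```python
import csv
import io


def read_fasta(handle):
    """Yield (header, sequence) pairs from a FASTA text stream."""
    header, chunks = None, []
    for raw_line in handle:
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith(">"):
            if header is not None:
                yield header, "".join(chunks)
            header, chunks = line[1:], []
        else:
            chunks.append(line)
    if header is not None:
        yield header, "".join(chunks)


def gc_content(sequence):
    """Fraction of G and C bases (case-insensitive); 0.0 for an empty sequence."""
    if not sequence:
        return 0.0
    seq = sequence.upper()
    return (seq.count("G") + seq.count("C")) / len(seq)


def fasta_features_to_csv(handle, out):
    """Write header, sequence length and GC content for each record as CSV."""
    writer = csv.writer(out)
    writer.writerow(["header", "sequence_length", "gc_content"])
    for header, sequence in read_fasta(handle):
        writer.writerow([header, len(sequence), round(gc_content(sequence), 4)])


fasta_text = ">seq1 example\nACGTGC\nGG\n>seq2\nATAT\n"
out = io.StringIO()
fasta_features_to_csv(io.StringIO(fasta_text), out)
print(out.getvalue())
```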

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length
from pyspark.sql.types import StructType, StructField, StringType

# Get the Spark session (getOrCreate returns the already active one, if any)
spark = SparkSession.builder \
    .appName("FastaProcessing") \
    .getOrCreate()

# Define a schema for the FASTA records
schema = StructType([
    StructField("header", StringType(), True),
    StructField("sequence", StringType(), True)
])

# Parse one FASTA record (the text between two ">" characters):
# the first line is the header (without ">"), the remaining lines are the
# sequence, which may be wrapped over several lines
def parse_fasta_record(record):
    lines = record.strip().splitlines()
    header = lines[0].strip()
    sequence = "".join(line.strip() for line in lines[1:])
    return (header, sequence)

# Read the FASTA file using ">" as the record separator, so each record
# (header plus all of its sequence lines) arrives as a single row.
# The empty text before the first ">" is filtered out.
fasta_records_rdd = spark.read.text("../data/fasta_sample.fasta", lineSep=">") \
                         .rdd \
                         .map(lambda row: row.value) \
                         .filter(lambda record: record.strip() != "")

# Associate each sequence with its header
fasta_pairs_rdd = fasta_records_rdd.map(parse_fasta_record)

# Convert RDD to DataFrame using the defined schema
fasta_df = fasta_pairs_rdd.toDF(schema)
fasta_df.show(truncate=False)

# Get the length of each sequence
fasta_df = fasta_df.withColumn("sequence_length", length(col("sequence")))
fasta_df.show(truncate=False)

# Get basic statistics about the length of the sequences
length_stats = fasta_df.describe("sequence_length")
length_stats.show()

# Optional: save results as CSV (Spark writes a directory of part files)
fasta_df.select("header", "sequence_length") \
        .write.mode("overwrite") \
        .option("header", True) \
        .csv("../data/csv/fasta_sequence_lengths.csv")

# Stop the session (this also stops the session configured for Glow/Delta)
spark.stop()