# Substitute assignment for programming 3

Test code before I combine everything in a .py file


## Imports

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, countDistinct, count, avg, min, max, sum, when
import re

## PySpark GBFF parser

In [18]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("GBFF Parser") \
    .master("local[16]") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "32g") \
    .getOrCreate()

def parse_location(location):
    """Parse location string into start, end, strand"""
    # Skip features starting with < or >
    if location.startswith(('<', '>')):
        return None, None, None
    
    strand = "+"
    if "complement" in location:
        strand = "-"
        location = location.replace("complement(", "").replace(")", "")
    
    numbers = re.findall(r'\d+', location)
    
    try:
        return int(numbers[0]), int(numbers[1]), strand
    except:
        return None, None, strand
    
def extract_cds_counts(record):
    """Extracts CDS counts (with and without protein) from the COMMENT block."""
    comment_block = " ".join(record)
    
    cds_with_protein_match = re.search(r'CDSs \(with protein\)\s+::\s+([\d,]+)', comment_block)
    cds_without_protein_match = re.search(r'CDSs \(without protein\)\s+::\s+([\d,]+)', comment_block)

    cds_with_protein = int(cds_with_protein_match.group(1).replace(',', '')) if cds_with_protein_match else None
    cds_without_protein = int(cds_without_protein_match.group(1).replace(',', '')) if cds_without_protein_match else None
    
    return cds_with_protein, cds_without_protein

def parse_gbff(file_path):
    """Parse a GBFF file into structured features"""
    with open(file_path, "r") as f:
        lines = [line.rstrip() for line in f]

    records = []
    current_record = []
    for line in lines:
        if line.startswith("LOCUS"):
            if current_record:
                records.append(current_record)
            current_record = [line]
        elif line == "//":
            current_record.append(line)
            records.append(current_record)
            current_record = []
        else:
            current_record.append(line)
        if len(records) >= 20:  # Process first 20 genomes for testing
            break

    all_features = []
    genomes_processed = 0
    
    for record in records:
        try:
            accession_line = next(line for line in record if line.startswith("ACCESSION"))
            primary_accession = accession_line.split()[1]
            
            organism = None
            in_source = False
            source_lines = []
            
            for line in record:
                if line.startswith("     source"):
                    in_source = True
                    source_lines.append(line)
                elif in_source:
                    if line.startswith(" " * 21):
                        source_lines.append(line)
                    else:
                        in_source = False
            
            if source_lines:
                source_text = " ".join([line[21:] for line in source_lines])
                organism_match = re.search(r'/organism="([^"]+)"', source_text)
                if organism_match:
                    organism = organism_match.group(1)

            cds_with_protein, cds_without_protein = extract_cds_counts(record)

        except (StopIteration, AttributeError):
            continue

        try:
            features_start = next(i for i, line in enumerate(record) if line.startswith("FEATURES"))
            valid_headers = {'ORIGIN', 'CONTIG', 'REFERENCE', 'COMMENT', '//'}
            features_end = next((i for i in range(features_start+1, len(record)) 
                               if any(record[i].startswith(h) for h in valid_headers)), len(record))
            features = record[features_start+1:features_end]
        except StopIteration:
            continue

        current_feature = []
        feature_blocks = []
        for line in features:
            if not line.startswith(" " * 21):
                if current_feature:
                    feature_blocks.append(current_feature)
                current_feature = [line]
            else:
                current_feature.append(line)
        if current_feature:
            feature_blocks.append(current_feature)

        for feature in feature_blocks:
            first_line = feature[0]
            feature_type = first_line[5:21].strip()
            
            if feature_type == "gene":
                continue
                
            location_str = first_line[21:].strip()
            start, end, strand = parse_location(location_str)
            
            if start is None or end is None:
                continue
                
            length = end - start + 1 if end >= start else 0
            
            qual_text = " ".join([line[21:] for line in feature[1:]])
            qualifiers = {
                'product': re.search(r'/product="([^"]+)"', qual_text),
                'protein_id': re.search(r'/protein_id="([^"]+)"', qual_text),
                'translation': re.search(r'/translation="([^"]+)"', qual_text)
            }
            
            qualifiers = {k: v.group(1).replace('\n', ' ') if v else None 
                         for k, v in qualifiers.items()}

            all_features.append({
                "genome_id": primary_accession,
                "organism": organism,
                "feature_type": feature_type,
                "start": start,
                "end": end,
                "length": length,
                "strand": strand,
                "product": qualifiers['product'],
                "protein_id": qualifiers['protein_id'],
                "translation": qualifiers['translation'],
                "cds_with_protein": cds_with_protein,
                "cds_without_protein": cds_without_protein
            })

        genomes_processed += 1
        if genomes_processed >= 20:
            break

    return all_features


# Define schema
schema = StructType([
    StructField("genome_id", StringType(), False),
    StructField("organism", StringType(), True),
    StructField("feature_type", StringType(), False),
    StructField("start", IntegerType(), False),
    StructField("end", IntegerType(), False),
    StructField("length", IntegerType(), False),
    StructField("strand", StringType(), False),
    StructField("product", StringType(), True),
    StructField("protein_id", StringType(), True),
    StructField("translation", StringType(), True),
    StructField("cds_with_protein", IntegerType(), True),
    StructField("cds_without_protein", IntegerType(), True)
])


# Parse and create DataFrame
file_path = "/data/datasets/NCBI/refseq/ftp.ncbi.nlm.nih.gov/refseq/release/archaea/archaea.1.genomic.gbff"
parsed_data = parse_gbff(file_path)
df = spark.createDataFrame(parsed_data, schema=schema)

# Add coding_status column
df = df.withColumn(
    "coding_status",
    when(col("feature_type") == "CDS", "coding").otherwise("non-coding")
)

In [16]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("GBFF Parser") \
    .master("local[16]") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "32g") \
    .getOrCreate()

def parse_location(location):
    """Parse location string into start, end, strand"""
    strand = "+"
    if "complement" in location:
        strand = "-"
        location = location.replace("complement(", "").replace(")", "")
    
    # Remove any special characters like < or >
    location = re.sub(r'[<>]', '', location)
    
    # Handle join and other complex locations by taking first segment
    if "join" in location:
        location = location.split("(")[1].split(")")[0].split(",")[0]
    
    try:
        start, end = map(int, re.findall(r'\d+', location)[:2])
        return start, end, strand
    except:
        return None, None, strand

def parse_gbff(file_path):
    """Parse a GBFF file into structured features"""
    with open(file_path, "r") as f:
        lines = [line.rstrip() for line in f]

    records = []
    current_record = []
    for line in lines:
        if line.startswith("LOCUS"):
            if current_record:
                records.append(current_record)
            current_record = [line]
        elif line == "//":
            current_record.append(line)
            records.append(current_record)
            current_record = []
        else:
            current_record.append(line)

    all_features = []
    for record in records:
        try:
            # Get primary accession and organism
            accession_line = next(line for line in record if line.startswith("ACCESSION"))
            primary_accession = accession_line.split()[1]
            
            # Get organism from source feature
            organism = None
            for line in record:
                if line.startswith("     source") and "/organism=" in line:
                    organism = re.search(r'/organism="([^"]+)"', line).group(1)
                    break
        except (StopIteration, AttributeError):
            continue

        try:
            features_start = next(i for i, line in enumerate(record) if line.startswith("FEATURES"))
            valid_headers = {'ORIGIN', 'CONTIG', 'REFERENCE', 'COMMENT', '//'}
            features_end = next((i for i in range(features_start+1, len(record)) 
                               if any(record[i].startswith(h) for h in valid_headers)), len(record))
            features = record[features_start+1:features_end]
        except StopIteration:
            continue

        current_feature = []
        feature_blocks = []
        for line in features:
            if not line.startswith(" " * 21):
                if current_feature:
                    feature_blocks.append(current_feature)
                current_feature = [line]
            else:
                current_feature.append(line)
        if current_feature:
            feature_blocks.append(current_feature)

        for feature in feature_blocks:
            first_line = feature[0]
            feature_type = first_line[5:21].strip()
            location_str = first_line[21:].strip()
            
            start, end, strand = parse_location(location_str)
            if start is None or end is None:
                continue  # Skip invalid locations
                
            length = end - start + 1 if end >= start else 0
            
            # Parse qualifiers
            qual_text = " ".join([line[21:] for line in feature[1:]])
            qualifiers = {
                'gene': re.search(r'/gene="([^"]+)"', qual_text),
                'product': re.search(r'/product="([^"]+)"', qual_text),
                'protein_id': re.search(r'/protein_id="([^"]+)"', qual_text),
                'translation': re.search(r'/translation="([^"]+)"', qual_text)
            }
            
            # Clean qualifiers
            qualifiers = {k: v.group(1).replace('\n', ' ') if v else None 
                         for k, v in qualifiers.items()}

            all_features.append({
                "genome_id": primary_accession,
                "organism": organism,
                "feature_type": feature_type,
                "start": start,
                "end": end,
                "length": length,
                "strand": strand,
                "gene": qualifiers['gene'],
                "product": qualifiers['product'],
                "protein_id": qualifiers['protein_id'],
                "translation": qualifiers['translation']
            })

    return all_features

# Define schema
schema = StructType([
    StructField("genome_id", StringType(), False),
    StructField("organism", StringType(), True),
    StructField("feature_type", StringType(), False),
    StructField("start", IntegerType(), False),
    StructField("end", IntegerType(), False),
    StructField("length", IntegerType(), False),
    StructField("strand", StringType(), False),
    StructField("gene", StringType(), True),
    StructField("product", StringType(), True),
    StructField("protein_id", StringType(), True),
    StructField("translation", StringType(), True)
])

# Parse and create DataFrame
file_path = "/data/datasets/NCBI/refseq/ftp.ncbi.nlm.nih.gov/refseq/release/archaea/archaea.1.genomic.gbff"
parsed_data = parse_gbff(file_path)
df = spark.createDataFrame(parsed_data, schema=schema)

# Add coding_status column
df = df.withColumn(
    "coding_status",
    when(col("feature_type") == "CDS", "coding").otherwise("non-coding")
)

# Show results
df.show(5, truncate=False)

25/01/31 01:19:07 WARN TaskSetManager: Stage 21 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
25/01/31 01:19:11 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 21 (TID 239): Attempting to kill Python Worker

+---------------+--------+------------+-----+-----+------+------+----+------------------------------------------+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
|genome_id      |organism|feature_type|start|end  |length|strand|gene|product                                   |protein_id    |translation                                                                                                                                                                                                                                                                            |coding_status|
+---------------+--------+------------+-----+-----+------+------+----+------------------------------------------+--------------+--------------------------



In [18]:
df.count()

25/01/31 01:40:25 WARN TaskSetManager: Stage 24 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

2935883



## Questions

In [21]:
# Question 1 - Hoeveel "features" heeft een Archaea genoom gemiddeld?

# Group by genome_id and count features
features_per_genome = df.groupBy("genome_id").agg(count("*").alias("feature_count"))

# Calculate average features per genome
avg_features = features_per_genome.agg(avg("feature_count").alias("avg_features_per_genome"))

# Show result
avg_features.show()

25/01/31 01:46:55 WARN TaskSetManager: Stage 28 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
[Stage 28:>                                                       (0 + 16) / 16]

+-----------------------+
|avg_features_per_genome|
+-----------------------+
|      80.76040491843864|
+-----------------------+



                                                                                

In [22]:
# Question 2 - Hoe is de verhouding tussen coding en non-coding features? (Deel coding door non-coding totalen).

# Count coding and non-coding features
coding_non_coding_counts = df.groupBy("coding_status").agg(count("*").alias("count"))

# Calculate ratio
coding_count = coding_non_coding_counts.filter(col("coding_status") == "coding").select("count").collect()[0][0]
non_coding_count = coding_non_coding_counts.filter(col("coding_status") == "non-coding").select("count").collect()[0][0]

coding_ratio = coding_count / non_coding_count

print(f"Coding to non-coding ratio: {coding_ratio:.2f}")

25/01/31 01:47:25 WARN TaskSetManager: Stage 34 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
25/01/31 01:47:27 WARN TaskSetManager: Stage 37 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.

Coding to non-coding ratio: 0.94


                                                                                

In [27]:
# question 3 - Wat zijn de minimum en maximum aantal eiwitten van alle organismen in het file?

# Compute min and max cds_with_protein per organism
result = df.groupBy("organism").agg(
    min("cds_with_protein").alias("min_proteins"),
    max("cds_with_protein").alias("max_proteins")
)

result.show()

25/01/31 01:53:12 WARN TaskSetManager: Stage 54 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.

+------------+------------+
|min_proteins|max_proteins|
+------------+------------+
|     1018659|     1018659|
+------------+------------+



                                                                                

In [24]:
# Question 4 - Verwijder alle non-coding (RNA) features en schrijf dit weg als een apart DataFrame (Spark format).

coding_df = df.filter(col("coding_status") == "coding")

# Write to disk (e.g., Parquet)
coding_df.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/homes/pcriesebos/Documents/programming3/Assignment6_alternative/dataframe_output_non_coding")

25/01/31 01:50:07 WARN TaskSetManager: Stage 40 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [25]:
# Question 5 - Wat is de gemiddelde lengte van een feature ?

# Calculate average feature length
avg_feature_length = df.agg(avg("length").alias("avg_feature_length"))

# Show result
avg_feature_length.show()

25/01/31 01:50:43 WARN TaskSetManager: Stage 41 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
[Stage 41:>                                                       (0 + 16) / 16]

+------------------+
|avg_feature_length|
+------------------+
| 1313.002710939094|
+------------------+



                                                                                

## Store output
Write the dataframe to snappy compressed parquet files

In [None]:
# Write as compressed Parquet

df.write \
    .format("parquet") \
    .option("compression", "snappy") \
    .mode("overwrite") \
    .save("/homes/pcriesebos/Documents/programming3/Assignment6_alternative/dataframe_output")

25/01/31 01:44:25 WARN TaskSetManager: Stage 27 contains a task of very large size (34349 KiB). The maximum recommended task size is 1000 KiB.
                                                                                