#### cluster-specific setup, adjust this according to your own ###

In [None]:
 #start a new Spark session, necessary for all Spark-related operations

from pyspark.sql import SparkSession

PFAM_HMM_PATH = "/Pfam-A.hmm" # should also contain all the hmmpressed files (will be captured using glob)

# 'spark' and 'sc' are the canonical variable names for the SparkSession and SparkContext objects
spark = SparkSession.builder\
    .config("spark.files", ",".join([PFAM_HMM_PATH + add for add in ["", ".h3f", ".h3i", ".h3m", ".h3p"]]))\
    .getOrCreate()
sc = spark.sparkContext


In [None]:
# point here to where the workshop's data folder is located
from os import path

DATA_FOLDER = "gs://zw_axolotl/jgi_workshop_2024/data"

In [None]:
# point here to where we will store results from the activity (make sure it's empty and readable!)

RESULT_FOLDER = "./"

In [None]:
# helper variable to set minimum repartition of the data (to use up all executors)

MIN_PARTITIONS = 100

In [None]:
# to avoid needing to mount a Gdrive folder, we will copy an existing parquet for the taxonomy table from module 1
#spark.read.parquet(path.join(DATA_FOLDER, "df_taxonomy.pq")).write.mode("overwrite").parquet(path.join(RESULT_FOLDER, "df_taxonomy.pq"))

#### variables setup and imports

In [None]:
from pyspark.sql import functions as F # contain frequently-used functions
from pyspark.sql import types as T # for specifying schema datatypes, etc

# basic python libraries for data manipulation and visualization
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt

### The dataset
The full dataset comprises 13,223 "Species representative" genomes of the MGnify Marine V2.0 catalogue (https://www.ebi.ac.uk/metagenomics/genome-catalogues/marine-v2-0).

Each genome folder contain:
- Fasta files of the contig sequences: *.fna
- GFF3 files of gene-calling annotations: *.gff
- GFF3 files of BGC-calling annotations (using their in-house pipeline): *_sanntis.gff

The data folder "genomes_all" is organized by Phylum->Genus->Genome, for example:

- genomes_all
 - Actinomycetota
   - Streptomycetes
     - MYM000001
       - MYM000001.fna
       - MYM000001.gff
       - MYM000001_sanntis.gff
   - Microbacteria
   - ...
 - Pseudomonatota
   - ...
 - ...





In [None]:
# Load the metadata (and persist it) for the entire dataset, as we will be using it later on

metadata_sdf = (spark.read
    .option("delimiter", "\t")
    .option("header", True)
    .option("inferSchema", True)
    .option("samplingRatio", 0.0001)
    .csv(path.join(DATA_FOLDER, "mgnify-marine-v2-0.tsv"))
).filter(F.col("Genome") == F.col("Species_rep")) # takes only the species representative genomes from the full 5X,XXX genomes

metadata_sdf.createOrReplaceTempView("metadata")
metadata_sdf.persist()
metadata_sdf.count()

In [None]:
taxonomy_pq_path = path.join(RESULT_FOLDER, "df_taxonomy.pq") # from module_1

# we will load the taxonomy as SQL, after filtering it only for the 13,223 species rep genomes (via Join)
spark.read.parquet(taxonomy_pq_path).createOrReplaceTempView("taxonomy")
taxonomy_sdf = spark.sql("SELECT taxonomy.* FROM taxonomy JOIN metadata ON taxonomy.Genome=metadata.Genome")
taxonomy_sdf.createOrReplaceTempView("taxonomy")

taxonomy_sdf.persist()
taxonomy_sdf.count()

In [None]:
# we will create a new table, to lookup the number of genomes + species included in a single genus
genus_stat_sdf = spark.sql((
    "SELECT Phylum, Genus, count(Genome) as num_genomes, count(distinct Species) as num_species"
    " FROM taxonomy"
    " GROUP BY Phylum, Genus"
))

genus_stat_sdf.createOrReplaceTempView("genus_stat")
genus_stat_sdf.persist().count()

# here is what the table looks like
genus_stat_sdf.orderBy("num_genomes", ascending=False).limit(10).toPandas()

#### For this activity, let's look up a total of <10 genomes from two different genera, all with unique species

In this case, we will arbitarily choose <i>Streptomyces</i> and <i>Kitasatospora</i>, two related genera that were known for their biosynthetic potentials.

In [None]:
# First, we will look up the correct naming of our genera of choice, along with their genomes/species count
# we can use the wild card character '%' to search using the SQL query

spark.sql((
    "SELECT * FROM genus_stat"
    " WHERE Genus LIKE '%Strepto%' OR Genus LIKE '%Kita%'"
)).toPandas()

In [None]:
# then let's encode and specify the genera to be selected, complete with its Phylum
included_genera = [
    ("p__Actinomycetota", "g__Streptomyces"),
    ("p__Actinomycetota", "g__Kitasatospora")
]

In [None]:
# this is the function that will later on be used to generate
# the list of folder paths of each genus
# all unknown genera ('g__') is combined into a folder called 'unknown_genus'

def generate_folder_paths(list_of_genera, file_extension=None):
    return [path.join(DATA_FOLDER, "genomes_all", phylum.split("__")[1], genus.split("__")[1] if genus != "g__" else "unknown_genus") for phylum, genus in list_of_genera]

# for example, here is the folder we selected
generate_folder_paths(included_genera)

### Activity 2-1: Parsing FASTA files from scratch with Spark

First, let's make a FASTA parser function that takes: a **file path string** and a **nucleotide FASTA text (can be multi-sequences)** and output list of sequences and its IDs+headers, along with its original file path, as follow:

**Input:** ('example_fasta.fna',
'
&gt;seq_1 desc_1
<br/>ATGCATGCATGC<br/>ATGCATGCATGC

&gt;seq_2 desc_2
<br/>AAAAAAAAAAAA
<br/>TTTTTTTTTTTT
'

**Output:**

[<br/>
['example_fasta.fna', 'seq_1', 'desc_1', 'ATGCATGCATGCATGCATGCATGC'],<br/>
['example_fasta.fna', 'seq_2', 'desc_2', 'AAAAAAAAAAAATTTTTTTTTTTT']
<br/>]

This function will be called later in combination with Spark's parallel file parsing function 'wholeTextFiles()' to produce a Spark DataFrame.

In [None]:
# this code is derived from chatGPT

import re

def parse_fasta(file_path, fasta_str):
    sequences = []
    fasta_entries = fasta_str.strip().split('>')

    for entry in fasta_entries:
        if entry:
            lines = entry.split('\n')
            header = lines[0]
            sequence = ''.join(lines[1:])

            match = re.match(r'(\S+)\s*(.*)', header)
            if match:
                seq_id = match.group(1)
                description = match.group(2)
                sequences.append([file_path, seq_id, description, sequence])

    return sequences

# test
file_path = "example.fasta"
fasta_str = """>seq1 Description of seq1
ATGCGTA
>seq2 Another description
GCTAGCTAGCTA
>seq3 Yet another description
TGCATGCA"""

%time parse_fasta(file_path, fasta_str)

#### Parsing using Spark's wholeTextFiles()

with Spark, we can use the wholeTextFiles() to scan and load text files into RDDs of string,
then pass it down to our parser function. This works well for smaller files that can fit each
into the worker's memory. See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.wholeTextFiles.html

The function takes file paths as a single string, separated by <b>','</b>. You can use glob-style wildcard <b>'*'</b> to capture multiple files with the same pattern.

If you are parsing fewer, but big files, you would use https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.textFile.html instead.



In [None]:
# let's generate the input fasta paths (using the function we wrote above, combining it with the included genera)
input_fasta_paths = [path.join(folder_path, "*/*.fna") for folder_path in generate_folder_paths(included_genera)]
input_fasta_paths

In [None]:
# flatMap() is used to flatten the list of lists into single list (i.e., row in the dataframe)
from_spark_df = (sc.wholeTextFiles(",".join(input_fasta_paths)).flatMap(
    lambda tup: parse_fasta(tup[0], tup[1]) # wholeTextFiles output: RDD of (file_path, file_content_string)
).toDF(["file_path", "seq_id", "desc", "sequence"])
    .withColumn("length", F.length(F.col("sequence"))) # add a new column: length
)

from_spark_df.persist()
%time from_spark_df.count()

from_spark_df.limit(5).toPandas()

In [None]:
# let's check the statistics
from_spark_df.select(
    F.countDistinct(F.col("file_path")).alias("num_files"),
    F.count(F.col("file_path")).alias("num_contigs"),
    F.min(F.col("length")).alias("min_length"),
    F.median(F.col("length")).cast("int").alias("med_length"),
    F.max(F.col("length")).alias("max_length")
).toPandas()

### Activity 2-2: Parsing FASTA & GFF files using Axolotl.IO

...

In [None]:
from axolotl.io.fasta import FastaIO

# just like previously
input_fasta_paths = [path.join(folder_path, "*/*.fna") for folder_path in generate_folder_paths(included_genera)]

# this time, we use FastaIO class to load, which wraps around the original wholeTextFiles() function
contigs = FastaIO.loadSmallFiles(
    ",".join(input_fasta_paths),
    seq_type = "nucl",
    minPartitions = MIN_PARTITIONS
)
%time print(contigs.df.persist().count())
contigs.df.createOrReplaceTempView("contigs")

contigs.df.limit(3).toPandas()

In [None]:
from axolotl.io.gff3 import gff3IO

# now we change the .fna into .gff
input_gff_paths = [path.join(folder_path, "*/*.gff") for folder_path in generate_folder_paths(included_genera)]

features = gff3IO.loadSmallFiles(
    ",".join(input_gff_paths),
    fasta_path_func = lambda x: x.rsplit(".", 1)[0].rsplit("_sanntis", 1)[0] + ".fna",
    minPartitions = MIN_PARTITIONS
)
%time print(features.df.persist().count())
features.df.createOrReplaceTempView("features")

features.df.limit(3).toPandas()

In [None]:
# let's get some statistics for the parsed GFFs

spark.sql((
    "SELECT type, count(type) as total_rows from features"
    " GROUP BY type"
    " ORDER BY total_rows DESC"
)).toPandas().set_index("type")

### Exercise: Scaling Up Data without Changing the Code

Let's select these new genera below, totaling to around 38 genomes (~4x larger), for parsing. **Using Axolotl's fastaIO and gff3IO, can you parse those genomes into new DataFrames?**

Phylum - Genus

- Actinomycetota - Microbacterium (13 genomes)
- Bacteroidota - Aquimarina (10 genomes)
- Bacillota - Bacillus (9 genomes)
- Pseudomonadota - Kordiimonas (6 genomes)


**_Assign new variable and table names for the new DataFrames_ --> use contigs_2 for the FASTA files and features_2 for the GFF files, as they will be used in the follow-up blocks**. Hint: use the same code as previously (check boxes before this), but reassign the variables and the input array.

Put your code down here (remove the comment tags and fill in the blank '...' with the correct code):

In [None]:
#included_genera_2 = [
# ...
#]

In [None]:
#input_fasta_paths_2 = ...

#contigs_2 = ...

#%time print(contigs_2.df.persist().count())
#contigs.df.createOrReplaceTempView("contigs_2")

#contigs.df.limit(3).toPandas()

In [None]:
#input_gff_paths_2 = ...

#features_2 = ...

#%time print(features_2.df.persist().count())
#features.df.createOrReplaceTempView("features_2")

#features.df.limit(3).toPandas()

In [None]:
# @title spoiler code: uncomment the code below this box and run it if you are stuck

def run_spoiler_code():
  # this part redefine the input paths, the only thing needs to be changed from the original code
  included_genera_2 = [
      ("p__Actinomycetota", "g__Microbacterium"),
      ("p__Bacteroidota", "g__Aquimarina"),
      ("p__Bacillota", "g__Bacillus"),
      ("p__Pseudomonadota", "g__Kordiimonas")
  ]
  # this part for the FASTAs
  input_fasta_paths_2 = [path.join(folder_path, "*/*.fna") for folder_path in generate_folder_paths(included_genera_2)]
  contigs_2 = FastaIO.loadSmallFiles(
      ",".join(input_fasta_paths_2),
      seq_type = "nucl",
      minPartitions = MIN_PARTITIONS
  )
  # this part for the GFFs
  input_gff_paths_2 = [path.join(folder_path, "*/*.gff") for folder_path in generate_folder_paths(included_genera_2)]
  features_2 = gff3IO.loadSmallFiles(
      ",".join(input_gff_paths_2),
      fasta_path_func = lambda x: x.rsplit(".", 1)[0].rsplit("_sanntis", 1)[0] + ".fna",
      minPartitions = MIN_PARTITIONS
  )
  return contigs_2, features_2


In [None]:
# uncomment and run this if you are stuck
contigs_2, features_2 = run_spoiler_code()
%time print(contigs_2.df.persist().count())
%time print(features_2.df.persist().count())

### Combining two DataFrames using .union()

Spark allows combining DataFrame (must be of the same structure) using DataFrame.union() function (see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html). Axolotl support the same operations in its DataFrames, **however, note that this will reset the indexes in the combined AxlDF**.

Below, we will **combine both the contig's and the feature's dataframes, then _persist them_**, so we can work on the combined version from this point forward.

In [None]:
from axolotl.data.sequence import NuclSeqDF

contigs_combined = NuclSeqDF(contigs.df.union(contigs_2.df), override_idx=True)
contigs_combined.df.persist()
contigs_combined.df.count()
contigs_combined.df.createOrReplaceTempView("contigs_combined")

In [None]:
from axolotl.data.annotation import RawFeatDF

features_combined = RawFeatDF(features.df.union(features_2.df), override_idx=True)
features_combined.df.persist()
features_combined.df.count()
features_combined.df.createOrReplaceTempView("features_combined")

#### Let's generate some visualization for the combined data

<i>Map contig lengths per genus</i>

In [None]:
# map out contig lengths per genus
ctg_length_summary = spark.sql((
    "SELECT printf('%s:%s ', Phylum, Genus) as genus, collect_list(log(length)) as ctg_lengths" # collect contig lengths per genus as a list
    " FROM contigs_combined JOIN taxonomy ON split(seq_id, '_')[0]=taxonomy.Genome" # join with taxonomy table using genome IDs
    " GROUP BY Phylum, Genus"
    " ORDER BY Phylum, Genus ASC"
)).toPandas()
ctg_length_summary

In [None]:
plt.figure(figsize=(8, 4));
plt.title("Contig lengths (log10)");
plt.xticks(rotation=-30, ha="left");
plt.boxplot(ctg_length_summary["ctg_lengths"].values, labels=ctg_length_summary["genus"]);

In [None]:
# map out BGC numbers per genome per genus
bgc_num_summary = spark.sql((
    "SELECT genus, collect_list(num_bgcs) as num_bgcs"
    " FROM (SELECT printf('%s:%s ', Phylum, Genus) as genus, count(type) as num_bgcs"
          " FROM features_combined JOIN taxonomy ON split(seq_id, '_')[0]=taxonomy.Genome"
          " WHERE type like 'CLUSTER'"
          " GROUP BY taxonomy.Genome, Phylum, Genus)"
    " GROUP BY genus"
    " ORDER BY genus ASC"
)).toPandas()

bgc_num_summary

In [None]:
plt.figure(figsize=(8, 4));
#plt.title("Contig lengths (log10)");
plt.xticks(rotation=-30, ha="left");
plt.boxplot(bgc_num_summary["num_bgcs"].values, labels=bgc_num_summary["genus"]);

### Activity 2-3: Extracting BGC and CDS tables from Raw Features table

A common data preprocessing activity is transforming (through filtering, joining, function calling) one or more data tables into another. In this case, we take an example of transforming the RawFeatDF (features table) parsed directly from the GFF files into a separate cdsDF (which contain only CDS features data) and bgcDF (which contain only BGC features data).

<i>As a reference, the original RawFeatDF table looks like this:</i> (most of the important information is stored in the 'qualifiers' column)

In [None]:
features_combined.df.limit(3).toPandas()

#### RawFeatDF -> cdsDF

<b>cdsDF</b>, as the name suggests, is an Axolotl dataframe class specifically made for storing Coding Sequences (CDS) information. In a typical genome annotation scenario (using GFF or GBK files), CDS information is stored as features, with type == 'CDS', and include information such as locus tag, gene id, etc.

In [None]:
from axolotl.data.annotation import cdsDF

# to 'extract' cdsDF from a RawFeatDF table, we use the fromRawFeatDF() function
cds_combined = cdsDF.fromRawFeatDF(features_combined)
%time print(cds_combined.df.persist().count()) # persist this for further analysis

# this is what cdsDF table will looks like:
# the 'locus_tag', 'gene_name', etc. were derived from the original 'qualifiers' column of the RawFeatDF
cds_combined.df.limit(3).toPandas()

#### utilizing Axolotl's pre-made functions

Notice how in the table above, the 'aa_sequence' column seems to be empty. While some annotation pipeline store AA sequence in the feature qualifiers, some don't. In this case, we will need to re-translate based on the location of the CDS and the nucleotide sequences (i.e., codons) taken from the contigs.

In Axolotl, the cdsDF class support this function via the <b>cdsDF.translateAAs()</b> function. See below:

In [None]:
# see how many CDS rows are untranslated
cds_combined.df.select(F.isnotnull(F.col("aa_sequence")).alias("have_translation")).groupBy("have_translation").count().toPandas()

In [None]:
# let's use Axolotl to translate the AAs. Since the GFF also doesn't provide transl_table value, we will use the most common table = 11
# this will take around 1.5 minutes

cds_combined_translated = cds_combined.translateAAs(contigs_combined, default_table=11) # assign into a new variable
%time cds_combined_translated.df.persist().count() # to kick start the process, we will persist and collect the new DF

cds_combined_translated.df.filter(F.isnotnull(F.col("aa_sequence"))).limit(3).toPandas() # let's see how the new table looks like

In [None]:
# see how many CDS rows are now translated
cds_combined_translated.df.select(F.isnotnull(F.col("aa_sequence")).alias("have_translation")).groupBy("have_translation").count().toPandas()

In [None]:
plt.figure(figsize=(3, 6));
plt.title("CDS lengths");
plt.boxplot(cds_combined_translated.df.select(F.length(F.col("aa_sequence")).alias("length")).toPandas()["length"]);

### utilizing Axolotl to perform parallel HMM scanning (using the pyHMMer library)

While Axolotl doesn't yet have an extensive list of pre-made functions and pipelines to cover the breadth of analyses people may do in genomics, we implement some most common ones, partly in order to demonstrate how flexible the library is for incorporating external tools, libraries and softwares.

<i>due to time constraints, we will only run the scanning on a limited subset of the cdsDF table</i>

In [None]:
from axolotl.app.bgc_analysis.functions import scan_cdsDF

cds_example_limited = cdsDF(cds_combined_translated.df.limit(500).repartition(500), keep_idx=True)

hmm_result = scan_cdsDF(cds_example_limited, "Pfam-A.hmm")

%time hmm_result.persist().count()

This is what the resulting hits table will looks like. The information provided will be enough to generate a multi-alignment table for all the sequences against the profile HMM (based on locations + gaps and the original CDS sequence).

In [None]:
hmm_result.select("cds_id", "hmm_name").groupBy("hmm_name").count().orderBy("count", ascending=False).toPandas()

#### an extra example, showing the bgcDF class

In [None]:
from axolotl.data.annotation import bgcDF

bgc_combined = bgcDF.fromRawFeatDF(
    features_combined,
    source_type="custom",
    kw_type="CLUSTER",
    kw_classes="nearest_MiBIG_class"
)
%time print(bgc_combined.df.persist().count())

bgc_combined.df.limit(3).toPandas()