# Master DSLS / Programming 3 / Assignment 6
# Final Assignment

## Introduction
https://bioinf.nl/~martijn/master/programming3/assignment6.html

This is the final for programming 3. In this assignment, I will develop scikit-learn machine learning models to predict the function of the proteins in the specific dataset. This model will use small InterPro_annotations_accession to predict large InterPro_annotations_accession.
The definition of small InterPro_annotations_accession and large InterPro_annotations_accession is defined as below:

If InterPro_annotations_accession's feature length(Stop_location-Start_location) / Sequence_length > 0.9, it is large InterPro_annotations_accession.

Otherwise, it is a small InterPro_annotations_accession.

We can briefly rewrite as:

            |(Stop - Start)|/Sequence >  0.9 --> Large

            |(Stop - Start)|/Sequence <= 0.9 --> small

I will also check the "bias" and "noise" that does not make sense from the dataset.

ie. lines(-) from the TSV file which don't contain InterPRO numbers

ie. proteins which don't have a large feature (according to the criteria above)

## 1. Goal

The goal of this assignment is to predict large InterPro_annotations_accession by small InterPro_annotations_accession.

I will use the dataset from /data/dataprocessing/interproscan/all_bacilli.tsv file on assemblix2012 and assemblix2019. However, this file contains ~4,200,000 protein annotations, so I will put a subset of all_bacilli.tsv on GitHub and on local for code testing.

In [1]:
# Spark web UI:http://localhost:4040/jobs/
# Output format : https://interproscan-docs.readthedocs.io/en/latest/OutputFormats.html

# pyspark
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark_dist_explore import hist

# pyspark ML
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

# ETL & visualization
import numpy as np
import warnings
import time
import pickle
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


# call spark session
spsession = SparkContext('local[16]')


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 15:45:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

def process_protein_data(path):
    # Initialize SparkSession
    spark = SparkSession.builder.appName("ProteinDataProcessing").getOrCreate()

    # Read the data from the given path
    df = spark.read.csv(path, sep='\t', header=None)
    
    # Drop the first column "_c0"
    df = df.drop("_c0")

    # Rename columns
    df = df.toDF(
        "Protein", "Sequence_MD5_digest", "Sequence_length", "Analysis",
        "Signature_accession", "Signature_description", "Start", "Stop",
        "Score", "Status", "Date", "Interpro_accession",
        "Interpro_description", "GO", "Pathway"
    )

    # Remove rows where Interpro_accession == "-"
    df = df.filter(df["Interpro_accession"] != "-")

    # Calculate feature_Length
    df = df.withColumn("feature_Length", (df["Stop"] - df["Start"]))

    # Calculate the ratio
    df = df.withColumn("ratio", (df["feature_Length"] / df["Sequence_length"]))

    # Assign Size (1 for large, 0 for small)
    df = df.withColumn("Size", when(df["ratio"] > 0.9, 1).otherwise(0))

    # Select columns to drop
    to_drop = ["Sequence_MD5_digest", "Analysis", "Signature_accession", "Signature_description",
               "Score", "Status", "Date", "Interpro_description", "GO", "Pathway"]

    # Drop selected columns
    df = df.drop(*to_drop)
    
    # drop duplicates
    df = df.dropDuplicates()

    # Sort by ratio from big to small
    df = df.orderBy(df.ratio.desc())

    return df

path = "/homes/dlsteur/Git_repos/Programming3/Assignment6/all_bacilli.tsv"
processed_df = process_protein_data(path)

# Show the processed DataFrame
processed_df.show()




def data_preprocessing(df):
    """
    It will help you to finish preprocessing data.
    df: spark df
    return small_df,large_df
    """
    # remove InterPro_annotations_accession == "-"
    # get the length of protein
    # get the ratio to distinguish them to large and small InterPro_annotations_accession
    # 1 for large, 0 for small InterPro_annotations_accession

    # get the intersection to make sure there is a match of large and small InterPro_annotations_accession(at least one large and one small InterPro_annotations_accession)
    intersection = df.filter(df.Size == 0).select("protein").intersect(df.filter(df.Size == 1).select("protein"))
    intersection_df = intersection.join(df,["protein"])

    # get the number of small InterPro_annotations_accession in each protein
    small_df = intersection_df.filter(df.Size == 0).groupBy(["protein"]).pivot("InterPro_accession").count()

    # There are several InterPro_annotations_accession with the same protein. I only choose the largest one.
    large_df = intersection_df.filter(df.Size == 1).groupby(["protein"]).agg(max("Ratio").alias("Ratio"))
    large_df = large_df.join(intersection_df,["protein","Ratio"],"inner").dropDuplicates(["protein"])

    # Drop the useless columns
    columns = ("Sequence_MD5_digest","Analysis","Signature_accession","Signature_description",
        "Score","Status","Date","InterPro_annotations_description","GO_annotations",
        "Pathways_annotations","Ratio","Size","Stop_location","Start_location","Sequence_length")
    large_df = large_df.drop(*columns)
    return small_df, large_df

def ML_df_create(small_df,large_df):
    """
    It will help you to create a correct ML dataframe.
    small_df: spark df, preprocessing to fit the criteria ratio<=0.9
    large_df: spark df, preprocessing to fit the criteria ratio>0.9
    return ML_df
    """
    # Create the df for ML, we do not need protein anymore.
    ML_df = large_df.join(small_df,["protein"],"outer").fillna(0).drop("protein")

    # catalogize y variable
    Label = StringIndexer(inputCol="InterPro_annotations_accession", outputCol="InterPro_index")

    # catalogize X variable
    input_columns = ML_df.columns[1:]
    assembler = VectorAssembler(inputCols=input_columns,outputCol="InterPro_features")

    pipeline = Pipeline(stages=[Label,assembler])
    ML_final = pipeline.fit(ML_df).transform(ML_df)
    return ML_final

def split_data(ML_final,percentage=0.7):
    """
    it can help you split the data to trainning data and test data.
    ML_final: df
    percentage:int, you can set another value.
    return: trainData, df; testData,df
    """
    (trainData, testData) = ML_final.randomSplit([percentage, 1-percentage],seed=42)
    return trainData, testData


small_df, large_df = data_preprocessing(processed_df)
ML_final = ML_df_create(small_df,large_df)
train_data, test_Data = split_data(ML_final,percentage=0.7)
train_data.show()


+--------------------+---------------+-----+----+------------------+--------------+------------------+----+
|             Protein|Sequence_length|Start|Stop|Interpro_accession|feature_Length|             ratio|Size|
+--------------------+---------------+-----+----+------------------+--------------+------------------+----+
|gi|29896607|gb|AA...|            429|    1| 429|         IPR006264|         428.0|0.9976689976689976|   1|
|gi|29895450|gb|AA...|            419|    1| 419|         IPR016496|         418.0|0.9976133651551312|   1|
|gi|29894872|gb|AA...|            412|    1| 412|         IPR017568|         411.0|0.9975728155339806|   1|
|gi|29894872|gb|AA...|            412|    1| 412|         IPR016039|         411.0|0.9975728155339806|   1|
|gi|29897784|gb|AA...|            372|    1| 372|         IPR010162|         371.0|0.9973118279569892|   1|
|gi|29896081|gb|AA...|            350|    1| 350|         IPR024169|         349.0|0.9971428571428571|   1|
|gi|29895876|gb|AA...|      

Py4JJavaError: An error occurred while calling o122.fit.
: org.apache.spark.SparkException: Input column InterPro_annotations_accession does not exist.
	at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema(StringIndexer.scala:123)
	at org.apache.spark.ml.feature.StringIndexerBase.validateAndTransformSchema$(StringIndexer.scala:115)
	at org.apache.spark.ml.feature.StringIndexer.validateAndTransformSchema(StringIndexer.scala:145)
	at org.apache.spark.ml.feature.StringIndexer.transformSchema(StringIndexer.scala:252)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:237)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# get the intersection to make sure there is a match of large and small InterPro_annotations_accession(at least one large and one small InterPro_annotations_accession)
intersection = processed_df.filter(processed_df.Size == 0).select("Protein").intersect(processed_df.filter(processed_df.Size == 1).select("Protein"))
intersection_df = intersection.join(processed_df,["Protein"])
# intersection_df.toPandas()

# select small proteins in df
small_proteins = intersection_df.filter(intersection_df.ratio < 0.9)

# select latrge proteins in df
large_proteins = intersection_df.filter(intersection_df.ratio >= 0.9)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pandas as pd

# Assuming you have a PySpark DataFrame named large_proteins and small_proteins
# Select the feature_Length column and convert it to a NumPy array
large_feature_lengths = large_proteins.select('feature_Length').rdd.flatMap(lambda x: x).collect()
small_feature_lengths = small_proteins.select('feature_Length').rdd.flatMap(lambda x: x).collect()

# Create subplots with a shared y-axis in a 2x2 grid
fig, axs = plt.subplots(2, 2, figsize=(12, 10), sharey='row')

# Customize the style of the histograms
color_large = 'lightgreen'
color_small = 'orange'
bins = 20

# Plot for small_proteins
axs[0, 0].set_title('Histogram showing the feature length of small Interpro accession')
axs[0, 0].set_xlabel('Feature length (Start-Stop)')
axs[0, 0].set_ylabel('Frequency')
axs[0, 0].hist(small_feature_lengths, bins=bins, color=color_small, edgecolor='black', alpha=0.7)

# Plot for large_proteins
axs[0, 1].set_title('Histogram showing the feature length of large Interpro accession')
axs[0, 1].set_xlabel('Feature length (Start-Stop)')
axs[0, 1].set_ylabel('Frequency')
axs[0, 1].hist(large_feature_lengths, bins=bins, color=color_large, edgecolor='black', alpha=0.7)



# Add gridlines
for ax in axs[:, 0]:
    ax.grid(axis='y', linestyle='--', alpha=0.5)

# Convert PySpark DataFrames to Pandas DataFrames
large_df = large_proteins.toPandas()
small_df = small_proteins.toPandas()

# Set Seaborn theme and style
sns.set_theme(style="whitegrid")

# Plot for small_proteins with light green color
sns.boxplot(x=small_df["feature_Length"], ax=axs[1, 1], color='lightgreen')
axs[1, 0].set_title('Boxplot of small Interpro accessions')
axs[1, 0].set_ylabel('Feature Length')

# Plot for large_proteins with orange color
sns.boxplot(x=large_df["feature_Length"], ax=axs[1, 0], color='orange')
axs[1, 1].set_title('Boxplot of large Interpro accessions')
axs[1, 1].set_ylabel('Feature Length')

# Adjust spacing between subplots
plt.tight_layout()

plt.savefig('protein_analys_plots.png')

# Show the combined plot
plt.show()


## Machine learning
### testing four different machine learning models to predict large InterPro_annotations_accession by small InterPro_annotations_accession : NaiveBayes, randomforrst, MultilayerPerceptronClassifier, LinearSVC


In [None]:
# make a dataframe for ml
ml_df = processed_df.select("Protein","Interpro_accession","start","stop","feature_Length","ratio","Size")
# ml_df.show()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Assuming you have a DataFrame named ml_df with columns 'Interpro_accession', 'start', 'stop', 'feature_Length', 'ratio', and 'Size'

# Define the target column and feature columns
target = "Interpro_accession"
feature_columns = ["start", "stop", "feature_Length", "ratio", "Size"]

# Cast the relevant columns to DoubleType
for col_name in feature_columns:
    ml_df = ml_df.withColumn(col_name, ml_df[col_name].cast(DoubleType()))

# Assemble feature vectors using VectorAssembler
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
ml_df = vector_assembler.transform(ml_df)

# Select the assembled features and the target column
data = ml_df.select("features", target)

# Split the data into training and test sets (70% for training, 30% for testing)
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Show the first few rows of the training data and test data
train_data.show()
test_data.show()




In [None]:



def data_preprocessing(df):
    """
    It will help you to finish preprocessing data.
    df: spark df
    return small_df,large_df
    """
    # remove InterPro_annotations_accession == "-"
    # get the length of protein
    # get the ratio to distinguish them to large and small InterPro_annotations_accession
    # 1 for large, 0 for small InterPro_annotations_accession

    # get the intersection to make sure there is a match of large and small InterPro_annotations_accession(at least one large and one small InterPro_annotations_accession)
    intersection = df.filter(df.Size == 0).select("protein").intersect(df.filter(df.Size == 1).select("protein"))
    intersection_df = intersection.join(df,["protein"])

    # get the number of small InterPro_annotations_accession in each protein
    small_df = intersection_df.filter(df.Size == 0).groupBy(["protein"]).pivot("InterPro_accession").count()

    # There are several InterPro_annotations_accession with the same protein. I only choose the largest one.
    large_df = intersection_df.filter(df.Size == 1).groupby(["protein"]).agg(max("Ratio").alias("Ratio"))
    large_df = large_df.join(intersection_df,["protein","Ratio"],"inner").dropDuplicates(["protein"])

    # Drop the useless columns
    columns = ("Sequence_MD5_digest","Analysis","Signature_accession","Signature_description",
        "Score","Status","Date","InterPro_annotations_description","GO_annotations",
        "Pathways_annotations","Ratio","Size","Stop_location","Start_location","Sequence_length")
    large_df = large_df.drop(*columns)
    return small_df, large_df

def ML_df_create(small_df,large_df):
    """
    It will help you to create a correct ML dataframe.
    small_df: spark df, preprocessing to fit the criteria ratio<=0.9
    large_df: spark df, preprocessing to fit the criteria ratio>0.9
    return ML_df
    """
    # Create the df for ML, we do not need protein anymore.
    ML_df = large_df.join(small_df,["protein"],"outer").fillna(0).drop("protein")

    # catalogize y variable
    Label = StringIndexer(inputCol="InterPro_annotations_accession", outputCol="InterPro_index")

    # catalogize X variable
    input_columns = ML_df.columns[1:]
    assembler = VectorAssembler(inputCols=input_columns,outputCol="InterPro_features")

    pipeline = Pipeline(stages=[Label,assembler])
    ML_final = pipeline.fit(ML_df).transform(ML_df)
    return ML_final

def split_data(ML_final,percentage=0.7):
    """
    it can help you split the data to trainning data and test data.
    ML_final: df
    percentage:int, you can set another value.
    return: trainData, df; testData,df
    """
    (trainData, testData) = ML_final.randomSplit([percentage, 1-percentage],seed=42)
    return trainData, testData


small_df, large_df = data_preprocessing(processed_df)
ML_final = ML_df_create(small_df,large_df)
train_data, test_Data = split_data(ML_final,percentage=0.7)
train_data.show()

Naive bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a NaiveBayes model
nb = NaiveBayes(featuresCol="features", smoothing=1.0, modelType="multinomial")

# Define a parameter grid for grid search
param_grid = ParamGridBuilder() \
    .addGrid(nb.smoothing, [0.1, 0.5, 1.0]) \
    .build()

# Define a cross-validator with 5-fold cross-validation
crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=param_grid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy"),
                          numFolds=5,
                          seed=42)

# Run cross-validation and choose the best set of parameters
cv_model = crossval.fit(train_data)

# Make predictions on the test data using the best model
best_model = cv_model.bestModel
predictions = best_model.transform(test_data)

# Evaluate the accuracy of the best model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


In [None]:
# # save file
# trainFile = '/students/2021-2022/master/DaanSteur_DSLS/trainData.pkl'
# trainData.toPandas().set_index('InterPro_annotations_accession').to_pickle(trainFile)
# testFile = '/students/2021-2022/master/DaanSteur_DSLS/testData.pkl'
# testData.toPandas().set_index('InterPro_annotations_accession').to_pickle(testFile)

In [None]:
# # save model
# nb_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/DaanSteur_DSLS/NaiveBayesBestModel")
# dtc_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/DaanSteur_DSLS/DecisionTreeBestModel")
# rf_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/DaanSteur_DSLS/RandomForestBestModel")