# Master DSLS / Programming 3 / Assignment 6
# Final Assignment

## Introduction
https://bioinf.nl/~martijn/master/programming3/assignment6.html

This is the final for programming 3. In this assignment, I will develop scikit-learn machine learning models to predict the function of the proteins in the specific dataset. This model will use small InterPro_annotations_accession to predict large InterPro_annotations_accession.
The definition of small InterPro_annotations_accession and large InterPro_annotations_accession is defined as below:

If InterPro_annotations_accession's feature length(Stop_location-Start_location) / Sequence_length > 0.9, it is large InterPro_annotations_accession.

Otherwise, it is a small InterPro_annotations_accession.

We can briefly rewrite as:

            |(Stop - Start)|/Sequence >  0.9 --> Large

            |(Stop - Start)|/Sequence <= 0.9 --> small

I will also check the "bias" and "noise" that does not make sense from the dataset.

ie. lines(-) from the TSV file which don't contain InterPRO numbers

ie. proteins which don't have a large feature (according to the criteria above)

## 1. Goal

The goal of this assignment is to predict large InterPro_annotations_accession by small InterPro_annotations_accession.

I will use the dataset from /data/dataprocessing/interproscan/all_bacilli.tsv file on assemblix2012 and assemblix2019. However, this file contains ~4,200,000 protein annotations, so I will put a subset of all_bacilli.tsv on GitHub and on local for code testing.

In [1]:
# Spark web UI:http://localhost:4040/jobs/
# Output format : https://interproscan-docs.readthedocs.io/en/latest/OutputFormats.html
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import numpy as np
import warnings
import time
warnings.filterwarnings('ignore')
import pickle
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes, RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 

In [2]:
# Create a df by PySpark
start = time.time()
schema = StructType([
    StructField("Protein_accession", StringType(), True),
    StructField("Sequence_MD5_digest", StringType(), True),
    StructField("Sequence_length", IntegerType(), True),
    StructField("Analysis", StringType(), True),
    StructField("Signature_accession", StringType(), True),
    StructField("Signature_description", StringType(), True),
    StructField("Start_location", IntegerType(), True),
    StructField("Stop_location", IntegerType(), True),
    StructField("Score", FloatType(), True),
    StructField("Status", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("InterPro_annotations_accession", StringType(), True),
    StructField("InterPro_annotations_description", StringType(), True),
    StructField("GO_annotations", StringType(), True),
    StructField("Pathways_annotations", StringType(), True)])
path = "/data/dataprocessing/interproscan/all_bacilli.tsv"
spark = SparkSession.builder.master("local[16]")\
                            .config('spark.driver.memory', '128g')\
                            .config('spark.executor.memory', '128g')\
                            .config("spark.sql.debug.maxToStringFields","100")\
                            .appName("InterPro").getOrCreate()
df = spark.read.option("sep","\t").option("header","False").csv(path,schema=schema)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/03 10:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/03 10:10:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df.printSchema()

root
 |-- Protein_accession: string (nullable = true)
 |-- Sequence_MD5_digest: string (nullable = true)
 |-- Sequence_length: integer (nullable = true)
 |-- Analysis: string (nullable = true)
 |-- Signature_accession: string (nullable = true)
 |-- Signature_description: string (nullable = true)
 |-- Start_location: integer (nullable = true)
 |-- Stop_location: integer (nullable = true)
 |-- Score: float (nullable = true)
 |-- Status: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- InterPro_annotations_accession: string (nullable = true)
 |-- InterPro_annotations_description: string (nullable = true)
 |-- GO_annotations: string (nullable = true)
 |-- Pathways_annotations: string (nullable = true)



In [4]:
# remove InterPro_annotations_accession == "-"
# get the length of protein
# get the ratio to distinguish them to large and small InterPro_annotations_accession
# 1 for large, 0 for small InterPro_annotations_accession
df = df.filter(df.InterPro_annotations_accession != "-")\
        .withColumn("Ratio", (abs(df["Stop_location"] - df["Start_location"])/df["Sequence_length"]))\
        .withColumn("Size", when((abs(df["Stop_location"] - df["Start_location"])/df["Sequence_length"])>0.9,1).otherwise(0))

# get the intersection to make sure there is a match of large and small InterPro_annotations_accession(at least one large and one small InterPro_annotations_accession)
intersection = df.filter(df.Size == 0).select("Protein_accession").intersect(df.filter(df.Size == 1).select("Protein_accession"))
intersection_df = intersection.join(df,["Protein_accession"])

# get the number of small InterPro_annotations_accession in each Protein_accession
small_df = intersection_df.filter(df.Size == 0).groupBy(["Protein_accession"]).pivot("InterPro_annotations_accession").count()

# There are several InterPro_annotations_accession with the same Protein_accession. I only choose the largest one.
large_df = intersection_df.filter(df.Size == 1).groupby(["Protein_accession"]).agg(max("Ratio").alias("Ratio"))
large_df = large_df.join(intersection_df,["Protein_accession","Ratio"],"inner").dropDuplicates(["Protein_accession"])

# Drop the useless columns
columns = ("Sequence_MD5_digest","Analysis","Signature_accession","Signature_description",
        "Score","Status","Date","InterPro_annotations_description","GO_annotations",
        "Pathways_annotations","Ratio","Size","Stop_location","Start_location","Sequence_length")
large_df = large_df.drop(*columns)

# Create the df for ML, we do not need Protein_accession anymore.
ML_df = large_df.join(small_df,["Protein_accession"],"outer").fillna(0).drop("Protein_accession")

# catalogize y variable
Label = StringIndexer(inputCol="InterPro_annotations_accession", outputCol="InterPro_index")

# catalogize X variable
input_columns = ML_df.columns[1:]
assembler = VectorAssembler(inputCols=input_columns,outputCol="InterPro_features")

pipeline = Pipeline(stages=[Label,assembler])
ML_final = pipeline.fit(ML_df).transform(ML_df)

# Setup X, y and split it
(trainData, testData) = ML_final.randomSplit([0.7, 0.3],seed=42)

                                                                                

In [5]:
# RandomForestClassifier
# create a model
rf = RandomForestClassifier(labelCol="InterPro_index",
                            featuresCol="InterPro_features",
                            predictionCol="prediction", 
                            seed=42,maxDepth=20,
                            maxMemoryInMB = 256,
                            numTrees=500)
rf_Model = rf.fit(trainData)
predict = rf_Model.transform(testData)

# evaluate the result
evaluator = MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                            predictionCol = 'prediction',
                                            metricName='accuracy')

accuracy = evaluator.evaluate(predict)
print(f"Accuracy is {accuracy}")

22/10/03 10:11:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/10/03 10:12:35 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 10:13:29 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 10:14:51 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 10:15:18 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/10/03 10:15:49 WARN DAGScheduler: Broadcasting large task binary with size 8.0 MiB
22/10/03 10:15:50 WARN DAGScheduler: Broadcasting large task binary with size 1998.3 KiB
22/10/03 10:15:51 WARN DAGScheduler: Broadcasting large task binary with size 11.9 MiB
22/10/03 10:15:53 WARN DAGScheduler: Broadcasting large task binary with size 1973.7 KiB
22/10/03 10:15:54 WARN DAGScheduler: Broadcasting large task binary with size 15.9 MiB
22/10/03 10:15:55 WARN DAGScheduler: B

Accuracy is 0.3044988743110007


                                                                                

In [6]:
# Hyperparameter tuning RandomForestClassifier
# create a model
rf = RandomForestClassifier(labelCol="InterPro_index",
                            featuresCol="InterPro_features",
                            predictionCol="prediction", 
                            seed=42,
                            maxMemoryInMB = 256)

# Tuning
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [5,10,20])
            .addGrid(rf.numTrees, [20,100])
            .build())

# evaluate the result
rf_evaluator = MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                            predictionCol = 'prediction',
                                            metricName='accuracy')

# KFold
cv = CrossValidator(estimator=rf,
                    evaluator=rf_evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=5,
                    parallelism=10,
                    seed=42)

# Run Cross-validation
rf_cvModel = cv.fit(trainData)

# Make predictions on testData. cvModel uses the bestModel.
rf_cvPredictions = rf_cvModel.transform(testData)

# Evaluate bestModel found from Cross Validation
rf_evaluator.evaluate(rf_cvPredictions)

22/10/03 10:52:39 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:39 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:40 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:40 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:40 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:40 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:47 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:47 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:47 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:47 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:48 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
22/10/03 10:52:48 WARN DAGScheduler: Broadcasting larg

0.30174287710581477

In [7]:
# Decision trees
dtc = DecisionTreeClassifier(labelCol="InterPro_index",
                            featuresCol="InterPro_features",
                            predictionCol="prediction")
dtc = dtc.fit(trainData)
dtc_pred = dtc.transform(testData)
dtc_evaluator=MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                                predictionCol = 'prediction',
                                                metricName='accuracy')
dtc_acc = dtc_evaluator.evaluate(dtc_pred)
print("Prediction Accuracy: ", dtc_acc)

22/10/03 16:14:52 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 16:15:09 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 16:15:36 WARN DAGScheduler: Broadcasting large task binary with size 4.6 MiB
22/10/03 16:16:03 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/10/03 16:16:40 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB
22/10/03 16:16:46 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB
22/10/03 16:16:54 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
22/10/03 16:17:01 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB
22/10/03 16:17:09 WARN DAGScheduler: Broadcasting large task binary with size 5.0 MiB
22/10/03 16:17:16 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
22/10/03 16:17:22 WARN DAGScheduler: Broadcasting large task binary with size 5.2 MiB
22/10/03 16:17:29 WARN DAGScheduler: Broadcasting larg

Prediction Accuracy:  0.09174365344305567


                                                                                

In [8]:
# Hyperparameter tuning DecisionTree
# Tuning
dtc = DecisionTreeClassifier(labelCol="InterPro_index",
                            featuresCol="InterPro_features",
                            predictionCol="prediction")  

paramGrid = (ParamGridBuilder()
            .addGrid(dtc.maxDepth, [2,4,6,8,10,12])
            .build())

# evaluate the result
dtc_evaluator = MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                            predictionCol = 'prediction',
                                            metricName='accuracy')

# KFold
cv = CrossValidator(estimator=dtc,
                    evaluator=dtc_evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=5,
                    parallelism=10,
                    seed=42)

# Run Cross-validation
dtc_cvModel = cv.fit(trainData)

# Make predictions on testData. cvModel uses the bestModel.
dtc_cvPredictions = dtc_cvModel.transform(testData)

# Evaluate bestModel found from Cross Validation
dtc_evaluator.evaluate(dtc_cvPredictions)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/homes/kalin/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/homes/kalin/.local/lib/python3.9/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/commons/conda/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [9]:
# Naive Bayes
nb = NaiveBayes(modelType="multinomial",labelCol="InterPro_index",
                    featuresCol="InterPro_features",
                    predictionCol="prediction",)    
nb = nb.fit(trainData)
nb_pred = nb.transform(testData)
nb_evaluator=MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                                predictionCol = 'prediction',
                                                metricName='accuracy')
nb_acc = nb_evaluator.evaluate(nb_pred)
print("Prediction Accuracy: ", nb_acc)

22/10/03 16:28:05 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
22/10/03 16:28:39 WARN DAGScheduler: Broadcasting large task binary with size 9.3 MiB
22/10/03 16:29:16 WARN DAGScheduler: Broadcasting large task binary with size 120.3 MiB

Prediction Accuracy:  0.7822956292213338


                                                                                

In [10]:
# Hyperparameter tuning Naive Bayes model
# Tuning
nb = NaiveBayes(modelType="multinomial",labelCol="InterPro_index",
                    featuresCol="InterPro_features",
                    predictionCol="prediction",)    

paramGrid = (ParamGridBuilder()
            .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0])
            .build())

# evaluate the result
nb_evaluator = MulticlassClassificationEvaluator(labelCol='InterPro_index',
                                            predictionCol = 'prediction',
                                            metricName='accuracy')

# KFold
cv = CrossValidator(estimator=nb,
                    evaluator=nb_evaluator,
                    estimatorParamMaps=paramGrid,
                    numFolds=5,
                    parallelism=10,
                    seed=42)

# Run Cross-validation
nb_cvModel = cv.fit(trainData)

# Make predictions on testData. cvModel uses the bestModel.
nb_cvPredictions = nb_cvModel.transform(testData)

# Evaluate bestModel found from Cross Validation
nb_evaluator.evaluate(nb_cvPredictions)

In [None]:
# save file
trainFile = '/students/2021-2022/master/Kai_DSLS/trainData.pkl'
trainData.toPandas().set_index('InterPro_annotations_accession').to_pickle(trainFile)
testFile = '/students/2021-2022/master/Kai_DSLS/testData.pkl'
testData.toPandas().set_index('InterPro_annotations_accession').to_pickle(testFile)

In [None]:
# save model
nb_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/Kai_DSLS/NaiveBayesBestModel")
dtc_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/Kai_DSLS/DecisionTreeBestModel")
rf_cvModel.bestModel.write().overwrite().save("/students/2021-2022/master/Kai_DSLS/RandomForestBestModel")

In [None]:
# Original script
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,FloatType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import numpy as np
import warnings
import time
warnings.filterwarnings('ignore')
from sklearn.model_selection import train_test_split,StratifiedKFold,KFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn import metrics
import pickle
# Create a df by PySpark
start = time.time()
schema = StructType([
    StructField("Protein_accession", StringType(), True),
    StructField("Sequence_MD5_digest", StringType(), True),
    StructField("Sequence_length", IntegerType(), True),
    StructField("Analysis", StringType(), True),
    StructField("Signature_accession", StringType(), True),
    StructField("Signature_description", StringType(), True),
    StructField("Start_location", IntegerType(), True),
    StructField("Stop_location", IntegerType(), True),
    StructField("Score", FloatType(), True),
    StructField("Status", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("InterPro_annotations_accession", StringType(), True),
    StructField("InterPro_annotations_description", StringType(), True),
    StructField("GO_annotations", StringType(), True),
    StructField("Pathways_annotations", StringType(), True)])
path = "/data/dataprocessing/interproscan/all_bacilli.tsv"
# path = "all_bacilli.tsv"
spark = SparkSession.builder.master("local[16]")\
                            .config('spark.driver.memory', '128g')\
                            .config('spark.executor.memory', '128g')\
                            .config("spark.sql.debug.maxToStringFields","100")\
                            .appName("InterPro").getOrCreate()
df = spark.read.option("sep","\t").option("header","False").csv(path,schema=schema)
df = df.filter(df.InterPro_annotations_accession != "-")\
        .withColumn("Length", abs(df["Stop_location"] - df["Start_location"]))\
        .withColumn("Ratio", (abs(df["Stop_location"] - df["Start_location"])/df["Sequence_length"]))\
        .withColumn("Size", when((abs(df["Stop_location"] - df["Start_location"])/df["Sequence_length"])>0.9,1).otherwise(0))
# get the intersection to make sure there is a match of large and small InterPro_annotations_accession(at least one large and one small InterPro_annotations_accession)
intersection = df.filter(df.Size == 0).select("Protein_accession").intersect(df.filter(df.Size == 1).select("Protein_accession"))
intersection_df = intersection.join(df,["Protein_accession"])
# get the number of small InterPro_annotations_accession in each Protein_accession
small_df = intersection_df.filter(df.Size == 0).groupBy(["Protein_accession"]).pivot("InterPro_annotations_accession").count()
# There are several InterPro_annotations_accession with the same Protein_accession. I only choose the largest one.
large_df = intersection_df.filter(df.Size == 1).groupby(["Protein_accession"]).agg(max("Ratio").alias("Ratio"))
large_df = large_df.join(intersection_df,["Protein_accession","Ratio"],"inner").dropDuplicates(["Protein_accession"])
# Drop the useless columns
columns = ("Sequence_MD5_digest","Analysis","Signature_accession","Signature_description",
        "Score","Status","Date","InterPro_annotations_description","GO_annotations",
        "Pathways_annotations","Ratio","Size","Stop_location","Start_location","Sequence_length","Length")
large_df = large_df.drop(*columns)
# Create the df for ML
ML_df = large_df.join(small_df,["Protein_accession"]).fillna(0)
# Setup X,y and split it
y = ML_df.select("InterPro_annotations_accession")
X = ML_df.select(ML_df.columns[2:])
y = np.array(y.collect())
X = np.array(X.collect())

# ordinal encode target variable
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)
sk = StratifiedKFold(n_splits=2, shuffle=True,random_state=42)
for train_index, test_index in sk.split(X, y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]


# Try standard#
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Modeling
rfc = RandomForestClassifier(random_state=42,max_depth=5,n_estimators=100)
rfc.fit(X_train, y_train)
# performing predictions on the test dataset
y_pred = rfc.predict(X_test)
# using metrics module for accuracy calculation
accuracy=metrics.accuracy_score(y_test, y_pred)
print("RandomForest{}".format(accuracy))

# Modeling
from sklearn.tree import DecisionTreeClassifier
dtc = DecisionTreeClassifier()
dtc.fit(X_train, y_train)
y_pred = dtc.predict(X_test)
accuracy=metrics.accuracy_score(y_test, y_pred)
print("DecisionTre{}".format(accuracy))
# print('Accuracy: {:.2f} %'.format(accuracy*100))
end = time.time()
print(end-start)