### Project 1: predict the function of the proteins on InterPROscan dataset using RandomForestClassifier

In [1]:
import pyspark
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.sql import functions as F
from pyspark.sql.functions import desc, avg, collect_list, flatten, split, countDistinct, explode, asc
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, isnan, when, count
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [2]:
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '128g'),
                                   ('spark.master', 'local[16]'),
                                   ('spark.driver.memory', '128g')])
sc = pyspark.SparkContext(conf=conf)
sc.getConf().getAll()

22/07/08 20:38:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/08 20:38:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/07/08 20:38:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


[('spark.driver.memory', '180g'),
 ('spark.driver.port', '42877'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.executor.memory', '180g'),
 ('spark.app.id', 'local-1657305504486'),
 ('spark.app.startTime', '1657305503310'),
 ('spark.master', 'local[16]'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'assemblix2019.bin.bioinf.nl'),
 ('spark.ui.showConsoleProgress', 'true')]

In [3]:
file = '/data/dataprocessing/interproscan/all_bacilli.tsv'
df = SQLContext(sc).read.csv(file, sep=r'\t', header=False, inferSchema= True)
#rename the column name
new_names = ['Protein_accession', 'MD5', 'Seq_len', 'Analysis',
             'Signature_accession', 'Signature_description',
             'Start', 'Stop', 'Score', 'Status', 'Date', 'InterPro_annotations',
             'InterPro_discription', 'GO_annotations', 'Pathways']
df = df.toDF(*new_names)

                                                                                

In [4]:
df.show(1)

+--------------------+--------------------+-------+--------+-------------------+---------------------+-----+----+-------+------+----------+--------------------+--------------------+--------------+--------+
|   Protein_accession|                 MD5|Seq_len|Analysis|Signature_accession|Signature_description|Start|Stop|  Score|Status|      Date|InterPro_annotations|InterPro_discription|GO_annotations|Pathways|
+--------------------+--------------------+-------+--------+-------------------+---------------------+-----+----+-------+------+----------+--------------------+--------------------+--------------+--------+
|gi|29898682|gb|AA...|92d1264e347e14924...|    547| TIGRFAM|          TIGR03882| cyclo_dehyd_2: ba...|    2| 131|1.6E-21|     T|25-04-2022|           IPR022291|Bacteriocin biosy...|             -|       -|
+--------------------+--------------------+-------+--------+-------------------+---------------------+-----+----+-------+------+----------+--------------------+----------------

###### first drop every columns that we don't need it in next steps

In [5]:
df = df.drop("MD5", "Analysis", "Signature_accession", "Signature_description", "Score", "Status", "Date", "InterPro_discription", "GO_annotations", "Pathways")
df.show(3)

+--------------------+-------+-----+----+--------------------+
|   Protein_accession|Seq_len|Start|Stop|InterPro_annotations|
+--------------------+-------+-----+----+--------------------+
|gi|29898682|gb|AA...|    547|    2| 131|           IPR022291|
|gi|29898682|gb|AA...|    547|  161| 547|           IPR027624|
|gi|29898682|gb|AA...|    547|  159| 547|           IPR003776|
+--------------------+-------+-----+----+--------------------+
only showing top 3 rows



##### filter out the rows consisting of no annotation.
##### number of Distinct "InterPro_annotations" in total

In [6]:
df = df.filter(df.InterPro_annotations!="-")
df.select(countDistinct("InterPro_annotations")).collect()[0][0]

                                                                                

9703

###### check whether there is any null records in dataframe

In [7]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+-----------------+-------+-----+----+--------------------+
|Protein_accession|Seq_len|Start|Stop|InterPro_annotations|
+-----------------+-------+-----+----+--------------------+
|                0|      0|    0|   0|                   0|
+-----------------+-------+-----+----+--------------------+



                                                                                

###### how many recordes in total

In [8]:
df.count()

                                                                                

1921817

###### number of distict protein in total

In [9]:
df.select(countDistinct("Protein_accession")).collect()[0][0]

                                                                                

332775

###### adding "size" column that is the percentage of annotation size to protein size

In [10]:
df = df.withColumn('size', (((df['Stop'] - df['Start']) / df.Seq_len) * 100))
df.show(2)

+--------------------+-------+-----+----+--------------------+------------------+
|   Protein_accession|Seq_len|Start|Stop|InterPro_annotations|              size|
+--------------------+-------+-----+----+--------------------+------------------+
|gi|29898682|gb|AA...|    547|    2| 131|           IPR022291|23.583180987202926|
|gi|29898682|gb|AA...|    547|  161| 547|           IPR027624| 70.56672760511883|
+--------------------+-------+-----+----+--------------------+------------------+
only showing top 2 rows



###### create a dataframe for large annotations

In [11]:
large_df = df.where (df.size >= 90)
large_df.select(countDistinct("InterPro_annotations")).collect()[0][0]

                                                                                

4980

###### according to below output, there are more than on large Interpro_annotation for one protein;

In [12]:
large_df.select("*").sort("Protein_accession").show(20)

                                                                                

+--------------------+-------+-----+----+--------------------+-----------------+
|   Protein_accession|Seq_len|Start|Stop|InterPro_annotations|             size|
+--------------------+-------+-----+----+--------------------+-----------------+
|gi|10172613|dbj|B...|    449|    1| 448|           IPR001957|99.55456570155901|
|gi|10172613|dbj|B...|    449|    5| 448|           IPR001957|98.66369710467706|
|gi|10172613|dbj|B...|    449|    8| 447|           IPR001957| 97.7728285077951|
|gi|10172614|dbj|B...|    380|   17| 375|           IPR001001|94.21052631578948|
|gi|10172614|dbj|B...|    380|    1| 378|           IPR001001|99.21052631578947|
|gi|10172614|dbj|B...|    380|    1| 379|           IPR001001|99.47368421052632|
|gi|10172614|dbj|B...|    380|    1| 380|           IPR001001|99.73684210526315|
|gi|10172614|dbj|B...|    380|    1| 379|           IPR001001|99.47368421052632|
|gi|10172615|dbj|B...|     73|    1|  68|           IPR036986|91.78082191780823|
|gi|10172616|dbj|B...|    37

###### I pick only the largest one for classification

In [13]:
windowpro = Window.partitionBy("Protein_accession").orderBy(col("size").desc())
largdff = large_df.withColumn("row",row_number().over(windowpro)).filter(col("row") == 1).drop("row")
largdff.sort("Protein_accession").show(10)



+--------------------+-------+-----+----+--------------------+-----------------+
|   Protein_accession|Seq_len|Start|Stop|InterPro_annotations|             size|
+--------------------+-------+-----+----+--------------------+-----------------+
|gi|10172613|dbj|B...|    449|    1| 448|           IPR001957|99.55456570155901|
|gi|10172614|dbj|B...|    380|    1| 380|           IPR001001|99.73684210526315|
|gi|10172615|dbj|B...|     73|    1|  68|           IPR036986|91.78082191780823|
|gi|10172616|dbj|B...|    371|    1| 371|           IPR001238|99.73045822102425|
|gi|10172618|dbj|B...|    637|    5| 637|           IPR011557|99.21507064364206|
|gi|10172619|dbj|B...|    833|    6| 806|           IPR005743|96.03841536614645|
|gi|10172625|dbj|B...|    250|   15| 250|           IPR008841|             94.0|
|gi|10172631|dbj|B...|    317|    9| 312|           IPR026988|95.58359621451105|
|gi|10172632|dbj|B...|    485|    1| 485|           IPR005990|99.79381443298969|
|gi|10172634|dbj|B...|    29



###### to make sure that there are only one large annotation for every protein:

In [14]:
largdff.groupBy("Protein_accession").count().filter("count > 1").show(2)

                                                                                

+-----------------+-----+
|Protein_accession|count|
+-----------------+-----+
+-----------------+-----+



###### make a list of distinct Protein that have large feature

In [15]:
ap = largdff.select("Protein_accession").collect()
all_protein = [i[0] for i in ap]
len(all_protein)

                                                                                

208741

In [16]:
len(list(set(all_protein)))

208741

###### I drop the columns that I don't need them any more; I use largdff for marging later.

In [17]:
largdff = largdff.drop("Seq_len", "Start", "Stop", "size")
largdff.show(2)



+--------------------+--------------------+
|   Protein_accession|InterPro_annotations|
+--------------------+--------------------+
|gi|10172751|dbj|B...|           IPR036394|
|gi|10172776|dbj|B...|           IPR027417|
+--------------------+--------------------+
only showing top 2 rows





In [18]:
small_df = df.filter(df.Protein_accession.isin(all_protein)).where(df.size < 90)

In [19]:
sf = small_df.select("InterPro_annotations").distinct().collect()

22/07/08 20:46:29 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
[Stage 35:>                                                       (0 + 16) / 84]

CodeHeap 'non-profiled nmethods': size=118312Kb used=116751Kb max_used=117898Kb free=1560Kb
 bounds [0x00007f1a54494000, 0x00007f1a5b81e000, 0x00007f1a5b81e000]
CodeHeap 'profiled nmethods': size=118308Kb used=105058Kb max_used=105058Kb free=13249Kb
 bounds [0x00007f1a4d10b000, 0x00007f1a54494000, 0x00007f1a54494000]
CodeHeap 'non-nmethods': size=9140Kb used=1610Kb max_used=5094Kb free=7529Kb
 bounds [0x00007f1a4c81e000, 0x00007f1a4cd5e000, 0x00007f1a4d10b000]
 total_blobs=38120 nmethods=37208 adapters=817
 compilation: disabled (not enough contiguous free space left)
              stopped_count=1, restarted_count=0
 full_count=0




KeyboardInterrupt: 

#####  Pivot() It is an aggregation where one of the grouping columns values is transposed into individual columns with distinct data.

In [21]:
pivotDF = small_df.groupBy("Protein_accession").pivot("InterPro_annotations").count()

22/07/08 23:03:01 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/08 23:20:48 WARN DAGScheduler: Broadcasting large task binary with size 25.1 MiB
Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
22/07/09 01:56:52 WARN DAGScheduler: Broadcasting large task binary with size 25.1 MiB
22/07/09 02:08:03 WARN DAGScheduler: Broadcasting large task binary with size 25.1 MiB
22/07/09 02:18:56 WARN DAGScheduler: Broadcasting large task binary with size 25.1 MiB
                                                                                

In [22]:
pivotDF = pivotDF.fillna(value=0)

##### finding numeric columns

In [24]:
all_columns = pivotDF.columns
numericCols = [i for i in all_columns if ((i != "Protein_accession") & (i != "InterPro_annotations"))]

##### chnageing the type of small features columns to integer

In [25]:
pivotDF = pivotDF.select(pivotDF.Protein_accession, 
                          (*(col(c).cast(IntegerType()).alias(c) for c in numericCols)),
                          pivotDF.InterPro_annotations)

###### left join of large dataframe(dataframe including protein and labels) and pivotDF(dataframe with small features in columns)

In [23]:
pivotDF = pivotDF.join(largdff, on=['Protein_accession'], how='left')

###### before doing random forest, all numeric columns are merged into a vector column using VectorAssembler

In [26]:
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
ML_df = assembler.transform(pivotDF)

###### label column is string, StringIndexer maps it to a label indices.

In [27]:
label_stringIdx = StringIndexer(inputCol = 'InterPro_annotations', outputCol = 'labelIndex')
ML_df = label_stringIdx.fit(ML_df).transform(ML_df)

22/07/09 02:46:32 WARN DAGScheduler: Broadcasting large task binary with size 13.9 MiB
22/07/09 02:46:35 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/09 09:23:48 WARN DAGScheduler: Broadcasting large task binary with size 25.1 MiB
22/07/09 10:59:32 WARN DAGScheduler: Broadcasting large task binary with size 43.2 MiB
22/07/09 11:00:34 WARN DAGScheduler: Broadcasting large task binary with size 43.2 MiB
                                                                                

###### I kept just "features", "labelIndex" columns

In [28]:
ML_df = ML_df.select("features", "labelIndex")
ML_df.show(2)

22/07/09 11:09:33 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/07/09 11:10:24 WARN DAGScheduler: Broadcasting large task binary with size 13.9 MiB
22/07/09 11:10:29 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/09 15:34:48 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/09 15:59:58 WARN DAGScheduler: Broadcasting large task binary with size 46.4 MiB
[Stage 49:>                                                         (0 + 1) / 1]

+--------------------+----------+
|            features|labelIndex|
+--------------------+----------+
| (5202,[3082],[1.0])|     745.0|
|(5202,[875,902,27...|       0.0|
+--------------------+----------+
only showing top 2 rows



                                                                                

##### split data to train and test

In [32]:
(train, test) = ML_df.randomSplit(weights=[0.7, 0.3])

# 2. ML inplementation
###### applying random forest

In [33]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'labelIndex')

In [34]:
rfModel = rf.fit(train)

22/07/09 17:27:49 WARN DAGScheduler: Broadcasting large task binary with size 14.9 MiB
22/07/09 17:27:54 WARN DAGScheduler: Broadcasting large task binary with size 13.9 MiB
22/07/09 19:53:49 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB
22/07/09 22:10:43 WARN DAGScheduler: Broadcasting large task binary with size 24.1 MiB
22/07/09 22:11:46 WARN DAGScheduler: Broadcasting large task binary with size 24.1 MiB
22/07/09 22:40:42 WARN DAGScheduler: Broadcasting large task binary with size 24.2 MiB
22/07/09 23:10:00 WARN DAGScheduler: Broadcasting large task binary with size 24.2 MiB
22/07/09 23:51:23 WARN DAGScheduler: Broadcasting large task binary with size 25.7 MiB
22/07/10 00:09:53 WARN DAGScheduler: Broadcasting large task binary with size 27.6 MiB
22/07/10 00:28:11 WARN DAGScheduler: Broadcasting large task binary with size 29.4 MiB
22/07/10 00:49:56 WARN DAGScheduler: Broadcasting large task binary with size 31.3 MiB
22/07/10 00:56:56 WARN DAGScheduler: Broadc

# 3. Evaluation

In [35]:
predictions = rfModel.transform(test)

In [36]:
predictions.select("labelIndex", "prediction").show(10)

22/07/10 02:07:31 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/10 02:07:37 WARN DAGScheduler: Broadcasting large task binary with size 13.9 MiB
22/07/10 04:11:03 WARN DAGScheduler: Broadcasting large task binary with size 26.1 MiB
22/07/10 06:19:36 WARN DAGScheduler: Broadcasting large task binary with size 52.1 MiB
[Stage 91:>                                                         (0 + 1) / 1]

+----------+----------+
|labelIndex|prediction|
+----------+----------+
|    1029.0|       0.0|
|       7.0|       0.0|
|       6.0|       6.0|
|      73.0|      63.0|
|    1415.0|       0.0|
|    1756.0|       0.0|
|      10.0|       3.0|
|      10.0|       3.0|
|       3.0|       3.0|
|      53.0|      53.0|
+----------+----------+
only showing top 10 rows



                                                                                

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)

22/07/10 06:45:49 WARN DAGScheduler: Broadcasting large task binary with size 14.9 MiB
22/07/10 06:45:53 WARN DAGScheduler: Broadcasting large task binary with size 13.9 MiB
22/07/10 08:31:32 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB

unfortunately, laptop turned off and I couldn't see the result; only next cell result is copied from another notebook(all steps are the same)

In [30]:
print("Accuracy = %s" % (accuracy))

Accuracy = 0.14805097137616458
