In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

In [5]:
!pip install pysam
import pysam



In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
from pyspark.sql.functions import log2
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id

from pyspark.ml.feature import StringIndexer

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.stat import Correlation
from pyspark.ml.clustering import LDA

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, LogisticRegressionSummary
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator


In [7]:
# loading the Alignment files in BAM format

bamfile_45 = pysam.AlignmentFile("/content/drive/MyDrive/CLOUD/SRR26924245_sorted.bam",mode='rb',require_index = True)

In [12]:
# Printing the header of the BAM file

print(bamfile_45.header)

@HD	VN:1.0	SO:coordinate
@SQ	SN:1	LN:248956422
@SQ	SN:10	LN:133797422
@SQ	SN:11	LN:135086622
@SQ	SN:12	LN:133275309
@SQ	SN:13	LN:114364328
@SQ	SN:14	LN:107043718
@SQ	SN:15	LN:101991189
@SQ	SN:16	LN:90338345
@SQ	SN:17	LN:83257441
@SQ	SN:18	LN:80373285
@SQ	SN:19	LN:58617616
@SQ	SN:2	LN:242193529
@SQ	SN:20	LN:64444167
@SQ	SN:21	LN:46709983
@SQ	SN:22	LN:50818468
@SQ	SN:3	LN:198295559
@SQ	SN:4	LN:190214555
@SQ	SN:5	LN:181538259
@SQ	SN:6	LN:170805979
@SQ	SN:7	LN:159345973
@SQ	SN:8	LN:145138636
@SQ	SN:9	LN:138394717
@SQ	SN:MT	LN:16569
@SQ	SN:X	LN:156040895
@SQ	SN:Y	LN:57227415
@SQ	SN:KI270728.1	LN:1872759
@SQ	SN:KI270727.1	LN:448248
@SQ	SN:KI270442.1	LN:392061
@SQ	SN:KI270729.1	LN:280839
@SQ	SN:GL000225.1	LN:211173
@SQ	SN:KI270743.1	LN:210658
@SQ	SN:GL000008.2	LN:209709
@SQ	SN:GL000009.2	LN:201709
@SQ	SN:KI270747.1	LN:198735
@SQ	SN:KI270722.1	LN:194050
@SQ	SN:GL000194.1	LN:191469
@SQ	SN:KI270742.1	LN:186739
@SQ	SN:GL000205.2	LN:185591
@SQ	SN:GL000195.1	LN:182896
@SQ	SN:KI270736.1	LN:181920
@S

In [81]:
# Fetching the values in the BAM file at chr1 in between 1211340 and 1314153 start and end points

chr1 = bamfile_45.fetch('1',1211340,1314153)

for x in chr1:
  print(str(x))
  break

SRR26924245.13771301	99	#0	1212843	60	150M	#0	1212880	187	CCCCCCCGTGGGCCCTGACCCGGGAGCTCGGTCTTGAGGATGCCAGAGGAGGCACCGGTGGGGCCAGGTCCCTGCGGCCCACGGCCCATCTGCGTCACAGACAGCCGCTATGCACACCCCCCACCGCCGGCCGCTGCCACCGAGCTCACC	array('B', [37, 37, 37, 11, 37, 37, 37, 37, 37, 37, 37, 37, 11, 37, 37, 37, 37, 11, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 25, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 25, 37, 37, 25, 37, 25, 37, 37, 25, 37, 37, 37, 25, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 11, 37, 25, 37, 37, 37, 25, 25, 37, 37, 37, 25, 37, 37, 37, 11, 37, 25, 37, 37, 37, 37, 37, 11, 37, 37, 37, 37, 37, 25, 37, 11, 37, 11, 37, 37, 37, 37, 37, 37, 37, 37, 37, 11, 37, 37, 37, 37, 11, 37, 25, 37, 37, 37, 37, 25, 11, 25, 11, 37, 11, 37, 25, 37, 25, 37, 25, 37, 37, 11, 37, 37, 37, 11, 37, 37, 37, 11, 37, 37, 11, 11, 37, 37, 37])	[('AS', -9), ('ZS', -10), ('XN', 0), ('XM', 3), ('XO', 0), ('XG', 0), ('NM', 3), ('MD', '3A117A12A15'), ('YS', -3), ('YT', 'CP'), ('NH', 1)]


In [8]:
# reading the second BAM file

bamfile_46 = pysam.AlignmentFile('/content/drive/MyDrive/CLOUD/SRR26924246_sorted.bam',mode='rb',require_index = True)

In [None]:
# Calculating the total number of reads in the BAM file

bamfile_45.count()

37182383

In [None]:
bamfile_46.count()

42557025

In [9]:
# Loading the Gene Transfer format file using spark

gtf_data = spark.read.csv('/content/drive/MyDrive/CLOUD/Homo_sapiens.GRCh38.106.gtf', sep = '\t',comment='#', header=None)

new_column_names = ['seqname', 'source', 'feature', 'start', 'end', 'score', 'strand', 'frame', 'attribute']

# Renaming columns
for i in range(len(new_column_names)):
    gtf_data = gtf_data.withColumnRenamed('_c' + str(i), new_column_names[i])

# Display the DataFrame with updated column names
gtf_data.show(10)


+-------+--------------+-----------+-------+-------+-----+------+-----+--------------------+
|seqname|        source|    feature|  start|    end|score|strand|frame|           attribute|
+-------+--------------+-----------+-------+-------+-----+------+-----+--------------------+
|      1|ensembl_havana|       gene|1211340|1214153|    .|     -|    .|gene_id "ENSG0000...|
|      1|ensembl_havana| transcript|1211340|1214153|    .|     -|    .|gene_id "ENSG0000...|
|      1|ensembl_havana|       exon|1213983|1214153|    .|     -|    .|gene_id "ENSG0000...|
|      1|ensembl_havana|        CDS|1213983|1214127|    .|     -|    0|gene_id "ENSG0000...|
|      1|ensembl_havana|start_codon|1214125|1214127|    .|     -|    0|gene_id "ENSG0000...|
|      1|ensembl_havana|       exon|1213663|1213785|    .|     -|    .|gene_id "ENSG0000...|
|      1|ensembl_havana|        CDS|1213663|1213785|    .|     -|    2|gene_id "ENSG0000...|
|      1|ensembl_havana|       exon|1212992|1213093|    .|     -|    .

In [10]:
# Selecting only gene features for annotation

from pyspark.sql.functions import col
genes = gtf_data.filter(col('feature') == 'gene').rdd.collect()

In [11]:
# Quantifying the whole BAM file to obtain the Gene and their corresponding counts ( how many reads are mapped to the gene )

def quantification(gtf,bam):

    # creating an empty dictionary to store the gene_id and their corresponding counts
    gene_read_counts = {}

    # for the entire length of the gtf file obtain each row and extract the gene ID from the attribute column
    for i in range(len(gtf)):
        gene = gtf[i]
        gene_id = gene['attribute'].split('gene_id "')[1].split('";')[0]

    # Initialize read count for the gene
        gene_read_counts[gene_id] = 0

    # Iterate through BAM file alignments and count reads overlapping with the gene
        for read in bam.fetch(str(gene['seqname']), int(gene['start']), int(gene['end'])):
            gene_read_counts[gene_id] += 1

    return gene_read_counts

In [12]:
gene_count_45 = quantification(genes,bamfile_45)

In [None]:
gene_count_45

{'ENSG00000186827': 3,
 'ENSG00000186891': 0,
 'ENSG00000160072': 1475,
 'ENSG00000260179': 2,
 'ENSG00000234396': 43,
 'ENSG00000225972': 237,
 'ENSG00000224315': 10,
 'ENSG00000198744': 41,
 'ENSG00000279928': 4,
 'ENSG00000228037': 0,
 'ENSG00000142611': 2,
 'ENSG00000225630': 20139,
 'ENSG00000131584': 1695,
 'ENSG00000227589': 10,
 'ENSG00000237402': 4,
 'ENSG00000284616': 0,
 'ENSG00000169972': 615,
 'ENSG00000157911': 1246,
 'ENSG00000269896': 83,
 'ENSG00000237973': 8475,
 'ENSG00000224051': 519,
 'ENSG00000228463': 128,
 'ENSG00000260972': 0,
 'ENSG00000157933': 1818,
 'ENSG00000162591': 2552,
 'ENSG00000224340': 121,
 'ENSG00000270035': 0,
 'ENSG00000169962': 16,
 'ENSG00000226545': 10,
 'ENSG00000233653': 2,
 'ENSG00000268663': 5,
 'ENSG00000226374': 1,
 'ENSG00000116285': 578,
 'ENSG00000229280': 0,
 'ENSG00000142655': 1288,
 'ENSG00000232596': 0,
 'ENSG00000235054': 0,
 'ENSG00000232663': 1,
 'ENSG00000284646': 4,
 'ENSG00000240409': 65,
 'ENSG00000229344': 52,
 'ENSG00000

In [13]:
gene_count_46 = quantification(genes,bamfile_46)

In [None]:
gene_count_46

{'ENSG00000186827': 2,
 'ENSG00000186891': 4,
 'ENSG00000160072': 1626,
 'ENSG00000260179': 2,
 'ENSG00000234396': 42,
 'ENSG00000225972': 305,
 'ENSG00000224315': 12,
 'ENSG00000198744': 57,
 'ENSG00000279928': 17,
 'ENSG00000228037': 0,
 'ENSG00000142611': 2,
 'ENSG00000225630': 28973,
 'ENSG00000131584': 1485,
 'ENSG00000227589': 4,
 'ENSG00000237402': 6,
 'ENSG00000284616': 0,
 'ENSG00000169972': 493,
 'ENSG00000157911': 1402,
 'ENSG00000269896': 95,
 'ENSG00000237973': 11763,
 'ENSG00000224051': 626,
 'ENSG00000228463': 122,
 'ENSG00000260972': 0,
 'ENSG00000157933': 2191,
 'ENSG00000162591': 2036,
 'ENSG00000224340': 150,
 'ENSG00000270035': 5,
 'ENSG00000169962': 32,
 'ENSG00000226545': 12,
 'ENSG00000233653': 8,
 'ENSG00000268663': 5,
 'ENSG00000226374': 0,
 'ENSG00000116285': 730,
 'ENSG00000229280': 0,
 'ENSG00000142655': 1405,
 'ENSG00000232596': 0,
 'ENSG00000235054': 0,
 'ENSG00000232663': 6,
 'ENSG00000284646': 0,
 'ENSG00000240409': 158,
 'ENSG00000229344': 60,
 'ENSG000

In [14]:
# Converting the dictionary obtained from the quantification function

SRR26924245 = spark.createDataFrame(gene_count_45.items(), ['Gene_ID','SRR26924245'])
SRR26924246 = spark.createDataFrame(gene_count_46.items(), ['Gene_ID','SRR26924246'])

In [15]:
# merging both the samples

counts_45_46 = SRR26924245.join(SRR26924246, 'Gene_ID', 'outer')

In [22]:
counts_45_46.show()

+---------------+-----------+-----------+
|        Gene_ID|SRR26924245|SRR26924246|
+---------------+-----------+-----------+
|ENSG00000000003|       1584|       1884|
|ENSG00000000005|          0|          0|
|ENSG00000000419|       1750|       2091|
|ENSG00000000457|        933|       1371|
|ENSG00000000460|       1679|       2252|
|ENSG00000000938|          2|          5|
|ENSG00000000971|       2751|       5580|
|ENSG00000001036|       4013|       4923|
|ENSG00000001084|       2425|       3142|
|ENSG00000001167|       1307|       1736|
|ENSG00000001460|        338|        318|
|ENSG00000001461|       1575|       1692|
|ENSG00000001497|       4220|       4869|
|ENSG00000001561|          0|         13|
|ENSG00000001617|       1753|       2211|
|ENSG00000001626|        442|        473|
|ENSG00000001629|       3995|       5068|
|ENSG00000001630|       4257|       5130|
|ENSG00000001631|       1925|       2272|
|ENSG00000002016|       1226|       1450|
+---------------+-----------+-----

In [16]:
counts_43_44 = spark.read.csv('new_merged.csv', header = True)
counts_47_48 = spark.read.csv('counts_47_48.csv', header = True)

In [17]:
counts_1 = counts_43_44.join(counts_45_46, 'Gene_ID', 'outer')
counts = counts_1.join(counts_47_48, 'Gene_ID', 'outer')

In [18]:
counts = counts.withColumnRenamed('Gene_ID', 'gene_id')

In [21]:
counts.show()

+---------------+-----------+-----------+-----------+-----------+-----------+-----------+
|        gene_id|SRR26924243|SRR26924244|SRR26924245|SRR26924246|SRR26924247|SRR26924248|
+---------------+-----------+-----------+-----------+-----------+-----------+-----------+
|ENSG00000000003|       1975|       2535|       1584|       1884|       1697|       2199|
|ENSG00000000005|          2|          0|          0|          0|          4|          0|
|ENSG00000000457|       1317|       1603|        933|       1371|       1276|       1442|
|ENSG00000000460|       2207|       2775|       1679|       2252|       2009|       2384|
|ENSG00000000971|       3314|       4355|       2751|       5580|       5291|       6055|
|ENSG00000001036|       5497|       6672|       4013|       4923|       4546|       5450|
|ENSG00000001167|       1564|       2030|       1307|       1736|       1542|       1600|
|ENSG00000001460|        349|        548|        338|        318|        288|        353|
|ENSG00000

In [30]:
counts.printSchema()

root
 |-- gene_id: string (nullable = true)
 |-- SRR26924243: string (nullable = true)
 |-- SRR26924244: string (nullable = true)
 |-- SRR26924245: long (nullable = true)
 |-- SRR26924246: long (nullable = true)
 |-- SRR26924247: string (nullable = true)
 |-- SRR26924248: string (nullable = true)



In [22]:
# Reading the meatadata file obtained from NCBI

meta_data = spark.read.csv('metadata.csv', header = True)

In [23]:
metadata = meta_data.drop(meta_data.columns[0])

In [24]:
metadata.show()

+------------+--------------------+
|sra_accesion|           condition|
+------------+--------------------+
| SRR26924243|EGFL7 overexpression|
| SRR26924244|EGFL7 overexpression|
| SRR26924245|EGFL7 overexpression|
| SRR26924246|      Normal Control|
| SRR26924247|      Normal Control|
| SRR26924248|      Normal Control|
+------------+--------------------+



In [34]:
metadata.printSchema()

root
 |-- sra_accesion: string (nullable = true)
 |-- condition: string (nullable = true)



In [25]:
cols = counts.columns
cols

['gene_id',
 'SRR26924243',
 'SRR26924244',
 'SRR26924245',
 'SRR26924246',
 'SRR26924247',
 'SRR26924248']

In [26]:
# Writing a function to convert sting values to integers

def int_row(x):
    row = {}
    for i in cols:
        if i == 'gene_id':
            row[i] = x[i]
        else:
            row[i] = int(x[i])
    return row

# converting the counts dataframe to an rdd and map to convert the strings to intergers
count_mapped = counts.rdd.map(int_row)

In [27]:
count_mod = spark.createDataFrame(count_mapped)

In [28]:
count_mod.show()

+-----------+-----------+-----------+-----------+-----------+-----------+---------------+
|SRR26924243|SRR26924244|SRR26924245|SRR26924246|SRR26924247|SRR26924248|        gene_id|
+-----------+-----------+-----------+-----------+-----------+-----------+---------------+
|       1975|       2535|       1584|       1884|       1697|       2199|ENSG00000000003|
|          2|          0|          0|          0|          4|          0|ENSG00000000005|
|       1317|       1603|        933|       1371|       1276|       1442|ENSG00000000457|
|       2207|       2775|       1679|       2252|       2009|       2384|ENSG00000000460|
|       3314|       4355|       2751|       5580|       5291|       6055|ENSG00000000971|
|       5497|       6672|       4013|       4923|       4546|       5450|ENSG00000001036|
|       1564|       2030|       1307|       1736|       1542|       1600|ENSG00000001167|
|        349|        548|        338|        318|        288|        353|ENSG00000001460|
|       21

In [39]:
count_mod.printSchema()

root
 |-- SRR26924243: long (nullable = true)
 |-- SRR26924244: long (nullable = true)
 |-- SRR26924245: long (nullable = true)
 |-- SRR26924246: long (nullable = true)
 |-- SRR26924247: long (nullable = true)
 |-- SRR26924248: long (nullable = true)
 |-- gene_id: string (nullable = true)



In [29]:
# converting the counts to log2 transformed counts using spark.sql log2 function

count_log = count_mod.select(['gene_id',log2('SRR26924243').alias('SRR26924243'),log2('SRR26924244').alias('SRR26924244'),log2('SRR26924245').alias('SRR26924245'),log2('SRR26924246').alias('SRR26924246'),log2('SRR26924247').alias('SRR26924247'),log2('SRR26924248').alias('SRR26924248')])

In [30]:
count_log.show()

+---------------+------------------+------------------+------------------+------------------+------------------+------------------+
|        gene_id|       SRR26924243|       SRR26924244|       SRR26924245|       SRR26924246|       SRR26924247|       SRR26924248|
+---------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ENSG00000000003| 10.94763693795183|11.307770031890703| 10.62935662007961|10.879583249612784|10.728770849542665|11.102631888854969|
|ENSG00000000005|               1.0|              null|              null|              null|               2.0|              null|
|ENSG00000000457|10.363039630256514|10.646558710154547|  9.86573327085176|10.421012855779226| 10.31741261376487|10.493855449240824|
|ENSG00000000460|11.107870914279616| 11.43827205612483| 10.71338651493703| 11.13699111208023|10.972261848733291|11.219168520462162|
|ENSG00000000971|11.694357887221452|12.088457003486226|11.425740424316212|12

In [31]:
count_log = count_log.fillna(0)

In [32]:

count_log.show()

+---------------+------------------+------------------+------------------+------------------+------------------+------------------+
|        gene_id|       SRR26924243|       SRR26924244|       SRR26924245|       SRR26924246|       SRR26924247|       SRR26924248|
+---------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ENSG00000000003| 10.94763693795183|11.307770031890703| 10.62935662007961|10.879583249612784|10.728770849542665|11.102631888854969|
|ENSG00000000005|               1.0|               0.0|               0.0|               0.0|               2.0|               0.0|
|ENSG00000000457|10.363039630256514|10.646558710154547|  9.86573327085176|10.421012855779226| 10.31741261376487|10.493855449240824|
|ENSG00000000460|11.107870914279616| 11.43827205612483| 10.71338651493703| 11.13699111208023|10.972261848733291|11.219168520462162|
|ENSG00000000971|11.694357887221452|12.088457003486226|11.425740424316212|12

In [33]:
# adding a column which contains counts of how many columns have count value greater than 10

col_count = count_log.withColumn('index', sum((col(column) > 11).cast("int") for column in cols[1:]))

# Filter rows where the sum of counts > 10 occurs in more than two columns
filtered_df = col_count.filter(col('index') > 4)


In [34]:
#  dropping the created index column
filtered_df = filtered_df.drop('index')

In [35]:
filtered_df.count()

5524

In [36]:
# creating a dictionary with each sample id as the key and their gene counts as values

data = {

    cols[1] : filtered_df.select(cols[1]).rdd.flatMap(lambda x: x).collect(),
    cols[2] : filtered_df.select(cols[2]).rdd.flatMap(lambda x: x).collect(),
    cols[3] : filtered_df.select(cols[3]).rdd.flatMap(lambda x: x).collect(),
    cols[4] : filtered_df.select(cols[4]).rdd.flatMap(lambda x: x).collect(),
    cols[5] : filtered_df.select(cols[5]).rdd.flatMap(lambda x: x).collect(),
    cols[6] : filtered_df.select(cols[6]).rdd.flatMap(lambda x: x).collect()
}



In [37]:
# the data dictionary is converted into a spark data frame such that the original data frame is transposed

df = spark.createDataFrame(spark.createDataFrame(sc.parallelize(data.items())).rdd.map(lambda x : x[1]))

In [38]:
header = filtered_df.select(cols[0]).rdd.flatMap(lambda x: x).collect()

In [39]:
df = df.toDF(*header)

In [40]:
df.select(df.columns[1:5]).show()

+------------------+------------------+------------------+------------------+
|   ENSG00000001036|   ENSG00000001497|   ENSG00000001629|   ENSG00000003096|
+------------------+------------------+------------------+------------------+
|12.424428764037762|12.500592945893835|12.365775635326269|11.915879378835774|
|12.703903573444663|12.805340828416385|12.711666973564347|12.271754805646474|
|11.970465440779996|12.043027283594549|11.963979787815338|11.470150435274359|
| 12.26532202423487|12.249409785269128| 12.30720080914081|11.982637133669424|
|12.150381968817848|12.179598130849836|12.151016538892248|11.920352855415082|
|12.412040514551652|12.284534954008427|  12.2659080092311|12.017156386383114|
+------------------+------------------+------------------+------------------+



In [41]:
data_col = [(column_name,) for column_name in data.keys()]

# Create a DataFrame from the list of tuples
data_col = spark.createDataFrame(data_col)
data_col = data_col.withColumnRenamed('_1','sra_accesion')

In [42]:
# adding monotonically increasing id to the df and srr accesion data

df = df.withColumn("Index", monotonically_increasing_id())

data_col = data_col.withColumn("Index", monotonically_increasing_id())

data_trans = data_col.join(df, 'Index','outer')

In [43]:
data_col.show()

+------------+----------+
|sra_accesion|     Index|
+------------+----------+
| SRR26924243|         0|
| SRR26924244|         1|
| SRR26924245|         2|
| SRR26924246|8589934592|
| SRR26924247|8589934593|
| SRR26924248|8589934594|
+------------+----------+



In [44]:
df.select('Index').show()

+----------+
|     Index|
+----------+
|         0|
|         1|
|         2|
|8589934592|
|8589934593|
|8589934594|
+----------+



In [45]:
data_trans.select(data_trans.columns[1:10]).show()

+------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|sra_accesion|   ENSG00000000971|   ENSG00000001036|   ENSG00000001497|   ENSG00000001629|   ENSG00000003096|   ENSG00000003436|   ENSG00000003509|   ENSG00000004142|
+------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
| SRR26924243|11.694357887221452|12.424428764037762|12.500592945893835|12.365775635326269|11.915879378835774|12.325024310191417|11.495355392235982|  12.4411286218047|
| SRR26924244|12.088457003486226|12.703903573444663|12.805340828416385|12.711666973564347|12.271754805646474|12.636851156626534|11.789126046425231|  12.7901448269498|
| SRR26924245|11.425740424316212|11.970465440779996|12.043027283594549|11.963979787815338|11.470150435274359|12.077149515589733|11.227615943414223|12.049167872372172

In [None]:
gene_df = count_log.select(cols[0])

gene_df_index = gene_df.withColumn('index', monotonically_increasing_id())

gene_df_index.show()

+---------------+-----+
|        gene_id|index|
+---------------+-----+
|ENSG00000186827|    0|
|ENSG00000186891|    1|
|ENSG00000160072|    2|
|ENSG00000260179|    3|
|ENSG00000234396|    4|
|ENSG00000225972|    5|
|ENSG00000224315|    6|
|ENSG00000198744|    7|
|ENSG00000279928|    8|
|ENSG00000228037|    9|
|ENSG00000142611|   10|
|ENSG00000225630|   11|
|ENSG00000131584|   12|
|ENSG00000227589|   13|
|ENSG00000237402|   14|
|ENSG00000284616|   15|
|ENSG00000169972|   16|
|ENSG00000157911|   17|
|ENSG00000269896|   18|
|ENSG00000237973|   19|
+---------------+-----+
only showing top 20 rows



In [46]:
# using string Indexer to convert the condition into numeric indexes

indexer = StringIndexer(inputCol="condition", outputCol="conditionIndex")

model_indexer = indexer.fit(metadata)
transformed_metadata = model_indexer.transform(metadata)



In [47]:
transformed_metadata.show()

+------------+--------------------+--------------+
|sra_accesion|           condition|conditionIndex|
+------------+--------------------+--------------+
| SRR26924243|EGFL7 overexpression|           0.0|
| SRR26924244|EGFL7 overexpression|           0.0|
| SRR26924245|EGFL7 overexpression|           0.0|
| SRR26924246|      Normal Control|           1.0|
| SRR26924247|      Normal Control|           1.0|
| SRR26924248|      Normal Control|           1.0|
+------------+--------------------+--------------+



In [48]:
# joining the metadata and the counts data for ml analysis

ml_data = data_trans.join(transformed_metadata,'sra_accesion','outer')

In [63]:
ml_data.printSchema()

root
 |-- sra_accesion: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- ENSG00000000971: double (nullable = true)
 |-- ENSG00000001036: double (nullable = true)
 |-- ENSG00000001497: double (nullable = true)
 |-- ENSG00000001629: double (nullable = true)
 |-- ENSG00000003096: double (nullable = true)
 |-- ENSG00000003436: double (nullable = true)
 |-- ENSG00000003509: double (nullable = true)
 |-- ENSG00000004142: double (nullable = true)
 |-- ENSG00000004478: double (nullable = true)
 |-- ENSG00000004487: double (nullable = true)
 |-- ENSG00000004700: double (nullable = true)
 |-- ENSG00000004864: double (nullable = true)
 |-- ENSG00000004961: double (nullable = true)
 |-- ENSG00000004975: double (nullable = true)
 |-- ENSG00000005007: double (nullable = true)
 |-- ENSG00000005022: double (nullable = true)
 |-- ENSG00000005100: double (nullable = true)
 |-- ENSG00000005156: double (nullable = true)
 |-- ENSG00000005175: double (nullable = true)
 |-- ENSG00000005339: d

In [49]:
# creating a feature vector for ml analysis using vector assembler

assembler = VectorAssembler(inputCols=ml_data.columns[2:-2], outputCol="features")
assembled_df = assembler.transform(ml_data)

In [50]:
selector = ChiSqSelector(numTopFeatures=20, featuresCol="features", outputCol="selected_features", labelCol='conditionIndex')

# Fit the model and transform the data
model = selector.fit(assembled_df)
selected_df = model.transform(assembled_df)

# Show the selected features
print("Selected Features:")
print(selected_df.select("selected_features").show())

Selected Features:
+--------------------+
|   selected_features|
+--------------------+
|[11.6943578872214...|
|[12.0884570034862...|
|[11.4257404243162...|
|[12.4460494067165...|
|[12.3693247024073...|
|[12.5639112445816...|
+--------------------+

None


In [51]:
model.selectedFeatures

[0,
 720,
 938,
 1915,
 1998,
 2363,
 2434,
 2444,
 2487,
 2877,
 2899,
 3005,
 3192,
 3478,
 3542,
 3635,
 3740,
 3907,
 5014,
 5371]

In [52]:
ml_col = ml_data.columns[2:-2]

feature_selected = ['sra_accesion','conditionIndex']
for i in model.selectedFeatures:
  feature_selected.append(ml_col[i])

feature_selected

['sra_accesion',
 'conditionIndex',
 'ENSG00000000971',
 'ENSG00000113658',
 'ENSG00000126767',
 'ENSG00000178464',
 'ENSG00000184863',
 'ENSG00000237399',
 'ENSG00000254995',
 'ENSG00000255717',
 'ENSG00000259080',
 'ENSG00000065613',
 'ENSG00000068724',
 'ENSG00000085365',
 'ENSG00000103479',
 'ENSG00000115275',
 'ENSG00000117419',
 'ENSG00000123080',
 'ENSG00000129103',
 'ENSG00000136485',
 'ENSG00000205765',
 'ENSG00000267436']

In [53]:
selected_data = ml_data.select(feature_selected)

selected_data.show()

+------------+--------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|sra_accesion|conditionIndex|   ENSG00000000971|   ENSG00000113658|   ENSG00000126767|   ENSG00000178464|   ENSG00000184863|   ENSG00000237399|   ENSG00000254995|   ENSG00000255717|   ENSG00000259080|   ENSG00000065613|   ENSG00000068724|   ENSG00000085365|   ENSG00000103479|   ENSG00000115275|   ENSG00000117419|   ENSG00000123080|   ENSG00000129103|   ENSG00000136485|   ENSG00000205765|   ENSG00000267436|
+------------+--------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------

In [54]:
selected_df.select("selected_features").rdd.take(6)

[Row(selected_features=DenseVector([11.6944, 12.0272, 11.6348, 11.856, 11.8704, 11.6142, 12.6221, 12.3581, 11.7805, 12.2255, 11.5459, 11.3287, 12.1539, 11.5784, 11.4686, 11.7879, 13.7244, 12.4742, 12.1257, 12.4244])),
 Row(selected_features=DenseVector([12.0885, 12.365, 12.0441, 12.31, 12.2297, 11.8525, 12.9748, 12.643, 12.0053, 12.6904, 11.8301, 11.7928, 12.5699, 11.8494, 11.7756, 12.1855, 14.1379, 12.7609, 12.384, 12.807])),
 Row(selected_features=DenseVector([11.4257, 11.7082, 11.3745, 11.7224, 11.5727, 11.2854, 12.2662, 12.0014, 11.4868, 11.9513, 10.9922, 11.1624, 11.8506, 11.1861, 11.1013, 11.3772, 13.3805, 12.1085, 11.6078, 12.1222])),
 Row(selected_features=DenseVector([12.446, 11.9833, 11.3663, 11.628, 11.8746, 11.0974, 12.6161, 12.2381, 11.6193, 12.334, 11.1718, 11.2515, 11.9625, 11.0217, 11.1434, 11.7616, 13.5771, 12.3565, 12.0553, 12.3883])),
 Row(selected_features=DenseVector([12.3693, 11.8045, 11.2808, 11.6022, 11.6662, 10.9549, 12.5355, 12.1056, 11.5488, 12.1668, 11.1718,

In [55]:
assembler = VectorAssembler(inputCols=selected_data.columns[2:], outputCol="selected_features")
selected_data_vector = assembler.transform(selected_data)

In [79]:

# Performing correlation analysis
corr = Correlation.corr(selected_data_vector, "selected_features", "spearman").head()
corr

Row(spearman(selected_features)=DenseMatrix(20, 20, [1.0, 0.116, -0.6667, -0.5798, 0.5218, -0.5798, 0.116, 0.116, ..., 0.7206, 0.5147, 0.8235, 0.8676, 0.8676, 0.8676, 0.8676, 1.0], False))

In [57]:
# Linear Descriminant analysis with k value 2

lda = LDA(k=2, maxIter=10)
model = lda.fit(selected_df)


In [58]:
topics = model.describeTopics(3)
topics.show()

+-----+------------------+--------------------+
|topic|       termIndices|         termWeights|
+-----+------------------+--------------------+
|    0|[2164, 1497, 1725]|[3.09000627542655...|
|    1| [2162, 5037, 234]|[2.86766389355105...|
+-----+------------------+--------------------+



In [59]:
# performing logistic regression
lr = LinearRegression(featuresCol="selected_features", labelCol="conditionIndex")
lr_model = lr.fit(selected_data_vector)

In [60]:
pred = lr_model.transform(selected_data_vector.select(['selected_features','conditionIndex']))
pred.show()

+--------------------+--------------+--------------------+
|   selected_features|conditionIndex|          prediction|
+--------------------+--------------+--------------------+
|[11.6943578872214...|           0.0|4.768967221124853...|
|[12.0884570034862...|           0.0|-8.74984129595191E-9|
|[11.4257404243162...|           0.0|9.767178177355618E-9|
|[12.4460494067165...|           1.0|  1.0000000275295573|
|[12.3693247024073...|           1.0|  1.0000001460789796|
|[12.5639112445816...|           1.0|   0.999999777684442|
+--------------------+--------------+--------------------+



In [61]:

# Evaluate using RMSE
evaluator = RegressionEvaluator(labelCol='conditionIndex', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred)

print("Root Mean Squared Error (RMSE):", rmse)


Root Mean Squared Error (RMSE): 1.1103108756880926e-07


In [65]:

# Performing Logistic Regression

log = LogisticRegression(featuresCol="selected_features", labelCol="conditionIndex")
log_model = log.fit(selected_data_vector)

In [69]:
predictions = log_model.transform(selected_data_vector)

# Show the predictions
pred_log = predictions.select(['conditionIndex','rawPrediction','probability','prediction'])

pred_log.show()



+--------------+--------------------+--------------------+----------+
|conditionIndex|       rawPrediction|         probability|prediction|
+--------------+--------------------+--------------------+----------+
|           0.0|[19.4533777516444...|[0.99999999643954...|       0.0|
|           0.0|[19.8717196090836...|[0.99999999765673...|       0.0|
|           0.0|[19.8411203104877...|[0.99999999758392...|       0.0|
|           1.0|[-19.325493582214...|[4.04617605314047...|       1.0|
|           1.0|[-18.395610108536...|[1.02538753612853...|       1.0|
|           1.0|[-21.045740332407...|[7.24354405372075...|       1.0|
+--------------+--------------------+--------------------+----------+



In [72]:
# Evaluating the model performance using rmse, accuracy

evaluator = RegressionEvaluator(labelCol='conditionIndex', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(pred_log)

print("Root Mean Squared Error (RMSE):", rmse)


Root Mean Squared Error (RMSE): 0.0


In [71]:

evaluator = MulticlassClassificationEvaluator(labelCol="conditionIndex", metricName="accuracy")
accuracy = evaluator.evaluate(pred_log)

print("Accuracy:", accuracy)



Accuracy: 1.0


In [78]:
summary = log_model.summary

# Access and print specific summary statistics
print("Area Under ROC:", summary.areaUnderROC)
print("Precision by threshold:")
summary.precisionByThreshold.show()

Area Under ROC: 1.0
Precision by threshold:
+--------------------+---------+
|           threshold|precision|
+--------------------+---------+
|  0.9999999992756456|      1.0|
|  0.9999999959538239|      1.0|
|  0.9999999897461246|      1.0|
|3.560454153728187...|     0.75|
|2.416078004330302E-9|      0.6|
|2.343267357929335E-9|      0.5|
+--------------------+---------+



