In [3]:
import os
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import pandas as pd    
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/workspace/tripx/MCS/big_data/spark-3.1.1-bin-hadoop3.2"
findspark.init()
import time 
import logging
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as f
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, count, explode, sum as sum_


# Stop spark if it existed
SparkContext.setSystemProperty('spark.executor.memory', '200g')
# SparkContext.setSystemProperty('spark.driver.maxResultsSize', '0')
# SparkContext.setSystemProperty('spark.driver.memory', '100g')

try:
    sc.stop()
except:
    print('sc have not yet created!')


In [4]:
sc = SparkContext(master = "local", appName = "Multimodal Single Cell")
# sc = SparkContext.getOrCreate()
# Init session
print("Init session")
my_spark = SparkSession.builder.getOrCreate()
print(my_spark.catalog.listTables())



Init session
[]


In [5]:
train_cite_inputs_path = "/workspace/tripx/MCS/big_data/data/sub_train_input.csv"
train_cite_inputs_data = my_spark.read.csv(train_cite_inputs_path,  header=True, maxColumns=30000)

train_cite_target_path = "/workspace/tripx/MCS/big_data/data/sub_train_output.csv"
train_cite_target_data = my_spark.read.csv(train_cite_target_path,  header=True)

print(train_cite_inputs_data.count())
print(train_cite_inputs_data.count())

new_cols=(column.replace('.', '_') for column in train_cite_inputs_data.columns)
new_train_data = train_cite_inputs_data.toDF(*new_cols)

23/07/27 10:44:51 WARN DAGScheduler: Broadcasting large task binary with size 1257.5 KiB


11


23/07/27 10:44:55 WARN DAGScheduler: Broadcasting large task binary with size 1257.5 KiB


11


In [6]:
train_cite_inputs_data.columns

['_c0',
 'cell_id',
 'ENSG00000121410_A1BG',
 'ENSG00000268895_A1BG-AS1',
 'ENSG00000175899_A2M',
 'ENSG00000245105_A2M-AS1',
 'ENSG00000166535_A2ML1',
 'ENSG00000128274_A4GALT',
 'ENSG00000094914_AAAS',
 'ENSG00000081760_AACS',
 'ENSG00000109576_AADAT',
 'ENSG00000103591_AAGAB',
 'ENSG00000115977_AAK1',
 'ENSG00000087884_AAMDC',
 'ENSG00000127837_AAMP',
 'ENSG00000129673_AANAT',
 'ENSG00000131043_AAR2',
 'ENSG00000205002_AARD',
 'ENSG00000090861_AARS',
 'ENSG00000124608_AARS2',
 'ENSG00000266967_AARSD1',
 'ENSG00000157426_AASDH',
 'ENSG00000149313_AASDHPPT',
 'ENSG00000008311_AASS',
 'ENSG00000215458_AATBC',
 'ENSG00000275700_AATF',
 'ENSG00000181409_AATK',
 'ENSG00000281376_ABALON',
 'ENSG00000183044_ABAT',
 'ENSG00000165029_ABCA1',
 'ENSG00000154263_ABCA10',
 'ENSG00000251595_ABCA11P',
 'ENSG00000179869_ABCA13',
 'ENSG00000238098_ABCA17P',
 'ENSG00000107331_ABCA2',
 'ENSG00000167972_ABCA3',
 'ENSG00000154265_ABCA5',
 'ENSG00000154262_ABCA6',
 'ENSG00000064687_ABCA7',
 'ENSG000000855

In [7]:
new_train_data = train_cite_inputs_data.select("cell_id", "ENSG00000121410_A1BG", "ENSG00000175899_A2M")

In [8]:
columns = ['ENSG00000121410_A1BG', 'ENSG00000175899_A2M']
for col_name in columns:
    print(col_name)
    new_train_data = new_train_data.withColumn(col_name,  col(col_name).cast('float'))

ENSG00000121410_A1BG
ENSG00000175899_A2M


In [9]:
new_train_data.printSchema()

root
 |-- cell_id: string (nullable = true)
 |-- ENSG00000121410_A1BG: float (nullable = true)
 |-- ENSG00000175899_A2M: float (nullable = true)



In [10]:
new_train_data.show()

+------------+--------------------+-------------------+
|     cell_id|ENSG00000121410_A1BG|ENSG00000175899_A2M|
+------------+--------------------+-------------------+
|45006fe3e4c8|                 0.0|                0.0|
|d02759a80ba2|                 0.0|                0.0|
|c016c6b0efa5|                 0.0|                0.0|
|ba7f733a4f75|                 0.0|                0.0|
|fbcf2443ffb2|                 0.0|                0.0|
|d80d84ca8e89|                 0.0|                0.0|
|1ac2049b4c98|                 0.0|                0.0|
|33fb0c29e2e4|                 0.0|                0.0|
|b329261bd0ee|                 0.0|                0.0|
|703762287e88|                 0.0|                0.0|
|b646f9b319d5|                 0.0|                0.0|
+------------+--------------------+-------------------+



23/07/27 10:45:00 WARN DAGScheduler: Broadcasting large task binary with size 1256.2 KiB


In [11]:
new_train_label_data = train_cite_target_data.select("cell_id", "CD86", "CD274", 'CD270')

In [None]:
new_train_data.show()

In [12]:
for col_name in ["CD86", "CD274", 'CD270']:
    print(col_name)
    new_train_label_data = new_train_label_data.withColumn(col_name,  col(col_name).cast('float'))

CD86
CD274
CD270


In [13]:
new_train_label_data

DataFrame[cell_id: string, CD86: float, CD274: float, CD270: float]

In [14]:
# train_data = new_train_data.merge(new_train_label_data, left_on='cell_id', right_on='cell_id' )

train_data = new_train_data.join(new_train_label_data, new_train_data.cell_id ==  new_train_label_data.cell_id,"left")

In [15]:
train_data.show(5)

+------------+--------------------+-------------------+------------+------------+-----------+-----------+
|     cell_id|ENSG00000121410_A1BG|ENSG00000175899_A2M|     cell_id|        CD86|      CD274|      CD270|
+------------+--------------------+-------------------+------------+------------+-----------+-----------+
|45006fe3e4c8|                 0.0|                0.0|45006fe3e4c8|   1.1678035|    0.62253| 0.10695851|
|d02759a80ba2|                 0.0|                0.0|d02759a80ba2|  0.81897014| 0.50600946|   1.078682|
|c016c6b0efa5|                 0.0|                0.0|c016c6b0efa5|  -0.3567033|-0.42226133|-0.82449275|
|ba7f733a4f75|                 0.0|                0.0|ba7f733a4f75|  -1.2015074| 0.14911485|  2.0224676|
|fbcf2443ffb2|                 0.0|                0.0|fbcf2443ffb2|-0.100404024|  0.6974609| 0.62583566|
+------------+--------------------+-------------------+------------+------------+-----------+-----------+
only showing top 5 rows



23/07/27 10:45:19 WARN DAGScheduler: Broadcasting large task binary with size 1260.1 KiB


In [16]:
train_data = train_data.select('ENSG00000121410_A1BG', 'ENSG00000175899_A2M', "CD86", "CD274", 'CD270')

In [17]:
train_data

DataFrame[ENSG00000121410_A1BG: float, ENSG00000175899_A2M: float, CD86: float, CD274: float, CD270: float]

In [18]:
from pyspark.ml.feature import VectorAssembler
vec_assembler_features = VectorAssembler(inputCols = ["ENSG00000121410_A1BG", "ENSG00000175899_A2M"],
                                outputCol = "features")

In [19]:
from pyspark.ml.feature import VectorAssembler
vec_assembler_label = VectorAssembler(inputCols = ["CD86", "CD274", 'CD270'],
                                outputCol = "label")

In [20]:
input_vector = vec_assembler_features.transform(train_data)


In [21]:
input_vector.show()

+--------------------+-------------------+------------+-----------+-----------+---------+
|ENSG00000121410_A1BG|ENSG00000175899_A2M|        CD86|      CD274|      CD270| features|
+--------------------+-------------------+------------+-----------+-----------+---------+
|                 0.0|                0.0|   1.1678035|    0.62253| 0.10695851|(2,[],[])|
|                 0.0|                0.0|  0.81897014| 0.50600946|   1.078682|(2,[],[])|
|                 0.0|                0.0|  -0.3567033|-0.42226133|-0.82449275|(2,[],[])|
|                 0.0|                0.0|  -1.2015074| 0.14911485|  2.0224676|(2,[],[])|
|                 0.0|                0.0|-0.100404024|  0.6974609| 0.62583566|(2,[],[])|
|                 0.0|                0.0|   0.8239842|  1.6257721|  1.8227521|(2,[],[])|
|                 0.0|                0.0|  -0.2512331| 0.43728906| 0.44692641|(2,[],[])|
|                 0.0|                0.0| -0.71294916| 0.76743567|  0.3196118|(2,[],[])|
|         

23/07/27 10:46:14 WARN DAGScheduler: Broadcasting large task binary with size 1271.8 KiB


In [22]:
output_vector = vec_assembler_label.transform(input_vector)

In [24]:
output_vector.show()

+--------------------+-------------------+------------+-----------+-----------+---------+--------------------+
|ENSG00000121410_A1BG|ENSG00000175899_A2M|        CD86|      CD274|      CD270| features|               label|
+--------------------+-------------------+------------+-----------+-----------+---------+--------------------+
|                 0.0|                0.0|   1.1678035|    0.62253| 0.10695851|(2,[],[])|[1.16780352592468...|
|                 0.0|                0.0|  0.81897014| 0.50600946|   1.078682|(2,[],[])|[0.81897014379501...|
|                 0.0|                0.0|  -0.3567033|-0.42226133|-0.82449275|(2,[],[])|[-0.3567033112049...|
|                 0.0|                0.0|  -1.2015074| 0.14911485|  2.0224676|(2,[],[])|[-1.2015074491500...|
|                 0.0|                0.0|-0.100404024|  0.6974609| 0.62583566|(2,[],[])|[-0.1004040241241...|
|                 0.0|                0.0|   0.8239842|  1.6257721|  1.8227521|(2,[],[])|[0.82398420572280...|
|

23/07/27 10:46:24 WARN DAGScheduler: Broadcasting large task binary with size 1281.4 KiB


In [25]:
train_vector = output_vector.select("features", "label")

In [26]:
train_vector.show()

+---------+--------------------+
| features|               label|
+---------+--------------------+
|(2,[],[])|[1.16780352592468...|
|(2,[],[])|[0.81897014379501...|
|(2,[],[])|[-0.3567033112049...|
|(2,[],[])|[-1.2015074491500...|
|(2,[],[])|[-0.1004040241241...|
|(2,[],[])|[0.82398420572280...|
|(2,[],[])|[-0.2512331008911...|
|(2,[],[])|[-0.7129491567611...|
|(2,[],[])|[-0.7890341281890...|
|(2,[],[])|[-0.4738209247589...|
|(2,[],[])|[0.77497178316116...|
+---------+--------------------+



23/07/27 10:46:30 WARN DAGScheduler: Broadcasting large task binary with size 1279.8 KiB


In [27]:
splits = train_vector.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

In [8]:
# from pyspark.ml import Pipeline

# # Make a pipeline
# single_cell_pipe  = Pipeline(stages = [vec_assembler])

In [9]:
# pipe_data = single_cell_pipe.fit(new_train_data).transform(new_train_data)

In [20]:
new_train_data = vec_assembler.transform(new_train_data)


In [21]:
new_train_data.show()

+--------------------+-------------------+---------+
|ENSG00000121410_A1BG|ENSG00000175899_A2M| features|
+--------------------+-------------------+---------+
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
|                 0.0|                0.0|(2,[],[])|
+--------------------+-------------------+---------+



23/07/24 18:15:42 WARN DAGScheduler: Broadcasting large task binary with size 1269.1 KiB


In [None]:
train_cite_target_data.printSchema()

In [13]:
new_train_label_data.show()

+------------+-----------+-----------+
|        CD86|      CD274|      CD270|
+------------+-----------+-----------+
|   1.1678035|    0.62253| 0.10695851|
|  0.81897014| 0.50600946|   1.078682|
|  -0.3567033|-0.42226133|-0.82449275|
|  -1.2015074| 0.14911485|  2.0224676|
|-0.100404024|  0.6974609| 0.62583566|
|   0.8239842|  1.6257721|  1.8227521|
|  -0.2512331| 0.43728906| 0.44692641|
| -0.71294916| 0.76743567|  0.3196118|
|  -0.7890341|-0.96942055| -0.7789143|
| -0.47382092|  0.1555812|-0.37076998|
|   0.7749718|   0.125361| -0.6935526|
+------------+-----------+-----------+



In [14]:
from pyspark.ml.regression import LinearRegression
