#### Import Library

In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("l7").master('local[2]').getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Import Data

In [2]:
spark_df = spark.read.options(header='true', inferSchema='true').csv("file:///root/code/datasets/PBMC_16k_RNA.csv")

                                                                                

In [3]:
label_df = spark.read.options(header='true', inferSchema='true').csv("file:///root/code/datasets/PBMC_16k_RNA_label.csv")

#### Preprocess

**Column Name**

In [4]:
transformed_columns = []
for col in spark_df.columns:
    transformed_columns.append(col.strip().replace('.', ''))

spark_df = spark_df.toDF(*transformed_columns)

**Vector Assemble**

In [5]:
# Merge the features into one vector
from pyspark.ml.feature import VectorAssembler

feature_cols = spark_df.drop('index').columns
feature_assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
feature_df = feature_assembler.transform(spark_df).select('index', 'features')
feature_df.show(5)

2022-07-22 06:59:14,902 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 4:>                                                          (0 + 1) / 1]

+------------------+--------------------+
|             index|            features|
+------------------+--------------------+
|AAGGTTCTCAGTTTGG-1|[-0.06204129,-0.2...|
|CGGACGTAGAAACGCC-1|[-0.09675046,-0.4...|
|GGCTCGATCCTAAGTG-1|[-0.043204147,-0....|
|TACACGACAATAGCGG-1|[-0.056053527,-0....|
|TCAATCTCATTCGACA-1|[-0.042632394,-0....|
+------------------+--------------------+
only showing top 5 rows



                                                                                

**Standardized**

In [6]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features', outputCol='standardized_features')
standardized_df = scaler.fit(feature_df).transform(feature_df).select('index', 'standardized_features')
standardized_df.show(5)


                                                                                

+------------------+---------------------+
|             index|standardized_features|
+------------------+---------------------+
|AAGGTTCTCAGTTTGG-1| [-0.1244221378800...|
|CGGACGTAGAAACGCC-1| [-0.1940304444681...|
|GGCTCGATCCTAAGTG-1| [-0.0866447544050...|
|TACACGACAATAGCGG-1| [-0.1124138402837...|
|TCAATCTCATTCGACA-1| [-0.0854981191465...|
+------------------+---------------------+
only showing top 5 rows



**PCA Upon Standardized Vector with More Principal Components**

In [7]:
from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol='standardized_features', outputCol='pca_features')
pca_model = pca.fit(standardized_df)
pca_df = pca_model.transform(standardized_df)

2022-07-22 06:59:44,943 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
2022-07-22 06:59:44,950 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
2022-07-22 07:00:42,229 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2022-07-22 07:00:42,230 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


**Explained Variance**

In [8]:
pca_model.explainedVariance.cumsum()

array([0.02441924, 0.03537516, 0.04234139])

In [9]:
labeled_pca_df = pca_df.join(label_df, on='index')

In [10]:
labeled_pca_df.show(5)

+------------------+---------------------+--------------------+--------+
|             index|standardized_features|        pca_features|CITEsort|
+------------------+---------------------+--------------------+--------+
|AAGGTTCTCAGTTTGG-1| [-0.1244221378800...|[7.55190724793356...|     ACT|
|CGGACGTAGAAACGCC-1| [-0.1940304444681...|[-4.6778081823218...|  C-mono|
|GGCTCGATCCTAAGTG-1| [-0.0866447544050...|[2.59406523090638...|  CD4+ T|
|TACACGACAATAGCGG-1| [-0.1124138402837...|[5.65480016910858...|  CD4+ T|
|TCAATCTCATTCGACA-1| [-0.0854981191465...|[4.88987145625218...|  CD8+ T|
+------------------+---------------------+--------------------+--------+
only showing top 5 rows



#### Non-A cells

In [11]:
labeled_pca_df.groupBy('CITEsort').count().orderBy("count").show()



+--------+-----+
|CITEsort|count|
+--------+-----+
| CD8+ DC|   81|
|     iNK|  113|
| CD4+ DC|  166|
|     DNT|  178|
|     mDC|  303|
|  B cell|  414|
| NC-mono|  537|
|     mNK| 1057|
|  CD8+ T| 2035|
|  C-mono| 2313|
|     ACT| 2952|
|  CD4+ T| 5262|
+--------+-----+



                                                                                

**Divide Dataframe**

In [12]:
labeled_pca_df.columns

['index', 'standardized_features', 'pca_features', 'CITEsort']

In [13]:
labeled_pca_df.createOrReplaceTempView("data")

t_cell_df = spark.sql("SELECT pca_features FROM data WHERE CITEsort == 'CD4+ T'")
non_t_cell_df = spark.sql("SELECT pca_features FROM data WHERE CITEsort != 'CD4+ T'")
t_cell_df.show(5)

+--------------------+
|        pca_features|
+--------------------+
|[2.59406523090638...|
|[5.65480016910858...|
|[4.49718949234563...|
|[1.03337582005633...|
|[0.13968854037425...|
+--------------------+
only showing top 5 rows



**Label Data: change the label**

In [14]:
from pyspark.mllib.regression import LabeledPoint

t_cell_rdd = t_cell_df.rdd.map(lambda x: LabeledPoint(0, [x[0]]))
non_t_cell_rdd = non_t_cell_df.rdd.map(lambda x: LabeledPoint(1, [x[0]]))

t_cell_rdd.take(5)

                                                                                

[LabeledPoint(0.0, [2.5940652309063856,-2.891411819704267,-1.736792233172006]),
 LabeledPoint(0.0, [5.654800169108588,-2.954814475956952,-2.129561857845239]),
 LabeledPoint(0.0, [4.497189492345637,-2.1081466516542253,-2.1047844793474755]),
 LabeledPoint(0.0, [1.0333758200563368,-2.7457468764231288,-1.0717953732629908]),
 LabeledPoint(0.0, [0.13968854037425663,-2.57990288682906,-1.1745761580036478])]

**Reunion and Split Data with Random Seed**

In [17]:
data = t_cell_rdd.union(non_t_cell_rdd)
(training_data, test_data) = data.randomSplit([0.7, 0.3], seed=22)

#### Gradient Descent

In [18]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

training_data.cache()
model = LogisticRegressionWithLBFGS.train(training_data, iterations=100)
labels_and_predictions = test_data.map(lambda x: (x.label, model.predict(x.features)))
error_rate = labels_and_predictions.filter(lambda res: res[0] != res[1]).count() / float(test_data.count())

print("Error Rate: " + str(error_rate))

2022-07-22 07:04:13,041 WARN util.Instrumentation: [412cd8cf] Initial coefficients will be ignored! Its dimensions (1, 3) did not match the expected size (1, 3)

Error Rate: 0.23958110707416114


                                                                                