In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from functools import reduce
from tqdm import tqdm

sc= SparkContext()
sqlContext = SQLContext(sc)

In [2]:
df = sqlContext.read.format('csv').options(inferSchema='true', header='true').load('file:///home/pgroup1/l7/PBMC_16k_RNA.csv')

In [5]:
df.groupby().max('KLHL17').first().asDict()['max(KLHL17)'] - df.groupby().min('KLHL17').first().asDict()['min(KLHL17)']

10.11507928

In [6]:
df.groupby().max('HES4').first().asDict()['max(HES4)'] - df.groupby().min('HES4').first().asDict()['min(HES4)']

10.812213

## PCA Analysis

In [3]:
df = df.drop('index')
old_cols = df.columns
new_cols = df.columns

for i in range(len(new_cols)):
    new_cols[i] = new_cols[i].replace('.','')

df = reduce(lambda data, idx: data.withColumnRenamed(old_cols[idx], new_cols[idx]), tqdm(range(len(old_cols))), df)

100%|██████████| 1882/1882 [08:46<00:00,  3.57it/s]


In [4]:
df.write.option('header','true').option('inferSchema', 'true').csv("file:///home/pgroup1/l7/PBMC_col_revised.csv")

In [5]:
assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
features = assembler.transform(df).select("features")

pca = PCA(k=2, inputCol='features')
pca.setOutputCol("pca_features")
model = pca.fit(features)
model.explainedVariance

DenseVector([0.0294, 0.0131])

In [6]:
result = model.transform(features).select("pca_features")
result.show(1)

+--------------------+
|        pca_features|
+--------------------+
|[7.46652731979111...|
+--------------------+
only showing top 1 row



In [7]:
y = sqlContext.read.format('csv').options(inferSchema='true', header='true').load('file:///home/pgroup1/l7/PBMC_16k_RNA_label.csv')
y.printSchema()

root
 |-- index: string (nullable = true)
 |-- CITEsort: string (nullable = true)



## SGD

In [11]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
result = result.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
result.show(1)

+--------------------+---------+
|        pca_features|row_index|
+--------------------+---------+
|[7.46652731979111...|        1|
+--------------------+---------+
only showing top 1 row



In [14]:
y = y.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
y.show(5)

+------------------+--------+---------+
|             index|CITEsort|row_index|
+------------------+--------+---------+
|AAGGTTCTCAGTTTGG-1|     ACT|        1|
|CGGACGTAGAAACGCC-1|  C-mono|        2|
|GGCTCGATCCTAAGTG-1|  CD4+ T|        3|
|TACACGACAATAGCGG-1|  CD4+ T|        4|
|TCAATCTCATTCGACA-1|  CD8+ T|        5|
+------------------+--------+---------+
only showing top 5 rows



In [15]:
result = result.join(y, on=["row_index"]).drop("row_index")
result.show(5)

+--------------------+------------------+--------+
|        pca_features|             index|CITEsort|
+--------------------+------------------+--------+
|[7.46652731979111...|AAGGTTCTCAGTTTGG-1|     ACT|
|[-4.4924025163180...|CGGACGTAGAAACGCC-1|  C-mono|
|[2.61741671706775...|GGCTCGATCCTAAGTG-1|  CD4+ T|
|[5.55800451104079...|TACACGACAATAGCGG-1|  CD4+ T|
|[4.98150290491624...|TCAATCTCATTCGACA-1|  CD8+ T|
|[4.57613006366093...|CTTACCGGTCGTTGTA-1|  CD4+ T|
|[-7.8628762317470...|CCCAGTTTCCAGGGCT-1| NC-mono|
|[6.98638584301929...|AGAGCGAAGAATAGGG-1|  CD8+ T|
|[8.91687773343435...|ACGGCCAAGGATTCGG-1|     ACT|
|[-7.6157478596061...|ATCGAGTGTAATCGTC-1|     ACT|
|[5.29679290463833...|GCGCAACGTGAGCGAT-1|     ACT|
|[2.91381397038213...|CCCAGTTTCCGCGGTA-1|  CD8+ T|
|[3.97114115574227...|GAGGTGAAGCTACCTA-1|     ACT|
|[1.03882406794541...|CGTCTACAGCGGATCA-1|  CD4+ T|
|[0.14270695568292...|CCAATCCGTGCCTGGT-1|  CD4+ T|
|[2.97562961813030...|TGGCCAGGTGTGCCTG-1|  B cell|
|[14.3869576747256...|AAGGTTCAG

In [17]:
result = result.drop('index')

In [2]:
result = sqlContext.read.options(inferSchema='true', header='true').load('dataset')
result.show(5)

+--------------------+--------+
|        pca_features|CITEsort|
+--------------------+--------+
|[7.46652731979111...|     ACT|
|[-4.4924025163180...|  C-mono|
|[2.61741671706775...|  CD4+ T|
|[5.55800451104079...|  CD4+ T|
|[4.98150290491624...|  CD8+ T|
+--------------------+--------+
only showing top 5 rows



In [3]:
from pyspark.mllib.util import MLUtils
result = MLUtils.convertVectorColumnsFromML(result, "pca_features")
result.write.option('header','true').option('inferSchema', 'true').save("file:///home/pgroup1/l7/dataset")

### Now we check which cell type has the greatest number (our a-cell)

In [8]:
y['CITEsort'].value_counts()

CD4+ T     5262
ACT        2952
C-mono     2313
CD8+ T     2035
mNK        1057
NC-mono     537
B cell      414
mDC         303
DNT         178
CD4+ DC     166
iNK         113
CD8+ DC      81
Name: CITEsort, dtype: int64

In [5]:
result.createOrReplaceTempView('data')
a_cell = sqlContext.sql("select pca_features from data where CITEsort == 'CD4+ T'")
a_cell.show(5)

+--------------------+
|        pca_features|
+--------------------+
|[2.61741671706775...|
|[5.55800451104079...|
|[4.57613006366093...|
|[1.03882406794541...|
|[0.14270695568292...|
+--------------------+
only showing top 5 rows



In [6]:
non_a_cell = sqlContext.sql("select pca_features from data where CITEsort != 'CD4+ T'")
non_a_cell.show(5)

+--------------------+
|        pca_features|
+--------------------+
|[7.46652731979111...|
|[-4.4924025163180...|
|[4.98150290491624...|
|[-7.8628762317470...|
|[6.98638584301929...|
+--------------------+
only showing top 5 rows



### Label the vector

In [7]:
from pyspark.mllib.regression import LabeledPoint
a_cell_rdd = a_cell.rdd.map(lambda x: LabeledPoint(1,x[0]))
non_a_cell_rdd = non_a_cell.rdd.map(lambda x: LabeledPoint(0,x[0]))

### Split the dataset

In [8]:
data = a_cell_rdd.union(non_a_cell_rdd)
(train, test) = data.randomSplit([0.7,0.3])

## Logistic Regression

In [9]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logistic_model = LogisticRegressionWithLBFGS.train(train,numClasses=2, regParam=0,
                                                               intercept=True,
                                                               validateData=False)

### Predict on the test dataset & Calculate the error rate

In [38]:
labels_and_predictions = test.map(lambda x: (x.label, logistic_model.predict(x.features)))
error_rate = labels_and_predictions.filter(lambda x: x[0] != x[1]).count() / float(test.count())
error_rate

0.32744535217487303