In [1]:
import os
import sys
import pandas
import numpy
import scipy
from scipy import linalg

import findspark
findspark.init("/usr/local/spark/spark-2.2.0-bin-hadoop2.7/")

import pyspark
from pyspark.sql.window import Window
import pyspark.sql.functions as func

from pyspark.rdd import reduce
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
import pyspark.mllib.linalg.distributed
from pyspark.mllib.linalg.distributed import RowMatrix, DenseMatrix
from pyspark.ml.linalg import SparseVector, VectorUDT, Vector, Vectors
from pyspark.mllib.stat import Statistics

In [2]:
conf = pyspark.SparkConf().setMaster("local[*]").set("spark.driver.memory", "4G").set("spark.executor.memory", "4G")
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)

In [3]:
spark.version

'2.2.0'

In [4]:
file_name = "/Users/simondi/PHD/data/data/target_infect_x/query_data/cells_sample_10_normalized_cut_100.tsv"
df = spark.read.csv(path=file_name, sep="\t", header='true')

In [5]:
old_cols = df.columns
new_cols = list(map(lambda x: x.replace(".", "_"), old_cols))

df = reduce(
  lambda data, idx: data.withColumnRenamed(old_cols[idx], new_cols[idx]),
  range(len(new_cols)), df)

for i, x in enumerate(new_cols):
    if x.startswith("cells") or x.startswith("perin") or x.startswith("nucl"):
        df = df.withColumn(x, df[x].cast("double"))

df = df.fillna(0)

In [6]:
feature_columns = [x for x in df.columns if (x.startswith("cells") or x.startswith("perin") or x.startswith("nucl")) and not x.startswith("cells_children_invasomes_count")]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data = assembler.transform(df)

In [7]:
data

DataFrame[study: string, pathogen: string, library: string, design: string, replicate: string, plate: string, well: string, gene: string, sirna: string, well_type: string, image_idx: string, object_idx: string, cells_areashape_area: double, cells_areashape_eccentricity: double, cells_areashape_extent: double, cells_areashape_formfactor: double, cells_areashape_majoraxislength: double, cells_areashape_minoraxislength: double, cells_areashape_perimeter: double, cells_children_bacteria_count: double, cells_children_invasomes_count: double, cells_location_center_x: double, cells_location_center_y: double, cells_neighbors_anglebetweenneighbors_2: double, cells_neighbors_firstclosestobjectnumber_2: double, cells_neighbors_firstclosestxvector_2: double, cells_neighbors_firstclosestyvector_2: double, cells_neighbors_numberofneighbors_2: double, cells_neighbors_secondclosestobjectnumber_2: double, cells_neighbors_secondclosestxvector_2: double, cells_neighbors_secondclosestyvector_2: double, ce

In [8]:
rdd = data.select(feature_columns).rdd.map(list)
X = RowMatrix(rdd)

In [9]:
X.numCols()

19

In [10]:
summary = Statistics.colStats(rdd)
means = summary.mean()
means

array([-0.06635729, -0.11428057,  0.11343067,  0.05040407, -0.10184821,
       -0.00780856, -0.07642234, -0.1275254 , -0.02282229, -0.06262571,
        0.12484165,  0.05149121,  0.0084862 ,  0.06069586,  0.14847159,
        0.03277408, -0.08037804,  0.00642936,  0.04285231])

In [11]:
def svd(X, comps):
    svd = X.computeSVD(X.numCols(), computeU=False)
    s = svd.s.toArray()
    V = svd.V.toArray().T
    var = numpy.dot(s[comps:], s[comps:])
    return s[:comps], V[:comps], var

In [12]:
def tilde(X, psi_sqrt, n_sqrt):
    norm = psi_sqrt * n_sqrt
    Xtilde = RowMatrix(X.rows.map(lambda x: x / norm))
    return Xtilde

In [13]:
X = RowMatrix(X.rows.map(lambda x: x - means))

In [14]:
iter = 0
DELTA = 1e-12
MAX_ITER = 100

N, P = X.numRows(), X.numCols()
ll = old_ll = -numpy.inf
psi = numpy.ones(P, dtype=numpy.float64)
var = summary.variance()
nsqrt = numpy.sqrt(N)

n_components = 2
logliks = []
llconst = P * numpy.log(2. * numpy.pi) + n_components

In [15]:
for i in range(5):
    sqrt_psi = numpy.sqrt(psi) + DELTA
    s, V, unexp_var = svd(tilde(X, sqrt_psi, nsqrt), 2)
    s = s ** 2
    W = numpy.sqrt(numpy.maximum(s - 1., 0.))[:, numpy.newaxis] * V
    W *= sqrt_psi
    ll = llconst + numpy.sum(numpy.log(s))
    ll += unexp_var + numpy.sum(numpy.log(psi))
    ll *= -N / 2.
    psi = numpy.maximum(var - numpy.sum(W ** 2, axis=0), DELTA)
    logliks.append(ll)
    if numpy.abs(ll - old_ll) < 0.001:
        break
    old_ll = ll

In [16]:
Ih = numpy.eye(len(W))
Wpsi = W / psi
cov_z = scipy.linalg.inv(Ih + numpy.dot(Wpsi, W.T))
tmp = numpy.dot(Wpsi.T, cov_z)

In [17]:
tmp_dense = DenseMatrix(numRows=tmp.shape[0], numCols=tmp.shape[1], values=tmp.flatten())

In [18]:
Xtran = X.multiply(tmp_dense)

In [19]:
Xtran.rows.take(5)

[DenseVector([0.1245, 0.8663]),
 DenseVector([0.1644, -0.0257]),
 DenseVector([0.8964, 0.2646]),
 DenseVector([0.5474, 0.7118]),
 DenseVector([-0.6877, 0.0961])]

In [16]:
file_name = "/Users/simondi/PHD/data/data/target_infect_x/query_data/cells_sample_10_normalized_cut_100_factors/"
df = spark.read.parquet(file_name)

In [21]:
X = spark.createDataFrame(Xtran.rows.map(lambda x: (x,)))
X = X.withColumnRenamed("_1", "features")


In [28]:
from pyspark.sql.functions import udf

In [31]:
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())

In [32]:
result = X.withColumn("features", as_ml("features"))

In [33]:
result

DataFrame[features: vector]