Based on: 
    https://github.com/asdspal/dimRed/blob/master/tsne.ipynb

In [1]:
sc

In [2]:
# necessary imports
from sklearn import datasets
from pyspark.sql import SQLContext as SQC
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.linalg.distributed import IndexedRowMatrix , IndexedRow

from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, collect_list, lit,col
import math as m
import numpy as np

In [3]:
sqlContext = SQC(sc)

In [4]:
# read the dataset
iris = datasets.load_iris()
iris.data.shape

(150, 4)

In [5]:
iris_rdd = sc.parallelize(iris.data).zipWithIndex().map(lambda x:(x[1], Vectors.dense(x[0])))
iris_df  = sqlContext.createDataFrame(iris_rdd, ["id", "features"])
n,p = iris.data.shape
n_dim = 2


In [7]:
iris_df.show()

+---+-----------------+
| id|         features|
+---+-----------------+
|  0|[5.1,3.5,1.4,0.2]|
|  1|[4.9,3.0,1.4,0.2]|
|  2|[4.7,3.2,1.3,0.2]|
|  3|[4.6,3.1,1.5,0.2]|
|  4|[5.0,3.6,1.4,0.2]|
|  5|[5.4,3.9,1.7,0.4]|
|  6|[4.6,3.4,1.4,0.3]|
|  7|[5.0,3.4,1.5,0.2]|
|  8|[4.4,2.9,1.4,0.2]|
|  9|[4.9,3.1,1.5,0.1]|
| 10|[5.4,3.7,1.5,0.2]|
| 11|[4.8,3.4,1.6,0.2]|
| 12|[4.8,3.0,1.4,0.1]|
| 13|[4.3,3.0,1.1,0.1]|
| 14|[5.8,4.0,1.2,0.2]|
| 15|[5.7,4.4,1.5,0.4]|
| 16|[5.4,3.9,1.3,0.4]|
| 17|[5.1,3.5,1.4,0.3]|
| 18|[5.7,3.8,1.7,0.3]|
| 19|[5.1,3.8,1.5,0.3]|
+---+-----------------+
only showing top 20 rows



In [8]:
iris_df.count()

150

In [9]:
def weight(x, avg):
    
    if int(x)==1:
        return(0.0)
    elif x>2.5*avg:
        return(x)
    else:
        return(0.0)

    
def sort_list(x):
    
    y = sorted(x, key = lambda x: x[0])
    y = [s[1] for s in y]
    return(y)


In [10]:
# calculate the euclidean distance
udf_dist = udf(lambda x, y:  float(x.squared_distance(y)), DoubleType())

udf_weight = udf(weight, DoubleType())

# sort the distances according to id
udf_sort = udf(sort_list, ArrayType(DoubleType()))


In [11]:
# create the function to calculate the eucidean distances vector for each point
def dist_df(X_df):
    
    X_2 = X_df.crossJoin(X_df ).toDF('x_id', 'x_feature', 'y_id', 'y_feature')
    X_dist = X_2.withColumn("sim", udf_dist(X_2.x_feature, X_2.y_feature))
    avg = X_dist.groupBy().mean("sim").collect()[0][0]

    X_dist = X_dist.drop("x_feature","y_feature")

    nameCol = struct([name for name in ["y_id", "sim"]]).alias("map")
    X_dist = X_dist.select("x_id", nameCol)
    X_dist  = X_dist.groupby("x_id").agg(collect_list("map"))
    Dist = X_dist.withColumn("sims", udf_sort("collect_list(map)")).drop("collect_list(map)")
    
    return(Dist.select(col("x_id").alias("id"),col("sims").alias("Dist")))


In [12]:
DF_dist = dist_df(iris_df)

# entropy and p calculation is same as defined above


In [13]:
def entropy(dist, beta):
    
    
    p = np.exp(-dist*beta)
    
    sum_p = p.sum()
    p = p/sum_p
    
    res = (p*np.log(p)).sum()
    
    return(-res, p)

def get_p(dists, perplexity,i):
    dists = np.array(dists)
    beta_min = float("-inf")
    beta_max = float("inf")
    log_u = np.log(30)
    dists = np.delete(dists,[i])
    func = lambda x: entropy(dists, x)
    
    beta = 1.0
    
    f1,p = func(beta)
    
    iter = 0
    diff = f1-log_u
    while(True):
        if np.abs(diff) < 1e-5:
            
            p = np.insert(p,i,0)
            
            return(p.tolist())
        
        if diff > 0.0:
        
            beta_min = beta
            if beta_max == np.inf or beta_max == -np.inf:
                beta *= 2.0
            else:
                beta = (beta + beta_max)/2.0
        else:
            beta_max = beta
            if beta_min == np.inf or beta_min == -np.inf:
                beta /= 2.0
    

            else:
                beta = (beta + beta_min) / 2.0
        
        iter += 1
        f1,p = func(beta)
        diff = f1-log_u
        if iter > 50:
            p = np.insert(p,i,0)
            
            return(p.tolist())


In [14]:
udf_pi = udf(get_p, ArrayType(DoubleType()))

DF = DF_dist.withColumn("P", udf_pi("Dist",lit(30),"id")).drop("Dist")

# change pij to symmetric probability distribution

P_bm = IndexedRowMatrix(DF.select("id","P").rdd.map(lambda x: IndexedRow(x.id, x.P))).toBlockMatrix(10,10)
P = P_bm.add(P_bm.transpose())

udf_sum = udf(lambda x: float(np.sum(x)), DoubleType())


In [15]:
DF = (P.toIndexedRowMatrix().rows.map(lambda x:(x.index, x.vector) ).toDF(["id","P"])
                           .withColumn("Psum",udf_sum("P")))

# normalize the P matrix
Psum  = DF.groupBy().sum("Psum").collect()[0]["sum(Psum)"]

udf_divide = udf(lambda x,Psum:(np.array(x)/Psum).tolist(), ArrayType(DoubleType()))
DF = DF.withColumn("P", udf_divide("P",lit(Psum))).drop("Psum")

# ransom sample for the intial Y
np.random.seed(100)


In [16]:
Y = np.random.rand(n,n_dim).tolist()
Y_rdd = sc.parallelize(Y).zipWithIndex()

Y_df = spark.createDataFrame(Y_rdd,["Y","_id"])

DF = DF.join(Y_df,Y_df["_id"]==DF["id"]).drop("_id")



In [17]:
def sort_foo(x):
    
    x = sorted(x, key = lambda y:y[0])
    dists = list(map(lambda y:y[1][0], x))
    q = list(map(lambda y:1/(1 + y**2), dists))
    qsum  = float(np.sum(q))
    subs = list(map(lambda y:y[1][1], x))
    return([qsum, q, subs])

# function to calculate the q_ij from Yi and Yj


In [18]:
def Q_sub(df_Y):
    
    df2 = (df_Y.select(col("id").alias("id1"),col("features").alias("features1")).
            crossJoin(df_Y.select(col("id").alias("id2"),col("features").alias("features2"))))

    udf_dist = udf(lambda x, y:  float(np.sum((np.array(x)-np.array(y))**2)), DoubleType())
    udf_sub  = udf(lambda x,y:(np.array(x)-np.array(y)).tolist(), ArrayType(DoubleType()))

    df_dist = (df2.withColumn("sim", udf_dist(df2.features1, df2.features2))
            .withColumn("sub",udf_sub("features1","features2")))

    df3 = df_dist.select("id1","id2",struct("sim","sub").alias("val"))
    df4 = df3.select("id1",struct("id2","val").alias("set"))

    st = StructType([StructField("qsum", DoubleType()),
                     StructField("q", ArrayType(DoubleType())),
                 StructField("sub",ArrayType(ArrayType(DoubleType())))])

    udf_sort = udf(sort_foo,st)

    df5 = df4.groupBy("id1").agg(collect_list("set").alias("set"))
    df6 = (df5.withColumn("set",udf_sort("set")).select(col("id1").alias("_id"),
                                                col("set.qsum").alias("Qsum"),
                                                col("set.q").alias("Q"),
                                                col("set.sub").alias("Zsub")))
        
    return(df6)


In [19]:
Y = DF.select("id", col("Y").alias("features"))
udf_foo = udf(lambda x:np.zeros(x).tolist(), ArrayType(DoubleType()))

DF1 = Q_sub(Y)
DF = DF.join(DF1, DF["id"]==DF1["_id"]).drop("_id")
Qsum = DF.groupBy().sum("Qsum").collect()[0]["sum(Qsum)"]
DF = DF.withColumn("Y_t_1",udf_foo(lit(n_dim)))
DF = DF.select("id","P","Q","Zsub","Y", "Y_t_1",col("Y_t_1").alias("Y_t_2"))

# cost function


In [20]:
DF.show()

+---+--------------------+--------------------+--------------------+--------------------+----------+----------+
| id|                   P|                   Q|                Zsub|                   Y|     Y_t_1|     Y_t_2|
+---+--------------------+--------------------+--------------------+--------------------+----------+----------+
| 26|[2.59989310919824...|[0.98170021906155...|[[-0.183897097854...|[0.35950784393690...|[0.0, 0.0]|[0.0, 0.0]|
| 29|[7.99333745696438...|[0.91737969858859...|[[-0.498542659330...|[0.04486228246077...|[0.0, 0.0]|[0.0, 0.0]|
| 65|[8.95399439694531...|[0.95959172566885...|[[-0.452972153594...|[0.09043278819643...|[0.0, 0.0]|[0.0, 0.0]|
| 19|[2.62360433830685...|[0.95790576619089...|[[0.3471410029375...|[0.89054594472850...|[0.0, 0.0]|[0.0, 0.0]|
| 54|[3.04540692486844...|[0.96152263103819...|[[0.4330951852248...|[0.97650012701585...|[0.0, 0.0]|[0.0, 0.0]|
|  0|[0.0, 8.879165408...|[1.0, 0.899125296...|[[0.0, 0.0], [0.1...|[0.54340494179096...|[0.0, 0.0]|[0.0

In [21]:
DF.count()

150

In [22]:
def kld_error(Q, P, Qsum):
    
    Q,P = np.array(Q)/Qsum, np.array(P)
    P[P==0.0] = 1.0
    cost = float(np.sum(P*np.log(P/Q)))
    return(cost)


In [23]:
udf_kld = udf(kld_error, DoubleType())

DF = DF.withColumn("err",udf_kld("Q","P",lit(Qsum)))

DF.cache()

error_old = DF.groupBy().sum("err").collect()[0]["sum(err)"]
DF = DF.drop("err")
#print("error-old = ",error_old)


In [24]:
#function for one descent step 
def descent_step(q, P, Y_t, Y_t_1, Y_t_2,Z_sub, Q_sum,alpha, eta):
    
    q,P,Y_t,Y_t_1,Y_t_2,Z_sub = list(map(lambda x:np.array(x), [q, P, Y_t, Y_t_1, Y_t_2, Z_sub] ) )
    Q_ = q/Q_sum 
    temp = ((P - Q_)*q).reshape(P.shape[0],1)
    dY = (temp*Z_sub).sum(axis=0)
    Y = Y_t + eta*dY +alpha*(Y_t_1-Y_t_2)
    
    return(Y.tolist())


In [25]:
udf_step = udf(descent_step, ArrayType(DoubleType()))

alpha =0.2
eta = 0.9


In [26]:
max_iter =3
for i in range(max_iter):
    DF = (DF.withColumn("new_Y", udf_step("Q","P","Y","Y_t_1",
                                          "Y_t_2","Zsub",lit(Qsum),lit(alpha),lit(eta))))
    
    DF = (DF.select("id","P",col("new_Y").alias("Y"),
                   col("Y").alias("Y_t_1"),
                   col("Y_t_1").alias("Y_t_2")))
    
    DF1 = DF.select("id",col("Y").alias("features"))
    DF1 = Q_sub(DF1)
    
    DF = DF.join(DF1, DF["id"]==DF1["_id"])
    Qsum = DF.groupBy().sum("Qsum").collect()[0]["sum(Qsum)"]
    
    DF = DF.withColumn("err",udf_kld("Q","P",lit(Qsum)))
    DF.cache()
    error_new = DF.groupBy().sum("err").collect()[0]["sum(err)"]
    DF = DF.drop("err")
    print("i = ",i,"  error = ",error_new)
    if abs(error_old - error_new)<1e-3:
        break
    else:
        error_old = error_new
        



('i = ', 0, '  error = ', 1484.098527057293)
('i = ', 1, '  error = ', 1470.4185788572436)
('i = ', 2, '  error = ', 1470.3678736576699)


In [27]:
DF.cache()
DF.show()


+---+--------------------+--------------------+--------------------+--------------------+---+------------------+--------------------+--------------------+
| id|                   P|                   Y|               Y_t_1|               Y_t_2|_id|              Qsum|                   Q|                Zsub|
+---+--------------------+--------------------+--------------------+--------------------+---+------------------+--------------------+--------------------+
| 26|[2.59989310919824...|[0.43143888645196...|[0.43144539495180...|[0.35952633470949...| 26|  135.291314200893|[0.96196290032354...|[[-0.223013799249...|
| 29|[7.99333745696438...|[0.05257894846682...|[0.05309212683520...|[0.04449155292179...| 29|115.30078956329251|[0.83957913865518...|[[-0.601873737234...|
| 65|[8.95399439694531...|[0.10781165261616...|[0.10812506883334...|[0.09023532849379...| 65|113.18277722598773|[0.91801959276244...|[[-0.546641033085...|
| 19|[2.62360433830685...|[1.07029990425969...|[1.06968185227436...|[0

In [28]:
DF.count()

150

In [29]:
pwd

u'/home/ec2-user/Projects/ScalableML/PySparkDemos'