In [457]:
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib import linalg
from pyspark.sql.functions import *
from pyspark.sql.window import *
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

In [2]:
# Reuse the running SparkContext / SparkSession when one exists, otherwise start one.
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()
# SQLContext built on the same SparkContext; the SQL helper functions are registered through it further down.
sqlContext = SQLContext(sc)

In [None]:
# Load the training CSV: the first line supplies the column names and the
# column types are inferred from the data.
df = ss.read.csv(
    "./digit-recognizer/train.csv",
    header=True,
    inferSchema=True,
)

In [536]:
def cosine_similarity(arr):
    """Return the cosine similarity of the two vectors in ``arr``.

    Parameters
    ----------
    arr : sequence
        Two vector-like objects (``arr[0]`` and ``arr[1]``); each must provide
        a ``dot`` method, which is the only operation used on them.

    Returns
    -------
    float
        ``a.b / (|a| * |b|)``. When either vector has zero length the
        similarity is undefined; 0.0 is returned instead of NaN (or a
        ``ZeroDivisionError``) so the value can still be ranked.
    """
    left, right = arr[0], arr[1]
    norm_product = (left.dot(left) ** 0.5) * (right.dot(right) ** 0.5)
    # Guard the 0/0 case: an all-zero vector has no direction to compare.
    if norm_product == 0:
        return 0.0
    return float(left.dot(right) / norm_product)

def laplacian_vector(row_id,arr,size,k):
    """Build one row of the (negated) graph Laplacian as a list of ints.

    Parameters
    ----------
    row_id : int
        Index of the point this row belongs to; receives the value ``-k``.
    arr : sequence of int
        Indices of the point's neighbours; each receives the value 1.
        May be empty.
    size : int
        Length of the row (total number of points).
    k : int
        Value written, negated, on the diagonal position ``row_id``.

    Returns
    -------
    list of int
        Row of length ``size`` holding 1 at neighbour positions, ``-k`` at
        ``row_id`` and 0 elsewhere. The diagonal is written last, so it wins
        if ``row_id`` also appears in ``arr``.
    """
    lap_vec = np.zeros(size, dtype=int)
    # Force an integer dtype: an empty list would otherwise become a float64
    # array, which NumPy refuses to use as an index.
    neighbours = np.asarray(arr, dtype=int)
    lap_vec[neighbours] = 1
    lap_vec[row_id] = -k
    # tolist() converts NumPy integers to plain Python ints.
    return lap_vec.tolist()

# Expose the two Python helpers to Spark SQL as COS_SIM and LAP_VECTOR.
# `udf.register` is the supported replacement for the deprecated
# `SQLContext.registerFunction`; names and return types are unchanged.
sqlContext.udf.register('COS_SIM', cosine_similarity, returnType=t.DoubleType())
sqlContext.udf.register("LAP_VECTOR", laplacian_vector, returnType=t.ArrayType(t.IntegerType()))



class Spectral_Clustering():
    """Cluster the rows of a Spark DataFrame through a k-nearest-neighbour graph.

    `cluster` indexes the rows, links every row to its `k_nearest` most
    cosine-similar rows, turns those links into one Laplacian row per point,
    takes principal components of the resulting matrix and runs KMeans on them.

    Parameters
    ----------
    k : int
        Number of clusters passed to KMeans.
    k_nearest : int
        Number of neighbours kept per row when building the graph.
    num_eigenvectors : int
        Number of principal components used as the embedding.
    featureCol : str
        Name of the vector column holding the input features.
    predictionCol : str
        Name of the column that receives the cluster assignment.
    """

    def __init__(self,k=2,k_nearest=7,num_eigenvectors = 10,featureCol='features',predictionCol='predictions'):
        self.k = k
        self.k_nearest = k_nearest
        self.num_eigenvectors = num_eigenvectors
        self.featureCol = featureCol
        self.predictionCol = predictionCol

    def _knn_query(self):
        """Return the SQL that keeps the `k_nearest` most similar rows for every row of `feature_table`."""
        # `norm` is a cosine *similarity*, so the nearest neighbours are the
        # rows with the largest value: the window must sort descending.
        # ROW_NUMBER (instead of RANK) keeps exactly `k_nearest` rows even when
        # similarities tie; `right_id` makes the tie-break deterministic.
        # NOTE(review): Spark sorts NaN above every other double, so a NaN
        # similarity would be ranked first here.
        return f'''
            SELECT
                *
            FROM
                (SELECT
                    left_id,
                    right_id,
                    ROW_NUMBER() OVER (PARTITION BY left_id ORDER BY norm DESC, right_id) AS rank,
                    norm
                 FROM
                    (SELECT
                        lq.id AS left_id,
                        rq.id AS right_id,
                        COS_SIM(ARRAY(lq.{self.featureCol}, rq.{self.featureCol})) AS norm
                     FROM
                        feature_table AS lq
                        JOIN
                        feature_table AS rq
                        ON lq.id != rq.id)
                )
            WHERE
                rank <= {int(self.k_nearest)}
        '''

    def _laplacian_query(self, n):
        """Return the SQL that turns the `knn` view into one Laplacian row (length `n`) per point."""
        return f'''
            SELECT
                left_id,
                LAP_VECTOR(left_id, nn, {int(n)}, {int(self.k_nearest)}) AS lap_vector
            FROM
                (SELECT
                    left_id,
                    collect_list(right_id) AS nn
                 FROM
                    knn
                 GROUP BY
                    left_id)
            ORDER BY
                left_id
        '''

    def cluster(self, df):
        """Assign a cluster to every row of `df`.

        Parameters
        ----------
        df : DataFrame
            Spark DataFrame containing the vector column `self.featureCol`.

        Returns
        -------
        DataFrame
            The KMeans output on the embedding (`id`, embedding vector stored
            under `self.featureCol`, `self.predictionCol`) joined on `id` with
            the indexed input rows. Because the input also has a
            `self.featureCol` column, the result holds two columns of that name.

        Notes
        -----
        * Requires the SQL functions ``COS_SIM`` and ``LAP_VECTOR`` to be
          registered in the active Spark session.
        * Registers (and overwrites) the temporary views ``feature_table`` and
          ``knn``.
        * The neighbour search compares every pair of rows.
        """
        # Obtain the session here rather than relying on a notebook-level global.
        spark = SparkSession.builder.getOrCreate()
        n = df.count()

        # index rows: consecutive ids 0..n-1, ordered by the feature column
        row_index = f.row_number().over(Window.partitionBy().orderBy(self.featureCol)) - 1
        df_index = df.select(row_index.alias('id'), "*")
        df_index.select('id', self.featureCol).createOrReplaceTempView('feature_table')

        # k nearest neighbors
        knn = spark.sql(self._knn_query())
        knn.createOrReplaceTempView('knn')

        # compute laplacian
        laplacian = spark.sql(self._laplacian_query(n))
        laplacian_matrix = RowMatrix(laplacian.select('lap_vector').rdd.map(lambda row: list(row[0])))

        # The matrix has n columns, so no more than n components can be requested.
        # NOTE(review): these are principal components of the row matrix, used
        # as the spectral embedding; row i of the result belongs to point id i.
        num_components = min(self.num_eigenvectors, n)
        components = laplacian_matrix.computePrincipalComponents(k=num_components)

        embedding = [(idx, Vectors.dense([float(item) for item in row]))
                     for idx, row in enumerate(components.toArray().tolist())]
        eigen_df = spark.createDataFrame(embedding, ['id', self.featureCol])

        model = KMeans(featuresCol=self.featureCol, predictionCol=self.predictionCol, k=self.k).fit(eigen_df)
        predictions = model.transform(eigen_df).join(df_index, on='id')
        return predictions
    
    


In [None]:
# Cluster a 1000-row sample using the default Spectral_Clustering settings.
spec_clust = Spectral_Clustering()

# Every column except the first is packed into a single 'features' vector column.
assembler = VectorAssembler(
    inputCols=df.columns[1:],
    outputCol='features',
)
df_features = assembler.transform(df.limit(1000))

# Print the first 100 rows of id / assigned cluster / label.
(
    spec_clust.cluster(df_features)
    .select('id', 'predictions', 'label')
    .show(100)
)