In [1]:
K = 10

In [2]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName='spectral')

In [3]:
spark = pyspark.sql.SparkSession(sc)

In [4]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix, MatrixEntry, CoordinateMatrix

In [27]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

In [6]:
# spark.conf.set('spark.sql.pivotMaxValues', 335000)

## Get adjacency matrix

In [7]:
txt = sc.textFile('./data/com-amazon.ungraph.txt')
txt.take(5)

['# Undirected graph: ../../data/output/amazon.ungraph.txt',
 '# Amazon',
 '# Nodes: 334863 Edges: 925872',
 '# FromNodeId\tToNodeId',
 '1\t88160']

In [8]:
txt = txt.sample(False, 0.001, 1)

In [9]:
txt = txt.zipWithIndex().filter(lambda x: int(x[1]) >= 4).map(lambda x: x[0].split('\t'))

In [10]:
txt.take(10)

[['2054', '74554'],
 ['2359', '82235'],
 ['3177', '309156'],
 ['3747', '67793'],
 ['4439', '126886'],
 ['5082', '20434'],
 ['6085', '369981'],
 ['6099', '490998'],
 ['6155', '506382'],
 ['6261', '264701']]

In [11]:
N = txt.flatMap(lambda x: [int(xx) for xx in x]).max()
N

548091

In [12]:
upper_entries = txt.map(lambda x: MatrixEntry(int(x[0])-1, int(x[1])-1, 1.0))
lower_entries = txt.map(lambda x: MatrixEntry(int(x[1])-1, int(x[0])-1, 1.0))
type(upper_entries) # rdd
type(lower_entries) # rdd

pyspark.rdd.PipelinedRDD

In [13]:
W = CoordinateMatrix(upper_entries.union(lower_entries), numCols=N, numRows=N)
print(W.numCols())
print(W.numRows())
print(type(W))

548091
548091
<class 'pyspark.mllib.linalg.distributed.CoordinateMatrix'>


In [151]:
# def toSparseRow(N):
#     return lambda val: Vectors.sparse(N, [(int(ii), 1) for ii in val])

# txt.flatMap(lambda x: [int(xx) for xx in x]).max()
# N = txt.flatMap(lambda x: [int(xx) for xx in x]).max()
# rows = txt.map(lambda x: tuple(x)).groupByKey().mapValues(toSparse(N))
# W = IndexedRowMatrix(rows)
# W.numCols()
# W.numRows()

## Graph Laplacian

In [14]:
degrees = upper_entries.map(lambda entry: (entry.i, entry.value)).reduceByKey(lambda a, b: a + b)
entries = degrees.map(lambda x: MatrixEntry(x[0], x[0], x[1]))
D = CoordinateMatrix(entries, numCols=N, numRows=N)

- Ordinay: $$L = D - W$$
- Norlaized: $$L = I - D^{-1}W$$
- Symmetric: $$L = I - D^{-1/2}WD^{-1/2}$$

**Calculating the Laplacian could be expensive.**

In [15]:
L = D.toBlockMatrix().subtract(W.toBlockMatrix()).toCoordinateMatrix()
type(L)

pyspark.mllib.linalg.distributed.CoordinateMatrix

## First k eigenvalues and eigen vectors

In [16]:
svd = L.toRowMatrix().computeSVD(k=K, computeU=False)

In [17]:
type(svd.s)
type(svd.V)

pyspark.mllib.linalg.DenseMatrix

In [17]:
## The PCA method is not that scalable and has a fixed limit of columns (65535)
V = L.computePrincipalComponents(k=K)

IllegalArgumentException: 'Argument with more than 65535 cols: 548551'

## K-means on rows of transformed data

In [23]:
V = svd.V.toArray()
type(V)

numpy.ndarray

In [24]:
VV = spark.createDataFrame(V.tolist())
type(VV)

pyspark.sql.dataframe.DataFrame

In [28]:
VV.schema.names

['_1', '_2', '_3', '_4', '_5', '_6', '_7', '_8', '_9', '_10']

In [29]:
kmeans = KMeans().setK(K).setSeed(1)
vecAssembler = VectorAssembler(inputCols=VV.schema.names, outputCol='features')
VV = vecAssembler.transform(VV)

In [31]:
model = kmeans.fit(VV.select('features'))
clusters = model.transform(VV)

In [33]:
clusters.select('prediction').show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [35]:
clusters.describe('prediction').show()

+-------+--------------------+
|summary|          prediction|
+-------+--------------------+
|  count|              548091|
|   mean|0.006900313998952729|
| stddev|  0.2116750644838857|
|    min|                   0|
|    max|                   9|
+-------+--------------------+



In [36]:
sc.stop()