## K-means in PySpark

The next machine learning method I'd like to introduce is about clustering, K-means. It is an unsupervised learning method where we would like to group the observations into *K* groups (or subsets). We call it "unsupervised" since we don't have associated response measurements together with the observations to help check and evaluate the model we built (of course we can use other measures to evaluate the clustering models).

K-means may be the simplest approach for clustering while it’s also an elegant and efficient method. To produce the clusters, K-means method only requires the number of clusters *K* as its input.

The idea of K-means clustering is that a good clustering is with the smallest within-cluster variation (a measurement of how different the observations within a cluster are from each other) in a possible range. To achieve this purpose, K-means algorithm is designed in a "greedy" algorithm fashion

**K-means Algorithm**

	1. For each observation, assign a random number which is generated from 1 to *K* to it.

	2. For each of the *K* clusters, compute the cluster center. The *k*th cluster’s center is the vector of the means of the vectors of all the observations belonging to the kth cluster.

	3. Re-assign each observation to the cluster whose cluster center is closest to this observation.

	4. Check if the new assignments are the same as the last iteration. If not, go to step 2; if yes, END.


An example of iteration with K-means algorithm is presented below

![alt text](https://raw.githubusercontent.com/XD-DENG/Spark-ML-Intro/master/figures/k-means.gif)


Now it's time to implement K-means with PySpark. I generate a dateset myself, it contains 30 observations, and I purposedly "made" them group 3 sets.

troubleshooting:
- https://stackoverflow.com/questions/23280629/multiple-sparkcontexts-error-in-tutorial
- check this tutorial he based it off of: http://blog.insightdatalabs.com/jupyter-on-apache-spark-step-by-step/

# Dependencies

In [11]:
# from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayesModel
from pyspark.mllib.classification import NaiveBayesModel

# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics, RankingMetrics
import numpy as np
import pandas as pd
from random import randrange
from math import sqrt

In [12]:
!ls -l

total 518108
drwxrwxr-x 20 ec2-user ec2-user      4096 Jun 28 17:43 anaconda3
-rw-rw-r--  1 ec2-user ec2-user 523283080 May 30 19:24 Anaconda3-4.4.0-Linux-x86_64.sh
-rw-------  1 ec2-user ec2-user       678 Jun 28 17:53 aws_sf17ds6.pem
-rw-rw-r--  1 ec2-user ec2-user       674 Jun 28 18:31 derby.log
-rw-rw-r--  1 ec2-user ec2-user       460 Jun 28 18:30 jupyter-setup.sh
drwxrwxr-x  5 ec2-user ec2-user      4096 Jun 28 18:31 metastore_db
drwxrwxr-x 13 ec2-user ec2-user      4096 Jun 28 17:39 spark
-rw-r--r--  1 ec2-user ec2-user     12986 Jun 28 18:35 spark-play.ipynb
-rw-r--r--  1 ec2-user ec2-user   7211502 Jun 28 18:00 transactions.csv


In [2]:
sc

<pyspark.context.SparkContext at 0x7fde8c5ac4e0>

In [5]:
# sc = SparkContext("local", "Simple App")

In [6]:
# sc.stop(sc)
# sc.getOrCreate("local", "Simple App")

# KMeans Fake Data

In [7]:
# Generate the observations -----------------------------------------------------
n_in_each_group = 10   # how many observations in each group
n_of_feature = 5 # how many features we have for each observation

In [8]:
observation_group_1=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_1.append(randrange(5, 8))

observation_group_2=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_2.append(randrange(55, 58))

observation_group_3=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_3.append(randrange(105, 108))

In [9]:
data = np.array([observation_group_1, observation_group_2, observation_group_3]).reshape(n_in_each_group*3, 5)
data = sc.parallelize(data)

 # Run the K-Means algorithm -----------------------------------------------------


In [13]:
# Build the K-Means model
# the initializationMode can also be "k-means||" or set by users.
clusters = KMeans.train(data, 3, maxIterations=10, initializationMode="random")

Py4JJavaError: An error occurred while calling o35.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 26, 172.31.43.251, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 125, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:557)
	at org.apache.spark.mllib.clustering.KMeans.initRandom(KMeans.scala:334)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:254)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:209)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:367)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 125, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# Collect the clustering result
result=data.map(lambda point: clusters.predict(point)).collect()
print(result)


In [None]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# GraphX in PySpark -----------------------------------------------------
- https://stackoverflow.com/questions/23302270/how-do-i-run-graphx-with-python-pyspark
- GraphFrames is the way to go
  - Work flow: http://aosc.umd.edu/~ide/data/teaching/amsc663/14fall/amsc663_14proposalpresentation_stefan_poikonen.pdf
  - http://www.datareply.co.uk/blog/2016/9/20/running-graph-analytics-with-spark-graphframes-a-simple-example
  - http://graphframes.github.io/quick-start.html
  - http://graphframes.github.io/user-guide.html
  - http://go.databricks.com/hubfs/notebooks/3-GraphFrames-User-Guide-python.html
  
## Work Flow
- Data & features
- First set up spark:https://github.com/PiercingDan/spark-Jupyter-AWS
- Set-up pyspark
- set up anaconda
- set-up graphframes

In [None]:
# Create a Vertex DataFrame with unique ID column "id"
v = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
from graphframes import *
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

# Run PageRank algorithm, and show results.
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()

# Initial Classification - tip from guo

In [None]:
data = [LabeledPoint(0.0, [0.0, 0.0]),
        LabeledPoint(0.0, [0.0, 1.0]),
        LabeledPoint(1.0, [1.0, 0.0])]

model = NaiveBayes.train(sc.parallelize(data))
model.predict(array([0.0, 1.0]))
model.predict(array([1.0, 0.0]))

model.predict(sc.parallelize([[1.0, 0.0]])).collect()

sparse_data = [LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
               LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
               LabeledPoint(1.0, SparseVector(2, {0: 1.0}))]

model = NaiveBayes.train(sc.parallelize(sparse_data))
model.predict(SparseVector(2, {1: 1.0}))

model.predict(SparseVector(2, {0: 1.0}))

In [None]:
import os, tempfile
path = tempfile.mkdtemp()
model.save(sc, path)
sameModel = NaiveBayesModel.load(sc, path)
sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
# True

In [None]:
from shutil import rmtree
try:
    rmtree(path)
    except OSError:
        pass


  - if clustering on csvs, cut the frame down and use sklearn...
  
- Other options:
  - https://github.com/SsureyMoon/Bitcoin-network-cluster 
  - https://github.com/JeremyRubin/BTCSpark
  - https://github.com/mikispag/bitiodine

# for another time: Spark-YARN & performance tuning
- https://docs.continuum.io/anaconda-scale/howto/spark-yarn
- https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/