# Clustering StackOverflow Q&A

EPFL Big Data Analysis Week 2 Assignment
https://www.coursera.org/learn/scala-spark-big-data/home/info

"The overall goal of this assignment is to implement a distributed k-means algorithm which clusters posts on the popular question-answer platform StackOverflow according to their score. Moreover, this clustering should be executed in parallel for different programming languages, and the results should be compared.

The motivation is as follows: StackOverflow is an important source of documentation. However, different user-provided answers may have very different ratings (based on user votes) based on their perceived value. Therefore, we would like to look at the distribution of questions and their answers. For example, how many highly-rated answers do StackOverflow users post, and how high are their scores? Are there big differences between higher-rated answers and lower-rated ones?"

Data file download link: http://alaska.epfl.ch/~dockermoocs/bigdata/stackoverflow.csv

**WORK IN PROGRESS**

In [1]:
import time

# Credits to Fahim Sakri 
# Source (https://medium.com/pythonhive/python-decorator-to-measure-the-execution-time-of-methods-fa04cb6bb36d)
# A decorator that measures the execution time of a Python function
def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts) * 1000)
        else:
            print ("%r  %2.2f ms" % (method.__name__, (te - ts) * 1000))
        return result
    return timed

from post import Post

In [2]:
## Setup
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark import SparkConf, SparkContext
import pandas as pd
import numpy as np

# Resource settings are passed to the builder so they are in place before the session starts
spark = SparkSession \
    .builder \
    .appName("EPFL Wk2 Assignment") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "1") \
    .config("spark.cores.max", "1") \
    .getOrCreate()

spark.sparkContext.addPyFile('post.py')

# Load the CSV into a DataFrame and name its columns
data = spark.read.csv('/data/epfl-big-data-analysis/stackoverflow.csv', header=False, inferSchema=True)
data = data.withColumnRenamed("_c0", "post_type_id") # type 1 = question, type 2 = answer
data = data.withColumnRenamed("_c1", "id")
data = data.withColumnRenamed("_c2", "acceptedAnswerId")
data = data.withColumnRenamed("_c3", "parentId")
data = data.withColumnRenamed("_c4", "score")
data = data.withColumnRenamed("_c5", "tag")

#data = pd.read_csv('/data/epfl-big-data-analysis/stackoverflow.csv', 
#                   names=["post_type_id", "id", "acceptedAnswerId", "parentId", "score", "tag"],
#                   dtype={'post_type_id': np.int64, 'score': np.float16})
data_size = data.count()

In [3]:
data.select('tag').distinct().collect()

[Row(tag='C#'),
 Row(tag='JavaScript'),
 Row(tag='Perl'),
 Row(tag=None),
 Row(tag='C++'),
 Row(tag='Groovy'),
 Row(tag='Objective-C'),
 Row(tag='CSS'),
 Row(tag='MATLAB'),
 Row(tag='Haskell'),
 Row(tag='Scala'),
 Row(tag='Clojure'),
 Row(tag='PHP'),
 Row(tag='Ruby'),
 Row(tag='Python'),
 Row(tag='Java')]

In [5]:
posts = spark.sparkContext.parallelize(data.head(100)).filter(lambda p: p.tag is not None or p.post_type_id == 2)
data_size = 100                                                 
#langs = ["JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
#"Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy"]
print(posts.take(1))
#posts.filter(lambda p: p.post_type_id == 1)
langs = posts.map(lambda p: p.tag).distinct().collect()
print(langs)

[Row(post_type_id=1, id=27233496, acceptedAnswerId=None, parentId=None, score=0, tag='C#')]
[None, 'Objective-C', 'Java', 'Python', 'JavaScript', 'CSS', 'C++', 'PHP', 'C#', 'Ruby']


### Step 1 Preparation - Grouped posts by question
First, we use `map` to create key-value pairs for each type of post, namely questions and answers.  
Then a join operation merges the two datasets. The resulting dataset, `RDD[(QID, Iterable[(Question, Answer)])]`, is keyed by the ID of the question post, and each value is a collection of (Question, Answer) tuples.

In [6]:
questions = posts.filter(lambda p: p.post_type_id == 1).map(lambda p: (p.id, p))
answers = posts.filter(lambda p: p.post_type_id == 2).map(lambda p: (p.parentId, p))
grouped = questions.join(answers).groupByKey() # Use inner join to exclude posts with no answers
print(questions.take(1))
print(answers.take(1))
print(grouped.take(2))

[(27233496, Row(post_type_id=1, id=27233496, acceptedAnswerId=None, parentId=None, score=0, tag='C#'))]
[(5484340, Row(post_type_id=2, id=5494879, acceptedAnswerId=None, parentId=5484340, score=1, tag=None))]
[(21984912, <pyspark.resultiterable.ResultIterable object at 0x7fc5a5fd6898>), (19402497, <pyspark.resultiterable.ResultIterable object at 0x7fc5d041acc0>)]
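
The `ResultIterable` objects printed above hide what each group holds. As a rough, Spark-free sketch of the same shape, the snippet below joins and groups a few made-up posts in plain Python. The `Post` namedtuple, the `group_by_question` helper, and the sample data are assumptions for illustration, not part of the assignment.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for the Row objects used in the notebook.
Post = namedtuple("Post", ["post_type_id", "id", "parentId", "score", "tag"])

def group_by_question(posts):
    """Mimic questions.join(answers).groupByKey() on a local list."""
    questions = {p.id: p for p in posts if p.post_type_id == 1}
    grouped = defaultdict(list)
    for p in posts:
        # Inner join: keep an answer only if its parent question is present.
        if p.post_type_id == 2 and p.parentId in questions:
            grouped[p.parentId].append((questions[p.parentId], p))
    return dict(grouped)

sample_posts = [
    Post(1, 10, None, 3, "Python"),
    Post(1, 11, None, 0, "Java"),   # question without answers: dropped by the join
    Post(2, 20, 10, 5, None),
    Post(2, 21, 10, 1, None),
]
grouped_local = group_by_question(sample_posts)
for qid, pairs in grouped_local.items():
    print(qid, [(q.id, a.id) for q, a in pairs])
```

In the notebook itself, `grouped.mapValues(list).take(2)` should display the pairs instead of the iterable objects.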


### Step 2 Calculate maximum answer score for each question
Produce a set of key-value pairs in which the key is the question and the value is the maximum answer score for that question. The output is an `RDD[(Posting, Int)]`.

In [7]:
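def post_max_scores(iterable):
    # Return (question, highest answer score) for one group of (question, answer) pairs
    question = None
    max_score = None
    for pair in iterable:
        question = pair[0]
        answer_score = pair[1].score
        # Starting from None keeps groups with only negative scores correct
        if max_score is None or answer_score > max_score:
            max_score = answer_score
    # Return only after every answer in the group has been examined
    return (question, max_score)

post_scores = grouped.values().map(post_max_scores)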
print(post_scores.take(2))

[(Row(post_type_id=1, id=21984912, acceptedAnswerId=None, parentId=None, score=0, tag='Java'), 0), (Row(post_type_id=1, id=19402497, acceptedAnswerId=None, parentId=None, score=0, tag='PHP'), 1)]
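
As a Spark-free sketch of this step, the helper below takes the (question, answer) pairs of one group and returns the question together with its highest answer score. The name `max_answer_score` and the dictionary-based sample posts are assumptions for illustration. Taking `max` over all pairs also covers groups whose answers all have negative scores.

```python
def max_answer_score(pairs):
    """Return (question, highest answer score) for one group of pairs."""
    pairs = list(pairs)            # a group may be a one-shot iterable
    question = pairs[0][0]         # every pair carries the same question
    best = max(answer["score"] for _, answer in pairs)
    return (question, best)

# Made-up group: one question with three answers, all scored below zero.
sample_question = {"id": 10, "tag": "Python"}
sample_group = [
    (sample_question, {"id": 20, "score": -3}),
    (sample_question, {"id": 21, "score": -1}),
    (sample_question, {"id": 22, "score": -7}),
]
best_pair = max_answer_score(sample_group)
print(best_pair)
```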


### Step 3 Create vectors for clustering
Prepare the vectors that serve as input for clustering. Each vector is a pair with two components: the index of the language (in the `langs` list) multiplied by the `langSpread` factor, and the highest answer score (computed above).

The `langSpread` factor is provided (set to 50000). It ensures that posts about different programming languages are at least 50000 apart under the distance measure provided by the `euclideanDistance` function. You will learn later what this distance means and why it is set to this value. The output is `RDD[(Int, Int)]`.

In [50]:
langSpread = 50000

vectors = post_scores.map(lambda s: (langs.index(s[0].tag)*langSpread, s[1]))
print(vectors.count())
print(vectors.take(1))

25
[(100000, 0)]
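
To make the role of `langSpread` concrete, here is a small plain-Python sketch of the vector construction. The `to_vector` helper and the three-language `example_langs` list are assumptions for illustration; the notebook derives its own `langs` list from the data.

```python
import math

lang_spread = 50000
example_langs = ["JavaScript", "Java", "PHP"]   # made-up ordering for illustration

def to_vector(tag, max_score):
    """Map a (language, highest answer score) pair to a 2-d point."""
    return (example_langs.index(tag) * lang_spread, max_score)

java_post = to_vector("Java", 7)
php_post = to_vector("PHP", 7)
js_post = to_vector("JavaScript", 1200)

print(java_post)
# Two posts in different languages are never closer than lang_spread,
# whatever their scores are.
print(math.dist(java_post, php_post))
print(math.dist(java_post, js_post) >= lang_spread)
```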


In [42]:
# K-means parameter: Number of clusters
kmeansKernels = 3

# K-means parameter: Convergence criteria
kmeansEta = 20

# K-means parameter: Maximum iterations
kmeansMaxIterations = 1 # 120

sample_ratio = 1/len(langs)

print(sample_ratio)
fractions = dict([(langs.index(lang)*langSpread, 0.1) for lang in langs])
sample_vectors = vectors.sampleByKey(False, fractions).take(kmeansKernels)
print(fractions)
print(vectors.count())
print(sample_vectors)


0.1
{0: 0.1, 50000: 0.1, 100000: 0.1, 150000: 0.1, 200000: 0.1, 250000: 0.1, 300000: 0.1, 350000: 0.1, 400000: 0.1, 450000: 0.1}
25
[(400000, 1), (150000, 1), (100000, 0)]


In [55]:

## Helper methods for clustering
from scipy.spatial import distance

def euclideanDistanceSum(v1, v2):
    """Sum of the Euclidean distances between corresponding vectors of v1 and v2."""
    distance_sum = 0
    for vector1, vector2 in zip(v1, v2):
        distance_sum = distance_sum + distance.euclidean(vector1, vector2)
    return distance_sum

def findClosest(p, means):
    """Index of the mean that is closest to point p."""
    closest_i = 0
    closest_distance = float('inf')
    for i, m in enumerate(means):
        dist = distance.euclidean(p, m)
        if dist < closest_distance:
            closest_i = i
            closest_distance = dist
    return closest_i

def averageVectors(v):
    """Component-wise average of a non-empty list of 2-d vectors."""
    sum1 = 0
    sum2 = 0
    for vector in v:
        sum1 = sum1 + vector[0]
        sum2 = sum2 + vector[1]
    return (sum1/len(v), sum2/len(v))

sample_means = sample_vectors
print(vectors.take(1))
#kmeans(sample_vectors, post_scores)
#def kmeans(sample_means, vectors):
itr = 1
means = sample_means
#while itr <= kmeansMaxIterations:
# Pair each vector with the index of its closest mean, then average each cluster.
# groupByKey is used because an average cannot be built up pairwise with reduceByKey,
# and list() accepts a single iterable rather than two separate vectors.
newMeans = vectors.map(lambda v: (findClosest(v, means), v)) \
    .groupByKey() \
    .mapValues(lambda vs: averageVectors(list(vs)))
print(newMeans.take(1))
#diff = euclideanDistanceSum(means, newMeans)
#print(diff)


 `val newMeans = means.clone # Copy the means
    // For each vector, find the closest centroid; once a cluster is formed, calculate its average
    val centroids = vectors.map {
      v => (findClosest(v, means), v)
    }.groupByKey().mapValues {
      vs => averageVectors(vs)
    }.collect()
    
    // Replace with new mean
    for ((i, centroid) <- centroids) {
      newMeans.update(i, centroid) // replace the i-th entry with new centroid
    }

    val distance = euclideanDistance(means, newMeans)

    if (debug) {
      println(s"""Iteration: $iter
                 |  * current distance: $distance
                 |  * desired distance: $kmeansEta
                 |  * means:""".stripMargin)
      for (idx <- 0 until kmeansKernels)
        println(f"   ${means(idx).toString}%20s ==> ${newMeans(idx).toString}%20s  " +
          f"  distance: ${euclideanDistance(means(idx), newMeans(idx))}%8.0f")
    }

    if (converged(distance)) {
      newMeans
    } else if (iter < kmeansMaxIterations) {
      kmeans(newMeans, vectors, iter + 1, debug)
    } else {
      println("Reached max iterations!")
      newMeans
    }`

Based on these initial means, the provided variables, and the `converged` method, implement the K-means algorithm by iteratively:

- pairing each vector with the index of the closest mean (its cluster);
- computing the new means by averaging the values of each cluster.

To implement these iterative steps, use the provided functions `findClosest`, `averageVectors`, and `euclideanDistance`.
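
The two iterative steps can be sketched on a local list in plain Python, without Spark. This is only an illustration under stated assumptions: the names `find_closest`, `average_vectors`, and `kmeans_local` are hypothetical, and convergence is assumed to mean that the summed movement of the means falls below `eta`, since the provided `converged` method is not shown here.

```python
import math

def find_closest(point, means):
    """Index of the mean nearest to `point`."""
    return min(range(len(means)), key=lambda i: math.dist(point, means[i]))

def average_vectors(vectors):
    """Component-wise mean of a non-empty list of 2-d points."""
    n = len(vectors)
    return (sum(v[0] for v in vectors) / n, sum(v[1] for v in vectors) / n)

def kmeans_local(means, vectors, eta=20.0, max_iterations=120):
    """Run k-means on a local list; returns (means, iterations used)."""
    means = list(means)
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        # Step 1: pair each vector with the index of its closest mean.
        clusters = {}
        for v in vectors:
            clusters.setdefault(find_closest(v, means), []).append(v)
        # Step 2: average each cluster; means without members stay where they are.
        new_means = list(means)
        for i, members in clusters.items():
            new_means[i] = average_vectors(members)
        # Assumed convergence test: total movement of the means below eta.
        shift = sum(math.dist(old, new) for old, new in zip(means, new_means))
        means = new_means
        if shift < eta:
            break
    return means, iterations

# Made-up vectors: two languages (x = 0 and x = 50000) with a few scores each.
points = [(0, 1), (0, 3), (50000, 10), (50000, 20)]
final_means, used = kmeans_local([(0, 0), (50000, 0)], points)
print(final_means, used)
```

In the Spark version, step 1 corresponds to a `map` over the vectors and step 2 to grouping by cluster index before averaging.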

Note 1:

In our tests, convergence is reached after 44 iterations (for langSpread=50000) and after 104 iterations (for langSpread=1), and for the first iterations the distance kept growing. Although it may look like something is wrong, this is the expected behavior. Having many remote points forces the kernels to shift quite a bit, and with each shift the effects ripple to other kernels, which also move around, and so on. Be patient: in 44 iterations the distance will drop from over 100000 to 13, satisfying the convergence condition.

If you want to get the results faster, feel free to downsample the data (each iteration is faster, but it still takes around 40 steps to converge):

`val scored  = scoredPostings(grouped).sample(true, 0.1, 0)`

However, keep in mind that we will test your assignment on the full data set. You can downsample for experimentation, but make sure your algorithm works on the full data set when you submit for grading.
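
For experimenting locally, the same idea can be sketched in plain Python. The `downsample` helper and the made-up `scored_local` list are assumptions for illustration. Note that this sketch samples without replacement (each item is kept independently with probability `fraction`), whereas the Scala call above passes `true` to sample with replacement.

```python
import random

def downsample(items, fraction, seed=0):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)      # fixed seed makes the sample reproducible
    return [item for item in items if rng.random() < fraction]

# Made-up (question, score) pairs standing in for the scored postings.
scored_local = [(f"question-{i}", i % 50) for i in range(1000)]
small = downsample(scored_local, 0.1)
print(len(small))   # roughly a tenth of the input; the exact count depends on the seed
```

On the RDD in this notebook, `post_scores.sample(True, 0.1, 0)` would be the closest counterpart of the Scala line.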

Note 2:

The variable `langSpread` determines how far apart languages are from the clustering algorithm's point of view. For a value of 50000, the languages are too far apart to be clustered together at all, resulting in a clustering that only takes scores into account for each language (similar to partitioning the data across languages and then clustering based on the score). A more interesting (but less scientific) clustering occurs when `langSpread` is set to 1 (we can't set it to 0, as that loses language information completely), where we cluster according to the score. See which language dominates the top questions now?
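
A tiny plain-Python sketch can illustrate this effect. The `closest_mean` and `assign` helpers, the two means, and the scores are all made up for illustration: a post with a high score in language 1 is assigned either to the low-score cluster of its own language or to the high-score cluster of language 0, depending on `lang_spread`.

```python
import math

def closest_mean(point, means):
    """Index of the mean nearest to `point` under Euclidean distance."""
    return min(range(len(means)), key=lambda i: math.dist(point, means[i]))

def assign(lang_spread):
    # A post in language 1 whose best answer scored 100.
    post = (1 * lang_spread, 100)
    means = [
        (0 * lang_spread, 100),   # high-score cluster of language 0
        (1 * lang_spread, 0),     # low-score cluster of language 1
    ]
    return closest_mean(post, means)

print(assign(50000))  # 1: the post stays with its own language
print(assign(1))      # 0: the score dominates and the post joins language 0's cluster
```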

In [None]:
# https://github.com/seahrh/stackoverflow-spark
#
# Scala pipeline:
#   val lines   = sc.textFile("src/main/resources/stackoverflow/stackoverflow.csv")
#   val raw     = rawPostings(lines)
#   val grouped = groupedPostings(raw)
#   val scored  = scoredPostings(grouped)
#   val vectors = vectorPostings(scored)
#
# lines:   the lines from the csv file as strings
# raw:     the raw Posting entries for each line
# grouped: questions and answers grouped together
# scored:  questions and scores
# vectors: pairs of (language, score) for each question