In [1]:
import sys
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import xml.etree.ElementTree as ET
import re
from pyspark.sql.functions import stddev_pop, avg, broadcast

In [2]:
def preProcessUsers(data):
	"""Parse one line of the Users dump into a list of integer fields.

	Parameters
	----------
	data : str
		A single text line, expected to be a self-contained XML element
		carrying the attributes read below.

	Returns
	-------
	list of int or None
		``[Id, Reputation, Views, UpVotes, DownVotes, Age]`` on success;
		``None`` when the line is not a complete XML element, lacks any of
		these attributes, or holds a non-integer value. Callers drop the
		``None`` results with ``filter(lambda x: x is not None)``.
	"""
	try:
		attrs = ET.fromstring(data).attrib
		# NOTE(review): a record missing any one attribute (e.g. Age) is dropped
		# entirely, so its other fields never reach the joins -- confirm intended.
		return [int(attrs['Id']), int(attrs['Reputation']), int(attrs['Views']),
			int(attrs['UpVotes']), int(attrs['DownVotes']), int(attrs['Age'])]
	except (ET.ParseError, KeyError, ValueError):
		# Only malformed/incomplete records are skipped. ValueError covers failed
		# int() conversions and, because UnicodeError subclasses it, text-encoding
		# failures while parsing. Under rdd.map this runs on Spark workers, so the
		# message typically lands in worker logs rather than the notebook.
		print("Ignoring record")
		return None



def preProcessBadges(data):
	"""Parse one line of the Badges dump into ``[UserId, Name]``.

	Parameters
	----------
	data : str
		A single text line, expected to be a self-contained XML element with
		``UserId`` and ``Name`` attributes.

	Returns
	-------
	list or None
		``[UserId (int), Name (str)]`` on success; ``None`` when the line is not
		a complete XML element, lacks either attribute, or has a non-integer
		UserId. Callers drop the ``None`` results before grouping.
	"""
	try:
		attrs = ET.fromstring(data).attrib
		return [int(attrs['UserId']), attrs['Name']]
	except (ET.ParseError, KeyError, ValueError):
		# Skip only malformed/incomplete records instead of swallowing every
		# exception (the old bare `except:` also caught KeyboardInterrupt).
		print("Ignoring record")
		return None

def preProcessPosts(data):
	"""Parse one line of the Posts dump into ``[Id, PostTypeId, OwnerUserId]``.

	Parameters
	----------
	data : str
		A single text line, expected to be a self-contained XML element with
		``Id``, ``PostTypeId`` and ``OwnerUserId`` attributes.

	Returns
	-------
	list of int or None
		All three values as ints on success; ``None`` when the line is not a
		complete XML element, lacks any of these attributes, or holds a
		non-integer value.

	OwnerUserId is converted to int so the resulting ``UserId`` column has the
	same type as the integer ``UserId`` columns it is later joined against
	(previously it stayed a string, forcing a string-vs-integer join).
	"""
	try:
		attrs = ET.fromstring(data).attrib
		return [int(attrs['Id']), int(attrs['PostTypeId']), int(attrs['OwnerUserId'])]
	except (ET.ParseError, KeyError, ValueError):
		# Skip only malformed/incomplete records (e.g. a post with no
		# OwnerUserId attribute) rather than every possible exception.
		print("Ignoring record")
		return None

def preProcessComments(data):
	"""Parse one line of the Comments dump into ``[Id, UserId]``.

	Parameters
	----------
	data : str
		A single text line, expected to be a self-contained XML element with
		``Id`` and ``UserId`` attributes.

	Returns
	-------
	list of int or None
		``[Id, UserId]`` on success; ``None`` when the line is not a complete
		XML element, lacks either attribute, or holds a non-integer value.
	"""
	try:
		attrs = ET.fromstring(data).attrib
		return [int(attrs['Id']), int(attrs['UserId'])]
	except (ET.ParseError, KeyError, ValueError):
		# Skip only malformed/incomplete records instead of swallowing every
		# exception (the old bare `except:` also caught KeyboardInterrupt).
		print("Ignoring record")
		return None

In [3]:
# `sc` is injected automatically only when the notebook is launched through
# pyspark; getOrCreate returns that context if present and builds one otherwise,
# so Restart & Run All also works on a plain kernel.
sc = SparkContext.getOrCreate()
# RDD.toDF (used in later cells) is only attached once an SQLContext exists;
# getOrCreate reuses an already-instantiated context when there is one.
sqlContext = SQLContext.getOrCreate(sc)

# Second argument is the minimum number of partitions for each input.
output = sc.textFile("stackOverflowUserOutput", 10)
users = sc.textFile("/data/stackoverflow/Users", 25)
posts = sc.textFile("/data/stackoverflow/Posts", 25)
comments = sc.textFile("/data/stackoverflow/Comments", 25)

In [4]:
def preprocessOutput(data):
	"""Parse one clustering-output line into ``(cluster, score, userId)``.

	The expected line shape is ``((cluster, score), userId)``: a leading "(("
	is stripped from the first comma-separated field and ")" from the second
	and third. Malformed lines raise (ValueError / IndexError); this function
	never returns None.
	"""
	fields = data.split(",")
	cluster = int(fields[0].replace("((", ""))
	score = float(fields[1].replace(")", ""))
	userId = int(fields[2].replace(")", ""))
	return (cluster, score, userId)

In [5]:
# Clustering output as a DataFrame of (Cluster, Score, UserId).
# The None filter is currently a no-op: preprocessOutput raises on bad lines
# instead of returning None.
outputDF = (output
	.map(preprocessOutput)
	.filter(lambda rec: rec is not None)
	.toDF(['Cluster', 'Score', 'UserId']))

In [6]:
# Users: one row of profile statistics per parsed user record.
usersDF = (users
	.map(preProcessUsers)
	.filter(lambda rec: rec is not None)
	.toDF(['UserId', 'Reputation', 'Views', 'UpVotes', 'DownVotes', 'Age']))

# Posts: keep PostTypeId 2 only, then count those posts per user.
# NOTE(review): PostTypeId 2 presumably means "answer" in this dump -- confirm.
postsData = (posts
	.map(preProcessPosts)
	.filter(lambda rec: rec is not None)
	.toDF(['Id', 'PostTypeId', 'UserId']))
filteredPostData = postsData.where("PostTypeId = 2")
postsDF = (filteredPostData
	.groupBy("UserId")
	.count()
	.withColumnRenamed("count", "post_count"))

# Comments: number of comments per user.
commentsData = (comments
	.map(preProcessComments)
	.filter(lambda rec: rec is not None)
	.toDF(['Id', 'UserId']))
commentsDF = (commentsData
	.groupBy("UserId")
	.count()
	.withColumnRenamed("count", "comment_count"))

In [7]:
badges = sc.textFile("/data/stackoverflow/Badges", 25)
badgesData = badges.map(preProcessBadges).filter(lambda x: x is not None)
# Collect all badge names per user: rows of (UserId, [Name, ...]).
# mapValues(list) yields the same (key, list(values)) pairs as the former
# `lambda (key, val): ...`, which used Python-2-only tuple-parameter syntax
# (a SyntaxError on Python 3); it also preserves the groupByKey partitioning.
badgesDF = badgesData.groupByKey().mapValues(list).toDF(['UserId', 'Badges'])

In [8]:
# Left-join each per-user feature frame onto the cluster assignments so every
# clustered user is kept even when a source has no row for them. fillna(0)
# then replaces nulls in the numeric columns; the result is cached because it
# is both displayed and written out below.
# NOTE(review): a filled 0 is indistinguishable from a real zero (e.g. Age),
# and the array column Badges is presumably left null by fillna -- confirm.
finalData = (outputDF
	.join(usersDF, ["UserId"], "left_outer")
	.join(postsDF, ["UserId"], "left_outer")
	.join(commentsDF, ["UserId"], "left_outer")
	.join(badgesDF, ["UserId"], "left_outer")
	.fillna(0)
	.cache())

In [9]:
# Preview the first 20 joined rows (long cell values are truncated).
finalData.show(n=20, truncate=True)

+------+-------+------------------+----------+-----+-------+---------+---+----------+-------------+--------------------+
|UserId|Cluster|             Score|Reputation|Views|UpVotes|DownVotes|Age|post_count|comment_count|              Badges|
+------+-------+------------------+----------+-----+-------+---------+---+----------+-------------+--------------------+
|   231|      3|10.635831749543929|      3781|  175|    167|        3| 32|        39|           22|[Nice Question, P...|
|   431|      3| 10.26390043907491|      2733|  319|    233|        1| 34|        82|           53|[Famous Question,...|
|   631|      3| 20.74530278358818|      7050|  551|    690|        3| 36|        81|           86|[Quorum, Good Ans...|
|  1031|      3| 9.272470473670413|      1771|  212|    100|        4| 43|         8|            9|[Popular Question...|
|  1231|      3|17.605089046029455|      3738|  537|    427|       20| 38|        48|          137|[Popular Question...|
|  1631|      3|13.6420117668529

In [10]:
# mode('overwrite') keeps the cell re-runnable: the default save mode raises an
# error when 'withBadgeCSV' already exists from a previous run.
# NOTE(review): Badges is an array column; some CSV writers (e.g. Spark 2.x's
# built-in csv source) reject array types -- confirm the saved output.
finalData.write.format('com.databricks.spark.csv').mode('overwrite').save('withBadgeCSV')

In [3]:
def preprocessOutputMlib(data):
	"""Parse one ClustersMlib line into ``(cluster, userId)``.

	The expected line shape is ``(cluster, userId)``: "(" is stripped from the
	first comma-separated field and ")" from the second. Malformed lines raise
	(ValueError / IndexError); this function never returns None.
	"""
	parts = data.split(",")
	cluster = int(parts[0].replace("(", ""))
	userId = int(parts[1].replace(")", ""))
	return (cluster, userId)

In [4]:
# Load the MLlib cluster assignments as (Cluster, UserId) rows and export them.
outputMlib = sc.textFile("ClustersMlib", 10)
outputMlibDF = outputMlib.map(preprocessOutputMlib).filter(lambda x: x is not None).toDF(['Cluster', 'UserId'])
# mode('overwrite') keeps the cell re-runnable: the default save mode raises an
# error when 'mlibCSV' already exists from a previous run.
outputMlibDF.write.format('com.databricks.spark.csv').mode('overwrite').save('mlibCSV')