In [1]:
### Import necessary Graphframes and SparkSQL packages

In [1]:
from graphframes import *
from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("StackOverflow").enableHiveSupport().getOrCreate()
sqlContext = SparkSession(sc)

### Load the questions and answers files and create temporary tables

In [6]:
import pandas as pd
import functools

from pyspark.sql import functions as F
from graphframes import GraphFrame
from pyspark.sql import types as T
from pyspark.sql import Window

In [2]:
# get the questions and answers dataset

questions = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("file:///home/cloudera/Documents/Big_Data_Management_2/Assignment/Cleaned_Questions.csv")
answers = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("file:///home/cloudera/Documents/Big_Data_Management_2/Assignment/answers_consolidated.csv")

questions = questions.select("id", "tags", "creationdate").dropna()
answers.registerTempTable("answers")

In [3]:
def tags_to_array(x):
    
    """Function to convert list of tags from string to array."""
    
    x_split = x[1:-1].split(", ")
    x_clean = list(map(lambda x: x[1: -1], x_split))
    
    return x_clean

# create udf to get tags as list
udf_tags_to_array = F.udf(tags_to_array, T.ArrayType(T.StringType()))

In [4]:
# get separate row for all tags in list
questions_tags_array = questions.withColumn("tags", F.explode(udf_tags_to_array("tags")))
questions_tags_array.select(F.col("tags").alias("tag"), 
                            F.col("id").alias("question_id"), 
                            F.col("creationdate").alias("ques_create_date")
                           ).registerTempTable("ques_tags")

### For each unique queston and tag combination, build one row and add the tag_id created above

In [8]:
def unionAll(*dfs):
        return functools.reduce(lambda df1, df2: df1.union(df2), dfs)

In [12]:
# append all relationships to get the edges dataframe

edge1 = sqlContext.sql("""select question_id as src, tag as dst, 
                            null as is_accepted, null as score, null as reputation 
                            from ques_tags """) # question-tag

edge2 = sqlContext.sql("""select answer_id as src, question_id as dst,
                            is_accepted, score, null as reputation
                            from answers""") # answer-question

edge3 = sqlContext.sql("""select user_id as src, answer_id as dst,
                            null as is_accepted, null as score, reputation
                            from answers""") # user-answer

edges_df = unionAll(edge1,edge2,edge3)

print(edges_df.persist().count())

331703


In [11]:
# append all vertices to get the vertices dataframe

vertices1 = sqlContext.sql("select distinct(question_id) as id, 'question' as type from ques_tags ")
vertices2 = sqlContext.sql("select distinct(tag) as id, 'tag' as type from ques_tags ")
vertices3 = sqlContext.sql("select distinct(answer_id) as id, 'answer' as type from answers")
vertices4 = sqlContext.sql("select user_id as id, 'user' as type from answers")
vertices_df = unionAll(vertices1,vertices2,vertices3,vertices4)

print(vertices_df.persist().count())

232714


In [13]:
# create graphframe using vertices and edges
main_graph = GraphFrame(vertices_df, edges_df)
print(main_graph)

GraphFrame(v:[id: string, type: string], e:[src: string, dst: string ... 3 more fields])


In [14]:
# transform graph using the provided motif
graph_motif = main_graph.find("""(user)-[user_to_answer]->(answer);
                                 (answer)-[answer_to_question]->(question);
                                 (question)-[question_to_tag]->(tag)"""
                             )

In [15]:
print(graph_motif.persist().count())

graph_motif.show()

2292729
+---------------+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+
|           user|      user_to_answer|            answer|  answer_to_question|            question|     question_to_tag|                tag|
+---------------+--------------------+------------------+--------------------+--------------------+--------------------+-------------------+
|[2623104, user]|[2623104, 4497404...|[44974041, answer]|[44974041, 346460...|[34646021, question]| [34646021, rest,,,]|        [rest, tag]|
|[2623104, user]|[2623104, 4497404...|[44974041, answer]|[44974041, 346460...|[34646021, question]|[34646021, spring...|  [spring-mvc, tag]|
|[2623104, user]|[2623104, 4497404...|[44974041, answer]|[44974041, 346460...|[34646021, question]|[34646021, rest-a...|[rest-assured, tag]|
|[2623104, user]|[2623104, 4497404...|[44974041, answer]|[44974041, 346460...|[34646021, question]|[34646021, mockmv...|     [mockmvc, tag]|
|[262

In [16]:
# get the user-tag combination with given metrics
user_tag_map = graph_motif.filter(
    "answer_to_question.is_accepted = 'true'"
).groupby(
    F.col("user.id").alias("user"), F.col("tag.id").alias("tag")
).agg(
    F.avg("answer_to_question.score").alias("avg_score_per_answer"),
    F.avg("user_to_answer.reputation").alias("user_reputation")
).withColumn(
    "composite_score", 
    (F.col("avg_score_per_answer") + F.col("user_reputation"))/2
)

In [17]:
w = Window.partitionBy("tag").orderBy(F.desc("composite_score"))

In [21]:
# get the top 5 users for each tag
expert_tag_map = user_tag_map.withColumn(
    "rank", F.rank().over(w)
).filter(
    F.col("rank") <= 5
).select(
    "user", "tag", "avg_score_per_answer", "user_reputation"
)

In [22]:
expert_tag_map.persist().count()

13695

In [23]:
expert_tag_map.toPandas().to_csv("experts_tag_map.csv", index = False)