# Now that you’ve found the answers to the questions above, design two of your own questions to answer. 

For the second question, we're going to look for mentions of users in comments in order to build a network of connected users, which I will then graph.

## Dependencies
Note: this requires graphframes to compute the PageRank. To use it, add the flag `--packages graphframes:graphframes:0.7.0-spark2.4-s_2.11` when running pyspark.

In [1]:
import re
import pandas as pd
from graphframes import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, desc, explode, lower
from pyspark.sql.types import StructType, StructField, FloatType, LongType, StringType, BooleanType, ArrayType

# Spark SQL entry point. `sc` is not defined in this notebook; it is assumed to
# be the SparkContext supplied by the pyspark session.
sqlContext = SQLContext(sc)

# NOTE(review): presumably the field names of the comment records. This list is
# not used anywhere in the visible notebook -- kept as a reference.
columns = [
    "distinguished", "downs", "created_utc", "controversiality", "edited",
    "gilded", "author_flair_css_class", "id", "author", "retrieved_on",
    "score_hidden", "subreddit_id", "score", "name", "author_flair_text",
    "link_id", "archived", "ups", "parent_id", "subreddit", "body",
]

# Read the sampled comments as JSON and keep only the three columns used below.
df = (
    sqlContext.read
    .json("hdfs://orion11:15001/sampled_reddit/*")
    .select("author", "body", "subreddit")
)


In [2]:
# A function to find the mentions in a given comment
def find_mention(val):
    """Return the usernames mentioned as ``/u/<name>`` in a comment body.

    Args:
        val: The comment text, or ``None`` when the comment has no body.

    Returns:
        A list of lowercased usernames in order of appearance (duplicates are
        kept), or ``None`` when ``val`` is ``None`` or contains no mention.
    """
    # re.findall raises TypeError on None, which would fail the whole Spark
    # job for a single null body; treat it as "no mention" instead.
    if val is None:
        return None

    # Accept both cases: a lowercase-only class skips "/u/JohnDoe" entirely
    # and truncates "/u/johnDoe" to "john".
    # NOTE(review): this also matches "/u/" inside URL paths (for example
    # "host.com/u/123"), which yields non-user tokens -- consider anchoring
    # the pattern if those should be excluded.
    found = re.findall(r"/u/([A-Za-z0-9_-]+)", val)

    if not found:
        return None

    # Normalise to lowercase so mentions compare equal to lowercased authors.
    return [name.lower() for name in found]

# Wrap find_mention as a Spark UDF whose declared return type is an array of
# strings, so it can be applied to a DataFrame column.
mention_udf = udf(find_mention, ArrayType(StringType()))

In [3]:
# In one pass: add the "mention" column from the comment body, drop the
# comments where it is null (no mention found), and lowercase the author
# (the original note says lowercasing makes later steps easier).
df = (
    df.withColumn("mention", mention_udf("body"))
    .filter(col("mention").isNotNull())
    .withColumn("author", lower(col("author")))
)
df.show()

+----------------+--------------------+-------------------+------------------+
|          author|                body|          subreddit|           mention|
+----------------+--------------------+-------------------+------------------+
|       sgspectra|I just stared at ...|fffffffuuuuuuuuuuuu|               [4]|
|         freakie|I saw this post s...|         reddit.com|         [7266319]|
|         redmoss|Hey, not bad for ...|                tf2|              [10]|
|          rankun|If you pmed me.. ...|               pics|               [0]|
|          cjhard|:O Too cool! I ha...|         reddit.com|         [1620808]|
|       kappa-kun|I use [this](http...|            Android|         [5056709]|
|           brash|Here's my [incine...|          Minecraft|            [7366]|
|    sexbob-omb92|http://i41.servim...|               pics|             [f41]|
|     nullfallacy|Go to your [**Pro...|         googleplus|               [0]|
|        hazzypls|Awesome. [This is...|          sta

In [4]:
# Now start to create the edges and vertices for a GraphFrame.

# Edges: one row per (author, mentioned user) pair. explode() emits a separate
# row for every element of the "mention" array.
e = df.select(col("author").alias("src"), explode("mention").alias("dst"))
# Note: should we remove self mentions?

# Vertices: every author plus every mentioned user, each exactly once.
# Selecting "author" alone gives one row per comment (duplicate ids) and omits
# users who are only ever mentioned, although they appear as "dst" in the
# edges. GraphFrames expects the "id" column to hold unique vertex ids.
v = (
    df.select(col("author").alias("id"))
    .union(e.select(col("dst").alias("id")))
    .distinct()
)

# Finally create the graph
g = GraphFrame(v, e)

In [None]:
# Calculate pageRank. Note: This takes FOREVER!
# Runs a fixed 10 iterations (maxIter) with a reset probability of 0.15.
results = g.pageRank(resetProbability=0.15, maxIter=10)
# `display` is not defined in this notebook; presumably the IPython builtin.
display(results.vertices)

# Save the pageRank, since it takes forever I don't want to have to run it multiple times
# NOTE(review): no save mode is set, so with Spark's default these writes are
# expected to fail if the target paths already exist -- confirm before re-running.
results.vertices.write.parquet("hdfs://orion11:15001/friendgraph/gf/vertices")
results.edges.write.parquet("hdfs://orion11:15001/friendgraph/gf/edges")

In [None]:
# Load the vertices and edges back.
# These are the same HDFS paths the pageRank results were written to.
read_v = sqlContext.read.parquet("hdfs://orion11:15001/friendgraph/gf/vertices")
read_e = sqlContext.read.parquet("hdfs://orion11:15001/friendgraph/gf/edges")

# Rebuild a GraphFrame from the saved data so pageRank need not be recomputed.
final_g = GraphFrame(read_v, read_e)

In [None]:
# Print the first 25 edges of the reloaded graph as a quick sanity check.
final_g.edges.show(25)