# Spark Initialization

In [1]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [2]:
# Import SparkSession to initiate session
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder.appName("Reddit Submissions").getOrCreate()

# Print Spark object ID
print(spark)

<pyspark.sql.session.SparkSession object at 0x00000200AAE87630>


# Loading Data using Spark

In [3]:
# Dataset could be downloaded at https://snap.stanford.edu/data/web-Reddit.html
submissions = spark.read.csv("D:/kuliah/bigdata/submissions.csv", header=True, inferSchema=True)

In [4]:
# Shows top 20 of dataset loaded
submissions.show()

+---------+----------+--------------------+--------------------+-----------+---------+-----------------+-------------------+-------------------+----------+-----+------------------+-------------------+
|#image_id|  unixtime|             rawtime|               title|total_votes|reddit_id|number_of_upvotes|          subreddit|number_of_downvotes| localtime|score|number_of_comments|           username|
+---------+----------+--------------------+--------------------+-----------+---------+-----------------+-------------------+-------------------+----------+-----+------------------+-------------------+
|        0|1333172439|2012-04-01 02:50:...|And here's a down...|      63470|    rmqjs|            32657|              funny|              30813|1333197639| 1844|               622|Animates_Everything|
|        0|1333178161|2012-04-01 04:17:...|         Expectation|         35|    rmun4|               29|           GifSound|                  6|1333203361|   23|                 3|      Gangsta_Ra

In [5]:
# Count number of datas in datafile loaded
submissions.count()

132308

In [6]:
# Shows the schema of dataset
submissions.printSchema()

root
 |-- #image_id: integer (nullable = true)
 |-- unixtime: integer (nullable = true)
 |-- rawtime: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- reddit_id: string (nullable = true)
 |-- number_of_upvotes: integer (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- number_of_downvotes: integer (nullable = true)
 |-- localtime: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- number_of_comments: integer (nullable = true)
 |-- username: string (nullable = true)



In [7]:
# Register the DataFrame as a SQL temporary view
submissions.createOrReplaceTempView("submissions")

In [8]:
# Use username, reddit_id, and score, remove data that contain empty username
sub = submissions.select("username", "reddit_id", "score").filter("username is not null")

# Show top 20 of the dataframe created
sub.show()

+-------------------+---------+-----+
|           username|reddit_id|score|
+-------------------+---------+-----+
|Animates_Everything|    rmqjs| 1844|
|      Gangsta_Raper|    rmun4|   23|
|      Gangsta_Raper|    rna86|   23|
|      Gangsta_Raper|    ro7e4|    2|
|      Gangsta_Raper|    rooof|   49|
|  Hellothereawesome|    rxwjg|   -6|
|        HadManySons|    uxf5q|   -2|
|       TraumaticASH|    v8vl7|   -1|
|    MidgetDance1337|    v970d|  149|
|             Pazzaz|    vah9p|  730|
|           koolkows|    w27bp|   -5|
|            Moncole|    w9k12|    4|
|        shortguy014|    wwjak|   -5|
|         Killer2000|    x4ogv|   -2|
| todaysuckstomorrow|    xenyw|    0|
|       LazyBurnbaby|    y7wwi|   27|
|     azcomputerguru|    zyk4s|   22|
|        IronOxide42|   100bfe|    4|
|      Gangsta_Raper|   10ctqr|  181|
|         CrowKaneII|   10llkg|   -1|
+-------------------+---------+-----+
only showing top 20 rows



In [9]:
# Get min score in dataframe
sub_min = sub.agg({"score": "min"}).collect()[0]
min_score = sub_min["min(score)"]

# Shows min score in dataframe
print(min_score)

-122


In [10]:
# Get max score in dataframe
sub_max = sub.agg({"score": "max"}).collect()[0]
max_score = sub_max["max(score)"]

# Shows max score in dataframe
print(max_score)

20570


In [11]:
# Normalize score into scale between 0 to 10
score_normalized = sub.select("reddit_id", ((sub.score - min_score)/(max_score - min_score)*10).alias("score_normalized"))

# Shows 20 normalized score
score_normalized.show()

+---------+--------------------+
|reddit_id|    score_normalized|
+---------+--------------------+
|    rmqjs|  0.9501256524260583|
|    rmun4| 0.07007539145563503|
|    rna86| 0.07007539145563503|
|    ro7e4|0.059926541658612026|
|    rooof| 0.08264063406147304|
|    rxwjg| 0.05606031316450802|
|    uxf5q| 0.05799342741156002|
|    v8vl7| 0.05847670597332302|
|    v970d| 0.13096849023777304|
|    vah9p| 0.41175333462207614|
|    w27bp| 0.05654359172627102|
|    w9k12| 0.06089309878213802|
|    wwjak| 0.05654359172627102|
|    x4ogv| 0.05799342741156002|
|    xenyw|0.058959984535086024|
|    y7wwi| 0.07200850570268703|
|    zyk4s| 0.06959211289387203|
|   100bfe| 0.06089309878213802|
|   10ctqr| 0.14643340421418907|
|   10llkg| 0.05847670597332302|
+---------+--------------------+
only showing top 20 rows



In [12]:
# Username and reddit_id are in string format, so we assign id in integer for later computation
# import functions library
from pyspark.sql import functions

# Assign id for each distinct username
username = sub.select("username").distinct()
username_id_schema = username.withColumn("username_id", functions.lit(1)).schema
rdd_username_id = username.rdd.zipWithIndex().map(lambda row_rowId: (list(row_rowId[0]) + [row_rowId[1] + 1]))
username_id = spark.createDataFrame(rdd_username_id, username_id_schema)

# Shows 20 username an username's id
username_id.show()

+-------------+-----------+
|     username|username_id|
+-------------+-----------+
|   thehugeone|          1|
|       darmog|          2|
|    TirSimpot|          3|
|  DrSoccerMan|          4|
|   btrucker94|          5|
|     hasai185|          6|
|  cassus_fett|          7|
|    Avril14th|          8|
|   ProfBurial|          9|
|     scs22191|         10|
|    PimpMogul|         11|
|  rockstar107|         12|
|   Shootemsup|         13|
| mariohead123|         14|
|   aaMikeyDaa|         15|
|     mylescox|         16|
|       SAAWKS|         17|
| pwntuspilate|         18|
|SpunkingCorgi|         19|
|      battboy|         20|
+-------------+-----------+
only showing top 20 rows



In [13]:
# Assign id for each distinct reddit_id
reddit_id = sub.select("reddit_id").distinct()
reddit_id_id_schema = reddit_id.withColumn("reddit_id_id", functions.lit(1)).schema
rdd_reddit_id_id = reddit_id.rdd.zipWithIndex().map(lambda row_rowId: (list(row_rowId[0]) + [row_rowId[1] + 1]))
reddit_id_id = spark.createDataFrame(rdd_reddit_id_id, reddit_id_id_schema)

# Shows 20 reddit_id an reddit_id's id
reddit_id_id.show()


+---------+------------+
|reddit_id|reddit_id_id|
+---------+------------+
|    rptk1|           1|
|    wpq35|           2|
|    wf4sp|           3|
|    msktu|           4|
|    uj83r|           5|
|    zxyzc|           6|
|    wp1m6|           7|
|    sooc3|           8|
|    is7fw|           9|
|   10ezo9|          10|
|   10w6x4|          11|
|   139mgd|          12|
|    vm13f|          13|
|    lnbn7|          14|
|    zzirv|          15|
|    ts1i6|          16|
|    n8i70|          17|
|    y6ool|          18|
|    m7td1|          19|
|    p184j|          20|
+---------+------------+
only showing top 20 rows



In [16]:
#Join id tables with data table
sub = sub.join(username_id, "username")
sub = sub.join(reddit_id_id, "reddit_id")
sub = sub.join(score_normalized, "reddit_id")

# Shows 20 data of joined table
sub.show()

+---------+------------------+-----+-----------+------------+--------------------+
|reddit_id|          username|score|username_id|reddit_id_id|    score_normalized|
+---------+------------------+-----+-----------+------------+--------------------+
|   1000xi| theicarusambition|    1|      54614|         395|0.059443263096849025|
|   102jwn|   ITookBrandybuck|   15|      54541|         283| 0.06620916296153104|
|   102ugj|        SmoresPies|    3|      50910|         454| 0.06040982022037503|
|   1051z4|        rozyhammer| 1999|       6769|          27|  1.0250338294993235|
|   1053hx|          Levinsky|  428|      50250|          22|  0.2658032089696501|
|   106gqm|     Gangsta_Raper|    0|      11151|         188|0.058959984535086024|
|   106hkd|       voidworship|  188|       5853|         120| 0.14981635414653005|
|   106mxc|Card_Captor_Sakura|    4|      49212|         300| 0.06089309878213802|
|   106r51|     Gangsta_Raper|    0|      11151|          38|0.058959984535086024|
|   

In [18]:
# Select processed columns from joined table
s = sub.select('username_id', 'reddit_id_id', 'score_normalized')

# Shows 20 datas of selected columns
s.show()

+-----------+------------+--------------------+
|username_id|reddit_id_id|    score_normalized|
+-----------+------------+--------------------+
|      54614|         395|0.059443263096849025|
|      54541|         283| 0.06620916296153104|
|      50910|         454| 0.06040982022037503|
|       6769|          27|  1.0250338294993235|
|      50250|          22|  0.2658032089696501|
|      11151|         188|0.058959984535086024|
|       5853|         120| 0.14981635414653005|
|      49212|         300| 0.06089309878213802|
|      11151|          38|0.058959984535086024|
|      62543|         206|0.058959984535086024|
|      57360|         107|  1.0424318577227916|
|      11151|         233| 0.10390489077904504|
|      37885|         250|  0.2754687802049101|
|      25401|          45| 0.05702687028803402|
|      35964|         455| 0.14401701140537407|
|      58956|         356| 0.05847670597332302|
|      48618|         324| 0.06089309878213802|
|      29269|         128| 0.06185965590

# Processing Data

In [19]:
# Import required library for processing data
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Split data into train and test data randomly
(train, test) = s.randomSplit([0.8, 0.2])

In [20]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="username_id", itemCol="reddit_id_id", ratingCol="score_normalized",
          coldStartStrategy="drop")
model = als.fit(train)

In [22]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="score_normalized",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.01078428836361627


In [23]:
# Get top 5 reddit_id (thread) recommendations for each username
user_recs = model.recommendForAllUsers(5)

# Shows 20 reddit_id (thread) recommendations for each username
user_recs.show()

+-----------+--------------------+
|username_id|     recommendations|
+-----------+--------------------+
|        148|[[96952, 0.456870...|
|        463|[[96952, 0.414847...|
|        471|[[82820, 0.667997...|
|        496|[[59755, 0.454210...|
|        833|[[16407, 1.086788...|
|       1088|[[34964, 3.821365...|
|       1238|[[5800, 1.1140206...|
|       1342|[[93421, 0.373076...|
|       1580|[[7981, 0.6689435...|
|       1591|[[96952, 0.903193...|
|       1645|[[86405, 0.355782...|
|       1829|[[59755, 0.556376...|
|       1959|[[43167, 2.730705...|
|       2122|[[76635, 0.456862...|
|       2142|[[34964, 1.810586...|
|       2659|[[86405, 0.533266...|
|       2866|[[61828, 1.150037...|
|       3175|[[96952, 1.465660...|
|       3749|[[43167, 0.582927...|
|       3794|[[40947, 1.567295...|
+-----------+--------------------+
only showing top 20 rows



In [24]:
# Get top 5 username recommendations for each reddit_id (thread)
thread_recs = model.recommendForAllItems(5)

# Shows 20 username recommendations for each reddit_id (thread)
thread_recs.show()

+------------+--------------------+
|reddit_id_id|     recommendations|
+------------+--------------------+
|         148|[[8096, 0.0863949...|
|         471|[[55385, 0.213137...|
|         833|[[55635, 0.185374...|
|        1088|[[63336, 1.065810...|
|        1238|[[48804, 0.159746...|
|        1342|[[15633, 0.204311...|
|        1580|[[12795, 0.198389...|
|        1591|[[55580, 0.515716...|
|        1645|[[47318, 0.074004...|
|        1829|[[32037, 0.470365...|
|        1959|[[15597, 0.194494...|
|        2142|[[38379, 0.195518...|
|        3175|[[40329, 1.009310...|
|        3794|[[2109, 0.1939718...|
|        3918|[[47813, 0.887412...|
|        3997|[[51137, 0.208381...|
|        4519|[[12225, 0.975640...|
|        4818|[[47119, 0.110173...|
|        5156|[[21294, 0.403806...|
|        5300|[[7082, 0.1946166...|
+------------+--------------------+
only showing top 20 rows

