# Reviews Dataset Subsampling and Insertion Into MongoDB Database

In [1]:
import findspark

# pyspark is not importable until findspark has added the local Spark installation to sys.path
# (see https://github.com/minrk/findspark#readme), so this has to happen before the pyspark import
findspark.init()
print(f'Using Spark located in {findspark.find()}.')

from pyspark.sql import SparkSession

# Reuse the running Spark session if there is one, otherwise create it (it also owns the Spark context)
spark = SparkSession.builder.getOrCreate()

Using Spark located in /usr/local/spark/.


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/13 00:48:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import BooleanType, DoubleType, LongType, StringType, StructField, StructType

# Explicit schema of the reviews dataset: (column name, Spark type) pairs in file order,
# every column declared nullable
reviews_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('steamid', LongType()),
        ('appid', LongType()),
        ('voted_up', BooleanType()),
        ('votes_up', LongType()),
        ('votes_funny', LongType()),
        ('weighted_vote_score', DoubleType()),
        ('playtime_forever', LongType()),
        ('playtime_at_review', LongType()),
        ('num_games_owned', LongType()),
        ('num_reviews', LongType()),
        ('review', StringType()),
        ('unix_timestamp_created', LongType()),
        ('unix_timestamp_updated', LongType()),
    )
])

# Load the reviews CSV files from HDFS. Review texts can span several lines and escape quotes
# by doubling them; FAILFAST makes the read abort on malformed records instead of dropping them.
reviews_df = spark.read.csv(
    'hdfs://localhost:54310/final_project/data/reviews',
    schema=reviews_schema,
    header=True,
    multiLine=True,
    escape='"',
    unescapedQuoteHandling='STOP_AT_CLOSING_QUOTE',
    ignoreTrailingWhiteSpace=True,
    mode='FAILFAST',
)

In [3]:
# Show the first five rows of the original reviews dataset (as pandas) to sanity-check the parsing
reviews_df.limit(num=5).toPandas()

Unnamed: 0,steamid,appid,voted_up,votes_up,votes_funny,weighted_vote_score,playtime_forever,playtime_at_review,num_games_owned,num_reviews,review,unix_timestamp_created,unix_timestamp_updated
0,76561199012934585,204100,True,0,0,0.0,1671,1660,37,10,A masterpiece that is extremely underrated. Th...,1619063926,1619063926
1,76561198123483551,204100,True,0,0,0.0,3143,3143,83,14,Amazing!!!,1619062713,1619062713
2,76561197993895820,204100,False,1,0,0.0,787,746,123,2,The latest update forces the install of some R...,1619058151,1619058151
3,76561197972464391,204100,True,0,0,0.0,1012,1006,191,20,Classic Payne\n,1619052527,1619052527
4,76561198242204348,204100,True,0,0,0.0,414,414,54,28,Not like 1 and 2 of the series but its alright.,1619047384,1619047384


In [4]:
# List (column name, Spark SQL type) for every column of the reviews DataFrame
[(str(field.name), field.dataType.simpleString()) for field in reviews_df.schema.fields]

[('steamid', 'bigint'),
 ('appid', 'bigint'),
 ('voted_up', 'boolean'),
 ('votes_up', 'bigint'),
 ('votes_funny', 'bigint'),
 ('weighted_vote_score', 'double'),
 ('playtime_forever', 'bigint'),
 ('playtime_at_review', 'bigint'),
 ('num_games_owned', 'bigint'),
 ('num_reviews', 'bigint'),
 ('review', 'string'),
 ('unix_timestamp_created', 'bigint'),
 ('unix_timestamp_updated', 'bigint')]

In [5]:
# Share of the original reviews kept in the base dataset, and a fixed seed so that the very same
# subsample (and hence the HDFS/MongoDB data derived from it) is obtained on every run of the notebook
BASE_SAMPLE_FRACTION = 0.2
BASE_SAMPLE_SEED = 42

# Randomly subsample the original dataset due to the single-node Spark installation limitation
# (the fraction is only the expected share: the exact number of sampled rows varies around it)
base_reviews_df = reviews_df.sample(
    withReplacement=False,
    fraction=BASE_SAMPLE_FRACTION,
    seed=BASE_SAMPLE_SEED
)

# Get the exact number of rows extracted from the original dataset
original_count = reviews_df.count()
base_count = base_reviews_df.count()
# Guard against an empty input dataset instead of raising ZeroDivisionError
percentage = base_count / original_count * 100 if original_count else 0.0
print(f'Taking {base_count} reviews out of the {original_count} of the original reviews dataset ({percentage:.3f}%).')



Taking 3087862 reviews out of the 15437471 of the original reviews dataset (20.002%).




In [6]:
from pyspark.sql.functions import monotonically_increasing_id

# Prepend a unique id column to the extracted reviews, used as a common reference by later analyses.
# The ids grow monotonically but are in general not contiguous, because they depend on the partitioning.
base_review_id_column_name = 'base_review_id'
base_reviews_df = base_reviews_df.select(
    monotonically_increasing_id().alias(base_review_id_column_name),
    *reviews_schema.fieldNames()
)

In [7]:
# Show the first five rows of the base reviews dataset (as pandas) to check the new id column
base_reviews_df.limit(num=5).toPandas()

Unnamed: 0,base_review_id,steamid,appid,voted_up,votes_up,votes_funny,weighted_vote_score,playtime_forever,playtime_at_review,num_games_owned,num_reviews,review,unix_timestamp_created,unix_timestamp_updated
0,0,76561199012934585,204100,True,0,0,0.0,1671,1660,37,10,A masterpiece that is extremely underrated. Th...,1619063926,1619063926
1,1,76561198242204348,204100,True,0,0,0.0,414,414,54,28,Not like 1 and 2 of the series but its alright.,1619047384,1619047384
2,2,76561198078115373,204100,False,1,1,0.522059,119,119,91,8,Unskippable cut scenes are horrible. Gameplay ...,1619040366,1619040366
3,3,76561198255525846,204100,True,0,0,0.0,69,69,27,1,I enjoy the game. Played it to 100% on PS3 an...,1619035215,1619035215
4,4,76561199026331378,204100,True,0,0,0.0,608,608,40,1,"Feel the Payne ;)\nGreat Game, just like part ...",1619027681,1619027681


In [8]:
# List (column name, Spark SQL type) for every column of the base reviews DataFrame
[(str(field.name), field.dataType.simpleString()) for field in base_reviews_df.schema.fields]

[('base_review_id', 'bigint'),
 ('steamid', 'bigint'),
 ('appid', 'bigint'),
 ('voted_up', 'boolean'),
 ('votes_up', 'bigint'),
 ('votes_funny', 'bigint'),
 ('weighted_vote_score', 'double'),
 ('playtime_forever', 'bigint'),
 ('playtime_at_review', 'bigint'),
 ('num_games_owned', 'bigint'),
 ('num_reviews', 'bigint'),
 ('review', 'string'),
 ('unix_timestamp_created', 'bigint'),
 ('unix_timestamp_updated', 'bigint')]

In [9]:
# Save the base reviews dataset to HDFS as CSV with a header row, replacing any previous output;
# quotes inside fields are escaped by doubling them, the same convention used when reading
(
    base_reviews_df.write
    .mode('overwrite')
    .options(escape='"', header=True)
    .csv('hdfs://localhost:54310/final_project/data/base_reviews')
)

                                                                                

In [10]:
# Share of the base reviews kept in the small repository, and a fixed seed so that the same
# small sample is drawn (and loaded into MongoDB) on every run of the notebook
SMALL_SAMPLE_FRACTION = 0.1
SMALL_SAMPLE_SEED = 43

# Randomly subsample the base dataset to create a small repository on which to perform the preliminary analyses
# (the fraction is only the expected share: the exact number of sampled rows varies around it)
small_reviews_df = base_reviews_df.sample(
    withReplacement=False,
    fraction=SMALL_SAMPLE_FRACTION,
    seed=SMALL_SAMPLE_SEED
)

# Get the exact number of rows extracted from the base dataset
small_count = small_reviews_df.count()
# Guard against an empty base dataset instead of raising ZeroDivisionError
percentage = small_count / base_count * 100 if base_count else 0.0
print(f'Taking {small_count} reviews out of the {base_count} of the base reviews dataset ({percentage:.3f}%).')



Taking 308524 reviews out of the 3087862 of the base reviews dataset (9.992%).


                                                                                

In [11]:
# Show the first five rows of the small reviews dataset (as pandas) to sanity-check the sample
small_reviews_df.limit(num=5).toPandas()

Unnamed: 0,base_review_id,steamid,appid,voted_up,votes_up,votes_funny,weighted_vote_score,playtime_forever,playtime_at_review,num_games_owned,num_reviews,review,unix_timestamp_created,unix_timestamp_updated
0,3,76561198255525846,204100,True,0,0,0.0,69,69,27,1,I enjoy the game. Played it to 100% on PS3 an...,1619035215,1619035215
1,18,76561198058159765,204100,True,0,0,0.0,356,336,61,3,good\n,1618048279,1618048279
2,22,76561198372464367,204100,True,0,0,0.0,1684,1581,135,8,one of the best 3rd person shooting game,1617507118,1617507118
3,25,76561198126769984,204100,True,0,0,0.0,477,97,41,23,"I beat this game 5 times on ps3, when i saw th...",1617410356,1617410356
4,27,76561198090877508,204100,True,0,0,0.0,421,421,87,6,me cague a tiro con brazucas y me empastille a...,1617402322,1617402322


In [12]:
import pymongo

# Open a client to the local MongoDB instance (pymongo default connection settings) and select
# final_project, the database that will hold the small reviews dataset
mongo = pymongo.MongoClient()
mongo_db = mongo['final_project']

In [13]:
# Empty the small_reviews collection first, so that re-running this cell does not duplicate documents
mongo_db.small_reviews.delete_many({})

# Build one plain dict of native Python values per review. Row.asDict() is used instead of a pandas
# round trip because toPandas() turns integer columns containing nulls into float64, which silently
# corrupts 64-bit ids such as steamid (values above 2**53) and stores NaN instead of BSON null.
small_reviews_dict = [row.asDict() for row in small_reviews_df.collect()]

# Insert the reviews in the small_reviews collection of the final_project MongoDB database
# (insert_many rejects an empty document list, so skip the call if the sample came out empty)
if small_reviews_dict:
    mongo_db.small_reviews.insert_many(small_reviews_dict)

                                                                                

In [14]:
# Release the external resources used by the notebook: stop the Spark context underlying the
# Spark session, then close the client connected to the local MongoDB instance
spark.stop()
mongo.close()