# Imports & Env

In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Compatibility shim: expose `collections.Callable` as an alias of
# `collections.abc.Callable` for libraries that still look it up on the
# top-level `collections` module.
import collections
# Import the submodule explicitly: `import collections` alone does not
# guarantee that `collections.abc` is loaded as an attribute.
import collections.abc

collections.Callable = collections.abc.Callable

In [4]:
# Set up tab completion through a pyreadline `Readline` instance.
from pyreadline import Readline
readline = Readline()
# NOTE(review): `rlcompleter` is never referenced by name in this notebook;
# presumably it is imported for its import-time side effects — confirm before
# moving or removing it.
import rlcompleter
# Bind the Tab key to the completion action.
readline.parse_and_bind("tab: complete")

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array_contains
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

In [6]:
from pyspark.sql import functions

In [7]:
import itertools

In [8]:
import os

In [9]:
# Switch to the parent of the directory the kernel started in, so the relative
# `data/...` paths used below resolve from there.
# The start directory is recorded only the first time this cell runs; re-running
# the cell therefore lands in the same place instead of climbing one more level
# each time (which a bare `os.chdir("..")` would do).
if "NOTEBOOK_DIR" not in globals():
    NOTEBOOK_DIR = os.getcwd()
os.chdir(os.path.join(NOTEBOOK_DIR, os.pardir))

In [10]:
# Display the current working directory; the relative dataset paths used later
# are resolved against it.
os.getcwd()

'E:\\coding\\repos\\BigDataProject'

In [11]:
# Local-mode Spark session for the sampling job.
# A plain "local" master runs with a single worker thread, and in local mode
# the work happens inside the driver JVM, so parallelism comes from the master
# URL and heap size from spark.driver.memory. "local[8]" matches the 8 cores
# the executor setting was already asking for. The executor settings are kept
# unchanged so the same configuration still applies if the master is pointed
# at a real cluster.
# NOTE(review): spark.driver.memory only takes effect if no JVM has been
# started yet in this kernel — restart the kernel after changing it.
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName('SamplingReviews')
    .config('spark.driver.memory', '16g')
    .config('spark.executor.cores', '8')
    .config('spark.executor.memory', '16g')
    .getOrCreate()
)

# Load Data

In [12]:
# Relative path of the input reviews CSV (resolved against the working directory).
FINAL_DATASET = "data/steam_reviews_final.csv"

In [13]:
# Explicit schema for the reviews CSV: one `.add(name, type, nullable)` per
# column, in file order. Every column is declared nullable.
#
# `author_steamid` is read as a 64-bit LongType: Steam account ids (SteamID64)
# do not fit in the 32-bit IntegerType, and a value that cannot be parsed into
# the declared type is not loaded as written.
# NOTE(review): `review_id` and the timestamp columns are still 32-bit; that is
# assumed to be wide enough for this dataset — confirm against the file.
non_null_schema = (
    StructType()
    .add("#", IntegerType(), True)
    .add("app_id", IntegerType(), True)
    .add("app_name", StringType(), True)
    .add("review_id", IntegerType(), True)
    .add("language", StringType(), True)
    .add("review", StringType(), True)
    .add("timestamp_created", IntegerType(), True)
    .add("timestamp_updated", IntegerType(), True)
    .add("recommended", BooleanType(), True)
    .add("votes_helpful", IntegerType(), True)
    .add("votes_funny", IntegerType(), True)
    .add("weighted_vote_score", FloatType(), True)
    .add("comment_count", IntegerType(), True)
    .add("steam_purchase", BooleanType(), True)
    .add("received_for_free", BooleanType(), True)
    .add("written_during_early_access", BooleanType(), True)
    .add("author_steamid", LongType(), True)
    .add("author_num_games_owned", IntegerType(), True)
    .add("author_num_reviews", IntegerType(), True)
    .add("author_playtime_forever", FloatType(), True)
    .add("author_playtime_last_two_weeks", FloatType(), True)
    .add("author_playtime_at_review", FloatType(), True)
    .add("author_last_played", IntegerType(), True)
)

In [14]:
# Column-name constants, one per CSV column, so later cells can refer to
# columns by identifier instead of repeating string literals.

# Row index and application columns.
INDEX = "#"
APP_ID = "app_id"
APP_NAME = "app_name"
# Review columns.
REVIEW_ID = "review_id"
LANGUAGE = "language"
REVIEW = "review"
TIMESTAMP_CREATED = "timestamp_created"
TIMESTAMP_UPDATED = "timestamp_updated"
RECOMMENDED = "recommended"
VOTES_HELPFUL = "votes_helpful"
VOTES_FUNNY = "votes_funny"
WEIGHTED_VOTE_SCORE = "weighted_vote_score"
COMMENT_COUNT = "comment_count"
STEAM_PURCHASE = "steam_purchase"
RECEIVED_FOR_FREE = "received_for_free"
WRITTEN_DURING_EARLY_ACCESS = "written_during_early_access"
# Review-author columns.
AUTHOR_STEAMID = "author_steamid"
AUTHOR_NUM_GAMES_OWNED = "author_num_games_owned"
AUTHOR_NUM_REVIEWS = "author_num_reviews"
AUTHOR_PLAYTIME_FOREVER = "author_playtime_forever"
AUTHOR_PLAYTIME_LAST_TWO_WEEKS = "author_playtime_last_two_weeks"
AUTHOR_PLAYTIME_AT_REVIEW = "author_playtime_at_review"
AUTHOR_LAST_PLAYED = "author_last_played"

In [15]:
# Load the reviews CSV with the explicit schema; the first line of the file is
# treated as a header and the content is decoded as UTF-8.
df_non_null_final = (
    spark.read.format("csv")
    .options(header=True, encoding="utf-8")
    .schema(non_null_schema)
    .load(FINAL_DATASET)
)

In [16]:
# Total number of rows in the loaded dataset.
df_non_null_final.count()

16618478

# Sampling

In [16]:
# Keep only the reviews whose language column is exactly "english".
df_english_dataset = df_non_null_final.filter(col(LANGUAGE) == "english")

In [21]:
# Number of English-language reviews.
df_english_dataset.count()

7240550

In [22]:
# Class balance of the `recommended` flag within the English subset.
df_english_dataset.groupBy(RECOMMENDED).count().show()

+-----------+-------+
|recommended|  count|
+-----------+-------+
|       true|6572346|
|      false| 668204|
+-----------+-------+



In [17]:
# Complement of the English subset: reviews whose language is anything other
# than "english".
df_non_english_dataset = df_non_null_final.filter(col(LANGUAGE) != "english")

In [24]:
# Number of non-English reviews.
df_non_english_dataset.count()

9377928

In [25]:
# Class balance of the `recommended` flag within the non-English subset.
df_non_english_dataset.groupBy(RECOMMENDED).count().show()

+-----------+-------+
|recommended|  count|
+-----------+-------+
|       true|8109017|
|      false|1268911|
+-----------+-------+



In [18]:
# (target total rows, output path prefix) for each sample to produce:
# 100k, 50k and 10k rows, in that order.
SAMPLE_SIZES = [
    (thousands * 1_000, f"data/final_dataset_{thousands}k")
    for thousands in (100, 50, 10)
]

In [19]:
# (DataFrame, language tag) pairs; the tag becomes part of the output path.
LANG_DFS = [(df_english_dataset, "en"), (df_non_english_dataset, "non-en")]

In [20]:
# (column to balance on, divisor) pairs: the target total sample size is
# divided by the divisor to get the number of rows requested per distinct value.
# 2 matches the two values of `recommended`.
# NOTE(review): 300 is presumably a round approximation of the number of
# distinct app ids (the English subset reports 315 groups) — confirm.
BALANCE_LABEL = [(RECOMMENDED, 2), (APP_ID, 300)]

In [43]:
def _capped_fractions(label_counts, sample_size):
    """Turn per-label row counts into sampling fractions capped at 1.0."""
    return {
        key: min(1.0, sample_size / count)
        for key, count in label_counts.items()
        # A null label cannot be used as a stratum key, and an empty group
        # has nothing to sample from.
        if key is not None and count > 0
    }


def sample_big_df(df, sample_size, outdir, lang, label, seed=42, mode="errorifexists"):
    """Write a stratified sample of ``df`` balanced on ``label`` as a single CSV.

    Each distinct value of ``label`` is sampled with fraction
    ``min(1, sample_size / group_count)``, so every group contributes roughly
    ``sample_size`` rows, or all of its rows when it has fewer. Sampling is
    per-row random, so the resulting group sizes are approximate.

    Parameters
    ----------
    df : DataFrame to sample from.
    sample_size : number of rows requested per distinct ``label`` value.
    outdir : output path prefix.
    lang : language tag embedded in the output path.
    label : name of the column to stratify on.
    seed : random seed passed to ``sampleBy`` so that re-running the notebook
        reproduces the same sample. Pass ``None`` for a non-deterministic draw.
    mode : save mode for the CSV writer. The default fails if the output path
        already exists; pass ``"overwrite"`` to replace earlier output.

    Raises
    ------
    ValueError
        If ``sample_size`` is negative.

    Rows whose ``label`` is null are never sampled.
    """
    if sample_size < 0:
        raise ValueError(f"sample_size must be non-negative, got {sample_size}")

    # One small row per distinct label value, so collecting to the driver is cheap.
    label_counts = {
        row[label]: row["count"]
        for row in df.groupBy(label).count().collect()
    }
    fractions = _capped_fractions(label_counts, sample_size)

    sample_df = df.sampleBy(label, fractions, seed=seed)

    out_path = f"{outdir}_{lang}_{label.lower()}_{sample_size}per"
    # repartition(1) makes Spark emit a single CSV part file.
    (
        sample_df.repartition(1).write
        .mode(mode)
        .options(header='True', delimiter=',')
        .csv(out_path)
    )

In [44]:
# 100k, 50k, 10k

In [45]:
# small sample for english dataset balanced around RECOMMENDED

In [46]:
# small sample for english dataset balanced around APP_ID

In [47]:
# small sample for non-english dataset balanced around RECOMMENDED

In [48]:
# small sample for non-english dataset balanced around APP_ID

In [49]:
# Produce one sample per (target size, language subset, balancing column)
# combination, printing the parameters of each run before starting it.
for sample, lang, balance_label in itertools.product(SAMPLE_SIZES, LANG_DFS, BALANCE_LABEL):
    total_rows, out_prefix = sample
    lang_df, lang_tag = lang
    label_column, divisor = balance_label
    # Rows requested per distinct value of the balancing column.
    rows_per_group = int(total_rows / divisor)
    print(total_rows, lang_tag, label_column, rows_per_group)
    sample_big_df(lang_df, rows_per_group, out_prefix, lang_tag, label_column)

100000 en recommended 50000
100000 en app_id 333
100000 non-en recommended 50000
100000 non-en app_id 333
50000 en recommended 25000
50000 en app_id 166
50000 non-en recommended 25000
50000 non-en app_id 166
10000 en recommended 5000
10000 en app_id 33
10000 non-en recommended 5000
10000 non-en app_id 33


In [30]:
# Exploration: map each app id in the English subset to 50000 divided by the
# number of reviews for that app.
fractions = (
    df_english_dataset
    .groupBy(APP_ID)
    .count()
    .withColumn("required_n", 50000 / col("count"))
    .drop("count")
    .rdd
    .collectAsMap()
)

In [29]:
# Number of distinct app ids in the English subset (one map entry per app id).
len(fractions)

315