In [None]:
#Install pyspark and a JDK for it to run on (Spark runs on the JVM)

#Set PYTHONHASHSEED to 3 in the notebook's environment
%env PYTHONHASHSEED 3
#Install a headless OpenJDK 8 quietly, discarding the command's stdout
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Install pyspark with pip in quiet mode
!pip install -q pyspark

env: PYTHONHASHSEED=3
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
#Setup the pyspark session
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *

#Build (or reuse) a Spark session that runs locally on all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkExample')
    .config("spark.executor.memory", "1g")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
#Handle to the underlying SparkContext, used for the RDD API in later cells
sc = spark.sparkContext

In [None]:
import time

In [None]:
#NOTE(review): ijson is not imported by any cell visible in this notebook - confirm this install is still needed
!pip install ijson

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ijson
  Downloading ijson-3.2.0.post0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (113 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m113.3/113.3 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ijson
Successfully installed ijson-3.2.0.post0


In [None]:
import json
import gzip

#use the wget function to retrieve the data as the download link gives a
#gz file, not a json
!wget -O steam.gz https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_user_reviews.json.gz

#load the data into an rdd
steam_rdd = sc.textFile('steam.gz')

#Extract whether or not a game is recommended and the review content
steam_rdd = steam_rdd.flatMap(lambda x: [(review['recommend'], review['review']) for review in eval(x)['reviews']])

#Test to see it has worked
print(steam_rdd.take(5))

#Partition the (recommend, review) pairs into two RDDs: recommended and
#not-recommended reviews
steam_rdd_true = steam_rdd.filter(lambda pair: pair[0] == True)
steam_rdd_false = steam_rdd.filter(lambda pair: pair[0] == False)

#Sanity check: peek at a few not-recommended pairs
print(steam_rdd_false.take(5))

#Report how many reviews landed in each RDD (recommended first)
for labelled_rdd in (steam_rdd_true, steam_rdd_false):
  print(labelled_rdd.count())

#The flag is now implied by which RDD a review sits in, so keep only the
#review text
steam_rdd_true = steam_rdd_true.map(lambda pair: pair[1])
steam_rdd_false = steam_rdd_false.map(lambda pair: pair[1])

#Sanity check: peek at a few recommended review texts
print(steam_rdd_true.take(10))

#Next step: apply the tf-idf algorithm

--2023-06-02 08:03:52--  https://datarepo.eng.ucsd.edu/mcauley_group/data/steam/australian_user_reviews.json.gz
Resolving datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)... 132.239.8.30
Connecting to datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)|132.239.8.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6940139 (6.6M) [application/x-gzip]
Saving to: ‘steam.gz’


2023-06-02 08:03:53 (15.1 MB/s) - ‘steam.gz’ saved [6940139/6940139]

[(True, 'Simple yet with great replayability. In my opinion does "zombie" hordes and team work better than left 4 dead plus has a global leveling system. Alot of down to earth "zombie" splattering fun for the whole family. Amazed this sort of FPS is so rare.'), (True, "It's unique and worth a playthrough."), (True, 'Great atmosphere. The gunplay can be a bit chunky at times but at the end of the day this game is definitely worth it and I hope they do a sequel...so buy the game so I get a sequel!'), (True, 'I know what you think when 

In [None]:
import re
import nltk
#Fetch the NLTK stop-word corpus (the saved output shows it reported as already up to date)
nltk.download('stopwords')

from nltk.corpus import stopwords
import time
#English stop words held as a set; the tokenising functions below test membership against it
STOP_WORDS = set(stopwords.words('english'))

#Use these functions to remove non letters:
def remove_nonletters(word):
  """Return ``word`` with every character other than a-z / A-Z removed."""
  letters_only = re.sub(r'[^a-zA-Z]', '', word)
  return letters_only

def split_remove_nonletters(line):
  """Tokenise a line into (token, 1) pairs.

  The line is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty are dropped. Stop words are NOT filtered here.

  Args:
    line: the text to tokenise.

  Returns:
    list of (token, 1) tuples, one per surviving piece, in input order.
  """
  result = []
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so words on different lines were
  # fused into one token once the newline had been stripped out.
  for word in line.split():
    removed_token = remove_nonletters(word.lower())
    if removed_token != '':
      result.append((removed_token, 1))
  return result

#Wrap the process of counting words into a wordcount function:
def wc(review):
  """Count the non-stop-word tokens of a single review.

  The review is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty, or that are in STOP_WORDS, are ignored.

  Args:
    review: the review text.

  Returns:
    dict mapping each remaining token to the number of times it occurs.
  """
  result = {}
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so "great\ngame" was counted as
  # the single token "greatgame".
  for words in review.split():
    removed_token = remove_nonletters(words.lower())
    if removed_token != '' and removed_token not in STOP_WORDS:
      result[removed_token] = result.get(removed_token, 0) + 1
  return result

#Start the clock; the elapsed time is printed in a later cell
time_start = time.time()

#Per-review word counts, kept separate for recommended / not-recommended
counts_true, counts_false = (
    reviews.map(wc) for reviews in (steam_rdd_true, steam_rdd_false)
)

#Sanity check on a single review
print(counts_true.take(1))


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


[{'simple': 1, 'yet': 1, 'great': 1, 'replayability': 1, 'opinion': 1, 'zombie': 2, 'hordes': 1, 'team': 1, 'work': 1, 'better': 1, 'left': 1, 'dead': 1, 'plus': 1, 'global': 1, 'leveling': 1, 'system': 1, 'alot': 1, 'earth': 1, 'splattering': 1, 'fun': 1, 'whole': 1, 'family': 1, 'amazed': 1, 'sort': 1, 'fps': 1, 'rare': 1}]


In [None]:
#Term-frequency function: scales each word's count by the count of the most
#frequent word in the document it belongs to.
#It also turns the RDD of dictionaries into an RDD of lists of tuples so we
#can work with it

def tf(doc_counts):
  """Turn per-document word counts into max-normalised term frequencies.

  Args:
    doc_counts: RDD whose elements are {word: count} dicts, one per document.

  Returns:
    RDD with one list per document of (word, count / largest count in that
    document) tuples; an empty dict yields an empty list.
  """
  def normalise(counts):
    if not counts:
      return []
    # Find the largest count once per document; the original recomputed it for
    # every word, which is quadratic in the number of distinct words.
    top = max(counts.values())
    return [(word, count / top) for word, count in counts.items()]

  return doc_counts.map(normalise)
true_tf, false_tf = tf(counts_true), tf(counts_false)
#Peek at the first two documents of each class (recommended first)
for tf_rdd in (true_tf, false_tf):
  print(tf_rdd.take(2))

[[('simple', 0.5), ('yet', 0.5), ('great', 0.5), ('replayability', 0.5), ('opinion', 0.5), ('zombie', 1.0), ('hordes', 0.5), ('team', 0.5), ('work', 0.5), ('better', 0.5), ('left', 0.5), ('dead', 0.5), ('plus', 0.5), ('global', 0.5), ('leveling', 0.5), ('system', 0.5), ('alot', 0.5), ('earth', 0.5), ('splattering', 0.5), ('fun', 0.5), ('whole', 0.5), ('family', 0.5), ('amazed', 0.5), ('sort', 0.5), ('fps', 0.5), ('rare', 0.5)], [('unique', 1.0), ('worth', 1.0), ('playthrough', 1.0)]]
[[('game', 1.0), ('doesnt', 1.0), ('work', 1.0)], [('charged', 1.0), ('dollars', 1.0), ('got', 1.0), ('boring', 1.0), ('hours', 1.0)]]


In [None]:
#calculate inverse document frequency

import math
def idf(count_rdds):
  """Compute the inverse document frequency of every word.

  Args:
    count_rdds: RDD whose elements are {word: count} dicts, one per document.

  Returns:
    RDD of (word, log2(n / df)) tuples, where n is the number of documents and
    df is the number of documents whose dict contains the word.
  """
  doc_items = count_rdds.map(lambda counts: list(counts.items()))
  n = doc_items.count()
  # Each dict holds a word at most once, so summing a 1 per (word, count)
  # entry gives the number of documents containing that word.
  doc_freq = (
      doc_items
      .flatMap(lambda items: items)
      .map(lambda item: (item[0], 1))
      .reduceByKey(lambda left, right: left+right)
  )
  return doc_freq.map(lambda entry: (entry[0], math.log(n/entry[1], 2)))

#Build the idf table for each class of review, then print a sample of each
true_idf, false_idf = idf(counts_true), idf(counts_false)
print(true_idf.take(30))
print(false_idf.take(20))

[('simple', 6.175461916028362), ('yet', 5.9564801228545665), ('great', 2.956266722212081), ('replayability', 8.32173564940603), ('opinion', 6.937820667622965), ('zombie', 6.450468963528231), ('hordes', 10.094325153302956), ('team', 5.871932731966508), ('work', 5.840083865927169), ('better', 4.582572499535576), ('left', 6.921064439297387), ('dead', 6.787503950805802), ('plus', 7.484530799601865), ('global', 8.784469890716169), ('leveling', 9.1557256979671), ('system', 5.944578033798274), ('alot', 6.101858825988365), ('earth', 9.124698802346476), ('splattering', 14.094325153302956), ('fun', 2.7669596413370168), ('whole', 6.532082729081884), ('family', 8.499378564009179), ('amazed', 10.35735955913675), ('sort', 8.028235962845184), ('fps', 5.276275630449116), ('rare', 8.591824812773774), ('unique', 6.37550690584701), ('worth', 4.570763197245944), ('playthrough', 8.602472056973282), ('atmosphere', 7.772397058415594)]
[('game', 0.9183123277820102), ('doesnt', 4.354387967146438), ('work', 4.4

In [None]:
def tfidfi(tfi, idf):
  """Combine term frequencies and idf weights into one score per word.

  Args:
    tfi: RDD with one list of (word, tf) tuples per document.
    idf: RDD of (word, idf) tuples.

  Returns:
    RDD of (word, score) tuples, where score is the sum over all documents of
    tf * idf for that word.
  """
  per_word_tf = tfi.flatMap(lambda doc: doc)
  # join pairs every (word, tf) with the word's idf: (word, (tf, idf))
  weighted = per_word_tf.join(idf).map(lambda kv: (kv[0], kv[1][0]*kv[1][1]))
  return weighted.reduceByKey(lambda left, right: left + right)

#Rank by descending score, breaking ties alphabetically on the word
by_score = lambda x: (-x[1], x[0])

#The 100 highest-scoring words in positive and then in negative reviews
true_tfidfi = tfidfi(true_tf, true_idf).takeOrdered(100, key=by_score)
print(true_tfidfi)
false_tfidfi = tfidfi(false_tf, false_idf).takeOrdered(100, key=by_score)
print(false_tfidfi)

#Report the time elapsed since time_start was last set
time_end = time.time()
print(f"elapsed time is {time_end-time_start}")

print("")

#Now let's actually compute the answer to my research question by removing
#the important words that are common to positive and negative reviews,
#leaving the words that are distinctive to each side.
set_true = set(item[0] for item in true_tfidfi)
set_false = set(item[0] for item in false_tfidfi)

common_words = set_true.intersection(set_false)

#Filter the ranked lists directly. Round-tripping through set() and list()
#returned the surviving words in arbitrary order and threw away the score
#ranking produced by takeOrdered.
list_true = [item for item in true_tfidfi if item[0] not in common_words]
list_false = [item for item in false_tfidfi if item[0] not in common_words]

#The same (word, score) items as sets, as this cell has always left behind
set_true = set(list_true)
set_false = set(list_false)

print(list_true)
print(list_false)

[('game', 23166.553527125972), ('good', 15153.297396722517), ('fun', 15061.014893592095), ('great', 13711.760770153725), ('play', 12502.319905297769), ('best', 12151.80382828794), ('like', 11634.699935638404), ('get', 10899.939983324433), ('would', 9696.135841314997), ('one', 9594.76162135919), ('really', 9263.171518724024), ('games', 9084.526818706998), ('awesome', 8907.105793903622), ('love', 8408.11164383609), ('ever', 8406.665598538206), ('amazing', 8195.228602727859), ('played', 7816.794808270157), ('time', 7091.218350533951), ('buy', 6651.049610927495), ('much', 6639.616563529974), ('dont', 6573.571394624949), ('playing', 6135.547921580345), ('recommend', 6027.00721777465), ('still', 5930.160735942393), ('friends', 5838.166395579024), ('worth', 5613.104701924476), ('better', 5587.0721076446935), ('people', 5571.311828134626), ('story', 5506.74398786709), ('hours', 5426.78556271918), ('even', 5258.730170297355), ('free', 5159.35837050103), ('jogo', 5148.413121250958), ('well', 510

In [None]:
#Now do market basket analysis on the dataset. Finding frequent pairs can
#also help identify common/important words, and is interesting for those
#who may want to look into Steam reviews

#make sure this function excludes nonletters and stop words
def split_remove_spaces(line):
  """Tokenise a line into lower-case, letters-only, non-stop-word tokens.

  The line is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty, or that are in STOP_WORDS, are dropped.

  Args:
    line: the text to tokenise.

  Returns:
    list of tokens in input order (repeats are kept).
  """
  result = []
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so words on different lines were
  # fused into one token once the newline had been stripped out.
  for word in line.split():
    removed_token = remove_nonletters(word.lower())
    if removed_token != '' and removed_token not in STOP_WORDS:
      result.append(removed_token)
  return result

#helper function to find all pairs of words in each review
def helper(string):
  """Return every unordered pair of distinct words that co-occur in a review.

  The review is tokenised with split_remove_spaces and then treated as a
  basket, i.e. a set of words: each pair is produced at most once per review
  and a word is never paired with itself.

  Args:
    string: the review text.

  Returns:
    list of (word_a, word_b) tuples with word_a < word_b.
  """
  # Deduplicate before pairing. Without this, a review that repeats one word
  # k times contributed k*(k-1)/2 copies of the pair (word, word), so a single
  # spammy review could dominate the pair counts.
  # Sorting once means every pair below is already in sorted order.
  items = sorted(set(split_remove_spaces(string)))
  return [(items[i], items[j])
          for i in range(len(items))
          for j in range(i + 1, len(items))]

#step 1 of the apriori function which returns counts of individual words
def a_priori_step1(text_file_rdd):
  """Count how often each token occurs across all reviews.

  Args:
    text_file_rdd: RDD of review texts.

  Returns:
    RDD of (token, number of occurrences) tuples, using the tokens produced
    by split_remove_spaces.
  """
  tokens = text_file_rdd.flatMap(lambda review: split_remove_spaces(review))
  ones = tokens.map(lambda token: (token, 1))
  return ones.reduceByKey(lambda left, right: left + right)

#Order by descending count so takeOrdered returns the most frequent first
most_frequent_first = lambda kv: -kv[1]

#The 20 most frequent words in recommended, then not-recommended, reviews
steam_apriori_true = a_priori_step1(steam_rdd_true).takeOrdered(20, most_frequent_first)
print(steam_apriori_true)

steam_apriori_false = a_priori_step1(steam_rdd_false).takeOrdered(20, most_frequent_first)
print(steam_apriori_false)

[('game', 50174), ('good', 9262), ('fun', 9244), ('like', 8884), ('play', 8558), ('great', 8319), ('get', 7847), ('one', 6373), ('really', 5795), ('games', 5684), ('best', 5588), ('would', 5582), ('time', 4388), ('played', 3895), ('dont', 3882), ('much', 3792), ('amazing', 3648), ('love', 3647), ('awesome', 3418), ('playing', 3357)]
[('game', 8517), ('like', 1661), ('get', 1627), ('play', 1502), ('bad', 1410), ('dont', 1359), ('even', 1166), ('good', 1094), ('would', 950), ('one', 934), ('time', 889), ('really', 884), ('buy', 802), ('fun', 769), ('games', 759), ('cant', 739), ('want', 687), ('much', 667), ('money', 650), ('new', 627)]


In [None]:
#perform the second step of the apriori algorithm

def a_priori(text_file_rdd, support=100):
  """Count word pairs whose two words both pass the support threshold.

  Args:
    text_file_rdd: RDD of review texts.
    support: minimum count (from a_priori_step1) a word needs for pairs
      containing it to be counted. Defaults to 100.

  Returns:
    RDD of ((word_a, word_b), count) tuples built from the pairs that helper
    produces for each review.
  """
  frequent_words = (
      a_priori_step1(text_file_rdd)
      .filter(lambda kv: kv[1] >= support)
      .collectAsMap()
  )
  # Ship the frequent-word table to the workers once via a broadcast variable
  broadcasted = sc.broadcast(frequent_words)

  def frequent_pairs(review):
    # Keep only the pairs in which both words are frequent on their own
    return [pair for pair in helper(review)
            if all(word in broadcasted.value for word in pair)]

  return (
      text_file_rdd
      .map(frequent_pairs)
      .flatMap(lambda pairs: pairs)
      .map(lambda pair: (pair, 1))
      .reduceByKey(lambda left, right: left + right)
  )

#Order by descending count so takeOrdered returns the most frequent first
most_frequent_first = lambda kv: -kv[1]

#Time the pair counting for both classes of review together
time_start = time.time()

steam_apriori_true_final = a_priori(steam_rdd_true).takeOrdered(20, most_frequent_first)
print(steam_apriori_true_final)

steam_apriori_false_final = a_priori(steam_rdd_false).takeOrdered(20, most_frequent_first)
print(steam_apriori_false_final)

time_end = time.time()
print(f"elapsed time is {time_end-time_start}")

print("")

#Compute the answer to the research question: find the frequent pairs that
#are distinctive to positive or to negative reviews

set_true = set(item[0] for item in steam_apriori_true_final)
set_false = set(item[0] for item in steam_apriori_false_final)

#Pairs that appear in both top lists (the name common_words is kept from the
#tf-idf cell)
common_words = set_true.intersection(set_false)

#Filter the ranked lists directly. Round-tripping through set() and list()
#returned the surviving pairs in arbitrary order and threw away the count
#ranking produced by takeOrdered.
list_true = [item for item in steam_apriori_true_final if item[0] not in common_words]
list_false = [item for item in steam_apriori_false_final if item[0] not in common_words]

#The same (pair, count) items as sets, as this cell has always left behind
set_true = set(list_true)
set_false = set(list_false)

print(list_true)
print(list_false)

[(('hat', 'hat'), 1999043), (('nyan', 'nyan'), 1279200), (('hats', 'hats'), 181154), (('hot', 'super'), 152108), (('shit', 'shit'), 139129), (('pew', 'pew'), 114572), (('hit', 'shit'), 91344), (('super', 'super'), 75987), (('hot', 'hot'), 75870), (('pew', 'pewpew'), 75682), (('game', 'game'), 65636), (('john', 'madden'), 46666), (('warriordragon', 'warriordragon'), 35245), (('exile', 'path'), 34244), (('buy', 'itbuy'), 31327), (('shower', 'took'), 29929), (('dadi', 'took'), 29756), (('dadi', 'shower'), 29756), (('rep', 'rep'), 29647), (('game', 'like'), 27699)]
[(('bad', 'bad'), 332683), (('dlc', 'new'), 28030), (('new', 'update'), 28001), (('adds', 'dlc'), 27923), (('dlc', 'update'), 27917), (('adds', 'new'), 27901), (('dlc', 'overpriced'), 27897), (('new', 'overpriced'), 27894), (('overpriced', 'update'), 27893), (('adds', 'update'), 27889), (('adds', 'overpriced'), 27889), (('game', 'game'), 16224), (('dlc', 'dlc'), 14179), (('new', 'new'), 14043), (('update', 'update'), 13910), (('

In [None]:
#We have seen how the code performs on the full datasets; now let's test it
#on the first quarter, half and three quarters of each dataset.
#take(n) returns the first n reviews as a list and sc.parallelize turns that
#list back into an RDD.

steam_rdd_true_quarter = sc.parallelize(steam_rdd_true.take(13118))
steam_rdd_true_half = sc.parallelize(steam_rdd_true.take(26236))
steam_rdd_true_threequarters = sc.parallelize(steam_rdd_true.take(39354))

steam_rdd_false_quarter = sc.parallelize(steam_rdd_false.take(1708))
steam_rdd_false_half = sc.parallelize(steam_rdd_false.take(3416))
steam_rdd_false_threequarters = sc.parallelize(steam_rdd_false.take(5124))

print("Quarter:")

#Sort keys: tf-idf by descending score then word; pairs by descending count
by_score = lambda x: (-x[1], x[0])
most_frequent_first = lambda kv: -kv[1]

#--- tf-idf on the quarter-sized samples ---
time_start = time.time()

counts_true_quarter = steam_rdd_true_quarter.map(wc)
counts_false_quarter = steam_rdd_false_quarter.map(wc)

true_tf_quarter, false_tf_quarter = tf(counts_true_quarter), tf(counts_false_quarter)
true_idf_quarter, false_idf_quarter = idf(counts_true_quarter), idf(counts_false_quarter)

true_tfidfi_quarter = tfidfi(true_tf_quarter, true_idf_quarter).takeOrdered(100, key=by_score)
false_tfidfi_quarter = tfidfi(false_tf_quarter, false_idf_quarter).takeOrdered(100, key=by_score)

time_end = time.time()
print(f"elapsed time is (tfidf) {time_end-time_start}")

#--- market basket analysis on the same samples ---
time_start = time.time()

steam_apriori_true_final_quarter = a_priori(steam_rdd_true_quarter).takeOrdered(20, most_frequent_first)
steam_apriori_false_final_quarter = a_priori(steam_rdd_false_quarter).takeOrdered(20, most_frequent_first)

time_end = time.time()
print(f"elapsed time is (market basket) {time_end-time_start}")

Quarter:
elapsed time is (tfidf) 14.157914638519287
elapsed time is (market basket) 36.832727909088135


In [None]:
print("Half:")

#Sort keys: tf-idf by descending score then word; pairs by descending count
by_score = lambda x: (-x[1], x[0])
most_frequent_first = lambda kv: -kv[1]

#--- tf-idf on the half-sized samples ---
time_start = time.time()

counts_true_half = steam_rdd_true_half.map(wc)
counts_false_half = steam_rdd_false_half.map(wc)

true_tf_half, false_tf_half = tf(counts_true_half), tf(counts_false_half)
true_idf_half, false_idf_half = idf(counts_true_half), idf(counts_false_half)

true_tfidfi_half = tfidfi(true_tf_half, true_idf_half).takeOrdered(100, key=by_score)
false_tfidfi_half = tfidfi(false_tf_half, false_idf_half).takeOrdered(100, key=by_score)

time_end = time.time()
print(f"elapsed time is (tfidf) {time_end-time_start}")

#--- market basket analysis on the same samples ---
time_start = time.time()

steam_apriori_true_final_half = a_priori(steam_rdd_true_half).takeOrdered(20, most_frequent_first)
steam_apriori_false_final_half = a_priori(steam_rdd_false_half).takeOrdered(20, most_frequent_first)

time_end = time.time()
print(f"elapsed time is (market basket) {time_end-time_start}")

Half:
elapsed time is (tfidf) 19.201499938964844
elapsed time is (market basket) 71.8371307849884


In [None]:
print("Three quarters:")

#Sort keys: tf-idf by descending score then word; pairs by descending count
by_score = lambda x: (-x[1], x[0])
most_frequent_first = lambda kv: -kv[1]

#--- tf-idf on the three-quarter-sized samples ---
time_start = time.time()

counts_true_threequarters = steam_rdd_true_threequarters.map(wc)
counts_false_threequarters = steam_rdd_false_threequarters.map(wc)

true_tf_threequarters, false_tf_threequarters = tf(counts_true_threequarters), tf(counts_false_threequarters)
true_idf_threequarters, false_idf_threequarters = idf(counts_true_threequarters), idf(counts_false_threequarters)

true_tfidfi_threequarters = tfidfi(true_tf_threequarters, true_idf_threequarters).takeOrdered(100, key=by_score)
false_tfidfi_threequarters = tfidfi(false_tf_threequarters, false_idf_threequarters).takeOrdered(100, key=by_score)

time_end = time.time()
print(f"elapsed time is (tfidf) {time_end-time_start}")

#--- market basket analysis on the same samples ---
time_start = time.time()

steam_apriori_true_final_threequarters = a_priori(steam_rdd_true_threequarters).takeOrdered(20, most_frequent_first)
steam_apriori_false_final_threequarters = a_priori(steam_rdd_false_threequarters).takeOrdered(20, most_frequent_first)

time_end = time.time()
print(f"elapsed time is (market basket) {time_end-time_start}")

Three quarters:
elapsed time is (tfidf) 26.43730592727661
elapsed time is (market basket) 122.11415076255798


In [None]:
%%writefile pyspark_project.py
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
import json
import gzip
import re
import sys
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords
import math
import time

# Needed for literal_eval below; imported here so this section is self-contained
import ast

# The dataset URI is the script's only command-line argument
if len(sys.argv) < 2:
  raise Exception("Input URI required")

sc = pyspark.SparkContext()

steam_lines = sc.textFile(sys.argv[1])

# Each line is parsed as a Python literal and its 'reviews' list is flattened
# into (recommend, review) pairs. ast.literal_eval only accepts literals, so a
# malicious or corrupted input line cannot execute code the way eval() could.
# The parsed pairs are cached because every later action would otherwise
# re-read and re-parse the whole input.
steam_rdd = steam_lines.flatMap(
    lambda x: [(review['recommend'], review['review'])
               for review in ast.literal_eval(x)['reviews']]
).cache()

# Split into recommended / not-recommended reviews
steam_rdd_true = steam_rdd.filter(lambda x: x[0] == True)
steam_rdd_false = steam_rdd.filter(lambda x: x[0] == False)

# The flag is now implied by which RDD a review is in; keep only the text
steam_rdd_true = steam_rdd_true.map(lambda x: x[1])
steam_rdd_false = steam_rdd_false.map(lambda x: x[1])

# English stop words as a set, used for membership tests when tokenising
STOP_WORDS = set(stopwords.words('english'))

def remove_nonletters(word):
  """Return ``word`` with every character other than a-z / A-Z removed."""
  letters_only = re.sub(r'[^a-zA-Z]', '', word)
  return letters_only

def split_remove_nonletters(line):
  """Tokenise a line into (token, 1) pairs.

  The line is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty are dropped. Stop words are NOT filtered here.

  Args:
    line: the text to tokenise.

  Returns:
    list of (token, 1) tuples, one per surviving piece, in input order.
  """
  result = []
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so words on different lines were
  # fused into one token once the newline had been stripped out.
  for word in line.split():
    removed_token = remove_nonletters(word.lower())
    if removed_token != '':
      result.append((removed_token, 1))
  return result

def wc(review):
  """Count the non-stop-word tokens of a single review.

  The review is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty, or that are in STOP_WORDS, are ignored.

  Args:
    review: the review text.

  Returns:
    dict mapping each remaining token to the number of times it occurs.
  """
  result = {}
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so "great\ngame" was counted as
  # the single token "greatgame".
  for words in review.split():
    removed_token = remove_nonletters(words.lower())
    if removed_token != '' and removed_token not in STOP_WORDS:
      result[removed_token] = result.get(removed_token, 0) + 1
  return result

# Start the clock for the tf-idf timing printed further down
time_start = time.time()

# Per-review word counts, kept separate for each class of review
counts_true, counts_false = (
    reviews.map(wc) for reviews in (steam_rdd_true, steam_rdd_false)
)

def tf(doc_counts):
  """Turn per-document word counts into max-normalised term frequencies.

  Args:
    doc_counts: RDD whose elements are {word: count} dicts, one per document.

  Returns:
    RDD with one list per document of (word, count / largest count in that
    document) tuples; an empty dict yields an empty list.
  """
  def normalise(counts):
    if not counts:
      return []
    # Find the largest count once per document; the original recomputed it for
    # every word, which is quadratic in the number of distinct words.
    top = max(counts.values())
    return [(word, count / top) for word, count in counts.items()]

  return doc_counts.map(normalise)
# Term frequencies for recommended and not-recommended reviews
true_tf, false_tf = tf(counts_true), tf(counts_false)

def idf(count_rdds):
  """Compute the inverse document frequency of every word.

  Args:
    count_rdds: RDD whose elements are {word: count} dicts, one per document.

  Returns:
    RDD of (word, log2(n / df)) tuples, where n is the number of documents and
    df is the number of documents whose dict contains the word.
  """
  doc_items = count_rdds.map(lambda counts: list(counts.items()))
  n = doc_items.count()
  # Each dict holds a word at most once, so summing a 1 per (word, count)
  # entry gives the number of documents containing that word.
  doc_freq = (
      doc_items
      .flatMap(lambda items: items)
      .map(lambda item: (item[0], 1))
      .reduceByKey(lambda left, right: left+right)
  )
  return doc_freq.map(lambda entry: (entry[0], math.log(n/entry[1], 2)))

# idf tables for recommended and not-recommended reviews
true_idf, false_idf = idf(counts_true), idf(counts_false)

def tfidfi(tfi, idf):
  """Combine term frequencies and idf weights into one score per word.

  Args:
    tfi: RDD with one list of (word, tf) tuples per document.
    idf: RDD of (word, idf) tuples.

  Returns:
    RDD of (word, score) tuples, where score is the sum over all documents of
    tf * idf for that word.
  """
  per_word_tf = tfi.flatMap(lambda doc: doc)
  # join pairs every (word, tf) with the word's idf: (word, (tf, idf))
  weighted = per_word_tf.join(idf).map(lambda kv: (kv[0], kv[1][0]*kv[1][1]))
  return weighted.reduceByKey(lambda left, right: left + right)

# Rank by descending score, breaking ties alphabetically on the word
by_score = lambda x: (-x[1], x[0])

# The 100 highest-scoring words for each class of review
true_tfidfi = tfidfi(true_tf, true_idf).takeOrdered(100, key=by_score)
false_tfidfi = tfidfi(false_tf, false_idf).takeOrdered(100, key=by_score)

time_end = time.time()
print(f"elapsed time is (tfidf) {time_end-time_start}")

def split_remove_spaces(line):
  """Tokenise a line into lower-case, letters-only, non-stop-word tokens.

  The line is split on any run of whitespace; each piece is lower-cased and
  stripped of non-letter characters with remove_nonletters. Pieces that end up
  empty, or that are in STOP_WORDS, are dropped.

  Args:
    line: the text to tokenise.

  Returns:
    list of tokens in input order (repeats are kept).
  """
  result = []
  # split() with no argument breaks on spaces, tabs and newlines alike.
  # split(" ") only broke on single spaces, so words on different lines were
  # fused into one token once the newline had been stripped out.
  for word in line.split():
    removed_token = remove_nonletters(word.lower())
    if removed_token != '' and removed_token not in STOP_WORDS:
      result.append(removed_token)
  return result

def helper(string):
  """Return every unordered pair of distinct words that co-occur in a review.

  The review is tokenised with split_remove_spaces and then treated as a
  basket, i.e. a set of words: each pair is produced at most once per review
  and a word is never paired with itself.

  Args:
    string: the review text.

  Returns:
    list of (word_a, word_b) tuples with word_a < word_b.
  """
  # Deduplicate before pairing. Without this, a review that repeats one word
  # k times contributed k*(k-1)/2 copies of the pair (word, word), so a single
  # spammy review could dominate the pair counts.
  # Sorting once means every pair below is already in sorted order.
  items = sorted(set(split_remove_spaces(string)))
  return [(items[i], items[j])
          for i in range(len(items))
          for j in range(i + 1, len(items))]

def a_priori_step1(text_file_rdd):
  """Count how often each token occurs across all reviews.

  Args:
    text_file_rdd: RDD of review texts.

  Returns:
    RDD of (token, number of occurrences) tuples, using the tokens produced
    by split_remove_spaces.
  """
  tokens = text_file_rdd.flatMap(lambda review: split_remove_spaces(review))
  ones = tokens.map(lambda token: (token, 1))
  return ones.reduceByKey(lambda left, right: left + right)

# The 20 most frequent words per class. Neither result is printed or read
# again anywhere later in this script.
most_frequent_first = lambda kv: -kv[1]
steam_apriori_true = a_priori_step1(steam_rdd_true).takeOrdered(20, most_frequent_first)
steam_apriori_false = a_priori_step1(steam_rdd_false).takeOrdered(20, most_frequent_first)

def a_priori(text_file_rdd, support=100):
  """Count word pairs whose two words both pass the support threshold.

  Args:
    text_file_rdd: RDD of review texts.
    support: minimum count (from a_priori_step1) a word needs for pairs
      containing it to be counted. Defaults to 100.

  Returns:
    RDD of ((word_a, word_b), count) tuples built from the pairs that helper
    produces for each review.
  """
  frequent_words = (
      a_priori_step1(text_file_rdd)
      .filter(lambda kv: kv[1] >= support)
      .collectAsMap()
  )
  # Ship the frequent-word table to the workers once via a broadcast variable
  broadcasted = sc.broadcast(frequent_words)

  def frequent_pairs(review):
    # Keep only the pairs in which both words are frequent on their own
    return [pair for pair in helper(review)
            if all(word in broadcasted.value for word in pair)]

  return (
      text_file_rdd
      .map(frequent_pairs)
      .flatMap(lambda pairs: pairs)
      .map(lambda pair: (pair, 1))
      .reduceByKey(lambda left, right: left + right)
  )

# Order by descending count so takeOrdered returns the most frequent first
most_frequent_first = lambda kv: -kv[1]

# Time the pair counting for both classes of review together
time_start = time.time()

steam_apriori_true_final = a_priori(steam_rdd_true).takeOrdered(20, most_frequent_first)
print(steam_apriori_true_final)

steam_apriori_false_final = a_priori(steam_rdd_false).takeOrdered(20, most_frequent_first)
print(steam_apriori_false_final)

time_end = time.time()
print(f"elapsed time is (market basket) {time_end-time_start}")

Writing pyspark_project.py


In [None]:
#Google Cloud settings read by the gcloud / gsutil cells below
USERNAME="dcollett"
%env REGION=australia-southeast1
%env ZONE=australia-southeast1-a
#The remaining names embed USERNAME (the expanded values are echoed in this cell's output)
%env PROJECT=data301-2023-$USERNAME
%env CLUSTER=data301-2023-$USERNAME-project-cluster
%env BUCKET=data301-2023-$USERNAME-project-bucket

env: REGION=australia-southeast1
env: ZONE=australia-southeast1-a
env: PROJECT=data301-2023-dcollett
env: CLUSTER=data301-2023-dcollett-project-cluster
env: BUCKET=data301-2023-dcollett-project-bucket


In [None]:
#Install the google-cloud-dataproc package with its libcst extra
#NOTE(review): no visible cell imports this package - confirm it is still needed
!python3 -m pip install google-cloud-dataproc[libcst]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting google-cloud-dataproc[libcst]
  Downloading google_cloud_dataproc-5.4.1-py2.py3-none-any.whl (307 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.5/307.5 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
Collecting grpc-google-iam-v1<1.0.0dev,>=0.12.4 (from google-cloud-dataproc[libcst])
  Downloading grpc_google_iam_v1-0.12.6-py2.py3-none-any.whl (26 kB)
Installing collected packages: grpc-google-iam-v1, google-cloud-dataproc
Successfully installed google-cloud-dataproc-5.4.1 grpc-google-iam-v1-0.12.6


In [None]:
#Authenticate the gcloud CLI interactively (it prompts for an authorization code)
#Clear this cell's output before sharing the notebook: it records the login exchange
!gcloud auth login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=nnQ0DJL6GKWpEx4XmFA6MPYRKn7DzU&prompt=consent&access_type=offline&code_challenge=2h3GEJYMzWBZ0FZWZSwyvPGRcqIiy7a8s7dWlwEg1Zk&code_challenge_method=S256

Enter authorization code: <redacted>

You are now logged in as [khoretto@gmail.com].
Your current project is [None].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID


In [None]:
#Make $PROJECT the active project for the gcloud commands that follow
!gcloud config set project $PROJECT

Updated property [core/project].


In [None]:
#Enable the Dataproc and Cloud Resource Manager APIs
!gcloud services enable dataproc.googleapis.com cloudresourcemanager.googleapis.com

Operation "operations/acat.p2-891767847038-5eb6f2fc-e560-4430-addf-7498c1e7a7e7" finished successfully.


In [None]:
#Create a regional bucket named $BUCKET in $REGION under $PROJECT
#The saved output shows a 409 because a bucket with this name already existed
!gsutil mb -c regional -l $REGION -p $PROJECT gs://$BUCKET

Creating gs://data301-2023-dcollett-project-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'data301-2023-dcollett-project-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [None]:
#Upload the downloaded dataset to the bucket
!gcloud storage cp ./steam.gz gs://$BUCKET

Copying file://./steam.gz to gs://data301-2023-dcollett-project-bucket/steam.gz


In [None]:
#Upload nltk.zip to the bucket; a later cell passes it to the job via --py-files
#NOTE(review): no visible cell creates nltk.zip - confirm where it comes from
!gcloud storage cp ./nltk.zip gs://$BUCKET

Copying file://./nltk.zip to gs://data301-2023-dcollett-project-bucket/nltk.zip


In [None]:
#Create the Dataproc cluster: one n1-standard-2 master and two n1-standard-1
#workers on image version 1.5, with --max-age=30m (presumably so the cluster
#is deleted automatically after 30 minutes - confirm)
#NOTE(review): the jobs submitted further down fail on `import nltk` /
#`nltk.download`, so a working nltk is evidently not available to the job on
#this cluster. It probably has to be installed when the cluster is created
#(e.g. an initialization action or a pip-packages cluster property) - confirm
#against the Dataproc documentation
!gcloud dataproc clusters create $CLUSTER --region=$REGION --bucket=$BUCKET --zone=$ZONE \
--master-machine-type=n1-standard-2 --worker-machine-type=n1-standard-1 \
--image-version=1.5 --max-age=30m --num-masters=1 --num-workers=2

Waiting on operation [projects/data301-2023-dcollett/regions/australia-southeast1/operations/80f6c501-e0e2-310b-9e1f-e8ba74720365].

Created [https://dataproc.googleapis.com/v1/projects/data301-2023-dcollett/regions/australia-southeast1/clusters/data301-2023-dcollett-project-cluster] Cluster placed in zone [australia-southeast1-a].


In [None]:
#NOTE(review): this installs nltk where the notebook runs; presumably it has
#no effect on the Dataproc cluster - the job submitted in the next cell still
#fails with ModuleNotFoundError. The saved output shows this run was cancelled
!pip install nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[31mERROR: Operation cancelled by user[0m[31m
[0mTraceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pip/_vendor/pkg_resources/__init__.py", line 3108, in _dep_map
    return self.__dep_map
  File "/usr/local/lib/python3.10/dist-packages/pip/_vendor/pkg_resources/__init__.py", line 2901, in __getattr__
    raise AttributeError(attr)
AttributeError: _DistInfoDistribution__dep_map

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/cli/base_command.py", line 169, in exc_logging_wrapper
    status = run_func(*args)
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/cli/req_command.py", line 242, in wrapper
    return func(self, options, args)
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/commands/install.py", line 441

In [None]:
#Submit the script as a PySpark job, passing the dataset's gs:// URI as its argument
#The saved output shows it failing with ModuleNotFoundError: No module named 'nltk'
!gcloud dataproc jobs submit pyspark --cluster=$CLUSTER --region=$REGION pyspark_project.py -- gs://$BUCKET/steam.gz

Job [4e884a235b934390bb8388bb97a55671] submitted.
Waiting for job output...
Traceback (most recent call last):
  File "/tmp/4e884a235b934390bb8388bb97a55671/pyspark_project.py", line 8, in <module>
    import nltk
ModuleNotFoundError: No module named 'nltk'


Command killed by keyboard interrupt

^C


In [None]:
#Retry with nltk.zip supplied via --py-files
#The saved output shows `import nltk` now succeeding but nltk.download missing (AttributeError)
#NOTE(review): presumably the archive does not expose the real nltk package at its top level - verify its layout
!gcloud dataproc jobs submit pyspark --cluster=$CLUSTER --region=$REGION --py-files gs://$BUCKET/nltk.zip pyspark_project.py -- gs://$BUCKET/steam.gz

Job [16cc74d715414c25b1f5c2ba3f41c0a8] submitted.
Waiting for job output...
Traceback (most recent call last):
  File "/tmp/16cc74d715414c25b1f5c2ba3f41c0a8/pyspark_project.py", line 9, in <module>
    nltk.download('stopwords')
AttributeError: module 'nltk' has no attribute 'download'


Command killed by keyboard interrupt

^C
