In [4]:
import json
import math
import re
from pyspark import SparkConf, SparkContext as sc

In [2]:
frequent_word_threshold=1000
WORD_RE = re.compile(r'\b[\w]+\b') 
def convert_dict_to_tuples(d):
        text = d['text']
        rating = d['stars']
        tokens = WORD_RE.findall(text)
        tuples = []
        for w in tokens:
                tuples.append((rating, w))
        return tuples


In [5]:
input_file=sc.textFile("/scratch/siads618f22_class_root/siads618f22_class/shared_data/yelp_academic_dataset_review.json")

TypeError: textFile() missing 1 required positional argument: 'name'

In [18]:
input_file

/scratch/siads618f22_class_root/siads618f22_class/shared_data/yelp_academic_dataset_review.json MapPartitionsRDD[4] at textFile at DirectMethodHandleAccessor.java:104

In [15]:
# convert each json review into a dictionary
step_1a = input_file.map(lambda line: json.loads(line))

# convert a review's dictionary to a list of (rating, word) tuples
step_1b = step_1a.flatMap(lambda x : convert_dict_to_tuples(x))

In [16]:
step_1a

PythonRDD[5] at RDD at PythonRDD.scala:53

In [17]:
# count all words from all reviews
step_2a2 = step_1b.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

# filter out all word-tuples from positive reviews
step_2b1=step_1b.filter(lambda x:x[0]>=5)

# count all words from positive reviews
step_2b2 = step_2b1.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)

Py4JJavaError: An error occurred while calling o32.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/scratch/siads618f22_class_root/siads618f22_class/shared_data/yelp_academic_dataset_review.json
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.io.IOException: Input path does not exist: file:/scratch/siads618f22_class_root/siads618f22_class/shared_data/yelp_academic_dataset_review.json
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 23 more


In [6]:
# lets see how our generated data looks like
step_2a2.take(10)
# note the time it takes to run this. Why? 

[('family', 358702),
 ('horrible', 116467),
 ('exactly', 107789),
 ('knowledgeable', 118494),
 ('sandwich', 411494),
 ('10', 458795),
 ('ton', 44869),
 ('pint', 11399),
 ('step', 47057),
 ('something', 502941)]

In [7]:
# filter out all word-tuples from negative reviews
step_2c1 = step_1b.filter(lambda x: x[0] <= 2)

# count all words from negative reviews
step_2c2=step_2c1.map(lambda x:(x[1],1)).reduceByKey(lambda a,b:a+b)

# get total word count for all, positive, and negative reviews
all_review_word_count = step_2a2.map(lambda x: x[1]).sum()
pos_review_word_count = step_2b2.map(lambda x:x[1]).sum()
neg_review_word_count = step_2c2.map(lambda x:x[1]).sum()

In [8]:
# filter to keep only frequent words, i.e. those with
# count greater than frequent_word_threshold.
freq_words=step_2a2.filter(lambda x:x[1]>frequent_word_threshold).cache()
# filter to keep only those word count tuples whose word can
# be found in the frequent list
step_3pos=freq_words.join(step_2b2)
step_3neg=freq_words.join(step_2c2)


In [9]:
# compute the log ratio score for each positive review word
unsorted_positive_words = step_3pos.map(lambda x: (x[0], math.log(float(x[1][1])/pos_review_word_count ) - math.log(float(x[1][0])/all_review_word_count)))
# sort by descending score to get the top-scoring positive words
sorted_positive_words = unsorted_positive_words.sortBy(lambda x: x[1], ascending = False)

# compute the log ratio score for each negative review word
unsorted_negative_words = step_3neg.map(lambda x:(x[0],math.log(float(x[1][1])/neg_review_word_count) - math.log(float(x[1][0])/all_review_word_count)))
# sort by descending score to get the top-scoring negative words
sorted_negative_words = unsorted_negative_words.sortBy(lambda x: x[1], ascending = False)

In [12]:
# make sure to change the folder below to point to your folder under our class scratch folder
# write out the top-scoring positive words to a text file
sorted_positive_words.saveAsTextFile("/scratch/siads618f22_class_root/siads618f22_class/cbudak/yelp_positive_words_output")
# write out the top-scoring negative words to a text file
sorted_negative_words.saveAsTextFile("/scratch/siads618f22_class_root/siads618f22_class/cbudak/yelp_negative_words_output")

#Rerun the same cell. What happens?