### ENSF 619 - Python Code

Aaron Leung <br>
Andy Wu <br>
Tanner Litwin

### Connecting to the University of Calgary Spark Cluster

In [None]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and the Spark cluster job at exit.

    Registered with ``atexit`` further down. The context is stopped first, then the
    job; each step is attempted independently so a failure in one does not keep the
    other resource alive.

    Args:
        sj: the job object returned by ``sparkjob.sparkjob``; only ``stop()`` is called.
        sc: the ``SparkContext`` returned by ``sj.start_spark()``; only ``stop()`` is called.
    """
    for label, resource in (('Spark Context', sc), ('Spark Job', sj)):
        print('Trapped Exit cleaning up ' + label)
        try:
            resource.stop()
        except Exception as exc:
            # Keep going: this runs during interpreter shutdown, so report the
            # failure instead of silently hiding it, but still release the rest.
            print('Failed to stop ' + label + ': ' + repr(exc))

# Make the local Spark installation importable (findspark) before any Spark objects are built.
findspark.init()

#Parameters for the Spark cluster
nodes=3
tasks_per_node=8 
memory_per_task=1024 #1 gig per process, adjust accordingly (presumably MB per core -- confirm with sparkhpc docs)
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="60:00" #60 min 
# The partition is passed through the environment; it must be set before the job is
# submitted by sparkjob.sparkjob(...) below (assumption: sparkhpc submits via sbatch).
os.environ['SBATCH_PARTITION']='single' #Set the appropriate ARC partition

# Request nodes*tasks_per_node cores in total, grouped tasks_per_node cores per executor.
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

# Presumably blocks until the cluster job has been scheduled, then connects a SparkContext.
sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler so the context and the job are stopped when Python exits
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL (every DataFrame read below goes through sqlCtx)
sqlCtx=SQLContext(sc)

### Filtering of Twitter Data By Location

In [None]:
# Read the full dataset once, keep only tweets whose place is in the US or Canada,
# and save that subset as JSON so later runs don't need to re-read the 50+ GB source.

print("Reading in Full Twitter Dataset")
full_tweets = sqlCtx.read.json("./Twitter_Dataset.json")
print("Done Reading in Dataset")
print("Total Number of Tweets: ", full_tweets.count())

print("Starting Location Filtering")
# Attribute access on DataFrame/Column resolves to the same nested field as
# full_tweets['place']['country_code']; isin accepts the codes as varargs.
loc_filtered_tweets = full_tweets.filter(full_tweets.place.country_code.isin('CA', 'US'))
print("Finished Location Filtering")
print("Number of Tweets Remaining: ", loc_filtered_tweets.count())

print("Writing to JSON")
loc_filtered_tweets.write.json("LocationFilteredTweets")
print("Done Writing to JSON")

In [None]:
# Further Filtering Down to Alberta Only Tweets for Classification
# The predicate is built from the DataFrame being filtered (not full_tweets), and the
# result gets its own name so re-running this cell does not filter an already-filtered
# frame.

alberta_tweets = loc_filtered_tweets.filter(
    loc_filtered_tweets['place']['full_name'].contains('Alberta')
)

print("Number of Tweets Remaining: ", alberta_tweets.count())

print("Writing to JSON")
alberta_tweets.write.json("AlbertaLocationFilteredTweets10")

print("Done Writing to JSON")

### Extraction of n-grams from Yelp to build Regular Expressions

In [None]:
# Reading in Yelp Businesses JSON File as a PySpark Dataframe Object

from pyspark.sql import SparkSession
# NOTE(review): `spark` is not used by the reads below, which go through `sqlCtx`.
spark = (SparkSession.builder
         .appName('aggs')
         .getOrCreate())

print("Reading in Business JSON File")
# Load all Yelp businesses and expose them to Spark SQL as the "business" view.
yelp_business = sqlCtx.read.json(path="business.json")
yelp_business.createOrReplaceTempView("business")
print("Number of Yelp Businesses: ", yelp_business.count())

In [None]:
# Keep only businesses whose category string mentions "Restaurants" or "Food".
yelp_foods = yelp_business.where(
    yelp_business["categories"].contains("Restaurants")
    | yelp_business["categories"].contains("Food")
)

print("Food Related Businesses: ", yelp_foods.count())

In [None]:
# Load every Yelp review (all businesses) and expose it to Spark SQL as "reviews".
yelp_reviews = sqlCtx.read.json(path="./review.json")
yelp_reviews.createOrReplaceTempView("reviews")
print("Number of Yelp Reviews: ", yelp_reviews.count())

In [None]:
# Attach reviews to the food-related businesses with an inner join. The review-side
# "business_id" and "stars" columns are renamed first so the joined frame has no
# duplicate column names.
yelp_reviews = (yelp_reviews
                .withColumnRenamed("business_id", "review_business_id")
                .withColumnRenamed("stars", "review_stars"))

food_reviews = yelp_foods.join(
    yelp_reviews,
    on=yelp_foods["business_id"] == yelp_reviews["review_business_id"],
    how="inner",
)

print(food_reviews.count())

In [None]:
# Sanity check: for a handful of joined rows both id columns should be identical.
for row in food_reviews.select("business_id", "review_business_id").take(10):
    print(row.business_id, " ", row.review_business_id, "\n")

In [None]:
# Keep only the review text, as an RDD. Each element is a pyspark Row, so the
# string itself is reached through row["text"]; print the three types to show this.
only_reviews = food_reviews.select("text").rdd

for obj in (only_reviews, only_reviews.first(), only_reviews.first()["text"]):
    print(type(obj))

In [None]:
#Creating Bi-Grams from the reviews

from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
import string

stop_words = set(stopwords.words("english"))

# Snapshot the punctuation characters now. Later cells rebind the global name
# `string` to a plain str, and Spark only serializes these functions when an action
# runs. Referring to `string.punctuation` lazily would break on a re-run.
_punctuation = frozenset(string.punctuation)

def _tokenize_review(review_text):
    """Lower-case a review, strip punctuation, and return its non-stopword tokens.

    Newlines become spaces, every punctuation character is removed, the text is split
    on single spaces, and empty tokens and English stopwords are dropped.
    """
    cleaned = review_text.replace("\n", " ").strip().lower()
    cleaned = "".join(ch for ch in cleaned if ch not in _punctuation)
    return [w for w in cleaned.strip().split(" ") if w and w not in stop_words]

treated_review = only_reviews.map(lambda row: _tokenize_review(row["text"]))

# Consecutive token pairs / triples within each review (reviews shorter than 2 / 3
# tokens contribute nothing).
bi_grams = treated_review.flatMap(lambda tokens: list(zip(tokens, tokens[1:])))
tri_grams = treated_review.flatMap(lambda tokens: list(zip(tokens, tokens[1:], tokens[2:])))

In [None]:
# Count every bi-gram, then keep the 1000 most frequent as (bi_gram, count) pairs,
# highest count first.
bg_count = (bi_grams
            .map(lambda pair: (pair, 1))
            .reduceByKey(lambda left, right: left + right))
ordered_bg = bg_count.takeOrdered(1000, key=lambda kv: -kv[1])

In [None]:
# Writing the Bi-grams out to a text file
# One line per bi-gram: "<word1> <word2> <count>", most frequent first. The `with`
# block guarantees the file is closed. utf-8 is explicit because review text may
# contain non-ASCII characters.

print("Start Writing to File...")

with open("bi_gram_counts.txt", "w", encoding="utf-8") as file_out:
    for (first_word, second_word), occurrences in ordered_bg:
        file_out.write(first_word + " " + second_word + " " + str(occurrences) + "\n")

print("Done Writing")

### Filtering of Twitter Data Based on Context (Keywords/REGEX)

In [None]:
# REGEX Expressions to Filter Tweets

import re

# Venue words that anchor the descriptor-based patterns.
_VENUE_WORDS = "food|restaurant|drink|dine|dining"

# Descriptive / experience words; one of these on the same line as a venue word
# marks a tweet as food-related.
_DESCRIPTOR_WORDS = ("inedible|great|terrible|good|fast|mexican|amazing|delicious|chinese|thai|excellent|order|gross|"
                     "indian|love|awesome|amaze|italian|fantastic|disgust|incredible|perfect|super|unbelievable|"
                     "stellar|remarkable|outstanding|bad|nasty|sour|tasteless|rancid|stale|appetizing|appealing|"
                     "bake|bland|burnt|cold|deep fried|edible|flavor|fresh|frozen|greasy|grill|prepared|rotten|"
                     "taste|yummy|quality|ok|sick|puke|eat|ate|ingest|swallow|raw|pink|vomit|cook")

# Dish names that mark a tweet as food-related on their own.
_DISH_WORDS = ("pizza|burger|sushi|steak|barbecue|bbq|salad|taco|fajita|sandwich|cereal|popcorn|burrito|smores|"
               "chimichangas|soup|pepperoni|bratwurst|meatball|fries|cookie|macaroni|nacho|cake|biscuit|gravy|"
               "fruit|vegetable")

# Compiled once. No DOTALL flag, so `.*` does not cross newlines: the two halves of
# a descriptor/venue pattern must appear on the same line of the tweet.
# NOTE(review): these are substring matches, not whole-word matches (e.g. "ate"
# matches inside "late", "ok" inside "book"); consider \b anchors if precision matters.
_CONTEXT_PATTERNS = (
    re.compile("food poison|foodpoison"),
    re.compile("(" + _DESCRIPTOR_WORDS + ").*(" + _VENUE_WORDS + ")"),
    re.compile("(" + _VENUE_WORDS + ").*(court|" + _DESCRIPTOR_WORDS + ")"),
    re.compile("(" + _DISH_WORDS + ")"),
)

def context_filter(tweet):
    """Return True if a tweet's text looks food-related, according to keyword regexes.

    A tweet matches if its lower-cased text mentions food poisoning, pairs a
    descriptor with a venue word (in either order, on one line), or names a dish.

    Args:
        tweet: a record indexable by "text" (e.g. a pyspark Row or a dict).

    Returns:
        True on any pattern match, otherwise False (including when text is null).
    """
    text = tweet["text"]
    if text is None:
        # A null text would otherwise raise AttributeError on .lower() and fail
        # the whole Spark job; treat it as not food-related.
        return False
    text = text.lower()
    return any(pattern.search(text) for pattern in _CONTEXT_PATTERNS)

In [None]:
# Reading the US/Canada only tweets back into Spark and converting it into an RDD
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('aggs').getOrCreate()

# NOTE(review): the location-filtering cell writes "LocationFilteredTweets", but this
# reads "./LocationFilteredTweets2". Confirm which directory is intended; otherwise a
# fresh Restart & Run All cannot reproduce this step.
print("Starting to read in JSON Files..")
tweets = sqlCtx.read.json("./LocationFilteredTweets2")
tweets.createOrReplaceTempView("loc_filtered")

print("Done Reading in JSON Files..")

tweets_rdd = tweets.rdd

# Filtering the Tweets using RDD operations: keep food-related tweets (context_filter)
# that were retweeted fewer than 10 times. Project each one to
# (text, place full name, place bounding-box coordinates).
context_filtered = tweets_rdd.filter(context_filter)
context_filtered = context_filtered.filter(lambda rowobj: rowobj["retweet_count"] < 10)
filtered = context_filtered.map(lambda x: (x["text"],
                                           x["place"]["full_name"],
                                           x["place"]["bounding_box"]["coordinates"]))

print("Filtering Process Completed..")


### Sentiment Analysis + Formatting For LDA

In [None]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.tokenize import TweetTokenizer
import string

# VADER sentiment scorer; the Spark lambdas in the following cells close over it.
sentAnalyzer = SentimentIntensityAnalyzer()
# NOTE(review): tweetTknzer is not used by any visible cell.
tweetTknzer = TweetTokenizer()

In [None]:
# Prepend a copy of the text with "https://" tokens removed, so links don't affect the
# sentiment score. Then replace that copy with VADER's polarity-score dict, keeping
# (scores, original text, place name, coordinates).
text_no_links = filtered.map(
    lambda rec: (" ".join(tok for tok in rec[0].split(" ") if not tok.startswith("https://")),
                 rec[0], rec[1], rec[2]))

sentAnalyzed = text_no_links.map(
    lambda rec: (sentAnalyzer.polarity_scores(rec[0]), rec[1], rec[2], rec[3]))

In [None]:
def lat_long (coord):
    """Return the (latitude, longitude) centre of a place's bounding-box ring.

    Args:
        coord: a sequence of [longitude, latitude] vertices (the caller passes the
            first ring of place.bounding_box.coordinates).

    Returns:
        (lat, long): midpoints of the latitude and longitude extents.
    """
    # Use the extremes over all vertices rather than fixed vertex indices, so the
    # result is the box centre regardless of the ring's vertex order (with a
    # SW, SE, NE, NW ring, vertices 0 and 1 share a latitude).
    lats = [vertex[1] for vertex in coord]
    lons = [vertex[0] for vertex in coord]
    lat = (min(lats) + max(lats)) / 2
    long = (min(lons) + max(lons)) / 2
    return (lat, long)

# Reshape to (text, compound score, place name, (lat, long)), then make the text safe
# for a single TSV field: newlines and tabs become spaces, double quotes are dropped.
final_out = (sentAnalyzed
             .map(lambda rec: (rec[1], rec[0]['compound'], rec[2], lat_long(rec[3][0])))
             .map(lambda rec: (rec[0].replace("\n", " ").replace("\t", " ").replace('"', ''),) + rec[1:]))

In [None]:
# Bucket by VADER compound score: >= 0.05 positive, <= -0.05 negative, otherwise neutral.
positive_sent = final_out.filter(lambda rec: rec[1] >= 0.05)
neutral_sent = final_out.filter(lambda rec: -0.05 < rec[1] < 0.05)
negative_sent = final_out.filter(lambda rec: rec[1] <= -0.05)

### Writing the Tweets Out to a Single Tab-Separated File For LDA

In [None]:
def toCSVLine(data):
    """Join the fields of one record into a single tab-separated line (no trailing newline)."""
    return '\t'.join(str(d) for d in data)

def _write_lda_rows(out_file, rows, label, id_prefix, start):
    """Write one TSV line per tweet, tagged ``<id_prefix><label>_<n>``; return the next ``n``.

    Each row is expected to be (text, compound, place full name, (lat, long)), as built
    by ``final_out``. Numbering continues from ``start`` so ids stay unique across the
    positive / neutral / negative groups.
    """
    counter = start
    for tweet in rows:
        out_file.write(toCSVLine([id_prefix + label + "_" + str(counter), *tweet]) + "\n")
        counter += 1
    return counter

header = "Tweet31_"
counter = 0

# `with` guarantees the TSV is flushed and closed once all groups are written.
# Neutral tweets use the same row format as the other groups, so every line in the
# file has the same columns.
with open("Twitter_31_Info", "w", encoding="utf-8") as fout:
    for label, rows in (("positive", positive_sent), ("neutral", neutral_sent), ("negative", negative_sent)):
        title = label.capitalize()
        print("Writing " + title + " Tweets\n")
        counter = _write_lda_rows(fout, rows.collect(), label, header, counter)
        print("Done Writing " + title + " Tweets\n")

### Context Filtering + Information Extraction + Formatting For Classification

In [None]:
# Reading in the Alberta Location Filtered Tweets. 
# Context Filtering is the same as above for LDA (Same REGEX Statements)
# The information outputted here is different than that of LDA

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('aggs').getOrCreate()

print("Starting to read in JSON Files..")
tweets = sqlCtx.read.json("./AlbertaLocationFilteredTweets10")
tweets.createOrReplaceTempView("loc_filtered")

print("Done Reading in JSON Files..")

tweets_rdd = tweets.rdd

# Filtering the Tweets using RDD operations: keep food-related tweets (context_filter)
# that were retweeted fewer than 10 times. Project each one to
# (text, place full name, bounding-box coordinates, created_at, place type).
context_filtered = tweets_rdd.filter(context_filter)
context_filtered = context_filtered.filter(lambda rowobj: rowobj["retweet_count"] < 10)
filtered = context_filtered.map(lambda x: (x["text"],
                                           x["place"]["full_name"],
                                           x["place"]["bounding_box"]["coordinates"],
                                           x["created_at"],
                                           x["place"]["place_type"]))

print("Filtering Process Completed..")

In [None]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.tokenize import TweetTokenizer
import string

# VADER sentiment scorer; the Spark lambdas in the following cells close over it.
sentAnalyzer = SentimentIntensityAnalyzer()
# NOTE(review): tweetTknzer is not used by any visible cell.
tweetTknzer = TweetTokenizer()

In [None]:
# Prepend a copy of the text with "https://" tokens removed, then replace that copy
# with VADER's polarity-score dict, keeping
# (scores, original text, place name, coordinates, created_at, place type).
text_no_links = filtered.map(
    lambda rec: (" ".join(tok for tok in rec[0].split(" ") if not tok.startswith("https://")),
                 rec[0], rec[1], rec[2], rec[3], rec[4]))

sentAnalyzed = text_no_links.map(
    lambda rec: (sentAnalyzer.polarity_scores(rec[0]), rec[1], rec[2], rec[3], rec[4], rec[5]))

In [None]:
def lat_long (coord):
    """Return the (latitude, longitude) centre of a place's bounding-box ring.

    Args:
        coord: a sequence of [longitude, latitude] vertices (the caller passes the
            first ring of place.bounding_box.coordinates).

    Returns:
        (lat, long): midpoints of the latitude and longitude extents.
    """
    # Use the extremes over all vertices rather than fixed vertex indices, so the
    # result is the box centre regardless of the ring's vertex order (with a
    # SW, SE, NE, NW ring, vertices 0 and 1 share a latitude).
    lats = [vertex[1] for vertex in coord]
    lons = [vertex[0] for vertex in coord]
    lat = (min(lats) + max(lats)) / 2
    long = (min(lons) + max(lons)) / 2
    return (lat, long)

def date (date_time):
    """Concatenate the 2nd, 3rd and 6th space-separated fields of a timestamp string.

    For a created_at value like "Wed Oct 10 20:19:24 +0000 2018" this yields
    "Oct102018" (month, day, year with no separators).
    """
    parts = date_time.split(" ")
    return "".join(str(parts[i]) for i in (1, 2, 5))

def time(date_time):
    """Return the 4th space-separated field of a timestamp string (the HH:MM:SS part
    of a value like "Wed Oct 10 20:19:24 +0000 2018")."""
    return str(date_time.split(" ")[3])

# Reshape to (text, compound, place name, (lat, long), date, time, place type). Then make
# the text safe for a single TSV field: newlines and tabs become spaces, double
# quotes are dropped.
final_out = (sentAnalyzed
             .map(lambda rec: (rec[1], rec[0]['compound'], rec[2], lat_long(rec[3][0]),
                               date(rec[4]), time(rec[4]), rec[5]))
             .map(lambda rec: (rec[0].replace("\n", " ").replace("\t", " ").replace('"', ''),) + rec[1:]))

# Bucket by VADER compound score: >= 0.05 positive, <= -0.05 negative, otherwise neutral.
positive_sent = final_out.filter(lambda rec: rec[1] >= 0.05)
neutral_sent = final_out.filter(lambda rec: -0.05 < rec[1] < 0.05)
negative_sent = final_out.filter(lambda rec: rec[1] <= -0.05)

### Writing Tweet, Compound Score, Location, Coordinates, Date, Time, and Place Type to a Tab-Separated Values (TSV) File, Plus One Text File per Tweet

In [None]:
def toCSVLine(data):
    """Join the fields of one record into a single tab-separated line (no trailing newline)."""
    return '\t'.join(str(d) for d in data)

def _write_classification_rows(tsv_file, rows, label, id_prefix, tweet_path_prefix, start):
    """Write each tweet as a TSV line plus its text to its own file; return the next index.

    Each row is expected to be (text, compound, place full name, (lat, long), date,
    time, place type), as built by ``final_out``. The TSV id is
    ``<id_prefix><label>_<n>`` and the per-tweet file is ``<tweet_path_prefix><label>_<n>``.
    Numbering continues from ``start`` so ids stay unique across groups.
    """
    counter = start
    for tweet in rows:
        tweet_key = label + "_" + str(counter)
        tsv_file.write(toCSVLine([id_prefix + tweet_key, *tweet]) + "\n")
        with open(tweet_path_prefix + tweet_key, "w", encoding="utf-8") as indv_tweet:
            indv_tweet.write(tweet[0])
        counter += 1
    return counter

header = "Tweet10_"
indv_file_path = "./AB_Tweets_10/Tweet10_"

# open() does not create directories, so make sure the per-tweet folder exists.
os.makedirs(os.path.dirname(indv_file_path), exist_ok=True)

counter = 0

# utf-8 is explicit because tweet text may contain emoji / non-ASCII characters.
with open("AB_Twitter_10_TSV", "w", encoding="utf-8") as tsv_fout:
    for label, rows in (("positive", positive_sent), ("neutral", neutral_sent), ("negative", negative_sent)):
        title = label.capitalize()
        print("Writing " + title + " Tweets\n")
        counter = _write_classification_rows(tsv_fout, rows.collect(), label, header,
                                             indv_file_path, counter)
        print("Done Writing " + title + " Tweets\n")