# Assignment 2 - Henry Woodyard

In the following two sections, I import necessary libraries and create useful functions. Please run these before proceeding to Question 1.

In [1]:
from pyspark.sql import SparkSession
from functools import reduce
import string
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

sc = spark.sparkContext

In [2]:
def GetAttributeValue(line, attribute, punct = False, spaces = False, numbers = True, returnNA = True):
    """
    Given a line of text in xml format and the name of an attribute, 
    this returns the value associated with the attribute.
    
    Arguments:
    line:      Line of XML text
    attribute: Attribute from line of XML text. e.g., "DisplayName"
    punct:     If false, removes punctuation
    spaces:    If true, and if punct is false, replaces punctuation with ' ' rather than ''
    numbers:   If true, keep numbers. Otherwise remove them.
    returnNA:  If an attribute cannot be found and this is true, returns the string 'NA'. Otherwise
               returns 0.
    """
    
    valueloc = [0,0]
    
    attrloc = line.find(attribute)
    if attrloc < 0:
        if returnNA:
            return 'NA'
        return 0
    
    valueloc[0] = attrloc + len(attribute) + 1
    valueloc[1] = valueloc[0] + line[valueloc[0] + 1:].find('\"')
    
    value = line[(valueloc[0] + 1):(valueloc[1] + 1)]
    
    if not numbers:
        value = value.translate(str.maketrans('0123456789', ' '*10))
    
    if punct:
        return value.lower()
    
    if spaces:
        value_nopunc = value.translate(str.maketrans(string.punctuation, ' '*len(string.punctuation)))
        return value_nopunc.lower()
    
    value_nopunc = value.translate(str.maketrans('', '', string.punctuation))
    
    return value_nopunc.lower()



def IsFromGeorgia(location):
    """
    Takes a location string as input and returns a bool for whether or not the location refers to 
    the Georgia, USA. Returns false if one of the ten most populous cities in the country of Georgia
    are in the string.
    """
    
    
    location = ' ' + location + ' '
    
    included = ['georgia', 'ga', 'atlanta']
    excluded = ['tbilisi', 'kutaisi', 'batumi', 'rustavi', 'sukhumi', 'zugdidi',
                'gori', 'poti', 'tskhinvali', 'samtredia']
  

    foundIncluded = list(filter(lambda i: location.find(' ' + i + ' ') >= 0, included))
    foundExcluded = list(filter(lambda i: location.find(' ' + i + ' ') >= 0, excluded))
    
    includeBool = len(foundIncluded) > 0
    excludeBool = len(foundExcluded) > 0

    if includeBool and not excludeBool:
        return True
    

    
def GetYear(date):
    """
    Takes date string as input and returns year
    """
    
    try:
        return date[0:4]
    except:
        return 0
    
    
def removeXMLFormatting(text):
    """
    Takes text string as input and removes common XML formatting
    """
    
    removalStrings = ['&#xd;', '&#xD;', '&#xa;', '&#xA;', '%watermark;', '&quot;', '&gt;', '&amp;',
                      '&lt;br', '&lt;']
    
    cleanText = text
    for removal in removalStrings:
        cleanText = cleanText.replace(removal, '')

    return cleanText


# Question 1

Question Text: "From the Users.xml file, find all users which are from Georgia and output to screen their DisplayName only."


In [3]:
# Load user data
userdata = sc.textFile("Users.xml")

# Filter using the IsFromGeorgia classification function
georgiaUsers = userdata.filter(lambda line: IsFromGeorgia(GetAttributeValue(line, 'Location')))

# Print the names of users from the state of Georgia
for line in georgiaUsers.map(lambda line: GetAttributeValue(line, 'DisplayName', punct=True)).collect():
    print(line)

tony boyles
pkerl
nick larsen
gfritz
aleksandr blekh
michael
ayush
azoorob
ontek
aravind r. yarram
ilya
vkb
daisuke aramaki
alexander stocko
tempusfugit
henry crutcher
goddard
matt simpson
peter woolfitt
matt biskup
jason w
peter mourfield
magsol
bob baxley
badjr
mplunney
yc hu
ryan
patrick gerbes
ilya lapitan
dan anton
pradyumnad
dato janez
psidom
teresa madsen
neuromeda
brandon
jpm
mr. rooter of savannah
mr. rooter of southeast ga
khiem ha
jenna kwon
ahmet cecen
jeff
guy gordon
c3theo
niru dyogi
vinitha palani
mac18
andrew
aditya gogoi
gh4x
turtlemonvh
lewis rodgers
tarun luthra
jesse scherer
devendra lattu
cosmosa
todd dawson
ironv
mboolean
jimd
melinda weathers
n.c.w.
david f
psinf
chirag
sandeep gunda
hellofanengineer
pavel komarov
will gao
oriol mirosa
andrew king
david
rajb245
sealander
afshin
ashish powani
boris n.
atul kaushik
harnoor singh
vincent
tiago cogumbreiro
cbarrick
len greski
red_eight
pebkac
christoph
bryce
david hofmann
nburn42
donlan
nick m
kiran
barclayk
zer0k
dp

# Question 2

Question Text: "Using the Users.xml file, provide the count for all users which joined (CreationDate) in 2017. Output this to the screen."

In [4]:
# Filters the RDD to only elements with CreationDates of 2017
madeIn2017 = userdata.filter(lambda line: GetYear(GetAttributeValue(line, 'CreationDate', punct = True)) == '2017')

# Gets the names of the users in the filtered RDD
users2017 = madeIn2017.map(lambda line: GetAttributeValue(line, 'DisplayName', punct = True))

print("A total of", users2017.count(), "users joined in 2017.")
    


A total of 14239 users joined in 2017.


# Question 3

Question Text: "Using the PostHistory file, count the number of Posts that feature the words 'Spark' and 'Scala'. Output this to the screen. (20 points)"


In [5]:
# Import post history
posts = sc.textFile("PostHistory.xml")

# Remove common XML formatting from text. This helps identification of words.
posts = posts.map(lambda line: removeXMLFormatting(line))

# Require posts to have the words spark and scala
sparkScalaMentioned_NoSpace = posts.filter(lambda line: GetAttributeValue(line, 'Text').find('spark') > 0 
                                                and GetAttributeValue(line, 'Text').find('scala') > 0)

# Require posts to have the words spark or scala
sparkScalaMentioned_Space = posts.filter(lambda line: GetAttributeValue(line, 'Text').find(' spark ') > 0 
                                                and GetAttributeValue(line, 'Text').find(' scala ') > 0)

print("In both of the following methods, I do not subset by PostHistoryTypeId. This means that this methods \
searches over all elements in the PostHistory file, including titles, tags, etc.\n")

print("A total of", sparkScalaMentioned_Space.count(), "posts were found including the words 'spark' and 'scala' using a method \
requiring that the words be surrounded by spaces.\n")

print("A total of", sparkScalaMentioned_NoSpace.count(), "posts were found including the words 'spark' and 'scala' using a method \
that does not require that the words be surrounded by spaces. this could lead to words like \
'sparkling' and 'scalable' being counted, and is probably inflated.")

In both of the following methods, I do not subset by PostHistoryTypeId. This means that this methods searches over all elements in the PostHistory file, including titles, tags, etc.

A total of 72 posts were found including the words 'spark' and 'scala' using a method requiring that the words be surrounded by spaces.

A total of 202 posts were found including the words 'spark' and 'scala' using a method that does not require that the words be surrounded by spaces. this could lead to words like 'sparkling' and 'scalable' being counted, and is probably inflated.


# Question 4

Question Text: "Using the PostHistory file, provide a total count of the words used by each distinct user. In other words, count all words in all posts for each user and display this to screen. You can only identify users by the UserID (30 points). You get 15 bonus points if you get the actual DisplayName of the user"

In [6]:
# Get only UserId and text. Don't filter by any type of PostHistoryTypeId.
userPosts = posts.map(lambda line: (GetAttributeValue(line, 'UserId', punct=True), 
                                    GetAttributeValue(line, 'Text', spaces=True, numbers=False)))

# Split the text for each user into a list of words.
userWords = userPosts.map(lambda pair: (pair[0], list(pair[1].split(" "))))

# Count the length of the list of words for each user.
counts = userWords.map(lambda pair: (pair[0], len(pair[1])))

# Because each user has multiple entries, reduce by key.
summedCounts = counts.reduceByKey(lambda x, y: x + y)

# Merge the DisplayNames from the user data file with the UserIds.
linkedNames = userdata.map(lambda line: (GetAttributeValue(line, 'Id', punct=True), 
                                         GetAttributeValue(line, 'DisplayName', punct=True)))
mergedNames = summedCounts.leftOuterJoin(linkedNames)

print("Displaying the number of words used by user for", mergedNames.count(), "users.")
print("(UserId, (Number of words, DisplayName))")

for line in mergedNames.collect():
    print(line)

Displaying the number of words used by user for 14103 users.
(UserId, (Number of words, DisplayName))
('118', (1002, 'konstantin v. salikhov'))
('132', (1140, 'dvk'))
('199', (377, 'filipe ferminiano'))
('454', (1548, 'tomaskazemekas'))
('655', (81, 'will cairns'))
('471', (23984, 'spacedman'))
('913', (210, 'chrshmmmr'))
('960', (212, 'abdelmawla'))
('984', (4609, 'debasis'))
('1015', (392, 'divya'))
('819', (4481, 'charlie greenbacker'))
('1057', (432, 'mlespiau'))
('728', (399, 'jack twain'))
('924', (51554, 'anony-mousse'))
('1220', (115, 'islam el hosary'))
('1235', (1714, 'user3782364'))
('1271', (146, 'maxim fridental'))
('127', (240, 'rhand'))
('870', (546, 'arundhaj'))
('1386', (574, 'alext'))
('2583', (54, 'stenemo'))
('2648', (155, 'royalstream'))
('2798', (81, 'gleissongraca'))
('776', (354, '0111001101110000'))
('2883', (861, 'brycemcd'))
('2972', (202, 'mynameisjeff'))
('2997', (513, 'jessica mick'))
('3058', (91, 'seraphina'))
('3097', (61, 'user3097'))
('3259', (39, 'ha

# Question 5

Question Text: "GRADUATE STUDENTS: Using the users.xml, comments.xml and PostHistory.xml files, produce a single file that includes the following information: DisplayName, Number of Comments, total Score and Number of posts. This file should have the users (DisplayName) sorted by score, descending from higher to lower. (40 points)"

In [14]:
comments = sc.textFile("Comments.xml")


# Key-value pairs of (UserId, comment score)
userScores = comments.map(lambda line: (GetAttributeValue(line, "UserId"), 
                                        int(GetAttributeValue(line, "Score", returnNA=False))))
# This sums the score over each user
totalUserScore = userScores.reduceByKey(lambda x, y: x + y)



# Key-value pairs of (UserId, 1) for each comment in the file
userComments = comments.map(lambda line: (GetAttributeValue(line, "UserId"), 1))
# Since there is a key-value pair of (UserId, 1) for each comment, upon being reduced
# we get (UserId, N), where N is the number of comments
totalUserComments = userComments.reduceByKey(lambda x, y: x + y)



# Filtering to only have titles of posts rather than body text.
titlesOnly = posts.filter(lambda line: GetAttributeValue(line, 'PostHistoryTypeId') == '1')
# Key-value pairs of (UserId, 1) for each post in the file.
userPosts = titlesOnly.map(lambda line: (GetAttributeValue(line, 'UserId'), 1))
totalUserPosts = userPosts.reduceByKey(lambda x, y: x + y)



# Join the various metrics we're concerned with. Also prints to console the number of observations
# that we're losing.
commentsAndScores = totalUserScore.join(totalUserComments)
print("Merging user score and user comments RDDs.\n",
      "Users in the user comments RDD:", totalUserComments.count(),"\n",
      "Users in the user score RDD:", totalUserScore.count(),"\n",
      "Users in the merged file:", commentsAndScores.count(),"\n")

masterNoNames = commentsAndScores.leftOuterJoin(totalUserPosts)
print("Merging user score and comments RDD with user posts RDD.\n",
      "Users in the user comments and score RDD:", commentsAndScores.count(),"\n",
      "Users in the user posts RDD:", totalUserPosts.count(),"\n",
      "Users in the merged file:", masterNoNames.count(),"\n")

master = masterNoNames.join(linkedNames)
print("Merging comments/score/posts RDD with DisplayNames.\n",
      "Users in the comments/score/posts RDD:", masterNoNames.count(),"\n",
      "Users in the DisplayName RDD:", linkedNames.count(),"\n",
      "Users in the merged file:", master.count(),"\n")



# Rearrange the content of each key-value pair to be in the order specified in the problem.
# It looks messy due to the nesting from the merges, but the resulting data is in the following order:

# DisplayName, number of comments, total score, total posts
masterClean = master.map(lambda line: (line[1][1], line[1][0][0][1], line[1][0][0][0], line[1][0][1]))

# Sort by total comment score and save as text file.
masterCleanSorted = masterClean.sortBy(lambda x: x[2], ascending=False)
print("Writing to text file...")
print("Each line has the following format: (DisplayName, Number of Comments, Total Score, Number of Posts)")

with open('MasterSorted.txt', 'w') as file:
    for element in masterCleanSorted.collect():
        for item in element:
            file.write(str(item) + ", ")
        file.write("\n")

print("Finished writing.")

Merging user score and user comments RDDs.
 Users in the user comments RDD: 8007 
 Users in the user score RDD: 8007 
 Users in the merged file: 8007 

Merging user score and comments RDD with user posts RDD.
 Users in the user comments and score RDD: 8007 
 Users in the user posts RDD: 10532 
 Users in the merged file: 8007 

Merging comments/score/posts RDD with DisplayNames.
 Users in the comments/score/posts RDD: 8007 
 Users in the DisplayName RDD: 66954 
 Users in the merged file: 8008 

Writing to text file...
Each line has the following format: (DisplayName, Number of Comments, Total Score, Number of Posts)
Finished writing.


# Note:

In the above section of code, I merge the score and comments RDD with the user posts RDD with a leftOuterJoin. There are many users with posts but no comments. I decide to do this to remove them from the data, as the resulting None for comment count and score seem to go against what you expect for the rest of the problem.