You can use this notebook to develop your answers. Make sure to look at intermediate results using `take()` for debugging.

In [1]:
import json 

## Load data into RDDs
usersRDD = sc.textFile("datafiles/se_users.json").map(json.loads)
postsRDD = sc.textFile("datafiles/se_posts.json").map(json.loads)
playRDD = sc.textFile("datafiles/play.txt")
logsRDD = sc.textFile("datafiles/NASA_logs_sample.txt")
amazonInputRDD = sc.textFile("datafiles/amazon-ratings.txt")
nobelRDD = sc.textFile("datafiles/prize.json").map(json.loads)
amazonBipartiteRDD = amazonInputRDD.map(lambda x: x.split(" ")).map(lambda x: (x[0], x[1])).distinct()

In [2]:
for t in postsRDD.take(3): print(t)

{'id': 2, 'posttypeid': 1, 'title': 'How can a group track database schema changes?', 'acceptedanswerid': 4, 'parentid': None, 'creationdate': '2011-01-03', 'score': 68, 'viewcount': 11533, 'owneruserid': 7, 'lasteditoruserid': 97, 'tags': '<mysql><version-control><schema>'}
{'id': 3, 'posttypeid': 1, 'title': 'What is an effective way of labeling columns in a database?', 'acceptedanswerid': None, 'parentid': None, 'creationdate': '2011-01-03', 'score': 30, 'viewcount': 1302, 'owneruserid': 17, 'lasteditoruserid': 97, 'tags': '<database-design><erd>'}
{'id': 4, 'posttypeid': 2, 'title': None, 'acceptedanswerid': None, 'parentid': 2, 'creationdate': '2011-01-03', 'score': 46, 'viewcount': None, 'owneruserid': 18, 'lasteditoruserid': 1396, 'tags': None}


Task 1 (0.25): Use filter to find all posts whose tags are not null (None in Python) and that are tagged 'postgresql-9.4', then use a map so that the output RDD has tuples of the form (ID, Title, Tags). Note that postsRDD contains dictionaries; inspect them by running postsRDD.take(10).

In [3]:
def task1(postsRDD):
    # keep posts that have tags and carry the exact tag <postgresql-9.4>
    res = postsRDD.filter(
        lambda x: x.get("tags") is not None).filter(
        lambda x: "<postgresql-9.4>" in x.get("tags")).map(
        lambda x: (x.get("id"), x.get("title"), x.get("tags"))
    )
    return res
a = task1(postsRDD)

In [4]:
for t in a.take(10): print(t)

(89480, 'PostgreSQL timezone setting', '<postgresql><postgresql-9.4>')
(89555, 'Retrieving latest record using DISTINCT ON is slow', '<postgresql><index><performance><postgresql-9.4><query-performance>')
(89746, 'Use result of aggregate in same select?', '<postgresql><postgresql-9.4>')
(89971, 'PostgreSql JSONB SELECT against multiple values', '<postgresql><json><postgresql-9.4>')
(90002, 'PostgreSQL operator uses index but underlying function does not', '<postgresql><index-tuning><json><postgresql-9.4><operator>')
(90360, 'Rely on .pgpass in CREATE USER MAPPING', '<postgresql><postgresql-9.4><foreign-data>')
(95214, 'Working with Materialized View', '<postgresql><materialized-view><postgresql-9.4><pgbouncer>')
(95758, 'PostgreSQL update and delete property from JSONB column', '<postgresql><postgresql-9.4>')
(95778, 'Clarification on UNION ALL of JSONB_EACH result', '<postgresql><postgresql-9.4>')
(45870, 'How to do incremental backup every hour in Postgres?', '<postgresql><backup><win
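
One caveat with tag matching: a plain substring test on the raw tags string also matches any longer tag that merely contains the name. Below is a minimal sketch of an exact-tag test, written in plain Python so it runs without Spark. `has_tag` is a hypothetical helper, and the second sample string is invented for illustration.

```python
def has_tag(tags, tag):
    """Return True only if `tag` appears as a complete <...> entry in `tags`."""
    return tags is not None and "<" + tag + ">" in tags

real = "<postgresql><postgresql-9.4>"            # taken from the output above
invented = "<postgresql><postgresql-9.4-beta>"   # made-up tag, for illustration only

print(has_tag(real, "postgresql-9.4"))       # True
print(has_tag(invented, "postgresql-9.4"))   # False
print("postgresql-9.4" in invented)          # True: the substring test over-matches
print(has_tag(None, "postgresql-9.4"))       # False
```

In the notebook, the same predicate could be passed to filter, for example `postsRDD.filter(lambda x: has_tag(x.get("tags"), "postgresql-9.4"))`.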

Task 2 (0.25): Use flatMap on the postsRDD to create an RDD (ID, Tag), listing all the tags for each post as a separate tuple. If a post has no tags, it should not appear in the output RDD.

In [5]:
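def task2FlatMapper(dic):
    # "<a><b>" -> "a b " -> ["a", "b", ""]; pop() drops the trailing empty string
    tags = dic.get("tags").replace("<", "").replace(">", " ").split(" ")
    tags.pop()
    # one (post id, tag) tuple per tag
    return [(dic.get("id"), tag) for tag in tags]


def task2(postsRDD):
    # drop untagged posts first so the mapper never sees None
    return postsRDD.filter(
        lambda x: x.get("tags") is not None).flatMap(
        task2FlatMapper
    )
a = task2(postsRDD)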

In [6]:
for t in a.take(10): print(t)

(2, 'mysql')
(2, 'version-control')
(2, 'schema')
(3, 'database-design')
(3, 'erd')
(5, 'nosql')
(5, 'rdbms')
(5, 'database-recommendation')
(6, 'postgresql')
(6, 'replication')
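
As an alternative to chained replace/split calls, the tags string can be parsed with a regular expression. This is only a sketch in plain Python; `parse_tags` and `post_to_id_tag_pairs` are hypothetical names, and the two sample posts are cut down from the `postsRDD.take(3)` output near the top of the notebook.

```python
import re

def parse_tags(tags):
    """Split a string like '<a><b>' into ['a', 'b']; None or '' gives []."""
    return re.findall(r"<([^<>]+)>", tags or "")

def post_to_id_tag_pairs(post):
    """flatMap-style function: one (id, tag) tuple per tag of the post."""
    return [(post["id"], tag) for tag in parse_tags(post.get("tags"))]

posts = [
    {"id": 2, "tags": "<mysql><version-control><schema>"},
    {"id": 4, "tags": None},   # untagged post: contributes no tuples
]
pairs = [pair for post in posts for pair in post_to_id_tag_pairs(post)]
print(pairs)   # [(2, 'mysql'), (2, 'version-control'), (2, 'schema')]
```

Because an untagged post maps to an empty list, a function like this could be handed to flatMap directly, without a separate filter for None.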


Task 3 (0.25): The goal here is to find the 5 lexicographically smallest tags for each year, among the posts from that year. So the output RDD should contain tuples of the form: ('2001', ['tag1', 'tag2', ..., 'tag5']), with 'tag1' < 'tag2' and 'tag5' being smaller (lexicographically) than any other tag for a post from that year. All five (or fewer for some of the years) tags should be distinct. Use a map followed by reduceByKey for doing this.

In [6]:
def task3MapA(dic):
    # get the year and tags for each row in postsRDD
    year = dic["creationdate"][:4]
    tags = dic.get("tags").replace("<", "").replace(">", " ").split(" ")
    tags.pop()
    res = (year, set(tags)) # change to set for set union in reduceByKey
    return res
def task3MapB(tu):
    # return to list for sorting, limit to top 5
    x = tu[1]
    x = list(x)
    x.sort()
    return (tu[0], x[:5])

def task3(postsRDD):
    return postsRDD.filter(
        lambda x: x.get("tags")!= None).map(
            task3MapA).reduceByKey( # set union to remove duplicates
                lambda v1, v2: v1 | v2).map(task3MapB)
a = task3(postsRDD)

In [7]:
for t in a.take(100): print(t)
# print(len(a.collect()))

('2011', ['access-control', 'active-directory', 'activity-monitor', 'ado.net', 'aggregate'])
('2014', ['access-control', 'acid', 'active-directory', 'activity-monitor', 'address'])
('2015', ['access-control', 'active-directory', 'ado.net', 'aggregate', 'alter-database'])
('2010', ['data-warehouse', 'database-design', 'dbcc', 'export', 'import'])
('2012', ['access-control', 'active-directory', 'activity-monitor', 'address', 'ado.net'])
('2013', ['access-control', 'acid', 'active-directory', 'activity-monitor', 'address'])
('2009', ['career', 'ssas'])
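
Unioning whole sets in reduceByKey carries every tag of a year through the shuffle. One possible refinement is a merge function that never keeps more than five tags. The sketch below uses `functools.reduce` as a stand-in for what reduceByKey does for a single key; `smallest5` is a hypothetical name and the input lists are invented.

```python
from functools import reduce

def smallest5(a, b):
    """Merge two tag collections, keeping the 5 smallest distinct tags, sorted."""
    return sorted(set(a) | set(b))[:5]

# Invented per-post tag lists for one year
tag_lists = [["f", "b"], ["a", "g", "c"], ["e", "d", "b"]]
print(reduce(smallest5, tag_lists))   # ['a', 'b', 'c', 'd', 'e']

# Fewer than five distinct tags are simply all returned
print(reduce(smallest5, [["ssas"], ["career"], ["ssas", "career"]]))   # ['career', 'ssas']
```

The function is associative and commutative, which reduceByKey requires. One thing to keep in mind: a key with a single value never passes through the merge function, so the map step (or a final map) would still have to sort, deduplicate and truncate that lone list.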


Task 4 (0.25): Use join to join the usersRDD and postsRDD on users.id = owneruserid. The output should be a tuple of the form: (userid, displayname, postid, posttitle). You will need to do several maps to do this. Make sure you look at the structure of the objects within the RDD after the join; it will need to be postprocessed using a map to get to the desired output.

In [12]:
def userTup(dic):
    res = (dic["id"] , dic["displayname"])
    return res

def postsTup(dic):
    res = (dic["owneruserid"], (dic["id"], dic["title"]))
    return res

def combTup(tu):
    a = tu[0]
    b= tu[1][0]
    c = tu[1][1][0]
    d = tu[1][1][1]
    return (a, b, c, d)

def task4(usersRDD, postsRDD):
    rdd1 = usersRDD.map(userTup)
    rdd2 = postsRDD.map(postsTup)
    rdd3 = rdd1.join(rdd2)
    return rdd3.map(combTup)

c = task4(usersRDD, postsRDD)

In [13]:
for t in c.take(3): print(t)
# print(len(a.collect()))

a
(-1, 'Community')
(2, 'Geoff Dalgas')
(3, 'balpha')
b
(7, (2, 'How can a group track database schema changes?'))
(17, (3, 'What is an effective way of labeling columns in a database?'))
(18, (4, None))
c
(8, 'ilhan', 2107, 'How to get a users friends names?')
(8, 'ilhan', 6255, 'Should I record ID numbers in a table where I record who look whom profile page')
(8, 'ilhan', 42729, 'Merging two Access tables into one')
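
After the join, each record has the nested shape (key, (left value, right value)). The flattening step can also be written with tuple unpacking instead of index chains. This is a small sketch in plain Python; `flatten_joined` is a hypothetical name and the sample record is assembled from the output above.

```python
def flatten_joined(record):
    """Turn (userid, (displayname, (postid, title))) into a flat 4-tuple."""
    userid, (displayname, (postid, title)) = record
    return (userid, displayname, postid, title)

joined = (8, ("ilhan", (2107, "How to get a users friends names?")))
print(flatten_joined(joined))
# (8, 'ilhan', 2107, 'How to get a users friends names?')
```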


Task 5 (0.25): Using the postsRDD, create an RDD where the key is a 2-tuple (title-word, tag), where the former is a word in a title, and the latter is a tag. The value associated with the key should be the number of posts in which the title-word is in the title, and the tag is in the tags for that post. This will require a couple of flatMaps (to separate tags into individual tag values as well as to separate the title into its words) and an aggregateByKey to count.

In [None]:
# find title word tag first using previous questions
# find num posts separately 
# combine

def task5(postsRDD):
    # (title-word, tag) : num posts where title-word in title
    
    return dummyrdd

In [92]:
def task5MapA(dic):
    res = []
    # get the title word and tags for each row in postsRDD
    postid = dic.get("id")
    words = dic.get("title").split(" ")
    tags = dic.get("tags").replace("<", "").replace(">", " ").split(" ")
    tags.pop()
    for word in words:
        for tag in tags:
            res.append( (postid, word, tag) )
#     res = set(res)
#     res = list(res)
    return res
def task5MapB(dic):
    res = []
    # get the title word and tags for each row in postsRDD
    postid = dic.get("id")
    words = dic.get("title").split(" ")
    for word in words:
        res.append( (postid, word) )
    return res

def task5(postsRDD):
    rdd1 = postsRDD.filter(
        lambda x: x.get("tags")!= None).filter(
        lambda x: x.get("title")!= None).flatMap(
            task5MapA)
    # count num posts where title-word in title
    rdd2 = postsRDD.filter(
        lambda x: x.get("tags")!= None).filter(
        lambda x: x.get("title")!= None).flatMap(
            task5MapB)
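    # NOTE: work in progress. This returns the intermediate (postid, word) pairs,
    # not yet the ((title-word, tag), count) RDD the task asks for.
    return rdd2
#     Possible finish (untested): count each distinct (word, tag) once per post, then sum.
#     tup = rdd1.distinct().map(lambda x: ((x[1], x[2]), 1))
#     res = tup.aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b)
#     return res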
c = task5(postsRDD)

In [93]:
for t in c.take(100): print(t)
# print(len(a.collect()))

(2, 'How')
(2, 'can')
(2, 'a')
(2, 'group')
(2, 'track')
(2, 'database')
(2, 'schema')
(2, 'changes?')
(3, 'What')
(3, 'is')
(3, 'an')
(3, 'effective')
(3, 'way')
(3, 'of')
(3, 'labeling')
(3, 'columns')
(3, 'in')
(3, 'a')
(3, 'database?')
(5, 'What')
(5, 'are')
(5, 'the')
(5, 'differences')
(5, 'between')
(5, 'NoSQL')
(5, 'and')
(5, 'a')
(5, 'traditional')
(5, 'RDBMS?')
(6, 'What')
(6, 'is')
(6, 'the')
(6, 'difference')
(6, 'between')
(6, 'PostgreSQL')
(6, '9.0')
(6, 'Replication')
(6, 'and')
(6, 'Slony-I?')
(14, 'When')
(14, 'is')
(14, 'the')
(14, 'right')
(14, 'time')
(14, 'to')
(14, 'use')
(14, 'MariaDB')
(14, 'instead')
(14, 'of')
(14, 'MySQL,')
(14, 'and')
(14, 'Why?')
(20, 'How')
(20, 'can')
(20, 'I')
(20, 'optimize')
(20, 'a')
(20, 'mysqldump')
(20, 'of')
(20, 'a')
(20, 'large')
(20, 'database?')
(21, 'Is')
(21, 'it')
(21, 'possible')
(21, 'to')
(21, 'use')
(21, 'SQLite')
(21, 'as')
(21, 'a')
(21, 'client-server')
(21, 'database?')
(29, 'Where')
(29, 'are')
(29, 'some')
(29, 'u
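
The cell above stops at (postid, word) pairs. One way the counting could be completed is sketched below in plain Python, with `collections.Counter` standing in for the count-by-key step. `word_tag_pairs` is a hypothetical name, the sample posts are the three printed at the top of the notebook, and words are split on whitespace only, so punctuation stays attached as in the output above.

```python
import re
from collections import Counter

def word_tag_pairs(post):
    """flatMap-style function: the distinct (title-word, tag) pairs of one post."""
    title, tags = post.get("title"), post.get("tags")
    if title is None or tags is None:
        return []
    words = set(title.split())                         # set: a repeated word counts once per post
    tag_names = set(re.findall(r"<([^<>]+)>", tags))
    return [(word, tag) for word in words for tag in tag_names]

posts = [
    {"id": 2, "title": "How can a group track database schema changes?",
     "tags": "<mysql><version-control><schema>"},
    {"id": 3, "title": "What is an effective way of labeling columns in a database?",
     "tags": "<database-design><erd>"},
    {"id": 4, "title": None, "tags": None},
]

counts = Counter(pair for post in posts for pair in word_tag_pairs(post))
print(counts[("database", "mysql")])   # 1
print(counts[("a", "erd")])            # 1
print(counts[("a", "postgresql")])     # 0
```

In Spark, the same idea might read `postsRDD.flatMap(word_tag_pairs).map(lambda p: (p, 1)).aggregateByKey(0, lambda acc, v: acc + v, lambda a, b: a + b)`.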

Task 6 (0.25): Write the function that takes as input the amazonInputRDD (which is an RDD of lines) and maps each line to a tuple while removing the initial descriptor, i.e., the first line "user1 product1 5.0" gets mapped to a tuple (1, 1, 5.0). This just requires a single map.

In [48]:
def task6mapper(line):
    words = line.replace("user", "").replace("product", "").split(" ")
    return (int(words[0]), int(words[1]), float(words[2]))
   
def task6(amazonInputRDD):
    return amazonInputRDD.map(task6mapper)
a = task6(amazonInputRDD)

In [50]:
for t in a.take(10): print(t)
# print(len(a.collect()))

(1, 1, 5.0)
(1, 2, 1.0)
(1, 3, 5.0)
(1, 4, 1.0)
(1, 5, 1.0)
(1, 6, 5.0)
(1, 7, 4.0)
(1, 8, 5.0)
(1, 9, 5.0)
(1, 10, 1.0)


Task 7 (0.25): Complete the function that takes as input the amazonInputRDD and computes the average rating for each user across all the products they reviewed. The output should be an RDD of 2-tuples of the form (user1, 2.87) (not the correct answer). You can either use aggregateByKey or a reduceByKey followed by a map.

In [153]:
def task7mapA(line):
    words = line.split()
    # return (words[0], (1, float(words[2])) )
    return (words[0],  float(words[2]))

def task7(amazonInputRDD):
    aTuple = (0,0)
    cleaned = amazonInputRDD.map(task7mapA)
    rdd1 = cleaned.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))
    # get average
    return rdd1.mapValues(lambda v: v[0]/v[1])

# First lambda (within-partition step):
#    a: a TUPLE holding (runningSum, runningCount)
#    b: a SCALAR holding the next value
#
# Second lambda (cross-partition step):
#    a: a TUPLE holding (runningSum, runningCount)
#    b: a TUPLE holding (nextPartitionsSum, nextPartitionsCount)
a = task7(amazonInputRDD)

In [154]:
for t in a.take(2): print(t)

('user1', 4.08)
('user2', 3.5238095238095237)
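
To see how the two functions passed to aggregateByKey cooperate, the sketch below replays them by hand for a single key. It is plain Python with `functools.reduce`, not Spark itself, and the two partitions and their ratings are made up.

```python
from functools import reduce

zero = (0.0, 0)   # (running sum, running count)

def seq_op(acc, rating):
    """Within a partition: fold one more rating into the accumulator."""
    return (acc[0] + rating, acc[1] + 1)

def comb_op(a, b):
    """Across partitions: add two partial accumulators together."""
    return (a[0] + b[0], a[1] + b[1])

# Two invented partitions holding ratings for the same user
partitions = [[5.0, 1.0, 5.0], [4.0]]

partials = [reduce(seq_op, part, zero) for part in partitions]
total, count = reduce(comb_op, partials)
print(partials)        # [(11.0, 3), (4.0, 1)]
print(total / count)   # 3.75
```

The accumulator has a different type (a tuple) from the values (floats), which is the case aggregateByKey is designed for and the reason a plain reduceByKey would first need a map to (rating, 1) pairs.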


Task 8 (0.25): Complete the function that takes as input the amazonInputRDD and computes the mode rating for each product across all users (i.e., the rating that was most common for that product). If there are ties, pick the higher rating. The easiest way to do this would be a groupByKey followed by a map to compute the mode.
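
The per-product step described here, mode with ties going to the higher rating, could look like the following sketch. It is plain Python using `collections.Counter`; `mode_prefer_higher` is a hypothetical name and the rating lists are invented.

```python
from collections import Counter

def mode_prefer_higher(ratings):
    """Most common rating; among equally common ratings, the highest one."""
    counts = Counter(ratings)
    # Compare by (count, rating): the count decides first, the rating breaks ties
    rating, _ = max(counts.items(), key=lambda kv: (kv[1], kv[0]))
    return rating

print(mode_prefer_higher([5.0, 4.0, 4.0, 5.0, 1.0]))   # 5.0 (4.0 and 5.0 tie, higher wins)
print(mode_prefer_higher([3.0, 3.0, 5.0]))             # 3.0
```

Given a hypothetical RDD `pairs` of (product, rating) tuples, this could be applied as `pairs.groupByKey().mapValues(mode_prefer_higher)`.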

In [173]:
def task8mapA(line):
    words = line.split()
    
    return ((words[1],  float(words[2])) ,0)

# count the number of unique tuples (product 181, 4.0) 
def task8(amazonInputRDD):
    cleaned = amazonInputRDD.map(task8mapA)
    rdd1 = cleaned.groupByKey()\
    .mapValues(lambda vals: len(vals))\
    .sortByKey()
    # rdd1 ((name, rating), count)
    # rdd2, mode (name, count) 
    # rdd3 ((name, count), rating)
    # rdd4 ((name, count), 0)
    
    # get mode
    rdd2 = rdd1.map(lambda x: (x[0][0], x[1])) # contains the count of every key
    mode = rdd2.reduceByKey(max)
    rdd3 = rdd1.map(lambda x: ((x[0][0], x[1]), x[0][1]))
    # rdd3 = rdd1.map(lambda x: (x[0][0], x[1]), x[0][1])
    rdd4 = mode.map(lambda x: (x, 0))
    res = rdd3.join(rdd4) # join on same count
    # several ratings can share the top count; keep only the highest one per product
    res = res.map(lambda x: (x[0][0], x[1][0]) ).reduceByKey(max).sortByKey()
    return res

a = task8(amazonInputRDD)

In [174]:

for t in a.take(200): print(t)

('product0', 5.0)
('product1', 5.0)
('product10', 5.0)
('product100', 5.0)
('product101', 5.0)
('product102', 5.0)
('product103', 5.0)
('product104', 5.0)
('product105', 5.0)
('product106', 5.0)
('product107', 5.0)
('product108', 5.0)
('product109', 5.0)
('product11', 5.0)
('product110', 5.0)
('product111', 5.0)
('product112', 5.0)
('product113', 5.0)
('product114', 5.0)
('product115', 5.0)
('product116', 5.0)
('product117', 5.0)
('product118', 5.0)
('product119', 5.0)
('product12', 5.0)
('product120', 4.0)
('product121', 5.0)
('product122', 5.0)
('product123', 5.0)
('product124', 5.0)
('product125', 5.0)
('product126', 5.0)
('product127', 5.0)
('product128', 5.0)
('product129', 5.0)
('product13', 5.0)
('product130', 5.0)
('product131', 5.0)
('product132', 5.0)
('product133', 5.0)
('product134', 5.0)
('product135', 5.0)
('product136', 5.0)
('product137', 5.0)
('product138', 5.0)
('product139', 5.0)
('product14', 5.0)
('product140', 5.0)
('product141', 5.0)
('product