You can use this notebook to develop your answers. Make sure to look at intermediate results using `take()` for debugging.

In [43]:
import json 
import re

## Load data into RDDs
usersRDD = sc.textFile("datafiles/se_users.json").map(json.loads)
postsRDD = sc.textFile("datafiles/se_posts.json").map(json.loads)
playRDD = sc.textFile("datafiles/play.txt")
logsRDD = sc.textFile("datafiles/NASA_logs_sample.txt")
amazonInputRDD = sc.textFile("datafiles/amazon-ratings.txt")
nobelRDD = sc.textFile("datafiles/prize.json").map(json.loads)
amazonBipartiteRDD = amazonInputRDD.map(lambda x: x.split(" ")).map(lambda x: (x[0], x[1])).distinct()

In [3]:
class DisplayRDD:
        def __init__(self, rdd):
                self.rdd = rdd

        def _repr_html_(self):                                  
                x = self.rdd.mapPartitionsWithIndex(lambda i, x: [(i, [y for y in x])])
                l = x.collect()
                s = "<table><tr>{}</tr><tr><td>".format("".join(["<th>Partition {}".format(str(j)) for (j, r) in l]))
                s += '</td><td valign="bottom" halignt="left">'.join(["<ul><li>{}</ul>".format("<li>".join([str(rr) for rr in r])) for (j, r) in l])
                s += "</td></table>"
                return s

In [4]:
for t in postsRDD.take(1): print(t)

{'id': 2, 'posttypeid': 1, 'title': 'How can a group track database schema changes?', 'acceptedanswerid': 4, 'parentid': None, 'creationdate': '2011-01-03', 'score': 68, 'viewcount': 11533, 'owneruserid': 7, 'lasteditoruserid': 97, 'tags': '<mysql><version-control><schema>'}


Use filter to find all posts where tags are not null (None in python) and that are tagged 'postgresql-9.4', and then a map so that the output RDD has tuples of the form: (ID, Title, Tags). Note that postsRDD contains dictionaries -- see the contents by running postsRDD.take(10).

In [5]:
def task1(postsRDD):
    res = postsRDD.filter(
        lambda x: x.get("tags") != None).filter(
        lambda x: "postgresql-9.4" in x.get("tags")).map(
        lambda x: (x.get("id"), x.get("title"), x.get("tags"))
    )
    return res 

Use flatMap on the postsRDD to create an RDD (ID, Tag), listing all the tags for each post as a separate tuple. If a post has no tags, it should not appear in the output RDD.

In [6]:
def help2(d):
    res = []
    tags = d.get("tags").replace("<", "").split(">")
    tags.pop()
    for i in tags:
        res.append( (d.get("id"), i) )
    return res
    

def task2(postsRDD):
    res = postsRDD.filter(lambda x: x.get("tags") != None).flatMap(help2)
    return res
a = task2(postsRDD)
a.take(4)

[(2, 'mysql'), (2, 'version-control'), (2, 'schema'), (3, 'database-design')]

The goal here is to find the 5 lexicographically smallest tags for each year, for the posts from that year. 

So the outputRDD should be contain tuples of the form: `('2001', ['tag1', 'tag2', ..., 'tag5'])`, with `'tag1' < 'tag2'` and `'tag5'` being smaller (lexicographically) than any other tag for a post from that year. 

All the five (or fewer for some of the years) tags should be distinct. 
Use a `map` followed by `reduceByKey` for doing this.

In [7]:
def help3_1(d):
    tags = d.get("tags").replace("<", "").split(">")
    tags.pop()
    yr = d['creationdate'][:4] # date: 2011-02-02 format, so get year by splice
    return (yr, set(tags))

def help3_2(tup):
    tags = tup[1]
    tags = list(tags)
    tags.sort()
    return (tup[0], tags[:5])

def task3(postsRDD):
    res = postsRDD.filter(lambda x: x.get('tags') != None).map(help3_1).reduceByKey(lambda a,b: a | b).map(help3_2)
    return res

a = task3(postsRDD)


Use join to join the `usersRDD` and `postsRDD` on `users.id = owneruserid`. The output should be a tuple of the form: `(userid, displayname, postid, posttitle)`. You will need to do several maps to do this. Make sure you look at the structure of the objects with the RDD after the join; it will need to postprocessed using a map to get to the desired output.

In [8]:
def combine(tup):
    a = tup[0]
    b = tup[1][0]
    c = tup[1][1][0]
    d = tup[1][1][1]
    return (a,b,c,d)

def task4(usersRDD, postsRDD):
    user = usersRDD.map(lambda x: (x.get("id"),  x.get("displayname")))
    post = postsRDD.map(lambda x: (x.get('owneruserid') , (x.get('id'), x.get('title'))))
    res = user.join(post)
    return res.map(combine)

a = task4(usersRDD, postsRDD)
a.take(2)

[(8, 'ilhan', 2107, 'How to get a users friends names?'),
 (8,
  'ilhan',
  6255,
  'Should I record ID numbers in a table where I record who look whom profile page')]

Using the postsRDD, create an RDD where the 
- key is a 2-tuple `(title-word, tag)`, where the former is a word in a title, and the later is a tag. 
- The value associated with the key should be the **number of posts in which the title-word is in the title**, and the **tag is in the tags for that post**. 

This will require a couple of flatMaps (to separate tags into individual tag values as well as to separate the title into its words) and an aggregateByKey to count.

In [9]:
def help5(d):
    res = []
    tags = d.get("tags").replace("<", "").split(">")
    tags.pop()
    title = d.get('title').split(' ')
    for i in tags:
        for t in title:
            res.append( (t,i) )
    return res

# RDD.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash>)

def task5(postsRDD):
    key = postsRDD.filter(lambda x: x.get("tags") != None).flatMap(help5)
    tup = key.map(lambda x : (x, 0))
    res = tup.aggregateByKey(1, lambda x,y:x+y , lambda x,y: x+y)
    return res

a = task5(postsRDD)
a.take(2)

[(('How', 'mysql'), 2), (('can', 'mysql'), 2)]

Write the function that takes as 
- input the `amazonInputRDD` (which is an RDD of lines) and 
- maps each line to a tuple while removing the initial descriptor
    - i.e., the first line "user1 product1 5.0" gets mapped to a tuple `(1, 1, 5.0)`. 
    
This just requires a single map.

In [10]:
def rem(s):
    l = []
    l = s.replace('user', '').replace('product', '').split(' ')
    return (l[0], l[1], l[2])
    
def task6(amazonInputRDD):
    res = amazonInputRDD.map(rem)
    return res

a = task6(amazonInputRDD)
a.take(2)

[('1', '1', '5.0'), ('1', '2', '1.0')]

Complete the function that takes as 
- input the `amazonInputRDD` and 
- computes the average rating for each user across all the products they reviewed. 

The output should be an 
- RDD of 2-tuples of the form `(user1, 2.87)` (not the correct answer). 

You can either use `aggregateByKey` or a `reduceByKey` followed by a map.

In [11]:
def help7(s):
    l = []
    l = s.replace('product', '').split(' ')
    for i in l:
        if ('user' not in i):
            i = float(i)
    return (l[0], l[2])
    

def task7(amazonInputRDD):
    res = amazonInputRDD.map(help7) # (userid, rating)
    avg_by_key = res.mapValues(lambda v: (v, 1)).reduceByKey(lambda a,b: (a[0], a[1]+b[1])).mapValues(lambda v: v[0]/v[1])
    return avg_by_key

a = task7(amazonInputRDD)
a.take(10)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 13) (5ca67b86faca executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/spark/python/pyspark/rdd.py", line 2714, in map_values_fn
    return kv[0], f(kv[1])
  File "<ipython-input-11-743867200f4f>", line 12, in <lambda>
TypeError: unsupported operand type(s) for /: 'str' and 'int'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/spark/python/pyspark/rdd.py", line 2714, in map_values_fn
    return kv[0], f(kv[1])
  File "<ipython-input-11-743867200f4f>", line 12, in <lambda>
TypeError: unsupported operand type(s) for /: 'str' and 'int'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


Complete the function that takes as 
- input the `amazonInputRDD` and 
- computes the mode rating for each product across all users (i.e., the rating that was most common for that product). 

If there are ties, pick the higher rating. Easiest way to do this would be a groupByKey followed by a map to compute the mode.

In [40]:
def num(s):
    l = []
    l = s.replace('user', '').split(' ')
    return [l[1], float(l[2])]

def help8(l):
    halp = []
    
    for product in l: # [product, rating]
        counts = {}
        ratinglist = product[1] # [5.0, 1.0, 3.0, ...]
        for i in ratinglist:
            counts[i] = counts.get(i,0) + 1
        mode = max(counts, key=counts.get)
        halp.append((product[0], mode))
    return halp

def task8(amazonInputRDD):
    st = amazonInputRDD.map(num)
    gr = st.groupByKey().map(help8)
    return gr

a = task8(amazonInputRDD)
a.take(10)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 53.0 failed 1 times, most recent failure: Lost task 0.0 in stage 53.0 (TID 76) (5ca67b86faca executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-40-7fa1aeb0a4dd>", line 11, in help8
TypeError: 'ResultIterable' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-40-7fa1aeb0a4dd>", line 11, in help8
TypeError: 'ResultIterable' object is not subscriptable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


For `logsRDD`, write a function that computes the number of log requests for each year. 
So the output should be an RDD with records of the form `(1995, 2952)` (not the correct answer). 
This can be done through a map to extract the years, followed by a group by aggregate.

In [60]:
def help9(s):
    reg = r'.*([1-2][0-9]{3})'
    return ''.join(re.findall(reg, s))


def task9(logsRDD):
    res = logsRDD.map(help9).map(lambda x: (x,0))
    
    return res

a = task9(logsRDD)
a.take(10)

[('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0),
 ('1995', 0)]