# Processing FeedRequest Topic

In this notebook we will create a simple Spark Streaming application that connects to our Kafka FeedRequest topic (`feedrequests`), processes those feed requests and sends the resulting data back to Kafka.

## Setup Environment

When deploying a Python Spark Streaming application, we need to pass the `--packages` option to `spark-submit` with the Kafka integration artifact, whose coordinates can be found in the [Maven repository](https://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.spark%22%20AND%20a%3A%22spark-streaming-kafka-0-8-assembly_2.11%22).

E.g. to run standalone:
```shell
$ spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 FeedRequestStream.py
```

We can also run the app directly in this notebook by setting the `PYSPARK_SUBMIT_ARGS` environment variable.

```python
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
```
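`PYSPARK_SUBMIT_ARGS` is read when PySpark launches its JVM gateway, so it has to be set before the `SparkContext` is created, and it needs to end with `pyspark-shell`. As a minimal sketch, a hypothetical helper `kafka_submit_args` can assemble the Maven coordinate (`groupId:artifactId_scalaVersion:sparkVersion`) from the Scala and Spark versions so they are not hard-coded in several places; the default versions simply mirror the ones used above.

```python
import os


def kafka_submit_args(scala_version="2.11", spark_version="2.0.2"):
    """Hypothetical helper: build PYSPARK_SUBMIT_ARGS for the Kafka 0.8 integration."""
    # Maven coordinate format is groupId:artifactId:version; Spark artifacts
    # carry the Scala binary version as a suffix of the artifactId.
    package = "org.apache.spark:spark-streaming-kafka-0-8_{}:{}".format(
        scala_version, spark_version)
    # PySpark expects the arguments to end with the 'pyspark-shell' resource.
    return "--packages {} pyspark-shell".format(package)


# Must run before the SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args()
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

Keeping the Scala and Spark versions as parameters makes it easier to match the artifact to the installed Spark build.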

### Import Dependencies

```python
import json
import requests
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
```

### Create Spark and Spark Streaming contexts

```python
# Define the Spark context under which everything else runs. Setting the log level to WARN cuts a lot of noise on stdout.
sc = SparkContext(appName="FeedRequestStream")
sc.setLogLevel("WARN")

# Create a StreamingContext with a batch interval of 5 seconds.
ssc = StreamingContext(sc, batchDuration=5)
```
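With `batchDuration=5`, Spark Streaming cuts the incoming stream into consecutive 5-second intervals and processes each interval as one RDD. The plain-Python sketch below only illustrates that grouping on a list of hypothetical `(arrival_time_in_seconds, message)` pairs; it is not how Spark implements batching internally.

```python
from collections import OrderedDict


def group_into_batches(events, batch_duration=5):
    """Illustrative only: bucket (arrival_time, message) pairs by batch interval."""
    batches = OrderedDict()
    for arrival_time, message in sorted(events, key=lambda e: e[0]):
        # Batch k covers arrival times in [k * batch_duration, (k + 1) * batch_duration)
        batch_index = int(arrival_time // batch_duration)
        batches.setdefault(batch_index, []).append(message)
    # Unlike Spark, which still produces an (empty) RDD for an interval with
    # no data, empty intervals are simply absent here.
    return batches


events = [(0.5, "a"), (4.9, "b"), (5.0, "c"), (12.3, "d")]
for index, messages in group_into_batches(events).items():
    print(index, messages)
```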

### Connect to Kafka
We will connect to the `feedrequests` topic, which contains the bikeshare API URLs to request, using the receiver-based `KafkaUtils.createStream`, which connects through ZooKeeper.

```python
# Arguments passed to createStream:
# - the Spark Streaming context object
# - the hostname:port of the ZooKeeper service (the default port is 2181)
# - the consumer group id
# - a dict mapping each topic to the number of consumer threads used to read it
#   (this is not the number of Kafka partitions, nor of Spark RDD partitions)
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'feedrequeststream', {'feedrequests': 1})
```

## Message Processing

The messages in the `feedrequests` topic are JSON strings of the form:

```json
{"id": 1, "plugin_id": 1, "url": "https://api.jcdecaux.com/vls/v1/stations?&apiKey=2ee63133e8fb025ba5beaf1a5c577f8a1ad0a536"}
```
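Since Kafka delivers each message as a `(key, value)` tuple, the parsing step can be kept separate from the HTTP request, which makes it easy to check without a network connection. A minimal sketch, assuming only the three fields shown above; the `ParsedFeedRequest` type and the `parse_message` helper are hypothetical names, and the URL is a placeholder.

```python
import json
from collections import namedtuple

# Hypothetical container for the three fields of a feed request message
ParsedFeedRequest = namedtuple("ParsedFeedRequest", ["id", "plugin_id", "url"])


def parse_message(msg):
    """Parse a Kafka (key, value) tuple whose value is a feed request JSON string."""
    _key, value = msg
    payload = json.loads(value)
    # Missing fields become None, mirroring dict.get in the FeedRequest class
    return ParsedFeedRequest(payload.get("id"), payload.get("plugin_id"), payload.get("url"))


msg = (None, '{"id": 1, "plugin_id": 1, "url": "https://example.com/stations"}')
print(parse_message(msg))
```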

```python
class FeedRequest(object):
    '''
    Utility class that parses a feed request message and provides a wrapper around `requests`
    '''

    def __init__(self, data):
        feedrequest = json.loads(data)

        self.id = feedrequest.get('id')
        self.plugin_id = feedrequest.get('plugin_id')
        self.url = feedrequest.get('url')

        # The HTTP request is issued as soon as the object is created,
        # i.e. on the executor when the map below runs
        self.r = requests.get(url=self.url)

    def __str__(self):
        return self.url

    @property
    def ok(self):
        return self.r.ok

    @property
    def text(self):
        return self.r.text
```
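Because `requests.get` is called without a timeout and its exceptions are not caught, a single unreachable feed could stall a batch or fail the job for that batch; the `ok` filter only covers HTTP error status codes. Below is a minimal sketch of a more defensive fetch, assuming a 10-second timeout is acceptable. It uses the standard library's `urllib.request` instead of `requests` only so that it runs without extra dependencies, and `fetch_text` is a hypothetical helper name.

```python
import urllib.request


def fetch_text(url, timeout=10):
    """Return the response body as text, or None if the request fails."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.read().decode("utf-8")
    except (OSError, ValueError):
        # OSError covers URLError/HTTPError (unreachable host, error status,
        # missing file) and socket timeouts; ValueError covers malformed URLs
        # and bodies that are not valid UTF-8
        return None
```

Returning `None` lets a later `filter(lambda t: t is not None)` drop failed feeds the same way the `ok` filter does, instead of failing the task.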

```python
# Load the JSON strings into a DStream of FeedRequest objects.
# Note that messages received from Kafka are (key, value) tuples. E.g. a message like the one above would arrive as:
# (None, '{"id": 1, "plugin_id": 1, "url": "www.myfeed.com"}')

feedrequests = kafkaStream.map(lambda msg: FeedRequest(msg[1]))
```

```python
# Filter the feedrequests DStream for ok requests
ok = feedrequests.filter(lambda r: r.ok)
```

```python
# Create DStream for request text
text = ok.map(lambda r: r.text)
```
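Since the `map`/`filter`/`map` chain above only applies per-record functions, its logic can be exercised on a plain Python list before wiring it to Kafka. A minimal sketch, assuming a hypothetical stub `fake_get` in place of the real HTTP call and a hypothetical `FakeResponse` that carries the same `ok` and `text` attributes as a `requests` response:

```python
import json
from collections import namedtuple

# Hypothetical stand-in for a requests.Response: only the attributes used above
FakeResponse = namedtuple("FakeResponse", ["ok", "text"])


def fake_get(url):
    """Stub HTTP client: URLs containing 'broken' fail, everything else succeeds."""
    if "broken" in url:
        return FakeResponse(ok=False, text="")
    return FakeResponse(ok=True, text="data from " + url)


def run_pipeline(messages, get=fake_get):
    """Mirror kafkaStream.map(...).filter(...).map(...) on an in-memory list."""
    responses = [get(json.loads(value)["url"]) for _key, value in messages]  # map
    ok = [r for r in responses if r.ok]  # filter
    return [r.text for r in ok]  # map


messages = [
    (None, '{"id": 1, "plugin_id": 1, "url": "https://example.com/a"}'),
    (None, '{"id": 2, "plugin_id": 1, "url": "https://example.com/broken"}'),
]
print(run_pipeline(messages))
```

Inside Spark, `ssc.queueStream(rdds)` can feed a list of RDDs (e.g. built with `sc.parallelize`) through the same DStream code for a similar test without Kafka.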

```python
# Uncomment to print the first 10 elements of each batch (useful for debugging)
# text.pprint()
```

## Write data back to Kafka

Let's write our transformed data back to Kafka. [See this SO answer](https://stackoverflow.com/a/37357947/5221078) for a nice example of how to write to Kafka.

We want to write the `text` DStream to the `feedmessages` topic. To do so, we can instantiate a Kafka producer (here I am using Confluent's Python client, `confluent_kafka`) to send the messages.

Note that the producer should not be instantiated on the driver: the connection cannot safely be serialised and shared among the executors (it would also incur a heavy overhead). Instead, we use `foreachPartition` to apply the `handler` function, which instantiates the connection in each _Spark_ partition.

```python
import socket
from confluent_kafka import Producer

def handler(records):
    # Instantiate one Kafka producer per Spark partition
    p = Producer({'bootstrap.servers': "localhost:9092",
                  'client.id': socket.gethostname(),
                  'default.topic.config': {'acks': 1}})

    # Send each record in the partition to the feedmessages topic
    for record in records:
        p.produce('feedmessages', record.encode('utf-8'))

    p.flush()  # block until all queued messages have been delivered

# Apply the handler to every partition of each RDD in the DStream
text.foreachRDD(lambda rdd: rdd.foreachPartition(handler))
```
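The effect of `foreachPartition` is that the producer is created once per partition rather than once per record, and flushed once at the end of each partition. The sketch below mimics that with a hypothetical `FakeProducer` (not the `confluent_kafka` API) that only counts instances and buffers messages, with partitions modeled as plain lists:

```python
class FakeProducer(object):
    """Hypothetical stand-in for a Kafka producer; tracks how many were created."""
    instances = 0

    def __init__(self, config):
        FakeProducer.instances += 1
        self.config = config
        self.queued = []
        self.delivered = []

    def produce(self, topic, value):
        self.queued.append((topic, value))

    def flush(self):
        # Pretend every queued message has been delivered
        self.delivered.extend(self.queued)
        self.queued = []


sent = []


def fake_handler(records):
    # One producer per partition, created where the records are processed
    p = FakeProducer({"bootstrap.servers": "localhost:9092"})
    for record in records:
        p.produce("feedmessages", record.encode("utf-8"))
    p.flush()
    sent.extend(p.delivered)


# Two "partitions" of one RDD, processed the way foreachPartition would
partitions = [["a", "b", "c"], ["d"]]
for partition in partitions:
    fake_handler(iter(partition))

print(FakeProducer.instances, len(sent))
```

Four records across two partitions lead to two producers, which is the trade-off this pattern makes: the connection cost is paid per partition per batch, not per record.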

## Start the app

```python
ssc.start()
ssc.awaitTermination()
```

```
Py4JJavaError: An error occurred while calling o24.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 148, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.5/pickle.py", line 408, in dump
    self.save(obj)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 740, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 794, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 794, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 794, in _batch_appends
    save(x)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 255, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
    save(element)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 770, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.5/pickle.py", line 797, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 297, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.5/pickle.py", line 810, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.5/pickle.py", line 836, in _batch_setitems
    save(v)
  File "/usr/lib/python3.5/pickle.py", line 475, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 391, in save_global
    __import__(modname)
ImportError: No module named 'cimpl'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "<ipython-input-10-f778dc7e4e54>", line 17, in <lambda>
    text.foreachRDD(lambda rdd: rdd.foreachPartition(handler))
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 799, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 1041, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 1032, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 906, in fold
    vals = self.mapPartitions(func).collect()
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 809, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 2455, in _jrdd
    self._jrdd_deserializer, profiler)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 2388, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/serializers.py", line 460, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 704, in dumps
    cp.dump(obj)
  File "/home/greg/Envs/hadoop/lib/python3.5/site-packages/pyspark/cloudpickle.py", line 162, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: ImportError: No module named 'cimpl'

	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
```
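
Judging by the traceback, the job fails on the driver while the `handler` function is being serialised, before any message is sent. Because `handler` is defined in the notebook's `__main__`, cloudpickle serialises it by value together with the globals it references, including `Producer`. The `confluent_kafka` `Producer` is a C-extension class that reports its module as `cimpl`, so pickling it by reference runs `import cimpl`, which fails. The sketch below first reproduces that mechanism with the standard `pickle` module and a hypothetical `CimplProducer` class, then shows a likely fix (not verified here): importing `Producer` inside the function, so it is looked up on the executor when the function runs instead of being pickled with it.

```python
import pickle
import socket


class CimplProducer(object):
    """Hypothetical stand-in for a C-extension class living in module 'cimpl'."""


# Mimic how confluent_kafka's Producer reports a non-importable top-level module
CimplProducer.__module__ = "cimpl"

try:
    pickle.dumps(CimplProducer)
    pickled = True
except pickle.PicklingError as exc:
    # The pickler tries to import 'cimpl' to locate the class by name, and fails
    pickled = False
    print("PicklingError:", exc)


def handler(records):
    # Likely fix: import inside the function so that Producer is resolved on
    # the executor when the function runs, instead of being pickled with it
    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': "localhost:9092",
                  'client.id': socket.gethostname(),
                  'default.topic.config': {'acks': 1}})
    for record in records:
        p.produce('feedmessages', record.encode('utf-8'))
    p.flush()


# Registered exactly as before:
# text.foreachRDD(lambda rdd: rdd.foreachPartition(handler))
```

An alternative that should also work is `import confluent_kafka` at the top of the cell and calling `confluent_kafka.Producer(...)`, since modules are pickled by name. Either way, `confluent_kafka` must be installed on every executor.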