# ConsumerPySpark

#### Alberto Cereser, [alberto.cereser@gmail.com](mailto:alberto.cereser@gmail.com)

First, we receive the messages with a plain Kafka consumer (`KafkaConsumer` from `kafka-python`). Then we receive them again with PySpark Streaming, which we also use for some simple analysis of the messages in each batch.

### Add dependencies

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
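`PYSPARK_SUBMIT_ARGS` is read when PySpark launches its JVM, so it has to be set before the first `SparkContext` is created in the kernel. The `--packages` value is a comma-separated list of Maven coordinates (`groupId:artifactId:version`), and the `_2.11` suffix of each artifact must match the Scala version Spark was built with. As a sketch, the helper below (`build_submit_args` is a hypothetical name, not part of PySpark) assembles the same string from a list, which makes it easier to swap connector versions.

```python
import os

def build_submit_args(packages, ui_port=4040):
    """Assemble a PYSPARK_SUBMIT_ARGS string (hypothetical helper).

    packages: list of Maven coordinates of the form 'groupId:artifactId:version'.
    """
    for coord in packages:
        # Each coordinate needs exactly three colon-separated parts
        if len(coord.split(':')) != 3:
            raise ValueError('bad Maven coordinate: %r' % coord)
    # The string ends with 'pyspark-shell', as in the cell above
    return '--conf spark.ui.port=%d --packages %s pyspark-shell' % (
        ui_port, ','.join(packages))

packages = [
    'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0',
    'com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3',
]
os.environ['PYSPARK_SUBMIT_ARGS'] = build_submit_args(packages)
```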

### Load modules

In [2]:
from kafka import KafkaConsumer
from random import randint
from time import sleep

### Check that communication works using a Kafka consumer

In [4]:
bootstrap_servers = ['localhost:9092']

# Get the topic name stored by the Kafka producer
%store -r topicName
print topicName

consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                         auto_offset_reset='earliest'  # start from the oldest available message
                        )
consumer.subscribe([topicName])

for message in consumer:
    nn = message.offset    # offset within the partition; the first message has offset 0
    if nn % 10 == 0:
        print "Messages received: ", nn

    # In production, remove the check below so that the consumer keeps running
    if nn == 99:
        break

json_data154226
Messages received:  0
Messages received:  10
Messages received:  20
Messages received:  30
Messages received:  40
Messages received:  50
Messages received:  60
Messages received:  70
Messages received:  80
Messages received:  90
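`kafka-python` hands each record value to the consumer as raw bytes, so the JSON payload still has to be decoded. A minimal sketch, assuming the producer sends UTF-8 encoded JSON; `deserialize_value` and `progress_lines` are hypothetical helpers and the sample payload is illustrative, not taken from the topic. `progress_lines` counts records instead of reading `message.offset`, because offsets start at 0 and are per partition, so "offset 90" means 91 messages only on a single-partition topic read from the beginning.

```python
import json

def deserialize_value(raw):
    """Decode a Kafka record value (bytes) into a dict, assuming UTF-8 JSON."""
    return json.loads(raw.decode('utf-8'))

def progress_lines(records, every=10):
    """Yield a progress line after every `every` records actually received."""
    received = 0
    for _ in records:
        received += 1
        if received % every == 0:
            yield 'Messages received: %d' % received

# Illustrative payload with some of the fields used later in this notebook
raw = b'{"id": "1", "email": "jane@example.com", "country": "Norway"}'
user = deserialize_value(raw)
```

With `kafka-python`, the decoder can be passed as `KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', value_deserializer=deserialize_value)`, so that `message.value` is already a dict.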


In [5]:
#for message in consumer:
#    print (message)

### Load modules and start SparkContext

In [6]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:
    sc.stop()    # stop a SparkContext left over from a previous run
except NameError:
    pass         # no SparkContext defined yet

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

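Create the `StreamingContext` with a batch interval of 60 seconds: the incoming stream is split into one micro-batch per minute, and every operation defined below is applied to each batch.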

In [7]:
ssc = StreamingContext(sc, 60)
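The batch interval is why the output further down shows one block per minute (14:06:00, 14:07:00, ...). The pure-Python sketch below only models the idea of micro-batching and is not how Spark implements it: arrival times (illustrative, in seconds since the stream started) are grouped into consecutive 60-second intervals, each labeled approximately as Spark labels batches, by the time the batch is generated at the end of its interval. One difference: Spark generates a batch every interval even when it is empty, while this sketch only reports intervals that received data.

```python
from collections import Counter

BATCH_INTERVAL = 60  # seconds, the value passed to StreamingContext above

def batch_time(arrival, interval=BATCH_INTERVAL):
    """Time of the batch an arrival falls into, using half-open intervals.

    Arrivals in [0, 60) go to the batch at 60, [60, 120) to the batch at 120, ...
    """
    return (int(arrival) // interval + 1) * interval

def messages_per_batch(arrivals, interval=BATCH_INTERVAL):
    """Count arrivals per batch, sorted by batch time."""
    counts = Counter(batch_time(t, interval) for t in arrivals)
    return sorted(counts.items())

print(messages_per_batch([5, 12, 59, 61, 130]))
```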

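Define the PySpark consumer. `KafkaUtils.createStream` is the receiver-based API and expects a ZooKeeper quorum string (e.g. `'localhost:2181'`), not the list of Kafka brokers defined above, so here we use the direct (receiver-less) stream, which reads from the brokers listed in `metadata.broker.list`.

In [8]:
kafkaStream = KafkaUtils.createDirectStream(ssc, [topicName],
                                            {'metadata.broker.list': ','.join(bootstrap_servers)})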

Parse the incoming data as JSON.

In [9]:
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
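Both Kafka streams in `pyspark.streaming.kafka` deliver each record as a `(key, value)` pair by default, which is why the lambda reads `v[1]`. The same transformation can be checked outside Spark on plain tuples (the records below are illustrative). A malformed message makes `json.loads` raise inside the task, which typically fails the job for that batch; the tolerant variant drops bad records instead and would be used with `flatMap`. That is a design choice, not something this notebook does.

```python
import json

def parse_record(record):
    """Same as `lambda v: json.loads(v[1])`: decode the value of a (key, value) pair."""
    return json.loads(record[1])

def parse_record_safe(record):
    """flatMap-style variant: return [] for missing or malformed JSON."""
    try:
        return [json.loads(record[1])]
    except (TypeError, ValueError):
        return []

batch = [
    (None, '{"email": "jane@example.com", "country": "Norway"}'),
    (None, 'not json'),
]
good = [parse_record(batch[0])]
parsed_batch = [user for record in batch for user in parse_record_safe(record)]
```

In PySpark the tolerant version would read `kafkaStream.flatMap(parse_record_safe)`.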

Count the number of messages per batch.

In [30]:
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

**Important: I am not sure about the syntax of the commands below. Some errors are also due to compatibility issues between the container and Hadoop.**

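Count the number of distinct users per batch. A DStream has no `distinct()` method, and `count()` on a DStream returns another DStream rather than a number, so we apply the RDD operations to each batch with `foreachRDD`. The `email` field lives in the parsed messages (`parsed`), not in the raw `(key, value)` pairs of `kafkaStream`.

In [33]:
def print_users_count(rdd):
    # Runs on the driver once per batch; rdd holds the emails of that batch
    print('The number of unique users is: %d' % rdd.distinct().count())

emails = parsed.map(lambda message: message['email'])
emails.foreachRDD(print_users_count)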
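Per batch, the distinct-user count is just the size of the set of email addresses in that batch. The pure-Python sketch below models this on illustrative data, together with a count over the last few batches. In PySpark, a longer sliding horizon could be obtained by applying `DStream.window(windowDuration, slideDuration)` before the distinct count, with both durations in seconds and multiples of the batch interval; the helper names here are hypothetical.

```python
def unique_users(batch):
    """Number of distinct email addresses in one batch of parsed messages."""
    return len({message['email'] for message in batch})

def unique_users_in_window(batches, window):
    """Distinct users over the last `window` batches (a window of whole batches)."""
    recent = batches[-window:]
    return len({m['email'] for batch in recent for m in batch})

batches = [
    [{'email': 'a@example.com'}, {'email': 'b@example.com'}, {'email': 'a@example.com'}],
    [{'email': 'c@example.com'}, {'email': 'a@example.com'}],
]
```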

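Count the number of users per country. First we define a schema; then, for each batch, we build a DataFrame, count the users per country, and find the countries with the fewest and the most users. `createDataFrame` is a method of `SQLContext`, not of a DStream, so it has to be called inside `foreachRDD`.

In [32]:
from pyspark.sql import SQLContext
from pyspark.sql.types import StructField, StructType, StringType

fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']
schema = StructType([
    StructField(field, StringType(), True) for field in fields
])

def as_string(value):
    # The schema declares every field as a string (e.g. a numeric id becomes u'1')
    return None if value is None else u'%s' % value

def parse(s, fields):
    # Turn one raw JSON message into [tuple of field values]; drop malformed messages
    try:
        d = json.loads(s)
        return [tuple(as_string(d.get(field)) for field in fields)]
    except (TypeError, ValueError):
        return []

# kafkaStream yields (key, value) pairs; the JSON payload is the value
users = kafkaStream.flatMap(lambda kv: parse(kv[1], fields))

def print_country_min_max(rdd):
    if rdd.isEmpty():
        return    # min() and max() fail on an empty RDD
    sqlContext = SQLContext.getOrCreate(rdd.context)
    array_of_users = sqlContext.createDataFrame(rdd, schema)
    # group by country and count, resulting in an RDD of (country, count) rows
    country_count = array_of_users.groupBy('country').count().rdd
    # identify the min and max using as comparison key the count (second element)
    country_min = country_count.min(key=lambda grp: grp[1])
    country_max = country_count.max(key=lambda grp: grp[1])
    print('Country with the fewest users: %s (%d)' % (country_min[0], country_min[1]))
    print('Country with the most users: %s (%d)' % (country_max[0], country_max[1]))

users.foreachRDD(print_country_min_max)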
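The per-country statistic boils down to grouping users by country, counting each group, and taking the pairs with the smallest and largest count (comparison key `grp[1]`). The pure-Python sketch below models one batch with `collections.Counter` on illustrative data; `country_min_max` is a hypothetical helper. If several countries share the same count, which one is returned depends on iteration order, both here and in Spark, so it should not be relied upon.

```python
from collections import Counter

def country_min_max(users):
    """Return ((country, n), (country, n)) for the least and most common country.

    Returns None for an empty batch, where min() and max() would fail.
    """
    counts = Counter(user.get('country') for user in users)
    if not counts:
        return None
    pairs = list(counts.items())
    return min(pairs, key=lambda grp: grp[1]), max(pairs, key=lambda grp: grp[1])

users = [
    {'email': 'a@example.com', 'country': 'Norway'},
    {'email': 'b@example.com', 'country': 'Italy'},
    {'email': 'c@example.com', 'country': 'Italy'},
    {'email': 'd@example.com', 'country': 'France'},
    {'email': 'e@example.com', 'country': 'France'},
    {'email': 'f@example.com', 'country': 'France'},
]
```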

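Start the streaming context. All output operations (such as `pprint` and `foreachRDD`) must be defined before this call, and nothing is processed until it runs.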

In [37]:
ssc.start()

-------------------------------------------
Time: 2018-11-20 14:06:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:06:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:06:00
-------------------------------------------
0

-------------------------------------------
Time: 2018-11-20 14:06:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:07:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:07:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:07:00
-------------------------------------------
0

-------------------------------------------
Time: 2018-11-20 14:07:00
-------------------------------------------

-------------------------------------------
Time: 2018-11-20 14:08:00
------

In [None]:
ssc.stop()