## Task 2. Heuristic user segmentation

In this task, first parse the user logs. Then identify the segments and count the *unique* uids in each segment. Sort the output by count in descending order.
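As a point of reference, the counting logic can be sketched in plain Python with exact sets, without Spark or HyperLogLog. The records below are hypothetical and assumed to be already parsed into device, browser and OS families; the segment names and matching rules follow the ones used in the solution cell of this notebook.

```python
from collections import defaultdict

# Hypothetical, already-parsed records: (uid, device family, browser family, OS family).
records = [
    ("u1", "iPhone", "Mobile Safari", "iOS"),
    ("u2", "Other", "Firefox", "Windows"),
    ("u2", "Other", "Firefox", "Windows"),  # repeated uid, must be counted once
    ("u3", "Other", "Chrome", "Windows"),
    ("u4", "Other", "Firefox", "Linux"),
    ("u5", "Other", "Chrome", "Windows"),
]

def segments_of(device, browser, os_family):
    """Yield every segment a single record belongs to (a user can be in several)."""
    if "iPhone" in device:
        yield "seg_iphone"
    if "Firefox" in browser:
        yield "seg_firefox"
    if "Windows" in os_family:
        yield "seg_windows"

# Exact unique counting: one set of uids per segment.
uids_by_segment = defaultdict(set)
for uid, device, browser, os_family in records:
    for seg in segments_of(device, browser, os_family):
        uids_by_segment[seg].add(uid)

# Sort segments by the number of unique uids, largest first.
counts = sorted(
    ((seg, len(uids)) for seg, uids in uids_by_segment.items()),
    key=lambda pair: pair[1],
    reverse=True,
)

for seg, n in counts:
    print("{}\t{}".format(seg, n))
```

Exact sets grow with the number of users, which is why the task asks for an approximate counter instead.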

You may find more useful methods in the following sources:

* Book "Learning Spark: Lightning-Fast Big Data Analysis" by Holden Karau et al.

* [Spark Streaming documentation](https://spark.apache.org/docs/latest/streaming-programming-guide.html)

* [PySpark Streaming documentation](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark-streaming-module) 

* [PySpark Streaming examples](https://github.com/apache/spark/tree/master/examples/src/main/python/streaming)

* [HyperLogLog documentation](https://pypi.org/project/hyperloglog/)

* [ua-parser documentation](https://pypi.org/project/ua-parser/0.7.0/)

In [1]:
import os
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# You will also need these libraries
from ua_parser import user_agent_parser
from hyperloglog import HyperLogLog

In [2]:
# Here is an example of `user_agent_parser` usage

# ua = 'Mozilla/5.0 (iPad; CPU OS 7_0_4 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B554a Safari/9537.53'
# parsed_ua = user_agent_parser.Parse(ua)

In [3]:
# Here is an example of `hyperloglog` usage

# from hyperloglog import HyperLogLog
# hll = HyperLogLog(0.01)
# for word in ['abc', 'acb', 'def', 'fghijk']:
#     hll.add(word)
# print(hll.card(), int(hll.card()))
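To give some intuition for what such a counter does, here is a minimal HyperLogLog-style sketch in pure Python. It is an illustration under stated assumptions, not the implementation used by the `hyperloglog` package: the class name `TinyHLL`, the choice of SHA-1 as the hash and the parameter `p` are all hypothetical, and only the small-range correction is included.

```python
import hashlib
import math

class TinyHLL(object):
    """Illustrative HyperLogLog-style counter (hypothetical, not the library's code)."""

    def __init__(self, p=10):
        # m = 2**p registers; the alpha formula below assumes m >= 128 (p >= 7).
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, value):
        # Take 64 bits from a SHA-1 digest as the hash of the value.
        digest = hashlib.sha1(value.encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")
        # The top p bits select a register.
        idx = h >> (64 - self.p)
        # The remaining bits give the rank: position of the leftmost 1-bit.
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def card(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        # Small-range correction: fall back to linear counting.
        if raw <= 2.5 * self.m and zeros:
            return self.m * math.log(self.m / zeros)
        return raw

hll = TinyHLL(p=10)
for i in range(10000):
    uid = "user-{}".format(i)
    hll.add(uid)
    hll.add(uid)  # duplicates do not change the registers
estimate = hll.card()
print(int(estimate))
```

The memory used is fixed by the number of registers, regardless of how many uids are added. The typical relative error of this scheme is roughly 1.04 / sqrt(m), so the estimate is close to, but not exactly, the true count.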

**NB.** Please don't change the cell below (not even the comments). It emulates batches arriving in real time. Do read through the code, though: it will help you when you work with real Spark Streaming applications.

In [4]:
sc = SparkContext(master='local[4]')

# Preparing batches with the input data
DATA_PATH = "/data/course4/uid_ua_100k_splitted_by_5k"
batches = [sc.textFile(os.path.join(DATA_PATH, path)) for path in os.listdir(DATA_PATH)]

# Creating Dstream to emulate realtime data generating
BATCH_TIMEOUT = 5 # Timeout between the batch generation
ssc = StreamingContext(sc, BATCH_TIMEOUT)
dstream = ssc.queueStream(rdds=batches)

Two flags are used in this task:
* The `finished` flag indicates that an empty RDD has been received, i.e. the stream has ended.
* The `printed` flag indicates that the result has been printed and the Spark Streaming context can be stopped.

**NB.** Spark transformations are lazy. Calling a transformation does not execute it; Spark only records it in the computation DAG, and the recorded transformations run when an action is called. Look at the `print_only_at_the_end()` function: its action is called only once the stream has finished, so that is the moment when Spark executes all the accumulated transformations. If the dataset is really big, this can exceed the container's memory. If you hit an error like `Container killed by YARN for exceeding memory limits`, call some action (e.g. `rdd.count()`) before the `if` clause in this function.
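The lazy behaviour can be illustrated with a plain Python generator. This is only an analogy, not Spark code: the generator expression plays the role of a transformation and `list()` plays the role of an action. The names `parse` and `executed` are hypothetical.

```python
executed = []  # records which lines have actually been processed

def parse(line):
    executed.append(line)
    return line.upper()

lines = ["a", "b", "c"]

# "Transformation": building the pipeline processes nothing yet.
pipeline = (parse(line) for line in lines)
ran_before_action = list(executed)

# "Action": consuming the pipeline triggers all the deferred work at once.
result = list(pipeline)
ran_after_action = list(executed)

print(ran_before_action)
print(ran_after_action)
```

All the work piles up until the result is consumed, which is why triggering an action on every batch spreads the load instead of deferring it to the very end.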

In [5]:
finished = False
printed = False

def set_ending_flag(rdd):
    global finished
    if rdd.isEmpty():
        finished = True

def print_only_at_the_end(rdd):
    global printed
    
    if finished and not printed:
        # Type your code for sorting and printing the resulting RDD
        for seg, hll in rdd.collect():
            print('{}\t{}'.format(seg, len(hll)))
        printed = True

# If we have received an empty RDD, the stream is finished.
# So print the result and stop the context.

dstream.foreachRDD(set_ending_flag)

In [6]:
# Type your code for data processing and aggregation here

def seg_map(line):
    (uid, ua) = line.split("\t")
    parsed_ua = user_agent_parser.Parse(ua)
    return [
        ('seg_iphone', uid, parsed_ua['device']['family'], 'iPhone'),
        ('seg_firefox', uid, parsed_ua['user_agent']['family'], 'Firefox'),
        ('seg_windows', uid, parsed_ua['os']['family'], 'Windows')
    ]

def seg_filter(x):
    (_, _, family, name) = x
    return name in family

def seg_update(uids, state):
    # `state` is None the first time a segment is seen
    hll = state if state is not None else HyperLogLog(0.01)
    for uid in uids:
        hll.add(uid)
    return hll

dstream.flatMap(seg_map)\
    .filter(seg_filter)\
    .map(lambda x: (x[0], x[1]))\
    .updateStateByKey(seg_update)\
    .transform(lambda rdd: rdd.sortBy(lambda x: len(x[1]), ascending=False))\
    .foreachRDD(print_only_at_the_end)
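The way `updateStateByKey` carries state from batch to batch can be sketched without Spark. The simulation below is a simplified model under stated assumptions: the helper `simulate_update_state_by_key` is hypothetical, batches are plain lists of `(segment, uid)` pairs, and an exact `set` stands in for the HyperLogLog counter.

```python
def simulate_update_state_by_key(batches, update_func):
    """Apply update_func(new_values, previous_state) per key, batch after batch."""
    state = {}
    for batch in batches:
        # Group the new values of this batch by key.
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # The update runs for keys with new values and for keys that already have state.
        for key in set(state) | set(grouped):
            state[key] = update_func(grouped.get(key, []), state.get(key))
    return state

def update_exact(uids, state):
    # The previous state is None the first time a key is seen.
    seen = set() if state is None else state
    seen.update(uids)
    return seen

batches = [
    [("seg_windows", "u1"), ("seg_firefox", "u1")],
    [("seg_windows", "u2"), ("seg_windows", "u1")],
    [],  # an empty batch leaves the accumulated state unchanged
]

final_state = simulate_update_state_by_key(batches, update_exact)
for seg in sorted(final_state, key=lambda s: len(final_state[s]), reverse=True):
    print("{}\t{}".format(seg, len(final_state[seg])))
```

The update function has the same shape as `seg_update` above: it receives the new values for a key plus the previous state and returns the new state.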

**NB.** Please don't change the cell below. It stops the Spark Streaming context and the Spark context when the stream is finished.

In [7]:
ssc.checkpoint('./checkpoint{}'.format(time.strftime("%Y_%m_%d_%H_%M_%s", time.gmtime())))  # checkpoint for storing current state        
ssc.start()
while not printed:
    time.sleep(0.1)
ssc.stop()  # when the result printed, stop SparkStreaming context
sc.stop()  # stop Spark context to be able to restart the code without restarting the kernel

seg_windows	6069
seg_firefox	925
seg_iphone	290


Here is the output on the sample dataset:
```
seg_windows 24241
seg_firefox 4176
seg_iphone 1361
```
Your numbers may differ, but not by much (an error of about 10% will be accepted).
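A quick way to compare a result against a reference count is to compute the relative error. The helper `within_tolerance` and the estimate values below are hypothetical; only the reference count for `seg_windows` is taken from the sample output above.

```python
def within_tolerance(estimate, reference, tolerance=0.10):
    """Return True if the relative error of the estimate is at most `tolerance`."""
    return abs(estimate - reference) / float(reference) <= tolerance

reference = 24241  # seg_windows in the sample output

close_enough = within_tolerance(23800, reference)  # hypothetical estimate, about 1.8% off
too_far = within_tolerance(20000, reference)       # hypothetical estimate, about 17.5% off

print(close_enough, too_far)
```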

Also, remove trailing empty cells before submission.