## Big Data Analysis

In the data mining part of the project we found different methods to make the topic modelling computation less expensive when the user changes the granularity of the grid. However, since our dataset contains logs of HTTP requests, we have to analyze a large amount of data. In this part of the project we adapt our data mining algorithm so that it runs in parallel on a cluster of computers.

### The algorithm

The *baseline* and *lda update* algorithms are difficult to parallelize because they require a read-write *lda model object* that is kept up to date throughout the cluster. The *top topic aggregation* algorithm is better suited to our purposes: it does not need to keep a shared object up to date, but only needs read access to shared LDA models, which we can distribute to the workers beforehand and read from the file system.

#### Remarks
The *top topic aggregation* algorithm takes as input the *lda model* of the whole grid and the new grid (a partitioning of the documents) whose topics we wish to compute. Since each cell of the new grid is composed of multiple cells of the old grid, the algorithm computes the topic distributions of each old cell and then merges these distributions to obtain the topic distributions of the new cells.
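A minimal pure-Python sketch of the merge step, under stated assumptions: each document's topic distribution is a list of `(topic_id, probability)` pairs, and merging is a document-count-weighted average that keeps the `k` heaviest topics. The real *top_topic_merge* is defined in the data mining part and may differ; `cell_summary`, `merge_summaries` and `top_topics` are hypothetical names used only here.

```python
from collections import Counter

def cell_summary(doc_distributions):
    """Summarise one old cell as (number of documents, summed weight per topic).

    doc_distributions: one list of (topic_id, probability) pairs per document.
    """
    totals = Counter()
    for dist in doc_distributions:
        for topic_id, prob in dist:
            totals[topic_id] += prob
    return len(doc_distributions), totals

def merge_summaries(a, b):
    """Combine two summaries; the operation is associative and commutative.

    Counter addition drops non-positive totals, which is harmless here
    because probabilities are positive.
    """
    return a[0] + b[0], a[1] + b[1]

def top_topics(summary, k=3):
    """Average weight per document for each topic, keeping the k heaviest topics."""
    n_docs, totals = summary
    averaged = [(topic_id, weight / n_docs) for topic_id, weight in totals.items()]
    return sorted(averaged, key=lambda pair: pair[1], reverse=True)[:k]

# Two old cells that together form one new cell (toy numbers)
old_cell_1 = cell_summary([[(0, 0.7), (1, 0.3)], [(0, 0.5), (1, 0.5)]])
old_cell_2 = cell_summary([[(1, 0.9), (2, 0.1)]])
new_cell = top_topics(merge_summaries(old_cell_1, old_cell_2), k=2)
print(new_cell)  # topic 1 first (1.7 / 3), then topic 0 (1.2 / 3)
```

Keeping the summed weights and the document count, rather than already-truncated top-k lists, makes the merge associative, which is what a reducer needs when partial results are combined in arbitrary order.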

### Tools and Big Data adaptation
We use *Apache Spark* with *Python*, so that we can reuse the modules written for the data mining part. *Apache Spark* is a framework that provides APIs for big data processing, stream processing and cluster computing. It supports in-memory computing and several distributed file systems, including the *Apache Hadoop Distributed File System (HDFS™)*.

To adapt the *top topic aggregation* algorithm we use the *MapReduce* programming model. This requires pre-processing the input to represent the old grid as a list of (key, value) tuples, where the key is the index of a cell of the old grid and the value is the set of documents it contains. Each key is then replaced with the ID of the new-grid cell that contains that old cell.
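For a regular grid, this re-keying can be sketched as follows, assuming (hypothetically) that cells are identified by `(row, col)` and that the new grid is coarser than the old one by an integer `factor`, so old cell `(r, c)` belongs to new cell `(r // factor, c // factor)`. `coarsen` and `to_key_value_pairs` are illustrative names, not part of the project code.

```python
def coarsen(old_cell, factor):
    """Return the (row, col) of the coarser cell that contains old_cell."""
    row, col = old_cell
    return row // factor, col // factor

def to_key_value_pairs(old_grid, factor):
    """Turn {old_cell: documents} into [(new_cell, documents), ...] for MapReduce."""
    return [(coarsen(cell, factor), docs) for cell, docs in old_grid.items()]

old_grid = {(0, 0): ["d1", "d2"], (0, 1): ["d3"], (1, 0): ["d4"], (2, 3): ["d5"]}
pairs = to_key_value_pairs(old_grid, factor=2)
print(pairs)
# [((0, 0), ['d1', 'd2']), ((0, 0), ['d3']), ((0, 0), ['d4']), ((1, 1), ['d5'])]
```

Several tuples now share the key `(0, 0)`: these are exactly the old cells whose distributions the reduce step has to merge.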

The list of (new cell ID, documents) tuples is the input of the MapReduce algorithm. The map step turns each tuple into a new tuple (key, topic_distributions(documents)), computing the topic distributions of each subset of documents with the precomputed LDA model of the whole grid. The reduce step then aggregates the tuples with the same key using *top_topic_merge*, yielding one topic distribution for each cell of the new grid. Refer to the data mining part of the project for an explanation of *top_topic_merge*.

![mapReduce](img/mapreduce.png "MapReduce Algorithm")
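The data flow of the map and reduce steps can be imitated locally without Spark. In this sketch `map_reduce` is a hypothetical helper, `len` stands in for *topic_distributions* and `operator.add` for *top_topic_merge*; in PySpark the same shape is `rdd.mapValues(f).reduceByKey(g)`.

```python
import operator
from collections import defaultdict
from functools import reduce

def map_reduce(pairs, map_value, reduce_values):
    """Local imitation of rdd.mapValues(map_value).reduceByKey(reduce_values)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(map_value(value))  # map: one result per old cell
    # shuffle + reduce: fold together all results that share a new-cell key
    return {key: reduce(reduce_values, values) for key, values in grouped.items()}

pairs = [((0, 0), ["d1", "d2"]), ((0, 0), ["d3"]), ((0, 0), ["d4"]), ((1, 1), ["d5"])]
print(map_reduce(pairs, len, operator.add))  # {(0, 0): 4, (1, 1): 1}
```

Spark's `reduceByKey` also merges values locally on each partition before the shuffle, so the merge function must be associative and commutative for the result not to depend on how the data is partitioned.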

### Results

The algorithm has linear complexity, O(n) where n is the number of documents; its running time differs by a factor that depends on the number of machines, processors and cores used. The chart below shows the results obtained by running the algorithm on a single machine and on a cluster of 4 worker machines, each with 4 cores and a 2.4 GHz Intel Xeon processor.

The best results were obtained with the cluster, which computed the topics for 20k documents in 20 seconds, for 100k documents in 70 seconds and for 200k documents in 153 seconds.

![result](img/results.png "Results")
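The cluster timings reported above can be turned into throughput figures with a few lines of arithmetic; the only inputs are those three measurements.

```python
# (documents, seconds) measured on the 4-worker cluster, as reported above
runs = [(20_000, 20), (100_000, 70), (200_000, 153)]

throughput = [docs / secs for docs, secs in runs]         # documents per second
ms_per_doc = [1000 * secs / docs for docs, secs in runs]  # milliseconds per document

for (docs, _), tput, cost in zip(runs, throughput, ms_per_doc):
    print(f"{docs:>7,} docs: {tput:,.0f} docs/s, {cost:.3f} ms/doc")
```

This gives about 1,000, 1,429 and 1,307 documents per second (1.000, 0.700 and 0.765 ms per document), so the per-document cost stays in the same range while the input grows tenfold.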

## Dealing with Streaming of Data

The problem we are studying involves a continuous flow of data, so if we want to compute the topics on the fly, we need a streaming algorithm. Spark also provides a stream processing API that lets us easily adapt the MapReduce algorithm to a continuous stream of data.

The program *tweet_read.py* starts a stream of tweets, which it sends to the Spark program through a socket.
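*tweet_read.py* itself is not shown here. The following is a minimal stdlib sketch of a sending side, assuming the tweets are already available as Python dicts: `socketTextStream` connects to the given host and port as a client and splits the incoming text into lines, so the sender has to listen on that port and write one tweet per line. `send_tweets` and `serve_once` are hypothetical names.

```python
import json
import socket

def send_tweets(conn, tweets):
    """Write each tweet as one line of JSON; json.dumps escapes newlines in the text."""
    for tweet in tweets:
        conn.sendall((json.dumps(tweet) + "\n").encode("utf-8"))

def serve_once(tweets, host="localhost", port=5727):
    """Wait for the Spark receiver to connect, then stream the tweets to it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _ = server.accept()  # blocks until socketTextStream connects
        with conn:
            send_tweets(conn, tweets)
```

Because `accept()` blocks until Spark connects, a sender like this runs in its own process, started before the streaming context.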

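```python
# Spark code receiving the streaming data (sc is an existing SparkContext)
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 30)  # batch interval: 30 seconds
lines = ssc.socketTextStream("localhost", 5727)  # connect to the tweet sender
```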

The variable *lines* is a *discretized stream (DStream)*, a high-level abstraction that hides the complexity of dealing with a continuous data stream and makes it as easy for programmers as working with one RDD at a time. [1]
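The idea of a DStream as a sequence of small batches can be sketched without Spark. This toy version (hypothetical `discretize` and `run_stream`) groups `(timestamp, key, value)` events into fixed intervals, reduces each batch by key, and folds the batch result into a running state, similar to how the notebook merges each batch into the grid. Unlike Spark Streaming, which assigns records to batches by arrival time and also produces empty batches, it batches by an explicit timestamp and skips empty intervals.

```python
import math
import operator
from collections import defaultdict

def discretize(events, batch_interval):
    """Group (timestamp_seconds, key, value) events into consecutive batches."""
    batches = defaultdict(list)
    for ts, key, value in events:
        batches[math.floor(ts / batch_interval)].append((key, value))
    return [batches[i] for i in sorted(batches)]

def run_stream(events, batch_interval, merge, state):
    """Reduce every batch by key, then merge the batch result into the state."""
    for batch in discretize(events, batch_interval):
        per_key = {}
        for key, value in batch:  # like reduceByKey on one batch RDD
            per_key[key] = merge(per_key[key], value) if key in per_key else value
        for key, value in per_key.items():  # like final_aggregations
            state[key] = merge(state[key], value) if key in state else value
    return state

# (seconds since start, cell id, tweet count) with a 30-second batch interval
events = [(1, 0, 1), (12, 0, 1), (31, 1, 1), (45, 0, 1), (61, 1, 1)]
print(discretize(events, 30))  # [[(0, 1), (0, 1)], [(1, 1), (0, 1)], [(1, 1)]]
print(run_stream(events, 30, operator.add, {}))  # {0: 3, 1: 2}
```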

![streaming](img/streaming.png "Streaming Workflow")



[1] https://www.gitbook.com/book/jaceklaskowski/spark-streaming/details






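```python
# standard modules
import time
import json
import pickle
import pprint

# spark modules
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# custom modules
from lib.lda import LDAHelper

# third party modules
import shapely
from shapely import geometry
import preprocessor as twpr  # tweet-preprocessor package

# init spark: stop a context left over from a previous run of the notebook
try:
    sc.stop()
except NameError:
    pass

# 'local[2]' runs Spark locally with 2 threads; streaming needs at least 2,
# one for the socket receiver and one to process the batches
sc = SparkContext('local[2]', appName='test')

print("Spark context created")
```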

```python
# LDA helper from the project's lib.lda module
lda_helper = LDAHelper(15)

# Load the dictionary used to turn text into bag-of-words corpora
with open('data/dictionary.pkl', 'rb') as f:
    dictionary = pickle.load(f)

# Input files: the new grid (split into two pickles) and the precomputed LDA map
new_grid_filename1 = "data/input_1_new_grid_part1.pkl"
new_grid_filename2 = "data/input_1_new_grid_part2.pkl"
lda_map_filename = "data/input_2_lda_map.pkl"

with open(new_grid_filename1, 'rb') as f:
    new_grid = pickle.load(f)

with open(new_grid_filename2, 'rb') as f:
    new_grid += pickle.load(f)

with open(lda_map_filename, 'rb') as f:
    lda_map = pickle.load(f)

print("Data files loaded")
```

## Compute Topics with Top Topic Aggregation in Spark

```python
# Build the (new_cell_id, corpus) pairs: one pair for every part of each new cell
spark_grid = []
for cell_id, cell in enumerate(new_grid):
    for part in cell['parts']:
        spark_grid.append((cell_id, part['corpus']))

# Ship the read-only LDA map to every worker once, instead of with every task
lda_map_broadcast = sc.broadcast(lda_map)

start_time = time.time()

# Convert the list into an RDD with 4 partitions, then run map + reduceByKey
rdd_grid = sc.parallelize(spark_grid, 4)
topics = rdd_grid.map(lambda scell: (scell[0], lda_helper.calculate_topic_distributions(lda_map_broadcast.value, scell[1])))
topics = topics.reduceByKey(lambda a, b: lda_helper.merge_toptopic(a, b))
# collect() triggers the computation; indexing the result by cell id means
# cells without parts (absent from the result) cannot shift the other cells
spark_result = dict(topics.collect())

print("--- %s seconds ---" % (time.time() - start_time))

# Store the topics in the corresponding cells of new_grid (None if a cell had no parts)
for i, cell in enumerate(new_grid):
    cell['topic'] = spark_result.get(i)
```

## Spark Streaming

```python
def find_cell(shape_tweet):
    """
    Return the index of the new-grid cell whose polygon contains the tweet, or -1
    """
    for i, cell in enumerate(new_grid):
        if cell["polygon"].contains(shape_tweet):
            return i
    return -1

def tweet_shape(tweet):
    """
    Return the tweet location as a shapely geometry: the exact coordinates if present,
    otherwise the bounding box of the tagged place; None if neither is available
    """
    coordinates = tweet.get('coordinates')
    if not coordinates:
        place = tweet.get('place') or {}
        coordinates = place.get('bounding_box')
    if not coordinates:
        return None
    return geometry.shape(coordinates)

def filter_tweet(tweet_raw):
    """
    Keep only well-formed tweets with geo-coordinates that fall inside our map
    """
    try:
        tweet = json.loads(tweet_raw)
    except ValueError:
        return False
    shape_tweet = tweet_shape(tweet)
    return shape_tweet is not None and find_cell(shape_tweet) != -1

def evaluate_tweet(tweet_raw):
    """
    Convert the text of a tweet into a bag of words and compute its topic distribution
    """
    tweet = json.loads(tweet_raw)
    text = lda_helper.clean(twpr.clean(tweet['text']))
    tweet_corpus = lda_helper.create_corpus([text], dictionary)
    topic_dist = lda_helper.calculate_topic_distributions(lda_map_broadcast.value, tweet_corpus)
    return (find_cell(tweet_shape(tweet)), topic_dist)

def final_aggregations(grid, topics):
    """
    Merge the top topic distributions of the current batch
    into the topic distributions already stored in the grid
    """
    for key, value in topics:
        current = grid[key].get('topic')
        grid[key]['topic'] = value if current is None else lda_helper.merge_toptopic(current, value)

def evaluate_rdd(topics):
    """
    Collect the batch result on the driver and merge it into the grid
    """
    if topics.isEmpty():
        print('empty batch')
    else:
        final_aggregations(new_grid, topics.collect())

ssc = StreamingContext(sc, 30)  # 30 is the batch interval in seconds

ip = "localhost"
port = 5727

lines = ssc.socketTextStream(ip, port)
tweets = lines.filter(filter_tweet)
tweets = tweets.map(evaluate_tweet)
topics = tweets.reduceByKey(lambda a, b: lda_helper.merge_toptopic(a, b))
# No sorting is needed: final_aggregations addresses the cells by key, not by position

topics.foreachRDD(evaluate_rdd)

ssc.start()
ssc.awaitTermination()
```