# Spark

[Spark](https://spark.apache.org/docs/1.6.2/index.html) (we are going to use version 1.6.2) is a fast and general-purpose cluster computing system that lets you process large datasets on several machines. In the era of *big data*, being able to distribute computation and process data in parallel has become a necessity. We introduce here the basics of the framework.

You should also have a look at the [official tutorial](https://spark.apache.org/docs/1.6.2/programming-guide.html).

## Hadoop Distributed File System (HDFS)

The main abstraction Spark offers is the *Resilient Distributed Dataset* (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. The idea is that you typically **do not** load the whole dataset into memory, but rather process it *lazily*. RDDs are usually created from files stored in the *Hadoop Distributed File System* (HDFS).

Let us first see what there is in HDFS.

In [1]:
!hdfs dfs -ls

Found 1 items
drwxr-xr-x   - paluchow hadoop          0 2017-02-22 09:41 .sparkStaging


*Note:* a line starting with `!` is a shell command. You can therefore run the same command (without the `!`) in a bash terminal.

We can now add a dataset to HDFS. The `data` folder contains a text file with 30000 tweets collected on [Inauguration Day](https://en.wikipedia.org/wiki/United_States_presidential_inauguration), January 20, 2017.

In [2]:
!hdfs dfs -put data/election-day-tweets.txt

Then check that the file is indeed in HDFS.

In [3]:
!hdfs dfs -ls

Found 2 items
drwxr-xr-x   - paluchow hadoop          0 2017-02-22 09:41 .sparkStaging
-rw-r--r--   3 paluchow hadoop    2147813 2017-02-25 20:34 election-day-tweets.txt


*Note:* you can remove files from HDFS using `hdfs dfs -rm FILE`. 

## Dataset processing

We can now start playing with the tweets. We will instantiate the dataset from the text file.

In [4]:
tweets = sc.textFile('election-day-tweets.txt')

The `tweets` variable is now an RDD. An RDD is *resilient* to faults because it keeps track of the sequence of operations used to build it (its *lineage*), so that lost data can be recomputed if a problem occurs. It is *distributed* because it is split into partitions that can be processed on several machines.
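To make lineage and laziness concrete, here is a toy, single-machine stand-in for an RDD written in plain Python. `ToyRDD` is a hypothetical class for illustration only, not part of PySpark: its transformations merely record a step, and its actions replay all recorded steps from the source. Real Spark additionally splits the data into partitions and only recomputes the partitions that were lost.

```python
class ToyRDD:
    """Toy, single-machine stand-in for an RDD (hypothetical, NOT the Spark API).

    It keeps a reference to its source data plus the chain of
    transformations applied to it (its lineage). Nothing is computed
    until an action such as collect() or count() is called.
    """

    def __init__(self, source, lineage=()):
        self.source = source           # original input, e.g. lines of a file
        self.lineage = tuple(lineage)  # recorded (name, function) steps

    def _with(self, step):
        # A transformation returns a NEW ToyRDD with one more recorded step;
        # the parent is left untouched.
        return ToyRDD(self.source, self.lineage + (step,))

    def map(self, f):
        return self._with(("map", f))

    def filter(self, f):
        return self._with(("filter", f))

    def flatMap(self, f):
        return self._with(("flatMap", f))

    def collect(self):
        # Action: rebuild the data by replaying the whole lineage from the source.
        data = list(self.source)
        for name, f in self.lineage:
            if name == "map":
                data = [f(x) for x in data]
            elif name == "filter":
                data = [x for x in data if f(x)]
            elif name == "flatMap":
                data = [y for x in data for y in f(x)]
        return data

    def count(self):
        return len(self.collect())


lines = ToyRDD(["a b", "c", "d e f"])
words = lines.flatMap(lambda line: line.split())  # nothing computed yet
print([name for name, _ in words.lineage])  # ['flatMap']
print(words.count())  # 6
```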

The `sc` object is a [Spark Context](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext) object, which has been configured to be available in your Python environment. It is the main entry point to Spark and offers numerous useful methods. Check the documentation!

Let us display a few tweets.

In [5]:
some_tweets = tweets.take(10)
for i, tweet in enumerate(some_tweets):
    print("%d: %s" % (i, tweet))

0: @Lawrence @HillaryClinton Two first  @SenSchumer tomorrow. @TheLastWord #brooklyn  TheRealAmerica #Vote #Democrats #NastyWomenVote #Senate
1: My @latimesopinion op-ed on historic #California #Senate race. First time an elected woman senator succeeds another.
2: https://t.co/cbjQTK0Q1V
3: #Senate Wisconsin Senate Preview: Johnson vs. Feingold
4: If Rubio Wins and #Trump Loses in #Florida... #HillaryClinton #Senate #RepublicanPrimary #Senaterace #Miami... https://t.co/zIeNEcVnMO
5: #Senate Wisconsin Senate Preview: Johnson vs. Feingold
6: bob day is an honest  person   #senate patterson . a loss to the senate
7: Make Republicans #PayAPrice!
8:  💙🇺🇸#VoteBLUE🔃theBallot🇺🇸💙
9:  #Congress #Senate #FlipItDem


Once created, the `tweets` dataset can be acted on by dataset **operations**. RDDs support two types of operations: *transformations*, which create a new RDD from an existing one, and *actions*, which return a value to the driver program after running a computation on the RDD. For instance, we can `count` the number of tweets. Is it a transformation or an action?

In [6]:
tweets.count()

30000

We can also `filter` the lines that contain a specific string. Is it a transformation or an action?

*Note:* `lower()` is a `str` method that converts the string to lower case. What happens if you do not use it?

In [7]:
lines_with_trump = tweets.filter(lambda line: "trump" in line.lower())
lines_with_trump.count()

1023
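To see why `lower()` matters, here is the same test in plain Python on two tweets from the sample printed above (the first one shortened). Without lower-casing, the hashtag `#Trump` is missed because `in` is case-sensitive. The function names are made up for illustration.

```python
# Two tweets from the sample printed earlier (the first one shortened).
sample = [
    "If Rubio Wins and #Trump Loses in #Florida...",
    "Make Republicans #PayAPrice!",
]

def mentions_trump_case_sensitive(line):
    # Plain substring test: "Trump" does not match "trump".
    return "trump" in line

def mentions_trump(line):
    # Lower-casing first makes the match case-insensitive.
    return "trump" in line.lower()

print([mentions_trump_case_sensitive(t) for t in sample])  # [False, False]
print([mentions_trump(t) for t in sample])                 # [True, False]
```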

A typical processing pattern is [MapReduce](https://en.wikipedia.org/wiki/MapReduce), as popularized by Hadoop. The idea is to first apply a function to every element of the dataset, typically producing key-value pairs (**map**), and then to combine the values that share the same key (**reduce**). The classic example is a word count: how many times does each word occur in our tweets?
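Before looking at the Spark version, the pattern can be sketched in plain Python with an explicit *shuffle* step that groups the values by key between the map and reduce phases. The names `map_phase`, `shuffle_phase` and `reduce_phase` are made up for illustration; in Spark, `reduceByKey` takes care of both grouping and reducing.

```python
from collections import defaultdict
from functools import reduce

def map_phase(line):
    # Map: emit a (key, value) pair for every word in the line.
    return [(word, 1) for word in line.split()]

def shuffle_phase(pairs):
    # Shuffle: gather all values that share the same key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: combine the values of each key into a single result.
    return {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}

lines = ["to be or not to be", "to do"]
pairs = [pair for line in lines for pair in map_phase(line)]
counts = reduce_phase(shuffle_phase(pairs))
print(counts["to"], counts["be"], counts["do"])  # 3 2 1
```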

In [8]:
# Split every line into words and flatten the result into a single RDD of words
words = tweets.flatMap(lambda line: line.split())
words.take(15)

['@Lawrence',
 '@HillaryClinton',
 'Two',
 'first',
 '@SenSchumer',
 'tomorrow.',
 '@TheLastWord',
 '#brooklyn',
 'TheRealAmerica',
 '#Vote',
 '#Democrats',
 '#NastyWomenVote',
 '#Senate',
 'My',
 '@latimesopinion']

In [9]:
# Assign 1 to each word
counts = words.map(lambda word: (word, 1))
counts.take(10)

[('@Lawrence', 1),
 ('@HillaryClinton', 1),
 ('Two', 1),
 ('first', 1),
 ('@SenSchumer', 1),
 ('tomorrow.', 1),
 ('@TheLastWord', 1),
 ('#brooklyn', 1),
 ('TheRealAmerica', 1),
 ('#Vote', 1)]

In [10]:
# Combine the counts by adding up the values (1) of identical keys (words)
word_counts = counts.reduceByKey(lambda a, b: a + b)
word_counts.take(10)

[('https://t.co/3Eq1y14Gjr', 1),
 ('Gujarati', 2),
 ('@ptshrikant', 3),
 ('officials…', 1),
 ('@YourDreaDose', 1),
 ('#CLAD2016', 1),
 ('.@gastropoda', 1),
 ('Just', 130),
 ('BSP', 37),
 ('lagayye', 1)]
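`reduceByKey` does not ship every `(word, 1)` pair across the network: according to the PySpark documentation, it first merges the values locally on each partition (like a MapReduce *combiner*) and then merges these partial results. The plain-Python sketch below imitates this on three hypothetical partitions; it is a simulation of the idea, not Spark code.

```python
from collections import Counter

def combine_partition(words, func):
    # Local step, done independently on each partition: merge the
    # (word, 1) values before anything would be sent over the network.
    local = {}
    for word in words:
        local[word] = func(local[word], 1) if word in local else 1
    return local

def merge_partials(partials, func):
    # Global step: merge the per-partition results key by key with `func`.
    merged = {}
    for partial in partials:
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

add = lambda a, b: a + b
partitions = [["a", "b", "a"], ["b", "c"], ["a"]]  # hypothetical partitions
partials = [combine_partition(p, add) for p in partitions]
result = merge_partials(partials, add)

# Same answer as counting everything in one place.
assert result == Counter(w for p in partitions for w in p)
print(sorted(result.items()))  # [('a', 3), ('b', 2), ('c', 1)]
```

Because the partial results may be merged in any grouping and order, the function passed to `reduceByKey` should be associative and commutative. Addition qualifies; `lambda a, b: a - b` does not, and would give results that depend on how the data happens to be partitioned.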

You should read the list of common [transformations](https://spark.apache.org/docs/1.6.2/programming-guide.html#transformations) and [actions](https://spark.apache.org/docs/1.6.2/programming-guide.html#actions) to get an idea of what you can do.

## Dataset statistics

Your turn! Try to extract some statistics from the dataset:

1. Total number of words
2. Average number of words per tweet
3. Ten most frequent words
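Before running on the full RDD, it can help to compute the same three statistics by hand on a handful of tweets, so you know what to expect. The sketch below does this in plain Python on four tweets copied from the sample printed earlier; it only uses the standard library.

```python
from collections import Counter

# Four tweets copied from the sample printed earlier.
sample = [
    "#Senate Wisconsin Senate Preview: Johnson vs. Feingold",
    "#Senate Wisconsin Senate Preview: Johnson vs. Feingold",
    "Make Republicans #PayAPrice!",
    " #Congress #Senate #FlipItDem",
]

words_per_tweet = [len(t.split()) for t in sample]  # [7, 7, 3, 3]
total_words = sum(words_per_tweet)                  # 1. total number of words
average = total_words / len(sample)                 # 2. average words per tweet
top = Counter(w for t in sample for w in t.split()).most_common(1)  # 3. most frequent
print(total_words, average, top)  # 20 5.0 [('#Senate', 3)]
```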

In [13]:
# Your code goes here
# Total number of words: `words` already holds one element per word
words.count()

In [20]:
# Number of words in each tweet
words_tweets = tweets.map(lambda line: len(line.split()))
words_total = words_tweets.reduce(lambda a, b: a + b)
# Average number of words per tweet
words_total / words_tweets.count()

10.0294

In [22]:
# Ten most frequent words: negating the count sorts in decreasing order
word_counts.takeOrdered(10, key=lambda x: -x[1])

[('Congress', 8526),
 ('the', 6494),
 ('to', 5829),
 ('of', 4654),
 ('for', 3964),
 ('in', 3794),
 ('congress', 3675),
 ('is', 3478),
 ('and', 3295),
 ('a', 2985)]
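Two things stand out in this list: `Congress` and `congress` are counted as different words, and most of the top entries are stop words such as `the` or `to`. A minimal sketch of one possible cleanup in plain Python follows; the `normalize` helper, the `top_words` function and the small `STOP_WORDS` set are assumptions chosen for illustration, not a standard list.

```python
import string
from collections import Counter

# A tiny, hand-picked stop-word set (an assumption for illustration;
# real projects usually rely on a fuller list).
STOP_WORDS = {"the", "to", "of", "for", "in", "is", "and", "a"}

def normalize(word):
    # Lower-case and strip surrounding punctuation, so that "Congress",
    # "congress" and "CONGRESS!" all map to the same key.
    return word.lower().strip(string.punctuation)

def top_words(lines, n):
    words = (normalize(w) for line in lines for w in line.split())
    # Drop empty strings (pure punctuation) and stop words before counting.
    return Counter(w for w in words if w and w not in STOP_WORDS).most_common(n)

lines = ["Congress is back.", "The congress and the Senate", "CONGRESS!"]
print(top_words(lines, 1))  # [('congress', 3)]
```

The same functions can be used in Spark, for instance `words.map(normalize).filter(lambda w: w and w not in STOP_WORDS)` before the `map` and `reduceByKey` steps. Note that stripping punctuation also removes a leading `#` or `@`, so `#Senate` and `Senate` would be merged; keep those characters if hashtags and mentions should stay separate.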

## Bigrams

A bigram is a combination of two consecutive words.

- Extract the ten most frequent bigrams from the dataset. 

Be careful not to consider the last word of a tweet and the first one of the next tweet!

*Hint:* note that the `map` and `flatMap` methods take a **function** as argument, meaning that you can define a function that you pass as argument:

```python
def process(line):
    # Do something
    return something

x = tweets.flatMap(process)
```

In [42]:
# Your code goes here
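def bigram(line):
    # Return the bigrams of a single tweet as "word1_word2" strings.
    # Working on one line at a time guarantees that no bigram spans two tweets.
    # (The debugging print was removed: inside flatMap it would print every tweet.)
    words = line.split()
    bigrams = []
    for i in range(len(words) - 1):
        bigrams.append(words[i] + '_' + words[i + 1])
    return bigrams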

In [43]:
x = tweets.flatMap(bigram)

In [45]:
counts_bigrams = x.map(lambda word: (word, 1))
bigram_counts = counts_bigrams.reduceByKey(lambda a, b: a + b)

In [46]:
bigram_counts.takeOrdered(10, key=lambda x: -x[1])

[('of_Congress', 677),
 ('of_the', 542),
 ('Congress_is', 518),
 ('in_the', 429),
 ('in_Congress', 410),
 ('for_Congress', 366),
 ('vote_for', 357),
 ('up_for', 344),
 ('can_do', 305),
 ('for_grabs', 293)]
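A more compact way to write the `bigram` function pairs each word with its successor using `zip`. The sketch below, with the hypothetical name `bigrams`, returns tuples instead of underscore-joined strings: tuples are hashable, so they can serve directly as keys for `reduceByKey`, and they stay unambiguous even when a word itself contains `_` (as in the handle `@robert_pohlman` seen earlier).

```python
def bigrams(line):
    # Pair each word with the next one *within the same line*; since the
    # function sees one tweet at a time, no pair can span two tweets.
    words = line.split()
    return list(zip(words, words[1:]))

print(bigrams("vote for Congress"))  # [('vote', 'for'), ('for', 'Congress')]
print(bigrams("Hi"))                 # []

# Flattening the per-line results, as flatMap does, never pairs "b" with "c".
lines = ["a b", "c d"]
flat = [pair for line in lines for pair in bigrams(line)]
print(flat)  # [('a', 'b'), ('c', 'd')]
```

In Spark, this function would plug into the same pipeline: `tweets.flatMap(bigrams).map(lambda b: (b, 1)).reduceByKey(lambda a, b: a + b)`.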