# WordCount, now with more data

In this section we'll use a slightly larger dataset.

*Initiate the Spark context. Do this ONLY the first time for each notebook. If you run into problems with the cell below, see [Help](/notebooks/spark_course/1-Course-Information-and-Links/If-you-get-problems-initiating-spark-context.ipynb)*

In [1]:
import os
from pyspark import SparkContext
sc = SparkContext(appName="search", master=os.environ['MASTER'])

Let's see what data we have to play with in HDFS (note: the '%%sh' gives you the shell environment):

In [2]:
%%sh
hdfs dfs -ls /

Found 2 items
drwxr-xr-x   - ubuntu supergroup          0 2015-04-18 06:11 /user
drwxr-xr-x   - ubuntu supergroup          0 2015-04-18 12:22 /uuData


In [3]:
%%sh
hdfs dfs -ls /uuData

Found 9 items
-rw-r--r--   1 ubuntu supergroup        534 2015-04-18 12:22 /uuData/README.md
-rw-r--r--   1 ubuntu supergroup     174449 2015-04-18 06:26 /uuData/access_log
-rw-r--r--   1 ubuntu supergroup      14989 2015-04-18 06:26 /uuData/error_log
-rw-r--r--   1 ubuntu supergroup     197105 2015-04-18 06:26 /uuData/lr_data.txt
drwxr-xr-x   - ubuntu supergroup          0 2015-04-18 12:14 /uuData/movies
-rw-r--r--   1 ubuntu supergroup    3004200 2015-04-18 06:26 /uuData/names
drwxr-xr-x   - ubuntu supergroup          0 2015-04-18 06:26 /uuData/pagecounts
-rw-r--r--   1 ubuntu supergroup         73 2015-04-18 06:26 /uuData/people.json
-rw-r--r--   1 ubuntu supergroup         32 2015-04-18 06:26 /uuData/people.txt


Now read the data into `pagecounts`:

In [4]:
dataFile = "/uuData/pagecounts"
pagecounts = sc.textFile(dataFile)

In [5]:
pagecounts

/uuData/pagecounts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

Let's take a peek at the data. You can use the take operation of an RDD to get the first K records. Here, K = 10.

In [6]:
pagecounts.take(10) 

[u'20090505-000000 aa Main_Page 2 9980',
 u'20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465',
 u'20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086',
 u'20090505-000000 af.b Tuisblad 1 36236',
 u'20090505-000000 af.d Tuisblad 4 189738',
 u'20090505-000000 af.q Tuisblad 2 56143',
 u'20090505-000000 af Afrika 1 46833',
 u'20090505-000000 af Afrikaans 2 53577',
 u'20090505-000000 af Australi%C3%AB 1 132432',
 u'20090505-000000 af Barack_Obama 1 23368']

Unfortunately this is not very readable because take() returns a list and Python simply prints the whole list with each element separated by a comma. We can make it prettier by iterating over the list and printing each record on its own line.

In [7]:
for x in pagecounts.take(10):
    print(x)

20090505-000000 aa Main_Page 2 9980
20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465
20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086
20090505-000000 af.b Tuisblad 1 36236
20090505-000000 af.d Tuisblad 4 189738
20090505-000000 af.q Tuisblad 2 56143
20090505-000000 af Afrika 1 46833
20090505-000000 af Afrikaans 2 53577
20090505-000000 af Australi%C3%AB 1 132432
20090505-000000 af Barack_Obama 1 23368
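
The records above appear to be five space-separated fields. As a minimal sketch in plain Python (no Spark needed), one record can be split into named fields. The field names, and the reading of the last field as a size, are assumptions made for illustration; only the field positions come from the data.

```python
from collections import namedtuple

# Field names are assumed for illustration; only the positions come from the data.
PageCount = namedtuple("PageCount", ["timestamp", "project", "page", "views", "size"])

def parse_record(line):
    """Split one space-delimited record into named fields."""
    timestamp, project, page, views, size = line.split(" ")
    return PageCount(timestamp, project, page, int(views), int(size))

record = parse_record("20090505-000000 af Barack_Obama 1 23368")
print(record.project)  # af
print(record.views)    # 1
```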


2. Let's see how many records in total are in this data set (this command will take a while, so read ahead while it is running).

In [8]:
pagecounts.count()

1398882

   This should launch tasks on your Spark cluster.
   If you look closely at the terminal, the console log is pretty chatty and tells you the progress of the tasks.

   While it's running, you can open the Spark web console to see the progress.
   To do this, open your favorite browser, and type in the following URL.

   `http://<the public IP you're using right now>:4040`

   Note that this page is only available if you have an active job or Spark shell.  
   You should see the Spark application status web interface, similar to the following:

   ![Spark Application Status Web UI](images/stages.jpg)

   The links in this interface allow you to track the job's progress and
   various metrics about its execution, including task durations and cache
   statistics.

   In addition, the Spark Standalone cluster status web interface displays
   information that pertains to the entire Spark cluster.  To view this UI,
   browse to

   `http://<the public IP you're using right now>:8080`

   You should see a page similar to the following:

   ![Spark Cluster Status Web UI](images/sparkmaster.jpg)

   When your query finishes running, it should return the following count:

1398882

4. Recall from the description of the data set format that the second field is the "project code", which contains information about the language of the pages.
   For example, the project code "en" indicates an English page.
   Let's derive an RDD containing only English pages from `pagecounts`.
   This can be done by applying a filter function to `pagecounts`.
   For each record, we can split it by the field delimiter (i.e. a space), get the second field, and then compare it with the string "en".

   To avoid reading from disk each time we perform an operation on the RDD, we also __cache the RDD in memory__.
   This is where Spark really starts to shine.

In [9]:
enPages = pagecounts.filter(lambda x: x.split(" ")[1] == "en").cache()

When you run this cell, Spark defines the RDD, but because of lazy evaluation, no computation is done yet.
   Next time any action is invoked on `enPages`, Spark will cache the data set in memory across the workers in your cluster.
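
The filter predicate can be tried out on a few lines in plain Python before it is handed to Spark. This is only a sketch: `is_english` is a hypothetical helper name, the two "en" records are made up, and the length guard is an addition that the lambda above does not have.

```python
def is_english(line):
    """Return True when the record's second field is exactly 'en'."""
    fields = line.split(" ")
    # Guard against short or empty lines, which would otherwise raise IndexError.
    return len(fields) > 1 and fields[1] == "en"

sample = [
    "20090505-000000 af Afrikaans 2 53577",
    "20090505-000000 en Main_Page 5 1000",   # hypothetical English record
    "20090505-000000 en.d Main_Page 1 200",  # hypothetical record: 'en.d' is not 'en'
    "",
]
english = [line for line in sample if is_english(line)]
print(english)  # ['20090505-000000 en Main_Page 5 1000']
```

Since `filter` accepts any function, `pagecounts.filter(is_english).cache()` should behave like the lambda version, apart from skipping malformed lines instead of failing on them.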

5. How many records are there for English pages?

In [10]:
enPages.count()

970545

   The first time this command is run, it will take 2 to 3 minutes, similar to the last count we did, while Spark scans through the entire data set on disk.
   __But since enPages was marked as "cached" in the previous step, if you run count on the same RDD again, it should return an order of magnitude faster__.

   If you examine the console log closely, you will see lines like this, indicating some data was added to the cache:

13/02/05 20:29:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_172 in memory on ip-10-188-18-127.ec2.internal:42068 (size: 271.8 MB, free: 5.5 GB)
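
One way to observe the effect of caching is to time the same action twice. The sketch below assumes only that the object has a `count()` method; `timed_count` and `FakeRDD` are hypothetical names, and `FakeRDD` is a stand-in so the example runs without a cluster.

```python
import time

def timed_count(rdd):
    """Run count() on an RDD-like object and return (count, seconds elapsed)."""
    start = time.time()
    n = rdd.count()
    return n, time.time() - start

class FakeRDD(object):
    """Stand-in so the sketch runs without a cluster; not part of Spark."""
    def __init__(self, items):
        self.items = items

    def count(self):
        return len(self.items)

n, seconds = timed_count(FakeRDD(["a", "b", "c"]))
print(n)  # 3
```

In the notebook, calling `timed_count(enPages)` twice and comparing the elapsed times should show the second, cached run finishing much sooner.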

6. Let's try something fancier.
   Generate a histogram of total page views on Wikipedia English pages for the date range represented in our dataset (May 5 to May 7, 2009).
   The high level idea of what we'll be doing is as follows.
   First, we generate a key value pair for each line; the key is the date (the first eight characters of the first field), and the value is the number of pageviews for that date (the fourth field).

In [11]:
enTuples = enPages.map(lambda x: x.split(" "))
enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3])))

Next, we shuffle the data and group all values of the same key together.
   Finally we sum up the values for each key.
   There is a convenient method called `reduceByKey` in Spark for exactly this pattern.
   Note that the second argument to `reduceByKey` determines the number of reducers to use.
   By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.
   Since we know there is a very limited number of keys in this case (because there are only 3 unique dates in our data set), let's use only one reducer.

In [12]:
enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect()

[(u'20090507', 6175726), (u'20090505', 7076855)]

The `collect` method at the end returns the contents of the resulting RDD to the driver as a Python list.
We can combine the previous three commands into one:


In [13]:
enPages.map(lambda x: x.split(" ")).map(lambda x: (x[0][:8], int(x[3]))).reduceByKey(lambda x, y: x + y, 1).collect()

[(u'20090507', 6175726), (u'20090505', 7076855)]
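
To see what the map and `reduceByKey` steps compute, here is a plain-Python emulation on three made-up records. It is a sketch of the semantics only: `reduce_by_key` is a hypothetical local helper, nothing is distributed, and the number-of-reducers argument has no equivalent here.

```python
def reduce_by_key(pairs, func):
    """Combine values that share a key, like reduceByKey but on a local list."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

lines = [
    "20090505-000000 en Main_Page 5 1000",  # made-up records for illustration
    "20090505-010000 en Main_Page 7 1000",
    "20090507-000000 en Main_Page 3 1000",
]
# Same key/value extraction as above: date prefix and view count.
pairs = [(f[0][:8], int(f[3])) for f in (line.split(" ") for line in lines)]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(sorted(totals.items()))  # [('20090505', 12), ('20090507', 3)]
```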

7. Suppose we want to find pages that were viewed more than 200,000 times during the three days covered by our dataset.
   Conceptually, this task is similar to the previous query.
   But, given the large number of pages (23 million distinct page names), the new task is very expensive.
   We are doing an expensive group-by with a lot of network shuffling of data.

   To recap, first we split each line of data into its respective fields.
   Next, we extract the fields for page name and number of page views.
   We reduce by key again, this time with 40 reducers.
   Then we keep only the pages with more than 200,000 total views over the time window covered by our dataset, and swap each pair so that the view count comes first.

In [14]:
enPages.map(lambda x: x.split(" ")).map(lambda x: (x[2], int(x[3]))).reduceByKey(lambda x, y: x + y, 40).filter(lambda x: x[1] > 200000).map(lambda x: (x[1], x[0])).collect()

[(451126, u'Main_Page'), (1066734, u'404_error/'), (468159, u'Special:Search')]

   There is no hard and fast way to calculate the optimal number of reducers for a given problem; you will build up intuition over time by experimenting with different values.
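
Because the final `map` puts the view count first, the collected list can be ranked with an ordinary sort. A small sketch using the result shown above:

```python
# Result list copied from the output above: (total views, page name).
results = [(451126, "Main_Page"), (1066734, "404_error/"), (468159, "Special:Search")]

# Tuples compare element by element, so sorting the pairs sorts by view count first.
ranked = sorted(results, reverse=True)
for views, page in ranked:
    print("%d %s" % (views, page))
```

On the RDD itself, calling `sortByKey(False)` before `collect()` should give the same descending order.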