<p align="center"><img src="logo/spark.png" alt="Spark Logo" width="250"/></p>
# **Lab 2 - Part 1 - Spark**
#### The following steps demonstrate how to create a simple Spark application in Scala. In this notebook you will see how to make a base RDD and apply functions to it.



### Part 1: Warm Up

Create a collection of integers in the range of 1 ... 10000.

```scala
import Array._

// Build an array of the integers 1 to 10000 (the upper bound of range() is exclusive)
val data = range(1, 10001)
print(data.length)
```

```
10000
```
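`range(1, 10001)` is `Array.range`, whose upper bound is exclusive, which is why the call uses 10001 to include 10000. A minimal plain-Scala sketch (no Spark needed) comparing it with the inclusive `to` and exclusive `until` ranges:

```scala
// Array.range(start, end) excludes `end`; `start to end` includes it; `start until end` excludes it
val viaRange = Array.range(1, 10001)
val viaTo = (1 to 10000).toArray
val viaUntil = (1 until 10001).toArray

println(viaRange.length)                         // 10000
println(viaRange.sameElements(viaTo))            // true
println(viaRange.sameElements(viaUntil))         // true
println(s"${viaRange.head} .. ${viaRange.last}") // 1 .. 10000
```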



Use that collection to create a base RDD.

```scala
// Distribute the local array across the cluster as a base RDD
val distData = sc.parallelize(data)
print(distData)
```

```
ParallelCollectionRDD[0] at parallelize at <console>:27
```
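`parallelize` splits the local collection into partitions (slices) that are spread across the workers; `sc.parallelize(data, 4)` requests four of them, and `distData.partitions.length` reports how many were created. The sketch below is a plain-Scala model, not Spark's code: it assumes equal-size contiguous slices whose boundaries are computed by integer division, and `sliceBounds` is a hypothetical helper.

```scala
// Hypothetical helper: [start, end) index bounds of each of numSlices contiguous slices
def sliceBounds(length: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val start = ((i.toLong * length) / numSlices).toInt
    val end = (((i + 1).toLong * length) / numSlices).toInt
    (start, end)
  }

val data = Array.range(1, 10001)

// Model of splitting `data` into 4 partitions
val slices: Seq[Array[Int]] = sliceBounds(data.length, 4).map { case (s, e) => data.slice(s, e) }

// Prints "2500 elements: 1 .. 2500", then 2501 .. 5000, and so on
slices.foreach(p => println(s"${p.length} elements: ${p.head} .. ${p.last}"))

// Every element lands in exactly one slice
println(slices.map(_.length).sum) // 10000
```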



Next, apply a transformation followed by an action: a `filter()` transformation that keeps the values less than 10, then a `collect()` action that returns the results to the driver.

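```scala
// Predicate used by filter(): keep only values below 10
def lessThan10(num: Int): Boolean = num < 10

// filter() is a lazy transformation; collect() is the action that triggers it
val x = distData.filter(lessThan10)
x.collect()

// Another way, using an anonymous function:
// val x2 = distData.filter(a => a < 10)
// x2.collect()
```

```
Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
```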
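Transformations such as `filter()` are lazy: they only describe the work, and an action such as `collect()` makes it happen. A loose single-machine analogy (not Spark itself) is a Scala `Iterator`, whose `filter` also defers work until the result is materialized. The sketch counts how often the predicate runs:

```scala
// Count how many times the predicate actually runs
var evaluated = 0
def lessThan10(num: Int): Boolean = { evaluated += 1; num < 10 }

val data = Array.range(1, 10001)

// Like an RDD transformation, Iterator.filter only records the work to do
val pending = data.iterator.filter(lessThan10)
println(evaluated) // 0: nothing has run yet

// Like collect(), materializing the result forces the predicate over every element
val result = pending.toArray
println(evaluated)             // 10000
println(result.mkString(", ")) // 1, 2, 3, 4, 5, 6, 7, 8, 9
```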

### Part 2: Create an RDD From a File

The following steps demonstrate how to create an RDD from a file and apply transformations to it in Scala. Create an RDD, named `pagecounts`, from the input files at `data/pagecounts`. The file entries look like this:
```
20090507-040000 zh favicon.ico 67 62955
```

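```scala
// textFile() creates an RDD with one element per line of the input files
val pagecounts = sc.textFile("data/pagecounts")
print(pagecounts)
// On a cluster, iterating an RDD directly runs on the executors, so its output
// would not appear here:
//for (i <- pagecounts){  print( " " + i )}
```

```
data/pagecounts MapPartitionsRDD[12] at textFile at <console>:27
```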
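Each entry has five space-separated fields. This notebook relies on the second field (the project code) and the fourth (the page-view count). A minimal plain-Scala sketch of parsing one entry into a typed record; the `PageCount` case class, the `parsePageCount` helper, and the names `timestamp`, `page`, and `bytes` for the other fields are hypothetical, chosen for illustration.

```scala
// Hypothetical record type for one pagecounts entry; the field names are assumptions
case class PageCount(timestamp: String, project: String, page: String, views: Int, bytes: Long)

// Parse one line; return None for lines without exactly five fields or with non-numeric counts
def parsePageCount(line: String): Option[PageCount] =
  line.split(" ") match {
    case Array(ts, project, page, views, bytes) =>
      try Some(PageCount(ts, project, page, views.toInt, bytes.toLong))
      catch { case _: NumberFormatException => None }
    case _ => None
  }

val sample = "20090507-040000 zh favicon.ico 67 62955"
println(parsePageCount(sample))
// Some(PageCount(20090507-040000,zh,favicon.ico,67,62955))
println(parsePageCount("malformed line")) // None
```

In the notebook, a call such as `pagecounts.flatMap(line => parsePageCount(line).toList)` should keep the well-formed entries and drop the rest.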



Use the `take()` operation of an RDD to get the first 10 records.

```scala
// take(n) returns the first n records to the driver as a local array
val pc10 = pagecounts.take(10)
pc10.foreach(println)
```

```
20090507-040000 aa Main_Page 7 51309
20090507-040000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 34069
20090507-040000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 3 65763
20090507-040000 af.b Tuisblad 1 36231
20090507-040000 af.d Tuisblad 1 58960
20090507-040000 af.q Tuisblad 1 44265
20090507-040000 af Afrikaans 3 80838
20090507-040000 af Australi%C3%AB 1 132433
20090507-040000 af Ensiklopedie 2 60584
20090507-040000 af Internet 1 48816
```




An alternative way to print the records is to traverse the array with a `for` loop and print each record on its own line.

```scala
// Traverse the local array returned by take(10)
for (x <- pc10) println(x)
```

```
20090507-040000 aa Main_Page 7 51309
20090507-040000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 34069
20090507-040000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 3 65763
20090507-040000 af.b Tuisblad 1 36231
20090507-040000 af.d Tuisblad 1 58960
20090507-040000 af.q Tuisblad 1 44265
20090507-040000 af Afrikaans 3 80838
20090507-040000 af Australi%C3%AB 1 132433
20090507-040000 af Ensiklopedie 2 60584
20090507-040000 af Internet 1 48816
```
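In Scala, a `for` loop without `yield` is syntactic sugar for `foreach`, so the two cells above do the same thing. A minimal sketch that records both traversals into buffers to show they visit the same records in the same order (the sample records are copied from the output above):

```scala
import scala.collection.mutable.ArrayBuffer

val records = Array(
  "20090507-040000 aa Main_Page 7 51309",
  "20090507-040000 af Afrikaans 3 80838",
  "20090507-040000 af Internet 1 48816"
)

// The compiler rewrites this loop as records.foreach(x => viaFor += x)
val viaFor = ArrayBuffer.empty[String]
for (x <- records) viaFor += x

val viaForeach = ArrayBuffer.empty[String]
records.foreach(x => viaForeach += x)

println(viaFor == viaForeach) // true
```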




Use the `count()` action to see how many records in total are in this data set. The `pagecounts` folder consists of two files, each with around 700,000 lines, so in total we expect around 1.4 million lines.

```scala
// count() is an action: it scans every line of the input files
pagecounts.count()
```

```
1398882
```

The second field of each record in the data set is the "project code", which indicates the language of the page. For example, the project code "en" indicates an English page. Let's derive an RDD named `enPages` that contains only the English pages from `pagecounts`. This can be done by applying a `filter()` transformation to `pagecounts`: for each record, split it on the field delimiter (a space), take the second field, and compare it with the string "en". To avoid re-reading the files from disk every time we run an operation on the RDD, we can use `cache()` to keep the RDD in memory.

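```scala
// Keep records whose second field (the project code) is exactly "en"
def isEnglishPage(rec: String): Boolean = {
  val fields = rec.split(' ')
  fields.length > 1 && fields(1) == "en"
}

// Mark the filtered RDD for in-memory caching (materialized on the first action)
val enPages = pagecounts.filter(isEnglishPage).cache()
```

```
MapPartitionsRDD[13] at filter at <console>:32
```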
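The predicate can be checked locally on a few records before running it over the whole data set. A plain-Scala sketch using lines copied from the outputs in this notebook plus one hypothetical malformed line; `isEnglish` is a standalone copy of the test with a length guard, so a line with fewer than two fields returns `false` instead of throwing.

```scala
// Same test as the filter predicate: the second space-separated field equals "en"
def isEnglish(rec: String): Boolean = {
  val fields = rec.split(' ')
  fields.length > 1 && fields(1) == "en"
}

val samples = List(
  "20090507-040000 en ! 3 105344",
  "20090507-040000 af Afrikaans 3 80838",
  "20090507-040000 af.b Tuisblad 1 36231",
  "malformed"
)

val english = samples.filter(isEnglish)
english.foreach(println) // prints only the "en" record
```

Because the comparison is exact, a code such as `af.b` is not treated as `af`; in the same way, only records whose code is exactly `en` are kept.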

The above command defines the RDD, but because of lazy evaluation, no computation is done yet. The next time an action is invoked on `enPages`, Spark will cache the data set in memory across the workers in your cluster. So let's count the number of records for English pages. The first time this command runs, it will take 2-3 minutes while Spark scans through the entire data set on disk. But since `enPages` was marked as cached in the previous step, running `count` on the same RDD again should return roughly an order of magnitude faster.

```scala
// The first action materializes and caches enPages; later actions reuse the cached data
enPages.count()
// Only the value of the last expression in a cell is displayed
enPages.take(5)
```

```
Array(20090507-040000 en ! 3 105344, 20090507-040000 en !! 2 13398, 20090507-040000 en !!! 9 117779, 20090507-040000 en !!!Fuck_You!!! 4 38684, 20090507-040000 en !!!Fuck_You!!!_and_Then_Some 2 17960)
```
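Caching is itself lazy: `cache()` only marks the RDD, the data is stored when the first action computes it, and later actions reuse it. A loose single-machine analogy (not how Spark implements caching) is a Scala `lazy val`, which is computed on first access and then reused. `scanInput` is a hypothetical stand-in for the expensive scan of the input files.

```scala
var scans = 0

// Hypothetical stand-in for an expensive scan of the input files
def scanInput(): Vector[String] = {
  scans += 1
  Vector("20090507-040000 en ! 3 105344", "20090507-040000 en !! 2 13398")
}

// Declaring the lazy val does no work, much like calling cache()
lazy val cachedEnPages: Vector[String] = scanInput()
println(scans) // 0

println(cachedEnPages.size) // first "action": triggers the scan
println(cachedEnPages.size) // second "action": reuses the stored result
println(scans) // 1
```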

Let's generate a histogram of total page views on English Wikipedia pages for the date range represented in our dataset (May 5 to May 7, 2009). The high-level idea is as follows. First, we generate a key-value pair for each line: the key is the date (the first eight characters of the first field), and the value is the number of page views in that record (the fourth field).

```scala
// Split each record into its space-separated fields
val enTuples = enPages.map(x => x.split(" "))
// Key: the date (first 8 characters of the timestamp); value: the page views (fourth field).
// Index the split array, not the raw line: on a String, x(3) is its fourth character (a Char),
// and Char.toInt returns a character code rather than the view count.
val enKeyValuePairs = enTuples.map(fields => (fields(0).substring(0, 8), fields(3).toInt))
for (x <- enKeyValuePairs.take(5)) println(x)
```

Next, we shuffle the data and group all values of the same key together. Finally, we sum up the values for each key. Spark provides a convenient method, `reduceByKey`, for exactly this pattern. Note that the second argument to `reduceByKey` sets the number of partitions of the result, i.e., the number of reducers. Because `reduceByKey` assumes that the reduce function is commutative and associative, it combines values on the map side before the shuffle. Since there are very few keys in this case (the data set covers only two dates, May 5 and May 7, 2009), let's use only one reducer.

```scala
// Sum the page views for each date using a single reducer
enKeyValuePairs.reduceByKey((a, b) => a + b, 1).collect().foreach(println)
```

The `collect()` action at the end returns the result to the driver as a local array. We can combine the previous three steps (split, map to key-value pairs, and reduce by key) into one expression:

```scala
enPages.map(x => x.split(" "))
  .map(fields => (fields(0).substring(0, 8), fields(3).toInt))
  .reduceByKey((a, b) => a + b, 1)
  .collect()
```
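The function passed to `reduceByKey` must be associative and commutative because Spark may first combine values within each partition and then merge those partial results after the shuffle, in no guaranteed order. The plain-Scala sketch below models that two-phase process; the partitions and view counts are made up, and `sumByKey` is a hypothetical helper. It checks that the two-phase result agrees with a single global sum.

```scala
// Hypothetical (date, views) pairs, already split into two "partitions"
val partitions: Seq[Seq[(String, Int)]] = Seq(
  Seq(("20090505", 7), ("20090505", 3), ("20090507", 2)),
  Seq(("20090507", 9), ("20090505", 1))
)

// Sum the values of each key in one collection of pairs
def sumByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
  pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2).sum }

// Phase 1: combine within each partition (map-side combine)
val partials: Seq[Map[String, Int]] = partitions.map(sumByKey)

// Phase 2: shuffle the partial sums by key and merge them
val merged: Map[String, Int] = sumByKey(partials.flatMap(_.toSeq))

// Reference result: one global sum over all pairs
val direct: Map[String, Int] = sumByKey(partitions.flatten)

println(merged("20090505")) // 11
println(merged("20090507")) // 11
println(merged == direct)   // true
```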

Suppose we want to find pages that were viewed more than 200,000 times during the period covered by our dataset. Conceptually, this task is similar to the previous query. But given the large number of pages (23 million distinct page names), the new task is much more expensive: it requires a group-by with a lot of network shuffling of data. To recap, first we split each line of data into its respective fields. Next, we extract the fields for the page name and the number of page views. We reduce by key again, this time with 40 reducers. Then we filter out pages with 200,000 or fewer total views over the time window represented by our dataset.

```scala
// Split each record, sum the views per page with 40 reducers, and only then keep
// pages whose total exceeds 200,000; the results are (views, page) pairs
enPages.map(x => x.split(" ")).map(x => (x(2), x(3).toInt))
  .reduceByKey(_ + _, 40).filter(x => x._2 > 200000).map(x => (x._2, x._1)).collect()
```
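The order of `filter` and `reduceByKey` matters: filtering first tests each record on its own and misses pages whose views only exceed the threshold in total. A plain-Scala sketch with hypothetical records (the page names are borrowed from this data set, the view counts are made up; `totals` is a hypothetical helper):

```scala
// Hypothetical (page, views) records: a page can appear in several records
val records = Seq(
  ("Main_Page", 150000), ("Main_Page", 120000),
  ("Special:Search", 250000),
  ("Some_Article", 5000), ("Some_Article", 4000)
)
val threshold = 200000

// Sum the views of each page
def totals(rs: Seq[(String, Int)]): Map[String, Int] =
  rs.groupBy(_._1).map { case (page, prs) => page -> prs.map(_._2).sum }

// Correct order: sum per page, then keep pages whose total exceeds the threshold
val reduceThenFilter = totals(records).filter { case (_, v) => v > threshold }

// Wrong order: filtering single records first drops pages that only pass in total
val filterThenReduce = totals(records.filter { case (_, v) => v > threshold })

println(reduceThenFilter.keySet.toList.sorted) // List(Main_Page, Special:Search)
println(filterThenReduce.keySet.toList.sorted) // List(Special:Search)
```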