# Problem 1 - Working with RDDs (5 points)

This is an interactive PySpark session. Remember that when you open this notebook the `SparkContext` and `SparkSession` are already created, and they are in the `sc` and `spark` variables, respectively. You can run the following two cells to make sure that the Kernel is active.

**Do not insert any cells beyond the ones that are provided.**

In [1]:
sc

In [2]:
spark

In the following cell, make an RDD called `top1m` that contains the contents of the file `top-1m.csv` that you placed into the cluster's HDFS.

In [3]:
top1m = sc.textFile("hdfs:///user/hadoop/top-1m.csv")

There is one element in the RDD for each line in the file. The `.count()` method will compute how many lines are in the file. In the following cell, type the expression to count the lines in the `top1m` RDD. Run the cell and see the result.

In [8]:
top1m.count()

1000000

## Count the `.com` domains

How many of the websites in this RDD are in the .com domain?

In the following cell, write a code snippet that finds the records with `.com` and counts them. (Hint: use a regular expression.)

In [6]:
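import re

# Matches lines whose domain ends in ".com"; the raw string avoids an invalid "\." escape
com_re = re.compile(r"\.com$")

def extract(line):
    # Label each "rank,domain" line by whether its domain ends in .com
    if com_re.search(line):
        return 'with .com'
    else:
        return 'without .com'

# Pair each label with 1, then count the pairs per label on the driver
search = top1m.map(lambda x: (extract(x), 1))
search.countByKey()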

defaultdict(int, {'with .com': 486317, 'without .com': 513683})
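A more direct formulation filters the matching lines and counts them, for example `top1m.filter(lambda line: com_re.search(line) is not None).count()`, which returns a single number instead of a dictionary. The sketch below checks the same regex logic in plain Python on a few lines copied from the `take(50)` output further down; the helper name `is_com` and the sample list are illustrative assumptions, not part of the assignment.

```python
import re

# Same idea as the notebook cell: the domain must end in ".com"
com_re = re.compile(r"\.com$")

def is_com(line):
    """Hypothetical helper: True if a 'rank,domain' line ends in .com."""
    return com_re.search(line) is not None

# A few lines copied from the top1m.take(50) output
sample = [
    "1,youtube.com",
    "5,wikipedia.org",
    "8,google.co.in",
    "18,sina.com.cn",
    "45,t.co",
    "46,microsoft.com",
]

# Local analogue of top1m.filter(is_com).count()
com_count = sum(1 for line in sample if is_com(line))
print(com_count)  # 2
```

Because of the `$` anchor, a domain such as `sina.com.cn` is not counted as `.com`; it belongs to the `cn` TLD.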

## Histogram the Top Level Domains (TLDs)

What is the distribution of TLDs in the top 1 million websites? We can quickly compute this using the RDD function `countByValue()`.

In the following cell, write a function called `tld` (in Python) that takes a domain name string and outputs the top-level domain.

In [4]:
def tld(domain_name):
    # Return the text after the last '.', e.g. "1,youtube.com" -> "com"
    return domain_name.split('.')[-1]
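Each RDD element is a whole `rank,domain` line rather than a bare domain, so it is worth checking that `tld` still behaves. Keeping the last piece after splitting on `.` works because the rank never contains a dot. A quick local check, using lines from the `take(50)` output shown later:

```python
def tld(domain_name):
    # Text after the last "."; for "8,google.co.in" this is "in"
    return domain_name.split('.')[-1]

print(tld("1,youtube.com"))    # com
print(tld("8,google.co.in"))   # in
print(tld("24,google.co.uk"))  # uk
print(tld("localhost"))        # localhost (no dot, so the whole string is returned)
```

Multi-part suffixes such as `co.uk` are therefore counted under their last label (`uk`). An equivalent form that splits only once is `domain_name.rsplit('.', 1)[-1]`.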

In the following cell, map the `top1m` RDD using `tld` into a new RDD called `tlds`. 

In [5]:
tlds = top1m.map(tld)

In the following two cells, evaluate `top1m.first()` and `tlds.first()` to check that the first element of `tlds` is the first line of `top1m` transformed by `tld`.

In [7]:
top1m.first()

u'1,youtube.com'

In [8]:
tlds.first()

u'com'

Look at the first 50 elements of `top1m` by evaluating `top1m.take(50)`.

In [9]:
top1m.take(50)

[u'1,youtube.com',
 u'2,google.com',
 u'3,facebook.com',
 u'4,baidu.com',
 u'5,wikipedia.org',
 u'6,reddit.com',
 u'7,yahoo.com',
 u'8,google.co.in',
 u'9,qq.com',
 u'10,taobao.com',
 u'11,amazon.com',
 u'12,twitter.com',
 u'13,sohu.com',
 u'14,vk.com',
 u'15,instagram.com',
 u'16,tmall.com',
 u'17,google.co.jp',
 u'18,sina.com.cn',
 u'19,live.com',
 u'20,jd.com',
 u'21,weibo.com',
 u'22,360.cn',
 u'23,yandex.ru',
 u'24,google.co.uk',
 u'25,netflix.com',
 u'26,google.de',
 u'27,google.com.br',
 u'28,google.ru',
 u'29,twitch.tv',
 u'30,google.fr',
 u'31,pornhub.com',
 u'32,google.com.hk',
 u'33,login.tmall.com',
 u'34,xvideos.com',
 u'35,google.it',
 u'36,google.es',
 u'37,alipay.com',
 u'38,google.com.mx',
 u'39,yahoo.co.jp',
 u'40,bing.com',
 u'41,ebay.com',
 u'42,google.ca',
 u'43,ok.ru',
 u'44,csdn.net',
 u'45,t.co',
 u'46,microsoft.com',
 u'47,wikia.com',
 u'48,hao123.com',
 u'49,imdb.com',
 u'50,mail.ru']

Try the same thing with the `tlds` RDD to make sure that the first 50 lines were properly transformed.


In [10]:
tlds.take(50)

[u'com',
 u'com',
 u'com',
 u'com',
 u'org',
 u'com',
 u'com',
 u'in',
 u'com',
 u'com',
 u'com',
 u'com',
 u'com',
 u'com',
 u'com',
 u'com',
 u'jp',
 u'cn',
 u'com',
 u'com',
 u'com',
 u'cn',
 u'ru',
 u'uk',
 u'com',
 u'de',
 u'br',
 u'ru',
 u'tv',
 u'fr',
 u'com',
 u'hk',
 u'com',
 u'com',
 u'it',
 u'es',
 u'com',
 u'mx',
 u'jp',
 u'com',
 u'com',
 u'ca',
 u'ru',
 u'net',
 u'co',
 u'com',
 u'com',
 u'com',
 u'com',
 u'ru']
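Rather than comparing the two lists by eye, the pairs can be checked programmatically. Since `tlds` is a `map` over `top1m`, element order is preserved, so in the notebook the check would be `all(tld(a) == b for a, b in zip(top1m.take(50), tlds.take(50)))`. The sketch below runs the same comparison locally on a short excerpt of the two outputs above.

```python
def tld(domain_name):
    return domain_name.split('.')[-1]

# Excerpts of top1m.take(50) and tlds.take(50) from the outputs above
lines = ["1,youtube.com", "5,wikipedia.org", "22,360.cn", "29,twitch.tv", "50,mail.ru"]
tld_values = ["com", "org", "cn", "tv", "ru"]

# Collect any position where tld(line) disagrees with the transformed RDD element
mismatches = [(a, b) for a, b in zip(lines, tld_values) if tld(a) != b]
print(mismatches)  # []
```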

At this point, `tlds.countByValue()` would give us each TLD and the number of times it appears in the `top1m` file. Note that this method returns the result to the driver as a Python `defaultdict`, not as an RDD. We want the TLDs reverse sorted by count, so we set a variable called `tlds_and_counts` equal to `tlds.countByValue()` and then swap each `(domain, count)` pair into a `(count, domain)` pair, so that sorting compares counts first, like this:

```python
tlds_and_counts = tlds.countByValue()
counts_and_tlds = [(count,domain) for (domain,count) in tlds_and_counts.items()]
```

In the following cell, run the code above to produce the dictionary of counts and the list of `(count, domain)` pairs.

In [11]:
tlds_and_counts = tlds.countByValue()
counts_and_tlds = [(count,domain) for (domain,count) in tlds_and_counts.items()]
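Because `countByValue()` hands back an ordinary Python mapping, the swap is plain Python running on the driver. The sketch below mimics the step with `collections.Counter` (a `dict` subclass used here as a local stand-in for the `defaultdict` that Spark returns) on a handful of made-up TLD values.

```python
from collections import Counter

# Stand-in for tlds.countByValue() on a tiny, made-up sample of TLDs
tlds_sample = ["com", "com", "org", "in", "com", "org", "ru"]
tlds_and_counts = Counter(tlds_sample)

# Swap each (domain, count) pair so that sorting compares counts first
counts_and_tlds = [(count, domain) for (domain, count) in tlds_and_counts.items()]
print(counts_and_tlds)  # [(3, 'com'), (2, 'org'), (1, 'in'), (1, 'ru')]
```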

In the following cell, sort `counts_and_tlds` in descending order and display the first 50 entries.

In [12]:
counts_and_tlds.sort(reverse=True)
counts_and_tlds[:50]

[(486317, u'com'),
 (49901, u'org'),
 (43522, u'ru'),
 (42822, u'net'),
 (33117, u'de'),
 (19368, u'br'),
 (18232, u'uk'),
 (15498, u'pl'),
 (15209, u'ir'),
 (11204, u'in'),
 (11150, u'fr'),
 (10471, u'it'),
 (9287, u'au'),
 (9072, u'info'),
 (8672, u'es'),
 (8046, u'cz'),
 (7592, u'nl'),
 (7082, u'jp'),
 (6421, u'mx'),
 (6248, u'co'),
 (5810, u'tw'),
 (5774, u'ua'),
 (5710, u'cn'),
 (5705, u'ca'),
 (5427, u'gr'),
 (5116, u'io'),
 (5010, u'eu'),
 (4877, u'se'),
 (4039, u'sk'),
 (3968, u'ch'),
 (3848, u'me'),
 (3730, u'tv'),
 (3652, u'hu'),
 (3619, u'kr'),
 (3338, u'no'),
 (3287, u'at'),
 (3251, u'us'),
 (3197, u'ar'),
 (3052, u'ro'),
 (3050, u'za'),
 (2978, u'id'),
 (2957, u'cl'),
 (2910, u'dk'),
 (2897, u'be'),
 (2887, u'edu'),
 (2638, u'vn'),
 (2265, u'biz'),
 (2233, u'pt'),
 (2222, u'tr'),
 (2151, u'fi')]
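Sorting `(count, domain)` tuples works because tuples compare element by element, so ties on count fall back to comparing the domain. An alternative that skips the swap is to rank the dictionary items with a key function; `heapq.nlargest` does this without sorting the whole list. A distributed variant along the lines of `tlds.map(lambda t: (t, 1)).reduceByKey(lambda a, b: a + b).takeOrdered(50, key=lambda kv: -kv[1])` would keep the aggregation on the cluster; that is a suggestion, not what the assignment asks for. The local sketch below uses the first five counts from the output above.

```python
import heapq

# Counts copied from the first rows of the output above
tlds_and_counts = {"com": 486317, "org": 49901, "ru": 43522, "net": 42822, "de": 33117}

# Top 3 by count, without building the swapped list or sorting every entry
top3 = heapq.nlargest(3, tlds_and_counts.items(), key=lambda kv: kv[1])
print(top3)  # [('com', 486317), ('org', 49901), ('ru', 43522)]
```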

**Question:** `top1m.collect()[0:50]` and `top1m.take(50)` produce the same result. Which one is more efficient and why? Put your answer in the cell below.

In [None]:
## Answer Here (don't run this cell)
'top1m.take(50)' is more efficient than 'top1m.collect()[0:50]'. A 'collect' on an RDD copies the whole dataset (here all 1,000,000 lines) to the driver, which in this setup runs on the master node, and only then is the list sliced down to 50 elements; if the dataset is too large to fit in the driver's memory, the driver can run out of memory. A 'take(50)' instead scans partitions only until it has gathered 50 elements (often just the first partition), so far less data is computed and transferred.
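A simplified model of the difference, assuming an RDD is represented as a list of partitions (plain Python lists). This is only an illustration: the helper names `toy_collect` and `toy_take` are hypothetical, and Spark's real `take` is more elaborate (it scans one partition first and then estimates how many more partitions it needs), but the key point is the same: `collect` always materializes every partition, while `take` can stop early.

```python
def toy_collect(partitions):
    """Toy model of collect(): copy every element of every partition."""
    result = []
    for part in partitions:
        result.extend(part)
    return result

def toy_take(partitions, n):
    """Toy model of take(n): read partitions in order, stop once n elements are gathered.

    Returns the elements and how many partitions were read.
    """
    result = []
    scanned = 0
    for part in partitions:
        scanned += 1
        result.extend(part[: n - len(result)])
        if len(result) >= n:
            break
    return result, scanned

# Hypothetical RDD: 10 partitions of 1,000 consecutive integers each
partitions = [list(range(i * 1000, (i + 1) * 1000)) for i in range(10)]

first50, scanned = toy_take(partitions, 50)
print(first50 == toy_collect(partitions)[0:50])  # True: same elements
print(scanned)  # 1: only the first partition was read
```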

When you finish this problem, click File -> 'Save and Checkpoint' in the menu bar to make sure that the latest version of the notebook file is saved. Also, before you close this notebook and move on, stop your SparkContext; otherwise its cluster resources will not be released for a new session. Remember, you will commit the .ipynb file to the repository for submission (from the master node terminal).

In [4]:
sc.stop()