## HW Assignment 1

In this assignment, we will learn how to use Apache Spark RDDs and explore MapReduce and distributed processing. For each question, add your code below the question and run the code. 

Note: Your code can be in more than one cell if you choose.

Let's start by running the code below to start up a local Spark instance.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
!pip install -q findspark
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=39b6d19876c6cb4b4c959b0483e36f2048a1b1a13cdf65f99552b3d895cd4f15
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
from pyspark import SparkContext

In [None]:
sc = SparkContext()

In [None]:
sc

1. Load the Alice in Wonderland text file into an RDD

In [None]:
lines=sc.textFile("/content/sample_data/Alice.txt")

In [None]:
lines.take(4)

['The Project Gutenberg EBook of Aliceâ€™s Adventures in Wonderland, by Lewis Carroll',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or']

2. Convert all characters to lowercase and remove all non-character symbols using the map function, and store the result in a new RDD.

In [None]:
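def remove_non_characters(row):
  # Keep only the alphanumeric characters of each whitespace-separated word.
  cleaned = []
  for w in row.split():
    cleaned.append("".join(ch for ch in w if ch.isalnum()))
  # A word made only of punctuation becomes an empty string here.
  return " ".join(cleaned)

# Lowercase each row first, then strip the non-alphanumeric characters.
rdd_token_lower_alpha=lines.map(lambda x: remove_non_characters(x.lower()))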


In [None]:
rdd_token_lower_alpha.take(10)

['the project gutenberg ebook of aliceâs adventures in wonderland by lewis carroll',
 '',
 'this ebook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever you may copy it give it away or',
 'reuse it under the terms of the project gutenberg license included',
 'with this ebook or online at wwwgutenbergorg',
 '',
 '',
 'title aliceâs adventures in wonderland',
 '']
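The cleaning step can be checked outside Spark on a couple of literal strings. The sketch below is plain Python using only built-in string methods; `clean_row` is a hypothetical helper name that mirrors the logic of `remove_non_characters` together with the `lower()` call. It illustrates two things visible in the output above: `â` survives because `str.isalnum()` accepts accented letters, and a punctuation-only word collapses to an empty string, which leaves a double space in the row.

```python
def clean_row(row):
    """Lowercase a row and keep only alphanumeric characters in each word."""
    words = row.lower().split()
    return " ".join("".join(ch for ch in w if ch.isalnum()) for w in words)

# The mis-encoded apostrophe: 'â' is a letter, '€' and '™' are symbols.
print(repr(clean_row("Aliceâ€™s")))

# The "--" token has no alphanumeric characters, so it becomes "".
sample = "This eBook is for the use of anyone, anywhere -- at no cost!"
print(repr(clean_row(sample)))
```

The second result contains two consecutive spaces between `anywhere` and `at`, which matters for any later step that splits on a single space.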

3. Write code to calculate the distribution of word length for the book Alice in Wonderland. Print the distribution in this notebook.

In [None]:
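# Split each cleaned row into tokens. Splitting on a single space keeps empty
# strings (from blank rows and punctuation-only words), which are counted as length 0.
tokens_lower = rdd_token_lower_alpha.flatMap(lambda x: x.split(' '))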

In [None]:
# Map each token to (length, 1), then sum the counts for each length.
result = tokens_lower.map(lambda x: (len(x), 1)).reduceByKey(lambda x, y: x + y)

In [None]:
result.collect()

[(2, 4400),
 (10, 435),
 (0, 1032),
 (4, 5880),
 (6, 2473),
 (8, 1005),
 (12, 94),
 (14, 32),
 (32, 1),
 (20, 1),
 (16, 2),
 (22, 3),
 (18, 1),
 (3, 7176),
 (7, 2042),
 (9, 741),
 (5, 3974),
 (15, 14),
 (1, 964),
 (11, 252),
 (13, 40),
 (17, 1),
 (19, 1),
 (47, 1),
 (21, 2)]
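The pairs returned by `collect()` arrive in no particular order, and the key 0 comes from empty tokens. As a rough sketch in plain Python (not Spark), the same counting can be done with `collections.Counter`, skipping empty tokens and sorting by length. The name `word_length_distribution` and the sample rows are made up for illustration. On the RDD itself, `result.sortByKey().collect()` should return the pairs sorted by length.

```python
from collections import Counter

def word_length_distribution(rows):
    """Count words by length, ignoring empty tokens, sorted by length."""
    counts = Counter()
    for row in rows:
        # split() with no argument never yields empty strings
        for token in row.split():
            counts[len(token)] += 1
    return sorted(counts.items())

# Hypothetical rows, including a blank row and a double space
rows = ["this ebook is for the use of anyone", "", "at no  cost"]
distribution = word_length_distribution(rows)
for length, count in distribution:
    print(length, count)
```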

4. N-grams are contiguous sequences of n words. Write a function that takes a string containing multiple words and an integer n as input and returns a list of n-grams (n, the length of each n-gram, is a parameter of the function).

Note: it's fine to create n-grams only within each row of the text. For the purpose of this exercise, there is no need to combine rows of text.

In [None]:
def ngram(s, n):
  words = s.split(' ')
  res = []
  # A row of k words has k - n + 1 windows of n words; the + 1 keeps the last one.
  for i in range(0, len(words) - n + 1):
    res.append(' '.join(words[i:i+n]))
  return res
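As an illustration of the sliding window behind n-grams, here is a small self-contained sketch. `ngrams_from_row` is a hypothetical name, and the sketch assumes that n is a positive integer and that words are separated by whitespace. A row of k words yields k - n + 1 n-grams, or none if the row has fewer than n words.

```python
def ngrams_from_row(row, n):
    """Return all contiguous n-word sequences in a row, in order."""
    words = row.split()
    # range() is empty when the row has fewer than n words
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]

print(ngrams_from_row("down the rabbit hole", 2))
print(ngrams_from_row("down the rabbit hole", 3))
print(ngrams_from_row("alice", 2))
```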

5. Transform the RDD containing the book Alice in Wonderland to an RDD containing all 2-grams from each row in the book.

In [None]:
# The rows are already lowercase, so they are passed to ngram unchanged.
ngram_lines = rdd_token_lower_alpha.flatMap(lambda x: ngram(x, 2))
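To see why `flatMap` rather than `map` is the natural choice here, the following plain-Python sketch mimics both on a small list. It does not use Spark; `bigrams` and the sample rows are hypothetical. A `map`-style step keeps one list of 2-grams per row, while a `flatMap`-style step concatenates those lists so that every 2-gram becomes its own element.

```python
from itertools import chain

def bigrams(row):
    """Return the 2-grams of a row as a list of strings."""
    words = row.split()
    return [" ".join(words[i:i + 2]) for i in range(len(words) - 1)]

rows = ["down the rabbit hole", "", "the rabbit sends"]

# Like rdd.map: one list per row, including an empty list for the blank row
mapped = [bigrams(r) for r in rows]

# Like rdd.flatMap: the per-row lists are concatenated into one flat sequence
flattened = list(chain.from_iterable(mapped))

print(mapped)
print(flattened)
```

Counting 2-grams then only needs the flat sequence, since each element is a single 2-gram.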

6. Write code to find the distribution of all 2-grams in the book and print it below.

In [None]:
flat_lines_gram=ngram_lines.map(lambda x: (x,1)).reduceByKey(lambda x, y: x + y)
flat_lines_gram.collect()

[('the project', 21),
 ('project gutenberg', 21),
 ('wonderland by', 2),
 ('ebook is', 2),
 ('is for', 2),
 ('use of', 9),
 ('of anyone', 2),
 ('at no', 3),
 ('cost and', 2),
 ('almost no', 1),
 ('whatsoever you', 2),
 ('copy it', 2),
 ('it give', 2),
 ('give it', 3),
 ('it away', 4),
 ('reuse it', 1),
 ('it under', 1),
 ('the terms', 12),
 ('terms of', 17),
 ('of the', 131),
 ('gutenberg license', 2),
 ('online at', 2),
 ('title aliceâs', 1),
 ('release date', 1),
 ('25 2008', 1),
 ('updated february', 1),
 ('february 22', 1),
 ('start of', 1),
 ('of this', 23),
 ('this project', 3),
 ('produced by', 2),
 ('by arthur', 2),
 ('fulcrum edition', 1),
 ('chapter i', 1),
 ('down the', 10),
 ('chapter ii', 1),
 ('the pool', 8),
 ('and a', 15),
 ('chapter iv', 1),
 ('the rabbit', 18),
 ('rabbit sends', 2),
 ('in a', 89),
 ('chapter v', 1),
 ('v advice', 1),
 ('advice from', 2),
 ('chapter vi', 1),
 ('vi pig', 1),
 ('chapter vii', 1),
 ('a mad', 2),
 ('the queenâs', 8),
 ('the mock', 50),
 ('