# CS 5683 - Big Data Analytics
## Assignment - 1: Intro. to Spark and RDD

###### Run this notebook in Google Colab
###### Let's set up Spark first

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
     |████████████████████████████████| 212.4 MB 65 kB/s
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 51.9 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=a6953a912734b4ab5580657fcaf09f4e148f40ae807a8fea5fd42d31c0ddd769
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


###### Import required libraries now

In [2]:
import re
import shutil
import sys

from pyspark import SparkContext, SparkConf

###### Let's initialize Spark context now

In [3]:
# Create the Spark context (local mode) with the assignment's app name.
# getOrCreate() returns the already-running context when this cell is re-run,
# instead of raising "Cannot run multiple SparkContexts at once".
spark_conf = SparkConf().setMaster("local").setAppName("PySpark - CS5683 - Assignment-1")
sc = SparkContext.getOrCreate(spark_conf)

###### Follow the tutorial to mount your Google Drive, then give the mounted Drive file paths below.

In [4]:
# Paths of the two input .txt files.
# Replace them with your own files (e.g. paths on your mounted Google Drive)
# whose contents suit your application.
file1, file2 = 'file1.txt', 'file2.txt'

# Every question below reads its input from these two files.

### Example Spark program

In [None]:
# Example Spark application: word count.
# Given a file, count how often each word appears in it.

# Step 1: Read the input file (file1 or file2).
# NOTE: Reuse the existing `sc`; only one SparkContext can be active in a notebook.
fileRDD = sc.textFile(file1)

# Step 2: Each line is a sentence, so split it on ' ' to get words.
# map() would return an RDD of lists (one list per line); flatMap() flattens
# those lists into an RDD of individual words.
wordsRDD = fileRDD.flatMap(lambda line: line.split(" "))  # <----------- TEST what happens when you use map()

# Step 3: Emit one (K, V) pair per word, where K is the word and V is 1.
pairRDD = wordsRDD.map(lambda word: (word, 1))

# Step 4: Sum the 1's for each word.
# NOTE: The same word may be present in several partitions, so reduceByKey()
# combines the counts for each key across partitions.
countRDD = pairRDD.reduceByKey(lambda a, b: a + b)

# Step 5: Save the results as text.
# saveAsTextFile() writes a DIRECTORY of part-* files; it rejects an empty
# path and fails if the directory already exists, so output left over from a
# previous run of this cell is removed first.
# NOTE(review): rmtree assumes Spark's default filesystem is the local one
# (as in Colab local mode) — adjust if writing to HDFS or another store.
WORDCOUNT_OUTPUT_DIR = 'wordcount_output'  # <----------- CHANGE to your own output path if desired
shutil.rmtree(WORDCOUNT_OUTPUT_DIR, ignore_errors=True)
countRDD.saveAsTextFile(WORDCOUNT_OUTPUT_DIR)

### Question - 1 (10 points) 

In [None]:
# YOUR CODE for Question-1 HERE

In [26]:
# Question 1: for every "special" character, count how many words in file1
# start with it. Tokens come from splitting each line on single spaces;
# one-character tokens (e.g. a lone "@") are not counted as words.

# Leading characters treated as "special". Written as a raw string so the
# backslashes reach the regex engine unchanged (in a normal string '\A' and
# '\[' are invalid escape sequences that newer Pythons warn about). The hyphen
# is escaped: unescaped, "\]-_" was parsed as the range ']'..'_' and a literal
# '-' was never matched.
SPECIAL_LEADING_CHAR = re.compile(r'[>?/|}\]\-_`$%{~:;\[^&*()<@!#]')

file1_rdd = sc.textFile(file1)
file1_words = file1_rdd.flatMap(lambda line: line.split(" "))

special_char_counts = (
    file1_words
    # len > 1 drops both empty tokens (from repeated spaces / blank lines)
    # and single-character tokens.
    .filter(lambda word: len(word) > 1)
    # match() only tests the first character of the token.
    .filter(lambda word: SPECIAL_LEADING_CHAR.match(word) is not None)
    .map(lambda word: (word[0], 1))
    .reduceByKey(lambda a, b: a + b)
)
special_char_counts.collect()

[('[', 1),
 ('{', 1),
 ('}', 2),
 ('#', 1),
 ('*', 3),
 ('&', 1),
 ('^', 1),
 ('%', 1),
 ('$', 1),
 (';', 2),
 ('!', 2),
 ('@', 3),
 ('(', 1),
 (')', 1)]

In [None]:
# PRINT THE OUTPUT HERE

### Question - 2 (10 points)

In [None]:
# YOUR CODE for Question-2 HERE

In [45]:
# Question 2: number of distinct words on each line of file1, as
# (1-based line number, distinct-word count) pairs in line order.
# The file is read here so this cell does not depend on variables from Q1.
file1_lines = sc.textFile(file1)

distinct_words_per_line = (
    file1_lines
    .zipWithIndex()  # -> (line, 0-based index) in file order
    # Empty tokens (from repeated spaces or blank lines) are not words; without
    # the `if word` guard a blank line would report 1 distinct word.
    .map(lambda pair: (pair[1] + 1,
                       len({word for word in pair[0].split(" ") if word})))
    .sortByKey()
)
distinct_words_per_line.collect()

[(1, 8), (2, 7), (3, 8), (4, 5), (5, 4)]

In [None]:
# PRINT THE OUTPUT HERE

### Question - 3 (10 points)

In [None]:
# YOUR CODE for Question-3 HERE

In [73]:
# Question 3: words that occur in BOTH files, with their count in each file,
# as (word, (count_in_file1, count_in_file2)) sorted alphabetically by word.

def word_counts(path):
    """Return an RDD of (word, count) for the space-separated words in `path`.

    Empty tokens (produced by repeated spaces or blank lines) are dropped
    before counting, so they never reach the shuffle.
    """
    return (
        sc.textFile(path)
        .flatMap(lambda line: line.split(" "))
        .filter(lambda word: word != "")
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )


common_word_counts = (
    word_counts(file1)
    .join(word_counts(file2))  # inner join: keep only words present in both files
    .sortBy(lambda pair: pair[0])
)
common_word_counts.collect()

[('!jerry', (2, 1)),
 ('$jump', (1, 1)),
 ('%yup', (1, 1)),
 ('(as', (1, 1)),
 ('*1', (1, 1)),
 ('*up', (2, 2)),
 ('4567', (1, 1)),
 ('8kill', (2, 2)),
 (';to', (1, 1)),
 ('@jerry', (2, 1)),
 ('[ab', (1, 1)),
 ('^kill', (1, 1)),
 ('as', (1, 1)),
 ('fight', (1, 1)),
 ('jump', (1, 1)),
 ('kill', (1, 1)),
 ('opd', (1, 1)),
 ('pop', (1, 1)),
 ('tom', (1, 1)),
 ('yup', (1, 1)),
 ('}lol', (2, 1))]

In [None]:
# PRINT THE OUTPUT HERE

### WHAT TO TURN IN ON CANVAS

# Due Date: Sept. 1 at 11:59pm