<a href="https://colab.research.google.com/github/Sudhir22/Spark-Notebooks/blob/main/Word_Count_MapReduce.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Wordcount in Spark

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [3]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [4]:
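# Google Drive file ID of the input text (named file_id to avoid shadowing the built-in id)
file_id = '1SE6k_0YukzGd5wK-E4i6mG83nydlfvSa'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('pg100.txt')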

If you executed the cells above, you should be able to see the file *pg100.txt* under the "Files" tab on the left panel.

Write a Spark application that outputs the number of words that start with each letter. That is, for every letter, count the total number of (non-unique) words that start with that letter. In your implementation, **ignore letter case**, i.e., treat all words as lower case. Also, ignore all words **starting** with a non-alphabetic character.
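For checking a Spark job against a small sample, the specification above can be written as a plain-Python reference. This is a minimal sketch, not the notebook's solution: the names `first_letter` and `count_by_first_letter` are hypothetical, and treating words as whitespace-separated and letters as ASCII `a` to `z` are assumptions.

```python
from collections import Counter

def first_letter(word):
    """Return the lowercased first character if it is an ASCII letter, else None."""
    if not word:
        return None
    ch = word[0]
    # Assumption: "alphabetic" means the ASCII letters a-z / A-Z.
    if ch.isascii() and ch.isalpha():
        return ch.lower()
    return None

def count_by_first_letter(lines):
    """Count (non-unique) words per starting letter, ignoring case."""
    counts = Counter()
    for line in lines:
        for word in line.split():  # assumption: words are whitespace-separated
            letter = first_letter(word)
            if letter is not None:
                counts[letter] += 1
    return dict(counts)

sample = ["The Tempest", "to be, or not to be", "'tis 42 times"]
print(count_by_first_letter(sample))
```

In an RDD pipeline the same key function could be reused, for example `words.map(first_letter).filter(lambda k: k is not None).map(lambda k: (k, 1)).reduceByKey(add)`, where `words` stands for an RDD of tokens.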

In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd
from operator import add



# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [6]:
# Read the text file into a DataFrame with one row per line
data = spark.read.text('pg100.txt')
data.printSchema()

root
 |-- value: string (nullable = true)



In [7]:
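# Lowercase a record and replace punctuation characters with spaces
def clean_data(x):
  # The backslash is escaped explicitly; the set of characters is unchanged
  punctuation = '!"@#$%^&*()+=,.:;?/~{}[]|\\-_'
  # Note: x is a Row, so str(x) is its printed form, e.g. "Row(value='...')",
  # which adds the tokens 'row' and 'value' to every record
  lower_x = str(x).lower()
  for ch in punctuation:
    lower_x = lower_x.replace(ch, ' ')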

  return lower_x
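Since `spark.read.text` yields rows with a single `value` field (see the schema above), the cleaning step can also operate on the line text directly. A minimal sketch of such a variant, assuming the same punctuation set; `clean_line` is a hypothetical name, and `str.translate` is swapped in for the replace loop:

```python
PUNCTUATION = '!"@#$%^&*()+=,.:;?/~{}[]|\\-_'

# Translation table that maps each punctuation character to a space.
_TO_SPACE = str.maketrans({ch: " " for ch in PUNCTUATION})

def clean_line(line):
    """Lowercase a line of text and replace punctuation with spaces."""
    return line.lower().translate(_TO_SPACE)

print(clean_line("Hello, World! (A-b_c)").split())
```

With the DataFrame from the notebook, this could be applied as `data.rdd.map(lambda row: clean_line(row.value))`, so that only the text of each line is tokenized.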
  

In [9]:
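# Clean each record, then split the cleaned text on single spaces
data_rdd = data.rdd.map(clean_data)
data_rdd = data_rdd.flatMap(lambda x: x.split(" "))
# Note: this drops only tokens equal to a single space; empty strings remain
data_rdd = data_rdd.filter(lambda x: x != ' ')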
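One detail of this step is worth illustrating: `str.split(" ")` returns an empty string for every extra space, and an empty string is not equal to `' '`, which is consistent with the `''` entry at the top of the results further down. A small plain-Python demonstration, with `str.split()` (no argument) shown as one possible alternative:

```python
line = "to  be   or not"

# Splitting on a single space keeps an empty string per extra space.
tokens = line.split(" ")
print(tokens)  # ['to', '', 'be', '', '', 'or', 'not']

# Comparing against ' ' removes nothing, because no token is a lone space.
kept = [t for t in tokens if t != " "]
print(kept == tokens)  # True

# split() with no argument splits on runs of whitespace and drops empties.
print(line.split())  # ['to', 'be', 'or', 'not']
```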

In [10]:
# Pair every token with 1, sum the counts per token, and sort by descending count
word_count = data_rdd.map(lambda word: (word, 1))
word_count = word_count.reduceByKey(lambda a, b: a + b).sortBy(lambda r: -r[1])
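To make the map and `reduceByKey` steps concrete, here is a plain-Python emulation on a small list. It is only a sketch of the semantics (pair each word with 1, merge the values per key, sort by descending count); the helper `reduce_by_key` is hypothetical and says nothing about how Spark distributes the work across partitions.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Merge the values of equal keys with func, on a single machine."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

words = ["the", "and", "the", "i", "and", "the"]
pairs = [(w, 1) for w in words]        # map step: (word, 1)
counts = reduce_by_key(pairs, add)     # reduce step: sum per word
counts.sort(key=lambda r: -r[1])       # same ordering key as the sortBy call
print(counts)  # [('the', 3), ('and', 2), ('i', 1)]
```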

In [12]:
word_count.take(10)

[('', 737036),
 ("'", 145207),
 ('value', 124812),
 ('row', 124791),
 ('the', 27133),
 ('and', 26437),
 ('i', 19996),
 ('to', 19502),
 ('of', 18024),
 ('a', 14252)]