In ICP2 we use Apache Spark in Google Colab. To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.0.3 with Hadoop 3.2, Java 8, and findspark, which locates Spark on the system. All of these can be installed from inside the Colab notebook by running the following commands.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xf spark-3.0.3-bin-hadoop3.2.tgz
!pip install -q findspark

Now that Spark and Java are installed in Colab, we set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"
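
As an optional sanity check, the sketch below reports whether each variable points to a directory that actually exists. The helper name check_env_dirs is made up for this illustration and is not part of Spark or findspark; it uses only the standard library.

```python
import os

def check_env_dirs(names=("JAVA_HOME", "SPARK_HOME")):
    """Return {variable: bool}: True if the variable is set and names an existing directory."""
    status = {}
    for name in names:
        path = os.environ.get(name)
        # An unset or empty variable counts as missing.
        status[name] = bool(path) and os.path.isdir(path)
    return status

for name, ok in check_env_dirs().items():
    print(name, "OK" if ok else "missing or not a directory")
```

If either variable is reported as missing, the install cell probably did not finish, or the path differs from the one set above.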

We can now check the contents of the Java directory:

In [3]:
!ls /usr/lib/jvm/

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


Finally, let's install PySpark, the Python library for Spark:

In [4]:
!pip install pyspark==3.0.2

Collecting pyspark==3.0.2
  Downloading pyspark-3.0.2.tar.gz (204.8 MB)
     |████████████████████████████████| 204.8 MB 52 kB/s
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 54.5 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186690 sha256=8bfc0b44561be085ecd5e09c7ba5e94eb56a2880fcb92d0049509badd782638b
  Stored in directory: /root/.cache/pip/wheels/9a/39/f6/970565f38054a830e9a8593f388b36e14d75dba6c6fdafc1ec
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


Configuring a SparkSession

The entry point to Spark SQL is an object called SparkSession. It starts the Spark application in which all the code for that session runs.

.builder — gives access to Builder API which is used to configure the session .

.master() — determines where the program will run; "local[*]" sets it to run locally on all cores but you can use "local[1]" to run on one core for example. In this case, our programs will be run on Google’s servers.

.appName() — optional method to name the Spark Application

.getOrCreate() — gets an existing SparkSession or creates new one if none exists


Configuring a SparkContext

SparkContext represents a connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster.

In [5]:
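from pyspark.sql import SparkSession

# Build (or reuse) a local session that runs on all available cores.
spark = SparkSession.builder.master("local[*]").appName("Big_Data_Application_ICP_2").getOrCreate()
# Reuse the SparkContext owned by the session; getOrCreate(conf=...) expects a SparkConf, not a SparkSession.
sc = spark.sparkContext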


To use a local file in Google Colab, run the following code, which prompts you to select a file from your computer and uploads it:

In [6]:
from google.colab import files
files.upload()

Saving icp.txt to icp.txt


{'icp.txt': b'As the Labor Day holiday nears, many people are planning travel and get-togethers to see family and friends.Unfortunately, this is occurring at the same time Covid-19 rates are climbing. The rates of new coronavirus infections are higher than they have been since January. Hospitalizations are also at their highest levels since January. In many parts of the United States, both infections and hospitalizations are higher than they were during Labor Day weekend in 2020.How should people think about Covid-19 safety now, compared to last year? Is it safe to see family and friends? What if extended family members want to stay in a house together -- what are some steps they should take to reduce risk? And how does the start of school affect our risk?To help navigate these questions, we spoke with CNN Medical Analyst Dr.Leana Wen. Wen is an emergency physician and visiting professor of health policy and management at the George Washington University Milken Institute School of Publ

To read the input text file into an RDD, we use the following code:

In [7]:
input_data=sc.textFile('icp.txt')


To see the number of elements in the input RDD, we use the count action.

In [10]:
input_data.count()


1
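
The count of 1 is consistent with textFile creating one RDD element per line of the file: the uploaded text appears to contain no line breaks, so the whole article becomes a single record. The pure-Python sketch below mimics that behaviour without Spark. The function lines_as_records and both sample strings are invented for illustration, and str.splitlines is only a rough stand-in for how Spark splits lines.

```python
def lines_as_records(text):
    """Split text into one record per line, roughly as a line-oriented reader would."""
    return text.splitlines()

single_line = "First sentence. Second sentence. Third sentence."
multi_line = "First sentence.\nSecond sentence.\nThird sentence."

print(len(lines_as_records(single_line)))  # 1: no line breaks, so one record
print(len(lines_as_records(multi_line)))   # 3: one record per line
```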

The take action returns the first n elements of the input data.

In [11]:
input_data.take(1)

['As the Labor Day holiday nears, many people are planning travel and get-togethers to see family and friends.Unfortunately, this is occurring at the same time Covid-19 rates are climbing. The rates of new coronavirus infections are higher than they have been since January. Hospitalizations are also at their highest levels since January. In many parts of the United States, both infections and hospitalizations are higher than they were during Labor Day weekend in 2020.How should people think about Covid-19 safety now, compared to last year? Is it safe to see family and friends? What if extended family members want to stay in a house together -- what are some steps they should take to reduce risk? And how does the start of school affect our risk?To help navigate these questions, we spoke with CNN Medical Analyst Dr.Leana Wen. Wen is an emergency physician and visiting professor of health policy and management at the George Washington University Milken Institute School of Public Health. S

To use regular expressions, we import the re module.

The code below defines a function named RemovePunc, which removes punctuation (every character that is not a letter or whitespace, so digits are removed too), converts the text to lowercase, and strips leading and trailing whitespace.



In [12]:
import re

def RemovePunc(sentence):
    # Drop everything except ASCII letters and whitespace, then lowercase and trim.
    return re.sub(r'[^a-zA-Z\s]', "", sentence).lower().strip()
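
To see what this function does to a phrase from the input text, the sketch below repeats the definition so that it runs on its own. Because punctuation is deleted rather than replaced, words separated only by punctuation are glued together, which would explain the token 'riskto' in the final output. The second function, remove_punc_spaced, is a hypothetical alternative and not part of the original notebook: it replaces each removed character with a space instead.

```python
import re

def RemovePunc(sentence):
    # Same behaviour as the notebook function: delete non-letters, lowercase, trim.
    return re.sub(r'[^a-zA-Z\s]', "", sentence).lower().strip()

def remove_punc_spaced(sentence):
    # Hypothetical variant: turn each removed character into a space, then normalise whitespace.
    return " ".join(re.sub(r'[^a-zA-Z\s]', " ", sentence).lower().split())

phrase = "affect our risk?To help navigate"
print(RemovePunc(phrase))          # affect our riskto help navigate
print(remove_punc_spaced(phrase))  # affect our risk to help navigate
```

Which variant is preferable depends on the data: the spaced version keeps "risk" and "to" apart, but it also splits tokens such as "get-togethers" into two words.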



The code below applies RemovePunc to each record, splits the result on whitespace, and flattens the pieces into a single RDD of words.

In [13]:
cleaned_rdd=input_data.flatMap(lambda sentence: RemovePunc(sentence).split())
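
The difference between mapping and flat-mapping can be sketched in plain Python, without Spark. The sample records below are invented for illustration. A map-style transformation yields one list per record, while a flatMap-style transformation concatenates those lists into one sequence of words.

```python
from itertools import chain

records = ["see family and friends", "stay safe"]

# map-like: one list of words per input record
mapped = [record.split() for record in records]

# flatMap-like: the per-record lists are chained into a single flat list
flattened = list(chain.from_iterable(record.split() for record in records))

print(mapped)     # [['see', 'family', 'and', 'friends'], ['stay', 'safe']]
print(flattened)  # ['see', 'family', 'and', 'friends', 'stay', 'safe']
```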


Next, we remove the duplicate elements from cleaned_rdd and check the final count of elements.

In [14]:
dist_data_rdd=cleaned_rdd.distinct()

dist_data_rdd.count()

149

We check the first 5 elements using the take(5) action.

In [15]:
dist_data_rdd.take(5)

['as', 'holiday', 'are', 'planning', 'family']

After getting the distinct elements, we group them by their first letter using groupBy and sort the groups in descending key order using sortByKey(False).

In [17]:

final_result = dist_data_rdd.groupBy(lambda x: (x[0])).sortByKey(False)
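
As a rough plain-Python equivalent of this step, the sketch below groups a handful of words by first letter and sorts the groups by key in descending order. The function group_by_first_letter is a made-up helper and the sample is a small subset of the words in the notebook output. Unlike Spark, it keeps the words within each group in input order.

```python
from collections import defaultdict

def group_by_first_letter(words):
    """Group words by first letter and return (letter, words) pairs, keys descending."""
    groups = defaultdict(list)
    for word in words:
        groups[word[0]].append(word)
    # reverse=True mirrors sortByKey(False), i.e. descending key order
    return sorted(groups.items(), key=lambda item: item[0], reverse=True)

sample = ["year", "we", "were", "vaccines", "united", "us"]
for letter, group in group_by_first_letter(sample):
    print(letter, group)
# y ['year']
# w ['we', 'were']
# v ['vaccines']
# u ['united', 'us']
```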



We then open an output file in write mode and store the final result in it.

In [27]:
# The with block closes the file after writing, so the output is flushed to disk.
with open("ICP1_Output_File.txt", "w") as textfile:
    for (k, v) in final_result.collect():
        print(k, list(v))
        textfile.write(str(k) + ' ' + str(list(v)) + '\n')

y ['year']
w ['we', 'were', 'weekend', 'what', 'want', 'with', 'wen', 'washington', 'well', 'who']
v ['very', 'visiting', 'vaccines', 'vaccinated']
u ['united', 'university', 'unvaccinated', 'us']
t ['this', 'than', 'think', 'take', 'these', 'the', 'travel', 'to', 'time', 'they', 'their', 'together', 'that', 'times']
s ['safe', 'steps', 'start', 'school', 'spoke', 'said', 'see', 'same', 'since', 'states', 'should', 'safety', 'stay', 'some', 'shes', 'severe']
r ['rates', 'reduce', 'risk', 'riskto', 'reason', 'report']
q ['questions']
p ['planning', 'policy', 'public', 'published', 'prevention', 'people', 'parts', 'physician', 'professor', 'protect']
o ['of', 'officials', 'occurring', 'our', 'one']
n ['new', 'now', 'nears', 'navigate']
m ['members', 'medical', 'management', 'more', 'many', 'milken', 'main']
l ['levels', 'last', 'lifelines', 'likely', 'los', 'labor']
j ['january', 'journey']
i ['is', 'infections', 'in', 'institute', 'it', 'if']
h ['holiday', 'higher', 'have', 'house', 'he