In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()
import pyspark.sql.functions as spfn
# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

drive	     spark-3.1.1-bin-hadoop3.2	    spark-3.1.1-bin-hadoop3.2.tgz.1
sample_data  spark-3.1.1-bin-hadoop3.2.tgz  spark-3.1.1-bin-hadoop3.2.tgz.2


In [None]:
sc = spark.sparkContext

In [None]:
from google.colab import drive
drive.mount("/content/drive/")

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


**Reading text file**

This creates a RDD

In [None]:
textfile=sc.textFile("/content/drive/MyDrive/news.txt")
wholetextfile=sc.wholeTextFiles("/content/drive/MyDrive/news.txt")

**Part A**

Printing the total number of news and total word count

In [None]:
print ("Total number of news (using the textFile method ):",len(textfile.collect()))
print ("Output length of the wholeTextFiles method : ",len(wholetextfile.collect()))

Total number of news (using the textFile method ): 12
Output length of the wholeTextFiles method :  1


In [None]:
#wordcount=textfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x, y : x+y)
#this section does not require any reduce
total_words=textfile.flatMap(lambda line: line.split(" ")).map(lambda word: word)

print ("total number of words :",len(total_words.collect()))
print ("first 10 words :",total_words.take(11))#indexes start at 1

total number of words : 2787
first 10 words : ['JAPAN', 'TO', 'REVISE', 'LONG', '-', 'TERM', 'ENERGY', 'DEMAND', 'DOWNWARDS', 'The', 'Ministry']


**PART B**

In [None]:
def lower (line):#lowercase map
  return line.lower()

lower_case_words=textfile.map(lower).flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x, y : x+y)
ordered=lower_case_words.takeOrdered(10, key = lambda x: -x[1])
print ("first 10 most used words")
for j,i in ordered :
  print (j,":",i)

first 10 most used words
. : 130
the : 123
, : 102
to : 84
of : 64
said : 55
and : 55
in : 54
a : 45
s : 33


**Part C**

In [None]:
from string import punctuation
def check (x):
  if x[0] not in punctuation:
    return True
  else :
    return False
no_spec=lower_case_words.filter(check)
ordered_no_spec=no_spec.takeOrdered(10, key = lambda x: -x[1])
print (no_spec.collect())
print ("first 10 most used words excluding special characters")
for j,i in ordered_no_spec :
  print (j,":",i)

[('japan', 16), ('long', 4), ('term', 3), ('demand', 6), ('ministry', 5), ('of', 64), ('trade', 16), ('miti', 4), ('meet', 1), ('forecast', 1), ('in', 54), ('officials', 4), ('said', 55), ('is', 13), ('lower', 4), ('primary', 1), ('supplies', 1), ('year', 11), ('600', 1), ('decision', 2), ('follows', 1), ('emergence', 1), ('structural', 1), ('following', 1), ('value', 1), ('yen', 4), ('decline', 2), ('electric', 2), ('power', 2), ('planning', 1), ('work', 3), ('out', 3), ('revised', 1), ('meetings', 1), ('agency', 1), ('resources', 1), ('nuclear', 2), ('s', 33), ('ended', 2), ('supplying', 1), ('an', 8), ('estimated', 1), ('pct', 15), ('kilowatt', 1), ('basis', 1), ('liquefied', 1), ('21', 1), ('),', 2), ('quarter', 9), ('widened', 1), ('4', 2), ('baht', 3), ('1', 4), ('ago', 6), ('business', 3), ('economics', 1), ('department', 2), ('janunary', 1), ('rose', 2), ('improved', 1), ('climate', 1), ('this', 3), ('resulted', 1), ('increase', 4), ('raw', 1), ('country', 2), ('import', 3), ('

**Part D**

In [None]:
z=no_spec.collect()
type(z)

list

In [None]:
alphabet=sc.parallelize("abcdefghijklmnopqrstuvwxyz")
lower_case_words=textfile.map(lower).flatMap(lambda line: line.split(" ")).filter(check).collect()
print(lower_case_words)
def count_char(x):
  counter=0
  for j in lower_case_words:
    if j[0]==x:
      counter+=1
  return counter
char_counts=alphabet.map(lambda x : (x,count_char(x)))

print ("charachter counts : ",char_counts.collect())

['japan', 'to', 'revise', 'long', 'term', 'energy', 'demand', 'downwards', 'the', 'ministry', 'of', 'international', 'trade', 'and', 'industry', 'miti', 'will', 'revise', 'its', 'long', 'term', 'energy', 'supply', 'demand', 'outlook', 'by', 'august', 'to', 'meet', 'a', 'forecast', 'downtrend', 'in', 'japanese', 'energy', 'demand', 'ministry', 'officials', 'said', 'miti', 'is', 'expected', 'to', 'lower', 'the', 'projection', 'for', 'primary', 'energy', 'supplies', 'in', 'the', 'year', '2000', 'to', '550', 'mln', 'kilolitres', 'kl', 'from', '600', 'mln', 'they', 'said', 'the', 'decision', 'follows', 'the', 'emergence', 'of', 'structural', 'changes', 'in', 'japanese', 'industry', 'following', 'the', 'rise', 'in', 'the', 'value', 'of', 'the', 'yen', 'and', 'a', 'decline', 'in', 'domestic', 'electric', 'power', 'demand', 'miti', 'is', 'planning', 'to', 'work', 'out', 'a', 'revised', 'energy', 'supply', 'demand', 'outlook', 'through', 'deliberations', 'of', 'committee', 'meetings', 'of', 'th

In [None]:
ordered_chars_no_spec=char_counts.takeOrdered(5, key = lambda x: -x[1])
for j,i in ordered_chars_no_spec :
  print (j,":",i)

t : 337
a : 224
s : 200
o : 164
i : 150
