## Exercise 1: Word Count

In [None]:
from pyspark import SparkContext, SparkConf

# Create a SparkContext
conf = SparkConf().setAppName("Count Unique Words")
sc = SparkContext.getOrCreate(conf)

# hdfs://<hostname>:<port>/<path>

hdfs_path = "hdfs://namenode:9000/donald_trump_speeches/*.txt"

dataset_rdd = sc.textFile(hdfs_path)

words_rdd = dataset_rdd.flatMap(lambda line: line.split())

# Count the unique words
unique_words_count = words_rdd.distinct().count()

print(f"Number of unique words: {unique_words_count}")


Number of unique words: 20104


### Count unique word

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("DonaldTrumpSpeeches").getOrCreate()

# Read the text file(s) into an RDD
hdfs_path = "hdfs://namenode:9000/donald_trump_speeches/*.txt"
speeches_rdd = spark.sparkContext.textFile(hdfs_path)

words_rdd = speeches_rdd.flatMap(lambda line: line.split())

word_pairs_rdd = words_rdd.map(lambda word: (word.lower(), 1))

# Reduce by key to count occurrences of each word
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

# Count the unique words
unique_word_count = word_counts_rdd.count()

# Collect and print the results
word_counts = word_counts_rdd.collect()
for word, count in word_counts:
    print(f"{word}: {count}")

print(f"Total unique words: {unique_word_count}")

spark.stop()

was: 2436
is: 2582
before: 232
lot: 667
talking: 166
countries: 92
fine.: 25
unions: 12
"look,: 11
here.: 184
second: 140
less: 93
jets: 2
powerful: 41
world.: 163
sometimes: 47
universities: 4
watcher: 1
110,: 1
90%,: 4
thinks: 16
72%,: 1
hillary: 67
would've: 47
instead: 54
seats,: 4
table.: 8
table: 5
place: 108
o'clock: 16
clinton: 29
mcdaniel,: 8
name.: 20
credit,: 9
directly: 15
away: 150
party.: 51
forgot: 15
stage.: 10
minute,: 16
pelosi's: 5
congratulations: 8
strength.: 13
up?: 13
anybody: 139
meant.: 1
states,: 63
head: 45
liar!: 1
pathological: 1
haleigh!: 1
wow.: 14
learn: 5
republicans.: 11
worried.: 2
debbie: 4
flags.": 1
a+,: 2
exonerated.: 2
street: 12
story.: 55
attacked: 4
24,: 3
ah,: 6
level,: 3
prosecutor.: 2
fact.": 1
eventually: 6
surprised.: 5
beto,: 6
expansions.: 1
garbage.: 4
trading: 3
aliens: 63
border,: 32
fixed.: 1
how's: 27
grow: 8
day?: 8
30,: 8
government: 79
flag.: 38
generations: 25
estate: 19
protecting: 44
minnesota: 23
plan: 61
offenders: 1
pepper

## Keep only meaning full word

In [3]:
!pip install nltk

Collecting nltk
  Downloading nltk-3.9.1-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m210.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting regex>=2021.8.3
  Downloading regex-2024.11.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (783 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m783.6/783.6 kB[0m [31m391.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: regex, nltk
Successfully installed nltk-3.9.1 regex-2024.11.6


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer

# Download NLTK data
nltk.download('stopwords')
nltk.download('wordnet')

# Initialize Spark session
spark = SparkSession.builder.appName("DonaldTrumpSpeeches").getOrCreate()

# Read the text file(s) into an RDD
hdfs_path = "hdfs://namenode:9000/donald_trump_speeches/*.txt"
speeches_rdd = spark.sparkContext.textFile(hdfs_path)

# Split the text into words
words_rdd = speeches_rdd.flatMap(lambda line: line.split())

# Convert words to lowercase
words_rdd = words_rdd.map(lambda word: word.lower())

# Remove stop words
stop_words = set(stopwords.words('english'))
words_rdd = words_rdd.filter(lambda word: word not in stop_words)

stemmer = PorterStemmer()
lemmatizer = WordNetLemmatizer()

# Apply stemming and lemmatization
words_rdd = words_rdd.map(lambda word: lemmatizer.lemmatize(stemmer.stem(word)))

# Map each word to a key-value pair (word, 1)
word_pairs_rdd = words_rdd.map(lambda word: (word, 1))

# Reduce by key to count occurrences of each word
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda a, b: a + b)

unique_word_count = word_counts_rdd.count()

word_counts = word_counts_rdd.collect()
for word, count in word_counts:
    print(f"{word}: {count}")

print(f"Total unique words: {unique_word_count}")

spark.stop()

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...


lot: 693
fine.: 25
"look,: 11
here.: 184
second: 142
less: 93
world.: 163
head: 66
watcher: 2
mayb: 183
110,: 1
90%,: 4
72%,: 1
instead: 54
seats,: 4
table.: 8
place: 138
o'clock: 16
clinton: 29
mcdaniel,: 8
aggress: 10
name.: 20
credit,: 9
away: 150
illeg: 162
declar: 24
party.: 51
immedi: 20
forgot: 15
stage.: 10
minute,: 16
strength.: 13
up?: 13
learn: 26
"that': 44
meant.: 1
states,: 63
liar!: 1
patholog: 1
haleigh!: 1
wow.: 14
republicans.: 11
interf: 2
worried.: 2
cheapen: 4
flags.": 1
a+,: 2
exonerated.: 2
offic: 60
street: 24
story.: 55
ladi: 47
24,: 3
versu: 2
ah,: 6
level,: 3
prosecutor.: 2
merci: 2
fact.": 1
surprised.: 5
energi: 84
beto,: 6
expansions.: 1
garbage.: 4
border,: 32
fixed.: 1
ridicul: 17
capit: 30
grow: 19
day?: 8
30,: 8
flag.: 38
germani: 12
minnesota: 23
plan: 83
pepper: 3
four.: 7
knee,: 1
ceilings,: 1
idea,: 8
joe?: 11
$2,000,000: 4
iran?: 2
futur: 52
lady.: 10
acid: 4
somebody.: 9
staters,: 1
ted: 22
joe': 8
statement?: 3
police.: 30
score: 4
uphill,: 1
po

## Visualize datasets with wordcloud 

## Exercise 2: Filter and Count

In [8]:
from pyspark import SparkContext, SparkConf

# Create a SparkContext
conf = SparkConf().setAppName("Filter and Count")
sc = SparkContext.getOrCreate(conf)

# hdfs://<hostname>:<port>/<path>

hdfs_path = "hdfs://namenode:9000/donald_trump_speeches/*.txt"


# Load the text file into an RDD
lines_rdd = sc.textFile(hdfs_path)

# Specify the word to filter out
word_to_filter = "specific_word"  # Update with the word you want to filter out

# Filter out lines containing the specific word
filtered_lines_rdd = lines_rdd.filter(lambda line: word_to_filter not in line)

# Count the number of remaining lines
remaining_lines_count = filtered_lines_rdd.count()

# Print the number of remaining lines
print(f"Number of lines remaining after filtering: {remaining_lines_count}")

# Stop the SparkContext
sc.stop()


Number of lines remaining after filtering: 35


### Exercise 3: Top N Words


In [9]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Top N Words Example")
sc = SparkContext.getOrCreate(conf)

hdfs_path = "hdfs://namenode:9000/donald_trump_speeches/*.txt"
lines_rdd = sc.textFile(hdfs_path)

# Split the lines into words and create a new RDD with the words
words_rdd = lines_rdd.flatMap(lambda line: line.split())

# Map each word to a tuple (word, 1) and reduce by key to count occurrences
# Sort the word counts in descending order and take the top N words
word_counts_rdd = words_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
N = 10  # Specify the number of top words you want
top_n_words = word_counts_rdd.takeOrdered(N, key=lambda x: -x[1])

# Print the top N words and their counts
print(f"Top {N} most frequently occurring words:")
for word, count in top_n_words:
    print(f"{word}: {count}")

# Stop the SparkContext
sc.stop()


Top 10 most frequently occurring words:
the: 13161
to: 9173
and: 8145
a: 7648
I: 7421
of: 6759
you: 4694
in: 4097
we: 3861
have: 3748


### Exercise 4: Join Operations


In [41]:
from pyspark import SparkContext, SparkConf

# Create a SparkContext
conf = SparkConf().setAppName("Inner Join Example")
sc = SparkContext.getOrCreate(conf)

# Create the first RDD (e.g., student IDs and their names)
rdd1 = sc.parallelize([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie"),
    (4, "David")
])

# Create the second RDD (e.g., student IDs and their grades)
rdd2 = sc.parallelize([
    (1, "A"),
    (2, "B"),
    (3, "C"),
    (5, "D")  # Note that ID 5 does not exist in rdd1
])

# Perform an inner join on the two RDDs
joined_rdd = rdd1.join(rdd2)

# Collect and print the results
result = joined_rdd.collect()
print("Inner Join Result:")
for item in result:
    print(item)

# Stop the SparkContext
sc.stop()


Inner Join Result:
(1, ('Alice', 'A'))
(2, ('Bob', 'B'))
(3, ('Charlie', 'C'))
