In [1]:
import pandas as pd
import numpy as np

In [2]:
!pip install -q pyspark==3.5.1

In [3]:
# Setting SparkSession with 4 cores
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]').appName('SecondSession').config('spark.executor.memory', '2g').getOrCreate()
spark

Creating RDD

In [4]:
df_pandas = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/Raw Data/harry_potter_reviews.csv")

In [5]:
df_pandas.head(5)

Unnamed: 0,user_id,user_sex,user_age,user_country,rating,comment,favourite_character,date
0,0,female,50,Germany,2.5,"""The transitions between scenes were awkward, ...",Severus Snape,2004-12-27
1,1,female,23,Spain,4.0,"""Severus Snape's role adds an intriguing layer.""",Severus Snape,2003-11-22
2,2,male,32,France,3.0,"""The pacing was a bit slow, but the characters...",Ron Weasley,2005-09-16
3,3,female,24,Turkey,4.5,"""Hagrid's love for magical creatures is heartw...",Rubeus Hagrid,2002-09-17
4,4,female,40,Spain,5.0,"""Neville Longbottom's courage is awe-inspiring.""",Neville Longbottom,2004-10-17


In [6]:
# Creating RDD with 7 partitions
rdd = spark.sparkContext.parallelize(list(df_pandas.comment),7)

In [7]:
type(rdd)

In [8]:
rdd.getNumPartitions()

7

In [9]:
# Shuffles to 6 partitions with Coalesce
rdd2 = rdd.coalesce(6)
rdd2.getNumPartitions()

6

RDDs Transformations - lazy operations (return another RDD)

In [11]:
# Split text, then flattened it
rdd3 = rdd2.flatMap(lambda x: x.split(' '))

In [12]:
rdd3.collect()

['"The',
 'transitions',
 'between',
 'scenes',
 'were',
 'awkward,',
 'and',
 'the',
 'soundtrack',
 'was',
 'forgettable."',
 '"Severus',
 "Snape's",
 'role',
 'adds',
 'an',
 'intriguing',
 'layer."',
 '"The',
 'pacing',
 'was',
 'a',
 'bit',
 'slow,',
 'but',
 'the',
 'characters',
 'were',
 'charming."',
 '"Hagrid\'s',
 'love',
 'for',
 'magical',
 'creatures',
 'is',
 'heartwarming."',
 '"Neville',
 "Longbottom's",
 'courage',
 'is',
 'awe-inspiring."',
 '"Rubeus',
 "Hagrid's",
 'love',
 'for',
 'magical',
 'creatures',
 'is',
 'endearing."',
 '"Severus',
 "Snape's",
 'complexity',
 'adds',
 'depth',
 'to',
 'the',
 'story."',
 '"Albus',
 "Dumbledore's",
 'presence',
 'feels',
 'unnecessary',
 'and',
 'distracting."',
 '"Ron',
 "Weasley's",
 'humor',
 'adds',
 'a',
 'delightful',
 'touch."',
 '"Hermione',
 "Granger's",
 'determination',
 'is',
 'inspiring."',
 '"The',
 'seamless',
 'transitions',
 'between',
 'scenes',
 'added',
 'to',
 'the',
 'overall',
 'magic."',
 '"A',
 'mag

In [13]:
type(rdd3)

In [14]:
# Creates a column with value '1' for each row
# The result is PairRDD which contains key(string)-value(1 for this case) pair
rdd4 = rdd3.map(lambda x: (x,1))

In [15]:
# Merges the values for each key with the function specified
# In this case, applies the sum function on value
rdd5 = rdd4.reduceByKey(lambda a,b: a+b)
rdd5.collect()

[('transitions', 12),
 ('"Severus', 34),
 ('characters', 12),
 ('"Hagrid\'s', 18),
 ('love', 13),
 ('"Albus', 25),
 ('feels', 26),
 ('unnecessary', 1),
 ('inspiring."', 13),
 ('overall', 6),
 ('touch', 9),
 ('of', 89),
 ('development', 10),
 ('lacking,', 2),
 ('ages."', 4),
 ('asset."', 1),
 ('stunning', 7),
 ('"Draco', 43),
 ('captivating."', 4),
 ('spirit', 3),
 ('Longbottom', 2),
 ('exceptional."', 1),
 ('references', 9),
 ('are', 30),
 ('nice', 1),
 ("film's", 35),
 ('both', 8),
 ('enhanced', 4),
 ('game-changer."', 1),
 ('commendable."', 3),
 ('uninspiring."', 2),
 ('needed', 2),
 ('depth,', 2),
 ('well-done."', 1),
 ('like', 6),
 ('intense', 2),
 ('thrilling."', 1),
 ('authenticity."', 2),
 ('captures', 10),
 ('essence', 10),
 ('detail."', 3),
 ('standout."', 1),
 ('compassion', 1),
 ('imagination."', 2),
 ('wonderful', 1),
 ('capture', 7),
 ('tone', 12),
 ('poorly', 6),
 ('out', 8),
 ('into', 4),
 ('generic."', 2),
 ('showcased', 1),
 ('fails', 7),
 ('falls', 1),
 ('force."', 1)

In [16]:
# Using map transformation, key-value pairs are inverted
# Then applies sortByKey --> False parameter for descending order
rdd6 = rdd5.map(lambda x: (x[1],x[0])).sortByKey(False)
rdd6.collect()

[(227, '"The'),
 (226, 'the'),
 (152, 'and'),
 (127, 'is'),
 (89, 'of'),
 (85, 'to'),
 (81, 'a'),
 (79, 'magical'),
 (74, 'character'),
 (49, 'adds'),
 (43, '"Draco'),
 (43, "Malfoy's"),
 (42, '"Neville'),
 (40, "Longbottom's"),
 (38, 'world'),
 (36, "Snape's"),
 (35, "film's"),
 (35, 'was'),
 (35, 'were'),
 (34, '"Severus'),
 (30, 'are'),
 (30, 'for'),
 (28, 'soundtrack'),
 (26, 'feels'),
 (25, '"Albus'),
 (24, 'effects'),
 (24, "Dumbledore's"),
 (23, 'visual'),
 (22, 'depth'),
 (22, '"A'),
 (22, 'with'),
 (21, 'magic'),
 (21, "Weasley's"),
 (21, 'creatures'),
 (20, '"Ron'),
 (20, 'added'),
 (19, 'lacks'),
 (19, 'in'),
 (19, 'wizarding'),
 (19, 'world."'),
 (19, 'bravery'),
 (18, '"Hagrid\'s'),
 (18, '"Hermione'),
 (18, 'pacing'),
 (18, "Granger's"),
 (18, 'growth'),
 (16, 'felt'),
 (16, 'film."'),
 (15, 'humor'),
 (15, 'wisdom'),
 (15, 'film'),
 (14, 'scenes'),
 (14, 'an'),
 (14, 'journey'),
 (14, 'intelligence'),
 (13, 'love'),
 (13, 'inspiring."'),
 (13, 'complexity'),
 (13, 'betwe

In [17]:
# Delete characters other than comment text from RDD Values
def clean_value(value):
  mapping_table = str.maketrans({'"': '', '.': '', ',': '', ':': '', ';': '', '!': '', '?': '', "/": ''})
  cleaned_value = value.translate(mapping_table).strip()
  return cleaned_value

In [18]:
rdd7 = rdd6.map(lambda x: (x[0], clean_value(x[1])))
rdd7.collect()

[(227, 'The'),
 (226, 'the'),
 (152, 'and'),
 (127, 'is'),
 (89, 'of'),
 (85, 'to'),
 (81, 'a'),
 (79, 'magical'),
 (74, 'character'),
 (49, 'adds'),
 (43, 'Draco'),
 (43, "Malfoy's"),
 (42, 'Neville'),
 (40, "Longbottom's"),
 (38, 'world'),
 (36, "Snape's"),
 (35, "film's"),
 (35, 'was'),
 (35, 'were'),
 (34, 'Severus'),
 (30, 'are'),
 (30, 'for'),
 (28, 'soundtrack'),
 (26, 'feels'),
 (25, 'Albus'),
 (24, 'effects'),
 (24, "Dumbledore's"),
 (23, 'visual'),
 (22, 'depth'),
 (22, 'A'),
 (22, 'with'),
 (21, 'magic'),
 (21, "Weasley's"),
 (21, 'creatures'),
 (20, 'Ron'),
 (20, 'added'),
 (19, 'lacks'),
 (19, 'in'),
 (19, 'wizarding'),
 (19, 'world'),
 (19, 'bravery'),
 (18, "Hagrid's"),
 (18, 'Hermione'),
 (18, 'pacing'),
 (18, "Granger's"),
 (18, 'growth'),
 (16, 'felt'),
 (16, 'film'),
 (15, 'humor'),
 (15, 'wisdom'),
 (15, 'film'),
 (14, 'scenes'),
 (14, 'an'),
 (14, 'journey'),
 (14, 'intelligence'),
 (13, 'love'),
 (13, 'inspiring'),
 (13, 'complexity'),
 (13, 'between'),
 (13, 'fri

In [19]:
# Filter by words with more than 3 characters to exclude most of prepositions
rdd8 = rdd7.filter(lambda x: len(x[1]) > 3)
rdd8.collect()

[(79, 'magical'),
 (74, 'character'),
 (49, 'adds'),
 (43, 'Draco'),
 (43, "Malfoy's"),
 (42, 'Neville'),
 (40, "Longbottom's"),
 (38, 'world'),
 (36, "Snape's"),
 (35, "film's"),
 (35, 'were'),
 (34, 'Severus'),
 (28, 'soundtrack'),
 (26, 'feels'),
 (25, 'Albus'),
 (24, 'effects'),
 (24, "Dumbledore's"),
 (23, 'visual'),
 (22, 'depth'),
 (22, 'with'),
 (21, 'magic'),
 (21, "Weasley's"),
 (21, 'creatures'),
 (20, 'added'),
 (19, 'lacks'),
 (19, 'wizarding'),
 (19, 'world'),
 (19, 'bravery'),
 (18, "Hagrid's"),
 (18, 'Hermione'),
 (18, 'pacing'),
 (18, "Granger's"),
 (18, 'growth'),
 (16, 'felt'),
 (16, 'film'),
 (15, 'humor'),
 (15, 'wisdom'),
 (15, 'film'),
 (14, 'scenes'),
 (14, 'journey'),
 (14, 'intelligence'),
 (13, 'love'),
 (13, 'inspiring'),
 (13, 'complexity'),
 (13, 'between'),
 (13, 'friendship'),
 (12, 'transitions'),
 (12, 'characters'),
 (12, 'tone'),
 (12, 'story'),
 (11, 'cunning'),
 (11, 'design'),
 (11, 'production'),
 (10, 'development'),
 (10, 'captures'),
 (10, 'es

RDDs Actions - return values from RDDs

In [21]:
# Count RDD elements
rdd8.count()

688

In [22]:
# Return record with max occurences
datMax = rdd8.max()
datMax

(79, 'magical')

In [23]:
# Back to key(words)-values(ocurrences) makes it more legible
rdd9 = rdd8.map(lambda x: (x[1],x[0]))
rdd9.collect()

[('magical', 79),
 ('character', 74),
 ('adds', 49),
 ('Draco', 43),
 ("Malfoy's", 43),
 ('Neville', 42),
 ("Longbottom's", 40),
 ('world', 38),
 ("Snape's", 36),
 ("film's", 35),
 ('were', 35),
 ('Severus', 34),
 ('soundtrack', 28),
 ('feels', 26),
 ('Albus', 25),
 ('effects', 24),
 ("Dumbledore's", 24),
 ('visual', 23),
 ('depth', 22),
 ('with', 22),
 ('magic', 21),
 ("Weasley's", 21),
 ('creatures', 21),
 ('added', 20),
 ('lacks', 19),
 ('wizarding', 19),
 ('world', 19),
 ('bravery', 19),
 ("Hagrid's", 18),
 ('Hermione', 18),
 ('pacing', 18),
 ("Granger's", 18),
 ('growth', 18),
 ('felt', 16),
 ('film', 16),
 ('humor', 15),
 ('wisdom', 15),
 ('film', 15),
 ('scenes', 14),
 ('journey', 14),
 ('intelligence', 14),
 ('love', 13),
 ('inspiring', 13),
 ('complexity', 13),
 ('between', 13),
 ('friendship', 13),
 ('transitions', 12),
 ('characters', 12),
 ('tone', 12),
 ('story', 12),
 ('cunning', 11),
 ('design', 11),
 ('production', 11),
 ('development', 10),
 ('captures', 10),
 ('essenc

In [29]:
# Take sample from dataset as training set
rdd9.takeSample(True, 140)

[('Harry', 3),
 ('showcase', 1),
 ('storytelling', 3),
 ('captivating', 5),
 ('lackluster', 2),
 ('filled', 4),
 ('captivating', 4),
 ('leadership', 1),
 ('montage', 10),
 ('delivered', 1),
 ('incredible', 1),
 ('Rubeus', 6),
 ('charm', 2),
 ('seemed', 3),
 ('Pacing', 1),
 ('deeply', 1),
 ('Captivating', 3),
 ('interest', 1),
 ('motives', 1),
 ('more', 1),
 ('created', 5),
 ('painfully', 1),
 ('just', 2),
 ('there', 1),
 ('finesse', 1),
 ('balances', 1),
 ('school', 2),
 ('overrated', 1),
 ('pacing', 18),
 ('lackluster', 2),
 ('lighthearted', 1),
 ('appreciate', 1),
 ('genre', 1),
 ('capturing', 1),
 ("Dumbledore's", 1),
 ('acting', 1),
 ('game-changer', 1),
 ('top-notch', 1),
 ('sync', 1),
 ('enjoyed', 1),
 ('preachy', 1),
 ('haven', 1),
 ('slow', 1),
 ("film's", 35),
 ('flow', 1),
 ('disjointed', 1),
 ('witness', 2),
 ('layer', 3),
 ('forced', 10),
 ('masterpiece', 1),
 ('powerful', 1),
 ('sense', 2),
 ('intelligence', 14),
 ("characters'", 9),
 ('heart', 4),
 ('setting', 6),
 ('leav

Optimization

In [30]:
cachedRdd = rdd9.cache()

In [31]:
cachedRdd.unpersist()

PythonRDD[19] at collect at <ipython-input-23-e29ba5ff1715>:3

Write output

In [33]:
rdd9.saveAsTextFile('/content/drive/MyDrive/Colab Notebooks/Raw Data/WordCountRDD_HP.txt')

In [34]:
rdd_textFile = spark.sparkContext.textFile('/content/drive/MyDrive/Colab Notebooks/Raw Data/WordCountRDD_HP.txt', 7)

Stop session

In [35]:
spark.stop()