In [15]:
# Global imports

from collections import defaultdict
import string
from operator import add
from functools import reduce

import pandas as pd

In [16]:
# Sample documents: four multi-line text passages used as the input
# corpus for the word-count functions and Spark cells in this notebook.
text1 = """
    Well, the way they make shows is, they make one show. 
    That show's called a pilot. 
    Then they show that show to the people who make shows, 
    and on the strength of that one show they decide 
    if they're going to make more shows. Some pilots 
    get picked and become television programs. 
    Some don't, become nothing. 
    She starred in one of the ones that became nothing.
"""

text2 = """
    You think water moves fast? You should see ice. 
    It moves like it has a mind. 
    Like it knows it killed the world once and got a taste for murder. 
    After the avalanche, it took us a week to climb out. 
    Now, I don't know exactly when we turned on each other, 
    but I know that seven of us survived the slide... 
    and only five made it out. Now we took an oath, that I'm breaking now. 
    We said we'd say it was the snow that killed the other two, but it wasn't. 
    Nature is lethal but it doesn't hold a candle to man.
"""

text3 = """
    Now that we know who you are, I know who I am. I'm not a mistake! 
    It all makes sense! In a comic, you know how you can tell who the arch-villain's going to be? 
    He's the exact opposite of the hero. And most times they're friends, like you and me! 
    I should've known way back when... You know why, David? Because of the kids. 
    They called me Mr Glass.
"""

text4 = """
    Your bones don't break, mine do. That's clear. 
    Your cells react to bacteria and viruses differently than mine. 
    You don't get sick, I do. That's also clear. 
    But for some reason, you and I react the exact same way to water. 
    We swallow it too fast, we choke. We get some in our lungs, we drown. 
    However unreal it may seem, we are connected, you and I. 
    We're on the same curve, just on opposite ends.
"""

# The corpus as a list, one entry per document, in text1..text4 order.
documents = [text1, text2, text3, text4]

In [17]:
def remove_punctuation(text: str) -> str:
    """
    Strip punctuation characters out of a piece of text

    :param text: text to clean
    :return: ``text`` without any character that appears in
        ``string.punctuation``; all other characters keep their order
    """
    kept_characters = []
    for symbol in text:
        # Skip anything listed in the standard punctuation set
        if symbol in string.punctuation:
            continue
        kept_characters.append(symbol)
    return ''.join(kept_characters)

def word_count_map_function(text: str) -> list[tuple[str, int]]:
    """
    Map step of the word count: turn one document into word/count tuples

    Punctuation is removed, the text is lower-cased and then split on
    whitespace. Every distinct word is paired with the number of times it
    occurs in this document.

    :param text: text to convert into word/count tuples
    :return: A list of (word, count) tuples, sorted by word
    """
    # Normalise the document: no punctuation, lower case, one entry per word
    words = remove_punctuation(text).lower().split()

    # Tally in a single pass. Calling list.count() once per distinct word
    # would rescan the whole document each time.
    counts = defaultdict(int)
    for word in words:
        counts[word] += 1

    # Words are unique keys, so sorting the pairs orders them by word
    return sorted(counts.items())

In [18]:
# Run the map step on every document, keeping one result list per document
documents_word_counts = [
    word_count_map_function(document) for document in documents
]
documents_word_counts

[[('a', 1),
  ('and', 2),
  ('became', 1),
  ('become', 2),
  ('called', 1),
  ('decide', 1),
  ('dont', 1),
  ('get', 1),
  ('going', 1),
  ('if', 1),
  ('in', 1),
  ('is', 1),
  ('make', 4),
  ('more', 1),
  ('nothing', 2),
  ('of', 2),
  ('on', 1),
  ('one', 3),
  ('ones', 1),
  ('people', 1),
  ('picked', 1),
  ('pilot', 1),
  ('pilots', 1),
  ('programs', 1),
  ('she', 1),
  ('show', 4),
  ('shows', 4),
  ('some', 2),
  ('starred', 1),
  ('strength', 1),
  ('television', 1),
  ('that', 4),
  ('the', 4),
  ('then', 1),
  ('they', 4),
  ('theyre', 1),
  ('to', 2),
  ('way', 1),
  ('well', 1),
  ('who', 1)],
 [('a', 4),
  ('after', 1),
  ('an', 1),
  ('and', 2),
  ('avalanche', 1),
  ('breaking', 1),
  ('but', 3),
  ('candle', 1),
  ('climb', 1),
  ('doesnt', 1),
  ('dont', 1),
  ('each', 1),
  ('exactly', 1),
  ('fast', 1),
  ('five', 1),
  ('for', 1),
  ('got', 1),
  ('has', 1),
  ('hold', 1),
  ('i', 2),
  ('ice', 1),
  ('im', 1),
  ('is', 1),
  ('it', 9),
  ('killed', 2),
  ('kno

In [19]:
def merge_lists_reduce_function(x, y):
    """
    Combine two map outputs into one

    :param x: left-hand operand (the accumulated value under ``reduce``)
    :param y: right-hand operand
    :return: the result of ``x + y``
    """
    return add(x, y)

def word_count_reduce_function(key_value_pairs):
    """
    Reduce step of the word count: total the counts of each word

    :param key_value_pairs: iterable of (word, count) pairs; the same
        word may appear in several pairs
    :return: list of (word, total count) tuples in sorted order
    """
    totals = {}

    for token, occurrences in key_value_pairs:
        if token not in totals:
            # First time this word is seen: start its running total
            totals[token] = occurrences
            continue
        totals[token] += occurrences

    ordered_totals = list(totals.items())
    ordered_totals.sort()
    return ordered_totals

In [20]:
# Step 1: Apply the `word_count_map_function` to each of the documents
map_output_values = map(word_count_map_function, documents)

# Step 2: Merge the outputs of the map function into a single list.
# The `[]` initializer makes the merge return an empty list, instead of
# raising TypeError, when there are no documents to merge.
merged_output_values = reduce(merge_lists_reduce_function, map_output_values, [])

# Step 3: Apply the `word_count_reduce_function` to total the counts per word.
# NOTE: despite its name, `counted_words_dict` is a sorted list of
# (word, count) tuples; the name is kept because a later cell refers to it.
counted_words_dict = word_count_reduce_function(merged_output_values)

counted_words_dict

[('a', 7),
 ('after', 1),
 ('all', 1),
 ('also', 1),
 ('am', 1),
 ('an', 1),
 ('and', 9),
 ('archvillains', 1),
 ('are', 2),
 ('avalanche', 1),
 ('back', 1),
 ('bacteria', 1),
 ('be', 1),
 ('became', 1),
 ('because', 1),
 ('become', 2),
 ('bones', 1),
 ('break', 1),
 ('breaking', 1),
 ('but', 4),
 ('called', 2),
 ('can', 1),
 ('candle', 1),
 ('cells', 1),
 ('choke', 1),
 ('clear', 2),
 ('climb', 1),
 ('comic', 1),
 ('connected', 1),
 ('curve', 1),
 ('david', 1),
 ('decide', 1),
 ('differently', 1),
 ('do', 2),
 ('doesnt', 1),
 ('dont', 4),
 ('drown', 1),
 ('each', 1),
 ('ends', 1),
 ('exact', 2),
 ('exactly', 1),
 ('fast', 2),
 ('five', 1),
 ('for', 2),
 ('friends', 1),
 ('get', 3),
 ('glass', 1),
 ('going', 2),
 ('got', 1),
 ('has', 1),
 ('hero', 1),
 ('hes', 1),
 ('hold', 1),
 ('how', 1),
 ('however', 1),
 ('i', 8),
 ('ice', 1),
 ('if', 1),
 ('im', 2),
 ('in', 3),
 ('is', 2),
 ('it', 12),
 ('just', 1),
 ('kids', 1),
 ('killed', 2),
 ('know', 6),
 ('known', 1),
 ('knows', 1),
 ('letha

In [None]:
# Explicit line magic, so the cell does not rely on IPython's automagic
%conda install pyspark

In [7]:
# Explicit line magic, so the cell does not rely on IPython's automagic
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'C:\Users\12162\anaconda3\python.exe -m pip install --upgrade pip' command.


In [11]:
# `%pip` installs into the environment of the running kernel, whereas
# `!pip` runs whichever pip is first on the shell PATH.
# Pinned to the version the recorded run of this cell installed.
%pip install findspark==2.0.1

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


You should consider upgrading via the 'C:\Users\12162\anaconda3\python.exe -m pip install --upgrade pip' command.


In [21]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook, or reuse one that already exists.
# NOTE(review): the recorded run of this cell failed with "Java gateway
# process exited before sending its port number" -- presumably no Java
# runtime is visible to the kernel; verify JAVA_HOME before re-running.
spark = (
    SparkSession.builder
    .appName("DSC 400 Assignment 3")
    .getOrCreate()
)

# Handle used by the RDD cells
spark_context = spark.sparkContext

RuntimeError: Java gateway process exited before sending its port number

In [22]:
# Turn the local list of documents into an RDD
documents_rdd = spark_context.parallelize(documents)

print(f"Number of Partitions: {documents_rdd.getNumPartitions()}")

NameError: name 'spark_context' is not defined

In [23]:
# Pull the RDD's contents back into a local object and print them
documents_rdd_collect = documents_rdd.collect()
print(documents_rdd_collect)

NameError: name 'documents_rdd' is not defined

In [24]:
# Map step on Spark: `word_count_map_function` returns a list of
# (word, count) tuples per document; flatMap flattens those lists into
# a single RDD of tuples.
map_output_rdd = documents_rdd.flatMap(word_count_map_function)
map_output_rdd.collect()

NameError: name 'documents_rdd' is not defined

In [25]:
# Reduce step on Spark: sum the counts that share the same word
word_count_rdd = map_output_rdd.reduceByKey(
    lambda left_count, right_count: left_count + right_count
)
word_count_rdd.collect()

NameError: name 'map_output_rdd' is not defined

In [26]:
# Check that the Spark result matches the pure-Python result;
# `counted_words_dict` is already sorted, so sort the collected RDD too.
sorted(word_count_rdd.collect()) == counted_words_dict

NameError: name 'word_count_rdd' is not defined

In [27]:
# Keep only the (word, count) pairs whose count is greater than four
counts_gt_four_rdd = word_count_rdd.filter(lambda pair: pair[1] > 4)
counts_gt_four_rdd.collect()

NameError: name 'word_count_rdd' is not defined

In [28]:
# Order the (word, count) pairs by count, largest first
sorted_word_count_rdd = word_count_rdd.sortBy(
    lambda pair: pair[1],
    ascending=False,
)
sorted_word_count_rdd.collect()

NameError: name 'word_count_rdd' is not defined

In [29]:
# Build a DataFrame from the word-count RDD, naming its two columns
word_count_columns = ["word", "count"]
word_count_df = spark.createDataFrame(word_count_rdd, word_count_columns)
word_count_df.printSchema()

NameError: name 'spark' is not defined

In [30]:
# Print a preview of the word/count DataFrame
word_count_df.show()

NameError: name 'word_count_df' is not defined

In [31]:
# Rows of `word_count_df` whose count is greater than four

word_count_filtered = word_count_df.where(word_count_df["count"] > 4)
word_count_filtered.show()

NameError: name 'word_count_df' is not defined

In [32]:
# `word_count_df` ordered by count, largest first

word_count_sorted = word_count_df.orderBy(word_count_df["count"].desc())
word_count_sorted.show()

NameError: name 'word_count_df' is not defined

In [33]:
# Keep at most the first ten rows of `word_count_sorted`

word_count_top_10 = word_count_sorted.limit(10)
word_count_top_10.show()

NameError: name 'word_count_sorted' is not defined