In [1]:
pip install pandas

[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
from __future__ import print_function
import sys
from pyspark.sql import functions as F 
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer
import pandas as pd

def remove_header(df):
    w = Window().orderBy(F.lit('value'))
    df = df.withColumn("rowNum", F.row_number().over(w))
    df = df.filter(df.rowNum > 244)
    return df

def remove_filler(df):
    return df.filter(~ df.value.startswith("<<THIS") & \
                    ~ df.value.startswith("SHAKESPEARE IS") & \
                    ~ df.value.startswith("PROVIDED BY") & \
                    ~ df.value.startswith("WITH PERMISSION") & \
                    ~ df.value.startswith("DISTRIBUTED") & \
                    ~ df.value.startswith("PERSONAL USE") & \
                    ~ df.value.startswith("COMMERCIALLY.") & \
                    ~ df.value.startswith("SERVICE THAT"))

def get_play_rows(df):
    return df.filter(df.value.rlike('[0-9]{4}')).drop('value')

def partition_by_play(df):
    line_ids = get_play_rows(data)
    splits = [x['rowNum'] for x in line_ids.collect()]
    splits.append(float('Inf'))
    bucketizer = Bucketizer(splits=splits,inputCol="rowNum", outputCol="playNum")
    df = bucketizer.setHandleInvalid("keep").transform(df)
    return df.repartition(df.playNum)
    
def count_words(df):
    return df.withColumn('words', F.size(F.split(F.col('value'), ' ')))

def format_play(play, id, words) :
    txt = "Play {}, words: {}, lines: {}"
    return txt.format(play, words[id]['sum(words)'], words[id]['count(value)'])
    
spark = SparkSession.builder.appName("PyPi").getOrCreate()
data = spark.read.text("data/Shakespeare.txt")

data = remove_header(data)
data = remove_filler(data)
data = partition_by_play(data)
data = count_words(data)
result = data.groupBy(F.col('playNum')).agg(F.sum('words'), F.count('value')).sort('playNum').collect()

play_names = [x.strip() for x in open('data/plays.txt').readlines()]

play_results = [format_play(x, id, result) for id, x in enumerate(play_names)]

[print(x) for x in play_results]

spark.stop()


23/01/26 12:48:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/26 12:48:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/26 12:48:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Play THE SONNETS, words: 26469, lines: 2634
Play ALLS WELL THAT ENDS WELL, words: 37196, lines: 3199
Play THE TRAGEDY OF ANTONY AND CLEOPATRA, words: 45757, lines: 4167
Play AS YOU LIKE IT, words: 35589, lines: 2939
Play THE COMEDY OF ERRORS, words: 18817, lines: 2080
Play THE TRAGEDY OF CORIOLANUS, words: 46589, lines: 4253
Play CYMBELINE, words: 46143, lines: 4140
Play THE TRAGEDY OF HAMLET, PRINCE OF DENMARK, words: 52365, lines: 4489
Play THE FIRST PART OF KING HENRY THE FOURTH, words: 39843, lines: 3323
Play SECOND PART OF KING HENRY IV, words: 41774, lines: 3555
Play THE LIFE OF KING HENRY THE FIFTH, words: 42167, lines: 3603
Play THE FIRST PART OF HENRY THE SIXTH, words: 38962, lines: 3377
Play THE SECOND PART OF KING HENRY THE SIXTH, words: 41811, lines: 3614
Play THE THIRD PART OF KING HENRY THE SIXTH, words: 40832, lines: 3484
Play KING HENRY THE EIGHTH, words: 40894, lines: 3733
Play KING JOHN, words: 33625, lines: 2997
Play THE TRAGEDY OF JULIUS CAESAR, words: 33572, lines: