In [164]:
import os
import json
import boto3
import sklearn
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return "{}proxy/{}/jobs/".format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

# small fix to enable UI views
SparkContext.uiWebUrl = property(uiWebUrl)

# spark configurtion in local regime 
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '8g')

#some needed objects
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/st085838/


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-1-930e736913b2>:26 

# Part 1 Dataframe

In [174]:
filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"

In [175]:
# read text as a dataframe
from pyspark.sql.functions import monotonically_increasing_id

dataframe= sc.textFile(f"{filepath}")\
     .map(lambda x: (x,))\
     .toDF()\
     .select(F.col("_1").alias("text"))\
     .withColumn("doc_id", monotonically_increasing_id())

In [176]:
dataframe.show()

+--------------------+------+
|                text|doc_id|
+--------------------+------+
|The Project Guten...|     0|
|                    |     1|
|This eBook is for...|     2|
|most other parts ...|     3|
|whatsoever. You m...|     4|
|of the Project Gu...|     5|
|www.gutenberg.org...|     6|
|will have to chec...|     7|
|   using this eBook.|     8|
|                    |     9|
| Title: Frankenstein|    10|
|       or, The Mo...|    11|
|                    |    12|
|Author: Mary Woll...|    13|
|                    |    14|
|Release Date: 31,...|    15|
|[Most recently up...|    16|
|                    |    17|
|   Language: English|    18|
|                    |    19|
+--------------------+------+
only showing top 20 rows



In [177]:
dataframe.count()

7743

In [178]:
#function for filtering out non-letters, creating list of words
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
import string
import re

def process_string(data):
    non_letters_removed = re.sub(r'[^a-zA-Z]', ' ', data)
    words = non_letters_removed.lower().split(" ") 
    return list(filter(lambda x: len(x) > 0, words))
    
process_string_udf = udf(lambda z: process_string(z), ArrayType(StringType()))

In [179]:
#filter out empty strings,apply function written above
by_words = dataframe\
    .select(process_string_udf(F.col("text")).alias("by_words"))\
    .where(F.size(F.col("by_words")) > 0)\
    .withColumn("doc_id", monotonically_increasing_id())

In [180]:
by_words.show()

+--------------------+------+
|            by_words|doc_id|
+--------------------+------+
|[the, project, gu...|     0|
|[this, ebook, is,...|     1|
|[most, other, par...|     2|
|[whatsoever, you,...|     3|
|[of, the, project...|     4|
|[www, gutenberg, ...|     5|
|[will, have, to, ...|     6|
|[using, this, ebook]|     7|
|[title, frankenst...|     8|
|[or, the, modern,...|     9|
|[author, mary, wo...|    10|
|[release, date, e...|    11|
|[most, recently, ...|    12|
| [language, english]|    13|
|[character, set, ...|    14|
|[produced, by, ju...|    15|
|[further, correct...|    16|
|[start, of, the, ...|    17|
|      [frankenstein]|    18|
|[or, the, modern,...|    19|
+--------------------+------+
only showing top 20 rows



In [181]:
#create a column with separate words in each row
by_words_count = by_words\
     .withColumn('word',(F.explode(F.col("by_words"))))

In [182]:
by_words_count.show()

+--------------------+------+--------------+
|            by_words|doc_id|          word|
+--------------------+------+--------------+
|[the, project, gu...|     0|           the|
|[the, project, gu...|     0|       project|
|[the, project, gu...|     0|     gutenberg|
|[the, project, gu...|     0|         ebook|
|[the, project, gu...|     0|            of|
|[the, project, gu...|     0|  frankenstein|
|[the, project, gu...|     0|            by|
|[the, project, gu...|     0|          mary|
|[the, project, gu...|     0|wollstonecraft|
|[the, project, gu...|     0|        godwin|
|[the, project, gu...|     0|       shelley|
|[this, ebook, is,...|     1|          this|
|[this, ebook, is,...|     1|         ebook|
|[this, ebook, is,...|     1|            is|
|[this, ebook, is,...|     1|           for|
|[this, ebook, is,...|     1|           the|
|[this, ebook, is,...|     1|           use|
|[this, ebook, is,...|     1|            of|
|[this, ebook, is,...|     1|        anyone|
|[this, eb

In [183]:
from pyspark.sql.functions import count

In [184]:
#calculate each term frequency in each document 
by_words_tf= by_words_count\
        .groupBy('doc_id', 'word').count()\
        .orderBy(F.col("doc_id"))\
        .withColumnRenamed("count", 'tf')

In [185]:
by_words_tf.show()

+------+--------------+---+
|doc_id|          word| tf|
+------+--------------+---+
|     0|          mary|  1|
|     0|        godwin|  1|
|     0|wollstonecraft|  1|
|     0|           the|  1|
|     0|     gutenberg|  1|
|     0|         ebook|  1|
|     0|  frankenstein|  1|
|     0|            of|  1|
|     0|       project|  1|
|     0|       shelley|  1|
|     0|            by|  1|
|     1|         ebook|  1|
|     1|        anyone|  1|
|     1|           and|  1|
|     1|            in|  1|
|     1|            is|  1|
|     1|           use|  1|
|     1|           the|  2|
|     1|        united|  1|
|     1|      anywhere|  1|
+------+--------------+---+
only showing top 20 rows



In [186]:
#calulate total number of docs
N_docs = by_words.count()
N_docs

6737

In [187]:
from pyspark.sql.functions import countDistinct

In [188]:
#calculate document frequency
by_words_df = by_words_count\
       .groupBy("word")\
       .agg(countDistinct("doc_id").alias("df"))\
       .orderBy(F.col('df').desc())

In [189]:
by_words_df.show()

+-----+----+
| word|  df|
+-----+----+
|  the|3284|
|  and|2706|
|   of|2436|
|    i|2361|
|   to|1901|
|   my|1536|
|    a|1315|
|   in|1128|
| that| 975|
|  was| 949|
|   me| 799|
| with| 695|
|  but| 683|
|  had| 649|
|which| 554|
|   he| 552|
|  you| 552|
|   it| 534|
|  not| 520|
|  for| 507|
+-----+----+
only showing top 20 rows



In [190]:
import numpy as np

In [191]:
#function for calulating IDF
def calcIDF(df):
    IDF = np.log(N_docs/df)
    return IDF
calcIDF_udf = udf(lambda z: calcIDF(z).tolist())

In [192]:
#apply function to the dataframe
by_words_idf = by_words_df\
       .withColumn('idf', calcIDF_udf(F.col("df")))

In [193]:
by_words_idf.show()

+-----+----+------------------+
| word|  df|               idf|
+-----+----+------------------+
|  the|3284| 0.718552530388341|
|  and|2706|0.9121431922299266|
|   of|2436|1.0172573721308722|
|    i|2361| 1.048529463875147|
|   to|1901|1.2652346584722307|
|   my|1536|1.4784330872530422|
|    a|1315| 1.633778056348795|
|   in|1128|1.7871685689026557|
| that| 975|1.9329325299628126|
|  was| 949| 1.959961202350732|
|   me| 799| 2.132009055194385|
| with| 695|2.2714581553958677|
|  but| 683|  2.28887514138987|
|  had| 649|2.3399372842565698|
|which| 554| 2.498205314213376|
|  you| 552|2.5018219546835647|
|   he| 552|2.5018219546835647|
|   it| 534| 2.534974162000465|
|  not| 520|2.5615411893851867|
|  for| 507|2.5868589973694767|
+-----+----+------------------+
only showing top 20 rows



In [197]:
#calculate tf-idf for each word
by_words_tfidf = by_words_tf.select('doc_id', 'word', 'tf')\
       .join (by_words_idf, 'word')\
       .orderBy(F.col('doc_id').asc())\
       .withColumn("tf_idf", F.col("tf") * F.col("idf"))

In [198]:
by_words_tfidf.show()

+--------------+------+---+----+------------------+------------------+
|          word|doc_id| tf|  df|               idf|            tf_idf|
+--------------+------+---+----+------------------+------------------+
|         ebook|     0|  1|  13| 6.250420643499123| 6.250420643499123|
|       shelley|     0|  1|   3|  7.71675771229255|  7.71675771229255|
|          mary|     0|  1|   3|  7.71675771229255|  7.71675771229255|
|           the|     0|  1|3284| 0.718552530388341| 0.718552530388341|
|            by|     0|  1| 480| 2.641583897058723| 2.641583897058723|
|            of|     0|  1|2436|1.0172573721308722|1.0172573721308722|
|       project|     0|  1|  88| 4.338033186482454| 4.338033186482454|
|     gutenberg|     0|  1|  96| 4.251021809492824| 4.251021809492824|
|wollstonecraft|     0|  1|   3|  7.71675771229255|  7.71675771229255|
|        godwin|     0|  1|   3|  7.71675771229255|  7.71675771229255|
|  frankenstein|     0|  1|  31| 5.381382796475513| 5.381382796475513|
|     

In [199]:
#create tf-idf for each document(row), create final dataframe
by_words_final = by_words_tfidf\
       .groupBy("doc_id").sum("tf_idf")\
       .withColumnRenamed("sum(tf_idf)", 'tf_idf vector')

In [200]:
by_words_final.show()

+------+------------------+
|doc_id|     tf_idf vector|
+------+------------------+
|     0| 55.46528308469805|
|     1| 51.91862506843507|
|     2| 57.02242496716091|
|     3| 67.08770598551247|
|     4|48.723213844898986|
|     5| 52.97018550081891|
|     6|49.338885018292586|
|     7|15.841680726319993|
|     8|13.503605616876229|
|     9|18.489440477629174|
|    10|  37.8906413809028|
|    11| 20.99078888752428|
|    12| 29.33189086045275|
|    13|12.437783151031109|
|    14| 30.13751602443938|
|    15|117.46291648866071|
|    16| 43.13491496340592|
|    17|29.673426050761677|
|    18| 5.381382796475513|
|    19|18.489440477629174|
+------+------------------+
only showing top 20 rows



In [201]:
#check that result is the dataframe
type(by_words_final)

pyspark.sql.dataframe.DataFrame

## Part 2 RDD

In [202]:
#read text as RDD
filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"
from pyspark.sql.functions import monotonically_increasing_id

RDD = sc.textFile(f"{filepath}")

RDD.take(5)

['The Project Gutenberg eBook of Frankenstein, by Mary Wollstonecraft (Godwin) Shelley',
 '',
 'This eBook is for the use of anyone anywhere in the United States and',
 'most other parts of the world at no cost and with almost no restrictions',
 'whatsoever. You may copy it, give it away or re-use it under the terms']

In [203]:
#lower letters, create a list of word in each row, delete empty rows
RDD_by_words = RDD\
       .map(lambda text: process_string(text))\
       .filter(lambda x: len(x)>0)

In [204]:
RDD_by_words.take(10)

[['the',
  'project',
  'gutenberg',
  'ebook',
  'of',
  'frankenstein',
  'by',
  'mary',
  'wollstonecraft',
  'godwin',
  'shelley'],
 ['this',
  'ebook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'in',
  'the',
  'united',
  'states',
  'and'],
 ['most',
  'other',
  'parts',
  'of',
  'the',
  'world',
  'at',
  'no',
  'cost',
  'and',
  'with',
  'almost',
  'no',
  'restrictions'],
 ['whatsoever',
  'you',
  'may',
  'copy',
  'it',
  'give',
  'it',
  'away',
  'or',
  're',
  'use',
  'it',
  'under',
  'the',
  'terms'],
 ['of',
  'the',
  'project',
  'gutenberg',
  'license',
  'included',
  'with',
  'this',
  'ebook',
  'or',
  'online',
  'at'],
 ['www',
  'gutenberg',
  'org',
  'if',
  'you',
  'are',
  'not',
  'located',
  'in',
  'the',
  'united',
  'states',
  'you'],
 ['will',
  'have',
  'to',
  'check',
  'the',
  'laws',
  'of',
  'the',
  'country',
  'where',
  'you',
  'are',
  'located',
  'before'],
 ['using', 'this', 'ebook

In [205]:
#check if the number of rows equals to the result of part 1 calculation
if RDD_by_words.count() == N_docs:
    print('Numbers of rows are equal for both methods')
else:
    print('Numbers of rows differ by', RDD_by_words.count() - N_docs)

Numbers of rows are equal for both methods


In [206]:
#introduce indices for each document
RDD_by_words = RDD_by_words.zipWithIndex()

In [207]:
RDD_by_words.take(5)

[(['the',
   'project',
   'gutenberg',
   'ebook',
   'of',
   'frankenstein',
   'by',
   'mary',
   'wollstonecraft',
   'godwin',
   'shelley'],
  0),
 (['this',
   'ebook',
   'is',
   'for',
   'the',
   'use',
   'of',
   'anyone',
   'anywhere',
   'in',
   'the',
   'united',
   'states',
   'and'],
  1),
 (['most',
   'other',
   'parts',
   'of',
   'the',
   'world',
   'at',
   'no',
   'cost',
   'and',
   'with',
   'almost',
   'no',
   'restrictions'],
  2),
 (['whatsoever',
   'you',
   'may',
   'copy',
   'it',
   'give',
   'it',
   'away',
   'or',
   're',
   'use',
   'it',
   'under',
   'the',
   'terms'],
  3),
 (['of',
   'the',
   'project',
   'gutenberg',
   'license',
   'included',
   'with',
   'this',
   'ebook',
   'or',
   'online',
   'at'],
  4)]

In [208]:
#apply map function to list all instances of each word in the doc
RDD_tf = RDD_by_words.flatMap(lambda x: [((x[1],i),1) for i in x[0]])

In [209]:
RDD_tf.take(10)

[((0, 'the'), 1),
 ((0, 'project'), 1),
 ((0, 'gutenberg'), 1),
 ((0, 'ebook'), 1),
 ((0, 'of'), 1),
 ((0, 'frankenstein'), 1),
 ((0, 'by'), 1),
 ((0, 'mary'), 1),
 ((0, 'wollstonecraft'), 1),
 ((0, 'godwin'), 1)]

In [210]:
#apply reduce function to count all instances of each word in each doc
RDD_tf=RDD_tf.reduceByKey(lambda x,y:x+y)

In [211]:
RDD_tf.take(10)

[((0, 'project'), 1),
 ((0, 'gutenberg'), 1),
 ((0, 'ebook'), 1),
 ((0, 'of'), 1),
 ((0, 'mary'), 1),
 ((0, 'shelley'), 1),
 ((1, 'for'), 1),
 ((1, 'the'), 2),
 ((1, 'states'), 1),
 ((1, 'and'), 1)]

In [212]:
#recombine previous rdd to (word - (doc_id - tf))
RDD_2=RDD_tf.map(lambda x: (x[0][1],(x[0][0],x[1])))
RDD_2.take(5)

[('project', (0, 1)),
 ('gutenberg', (0, 1)),
 ('ebook', (0, 1)),
 ('of', (0, 1)),
 ('mary', (0, 1))]

In [213]:
#create an auxiliary rdd (word - 1), where each row is an occurence of a certain word in different documents
RDD_3=RDD_2.map(lambda x: (x[0],1))
RDD_3.take(5)


[('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1), ('mary', 1)]

In [214]:
#create rdd to calculate document frequency of each word
RDD_df=RDD_3\
       .reduceByKey(lambda x,y:x+y)

In [215]:
RDD_df.takeOrdered(10, key = lambda x: -x[1])

[('the', 3284),
 ('and', 2706),
 ('of', 2436),
 ('i', 2361),
 ('to', 1901),
 ('my', 1536),
 ('a', 1315),
 ('in', 1128),
 ('that', 975),
 ('was', 949)]

In [216]:
#calculate idf for each word
idf=RDD_df.map(lambda x: (x[0], np.log(N_docs/x[1])))
idf.take(10)

[('project', 4.338033186482454),
 ('gutenberg', 4.251021809492824),
 ('ebook', 6.250420643499123),
 ('of', 1.0172573721308722),
 ('mary', 7.71675771229255),
 ('shelley', 7.71675771229255),
 ('other', 4.293581423911619),
 ('world', 4.965222399250601),
 ('at', 3.0533186181804832),
 ('no', 3.673706444458)]

In [217]:
idf.takeOrdered(10, key = lambda x: x[1])

[('the', 0.718552530388341),
 ('and', 0.9121431922299266),
 ('of', 1.0172573721308722),
 ('i', 1.048529463875147),
 ('to', 1.2652346584722307),
 ('my', 1.4784330872530422),
 ('a', 1.633778056348795),
 ('in', 1.7871685689026557),
 ('that', 1.9329325299628126),
 ('was', 1.959961202350732)]

In [218]:
#auxiliary rdd, join 2 rdds, format of the final rdd (word - (doc_id - tf) - idf)
RDD_4 =RDD_2.join(idf)

In [219]:
RDD_4.take(10)

[('gutenberg', ((0, 1), 4.251021809492824)),
 ('gutenberg', ((4, 1), 4.251021809492824)),
 ('gutenberg', ((6440, 1), 4.251021809492824)),
 ('gutenberg', ((6450, 1), 4.251021809492824)),
 ('gutenberg', ((6462, 1), 4.251021809492824)),
 ('gutenberg', ((6464, 1), 4.251021809492824)),
 ('gutenberg', ((6468, 1), 4.251021809492824)),
 ('gutenberg', ((6472, 1), 4.251021809492824)),
 ('gutenberg', ((6486, 1), 4.251021809492824)),
 ('gutenberg', ((6490, 1), 4.251021809492824))]

In [220]:
#auxiliary RDD to recombine rdd into format(doc_id - word - tf - idf - tf*idf)
RDD_5=RDD_4.map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1]))).sortByKey()
RDD_5.take(5)

[(0, ('gutenberg', 1, 4.251021809492824, 4.251021809492824)),
 (0, ('of', 1, 1.0172573721308722, 1.0172573721308722)),
 (0, ('mary', 1, 7.71675771229255, 7.71675771229255)),
 (0, ('shelley', 1, 7.71675771229255, 7.71675771229255)),
 (0, ('the', 1, 0.718552530388341, 0.718552530388341))]

In [221]:
#auxiliary rdd, exclude all info apart from doc_id and tf-idf
RDD_6 = RDD_5.map(lambda x: (x[0], x[1][3]))
RDD_6.take(10)

[(0, 4.251021809492824),
 (0, 1.0172573721308722),
 (0, 7.71675771229255),
 (0, 7.71675771229255),
 (0, 0.718552530388341),
 (0, 2.641583897058723),
 (0, 7.71675771229255),
 (0, 4.338033186482454),
 (0, 6.250420643499123),
 (0, 5.381382796475513)]

In [222]:
#final RDD with doc_id and tf-idf vector for each doc
RDD_final=RDD_6\
      .reduceByKey(lambda x,y:x+y)\
      .sortByKey()
RDD_final.take(10)

[(0, 55.46528308469805),
 (1, 51.91862506843506),
 (2, 57.022424967160916),
 (3, 67.08770598551247),
 (4, 48.72321384489898),
 (5, 52.97018550081891),
 (6, 49.338885018292586),
 (7, 15.841680726319993),
 (8, 13.503605616876229),
 (9, 18.489440477629174)]

In [223]:
#check that the type of output is RDD
type(RDD_final)

pyspark.rdd.PipelinedRDD