In [2]:


import os
import json
import boto3
import sklearn
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return "{}proxy/{}/jobs/".format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

# small fix to enable UI views
SparkContext.uiWebUrl = property(uiWebUrl)

# spark configurtion in local regime 
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '8g')

#some needed objects
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/st033822/


# Homework 1

Transform text file "The Project Gutenberg eBook of Frankenstein, by Mary Wollstonecraft (Godwin) Shelle" into TF-IDF. Take sentence as "document".

### Part 1: 
- read text file as dataframe 
- filter out non-letters and empty strings 
- transform into dataframe doc_id -> tf_idf vector 


### Part 2:
- read text file as RDD
- filter out non-letters and empty strings 
- transform into rdd in format doc_id -> tf_idf vector


### NB(!) 

It's not allowed to use TF-IDF code from Spark internal libraries. 
It's not allowed to cast DF/RDD into pandas and use scikit-learn. Please, keep it spark. 


## Part 1

Reading text file as rdd

In [3]:


result_prefix = "malyutin_demo_hw1"

filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"
from pyspark.sql.functions import monotonically_increasing_id

dataframe = sc.textFile(f"{filepath}")\
    .map(lambda x: (x,))\
    .toDF()\
    .select(F.col("_1").alias("text"))\
    .withColumn("id", monotonically_increasing_id())
dataframe.show()

+--------------------+---+
|                text| id|
+--------------------+---+
|The Project Guten...|  0|
|                    |  1|
|This eBook is for...|  2|
|most other parts ...|  3|
|whatsoever. You m...|  4|
|of the Project Gu...|  5|
|www.gutenberg.org...|  6|
|will have to chec...|  7|
|   using this eBook.|  8|
|                    |  9|
| Title: Frankenstein| 10|
|       or, The Mo...| 11|
|                    | 12|
|Author: Mary Woll...| 13|
|                    | 14|
|Release Date: 31,...| 15|
|[Most recently up...| 16|
|                    | 17|
|   Language: English| 18|
|                    | 19|
+--------------------+---+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import string
import re


def process_string(data):
    """
    basic preprocessing function:
    - removes punctuation
    - lower
    - split by space
    """
    punct_removed = re.sub(r'[^\w\s]','',data)
    words = punct_removed.lower().split(" ")
    
    
    return list(filter(lambda x: len(x) > 0, words))

# spark udf -- user defined function (~ mapper)

process_string_udf = udf(lambda z: process_string(z), ArrayType(StringType()))

In [246]:
"""
process words

filter out empty and small sentences
"""

by_words = dataframe\
    .select(process_string_udf(F.col("text")).alias("by_words"))\
    .where(F.size(F.col("by_words")) > 1)\
    .withColumn("doc_id", monotonically_increasing_id())
by_words.show()

+--------------------+------+
|            by_words|doc_id|
+--------------------+------+
|[the, project, gu...|     0|
|[this, ebook, is,...|     1|
|[most, other, par...|     2|
|[whatsoever, you,...|     3|
|[of, the, project...|     4|
|[wwwgutenbergorg,...|     5|
|[will, have, to, ...|     6|
|[using, this, ebook]|     7|
|[title, frankenst...|     8|
|[or, the, modern,...|     9|
|[author, mary, wo...|    10|
|[release, date, 3...|    11|
|[most, recently, ...|    12|
| [language, english]|    13|
|[character, set, ...|    14|
|[produced, by, ju...|    15|
|[further, correct...|    16|
|[start, of, the, ...|    17|
|[or, the, modern,...|    18|
|[by, mary, wollst...|    19|
+--------------------+------+
only showing top 20 rows



In [247]:
# exploding to get separate words
by_words_count = by_words.select(F.explode(F.col("by_words")).alias("word"), F.col("doc_id"))
by_words_count.show()

+--------------+------+
|          word|doc_id|
+--------------+------+
|           the|     0|
|       project|     0|
|     gutenberg|     0|
|         ebook|     0|
|            of|     0|
|  frankenstein|     0|
|            by|     0|
|          mary|     0|
|wollstonecraft|     0|
|        godwin|     0|
|       shelley|     0|
|          this|     1|
|         ebook|     1|
|            is|     1|
|           for|     1|
|           the|     1|
|           use|     1|
|            of|     1|
|        anyone|     1|
|      anywhere|     1|
+--------------+------+
only showing top 20 rows



In [248]:
# calculating term frequency

by_words_count = by_words_count.groupBy("doc_id", "word")\
    .agg(F.count("word").alias("tf"))
by_words_count.show()

+------+----------+---+
|doc_id|      word| tf|
+------+----------+---+
|    12|        13|  1|
|    62|      vain|  1|
|    92|expedition|  1|
|   127| fluctuate|  1|
|   170|   approve|  1|
|   171|      poor|  1|
|   189|        my|  1|
|   259|  remember|  1|
|   315|        in|  2|
|   319|     until|  1|
|   339|         i|  1|
|   419|  although|  1|
|   453|         a|  1|
|   477|        to|  1|
|   480|       you|  1|
|   506|        in|  1|
|   513|      what|  1|
|   515|  greatest|  1|
|   515|       but|  1|
|   536|  beaufort|  1|
+------+----------+---+
only showing top 20 rows



In [256]:
# document frequency

by_words_count_with_df = by_words_count.groupBy("word")\
      .agg(F.countDistinct("doc_id").alias("df"))
by_words_count_with_df.show()

+-------------+---+
|         word| df|
+-------------+---+
|         some|145|
|         hope| 49|
|        those| 91|
|          art|  7|
|    solemnity|  4|
|      blossom|  1|
|          few| 62|
|     ignominy|  4|
|        still| 65|
|       voyage| 15|
|    imitation|  2|
|       waters| 11|
|gratification|  3|
|    arguments|  7|
|      barrier|  4|
|   circulates|  1|
|apprehensions|  2|
|       spared|  5|
|     painters|  1|
|  transaction|  1|
+-------------+---+
only showing top 20 rows



In [286]:
# we use the same formula to calculate IDF that is included in Spark MLlib

import numpy as np
from pyspark.sql.functions import log, log1p
number_of_documents = dataframe.count()
import numpy as np
def calc_idf(df):
    """
    Args:
        df: document frequency - how many times a given word appears in whole corpus of docs (without normalization)
        
    Returns:
        idf: inverse document frequency
    """
    exp_idf = (number_of_documents + 1)/(df + 1) 
    idf = np.log(exp_idf)
    return idf

calc_idf_udf = udf(lambda df: calc_idf(docCount, df), FloatType())

In [283]:
import numpy as np
from pyspark.sql.functions import log, log1p
number_of_documents = dataframe.count()
import numpy as np
def calc_idf_exp(df):
    """
    Args:
        df: document frequency - how many times a given word appears in whole corpus of docs (without normalization)
        
    Returns:
        exp_idf: inverse document frequency without taking log
    """
    exp_idf = (number_of_documents + 1)/(df + 1) 

    return exp_idf

calc_idf_exp_udf = udf(lambda df: calc_idf(docCount, df), FloatType())

In [257]:
by_words_with_idf = by_words_count_with_df.withColumn("idf", calc_idf(F.col("df")))
by_words_with_idf.show()

+-------------+---+------------------+
|         word| df|               idf|
+-------------+---+------------------+
|         some|145|3.9710670072480765|
|         hope| 49| 5.042650623528266|
|        those| 91| 4.432885051907372|
|          art|  7| 6.875232087276577|
|    solemnity|  4| 7.345235716522312|
|      blossom|  1| 8.261526448396468|
|          few| 62|  4.81153890256488|
|     ignominy|  4| 7.345235716522312|
|        still| 65| 4.765018886929988|
|       voyage| 15| 6.182084906716632|
|    imitation|  2| 7.856061340288304|
|       waters| 11| 6.469766979168413|
|gratification|  3| 7.568379267836522|
|    arguments|  7| 6.875232087276577|
|      barrier|  4| 7.345235716522312|
|   circulates|  1| 8.261526448396468|
|apprehensions|  2| 7.856061340288304|
|       spared|  5| 7.162914159728358|
|     painters|  1| 8.261526448396468|
|  transaction|  1| 8.261526448396468|
+-------------+---+------------------+
only showing top 20 rows



In [258]:
tf_idf_full = by_words_with_idf.join(by_words_count, on=["word"], how = "left_outer")\
    .withColumn("tf_idf", F.col("tf") * F.col("idf"))
tf_idf_full.show()

+-------------+---+-----------------+----------+---+-----------------+
|         word| df|              idf|    doc_id| tf|           tf_idf|
+-------------+---+-----------------+----------+---+-----------------+
| accumulation|  2|7.856061340288304|      2614|  1|7.856061340288304|
| accumulation|  2|7.856061340288304|8589937145|  1|7.856061340288304|
|apprehensions|  2|7.856061340288304|       785|  1|7.856061340288304|
|apprehensions|  2|7.856061340288304|      1511|  1|7.856061340288304|
|    arguments|  7|6.875232087276577|8589935320|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|      1734|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|8589936445|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|      2676|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|      2327|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|       870|  1|6.875232087276577|
|    arguments|  7|6.875232087276577|       423|  1|6.875232087276577|
|     

In [259]:
tf_idf_aggregated = tf_idf_full.groupBy('doc_id')\
    .agg(F.collect_list('tf_idf').alias('tf_idf'))\
    .orderBy(F.col("doc_id"))
tf_idf_aggregated.show(truncate = 100)

+------+----------------------------------------------------------------------------------------------------+
|doc_id|                                                                                              tf_idf|
+------+----------------------------------------------------------------------------------------------------+
|     0|[2.7788063588506517, 7.568379267836522, 7.568379267836522, 4.466037259224273, 7.568379267836522, ...|
|     1|[6.652088535962367, 2.7281369596689475, 1.9273591149166367, 5.91015119123299, 3.231088527004032, ...|
|     2|[7.162914159728358, 2.410761783391621, 4.432885051907372, 7.568379267836522, 0.8581607114548204, ...|
|     3|[6.469766979168413, 2.644755350729896, 5.488937726156687, 5.553476247294258, 4.263325746727269, 7...|
|     4|[7.345235716522312, 2.410761783391621, 4.466037259224273, 0.8581607114548204, 7.345235716522312, ...|
|     5|[2.70084481738094, 5.289510701459792, 1.9273591149166367, 5.91015119123299, 6.875232087276577, 5....|
|     6|[3

In [285]:
# whole pipeline
by_words_count = sc.textFile(f"{filepath}")\
    .map(lambda x: (x,))\
    .toDF()\
    .select(F.col("_1").alias("text"))\
    .select(process_string_udf(F.col("text")).alias("by_words"))\
    .where(F.size(F.col("by_words")) > 1)\
    .withColumn("doc_id", monotonically_increasing_id())\
    .select(F.explode(F.col("by_words")).alias("word"), F.col("doc_id"))\
    .groupBy("doc_id", "word")\
    .agg(F.count("word").alias("tf"))

   

by_words_with_idf = by_words_count.groupBy("word")\
    .agg(F.countDistinct("doc_id").alias("df"))\
    .withColumn("idf", log(calc_idf_exp(F.col("df"))))
    

tf_idf = by_words_with_idf.join(by_words_count, on=["word"], how = "left_outer")\
    .withColumn("tf_idf", F.col("tf") * F.col("idf"))\
    .groupBy('doc_id')\
    .agg(F.collect_list('tf_idf').alias('tf_idf'))\
    .orderBy(F.col("doc_id"))

tf_idf.show(truncate = 100)

+------+----------------------------------------------------------------------------------------------------+
|doc_id|                                                                                              tf_idf|
+------+----------------------------------------------------------------------------------------------------+
|     0|[2.7788063588506517, 7.568379267836522, 7.568379267836522, 4.466037259224273, 7.568379267836522, ...|
|     1|[6.652088535962367, 2.7281369596689475, 1.9273591149166367, 5.91015119123299, 3.231088527004032, ...|
|     2|[7.162914159728358, 2.410761783391621, 4.432885051907372, 7.568379267836522, 0.8581607114548204, ...|
|     3|[6.469766979168413, 2.644755350729896, 5.488937726156687, 5.553476247294258, 4.263325746727269, 7...|
|     4|[7.345235716522312, 2.410761783391621, 4.466037259224273, 0.8581607114548204, 7.345235716522312, ...|
|     5|[2.70084481738094, 5.289510701459792, 1.9273591149166367, 5.91015119123299, 6.875232087276577, 5....|
|     6|[3

## Part 2

Reading text file into RDD

In [271]:
result_prefix = "malyutin_demo_hw1"

filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"
from pyspark.sql.functions import monotonically_increasing_id

rddText = sc.textFile(f"{filepath}").repartition(1).zipWithIndex().repartition(5)



rddText.take(20)

[(' Letter 2', 40),
 (' Letter 3', 41),
 (' Letter 4', 42),
 (' Chapter 1', 43),
 (' Chapter 2', 44),
 (' Chapter 3', 45),
 (' Chapter 4', 46),
 (' Chapter 5', 47),
 (' Chapter 6', 48),
 (' Chapter 7', 49),
 ('Inspirited by this wind of promise, my daydreams become more fervent', 90),
 ('and vivid. I try in vain to be persuaded that the pole is the seat of', 91),
 ('frost and desolation; it ever presents itself to my imagination as the',
  92),
 ('region of beauty and delight. There, Margaret, the sun is for ever', 93),
 ('visible, its broad disk just skirting the horizon and diffusing a', 94),
 ('perpetual splendour. There—for with your leave, my sister, I will put', 95),
 ('some trust in preceding navigators—there snow and frost are banished;', 96),
 ('and, sailing over a calm sea, we may be wafted to a land surpassing in',
  97),
 ('wonders and in beauty every region hitherto discovered on the habitable',
  98),
 ('globe. Its productions and features may be without example, as the',

In [272]:
# tokenize RDD
rdd_tokenized = rddText.map(lambda line: process_string(line[0]))
rdd_tokenized.collect()

[['letter', '2'],
 ['letter', '3'],
 ['letter', '4'],
 ['chapter', '1'],
 ['chapter', '2'],
 ['chapter', '3'],
 ['chapter', '4'],
 ['chapter', '5'],
 ['chapter', '6'],
 ['chapter', '7'],
 ['inspirited',
  'by',
  'this',
  'wind',
  'of',
  'promise',
  'my',
  'daydreams',
  'become',
  'more',
  'fervent'],
 ['and',
  'vivid',
  'i',
  'try',
  'in',
  'vain',
  'to',
  'be',
  'persuaded',
  'that',
  'the',
  'pole',
  'is',
  'the',
  'seat',
  'of'],
 ['frost',
  'and',
  'desolation',
  'it',
  'ever',
  'presents',
  'itself',
  'to',
  'my',
  'imagination',
  'as',
  'the'],
 ['region',
  'of',
  'beauty',
  'and',
  'delight',
  'there',
  'margaret',
  'the',
  'sun',
  'is',
  'for',
  'ever'],
 ['visible',
  'its',
  'broad',
  'disk',
  'just',
  'skirting',
  'the',
  'horizon',
  'and',
  'diffusing',
  'a'],
 ['perpetual',
  'splendour',
  'therefor',
  'with',
  'your',
  'leave',
  'my',
  'sister',
  'i',
  'will',
  'put'],
 ['some',
  'trust',
  'in',
  'precedin

In [273]:
# rid of documents with length < 1 + indexing RDD
rdd_tokenized = rdd_tokenized.filter(lambda x: len(x) > 1).zipWithIndex()
rdd_tokenized.collect()

[(['letter', '2'], 0),
 (['letter', '3'], 1),
 (['letter', '4'], 2),
 (['chapter', '1'], 3),
 (['chapter', '2'], 4),
 (['chapter', '3'], 5),
 (['chapter', '4'], 6),
 (['chapter', '5'], 7),
 (['chapter', '6'], 8),
 (['chapter', '7'], 9),
 (['inspirited',
   'by',
   'this',
   'wind',
   'of',
   'promise',
   'my',
   'daydreams',
   'become',
   'more',
   'fervent'],
  10),
 (['and',
   'vivid',
   'i',
   'try',
   'in',
   'vain',
   'to',
   'be',
   'persuaded',
   'that',
   'the',
   'pole',
   'is',
   'the',
   'seat',
   'of'],
  11),
 (['frost',
   'and',
   'desolation',
   'it',
   'ever',
   'presents',
   'itself',
   'to',
   'my',
   'imagination',
   'as',
   'the'],
  12),
 (['region',
   'of',
   'beauty',
   'and',
   'delight',
   'there',
   'margaret',
   'the',
   'sun',
   'is',
   'for',
   'ever'],
  13),
 (['visible',
   'its',
   'broad',
   'disk',
   'just',
   'skirting',
   'the',
   'horizon',
   'and',
   'diffusing',
   'a'],
  14),
 (['perpetual',

In [274]:
# document frequency - how many documents in the corpus contain the term

rdd_df = rdd_tokenized\
    .map(lambda line: (line[1], line[0]))\
    .flatMapValues(lambda line: line)\
    .map(lambda line: (line[1], line[0]))\
    .distinct()\
    .groupByKey()\
    .mapValues(len)
rdd_df.collect()

[('presents', 3),
 ('is', 305),
 ('perpetual', 5),
 ('put', 15),
 ('on', 461),
 ('undertaking', 13),
 ('to', 1896),
 ('branches', 12),
 ('alarmed', 6),
 ('at', 318),
 ('would', 179),
 ('one', 197),
 ('us', 67),
 ('distant', 10),
 ('of', 2435),
 ('land', 18),
 ('countenance', 38),
 ('cup', 3),
 ('such', 91),
 ('intuitive', 1),
 ('all', 213),
 ('while', 68),
 ('mind', 85),
 ('their', 173),
 ('an', 215),
 ('plaited', 2),
 ('bring', 3),
 ('lot', 6),
 ('me', 792),
 ('prevailed', 2),
 ('want', 12),
 ('than', 112),
 ('sisterthe', 1),
 ('was', 948),
 ('therefore', 37),
 ('united', 20),
 ('have', 350),
 ('humane', 1),
 ('generosity', 2),
 ('reflections', 16),
 ('ocean', 13),
 ('learned', 19),
 ('her', 308),
 ('fever', 10),
 ('no', 171),
 ('duties', 11),
 ('accompany', 6),
 ('narrowminded', 1),
 ('utterly', 11),
 ('waldman', 9),
 ('never', 64),
 ('town', 26),
 ('fifty', 1),
 ('former', 14),
 ('true', 21),
 ('why', 32),
 ('knowledge', 32),
 ('believes', 4),
 ('allow', 14),
 ('dabbled', 1),
 ('sen

In [287]:
# Inverse document frequency
import numpy as np

from pyspark.sql.functions import log1p
rdd_idf = rdd_df\
    .map(lambda line: (line[0], calc_idf(line[1])))
rdd_idf.collect()

[('presents', 7.568379267836522),
 ('is', 3.231088527004032),
 ('perpetual', 7.162914159728358),
 ('put', 6.182084906716632),
 ('on', 2.819108737874674),
 ('undertaking', 6.315616299341154),
 ('to', 1.4066446590213988),
 ('branches', 6.3897242714948765),
 ('alarmed', 7.008763479901099),
 ('at', 3.1894825261715685),
 ('would', 3.761716778066203),
 ('one', 3.6664065982618776),
 ('us', 4.735165923780306),
 ('distant', 6.556778356158042),
 ('of', 1.1565610001266253),
 ('land', 6.010234649789973),
 ('countenance', 5.2911119828267665),
 ('cup', 7.568379267836522),
 ('such', 4.432885051907372),
 ('intuitive', 8.261526448396468),
 ('all', 3.5886976139345617),
 ('while', 4.720567124359153),
 ('mind', 4.5003263327029055),
 ('their', 3.7956183297418837),
 ('an', 3.579395221272248),
 ('plaited', 7.856061340288304),
 ('bring', 7.568379267836522),
 ('lot', 7.008763479901099),
 ('me', 2.278850407321565),
 ('prevailed', 7.856061340288304),
 ('want', 6.3897242714948765),
 ('than', 4.227285810244073),
 

In [276]:
# TERM FREQUENCY (tf)

rdd_tf = rdd_tokenized\
    .map(lambda line: (line[1], line[0]))\
    .flatMapValues(lambda line: line)\
    .groupBy(lambda line: line)\
    .mapValues(len)
rdd_tf.collect()

[((0, 'letter'), 1),
 ((3, 'chapter'), 1),
 ((5, '3'), 1),
 ((6, 'chapter'), 1),
 ((7, 'chapter'), 1),
 ((7, '5'), 1),
 ((9, '7'), 1),
 ((10, 'of'), 1),
 ((11, 'vain'), 1),
 ((12, 'imagination'), 1),
 ((13, 'there'), 1),
 ((13, 'the'), 1),
 ((13, 'for'), 1),
 ((13, 'ever'), 1),
 ((14, 'its'), 1),
 ((15, 'your'), 1),
 ((15, 'sister'), 1),
 ((17, 'sailing'), 1),
 ((17, 'a'), 2),
 ((17, 'be'), 1),
 ((17, 'to'), 1),
 ((17, 'surpassing'), 1),
 ((17, 'in'), 1),
 ((18, 'beauty'), 1),
 ((18, 'on'), 1),
 ((19, 'may'), 1),
 ((20, 'the'), 1),
 ((20, 'earlier'), 1),
 ((20, 'bent'), 1),
 ((21, 'resolved'), 1),
 ((21, 'on'), 1),
 ((21, 'present'), 1),
 ((23, 'commenced'), 1),
 ((23, 'to'), 1),
 ((24, 'whalefishers'), 1),
 ((24, 'several'), 1),
 ((24, 'expeditions'), 1),
 ((25, 'i'), 2),
 ((25, 'want'), 1),
 ((25, 'of'), 1),
 ((25, 'often'), 1),
 ((26, 'common'), 1),
 ((26, 'during'), 1),
 ((27, 'nights'), 1),
 ((28, 'derive'), 1),
 ((29, 'walton'), 1),
 ((31, 'mrs'), 1),
 ((32, '28th'), 1),
 ((32, '

In [277]:
# preparing RDD with calculated TF for joining

def mapper(rdd):
    return ( rdd[0][1], (rdd[0][0], rdd[1]))

rdd_tf_flat = rdd_tf.map(lambda x : mapper(x))

In [278]:
rdd_tf_flat.collect()

[('letter', (0, 1)),
 ('chapter', (3, 1)),
 ('3', (5, 1)),
 ('chapter', (6, 1)),
 ('chapter', (7, 1)),
 ('5', (7, 1)),
 ('7', (9, 1)),
 ('of', (10, 1)),
 ('vain', (11, 1)),
 ('imagination', (12, 1)),
 ('there', (13, 1)),
 ('the', (13, 1)),
 ('for', (13, 1)),
 ('ever', (13, 1)),
 ('its', (14, 1)),
 ('your', (15, 1)),
 ('sister', (15, 1)),
 ('sailing', (17, 1)),
 ('a', (17, 2)),
 ('be', (17, 1)),
 ('to', (17, 1)),
 ('surpassing', (17, 1)),
 ('in', (17, 1)),
 ('beauty', (18, 1)),
 ('on', (18, 1)),
 ('may', (19, 1)),
 ('the', (20, 1)),
 ('earlier', (20, 1)),
 ('bent', (20, 1)),
 ('resolved', (21, 1)),
 ('on', (21, 1)),
 ('present', (21, 1)),
 ('commenced', (23, 1)),
 ('to', (23, 1)),
 ('whalefishers', (24, 1)),
 ('several', (24, 1)),
 ('expeditions', (24, 1)),
 ('i', (25, 2)),
 ('want', (25, 1)),
 ('of', (25, 1)),
 ('often', (25, 1)),
 ('common', (26, 1)),
 ('during', (26, 1)),
 ('nights', (27, 1)),
 ('derive', (28, 1)),
 ('walton', (29, 1)),
 ('mrs', (31, 1)),
 ('28th', (32, 1)),
 ('march

In [279]:
# join two RDD's with TF and with IDF by word

rdd_merged = rdd_tf_flat.join(rdd_idf)
rdd_merged.collect()

[('of', ((10, 1), 1.430067661274726)),
 ('of', ((25, 1), 1.430067661274726)),
 ('of', ((33, 1), 1.430067661274726)),
 ('of', ((45, 1), 1.430067661274726)),
 ('of', ((49, 1), 1.430067661274726)),
 ('of', ((53, 1), 1.430067661274726)),
 ('of', ((73, 1), 1.430067661274726)),
 ('of', ((77, 1), 1.430067661274726)),
 ('of', ((93, 1), 1.430067661274726)),
 ('of', ((134, 1), 1.430067661274726)),
 ('of', ((136, 1), 1.430067661274726)),
 ('of', ((148, 1), 1.430067661274726)),
 ('of', ((185, 1), 1.430067661274726)),
 ('of', ((192, 1), 1.430067661274726)),
 ('of', ((198, 1), 1.430067661274726)),
 ('of', ((202, 1), 1.430067661274726)),
 ('of', ((206, 1), 1.430067661274726)),
 ('of', ((209, 1), 1.430067661274726)),
 ('of', ((214, 2), 1.430067661274726)),
 ('of', ((224, 1), 1.430067661274726)),
 ('of', ((263, 1), 1.430067661274726)),
 ('of', ((276, 1), 1.430067661274726)),
 ('of', ((282, 2), 1.430067661274726)),
 ('of', ((304, 1), 1.430067661274726)),
 ('of', ((315, 1), 1.430067661274726)),
 ('of', (

In [280]:
# calculating tf-idf for each word in each document (word itself is not represented)

def calculate_tf_idf(rdd):
    return (rdd[1][0][0], rdd[1][0][1] * rdd[1][1]) 

rdd_merged_tfidf = rdd_merged.map(lambda row: calculate_tf_idf(row))
rdd_merged_tfidf.collect()

[(10, 1.430067661274726),
 (25, 1.430067661274726),
 (33, 1.430067661274726),
 (45, 1.430067661274726),
 (49, 1.430067661274726),
 (53, 1.430067661274726),
 (73, 1.430067661274726),
 (77, 1.430067661274726),
 (93, 1.430067661274726),
 (134, 1.430067661274726),
 (136, 1.430067661274726),
 (148, 1.430067661274726),
 (185, 1.430067661274726),
 (192, 1.430067661274726),
 (198, 1.430067661274726),
 (202, 1.430067661274726),
 (206, 1.430067661274726),
 (209, 1.430067661274726),
 (214, 2.860135322549452),
 (224, 1.430067661274726),
 (263, 1.430067661274726),
 (276, 1.430067661274726),
 (282, 2.860135322549452),
 (304, 1.430067661274726),
 (315, 1.430067661274726),
 (317, 1.430067661274726),
 (330, 1.430067661274726),
 (356, 2.860135322549452),
 (357, 2.860135322549452),
 (366, 1.430067661274726),
 (381, 1.430067661274726),
 (406, 1.430067661274726),
 (412, 1.430067661274726),
 (429, 1.430067661274726),
 (433, 2.860135322549452),
 (435, 1.430067661274726),
 (446, 1.430067661274726),
 (458, 1.4

In [281]:
# final grouping by doc_id to get RDD "doc_id -> tf_idf vector"

rdd_merged_tfidf\
    .groupByKey()\
    .mapValues(list)\
    .sortBy(lambda x: x[0])\
    .collect()

[(0, [7.163688653119338, 5.462418377228463]),
 (1, [5.462418377228463, 6.876264611890766]),
 (2, [5.462418377228463, 6.876264611890766]),
 (3, [7.009666997226198, 5.069160875732934]),
 (4, [7.163688653119338, 5.069160875732934]),
 (5, [5.069160875732934, 6.876264611890766]),
 (6, [6.876264611890766, 5.069160875732934]),
 (7, [7.5688956634069955, 5.069160875732934]),
 (8, [7.8564486619637925, 5.069160875732934]),
 (9, [7.8564486619637925, 5.069160875732934]),
 (10,
  [1.430067661274726,
   7.5688956634069955,
   2.8390663055278114,
   2.9167330393313127,
   5.432693990099531,
   7.8564486619637925,
   1.7992234013318817,
   5.822145052490498,
   5.626078300322127,
   3.8877809086242467,
   7.8564486619637925]),
 (11,
  [1.430067661274726,
   3.2698422684602284,
   1.3519525885610604,
   7.5688956634069955,
   6.876264611890766,
   2.0632282944808105,
   3.1113584758368957,
   1.4558956815760389,
   1.6257511467293662,
   7.34588116932985,
   7.34588116932985,
   6.317422518384654,
   2.