In [1]:
import os
import json
import boto3
import sklearn
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return "{}proxy/{}/jobs/".format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

# small fix to enable UI views
SparkContext.uiWebUrl = property(uiWebUrl)

# spark configurtion in local regime 
conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '8g')

#some needed objects
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/st091465/


# Homework 1

Transform text file "The Project Gutenberg eBook of Frankenstein, by Mary Wollstonecraft (Godwin) Shelle" into TF-IDF. Take row as "document".

### Part 1: 
- read text file as dataframe 
- filter out non-letters and empty strings 
- transform into dataframe doc_id -> tf_idf vector 


### Part 2:
- read text file as RDD
- filter out non-letters and empty strings 
- transform into rdd in format doc_id -> tf_idf vector


### Org part: 
I'm waiting your HW's as self-sufficient jupyter notebooks in github repository. 

Please, fill this table in specified comment with:

your name / github link / telegram (optionally, if u want to discuss some) / 

Fill the comment please and i will add your data in a few days

https://docs.google.com/spreadsheets/d/1p3yLsXqX2dp_TrPwNcikcS5FP_PTM0_gnSOzGn5Gn1E/edit#gid=0

Feel free to text me if u have some questions 

### Deadline: 05.05.2021 included

Dear students, dead in "deadline" means *dead*. This deadline is not for you - it's for me. Deadlines informs me from which point i should start to score your HWs.  You can commit anything after deadline but it's not guaranteed that I'll take it into account. It's possible to move deadline only for the whole group not "just for me plz cause I was ill / detentioned / skipped this message". 

### NB(!) 

It's not allowed to use TF-IDF code from Spark internal libraries. 
It's not allowed to cast DF/RDD into pandas and use scikit-learn. Please, keep it spark. 


## Part 1

Reading text file as DataFrame

In [2]:
result_prefix = "malyutin_demo_hw1"

filepath = "file:///home/jovyan/shared/lectures_folder/84-0.txt"
from pyspark.sql.functions import monotonically_increasing_id

dataframe = sc.textFile(f"{filepath}")\
    .map(lambda x: (x,))\
    .toDF()\
    .select(F.col("_1").alias("text"))\
    .withColumn("docid", monotonically_increasing_id())

dataframe.show()

+--------------------+-----+
|                text|docid|
+--------------------+-----+
|The Project Guten...|    0|
|                    |    1|
|This eBook is for...|    2|
|most other parts ...|    3|
|whatsoever. You m...|    4|
|of the Project Gu...|    5|
|www.gutenberg.org...|    6|
|will have to chec...|    7|
|   using this eBook.|    8|
|                    |    9|
| Title: Frankenstein|   10|
|       or, The Mo...|   11|
|                    |   12|
|Author: Mary Woll...|   13|
|                    |   14|
|Release Date: 31,...|   15|
|[Most recently up...|   16|
|                    |   17|
|   Language: English|   18|
|                    |   19|
+--------------------+-----+
only showing top 20 rows



In [3]:
dataframe.printSchema()

root
 |-- text: string (nullable = true)
 |-- docid: long (nullable = false)



In [4]:
dataframe.count()

7743

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import string
import re

def process_string(data):
    """
    basic preprocessing function:
    - removes punctuation
    - lower
    - split by space
    """
    punct_removed = re.sub(r'[^\w\s]','',data)
    words = punct_removed.lower().split(" ")
    
    
    return list(filter(lambda x: len(x) > 0, words))

# spark udf -- user defined function (~ mapper)

process_string_udf = udf(lambda z: process_string(z), ArrayType(StringType()))

In [6]:
by_words = dataframe\
    .select(process_string_udf(F.col("text")).alias("by_words"))\
    .where(F.size(F.col("by_words")) > 1)\
    .withColumn("docid", monotonically_increasing_id())


by_words.show()

+--------------------+-----+
|            by_words|docid|
+--------------------+-----+
|[the, project, gu...|    0|
|[this, ebook, is,...|    1|
|[most, other, par...|    2|
|[whatsoever, you,...|    3|
|[of, the, project...|    4|
|[wwwgutenbergorg,...|    5|
|[will, have, to, ...|    6|
|[using, this, ebook]|    7|
|[title, frankenst...|    8|
|[or, the, modern,...|    9|
|[author, mary, wo...|   10|
|[release, date, 3...|   11|
|[most, recently, ...|   12|
| [language, english]|   13|
|[character, set, ...|   14|
|[produced, by, ju...|   15|
|[further, correct...|   16|
|[start, of, the, ...|   17|
|[or, the, modern,...|   18|
|[by, mary, wollst...|   19|
+--------------------+-----+
only showing top 20 rows



In [7]:
# Separate word with docid
by_words_count = by_words\
     .withColumn('word',(F.explode(F.col("by_words"))))
by_words_count.show()

+--------------------+-----+--------------+
|            by_words|docid|          word|
+--------------------+-----+--------------+
|[the, project, gu...|    0|           the|
|[the, project, gu...|    0|       project|
|[the, project, gu...|    0|     gutenberg|
|[the, project, gu...|    0|         ebook|
|[the, project, gu...|    0|            of|
|[the, project, gu...|    0|  frankenstein|
|[the, project, gu...|    0|            by|
|[the, project, gu...|    0|          mary|
|[the, project, gu...|    0|wollstonecraft|
|[the, project, gu...|    0|        godwin|
|[the, project, gu...|    0|       shelley|
|[this, ebook, is,...|    1|          this|
|[this, ebook, is,...|    1|         ebook|
|[this, ebook, is,...|    1|            is|
|[this, ebook, is,...|    1|           for|
|[this, ebook, is,...|    1|           the|
|[this, ebook, is,...|    1|           use|
|[this, ebook, is,...|    1|            of|
|[this, ebook, is,...|    1|        anyone|
|[this, ebook, is,...|    1|    

In [8]:
from pyspark.sql.functions import count
from pyspark.sql.functions import countDistinct

# term frequency
by_words_tf = by_words_count.groupBy("docid").agg(F.count("word").alias("doc_len")) \
    .join(by_words_count.groupBy("docid", "word")
          .agg(F.count("by_words").alias("word_count")), ['docid']) \
    .withColumn("tf", F.col("word_count") / F.col("doc_len")) \
    .drop("doc_len", "word_count")\
    .orderBy(F.col("docid"))

by_words_tf.show()

+-----+--------------+-------------------+
|docid|          word|                 tf|
+-----+--------------+-------------------+
|    0|           the|0.09090909090909091|
|    0|wollstonecraft|0.09090909090909091|
|    0|       project|0.09090909090909091|
|    0|          mary|0.09090909090909091|
|    0|        godwin|0.09090909090909091|
|    0|         ebook|0.09090909090909091|
|    0|  frankenstein|0.09090909090909091|
|    0|            by|0.09090909090909091|
|    0|     gutenberg|0.09090909090909091|
|    0|            of|0.09090909090909091|
|    0|       shelley|0.09090909090909091|
|    1|        united|0.07142857142857142|
|    1|            is|0.07142857142857142|
|    1|          this|0.07142857142857142|
|    1|        anyone|0.07142857142857142|
|    1|           and|0.07142857142857142|
|    1|           use|0.07142857142857142|
|    1|           for|0.07142857142857142|
|    1|           the|0.14285714285714285|
|    1|            of|0.07142857142857142|
+-----+----

In [9]:
# document frequency
by_words_df = by_words_count\
       .groupBy("word")\
       .agg(countDistinct("docid").alias("df"))\
       .orderBy(F.col('df').desc())

by_words_df.show()

+-----+----+
| word|  df|
+-----+----+
|  the|3282|
|  and|2702|
|   of|2435|
|    i|2354|
|   to|1896|
|   my|1534|
|    a|1310|
|   in|1126|
| that| 971|
|  was| 948|
|   me| 792|
| with| 694|
|  but| 681|
|  had| 649|
|which| 554|
|  you| 549|
|   he| 545|
|   it| 533|
|  not| 519|
|  for| 505|
+-----+----+
only showing top 20 rows



In [10]:
D = by_words.count()
D

6650

In [11]:
import numpy as np

# IDF function
def calcIDF(df):
    IDF = np.log(D/(df+1))
    return IDF
calcIDF_udf = udf(lambda z: calcIDF(z).tolist())

by_words_idf = by_words_df\
       .withColumn('idf', calcIDF_udf(F.col("df")))

by_words_idf.show()

+-----+----+------------------+
| word|  df|               idf|
+-----+----+------------------+
|  the|3282|0.7058592161483073|
|  and|2702|0.9002545873734523|
|   of|2435|1.0042595048201122|
|    i|2354|1.0380761271993817|
|   to|1896|1.2543431637148856|
|   my|1534|1.4660864736286023|
|    a|1310|   1.6238266498862|
|   in|1126|1.7750576196101235|
| that| 971|1.9230163291894609|
|  was| 948| 1.946963335039972|
|   me| 792| 2.126548912015052|
| with| 694|2.2584602880851077|
|  but| 681| 2.277342475806438|
|  had| 649| 2.325399770760217|
|which| 554|2.4834040199034653|
|  you| 549|2.4924538554233835|
|   he| 545|2.4997531579049945|
|   it| 533| 2.521976294689705|
|  not| 519|2.5485433220744267|
|  for| 505|2.5758354643624344|
+-----+----+------------------+
only showing top 20 rows



In [12]:
# TF-IDF
by_words_tfidf = by_words_tf.select('docid', 'word', 'tf')\
       .join (by_words_idf, 'word')\
       .orderBy(F.col('docid').asc())\
       .withColumn("tfidf", F.col("tf") * F.col("idf"))

by_words_tfidf.show(30)

+--------------+-----+-------------------+----+------------------+-------------------+
|          word|docid|                 tf|  df|               idf|              tfidf|
+--------------+-----+-------------------+----+------------------+-------------------+
|     gutenberg|    0|0.09090909090909091|  31| 5.336636230850173|0.48514874825910664|
|         ebook|    0|0.09090909090909091|  13| 6.163314804034641| 0.5603013458213311|
|wollstonecraft|    0|0.09090909090909091|   3| 7.416077772530009|  0.674188888411819|
|        godwin|    0|0.09090909090909091|   3| 7.416077772530009|  0.674188888411819|
|          mary|    0|0.09090909090909091|   3| 7.416077772530009|  0.674188888411819|
|       project|    0|0.09090909090909091|  88|  4.31373576391776|0.39215779671979634|
|            of|    0|0.09090909090909091|2435|1.0042595048201122| 0.0912963186200102|
|       shelley|    0|0.09090909090909091|   3| 7.416077772530009|  0.674188888411819|
|           the|    0|0.09090909090909091|3

In [13]:
by_words_vector = by_words_tfidf\
       .groupBy("docid").sum("tfidf")\
       .withColumnRenamed("sum(tfidf)", 'tfidf vector')

by_words_vector.show()

+-----+------------------+
|docid|      tfidf vector|
+-----+------------------+
|    0|5.0226997979180545|
|    1| 3.646328983255845|
|    2|   3.9925193008026|
|    3|4.3410905364615715|
|    4|  4.09461905923505|
|    5| 3.861566679290804|
|    6|3.4829231244683445|
|    7|5.1983492532031566|
|    8| 6.569418074322607|
|    9|4.4890881319069535|
|   10| 7.304154614942924|
|   11| 7.460588236738183|
|   12|7.2150904351204765|
|   13| 6.128818368291165|
|   14| 7.133731785696293|
|   15| 6.822051285998794|
|   16| 6.781287163200986|
|   17| 4.339279942280633|
|   18|4.4890881319069535|
|   19| 6.458163190732835|
+-----+------------------+
only showing top 20 rows



In [14]:
by_words_vector.repartition(1)\
    .write.mode("overwrite").csv(f"df_by_words")

## Part 2

Reading text file into RDD

In [15]:
rddText = sc.textFile(f"{filepath}").repartition(1).zipWithIndex().repartition(5)

rddText.take(20)

[(' Letter 2', 40),
 (' Letter 3', 41),
 (' Letter 4', 42),
 (' Chapter 1', 43),
 (' Chapter 2', 44),
 (' Chapter 3', 45),
 (' Chapter 4', 46),
 (' Chapter 5', 47),
 (' Chapter 6', 48),
 (' Chapter 7', 49),
 ('Inspirited by this wind of promise, my daydreams become more fervent', 90),
 ('and vivid. I try in vain to be persuaded that the pole is the seat of', 91),
 ('frost and desolation; it ever presents itself to my imagination as the',
  92),
 ('region of beauty and delight. There, Margaret, the sun is for ever', 93),
 ('visible, its broad disk just skirting the horizon and diffusing a', 94),
 ('perpetual splendour. There—for with your leave, my sister, I will put', 95),
 ('some trust in preceding navigators—there snow and frost are banished;', 96),
 ('and, sailing over a calm sea, we may be wafted to a land surpassing in',
  97),
 ('wonders and in beauty every region hitherto discovered on the habitable',
  98),
 ('globe. Its productions and features may be without example, as the',