## 1. Set  environment

In [1]:
#import pyspark package
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [2]:
#define  SPARK session.
conf = SparkConf()\
                  .setAll([('spark.executores','5'),
                         ('spark.executor.memory','20g'),
                         ('spark.executor.cores','4'),
                         ('spark.driver.memory','20g')])\
                  .setAppName("ejemplo").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:
#Start  SPARK Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

## 2. Code to clean files and  find stop word

In [4]:
# import libraries to be used by preprocessing process
import nltk
import re, string
nltk.download('stopwords')
from nltk.corpus import stopwords
stopwords =stopwords.words('english')
stopwords[0:9]
import time

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
## You can use this expression to verify if stop words exists.
"ours" in stopwords

True

In [6]:
# Use this function to delete special characters and puntuaction signs in string x
def clean_txt(x):
  punc = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~𐑍𐑌𐑋𐑊𐑉𐑉𐑉𐑈𐑇𐑇𐑆𐑅𐑄𐑃𐑂𐑁𐑀𐐿𐐾𐐽𐐼𐐻𐐺𐐹𐐸𐐷𐐶𐐵𐐴𐐳𐐲𐐱𐐰𐐯𐐮𐐭𐐬𐐫𐐪𐐩𐐨⅌„℥℞℔'
  lowercased_str = x.lower()
  lowercased_str = lowercased_str.replace('--',' ').replace('\\u',' ').replace('\\r',' ')
  lowercased_str = re.sub(r"[\\r\\u!\"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~]"," ",lowercased_str,flags=re.I)
  for ch in punc:
     lowercased_str = lowercased_str.replace(ch, '')
  return lowercased_str.strip().encode('utf-8').decode('utf-8-sig')

In [7]:
# code to test the function clean_txt
clean_txt("este texto# &%$ tie^ne& caracteres especia{{les~ a borrar \n \ ")

'este texto  tiene caracteres especiales a borrar'

## 3. preprocessing input files

### Step 1: Define broadcast variable to store stop words.

In [8]:
# They will be send to all executors
bc_stopwords =  sc.broadcast(stopwords)        

In [9]:
# testing the broadcast variable
print(type(bc_stopwords.value))
bc_stopwords.value[0:9]

<class 'list'>


['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you']

### Step 2: Read input files, you can use clean_txt and additionally remove stop words

Read input files, delete special characters and remove stop words. <br>
<b>Output: </b> RDD storing the following <em>(key,value)</em> records like <em>(('doc-name', 'word'), 1)</em>

In [10]:
# Funtion return a list of <key,value> pairs where key= doc-id, value=text in file
def organize_txt(text):
  key=text[0].split('/')[-1].split('.')[0]
  value=text[1].split('\n')
  lines=[]
  for line in value:
    lines.append((key,line))
  return lines

### guidelines
<ul align="justify">
<li><em>wholeTextFile</em> read a directory of text files. Each file is read as a single record and returned in a <em>(key-value, pair)</em>, where the key is the path of each file, the value is the content of each file. The text files must be encoded as UTF-8.</li>
    
```
OUTPUT (key, value) -->  <'path file-1',txt'>, <'path file-2',txt>, ...  <'path file-n',txt>
```

<li><em>flatMap()</em> applies the function <em>organize_txt()</em> for each line in the input RDD. As result it tags each document line with the document id. </li>

```
<'file-1',txt'>, <'file-2',txt>, ...  ) ---> ( <file-1, line-1>,.....,<file-1,line-m> ,<file-2,line-1>,<file-2,lin-2>,...)
```
<li><em>flatMap()</em> processes each input line from RDD and  applies word by word the  function <em>txt_clean()</em> to delete special characters and puntuaction signs. You have to define a lambda function uses every word in the text to build entries</li>

```
<file-1,line> ---> <(file-1,word-1),1>, <(file-1,word-2),1>,......., <(file-2,word-8),1>
```
<li><em>filter()</em>. You can use a lambda function to delete stop words in RDD</li>

In [11]:
path="../Data"
#path="../Data_tmp"
start1 = time.time()
words_RDD = sc.wholeTextFiles(path, minPartitions=5) \
                          .flatMap(lambda txt: organize_txt(txt))\
                          .flatMap(lambda txt:[((txt[0],clean_txt(i)),1) for i in txt[1].split()])\
                          .filter(lambda txt:txt[0][1]!='' and txt[0][1] not in bc_stopwords.value)
step1_time = time.time() - start1
words_RDD.take(3)

[(('pg238', 'project'), 1),
 (('pg238', 'gutenberg'), 1),
 (('pg238', 'ebook'), 1)]

In [12]:
words_RDD.getNumPartitions()

5

## 4. Computing TF

### Step 1: Compute word frequency for document, i.e. $f_{i,j}$ in expression (1)
### guidelines
<ul align="justify">
    <li>Use the transformation <em>reduceByKey</em> to obtain word frequency.</li>
    
```
<(doc-id,word),1> , <(doc-id,word),1> -> <(doc-id,word),frequency>
```
    
<li>Next, use transfomation <em> map</em> to build a new key-value pair.</li>
    
```
<<(doc-id,word),frequency> -> <(doc-id, (word-1,frequency)>

```

In [13]:
start2p1 = time.time()
word_freq=words_RDD.reduceByKey(lambda x,y:x+y)\
                   .map( lambda x : (x[0][0], (x[0][1],  int(x[1])) ))
step2p1 = time.time() - start2p1

In [14]:
#test your computation
word_freq.take(10)

[('pg238', ('enemy', 31)),
 ('pg238', ('wwwgutenbergorg', 5)),
 ('pg238', ('country', 20)),
 ('pg238', ('using', 6)),
 ('pg238', ('author', 1)),
 ('pg238', ('date', 3)),
 ('pg238', ('2008', 1)),
 ('pg238', ('david', 2)),
 ('pg238', ('widger', 1)),
 ('pg238', ('stone', 2))]

In [15]:
# define persistence for new RDD word_freq
words_RDD.unpersist
# ADD YOUR CODE HERE#
word_freq.cache()

PythonRDD[8] at RDD at PythonRDD.scala:53

### Step 2: Find the max frequency among its words for each document. $MAX_k F_{i,j}$
### Guidelines
<li>Use transformation <em>map()</em> to iterate over RDD <em>word_freq</em> an obtain</li>

```
<(file-1,(word-1,freq-1)><(file-1,(word-2,freq-2)> --> <(file-1,freq-1)>, ...<(file-1,freq-2)>

```
<li>Use <em>groupByKey</em> and function <em>mapValues(max)</em> to obtain</li>

```
<(file-1, freq-1)>, <(file-1, freq-2)> --> <file-1, Max_freq>

```

In [16]:
start2p2 = time.time()
Max_freq = word_freq.map( lambda x : (x[0], int(x[1][1])) )\
                    .groupByKey().mapValues(max)
step2p2 = time.time() - start2p2
Max_freq.take(10)

[('pg238', 246),
 ('pg633', 83),
 ('pg464', 429),
 ('pg765', 398),
 ('pg232', 83),
 ('pg224', 793),
 ('pg529', 245),
 ('pg261', 84),
 ('pg281', 83),
 ('pg399', 524)]

In [17]:
Max_freq.cache()

PythonRDD[14] at RDD at PythonRDD.scala:53

### Step 3: Compute TF

### Guidelines
<li>Join both RDD <em>word_freq</em> and <em>Max_freq </em>  by  attribute <em>doc-id</em> and  compute TF </li>
<li>Recall  $ TF_{ij} = \frac{f_{ij}}{MAX_k F_{ij}} $ = frequency / max_frequency  </li>
<li>Exchange items to build a new key-value as the following </li>
 

```
<doc-id,(word,frequency>  JOIN  <doc-id,max_frequency)>  ----> <word, (doc-i, TF) >
```


In [18]:
start2p3 = time.time()
TF= word_freq.join(Max_freq).map(lambda x : ( x[1][0][0],(x[0], x[1][0][1] / x[1][1] ) ))
step2p3 = time.time() - start2p3
step2_time = step2p1 + step2p2 + step2p3
TF.take(10)

[('south', ('pg464', 0.09557109557109557)),
 ('seas', ('pg464', 0.06526806526806526)),
 ('states', ('pg464', 0.05827505827505827)),
 ('cost', ('pg464', 0.016317016317016316)),
 ('included', ('pg464', 0.011655011655011656)),
 ('located', ('pg464', 0.016317016317016316)),
 ('laws', ('pg464', 0.03263403263403263)),
 ('title', ('pg464', 0.009324009324009324)),
 ('date', ('pg464', 0.02097902097902098)),
 ('464', ('pg464', 0.002331002331002331))]

## 5. Computing IDF

<p align="justify"> <font face="Verdana" size='2'>
Althought SPARK has its built-in library to compute TD-IDF,  We will build our own library.<br>

### Step 1: For each word, compute $n_i$,  how many documents contain this one?. 
### Guidelines
<ul align="justify">
<li>Use transformation <em>distinct()</em> to find documents where each word appears</li>

```
<(doc-id,word),1>  -> <(doc-id,word-id),1>
```
<li>Use transformation <em>map()</em> to build a transformed RDD </li>

```
<(doc-id,word),1> <(doc-id,word-id),1> --> <(word),1>  

```

    
<li>Use transformation <em>reduceByKey()</em> to compute in how many documents does each word appear? </li>

```
<word,1>  -> <word,n_i>
```


In [19]:
start3p1 = time.time()
occ = words_RDD.distinct()\
               .map( lambda x : ((x[0][1], 1))) \
               .reduceByKey(lambda x,y : x+y )
step3p1 = time.time() - start3p1
occ.take(15)

[('clubs', 216),
 ('away', 785),
 ('nearly', 785),
 ('prevention', 62),
 ('conscientious', 159),
 ('pulpit', 174),
 ('personable', 16),
 ('ugly', 462),
 ('lime', 127),
 ('brazen-throated', 4),
 ('cigar', 247),
 ('hath', 281),
 ('therefore', 603),
 ('companion', 514),
 ('second', 785)]

### Step 2: Compute IDF for each word. 
### Guidelines
<li> Find number of documents

In [20]:
start3p2 = time.time()
nro_docs=len(sc.wholeTextFiles("../Data/", minPartitions=3)\
               .map(lambda txt: txt[0].split('/')[-1].split('.')[0]).collect())
step3p2 = time.time() - start3p2
nro_docs

785

<li> Compute $IDF_i = \log_2 \frac {N}{n_i}\tag{2}$  </li>

In [21]:
import math
from pyspark.sql.functions import *
start3p3 = time.time()
IDF=occ.map(lambda w: (w[0],math.log10(nro_docs/w[1])))
step3p3 = time.time() - start3p3
step3_time = step3p1 + step3p2 + step3p3
IDF.take(10)

[('clubs', 0.5604159055943216),
 ('away', 0.0),
 ('nearly', 0.0),
 ('prevention', 1.1024779672469986),
 ('conscientious', 0.693472532424801),
 ('pulpit', 0.6543204084626528),
 ('personable', 1.6907496740893277),
 ('ugly', 0.23022768118912704),
 ('lime', 0.7910659357892956),
 ('brazen-throated', 2.2928096654172903)]

## 6. Computing TF-IDF

### Guidelines

<li> We have two RDD.</li>   

```
 TF: <word, (doc-id,TF)>  and  IDF: <word, IDF>
```
 <li>Use the join transformation to build the transformed RDD  </li>

```
 TF_IDF: <word, (doc-id,TF * IDF)>
```

    

In [22]:
start4 = time.time()
tf_idf=TF.join(IDF).map(lambda x: (x[1][0][0],(x[0],x[1][0][1],x[1][1],x[1][0][1]*x[1][1])))
step4_time = time.time() - start4
tf_idf.take(5)

[('pg464', ('cost', 0.016317016317016316, 0.0, 0.0)),
 ('pg232', ('cost', 0.04819277108433735, 0.0, 0.0)),
 ('pg224', ('cost', 0.007566204287515763, 0.0, 0.0)),
 ('pg529', ('cost', 0.0163265306122449, 0.0, 0.0)),
 ('pg399', ('cost', 0.01717557251908397, 0.0, 0.0))]

In [23]:
# to display all values
df_tf_idf=tf_idf.map(lambda x: (x[0],x[1][0],x[1][1],x[1][2],x[1][3])).toDF(["DocumentId","Word","TF","IDF","TF_IDF"])
df_tf_idf.orderBy('TF_IDF',ascending=False).show()

+----------+------------+------------------+------------------+------------------+
|DocumentId|        Word|                TF|               IDF|            TF_IDF|
+----------+------------+------------------+------------------+------------------+
|     pg432|    strether|               1.0|2.5938396610812715|2.5938396610812715|
|     pg208|winterbourne|               1.0|2.5938396610812715|2.5938396610812715|
|     pg347|     grettir|               1.0|2.5938396610812715|2.5938396610812715|
|     pg338|      iktomi|               1.0|2.5938396610812715|2.5938396610812715|
|     pg500|   pinocchio|               1.0|2.5938396610812715|2.5938396610812715|
|     pg212|       00000|               1.0|2.5938396610812715|2.5938396610812715|
|     pg268|    annixter|               1.0|2.5938396610812715|2.5938396610812715|
|     pg401|       condy|               1.0|2.5938396610812715|2.5938396610812715|
|     pg823|      declan|               1.0|2.5938396610812715|2.5938396610812715|
|   

In [24]:
overall_time = step1_time + step2_time + step3_time + step4_time

In [25]:
print(f"Timing Metrics:")
print(f"1. Text Cleaning and Loading: {step1_time:.2f} seconds")
print(f"2. TF Calculation: {step2_time:.2f} seconds")
print(f"3. IDF Calculation: {step3_time:.2f} seconds")
print(f"4. TF-IDF and Keyword Extraction: {step4_time:.2f} seconds")
print(f"Overall Time: {overall_time:.2f} seconds")
print(f"\n")
print(f"\n")
print(f"\n")

Timing Metrics:
1. Text Cleaning and Loading: 0.79 seconds
2. TF Calculation: 0.17 seconds
3. IDF Calculation: 21.11 seconds
4. TF-IDF and Keyword Extraction: 0.04 seconds
Overall Time: 22.12 seconds






