<a href="https://colab.research.google.com/github/divassya/BigDataAnalysis/blob/main/AssiyaKaratay_Assignment_4_Wiki_Categories_SparkDF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Info 
Assignment 4
MET CS777 Big Data Analytics

Faculty - Farshid Alizadeh-Shabdiz, PhD, MBA

Student - Assiya Karatay U95161396 karatay@bu.edu 857-294-7028

#### import libraries

In [1]:
!pip install --ignore-installed -q pyspark==3.1.2 

In [2]:
import os
import sys
import requests
import numpy as np
import pandas as pd
from operator import add
import re

from numpy import dot
from numpy.linalg import norm

from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, col,monotonically_increasing_id
from pyspark.sql.types import *

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

#### set up the Google Drive

In [3]:
#### set up the Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# choose where project files will be saved
project_folder = "/content/drive/MyDrive/CS777_BigDataAnalytics/Assignment4/"
# project_folder = sys.argv[2]
# change the OS to use the project folder as the working directory
os.chdir(project_folder)

print('\n Working directory was changed to ' + project_folder )


 Working directory was changed to /content/drive/MyDrive/CS777_BigDataAnalytics/Assignment4/


In [5]:
# 1. get the file
wikiPagesFile= project_folder + "WikipediaPagesOneDocPerLine1000LinesSmall.txt.bz2"

In [6]:
wikiPages = spark.read.format('csv')\
.options(header='false', inferSchema='true', sep='|')\
.load(wikiPagesFile)

In [7]:
# Each entry in validLines will be a line from the text file
validLines = wikiPages.filter(wikiPages['_c0'].contains('id' and 'url='))

### Task 1 Generate a 20K dictionary (10 points)


#### 1.1 The top 20,000 English words
Using Wikipedia pages, find the top 20,000 English words, save them in an array, and sort them based on the frequency of the occurrence.

In [8]:
def getID(lines):
  # divide the data defore and after url
  division = lines.split('" url')
  # strip all characters except DOC ID
  id = division[0].split('<doc id="')[1] 
  return id


def getText(lines):
  # divide the data defore and after url
  division = lines.split('" url')
  # strip the end of title and last 6 characters containing '.</doc' .
  text = division[1].split('">')[1][:-6]
  # check the UNICODE decoding for readability 
  # and lowercase to count words with the same letters together
  regex = re.compile(r'[^a-zA-Z]', re.UNICODE).split(text.lower())
  return regex

In [9]:
# Converting function to UDF
getIDUDF = udf(getID, StringType())
getTextUDF = udf(getText, ArrayType(StringType()))
# derive two columns of out the one text column
docID = validLines.withColumn('docID', getIDUDF('_c0'))
docIDAndListOfWords = docID.withColumn('text', getTextUDF('_c0')).select('docID', 'text')

In [10]:
# drops docID 
# explodes text to (word,1) pairs to count the number of occurrence
# get top 20K most common words in the corpus

topWords = docIDAndListOfWords.withColumn('word', f.explode(f.col('text')))\
    .groupBy('word')\
    .count()\
    .filter('word != ""')\
    .sort('count', ascending=False)\
    .limit(20000)

In [11]:
# add a new column of dict pos
dictionary = topWords.withColumn('dictNum', monotonically_increasing_id()).drop('count')
# dictionary.write.format("csv")\
# .mode("overwrite")\
# .option("header",True)\
# .save(project_folder + "df_results/task11_dict/")
# print(dictionary.show(2))

#### 1.2 docID as key and a Numpy array for the position of each word
As a result, a dictionary has been generated that contains the top 20K most frequent words in the corpus. Next go over each Wikipedia document and check if the words appear in the Top 20K words. At the end, produce an RDD that includes
the docID as key and a Numpy array for the position of each word in the top 20K dictionary.
(docID, [dictionaryPos1, dictionaryPos2, dictionaryPos3...])

In [12]:
# Get a DF that has, for each (docID, ["word1", "word2", "word3", ...]),
# ("word1", docID), ("word2", docId), ...
allWordsWithDocID = docIDAndListOfWords.withColumn('word', f.explode('text'))\
.drop('text').filter('word != ""')
# join and link them, to get a set of ("word1", (dictionaryPos, docID)) pairs
allDictionaryWords = dictionary.join(allWordsWithDocID, 'word')
# Drop the actual word itself to get a set of (docID, dictionaryPos) pairs
docIDAndPos = allDictionaryWords.drop('word')
# Get a set of (docID, [dictionaryPos1, dictionaryPos2, dictionaryPos3...]) pairs
# Group by key and use collect_set to combine dictNum for each key
allDictionaryWordsInEachDoc = docIDAndPos.groupBy('docID')\
.agg(f.collect_set('dictNum').alias('dictWordsList'))

In [13]:
# allDictionaryWordsInEachDoc.write\
# .mode("overwrite")\
# .option("header",True)\
# .parquet(project_folder + "df_results/task12_wordOccurrences/")

### Task 2 - Create the TF-IDF Array (20 Points)
#### TF
After having the top 20K words we want to create a large array that its
columns are the words of the dictionary with number of occurrences of each word and the rows are documents.
The first step is calculating the “Term Frequency”, TF (x, w), vector for each document as follows:
“Term Frequency” is an indication of the number of times a term occurs in a document.
Numerator is number of occurrences of a word, and the denominator is the sum of all the words of the document.

In [14]:
def tf(listOfIndices):
    # create an array of zeros
    returnVal = np.zeros(20000)
    # count the occurrence of words e.g. there are 514 'my' in docID1, where 
    # 'my' is in the position 0 in the corpus
    for index in listOfIndices:
        returnVal[index] = returnVal[index] + 1
    numberOfWords = np.sum(listOfIndices)
    returnVal = np.divide(returnVal, numberOfWords)
    return returnVal.tolist()

In [15]:
tfUDF = udf(tf, ArrayType(FloatType(), containsNull=False))

In [16]:
# The following line this gets us a set of
# (docID,  [dictionaryPos1, dictionaryPos2, dictionaryPos3...]) pairs
# and converts the dictionary positions to a bag-of-words numpy array...
# use the buildArray function to build the feature array
# regexp_replace is used to remove square brackets from buildArray output
tfArrays = allDictionaryWordsInEachDoc\
.withColumn('tf',tfUDF('dictWordsList'))\
.drop('dictWordsList')

tfArrays.show(2)

+------+--------------------+
| docID|                  tf|
+------+--------------------+
|434061|[3.0526055E-6, 3....|
|455037|[0.0, 0.0, 0.0, 0...|
+------+--------------------+
only showing top 2 rows



#### IDF
Next, calculate “Inverse Document Frequency” for all the documents and finally
calculate TF-IDF(w) and create TF-IDF matrix of the corpus:
Note that the “size of corpus” is total number of documents (numerator).
To learn more about TF-IDF see the Wikipedia page:
https://en.wikipedia.org/wiki/Tf-idf

In [17]:
# the i^th entry tells us how many
# individual documents the i^th word in the dictionary appeared in
dfArray = np.array(docIDAndPos.distinct()\
                   .groupBy('dictNum')\
                   .agg(f.count('docID'))\
                   .drop('dictNum')
                   .collect()).flatten()
print(dfArray)  

[  5 105 526 ...   1   1   1]


In [18]:
# Get the version of dfArray where the i^th entry is the inverse-document frequency for the
# i^th word in the corpus
numberOfDocs = wikiPages.count()
idfArray = np.log(np.divide(np.full(20000, numberOfDocs), dfArray))

In [19]:
multiply_idfArray = udf(lambda x: np.multiply(x, idfArray).tolist(), ArrayType(FloatType(), containsNull=False))


In [20]:
# Finally, convert all of the tf vectors in allDocsAsNumpyArrays to tf * idf vectors and remove []
# Then split by ,
allDocsAsNumpyArraysTFidf = tfArrays.withColumn('TFidf', multiply_idfArray('tf'))\
.drop('tf')

print(allDocsAsNumpyArraysTFidf.show(2))

+------+--------------------+
| docID|               TFidf|
+------+--------------------+
|434061|[1.6173673E-5, 6....|
|455037|[0.0, 0.0, 0.0, 0...|
+------+--------------------+
only showing top 2 rows

None


In [21]:
allDocsAsNumpyArraysTFidf.write\
.mode("overwrite")\
.option("header",True)\
.parquet(project_folder + "/df_results/task2_tfidf/")

### Task 3 - Implement the getPrediction function (30 Points)
Finally, implement the function getPrediction(textInput, k), which will predict the
membership of the textInput to the top 20 closest documents, and the list of top
categories.
You should use the cosine similarity to calculate the distances.

In [22]:
wikiCategoryFile = project_folder + "wiki-categorylinks-small.csv.bz2"

wikiCats=spark.read.format('csv')\
.options(header = 'false', inferSchema = 'true', sep = ',')\
.load(wikiCategoryFile)
wikiCats.show(2)


+------+--------------------+
|   _c0|                 _c1|
+------+--------------------+
|434042|   1987_debut_albums|
|434042|Albums_produced_b...|
+------+--------------------+
only showing top 2 rows



In [23]:
# Now, we join it with categories, and map it after join so that we have only the wikipageID 
# This joun can take time on your laptop. 
# You can do the join once and generate a new wikiCats data and store it. Our WikiCategories includes all categories
# of wikipedia. 

wikiAndCatsJoined = wikiCats.withColumnRenamed('_c0', 'docID')\
            .withColumnRenamed('_c1', 'category')\
            .join(allDocsAsNumpyArraysTFidf, on='docID')
featuresDF = wikiAndCatsJoined.select('category', 'TFidf')

# Cache this important data because we need to run kNN on this data set. 
featuresDF.cache()
featuresDF.show(10)

+--------------------+--------------------+
|            category|               TFidf|
+--------------------+--------------------+
|Use_dmy_dates_fro...|[1.6173673E-5, 6....|
|Politics_of_East_...|[1.6173673E-5, 6....|
|Lists_of_politica...|[1.6173673E-5, 6....|
|Leaders_of_East_G...|[1.6173673E-5, 6....|
|East_Germany_poli...|[1.6173673E-5, 6....|
|Articles_lacking_...|[1.6173673E-5, 6....|
|Articles_containi...|[1.6173673E-5, 6....|
|All_articles_lack...|[1.6173673E-5, 6....|
|Human_name_disamb...|[0.0, 0.0, 0.0, 0...|
|Disambiguation_pa...|[0.0, 0.0, 0.0, 0...|
+--------------------+--------------------+
only showing top 10 rows



In [24]:
def cosineSim (x,y):
	normA = np.linalg.norm(x)
	normB = np.linalg.norm(y)
	return (round(np.dot(x,y)/(normA*normB),3)).tolist()

cosinSim_udf = udf(cosineSim, FloatType())


In [25]:
# Assumption: Each document is stored in one line of the text file
# We need this count later ... 
numberOfDocs = wikiPages.count()

In [26]:
def getPrediction(textInput,k):
    # create a df 
    df = spark.createDataFrame([textInput], StringType())
    print(df.show(2))
    #Flat map the text to (word, 1) pair for each word in the doc
    textWords =df.withColumn ('word', f.explode(f.split(f.lower(
                                                  f.regexp_replace('value', '[^a-zA-Z]', ' ')), ' '))).\
                                                  withColumn('count', f.lit(1))\
                                                  .filter('word != ""')\
                                                  .drop('value')
    print(textWords.show(2))      
    # This will give us a set of (word, (dictionaryPos, 1)) pairs
    allDictionaryWordsInThatDoc = dictionary.join (textWords, on='word').\
                                  select('dictNum', 'count').groupBy('count').\
                                      agg(f.collect_set('dictNum'))
    print(allDictionaryWordsInThatDoc.show(2))
    #Get tf array for the input string
    tfArray = allDictionaryWordsInThatDoc.orderBy('count', ascending = False).limit(1)\
                          .withColumn('tfArray', tfUDF('collect_set(dictNum)')).\
                          select('tfArray') 
    # Multiply by idfArray
    myArray = tfArray.withColumn('tfxIdf', multiply_idfArray('tfArray')).select('tfxIdf')

    # Get the tf * idf array for the input string
    # Get the distance from the input text string to all database documents, 
    # using cosine similarity (np.dot() )
    distances = featuresDF.join(myArray)
    distances = distances.withColumn('dist', cosinSim_udf('TFidf', 'tfxIdf'))\
                            .select('category', 'dist')
        # print(distances.show(2))
    
    # get the top k distances
    topK = distances.orderBy('dist', ascending = False).limit(k)
    # print(topK.show(5))

    # now, for each docID, get the count of the number of times this document ID appeared in the top k
    numTimes = topK.groupBy('category')\
                  .agg(f.count('category').alias('count'))\
                  .drop('dist')
    # print(numTimes.show(5))

    return numTimes.orderBy('count', ascending = False).limit(k).collect()                                                 

In [None]:
print(getPrediction('How many goals Vancouver score last year?', 10))


+--------------------+
|               value|
+--------------------+
|How many goals Va...|
+--------------------+

None
+----+-----+
|word|count|
+----+-----+
| how|    1|
|many|    1|
+----+-----+
only showing top 2 rows

None
+-----+--------------------+
|count|collect_set(dictNum)|
+-----+--------------------+
|    1|[66, 63, 2626, 64...|
+-----+--------------------+

None
