In [37]:
import sys
import re
import numpy as np
from pyspark import SparkContext
from numpy import dot
from numpy.linalg import norm
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from operator import add
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType

In [27]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:



# In[2]:
sc = SparkContext.getOrCreate()

# Set the file paths on your local machine
# Change this line later on your python script when you want to run this on the CLOUD (GC or AWS)

wikiPagesFile = "C:/Users/Yidow/Desktop/cs777-new/FengYiduo_Assignment_4/WikipediaPagesOneDocPerLine1000LinesSmall.txt"
wikiCategoryFile = "C:/Users/Yidow/Desktop/cs777-new/FengYiduo_Assignment_4/wiki-categorylinks-small.csv.bz2"


# In[3]:


# Read two files into RDDs

wikiCategoryLinks=sc.textFile(wikiCategoryFile)

wikiCats=wikiCategoryLinks.map(lambda x: x.split(",")).map(lambda x: (x[0].replace('"', ''), x[1].replace('"', '') ))

# Now the wikipages
wikiPages = sc.textFile(wikiPagesFile)

wikiCategoryLinks.take(2)


# In[4]:


wikiCats.take(1)


# In[5]:

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(wikiPagesFile)

# Uncomment this line if you want to take look inside the file.
# df.take(1)


# In[6]:


# wikiPages.take(1)


# In[7]:


# Assumption: Each document is stored in one line of the text file
# We need this count later ...
numberOfDocs = wikiPages.count()

print(numberOfDocs)
# Each entry in validLines will be a line from the text file
validLines = wikiPages.filter(lambda x : 'id' in x and 'url=' in x)

# Now, we transform it into a set of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:][:-6]))

# keyAndText.take(1)


# In[8]:


def buildArray(listOfIndices):

    returnVal = np.zeros(20000)

    for index in listOfIndices:
        returnVal[index] = returnVal[index] + 1

    mysum = np.sum(returnVal)

    returnVal = np.divide(returnVal, mysum)

    return returnVal


def build_zero_one_array (listOfIndices):

    returnVal = np.zeros (20000)

    for index in listOfIndices:
        if returnVal[index] == 0: returnVal[index] = 1

    return returnVal


def stringVector(x):
    returnVal = str(x[0])
    for j in x[1]:
        returnVal += ',' + str(j)
    return returnVal



def cousinSim (x,y):
	normA = np.linalg.norm(x)
	normB = np.linalg.norm(y)
	return np.dot(x,y)/(normA*normB)


# In[9]:


# Now, we transform it into a set of (docID, text) pairs
keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:][:-6]))

# Now, we split the text in each (docID, text) pair into a list of words
# After this step, we have a data set with
# (docID, ["word1", "word2", "word3", ...])
# We use a regular expression here to make
# sure that the program does not break down on some of the documents

regex = re.compile('[^a-zA-Z]')

# remove all non letter characters
keyAndListOfWords = keyAndText.map(lambda x : (str(x[0]), regex.sub(' ', x[1]).lower().split()))
# better solution here is to use NLTK tokenizer

# Now get the top 20,000 words... first change (docID, ["word1", "word2", "word3", ...])
# to ("word1", 1) ("word2", 1)...
allWords = keyAndListOfWords.flatMap(lambda x: x[1])
allWords = allWords.map(lambda x:(x,1))

# Now, count all of the words, giving us ("word1", 1433), ("word2", 3423423), etc.
allCounts = allWords.reduceByKey(add)

# Get the top 20,000 words in a local array in a sorted format based on frequency
# If you want to run it on your laptio, it may a longer time for top 20k words.
topWords = allCounts.top(20000, key=lambda x: x[1])

#
print("Top Words in Corpus:", allCounts.top(10, key=lambda x: x[1]))

# We'll create a RDD that has a set of (word, dictNum) pairs
# start by creating an RDD that has the number 0 through 20000
# 20000 is the number of words that will be in our dictionary
topWordsK = sc.parallelize(range(20000))

# Now, we transform (0), (1), (2), ... to ("MostCommonWord", 1)
# ("NextMostCommon", 2), ...
# the number will be the spot in the dictionary used to tell us
# where the word is located
dictionary = topWordsK.map (lambda x : (topWords[x][0], x))


print("Word Postions in our Feature Matrix. Last 20 words in 20k positions: ", dictionary.top(20, lambda x : x[1]))


# In[10]:


################### TASK 2  ##################

# Next, we get a RDD that has, for each (docID, ["word1", "word2", "word3", ...]),
# ("word1", docID), ("word2", docId), ...

allWordsWithDocID = keyAndListOfWords.flatMap(lambda x: ((j, x[0]) for j in x[1]))
#print(allWordsWithDocID.take(2))
#[('black', '431949'), ('people', '431949')]

# Now join and link them, to get a set of ("word1", (dictionaryPos, docID)) pairs
allDictionaryWords = dictionary.join(allWordsWithDocID)
#allDictionaryWords = allWordsWithDocID.join(dictionary)
#print(allDictionaryWords.take(2))
#934380

# Now, we drop the actual word itself to get a set of (docID, dictionaryPos) pairs
justDocAndPos = allDictionaryWords.map(lambda x: (x[1][1], x[1][0]))
#print(justDocAndPos.take(2))
#[('431949', 1), ('431949', 1)]

# Now get a set of (docID, [dictionaryPos1, dictionaryPos2, dictionaryPos3...]) pairs
allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()
#print(allDictionaryWordsInEachDoc.take(2))

# The following line this gets us a set of
# (docID,  [dictionaryPos1, dictionaryPos2, dictionaryPos3...]) pairs
# and converts the dictionary positions to a bag-of-words numpy array...
#....................................
allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], buildArray(x[1])))

print(allDocsAsNumpyArrays.take(3))
#[('431971', array([0.08553655, 0.02488336, 0.05132193, ..., 0.        , 0.

# In[11]:


# Now, create a version of allDocsAsNumpyArrays where, in the array,
# every entry is either zero or one.
# A zero means that the word does not occur,
# and a one means that it does.

zeroOrOne = allDictionaryWordsInEachDoc.mapValues(build_zero_one_array)
#print(zeroOrOne.take(2))
# Now, add up all of those arrays into a single array, where the
# i^th entry tells us how many
# individual documents the i^th word in the dictionary appeared in
dfArray = zeroOrOne.reduce(lambda x1, x2: ("", np.add(x1[1], x2[1])))[1]
#
# Create an array of 20,000 entries, each entry with the value numberOfDocs (number of docs)
multiplier = np.full(20000, numberOfDocs)

# Get the version of dfArray where the i^th entry is the inverse-document frequency for the
# i^th word in the corpus
idfArray = np.log(np.divide(np.full(20000, numberOfDocs), dfArray))

# Finally, convert all of the tf vectors in allDocsAsNumpyArrays to tf * idf vectors
allDocsAsNumpyArraysTFidf = allDocsAsNumpyArrays.map(lambda x: (x[0], np.multiply(x[1], idfArray)))

print(allDocsAsNumpyArraysTFidf.take(2))

# use the buildArray function to build the feature array
# allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: (x[0], buildArray(x[1])))


# print(allDocsAsNumpyArraysTFidf.take(2))


# In[12]:


print(wikiCats.take(1))

1000
Top Words in Corpus: [('the', 74530), ('of', 34512), ('and', 28479), ('in', 27758), ('to', 22583), ('a', 21212), ('was', 12160), ('as', 8811), ('for', 8773), ('on', 8435)]
Word Postions in our Feature Matrix. Last 20 words in 20k positions:  [('quebecor', 19999), ('poten', 19998), ('kasada', 19997), ('yadnya', 19996), ('drift', 19995), ('iata', 19994), ('satire', 19993), ('expreso', 19992), ('olimpico', 19991), ('auxiliaries', 19990), ('tenses', 19989), ('petherick', 19988), ('stowe', 19987), ('infimum', 19986), ('parramatta', 19985), ('rimpac', 19984), ('hyderabad', 19983), ('cubes', 19982), ('meats', 19981), ('chaat', 19980)]
[('431971', array([0.08553655, 0.02488336, 0.05132193, ..., 0.        , 0.        ,
       0.        ])), ('431999', array([0.06479482, 0.03239741, 0.03887689, ..., 0.        , 0.        ,
       0.        ])), ('432000', array([0.07192043, 0.04475899, 0.02371844, ..., 0.        , 0.        ,
       0.        ]))]
[('431971', array([0.0072252 , 0.00207481, 

In [32]:
# In[19]:


# Now, we join it with categories, and map it after join so that we have only the wikipageID
# This joun can take time on your laptop.
# You can do the join once and generate a new wikiCats data and store it. Our WikiCategories includes all categories
# of wikipedia.

featuresRDD = wikiCats.join(allDocsAsNumpyArraysTFidf).map(lambda x: (x[1][0], x[1][1]))


# Cache this important data because we need to run kNN on this data set.
print(featuresRDD.cache())
print(featuresRDD.take(10))


# In[14]:


# Let us count and see how large is this data set.
featuresRDD.count()

PythonRDD[510] at RDD at PythonRDD.scala:53
[('Asteroid_spectral_classes', array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
       0.        ])), ('S-type_asteroids', array([0.00674469, 0.00348744, 0.00373721, ..., 0.        , 0.        ,
       0.        ])), ('All_stub_articles', array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
       0.        ])), ('Military_communications_of_the_United_States', array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
       0.        ])), ('United_States_Department_of_Defense_agencies', array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
       0.        ])), ('United_States_military_stubs', array([0.00787168, 0.00328745, 0.00754904, ..., 0.        , 0.        ,
       0.        ])), ('2004_European_Parliament_election', array([0.00711319, 0.0026331 , 0.00394139, ..., 0.        , 0.        ,
       0.        ])), ('All_articles_that_may_contain_original_research', arr

13780

In [41]:
# In[20]:


# Finally, we have a function that returns the prediction for the label of a string, using a kNN algorithm
def getPrediction (textInput, k):
    # Create an RDD out of the textIput
    myDoc = sc.parallelize (('', textInput))

    # Flat map the text to (word, 1) pair for each word in the doc
    wordsInThatDoc = myDoc.flatMap (lambda x : ((j, 1) for j in regex.sub(' ', x).lower().split()))

    # This will give us a set of (word, (dictionaryPos, 1)) pairs
    allDictionaryWordsInThatDoc = dictionary.join (wordsInThatDoc).map (lambda x: (x[1][1], x[1][0])).groupByKey ()
    print(allDictionaryWordsInThatDoc.take(2))
    # Get tf array for the input string
    myArray = buildArray (allDictionaryWordsInThatDoc.top (1)[0][1])
    #print(myArray)
    # Get the tf * idf array for the input string
    myArray = np.multiply (myArray, idfArray)

    # Get the distance from the input text string to all database documents, using cosine similarity (np.dot() )
    distances = featuresRDD.map (lambda x : (x[0], np.dot (x[1], myArray)))
    # distances = allDocsAsNumpyArraysTFidf.map (lambda x : (x[0], cousinSim (x[1],myArray)))
    # get the top k distances
    topK = distances.top (k, lambda x : x[1])

    # and transform the top k distances into a set of (docID, 1) pairs
    docIDRepresented = sc.parallelize(topK).map (lambda x : (x[0], 1))

    # now, for each docID, get the count of the number of times this document ID appeared in the top k
    numTimes = docIDRepresented.reduceByKey(add)

    # Return the top 1 of them.
    # Ask yourself: Why we are using twice top() operation here?
    return numTimes.top(k, lambda x: x[1])


# In[21]:


print(getPrediction('Sport Basketball Volleyball Soccer', 10))


# In[22]:


# print(getPrediction('What is the capital city of Australia?', 10))
#
#
# # In[23]:
#
#
# print(getPrediction('How many goals Vancouver score last year?', 10))


# In[ ]:


# Congradulations, you have implemented a prediction system based on Wikipedia data.
# You can use this system to generate automated Tags or Categories for any kind of text
# that you put in your query.
# This data model can predict categories for any input text.

[(1, <pyspark.resultiterable.ResultIterable object at 0x0000023644340A90>)]
[0. 0. 0. ... 0. 0. 0.]
[('Human_name_disambiguation_pages', 1), ('1931_births', 1), ('All_article_disambiguation_pages', 1), ('All_disambiguation_pages', 1), ('Lists_of_sportspeople_by_sport', 1), ('2015_deaths', 1), ('Disambiguation_pages_with_short_description', 1), ('Bullfighters', 1), ('All_articles_with_dead_external_links', 1), ("Air_Force_Falcons_men's_basketball_coaches", 1)]


In [81]:
spark = SparkSession.builder.getOrCreate()
dictionary_df = dictionary.toDF()
#allDocsAsNumpyArraysTFidf.map(lambda x:(x[0],x[1].tolist()))
#array_lst = allDocsAsNumpyArraysTFidf.map(lambda x: x[1]).collect()
allDocsAsNumpyArraysTFidf_col = StructType([
    StructField('name', StringType(), True),
    StructField('array', ArrayType(IntegerType()), True)
])
# featuresRDD_df = spark.createDataFrame(featuresRDD, schema = featuresRDD_col)
allDocsAsNumpyArraysTFidf_df = spark.createDataFrame(allDocsAsNumpyArraysTFidf, schema = allDocsAsNumpyArraysTFidf_col)

def getPrediction_df (textInput, k):
    # Create an dataframe out of the textIput
    data = textInput.split()
    myDoc = spark.createDataFrame(data, StringType())
    # Flat map the text to (word, 1) pair for each word in the doc
    wordsInThatDoc = myDoc.withColumn("count", lit(1))
    # This will give us a set of (word, (dictionaryPos, 1)) pairs
    allDictionaryWordsInThatDoc = dictionary_df.join (wordsInThatDoc, dictionary_df._1 == wordsInThatDoc.value,"inner")
    allDictionaryWordsInThatDoc = allDictionaryWordsInThatDoc.drop("value")
    allDictionaryWordsInThatDoc.show()
    # Get tf array for the input string
    #print(list(allDictionaryWordsInThatDoc.select("_2")))
    myArray = np.array(allDictionaryWordsInThatDoc.select("_2").collect())

    #Get the tf * idf array for the input string
    myArray = np.multiply (myArray, idfArray)
    array_lst = allDocsAsNumpyArraysTFidf.map(lambda x: x[1]).collect()
    #array_lst = np.array(array_lst)
    myArray = np.squeeze(np.asarray(myArray))
    array_lst = np.squeeze(np.asarray(array_lst))
    #print(featuresRDD_df.show(2))
    # Get the distance from the input text string to all database documents, using cosine similarity (np.dot() )
    #distances = featuresRDD_df.withColumn( "distance", cousinSim (featuresRDD_df.select("_2"), myArray) )
    # print(myArray)
    # print(array_lst)
    distances = allDocsAsNumpyArraysTFidf_df.withColumn( "distance", np.dot (array_lst, myArray) )
    # get the top k distances
    distances.orderby("array")
    topK = distances.head(k)

    # and transform the top k distances into a set of (docID, 1) pairs
    docIDRepresented = spark.createDataFrame(topK.docID, IntegerType())

    # now, for each docID, get the count of the number of times this document ID appeared in the top k
    numTimes = docIDRepresented.groupby(add)

    # Return the top 1 of them.
    # Ask yourself: Why we are using twice top() operation here?
    return numTimes.orderby(_2).head(k)




print(getPrediction_df('How many goals Vancouver score last year?', 10))

+-----+----+-----+
|   _1|  _2|count|
+-----+----+-----+
|goals|2676|    1|
| many|  63|    1|
|score|1418|    1|
| last| 246|    1|
+-----+----+-----+



ValueError: shapes (1000,20000) and (4,20000) not aligned: 20000 (dim 1) != 4 (dim 0)