# Setup
Preliminary steps for installing packages, importing libraries, starting the Spark session, and loading the file

## PySpark Session
Initialize the PySpark session

In [1]:
# Install required packages
!pip install pyspark
!pip install findspark
!pip install pandas



In [2]:
# Import libraries
import pyspark
import re
import findspark
import time
import pandas as pd
import pyspark.sql.functions as f
import glob
import os
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.sql.functions import col
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import max  # NOTE(review): shadows Python's built-in max() for the rest of the notebook; f.max is also available
from collections import Counter
# NOTE(review): findspark.init() is normally called *before* importing pyspark (it is meant to make a
# Spark installation importable); pyspark is pip-installed in the cell above, so this call is presumably redundant - confirm
findspark.init()

In [3]:
# Function for formatting a time string
def hms_string(sec_elapsed):
    """Format a duration in seconds as "H:MM:SS.ss" (e.g. 2.27 -> "0:00:02.27").

    The duration is rounded to hundredths of a second *before* it is split into
    hours, minutes and seconds, so a value such as 59.996 becomes "0:01:00.00"
    instead of the invalid "0:00:60.00".

    Args:
        sec_elapsed: non-negative number of seconds (int or float).

    Returns:
        The formatted string; hours are not zero-padded and may exceed 24.
    """
    total = round(sec_elapsed, 2)
    hours = int(total // 3600)
    minutes = int((total % 3600) // 60)
    seconds = total % 60
    return "{}:{:>02}:{:>05.2f}".format(hours, minutes, seconds)

In [4]:
# Create a spark context class
# (unused alternative to the SparkSession below; kept for reference)
#sc = SparkContext()

# Other attempts at configuration changes (currently disabled; kept for reference)
#     .config("spark.sql.execution.arrow.pyspark.enabled","true") \
#     .config("spark.executor.heartbeatInterval","3600") \
#     .config("spark.sql.autoBroadcastJoinThreshold","-1") \
#     .config("spark.executor.memory",'8g') \

# Create a spark session (getOrCreate reuses an already-active session if there is one)
#   master('local[*]')              run Spark locally using all available cores
#   spark.driver.maxResultSize '0'  per the Spark configuration docs, 0 means no cap on data
#                                   returned to the driver (the vocabulary is collect()-ed later)
#   spark.driver.memory '12g'       NOTE(review): presumably only honored when this call starts
#                                   the driver JVM, i.e. not if a session already exists - confirm
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Wikipedia Search Engine") \
    .config("spark.driver.maxResultSize",'0') \
    .config("spark.ui.showConsoleProgress","true") \
    .config("spark.driver.memory",'12g') \
    .getOrCreate()

In [5]:
# Display the active SparkSession
spark

## Load Data

Load Wikipedia data (pre-processed) from local storage

In [24]:
# Read a JSON file of pre-processed Wikipedia articles and drop any bad records (DROPMALFORMED).
# Choose the input by changing DATA_FILE; the options are listed from smallest to largest.

start_time = time.time()

# Folder holding the article dumps; override it with the WIKI_DATA_DIR environment variable
DATA_DIR = os.environ.get("WIKI_DATA_DIR", "C:\\Users\\Brian\\Downloads")

# Available inputs:
#   articlesSampleData.json  - first 100 Wikipedia articles
#   articles100000.json      - first 100000 Wikipedia articles
#   articles500000.json      - first 500000 Wikipedia articles
#   articlesMillion.json     - first 1000000 Wikipedia articles
#   articlesHalf.json        - half the Wikipedia JSON
#   articles.json            - full Wikipedia JSON (takes about a minute to run)
DATA_FILE = "articles100000.json"

main_df = spark.read.option("mode", "DROPMALFORMED").json(os.path.join(DATA_DIR, DATA_FILE))

# Further clean up bad records by dropping rows with nulls
# (found through testing that both cleanups are necessary)
main_df = main_df.na.drop()

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

Elapsed time: 0:00:02.27


In [7]:
# Print the column names and the schema, then show the first 100 rows
# (show() truncates long text values in its table output)

print(main_df.columns)
main_df.printSchema()
main_df.show(100)

['id', 'text', 'title']
root
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)

+---+--------------------+--------------------+
| id|                text|               title|
+---+--------------------+--------------------+
| 12|  short descripti...|           Anarchism|
| 39|  Short descripti...|              Albedo|
|290|  Short descripti...|                   A|
|303|  Short descripti...|             Alabama|
|305|  short descripti...|            Achilles|
|307|  Short descripti...|     Abraham Lincoln|
|308|  Short descripti...|           Aristotle|
|309|  short descripti...|An American in Paris|
|316|  Short descripti...|Academy Award for...|
|324|  short descripti...|      Academy Awards|
|330|  Use dmy dates d...|             Actrius|
|332|  Short descripti...|     Animalia (book)|
|334|  Short descripti...|International Ato...|
|336|  short descripti...|            Altruism|
|339|  Short descripti...|            Ayn Rand|
|

In [8]:
# Get the number of partitions of main_df's underlying RDD
main_df.rdd.getNumPartitions()

2

## Cleanup

Remove special characters and change all words to lowercase
Do this at the very beginning so time isn't wasted doing it more than once in other steps.

In [9]:
# Cleanup helper applied to every article, to the per-document word sets, and to the search query
def textCleanup(text):
    """Remove every character that is neither a word character nor whitespace, then lowercase.

    Example: "Abraham Lincoln's" -> "abraham lincolns".
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    return without_punctuation.lower()

In [25]:
# Replace the text column with its cleaned version (textCleanup on every row:
# special characters removed, everything lowercased)

start_time = time.time()

clean_text_udf = pyspark.sql.functions.udf(textCleanup)
main_df = main_df.withColumn("text", clean_text_udf("text"))
main_df.show()

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

+---+--------------------+--------------------+
| id|                text|               title|
+---+--------------------+--------------------+
| 12|  short descripti...|           Anarchism|
| 39|  short descripti...|              Albedo|
|290|  short descripti...|                   A|
|303|  short descripti...|             Alabama|
|305|  short descripti...|            Achilles|
|307|  short descripti...|     Abraham Lincoln|
|308|  short descripti...|           Aristotle|
|309|  short descripti...|An American in Paris|
|316|  short descripti...|Academy Award for...|
|324|  short descripti...|      Academy Awards|
|330|  use dmy dates d...|             Actrius|
|332|  short descripti...|     Animalia (book)|
|334|  short descripti...|International Ato...|
|336|  short descripti...|            Altruism|
|339|  short descripti...|            Ayn Rand|
|340|  short descripti...|        Alain Connes|
|344|  short descripti...|          Allan Dwan|
|358|  short descripti...|             A

# Indexing Engine
Process each document in the text corpus to prepare it for text queries

## Word Enumerator / IDF Count
Build vocabulary of unique words and how many documents they appear in

In [11]:
# Create a column holding each document's unique words as one space-separated string.
# The set is joined directly instead of cleaning up str(set(...)): for a document with
# no words, that repr is "set()", which used to become a bogus vocabulary word "set".
def _unique_words(text):
    """Return the distinct whitespace-separated tokens of text, joined by single spaces."""
    return " ".join(set(text.split()))

main_df = main_df.withColumn("words", pyspark.sql.functions.udf(_unique_words, StringType())("text"))

# Working Method: count how many documents each word appears in (every word occurs once per
# document in "words"); this is probably where we can speed it up further
#https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe
vocabulary_df = (
    main_df
    .withColumn('word', f.explode(f.split(f.col('words'), ' ')))
    # a document without words has words == "", which splits into a single "" token
    .filter(f.col('word') != '')
    .groupBy('word')
    .count()
)

In [27]:
start_time = time.time()

# Collect the vocabulary into a plain dict: word -> number of documents containing it.
# Most of the runtime lands here because vocabulary_df is evaluated lazily, i.e. only now.
#https://stackoverflow.com/questions/41206255/convert-pyspark-sql-dataframe-dataframe-type-dataframe-to-dictionary
word_counts = {}
for vocabulary_row in vocabulary_df.collect():
    word_counts[vocabulary_row["word"]] = vocabulary_row["count"]

# Output time taken
elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

Elapsed time: 0:00:00.27


In [13]:
# Sanity check: number of documents whose text contains the word "test"
word_counts["test"]

18

### Alternative attempts at the Word Enumerator / IDF Count step
These methods were either slower or didn't end up working

In [None]:
# Method 1: Works but doesn't work on large datasets (times out)
# Convert to a set so we get only unique words in each string
# This way we can count the inverse document frequency (IDF) of each term instead of the total number of occurrences
#word_counts = text_df.flatMap(lambda x: set(x[0].split())).countByValue()
#word_counts = main_df.select(col("wordCounts")).rdd.flatMap(lambda x: dict(x)).countByValue()
#word_counts = main_df.select(col("text")).rdd.flatMap(lambda text: (text, 1)).reduceByKey(lambda a, b: a + b)

# Test printing out word_counts
#print(sorted(word_counts.items()))
#for i, (word, count) in enumerate(word_counts.items()):
  #if i > 1000:
    #break
  #print(word, count)







# Method 2: Run after the term frequency / word counts step
# The lambda function did not work with global variables
# Declare global IDF dictionary
#word_counts = dict()
#list(word_counts.keys())

# Use the already calculated word counts to count the words per document
#def wordEnumeration(wordDictionary):

    #textDict = dict(wordDictionary)

    #for word in textDict.keys():
        #if word in word_counts:
            #word_counts[word] += 1
        #else:
            #word_counts[word] = 1

    #return wordDictionary

# Call wordEnumeration function on all rows, using wordCounts since it's already a dictionary with the unique words
#main_df = main_df.withColumn("wordCounts", pyspark.sql.functions.udf(lambda x: wordEnumeration(x))("wordCounts"))

#main_df.show(20)






# Method 3: RDD map and reduce from assignment 4
# Works but was slower than the main working method

#word_counts = main_df.select("text").rdd.flatMap(lambda line: str(line).split()) \
#  .map(lambda word: (word, 1)) \
#  .reduceByKey(lambda x, y: x+y) \
#  .sortByKey() \
#  .collect()

## Term Frequency / Word Counts in Each Text String
Count how many times each term occurs in each string of text
Store in a dictionary in a new column

In [14]:
# Count occurrences of all words in a string; return as dictionary
def wordCount(text):
    """Map each whitespace-separated word of text to the number of times it occurs.

    Keys keep first-occurrence order, and a plain dict (not a Counter) is returned.
    """
    return dict(Counter(text.split()))

In [15]:
start_time = time.time()

# Lazily add a column holding each document's word -> count dictionary.
# NOTE(review): the UDF keeps the default string return type, so show() prints the
# JVM rendering of the dict ("{half=2, ...}"); the downstream UDFs that call dict() on this
# column presumably receive a real dict only because Spark evaluates the chained Python
# UDFs together - consider an explicit MapType return type. Confirm.
word_count_udf = pyspark.sql.functions.udf(wordCount)
main_df = main_df.withColumn("wordCounts", word_count_udf("text"))

# show() forces the pending transformations to run
main_df.show(20)

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

+---+--------------------+--------------------+--------------------+--------------------+
| id|                text|               title|               words|          wordCounts|
+---+--------------------+--------------------+--------------------+--------------------+
| 12|  short descripti...|           Anarchism|virtues causal 18...|{half=2, nowadays...|
| 39|  short descripti...|              Albedo|2008apopt jetcont...|{news_releases=2,...|
|290|  short descripti...|                   A|putman unchanged ...|{exception=1, ࠀ=1...|
|303|  short descripti...|             Alabama|744 outbreak 2010...|{prepare=1, seapo...|
|305|  short descripti...|            Achilles|virtues ἄειδε dra...|{vulci=2, fifty=1...|
|307|  short descripti...|     Abraham Lincoln|wanted hung outbr...|{pills=3, half=2,...|
|308|  short descripti...|           Aristotle|virtues causal 20...|{minerals=1, half...|
|309|  short descripti...|An American in Paris|10154 citations i...|{germinal=1, germ...|
|316|  sho

## TF/IDF Weights

Divide each word count by the number of documents that contain the word (its document frequency), a simple form of IDF weighting. This makes common words less valuable in a search query.

In [16]:
# Write function to reduce dictionary values by IDF weights
# Use it for both the lambda function (indexer) and query processing (ranker)
def TDIDFWeights(text, isString):
    """Divide every word count by the word's document frequency from ``word_counts``.

    ``word_counts`` (built in the Word Enumerator step) maps each word to the number of
    documents containing it, so words that occur in many documents are down-weighted.
    Words missing from ``word_counts`` keep their original value. This divides by the
    raw document frequency rather than multiplying by the classic log(N / df) factor.

    Args:
        text: word -> count mapping (or an iterable of (word, count) pairs when isString is True).
        isString: if True, ``text`` is first copied into a new dict with ``dict(text)``;
            if False, ``text`` must already be a dict and is weighted in place.
            NOTE(review): despite the name, dict() cannot parse a string; the Spark caller
            presumably receives the upstream UDF's raw dict - confirm.

    Returns:
        The weighted dict (the very object passed in when isString is False).

    Raises:
        TypeError: if a count is not numeric (previously swallowed by a bare except).
    """
    textDict = dict(text) if isString else text

    # Only values are reassigned, so updating while iterating over items() is safe
    for word, count in textDict.items():
        document_frequency = word_counts.get(word)
        if document_frequency:  # skip unknown words (and a zero count, which would divide by zero)
            textDict[word] = float(count / document_frequency)

    return textDict

In [17]:
start_time = time.time()

# Weight each document's word counts by document frequency; isString=True makes
# TDIDFWeights copy the incoming mapping before it modifies the values
weigh_counts_udf = pyspark.sql.functions.udf(lambda counts: TDIDFWeights(counts, True))
main_df = main_df.withColumn("reducedWordCounts", weigh_counts_udf("wordCounts"))

# show() forces the pending transformations to run
main_df.show(20)

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

+---+--------------------+--------------------+--------------------+--------------------+--------------------+
| id|                text|               title|               words|          wordCounts|   reducedWordCounts|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+
| 12|  short descripti...|           Anarchism|virtues causal 18...|{half=2, nowadays...|{half=0.05, nowad...|
| 39|  short descripti...|              Albedo|2008apopt jetcont...|{news_releases=2,...|{news_releases=1....|
|290|  short descripti...|                   A|putman unchanged ...|{exception=1, ࠀ=1...|{exception=0.0434...|
|303|  short descripti...|             Alabama|744 outbreak 2010...|{prepare=1, seapo...|{prepare=0.1, sea...|
|305|  short descripti...|            Achilles|virtues ἄειδε dra...|{vulci=2, fifty=1...|{vulci=2.0, fifty...|
|307|  short descripti...|     Abraham Lincoln|wanted hung outbr...|{pills=3, half=2,...|{pills=3.0, half=...|
|

# Ranking Engine
Run a text query against the indexed documents to find the most relevant ones
A new search can be run by changing the text query and running the ranking engine again (no need to rerun the indexing engine)

## Text Query
The search query to be run against the documents

In [18]:
# Search query; change it and rerun the Ranking Engine cells (the indexing cells need not be rerun)
text_query = "abraham lincoln"

## Query Vectorizer
Creates a dictionary containing all words in the query and their counts, then reduces it by the same TF-IDF weights used for the documents

In [19]:
# Clean the query exactly like the documents, count its words, then weight the counts
# by document frequency (isString=False: the freshly built dict is weighted in place)
query_word_counts = wordCount(textCleanup(text_query))
vectorized_text_query = TDIDFWeights(query_word_counts, False)
print(vectorized_text_query)

{'abraham': 0.1111111111111111, 'lincoln': 0.1}


## Relevance Analysis
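Runs the text query against each document to calculate a relevance number: the sum, over the query's words that appear in the document, of the query weight times the document's weight for that word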

In [20]:
# Write function to calculate the relevance number between a text query and a document
def calculateRelevance(text):
    """Score one document against the global ``vectorized_text_query``.

    The score is the dot product of the weighted query vector and the document's
    weighted word vector: for every query word that also occurs in the document,
    query weight * document weight, summed in query-word order.

    Args:
        text: the document's word -> weight mapping (or an iterable of (word, weight) pairs).

    Returns:
        The relevance number as a float (0.0 when no query word occurs in the document).
    """
    documentWeights = dict(text)
    return sum(
        (queryWeight * documentWeights[word]
         for word, queryWeight in vectorized_text_query.items()
         if word in documentWeights),
        float(0),
    )

In [21]:
start_time = time.time()

# Add a column to main df with the relevance number for each document.
# The UDF declares a double return type, so the value stays numeric end to end instead of
# being returned as a string (the UDF default) and then cast to a lossy 32-bit float.
relevance_udf = pyspark.sql.functions.udf(calculateRelevance, "double")
main_df = main_df.withColumn("relevanceNumber", relevance_udf("reducedWordCounts"))

# Call show to actually process the change
main_df.show()

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

+---+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+
| id|                text|               title|               words|          wordCounts|   reducedWordCounts|relevanceNumber|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+
| 12|  short descripti...|           Anarchism|virtues causal 18...|{half=2, nowadays...|{half=0.05, nowad...|            0.0|
| 39|  short descripti...|              Albedo|2008apopt jetcont...|{news_releases=2,...|{news_releases=1....|            0.0|
|290|  short descripti...|                   A|putman unchanged ...|{exception=1, ࠀ=1...|{exception=0.0434...|            0.0|
|303|  short descripti...|             Alabama|744 outbreak 2010...|{prepare=1, seapo...|{prepare=0.1, sea...|            0.0|
|305|  short descripti...|            Achilles|virtues ἄειδε dra...|{vulci=2, fifty=1...|{vulci=2.0, fifty...| 

## Ranker/Output
Write results to file and read into Pandas
Determine the top ten most relevant documents and print the corresponding links

## Output
Output the ID, title, link, and relevance number of the top ten most relevant documents to the text query.

In [22]:
start_time = time.time()

# Write every document's ID, title, relevance number and Wikipedia link to CSV.
# Spark writes a folder of part-*.csv files at RESULTS_PATH; the next cell reads them back.
# NOTE: still doesn't work for large datasets, since every row is written out.
# https://sparkbyexamples.com/pyspark/pyspark-write-dataframe-to-csv-file/?expand_article=1
from urllib.parse import quote

RESULTS_PATH = "C:\\Users\\Brian\\Downloads\\results.csv"

def wikipediaLink(title):
    """Build the en.wikipedia.org URL for an article title.

    Spaces become underscores, and characters that would break the URL (such as '?'
    or '%') are percent-encoded. A trailing space is appended so the link plays nicely
    with output tables.
    """
    return "https://en.wikipedia.org/wiki/" + quote(title.replace(' ', '_'), safe="/:@!$()*,;'") + " "

(
    main_df
    .select("id", "title", "relevanceNumber")
    .withColumnRenamed("id", "ID")
    .withColumnRenamed("title", "Title")
    .withColumnRenamed("relevanceNumber", "Relevance Number")
    .withColumn("Link", pyspark.sql.functions.udf(wikipediaLink)("Title"))
    .write.options(header='True', delimiter=',')
    .mode('overwrite')
    .csv(RESULTS_PATH)
)

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

Elapsed time: 0:00:01.37


In [23]:
start_time = time.time()

#https://sparkbyexamples.com/pandas/pandas-read-multiple-csv-files/?expand_article=1
# Get the list of CSV part files Spark wrote into the results folder.
# sorted() makes the concatenation order (and therefore tie-breaking in nlargest) deterministic,
# since glob's ordering depends on the file system.
path = r'C:\Users\Brian\Downloads\results.csv'
csv_files = sorted(glob.glob(os.path.join(path, "*.csv")))
if not csv_files:
    raise FileNotFoundError("No CSV part files found in {}; run the output cell first".format(path))

# Read each CSV file into a DataFrame (a generator, consumed by concat)
df_list = (pd.read_csv(file) for file in csv_files)

# Concatenate all DataFrames
results = pd.concat(df_list, ignore_index=True)

# Select the top 10 results by relevance and display them
display(results.nlargest(10, "Relevance Number"))

elapsed_time = time.time() - start_time
print("Elapsed time: {}".format(hms_string(elapsed_time)))

Unnamed: 0,ID,Title,Relevance Number,Link
5,307,Abraham Lincoln,10.916049,https://en.wikipedia.org/wiki/Abraham_Lincoln
91,736,Albert Einstein,0.067037,https://en.wikipedia.org/wiki/Albert_Einstein
9,324,Academy Awards,0.04,https://en.wikipedia.org/wiki/Academy_Awards
13,336,Altruism,0.037037,https://en.wikipedia.org/wiki/Altruism
86,711,Albert Sidney Johnston,0.032346,https://en.wikipedia.org/wiki/Albert_Sidney_Jo...
56,662,Apollo 11,0.024691,https://en.wikipedia.org/wiki/Apollo_11
67,678,Abel,0.024691,https://en.wikipedia.org/wiki/Abel
8,316,Academy Award for Best Production Design,0.02,https://en.wikipedia.org/wiki/Academy_Award_fo...
53,657,Bitumen,0.012346,https://en.wikipedia.org/wiki/Bitumen
78,700,Arthur Schopenhauer,0.012346,https://en.wikipedia.org/wiki/Arthur_Schopenhauer


Elapsed time: 0:00:00.01


### Alternative attempts at the Ranker / Output step
These methods were either slower or didn't end up working

In [None]:
# Method 1: Not working
#main_df.select(f.max(f.col("relevanceNumber")).alias("MAX")).limit(10).collect()[0].MAX


# Method 2: Pandas method; works but takes about the same amount of time as Spark SQL
#pandas_main_df = main_df.select("id", "title", "relevanceNumber").toPandas()#['title']
#pandas_main_df = pandas_main_df.nlargest(10, "relevanceNumber")#sort_values('relevanceNumber', ascending = False)#.groupby('relevanceNumber').
#print (pandas_main_df)




# Method 3: Data frame .sort
# Works but is much slower than Spark SQL
#results_df = main_df.sort("relevanceNumber", ascending=False).show(10)
#results_df = results_df.withColumnRenamed("id","ID").withColumnRenamed("title","Title").withColumnRenamed("relevanceNumber","Relevance Number")
#results_df = results_df.withColumn("Link", pyspark.sql.functions.udf(lambda x: "https://en.wikipedia.org/wiki/" + x.replace(' ', '_') + " ")("Title"))
#results_df.select("ID", "Title", "Link", "Relevance Number").show(10, False)







# Method 4: Spark SQL method
# Works well on smaller datasets but has memory issues when they get too big
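# Does not account for extremely small numbers: relevanceNumber was presumably still a string here (the UDF's default return type), so it sorted lexicographically and 9.0E-7 ranked above 1 because it begins with 9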
#main_df.createOrReplaceTempView("main")
#results_df = spark.sql("SELECT id, title, relevanceNumber FROM main ORDER BY relevanceNumber DESC LIMIT 10").rdd.map(list)

# Rename column names
#results_df = results_df.toDF().withColumnRenamed("_1","ID").withColumnRenamed("_2","Title").withColumnRenamed("_3","Relevance Number")

# Add link row; uses title with spaces replaced by underscores; add a space to the end so it plays nice with output tables
#results_df = results_df.withColumn("Link", pyspark.sql.functions.udf(lambda x: "https://en.wikipedia.org/wiki/" + x.replace(' ', '_') + " ")("Title"))

# Convert pipelinedRDD to dataframe and output
#results_df.show(10, False)

# Close Session
Close the Spark session

In [None]:
# Stop the Spark session created at the top of the notebook
spark.stop()