## Start by loading up a real subset of Wikipedia data placed into S3.

In [1]:
rawdata = spark.read.options(sep="\t").csv("s3://ml-rvce-us-east-1/tf-idf/subset-small.tsv")
rawdata.show()

SparkSession available as 'spark'.


+---+--------------------+-------------------+--------------------+
|_c0|                 _c1|                _c2|                 _c3|
+---+--------------------+-------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|
|307|     Abraham Lincoln|2008-12-28 20:18:23|Abraham Lincoln (...|
|308|           Aristotle|2008-12-29 23:54:48|Aristotle (Greek:...|
|309|An American in Paris|2008-09-27 19:29:28|An American in Pa...|
|324|       Academy Award|2008-12-28 17:50:43|The Academy Award...|
|330|             Actrius|2008-05-23 15:24:32|Actrius (Actresse...|
|332|     Animalia (book)|2008-12-18 11:12:34|th

## It seems there are no column names in the source data, so we'll have to add them ourselves.

In [2]:
articles = rawdata.toDF("ID", "Title", "Time", "Document")
articles.show()

+---+--------------------+-------------------+--------------------+
| ID|               Title|               Time|            Document|
+---+--------------------+-------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|
|307|     Abraham Lincoln|2008-12-28 20:18:23|Abraham Lincoln (...|
|308|           Aristotle|2008-12-29 23:54:48|Aristotle (Greek:...|
|309|An American in Paris|2008-09-27 19:29:28|An American in Pa...|
|324|       Academy Award|2008-12-28 17:50:43|The Academy Award...|
|330|             Actrius|2008-05-23 15:24:32|Actrius (Actresse...|
|332|     Animalia (book)|2008-12-18 11:12:34|th

## Next we need to clean our data. We know TF/IDF can't handle null documents, so first let's check for that.

In [3]:
articles.filter(articles.Document.isNull()).count()

1

## Looks like there is one null document. We can just remove it.

In [4]:
cleanedArticles = articles.filter(articles.Document.isNotNull())
cleanedArticles.filter(cleanedArticles.Document.isNull()).count()

0

## TF/IDF wants numbers, not words, so we need to pre-process our data before we can run any algorithms on it. We'll first tokenize the articles to split them up into words, then hash those words into a sparse vector that serves as a numeric representation of the words in each article.

In [5]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="Document", outputCol="words")
wordsData = tokenizer.transform(cleanedArticles)
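
For intuition, `Tokenizer` roughly lowercases the text and splits it on whitespace, which is why punctuation stays attached to tokens such as `(some...` in the `words` column. Here is a minimal pure-Python sketch of that behaviour. `simple_tokenize` is a hypothetical helper name, not part of Spark, the sample sentence is made up, and the sketch only approximates what Spark does.

```python
def simple_tokenize(text):
    """Lowercase the text and split it on whitespace (an approximation of Tokenizer)."""
    return text.lower().split()

# Made-up sample sentence, not taken from the data set
tokens = simple_tokenize("Anarchism (sometimes) is a Political philosophy")
print(tokens)
```

Note that the parentheses survive as part of the token, so "(sometimes)" and "sometimes" would be treated as different words.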

In [6]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(wordsData)
featurizedData.show()

+---+--------------------+-------------------+--------------------+--------------------+--------------------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|[alabama, (formal...|(262144,[93,115,3...|
|305|            Achilles|2008-12-30 06:18:01|thumb\n\nIn Greek...|[thumb\n\nin, gre...|(262144,[305,991,...|
|307|     
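
The `262144` at the front of each `rawFeatures` entry is the size of the hashed feature space (2^18). The idea behind the hashing step can be sketched in plain Python as below. This is only an illustration: `bucket` and `hashed_term_frequencies` are hypothetical helper names, and `zlib.crc32` is a stand-in hash chosen for simplicity, so the indices it produces will not match the ones Spark computes.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the vector size visible in the rawFeatures column

def bucket(term, num_features=NUM_FEATURES):
    """Map a term to a bucket index. crc32 is a stand-in, NOT Spark's hash function."""
    return zlib.crc32(term.encode("utf-8")) % num_features

def hashed_term_frequencies(tokens, num_features=NUM_FEATURES):
    """Return {bucket index: count}, the non-zero entries of a sparse TF vector."""
    return dict(Counter(bucket(t, num_features) for t in tokens))

# Made-up token list for illustration
tf = hashed_term_frequencies(["the", "albedo", "of", "the", "moon"])
print(tf)
```

Because different words can land in the same bucket, a count may occasionally mix several terms. With a large feature space that is rare, but it is the trade-off for never having to build a vocabulary.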

## That hashing operation effectively computed term frequencies for us by storing how often each hashed word occurred in each article. So we have TF, but we want TF/IDF scores for every term in every document. We'll store these final scores in a new column called "features", which is a sparse vector containing the TF/IDF score for each hashed term.

In [7]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
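
To make the rescaling concrete, here is a small sketch of the arithmetic, assuming the smoothed formula given in the Spark MLlib documentation, idf = log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term. The function names and the toy numbers are made up for illustration.

```python
import math

def smoothed_idf(num_docs, doc_freq):
    """Inverse document frequency with +1 smoothing in numerator and denominator."""
    return math.log((num_docs + 1) / (doc_freq + 1))

def tf_idf(term_count, num_docs, doc_freq):
    """TF/IDF score: raw term count multiplied by the term's IDF weight."""
    return term_count * smoothed_idf(num_docs, doc_freq)

# Toy numbers (not from the data set): 1000 documents, term appears in 9 of them
print(smoothed_idf(1000, 9))
print(tf_idf(3, 1000, 9))
```

A term that appears in every document gets an IDF of zero, so very common words contribute nothing to the final score.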

In [8]:
rescaledData.show()

+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|            features|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|(262144,[116,120,...|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|(262144,[521,1546...|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|(262144,[1625,179...|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|(262144,[5303,603...|
|303|             Alabama|2008-12-29 08:15:47|Alabama (formally...|[alabama, (forma

## So let's use this to search for the term "gettysburg". Again, we need numbers, not words, so the first task is to get the hash value for "gettysburg".

In [18]:
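from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# One-row DataFrame whose "words" column holds the search term as a token list
schema = StructType([StructField("words", ArrayType(StringType()))])
df = spark.createDataFrame([[["gettysburg"]]], schema)
df.show()

# Reuse the same HashingTF so the term lands in the same bucket as in the corpus
trump = hashingTF.transform(df)
trump.show()

featureVec = trump.select('rawFeatures').collect()
print(featureVec)

# The single non-zero index is the hash bucket for "gettysburg"
trumpID = int(featureVec[0].rawFeatures.indices[0])
print(trumpID)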

+------------+
|       words|
+------------+
|[gettysburg]|
+------------+

+------------+--------------------+
|       words|         rawFeatures|
+------------+--------------------+
|[gettysburg]|(262144,[116775],...|
+------------+--------------------+

[Row(rawFeatures=SparseVector(262144, {116775: 1.0}))]
116775

## OK, we have the magic number that represents "gettysburg". Now we can add another column - we'll call it "score" - that just extracts the TF/IDF value for "gettysburg" for each document.

In [19]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

termExtractor = udf(lambda x: float(x[trumpID]), FloatType())
trumpDF = rescaledData.withColumn('score', termExtractor(rescaledData.features))

trumpDF.show()
                                                        

+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+
| ID|               Title|               Time|            Document|               words|         rawFeatures|            features|    score|
+---+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+---------+
| 12|           Anarchism|2008-12-30 06:23:05|Anarchism (someti...|[anarchism, (some...|(262144,[116,120,...|(262144,[116,120,...|      0.0|
| 25|              Autism|2008-12-24 20:41:05|Autism is a brain...|[autism, is, a, b...|(262144,[521,1546...|(262144,[521,1546...|      0.0|
| 39|              Albedo|2008-12-29 18:19:09|The albedo of an ...|[the, albedo, of,...|(262144,[1625,179...|(262144,[1625,179...|      0.0|
|290|                   A|2008-12-27 04:33:16|The letter A is t...|[the, letter, a, ...|(262144,[5303,603...|(262144,[5303,603...|      0.0|
|303|        
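
Most rows score 0.0 because a sparse vector only stores its non-zero entries, and indexing it with `x[trumpID]` returns zero for any index that is not stored. Here is a rough pure-Python sketch of that lookup. `sparse_lookup` is a hypothetical helper and the toy indices and values are made up.

```python
def sparse_lookup(indices, values, target_index):
    """Return the value stored at target_index, or 0.0 if that index is not stored."""
    for index, value in zip(indices, values):
        if index == target_index:
            return float(value)
    return 0.0

# Toy sparse vector (made-up values): only three buckets are non-zero
toy_indices = [116, 120, 116775]
toy_values = [2.5, 0.7, 30.0]

hit = sparse_lookup(toy_indices, toy_values, 116775)
miss = sparse_lookup(toy_indices, toy_values, 42)
print(hit, miss)
```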

## Now all we have to do is sort our articles by score, and we'll have the most relevant articles for gettysburg!

In [20]:
sortedResults = trumpDF.filter("score > 0").orderBy('score', ascending=False).select('ID', 'Title', 'Document', 'score')
sortedResults.show(truncate=100)

+----+------------------+----------------------------------------------------------------------------------------------------+---------+
|  ID|             Title|                                                                                            Document|    score|
+----+------------------+----------------------------------------------------------------------------------------------------+---------+
| 307|   Abraham Lincoln|Abraham Lincoln (February 12, 1809 – April 15, 1865) was the sixteenth President of the United St...|30.695974|
|1135|   Abner Doubleday|Abner Doubleday (June 26, 1819 – January 26, 1893) was a career United States Army officer and Un...|25.579979|
| 863|American Civil War|The American Civil War (1861–1865), also known as the War Between the States and several other na...|15.347987|
|1998|     Austin, Texas|Austin is the capital of the U.S. state of Texas and the county seat of Travis County. Situated i...| 5.115996|
|2085|           Assyria|Assyria was a ge
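
One way to sanity-check these results: the IDF weight for a given term is the same in every document, so the scores should be multiples of one shared value. The sketch below divides each visible score by the smallest one, using the four complete rows from the table above.

```python
# Scores copied from the four complete rows of the results table
scores = {
    "Abraham Lincoln": 30.695974,
    "Abner Doubleday": 25.579979,
    "American Civil War": 15.347987,
    "Austin, Texas": 5.115996,
}

base = min(scores.values())
ratios = {title: round(score / base, 3) for title, score in scores.items()}
print(ratios)
```

The ratios come out as 6, 5, 3 and 1. If the lowest visible score corresponds to a single occurrence, that suggests the hashed term appears about six times in the Abraham Lincoln article, although hash collisions could in principle contribute to those counts.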