In [147]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import string
import re 
from collections import Counter
import numpy as np
import re

In [2]:
DATA_DIR = '../../data/wiki10/text_sample/'

## RDD Solution

In [3]:
sc = pyspark.SparkContext()

In [154]:
def extract_filename(path):
    return re.search(r'(?<=/)\w*$', path).group(0)

#### One RDD element per line in text file

In [155]:
rdd = sc.textFile(DATA_DIR)
print(rdd.count())
print(rdd.first()[:100])
#rdd.getNumPartitions()

2728
Undeclared was an American television series that aired on Fox during the 2001–2002 season.  The hal


#### One RDD element per text file, also preserves file name

In [156]:
rdd = sc.wholeTextFiles(DATA_DIR).map(lambda x: (extract_filename(x[0]), x[1])) # strip out full path of filename

document_count = rdd.count()

print(document_count)
print(rdd.first())

1000
('011847f83eba74ae537e88609355b068', 'Undeclared was an American television series that aired on Fox during the 2001–2002 season.  The half-hour comedy-drama/sitcom was Judd Apatow\'s follow-up to the TV cult classic Freaks and Geeks, which also lasted for one season. Undeclared centered on a group of college freshmen at the fictional University of North Eastern California. Unlike Freaks, it was set in the current time of the early 2000s rather than the 1980s. It gave a humorous look at the highs and lows of college life, from young adult relationships to the dreaded freshman fifteen. It takes its name from the status of an undergraduate who has not yet decided, or "declared", a specific major of study they wish to take.  College is "the reward for surviving high school. Most people have great fun stories from college and nightmare stories from high school," Judd Apatow told the Los Angeles Times in 2006. He also speculated on why college shows find it hard to gain a foothold on n

In [102]:
def tokenize(text):
    return re.sub(f'([{string.punctuation}])', r' \1 ', text.lower()).split()

Create tokenized RDD containing one element per document. Each element is a (filename, [list of tokens]) tuple

In [103]:
rdd_tokens = rdd.map(lambda x: (x[0], tokenize(x[1]))).persist() # cache result as we will be using is multiple times

print(rdd_tokens.first())

('011847f83eba74ae537e88609355b068', ['undeclared', 'was', 'an', 'american', 'television', 'series', 'that', 'aired', 'on', 'fox', 'during', 'the', '2001–2002', 'season', '.', 'the', 'half', '-', 'hour', 'comedy', '-', 'drama', '/', 'sitcom', 'was', 'judd', 'apatow', "'", 's', 'follow', '-', 'up', 'to', 'the', 'tv', 'cult', 'classic', 'freaks', 'and', 'geeks', ',', 'which', 'also', 'lasted', 'for', 'one', 'season', '.', 'undeclared', 'centered', 'on', 'a', 'group', 'of', 'college', 'freshmen', 'at', 'the', 'fictional', 'university', 'of', 'north', 'eastern', 'california', '.', 'unlike', 'freaks', ',', 'it', 'was', 'set', 'in', 'the', 'current', 'time', 'of', 'the', 'early', '2000s', 'rather', 'than', 'the', '1980s', '.', 'it', 'gave', 'a', 'humorous', 'look', 'at', 'the', 'highs', 'and', 'lows', 'of', 'college', 'life', ',', 'from', 'young', 'adult', 'relationships', 'to', 'the', 'dreaded', 'freshman', 'fifteen', '.', 'it', 'takes', 'its', 'name', 'from', 'the', 'status', 'of', 'an', '

Create term frequency RDD with one element per token, document pair. Eache element is a (token, (term frequency, filename)) tuple, where term frequency is the number of times that token appears in that document. Arranged this way to be able to join with document frequency RDD

In [104]:
rdd_tf = rdd_tokens.flatMap(lambda x: [(token, (count, x[0])) for token, count in Counter(x[1]).items()])

rdd_tf.take(5)

[('undeclared', (3, '011847f83eba74ae537e88609355b068')), ('was', (4, '011847f83eba74ae537e88609355b068')), ('an', (4, '011847f83eba74ae537e88609355b068')), ('american', (1, '011847f83eba74ae537e88609355b068')), ('television', (1, '011847f83eba74ae537e88609355b068'))]


Create document frequency RDD with one element per token. Each element is a (token, document frequency) tuple where document frequency is the proportion of documents that token appears in

In [112]:
rdd_df = (rdd_tokens.flatMap(lambda x: set(x[1]))
                    .map(lambda x: (x, 1))
                    .reduceByKey(lambda x, y: x + y) 
                    .map(lambda x: (x[0], x[1] / document_count)))
rdd_df.take(5)

[('was', 0.819),
 ('name', 0.396),
 ('share', 0.12),
 ('angeles', 0.067),
 ('for', 0.964)]

Join RDDs

In [113]:
rdd_join = rdd_tf.join(rdd_df)

rdd_join.take(5)

[('undeclared', ((3, '011847f83eba74ae537e88609355b068'), 0.002)),
 ('undeclared', ((1, '01bd03069620c8f5e101f9919d24d62d'), 0.002)),
 ('was', ((4, '011847f83eba74ae537e88609355b068'), 0.819)),
 ('was', ((15, '0a52ba2347e51f37e0e54e211a6c8db3'), 0.819)),
 ('was', ((1, '03881942942f39cfeba1c592e355b429'), 0.819))]

In [114]:
def tf_idf(tf, df):
    return tf * np.log(1 / df)

In [124]:
rdd_tf_idf = rdd_join.map(lambda x: (x[1][0][1], x[0], tf_idf(tf=x[1][0][0], df=x[1][1])))

rdd_tf_idf.take(5)

[('011847f83eba74ae537e88609355b068', 'undeclared', 18.643824295266576),
 ('01bd03069620c8f5e101f9919d24d62d', 'undeclared', 6.214608098422191),
 ('011847f83eba74ae537e88609355b068', 'was', 0.7986847805162709),
 ('0a52ba2347e51f37e0e54e211a6c8db3', 'was', 2.9950679269360156),
 ('03881942942f39cfeba1c592e355b429', 'was', 0.19967119512906772)]

In [134]:
print(rdd_tf_idf.toDebugString().decode('utf-8'))

(2) PythonRDD[155] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[147] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[146] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[145] at join at <ipython-input-113-2d854cda9982>:1 []
    |  PythonRDD[144] at join at <ipython-input-113-2d854cda9982>:1 []
    |  UnionRDD[143] at union at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[141] at RDD at PythonRDD.scala:48 []
    |  ../../data/wiki10/text_sample/ MapPartitionsRDD[106] at wholeTextFiles at NativeMethodAccessorImpl.java:0 []
    |  WholeTextFileRDD[105] at wholeTextFiles at NativeMethodAccessorImpl.java:0 []
    |  PythonRDD[142] at RDD at PythonRDD.scala:48 []
    |  MapPartitionsRDD[139] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[138] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(1) PairwiseRDD[137] at reduceByKey at <ipython-input-112-37ff74581df4>:3 []
       |  PythonRDD[136] at reduceByKey at <ipython-i

## DataFrame Solution

In [5]:
spark = SparkSession.builder.appName("TF-IDF").getOrCreate()

In [190]:
df = (spark.read.text(DATA_DIR)
           .withColumn("filename", F.regexp_extract(F.input_file_name(), r'/(\w*)$', 1))
           .withColumn("tokens", F.split(F.lower(F.col("value")), ' ')) 
     ) 
df.printSchema()

root
 |-- value: string (nullable = true)
 |-- filename: string (nullable = false)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [191]:
df.show()

+--------------------+--------------------+--------------------+
|               value|            filename|              tokens|
+--------------------+--------------------+--------------------+
|Adolf Hitler in 1...|0937e815f6fac2890...|[adolf, hitler, i...|
|Auschwitz-Birkena...|0937e815f6fac2890...|[auschwitz-birken...|
|between Israel an...|0937e815f6fac2890...|[between, israel,...|
| Quotations from ...|0937e815f6fac2890...|[, quotations, fr...|
| Source texts fro...|0937e815f6fac2890...|[, source, texts,...|
| Images and media...|0937e815f6fac2890...|[, images, and, m...|
| News stories fro...|0937e815f6fac2890...|[, news, stories,...|
|Peace treaty sign...|028c3b31da88f3e08...|[peace, treaty, s...|
|Stanley Kubrick (...|0238d586d7d6676f9...|[stanley, kubrick...|
|Istanbul (Turkish...|0b9f59c213a048d5c...|[istanbul, (turki...|
|To the west, to t...|0b9f59c213a048d5c...|[to, the, west,, ...|
| Quotations from ...|0b9f59c213a048d5c...|[, quotations, fr...|
| Source texts fro...|0b9

In [201]:
df_long = df.select("filename", F.explode("tokens").alias("token")) 

df_long.show()

+--------------------+----------------+
|            filename|           token|
+--------------------+----------------+
|0937e815f6fac2890...|           adolf|
|0937e815f6fac2890...|          hitler|
|0937e815f6fac2890...|              in|
|0937e815f6fac2890...|            1933|
|0937e815f6fac2890...|        pogroms:|
|0937e815f6fac2890...| kristallnacht ·|
|0937e815f6fac2890...|     bucharest ·|
|0937e815f6fac2890...|       dorohoi ·|
|0937e815f6fac2890...|          iaşi ·|
|0937e815f6fac2890...|        kaunas ·|
|0937e815f6fac2890...|      jedwabne ·|
|0937e815f6fac2890...|            lviv|
|0937e815f6fac2890...|        ghettos:|
|0937e815f6fac2890...|        łachwa ·|
|0937e815f6fac2890...|          łódź ·|
|0937e815f6fac2890...|          lwów ·|
|0937e815f6fac2890...|        kraków ·|
|0937e815f6fac2890...|      budapest ·|
|0937e815f6fac2890...|theresienstadt ·|
|0937e815f6fac2890...|         kovno ·|
+--------------------+----------------+
only showing top 20 rows



In [210]:
df_tf = df_long.groupby("filename", "token").agg(F.count("*").alias("tf")) 

df_tf.show()

+--------------------+------------+---+
|            filename|       token| tf|
+--------------------+------------+---+
|0937e815f6fac2890...|     invaded|  6|
|0937e815f6fac2890...|          on|178|
|0937e815f6fac2890...|    huetler,|  1|
|0937e815f6fac2890...|      member|  6|
|0937e815f6fac2890...|   countries|  2|
|0937e815f6fac2890...|      terms,|  1|
|0937e815f6fac2890...|     system"|  1|
|0937e815f6fac2890...| hindenburg.|  1|
|0937e815f6fac2890...|        kaas|  1|
|0937e815f6fac2890...|      state,|  5|
|0937e815f6fac2890...|      repair|  1|
|0937e815f6fac2890...|disappointed|  2|
|0937e815f6fac2890...|     spanish|  1|
|0937e815f6fac2890...|     franco,|  2|
|0937e815f6fac2890...|      arises|  1|
|0937e815f6fac2890...|    build-up|  1|
|0937e815f6fac2890...|     kershaw|  3|
|0937e815f6fac2890...|      "there|  1|
|0937e815f6fac2890...|    thoughts|  1|
|0937e815f6fac2890...|  tripartite|  2|
+--------------------+------------+---+
only showing top 20 rows



In [211]:
df_df = df_tf.groupby("token").agg(F.count("*").alias("df"))

df_df.show()

+--------------+---+
|         token| df|
+--------------+---+
|    successor.|  3|
|         still|417|
|   president's| 14|
|          hope| 60|
|palaestrae.[4]|  1|
|        spared|  8|
|   expressive.|  1|
|         inner| 60|
|        filing|  9|
|       fossett|  1|
|     connected|106|
|         1966,| 19|
|         1946,| 17|
|         those|494|
|     traveling| 33|
|          zr ·|  2|
|         1970s| 67|
| infinitesimal|  5|
|          some|743|
|        width,|  2|
+--------------+---+
only showing top 20 rows



In [219]:
df_count = df.select(F.countDistinct("filename").alias("count")).first()["count"] 

df_count

1000

In [228]:
(df_tf.join(df_df, df_tf.token == df_df.token)
      .select("filename", df_tf.token, df_tf.tf * F.log(df_count / df_df.df))).show() 

+--------------------+--------------------+-----------------------+
|            filename|               token|(tf * LOG((1000 / df)))|
+--------------------+--------------------+-----------------------+
|08e4da2aa7a77e002...|         "absolutely|      6.214608098422191|
|0c825364f60dce638...|         "absolutely|      6.214608098422191|
|09a2600ed21a3ba87...|        "all-giving"|      20.72326583694641|
|0bb5a696a98db928c...|"anti-elitism"—ac...|      6.907755278982137|
|028f984ff7c11ad14...|         "autopilot"|      6.907755278982137|
|0561d6733af451a4a...|             "colory|      6.907755278982137|
|022fcb4c6dc724080...|         "composer's|      6.907755278982137|
|0053ee7f520dda846...|              "decal|      6.907755278982137|
|075656dc1199b28ec...|         "disclosure|      6.907755278982137|
|0937e815f6fac2890...|      "encirclement"|      6.907755278982137|
|038ceb44453aa83aa...|              "grand|      5.115995809754082|
|00ce4dcc93e6d1f4d...|              "grand|     