In [1]:
import json
import os
import findspark
import numpy as np
from argparse import ArgumentParser
from string import punctuation

In [2]:
# Disabled ArgumentParser setup. The whole block below is a bare string literal,
# so none of it executes: `parser` is never defined and the cell merely echoes
# the text. It describes three options (--minTokenLength, --minInitialCount,
# --textDir), each with a default value.
'''
parser = ArgumentParser(description='PySpark NLP Intro')
parser.add_argument('--minTokenLength', type=int, default=2, metavar='b',
                    help='The minimum length of any accepted tokens; tokens that are too short will be dropped')
parser.add_argument('--minInitialCount', type=int, default=2, metavar='b',
                    help='The minimum number of times a token must appear in a document\
                    to be considered for the global calculation')
parser.add_argument('--textDir', type=str, default='.', metavar='D',
                    help='The directory from which to pull texts')
'''

"\nparser = ArgumentParser(description='PySpark NLP Intro')\nparser.add_argument('--minTokenLength', type=int, default=2, metavar='b',\n                    help='The minimum length of any accepted tokens; tokens that are too short will be dropped')\nparser.add_argument('--minInitialCount', type=int, default=2, metavar='b',\n                    help='The minimum number of times a token must appear in a document                    to be considered for the global calculation')\nparser.add_argument('--textDir', type=str, default='.', metavar='D',\n                    help='The directory from which to pull texts')\n"

In [3]:
# Finds the Spark installation and adds it to the python path, which is
# convenient for env managers like conda.
# The pyspark import is placed after findspark.init() rather than in the import
# cell at the top; keep this order.

findspark.init()
from pyspark import SparkContext, SparkConf

In [4]:
# Spark configuration for a local run; the "local[*]" master uses all local cores
conf = SparkConf()
conf = conf.setMaster("local[*]")

In [5]:
# Obtains the spark context through which all RDD ops are processed.
# getOrCreate reuses a context that is already running, so re-executing this
# cell no longer fails with "Cannot run multiple SparkContexts at once".
# Note: when a context already exists it is returned as-is.
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
# Root folder that holds the corpus texts and the stopword list
directory = 'data'

# Read the stopword file, break every line into whitespace-separated tokens,
# lowercase them, collect the result into a list and broadcast that list
stopwords = sc.broadcast(
    sc.textFile(directory + '/stopwords.txt')
      .flatMap(lambda line: [word.lower() for word in line.strip().split()])
      .collect()
)

In [7]:
def singleDocSC1(file,stopwords):
    '''
    Word count for a single document, without any stopword handling.

    Each line of the text file is split on whitespace, every token is
    lowercased and paired with 1, and the pairs are summed per token.
    Only tokens counted more than once are kept.

    file:      path of the text file to read
    stopwords: accepted so all variants share one signature; not used here

    Returns the filtered RDD of (token, count) pairs for further handling.
    '''
    tokens = sc.textFile(file).flatMap(lambda line: line.strip().split())
    pairs = tokens.map(lambda tok: (tok.lower(), 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)
    return counts.filter(lambda kv: kv[1] > 1)

In [8]:
def singleDocSC2(file,stopwords):
    '''
    Word count for a single document with stopwords removed.

    Works like the SC1 variant (whitespace split, lowercase, count, keep
    tokens counted more than once) and additionally discards every token
    that is contained in `stopwords`.

    file:      path of the text file to read
    stopwords: collection of lowercase stopwords to exclude

    Returns an RDD of (token, count) pairs.
    '''
    def keep(pair):
        # Retain tokens that occur more than once and are not stopwords
        token, count = pair[0], pair[1]
        return count > 1 and token not in stopwords

    counts = (
        sc.textFile(file)
          .flatMap(lambda line: line.strip().split())
          .map(lambda tok: (tok.lower(), 1))
          .reduceByKey(lambda a, b: a + b)
    )
    return counts.filter(keep)

In [47]:
def _strip(x,full=False):
    if full:
        x=x.strip(punctuation)
    else:
        punc='.,:;!?\''
        if x[0] in punc: x=x[1:]
        if x[-1] in punc: x=x[:-1]
    return x    

In [41]:
def singleDocSC3(file,stopwords):
    '''
    Word count for a single document with short-token and punctuation handling.

    Same as the SC2 variant, except that raw tokens of length 1 are dropped
    before counting and one leading / one trailing punctuation mark is
    removed from each remaining token (via _strip) before it is lowercased.

    file:      path of the text file to read
    stopwords: collection of lowercase stopwords to exclude

    Returns an RDD of (token, count) pairs for tokens counted more than once.
    '''
    return (
        sc.textFile(file)
          .flatMap(lambda line: line.strip().split())
          .filter(lambda tok: len(tok) > 1)
          .map(lambda tok: (_strip(tok).lower(), 1))
          .reduceByKey(lambda a, b: a + b)
          .filter(lambda kv: kv[1] > 1 and kv[0] not in stopwords)
    )

In [42]:
def main(func,outName):
    '''
    Count tokens across the whole corpus and dump the 40 most frequent to JSON.

    func:    per-document processor, called as func(path, stopword_list);
             it must return an RDD of (token, count) pairs
    outName: output file name without extension ('.json' is appended)

    Raises FileNotFoundError when the data directory contains no 'pg*' file.
    '''
    # One processed RDD per document
    docs=[]

    # Loop over the data directory to catch all relevant txt files (those
    # starting with 'pg'). os.listdir returns names in arbitrary order, so
    # sort them to make runs reproducible.
    for filename in sorted(os.listdir(directory)):
        if filename.startswith("pg"):
            docs.append(func(os.path.join(directory, filename),stopwords.value))
            print(f'processing: {filename}')

    # Fail with a clear message instead of an IndexError further down
    if not docs:
        raise FileNotFoundError(f"no 'pg*' files found in {directory!r}")

    # Merge all per-document counts in one step rather than chaining
    # pairwise unions
    print('merging')
    out=sc.union(docs)

    # Reduce to achieve a 'net count' across docs
    out=out.reduceByKey(lambda x,y : x+y)
    print('reducing')

    # JSON dump the 40 highest counts of the reduced RDD, which now contains
    # the token count across ALL docs
    print('dumping')
    with open(outName+'.json', 'w') as f:
        json.dump(dict(out.top(40, key=lambda x: x[1])), f)

In [43]:
# Corpus-wide word count using the SC3 tokenizer.
# Earlier variants, kept for reference:
#main(singleDocSC1,'sp1')
#main(singleDocSC2,'sp2')
main(func=singleDocSC3, outName='sp3')

processing: pg1497.txt
processing: pg19033.txt
processing: pg3207.txt
processing: pg36.txt
processing: pg42671.txt
processing: pg4300-0.txt
processing: pg514.txt
processing: pg6130.txt
merging
reducing
dumping


In [44]:
def singleDocSC4(file,stopwords):
    '''
    Per-document word count; behaves identically to the SC3 variant.

    Raw tokens of length 1 are dropped, one leading / one trailing
    punctuation mark is removed via _strip, tokens are lowercased and
    counted, and only non-stopword tokens counted more than once remain.

    file:      path of the text file to read
    stopwords: collection of lowercase stopwords to exclude

    Returns an RDD of (token, count) pairs.
    '''
    def to_pair(token):
        # Normalise the token and attach an initial count of 1
        return (_strip(token).lower(), 1)

    def keep(pair):
        # Retain tokens that occur more than once and are not stopwords
        return pair[1] > 1 and pair[0] not in stopwords

    raw_tokens = sc.textFile(file).flatMap(lambda line: line.strip().split())
    long_tokens = raw_tokens.filter(lambda token: len(token) > 1)
    counts = long_tokens.map(to_pair).reduceByKey(lambda a, b: a + b)
    return counts.filter(keep)

In [45]:
def mainTF_IDF(func,outName):
    '''
    Score every document's tokens by TF-IDF and dump the top 5 per document.

    TF is the per-document count returned by `func`; IDF is
    log(N / number of documents containing the token), where N is the
    number of processed documents.

    func:    per-document processor, called as func(path, stopword_list);
             it must return an RDD of (token, count) pairs with unique tokens
    outName: output file name without extension ('.json' is appended)

    Raises FileNotFoundError when the data directory contains no 'pg*' file.
    '''
    # One processed RDD per document
    docs=[]

    # Loop over the data directory to catch all relevant txt files (those
    # starting with 'pg'). os.listdir returns names in arbitrary order, so
    # sort them to make runs reproducible.
    for filename in sorted(os.listdir(directory)):
        if filename.startswith("pg"):
            docs.append(func(os.path.join(directory, filename),stopwords.value))
            print(f'processing: {filename}')

    # Fail with a clear message instead of an IndexError further down
    if not docs:
        raise FileNotFoundError(f"no 'pg*' files found in {directory!r}")

    # Number of documents, used to calculate IDF; derived from the corpus
    # instead of being hard-coded
    N=len(docs)

    # Every document contributes (token, 1) for each token it contains
    presence=[]
    for i,d in enumerate(docs):
        presence.append(d.map(lambda x:(x[0],1)))
        print(f'Tallying IDF contribution from doc #{i}')

    # Summing the 1s per token gives the number of documents each token is
    # present in; map that document frequency to the IDF. float() keeps the
    # weights plain Python floats.
    print('Calculating IDF')
    master=sc.union(presence)\
             .reduceByKey(lambda x,y:x+y)\
             .mapValues(lambda df:float(np.log(N/df)))

    print('Converting to dict and broadcasting')
    # Convert the IDF pairs into a {token: idf} dictionary shared with workers
    weights=sc.broadcast(master.collectAsMap())

    # Using the TF, calculate the TF-IDF for each document.
    # (The previous `docs[i].join.map(...)` raised AttributeError because
    # `.join` was a bound method that was never called.)
    weighted=[]
    for i,d in enumerate(docs):
        print(f'Weighting document {i}')
        weighted.append(d.map(lambda x:(x[0],x[1]*weights.value[x[0]])))

    # Collect the top 5 words in each doc by TF-IDF score
    out=[]
    for d in weighted:
        out+=d.top(5, key=lambda x: x[1])

    # JSON dump the collected scores (5 per document).
    # NOTE: dict() keeps a single entry per token, so a token that ranks in
    # the top 5 of several documents appears once, with the last score seen.
    with open(outName+'.json', 'w') as f:
        json.dump(dict(out), f)

In [46]:
# TF-IDF run using the SC4 tokenizer
mainTF_IDF(func=singleDocSC4, outName='sp4')

processing: pg1497.txt
processing: pg19033.txt
processing: pg3207.txt
processing: pg36.txt
processing: pg42671.txt
processing: pg4300-0.txt
processing: pg514.txt
processing: pg6130.txt
Tallying IDF contribution from doc #0
Tallying IDF contribution from doc #1
Tallying IDF contribution from doc #2
Tallying IDF contribution from doc #3
Tallying IDF contribution from doc #4
Tallying IDF contribution from doc #5
Tallying IDF contribution from doc #6
Tallying IDF contribution from doc #7
Calculating IDF
Converting to dict and broadcasting
Weighting document 0
Weighting document 1
Weighting document 2
Weighting document 3
Weighting document 4
Weighting document 5
Weighting document 6
Weighting document 7


In [39]:
def singleDocSC5(file,stopwords):
    '''
    Word count for a single document with a full punctuation strip.

    A modification of SC3/4: every token is lowercased and all leading and
    trailing punctuation is removed (via _strip with full=True). Tokens
    that consist only of punctuation become empty after stripping and are
    dropped, so the empty string is never counted as a word.

    file:      path of the text file to read
    stopwords: collection of lowercase stopwords to exclude

    Returns an RDD of (token, count) pairs for tokens counted more than once.
    '''
    data=sc.textFile(file)\
        .flatMap(lambda x: x.strip().split())\
        .map(lambda x:_strip(x.lower(),True))\
        .filter(lambda x:len(x)>0)\
        .map(lambda x:(x,1))\
        .reduceByKey(lambda x, y: x + y)\
        .filter(lambda x:x[1]>1 and x[0] not in stopwords)
    return data

In [40]:
# TF-IDF run using the SC5 tokenizer
mainTF_IDF(func=singleDocSC5, outName='sp5')

processing: pg1497.txt
processing: pg19033.txt
processing: pg3207.txt
processing: pg36.txt
processing: pg42671.txt
processing: pg4300-0.txt
processing: pg514.txt
processing: pg6130.txt
Tallying IDF contribution from doc #0
Tallying IDF contribution from doc #1
Tallying IDF contribution from doc #2
Tallying IDF contribution from doc #3
Tallying IDF contribution from doc #4
Tallying IDF contribution from doc #5
Tallying IDF contribution from doc #6
Tallying IDF contribution from doc #7
Calculating IDF
Converting to dict and broadcasting
Weighting document 0
Weighting document 1
Weighting document 2
Weighting document 3
Weighting document 4
Weighting document 5
Weighting document 6
Weighting document 7
