In [3]:
import sparknlp
sparknlp.start(spark32=True)

In [4]:
from pyspark.sql import SparkSession
# start spark session configured for spark nlp
spark = SparkSession.builder \
     .master('local[*]') \
     .appName('Spark NLP') \
     .config('spark.jars.packages', 
             'com.johnsnowlabs.nlp:spark-nlp_2.12:2.3.5') \
     .getOrCreate()

# from pyspark.sql import SparkSession

# spark = sparknlp.start(spark32=True)

In [5]:
from nltk.corpus import stopwords
eng_stopwords = stopwords.words('english')

In [6]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

In [7]:
documentAssembler = DocumentAssembler() \
     .setInputCol('value') \
     .setOutputCol('document')
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token')
# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
     .setInputCols(['token']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemma')
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemma']) \
     .setOutputCol('clean_lemma') \
     .setCaseSensitive(False) \
     .setStopWords(eng_stopwords)
# finisher converts tokens to human-readable output
finisher = Finisher() \
     .setInputCols(['clean_lemma']) \
     .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
Download done! Loading the resource.
[ / ]

                                                                                

[OK!]


In [8]:
pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           normalizer,
           lemmatizer,
           stopwords_cleaner,
           finisher
     ])

In [9]:
# import data

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window, SQLContext
sqlContext = SQLContext(spark)


path = 'classic'
data = spark.read.text(path, wholetext=True)

books = ["erewhon", "time_traveler"]
b = sqlContext.createDataFrame([(l,) for l in books], ['Book'])
data = data.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

data = b.join(data, b.row_idx == data.row_idx).\
    drop("row_idx")




In [10]:

# transform text with the pipeline
processed_data = pipeline.fit(data).transform(data)

In [None]:
# View processed_data
# processed_data.createOrReplaceTempView("processed")
# spark.sql("Select * from processed").show()

In [11]:
from pyspark.sql.functions import explode, col
scifi_words = processed_data.withColumn('exploded_text', 
                               explode(col('finished_clean_lemma')))

In [None]:
# View scifi_words
# scifi_words.createOrReplaceTempView("scifi")
# spark.sql("Select * from scifi").show()

In [12]:
counts_er = scifi_words.filter(scifi_words['Book'] == 'erewhon').groupby('exploded_text').count()
counts_pd1 = counts_er.toPandas()
er_dict = {counts_pd1.loc[i, 'exploded_text']: 
                counts_pd1.loc[i, 'count'] 
                for i in range(counts_pd1.shape[0])}

22/04/13 14:03:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

In [13]:
counts_tt = scifi_words.filter(scifi_words['Book'] == 'time_traveler').groupby('exploded_text').count()
counts_pd2 = counts_tt.toPandas()
tt_dict = {counts_pd2.loc[i, 'exploded_text']: 
                counts_pd2.loc[i, 'count'] 
                for i in range(counts_pd2.shape[0])}

22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/04/13 14:03:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

In [15]:
# Helper Functions

def term_frequency(BoW_dict):
     tot_words = sum(BoW_dict.values())
     freq_dict = {word: BoW_dict[word]/tot_words 
                  for word in BoW_dict.keys()}
     return freq_dict
     
from math import log
def inverse_document_frequency(list_of_dicts):
    tot_docs = len(list_of_dicts)
    words = set([w for w_dict in list_of_dicts 
                   for w in w_dict.keys()])
    idf_dict = {word: log(float(tot_docs)/
                      (1.0 + sum([1 for w_dict in list_of_dicts 
                              if word in w_dict.keys()]))) 
                    for word in words}
    return idf_dict

def tf_idf(list_of_dicts):
     words = set([w for w_dict in list_of_dicts 
                  for w in w_dict.keys()])
     tf_idf_dicts = []
     idfs = inverse_document_frequency(list_of_dicts)
     for i, w_dict in enumerate(list_of_dicts):
          w_dict.update({word: 0 for word in words 
                         if word not in w_dict.keys()})
          tf = term_frequency(w_dict)
          tf_idf_dicts.append({word: tf[word]*idfs[word] 
                               for word in words})
     return tf_idf_dicts

In [16]:
list_of_word_dicts = [er_dict, tt_dict]
tf_idf_by_book_list = tf_idf(list_of_word_dicts)
tf_idf_by_book_dict = {c: tf_dict 
           for c, tf_dict in zip(books, tf_idf_by_book_list)}

In [17]:
tf_idf_by_book_dict

{'erewhon': {'big': 0.0,
  'grant': 0.0,
  'cart': 0.0,
  'fast': 0.0,
  'adapt': 0.0,
  'root': 0.0,
  'distinguish': 0.0,
  'memory': 0.0,
  'accept': 0.0,
  'profound': 0.0,
  'minister': 0.0,
  'xxiv': 0.0,
  'plain': 0.0,
  'science': 0.0,
  'island': 0.0,
  'suddenly': 0.0,
  'enough': -0.00027255104735032337,
  'increase': 0.0,
  'swift': 0.0,
  'curious': -9.085034911677447e-05,
  'accordance': 0.0,
  'lunatic': 0.0,
  'survive': 0.0,
  'class': 0.0,
  'influence': 0.0,
  'shavings': 0.0,
  'terrify': 0.0,
  'gently': 0.0,
  'selfregulating': 0.0,
  'dog': 0.0,
  'mention': 0.0,
  'whereby': 0.0,
  'mile': 0.0,
  'intensity': 0.0,
  'myriad': 0.0,
  'universally': 0.0,
  'rich': 0.0,
  'engage': 0.0,
  'lest': 0.0,
  'need': -0.00027255104735032337,
  'absurd': 0.0,
  'office': 0.0,
  'fourif': 0.0,
  'flag': 0.0,
  'gain': 0.0,
  'soul': 0.0,
  'previous': 0.0,
  'expound': 0.0,
  'withhold': 0.0,
  'consist': 0.0,
  'power': 0.0,
  'practice': 0.0,
  'inherently': 0.0,
  'dis