In [0]:
import json
import sparknlp

# Initialise Spark NLP for this notebook session.
# NOTE(review): the return value is discarded, and later cells use a global
# `spark` that no visible cell assigns — presumably it is injected by the
# hosting notebook environment; confirm before running elsewhere.
sparknlp.start()

In [0]:
def _load_avro(path):
    """Return a DataFrame read from the Avro file at `path`."""
    return spark.read.format("avro").load(path)


# Available source datasets; a later cell picks one of these to work on.
dffull = _load_avro("/mnt/scratch/BParticles17701850.avro")
dfads = _load_avro("/mnt/scratch/BPads17701850.avro")
dfERBM = _load_avro("/mnt/scratch/BPerbm4500.avro")
dftiny = _load_avro("/mnt/scratch/BPtinysample.avro")

In [0]:
# Pick the dataset to analyse and keep only the three fields used downstream:
# the record id, the article text and the article title.
# To work on a different dataset, replace `dffull` on the line below.
selected_columns = ["RecordID", "text", "ArticleTitle"]
df = dffull.select(*selected_columns)

# Report the size of the chosen dataset.
article_count = df.count()
print(str(article_count) + " articles available in the dataset")

In [0]:
# Keep only the articles whose text matches 'libel' AND at least one of
# 'court', 'case', 'legal' or 'jury'. The (?i) flag makes every pattern
# case-insensitive.
text_col = df.text
mentions_libel = text_col.rlike('(?i)libel')
mentions_legal_context = (
    text_col.rlike('(?i)court')
    | text_col.rlike('(?i)case')
    | text_col.rlike('(?i)legal')
    | text_col.rlike('(?i)jury')
)
filteredDf = df.filter(mentions_libel & mentions_legal_context)

# Report how many articles survived the filter.
numTexts = filteredDf.count()
print(str(numTexts) + " articles available in the filtered dataset")

In [0]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline, PipelineModel


# Text-preprocessing pipeline. Each stage reads the column(s) produced by the
# previous one:
#   text -> document -> sentence -> token -> cleanTokens -> normalized -> lemma

# Stage 1: wrap the raw "text" column into a "document" annotation column.
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

# Stage 2: "document" -> "sentence".
sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

# Stage 3: "sentence" -> "token".
# The target pattern \w+ selects runs of word characters (letters, digits and
# underscore), and the minimum token length is set to 3.
# The pattern is a raw string because "\w" is not a valid escape sequence in an
# ordinary Python string literal and triggers a warning on recent Pythons.
regexTokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token") \
    .setTargetPattern(r"\w+") \
    .setMinLength(3)

# Stage 4: "token" -> "cleanTokens", using the pretrained English stop-word model.
stop_words = StopWordsCleaner.pretrained("stopwords_en", "en") \
    .setInputCols(["token"]) \
    .setOutputCol("cleanTokens")

# Stage 5: "cleanTokens" -> "normalized", with lowercasing enabled.
normalizer = Normalizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("normalized") \
    .setLowercase(True)

# Stage 6: "normalized" -> "lemma", using the library's default pretrained lemmatizer.
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["normalized"]) \
    .setOutputCol("lemma")

# Stage 7: convert the "lemma" annotations into plain output columns
# (metadata included).
finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setIncludeMetadata(True)

# Assemble the stages in execution order.
pipeline = Pipeline().setStages([
    documentAssembler,
    sentenceDetector,
    regexTokenizer,
    stop_words,
    normalizer,
    lemmatizer,
    finisher,
])

In [0]:
# Fit the preprocessing pipeline on the filtered articles, then apply the
# fitted model to those same articles.
fitted_pipeline = pipeline.fit(filteredDf)
tokenized_df = fitted_pipeline.transform(filteredDf)

# Show which columns the pipeline produced.
tokenized_df.printSchema()

In [0]:
# Documentation - https://radimrehurek.com/gensim/auto_examples/index.html#core-tutorials-new-users-start-here
# Helpful sections - Corpora and Vector Spaces, Topics and Transformations

from gensim import corpora, models

# Tunable sizes, each named once so that the places that must agree on a value
# cannot drift apart.
MAX_DOCS = 3000        # maximum number of documents fetched from Spark for modelling
LSI_NUM_TOPICS = 300   # number of topics requested from, and listed for, the LSI model

# A list of lists, with each inner list holding the lemmas
# found in a single document
texts = [row['finished_lemma'] for row in tokenized_df.take(MAX_DOCS)]

# Storing all words in a dictionary that will be used to assign the same unique
# token id to a word across all documents
dct = corpora.Dictionary(texts)

# Stores the frequencies of all words occurring in a text as a sparse vector
# Ex. if a word 'libel' occurs 5 times in document 1 and is assigned a unique id of
# 23, corpus[0] will contain (23, 5). Use dct.token2id to see what id is assigned to a word
corpus = [dct.doc2bow(text) for text in texts]

tfidf = models.TfidfModel(corpus)  # initialize a TF-IDF model from the bag-of-words corpus

corpus_tfidf = tfidf[corpus]  # bow -> tfidf

# Initialize an LSI transformation on top of the TF-IDF corpus
lsi_model = models.LsiModel(corpus_tfidf, id2word=dct, num_topics=LSI_NUM_TOPICS)
# Create a double wrapper over the original corpus: bow->tfidf->fold-in-lsi
corpus_lsi = lsi_model[corpus_tfidf]

# Descriptions of the topics; requested with the same count the model was built with
topics = lsi_model.print_topics(LSI_NUM_TOPICS)

In [0]:
# For every document, rank the LSI topics by how strongly they are associated
# with it. Each `doc` yielded by corpus_lsi is a list of (topic_id, weight)
# pairs, e.g. [(0, 0.06600783396090518), (1, -0.520070330636184)].
documents = []
for doc in corpus_lsi:
    # Pair each topic's description with the magnitude of its weight; the sign
    # is dropped so that strongly negative weights rank as high as positive ones.
    topicMatrix = [(topics[topic_id], abs(weight)) for topic_id, weight in doc]

    # Sort in descending order so the most related topics come first
    topicMatrix.sort(key=lambda pair: pair[1], reverse=True)

    documents.append(topicMatrix)

In [0]:
from pprint import pprint

# Which document to inspect, and how many of its top-ranked topics to show.
documentIdx, numTopics = 0, 5

# The per-document lists are already sorted strongest-first, so the leading
# slice holds the most related topics for that document.
top_topics = documents[documentIdx][:numTopics]
pprint(top_topics)