In [1]:
import findspark
findspark.init()

In [2]:
# Set up SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark-nlp") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") \
    .getOrCreate()

In [3]:
# Check the SparkSession
spark

In [25]:
sc = spark.sparkContext
#data = sc.wholeTextFiles("s3a://zihe-public/articles/AA/wiki_00")
data = sc.wholeTextFiles("s3a://zihe-public/articles/*")

In [26]:
import re
def get_id(x):
    pattern = "\<doc\sid\=\"(\d+)\"(.*)title\=\"(.*)\"\>"
    pattern_re = re.compile(pattern)
    matches = pattern_re.search(x)
    if not matches:
        return ""
    return matches[1]

def get_title(x):
    pattern = "\<doc\sid\=\"(\d+)\"(.*)title\=\"(.*)\"\>"
    pattern_re = re.compile(pattern)
    matches = pattern_re.search(x)
    if not matches:
        return ""
    return matches[3]

def get_content(x):
    pattern = "\<doc\sid\=\"(\d+)\"(.*)title\=\"(.*)\"\>\\n(.*?)\\n{2}"
    pattern_re = re.compile(pattern)
    matches = pattern_re.search(x)
    if not matches:
        return ""
    idx = matches.end(0)
    return x[idx:]

In [10]:
text = ""
get_content(text)

''

In [32]:
# Map each page to id, title and content 
pages = data.flatMap(lambda x: (x[1].split('</doc>'))).map(lambda x : (get_id(x), get_title(x), get_content(x)))

In [33]:
type(pages)

pyspark.rdd.PipelinedRDD

In [34]:
pages.count()

6075800

In [48]:
pages.take(2)[0][1]

'Anarchism'

In [44]:
test = pages.take(2)

In [47]:
titles = pages.map(lambda s: s[1])
titles.take(2)

['Anarchism', 'Autism']

In [51]:
import nltk 
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/hadoop/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [52]:
# Topic Modling 
import pandas as pd
import pyspark
from pyspark.sql import SQLContext
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF
# stuff we'll need for building the model

from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel

In [69]:
import nltk
# get the list of stopwords from nltk
from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer, 
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler() \
    .setInputCol('title') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma') \

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
    .setInputCols(['clean_lemma']) \
    .setCleanAnnotations(False)

pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        lemmatizer,
        stopwords_cleaner,
        finisher
    ])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [77]:
#pages = pages.toDF(["id","title","text"])
# transform text with the pipeline
cleaned = pipeline.fit(pages).transform(pages)

In [78]:
cleaned = cleaned.selectExpr("id", "finished_clean_lemma")

In [62]:
type(cleaned)

pyspark.sql.dataframe.DataFrame

In [66]:
cleaned.take(10)

[Row(id='12', features=['anarchism']),
 Row(id='25', features=['autism']),
 Row(id='39', features=['albedo']),
 Row(id='290', features=[]),
 Row(id='303', features=['alabama']),
 Row(id='305', features=['achilles']),
 Row(id='307', features=['abraham', 'lincoln']),
 Row(id='308', features=['aristotle']),
 Row(id='309', features=['american', 'paris']),
 Row(id='316', features=['academy', 'award', 'good', 'production', 'design'])]

In [79]:
from pyspark.ml.feature import CountVectorizer
# create a vector of words that at least appeared in two different tweets, and set maximum vocab size to 20000.
vectorizer = CountVectorizer().setInputCol("finished_clean_lemma").setOutputCol("features").setVocabSize(20000).setMinDF(2).fit(
    cleaned)
wordVectors = vectorizer.transform(cleaned).select("id", "features")

In [None]:
wordVectors.take(10)

In [83]:
# LDA
from pyspark.ml.clustering import LDA
# create Latent Dirichlet Allocation model and run it on our data with 25 iteration and 5 topics
lda = LDA(k=10, maxIter=25)
# fit the model on data
ldaModel = lda.fit(wordVectors)
# create topics based on LDA
lda_topics = ldaModel.describeTopics()
# show LDA topics

In [88]:
lda_topics.show(11, False)

+-----+----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                   |termWeights                                                                                                                                                                                                                 |
+-----+----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[0, 33, 6, 59, 70, 117, 115, 104, 119, 135]   |[0.02243211522673124, 0.020238984635445562, 0.015749959537255054, 0.01454214853957949, 0.010277370243841063, 0.0089475

In [84]:
# similarity 

rawRDD = data.flatMap(lambda x: (x[1].split('</doc>'))).map(lambda x : (get_title(x), get_content(x)))
rawRDD2 = rawRDD.map(lambda line:(line[0],list(set(line[1].split(" ")))))

wordIDs = rawRDD2.flatMap(lambda x:x[1]).distinct().zipWithIndex().collectAsMap()
wordIDs = sc.broadcast(wordIDs)

## Experiment Spark Gensim
def parseCorpse(line):
    A = [(wordIDs.value[el],1) for el in line[1]]
    return A

corpse = rawRDD2.map(parseCorpse).collect()


from gensim import corpora,models,similarities

dictionary = corpora.Dictionary(rawRDD2.map(lambda x:x[1]).collect())


keyword = "human behavior"
tfidf = models.TfidfModel(corpse) 
feature_count = len(dictionary.token2id)
kw_vector = dictionary.doc2bow(keyword.split())
index = similarities.SparseMatrixSimilarity(tfidf[corpse], num_features = feature_count)
sim = index[tfidf[kw_vector]]



for i in range(len(sim)):
    if i < 20:
        print('keyword is similar to text%d: %.4f' % (i + 1, sim[i]))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 40 tasks (1027.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
titles = wiki.flatMap(lambda x :(x[1].split('</doc>'))).map(lambda x : get_title(x))
titles2 = titles.collect()

In [None]:
import pandas as pd

l = [range(len(titles2)),titles2, sim]
lt = list(map(list, zip(*l)))
pdf = pd.DataFrame(lt, columns = ['id','title','similarity'])
pdf.head()

In [28]:
pages = pages.toDF(["id","title","text"])

In [21]:
article = pages[pages['title'] == "Autism"]
article.show()

+---+------+--------------------+
| id| title|                text|
+---+------+--------------------+
| 25|Autism|Autism is a devel...|
+---+------+--------------------+



In [8]:
pages.show()

+---+--------------------+--------------------+
| id|               title|                text|
+---+--------------------+--------------------+
| 12|           Anarchism|Anarchism is a po...|
| 25|              Autism|Autism is a devel...|
| 39|              Albedo|Albedo () (, mean...|
|290|                   A|A or a is the fir...|
|303|             Alabama|Alabama () is a s...|
|305|            Achilles|In Greek mytholog...|
|307|     Abraham Lincoln|Abraham Lincoln (...|
|308|           Aristotle|Aristotle (; "Ari...|
|309|An American in Paris|An American in Pa...|
|316|Academy Award for...|The Academy Award...|
|324|      Academy Awards|The Academy Award...|
|330|             Actrius|Actresses (Catala...|
|332|     Animalia (book)|Animalia is an il...|
|334|International Ato...|International Ato...|
|336|            Altruism|Altruism is the p...|
|339|            Ayn Rand|Ayn Rand (; born ...|
|340|        Alain Connes|Alain Connes (; b...|
|344|          Allan Dwan|Allan Dwan (3 

In [9]:
type(pages)

pyspark.sql.dataframe.DataFrame

In [29]:
# print((pages.count(), len(pages.columns)))

KeyboardInterrupt: 