In [1]:
# Start (or reuse) a Spark session that pulls in Spark NLP 2.4.5 (Scala 2.11 build)
# through spark.jars.packages. findspark.init() must run before importing pyspark.
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark-nlp")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Fetch the NLTK corpora this notebook relies on: 'stopwords' feeds eng_stopwords
# below; 'words' is downloaded but not referenced in the visible cells.
# The return value of the last download call is what the cell displays.
import nltk
nltk.download('stopwords')
nltk.download('words')

[nltk_data] Downloading package stopwords to /home/hadoop/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package words to /home/hadoop/nltk_data...
[nltk_data]   Package words is already up-to-date!


True

In [3]:
# English stop words from NLTK, extended with extra tokens that are noise in
# tweets ('rt', 'qt', '&amp', 'amp', '+', 'w').
from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english') + ['rt', 'qt', '&amp', 'amp', '+', 'w']

In [4]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer, 
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

In [5]:
# Spark NLP annotator stages, chained in this order by the Pipeline in the next cell.
# Parenthesized chains are used instead of backslash continuations, because a stray
# trailing "\" silently joins the following line into the statement.

# Wrap the link-free tweet text in a Spark NLP document annotation.
documentAssembler = (
    DocumentAssembler()
    .setInputCol('text_no_links')
    .setOutputCol('document')
)

# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('token')
)

# Normalize the tokens. Lowercasing is requested explicitly with
# setLowercase(True); use setLowercase(False) to keep the input case.
normalizer = (
    Normalizer()
    .setInputCols(['token'])
    .setOutputCol('normalized')
    .setLowercase(True)
)

# The lemmatizer needs a dictionary, so use the pre-trained model
# (English by default; this run downloaded 'lemma_antbnc').
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemma')
)

# Remove the stop words in eng_stopwords, matching case-insensitively.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemma'])
    .setOutputCol('clean_lemma')
    .setCaseSensitive(False)
    .setStopWords(eng_stopwords)
)

# The Finisher converts the clean_lemma annotations into a human-readable
# string-array column ('finished_clean_lemma', read by later cells).
# setCleanAnnotations(False) keeps the intermediate annotation columns
# (per the Spark NLP docs).
finisher = (
    Finisher()
    .setInputCols(['clean_lemma'])
    .setCleanAnnotations(False)
)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [6]:
# One Spark ML pipeline running the annotators; list order is execution order.
pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    finisher,
])

In [7]:
from pyspark.sql.functions import regexp_replace, monotonically_increasing_id, col, when, arrays_zip, explode
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.clustering import LDA, LDAModel

In [8]:
# Load the June 2017 tweet CSV part files (with a header row) from S3.
congDF = spark.read.option("header", True).csv("s3://502finalprojbucky/congresstweets/data/June2017.csv/*.part")

In [9]:
# Drop the "_c0" column (presumably a leftover row index in the CSV parts — TODO confirm)
# and keep only rows that actually have tweet text. congDF itself is left unmodified
# so this cell is safe to re-run.
data = congDF.drop("_c0").filter(congDF["text"].isNotNull())

In [10]:
# Strip URLs from the tweet text before NLP processing.
# r'http\S+' removes each link only up to the next whitespace, so text that
# follows a link is kept. A greedy 'http.*' would instead run to the end of the
# string and delete everything after the first link.
noLinks = data.withColumn('text_no_links', regexp_replace('text', r'http\S+', ''))

In [11]:
# transform text with the pipeline
# Fit the Spark NLP pipeline on the link-free tweets, apply it to them, and
# tag each resulting row with a generated id.
pipeline_model = pipeline.fit(noLinks)
congress = pipeline_model.transform(noLinks).withColumn('index', monotonically_increasing_id())

In [12]:
# Keep only the finished lemma arrays and give each row a generated id
# (monotonically_increasing_id values are unique and increasing, not consecutive).
data = (
    congress
    .select('finished_clean_lemma')
    .withColumn('index', monotonically_increasing_id())
)

In [13]:
# Term-frequency vectorizer: keep at most 3500 terms. Because minDF >= 1 is read
# as a document count, a term must appear in at least 8.5 (i.e. 9) tweets.
cv = CountVectorizer(
    inputCol="finished_clean_lemma",
    outputCol="features",
    vocabSize=3500,
    minDF=8.5,
)

In [14]:
# Learn the term-frequency vocabulary from the lemma arrays.
cvmodel = cv.fit(dataset=data)

In [15]:
# Turn each tweet's lemmas into a term-count vector in the "features" column.
result_cv = cvmodel.transform(dataset=data)

In [16]:
# RDD of single-field rows holding each tweet's term-count vector.
tokenIds = result_cv.select(["features"]).rdd

In [17]:
# For each tweet, collect the vocabulary indices of its non-zero term counts
# (the vector's .indices, not the counts themselves), plus a generated row id
# used for joining.
vocabKeys = (
    tokenIds
    .map(lambda row: [row[0].indices.tolist()])
    .toDF(["termIdx"])
    .withColumn("idx", monotonically_increasing_id())
)

In [18]:
# Vocabulary learned by the CountVectorizer; entry i is the term for feature index i.
vocabArr = cvmodel.vocabulary

In [19]:
# RDD of single-field rows holding each tweet's cleaned lemma list.
wordRDD = result_cv.select(["finished_clean_lemma"]).rdd

In [20]:
# For each tweet keep only the lemmas that are in the CountVectorizer vocabulary,
# plus a generated row id used for joining. Membership is checked against a set
# (O(1) per token) rather than by scanning the vocabulary list for every token.
vocab_set = set(vocabArr)
words = (
    wordRDD
    .map(lambda r: [[tok for tok in r[0] if tok in vocab_set]])
    .toDF(["terms"])
    .withColumn("idx1", monotonically_increasing_id())
)

In [21]:
# Join each tweet's vocabulary indices with its in-vocabulary terms via the
# generated row ids, then discard the ids.
# NOTE(review): termIdx lists each vocabulary index once (sparse-vector order),
# while terms lists every in-vocabulary token in tweet order, so the two arrays
# are not position-aligned.
vocabMap = (
    vocabKeys
    .join(words, vocabKeys.idx == words.idx1)
    .drop("idx", "idx1")
)

In [22]:
# LDA hyper-parameters: number of topics to learn and the iteration budget.
num_topics, max_iter = 3, 100

In [23]:
# Fit an online-variational LDA model on the (index, term-count vector) rows.
# The seed is pinned so a Restart & Run All reproduces the same topics.
# NOTE(review): without it PySpark uses a default seed derived from a Python
# hash, which may not be stable across processes.
lda_model = LDA(
    k=num_topics,
    maxIter=max_iter,
    optimizer='online',
    seed=42,
).fit(result_cv.select("index", "features"))

In [24]:
# Score every tweet with the fitted LDA model (adds a topicDistribution column).
transform = lda_model.transform(dataset=result_cv)

In [25]:
def extractTopDist(row):
    """Return the row's topicDistribution vector as a plain Python list of floats."""
    topic_vector = row.topicDistribution
    return topic_vector.toArray().tolist()
# RDD with one list of per-topic probabilities per tweet.
DF = transform.rdd.map(lambda row: extractTopDist(row))

In [26]:
# Turn the per-tweet probability lists into columns Topic1..Topic<num_topics>.
# The names are derived from num_topics so the schema follows the LDA setting.
# Each row also gets a generated id (index_1) used to join back to `transform`.
topic_col_names = ["Topic{}".format(i + 1) for i in range(num_topics)]
DF = spark.createDataFrame(DF, topic_col_names).withColumn("index_1", monotonically_increasing_id())

In [27]:
# Attach the per-topic probabilities to each scored tweet.
# NOTE(review): `index` and `index_1` are both monotonically_increasing_id() values
# generated on separately built DataFrames; rows only pair up correctly if both
# share the same partitioning and row order — confirm before trusting the join.
CoTopDF = (
    transform
    .join(DF, on=transform.index == DF.index_1, how='inner')
    .select("index", "finished_clean_lemma", "features", "Topic1", "Topic2", "Topic3")
)

In [28]:
# Columns of CoTopDF that are not topic probabilities.
_NON_TOPIC_COLS = ("index", "finished_clean_lemma", "features")


def _with_max_topic(record):
    """Return a Row with all fields of ``record`` plus MaxTopic = [topic_name, probability].

    ``record`` is a dict of one CoTopDF row. Every key not in _NON_TOPIC_COLS is treated
    as a topic column. On ties, the first topic in the dict's iteration order wins.
    """
    topic_items = [kv for kv in record.items() if kv[0] not in _NON_TOPIC_COLS]
    # Compute the arg-max once and reuse both the name and its probability.
    best_name, best_prob = max(topic_items, key=lambda kv: kv[1])
    return Row(MaxTopic=[best_name, best_prob], **record)


CongressTopics = (
    CoTopDF.rdd
    .map(lambda r: r.asDict())
    .map(_with_max_topic)
    .toDF()
)

In [29]:
# Present the lemmas as tweet_content and the row id as Idx (new columns appended
# at the end), then drop the source columns and the individual topic probabilities.
TopicsCongress = (
    CongressTopics
    .withColumn("tweet_content", col("finished_clean_lemma"))
    .withColumn("Idx", col("index"))
    .drop("finished_clean_lemma", "index", "Topic1", "Topic2", "Topic3")
)

In [30]:
TopicsCongress.show(n=20, truncate=False)

+-----------------------------+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|MaxTopic                     |features                                                                                                                      |tweet_content                                                                                                                                                           |Idx|
+-----------------------------+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+
|[To

In [31]:
# RDD of (term-count vector, lemma list) rows, one per tweet.
cvRDD = result_cv.select(["features", "finished_clean_lemma"]).rdd

In [32]:
# For each tweet, pair every non-zero vocabulary index of its term-count vector
# with the vocabulary term that index stands for. The vector's .indices are
# positions in vocabArr, so the matching term is vocabArr[idx]. It is NOT the i-th
# token of the tweet, which is in document order and may include out-of-vocabulary words.
termIdxRDD = cvRDD.map(lambda r: Row(
    Idx=[int(idx) for idx in r[0].indices],
    Term=[vocabArr[int(idx)] for idx in r[0].indices],
))

In [33]:
# Zip each tweet's Idx and Term arrays element-wise, then explode so that every row
# holds one (Idx, Term) struct in IdxPairs. Despite its name, this is a DataFrame.
PairedRDD = (
    termIdxRDD.toDF()
    .withColumn("tmp", arrays_zip("Idx", "Term"))
    .withColumn("IdxPairs", explode("tmp"))
    .select("IdxPairs")
)

In [34]:
# Flatten each (Idx, Term) struct into a two-column Index / Term DataFrame.
PairedDF = (
    PairedRDD.rdd
    .map(lambda r: Row(Index=r.IdxPairs[0], Term=r.IdxPairs[1]))
    .toDF()
)

In [35]:
# Uncomment to release the Spark session when finished. It is left commented out,
# presumably to keep the session available for further interactive work.
#spark.stop()