In [1]:
#import libraries and define functions

import os
import time
from multiprocessing import Pool

import dask.bag as db
import matplotlib.pyplot as plt
import pandas as pd
from dask import dataframe as ddf
from dask.delayed import delayed
from joblib import Parallel
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem.snowball import SnowballStemmer
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as pyfunc
from pyspark.sql.functions import udf, col, lower ,regexp_replace
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import IDF as MLIDF
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.conf import SparkConf
from sklearn.feature_extraction.text import CountVectorizer as SKCountVectorizer
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
from yellowbrick.datasets import load_hobbies
from yellowbrick.text import FreqDistVisualizer

# Stopword vocabulary handed to StopWordsRemover: NLTK's English list followed
# by its Spanish list (presumably because part of the corpus is Spanish-language;
# the sample output contains tokens such as "elecciones").
sr = stopwords.words('english')
sp = stopwords.words('spanish')
stopwordsList = [*sr, *sp]

def cleanSentence(s):
    """Build a Spark column expression that normalizes raw tweet text.

    Steps, applied in this order:
      1. lowercase;
      2. drop a leading "rt " retweet marker;
      3. drop http(s) hyperlinks;
      4. drop hashtags (``#word`` plus trailing whitespace);
      5. drop every character that is not an ASCII letter, digit or whitespace;
      6. drop all digits.

    Args:
        s: a pyspark Column (or column name) holding the tweet text.

    Returns:
        A pyspark Column expression. Nothing is evaluated until an action runs.
    """
    s = lower(s)
    # Retweet marker at the very start of the (already lowercased) text.
    s = regexp_replace(s, r"^rt ", "")
    # Hyperlinks.
    s = regexp_replace(s, r"https?://\S+", "")
    # Hashtags. This must run BEFORE the non-alphanumeric strip below; otherwise
    # the '#' is already gone and nothing can match. The pattern is a plain
    # Java regex: '/.../' delimiters (JavaScript style) would be matched literally.
    s = regexp_replace(s, r"#\w+\s*", "")
    # Anything that is not an ASCII letter, digit or whitespace (emoji, accents, punctuation).
    s = regexp_replace(s, r"[^a-zA-Z0-9\s]", "")
    # Digits (the previous "[00-99]" character class was equivalent to [0-9]).
    s = regexp_replace(s, r"[0-9]", "")
    return s

    
# Start, or attach to, a Spark session. NOTE: despite its name, `sc` holds a
# SparkSession, not a SparkContext; the context itself is `sc.sparkContext`.
# getOrCreate() reuses an existing session. The captured output shows the
# PySparkShell session was picked up, so appName "WordCount" did not apply there.
sc = (SparkSession.builder
      .appName("WordCount")
      .getOrCreate())
# Legacy SQL handle: the later cells load their CSVs through `sqlContext.read`.
sqlContext = SQLContext(sparkSession=sc, sparkContext=sc.sparkContext)

rule = '---------------------------'
print(rule)
print(sc.sparkContext)

# textFile() is lazy: this only records the path and returns an RDD handle.
trumpRDD = sc.sparkContext.textFile('hashtag_donaldtrump.csv')
print('Reading hashtag_donaldtrump.csv')
print('type of data: ' , type(trumpRDD))
print(rule)

---------------------------
<SparkContext master=local[*] appName=PySparkShell>
Reading hashtag_donaldtrump.csv
type of data:  <class 'pyspark.rdd.RDD'>
---------------------------


In [2]:
#handling Trump Data with PySpark

# --- Load ---------------------------------------------------------------
readStartTime = time.time()
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('hashtag_donaldtrump.csv')
print('PySpark Trump Reading Time: ', time.time() - readStartTime)
# Rows without tweet text cannot be tokenized.
df = df.na.drop(subset=["tweet"])

# --- Clean, tokenize, remove stopwords ------------------------------------
cleanDF = df.select(cleanSentence(col("tweet")).alias("words"))
tokenizer = Tokenizer(inputCol="words", outputCol="vector")
countDF = tokenizer.transform(cleanDF).select("vector")

remover = StopWordsRemover(inputCol="vector", outputCol="filtered", stopWords=stopwordsList)
noStopDF = remover.transform(countDF).select("filtered").where(col("filtered").isNotNull())

# --- Word count ------------------------------------------------------------
# Spark transformations are lazy, so building this query plan costs almost
# nothing. Cache the result and run an action (count) INSIDE the timed region,
# so the printed time covers the real explode/groupBy/sort work and is
# comparable with the eager pandas cells. The cache also lets show() below
# reuse the result instead of recomputing it.
startTime = time.time()
noStopDFCount = (
    noStopDF.select(pyfunc.explode("filtered").alias("word"))
    .groupBy("word").count()
    # Tokenizer yields empty tokens where cleaning left consecutive spaces.
    .filter("`word` != ''")
    .sort(col("count").desc())
    .cache()
)
noStopDFCount.count()
print('PySpark Trump Word Count: ' ,time.time()-startTime)

print('-----DataFrame Representation-----')
noStopDFCount.show(4)


PySpark Trump Reading Time:  5.507136106491089
+--------------------+--------------------+
|            filtered|            features|
+--------------------+--------------------+
|[elecciones, , fl...|(262144,[1303,304...|
|                  []|(262144,[249180],...|
|                  []|(262144,[249180],...|
|[usa, , trump, co...|(262144,[1512,595...|
|[trump, student, ...|(262144,[55307,66...|
|                  []|(262144,[249180],...|
|[, hours, since, ...|(262144,[329,5381...|
|[get, tie, get, t...|(262144,[46479,12...|
|[clady, , minutes...|(262144,[34366,36...|
|                  []|(262144,[249180],...|
+--------------------+--------------------+
only showing top 10 rows



Py4JJavaError: An error occurred while calling o123.fit.
: java.util.NoSuchElementException: Failed to find a default value for inputCol
	at org.apache.spark.ml.param.Params.$anonfun$getOrDefault$2(params.scala:756)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.ml.param.Params.getOrDefault(params.scala:756)
	at org.apache.spark.ml.param.Params.getOrDefault$(params.scala:753)
	at org.apache.spark.ml.PipelineStage.getOrDefault(Pipeline.scala:41)
	at org.apache.spark.ml.param.Params.$(params.scala:762)
	at org.apache.spark.ml.param.Params.$$(params.scala:762)
	at org.apache.spark.ml.PipelineStage.$(Pipeline.scala:41)
	at org.apache.spark.ml.feature.IDFBase.validateAndTransformSchema(IDF.scala:60)
	at org.apache.spark.ml.feature.IDFBase.validateAndTransformSchema$(IDF.scala:59)
	at org.apache.spark.ml.feature.IDF.validateAndTransformSchema(IDF.scala:69)
	at org.apache.spark.ml.feature.IDF.transformSchema(IDF.scala:99)
	at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:89)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:69)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
#handling Biden Data with PySpark

# --- Load ---------------------------------------------------------------
startTime = time.time()
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('hashtag_joebiden.csv')
print('PySpark Biden Reading time: ', time.time() - startTime)
# Rows without tweet text cannot be tokenized.
df = df.na.drop(subset=["tweet"])

# --- Clean, tokenize, remove stopwords ------------------------------------
cleanDF = df.select(cleanSentence(col("tweet")).alias("words"))
tokenizer = Tokenizer(inputCol="words", outputCol="vector")
countDF = tokenizer.transform(cleanDF).select("vector")

remover = StopWordsRemover(inputCol="vector", outputCol="filtered", stopWords=stopwordsList)
noStopDF = remover.transform(countDF).select("filtered").where(col("filtered").isNotNull())

# --- Word count ------------------------------------------------------------
# Spark is lazy: without an action inside the timed region, the timer would
# only measure plan construction. Cache and count() to force execution; show()
# then reuses the cached result.
startTime = time.time()
noStopDFCount = (
    noStopDF.select(pyfunc.explode("filtered").alias("word"))
    .groupBy("word").count()
    # Tokenizer yields empty tokens where cleaning left consecutive spaces.
    .filter("`word` != ''")
    .sort(col("count").desc())
    .cache()
)
noStopDFCount.count()
print('PySpark Biden WordCount Time: ' ,time.time()-startTime)
noStopDFCount.show()

In [None]:

# Pandas / scikit-learn baseline for the Trump file (eager, single process).
# NOTE(review): unlike the PySpark cells, this counts raw whitespace-separated
# tokens with no lowercasing, regex cleaning or stopword removal, so the
# counts are not like-for-like with the Spark result. Confirm that is intended.

startTime = time.time()
df = pd.read_csv('hashtag_donaldtrump.csv', lineterminator='\n')
print('Pandas Trump Reading Time: ', time.time()-startTime)

# Mirror the Spark cells, which drop rows with a null tweet. This also keeps
# NaN out of CountVectorizer, which rejects NaN documents.
tweets = df['tweet'].dropna()

startTime = time.time()
# split() + explode() yields one row per token. split(expand=True).stack()
# gives the same counts but first builds an (n_tweets x longest-tweet) frame.
dfCount = tweets.str.split().explode().value_counts()
print('Pandas Trump WordCount Time: ', time.time()-startTime)

startTime = time.time()
vectorizer = SKCountVectorizer(max_features=5000)
docs = vectorizer.fit_transform(tweets)
print('Pandas Trump Vectorize Time: ', time.time() - startTime)

print('end')


In [None]:
# Pandas / scikit-learn baseline for the Biden file (eager, single process).
# NOTE(review): raw whitespace tokens only, with no cleaning or stopword
# removal, so the counts are not like-for-like with the PySpark cells.

startTime = time.time()
df = pd.read_csv('hashtag_joebiden.csv', lineterminator='\n')
print('Pandas Biden Reading Time: ', time.time()-startTime)

# Mirror the Spark cells' null-tweet drop. This also keeps NaN out of
# CountVectorizer, which rejects NaN documents.
tweets = df['tweet'].dropna()

startTime = time.time()
# One row per token, without materializing a wide split(expand=True) frame.
dfCount = tweets.str.split().explode().value_counts()
print('Pandas Biden WordCount Time: ', time.time()-startTime)

# Timed the same way as the Trump cell so the two runs can be compared.
startTime = time.time()
vectorizer = SKCountVectorizer(max_features=5000)
docs = vectorizer.fit_transform(tweets)
print('Pandas Biden Vectorize Time: ', time.time() - startTime)

print('end')