In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import IntegerType


#import nltk
#from nltk.stem import PorterStemmer
#nltk.download('wordnet')
#from nltk.stem.snowball import SnowballStemmer
#from nltk.corpus import stopwords
#from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp

#import com.johnsnowlabs.nlp.base._
#import com.johnsnowlabs.nlp.annotator._

import sparknlp
from sparknlp.pretrained import PretrainedPipeline

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import *
from pyspark.sql.functions import rand,when

In [2]:
# Display the Spark version of the active session.
# NOTE(review): `spark` is only assigned in the following cell; this line relies on the
# runtime pre-defining `spark` — confirm, otherwise move it below the session creation.
spark.version

In [3]:
# Obtain (or reuse) the Spark session for this notebook under the app name 'nlp'.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [4]:
# Read both source tables and stack them into a single DataFrame.
# union() pairs columns by position, so the two tables must share the same column order.
df1, df3 = (spark.read.table(table_name) for table_name in ('table1', 'table3'))
dfinal = df1.union(df3)

In [5]:
# Bring the combined dataset to the driver as a pandas DataFrame (all columns).
# NOTE(review): this materialises every row in driver memory.
dpandas = dfinal.toPandas()

In [6]:
# Normalise the text columns on the pandas side.
#   1. Replace every non-word character in `content` and `title` with a space.
#   2. Collapse runs of spaces in `content` (`title` is intentionally left as in step 1,
#      matching the original behaviour).
# regex=True is passed explicitly: pandas >= 2.0 treats the pattern as literal text by
# default, which would make these calls look for the literal strings '\W' and ' +'.
# Raw strings avoid the invalid-escape warning that '\W' triggers in a normal string.
dpandas['content'] = dpandas['content'].str.replace(r'\W', ' ', regex=True)
dpandas['title'] = dpandas['title'].str.replace(r'\W', ' ', regex=True)
dpandas['content'] = dpandas['content'].str.replace(r' +', ' ', regex=True)

In [7]:
# Hand the cleaned pandas frame back to Spark for the rest of the pipeline.
dfSpark = spark.createDataFrame(data=dpandas)

In [8]:
# Tokenise the `content` column into a new `tokens` column and preview ten rows.
tokenization = Tokenizer(
    inputCol='content',
    outputCol='tokens',
)
tokenized_df = tokenization.transform(dataset=dfSpark)
tokenized_df.show(n=10)

In [9]:
# Transformer that reads `tokens` and writes the stop-word-free list to `refined_tokens`.
stopword_removal = StopWordsRemover(
    inputCol='tokens',
    outputCol='refined_tokens',
)

In [10]:
# Apply stop-word removal; from here on `dfSpark` carries both `tokens` and `refined_tokens`.
dfSpark = stopword_removal.transform(dataset=tokenized_df)

In [11]:
# Compare raw and refined tokens side by side for the first ten rows.
dfSpark.select('id', 'tokens', 'refined_tokens').show(n=10)

In [12]:
# Build a DataFrame restricted to rows whose publication is 'New York Times' or 'Vox'.
# NOTE(review): the result is not assigned, so `dfSpark` is left unchanged and later cells
# still operate on every publication. Assign the result if the restriction is intended,
# or remove this cell.
dfSpark.filter(((dfSpark.publication =='New York Times') | (dfSpark.publication =='Vox')))

In [13]:
# Show how many articles each publication contributes.
display(
    dfSpark
    .groupBy('publication')
    .count()
)


publication,count
Business Insider,6757
Washington Post,11114
Reuters,10710
CNN,11488
New York Times,7803
Guardian,3808
Breitbart,23781
NPR,11992
Atlantic,171
Vox,4947


In [14]:
# Start Spark NLP (the returned session is not used here), then load the pretrained
# English 'analyze_sentiment' pipeline.
sparknlp.start()
pipeline = PretrainedPipeline(name='analyze_sentiment', lang='en')

In [15]:
# Placeholder target: give each row a pseudo-random 0/1 value in 'Sentimiento'.
# A fixed seed makes the assignment repeatable across runs (for the same data and
# partitioning); previously every run produced different labels.
# NOTE(review): these labels are random and unrelated to the article text, so any model
# trained on them cannot learn sentiment. Replace with real labels before interpreting
# the metrics computed later in the notebook.
dfSpark = dfSpark.withColumn('Sentimiento', when(rand(seed=42) > 0.5, 1).otherwise(0))

In [16]:
# Expose the 0/1 flag as a float column called "label" and drop the temporary source column.
dfSpark = (
    dfSpark
    .withColumn("label", dfSpark['Sentimiento'].cast('float'))
    .drop('Sentimiento')
)

In [17]:
# Peek at ten rows in random order to sanity-check the new label column.
(
    dfSpark
    .orderBy(rand())
    .show(n=10)
)

In [18]:
# Score a small sample of articles with the pretrained sentiment pipeline.

# Total number of rows; kept so the sample size below can be switched to the full dataset.
length = dfSpark.count()

# How many rows to score. Set this to `length` to evaluate the whole dataset.
n_rows_to_score = 3

# Fetch the sample once. Previously the entire `content` column was collected to the
# driver again on every loop iteration just to read a single row from it.
content_rows = dfSpark.select('content').take(n_rows_to_score)

# The first sentiment annotation of each text decides the flag; any other value
# (or an empty annotation list) is skipped, so the output may hold fewer entries than rows.
sentiment_to_flag = {'positive': '1', 'negative': '0'}
lista_sentimientos = []
for row in content_rows:
  pred = pipeline.annotate(row['content'])
  sentiments = pred['sentiment']
  flag = sentiment_to_flag.get(sentiments[0]) if sentiments else None
  if flag is not None:
    lista_sentimientos.append(flag)

# Show the collected flags as a single-column string DataFrame.
spark.createDataFrame(lista_sentimientos, StringType()).show()
#print(dfSpark.show())

In [19]:
# Add `token_count`: the number of tokens left after stop-word removal.
# The built-in size() (from pyspark.sql.functions, already star-imported at the top of the
# notebook) replaces the earlier Python lambda UDF: it yields the same integer length for
# array columns without shipping every row through a Python worker.
dfSpark = dfSpark.withColumn("token_count", size(col('refined_tokens')))
dfSpark.orderBy(rand()).show(10)

In [20]:
from pyspark.ml.feature import CountVectorizer

In [21]:
# Fit a CountVectorizer on `refined_tokens` and write the resulting vectors to `features`,
# then preview the inputs that will feed the model.
count_vec = CountVectorizer(
    inputCol='refined_tokens',
    outputCol='features',
)
count_vec_model = count_vec.fit(dfSpark)
dfSpark_V = count_vec_model.transform(dfSpark)
dfSpark_V.select('refined_tokens', 'token_count', 'features', 'Label').show(n=10)

In [22]:
# Keep only the columns the classifier needs: the two inputs and the target.
model_df = dfSpark_V.select('features', 'token_count', 'Label')

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
# Combine `features` and `token_count` into the single vector column `features_vec`.
# `model_df` is overwritten with the assembled frame.
df_assembler = VectorAssembler(
    inputCols=['features', 'token_count'],
    outputCol='features_vec',
)
model_df = df_assembler.transform(dataset=model_df)

In [25]:

# Print the schema of the assembled frame to confirm `features_vec` was added.
model_df.printSchema()

In [26]:
from pyspark.ml.classification import LogisticRegression

In [27]:
# Split the data into training and test sets. The seed keeps the split repeatable
# between runs; without it each execution produced a different partition.
# NOTE(review): the weights put 25% of the rows in training and 75% in test, the reverse
# of the usual ratio — confirm this is intentional.
training_df, test_df = model_df.randomSplit([0.25, 0.75], seed=42)

In [28]:
# Class balance of the training set.
(
    training_df
    .groupBy('Label')
    .count()
    .show()
)

In [29]:
# Class balance of the test set.
(
    test_df
    .groupBy('Label')
    .count()
    .show()
)

In [30]:
# Fit logistic regression on the training set; `log_reg` holds the fitted model.
log_reg = LogisticRegression(
    featuresCol='features_vec',
    labelCol='Label',
).fit(training_df)

# Evaluate on the test set and keep the per-row predictions for the metrics below.
results = log_reg.evaluate(test_df).predictions
results.show()

In [31]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [32]:
# Confusion-matrix cells, each counted with its own filter over the predictions.
true_postives = results.filter(
    (results['Label'] == 1) & (results['prediction'] == 1)
).count()
true_negatives = results.filter(
    (results['Label'] == 0) & (results['prediction'] == 0)
).count()
false_positives = results.filter(
    (results['Label'] == 0) & (results['prediction'] == 1)
).count()
false_negatives = results.filter(
    (results['Label'] == 1) & (results['prediction'] == 0)
).count()

In [33]:
# Recall = TP / (TP + FN): share of actual positives the model recovered.
actual_positives = true_postives + false_negatives
recall = float(true_postives) / actual_positives
print(recall)

In [34]:
# Precision = TP / (TP + FP): share of predicted positives that were correct.
predicted_positives = true_postives + false_positives
precision = float(true_postives) / predicted_positives
print(precision)

In [35]:
# Accuracy = (TP + TN) / total predictions.
# float() is applied to the numerator before dividing, matching the recall and precision
# cells; wrapping the whole quotient would truncate to 0 or 1 under integer division.
accuracy = float(true_postives + true_negatives) / results.count()
print(accuracy)