In [None]:
#import os
# make sure pyspark tells workers to use python3 not 2 if both are installed
#os.environ['PYSPARK_PYTHON'] = '/home/ubuntu/anaconda3/bin/python3'
#os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/ubuntu/anaconda3/bin/ipython'
#!pip install metapy

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark
from pyspark.sql import SQLContext
import metapy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark.sql.functions as F

In [None]:
# Starting Spark takes a while on the first run, so be patient.
# getOrCreate() reuses a session that is already running, so re-running this
# cell does not fail the way a second direct SparkContext(...) construction does.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = (
    SparkSession.builder
    .master('local')
    .appName("news_sentiment_analysis")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [None]:
# Raw news articles: the CSV has a header row and Spark infers the column types.
NEWS_CSV_PATH = "file:///home/ubuntu/Caso_estudio/datasets/mini_news.csv"
dfNews = spark.read.csv(
    NEWS_CSV_PATH,
    header=True,
    inferSchema=True,
    encoding="UTF-8",
)
dfNews.show(n=10)


# Sentimientos

In [None]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer 
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import StructType,ArrayType,StringType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

In [None]:
# Pull every row to the driver as pandas so the headlines can be processed
# row by row in plain Python (fine for this small "mini" dataset).
dat = (
    dfNews
    .toPandas()
)

### Limpieza de datos
Se elimina del título el nombre de la publicación (el texto que sigue a ` - `); esto ayuda a mejorar la clasificación del sentimiento de la noticia.

In [None]:
# Build row-aligned `title` / `content` lists. Each title has its trailing
# " - <publication>" suffix stripped so that VADER scores only the headline.
sep = ' - '
title = []
content = []
for raw_title, body in zip(dat['title'], dat['content']):
    # Skip the whole row when either field is missing. Appending a placeholder to
    # only one list desynchronises them: pd.DataFrame then fails on unequal
    # lengths, and VADER cannot score a non-string title. A missing body would
    # also become a null text column that breaks the Spark Tokenizer later.
    if pd.isna(raw_title) or pd.isna(body):
        continue
    title.append(str(raw_title).split(sep, 1)[0])
    content.append(body)

### Identificación del sentimiento
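Cada título se etiqueta según el puntaje `compound` de VADER:
- 0: negativo (compound < 0)
- 1: neutro (compound = 0)
- 2: positivo (compound > 0)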

In [None]:
# Score each cleaned headline with VADER and map its compound score to a class:
# 0 = negative, 1 = neutral, 2 = positive.
# NEUTRAL_THRESHOLD = 0.0 reproduces the "exactly zero is neutral" rule.
# VADER's documentation suggests a wider neutral band of about 0.05.
NEUTRAL_THRESHOLD = 0.0


def sentiment_label(compound, neutral_threshold=NEUTRAL_THRESHOLD):
    """Map a VADER compound score to 0 (negative), 1 (neutral) or 2 (positive).

    Scores with ``-neutral_threshold <= compound <= neutral_threshold`` are neutral.
    """
    if compound < -neutral_threshold:
        return 0
    if compound > neutral_threshold:
        return 2
    return 1


sid = SentimentIntensityAnalyzer()
target = [sentiment_label(sid.polarity_scores(t)['compound']) for t in title]

In [None]:
# Assemble the labelled dataset. The three lists must be row-aligned; fail early
# with a readable message rather than pandas' generic length error.
assert len(title) == len(content) == len(target), (
    f"misaligned columns: {len(title)} titles, "
    f"{len(content)} contents, {len(target)} targets"
)
d = {'title': title, 'content': content, 'target': target}
dfsent = pd.DataFrame(d)

In [None]:
# First rows of the labelled pandas dataset.
dfsent.head(n=5)

In [None]:
# Convert the labelled pandas frame back to Spark through the existing SparkSession
# (SQLContext is deprecated, and building a second one here is redundant).
spark_dfsent = spark.createDataFrame(dfsent)

In [None]:
# First rows of the Spark version of the labelled dataset.
spark_dfsent.show(n=5)

In [None]:
# Keep only ASCII letters and whitespace from the article body, then lowercase it.
cleaned_text = lower(regexp_replace(col('content'), "[^a-zA-Z\\s]", "")).alias('text')
df_clean = spark_dfsent.select(cleaned_text, col('target'))
df_clean.show(5)

In [None]:
# Exploratory step: split the cleaned text into a token array.
tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
df_words_token = (
    tokenizer
    .transform(df_clean)
    .select('text', 'words_token', 'target')
)
df_words_token.show(5)

In [None]:
# Set the JVM default locale to en-US before constructing StopWordsRemover.
# NOTE(review): presumably this is so the remover's default locale is one it
# accepts; confirm against the Spark version in use.
jvm_locale = sc._jvm.java.util.Locale
jvm_locale.setDefault(jvm_locale.forLanguageTag("en-US"))

# Exploratory step: drop stop words from the token arrays.
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = (
    remover
    .transform(df_words_token)
    .select('words_token', 'words_clean', 'target')
)
df_words_no_stopw.show(5)

# Modelos

### Datos

In [None]:
# Reproducible 70/30 split of the cleaned text and its sentiment target.
train, test = df_clean.randomSplit(weights=[0.7, 0.3], seed=2019)
n_train, n_test = train.count(), test.count()
print(f"Training Dataset: {n_train}")
print(f"Test Dataset: {n_test}")

### Preparación de datos con Pipeline

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# Feature pipeline: raw text -> tokens -> hashed term frequencies -> TF-IDF.
# A StringIndexer also turns the integer `target` into a numeric `label` column;
# by Spark's default ordering, `label` need not equal `target`.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol='tf', numFeatures=1 << 16)
# Terms found in fewer than 5 training documents get an IDF weight of 0.
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

# Fit on the training split only, then apply the same fitted stages to both splits.
pipelineFit = pipeline.fit(train)
train_df, val_df = pipelineFit.transform(train), pipelineFit.transform(test)

In [None]:
# Inspect the engineered features and labels for a few training rows.
train_df.show(n=10)

## Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Reuse the pipeline that was already fitted on `train` in the preparation cell.
# Refitting it here only recomputed the same IDF weights and label mapping.
test_df = pipelineFit.transform(test)

# Explicit seed: PySpark's default seed is derived from hashing the class name,
# and Python randomizes str hashes per process, so unseeded forests can differ
# between kernel runs.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=2019)
rfModel = rf.fit(train_df)
predictions_Forest = rfModel.transform(test_df)


In [None]:
# Compare true labels with the forest's raw scores, predicted class and probabilities.
forest_cols = ['label', 'rawPrediction', 'prediction', 'probability']
predictions_Forest.select(*forest_cols).show(n=20)

In [None]:
# The target has three classes (negative / neutral / positive), so binary ROC AUC
# does not measure this model meaningfully. Use multiclass metrics instead.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
print("Test Accuracy: " + str(evaluator.evaluate(predictions_Forest, {evaluator.metricName: "accuracy"})))
print("Test F1: " + str(evaluator.evaluate(predictions_Forest)))

### Validación cruzada (K-folds)

In [None]:
# Hyper-parameter grid for cross-validating the random forest:
# 3 depths x 2 bin counts x 2 forest sizes = 12 candidate settings.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

grid_builder = ParamGridBuilder()
grid_builder.addGrid(rf.maxDepth, [2, 4, 6])
grid_builder.addGrid(rf.maxBins, [20, 60])
grid_builder.addGrid(rf.numTrees, [5, 20])
paramGrid = grid_builder.build()


In [None]:
# 5-fold cross-validation over every setting in paramGrid (12 settings x 5 folds
# = 60 forest fits, plus a final refit of the best setting), so this is slow.
# Candidates are scored with multiclass F1 because the label has three classes;
# a binary ROC evaluator would rank them on a meaningless metric.
cv_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=cv_evaluator,
    numFolds=5,
    seed=2019,  # fixed fold assignment for reproducible model selection
)
cvModel = cv.fit(train_df)

In [None]:
# Score the held-out split with the best model chosen by cross-validation, to
# estimate performance on data the model has not seen.
predictions = cvModel.bestModel.transform(test_df)

In [None]:
# Evaluate the cross-validated best model on the test split. The label has three
# classes, so report multiclass F1 rather than a binary ROC metric.
MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
).evaluate(predictions)

In [None]:
# View the best model's predictions and the per-class probability vector for each
# test row. (The earlier commented-out version referenced "age"/"occupation"
# columns that do not exist in this dataset, and a Databricks-only `display`.)
predictions.select("label", "prediction", "probability").show(10)

## Regresión logística multinomial

In [None]:
from pyspark.ml.classification import LogisticRegression

# Elastic-net regularised multinomial logistic regression on the TF-IDF features.
mlr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    family="multinomial",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
mlrModel = mlr.fit(train_df)
mlpredictions = mlrModel.transform(test_df)
mlr_cols = ['target', 'label', 'rawPrediction', 'prediction', 'probability']
mlpredictions.select(*mlr_cols).show(n=10)


In [None]:
# Three-class problem: report multiclass metrics rather than binary ROC AUC,
# which does not measure a negative/neutral/positive classifier meaningfully.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)
print("Test Accuracy: " + str(evaluator.evaluate(mlpredictions, {evaluator.metricName: "accuracy"})))
print("Test F1: " + str(evaluator.evaluate(mlpredictions)))