In [107]:
import pyspark
import pandas as pd
import pyspark
from pyspark import SparkContext
#sc = SparkContext()

In [120]:
from pymongo import MongoClient, errors
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
import pandas_profiling

In [121]:
while True:
    try:
        client = MongoClient("mongodb://bigdata-mongodb-04.virtual.uniandes.edu.co:8087/", retryWrites=False, serverSelectionTimeoutMS=10, connectTimeoutMS=20000)
        client.server_info() # force connection on a request as the
    except errors.ServerSelectionTimeoutError as err:
        print(err)
    finally:
        if client.server_info():            
            break

database = client["Grupo03"]
collection = database["COL_dataset"]
query = {}
query["emocion"] = {

    u"$ne": u""

}
query["$and"] = [

    {

        u"emocion": {

            u"$exists": True

        }

    }

]

cursor = collection.find(query)
data = []

for doc in cursor:
    try:
        data.append([doc['id'], doc['user'].lower(), doc['tweet'].lower(), doc['id_reply_or_quote'],
                     doc['user_replier'].lower(), doc['reply_or_quote'].lower(), doc['emocion'], 
                     doc['tendencia'], doc['coherencia']])
        
    except KeyError as e:        
        data.append([doc['id'], doc['user'].lower(), doc['tweet'].lower(), doc['id_reply_or_quote'],
                     doc['user_replier'].lower(), doc['reply_or_quote'].lower(), doc['emocion'], doc['tendencia'], ''])
        continue

    
client.close()
   
print(len(data))


721


In [122]:
df = pd.DataFrame(data,columns=['id', 'user', 'tweet', 'id_reply_or_quote', 'user_replier',
                                'reply_or_quote', 'emocion', 'tendencia', 'coherencia'])
df.head()
clean_df = df.drop(['id', 'user', 'tweet', 'id_reply_or_quote', 'user_replier',
        'tendencia', 'coherencia'], 1)
clean_df.head()

Unnamed: 0,reply_or_quote,emocion
0,@claudialopez @infopresidencia @bogota claudia...,3
1,@claudialopez @infopresidencia @bogota dios la...,1
2,@claudialopez @infopresidencia @bogota no clau...,3
3,@claudialopez @infopresidencia @bogota esas ba...,3
4,@claudialopez @infopresidencia @bogota ayuda!\...,2


In [123]:
y = clean_df['emocion']
X = clean_df.drop(columns=['emocion'], axis=1)


In [124]:
pandas_profiling.ProfileReport(clean_df)

Tab(children=(HTML(value='<div id="overview-content" class="row variable spacing">\n    <div class="row">\n   …



In [125]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

mySchema = StructType([ StructField("reply_or_quote", StringType(), True)\
                       ,StructField("emocion", IntegerType(), True)])

In [126]:
df = spark.createDataFrame(clean_df,schema=mySchema)

In [127]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [128]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="reply_or_quote", outputCol="words", pattern="\\W")

In [129]:
# bag of words count
countVectors = CountVectorizer(inputCol="words", outputCol="features", vocabSize=10000, minDF=5)

In [130]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "emocion", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
dataset.show(20)

+--------------------+-------+--------------------+--------------------+-----+
|      reply_or_quote|emocion|               words|            features|label|
+--------------------+-------+--------------------+--------------------+-----+
|@claudialopez @in...|      3|[claudialopez, in...|(413,[0,2,6,9,20,...|  0.0|
|@claudialopez @in...|      1|[claudialopez, in...|(413,[3,4,24,72,1...|  3.0|
|@claudialopez @in...|      3|[claudialopez, in...|(413,[1,4,5,6,8,9...|  0.0|
|@claudialopez @in...|      3|[claudialopez, in...|(413,[0,1,4,6,7,9...|  0.0|
|@claudialopez @in...|      2|[claudialopez, in...|(413,[24,31,35,36...|  4.0|
|@claudialopez los...|      0|[claudialopez, lo...|(413,[0,4,5,10,24...|  1.0|
|@claudialopez #in...|      2|[claudialopez, in...|(413,[0,4,5,6,7,9...|  4.0|
|@claudialopez y e...|      4|[claudialopez, y,...|(413,[4,5,24],[1....|  2.0|
|@claudialopez @bo...|      0|[claudialopez, bo...|(413,[5,8,12,16,2...|  1.0|
|@claudialopez muy...|      3|[claudialopez, mu...|(

In [131]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 513
Test Dataset Count: 208


### Logistic Regression using Count Vector Features

In [132]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.select("reply_or_quote","emocion","probability","label","prediction") \
    .show(n = 10, truncate = 30)

+------------------------------+-------+------------------------------+-----+----------+
|                reply_or_quote|emocion|                   probability|label|prediction|
+------------------------------+-------+------------------------------+-----+----------+
|@bluradioco ahí está pintad...|      3|[0.981413237043641,0.001486...|  0.0|       0.0|
|@bluradioco deben ir a luch...|      0|[0.301938811166103,0.431392...|  1.0|       1.0|
|@bluradioco jajajaja que ne...|      0|[0.4821385195332191,0.20609...|  1.0|       0.0|
|@bluradioco no será mejor q...|      3|[0.6813223137409922,0.02489...|  0.0|       0.0|
|@bluradioco soy programador...|      4|[0.3070022258133353,0.15729...|  2.0|       2.0|
|@bluradioco 😴😴😴😴...soña...|      3|[0.5700957777159859,0.11889...|  0.0|       0.0|
|@claudialopez #ingresosolid...|      2|[0.6406634774622589,0.03300...|  4.0|       0.0|
|@claudialopez @infopresiden...|      2|[0.44956851424285377,0.2514...|  4.0|       0.0|
|@claudialopez @infopresi

In [133]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.46849692346565486

### Logistic Regression using TF-IDF Features

In [134]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.select("reply_or_quote","emocion","probability","label","prediction") \
    .show(n = 10, truncate = 30)

+------------------------------+-------+------------------------------+-----+----------+
|                reply_or_quote|emocion|                   probability|label|prediction|
+------------------------------+-------+------------------------------+-----+----------+
|@bluradioco ahí está pintad...|      3|[0.9738740618449296,0.00179...|  0.0|       0.0|
|@bluradioco deben ir a luch...|      0|[0.28400268665066625,0.4250...|  1.0|       1.0|
|@bluradioco jajajaja que ne...|      0|[0.5225992416620454,0.19014...|  1.0|       0.0|
|@bluradioco no será mejor q...|      3|[0.7850959001310596,0.01432...|  0.0|       0.0|
|@bluradioco soy programador...|      4|[0.2832846494805151,0.08087...|  2.0|       2.0|
|@bluradioco 😴😴😴😴...soña...|      3|[0.4998536812712038,0.15643...|  0.0|       0.0|
|@claudialopez #ingresosolid...|      2|[0.7140672229936277,0.03754...|  4.0|       0.0|
|@claudialopez @infopresiden...|      2|[0.4563980494350243,0.27192...|  4.0|       0.0|
|@claudialopez @infopresi

In [135]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.45225172596919805

### Cross Validation

In [136]:
pipeline = Pipeline(stages=[regexTokenizer, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.46849692346565486

### Naive Bayes

In [137]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.select("reply_or_quote","emocion","probability","label","prediction") \
    .show(n = 10, truncate = 30)

+------------------------------+-------+------------------------------+-----+----------+
|                reply_or_quote|emocion|                   probability|label|prediction|
+------------------------------+-------+------------------------------+-----+----------+
|@bluradioco ahí está pintad...|      3|[0.9999551453093772,1.32431...|  0.0|       0.0|
|@bluradioco deben ir a luch...|      0|[0.6075805747763934,0.18318...|  1.0|       0.0|
|@bluradioco jajajaja que ne...|      0|[0.657115505000501,0.084000...|  1.0|       0.0|
|@bluradioco no será mejor q...|      3|[0.9938092612442766,5.00574...|  0.0|       0.0|
|@bluradioco soy programador...|      4|[0.3902241687730766,0.01997...|  2.0|       2.0|
|@bluradioco 😴😴😴😴...soña...|      3|[0.7800847653448444,0.02052...|  0.0|       0.0|
|@claudialopez #ingresosolid...|      2|[0.02329271124543518,6.9134...|  4.0|       4.0|
|@claudialopez @infopresiden...|      2|[0.39830484123541954,0.2771...|  4.0|       0.0|
|@claudialopez @infopresi

In [138]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.47131565401689235

### Random Forest

In [139]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.select("reply_or_quote","emocion","probability","label","prediction") \
    .show(n = 10, truncate = 30)

+------------------------------+-------+------------------------------+-----+----------+
|                reply_or_quote|emocion|                   probability|label|prediction|
+------------------------------+-------+------------------------------+-----+----------+
|@bluradioco ahí está pintad...|      3|[0.5917470123122784,0.16490...|  0.0|       0.0|
|@bluradioco deben ir a luch...|      0|[0.5735002666769805,0.18079...|  1.0|       0.0|
|@bluradioco jajajaja que ne...|      0|[0.5610412374169625,0.17916...|  1.0|       0.0|
|@bluradioco no será mejor q...|      3|[0.5595072883479089,0.18078...|  0.0|       0.0|
|@bluradioco soy programador...|      4|[0.546845871057056,0.174658...|  2.0|       0.0|
|@bluradioco 😴😴😴😴...soña...|      3|[0.5610412374169625,0.17916...|  0.0|       0.0|
|@claudialopez #ingresosolid...|      2|[0.4989342178430342,0.17855...|  4.0|       0.0|
|@claudialopez @infopresiden...|      2|[0.5168220721495833,0.19583...|  4.0|       0.0|
|@claudialopez @infopresi

In [140]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.3174321633059497