# Entrenamiento del modelo para Spark

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import udf, size, col
from pyspark.sql.types import (IntegerType, StringType, 
                               TimestampType, StructType,
                               StructField, ArrayType,
                               TimestampType)

from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover, RegexTokenizer, \
        CountVectorizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline


import os

conf = pyspark.SparkConf()
#conf.set("spark.executor.extraClassPath",  os.path.join(os.getcwd(), 'extras/sqlite-jdbc-3.34.0.jar'))
#conf.set("spark.driver.extraClassPath", os.path.join(os.getcwd(), 'extras/sqlite-jdbc-3.34.0.jar'))

sc = pyspark.SparkContext(appName="TwitterStreamApp", conf=conf)

spark = SparkSession(sc)


In [2]:
#url = 'jdbc:sqlite:' + os.path.join(os.getcwd(), 'database.sqlite')

# Importación de los datos de entrenamiento

schema = StructType([StructField("sentiment", StringType()),
                   StructField("id", StringType()),
                   StructField("date", StringType()),
                   StructField("flag", StringType()),
                   StructField("user", StringType()),
                   StructField("transtext", StringType())
                  ])



df = spark.read.format('csv').schema(schema).load('datasets/tweets_clean.csv')

df.show()

+---------+----------+--------------------+--------+---------------+--------------------+
|sentiment|        id|                date|    flag|           user|           transtext|
+---------+----------+--------------------+--------+---------------+--------------------+
|        0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|        0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|        0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|        0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|        0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|        0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|        0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|        0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|        0

In [3]:
df.groupBy("sentiment") \
    .count() \
    .orderBy(col("count").desc()) \
    .distinct() \
    .show()

+---------+------+
|sentiment| count|
+---------+------+
|        0|800000|
|        4|800000|
+---------+------+



In [4]:
# División de los datos para entrenamiento y tests

(trainingData, testData) = df.randomSplit([0.99, 0.01], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))


Training Dataset Count: 1584015
Test Dataset Count: 15985


In [5]:
# Creación de un PipelinedModel
my_stopwords = ["http","https","amp","rt","t","c","the","@"]
tokenizer = Tokenizer(inputCol="transtext", outputCol="words")
#stopwords = StopWordsRemover(inputCol="words", outputCol='tokens').setStopWords(my_stopwords)
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(trainingData)
train_df = pipelineFit.transform(trainingData)
val_df = pipelineFit.transform(testData)
train_df.show(5)


+---------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|sentiment|        id|                date|    flag|           user|           transtext|               words|                  tf|            features|label|
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|        0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|[@switchfoot, htt...|(65536,[12429,164...|(65536,[12429,164...|  1.0|
|        0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|[is, upset, that,...|(65536,[1981,3085...|(65536,[1981,3085...|  1.0|
|        0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|[@kenichan, i, di...|(65536,[2888,3924...|(65536,[2888,3924...|  1.0|
|        0|1467811184|Mon Apr 06 22:19:...|NO_

In [6]:
# Regresión logística para casificar los tweets

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [7]:
predictions.groupBy("prediction") \
    .count() \
    .orderBy(col("count").desc()) \
    .distinct() \
    .show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 8149|
|       1.0| 7836|
+----------+-----+



In [8]:
# Evaluación de los resultados

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8545616226114139

In [None]:
# Backup de los modelos

pipelineFit.save('models/final_idf.model')
lrModel.save('models/final_lr.model')

In [None]:
predictions.show()