<a href="https://colab.research.google.com/github/emiliamusso/pyspark/blob/main/NLP_in_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# NLP Exercise in Spark

The goal of this exercise is to build a model that identifies the sentiment of a tweet using natural language processing (NLP) techniques.

For this, we will use the dataset in the file `Tweets.csv`, available at https://storage.googleapis.com/humai-datasets/datasets/Tweets.csv

The following column is used as the input variable:
* `text`: the text of the tweet.

From it, the goal is to predict the value of `airline_sentiment`, the sentiment of the tweet, which can be positive, negative, or neutral.

Remember that, to evaluate the model's performance, the dataset must be split into train and test sets, and the `Accuracy` metric must be computed on the test set.

To carry out these tasks, we recommend looking into the following concepts and checking which functions MLlib offers to implement them:
* [Tokenization](https://en.wikipedia.org/wiki/Lexical_analysis#Tokenization)
* [Stop Word](https://en.wikipedia.org/wiki/Stop_word) removal
* [tf–idf](https://en.wikipedia.org/wiki/Tf%E2%80%93idf)
* [Feature hashing](https://en.wikipedia.org/wiki/Feature_hashing)
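To make the first two concepts concrete, here is a minimal plain-Python sketch of tokenization and stop-word removal. It only approximates what MLlib's `Tokenizer` (lowercase, then split on whitespace) and `StopWordsRemover` do; `STOP_WORDS` is a tiny hypothetical list, not Spark's default English stop-word list.

```python
# Assumption: a tiny illustrative stop-word list, NOT Spark's default one.
STOP_WORDS = {"i", "the", "a", "to", "my", "you", "and", "is", "it", "what"}


def tokenize(text):
    """Lowercase the text and split it on whitespace
    (roughly what pyspark.ml.feature.Tokenizer does)."""
    return text.lower().split()


def remove_stop_words(tokens):
    """Drop tokens that appear in the stop-word list."""
    return [t for t in tokens if t not in STOP_WORDS]


tweet = "@VirginAmerica What @dhepburn said."
tokens = tokenize(tweet)
filtered = remove_stop_words(tokens)
print(tokens)    # ['@virginamerica', 'what', '@dhepburn', 'said.']
print(filtered)  # ['@virginamerica', '@dhepburn', 'said.']
```

Note that punctuation stays attached to tokens (`said.`) and mentions such as `@virginamerica` survive as tokens; MLlib's `RegexTokenizer` can split on a custom pattern if that matters.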


In [None]:
import pandas as pd

df = pd.read_csv("https://storage.googleapis.com/humai-datasets/datasets/Tweets.csv")

In [None]:
df

Unnamed: 0,tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
0,570306133677760513,neutral,1.0000,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
1,570301130888122368,positive,0.3486,,0.0000,Virgin America,,jnardino,,0,@VirginAmerica plus you've added commercials t...,,2015-02-24 11:15:59 -0800,,Pacific Time (US & Canada)
2,570301083672813571,neutral,0.6837,,,Virgin America,,yvonnalynn,,0,@VirginAmerica I didn't today... Must mean I n...,,2015-02-24 11:15:48 -0800,Lets Play,Central Time (US & Canada)
3,570301031407624196,negative,1.0000,Bad Flight,0.7033,Virgin America,,jnardino,,0,@VirginAmerica it's really aggressive to blast...,,2015-02-24 11:15:36 -0800,,Pacific Time (US & Canada)
4,570300817074462722,negative,1.0000,Can't Tell,1.0000,Virgin America,,jnardino,,0,@VirginAmerica and it's a really big bad thing...,,2015-02-24 11:14:45 -0800,,Pacific Time (US & Canada)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14635,569587686496825344,positive,0.3487,,0.0000,American,,KristenReenders,,0,@AmericanAir thank you we got on a different f...,,2015-02-22 12:01:01 -0800,,
14636,569587371693355008,negative,1.0000,Customer Service Issue,1.0000,American,,itsropes,,0,@AmericanAir leaving over 20 minutes Late Flig...,,2015-02-22 11:59:46 -0800,Texas,
14637,569587242672398336,neutral,1.0000,,,American,,sanyabun,,0,@AmericanAir Please bring American Airlines to...,,2015-02-22 11:59:15 -0800,"Nigeria,lagos",
14638,569587188687634433,negative,1.0000,Customer Service Issue,0.6659,American,,SraJackson,,0,"@AmericanAir you have my money, you change my ...",,2015-02-22 11:59:02 -0800,New Jersey,Eastern Time (US & Canada)


In [None]:
df.columns

Index(['tweet_id', 'airline_sentiment', 'airline_sentiment_confidence',
       'negativereason', 'negativereason_confidence', 'airline',
       'airline_sentiment_gold', 'name', 'negativereason_gold',
       'retweet_count', 'text', 'tweet_coord', 'tweet_created',
       'tweet_location', 'user_timezone'],
      dtype='object')

In [None]:
!pip install pyspark -q



In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Create (or reuse) a local Spark session; its web UI is served on port 4050
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [None]:
# Load only the label and text columns into a Spark DataFrame
raw_df = spark.createDataFrame(pd.read_csv(
    'https://storage.googleapis.com/humai-datasets/datasets/Tweets.csv',
)[['airline_sentiment', 'text']])

# Drop rows whose text is missing
df_cleaned = raw_df.na.drop(subset=["text"])

In [None]:
df_cleaned.show()

+-----------------+--------------------+
|airline_sentiment|                text|
+-----------------+--------------------+
|          neutral|@VirginAmerica Wh...|
|         positive|@VirginAmerica pl...|
|          neutral|@VirginAmerica I ...|
|         negative|@VirginAmerica it...|
|         negative|@VirginAmerica an...|
|         negative|@VirginAmerica se...|
|         positive|@VirginAmerica ye...|
|          neutral|@VirginAmerica Re...|
|         positive|@virginamerica We...|
|         positive|@VirginAmerica it...|
|          neutral|@VirginAmerica di...|
|         positive|@VirginAmerica I ...|
|         positive|@VirginAmerica Th...|
|         positive|@VirginAmerica @v...|
|         positive|@VirginAmerica Th...|
|         negative|@VirginAmerica SF...|
|         positive|@VirginAmerica So...|
|         negative|@VirginAmerica  I...|
|         positive|I ❤️ flying @Virg...|
|         positive|@VirginAmerica yo...|
+-----------------+--------------------+
only showing top 20 rows

In [None]:

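# Added to try to fix the problem that the model does not accept string columns.
# Note: this maps each distinct tweet text to a number ("textr"). That index carries
# no linguistic information, and the pipeline below does not use it: Tokenizer,
# HashingTF and IDF build the numeric features directly from `text`.
from pyspark.ml.feature import StringIndexer
label_indexer = StringIndexer(inputCol='text', outputCol='textr').fit(df_cleaned)
transformed_data = label_indexer.transform(df_cleaned)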

In [None]:
transformed_data.show()

+-----------------+--------------------+-------+
|airline_sentiment|                text|  textr|
+-----------------+--------------------+-------+
|          neutral|@VirginAmerica Wh...|10022.0|
|         positive|@VirginAmerica pl...|10173.0|
|          neutral|@VirginAmerica I ...| 9922.0|
|         negative|@VirginAmerica it...|10135.0|
|         negative|@VirginAmerica an...|10050.0|
|         negative|@VirginAmerica se...|10182.0|
|         positive|@VirginAmerica ye...|10254.0|
|          neutral|@VirginAmerica Re...| 9994.0|
|         positive|@virginamerica We...|14071.0|
|         positive|@VirginAmerica it...|10134.0|
|          neutral|@VirginAmerica di...|10081.0|
|         positive|@VirginAmerica I ...| 9912.0|
|         positive|@VirginAmerica Th...|10012.0|
|         positive|@VirginAmerica @v...| 9856.0|
|         positive|@VirginAmerica Th...|  174.0|
|         negative|@VirginAmerica SF...| 9997.0|
|         positive|@VirginAmerica So...|10000.0|
|         negative|@

In [None]:
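from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

# Split each tweet into lowercase words
tokenizer = Tokenizer(inputCol='text', outputCol='words')
# Remove common English stop words
remover = StopWordsRemover(inputCol='words', outputCol='words_filtered')
# Hash words into a term-frequency vector. Only 20 buckets means many distinct
# words share a feature; HashingTF's default numFeatures is 2**18.
hashingTF = HashingTF(inputCol='words_filtered', outputCol='rawFeatures', numFeatures=20)

# Reweight term frequencies by inverse document frequency (TF-IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")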
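The hashing and IDF stages can also be sketched in plain Python. This is an illustration under assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 hash that `HashingTF` uses, so bucket positions will differ from Spark's, while the IDF follows the smoothed formula documented for MLlib, idf(t) = log((m + 1) / (df(t) + 1)), with m the number of documents. The tokenized tweets are hypothetical.

```python
import math
import zlib


def hashing_tf(tokens, num_features):
    """Fixed-length term-frequency vector: each token is counted in
    bucket hash(token) % num_features.
    Assumption: zlib.crc32 stands in for Spark's MurmurHash3."""
    vec = [0.0] * num_features
    for tok in tokens:
        vec[zlib.crc32(tok.encode("utf-8")) % num_features] += 1.0
    return vec


def fit_idf(tf_vectors):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1)), m = number of docs."""
    m = len(tf_vectors)
    n = len(tf_vectors[0])
    df = [sum(1 for v in tf_vectors if v[j] > 0) for j in range(n)]
    return [math.log((m + 1) / (df[j] + 1)) for j in range(n)]


def tf_idf(tf_vec, idf):
    """Element-wise product of term frequencies and IDF weights."""
    return [tf * w for tf, w in zip(tf_vec, idf)]


# Hypothetical tweets, already tokenized and stop-word filtered
docs = [
    ["flight", "delayed", "again"],
    ["great", "crew", "great", "flight"],
    ["lost", "bag", "delayed", "flight"],
]

NUM_FEATURES = 20  # same value as in the HashingTF cell
tf = [hashing_tf(d, NUM_FEATURES) for d in docs]
idf = fit_idf(tf)
features = [tf_idf(v, idf) for v in tf]

# Collisions: distinct words can land in the same bucket.
vocab = {t for d in docs for t in d}
buckets = {zlib.crc32(t.encode("utf-8")) % NUM_FEATURES for t in vocab}
print(f"{len(vocab)} distinct words -> {len(buckets)} buckets used")
```

With only 20 buckets, the number of buckets used can be smaller than the vocabulary because different words collide. A word present in every document (here `flight`) gets an IDF of 0, so its bucket contributes nothing to the features.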

In [None]:
# Encode the sentiment strings as numeric labels for the classifier
label_indexer = StringIndexer(inputCol='airline_sentiment', outputCol='airline_sentiment_label')

In [None]:
# MODEL TRAINING
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree that predicts the indexed sentiment from the TF-IDF features
dt = DecisionTreeClassifier(labelCol="airline_sentiment_label", featuresCol="features")

# Chain the transformations defined above and the classifier into one pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, label_indexer, dt])


# Split the data once, up front: 70% train, 30% test
# (pass a seed to randomSplit for a reproducible split)
(train_data, test_data) = transformed_data.randomSplit([0.7, 0.3])


# Fit every pipeline stage on the training data only
model = pipeline.fit(train_data)
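`randomSplit([0.7, 0.3])` does not cut the data at an exact 70/30 boundary. Conceptually, each row is assigned to a split by drawing a uniform random number, so split sizes are only approximately proportional to the weights (which Spark normalizes if they do not sum to 1), and without a `seed` they change between runs. A plain-Python sketch of that idea, not Spark's actual implementation (which samples per partition):

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to `weights` (conceptually like DataFrame.randomSplit)."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding near 1.0
    return splits


train, test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 700 / 300, rarely exact
```

Fixing the seed makes the split, and therefore the reported metrics, reproducible across runs.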

In [None]:
# stages[4] is the fitted StringIndexerModel for airline_sentiment
label_map = dict(enumerate(model.stages[4].labels))
print(f'label index of each sentiment: {label_map}')

label index of each sentiment: {0: 'negative', 1: 'neutral', 2: 'positive'}
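This mapping follows from `StringIndexer`'s default ordering (`stringOrderType='frequencyDesc'`): the most frequent label in the data it was fit on gets index 0, and ties are broken alphabetically. That is consistent with `negative` being the most frequent class, as the test-set label counts later in the notebook also show. A plain-Python sketch of the rule, with hypothetical sample labels:

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent first,
    ties broken alphabetically (StringIndexer's default ordering)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


labels = ["negative", "neutral", "negative", "positive", "negative", "neutral"]
mapping = fit_string_indexer(labels)
print(mapping)  # {'negative': 0.0, 'neutral': 1.0, 'positive': 2.0}
```

The same rule explains why indexing `text` earlier does not help as a feature: almost every tweet is a distinct string, so its index reflects frequency and alphabetical order rather than sentiment.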


In [None]:
# Generate the model's predictions on the test set
predictions = model.transform(test_data)

In [None]:
predictions.select("prediction").show()

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       1.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 20 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="airline_sentiment_label",
    predictionCol="prediction",
    metricName="precisionByLabel"
)

# Precision of each class on the test set
for label_index, label_name in label_map.items():
    score = evaluator.evaluate(
        predictions, {
            evaluator.metricName: "precisionByLabel",
            evaluator.metricLabel: float(label_index),
        }
    )
    print(f'precision for class {label_name} = {round(score, 3)}')

precision for class negative = 0.66
precision for class neutral = 0.343
precision for class positive = 0.0
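The exercise asks for `Accuracy` on the test set, while this cell reports per-class precision. With Spark, the same evaluator can return accuracy via `evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})`. The plain-Python sketch below only spells out what the two metrics mean, using hypothetical label and prediction lists:

```python
def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def precision_by_label(labels, predictions, label):
    """Among rows predicted as `label`, the fraction whose true label is `label`.
    Returns 0.0 if `label` is never predicted (tp + fp == 0)."""
    predicted = [y for y, p in zip(labels, predictions) if p == label]
    if not predicted:
        return 0.0
    return sum(1 for y in predicted if y == label) / len(predicted)


# Hypothetical labels/predictions (0 = negative, 1 = neutral, 2 = positive)
y_true = [0.0, 0.0, 1.0, 2.0, 0.0, 1.0]
y_pred = [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]

print(round(accuracy(y_true, y_pred), 3))  # 0.667
for k in (0.0, 1.0, 2.0):
    print(k, precision_by_label(y_true, y_pred, k))
# 0.0 0.6
# 1.0 1.0
# 2.0 0.0
```

A class that is never predicted gets a precision of 0.0 here, which matches the 0.0 reported for `positive`, a class the model never predicts according to the prediction counts in the next cell.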


In [None]:
print("True labels (ground truth)")
print(predictions.toPandas()['airline_sentiment_label'].value_counts())
print("")

print("Predicted labels")
print(predictions.toPandas()['prediction'].value_counts())
print("")

True labels (ground truth)
0.0    2768
1.0     937
2.0     679
Name: airline_sentiment_label, dtype: int64

Predicted labels
0.0    3967
1.0     417
Name: prediction, dtype: int64
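These counts show that the model predicts almost every tweet as `negative` (0.0) and never predicts `positive` (2.0). A quick plain-Python check, using only numbers printed in this notebook run, compares it with the trivial baseline that always predicts the majority class. The model's accuracy is reconstructed from the identity accuracy = sum over k of precision_k times (rows predicted as k), divided by n, so it is approximate because the precisions were rounded to three decimals.

```python
# Numbers copied from the outputs of this notebook run (test split).
true_counts = {0.0: 2768, 1.0: 937, 2.0: 679}   # ground-truth labels
pred_counts = {0.0: 3967, 1.0: 417}             # predictions (2.0 never predicted)
precision = {0.0: 0.66, 1.0: 0.343, 2.0: 0.0}   # rounded per-class precision

n = sum(true_counts.values())
assert n == sum(pred_counts.values())  # both count the same test rows

# Trivial baseline: always predict the most frequent class (negative).
baseline_acc = max(true_counts.values()) / n

# precision_k * (rows predicted as k) = correct predictions of class k,
# so summing over k and dividing by n gives accuracy (approximate here,
# because the precisions above are rounded).
approx_acc = sum(precision[k] * c for k, c in pred_counts.items()) / n

print(f"test rows: {n}")
print(f"majority-class baseline accuracy: {baseline_acc:.3f}")
print(f"model accuracy (approx.): {approx_acc:.3f}")
```

On these numbers it prints 4384 test rows, a baseline of 0.631 and a model accuracy of about 0.630, so the decision tree is essentially no better than always answering `negative`. One plausible contributor, not verified here, is the very small `numFeatures=20` in `HashingTF`, which forces most words to share buckets.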

