In [68]:
# verificar que tengan instalado la librería 'pyspark', no requerido en AWS EMR/Spark
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [69]:
# Input/output directories (relative paths) and the file names used with them.
path_in, path_out = "../datasets/", "../out/"

# Plain-text file names.
filenametxt_in, filenametxt_out = 'in.txt', 'out.txt'

# CSV file names.
filenamecsv_in, filenamecsv_out = 'in.csv', 'out.csv'

In [70]:
# Create (or reuse) the Spark session named 'nlp'; not required on AWS EMR/Spark.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [71]:
# Small in-memory toy corpus of (user_id, review) pairs used to demo the pipeline.
df = spark.createDataFrame(
    data=[
        (1, 'I really liked this movie'),
        (2, 'I would recommend this movie to my friends'),
        (3, 'movie was alright but acting was horrible'),
        (4, 'I am never watching that movie ever again'),
    ],
    schema=['user_id', 'review'],
)

In [72]:
df.show(n=5, truncate=False)  # show up to 5 rows with full (untruncated) text

+-------+------------------------------------------+
|user_id|review                                    |
+-------+------------------------------------------+
|1      |I really liked this movie                 |
|2      |I would recommend this movie to my friends|
|3      |movie was alright but acting was horrible |
|4      |I am never watching that movie ever again |
+-------+------------------------------------------+



In [73]:
# Tokenization: split each review into word tokens

In [74]:
from pyspark.ml.feature import Tokenizer

In [75]:
# Lowercase each review and split it on whitespace into the 'tokens' array column.
tokenization = Tokenizer(outputCol='tokens', inputCol='review')

In [76]:
tokenized_df = tokenization.transform(dataset=df)  # adds the 'tokens' column

In [77]:
tokenized_df.show(n=4, truncate=False)  # all 4 toy rows, full text

+-------+------------------------------------------+---------------------------------------------------+
|user_id|review                                    |tokens                                             |
+-------+------------------------------------------+---------------------------------------------------+
|1      |I really liked this movie                 |[i, really, liked, this, movie]                    |
|2      |I would recommend this movie to my friends|[i, would, recommend, this, movie, to, my, friends]|
|3      |movie was alright but acting was horrible |[movie, was, alright, but, acting, was, horrible]  |
|4      |I am never watching that movie ever again |[i, am, never, watching, that, movie, ever, again] |
+-------+------------------------------------------+---------------------------------------------------+



In [78]:
# Stop-word removal: drop very common words such as "i", "this", "was"

In [79]:
from pyspark.ml.feature import StopWordsRemover

In [80]:
# Drop stop words (e.g. "i", "this", "was") from 'tokens' into 'refined_tokens'.
# The locale is set explicitly so matching does not depend on the JVM's default
# locale: here the default ('en_EC') is unavailable, which only produced a
# fallback-to-en_US warning and made results machine-dependent.
stopword_removal = StopWordsRemover(inputCol='tokens', outputCol='refined_tokens', locale='en_US')

23/09/04 21:09:29 WARN StopWordsRemover: Default locale set was [en_EC]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


In [81]:
refined_df = stopword_removal.transform(dataset=tokenized_df)  # adds 'refined_tokens'

In [82]:
# Compare the tokens before and after stop-word removal, side by side.
refined_df.select('user_id', 'tokens', 'refined_tokens').show(n=10, truncate=False)

+-------+---------------------------------------------------+----------------------------------+
|user_id|tokens                                             |refined_tokens                    |
+-------+---------------------------------------------------+----------------------------------+
|1      |[i, really, liked, this, movie]                    |[really, liked, movie]            |
|2      |[i, would, recommend, this, movie, to, my, friends]|[recommend, movie, friends]       |
|3      |[movie, was, alright, but, acting, was, horrible]  |[movie, alright, acting, horrible]|
|4      |[i, am, never, watching, that, movie, ever, again] |[never, watching, movie, ever]    |
+-------+---------------------------------------------------+----------------------------------+



In [83]:
# Movie reviews: apply the same pipeline to a real dataset

In [84]:
# Load the movie-review corpus (columns Review, Sentiment) from the configured
# input directory instead of re-hard-coding the same path.
# To read the data from S3 (e.g. on AWS EMR), use instead:
# text_df=spark.read.csv('s3://bucket_name/datasets/movie_reviews.csv',inferSchema=True,header=True,sep=',')
text_df = spark.read.csv(path_in + 'movie_reviews.csv', inferSchema=True, header=True, sep=',')

In [85]:
# Inspect the column names and types inferred from the CSV (inferSchema=True).
text_df.printSchema()

root
 |-- Review: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [86]:
# Number of rows (reviews) loaded from the CSV.
text_df.count()

7087

In [87]:
# Data cleaning: tokenize the reviews and remove stop words

In [88]:
# Same tokenizer as the toy example, but the real dataset's column is 'Review' (capital R).
tokenization = Tokenizer(outputCol='tokens', inputCol='Review')

In [89]:
# NOTE: rebinds tokenized_df, replacing the toy-example frame with the real corpus.
tokenized_df = tokenization.transform(dataset=text_df)

In [90]:
tokenized_df.show(n=20, truncate=True)  # first 20 rows, long cells truncated

+--------------------+---------+--------------------+
|              Review|Sentiment|              tokens|
+--------------------+---------+--------------------+
|The Da Vinci Code...|        1|[the, da, vinci, ...|
|this was the firs...|        1|[this, was, the, ...|
|i liked the Da Vi...|        1|[i, liked, the, d...|
|i liked the Da Vi...|        1|[i, liked, the, d...|
|I liked the Da Vi...|        1|[i, liked, the, d...|
|that's not even a...|        1|[that's, not, eve...|
|I loved the Da Vi...|        1|[i, loved, the, d...|
|i thought da vinc...|        1|[i, thought, da, ...|
|The Da Vinci Code...|        1|[the, da, vinci, ...|
|I thought the Da ...|        1|[i, thought, the,...|
|The Da Vinci Code...|        1|[the, da, vinci, ...|
|The Da Vinci Code...|        1|[the, da, vinci, ...|
|then I turn on th...|        1|[then, i, turn, o...|
|The Da Vinci Code...|        1|[the, da, vinci, ...|
|i love da vinci c...|        1|[i, love, da, vin...|
|i loved da vinci ...|      

In [91]:
# Remove stop words from the real corpus' tokens into 'refined_tokens'.
# The locale is set explicitly so matching does not depend on the JVM's default
# locale: here the default ('en_EC') is unavailable, which only produced a
# fallback-to-en_US warning and made results machine-dependent.
stopword_removal = StopWordsRemover(inputCol='tokens', outputCol='refined_tokens', locale='en_US')

23/09/04 21:09:29 WARN StopWordsRemover: Default locale set was [en_EC]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


In [92]:
refined_text_df = stopword_removal.transform(dataset=tokenized_df)  # adds 'refined_tokens'

In [93]:
refined_text_df.show(n=20, truncate=True)  # first 20 rows, long cells truncated

+--------------------+---------+--------------------+--------------------+
|              Review|Sentiment|              tokens|      refined_tokens|
+--------------------+---------+--------------------+--------------------+
|The Da Vinci Code...|        1|[the, da, vinci, ...|[da, vinci, code,...|
|this was the firs...|        1|[this, was, the, ...|[first, clive, cu...|
|i liked the Da Vi...|        1|[i, liked, the, d...|[liked, da, vinci...|
|i liked the Da Vi...|        1|[i, liked, the, d...|[liked, da, vinci...|
|I liked the Da Vi...|        1|[i, liked, the, d...|[liked, da, vinci...|
|that's not even a...|        1|[that's, not, eve...|[even, exaggerati...|
|I loved the Da Vi...|        1|[i, loved, the, d...|[loved, da, vinci...|
|i thought da vinc...|        1|[i, thought, da, ...|[thought, da, vin...|
|The Da Vinci Code...|        1|[the, da, vinci, ...|[da, vinci, code,...|
|I thought the Da ...|        1|[i, thought, the,...|[thought, da, vin...|
|The Da Vinci Code...|   

In [94]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import rand, col

In [95]:
# Count the tokens that survive stop-word removal. The built-in `size` runs in
# the JVM, avoiding the per-row Python round-trip of a Python UDF.
from pyspark.sql.functions import size

# Kept only for any later cell that may still reference it; `token_count`
# below no longer depends on it.
len_udf = udf(lambda s: len(s), IntegerType())

# NOTE(review): for a null array `size` yields -1 (or null, depending on
# spark.sql.legacy.sizeOfNull / ANSI mode) rather than failing like len(None).
refined_text_df = refined_text_df.withColumn("token_count", size(col('refined_tokens')))


In [96]:
# Display 10 randomly ordered rows to eyeball the token counts.
# A fixed seed keeps the sample reproducible across re-runs (for the same data
# and partitioning); an unseeded rand() shows different rows every time.
refined_text_df.orderBy(rand(seed=42)).show(10)

+--------------------+---------+--------------------+--------------------+-----------+
|              Review|Sentiment|              tokens|      refined_tokens|token_count|
+--------------------+---------+--------------------+--------------------+-----------+
|The Da Vinci Code...|        1|[the, da, vinci, ...|[da, vinci, code,...|          4|
|Da Vinci Code = U...|        0|[da, vinci, code,...|[da, vinci, code,...|         15|
|I hate Harry Pott...|        0|[i, hate, harry, ...|[hate, harry, pot...|          7|
|Oh, and Brokeback...|        0|[oh,, and, brokeb...|[oh,, brokeback, ...|          5|
|Because I would l...|        1|[because, i, woul...|[like, make, frie...|          6|
|I hate Harry Pott...|        0|[i, hate, harry, ...|[hate, harry, pot...|          8|
|Ok brokeback moun...|        0|[ok, brokeback, m...|[ok, brokeback, m...|          5|
|Da Vinci Code = U...|        0|[da, vinci, code,...|[da, vinci, code,...|         15|
|gosh i miss telli...|        1|[gosh, i, m

In [97]:
# Persist the enriched reviews as CSV under path_out + filenamecsv_out
# (Spark writes a directory at that path; coalesce(1) yields a single part file).
# The CSV source cannot store array columns, so the token arrays are cast to strings.
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when the output path already exists, breaking Restart & Run All.
(
    refined_text_df
    .coalesce(1)
    .withColumn("tokens", col("tokens").cast("string"))
    .withColumn("refined_tokens", col("refined_tokens").cast("string"))
    .write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(path_out + filenamecsv_out)
)