In [None]:
# Mount Google Drive (Colab only) so files stored there are readable
# under the mount point below; the classification dataset is loaded from it.
from google.colab import drive

GDRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(GDRIVE_MOUNT_POINT)

In [None]:
#configuración en google colab
#instalar java y spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz
!tar xf spark-3.0.2-bin-hadoop3.2.tgz
!pip install findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop3.2"
import findspark
findspark.init()

In [None]:
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Option 1 (used here): build (or reuse) a local session on all available cores
# and expose its SparkContext as `sc` for RDD-level APIs.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Option 2 (alternative, not executed): create the context first, then the session.
# It would additionally need `from pyspark import SparkContext`.
#sc = SparkContext.getOrCreate()
#spark=SparkSession.builder.appName('nlp').getOrCreate()

In [None]:
# Tiny hand-made corpus of four movie reviews used by the feature-extraction demos.
sample_reviews = [
    (1, 'I really liked this movie'),
    (2, 'I would recommend this movie to my friends'),
    (3, 'movie was alright but acting was horrible'),
    (4, 'I am never watching that movie ever again'),
]
df = spark.createDataFrame(sample_reviews, ['user_id', 'review'])

In [None]:
# Display up to five rows without truncating the review text.
df.show(n=5, truncate=False)

In [None]:
# Tokenization

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
# Split each review into lowercase, whitespace-separated tokens.
tokenization = Tokenizer(outputCol='tokens', inputCol='review')

In [None]:
# Append the 'tokens' column to the demo reviews.
tokenized_df = tokenization.transform(dataset=df)

In [None]:
# Show the four demo rows with their token lists, untruncated.
tokenized_df.show(n=4, truncate=False)

In [None]:
# Stop-word removal

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Drop common stop words (Spark's default list) from the token lists.
stopword_removal = StopWordsRemover(outputCol='refined_tokens', inputCol='tokens')

In [None]:
# Append 'refined_tokens' (tokens minus stop words).
refined_df = stopword_removal.transform(dataset=tokenized_df)

In [None]:
# Compare tokens before and after stop-word removal.
refined_df.select('user_id', 'tokens', 'refined_tokens').show(n=10, truncate=False)

In [None]:
# Count Vectorizer

In [None]:
from pyspark.ml.feature import CountVectorizer

In [None]:
# Bag-of-words encoder: maps each refined token list to a sparse count vector.
count_vec = CountVectorizer(outputCol='features', inputCol='refined_tokens')

In [None]:
# Learn the vocabulary from the demo corpus, then encode each row with it.
cv_model = count_vec.fit(refined_df)
cv_df = cv_model.transform(refined_df)

In [None]:
# Show each token list next to its count vector.
cv_df.select('user_id', 'refined_tokens', 'features').show(n=4, truncate=False)

In [None]:
# Terms learned by the vectorizer; position i corresponds to feature index i.
vocab_model = count_vec.fit(refined_df)
vocab_model.vocabulary

In [None]:
# TF-IDF

In [None]:
from pyspark.ml.feature import HashingTF,IDF

In [None]:
# Hash every refined token into one of 100 buckets to obtain raw term frequencies.
# No vocabulary is learned; distinct tokens may collide in the same bucket.
NUM_HASH_BUCKETS = 100
hashing_vec = HashingTF(numFeatures=NUM_HASH_BUCKETS, inputCol='refined_tokens', outputCol='tf_features')

In [None]:
# Append the hashed term-frequency vectors as 'tf_features'.
hashing_df = hashing_vec.transform(dataset=refined_df)

In [None]:
# Show each token list next to its hashed TF vector.
hashing_df.select('user_id', 'refined_tokens', 'tf_features').show(n=4, truncate=False)

In [None]:
# Re-weight the term frequencies by inverse document frequency.
tf_idf_vec = IDF(outputCol='tf_idf_features', inputCol='tf_features')

In [None]:
# Fit the document frequencies on the demo corpus, then apply the weighting.
idf_model = tf_idf_vec.fit(hashing_df)
tf_idf_df = idf_model.transform(hashing_df)

In [None]:
# Show the final TF-IDF vectors per user.
tf_idf_df.select('user_id', 'tf_idf_features').show(n=4, truncate=False)

In [None]:
# Classification 

In [None]:
# Load the labelled movie-review CSV from the mounted Google Drive.
# The path is a plain Python string, so the space must NOT be shell-escaped:
# 'My\ Drive' keeps a literal backslash and points at a folder that does not exist.
import os

# Depending on the Colab runtime, the Drive root appears as 'MyDrive' or
# 'My Drive'; use whichever exists (falls back to the original layout).
_drive_root = next(
    (root for root in ('/content/gdrive/MyDrive', '/content/gdrive/My Drive')
     if os.path.isdir(root)),
    '/content/gdrive/My Drive',
)
movie_reviews_path = os.path.join(_drive_root, 'github/ari20202/datasets/movie_reviews.csv')
text_df = spark.read.csv(movie_reviews_path, inferSchema=True, header=True, sep=',')

In [None]:
# Inspect the column names and the types inferred from the CSV (inferSchema=True).
text_df.printSchema()

In [None]:
# Number of rows read from the CSV, before any filtering.
text_df.count()

In [None]:
from pyspark.sql.functions import rand 

In [None]:
# Peek at 10 randomly ordered rows with the full review text.
shuffled_text_df = text_df.orderBy(rand())
shuffled_text_df.show(n=10, truncate=False)

In [None]:
# Keep only rows whose Sentiment is exactly '1' or '0'; anything else is dropped.
is_binary_sentiment = (text_df.Sentiment == '1') | (text_df.Sentiment == '0')
text_df = text_df.where(is_binary_sentiment)

In [None]:
# Row count after keeping only Sentiment values '0' and '1'.
text_df.count()

In [None]:
# Class balance of the raw Sentiment column.
sentiment_counts = text_df.groupBy('Sentiment').count()
sentiment_counts.show()

In [None]:
# Schema again, before Sentiment is replaced by a numeric Label column.
text_df.printSchema()

In [None]:
# Replace the Sentiment column with a float 'Label' column for the classifier.
# NOTE: not re-runnable as-is; a second run fails because Sentiment was dropped.
label_column = text_df['Sentiment'].cast('float')
text_df = text_df.withColumn('Label', label_column).drop('Sentiment')

In [None]:
# Random sample of rows after the label cast, untruncated.
shuffled_text_df = text_df.orderBy(rand())
shuffled_text_df.show(n=10, truncate=False)

In [None]:
# Class balance after the cast. Refer to the column by the exact name it was
# created with ('Label') instead of relying on Spark's case-insensitive resolution.
text_df.groupBy('Label').count().show()

In [None]:
# Add length to the dataframe
from pyspark.sql.functions import length

In [None]:
# Character length of each review, stored as 'length'.
text_df = text_df.withColumn('length', length(text_df.Review))

In [None]:
# Random sample of rows including the new length column.
shuffled_text_df = text_df.orderBy(rand())
shuffled_text_df.show(n=10, truncate=False)

In [None]:
# Mean review length per class. Use the column name exactly as created
# ('length') rather than relying on case-insensitive resolution of 'Length'.
text_df.groupBy('Label').agg({'length': 'mean'}).show()

In [None]:
# Data Cleaning

In [None]:
# Tokenizer for the real dataset's 'Review' column.
tokenization = Tokenizer(outputCol='tokens', inputCol='Review')

In [None]:
# Append 'tokens' to every review.
tokenized_df = tokenization.transform(dataset=text_df)

In [None]:
# First rows with their tokens (Spark defaults spelled out: 20 rows, truncated).
tokenized_df.show(n=20, truncate=True)

In [None]:
# Stop-word remover for the review tokens.
stopword_removal = StopWordsRemover(outputCol='refined_tokens', inputCol='tokens')

In [None]:
# Append 'refined_tokens' (review tokens minus stop words).
refined_text_df = stopword_removal.transform(dataset=tokenized_df)

In [None]:
# First rows after stop-word removal (Spark defaults: 20 rows, truncated).
refined_text_df.show(n=20, truncate=True)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

In [None]:
# Number of tokens left after stop-word removal, per review.
# Spark's built-in `size` runs inside the JVM (no Python UDF serialization
# round-trip) and returns an integer column like the former IntegerType UDF.
# It also does not raise on a null array, as `len(None)` in a UDF would.
from pyspark.sql.functions import size

refined_text_df = refined_text_df.withColumn('token_count', size('refined_tokens'))


In [None]:
# Random sample of 10 rows including token_count.
shuffled_refined_df = refined_text_df.orderBy(rand())
shuffled_refined_df.show(n=10)

In [None]:
# Bag-of-words encoder for the review corpus.
count_vec = CountVectorizer(outputCol='features', inputCol='refined_tokens')

In [None]:
# Learn the vocabulary and encode every review as a sparse count vector.
# NOTE(review): the vocabulary is fitted on ALL rows before the train/test split
# further down, so test-set words enter the feature space; fitting on the
# training split only would give a cleaner evaluation.
cv_text_model = count_vec.fit(refined_text_df)
cv_text_df = cv_text_model.transform(refined_text_df)

In [None]:
# Inspect tokens, token count, count vector and label side by side.
cv_text_df.select('refined_tokens', 'token_count', 'features', 'Label').show(n=10)

In [None]:
# Keep only the columns the model needs: term counts, token count and label.
model_text_df = cv_text_df.select('features', 'token_count', 'Label')

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Concatenate the count vector and the token count into a single 'features_vec'.
# NOTE: not re-runnable as-is; 'features_vec' would already exist on a second run.
assembler_inputs = ['features', 'token_count']
df_assembler = VectorAssembler(outputCol='features_vec', inputCols=assembler_inputs)
model_text_df = df_assembler.transform(model_text_df)

In [None]:
# Confirm 'features_vec' now sits alongside the selected input columns.
model_text_df.printSchema()

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Split the data 75/25 into training and test sets. A fixed seed makes the
# split, and therefore every metric computed below, reproducible across runs.
SPLIT_SEED = 42
training_df, test_df = model_text_df.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

In [None]:
# Class balance in the training split.
training_df.groupby('Label').count().show()

In [None]:
# Class balance in the test split.
test_df.groupby('Label').count().show()

In [None]:
# Train a logistic-regression classifier on the assembled feature vectors.
lr_estimator = LogisticRegression(labelCol='Label', featuresCol='features_vec')
log_reg = lr_estimator.fit(training_df)

In [None]:
# Score the held-out split; the summary's predictions hold label, probability
# and prediction for every test row.
test_summary = log_reg.evaluate(test_df)
results = test_summary.predictions

In [None]:
# True label vs. predicted class probabilities and predicted label.
results.select('Label', 'probability', 'prediction').show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the test predictions: a threshold-independent
# complement to the precision/recall/accuracy computed below.
auc_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Label',
    metricName='areaUnderROC',
)
auc_evaluator.evaluate(results)


In [None]:
# Confusion matrix on the test predictions, computed in ONE Spark job.
# Grouping by (Label, prediction) replaces four separate filtered counts, each of
# which re-executed the whole (uncached) pipeline that produced `results`.
confusion_counts = {
    (row['Label'], row['prediction']): row['count']
    for row in results.groupBy('Label', 'prediction').count().collect()
}
true_positives = confusion_counts.get((1.0, 1.0), 0)
true_negatives = confusion_counts.get((0.0, 0.0), 0)
false_positives = confusion_counts.get((0.0, 1.0), 0)
false_negatives = confusion_counts.get((1.0, 0.0), 0)
# Keep the original (misspelled) name so the metric cells below keep working.
true_postives = true_positives

In [None]:
# Recall = TP / (TP + FN): share of actual positives the model recovered.
# Undefined when the test split has no positive labels; reported as 0.0 then
# instead of raising ZeroDivisionError.
actual_positives = true_postives + false_negatives
recall = true_postives / actual_positives if actual_positives else 0.0
print(recall)

In [None]:
# Precision = TP / (TP + FP): share of positive predictions that were correct.
# Undefined when the model predicts no positives; reported as 0.0 then
# instead of raising ZeroDivisionError.
predicted_positives = true_postives + false_positives
precision = true_postives / predicted_positives if predicted_positives else 0.0
print(precision)

In [None]:
# Accuracy = (TP + TN) / number of test rows. Python 3 `/` is already float
# division; guard against an empty test split instead of raising ZeroDivisionError.
n_test_rows = results.count()
accuracy = (true_postives + true_negatives) / n_test_rows if n_test_rows else 0.0
print(accuracy)