In [1]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [3]:
from pyspark.sql import SparkSession

#forma 1 de crear la sesión y el contexto Spark:
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

#forma 2 de crear la sesión y el contexto Spark:
#sc = SparkContext.getOrCreate()
#spark=SparkSession.builder.appName('nlp').getOrCreate()

In [None]:
#myrdd = sc.wholeTextFiles('../datasets/papers_sample_pdf/*.txt')
#df = myrdd.toDF(schema=['filename','content'])
#df.show(5)

In [4]:
df=spark.createDataFrame([(1,'I really liked this movie'),
                         (2,'I would recommend this movie to my friends'),
                         (3,'movie was alright but acting was horrible'),
                         (4,'I am never watching that movie ever again')],
                        ['user_id','content'])

In [5]:
df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- content: string (nullable = true)



In [6]:
# Tokenization
from pyspark.ml.feature import Tokenizer
tokenization=Tokenizer(inputCol='content',outputCol='tokens')
tokenized_df=tokenization.transform(df)
tokenized_df.printSchema()
tokenized_df.show(5)


root
 |-- user_id: long (nullable = true)
 |-- content: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-------+--------------------+--------------------+
|user_id|             content|              tokens|
+-------+--------------------+--------------------+
|      1|I really liked th...|[i, really, liked...|
|      2|I would recommend...|[i, would, recomm...|
|      3|movie was alright...|[movie, was, alri...|
|      4|I am never watchi...|[i, am, never, wa...|
+-------+--------------------+--------------------+



In [7]:
# stopwords removal 
from pyspark.ml.feature import StopWordsRemover
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df=stopword_removal.transform(tokenized_df)
refined_df.select(['tokens','refined_tokens']).show(10,False)

+---------------------------------------------------+----------------------------------+
|tokens                                             |refined_tokens                    |
+---------------------------------------------------+----------------------------------+
|[i, really, liked, this, movie]                    |[really, liked, movie]            |
|[i, would, recommend, this, movie, to, my, friends]|[recommend, movie, friends]       |
|[movie, was, alright, but, acting, was, horrible]  |[movie, alright, acting, horrible]|
|[i, am, never, watching, that, movie, ever, again] |[never, watching, movie, ever]    |
+---------------------------------------------------+----------------------------------+



In [8]:
refined_df.columns

['user_id', 'content', 'tokens', 'refined_tokens']

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

In [10]:
len_udf = udf(lambda s: len(s), IntegerType())

refined_count_df = refined_df.withColumn("token_count", len_udf(col('refined_tokens')))


In [11]:
refined_count_df.orderBy(rand()).show(10)

+-------+--------------------+--------------------+--------------------+-----------+
|user_id|             content|              tokens|      refined_tokens|token_count|
+-------+--------------------+--------------------+--------------------+-----------+
|      2|I would recommend...|[i, would, recomm...|[recommend, movie...|          3|
|      3|movie was alright...|[movie, was, alri...|[movie, alright, ...|          4|
|      4|I am never watchi...|[i, am, never, wa...|[never, watching,...|          4|
|      1|I really liked th...|[i, really, liked...|[really, liked, m...|          3|
+-------+--------------------+--------------------+--------------------+-----------+



In [12]:
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer
count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_df=count_vec.fit(refined_df).transform(refined_df)
cv_df.select(['refined_tokens','features']).show(4,False)
bow = count_vec.fit(refined_df).vocabulary
print(bow)


+----------------------------------+---------------------------------+
|refined_tokens                    |features                         |
+----------------------------------+---------------------------------+
|[really, liked, movie]            |(11,[0,2,3],[1.0,1.0,1.0])       |
|[recommend, movie, friends]       |(11,[0,6,7],[1.0,1.0,1.0])       |
|[movie, alright, acting, horrible]|(11,[0,1,5,10],[1.0,1.0,1.0,1.0])|
|[never, watching, movie, ever]    |(11,[0,4,8,9],[1.0,1.0,1.0,1.0]) |
+----------------------------------+---------------------------------+

['movie', 'recommend', 'ever', 'never', 'really', 'acting', 'horrible', 'liked', 'watching', 'alright', 'friends']


In [13]:
# TF with HashingTF
from pyspark.ml.feature import HashingTF
# podria utilizar numFeatures como el tamaño del Bag of Words:
l = len(bow)
hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features',numFeatures=l)
#hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features',numFeatures=11)
# compare la salida e interprete con y sin numFeatures:
#hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features')

hashing_df=hashing_vec.transform(refined_df)
hashing_df.show(4)



+-------+--------------------+--------------------+--------------------+--------------------+
|user_id|             content|              tokens|      refined_tokens|         tf_features|
+-------+--------------------+--------------------+--------------------+--------------------+
|      1|I really liked th...|[i, really, liked...|[really, liked, m...|(11,[9,10],[2.0,1...|
|      2|I would recommend...|[i, would, recomm...|[recommend, movie...|(11,[1,6,9],[1.0,...|
|      3|movie was alright...|[movie, was, alri...|[movie, alright, ...|(11,[1,6,9,10],[1...|
|      4|I am never watchi...|[i, am, never, wa...|[never, watching,...|(11,[0,7,8,9],[1....|
+-------+--------------------+--------------------+--------------------+--------------------+



In [14]:
from pyspark.ml.feature import IDF
tf_idf_vec=IDF(inputCol='tf_features',outputCol='tf_idf_features')
tf_idf_df=tf_idf_vec.fit(hashing_df).transform(hashing_df)
tf_idf_df.show(4,False)

+-------+------------------------------------------+---------------------------------------------------+----------------------------------+---------------------------------+------------------------------------------------------------------------------+
|user_id|content                                   |tokens                                             |refined_tokens                    |tf_features                      |tf_idf_features                                                               |
+-------+------------------------------------------+---------------------------------------------------+----------------------------------+---------------------------------+------------------------------------------------------------------------------+
|1      |I really liked this movie                 |[i, really, liked, this, movie]                    |[really, liked, movie]            |(11,[9,10],[2.0,1.0])            |(11,[9,10],[0.0,0.5108256237659907])                                