<a href="https://colab.research.google.com/github/MarinaEstefania/data-engineering-bootcamp/blob/main/Notebooks/movieDF_bootcamp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
%%capture
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install Tokenizer

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

import findspark
findspark.init()
findspark.find()

from pyspark.sql import DataFrame, SparkSession 
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover

spark = SparkSession \
       .builder \
       .appName("Our First Spark example") \
       .getOrCreate()

spark

In [17]:
path = "movie_review.csv"
df = (spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("sample_data/movie_review.csv"))

In [18]:
df.show(5)

+-----+--------------------+---------+
|  cid|          review_str|id_review|
+-----+--------------------+---------+
|13756|Once again Mr. Co...|        1|
|15738|This is an exampl...|        2|
|15727|First of all I ha...|        3|
|17954|Not even the Beat...|        4|
|16579|Brass pictures (m...|        5|
+-----+--------------------+---------+
only showing top 5 rows



In [27]:
#For the movie_review.csv file:
#a. Work with the cid and review_str columns to get a list of words used by users.
#Note: You can implement the pyspark.ml.feature.Tokenizer class to create a list of words named review_token.
#sentenceDataFrame = dfMovie
tokenizedDF = Tokenizer(inputCol="review_str", outputCol="review_token") 
wordsListDF = tokenizedDF.transform(df)
wordsListDF.head()

Row(cid=13756, review_str="Once again Mr. Costner has dragged out a movie for far longer than necessary. Aside from the terrific sea rescue sequences, of which there are very few I just did not care about any of the characters. Most of us have ghosts in the closet, and Costner's character are realized early on, and then forgotten until much later, by which time I did not care. The character we should really care about is a very cocky, overconfident Ashton Kutcher. The problem is he comes off as kid who thinks he's better than anyone else around him and shows no signs of a cluttered closet. His only obstacle appears to be winning over Costner. Finally when we are well past the half way point of this stinker, Costner tells us all about Kutcher's ghosts. We are told why Kutcher is driven to be the best with no prior inkling or foreshadowing. No magic here, it was all I could do to keep from turning it off an hour in.", id_review=1, review_token=['once', 'again', 'mr.', 'costner', 'has', '

In [28]:
#Remove stop words if needed with pyspark.ml.feature.StopWordsRemover.
listWOStop = StopWordsRemover(inputCol='review_token', outputCol='wo_stop_words')
listWOStopRemoved = listWOStop.transform(wordsListDF)
listWOStopRemoved.show(5)

+-----+--------------------+---------+--------------------+--------------------+
|  cid|          review_str|id_review|        review_token|       wo_stop_words|
+-----+--------------------+---------+--------------------+--------------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|
+-----+--------------------+---------+--------------------+--------------------+
only showing top 5 rows



In [21]:
#Look for data that contain the word  “good”, consider the review as positive, and name it as positive_review.
booleanDF = listWOStopRemoved.withColumn('positive_review', array_contains(col("wo_stop_words"),"good"))
booleanDF.show(5)

+-----+--------------------+---------+--------------------+--------------------+---------------+
|  cid|          review_str|id_review|        review_token|       wo_stop_words|positive_review|
+-----+--------------------+---------+--------------------+--------------------+---------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|          false|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|          false|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|          false|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|          false|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|           true|
+-----+--------------------+---------+--------------------+--------------------+---------------+
only showing top 5 rows



In [22]:
#Add a timestamp when the job is running as the insert_date column
dfTimestamp= booleanDF.withColumn('insert_date', lit(current_timestamp()))
dfTimestamp.show(5)

+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+
|  cid|          review_str|id_review|        review_token|       wo_stop_words|positive_review|         insert_date|
+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|          false|2022-07-24 23:16:...|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|          false|2022-07-24 23:16:...|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|          false|2022-07-24 23:16:...|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|          false|2022-07-24 23:16:...|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|           true|2022-07-24 23:16:...|
+-----+--------------------+---------+------------------

In [23]:
#Use the following logic to convert a boolean column to an integer: 
#reviews.positive_review = CASE
 #  		WHEN positive_review IS True THEN 1
 #  		ELSE 0
 # 		  END
isPositiveDF = dfTimestamp.withColumn('is_possitive', when(dfTimestamp.positive_review=='true', 1).when(dfTimestamp.positive_review=='false',0))
isPositiveDF.show(5)

+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+------------+
|  cid|          review_str|id_review|        review_token|       wo_stop_words|positive_review|         insert_date|is_possitive|
+-----+--------------------+---------+--------------------+--------------------+---------------+--------------------+------------+
|13756|Once again Mr. Co...|        1|[once, again, mr....|[mr., costner, dr...|          false|2022-07-24 23:16:...|           0|
|15738|This is an exampl...|        2|[this, is, an, ex...|[example, majorit...|          false|2022-07-24 23:16:...|           0|
|15727|First of all I ha...|        3|[first, of, all, ...|[first, hate, mor...|          false|2022-07-24 23:16:...|           0|
|17954|Not even the Beat...|        4|[not, even, the, ...|[even, beatles, w...|          false|2022-07-24 23:16:...|           0|
|16579|Brass pictures (m...|        5|[brass, pictures,...|[brass, pictures,...|   

In [26]:
#Store your results into a new file in the STAGE area (user_id(cid), positive_review(is_possitive) and review_id).
dropColumnsDF = isPositiveDF.drop('review_str').drop('review_token').drop('wo_stop_words').drop('insert_date').drop('positive_review')
classified_movie_review = dropColumnsDF.withColumnRenamed('cid', 'customer_id').withColumnRenamed('is_positive','positive_review').withColumnRenamed('id_review', 'review_id')
classified_movie_review.show(5)

+-----------+---------+------------+
|customer_id|review_id|is_possitive|
+-----------+---------+------------+
|      13756|        1|           0|
|      15738|        2|           0|
|      15727|        3|           0|
|      17954|        4|           0|
|      16579|        5|           1|
|      14841|        6|           0|
|      18085|        7|           0|
|      16365|        8|           1|
|      17912|        9|           0|
|      15100|       10|           0|
|      16781|       11|           0|
|      16656|       12|           1|
|      14390|       13|           1|
|      17975|       14|           0|
|      17616|       15|           0|
|      14589|       16|           0|
|      17629|       17|           0|
|      13089|       18|           0|
|      16752|       19|           1|
|      13579|       20|           0|
+-----------+---------+------------+
only showing top 20 rows



In [None]:
#save dataframe as CSV file
classified_movie_review.write.option("header","true").parquet("s3://manual-bucket-megc/stage-data/classified_movie_review.parquet")
