## spark project.  Read movie_review.csv from Raw Layer.

## look for data that contain the word  “good”, considering the review as positive, and name it as positive_review.

### Paul Ricardo Félix Trujillo

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
spark

In [7]:
dfs = spark.read.option("header","true").csv("movie_review.csv")

In [8]:
dfs.show(5)

+-----+--------------------+
|  cid|          review_str|
+-----+--------------------+
|13756|Once again Mr. Co...|
|15738|This is an exampl...|
|15727|First of all I ha...|
|17954|Not even the Beat...|
|16579|Brass pictures (m...|
+-----+--------------------+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f

### Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality.

In [11]:
tokenizer = Tokenizer(inputCol="review_str", outputCol="review_token")
tokenized = tokenizer.transform(dfs).select('cid','review_token')

print('Tokenized data extract:')
tokenized.show(10)

Tokenized data extract:
+-----+--------------------+
|  cid|        review_token|
+-----+--------------------+
|13756|[once, again, mr....|
|15738|[this, is, an, ex...|
|15727|[first, of, all, ...|
|17954|[not, even, the, ...|
|16579|[brass, pictures,...|
|14841|[a, funny, thing,...|
|18085|[this, german, ho...|
|16365|[being, a, long-t...|
|17912|[tokyo, eyes, tel...|
|15100|[wealthy, horse, ...|
+-----+--------------------+
only showing top 10 rows



### Stop words are words which should be excluded from the input, typically because the words appear frequently and don’t carry as much meaning.

In [12]:
remover = StopWordsRemover(inputCol='review_token', outputCol='token_clean')
data_clean = remover.transform(tokenized).select('cid', 'token_clean')

print('Data Cleaning extract:')
data_clean.show(10)

Data Cleaning extract:
+-----+--------------------+
|  cid|         token_clean|
+-----+--------------------+
|13756|[mr., costner, dr...|
|15738|[example, majorit...|
|15727|[first, hate, mor...|
|17954|[even, beatles, w...|
|16579|[brass, pictures,...|
|14841|[funny, thing, ha...|
|18085|[german, horror, ...|
|16365|[long-time, fan, ...|
|17912|[tokyo, eyes, tel...|
|15100|[wealthy, horse, ...|
+-----+--------------------+
only showing top 10 rows



### Spark array_contains() is an SQL Array function that is used to check if an element value is present in an array type(ArrayType) column on DataFrame. You can use array_contains() function either to derive a new boolean column or filter the DataFrame.

In [17]:
df2=data_clean.withColumn("positive_review",f.array_contains(f.col("token_clean"),"good"))
df2.show(20)

+-----+--------------------+---------------+
|  cid|         token_clean|positive_review|
+-----+--------------------+---------------+
|13756|[mr., costner, dr...|          false|
|15738|[example, majorit...|          false|
|15727|[first, hate, mor...|          false|
|17954|[even, beatles, w...|          false|
|16579|[brass, pictures,...|           true|
|14841|[funny, thing, ha...|          false|
|18085|[german, horror, ...|          false|
|16365|[long-time, fan, ...|           true|
|17912|[tokyo, eyes, tel...|          false|
|15100|[wealthy, horse, ...|          false|
|16781|[cage, plays, dru...|          false|
|16656|[first, all,, lik...|           true|
|14390|[tell, -, serious...|           true|
|17975|[big, disappointm...|          false|
|17616|[film, absolutely...|          false|
|14589|[decidedly, avera...|          false|
|17629|[bottom, end, apo...|          false|
|13089|[earth, destroyed...|          false|
|16752|[many, people, st...|           true|
|13579|[ne

In [None]:
sc.stop()