In [0]:
%sh
pip install textblob

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-834aeedc-770e-4601-b732-922fe6675585/bin/python -m pip install --upgrade pip' command.


In [0]:
import pyspark
from pyspark.sql import SparkSession
# Build the Spark session (3 local worker threads). If a session is already
# active -- as on Databricks -- getOrCreate() returns that one, and the master
# URL requested here does not change how the existing session runs.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName('MyApplicationName')
    .getOrCreate()
)

# Display the session summary as the cell's output.
spark

In [0]:
import urllib.request
import boto3


In [0]:
import os
# AWS credentials for the S3A connector.
# Keys are read from the environment rather than typed into the notebook, so
# they never end up in the saved notebook or version control. Export
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY on the cluster (or rely on another
# credential source such as an instance profile) before running.
# The environment is only read, never written: assigning '' to these variables
# would wipe credentials the cluster had already exported.
access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
# URL-encoded secret ('/' -> '%2F'); presumably meant for embedding the key in
# an s3a:// URI. Not used elsewhere in this notebook -- kept for compatibility.
enconded_secret_key = secret_key.replace("/", "%2F")
aws_region = "us-east-2"

# Go through the session built above: `sc` is only predefined on some
# platforms (e.g. Databricks) and is never created in this notebook.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
if access_key and secret_key:
    # Only set explicit keys when both are available, so empty strings are
    # never written into the Hadoop configuration.
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Location of the cleaned reviews dataset. Despite its name, `bucket_name`
# holds "<bucket>/<prefix>/", not just the bucket.
bucket_name = 'my-bigdata-project-sw/raw/'
# Point at the Parquet dataset directory itself; Spark reads the part files
# inside it, so no "/*" glob is needed.
file_path = 's3://' + bucket_name + 'cleaned_spark_data_frame.parquet'

# Explicit read schema: fixes the column set and types returned. The declared
# types must match what is stored in the Parquet files.
# NOTE: star_rating is declared as a string, so it has to be encoded (e.g.
# StringIndexer) before any numeric-only ML stage such as OneHotEncoder.
schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_parent", IntegerType(), True),
    StructField("product_title", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("star_rating", StringType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body", StringType(), True),
    StructField("review_date", DateType(), True),
    StructField("clean_review_headline", StringType(), True),
    StructField("clean_review_body", StringType(), True)
])

sdf = spark.read.schema(schema).parquet(file_path)

In [0]:
# Preview the first 20 rows; string values longer than 20 characters are truncated.
sdf.show(n=20, truncate=True)

+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+--------------------+--------------------+-----------+---------------------+--------------------+
|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|verified_purchase|     review_headline|         review_body|review_date|clean_review_headline|   clean_review_body|
+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----------------+--------------------+--------------------+-----------+---------------------+--------------------+
|0061091669|     998895209|Marilyn Monroe: T...|           Books|          5|            4|          5|                N|You MUST have thi...|Marilyn's fan or ...| 1996-12-09| You MUST have thi...|Marilyn's fan or ...|
|1559723505|      61059309|The Highly Sensit...|           Books|          5|           12|         14|                N|Hig

In [0]:
# Imported here (not only in a later cell) so this cell runs on a fresh kernel.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from textblob import TextBlob


def sentiment_analysis(some_text):
    """Return the TextBlob sentiment polarity of ``some_text``.

    Per the TextBlob documentation, polarity is a float in [-1.0, 1.0]
    (negative to positive).

    Args:
        some_text: the text to score. ``None`` (a SQL NULL coming from Spark)
            is accepted.

    Returns:
        The polarity as a float, or ``None`` when ``some_text`` is ``None``.
        TextBlob itself raises TypeError on non-string input, which would
        otherwise fail the whole Spark task.
    """
    if some_text is None:
        return None
    return TextBlob(some_text).sentiment.polarity


# Wrap as a Spark SQL UDF for column-wise use; a None result becomes NULL.
sentiment_analysis_udf = udf(sentiment_analysis, DoubleType())

In [0]:
import pandas as pd
import numpy as np
import matplotlib as mpl
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from textblob import TextBlob

# Number of hash buckets HashingTF uses for each text column.
NUM_HASH_FEATURES = 4096


def _add_tfidf_features(df, text_col, prefix, num_features=NUM_HASH_FEATURES):
    """Return ``df`` with TF-IDF feature columns derived from ``text_col``.

    Appends, in this order:
      ``{prefix}_words``          -- regex word tokens,
      ``{prefix}_filtered_words`` -- tokens with stop words removed,
      ``{prefix}_hash``           -- hashed term frequencies (``num_features`` buckets),
      ``{prefix}_features``       -- IDF-weighted version of the hash vector.

    The IDF weights are fitted on ``df`` itself, which runs a Spark job.
    ``df`` is not modified; a new DataFrame is returned.
    """
    words_col = f"{prefix}_words"
    filtered_col = f"{prefix}_filtered_words"
    hash_col = f"{prefix}_hash"
    stages = [
        RegexTokenizer(inputCol=text_col, outputCol=words_col, pattern='\\w+', gaps=False),
        StopWordsRemover(inputCol=words_col, outputCol=filtered_col),
        HashingTF(numFeatures=num_features, inputCol=filtered_col, outputCol=hash_col),
        IDF(inputCol=hash_col, outputCol=f"{prefix}_features", minDocFreq=1),
    ]
    return Pipeline(stages=stages).fit(df).transform(df)


# New frames are derived from `sdf` instead of reassigning it, so re-running
# this cell does not fail with "Output column ... already exists".
# NOTE(review): RegexTokenizer fails on NULL text; this assumes the upstream
# cleaning step left no NULLs in clean_review_body / clean_review_headline -- confirm.
review_tfidf_sdf = _add_tfidf_features(sdf, "clean_review_body", "clean_review")
text_tfidf_sdf = _add_tfidf_features(review_tfidf_sdf, "clean_review_headline", "clean_headline")

# Apply the sentiment UDF (Python UDF, evaluated row by row on the executors).
scored_sdf = text_tfidf_sdf.withColumn(
    "sentiment_score_review",
    sentiment_analysis_udf(text_tfidf_sdf["clean_review_body"]),
)

# Indexing and encoding. OneHotEncoder only accepts numeric input columns, and
# star_rating is read as a string, so it is indexed along with the other
# categorical columns before encoding.
indexers = StringIndexer(
    inputCols=["product_id", "product_category", "star_rating", "verified_purchase"],
    outputCols=["product_id_i", "product_category_i", "star_rating_i", "verified_purchase_i"],
)
# NOTE(review): product_id_v is computed but not assembled into `features`;
# one-hot encoding a high-cardinality id may be wasted work -- confirm it is needed.
encoder = OneHotEncoder(
    inputCols=["product_id_i", "product_category_i", "star_rating_i", "verified_purchase_i"],
    outputCols=["product_id_v", "product_category_v", "star_rating_v", "verified_purchase_v"],
    dropLast=False,
)
# Each TF-IDF vector is assembled once (review body + headline).
# NOTE(review): if star_rating is the prediction target downstream, including
# star_rating_v in `features` leaks the label -- confirm.
assembler = VectorAssembler(
    inputCols=[
        "star_rating_v",
        "product_category_v",
        "verified_purchase_v",
        "clean_review_features",
        "clean_headline_features",
        "sentiment_score_review",
    ],
    outputCol="features",
)

pipeline = Pipeline(stages=[indexers, encoder, assembler])

# Fit and transform the data.
final_sdf = pipeline.fit(scored_sdf).transform(scored_sdf)


In [0]:
# WRITE TO THE "TRUSTED" FOLDER IN AWS S3
output_bucket_name = 'my-bigdata-project-sw/trusted/'
output_file_name = "featured_engineered_sdf"

output_file_path = 's3://' + output_bucket_name + output_file_name
# mode("overwrite") keeps the notebook re-runnable: the default save mode
# errors once the output path exists. Note that it replaces any data already
# stored at output_file_path.
final_sdf.write.mode("overwrite").parquet(output_file_path)