In [None]:
from Utils.connect import *
# Create the `spark` handle that the later cells use (spark.read, spark.catalog).
# NOTE(review): `Start` is pulled in by the star import from Utils.connect;
# presumably .config() returns a SparkSession — confirm against that module.
spark = Start().config()

In [None]:
# Stopping the session this early makes every later cell that touches `spark`
# fail when the notebook is executed top to bottom (Restart & Run All).
# The call is kept as an explicit opt-in: set the flag to True and run this
# cell by hand when the session really has to be torn down.
STOP_SPARK_SESSION = False
if STOP_SPARK_SESSION:
    spark.stop()

# Feature engineering

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

# Load the training reviews from JSON and mark the DataFrame for caching in
# one chained expression; the later cells all work from `df`.
df = spark.read.json("/datasets/amazon/train.json").cache()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
from pyspark.ml.feature import *

# Placeholder list for pipeline stages; nothing is appended to it in this cell.
stages = []

# Free-text columns of the review records.
categoricalColumns = ["reviewerName", "summary", "reviewText"]

# Candidate text featurizers over the review body. They are only declared
# here; none of them is fitted or applied in this cell.
tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokens")
hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures")
idf = IDF(minDocFreq=5, inputCol="rawFeatures", outputCol="TfIdfFeatures")
word2Vec = Word2Vec(inputCol="tokens", outputCol="Word2VecFeatures")
countVec = CountVectorizer(inputCol="tokens", outputCol="CountVectFeatures")


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
from pyspark.ml.feature import *

# Replace each free-text column with a fixed-width binary hashed
# bag-of-words vector named "<column>_vector".
categoricalColumns = ["reviewerName", "summary", "reviewText"]
for col in categoricalColumns:
    tokens_col = f"{col}tokens"

    # Split the raw string column into an array-of-words column.
    tokenizer = Tokenizer(inputCol=col, outputCol=tokens_col)
    df = tokenizer.transform(df)

    # HashingTF operates on an array of terms, so it has to read the tokens
    # column produced above rather than the raw string column.
    hasher = HashingTF(numFeatures=100, binary=True, inputCol=tokens_col, outputCol=f"{col}_vector")
    df = hasher.transform(df)

    # Only the hashed vector is kept; the raw text and its tokens are dropped.
    # NOTE: this rebinds `df`, so the cell cannot be re-run without reloading
    # the data first (the source columns are gone after one pass).
    df = df.drop(col, tokens_col)

In [None]:
# Show the column names of df after the text-hashing loop.
df.columns

In [None]:
# Same column listing as the previous cell (duplicate inspection cell).
df.columns

In [None]:
# Assemble the model inputs into one FEATURES vector column.
# The hashing loop drops the "<column>tokens" columns and keeps only
# "<column>_vector", so the assembler must reference the *_vector columns.
# NOTE(review): VectorAssembler only accepts numeric, boolean and vector
# inputs. If 'asin' or 'vote' were inferred as strings from the JSON they
# need to be encoded/cast (or removed) first — confirm with df.printSchema().
vec = VectorAssembler(inputCols=['asin',
                                 'unixReviewTime',
                                 'verified',
                                 'vote',
                                 'reviewerName_vector',
                                 'summary_vector',
                                 'reviewText_vector'],
                      outputCol='FEATURES')

# Seeded 80/20 split so the train/test partition is repeatable.
train, test = df.randomSplit([0.8, 0.2], seed=12345)
vector_feature_train = vec.transform(train)
vector_feature_test = vec.transform(test)

# Keep only the label ('overall') and the assembled feature vector.
train = vector_feature_train.select('overall', 'FEATURES')
test = vector_feature_test.select('overall', 'FEATURES')

In [None]:
# Echo the training DataFrame as the cell output.
train

In [None]:
# Gradient-boosted tree regressor: predicts the 'overall' label from the
# assembled 'FEATURES' vector column.
gbt = GBTRegressor(
    labelCol="overall",
    featuresCol="FEATURES",
)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the 'overall' label and the model's 'prediction' column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="overall",
    predictionCol="prediction",
)

In [None]:
# Fit the GBT regressor on the training split.
gbt_model = gbt.fit(train)
# Score the held-out split with the fitted model.
pred = gbt_model.transform(test)
# Evaluate the predictions with `evaluator` (configured for RMSE) and print the score.
rmse = evaluator.evaluate(pred)
print(rmse)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Hyper-parameter grid for the GBT regressor: 3 values of maxIter x 3 values
# of maxDepth = 9 parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [10, 20, 30])
    .addGrid(gbt.maxDepth, [3, 4, 5])
    .build()
)

# 3-fold cross-validation over the grid, scored with `evaluator`.
# The fold assignment is random, so it is seeded (same seed as the train/test
# split) to make the averaged metrics repeatable between runs.
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=12345)

In [None]:
# Run the cross-validated grid search on the training split.
cvModel = crossval.fit(train)
# Cell output: the averaged evaluator metric for each entry of paramGrid.
cvModel.avgMetrics

In [None]:
from model import pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Seeded 80/20 split of df for the externally defined `pipeline`.
# NOTE(review): at this point df has already been rewritten by the hashing
# loop (raw text columns dropped) — confirm `pipeline` from `model` expects
# that schema rather than the raw JSON columns.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
# Fit the imported pipeline on the training split.
model = pipeline.fit(train_data)
# Apply the fitted pipeline to the held-out split.
predictions = model.transform(test_data)

In [None]:
# RMSE of the pipeline predictions against the 'overall' label.
# NOTE(review): this evaluator reads a "pred" column, whereas the GBT cells
# use "prediction" — confirm the imported pipeline really emits "pred".
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="overall",
    predictionCol="pred",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: ", rmse)

In [None]:
# Clear the session's cache (df was cached when it was loaded).
spark.catalog.clearCache()