In [0]:
# Read the labelled training reviews from the metastore table.
reviews_query = "select * from default.reviews_train"
df = spark.sql(reviews_query)
#df = df.sample(0.01, seed = 47)
# Cached because the frame is counted here and again in the de-duplication cell.
df = df.cache()
# Report the frame's dimensions as (row count, column count).
n_rows = df.count()
n_cols = len(df.columns)
print((n_rows, n_cols))

(3138710, 11)


In [0]:
# Drop duplicates
# Two rows count as duplicates when they share the same (reviewerID, asin) pair;
# dropDuplicates keeps one row per pair.
print("Before duplication removal: ", df.count())
df_distinct = df.dropDuplicates(['reviewerID', 'asin'])
# Count the de-duplicated frame: counting `df` again would only repeat the first number.
print("After duplication removal: ", df_distinct.count())

Before duplication removal:  3138710
After duplication removal:  3138710


In [0]:
# Convert Unix timestamp to readable date

from pyspark.sql.functions import from_unixtime, to_date
from pyspark.sql.types import *

# unixReviewTime -> timestamp string -> date; the raw epoch column is dropped afterwards.
review_date = to_date(from_unixtime(df_distinct.unixReviewTime))
df_with_date = (
    df_distinct
    .withColumn("reviewTime", review_date)
    .drop("unixReviewTime")
)

In [0]:
from pyspark.sql.functions import col

# Combine review text and summary
from pyspark.sql.functions import concat, lit

# The two strings are joined back to back with no separator. concat() yields null
# when either input is null, so such rows end up with a null `review`.
review_text = concat(col("reviewText"), col("summary"))
new_df = (
    df_with_date
    .withColumn("review", review_text)
    .drop('reviewText')
    .drop('summary')
)

In [0]:
from pyspark.sql.functions import dayofweek, month

# Derive two calendar features from the review date, then discard the date itself.
review_time = col('reviewTime')
new_df = (
    new_df
    .withColumn('dayofweek', dayofweek(review_time))
    .withColumn('month', month(review_time))
    .drop('reviewTime')
)


In [0]:
from pyspark.sql.functions import col,length,trim

# Character length of the combined review text, kept as a numeric feature.
review_length = length(col("review"))
new_df = new_df.withColumn("review_len", review_length)

In [0]:
# Rows without review text or without a label are discarded.
new_df = new_df.dropna(subset=["review", "label"])
# Missing product / reviewer identifiers are replaced by the placeholder 'noinfo'.
new_df = new_df.fillna('noinfo', subset=["asin", "reviewerID"])

In [0]:
# Extract Sentiment scores
#from transformers import pipeline

#data = new_df.toPandas()
#text = list(data['review'])
#data["Sentiment_Score"] = "None"

#classifier = pipeline('sentiment-analysis')

#j = 0

#for i in text:
#    data.loc[j, "Sentiment_Score"] = classifier(i)
 #   j = j + 1

#data.head()


In [0]:
# Summarize the text

#summarizer = pipeline('summarization')

#j = 0

#for i in text:
#    data.loc[j, "review"] = summarizer(i)
 #   j = j + 1

#data.head()
#new_df=spark.createDataFrame(data) 

In [0]:
# Show the column names and types of the prepared frame before building the pipeline.
new_df.printSchema()

root
 |-- reviewID: integer (nullable = true)
 |-- overall: double (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- reviewerID: string (nullable = false)
 |-- asin: string (nullable = false)
 |-- reviewerName: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- review: string (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- review_len: integer (nullable = true)



In [0]:
## NLP pipeline
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, StringIndexer, SQLTransformer, IndexToString, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier



# Categorical identifiers: each string column is indexed, then the index is one-hot encoded.
# Both indexers are configured with handleInvalid='keep'.
asin_indexer = StringIndexer(
    inputCol='asin',
    outputCol='asinIndex',
    handleInvalid='keep',
)
asin_encoder = OneHotEncoder(
    inputCol='asinIndex',
    outputCol='asinVec',
)
id_indexer = StringIndexer(
    inputCol='reviewerID',
    outputCol='idIndex',
    handleInvalid='keep',
)
id_encoder = OneHotEncoder(
    inputCol='idIndex',
    outputCol='idVec',
)


# Wrap the raw `review` string in a Spark NLP document annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol("review")
    .setOutputCol("document")
)

# Split each document into sentences; a blank line ("\n\n") is registered as a custom bound.
sentence = (
    SentenceDetector()
    .setInputCols("document")
    .setOutputCol("sentence")
    .setCustomBounds(["\n\n"])
)

# Break the sentences into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Clean the tokens and lowercase them.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
    .setLowercase(True)
)

# Drop stopwords, matching case-insensitively.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalized")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

# Lemmatize the stopword-free tokens with the library's default pretrained lemmatizer.
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(["cleanTokens"])
    .setOutputCol("lemma")
)

# Part-of-speech tags come from the pretrained 'pos_anc' perceptron model; it reads the
# raw tokens, not the cleaned ones.
pos_tagger = (
    PerceptronModel.pretrained('pos_anc')
    .setInputCols(['document', 'token'])
    .setOutputCol('pos')
)

# Pull out n-gram chunks whose POS tag sequence matches one of the patterns below.
chunker = (
    Chunker()
    .setInputCols(['document', 'pos'])
    .setOutputCol('chunks')
    .setRegexParsers(["<NNP>+", "<NNS>+", "<JJ>+<NN>"])
)

# Flatten the lemma and chunk annotations into plain string arrays. Annotation
# columns are kept (cleanAnnotations=False) because later stages still read them.
finisher = (
    Finisher()
    .setInputCols(["lemma", "chunks"])
    .setOutputCols(["token_features", "chunk_features"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

# Per-token embeddings from the library's default pretrained Word2Vec model.
tokenEmbedder = (
    Word2VecModel.pretrained()
    .setInputCols(["token"])
    .setOutputCol("embedding")
)

# Pool the token embeddings of each document into one vector by averaging.
embeddingsSentence = (
    SentenceEmbeddings()
    .setInputCols(["document", "embedding"])
    .setOutputCol("sentence_embeddings")
    .setPoolingStrategy("AVERAGE")
)


# Term-frequency vectors for lemmas and for chunks; both vectorizers share the same limits.
count_vectorizer_opts = dict(vocabSize=10000, minTF=1, minDF=50, maxDF=0.40)
tf = CountVectorizer(inputCol="token_features", outputCol="rawFeatures", **count_vectorizer_opts)
tf_chunk = CountVectorizer(inputCol="chunk_features", outputCol="rawFeaturesChunks", **count_vectorizer_opts)
#tf_ner = CountVectorizer(inputCol="ner_features", outputCol="rawFeaturesNER", vocabSize=10000, minTF=1, minDF=50, maxDF=0.40)

# Inverse-document-frequency weighting applied on top of each term-frequency vector.
idf = IDF(inputCol="rawFeatures", outputCol="idfFeatures", minDocFreq=5)
idf_chunk = IDF(inputCol="rawFeaturesChunks", outputCol="idfFeaturesChunks", minDocFreq=5)
#idf_ner = IDF(inputCol="rawFeaturesNER", outputCol="idfFeaturesNER", minDocFreq=5)

# Concatenate the scalar columns, the one-hot vectors and the TF-IDF vectors into "features".
assembler_inputs = [
    "verified",
    "overall",
    "dayofweek",
    "month",
    "review_len",
    "asinVec",
    "idVec",
    "idfFeatures",
    "idfFeaturesChunks",
]
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features",
    handleInvalid='keep',
)


# Stage order matters: every stage reads columns produced by the stages before it.
pipeline_stages = [
    # categorical identifiers
    asin_indexer, asin_encoder,
    id_indexer, id_encoder,
    # text annotation
    document_assembler, sentence, tokenizer,
    normalizer, stopwords_cleaner, lemmatizer,
    pos_tagger, chunker, finisher,
    # embeddings
    tokenEmbedder, embeddingsSentence,
    # TF-IDF and final feature vector
    tf, tf_chunk,
    idf, idf_chunk,
    assembler,
]
nlp_pipeline = Pipeline(stages=pipeline_stages)


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][OK!]
pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[ | ][OK!]
word2vec_gigaword_300 download started this may take some time.
Approximate size to download 312.3 MB
[ | ][OK!]


In [0]:
# 80/20 random split of the prepared frame; the seed is fixed for reproducibility.
trainingData, testingData = new_df.randomSplit([0.8, 0.2], seed=47)
print(f"Training Dataset Count: {trainingData.count()}")
print(f"Test Dataset Count: {testingData.count()}")

Training Dataset Count: 2323566
Test Dataset Count: 580244


In [0]:
# Fit every estimator stage of the pipeline on the training split only.
pipeline_model = nlp_pipeline.fit(trainingData)

In [0]:
# Apply the fitted pipeline to the training split to obtain its feature columns.
trainingDataTransformed = pipeline_model.transform(trainingData)

In [0]:
# convert embeddings into Vectors to add to Assembler

from pyspark.ml.linalg import Vectors, VectorUDT
# `udf` is imported here explicitly: this is the first cell that uses it, and the
# notebook's only other import of it sits much further down.
from pyspark.sql.functions import explode, udf

# Wraps a plain array of numbers in a dense ML vector.
toVectorUDF = udf(lambda vs: Vectors.dense(vs), VectorUDT())

# `sentence_embeddings.embeddings` is an array with one embedding array per annotation;
# explode() emits one output row per element and emits no row when the array is
# empty or null.
temp = (
    trainingDataTransformed
    .select(["features", "label", explode("sentence_embeddings.embeddings").alias("sentence_embedding")])
    .withColumn("final_embeddings", toVectorUDF("sentence_embedding"))
)

In [0]:
# Append the embedding vector to the pipeline's feature vector.
# Note: this rebinds `assembler`, which previously named the pipeline's own assembler stage.
assembler = VectorAssembler(inputCols=["features", "final_embeddings"], outputCol="new_features")

# Keep only what the classifier needs: the combined features and the label.
temp = assembler.transform(temp).select(["new_features", "label"])
temp.printSchema()


root
 |-- new_features: vector (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
from pyspark.ml.classification import LogisticRegression

# More classification docs: https://spark.apache.org/docs/latest/ml-classification-regression.html

# Regularised logistic regression on the combined feature vector.
# elasticNetParam=0 selects a pure L2 penalty.
lr = LogisticRegression(
    featuresCol='new_features',
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
lrModel = lr.fit(temp)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-2949808757013230>[0m in [0;36m<cell line: 6>[0;34m()[0m
[1;32m      4[0m [0;34m[0m[0m
[1;32m      5[0m [0mlr[0m [0;34m=[0m [0mLogisticRegression[0m[0;34m([0m[0mmaxIter[0m[0;34m=[0m[0;36m20[0m[0;34m,[0m [0mregParam[0m[0;34m=[0m[0;36m0.3[0m[0;34m,[0m [0melasticNetParam[0m[0;34m=[0m[0;36m0[0m[0;34m,[0m [0mfeaturesCol[0m [0;34m=[0m [0;34m'new_features'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 6[0;31m [0mlrModel[0m [0;34m=[0m [0mlr[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtemp[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m             [0mcall_succeeded[0m [0;34m=[0m [0;32

In [0]:
# Training-set metrics, read from the summary attached to the fitted
# LogisticRegressionModel.
trainingSummary = lrModel.summary

# fMeasureByLabel is called as a method; the other metrics are read as attributes.
print(f"Training Accuracy:  {trainingSummary.accuracy}")
print(f"Training Precision: {trainingSummary.precisionByLabel}")
print(f"Training Recall:    {trainingSummary.recallByLabel}")
print(f"Training FMeasure:  {trainingSummary.fMeasureByLabel()}")
print(f"Training AUC:       {trainingSummary.areaUnderROC}")



In [0]:
# Run the already-fitted pipeline over the held-out split and preview five rows.
testingDataTransform = pipeline_model.transform(testingData)
#testingDataTransform = testingDataTransform.select('features', 'embedding_features', 'label')
testingDataTransform.show(n=5)



In [0]:
# Same post-processing as for the training split: embedding array -> dense vector,
# then append it to the pipeline's feature vector.
toVectorUDF = udf(lambda vs: Vectors.dense(vs), VectorUDT())

holdout_embedding = explode("sentence_embeddings.embeddings").alias("sentence_embedding")
test_temp = (
    testingDataTransform
    .select(["features", "label", holdout_embedding])
    .withColumn("final_embeddings", toVectorUDF("sentence_embedding"))
)

assembler = VectorAssembler(inputCols=["features", "final_embeddings"], outputCol="new_features")

# Keep only the combined features and the label for evaluation.
test_temp = assembler.transform(test_temp).select(["new_features", "label"])
test_temp.printSchema()



In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out split and preview a few predictions.
predictions = lrModel.transform(test_temp)
predictions.show(n=5)

# Area under the ROC curve on the held-out split.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
holdout_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', holdout_auc)



In [0]:
# Read the reviews to be scored from the metastore table.
test_query = "select * from default.reviews_test"
test_df = spark.sql(test_query)
test_df.show(n=5)
# Report the frame's dimensions as (row count, column count).
test_shape = (test_df.count(), len(test_df.columns))
print(test_shape)



In [0]:
# Apply to the scoring set the same feature preparation used for the training set.
from pyspark.sql.functions import coalesce, lit

# Convert Unix timestamp to readable date
test_df_with_date = test_df.withColumn("reviewTime", to_date(from_unixtime(test_df.unixReviewTime))) \
                                                .drop("unixReviewTime")

# Combine review text and summary
# concat() returns null as soon as one input is null. This frame is never filtered
# for null reviews, so a missing part is replaced by an empty string: the other part
# is then kept instead of the whole review becoming null. Reviews with both parts
# present are unchanged.
new_test_df = test_df_with_date.withColumn(
    "review",
    concat(coalesce(col("reviewText"), lit("")), coalesce(col("summary"), lit(""))),
).drop('reviewText').drop('summary')

# Calendar features derived from the review date; the date itself is then dropped.
new_test_df = new_test_df.withColumn('dayofweek', dayofweek(col('reviewTime'))).withColumn('month', month(col('reviewTime'))).drop('reviewTime')
display(new_test_df)

# Character length of the combined review text.
new_test_df = new_test_df.withColumn("review_len", length(col("review")))




In [0]:
# Run the pipeline fitted on the training split over the prepared scoring set.
test_df_Transformed = pipeline_model.transform(new_test_df)
# Preview the transformed rows.
test_df_Transformed.show()



In [0]:
# Build the classifier input for the scoring set. reviewID is carried along so that
# each prediction can be matched back to its review.
toVectorUDF = udf(lambda vs: Vectors.dense(vs), VectorUDT())

scoring_embedding = explode("sentence_embeddings.embeddings").alias("sentence_embedding")
final_test_df = (
    test_df_Transformed
    .select(["reviewID", "features", scoring_embedding])
    .withColumn("final_embeddings", toVectorUDF("sentence_embedding"))
)

# Append the embedding vector to the pipeline's feature vector.
assembler = VectorAssembler(inputCols=["features", "final_embeddings"], outputCol="new_features")
final_test_df = assembler.transform(final_test_df)



In [0]:
# Score the scoring set with the fitted model; this rebinds `predictions`, which
# previously held the held-out split's predictions.
predictions = lrModel.transform(final_test_df)



In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Extracts element 1 of the probability vector as a float
# (presumably the probability of label 1 — confirm against the label encoding).
probelement=udf(lambda v:float(v[1]),FloatType())

# Name the score column directly with alias(). Renaming by the auto-generated UDF
# column name afterwards silently does nothing if that name ever differs.
submission_data = predictions.select('reviewID', probelement('probability').alias('label'))



In [0]:
# Render the submission columns (review id and predicted score) in the notebook.
display(submission_data.select('reviewID', 'label'))



In [0]:
# Print the submission's (row count, column count).
print((submission_data.count(), len(submission_data.columns)))



In [0]:
# Let's look at some quick summary statistics
#df.describe().show()



In [0]:
# The count of each overall rating

#from pyspark.sql.functions import col
#df.groupBy("overall").count().orderBy(col("overall").asc()).show()



In [0]:
# The most common product IDs
#df.groupBy("asin").count().orderBy(col("count").desc()).show(10)



In [0]:
#from pyspark.sql.functions import countDistinct
#df2=df.select(countDistinct("reviewID"))
#df2.show()



In [0]:
#df2=df.select(countDistinct("reviewerID"))
#df2.show()



In [0]:
#df.groupBy("label").count().show()

