## Starting Spark NLP

In [None]:
# findspark locates the local Spark installation and makes `pyspark` importable
# from this kernel; it must run before any pyspark / sparknlp import below.
import findspark
findspark.init()

In [None]:
import sparknlp
from pyspark.sql import SparkSession

# Start a Spark session configured for Spark NLP; `spark` is used below to read the data.
spark = sparknlp.start()

# Alternative way to create the Spark session if you run into memory problems or
# network/heartbeat timeouts. NOTE(review): the spark-nlp jar version in
# `spark.jars.packages` presumably has to match the installed `sparknlp` Python package — confirm.
#spark = SparkSession.builder \
#    .config("spark.executor.heartbeatInterval", "20000s") \
#    .config("spark.network.timeout", "20001s") \
#    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.4")\
#    .config("spark.driver.memory","10G")\
#    .getOrCreate()

In [None]:
import comet_ml

# Configure Comet for this session (API key / workspace). Presumably prompts for the
# API key when none is configured — check the comet_ml docs for the installed version.
comet_ml.init()

In [None]:
from sparknlp.logging.comet import CometLogger
# Spark NLP's Comet integration. `logger.experiment` is the underlying Comet experiment;
# naming it makes the pretrained-model run easy to find in the Comet UI.
logger = CometLogger()
logger.experiment.set_name('PretrainedModel')

## Loading data

In [None]:
from pyspark.sql.functions import when, col

# Reviews rated strictly above this value are labelled "positive", all others "negative".
POSITIVE_RATING_THRESHOLD = 2

# No schema is given, so every CSV column (including `Rating`) is read as a string.
# Cast the rating explicitly and drop rows where the cast yields null (missing or
# malformed ratings) instead of letting them fall through to "negative".
rating = col("Rating").cast("int")
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("source/DisneylandReviews.csv")
    .filter(rating.isNotNull())
    .withColumn("sentiment", when(rating > POSITIVE_RATING_THRESHOLD, "positive").otherwise("negative"))
)

In [None]:
# Number of labelled reviews loaded (before the sampling in the next cell).
df.count()

In [None]:
# Sample size for this demo; raise it (or remove the limit) for a full run.
N_SAMPLES = 1000

# `limit` without an ordering does not promise the same rows on every evaluation,
# and `df` is reused by several independent Spark jobs below (joining predictions
# back to labels, the train/test split). Caching lets those jobs reuse one
# materialized sample (as long as the cached partitions are not evicted).
df = df.limit(N_SAMPLES).cache()

## Using a pretrained sentiment model

In [None]:
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

# Stage 1: wrap the raw review text into Spark NLP "document" annotations.
document = (
    DocumentAssembler()
    .setInputCol("Review_Text")
    .setOutputCol("document")
)

# Stage 2: split each document into tokens.
token = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Stage 3: normalize the tokens before sentiment scoring.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normal")
)

# Stage 4: pretrained Vivekn sentiment model, reading the document and the normalized tokens.
vivekn = (
    ViveknSentimentModel.pretrained()
    .setInputCols(["document", "normal"])
    .setOutputCol("result_sentiment")
)

# Stage 5: turn the sentiment annotations into a plain column, `final_sentiment`,
# which later cells treat as a list of labels per row.
finisher = (
    Finisher()
    .setInputCols(["result_sentiment"])
    .setOutputCols(["final_sentiment"])
)

In [None]:
# Stage order matters: each stage consumes the output column of an earlier one.
pipeline = Pipeline(stages=[document, token, normalizer, vivekn, finisher])

### Selecting input data

In [None]:
# The pretrained model only needs the review text (the select already keeps the column name).
X = df.select('Review_Text')

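### Fitting the pipeline

The sentiment model is already trained, so `fit` does no sentiment training here; it returns a `PipelineModel` that is then used to score the reviews.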

In [None]:
# The sentiment stage is already trained, so fitting does no sentiment training;
# the resulting PipelineModel is then used to score the same reviews.
pipelineModel = pipeline.fit(dataset=X)
result = pipelineModel.transform(dataset=X)

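### Logging the pipeline parameters

**TODO:** no cell logs the pipeline parameters to Comet yet (Comet experiments provide `log_parameters` for this).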

## Logging the pretrained model's evaluation in Comet

In [None]:
from sklearn.metrics import classification_report

# Attach the gold label to each prediction. `result` has one row per input review, so
# joining it on the review text would multiply rows whenever the same text occurs more
# than once; keep one prediction per distinct text first (a row's prediction depends
# only on its text).
predictions = result.select('Review_Text', 'final_sentiment').dropDuplicates(['Review_Text'])
df_tot = df.select('Review_Text', 'sentiment').join(predictions, on=['Review_Text'])
pandas_df = df_tot.toPandas()
# `final_sentiment` holds a sequence of labels per row; flatten it to one string.
pandas_df['predicted_sentiment'] = [','.join(map(str, preds)) for preds in pandas_df['final_sentiment']]

report = classification_report(pandas_df['sentiment'], pandas_df['predicted_sentiment'], output_dict=True, labels=['positive', 'negative'])
# Per-class and averaged entries are dicts of metrics; log each with its key as prefix.
# The scalar "accuracy" entry (reported only when `labels` covers every observed label)
# is skipped because the next cell logs accuracy unconditionally.
for key, value in report.items():
    if key != 'accuracy':
        logger.log_metrics(value, prefix=key)

In [None]:
from sklearn.metrics import accuracy_score

# Plain accuracy over every predicted value (not restricted to the report's labels).
accuracy = accuracy_score(y_true=pandas_df['sentiment'], y_pred=pandas_df['predicted_sentiment'])
logger.log_metrics(dict(accuracy=accuracy))

In [None]:
# Close the pretrained-model experiment; the custom model below gets its own logger.
logger.end()

## Building a custom model

In [None]:
# A second Comet experiment, named so the custom-trained run is distinguishable.
logger = CometLogger()
logger.experiment.set_name('CustomModel')

In [None]:
# Trainable Vivekn approach: learns from the `sentiment` label column built when the
# data was loaded, instead of using pretrained weights.
vivekn_custom = (
    ViveknSentimentApproach()
    .setInputCols(["document", "normal"])
    .setSentimentCol("sentiment")
    .setOutputCol("result_sentiment")
)

# Same preprocessing and finishing stages as before, with the trainable model swapped in.
pipeline = Pipeline(stages=[document, token, normalizer, vivekn_custom, finisher])

## Train/test split

In [None]:
# Fixed seed so the split (and therefore the logged metrics) is reproducible across runs.
SPLIT_SEED = 42
(training_set, test_set) = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Keep just the review text and its label; the approach trains on `sentiment`.
X_train = training_set.select(['Review_Text', 'sentiment'])
X_test = test_set.select(['Review_Text', 'sentiment'])


In [None]:
# Train the custom sentiment model (and fit the other stages) on the training split only.
pipelineModel = pipeline.fit(dataset=X_train)

In [None]:
# Score the held-out reviews; the `sentiment` label column is carried through for evaluation.
result = pipelineModel.transform(dataset=X_test)

## Logging the custom model's evaluation in Comet

In [None]:
# `result` already carries the gold label next to each prediction, so no join is needed.
pandas_df = result.select('sentiment', 'final_sentiment').toPandas()
# `final_sentiment` holds a sequence of labels per row; flatten it to one string.
pandas_df['predicted_sentiment'] = [','.join(map(str, preds)) for preds in pandas_df['final_sentiment']]

report = classification_report(pandas_df['sentiment'], pandas_df['predicted_sentiment'], output_dict=True, labels=['positive', 'negative'])
# Per-class and averaged entries are dicts of metrics; log each with its key as prefix.
# The scalar "accuracy" entry (reported only when `labels` covers every observed label)
# is skipped because the next cell logs accuracy unconditionally.
for key, value in report.items():
    if key != 'accuracy':
        logger.log_metrics(value, prefix=key)

In [None]:
# Plain accuracy over every predicted value (not restricted to the report's labels).
accuracy = accuracy_score(y_true=pandas_df['sentiment'], y_pred=pandas_df['predicted_sentiment'])
logger.log_metrics(dict(accuracy=accuracy))

In [None]:
# Close the custom-model Comet experiment.
logger.end()