In [1]:
import json
from pyspark import sql
from pyspark.sql import SparkSession, functions as sfunc
import pandas as pd
pd.set_option('display.max_columns', 50)

import sparknlp
from sparknlp.pretrained import PretrainedPipeline, PipelineModel
from pyspark import SparkConf

In [3]:
# Local Spark session used by the rest of the notebook.
# Despite the name, `sc` is a SparkSession, not a SparkContext (`sc.read` is used below).
# NOTE(review): presumably the spark-nlp package already brings its own TensorFlow
# bindings; confirm the extra org.tensorflow:tensorflow:1.15.0 jar is needed and does
# not conflict.
# NOTE(review): with master local[*] the executors run inside the driver JVM, so
# spark.executor.memory likely has no effect here — confirm.
sc = (
    SparkSession.builder
    .appName("Spark NLP")
    .master("local[*]")
    .config("spark.driver.memory", "3G")
    .config("spark.executor.memory", "3G")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "1000M")
    .config("spark.driver.maxResultSize", "1g")
    .config(
        "spark.jars.packages",
        "com.johnsnowlabs.nlp:spark-nlp_2.12:5.2.3,org.tensorflow:tensorflow:1.15.0",
    )
    .config("spark.sql.debug.maxToStringFields", 1000)
    .enableHiveSupport()
    .getOrCreate()
)

24/02/26 14:58:07 WARN Utils: Your hostname, Ajay resolves to a loopback address: 127.0.1.1; using 172.25.184.63 instead (on interface eth0)
24/02/26 14:58:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ajay/sparknlp/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ajay/.ivy2/cache
The jars for the packages stored in: /home/ajay/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
org.tensorflow#tensorflow added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c8d37c05-3890-47d7-81cb-ad45b6bdbd01;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.2.3 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found com.fasterxml.jackson.dataformat#jackson-dataf

	0 artifacts copied, 83 already retrieved (0kB/66ms)
24/02/26 14:58:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Load the Yelp review dump and coerce the date/count/rating columns to proper types.
# NOTE(review): absolute local path — consider moving it to a config cell (e.g. DATA_DIR).
df = sc.read.json("file:/home/ajay/yelp_dataset/yelp_academic_dataset_review.json")
df = df.withColumns({
    'date'   : sfunc.to_timestamp('date'),
    # Column.cast is the documented API for changing a column's type.
    'cool'   : sfunc.col('cool').cast(sql.types.IntegerType()),
    'funny'  : sfunc.col('funny').cast(sql.types.IntegerType()),
    'stars'  : sfunc.col('stars').cast(sql.types.IntegerType()),
    'useful' : sfunc.col('useful').cast(sql.types.IntegerType()),
})
# print("Percentage of tips with no compliments:", 100 * (df.filter(df.compliment_count == 0).count() / df.count()))
# # compliment_count has 98% 0 values. Drop it
# df = df.drop('compliment_count')

                                                                                

In [5]:
# Derive a coarse label from the star rating so it can be compared with the model output:
# stars < 2.5 -> 'negative', stars > 3.5 -> 'positive', everything else -> 'neutral'.
# The labels must match the strings ViveknSentimentModel emits ('positive' / 'negative').
# Rows with a null `stars` fall through to 'neutral' via `otherwise`.
df = df.withColumn(
    'inferred_sentiment',
    sfunc.when(sfunc.col('stars') < 2.5, 'negative')
         .when(sfunc.col('stars') > 3.5, 'positive')
         .otherwise('neutral'),
)

In [6]:
from pyspark.ml import Pipeline

In [7]:
from sparknlp.annotator import *
from sparknlp.base import Finisher

In [31]:
# Spark NLP stages: raw text -> document -> tokens -> normalized tokens
# -> stop-word-free tokens, which (together with the document) feed the
# pretrained Vivekn sentiment model.
doc_assembler = DocumentAssembler().setInputCol('text').setOutputCol('doc')

tokenizer = Tokenizer().setInputCols(['doc']).setOutputCol('tokens')
normalizer = Normalizer().setInputCols(['tokens']).setOutputCol('tokens_norm')
# `pretrained()` is a static loader, so call it on the class; building a default
# instance first (`StopWordsCleaner().pretrained()`) just creates a throwaway object.
stop_words_remover = StopWordsCleaner.pretrained().setInputCols(['tokens_norm']).setOutputCol('tokens_clean')
# NOTE(review): n_gram is not included in the Pipeline stages below — wire it in or drop it.
n_gram = NGramGenerator().setInputCols(['tokens_clean']).setOutputCol('bigram').setN(2)

vivekn = ViveknSentimentModel.pretrained().setInputCols(['doc', 'tokens_clean']).setOutputCol('sentiment_class')

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[OK!]


In [32]:
pipe = Pipeline().setStages([doc_assembler, tokenizer, normalizer, stop_words_remover, vivekn])

In [33]:
pipe = pipe.fit(df)

In [42]:
# One row per sentiment annotation: the predicted label (`result`) and the model's
# confidence. The confidence is read by its metadata key instead of taking the first
# map value, so the result does not depend on the order of the map's entries.
temp = (
    pipe.transform(df)
    .select(sfunc.explode('sentiment_class').alias('sentiment_class'))
    .select(
        'sentiment_class.result',
        sfunc.col('sentiment_class.metadata').getItem('confidence').alias('sentiment_confidence'),
    )
)

In [43]:
# Preview the first rows of predicted labels and confidences (Spark's default preview size).
temp.show(n=20, truncate=True)



+--------+--------------------+
|  result|sentiment_confidence|
+--------+--------------------+
|positive|              0.4052|
|negative|              0.6369|
|positive|              0.8441|
|negative|                0.75|
|positive|              0.4759|
|negative|              0.5324|
|positive|              0.7337|
|positive|              0.9230|
|negative|              0.5609|
|positive|              0.4673|
|negative|              0.6067|
|positive|              0.8235|
|positive|              0.9790|
|positive|              0.8166|
|positive|              0.8333|
|positive|              0.7741|
|negative|              0.7766|
|positive|              0.3991|
|positive|              0.3398|
|positive|              0.5983|
+--------+--------------------+
only showing top 20 rows





In [25]:
# Inspect one raw sentiment annotation (type, span, label, metadata) from the fitted pipeline.
# `temp` is a Spark DataFrame with only `result` / `sentiment_confidence` columns (no
# `sentiment_class`, no `.iloc`), so indexing it like a pandas frame fails on a fresh run.
pipe.transform(df).select('sentiment_class').first()['sentiment_class'][0]

Row(annotatorType='sentiment', begin=7, end=511, result='positive', metadata={'confidence': '0.4052'}, embeddings=[])