In [1]:
import findspark
import pandas as pd
import os
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DoubleType, StringType
import sparknlp
from pyspark.sql import SparkSession
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from sklearn.metrics import classification_report
import time
from pyspark.sql.functions import *
from sklearn.model_selection import train_test_split

findspark.init()

In [2]:
# os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.5 pyspark-shell"
# os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages com.johnsnowlabs.nlp:spark-nlp-gpu_2.12:4.2.4 pyspark-shell"
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","6G")\
    .config("spark.driver.maxResultSize", "0") \
    .config('spark.port.maxRetries', 100) \
    .config("spark.kryoserializer.buffer.max", "1500M") \
    .config("spark.jars", "D:\ProgramData\cache_pretrained\pretrained\spark-nlp-assembly-4.2.4.jar") \
    .getOrCreate()
# .config("spark.driver.memory","4G")\
# .config("spark.jsl.settings.pretrained.cache_folder", "D:\ProgramData\cache_pretrained\pretrained") \
# .config("spark.jsl.settings.storage.cluster_tmp_dir", "D:\ProgramData\cache_pretrained\storage") \
# .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.5")\

In [3]:
# os.environ['PYSPARK_SUBMIT_ARGS'] = 'D:\ProgramData\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\bin\spark-shell'

In [7]:
from sklearn.utils import resample

chats_sample20000 = pd.read_csv('chats_sample_20000.csv')
# Separate majority and minority classes
df_majority = chats_sample20000[chats_sample20000['label'] == 0]
df_minority = chats_sample20000[chats_sample20000['label'] == 1]

# Downsample majority class
df_majority_downsampled = resample(df_majority, replace=False, n_samples=10000)
# Upsample minority class
df_minority_upsampled = resample(df_minority, replace=True, n_samples=10000)
df_up_down_sampled = pd.concat([df_majority_downsampled, df_minority_upsampled])
chats_sample20000_train, chats_sample20000_test = train_test_split(df_up_down_sampled, test_size=0.2, random_state=42)
chats_sample20000_train.to_csv("chats_sample_20000_train.csv", index=False)
chats_sample20000_test.to_csv("chats_sample_20000_test.csv", index=False)

In [3]:
# chats_sample50000 = pd.read_csv('chats_sample_50000.csv')
# chats_sample50000_train = pd.read_csv('chats_sample_50000_train.csv')
# chats_sample50000_test = pd.read_csv('chats_sample_50000_test.csv')
chats_sample20000_train = pd.read_csv('chats_sample_20000_train.csv')
chats_sample20000_test = pd.read_csv('chats_sample_20000_test.csv')

In [51]:
chats_sample20000 = pd.read_csv('chats_sample_20000.csv')
my_schema = StructType([StructField("body", StringType(), True)\
                       ,StructField("label", StringType(), True)])
spark_chats_sample20000 = spark.createDataFrame(chats_sample20000, schema=my_schema)
spark_chats_sample20000.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|11445|
|    1| 8555|
+-----+-----+



In [4]:
my_schema = StructType([StructField("body", StringType(), True)\
                       ,StructField("label", StringType(), True)])
spark_chats_sample20000_train = spark.createDataFrame(chats_sample20000_train, schema=my_schema)
spark_chats_sample20000_test = spark.createDataFrame(chats_sample20000_test, schema=my_schema)

In [47]:
spark_chats_sample20000_train.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 7981|
|    1| 8019|
+-----+-----+



In [48]:
spark_chats_sample20000_test.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 2019|
|    1| 1981|
+-----+-----+



In [29]:
# documentAssembler = DocumentAssembler() \
#     .setInputCol("body") \
#     .setOutputCol("document")
# tokenizer = Tokenizer().setInputCols(['document']).setOutputCol("token")
# pipeline = Pipeline(stages=[documentAssembler, tokenizer])

In [26]:
# mydata = [("I love you", '1')]
# schema = StructType([StructField("body", StringType(), True)\
#                      , StructField("label", StringType(), True)])
# df = spark.createDataFrame(data=mydata,schema=schema)

In [27]:
# res = pipeline.fit(df).transform(df)

In [28]:
# res.select("token").show(truncate=False)

In [3]:
document_assembler = DocumentAssembler() \
            .setInputCol("description") \
            .setOutputCol("document")

tokenizer = Tokenizer() \
            .setInputCols(["document"]) \
            .setOutputCol("token")

normalizer = Normalizer() \
            .setInputCols(["token"]) \
            .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
            .setInputCols("normalized")\
            .setOutputCol("cleanTokens")\
            .setCaseSensitive(False)

### Sentence BERT Embeddings

In [5]:
from sparknlp.annotator import BertSentenceEmbeddings
from sparknlp.pretrained import PretrainedPipeline

documentAssembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")
sentence_embedding  = BertSentenceEmbeddings \
    .load("file:///D:\ProgramData\cache_pretrained\pretrained\sent_bert_multi_cased_xx_2.6.0_2.4_1598347692999") \
    .setInputCols(["sentence"]) \
    .setOutputCol("sentence_bert_embeddings")
clfdl = ClassifierDLApproach()\
    .setInputCols(["sentence_bert_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("label")\
    .setMaxEpochs(1)\
    .setBatchSize(1000)\
    .setOutputLogsPath('./log/') \
    .setEnableOutputLogs(True)
    # .setLr(1e-3)\
    # .setValidationSplit(0.2) \

pipeline = Pipeline(
    stages= [
    documentAssembler,
    sentence,
    sentence_embedding,
    clfdl
])
pipeline.write().overwrite().save("./tmp/bert-sentence-dnn-pipeline")

In [5]:
%%time
bert_sentence_dnn_pipeline = Pipeline.load("./tmp/bert-sentence-dnn-pipeline")
bert_sentence_dnn_model = bert_sentence_dnn_pipeline.fit(spark_chats_sample20000_train)
bert_sentence_dnn_model.write().overwrite().save("./tmp/bert-sentence-dnn-model1")

CPU times: total: 46.9 ms
Wall time: 10min 46s


In [8]:
from sklearn.metrics import accuracy_score

pred = bert_sentence_dnn_model.transform(spark_chats_sample20000_test)
pred_df = pred.select("label", "body", "class.result").toPandas()
pred_df['result'] = pred_df['result'].apply(lambda x: x[0])

print(classification_report(pred_df.label, pred_df.result))
print(accuracy_score(pred_df.label, pred_df.result))

              precision    recall  f1-score   support

           0       0.00      0.00      0.00      2019
           1       0.50      1.00      0.66      1981

    accuracy                           0.50      4000
   macro avg       0.25      0.50      0.33      4000
weighted avg       0.25      0.50      0.33      4000

0.49525


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


### BERT Embeddings

In [5]:
from sparknlp.annotator import BertSentenceEmbeddings
from sparknlp.pretrained import PretrainedPipeline

document_assembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")
bert_embeddings = BertEmbeddings().pretrained(name='small_bert_L4_512', lang='en') \
    .setInputCols(["document",'token'])\
    .setOutputCol("embeddings")
sentence_embedding = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")
clfdl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("label")\
    .setMaxEpochs(1)\
    .setLr(0.001)\
    .setBatchSize(1000)\
    .setOutputLogsPath('./log/') \
    .setEnableOutputLogs(True)
    # .setLr(1e-3)\
    # .setValidationSplit(0.2) \

pipeline = Pipeline(
    stages= [
    document_assembler,
    tokenizer,
    stopwords_cleaner,
    bert_embeddings,
    sentence_embedding,
    clfdl
])
pipeline.write().overwrite().save("./tmp/bert-pipeline")

small_bert_L4_512 download started this may take some time.
Approximate size to download 104 MB
[OK!]


In [5]:
%%time
bert_pipeline = Pipeline.load("./tmp/bert-pipeline")
bert_dnn_model = bert_pipeline.fit(spark_chats_sample20000_train)
bert_dnn_model.write().overwrite().save("./tmp/bert-dnn-model1")
# bert_dnn_model = PipelineModel.read().load("./tmp/bert-dnn-model1")

CPU times: total: 62.5 ms
Wall time: 1min 29s


In [6]:
from sklearn.metrics import accuracy_score

pred = bert_dnn_model.transform(spark_chats_sample20000_test)
pred_df = pred.select("label", "body", "class.result").toPandas()
pred_df['result'] = pred_df['result'].apply(lambda x: x[0])

print(classification_report(pred_df.label, pred_df.result))
print(accuracy_score(pred_df.label, pred_df.result))

              precision    recall  f1-score   support

           0       0.52      0.78      0.62      2019
           1       0.53      0.25      0.34      1981

    accuracy                           0.52      4000
   macro avg       0.52      0.52      0.48      4000
weighted avg       0.52      0.52      0.48      4000

0.5205


UsageError: Line magic function `%%time` not found.


In [39]:
text_df = [("You got the key to my heart Kiara", "1")]
schema = StructType([StructField("body", StringType(), True)\
                     , StructField("label", StringType(), True)])
mydf = spark.createDataFrame(data=text_df,schema=schema)

In [41]:
documentAssembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")
sentence_embedding  = BertSentenceEmbeddings \
    .load("file:///D:\ProgramData\cache_pretrained\pretrained\sent_bert_multi_cased_xx_2.6.0_2.4_1598347692999") \
    .setInputCols(["sentence"]) \
    .setOutputCol("sentence_bert_embeddings")

pipeline = Pipeline(
    stages= [
    documentAssembler,
    sentence,
    sentence_embedding
])

In [42]:
res = pipeline.fit(mydf).transform(mydf)

In [44]:
res.select("sentence_bert_embeddings").show(truncate=300)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                                                                    sentence_bert_embeddings|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{sentence_embeddings, 0, 32, You got the key to my heart Kiara, {sentence -> 0, token -> 

In [5]:
from sparknlp.annotator import BertSentenceEmbeddings
from sparknlp.pretrained import PretrainedPipeline

documentAssembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")
sentence_embedding  = BertSentenceEmbeddings \
    .load("file:///D:\ProgramData\cache_pretrained\pretrained\sent_bert_use_cmlm_multi_base_xx_3.1.3_2.4_1626783880233") \
    .setInputCols(["sentence"]) \
    .setOutputCol("sentence_bert_embeddings")
clfdl = ClassifierDLApproach()\
    .setInputCols(["sentence_bert_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("label")\
    .setMaxEpochs(1)\
    .setLr(0.001)\
    .setBatchSize(1000)\
    .setOutputLogsPath('./log/') \
    .setEnableOutputLogs(True)
    # .setLr(1e-3)\
    # .setValidationSplit(0.2) \

pipeline = Pipeline(
    stages= [
    documentAssembler,
    sentence,
    sentence_embedding,
    clfdl
])
pipeline.write().overwrite().save("./tmp/bert-use-pipeline")

Py4JJavaError: An error occurred while calling o94.load.
: java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1(TorrentBroadcast.scala:316)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$1$adapted(TorrentBroadcast.scala:316)
	at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$1583/988352278.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:321)
	at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$1586/2132246746.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:323)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:140)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:95)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:75)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1529)
	at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings.setModelIfNotSet(BertSentenceEmbeddings.scala:315)
	at com.johnsnowlabs.nlp.embeddings.ReadBertSentenceDLModel.readTensorflow(BertSentenceEmbeddings.scala:437)
	at com.johnsnowlabs.nlp.embeddings.ReadBertSentenceDLModel.readTensorflow$(BertSentenceEmbeddings.scala:431)
	at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings$.readTensorflow(BertSentenceEmbeddings.scala:482)
	at com.johnsnowlabs.nlp.embeddings.ReadBertSentenceDLModel.$anonfun$$init$$1(BertSentenceEmbeddings.scala:440)
	at com.johnsnowlabs.nlp.embeddings.ReadBertSentenceDLModel.$anonfun$$init$$1$adapted(BertSentenceEmbeddings.scala:440)
	at com.johnsnowlabs.nlp.embeddings.ReadBertSentenceDLModel$$Lambda$1534/1844479751.apply(Unknown Source)
	at com.johnsnowlabs.nlp.ParamsAndFeaturesReadable.$anonfun$onRead$1(ParamsAndFeaturesReadable.scala:50)
	at com.johnsnowlabs.nlp.ParamsAndFeaturesReadable.$anonfun$onRead$1$adapted(ParamsAndFeaturesReadable.scala:49)


In [None]:
bert_sentence_use_dnn_model = pipeline.fit(spark_chats_sample20000_train)

In [None]:
bert_sentence_use_dnn_model.write().overwrite().save("./tmp/bert-sentence-use-dnn-model1")

In [6]:
# bert_sentence_dnn_pipeline = Pipeline.read().load("./tmp/bert-dnn-pipeline")
pipeline.write().overwrite().save("./tmp/bert-dnn-pipeline")

In [8]:
bert_sentence_dnn_model.write().overwrite().save("./tmp/bert-sentence-dnn-model1")

In [9]:
bert_sentence_dnn_model = PipelineModel.read().load("./tmp/bert-dnn-model1")

In [9]:
bert_sentence_dnn_model.stages

[DocumentAssembler_af0a5e6f4a2d,
 SentenceDetector_53cd1cd9bbfa,
 BERT_SENTENCE_EMBEDDINGS_8fe73bf451ef,
 ClassifierDLModel_48356a660915]

In [10]:
from sklearn.metrics import accuracy_score

pred = bert_sentence_dnn_model.transform(spark_chats_sample20000_test)
pred_df = pred.select("label", "body", "class.result").toPandas()
pred_df['result'] = pred_df['result'].apply(lambda x: x[0])

In [11]:
print(classification_report(pred_df.label, pred_df.result))
print(accuracy_score(pred_df.label, pred_df.result))

              precision    recall  f1-score   support

           0       0.57      1.00      0.73      2276
           1       0.00      0.00      0.00      1724

    accuracy                           0.57      4000
   macro avg       0.28      0.50      0.36      4000
weighted avg       0.32      0.57      0.41      4000

0.569


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [9]:
# chats_sample50000_train, chats_sample50000_test = train_test_split(chats_sample50000, test_size=0.2, random_state=42)
# chats_sample50000_train.to_csv("chats_sample_50000_train.csv", index=False)
# chats_sample50000_test.to_csv("chats_sample_50000_test.csv", index=False)

In [5]:
# training, test = spark_chats_sample50000.randomSplit([0.8,0.2], seed=42)

In [6]:
# test.write.format("csv").mode('overwrite').save("chats_sample50000_training")

In [4]:
# bert_sentence_dnn_pipeline = Pipeline.read().load("./tmp/bert-dnn-pipeline")

In [19]:
pred.write.json("chats_sample50000_output.json")

In [None]:
# log_file_name = os.listdir("/log")[0]
#
# with open("/root/annotator_logs/"+log_file_name, "r") as log_file :
#     print(log_file.read())