In [2]:
pip --quiet install spark-nlp==3.4.0 pyspark==3.2.0

Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
import sparknlp
import pyspark


os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.json4s:json4s-native_3:4.0.3,com.amazonaws:aws-java-sdk:1.12.136,org.apache.hadoop:hadoop-aws:3.3.1,com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.0 pyspark-shell'


from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.functions import monotonically_increasing_id,split, explode, concat_ws, udf

In [2]:
print("Spark NLP version", sparknlp.version())
print("PySpark version:", pyspark.version)

Spark NLP version 3.4.0
PySpark version: <module 'pyspark.version' from '/home/emr-notebook/.local/lib/python3.7/site-packages/pyspark/version.py'>


In [3]:
#spark configuration
conf = SparkConf()\
.set('spark.executor.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true') \
.set('spark.driver.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true') \
.set("spark.driver.memory", "4G") \
.set("spark.driver.maxResultSize", "0") \
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.set("spark.kryoserializer.buffer.max", "2000m") \
.set("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.0") \
.setAppName('SPARK NLP AWS').setMaster('local[*]')

In [4]:
sc=pyspark.SparkContext(conf=conf)
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')

In [5]:
accessKeyId='your access Key'
secretAccessKey='your secret key'

train_URI = "s3a://erasolon-ml-dataset/GoEmotions/train.tsv"
train_output_URI = "s3a://erasolon-ml-output/emr/goemotions/train_output"

validation_URI = "s3a://erasolon-ml-dataset/GoEmotions/dev.tsv"
validation_output_URI = "s3a://erasolon-ml-output/emr/goemotions/validation_output"

test_URI = "s3a://erasolon-ml-dataset/GoEmotions/test.tsv"
test_output_URI = "s3a://erasolon-ml-output/emr/goemotions/test_output"

label_URI = "s3a://erasolon-ml-dataset/GoEmotions/labels.txt"

In [6]:
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3a.access.key', accessKeyId)
hadoopConf.set('fs.s3a.secret.key', secretAccessKey)
hadoopConf.set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

spark=SparkSession(sc)

In [7]:
raw_label_df = spark.read.csv(label_URI,header=False,inferSchema=True)
raw_train_df=spark.read.csv(train_URI,header=False,inferSchema=True,sep="\t")
raw_validation_df=spark.read.csv(validation_URI,header=False,inferSchema=True,sep="\t")
raw_test_df=spark.read.csv(test_URI,header=False,inferSchema=True,sep="\t")

In [8]:
renamed_label_df = raw_label_df.withColumnRenamed('_c0', 'label').withColumn("id_label", monotonically_increasing_id())

In [9]:
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentenceDetector = SentenceDetector() \
   .setInputCols(["document"]) \
   .setOutputCol("sentences")
    
tokenizer = Tokenizer() \
    .setInputCols(["sentences"]) \
    .setOutputCol("token")
      
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("normalized")\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

lemma = LemmatizerModel.pretrained('lemma_antbnc') \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemma")

tokenAssembler = TokenAssembler() \
   .setInputCols(["sentences", "lemma"]) \
   .setOutputCol("cleanText")

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [10]:
clf_pipeline = Pipeline(
    stages=[document_assembler, 
            sentenceDetector, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            lemma,
           tokenAssembler])

In [11]:
def clean_df(df, pipeline):
    splitted_raw_df = df.withColumn('_c1',explode(split('_c1',',')))
    renamed_df = splitted_raw_df.select('_c0','_c1').withColumnRenamed('_c0', 'text').withColumnRenamed('_c1', 'id_label')
    renamed_df = renamed_df.join(renamed_label_df,"id_label","inner").drop("id_label")
    renamed_df = renamed_df.withColumn("id", monotonically_increasing_id())
    # Annotate your testing dataset
    cleaned_df = pipeline.fit(renamed_df).transform(renamed_df).select("cleanText.result")
    cleaned_df = cleaned_df.withColumn("id", monotonically_increasing_id())
    final_df = cleaned_df.join(renamed_df, "id", "inner").drop('text','id').withColumn('source',concat_ws(" ", 'result')).drop("result")
    return final_df 

In [12]:
final_train_df = clean_df(raw_train_df, clf_pipeline)
final_validation_df = clean_df(raw_validation_df, clf_pipeline)
final_test_df = clean_df(raw_test_df, clf_pipeline)

In [13]:
def blazingtext_format(source, label):
    return "__label__"+label+" "+source

blazingTextUDF = udf(lambda x, z: blazingtext_format(x,z))

In [14]:
manifest_train_df = final_train_df.select(blazingTextUDF("source","label"))
manifest_validation_df = final_validation_df.select(blazingTextUDF("source","label"))
manifest_test_df = final_test_df.select(blazingTextUDF("source","label"))

In [15]:
manifest_train_df.repartition(1).write.text(train_output_URI)
manifest_validation_df.repartition(1).write.text(validation_output_URI)
manifest_test_df.repartition(1).write.text(test_output_URI)