### Text Preprocessing Pipeline
Basic data exploration with PySpark, followed by a pipeline of preprocessing steps for NLP applications built with Spark NLP.

Setting up Spark NLP and importing the necessary libraries.

In [1]:
!wget https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/colab_setup.sh -O - | bash

--2021-10-16 20:44:46--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1608 (1.6K) [text/plain]
Saving to: ‘STDOUT’

setup Colab for PySpark 3.0.2 and Spark NLP 3.1.0

2021-10-16 20:44:46 (1.72 MB/s) - written to stdout [1608/1608]

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/mac

In [2]:
import sparknlp 
spark= sparknlp.start()

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.functions import count, when, isnan, col

Reading the data and exploring it briefly with PySpark functions.

In [5]:
df= spark.read\
    .option("header", True)\
    .csv("/content/vaccination_tweets.csv")

In [6]:
#displaying first 5 rows
df.show(5, truncate=30)

+-------------------+--------------------+-------------------------+------------------------------+-------------------+--------------+------------+---------------+-------------+-------------------+------------------------------+------------------------------+-------------------+--------+---------+----------+
|                 id|           user_name|            user_location|              user_description|       user_created|user_followers|user_friends|user_favourites|user_verified|               date|                          text|                      hashtags|             source|retweets|favorites|is_retweet|
+-------------------+--------------------+-------------------------+------------------------------+-------------------+--------------+------------+---------------+-------------+-------------------+------------------------------+------------------------------+-------------------+--------+---------+----------+
|1340539111971516416|          Rachel Roh|La Crescenta-Montrose, CA|Ag

In [7]:
#checking column names
df.columns

['id',
 'user_name',
 'user_location',
 'user_description',
 'user_created',
 'user_followers',
 'user_friends',
 'user_favourites',
 'user_verified',
 'date',
 'text',
 'hashtags',
 'source',
 'retweets',
 'favorites',
 'is_retweet']

In [8]:
#checking the columns' data types
df.dtypes

[('id', 'string'),
 ('user_name', 'string'),
 ('user_location', 'string'),
 ('user_description', 'string'),
 ('user_created', 'string'),
 ('user_followers', 'string'),
 ('user_friends', 'string'),
 ('user_favourites', 'string'),
 ('user_verified', 'string'),
 ('date', 'string'),
 ('text', 'string'),
 ('hashtags', 'string'),
 ('source', 'string'),
 ('retweets', 'string'),
 ('favorites', 'string'),
 ('is_retweet', 'string')]
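
Every column comes back as `string` because the CSV was read without schema inference. The following is a minimal sketch of a reader that asks Spark to infer types. `header`, `inferSchema`, `multiLine` and `escape` are standard Spark CSV options, but whether this particular file needs `multiLine` and quote escaping is an assumption (tweets often contain line breaks and quotes), and `read_tweets` is a hypothetical helper name.

```python
# Reader options for the tweets CSV. That the file contains embedded newlines
# and doubled quotes is an assumption, not something verified here.
CSV_OPTIONS = {
    "header": True,       # first line holds the column names
    "inferSchema": True,  # let Spark guess numeric/boolean types (costs an extra pass)
    "multiLine": True,    # allow quoted fields to span several lines
    "escape": '"',        # treat "" inside a quoted field as a literal quote
}


def read_tweets(spark, path):
    """Read the CSV at `path` with the options above (hypothetical helper)."""
    return spark.read.options(**CSV_OPTIONS).csv(path)
```

Calling `read_tweets(spark, "/content/vaccination_tweets.csv").dtypes` would then show whichever types Spark inferred for columns such as `retweets` or `user_followers`.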

In [9]:
#compute summary statistics (not essential for text data though)
df.describe().show()

+-------+--------------------+--------------------+------------------+--------------------+------------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+-----------------+
|summary|                  id|           user_name|     user_location|    user_description|      user_created|      user_followers|       user_friends|     user_favourites|       user_verified|              date|                text|            hashtags|              source|            retweets|         favorites|       is_retweet|
+-------+--------------------+--------------------+------------------+--------------------+------------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+-----------------

In [10]:
#print the schema of df
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_created: string (nullable = true)
 |-- user_followers: string (nullable = true)
 |-- user_friends: string (nullable = true)
 |-- user_favourites: string (nullable = true)
 |-- user_verified: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- source: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- is_retweet: string (nullable = true)



In [11]:
#counting null values in the 'text' column
df.select(count(when(isnan("text") | col("text").isNull(), "text")).alias("text")).show()

+----+
|text|
+----+
|1806|
+----+



The text column contains null values. We drop them with `df.na.drop()`, which removes every row that has a null in any column.

In [12]:
df= df.na.drop()
df.select(count(when(isnan("text") | col("text").isNull(), "text")).alias("text")).show()

+----+
|text|
+----+
|   0|
+----+
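
Calling `na.drop()` with no arguments removes rows that have a null in any column, which can discard more rows than the `text` check alone requires. The pure-Python sketch below illustrates the difference on three made-up rows; in PySpark the narrower variant would be `df.na.drop(subset=["text"])`. The rows and helper names are hypothetical.

```python
# Three made-up rows standing in for the tweets DataFrame.
rows = [
    {"text": "first tweet", "user_location": None},
    {"text": None, "user_location": "CA"},
    {"text": "second tweet", "user_location": "NY"},
]


def drop_any_null(rows):
    """Keep only rows with no null at all (what na.drop() does by default)."""
    return [r for r in rows if all(v is not None for v in r.values())]


def drop_null_in(rows, subset):
    """Keep rows whose `subset` columns are non-null (like na.drop(subset=...))."""
    return [r for r in rows if all(r[c] is not None for c in subset)]


print(len(drop_any_null(rows)))           # 1
print(len(drop_null_in(rows, ["text"])))  # 2
```

Since only the `text` column is used later, restricting the drop to that column would keep tweets whose other fields, such as the location, happen to be missing.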



Great! Now we define the text preprocessing steps with Spark NLP annotators and models.

In [13]:
#we will use only the text column from here on
df= df.select("text")

In [14]:
from sparknlp.annotator import *
from sparknlp.base import *

In [15]:
documentAssembler= DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

tokenizer= Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("token")

normalizer= Normalizer()\
    .setInputCols(["token"])\
    .setOutputCol("normalized")\
    .setLowercase(True)

stopwords_cleaner= StopWordsCleaner()\
    .setInputCols(["normalized"])\
    .setOutputCol("cleaned")\
    .setCaseSensitive(False)

lemmatizer_model = LemmatizerModel.pretrained('lemma_antbnc', 'en') \
    .setInputCols(["cleaned"]) \
    .setOutputCol("lemma")

bert_embedding= BertEmbeddings.pretrained("bert_base_cased", "en")\
    .setInputCols(["document", "lemma"])\
    .setOutputCol("bert_embedding")\
    .setCaseSensitive(True)

sentence_embedding= SentenceEmbeddings()\
    .setInputCols(["document", "bert_embedding"])\
    .setOutputCol("sentence_embedding")\
    .setPoolingStrategy("AVERAGE")

embedding_finisher= EmbeddingsFinisher()\
    .setInputCols(["sentence_embedding"])\
    .setOutputCols(["finished_sentence_embedding"])\
    .setOutputAsVector(True)\
    .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]
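
To make the first stages concrete, here is a pure-Python sketch of what tokenizing, normalizing and stop-word removal do to one tweet. It is not Spark NLP code: the whitespace split stands in for the `Tokenizer`, the regex assumes the normalizer strips every non-letter character, and the stop-word set is a tiny illustrative subset. The sample string is modeled on the first tweet in the dataset, and its exact URL is an assumption.

```python
import re

# Tiny illustrative stop-word subset; a real English list is much larger.
STOP_WORDS = {"same", "could", "a"}


def normalize(tokens):
    """Lowercase each token, strip non-letters, and drop tokens left empty."""
    cleaned = (re.sub(r"[^A-Za-z]", "", t).lower() for t in tokens)
    return [t for t in cleaned if t]


def remove_stop_words(tokens):
    """Drop every token that appears in STOP_WORDS."""
    return [t for t in tokens if t not in STOP_WORDS]


sample = ("Same folks said daikon paste could treat a cytokine storm "
          "#PfizerBioNTech https://t.co/xeHhIMg1kF")
tokens = sample.split()  # whitespace split stands in for the Tokenizer
print(remove_stop_words(normalize(tokens)))
# ['folks', 'said', 'daikon', 'paste', 'treat', 'cytokine', 'storm',
#  'pfizerbiontech', 'httpstcoxehhimgkf']
```

The sketch shows why hashtags lose their `#` and why a URL collapses into a single letters-only token once punctuation and digits are removed.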


Now we put all of these preprocessing steps into a pipeline.

In [16]:
#importing PySpark's Pipeline
from pyspark.ml import Pipeline

In [17]:
nlp_pipeline= Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    lemmatizer_model,
    bert_embedding,
    sentence_embedding,
    embedding_finisher
])

We now have a pipeline called 'nlp_pipeline' that is ready to be fitted and used for transformation. We first fit it on an empty DataFrame: the annotators here are either rule-based or pretrained, so the fitted model can then transform any DataFrame that has a 'text' column.

In [18]:
%%time
empty_df= spark.createDataFrame([[" "]]).toDF("text")   #placeholder DataFrame with a single 'text' column
nlp_model= nlp_pipeline.fit(empty_df)   #fit on the placeholder, not on the tweets

CPU times: user 67 ms, sys: 11 ms, total: 78 ms
Wall time: 872 ms


In [19]:
#transforming our dataset with nlp_model
result= nlp_model.transform(df)

Inspecting the results of the preprocessing steps.

In [20]:
result.show(5, truncate=30)

+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|                          text|                      document|                         token|                    normalized|                       cleaned|                         lemma|                bert_embedding|            sentence_embedding|   finished_sentence_embedding|
+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|Same folks said daikon past...|[[document, 0, 96, Same fol...|[[token, 0, 3, Same, [sente...|[[token, 0, 3, same, [sente...|[[token, 5, 9, folks, [sent...|[

In [26]:
result.printSchema()

root
 |-- text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 

Word embedding results for each lemma.

In [None]:
result.select(F.explode(F.arrays_zip("lemma.result", "bert_embedding.embeddings")).alias("col"))\
      .select(F.expr("col['0']").alias("lemma"),
            F.expr("col['1']").alias("bert")).show(10)

+-----------------+--------------------+
|            lemma|                bert|
+-----------------+--------------------+
|             folk|[-0.058960535, -0...|
|              say|[-0.4545134, -0.3...|
|           daikon|[0.3539787, 0.225...|
|            paste|[0.33013698, 0.00...|
|            treat|[0.1006234, -0.52...|
|         cytokine|[1.1752083, 0.427...|
|            storm|[0.23642492, -0.3...|
|   pfizerbiontech|[0.47123566, -0.9...|
|httpstcoxehhimgkf|[0.7643941, 0.228...|
|      coronavirus|[0.8943809, -0.28...|
+-----------------+--------------------+
only showing top 10 rows
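
The `arrays_zip` plus `explode` combination used above pairs each lemma with its vector and then emits one output row per pair. A pure-Python sketch of the same idea, using made-up two-dimensional vectors, might look like this. `zip_longest` is used because `arrays_zip` pads the shorter array with nulls rather than truncating; the helper name and data are hypothetical.

```python
from itertools import zip_longest


def zip_and_explode(rows):
    """Pair lemmas with vectors per input row, then flatten to one pair per output row."""
    pairs = []
    for lemmas, vectors in rows:
        # zip_longest fills with None, mirroring how arrays_zip pads with nulls.
        pairs.extend(zip_longest(lemmas, vectors))
    return pairs


# Two made-up documents with toy 2-dimensional embeddings.
rows = [
    (["folk", "say"], [[0.1, 0.2], [0.3, 0.4]]),
    (["storm"], [[0.5, 0.6]]),
]
for lemma, vector in zip_and_explode(rows):
    print(lemma, vector)
```

The result has one entry per token across all documents, which is why the table above lists lemmas rather than tweets.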



Here are the sentence embedding results.


In [50]:
result.select("text", "sentence_embedding.embeddings", "finished_sentence_embedding").show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|                                              text|                                        embeddings|                       finished_sentence_embedding|
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|Same folks said daikon paste could treat a cyto...|[[0.3242811, -0.16343313, 0.09832452, 0.4089455...|[[0.32428109645843506,-0.16343313455581665,0.09...|
|#coronavirus #SputnikV #AstraZeneca #PfizerBioN...|[[0.4382467, -0.21708411, 0.13505672, 0.2224946...|[[0.4382466971874237,-0.2170841097831726,0.1350...|
|it is a bit sad to claim the fame for success o...|[[0.0014825637, -0.039872296, -0.06721318, 0.17...|[[0.0014825636753812432,-0.039872296154499054,-...|
|Coronavirus: Iran reports 8,201 new cases, 221 ...|[[0.44694707, -0.2
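
The `AVERAGE` pooling strategy set on `SentenceEmbeddings` combines the token vectors of a document into a single vector by taking the element-wise mean. A minimal pure-Python sketch of that operation, on toy vectors and with a hypothetical function name, is shown below; it illustrates the arithmetic only and is not the library's implementation.

```python
def average_pool(token_vectors):
    """Return the element-wise mean of equally sized token vectors."""
    if not token_vectors:
        raise ValueError("need at least one token vector")
    count = len(token_vectors)
    dim = len(token_vectors[0])
    return [sum(vec[i] for vec in token_vectors) / count for i in range(dim)]


# Two toy token embeddings of dimension 2.
print(average_pool([[1.0, 2.0], [3.0, 4.0]]))  # [2.0, 3.0]
```

Whatever the number of tokens in a tweet, the pooled vector has the same dimension as a single token embedding, which is what makes it usable as a fixed-size feature.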

Inspecting the results as a pandas DataFrame.

In [33]:
import pandas as pd

In [39]:
result.select("token.result").show(5)

+--------------------+
|              result|
+--------------------+
|[Same, folks, sai...|
|[#coronavirus, #S...|
|[it, is, a, bit, ...|
|[Coronavirus, :, ...|
|[The, trump, admi...|
+--------------------+
only showing top 5 rows



In [55]:
#a separate name keeps the Spark DataFrame 'df' from being overwritten
pandas_df= result.select(F.explode(F.arrays_zip("cleaned.result", "lemma.result", "bert_embedding.embeddings")).alias("col"))\
    .select(F.expr("col['0']").alias("cleaned"),
            F.expr("col['1']").alias("lemma"),
            F.expr("col['2']").alias("bert_embedding")).toPandas()

In [56]:
pandas_df.head(10)

|   | cleaned | lemma | bert_embedding |
|---|---|---|---|
| 0 | folks | folk | [-0.05896046757698059, -0.2533942461013794, 0.... |
| 1 | said | say | [-0.454512357711792, -0.3617393374443054, 0.08... |
| 2 | daikon | daikon | [0.3539792597293854, 0.22553156316280365, 0.10... |
| 3 | paste | paste | [0.3301369845867157, 0.0037823885213583708, 0.... |
| 4 | treat | treat | [0.10062337666749954, -0.5281332731246948, -0.... |
| 5 | cytokine | cytokine | [1.1752082109451294, 0.4270249307155609, -0.16... |
| 6 | storm | storm | [0.23642480373382568, -0.3116196393966675, -0.... |
| 7 | pfizerbiontech | pfizerbiontech | [0.4712355434894562, -0.9010103344917297, 0.32... |
| 8 | httpstcoxehhimgkf | httpstcoxehhimgkf | [0.7643942832946777, 0.22865983843803406, 0.58... |
| 9 | coronavirus | coronavirus | [0.8943809270858765, -0.28010478615760803, -0.... |
