![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop//blob/master/tutorials/Certification_Trainings/Public/6.Playground_DataFrames.ipynb)

# Spark DataFrames Playground

In [1]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash
# !bash colab.sh
# By default, PySpark and Spark NLP are set to their latest versions.
# -p sets the PySpark version
# -s sets the Spark NLP version
# To set up Colab with specific versions:
# !bash colab.sh -p 3.1.1 -s 3.0.1

--2021-04-12 16:28:51--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 51.158.130.26
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.26|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2021-04-12 16:28:51--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1593 (1.6K) [text/plain]
Saving to: ‘STDOUT’


2021-04-12 16:28:52 (42.5 MB/s) - written to stdout [1593/1593]

setup Colab for PySpark 3.1.1 and Spark NLP 3.0.1


<b>If you want to work with Spark 2.3, use the following setup instead:</b>
```
import os

# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

!tar xf spark-2.3.0-bin-hadoop2.7.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
os.environ["SPARK_HOME"] = "/content/spark-2.3.0-bin-hadoop2.7"
! java -version

import findspark
findspark.init()
from pyspark.sql import SparkSession

! pip install --ignore-installed -q spark-nlp==2.7.5

import sparknlp

spark = sparknlp.start(spark23=True)
```

In [2]:
import sparknlp

spark = sparknlp.start()

from sparknlp.base import *
from sparknlp.annotator import *

from pyspark.ml import Pipeline

print("Spark NLP version", sparknlp.version())

print("Apache Spark version:", spark.version)

spark

Spark NLP version 3.0.1
Apache Spark version: 3.1.1


In [3]:
spark = sparknlp.start()

In [4]:
document = DocumentAssembler().setInputCol('text').setOutputCol('document')

In [5]:
tokenizer = Tokenizer().setInputCols('document').setOutputCol('token')

In [6]:
pos = PerceptronModel.pretrained().setInputCols('document', 'token').setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]


In [7]:
pipeline = Pipeline().setStages([document, tokenizer, pos])

In [8]:
!wget https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/jupyter/annotation/english/spark-nlp-basics/sample-sentences-en.txt

--2021-04-12 16:31:05--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/jupyter/annotation/english/spark-nlp-basics/sample-sentences-en.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 284 [text/plain]
Saving to: ‘sample-sentences-en.txt’


2021-04-12 16:31:05 (19.2 MB/s) - ‘sample-sentences-en.txt’ saved [284/284]



In [9]:
data = spark.read.text('./sample-sentences-en.txt').toDF('text')

In [10]:
data.show(5)

+--------------------+
|                text|
+--------------------+
|Peter is a very g...|
|My life in Russia...|
|John and Peter ar...|
|Lucas Nogal Dunbe...|
|Europe is very cu...|
+--------------------+



In [11]:
model = pipeline.fit(data)

In [12]:
result = model.transform(data)

In [13]:
result.show(5)

+--------------------+--------------------+--------------------+--------------------+
|                text|            document|               token|                 pos|
+--------------------+--------------------+--------------------+--------------------+
|Peter is a very g...|[{document, 0, 27...|[{token, 0, 4, Pe...|[{pos, 0, 4, NNP,...|
|My life in Russia...|[{document, 0, 37...|[{token, 0, 1, My...|[{pos, 0, 1, PRP$...|
|John and Peter ar...|[{document, 0, 76...|[{token, 0, 3, Jo...|[{pos, 0, 3, NNP,...|
|Lucas Nogal Dunbe...|[{document, 0, 67...|[{token, 0, 4, Lu...|[{pos, 0, 4, NNP,...|
|Europe is very cu...|[{document, 0, 68...|[{token, 0, 5, Eu...|[{pos, 0, 5, NNP,...|
+--------------------+--------------------+--------------------+--------------------+



In [14]:
stored = result\
  .select('text', 'pos.begin', 'pos.end', 'pos.result', 'pos.metadata')\
  .toDF('text', 'pos_begin', 'pos_end', 'pos_result', 'pos_meta')\
  .cache()

In [15]:
stored.printSchema()

root
 |-- text: string (nullable = true)
 |-- pos_begin: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- pos_end: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- pos_result: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pos_meta: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [16]:
stored.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|           pos_begin|             pos_end|          pos_result|            pos_meta|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Peter is a very g...|[0, 6, 9, 11, 16,...|[4, 7, 9, 14, 19,...|[NNP, VBZ, DT, RB...|[{word -> Peter},...|
|My life in Russia...|[0, 3, 8, 11, 18,...|[1, 6, 9, 16, 19,...|[PRP$, NN, IN, NN...|[{word -> My}, {w...|
|John and Peter ar...|[0, 5, 9, 15, 19,...|[3, 7, 13, 17, 26...|[NNP, CC, NNP, VB...|[{word -> John}, ...|
|Lucas Nogal Dunbe...|[0, 6, 12, 23, 26...|[4, 10, 21, 24, 2...|[NNP, NNP, NNP, V...|[{word -> Lucas},...|
|Europe is very cu...|[0, 7, 10, 15, 23...|[5, 8, 13, 21, 26...|[NNP, VBZ, RB, RB...|[{word -> Europe}...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



---------
## Spark SQL Functions

In [17]:
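# Import only the functions used below. A wildcard import would shadow
# Python built-ins such as `max` and, on PySpark 3.1+, `filter`.
from pyspark.sql.functions import array_contains, array_distinct, array_max, size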

In [18]:
stored.filter(array_contains('pos_result', 'VBD')).show(5)

+----+---------+-------+----------+--------+
|text|pos_begin|pos_end|pos_result|pos_meta|
+----+---------+-------+----------+--------+
+----+---------+-------+----------+--------+



In [19]:
stored.withColumn('token_count', size('pos_result')).select('pos_result', 'token_count').show(5)

+--------------------+-----------+
|          pos_result|token_count|
+--------------------+-----------+
|[NNP, VBZ, DT, RB...|          7|
|[PRP$, NN, IN, NN...|          8|
|[NNP, CC, NNP, VB...|         15|
|[NNP, NNP, NNP, V...|         15|
|[NNP, VBZ, RB, RB...|         15|
+--------------------+-----------+



In [20]:
stored.select('text', array_max('pos_end')).show(5)

+--------------------+------------------+
|                text|array_max(pos_end)|
+--------------------+------------------+
|Peter is a very g...|                27|
|My life in Russia...|                37|
|John and Peter ar...|                76|
|Lucas Nogal Dunbe...|                67|
|Europe is very cu...|                68|
+--------------------+------------------+



In [21]:
stored.withColumn('unique_pos', array_distinct('pos_result')).select('pos_result', 'unique_pos').show(5)

+--------------------+--------------------+
|          pos_result|          unique_pos|
+--------------------+--------------------+
|[NNP, VBZ, DT, RB...|[NNP, VBZ, DT, RB...|
|[PRP$, NN, IN, NN...|[PRP$, NN, IN, NN...|
|[NNP, CC, NNP, VB...|[NNP, CC, VBP, NN...|
|[NNP, NNP, NNP, V...|[NNP, VBZ, DT, RB...|
|[NNP, VBZ, RB, RB...|[NNP, VBZ, RB, JJ...|
+--------------------+--------------------+
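The `unique_pos` column above is consistent with `array_distinct` keeping the first occurrence of each tag in its original position. A minimal plain-Python sketch of that behaviour follows; `distinct_in_order` is a hypothetical helper, the tag list is hand-written rather than taken from the model, and the order-preserving assumption is not checked against Spark here.

```python
def distinct_in_order(values):
    """Drop repeated values, keeping the first occurrence of each."""
    # dict keys are unique and remember insertion order (Python 3.7+).
    return list(dict.fromkeys(values))


# Hand-written example tags, not actual model output.
tags = ["NNP", "CC", "NNP", "VBP", "NN"]
print(distinct_in_order(tags))
```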



In [22]:
stored.groupBy(array_distinct('pos_result')).count().show(10)

+--------------------------+-----+
|array_distinct(pos_result)|count|
+--------------------------+-----+
|      [NNP, CC, VBP, NN...|    1|
|      [NNP, VBZ, DT, RB...|    1|
|      [NNP, VBZ, RB, JJ...|    1|
|      [PRP$, NN, IN, NN...|    1|
|      [NNP, VBZ, DT, RB...|    1|
+--------------------------+-----+



----------------
### SQL Functions with `col`

In [23]:
from pyspark.sql.functions import col

In [24]:
stored.select(col('pos_meta').getItem(0).getItem('word')).show(5)

+-----------------+
|pos_meta[0][word]|
+-----------------+
|            Peter|
|               My|
|             John|
|            Lucas|
|           Europe|
+-----------------+



-------------
### Spark NLP Annotation UDFs

In [25]:
result.select('pos').show(1, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pos                                                                                                                                                                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{pos, 0, 4, NNP, {word -> Peter}, []}, {pos, 6, 7, VBZ, {word -> is}, []}, {pos, 9, 9, DT, {word -> a}, []}, {pos, 11, 14, RB, {word -> very}, []}, {pos, 16, 19, JJ, {word -> good}, []}, {pos, 21, 26,

In [26]:
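def nn_tokens(annotations):
    # Return the word of every annotation tagged as a singular noun (NN).
    # A list comprehension avoids the built-in `filter`, which
    # `from pyspark.sql.functions import *` shadows on PySpark 3.1+.
    return [
        annotation.metadata['word']
        for annotation in annotations
        if annotation.result == 'NN'
    ]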
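The logic of `nn_tokens` can be checked without a Spark session. The sketch below uses an equivalent list-comprehension form of the function and a hypothetical `FakeAnnotation` stand-in that carries only the two fields the function reads (`result` and `metadata`). The sample tags are hand-written for illustration, not model output.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark NLP annotation: only the fields used below.
FakeAnnotation = namedtuple("FakeAnnotation", ["result", "metadata"])


def nn_tokens(annotations):
    """Return the words of all annotations whose tag is exactly 'NN'."""
    return [a.metadata["word"] for a in annotations if a.result == "NN"]


# Hand-written sample, not actual tagger output.
sample = [
    FakeAnnotation("PRP$", {"word": "My"}),
    FakeAnnotation("NN", {"word": "life"}),
    FakeAnnotation("IN", {"word": "in"}),
    FakeAnnotation("NNP", {"word": "Russia"}),
]
print(nn_tokens(sample))
```

Note that the comparison is an exact match, so related tags such as `NNP` or `NNS` are not included.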

In [27]:
from sparknlp.functions import *

In [28]:
from pyspark.sql.types import ArrayType, StringType

In [29]:
result.select(map_annotations(nn_tokens, ArrayType(StringType()))('pos').alias('nn_tokens')).show(truncate=False)

PythonException: ignored
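The `PythonException` recorded above comes from the original run. One plausible cause on PySpark 3.1+ is that an earlier wildcard import from `pyspark.sql.functions` replaces Python's built-in `filter`, so a UDF that calls `filter(...)` fails on the workers. As an alternative that avoids Python UDFs altogether, the sketch below builds the same selection with Spark SQL's higher-order functions `filter` and `transform` (available from Spark 2.4, so not in the Spark 2.3 setup). The helper names `nn_tokens_sql` and `select_nn_tokens` are hypothetical, and the sketch assumes the `pos` column has the annotation layout shown earlier (`result` and `metadata` fields).

```python
def nn_tokens_sql(column="pos", tag="NN", alias="nn_tokens"):
    """Build a Spark SQL expression selecting the words tagged `tag`."""
    return (
        f"transform(filter({column}, a -> a.result = '{tag}'), "
        f"a -> a.metadata['word']) AS {alias}"
    )


def select_nn_tokens(df, column="pos", tag="NN"):
    """Apply the expression to a DataFrame that has an annotation column."""
    # selectExpr parses the string as SQL, so no Python code runs on the workers.
    return df.selectExpr(nn_tokens_sql(column, tag))


# In the notebook (assumes `result` from the pipeline above):
# select_nn_tokens(result).show(truncate=False)
print(nn_tokens_sql())
```

Because the expression is evaluated inside the JVM, it also avoids the serialization overhead of a Python UDF.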