![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/annotation/english/text-matcher-pipeline/extractor.ipynb)

## 0. Colab Setup

In [None]:
import os

# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
[K     |████████████████████████████████| 215.7MB 55kB/s 
[K     |████████████████████████████████| 204kB 49.3MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 122kB 2.8MB/s 
[?25h

## Simple Text Matching

In the following example, we walk-through our straight forward Text Matcher Annotator.

This annotator will take a list of sentences from a text file and look them up in the given target dataset.

This annotator is an Annotator Model and hence does not require training. 

#### 1. Call necessary imports and set the resource path to read local data files

In [None]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import time

import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

! wget -N https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/annotation/english/text-matcher-pipeline/entities.txt 

--2020-05-12 02:56:03--  https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/annotation/english/text-matcher-pipeline/entities.txt
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘entities.txt’

entities.txt            [ <=>                ]  69.24K  --.-KB/s    in 0.03s   

Last-modified header missing -- time-stamps turned off.
2020-05-12 02:56:04 (2.49 MB/s) - ‘entities.txt’ saved [70904]



In [None]:
spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)


Spark NLP version:  2.5.0
Apache Spark version:  2.4.4


#### 3. Create appropriate annotators. We are using Sentence Detection and Tokenizing the sentence. The Finisher will clean the annotations and exclude the metadata.

In [None]:
documentAssembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

sentenceDetector = SentenceDetector()\
  .setInputCols(["document"])\
  .setOutputCol("sentence")

tokenizer = Tokenizer()\
  .setInputCols(["document"])\
  .setOutputCol("token")

extractor = TextMatcher()\
  .setEntities("entities.txt")\
  .setInputCols(["token", "sentence"])\
  .setOutputCol("entites")

finisher = Finisher() \
    .setInputCols(["entites"]) \
    .setIncludeMetadata(False) \
    .setCleanAnnotations(True)

pipeline = Pipeline(
    stages = [
    documentAssembler,
    sentenceDetector,
    tokenizer,
    extractor,
    finisher
  ])


#### 4. Load the input data to be annotated

In [None]:
! rm /tmp/sentiment.parquet.zip
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/sentiment.parquet.zip -P /tmp 
! unzip /tmp/sentiment.parquet.zip -d /tmp/ -y

--2020-05-12 02:56:22--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/sentiment.parquet.zip
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.145.53
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.145.53|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 76127532 (73M) [application/zip]
Saving to: ‘/tmp/sentiment.parquet.zip’


2020-05-12 02:56:23 (94.2 MB/s) - ‘/tmp/sentiment.parquet.zip’ saved [76127532/76127532]

Archive:  /tmp/sentiment.parquet.zip
caution: filename not matched:  -y


In [None]:
data = spark. \
        read. \
        parquet("/tmp/sentiment.parquet"). \
        limit(1000).cache()
data.show(20)

+------+---------+--------------------+
|itemid|sentiment|                text|
+------+---------+--------------------+
|     1|        0|                 ...|
|     2|        0|                 ...|
|     3|        1|              omg...|
|     4|        0|          .. Omga...|
|     5|        0|         i think ...|
|     6|        0|         or i jus...|
|     7|        1|       Juuuuuuuuu...|
|     8|        0|       Sunny Agai...|
|     9|        1|      handed in m...|
|    10|        1|      hmmmm.... i...|
|    11|        0|      I must thin...|
|    12|        1|      thanks to a...|
|    13|        0|      this weeken...|
|    14|        0|     jb isnt show...|
|    15|        0|     ok thats it ...|
|    16|        0|    &lt;-------- ...|
|    17|        0|    awhhe man.......|
|    18|        1|    Feeling stran...|
|    19|        0|    HUGE roll of ...|
|    20|        0|    I just cut my...|
+------+---------+--------------------+
only showing top 20 rows



#### 5. Running the fit for sentence detection and tokenization.

In [None]:
print("Start fitting")
model = pipeline.fit(data)
print("Fitting is ended")

Start fitting
Fitting is ended


#### 6. Runing the transform on data to do text matching. It will append a new coloumns with matched entities.

In [None]:
extracted = model.transform(data)
extracted.show()

# filter rows with extracted text
extracted\
.filter("size(finished_entites) != 0") \
.show()

+------+---------+--------------------+----------------+
|itemid|sentiment|                text|finished_entites|
+------+---------+--------------------+----------------+
|     1|        0|                 ...|              []|
|     2|        0|                 ...|              []|
|     3|        1|              omg...|              []|
|     4|        0|          .. Omga...|              []|
|     5|        0|         i think ...|              []|
|     6|        0|         or i jus...|              []|
|     7|        1|       Juuuuuuuuu...|              []|
|     8|        0|       Sunny Agai...|              []|
|     9|        1|      handed in m...|              []|
|    10|        1|      hmmmm.... i...|              []|
|    11|        0|      I must thin...|              []|
|    12|        1|      thanks to a...|              []|
|    13|        0|      this weeken...|              []|
|    14|        0|     jb isnt show...|              []|
|    15|        0|     ok thats

#### 7. The model could be saved locally and reloaded to run again

In [None]:

model.write().overwrite().save("./extractor_model")

In [None]:
from pyspark.ml import  Pipeline

sameModel = PipelineModel.read().load("./extractor_model")

sameModel.transform(data) \
.filter("size(finished_entites) != 0") \
.show()