# 015043 Project-2

In [None]:
%%time

import numpy as np  
import pandas as pd  
from tqdm.auto import tqdm
tqdm.pandas()

import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.7 spark-nlp==2.7.5

#!ls '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['PATH'] = os.environ['JAVA_HOME'] + "/bin:" + os.environ['PATH']

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
spark = sparknlp.start()

from sklearn.metrics import classification_report

print("Apache Spark version", spark.version)
print("Spark NLP version", sparknlp.version())

debconf: delaying package configuration, since apt-utils is not installed
openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)
Collecting pyspark==2.4.7
  Downloading pyspark-2.4.7.tar.gz (217.9 MB)
[K     |████████████████████████████████| 217.9 MB 52 kB/s 
[?25hCollecting spark-nlp==2.7.5
  Downloading spark_nlp-2.7.5-py2.py3-none-any.whl (139 kB)
[K     |████████████████████████████████| 139 kB 57.7 MB/s 
[?25hCollecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[K     |████████████████████████████████| 197 kB 43.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \

# Data

In [1]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [None]:
train = pd.read_csv('/gdrive/MyDrive/Colab_Notebooks/FA/Project_24thDEC_2021/atis_intents_train.csv')
train.columns = ['intent', 'snippet']

print(train.shape)
train.head()

(4833, 2)


Unnamed: 0,intent,snippet
0,atis_flight,what flights are available from pittsburgh to...
1,atis_flight_time,what is the arrival time in san francisco for...
2,atis_airfare,cheapest airfare from tacoma to orlando
3,atis_airfare,round trip fares from pittsburgh to philadelp...
4,atis_flight,i need a flight tomorrow from columbus to min...


In [None]:
train.intent.value_counts(), train.intent.value_counts(normalize=True)

(atis_flight            3665
 atis_airfare            423
 atis_ground_service     255
 atis_airline            157
 atis_abbreviation       147
 atis_aircraft            81
 atis_flight_time         54
 atis_quantity            51
 Name: intent, dtype: int64,
 atis_flight            0.758328
 atis_airfare           0.087523
 atis_ground_service    0.052762
 atis_airline           0.032485
 atis_abbreviation      0.030416
 atis_aircraft          0.016760
 atis_flight_time       0.011173
 atis_quantity          0.010552
 Name: intent, dtype: float64)

In [None]:
test = pd.read_csv('/gdrive/MyDrive/Colab_Notebooks/FA/Project_24thDEC_2021/atis_intents_test.csv')
test.columns = ['intent', 'snippet']

print(test.shape)
test.head()

(799, 2)


Unnamed: 0,intent,snippet
0,atis_airfare,on april first i need a ticket from tacoma to...
1,atis_flight,on april first i need a flight going from pho...
2,atis_flight,i would like a flight traveling one way from ...
3,atis_flight,i would like a flight from orlando to salt la...
4,atis_flight,i need a flight from toronto to newark one wa...


In [None]:
test.intent.value_counts(), test.intent.value_counts(normalize=True)

(atis_flight            631
 atis_airfare            48
 atis_airline            38
 atis_ground_service     36
 atis_abbreviation       33
 atis_aircraft            9
 atis_quantity            3
 atis_flight_time         1
 Name: intent, dtype: int64,
 atis_flight            0.789737
 atis_airfare           0.060075
 atis_airline           0.047559
 atis_ground_service    0.045056
 atis_abbreviation      0.041302
 atis_aircraft          0.011264
 atis_quantity          0.003755
 atis_flight_time       0.001252
 Name: intent, dtype: float64)

# Intent classification with a Spark NLP 

In [None]:
document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

use = UniversalSentenceEncoder.pretrained('tfhub_use', lang="en") \
    .setInputCols(["document"])\
    .setOutputCol("sentence_embeddings")

document_classifier = ClassifierDLModel.pretrained('classifierdl_use_atis', 'en') \
  .setInputCols(["document", "sentence_embeddings"]) \
  .setOutputCol("class")

nlpPipeline = Pipeline(stages=[document_assembler, use, document_classifier])
light_pipeline = LightPipeline(nlpPipeline.fit(spark.createDataFrame([['']]).toDF("text")))

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
classifierdl_use_atis download started this may take some time.
Approximate size to download 21.1 MB
[OK!]


In [None]:
example = ['I want to fly from Albany NY to Tampa Florida.', 'what would be the cost of the flight ']
result = light_pipeline.annotate(example)
result

[{'document': ['I want to fly from Albany NY to Tampa Florida.'],
  'sentence_embeddings': ['I want to fly from Albany NY to Tampa Florida.'],
  'class': ['atis_flight']},
 {'document': ['what would be the cost of the flight '],
  'sentence_embeddings': ['what would be the cost of the flight '],
  'class': ['atis_airfare']}]

### Performance on test

In [None]:
example = test.snippet.tolist()
result = light_pipeline.annotate(example)
Preds = []
for j in range(len(result)):
    Preds.append(result[j]['class'][0])
    
Truth = test.intent.tolist()

print(classification_report(Truth, Preds))

                     precision    recall  f1-score   support

  atis_abbreviation       1.00      1.00      1.00        33
      atis_aircraft       0.00      0.00      0.00         9
       atis_airfare       0.61      1.00      0.76        48
       atis_airline       0.49      1.00      0.66        38
        atis_flight       0.99      0.90      0.95       631
   atis_flight_time       0.00      0.00      0.00         1
atis_ground_service       0.97      1.00      0.99        36
      atis_quantity       0.00      0.00      0.00         3

           accuracy                           0.91       799
          macro avg       0.51      0.61      0.54       799
       weighted avg       0.93      0.91      0.91       799



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
test = test[(test['intent'] != 'atis_aircraft') &
                   (test['intent'] != 'atis_flight_time') &
                   (test['intent'] != 'atis_quantity')  ] 
test.shape

(786, 2)

In [None]:
example = test.snippet.tolist()
result = light_pipeline.annotate(example)
Preds = []
for j in range(len(result)):
    Preds.append(result[j]['class'][0])
    
Truth = test.intent.tolist()

print(classification_report(Truth, Preds))

                     precision    recall  f1-score   support

  atis_abbreviation       1.00      1.00      1.00        33
       atis_airfare       0.62      1.00      0.76        48
       atis_airline       0.56      1.00      0.72        38
        atis_flight       1.00      0.90      0.95       631
atis_ground_service       0.97      1.00      0.99        36

           accuracy                           0.92       786
          macro avg       0.83      0.98      0.88       786
       weighted avg       0.95      0.92      0.93       786



In [None]:
trainDataset = spark.createDataFrame(train)
trainDataset.show(truncate=50)

+-------------------+--------------------------------------------------+
|             intent|                                           snippet|
+-------------------+--------------------------------------------------+
|        atis_flight| what flights are available from pittsburgh to ...|
|   atis_flight_time| what is the arrival time in san francisco for ...|
|       atis_airfare|           cheapest airfare from tacoma to orlando|
|       atis_airfare| round trip fares from pittsburgh to philadelph...|
|        atis_flight| i need a flight tomorrow from columbus to minn...|
|      atis_aircraft| what kind of aircraft is used on a flight from...|
|        atis_flight| show me the flights from pittsburgh to los ang...|
|        atis_flight|             all flights from boston to washington|
|atis_ground_service| what kind of ground transportation is availabl...|
|        atis_flight|  show me the flights from dallas to san francisco|
|        atis_flight| show me the flights from san 

In [None]:
from pyspark.sql.functions import col

print(trainDataset.count())

trainDataset.groupBy("intent") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

4833
+-------------------+-----+
|             intent|count|
+-------------------+-----+
|        atis_flight| 3665|
|       atis_airfare|  423|
|atis_ground_service|  255|
|       atis_airline|  157|
|  atis_abbreviation|  147|
|      atis_aircraft|   81|
|   atis_flight_time|   54|
|      atis_quantity|   51|
+-------------------+-----+



In [None]:
testDataset = spark.createDataFrame(test)
testDataset.show(truncate=50)

+------------+--------------------------------------------------+
|      intent|                                           snippet|
+------------+--------------------------------------------------+
|atis_airfare| on april first i need a ticket from tacoma to ...|
| atis_flight| on april first i need a flight going from phoe...|
| atis_flight| i would like a flight traveling one way from p...|
| atis_flight| i would like a flight from orlando to salt lak...|
| atis_flight| i need a flight from toronto to newark one way...|
| atis_flight| monday morning i would like to fly from columb...|
| atis_flight| on wednesday april sixth i would like to fly f...|
| atis_flight| after 12 pm on wednesday april sixth i would l...|
| atis_flight| are there any flights from long beach to colum...|
| atis_flight|       find a flight from memphis to tacoma dinner|
| atis_flight| on next wednesday flight from kansas city to c...|
| atis_flight| flight on american from miami to chicago arriv...|
| atis_fli

In [None]:
from pyspark.sql.functions import col

print(testDataset.count())

testDataset.groupBy("intent") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

786
+-------------------+-----+
|             intent|count|
+-------------------+-----+
|        atis_flight|  631|
|       atis_airfare|   48|
|       atis_airline|   38|
|atis_ground_service|   36|
|  atis_abbreviation|   33|
+-------------------+-----+



In [None]:
document_assembler = DocumentAssembler() \
    .setInputCol("snippet") \
    .setOutputCol("document")
    
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")
    
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

lemma = LemmatizerModel.pretrained('lemma_antbnc') \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemma")

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [None]:
glove_embeddings = WordEmbeddingsModel().pretrained() \
 .setInputCols(["document",'lemma'])\
 .setOutputCol("embeddings")\
 .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("intent")\
  .setMaxEpochs(3)\
  .setEnableOutputLogs(True)
  #.setOutputLogsPath('logs')

clf_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            lemma, 
            glove_embeddings,
            embeddingsSentence,
            classsifierdl])

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


### default classifierDL params:

maxEpochs -> 10,
lr -> 5e-3f,
dropout -> 0.5f,
batchSize -> 64,
enableOutputLogs -> false,
verbose -> Verbose.Silent.id,
validationSplit -> 0.0f,
outputLogsPath -> ""


### Train

In [None]:
%%time

# Train gpu = 14 secs... no gpu 16 secs
#spark = sparknlp.start(gpu = True)

clf_pipelineModel = clf_pipeline.fit(trainDataset)

CPU times: user 115 ms, sys: 29.6 ms, total: 144 ms
Wall time: 17.8 s


### Test

In [None]:
test = pd.read_csv('/gdrive/MyDrive/Colab_Notebooks/FA/Project_24thDEC_2021/atis_intents_test.csv')
test.columns = ['intent', 'snippet']

print(test.shape)
test.head()

(799, 2)


Unnamed: 0,intent,snippet
0,atis_airfare,on april first i need a ticket from tacoma to...
1,atis_flight,on april first i need a flight going from pho...
2,atis_flight,i would like a flight traveling one way from ...
3,atis_flight,i would like a flight from orlando to salt la...
4,atis_flight,i need a flight from toronto to newark one wa...


In [None]:
testDataset = spark.createDataFrame(test)
testDataset.show(10)

+------------+--------------------+
|      intent|             snippet|
+------------+--------------------+
|atis_airfare| on april first i...|
| atis_flight| on april first i...|
| atis_flight| i would like a f...|
| atis_flight| i would like a f...|
| atis_flight| i need a flight ...|
| atis_flight| monday morning i...|
| atis_flight| on wednesday apr...|
| atis_flight| after 12 pm on w...|
| atis_flight| are there any fl...|
| atis_flight| find a flight fr...|
+------------+--------------------+
only showing top 10 rows



In [None]:
# get the predictions on test Set

preds = clf_pipelineModel.transform(testDataset)
preds_df = preds.select("intent","class.result").toPandas()
for j in range(preds_df.shape[0]):
    preds_df.result[j] = preds_df.result[j][0]
preds_df.sample(10)

Unnamed: 0,intent,result
389,atis_flight,atis_flight
190,atis_airline,atis_flight
325,atis_abbreviation,atis_flight
137,atis_flight,atis_flight
14,atis_flight,atis_flight
624,atis_airfare,atis_flight
766,atis_flight,atis_flight
490,atis_airline,atis_flight
638,atis_flight,atis_flight
590,atis_flight,atis_flight


In [None]:

# Definitely WORSE than the pretrained model above...this one always predicts the majority class 

print(classification_report(preds_df.intent, preds_df.result))

                     precision    recall  f1-score   support

  atis_abbreviation       0.00      0.00      0.00        33
      atis_aircraft       0.00      0.00      0.00         9
       atis_airfare       0.00      0.00      0.00        48
       atis_airline       0.00      0.00      0.00        38
        atis_flight       0.79      1.00      0.88       631
   atis_flight_time       0.00      0.00      0.00         1
atis_ground_service       0.00      0.00      0.00        36
      atis_quantity       0.00      0.00      0.00         3

           accuracy                           0.79       799
          macro avg       0.10      0.12      0.11       799
       weighted avg       0.62      0.79      0.70       799



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
%%time

document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

embeddings = WordEmbeddingsModel.pretrained("glove_840B_300", "xx")\
          .setInputCols("document", "token") \
          .setOutputCol("embeddings")

ner = NerDLModel.pretrained("nerdl_atis_840b_300d", "en") \
        .setInputCols(["document", "token", "embeddings"]) \
        .setOutputCol("ner")

ner_converter = NerConverter()\
    .setInputCols(['document', 'token', 'ner']) \
    .setOutputCol('ner_chunk')

pipeline = Pipeline(stages=[document_assembler, tokenizer, embeddings, ner, ner_converter])


glove_840B_300 download started this may take some time.
Approximate size to download 2.3 GB
[OK!]
nerdl_atis_840b_300d download started this may take some time.
Approximate size to download 14.5 MB
[OK!]
CPU times: user 169 ms, sys: 67.2 ms, total: 236 ms
Wall time: 2min 40s


In [None]:
example = spark.createDataFrame(pd.DataFrame({'text': [
    "How much would cost a trip from Albany to Miami for tomorrow"
    ]}))
result = pipeline.fit(example).transform(example)

result

DataFrame[text: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_chunk: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>]

In [None]:
# Visualize outputs as data frame ... NOTE pyspark df not pandas df...

exploded = F.explode(F.arrays_zip('ner_chunk.result', 'ner_chunk.metadata'))
select_expression_0 = F.expr("cols['0']").alias("chunk")
select_expression_1 = F.expr("cols['1']['entity']").alias("ner_label")
result.select(exploded.alias("cols")) \
    .select(select_expression_0, select_expression_1).show(truncate=False)

+--------+--------------------------+
|chunk   |ner_label                 |
+--------+--------------------------+
|Albany  |fromloc.airport_code      |
|Miami   |toloc.city_name           |
|tomorrow|depart_date.today_relative|
+--------+--------------------------+



In [None]:
resultDF = result.toPandas()
resultDF

Unnamed: 0,text,document,token,embeddings,ner,ner_chunk
0,How much would cost a trip from Albany to Miam...,"[(document, 0, 59, How much would cost a trip ...","[(token, 0, 2, How, {'sentence': '0'}, []), (t...","[(word_embeddings, 0, 2, How, {'sentence': '0'...","[(named_entity, 0, 2, O, {'word': 'How', 'conf...","[(chunk, 32, 37, Albany, {'sentence': '0', 'ch..."


In [None]:
IOIdf = pd.DataFrame(columns = ['Result', 'Entity', 'StrBegin', 'StrEnd'])


for i in range(len(resultDF['ner_chunk'][0])):
    #ResultEntity = [result['ner_chunk'][0][i]['result'] , result['ner_chunk'][0][i]['metadata']['entity']]
    ResultEntity = pd.DataFrame()
    ResultEntity['Result'] = [resultDF['ner_chunk'][0][i]['result']]
    ResultEntity['Entity'] = [resultDF['ner_chunk'][0][i]['metadata']['entity']]
    ResultEntity['StrBegin'] = [resultDF['ner_chunk'][0][i]['begin']]
    ResultEntity['StrEnd'] = [resultDF['ner_chunk'][0][i]['end']]
    #print(ResultEntity)
    
    IOIdf = IOIdf.append(ResultEntity)
    
IOIdf 


Unnamed: 0,Result,Entity,StrBegin,StrEnd
0,Albany,fromloc.airport_code,32,37
0,Miami,toloc.city_name,42,46
0,tomorrow,depart_date.today_relative,52,59


In [None]:
result

DataFrame[text: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_chunk: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>]