# Exercise 2: Advanced Analytics NLP

In [1]:
!pip install spark-nlp==1.7.3
!pip install pandas

Collecting spark-nlp==1.7.3
[?25l  Downloading https://files.pythonhosted.org/packages/7e/c1/3ec550fbc22efdcac013a301f74d6c904ec545bef291b414be90d900d1d8/spark_nlp-1.7.3-py2.py3-none-any.whl (72.8MB)
[K    100% |████████████████████████████████| 72.8MB 518kB/s eta 0:00:01  1% |▍                               | 972kB 12.2MB/s eta 0:00:06    13% |████▏                           | 9.5MB 29.8MB/s eta 0:00:03    25% |████████▎                       | 18.8MB 31.2MB/s eta 0:00:02    33% |██████████▉                     | 24.7MB 28.1MB/s eta 0:00:02    41% |█████████████▏                  | 30.0MB 26.1MB/s eta 0:00:02    43% |█████████████▉                  | 31.4MB 27.9MB/s eta 0:00:02    50% |████████████████▎               | 36.9MB 30.0MB/s eta 0:00:02    52% |████████████████▉               | 38.3MB 31.6MB/s eta 0:00:02    54% |█████████████████▌              | 39.7MB 28.3MB/s eta 0:00:02    58% |██████████████████▋             | 42.4MB 27.8MB/s eta 0:00:02    68% |██████████████████████

In [2]:
import pandas as pd
pd.set_option('max_colwidth', 800)

# Create a spark context that includes a 3rd party jar for NLP

In [21]:
#jarPath = "spark-nlp-assembly-1.7.3.jar"
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Todo
spark = SparkSession.builder\
        .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.8.2")\
        .getOrCreate()

# Read multiple files in a dir as one Dataframe

In [22]:
dataPath = "./data/reddit/*.json"
df = spark.read.json(dataPath)
print(df.count())
df.printSchema()

100
root
 |-- data: struct (nullable = true)
 |    |-- approved_at_utc: string (nullable = true)
 |    |-- approved_by: string (nullable = true)
 |    |-- archived: boolean (nullable = true)
 |    |-- author: string (nullable = true)
 |    |-- author_flair_background_color: string (nullable = true)
 |    |-- author_flair_css_class: string (nullable = true)
 |    |-- author_flair_richtext: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- author_flair_template_id: string (nullable = true)
 |    |-- author_flair_text: string (nullable = true)
 |    |-- author_flair_text_color: string (nullable = true)
 |    |-- author_flair_type: string (nullable = true)
 |    |-- author_fullname: string (nullable = true)
 |    |-- author_patreon_flair: boolean (nullable = true)
 |    |-- banned_at_utc: string (nullable = true)
 |    |-- banned_by: string (nullable = true)
 |    |-- can_gild: boolean (nullable = true)
 |    |-- can_mod_post: boolean (nullable = true)


# Deal with Struct type to query subfields 

In [23]:
title = "data.title"
author = "data.author"

df = df.select(title, author)
df.limit(5).toPandas()

Unnamed: 0,title,author
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.",jaykirsch
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,canuck_burger
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.",honolulu_oahu_mod
3,"Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report",madam1
4,"NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India",purplexxx


# Try to implement the equivalent of flatMap in dataframes

In [31]:
dfWordCount = df.select(F.explode(F.split("title", "\\s+")).alias("word"))\
                .groupBy("word").count().orderBy(F.col("count").desc())

In [32]:
dfWordCount.limit(10).toPandas()

Unnamed: 0,word,count
0,to,58
1,the,46
2,of,42
3,in,41
4,a,25
5,for,20
6,and,19
7,from,12
8,on,11
9,over,10


# Use an NLP libary to do Part-of-Speech Tagging

In [34]:
from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp
dfAnnotated = bp.annotate(df, "title")
dfAnnotated.printSchema()

root
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- normal: array (nullable = true)
 |    |-- element: struct (contains

## Deal with Map type to query subfields

In [39]:
dfPos = dfAnnotated.select("text", "pos.metadata", "pos.result")
dfPos.limit(3).toPandas()

Unnamed: 0,text,metadata,result
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.","[{'word': 'Microsoft'}, {'word': 'Corp'}, {'word': 'said'}, {'word': 'it'}, {'word': 'has'}, {'word': 'discovered'}, {'word': 'hacking'}, {'word': 'targeting'}, {'word': 'democratic'}, {'word': 'institutions'}, {'word': 'think'}, {'word': 'tanks'}, {'word': 'and'}, {'word': 'nonprofit'}, {'word': 'organizations'}, {'word': 'in'}, {'word': 'Europe'}]","[NNP, NNP, VBD, PRP, VBZ, VBN, VBG, VBG, JJ, NNS, VBP, NNS, CC, NN, NNS, IN, NNP]"
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,"[{'word': 'Deutsche'}, {'word': 'Bank'}, {'word': 'reportedly'}, {'word': 'planned'}, {'word': 'to'}, {'word': 'extend'}, {'word': 'the'}, {'word': 'dates'}, {'word': 'of'}, {'word': 'million'}, {'word': 'in'}, {'word': 'loans'}, {'word': 'to'}, {'word': 'Trump'}, {'word': 'Organization'}, {'word': 'to'}, {'word': 'avoid'}, {'word': 'a'}, {'word': 'potential'}, {'word': 'nightmare'}, {'word': 'of'}, {'word': 'chasing'}, {'word': 'a'}, {'word': 'sitting'}, {'word': 'president'}, {'word': 'for'}, {'word': 'cash'}]","[NNP, NNP, RB, VBD, TO, VB, DT, NNS, IN, CD, IN, NNS, TO, NNP, NNP, TO, VB, DT, JJ, NN, IN, VBG, DT, VBG, NN, IN, NN]"
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.","[{'word': 'Iranian'}, {'word': 'morality'}, {'word': 'police'}, {'word': 'were'}, {'word': 'forced'}, {'word': 'to'}, {'word': 'fire'}, {'word': 'warning'}, {'word': 'shots'}, {'word': 'when'}, {'word': 'a'}, {'word': 'crowd'}, {'word': 'intervened'}, {'word': 'to'}, {'word': 'prevent'}, {'word': 'them'}, {'word': 'from'}, {'word': 'arresting'}, {'word': 'two'}, {'word': 'women'}, {'word': 'for'}, {'word': 'not'}, {'word': 'wearing'}, {'word': 'a'}, {'word': 'hijab'}, {'word': 'The'}, {'word': 'incident'}, {'word': 'occurred'}, {'word': 'in'}, {'word': 'Tehran'}, {'word': 's'}, {'word': 'northeastern'}, {'word': 'Narmak'}, {'word': 'neighbourhood'}, {'word': 'on'}, {'word': 'Friday'}, {'word': 'night'}, {'word': 'and'}, {'word': 'ended'}, {'word': 'with'}, {'word': 'a'}, {'word': 'mob'...","[JJ, NN, NN, VBD, VBN, TO, VB, NN, NNS, WRB, DT, NN, VBD, TO, VB, PRP, IN, VBG, CD, NNS, IN, RB, VBG, DT, NN, DT, NN, VBD, IN, NNP, VBZ, JJ, NNP, NN, IN, NNP, NN, CC, VBD, IN, DT, NN, VBG, DT, NN, RP, DT, NN, NN]"


In [41]:
dfPosTag = dfAnnotated.select(F.explode("pos").alias("pos"))

In [42]:
dfPosTag.printSchema()

root
 |-- pos: struct (nullable = true)
 |    |-- annotatorType: string (nullable = true)
 |    |-- begin: integer (nullable = false)
 |    |-- end: integer (nullable = false)
 |    |-- result: string (nullable = true)
 |    |-- metadata: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [43]:
dfPosTag.limit(10).toPandas()

Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 15, 18, VBD, {'word': 'said'})"
3,"(pos, 20, 21, PRP, {'word': 'it'})"
4,"(pos, 23, 25, VBZ, {'word': 'has'})"
5,"(pos, 27, 36, VBN, {'word': 'discovered'})"
6,"(pos, 38, 44, VBG, {'word': 'hacking'})"
7,"(pos, 46, 54, VBG, {'word': 'targeting'})"
8,"(pos, 56, 65, JJ, {'word': 'democratic'})"
9,"(pos, 67, 78, NNS, {'word': 'institutions'})"


## Keep only proper nouns NNP or NNPS

In [47]:
nnpFilter = "pos.result = 'NNP' or pos.result = 'NNPS' "
dfNNP = dfPosTag.filter((F.col("pos.result")=="NNP")|(F.col("pos.result")=="NNPS"))

In [48]:
dfNNP.limit(10).toPandas()

Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 126, 131, NNP, {'word': 'Europe'})"
3,"(pos, 0, 7, NNP, {'word': 'Deutsche'})"
4,"(pos, 9, 12, NNP, {'word': 'Bank'})"
5,"(pos, 81, 85, NNP, {'word': 'Trump'})"
6,"(pos, 87, 98, NNP, {'word': 'Organization'})"
7,"(pos, 175, 180, NNP, {'word': 'Tehran'})"
8,"(pos, 197, 202, NNP, {'word': 'Narmak'})"
9,"(pos, 221, 226, NNP, {'word': 'Friday'})"


## Extract columns form a map in a col

In [79]:
dfWordTag = dfNNP.select(F.col("pos.metadata.word"), F.col("pos.result"))

In [80]:
dfWordTag.schema

StructType(List(StructField(word,StringType,true),StructField(result,StringType,true)))

In [81]:
dfWordTag.show()

+------------+------+
|        word|result|
+------------+------+
|   Microsoft|   NNP|
|        Corp|   NNP|
|      Europe|   NNP|
|    Deutsche|   NNP|
|        Bank|   NNP|
|       Trump|   NNP|
|Organization|   NNP|
|      Tehran|   NNP|
|      Narmak|   NNP|
|      Friday|   NNP|
|       Trump|   NNP|
|       Saudi|   NNP|
|       Jared|   NNP|
|     Kushner|   NNP|
|      Senior|   NNP|
|       Trump|   NNP|
|       Saudi|   NNP|
|      Arabia|   NNP|
|        NASA|   NNP|
|     Happily|   NNP|
+------------+------+
only showing top 20 rows



In [83]:
dfWordTag.groupBy("word").count().orderBy(F.col("count").desc()).show()

+--------+-----+
|    word|count|
+--------+-----+
|      US|   14|
|   Trump|    9|
|   Saudi|    8|
|   Putin|    7|
|  Russia|    6|
|  Europe|    5|
|  Arabia|    5|
|Catholic|    4|
|      UK|    4|
|Vladimir|    4|
|   China|    3|
| Germany|    3|
|    Pope|    3|
|   Egypt|    3|
|   South|    3|
|   House|    3|
|  Church|    3|
|National|    2|
|  Africa|    2|
|   India|    2|
+--------+-----+
only showing top 20 rows

