# Exercise 2: Advanced Analytics NLP

In [1]:
!pip install spark-nlp==1.7.3

Collecting spark-nlp==1.7.3
[?25l  Downloading https://files.pythonhosted.org/packages/7e/c1/3ec550fbc22efdcac013a301f74d6c904ec545bef291b414be90d900d1d8/spark_nlp-1.7.3-py2.py3-none-any.whl (72.8MB)
[K    100% |████████████████████████████████| 72.8MB 149kB/s ta 0:00:011  0% |▎                               | 563kB 8.7MB/s eta 0:00:09    2% |▉                               | 1.9MB 7.6MB/s eta 0:00:10    3% |█▏                              | 2.7MB 10.7MB/s eta 0:00:07    5% |█▉                              | 4.1MB 18.8MB/s eta 0:00:04    6% |██▎                             | 5.1MB 19.8MB/s eta 0:00:04    9% |███▏                            | 7.2MB 7.8MB/s eta 0:00:09    10% |███▍                            | 7.7MB 21.3MB/s eta 0:00:04    11% |███▋                            | 8.2MB 8.9MB/s eta 0:00:08    13% |████▏                           | 9.5MB 9.5MB/s eta 0:00:07    19% |██████▎                         | 14.2MB 22.6MB/s eta 0:00:03    24% |████████                        | 18.0M

In [1]:
import pandas as pd

# Widen pandas' per-column display width so long text values are not truncated
# when Spark previews are rendered via toPandas().
pd.set_option("display.max_colwidth", 800)

# Create a Spark session that includes the third-party Spark NLP package

In [5]:
# Spark NLP version for the JVM package. It must match the pip-installed
# `spark-nlp` Python wrapper (1.7.3) so the Python and JVM APIs agree.
SPARK_NLP_VERSION = "1.7.3"

from pyspark.sql import SparkSession

# NOTE(review): "Java gateway process exited before sending its port number"
# usually means Java / JAVA_HOME is not available to PySpark. Confirm the
# environment before re-running this cell.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:" + SPARK_NLP_VERSION)
    .getOrCreate()
)
spark

Exception: Java gateway process exited before sending its port number

# Read all JSON files in a directory as one DataFrame

In [None]:
# The glob pattern makes Spark read every matching JSON file as one DataFrame.
dataPath = "./data/reddit/*.json"
df = spark.read.json(dataPath)

rowCount = df.count()
print(rowCount)
df.printSchema()

# Query subfields of a struct column

In [None]:
# Dotted paths reach into the `data` struct column; Spark names the selected
# columns after the leaf field (`title`, `author`).
title = "data.title"
author = "data.author"

dfAuthorTitle = df.select([title, author])
preview = dfAuthorTitle.limit(5)
preview.toPandas()

# Implement the DataFrame equivalent of `flatMap`

In [None]:
import pyspark.sql.functions as F

# DataFrame analogue of RDD.flatMap: split each title on whitespace and explode
# the array into one row per word, then count occurrences.
# Splitting a title with leading/trailing whitespace (or an empty title) yields
# empty-string tokens; drop them so "" is not reported as a frequent word.
dfWordCount = (
    df.select(F.explode(F.split(title, '\\s+')).alias("word"))
    .where(F.col("word") != "")
    .groupBy("word")
    .count()
    .orderBy(F.desc("count"))
)
dfWordCount.limit(10).toPandas()

# Use an NLP library to do part-of-speech tagging

In [None]:
from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp

# Run the pretrained basic pipeline over the titles. The input must already have
# a top-level `title` column, which dfAuthorTitle provides. dfAnnotated is this
# cell's output, so it cannot also be the input (it is undefined on a fresh kernel).
# Downstream cells read the `text` and `pos` columns of the result.
dfAnnotated = bp.annotate(dfAuthorTitle, "title")
dfAnnotated.printSchema()

## Deal with Map type to query subfields

In [None]:
# Selecting a field of the `pos` annotation array gives one array per row.
posColumns = ["text", "pos.metadata", "pos.result"]
dfPos = dfAnnotated.select(*posColumns)
dfPos.limit(5).toPandas()

In [None]:
# One row per POS annotation: explode the `pos` array into individual structs.
# Note: this rebinds dfPos; later cells use this exploded version.
dfPos = dfAnnotated.select(F.explode("pos").alias("pos"))
dfPos.printSchema()
# Preview only a sample. toPandas() on the full frame would collect every
# token annotation of every title into driver memory.
dfPos.limit(10).toPandas()

## Keep only proper nouns NNP or NNPS

In [None]:
# SQL predicate keeping only the proper-noun tags NNP / NNPS.
nnpFilter = "pos.result IN ('NNP', 'NNPS')"
dfNNP = dfPos.filter(nnpFilter)
dfNNP.limit(10).toPandas()

## Extract columns from a map column

In [None]:
# Pull the token text out of the annotation's metadata map and pair it with its tag.
dfWordTag = dfNNP.selectExpr(
    "pos.metadata['word'] AS word",
    "pos.result AS tag",
)
dfWordTag.limit(10).toPandas()

In [None]:
from pyspark.sql.functions import desc

# Rank the extracted proper nouns by how often they occur.
nnpCounts = dfWordTag.groupBy('word').count()
nnpCounts.orderBy(desc('count')).show()