In [1]:
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1654110644840_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
data = spark.read.option("header", True).option("multiline", True).option("escape","\"").csv('s3://lsc-projct/*.csv')
data = data.na.drop(subset=['username', 'content'])
df = data[['content']]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Named Entity Recognition (NER) with Bert Embeddings

In [3]:
documentAssembler = DocumentAssembler() \
    .setInputCol('content') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en') \
    .setInputCols(['document', 'token']) \
    .setOutputCol('embeddings')

ner_model = NerDLModel.pretrained("ner_dl_bert", 'en') \
    .setInputCols(['document', 'token', 'embeddings']) \
    .setOutputCol('ner')

ner_converter = NerConverter() \
    .setInputCols(['document', 'token', 'ner']) \
    .setOutputCol('ner_chunk')

nlp_pipeline = Pipeline(stages=[
    documentAssembler, 
    tokenizer,
    embeddings,
    ner_model,
    ner_converter
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]

In [9]:
empty_df = spark.createDataFrame([['']]).toDF('content')
pipeline_model = nlp_pipeline.fit(empty_df)
result = pipeline_model.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Extract entity tag for each chunk.
Count the number of occurance of each chunk and sort in descending order.

In [10]:
res = result.select(F.explode(F.arrays_zip('ner_chunk.result', 'ner_chunk.metadata')).alias('cols')) \
  .select(F.expr("cols['0']").alias('chunk'), \
          F.expr("cols['1'].entity").alias('entity'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
res.show(20, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------+------+
|chunk                     |entity|
+--------------------------+------+
|@NJGov                    |MISC  |
|@IAMQUEENLATIFAH          |MISC  |
|King County               |LOC   |
|Catholic                  |MISC  |
|Board                     |ORG   |
|Anniversary               |MISC  |
|Aussies                   |ORG   |
|Community Commons         |ORG   |
|Parks & Rec Leslie Knope  |ORG   |
|Elizabeth Esposito        |PER   |
|AICP                      |ORG   |
|Latifah…my                |PER   |
|Newark 
U.N.I.T.Y         |LOC   |
|Latifah                   |MISC  |
|Brick City                |LOC   |
|Congress Holds Hearing    |ORG   |
|Federally Assisted Housing|ORG   |
|@JohnTory                 |PER   |
|Bear Mountain             |ORG   |
|City Manager              |ORG   |
+--------------------------+------+
only showing top 20 rows

### Top names

In [11]:
person = res.filter(F.col('entity')=='PER')
counts_per = person.groupBy('chunk').count().sort('count', ascending=False)
counts_per.select('chunk', 'count').show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-----+
|chunk              |count|
+-------------------+-----+
|Biden              |18   |
|Dave Chappelle     |13   |
|Doug Ford          |11   |
|Matt Kurzmann      |9    |
|Chappelle          |7    |
|Andrew Mikula      |7    |
|Eric Salongo Kalisa|7    |
|MacKenzie Scott    |5    |
|Jeremiah Jarmin    |5    |
|Anthony Buonicore  |5    |
|Seth Barnett       |5    |
|Alex Melikan       |4    |
|Chrystal Kornegay  |4    |
|Ford               |4    |
|Neil Rosenthal     |4    |
|Cruz III           |4    |
|Katie Goar         |4    |
|Emilio Dorcely     |4    |
|Marcia Fudge       |4    |
|John B             |4    |
+-------------------+-----+
only showing top 20 rows

### Top locations

In [12]:
locations = res.filter(F.col('entity')=='LOC')
counts_loc = locations.groupBy('chunk').count().sort('count', ascending=False)
counts_loc.select('chunk', 'count').show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+-----+
|chunk        |count|
+-------------+-----+
|America      |88   |
|U.S          |73   |
|California   |61   |
|Bamboo       |60   |
|Crypto       |60   |
|Ontario      |56   |
|US           |52   |
|Canada       |49   |
|Florida      |42   |
|Toronto      |29   |
|NYC          |29   |
|CA           |24   |
|Vancouver    |23   |
|LA           |22   |
|Colorado     |20   |
|NY           |20   |
|MA           |20   |
|BC           |18   |
|San Francisco|17   |
|Charlotte    |17   |
+-------------+-----+
only showing top 20 rows