In [1]:
!pip install sparknlp pymongo

Defaulting to user installation because normal site-packages is not writeable


In [23]:
import os
from pprint import pprint
import pymongo
from pymongo import MongoClient    
import pyspark
from pyspark.ml import PipelineModel
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *
from zipfile import ZipFile

# Writing to DB

In [18]:
conf = pyspark.SparkConf()
conf.set('spark.jars.packages', 
         "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.databricks:spark-xml_2.12:0.18.0,com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3")
conf.set('spark.driver.memory','8g')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)
spark

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at /state/partition1/job-45767675/ipykernel_2693020/2667504274.py:6 

In [8]:
user = "ob2205"
PATH = '/scratch/ob2205/shared/ip/ip_address.txt'
ip_df = spark.read.text(PATH)
uri = ip_df.head(1)[0].value
uri

'10.32.32.4:27040'

In [9]:
client = MongoClient(uri)

In [10]:
client.server_info()

{'version': '7.0.4',
 'gitVersion': '38f3e37057a43d2e9f41a39142681a76062d582e',
 'modules': [],
 'allocator': 'tcmalloc',
 'javascriptEngine': 'mozjs',
 'sysInfo': 'deprecated',
 'versionArray': [7, 0, 4, 0],
 'openssl': {'running': 'OpenSSL 3.0.2 15 Mar 2022',
  'compiled': 'OpenSSL 3.0.2 15 Mar 2022'},
 'buildEnvironment': {'distmod': 'ubuntu2204',
  'distarch': 'x86_64',
  'cc': '/opt/mongodbtoolchain/v4/bin/gcc: gcc (GCC) 11.3.0',
  'ccflags': '-Werror -include mongo/platform/basic.h -ffp-contract=off -fasynchronous-unwind-tables -g2 -Wall -Wsign-compare -Wno-unknown-pragmas -Winvalid-pch -gdwarf-5 -fno-omit-frame-pointer -fno-strict-aliasing -O2 -march=sandybridge -mtune=generic -mprefer-vector-width=128 -Wno-unused-local-typedefs -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-const-variable -Wno-unused-but-set-variable -Wno-missing-braces -fstack-protector-strong -gdwarf64 -Wa,--nocompress-debug-sections -fno-builtin-memcmp -Wimplicit-fallthrough=5',
  'cxx': '/opt

In [11]:
db=client.test

In [57]:
db

Database(MongoClient(host=['10.32.32.6:27040'], document_class=dict, tz_aware=False, connect=True), 'test')

In [11]:
# Extract articles from zip

# source_path = "/scratch/work/public/proquest/proquest_hnp/BostonGlobe/BG_20151210212722_00001.zip"
# with ZipFile(source_path, "r") as zip:
#     zip.extractall('zip_tmp')

In [24]:
df = spark.read\
    .option('rootTag', 'Record')\
    .option('rowTag', 'Record')\
    .option('recursiveFileLookup', 'true')\
    .format("xml").load("zip_tmp")

                                                                                

In [14]:
# Start SparkNLP
spark = sparknlp.start() # for GPU training >> sparknlp.start(gpu = True) # for Spark 2.3 =>> sparknlp.start(spark23 = True)



In [28]:
print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

Spark NLP version 5.3.3
Apache Spark version: 3.1.2


In [13]:
class YakePipeline(Pipeline):
    """
    A pipeline for extracting keywords using YAKE.

    Example:
    pipeline = YakePipeline()
    processed_df = pipeline.fit(df).transfrom(df)
    """
    def __init__(self):
        super(YakePipeline, self).__init__()
        self.stopwords = StopWordsCleaner().getStopWords()
        self.document = DocumentAssembler() \
                .setInputCol("FullText") \
                .setOutputCol("document")
        self.sentenceDetector = SentenceDetector() \
                .setInputCols("document") \
                .setOutputCol("sentence")
        self.token = Tokenizer() \
                .setInputCols("sentence") \
                .setOutputCol("token") \
                .setContextChars(["(", ")", "?", "!", ".", ","])
        self.keywords = YakeKeywordExtraction() \
                .setInputCols("token") \
                .setOutputCol("keywords") \
                .setMinNGrams(1) \
                .setMaxNGrams(3)\
                .setNKeywords(20)\
                .setStopWords(self.stopwords)
        self.setStages([self.document, self.sentenceDetector, self.token, self.keywords])


In [29]:
# Extract keywords and append to df
yake_pipeline = YakePipeline()

result = yake_pipeline.fit(df).transform(df)\
    .drop("document")\
    .drop("token")\
    .drop("sentence")

In [30]:
# df now includes keywords, along with other outputs from pipeline
result.printSchema()

root
 |-- Abstract: string (nullable = true)
 |-- ActionCode: string (nullable = true)
 |-- AlphaPubDate: string (nullable = true)
 |-- Contributor: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ContribRole: string (nullable = true)
 |    |    |-- FirstName: string (nullable = true)
 |    |    |-- LastName: string (nullable = true)
 |    |    |-- MiddleName: string (nullable = true)
 |    |    |-- NameSuffix: string (nullable = true)
 |    |    |-- OrganizationName: string (nullable = true)
 |    |    |-- OriginalForm: string (nullable = true)
 |    |    |-- PersonName: string (nullable = true)
 |    |    |-- PersonTitle: string (nullable = true)
 |-- DateTimeStamp: long (nullable = true)
 |-- FullText: string (nullable = true)
 |-- LanguageCode: string (nullable = true)
 |-- NumericPubDate: long (nullable = true)
 |-- ObjectType: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Pagination: string (nullable = true)
 |

In [32]:
# Write to mongo
start = time.time()
result.write.format("com.mongodb.spark.sql.DefaultSource")\
    .mode("overwrite")\
    .option('uri',f'mongodb://{uri}/test.newspapers')\
    .save()
stop = time.time()
print(stop - start)



235.01104307174683


                                                                                

# Querying


In [78]:
def normalize_query(query):
    """Takes an user query and returns an array of keywords"""
    # TO DO
    # clean stop words
    # make lowercase
    # strip whitespace
    
def keyword_search(keyword, db, k):
    """Takes a keyword and returns top-k articles with that keyword."""
    
    # aggregate pipeline to find top-k articles based on keyword input
    data = db.aggregate([
        {"$project":{"keywords.result": 1, "keywords.metadata.score": 1}},
        {"$unwind": {"path": "$keywords", "preserveNullAndEmptyArrays": False}},
        {"$match": {"keywords.result":  {"$regex": f"{keyword}"}}},
        {"$group": { "_id": "$_id", "keywords": {"$addToSet": "$keywords"}}},
        {"$sort": {"keywords.metadata.score": -1}},
        {"$limit": k}
    ])
    
    # Get article IDs and hit db again for full articles
    #     ids = []
    #     for doc in data:
    #         _id = doc["_id"]
    #         ids.append(_id)
    #     output = db.find({"_id": {"$in": ids}})
        
    return data

# TO DO
# Return additional fields (query db using _ids)
# write `normalize_query`
# incorporate multiple keywords

    

In [79]:
output = keyword_search("china", db.newspapers, 5)

for doc in output:
    pprint(doc)


{'_id': ObjectId('662e660a6396ff66f83ef7b2'),
 'keywords': [{'metadata': {'score': '0.523434987506563'}, 'result': 'china'},
              {'metadata': {'score': '1.5689100679790935'},
               'result': 'fight in china'}]}
{'_id': ObjectId('662e65c46396ff66f83ecfab'),
 'keywords': [{'metadata': {'score': '0.7031205485333598'}, 'result': 'china'}]}
{'_id': ObjectId('662e65b56396ff66f83ec6b8'),
 'keywords': [{'metadata': {'score': '0.6592879295318568'}, 'result': 'china'}]}
{'_id': ObjectId('662e66346396ff66f83f1071'),
 'keywords': [{'metadata': {'score': '0.6409002858326851'}, 'result': 'china'}]}
{'_id': ObjectId('662e65946396ff66f83eb531'),
 'keywords': [{'metadata': {'score': '0.6276050598018206'}, 'result': 'china'}]}


# Visualizations

In [71]:
%%html
<style>
.box_style{
    width:100%;
    border : None;
    height: auto;
    background-color:#EEE;
    color=white;
}
.side_bar{
    width:100%;
    border: None;
    height: auto;
    background-color:#66b2b2;
    color=white;
}

.widget-label {
    color: white !important;
}

.widget_text {
    border-radius: 8px;
}
.button_style {
    margin-top: 15px;
}
</style>

In [75]:
import ipywidgets as ipw
from IPython.display import HTML, display, clear_output, Javascript
from bson.objectid import ObjectId
import re

def view_doc_clicked(b):
    _id = b.tooltip
    if len(_id):
        doc = db.newspapers.find_one({"_id": ObjectId(_id)})
        if doc:
            show_document(doc['RecordTitle'], doc['FullText'])
    

def show_document(header, text):
    text = re.sub("[^0-9a-zA-Z]", " ", text)
    text = re.sub(" +", " ", text).strip()
    
    display(Javascript("""
        require(
            ["base/js/dialog"], 
            function(dialog) {
                dialog.modal({
                    title: '%s',
                    body: '%s',
                    buttons: {
                        'Done': {}
                    }
                });
            }
        );
        """ % (header, text)))


notify_output = ipw.Output()
display(notify_output)

@notify_output.capture()
def popup(text):
    clear_output()
    display(HTML("<script>alert('{}');</script>".format(text)))

def create_result_option(record_id, record_title, ctx_text, publisher, pub_date, object_id):
    items_layout = ipw.Layout(width='90%')
    
    children = []
    row_layout = ipw.Layout(display='flex', flex_flow='row', align_items='stretch', border_bottom='solid 2px lightgrey', padding='5px')
    sn_box = ipw.HBox([ipw.HTML(f"{record_id}.")], layout=ipw.Layout(width='3%', margin='5px 5px 5px 5px'))
    rn_box = ipw.HBox(children=[], layout=ipw.Layout(width='10%', margin='5px 5px 5px 5px'))
    
    if len(record_title):
        btnView = ipw.Button(description="View", tooltip=str(object_id))
        btnView.on_click(view_doc_clicked)
        rn_box.children = [btnView]
        
        
        children.append(ipw.HTML(f"<b><font color='#1b75d0'; size=3px>{record_title}", layout=items_layout))
    if len(ctx_text):
        children.append(ipw.HTML(ctx_text, layout=items_layout))
    if len(publisher):
        children.append(ipw.HTML(f"<font color='grey'><b>Publisher:</b> {publisher}", layout=items_layout))
    if len(pub_date):
        children.append(ipw.HTML(f"<font color='grey'><b>Date:</b> {pub_date}", layout=items_layout))
    
    row_content = ipw.VBox(children = children, layout=ipw.Layout(width='90%'))
    row = ipw.HBox(children=[sn_box, row_content, rn_box], layout=row_layout)
 
    return row

Output()

In [76]:
def search_keyword(keyword):
    data = db.newspapers.aggregate([             
        {
            '$facet': {
                'docs': [
                    { '$unwind': '$keywords' },
                    { '$match': {'keywords.result': keyword} }
                ],
                'count': [{'$count': "count"} ]
            }
        },
        {
            '$addFields': { 'count': { '$arrayElemAt': ["$count.count", 0] } }
        }
    ])
    
    
    # ToDo: Limit to say 5 results
    data = data.next()
    i = 1
    children = []
    if len(data['docs']) < 1:
        children.append(create_result_option("#", '', "No Newspaper found with this keyword", "", '', ''))
        return children

    for d in data['docs']:
        # ctx_txt: take keyword with highest score and contexualize it
        start = d["keywords"]['begin'] or 0
        end = d["keywords"]['end'] or 0
        ctx_text=word_highlighter(d["FullText"], start, end, 90)

        children.append(create_result_option(i, d["RecordTitle"], ctx_text, d["Publisher"], d["AlphaPubDate"], d['_id']))
        i += 1
    return children   

def word_highlighter(fulltext, start, end, ctx_length=100):
    tot_length = len(fulltext)
    chunk_start = 0 if start-ctx_length < 0 else start-ctx_length
    chunk_end = tot_length if end+ctx_length > tot_length else end+ctx_length
    return fulltext[chunk_start:start] + \
        f"<span style='background-color: yellow;'>{fulltext[start:end + 1]}</span>" + \
        fulltext[end + 1:chunk_end]

# search_keyword("grass")

In [77]:
hor_layout = ipw.Layout(align_content='stretch', margin='0.1% 1% 0.1% 2% ', width='100%')

app_title = ipw.HTML('<h1> YouTube for Newspapers </h1>', layout=hor_layout)
app_footer = ipw.HTML('<p> Apache Spark, HDFS, MongoDB, NLP</p>', layout=hor_layout)

headerBox = ipw.HBox([app_title]).add_class('box_style')
footerBox = ipw.HBox([app_footer]).add_class('box_style')

txt_keyword = ipw.Text(placeholder='Enter a Keyword', description='Keyword:', disabled=False, layout=ipw.Layout(width='auto', 
    margin='15px 10px 5px 2px')).add_class('widget_text')
btnSearch = ipw.Button(description="Search!", icon='search').add_class('button_style')
box_layout = ipw.Layout(display='flex', flex_flow='column', align_items='center', width='100%')
btnContainer = ipw.HBox(children=[btnSearch], layout=box_layout)


def on_button_clicked(b):
    if len(txt_keyword.value.strip()) < 1:
        popup("Enter search Key.")
    else:
        main_panel.children = search_keyword(txt_keyword.value.lower())

btnSearch.on_click(on_button_clicked)

side_bar = ipw.VBox([txt_keyword, btnContainer]).add_class('side_bar')
main_panel = ipw.VBox([])

ipw.AppLayout(header=headerBox, left_sidebar=side_bar, center=main_panel, right_sidebar=None, footer=footerBox,
    pane_widths=[2, 7, 0],
    pane_heights=[1, 9, '40px'])

AppLayout(children=(HBox(children=(HTML(value='<h1> YouTube for Newspapers </h1>', layout=Layout(align_content…