Import dependencies

In [16]:
import json
import pandas as pd
import numpy as np
# Import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql import functions as F

# Import SparkNLP
! pip install sparknlp
! pip install pyspark
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *

# Start Spark session
spark = sparknlp.start()



## 2. Inputs

Enter inputs as strings in this list. Later cells of the notebook will extract keyphrases from whatever inputs are entered here.

## 3. Pipeline creation

Create the NLP pipeline.

In [19]:
# Transforms the raw text into a document readable by the later stages of the
# pipeline. Reads the 'text' column of the input frame and writes 'document'.
document_assembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

# Separates the document into sentences
sentence_detector = SentenceDetector() \
    .setInputCols(['document']) \
    .setOutputCol('sentences')# \
    #.setDetectLists(True)

# Separates sentences into individual tokens (words). The listed punctuation
# marks are registered as context characters for the tokenizer.
tokenizer = Tokenizer() \
    .setInputCols(['sentences']) \
    .setOutputCol('tokens') \
    .setContextChars(['(', ')', '?', '!', '.', ','])

# The keyphrase extraction model. Change MinNGrams and MaxNGrams to set the
# minimum and maximum length of possible keyphrases, and change NKeywords to
# set the amount of potential keyphrases identified per document.
# Stop words are taken from a freshly constructed StopWordsCleaner
# (presumably its default list - confirm against the Spark NLP docs).
keywords = YakeModel() \
    .setInputCols('tokens') \
    .setOutputCol('keywords') \
    .setMinNGrams(2) \
    .setMaxNGrams(5) \
    .setNKeywords(100) \
    .setStopWords(StopWordsCleaner().getStopWords())

# Assemble all of these stages into a pipeline. The pipeline is fitted further
# down on the data frame loaded from Cassandra (`tablepos`), not on an empty
# frame.
pipeline = Pipeline(stages=[
    document_assembler, 
    sentence_detector,
    tokenizer,
    keywords
])
# NOTE(review): this import sits mid-cell; it requires the DataStax
# `cassandra-driver` package to be installed in the kernel environment.
from cassandra.cluster import Cluster
cluster =Cluster(['127.0.0.1']) #If you have a locally installed DSE cluster
# Open a driver session and select the 'cas' keyspace. `session` is not used
# again in the cells visible here; the table itself is read through Spark.
session = cluster.connect()
session.set_keyspace('cas')
# Rebinds `spark`, which was already assigned by sparknlp.start() in the first
# cell. NOTE(review): getOrCreate() presumably returns that existing session
# rather than building a new local one - confirm.
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
# Load table "all" of keyspace "cas" as a Spark DataFrame through the
# "org.apache.spark.sql.cassandra" data source.
# Assumes the table exposes a 'text' column, since DocumentAssembler reads
# 'text' - TODO confirm against the table schema.
tablepos = spark.read.format("org.apache.spark.sql.cassandra").options(table="all", keyspace="cas").load()

# Fit the pipeline on the Cassandra-backed frame to obtain a PipelineModel.
pipeline_model = pipeline.fit(tablepos)

# LightPipeline is faster than Pipeline for small datasets
light_pipeline = LightPipeline(pipeline_model)

## 4. Output creation

Utility functions to create more useful sets of keyphrases from the raw data frame produced by the model.

In [20]:
def adjusted_score(row, pow=2.5):
    """Re-weight a candidate key phrase's score in favour of longer phrases.

    The number of words in ``row.result`` (spaces + 1) is raised to ``pow``
    and divided by the model score read from ``row.metadata['score']`` plus a
    constant 0.1. A larger exponent rewards multi-word phrases more strongly.

    Args:
        row: An annotation-like object exposing ``result`` (the phrase text)
            and ``metadata`` (a mapping whose ``'score'`` entry is
            convertible with ``float``).
        pow: Exponent applied to the word count.

    Returns:
        The adjusted score as a float.
    """
    word_count = row.result.count(' ') + 1
    model_score = float(row.metadata['score'])
    length_bonus = word_count ** pow
    # The 0.1 offset keeps the denominator away from zero for tiny scores.
    return length_bonus / (model_score + 0.1)

def get_top_ranges(phrases, input_text):
    """Combine phrases that overlap into maximal character ranges.

    Args:
        phrases: Sequence of mappings with integer ``'begin'`` and ``'end'``
            entries (``'end'`` is inclusive).
        input_text: The text the offsets refer to; used to slice out the
            merged phrase.

    Returns:
        A list of dicts with keys ``'begin'``, ``'end'`` and ``'phrase'``,
        one per merged range, ordered by position. Returns an empty list when
        ``phrases`` is empty.
    """
    # A document with no extracted keywords yields no ranges.
    if not phrases:
        return []

    starts = sorted(row['begin'] for row in phrases)
    ends = sorted(row['end'] for row in phrases)

    # Sweep over the independently sorted starts and ends: if the i-th
    # smallest end lies before the (i+1)-th smallest start, every span opened
    # so far has closed, so a merged range ends there.
    merged = []
    span_start = starts[0]
    for i in range(len(starts) - 1):
        if ends[i] < starts[i + 1]:
            merged.append((span_start, ends[i]))
            span_start = starts[i + 1]
    merged.append((span_start, ends[-1]))

    return [{
        'begin': begin,
        'end': end,
        # 'end' is inclusive, hence the + 1 in the slice.
        'phrase': input_text[begin:end + 1]
    } for begin, end in merged]

def remove_duplicates(phrases):
    """Remove phrases that appear multiple times, keeping the first of each.

    Entries are compared by their ``'phrase'`` value only. The list is
    modified in place and the same list object is returned.

    Args:
        phrases: List of mappings, each with a ``'phrase'`` entry.

    Returns:
        ``phrases`` itself, with later duplicates dropped and order preserved.
    """
    seen = set()
    unique = []
    for entry in phrases:
        text = entry['phrase']
        if text not in seen:
            seen.add(text)
            unique.append(entry)
    # Slice-assign so callers holding a reference see the de-duplicated list.
    phrases[:] = unique
    return phrases

def get_output_lists(df_row):
    """Build the two keyphrase selections for one result row.

    Every annotation in ``df_row.keywords`` is turned into a dict with
    ``'begin'``, ``'end'``, ``'phrase'`` and an adjusted ``'score'``, then the
    dicts are ordered from highest to lowest score.

    Returns:
        A tuple of two lists, each holding at most five entries:

        * the merged ranges built from the 20 best-scoring phrases, where
          overlapping phrases are combined into longer ones (best for
          highlighting key phrases in the text);
        * the 10 best-scoring phrases with duplicates removed (best for
          summarizing a document).
    """
    scored = [
        {
            'begin': annotation.begin,
            'end': annotation.end,
            'phrase': annotation.result,
            'score': adjusted_score(annotation)
        }
        for annotation in df_row.keywords
    ]
    scored.sort(key=lambda entry: entry['score'], reverse=True)

    merged_ranges = get_top_ranges(scored[:20], df_row.text)
    summaries = remove_duplicates(scored[:10])
    return (merged_ranges[:5], summaries[:5])

Transform the example inputs to create a data frame storing the identified keyphrases.

In [21]:
# Run the fitted pipeline over the rows loaded from Cassandra and collect the
# annotated frame into pandas on the driver.
# NOTE: the output recorded below shows this cell failing because the Spark
# worker ran Python 3.6 while the driver ran Python 3.9. Point PYSPARK_PYTHON
# and PYSPARK_DRIVER_PYTHON at the same interpreter before starting Spark.
result = light_pipeline.transform(tablepos).toPandas()

21/07/20 17:01:59 ERROR Executor: Exception in task 5.0 in stage 2.0 (TID 21)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$1

Py4JJavaError: An error occurred while calling o1245.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 1 times, most recent failure: Lost task 3.0 in stage 2.0 (TID 19) (192.168.88.236 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 477, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.6 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


For each example, create two JSON files containing selections of the best keyphrases for the document. See the docstring of `get_output_lists` two cells above to learn more about the two JSON files produced. These JSON files are used directly in the public demo app for this model.

In [None]:
import os

# Directory that receives the per-example JSON files.
OUTPUT_FILE_PATH = "output"
# Pure-Python equivalent of `mkdir -p`: works without a POSIX shell and does
# not fail if the directory already exists.
os.makedirs(OUTPUT_FILE_PATH, exist_ok=True)

# For every row of the result frame write two files:
#   Example<N>.json            - merged keyphrase ranges
#   Example<N>_summaries.json  - top de-duplicated keyphrases
# `top_ranges` / `top_summaries` keep the values of the last row after the
# loop; the visualization cells below display them.
for i in range(len(result)):
    top_ranges, top_summaries = get_output_lists(result.iloc[i])
    with open(f'{OUTPUT_FILE_PATH}/Example{i + 1}.json', 'w') as ranges_file:
        json.dump(top_ranges, ranges_file)
    with open(f'{OUTPUT_FILE_PATH}/Example{i + 1}_summaries.json', 'w') \
            as summaries_file:
        json.dump(top_summaries, summaries_file)

## 5. Visualize outputs

The raw pandas data frame containing the outputs

In [None]:
# Display the pandas frame produced by the transform cell.
result

The list of the top keyphrases (with overlapping keyphrases merged) for the last example

In [None]:
# Holds the merged ranges of the last row processed by the output loop.
top_ranges

The list of the best summary keyphrases (with duplicates removed) for the last example

In [None]:
# Holds the de-duplicated summary phrases of the last row processed by the
# output loop. (A leftover debugging print was removed from this cell.)
top_summaries