In [14]:
import sparknlp
from pyspark.ml import PipelineModel, Pipeline
import sparknlp.annotator as sa
import sparknlp.base as sb
import sparknlp
from sparknlp import Finisher

# Load Spark

In [1]:
from pyspark.sql import SparkSession
# spark = sparknlp.start()

spark = (
    SparkSession.builder \
      .config("spark.executor.instances", "50") \
      .config("spark.driver.memory", "15g") \
      .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.7.5")
      .getOrCreate()
)

In [3]:
spark

# Load Our Own Data

In [10]:
import sqlite3
import pandas as pd
from tqdm.auto import tqdm
import pyspark.sql.functions as F
# import unidecode

# conn = sqlite3.connect('../data/diffengine-diffs/db/newssniffer-nytimes.db')
conn = sqlite3.connect('newssniffer-nytimes.db')

df = pd.read_sql('''
     SELECT * from entryversion 
     WHERE entry_id IN (SELECT distinct entry_id FROM entryversion LIMIT 200)
 ''', con=conn)

# df = pd.read_sql('''
#     SELECT entry_id, summary, version from entryversion 
# ''', con=conn)

df = df.assign(summary=lambda df: df['summary'].str.replace('</p><p>', ' '))

In [11]:
sdf = spark.createDataFrame(df)

# Try Sentence Tokenizing on Our Own Data

In [None]:
documenter = sb.DocumentAssembler()\
    .setInputCol("summary")\
    .setOutputCol("document")

sentencer = (sa.SentenceDetector()
                .setInputCols(["document"])
                .setOutputCol("sentences")            
            )

finisher = (
    Finisher()
    .setInputCols(["sentences"]) 
)

sd_pipeline = PipelineModel(stages=[documenter, sentencer, finisher])

In [None]:
annotations_df = sd_pipeline.transform(sdf)

In [None]:
sent_list_df = (annotations_df
                .select("entry_id", "version", F.posexplode("finished_sentences"))
                .withColumnRenamed('col', 'sentence')
                .withColumnRenamed('pos', 'sent_idx')
               )
# tdf = sent_list_df.toPandas()

In [54]:
exploded_sent_df = (sent_list_df
 .alias("sent_list_df")
 .join(
     sent_list_df.alias("sent_list_df_2"),
     [F.col("sent_list_df.entry_id") == F.col("sent_list_df_2.entry_id"), 
      F.col("sent_list_df.version") == F.col("sent_list_df_2.version"), 
     ], 
     "inner"
 )
 .select(
     F.col("sent_list_df.entry_id"),
     F.col("sent_list_df.version"),
     F.col("sent_list_df.sent_idx").alias("sent_idx_x"),
     F.col("sent_list_df_2.sent_idx").alias("sent_idx_y"),
     F.col("sent_list_df.sentence").alias("sentence_x"),
     F.col("sent_list_df_2.sentence").alias("sentence_y"),
#    .show(truncate=False)
    )
)

In [55]:
exploded_sent_df.show()


## todo: 
## 0. do this same procedure for diffed sequential versions

## 1a. use tokenize and Albert or BERT or Word2Vec to generate vectors of embeddings for each sentence.
## 1b. lemmatize each sentence

## 2. take Sim_asym along each row, two times using:
## a. phi(x, y) = vec(x) \cdot vec(y)
## b. phi(x ,y) = lemmatization

## 3. for each sentence, select the argmax in both directions.
## 4. choose some reasonable threshold.

## 5. For scores above this threshold, co

+--------+-------+----------+----------+-------------------+--------------------+
|entry_id|version|sent_idx_x|sent_idx_y|         sentence_x|          sentence_y|
+--------+-------+----------+----------+-------------------+--------------------+
|  548743|      1|         0|         0|FORT COLLINS, Colo.| FORT COLLINS, Colo.|
|  548743|      1|         0|         1|FORT COLLINS, Colo.|— Annie Hartnett ...|
|  548743|      1|         0|         2|FORT COLLINS, Colo.|Now 21 and a lead...|
|  548743|      1|         0|         3|FORT COLLINS, Colo.|“I would still sa...|
|  548743|      1|         0|         4|FORT COLLINS, Colo.|“When you’re voti...|
|  548743|      1|         0|         5|FORT COLLINS, Colo.|” So on Saturday ...|
|  548743|      1|         0|         6|FORT COLLINS, Colo.|Each party used t...|
|  548743|      1|         0|         7|FORT COLLINS, Colo.|But Mr. Obama, tr...|
|  548743|      1|         0|         8|FORT COLLINS, Colo.|“I’m counting on ...|
|  548743|      

In [56]:
import difflib

In [32]:
tdf

Unnamed: 0,entry_id,version,pos,col
0,547988,0,0,"In Silicon Valley, Apple just won big against ..."
1,547988,0,1,"Across the country, in a federal court in Flor..."
2,547988,0,2,"Mr. Stadnyk, who holds a patent on a motorcycl..."
3,547988,0,3,"Represented by a prominent Washington lawyer, ..."
4,547988,0,4,Mr. Stadnyk and his lawyer — along with some a...
5,547988,0,5,"The present system, one of the nation’s oldest..."
6,547988,0,6,The impending law would overturn that by award...
7,547988,0,7,"Mr. Stadnyk, 48, a garage inventor who stumble..."
8,547988,0,8,He devised a system of brackets and gears to a...
9,547988,0,9,"With his system, he says, the rider feels a fl..."


In [None]:
chunksize = 10000
unique_entryids = df['entry_id'].unique()
num_chunks = int(unique_entryids.shape[0] / chunksize)

output_dfs = []
for chunk_id in tqdm(range(num_chunks)):
    batch_ids = unique_entryids[chunk_id * chunksize: (chunk_id + 1) * chunksize]
    small_df = df.loc[lambda df: df['entry_id'].isin(batch_ids)]
    #
    sdf = spark.createDataFrame(small_df)
    #
    annotations_df = sd_pipeline.transform(sdf)
    t_df = annotations_df.toPandas()
    output_dfs.append(t_df)

# Get Albert Embeddings

In [5]:
document_assembler = (
      sb.DocumentAssembler()
        .setInputCol("summary")
        .setOutputCol("document")
)

tokenizer = (
    sa.Tokenizer()
        .setInputCols(["document"])
        .setOutputCol("token")
)
 
word_embeddings = (
    sa.AlbertEmbeddings
        .load('s3://aspangher/spark-nlp/albert_xxlarge_uncased_en')
        .setInputCols(["document", "token"])
        .setOutputCol("embeddings")
)

embeddings_finisher = (
    sb.EmbeddingsFinisher()
            .setInputCols("embeddings")
            .setOutputCols("embeddings_vectors")
            .setOutputAsVector(True)
)

In [37]:
tokenizer = (
    sa.Tokenizer()
        .setInputCols(["document"])
        .setOutputCol("token")
)

In [38]:
bert_pipeline = Pipeline(stages=
  [
    document_assembler,
    tokenizer,
    word_embeddings,
    embeddings_finisher
  ]
)

In [40]:
df_bert = bert_pipeline.fit(sdf).transform(sdf)
# df_bert = bert_pipeline_model.transform(sdf)

In [42]:
df_bert#.select('entry_id', 'version', 'embedding_vectors')

DataFrame[index: bigint, version: bigint, title: string, created: string, url: string, source: string, entry_id: bigint, archive_url: string, num_versions: bigint, summary: string, joint_key: string, id: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings_vectors: array<vector>]

In [None]:
t2_df = (df_bert
         .select('entry_id', 'version', 'embeddings_vectors')
         .toPandas()
        )

# With Sentences

In [99]:
documenter = (
    sb.DocumentAssembler()
        .setInputCol("summary")
        .setOutputCol("document")
)

sentencer = (
    sa.SentenceDetector()
        .setInputCols(["document"])
        .setOutputCol("sentences")            
)

finisher = (
    Finisher()
    .setInputCols(["sentences"]) 
)

# sd_pipeline_2 = PipelineModel(stages=[documenter, sentencer, finisher])

In [100]:
# annotations_df = sd_pipeline_2.transform(sdf)

In [103]:
sent_list_df = (annotations_df
                .select("entry_id", "version", F.posexplode("finished_sentences"))
                .withColumnRenamed('col', 'sentence')
                .withColumnRenamed('pos', 'sent_idx')
               )

In [108]:
sentence_documenter = (
    sb.DocumentAssembler()
        .setInputCol("sentence")
        .setOutputCol("document")
)

In [33]:
documenter = (
    sb.DocumentAssembler()
        .setInputCol("summary")
        .setOutputCol("document")
)

sentencer = (
    sa.SentenceDetector()
        .setInputCols(["document"])
        .setOutputCol("sentences")            
)

tokenizer = (
    sa.Tokenizer()
        .setInputCols(["sentences"])
        .setOutputCol("token")
)

word_embeddings = (
    sa.AlbertEmbeddings
        .load('s3://aspangher/spark-nlp/albert_large_uncased_en')
        .setInputCols(["sentences", "token"])
        .setOutputCol("embeddings")
        .setBatchSize(1)
)

In [34]:
embeddings_finisher = (
    sb.EmbeddingsFinisher()
            .setInputCols("embeddings")
            .setOutputCols("embeddings_vectors")
            .setOutputAsVector(True)
)

In [35]:
bert_pipeline_from_sentences = Pipeline(stages=
  [
      documenter,
#       sentence_documenter,
      sentencer,
      tokenizer,
      word_embeddings,
      embeddings_finisher
  ]
)

In [36]:
# df_bert = bert_pipeline_from_sentences.fit(sent_list_df).transform(sent_list_df)
df_bert = bert_pipeline_from_sentences.fit(sdf).transform(sdf)

In [38]:
df_bert.select('entry_id', 'version', 'embeddings_vectors').show()

Py4JJavaError: An error occurred while calling o1265.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 15, 10.178.184.222, executor 20): ExecutorLostFailure (executor 20 exited caused by one of the running tasks) Reason: 
The executor with id 20 exited with exit code 137.
The API gave the following brief reason: null
The API gave the following message: null
The API gave the following container statuses:

ContainerStatus(containerID=docker://5f0fe13d42effb3eff049cf9b9330db702b2354dcfe769c94ba54dc03cc49af5, image=artifactory.inf.bloomberg.com/ds/frameworks/spark-2.4-python-3.7:1.129, imageID=docker-pullable://artifactory.inf.bloomberg.com/ds/frameworks/spark-2.4-python-3.7@sha256:5a3539a36f8e601dfd78a4151c62e3d3922dfc92610bf33d7c6dc89fe5b1dbb2, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=executor, ready=false, restartCount=0, state=ContainerState(running=null, terminated=ContainerStateTerminated(containerID=docker://5f0fe13d42effb3eff049cf9b9330db702b2354dcfe769c94ba54dc03cc49af5, exitCode=137, finishedAt=2021-04-13T23:42:24Z, message=null, reason=OOMKilled, signal=null, startedAt=2021-04-13T23:08:00Z, additionalProperties={}), waiting=null, additionalProperties={}), additionalProperties={started=false})
      
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [23]:
t2_df = (df_bert
         .select('entry_id', 'version', 'sent_idx', 'sentence', 'embeddings_vectors')
         .toPandas()
        )

AnalysisException: "cannot resolve '`sent_idx`' given input columns: [source, archive_url, document, created, entry_id, version, num_versions, title, joint_key, token, id, sentences, summary, embeddings, index, url];;\n'Project [entry_id#6L, version#1L, 'sent_idx, 'sentence, 'embeddings_vectors]\n+- Project [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11, document#135, sentences#150, token#166, embeddings#183 AS embeddings#201]\n   +- Project [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11, document#135, sentences#150, token#166, UDF(array(sentences#150, token#166)) AS embeddings#183]\n      +- Project [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11, document#135, sentences#150, UDF(array(sentences#150)) AS token#166]\n         +- Project [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11, document#135, UDF(array(document#135)) AS sentences#150]\n            +- Project [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11, UDF(summary#9) AS document#135]\n               +- LogicalRDD [index#0L, version#1L, title#2, created#3, url#4, source#5, entry_id#6L, archive_url#7, num_versions#8L, summary#9, joint_key#10, id#11], false\n"