In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, regexp_replace, concat
import  pyspark.sql.functions as F
import spacy
import stanza

# JVM system properties have to be registered before the SparkContext is
# instantiated, so this loop must stay ahead of getOrCreate().
for _prop_name, _prop_value in (
    ("spark.sql.crossJoin.enabled", "true"),
    ("spark.sql.execution.arrow.maxRecordsPerBatch", "2000"),
):
    SparkContext.setSystemProperty(_prop_name, _prop_value)

# Reuse the running context if there is one, then wrap it for DataFrame I/O.
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)

# Name of the input table, appended to the GCS prefix when it is loaded.
merged_se_cleaned = 'merged_cleaned_files_se_before_extraction-table'

In [None]:
# Ship the pipeline archive to the cluster so tasks can import from it.
# Presumably this zip provides the `lib` package imported in the following
# cells -- verify against the archive contents.
# Alternative archive, currently disabled:
# sc.addPyFile("gs://workbench.gleanlabs.net/emr_data/user=johanna/nlp_pipeline_new_remap.zip")
sc.addPyFile("gs://workbench.gleanlabs.net/emr_data/user=johanna/nlppipeline_stanza_pandas_udfs.zip")

In [None]:
from lib.nlppipelinenltk import NLPPipelineNLTK

In [None]:
nlpPipeline = NLPPipelineNLTK()  # module-level pipeline instance; give_stanza_pos reads this global

In [None]:
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T
import os 
import pandas as pd
import numpy as np
           
def give_stanza_pos(sentences: pd.Series) -> pd.Series:
    """Extract key phrases for one batch of sentences.

    The whole batch is handed to ``nlpPipeline.getKeyPhrases`` as a plain
    list, and element ``[1]`` of every item it yields is kept (presumably the
    key-phrase list for that sentence -- TODO confirm against the pipeline).

    Args:
        sentences: batch of sentence strings.

    Returns:
        A new Series with one entry per item returned by the pipeline.
    """
    # Set on every call, before the pipeline runs.
    # NOTE(review): presumably the Arrow IPC compatibility switch needed by
    # Spark 2.3/2.4 with pyarrow >= 0.15 -- confirm the cluster versions.
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    batch_results = nlpPipeline.getKeyPhrases(sentences.tolist())
    phrases_per_sentence = []
    for result in batch_results:
        phrases_per_sentence.append(result[1])
    return pd.Series(phrases_per_sentence)


# Pandas UDF wrapper: each output row is an array of strings.
give_stanza_pos_pandas = pandas_udf(
    give_stanza_pos,
    returnType=T.ArrayType(T.StringType()),
)


In [None]:
from lib.sparkmethods import stage1
import time
import nltk
# Benchmark: time sentence splitting + key-phrase extraction (pandas UDF),
# including the final write, for increasing sample sizes.
base_path = "gs://workbench.gleanlabs.net/emr_data/user=johanna/{}"
sample_sizes = [10000, 50000, 100000, 1000000]

# Built once: the sentence-splitting UDF does not depend on the sample size,
# and nltk.sent_tokenize can be passed directly without a lambda wrapper.
sent_tokenize_udf = F.udf(nltk.sent_tokenize, T.ArrayType(T.StringType()))

dico = {}        # sample size -> elapsed seconds
times_nltk = []  # elapsed seconds, in the order of sample_sizes
for N in sample_sizes:
    sample_path = "example{}-table".format(N)
    output_path = base_path.format('merged_se_extracted_kps_new_posts_{}_stanza-table'.format(N))

    # Materialise the sample first: limit() is not deterministic, so without
    # a saved copy the rows could change while the pipeline below runs.
    # This step is intentionally outside the timed region.
    (spark.read.format('delta')
        .load(base_path.format(merged_se_cleaned))
        .limit(N)
        .coalesce(100)
        .write.mode('overwrite').format('delta')
        .save(sample_path))

    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments during a long run.
    start = time.perf_counter()

    # Load the saved sample and repartition it by document.
    sample_df = spark.read.format('delta').load(sample_path).repartition(200, 'docID')

    # Usually this is done inside the getKeyPhrases function + remap stage:
    # drop empty documents, split into sentences, one row per sentence.
    non_empty_df = sample_df.filter(F.col('content').isNotNull() & (F.col('content') != ''))
    tokenized_df = non_empty_df.withColumn('sent_tokenize', sent_tokenize_udf(F.col('content')))
    sentences_df = tokenized_df.select(
        tokenized_df.docID,
        tokenized_df.content,
        F.explode(F.col('sent_tokenize')).alias('new_content'),
    )

    # Stanza key phrases, computed on batches of sentences.
    content = sentences_df.withColumn('kps', give_stanza_pos_pandas(F.col('new_content')))

    # The write is the action that triggers the whole pipeline. No save mode
    # is given, so Spark's default applies and this raises if the output
    # table already exists.
    content.write.format('delta').save(output_path)

    elapsed = time.perf_counter() - start
    dico[N] = elapsed
    times_nltk.append(elapsed)

In [None]:
import pandas as pd
# Do not call this dict `nltk`: that would rebind the imported nltk module,
# which the sentence-tokenising UDF in the benchmark cell uses.
timings = {'sentences': times_nltk}

# dico is filled in the same loop as times_nltk, so its keys (in insertion
# order) are exactly the sample sizes that were actually timed; this stays
# consistent even if the benchmark was interrupted part-way.
df_nltk = pd.DataFrame(timings, index=list(dico))

In [None]:
# Persist the timing table as CSV; the path is relative to the kernel's working directory.
df_nltk.to_csv('stanza_time_pandas_udfs.csv')

In [2]:
# The benchmark writes this table with format('delta'), so read it back
# through the Delta reader. A plain Parquet read ignores the Delta
# transaction log and can pick up data files that are no longer part of the
# current table version.
content = spark.read.format('delta').load(
    "gs://workbench.gleanlabs.net/emr_data/user=johanna/{}".format(
        'merged_se_extracted_kps_new_posts_{}_stanza-table'.format(1000000)
    )
)

In [3]:
# Number of rows in the loaded table (one row per sentence after the explode step).
content.count()

4690516

In [4]:
# Preview the table; show() prints only the first rows and truncates long cell values.
content.show()

+--------+--------------------+--------------------+--------------------+
|   docID|             content|         new_content|                 kps|
+--------+--------------------+--------------------+--------------------+
|48072226|Your system can't...|Your system can't...|[system, npm pack...|
|48072226|Your system can't...|Try follow this i...|[installation ste...|
|48081789|This happens beca...|This happens beca...|[page change, com...|
|48081789|This happens beca...|I had the exactly...|       [field, data]|
|48081789|This happens beca...|This forces the f...|[field, value, ja...|
|48081789|This happens beca...|My case: How I th...|        [case, case]|
|48088727|How can I iterate...|How can I iterate...|[xml api response...|
|48088727|How can I iterate...|I am getting XML ...|[xml response, ap...|
|48088727|How can I iterate...|As I am working o...|[xml response, ti...|
|48088727|How can I iterate...|Below is the XML ...|[xml response fil...|
|48088727|How can I iterate...|Thanks 