In [0]:
# Job parameters: input/output parquet paths and Elasticsearch connection settings.
# NOTE(review): the ES password must not be committed as a widget default. The
# previous default exposed a live credential in source control and should be
# rotated. Supply it at run time (job parameter) or, preferably, read it from a
# Databricks secret scope with dbutils.secrets.get(scope=..., key=...).
# NOTE(review): elasticsearch-hadoop reads a resource of the form "<index>/<type>".
# The es_index default therefore targets index "index" with type
# "cfuentes_covid_docs". Confirm this is the intended index name.
dbutils.widgets.text("tagcloud_parquet", "/FileStore/tables/cfuentes/tag_cloud_test.parquet")
dbutils.widgets.text("english_documents", "/FileStore/tables/cfuentes/english_articles.parquet")
dbutils.widgets.text("es_nodes", "51.104.144.96")
dbutils.widgets.text("es_port", "9200")
dbutils.widgets.text("es_user", "elastic")
dbutils.widgets.text("es_pass", "")
dbutils.widgets.text("es_index", "index/cfuentes_covid_docs")

In [0]:
# Resolve every widget into a module-level setting in a single pass.
# Widgets are read in declaration order: documents, tag cloud, then ES settings.
(parquet_path, parquet_tagcloud, esURL, esPort, esUser, esPass, esIndex) = (
  dbutils.widgets.get(widget_name)
  for widget_name in (
    "english_documents",
    "tagcloud_parquet",
    "es_nodes",
    "es_port",
    "es_user",
    "es_pass",
    "es_index",
  )
)

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize


def tokenize_stopword(text):
  """Tokenize English text into lowercase words, dropping punctuation and stop words.

  Uses NLTK's ``word_tokenize``, keeps only purely alphabetic tokens,
  lowercases them, and then removes any token found in NLTK's English
  stop-word list.

  Args:
    text: the string to tokenize. ``None`` (a null Spark column) or an empty
      string yields an empty list.

  Returns:
    A list of lowercase word tokens in their original order.
  """
  if not text:
    # Spark hands null cells to Python UDFs as None; word_tokenize would raise on it.
    return []
  stop_words = set(stopwords.words("english"))
  words = [token.lower() for token in word_tokenize(text) if token.isalpha()]
  # Lowercase *before* the stop-word check. NLTK's list is all lowercase, so
  # checking first let capitalised stop words such as "The" slip through.
  return [word for word in words if word not in stop_words]

tokenize_stopwords_udf = udf(tokenize_stopword, ArrayType(StringType()))
  

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Keep a handle on the raw input; the final cell writes it back with a status flag.
parquetDF = spark.read.parquet(parquet_path)
originalDF = parquetDF

# Clean text: drop rows whose abstract is null or empty, then strip everything
# except ASCII letters and whitespace and lowercase the rest. The filter runs
# before the projection so it never references a column the select has dropped.
cleanedDF = (
  originalDF
  .filter("abstract_text != ''")
  .select(
    "paper_id",
    lower(regexp_replace("abstract_text", "[^a-zA-Z\\s]", "")).alias("clean_abstract_text"),
  )
)

# Tokenize text (Spark ML Tokenizer splits on whitespace)
tokenizer = Tokenizer(inputCol='clean_abstract_text', outputCol='clean_abstract_text_token')
tokenizedDF = tokenizer.transform(cleanedDF).select('paper_id', 'clean_abstract_text_token')

# Remove stop words (StopWordsRemover's default English list)
remover = StopWordsRemover(inputCol='clean_abstract_text_token', outputCol='tokenized_abstract_text')
parquetDF = remover.transform(tokenizedDF).select('paper_id', 'tokenized_abstract_text')

# Drop tokens shorter than 3 characters.
def filter_tokens_length(token_list, min_length=3):
  """Keep only tokens that are at least ``min_length`` characters long.

  Short tokens, including any empty strings, are removed. Order is preserved.

  Args:
    token_list: list of string tokens, or ``None`` for a null Spark array.
    min_length: minimum token length to keep (default 3).

  Returns:
    The filtered list, or ``None`` when ``token_list`` is ``None``
    (null in, null out, matching Spark's handling of nulls).
  """
  if token_list is None:
    return None
  return [token for token in token_list if len(token) >= min_length]

filter_tokens_length_udf = udf(filter_tokens_length, returnType=ArrayType(StringType()))

# Final per-paper shape: `id` plus the length-filtered token array `abstract`.
parquetDF = parquetDF.select(
  col("paper_id").alias("id"),
  filter_tokens_length_udf(col("tokenized_abstract_text")).alias("abstract"),
)


In [0]:
# Index one document per paper into Elasticsearch via the elasticsearch-hadoop
# Spark SQL connector.
# es.mapping.id makes the paper `id` the document _id. With the connector's
# default "index" write operation, re-running this cell therefore replaces
# existing documents instead of appending duplicates.
# NOTE(review): es.net.ssl is "false", so the basic-auth credentials travel in
# clear text to es_nodes. Enable TLS on the cluster and set this to "true".
(
  parquetDF.write.format("org.elasticsearch.spark.sql")
  .option("es.net.http.auth.user", esUser)
  .option("es.net.http.auth.pass", esPass)
  .option("es.nodes", esURL)
  .option("es.port", esPort)
  .option("es.nodes.wan.only", "true")
  .option("es.net.ssl", "false")
  .option("es.mapping.id", "id")
  .mode("append")
  .save(esIndex)
)

In [0]:
from pyspark.sql.functions import explode

# Keyword frequencies across all abstracts: one row per distinct token, most
# frequent first, appended to the tag-cloud parquet.
# NOTE(review): mode("append") adds a full set of counts on every run, so
# re-runs leave duplicate keyword rows (and the ordering is not preserved across
# appended files). Confirm whether "overwrite" was intended.
keyword_counts = (
  parquetDF
  .select(explode("abstract").alias("keyword"))
  .groupby("keyword")
  .count()
  .orderBy("count", ascending=False)
)
keyword_counts.write.mode("append").parquet(parquet_tagcloud)

In [0]:
from pyspark.sql.functions import when, col

# Write the input articles back to `parquet_path` with an `es_has_errors` flag.
# The flag is True for papers absent from `parquetDF` (the frame sent to
# Elasticsearch) and False otherwise.
# NOTE(review): this flags papers dropped by the cleaning step (null/empty
# abstract), not documents the ES write rejected. Confirm that is the intent.

# Drop any flag left by a previous run. Otherwise a re-run would select
# `es_has_errors` twice, and parquet refuses duplicate column names.
base_columns = [c for c in originalDF.columns if c != "es_has_errors"]

# distinct() keeps the left join 1:1 even if a paper_id appears more than once.
indexed_ids = parquetDF.select(col("id").alias("_indexed_id")).distinct()

left_join = (
  originalDF.select(*base_columns)
  .join(indexed_ids, col("paper_id") == col("_indexed_id"), how="left")
  .withColumn("es_has_errors", col("_indexed_id").isNull())
)

col_list = base_columns + ["es_has_errors"]

# originalDF and parquetDF are both lazily read from `parquet_path`. Overwriting
# that path directly from this plan is rejected by Spark ("Cannot overwrite a
# path that is also being read from") or would delete the input before it is
# read. Materialise the result in a staging location first, then swap it in.
staging_path = parquet_path.rstrip("/") + "_es_status_tmp"
left_join.select(*col_list).write.mode("overwrite").parquet(staging_path)
spark.read.parquet(staging_path).write.mode("overwrite").parquet(parquet_path)
dbutils.fs.rm(staging_path, True)

In [0]:
# End the notebook run and return the string "Ok" to whatever invoked it
# (presumably a parent notebook or job; confirm against the caller).
dbutils.notebook.exit("Ok")