In [1]:
import hashlib
import io
import json
import os
import re
import string

import pandas as pd

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, udf

import spacy
import gensim
from tika import parser
import pdfplumber
from wordcloud import WordCloud
from sklearn.feature_extraction import text
import matplotlib.pyplot as plt

   
# Article list: one row per PDF with its display name ('Name') and file path ('path').
df = pd.read_csv('./retrieval/Data/ImportFileList.csv')

# Create (or reuse) the SparkSession that runs the extraction pipeline.
spark = (SparkSession
         .builder
         .appName("Uticaria_Spark")
         .getOrCreate())

urticaria_urls_rows = df[['Name', 'path']].values.tolist()

# Pandas frame of urticaria articles: 'Name' is exposed as 'article', next to its PDF path.
urticaria_path_pd = pd.DataFrame(data=urticaria_urls_rows, columns=['article', 'path'])

In [2]:
urticaria_path_pd.head(n=5)  # preview the first articles and their PDF paths

Unnamed: 0,article,path
0,Frezzolini 2006,./CAPTUM/Basophil/dsDNA/Frezzolini 2006.pdf
1,Hatada 2013,./CAPTUM/Basophil/dsDNA/Hatada 2013.pdf
2,Kolkhir 2020,./CAPTUM/Basophil/Antihistamine/Kolkhir 2020.pdf
3,Magen 2014,./CAPTUM/Basophil/Antihistamine/Magen 2014.pdf
4,Grattan 2000,./CAPTUM/Basophil/Antihistamine/Grattan 2000.pdf


In [3]:
# Number of Spark partitions the PDF list is spread over; this controls how many
# tasks the PDF-parsing UDFs below can run in parallel. Tune to the cluster size.
N_PARTITIONS = 8
urticaria_urls = spark.createDataFrame(urticaria_path_pd).repartition(N_PARTITIONS)

In [4]:
@udf('string')
def extract_content(path: str) -> str:
    """Extract the text of every page of the PDF at `path`.

    Pages are joined with newlines so that `extract_statements` can rebuild
    paragraphs across page breaks. Pages for which pdfplumber finds no text
    (`extract_text()` returning None or "") are skipped.

    Returns "" when the file cannot be opened or parsed. Such rows are then
    removed by the `length(content) > 0` filter applied when caching articles.
    """
    try:
        with pdfplumber.open(path) as pdf:
            # pdf.pages is 0-based; read all of them instead of a single page
            pages_text = [page.extract_text() for page in pdf.pages]
    except Exception:
        # best effort: unreadable / corrupt PDFs yield an empty document
        return ""
    return '\n'.join(page_text for page_text in pages_text if page_text)


@udf('string')
def extract_meta(path: str) -> str:
    """Return the Tika metadata of the PDF at `path` as a JSON string.

    `parser.from_file` returns a dict whose 'metadata' entry is itself a dict.
    Returning that dict from a UDF declared as 'string' leaves the conversion
    to Spark, which does not produce JSON. Serialising explicitly keeps the
    column parseable (e.g. with `F.from_json`).

    Returns "" when Tika cannot parse the file or reports no metadata.
    """
    try:
        raw = parser.from_file(path)
    except Exception:
        # best effort: keep the article, just without metadata
        return ""
    metadata = raw.get('metadata') if raw else None
    if metadata is None:
        return ""
    # default=str guards against values json cannot encode natively
    return json.dumps(metadata, default=str)

def remove_non_ascii(text):
    """Return `text` keeping only characters found in `string.printable`.

    Besides non-ASCII characters, this also drops ASCII control characters
    other than the whitespace ones included in `string.printable`.
    """
    allowed = frozenset(string.printable)
    return ''.join(ch for ch in text if ch in allowed)

def not_header(line):
    """Return True unless `line` is entirely upper-case (i.e. looks like a header).

    Intended to keep headers out of paragraphs rebuilt from broken PDF lines.
    NOTE(review): not called anywhere in this notebook section.
    """
    is_header = line.isupper()
    return not is_header

def _join_broken_lines(text):
    """Merge raw PDF text lines into paragraphs and return them as a list."""
    paragraphs = []
    prev = ""
    for line in text.split('\n'):
        # aggregate consecutive lines where text may be broken down:
        # only if the line starts with a space or the previous one does not end with a dot
        if line.startswith(' ') or not prev.endswith('.'):
            prev = prev + ' ' + line
        else:
            # new paragraph
            paragraphs.append(prev)
            prev = line
    # don't forget the left-over paragraph
    paragraphs.append(prev)
    return paragraphs


def _clean_paragraph(line):
    """Best-effort clean-up of one paragraph (header numbers, hyphenation, spacing, long numbers, URLs)."""
    # remove a leading header number
    line = re.sub(r'^\s?\d+(.*)$', r'\1', line)
    # remove leading/trailing spaces
    line = line.strip()
    # words may be split between lines, link them back together
    line = re.sub(r'\s?-\s?', '-', line)
    # remove space prior to punctuation
    line = re.sub(r'\s?([,:;\.])', r'\1', line)
    # remove figures (5+ digits) that are not relevant to grammatical structure
    line = re.sub(r'\d{5,}', r' ', line)
    # remove mentions of URLs
    line = re.sub(r'((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*', r' ', line)
    # collapse multiple spaces
    line = re.sub(r'\s+', ' ', line)
    return line


def extract_statements(nlp, text):
    """Extract disease statements (sentences) from raw PDF text.

    Junk characters, URLs, long numbers, etc. are removed; consecutive lines
    are grouped into paragraphs, and each paragraph is split into sentences
    with the spaCy pipeline `nlp`.

    Args:
        nlp: a loaded spaCy pipeline able to set sentence boundaries.
        text: raw document text; None or "" yields no statements.

    Returns:
        list[str]: stripped sentences, in document order.
    """
    if not text:
        return []

    # remove non-ASCII characters
    text = remove_non_ascii(text)

    sentences = []
    for paragraph in _join_broken_lines(text):
        cleaned = _clean_paragraph(paragraph)
        # split paragraphs into well-defined sentences using spaCy
        for sent in nlp(cleaned).sents:
            sentences.append(sent.text.strip())
    return sentences

@pandas_udf('array<string>', PandasUDFType.SCALAR_ITER)
def extract_statements_udf(content_series_iter):
    """Split each document of a batch into cleaned sentences (one array<string> per row).

    Loading a spaCy model takes time, so it is loaded once per invocation of
    this iterator UDF and reused for every batch (pandas Series) it receives.

    The model is downloaded only when it is not already installed on the
    worker, instead of running `spacy.cli.download` (which installs the
    package) every time. This avoids the cost on every task and works on
    workers without network access once the model is present.
    """
    try:
        nlp = spacy.load("en_core_web_sm", disable=['ner'])
    except OSError:
        # spaCy raises OSError when the model package cannot be found
        spacy.cli.download("en_core_web_sm")
        nlp = spacy.load("en_core_web_sm", disable=['ner'])

    # cleanse and tokenize a batch of PDF content
    for content_series in content_series_iter:
        yield content_series.map(lambda x: extract_statements(nlp, x))


def tokenize(sentence):
    """Normalise `sentence` with gensim's `simple_preprocess` (accent stripping on) and space-join the tokens."""
    return ' '.join(gensim.utils.simple_preprocess(sentence, deacc=True))

def lemmatize(nlp, text):
    """Lemmatize `text` with the spaCy pipeline `nlp`, then normalise the result with `tokenize`.

    Tokens whose lemma is '-PRON-' (spaCy v2's pronoun placeholder) are dropped.
    NOTE(review): spaCy v3 no longer emits this placeholder, so the filter is
    presumably a no-op with v3 models.
    """
    # convert words into their simplest form (singular, present form, etc.)
    kept_lemmas = [tok.lemma_ for tok in nlp(text) if tok.lemma_ != '-PRON-']
    return tokenize(' '.join(kept_lemmas))

@pandas_udf('string', PandasUDFType.SCALAR_ITER)
def lemma(content_series_iter):
    """Lemmatize each statement of a batch into a normalised, space-separated string.

    Loading a spaCy model takes time, so it is loaded once per invocation of
    this iterator UDF and reused for every batch (pandas Series) it receives.

    The model is downloaded only when it is not already installed on the
    worker, instead of running `spacy.cli.download` (which installs the
    package) every time.
    """
    try:
        nlp = spacy.load("en_core_web_sm", disable=['ner'])
    except OSError:
        # spaCy raises OSError when the model package cannot be found
        spacy.cli.download("en_core_web_sm")
        nlp = spacy.load("en_core_web_sm", disable=['ner'])

    # lemmatize a batch of text content
    for content_series in content_series_iter:
        yield content_series.map(lambda x: lemmatize(nlp, x))



In [5]:

# Extract text and metadata of every PDF, keep only articles with non-empty
# text, and cache the result because the following cells reuse it.
# NOTE(review): the recorded run failed while unpickling the UDF on the worker
# (AttributeError: Can't get attribute '_fill_function' on pyspark.cloudpickle,
# raised from the apache-spark 3.1.1 install). That usually indicates the
# kernel's `pyspark` package is a different version from the Spark install the
# workers use; presumably align them (e.g. pyspark==3.1.1) — not a code bug here.
urticaria_articles = (
    urticaria_urls
    .withColumn('content', extract_content(F.col('path')))
    .withColumn('meta', extract_meta(F.col('path')))
    .filter(F.length(F.col('content')) > 0)
    .cache()
)
urticaria_articles.show(1)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute '_fill_function' on <module 'pyspark.cloudpickle' from '/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/cloudpickle/__init__.py'>


In [15]:
# Statements of this many characters or fewer are dropped; presumably they are
# too short to be informative (fragments, captions, references). Tune as needed.
MIN_STATEMENT_CHARS = 100

# One row per (article, statement): split each document into sentences,
# explode them, and keep only sufficiently long ones.
urticaria_statements = (
    urticaria_articles
    .withColumn('statements', extract_statements_udf(F.col('content')))
    .withColumn('statement', F.explode(F.col('statements')))
    .filter(F.length(F.col('statement')) > MIN_STATEMENT_CHARS)
    .select('article', 'statement')
    .cache()
)
urticaria_statements.show(1)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute '_fill_function' on <module 'pyspark.cloudpickle' from '/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/cloudpickle/__init__.py'>


In [8]:
urticaria_lemma = (
    urticaria_statements
    .withColumn('lemma', lemma(F.col('statement')))
    .select('article', 'statement', 'lemma')
)

# urticaria_lemma already has exactly these columns, so collect it as-is to the driver.
urticaria = urticaria_lemma.toPandas()

# NOTE(review): to_csv also writes the pandas index as an unnamed first column;
# pass index=False if downstream readers do not expect it.
urticaria.to_csv('./retrieval/Data/ProcessedData.csv')