## Choose AWS or Local
This code performs some initial setup depending on whether you're running it on AWS EMR or locally in Docker containers.

In [None]:
# Run-mode identifiers -- leave these two values unchanged.
LOCAL_RUN_MODE = "LOCAL"
AWS_EMR_RUN_MODE = "AWS_EMR"

# Keep exactly one of the following assignments active, matching where this
# notebook is being executed.
run_mode = LOCAL_RUN_MODE
# run_mode = AWS_EMR_RUN_MODE

# S3 bucket used on AWS EMR (input is read from "<bucket>/read/", output is
# written to "<bucket>/write/"). Change it if your bucket name differs from the
# README instructions.
bucket = "s3://cse6242-team094-spring2021"

# Input CSV used on AWS EMR. The sample file is recommended because it runs much
# faster than the full dataset.
csv_file = 'metadata_sample_5000.csv'  # sample dataset
# csv_file = 'metadata.csv'  # full dataset

In [None]:
if run_mode == LOCAL_RUN_MODE:
    # Local (Docker) runs create their own SparkSession. The MySQL Connector/J
    # jar is placed on the classpath for the JDBC writes at the end of the
    # notebook. NOTE(review): on AWS EMR this cell does nothing, so `spark` is
    # presumably provided by the EMR notebook kernel -- confirm.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master("local") \
        .config("spark.jars", "/usr/share/java/mysql-connector-java-8.0.23.jar") \
        .getOrCreate()
    # A threshold of -1 turns off automatic broadcast joins.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

## Load in required packages.

In [None]:
import re

import contractions
import nltk
from nltk.corpus import wordnet, stopwords
from pyspark.sql import *
from pyspark.sql.functions import (
    col,
    collect_list,
    concat_ws,
    lower,
    posexplode,
    regexp_replace,
    sort_array,
    struct,
    udf,
)
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

In [None]:
# Fetch the NLTK data packages this notebook relies on: the POS tagger, stop
# words, WordNet, the Punkt tokenizer models and "omw". Only done for local runs.
# NOTE(review): on AWS EMR the data is presumably installed on the cluster some
# other way (e.g. a bootstrap step) -- confirm.
if run_mode == LOCAL_RUN_MODE:
    for nltk_package in ("averaged_perceptron_tagger", "stopwords", "wordnet", "punkt", "omw"):
        nltk.download(nltk_package)

## Functions to load data and write results to Parquet, plus the stop-word list imported as a PySpark DataFrame

In [None]:
def load_csv_data(filename, path):
    """Read a headered CSV file into a Spark DataFrame and report its size.

    Fields may be wrapped in double quotes, and a doubled quote inside a quoted
    field is read as a literal quote. Column types are inferred.

    Args:
        filename: CSV file name; appended directly to ``path``.
        path: Directory or URI prefix, including its trailing separator.

    Returns:
        The loaded DataFrame. Its row count (one row per paper) is printed
        before returning.
    """
    csv_reader = spark.read.option("quote", "\"").option("escape", "\"")
    raw_data = csv_reader.csv(path + filename, header=True, inferSchema=True)
    # Each record is one paper, so the row count is the number of papers.
    print("Number of papers: ", raw_data.count())
    return raw_data

def write_to_parquet(path, subfolder, df=None):
    """Write a DataFrame as Parquet to ``path + subfolder``.

    Args:
        path: Output prefix including its trailing separator (e.g. "<bucket>/write/").
        subfolder: Folder name appended directly to ``path``.
        df: DataFrame to write. When omitted, the module-level ``abstracts``
            DataFrame is written, which is the only thing this function could
            write before ``df`` was added.
    """
    if df is None:
        df = abstracts
    df.write.parquet(path + subfolder)

# Stop words removed in Step 8: NLTK's English list plus project-specific
# additions, wrapped in a one-column Spark DataFrame (the column is named
# "value") for the left-anti join.
stopwords_var = stopwords.words('english') + [
    'background', 'objective', 'introduction', 'abstract',
    'conclusion', 'figure', 'table', 'chart',
]
stopwords_df = spark.createDataFrame(stopwords_var, StringType())

## Load in data, print number of total papers, and preview first 5 rows

In [None]:
# Local runs always read the sample file under /home/jovyan/sample_data/;
# AWS EMR runs read the configured csv_file from "<bucket>/read/".
raw_data = (
    load_csv_data(filename='metadata_sample_5000.csv', path="/home/jovyan/sample_data/")
    if run_mode == LOCAL_RUN_MODE
    else load_csv_data(filename=csv_file, path=bucket + "/read/")
)

## Drop duplicate rows based on repeated cord_uid or repeated abstract text, and drop rows with null values.
Titles, authors, and other features are allowed to be repeated (it's possible that two papers are titled identically by coincidence, but it is less likely that two papers have the same exact abstract text and impossible for them to have the same cord_uid). The number of resulting papers after eliminating duplicates and null rows is printed.

In [None]:
# Drop rows missing any required field first, then de-duplicate.
# dropDuplicates does not guarantee which row of a duplicate group survives, so
# de-duplicating first could keep an incomplete copy (e.g. one with a null
# abstract) and then discard it in the null filter. That would lose a paper
# even though a complete duplicate row existed.
raw_data = (raw_data
            .na.drop(subset=['cord_uid', 'title', 'authors', 'abstract'])
            .dropDuplicates(['cord_uid'])
            .dropDuplicates(['abstract']))
print("Number of papers: ", raw_data.count())

## Create a separate DataFrame for each of the three relevant datasets (abstract text, titles, and metadata).
- Abstracts: Includes cord_uid, raw abstract text, pre-processed abstract text.
- Titles: Includes cord_uid, title text.
- Metadata: Includes cord_uid, author names, publication date, publishing journal.

In [None]:
# Narrow projections of the cleaned records; every DataFrame keeps cord_uid as its key.
abstracts_raw = raw_data.select('cord_uid', 'abstract')
titles = raw_data.select('cord_uid', 'title')
metadata = raw_data.select('cord_uid', 'authors', 'publish_time', 'journal', 'url')

## Pre-process the abstract text. The following steps are taken:
1. Convert all letters to lowercase.
2. Expand contractions with a user-defined-function.
3. Replace all non-alpha characters with spaces using regular expressions.
    (Spaces are used instead of deletion because many forms of punctuation, such as hyphens and slashes, separate words. Additionally, lemmatization cannot reduce the possessive form of unfamiliar words, so deletion would often produce two different terms for the same concept. For example, deleting the apostrophe turns mRNA's into mRNAs. Replacing it with a space instead leaves mRNA plus a lone "s", which step 4 removes, so the base term mRNA is tracked consistently. Using spaces instead of deletion does not affect other punctuation, because tokenization discards the extra whitespace later.)
4. Remove all single character words using regular expressions.
5. Tokenize all long strings of texts into individual words called tokens (with a UDF: user defined function).
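6. Flatten the column of (token, POS tag) lists into one row per token, using PySpark's posexplode function. posexplode also records each token's position so the original order can be restored later.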
7. Lemmatize all tokens to the respective base terms (with a UDF: user defined function).
8. Compare lemmatized tokens to list of stop words and filter accordingly (using left-anti join).
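9. Join each paper's pre-processed tokens back into a single space-separated string, in their original order, so the data can be stored in a SQL database.
10. Join the raw abstract DataFrame with the processed-token DataFrame.
11. Store the joined DataFrame, together with the titles and metadata DataFrames, as Parquet output on AWS EMR, or in the MySQL database for local runs.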

An effort was made to merge the steps above where appropriate, to avoid having too many independent operations that would be otherwise computationally intensive. To begin the pre-processing, all of the user-defined functions will be defined. Afterwards, the various user-defined functions and pre-processing operations will be completed on the PySpark dataframe.

In [None]:
# UDF-1: Expand English contractions (such as "can't") in a string column with
# contractions.fix; the result column is a string.
contractions_udf = udf(contractions.fix, StringType())


# UDF-2: Tokenize text with nltk.word_tokenize and tag each token with nltk.pos_tag.
def nltk_pos_tag(z):
    """Return a list of ``(token, pos_tag)`` tuples for the words in string ``z``."""
    return nltk.pos_tag(nltk.word_tokenize(z))
# Spark UDF wrapper: each (token, tag) tuple becomes a two-element string array.
pos_tag_udf = udf(nltk_pos_tag, ArrayType(ArrayType(StringType())))


# UDF-3: Convert an nltk POS tag to the matching WordNet POS, then lemmatize the token with it.

def wn_lemma(tokens_postag):
    """Return the WordNet lemma for one ``[token, pos_tag]`` pair.

    Tags starting with J, V, N or R are lemmatized as adjective, verb, noun or
    adverb respectively. Tokens with any other tag are returned unchanged.
    Stop words are not handled here; they are filtered separately by the
    left-anti join in Step 8.

    Args:
        tokens_postag: Two-element sequence ``(token, tag)``, as produced by
            ``nltk_pos_tag`` and flattened by ``posexplode``.

    Returns:
        The lemmatized token (str).
    """
    from nltk.corpus import wordnet

    token = tokens_postag[0]
    tag = tokens_postag[1]
    # Map the first letter of the nltk tag to a WordNet POS constant.
    wn_tag = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }.get(tag[:1])
    if wn_tag is None:
        return token
    return nltk.WordNetLemmatizer().lemmatize(token, pos=wn_tag)

lemmatize_udf = udf(wn_lemma, StringType())

In [None]:
# Steps 1-3: Lowercase the abstract, expand contractions (UDF-1), then replace
# every character that is not an ASCII letter or whitespace with a space.
# The pattern is a raw string so Python does not treat "\s" as an invalid escape.
abstracts = abstracts_raw.select(
    'cord_uid',
    regexp_replace(contractions_udf(lower(col("abstract"))), r"[^a-zA-Z\s]", " ").alias("abstract_proc"),
)

In [None]:
# Step 4: Eliminate all single-character words. \b\w\b matches a lone word
# character anywhere, including at the very start or end of the text, where a
# whitespace-delimited pattern would miss it. After Steps 1-3 only letters and
# whitespace remain, so \w matches letters here. The extra spaces left behind
# are discarded by tokenization in Step 5.
abstracts = abstracts.withColumn('abstract_proc', regexp_replace(col("abstract_proc"), r"\b\w\b", " "))

In [None]:
# Step 5: Tokenize each processed abstract and POS-tag the tokens (UDF-2),
# giving one array of [token, tag] pairs per paper.
abstracts = abstracts.select(
    "cord_uid",
    pos_tag_udf("abstract_proc").alias("abstract_tokens"),
)

In [None]:
# Step 6: One row per [token, tag] pair. posexplode also emits each pair's index
# within its paper as "pos", which Step 9 uses to restore token order.
abstracts = abstracts.select(
    "cord_uid",
    posexplode("abstract_tokens").alias("pos", "abstract_tokens"),
)

In [None]:
# Step 7: Replace every [token, tag] pair with the token's lemma (UDF-3), so the
# column now holds plain strings.
abstracts = abstracts.withColumn("abstract_tokens", lemmatize_udf("abstract_tokens"))

In [None]:
# Step 8: Remove stop-word tokens. The left-anti join keeps only rows whose
# token has no match in stopwords_df.value.
abstracts = abstracts.join(
    stopwords_df,
    abstracts["abstract_tokens"] == stopwords_df["value"],
    "leftanti",
)

In [None]:
# Step 9: Collapse each paper's tokens back into one space-separated string.
# collect_list gives no ordering guarantee, and a preceding orderBy is not
# preserved through the groupBy shuffle. So collect (pos, token) structs, sort
# them (structs sort by their first field, pos), then keep only the token field.
ordered_tokens = sort_array(collect_list(struct("pos", "abstract_tokens"))).getField("abstract_tokens")
abstracts = abstracts.groupBy("cord_uid").agg(concat_ws(" ", ordered_tokens).alias("abstract_tokens"))

In [None]:
# Step 10: Attach the processed tokens to the raw abstracts.
# `abstracts` is derived from `abstracts_raw`, so their cord_uid columns share
# lineage. An expression join followed by drop(abstracts.cord_uid) is therefore
# ambiguous about which cord_uid gets dropped. Joining on the column name keeps
# a single cord_uid taken from abstracts_raw. Papers whose tokens were all
# filtered out keep their cord_uid and get a null abstract_tokens.
abstracts_final = abstracts_raw.join(abstracts, on="cord_uid", how="left")

In [None]:
# AWS EMR only -- Step 11: write the three DataFrames as Parquet under
# "<bucket>/write/". Spark's default save mode fails if a target path exists.
if run_mode == AWS_EMR_RUN_MODE:
    write_bucket = bucket + "/write/"
    abstracts_final.write.parquet(f"{write_bucket}abstract_parquet")
    titles.write.parquet(f"{write_bucket}title_parquet")
    metadata.write.parquet(f"{write_bucket}metadata_parquet")

In [None]:
# This saving code will be executed when running a local, Docker Container demo
if run_mode == LOCAL_RUN_MODE:
    # Connection settings shared by all three table writes. Connector/J 8.x (the
    # mysql-connector-java-8.0.23 jar) registers its driver as
    # com.mysql.cj.jdbc.Driver; the old com.mysql.jdbc.Driver name is deprecated.
    # NOTE(review): credentials are hard-coded for the local demo database only.
    mysql_url = "jdbc:mysql://cse6242_team094_mysqldb/cse6242_team094"
    mysql_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "user": "root",
        "password": "p@ssw0rd1",
    }

    # Abstracts add URL settings (a non-strict sql_mode and
    # jdbcCompliantTruncation=false) and explicit column types. This is
    # presumably so over-long values are truncated rather than rejected
    # -- TODO confirm.
    (abstracts_final.write
        .format("jdbc")
        .options(url=mysql_url + "?sessionVariables=sql_mode='NO_ENGINE_SUBSTITUTION'"
                                 "&jdbcCompliantTruncation=false",
                 dbtable="processed_abstracts",
                 createTableColumnTypes="abstract VARCHAR(65536), cord_uid VARCHAR(1024), "
                                        "abstract_tokens VARCHAR(65536)",
                 **mysql_options)
        .mode("overwrite")
        .save())

    # Titles and metadata use the plain URL and Spark's default column types.
    for frame, table_name in ((titles, "titles"), (metadata, "metadata")):
        (frame.write
            .format("jdbc")
            .options(url=mysql_url, dbtable=table_name, **mysql_options)
            .mode("overwrite")
            .save())