# Covid 19 Search Engine

- **Goal**: Create a search engine that uses EMR Serverless for ad-hoc processing of large amounts of unstructured textual data
- **Who is this course for?**: Anyone with a background in Python programming and some knowledge of Spark and EMR who wants to build a practical application that combines ML and Data Engineering
- **Data is available here**: https://www.kaggle.com/datasets/allen-institute-for-ai/CORD-19-research-challenge?datasetId=551982&sortBy=voteCount


## Agenda

- How search engines work
- Benefits of Serverless Architecture
- Code and Data Overview
- Spark Processing Pipeline
- Submitting EMR Job
- Crawling data 
- Athena Table
- Running Search Engine
- Production Ready Code Overview

## How (Modern) Search Engines Work

- **Information Retrieval**: finding material (usually documents) of an unstructured nature (usually text) that satisfies an information need from within large collections (usually stored on computers)
- **Term-based Retrieval Methods**: Mathematical framework defining query-document matching based on exact syntactic matching between a document and a query to estimate the relevance of documents given a search query.
- **Usage**: Internet search (Google, Bing, etc.), smart devices (Alexa, Google Assistant), online shopping, etc.

## Term Based Search Engines: BM25 (TF-IDF)
![Image of BM25](https://miro.medium.com/max/720/1*V8zEF3m21WkJ-UYzME6cKA.webp)
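
To make the scoring idea concrete, here is a minimal sketch of BM25 in plain Python. It assumes whitespace tokenization and the commonly used defaults `k1=1.5` and `b=0.75`; the function name `bm25_scores` is hypothetical, and the `engine.BM25SearchEngine` used later in this notebook may tokenize and weight terms differently.

```python
import math
from collections import Counter


def bm25_scores(query, docs, k1=1.5, b=0.75):
    """Score every document in `docs` against `query` with a common BM25 variant."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs

    # Document frequency: in how many documents does each term appear?
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))

    scores = []
    for tokens in tokenized:
        term_freq = Counter(tokens)
        score = 0.0
        for term in query.lower().split():
            if term not in term_freq:
                continue  # exact term matching: unseen terms contribute nothing
            # Rare terms get a higher inverse document frequency
            idf = math.log((n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5) + 1)
            # Term frequency saturates with k1 and is normalized by document length with b
            norm = term_freq[term] + k1 * (1 - b + b * len(tokens) / avg_len)
            score += idf * term_freq[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


docs = [
    "covid infection spreads fast",
    "stock markets fell today",
    "viral infection in bats",
]
print(bm25_scores("covid infection", docs))
```

The first document matches both query terms, the third matches only one, and the second matches none, so the scores come out in that order.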

## Improving Search Performance

- Removing Stop Words
- Filtering unwanted text
- Removal of Duplicates
- etc
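
The steps above can be sketched in a few lines of plain Python. The stop-word list below is a deliberately tiny, hypothetical one (real lists are much longer), and the helper names `preprocess` and `deduplicate` are assumptions for illustration, not part of this project's code.

```python
import re

# Hypothetical, deliberately small stop-word list for illustration only
STOP_WORDS = {"the", "of", "and", "a", "in", "is", "to"}


def preprocess(text):
    """Lowercase the text, keep alphanumeric tokens only, and drop stop words."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return [token for token in tokens if token not in STOP_WORDS]


def deduplicate(texts):
    """Keep the first occurrence of each text, comparing on the preprocessed tokens."""
    seen, unique = set(), []
    for text in texts:
        key = tuple(preprocess(text))
        if key not in seen:
            seen.add(key)
            unique.append(text)
    return unique


print(preprocess("The spread of COVID-19 in Wuhan!"))
print(deduplicate(["The virus spreads.", "the virus spreads", "A new vaccine"]))
```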

## How to process high volumes of data?

- **Distributed Computing**: Modern data engineering allows seamless horizontal scaling through frameworks like Hadoop and Spark
- **In-Memory Processing**: Usually limited to a single machine; uses ephemeral memory such as RAM to perform computations faster
- **SQL and NoSQL databases**: Using specific query languages to manipulate structured and unstructured data.

## Clusters are expensive and hard to maintain

- Setting up clusters for Hadoop and Spark is costly
- Depending on the application, a cluster may spend most of its time idle
- Ongoing updates and maintenance require dedicated people


## Using the power of serverless architecture for data processing

![Diagram of Amazon EMR Serverless](https://d1.awsstatic.com/Product-Page-Diagram_Amazon-EMR-Serverless%402x.d462590cd415fe4022e8644cf487f01c7a5f3f15.png)

## Setup

In [1]:
import sys
import os
import warnings
import time
import shutil
from IPython.display import display, HTML

sys.path.append(os.path.dirname(os.getcwd()))

import boto3
import pandas as pd
import awswrangler as wrangler
from pyspark.sql.dataframe import DataFrame as SparkDataframe
from pyspark.sql.functions import col, isnull, when, length, lit, coalesce, regexp_replace, to_date

from src.processing import athena
from src.processing.spark import get_spark_session, from_files
from src.search import engine, results
from src.processing.emr import EMRServerless

In [None]:
pd.set_option('display.max_colwidth', None)
display(HTML("<style>.container { width:100% !important; }</style>"))
warnings.filterwarnings('ignore')

## Loading Data into Spark

In this section we show, step by step, what each node of our Spark Directed Acyclic Graph (DAG) does. Before the data can back a search engine, we clean it and compute a few features so that it is easier for the end user to work with.

We read the data from a local copy, which you can download from:

https://www.kaggle.com/datasets/allen-institute-for-ai/CORD-19-research-challenge?datasetId=551982&sortBy=voteCount

### Getting the Spark Session

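```
from pyspark.sql import SparkSession


def get_spark_session(env: str = None, app_name: str = 'SparkApp') -> SparkSession:
    if env == 'DEV':
        # Local development: run Spark on this machine using all available cores
        spark = (SparkSession
                 .builder
                 .master('local[*]')
                 .appName(app_name)
                 .getOrCreate())
    else:
        # Otherwise let the runtime that launches the job provide the master
        spark = (SparkSession
                 .builder
                 .appName(app_name)
                 .getOrCreate())
    return spark
```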

In [None]:
spark_session = get_spark_session(env='DEV', app_name='COVID-19 Research papers dataset')

Configuring the Spark context for a cleaner Jupyter Notebook experience :)

In [None]:
spark_session.sparkContext.setLogLevel('error') # setting the log level to log error only
log4jLogger = spark_session._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
logger.warn(f"Pyspark script logger initialized from {__name__}")

### We set a small sample amount to process just for demonstration purposes.

In [None]:
SAMPLE_SEED = 1234
SAMPLE_AMOUNT = 0.01 # Randomly Sample 1% of the data
INPUT_PATH = '/Users/rpossas/Dev/workspace/data/Covid-19-Patient-Health-Analytics/metadata.csv'

In [None]:
df = from_files(spark=spark_session, data_dir=INPUT_PATH, file_format="CSV").sample(SAMPLE_AMOUNT, seed=SAMPLE_SEED)
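
As a rough illustration of what a seeded fractional sample does, the sketch below keeps each row independently with probability `fraction`. This mirrors how sampling without replacement behaves in Spark: the result is reproducible for a fixed seed, but its size is only approximately `fraction` of the input. The helper `bernoulli_sample` is hypothetical and uses Python's `random` module, not Spark's own sampler, so the selected rows will not be the same ones Spark picks.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`, reproducibly for a given seed."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(100_000))
sample_a = bernoulli_sample(rows, fraction=0.01, seed=1234)
sample_b = bernoulli_sample(rows, fraction=0.01, seed=1234)

print(len(sample_a))          # close to 1000, but not guaranteed to be exactly 1000
print(sample_a == sample_b)   # same seed, same sample
```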

We are only interested in a few columns, so we select them to keep the dataframe readable

In [None]:
INITIAL_COLUMNS = ('title','abstract','publish_time','authors','url')
df = df.select(list(INITIAL_COLUMNS))

### Let's have a look at the data

In [None]:
df.toPandas().head(5)

## Cleaning and Processing Data

### Cleaning title

Cleans and removes junk titles, which are filtered according to the regex provided

In [None]:
def clean_title(spark_df: SparkDataframe, relevant_regex: str = None) -> SparkDataframe:
    """
    Cleans and removes junk titles, which are filtered according to the regex provided
    :param spark_df: Spark Dataframe
    :param relevant_regex: Regex that is used to filter relevant titles
    :return: Filtered Dataframe
    """
    if relevant_regex is None:
        relevant_regex = '.*vir.*|.*sars.*|.*mers.*|.*corona.*|.*ncov.*|.*immun.*|.*nosocomial.*'
        relevant_regex = relevant_regex + '|.*epidem.*|.*emerg.*|.*vacc.*|.*cytokine.*'

    is_title_junk = (length(col('title')) < 30) & ~(col('title').rlike(relevant_regex))
    spark_df = spark_df.na.fill('', ['title'])
    spark_df = spark_df.withColumn('title', when(is_title_junk, lit('')).otherwise(col('title')))
    return spark_df

In [None]:
df = clean_title(spark_df=df)
df.toPandas().head(2)
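
The junk-title rule can be checked on single strings without Spark. The sketch below is a plain-Python stand-in, assuming Python's `re.search` behaves like Spark's `rlike` for these simple patterns; the helper name `clean_title_value` is hypothetical.

```python
import re

# Same terms as the default regex in clean_title, without the redundant leading/trailing ".*"
RELEVANT = re.compile(r"vir|sars|mers|corona|ncov|immun|nosocomial|epidem|emerg|vacc|cytokine")


def clean_title_value(title):
    """Return '' for short titles that contain no relevant term, otherwise the title itself."""
    title = title or ""  # mirrors na.fill('', ['title'])
    is_junk = len(title) < 30 and not RELEVANT.search(title)
    return "" if is_junk else title


print(repr(clean_title_value("Index to Volume 12")))    # short and irrelevant: blanked
print(repr(clean_title_value("A novel coronavirus")))   # short but relevant: kept
print(repr(clean_title_value(None)))                    # missing title: blanked
```

Note that the match is case-sensitive, so a short title such as `"SARS"` in capitals would still be treated as junk by this rule.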

### Cleaning Abstract

We perform the following operations

- Remove the boilerplate words matched by the provided regex
- Replace `Unknown` abstracts with None
- Replace missing abstracts with the title
- Drop duplicate abstracts

In [None]:
def clean_abstract(spark_df: SparkDataframe, abstract_regex: str = None) -> SparkDataframe:
    """
    Removes unused words and filters abstracts
    :param spark_df: Spark Dataframe
    :param abstract_regex: Regex of words to be removed from abstract
    :return: Spark Dataframe
    """
    if abstract_regex is None:
        abstract_regex = '(Publisher|Abstract|Summary|BACKGROUND|INTRODUCTION)'

    spark_df = spark_df.withColumn('abstract',
                                   when(col('abstract') == 'Unknown', lit(None))
                                   .otherwise(col('abstract')))
    spark_df = spark_df.withColumn('abstract', coalesce('abstract', 'title')) # Replace null abstracts with the title
    spark_df = spark_df.withColumn('abstract', regexp_replace('abstract', abstract_regex, ''))
    spark_df = spark_df.drop_duplicates(['abstract'])
    return spark_df

In [None]:
df = clean_abstract(spark_df=df)
df.toPandas().head(2)
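
The same four steps can be traced on a handful of records in plain Python. This is a sketch for intuition only: the helper `clean_abstracts` is hypothetical, it keeps the first row of each duplicate group (Spark's `drop_duplicates` makes no such ordering promise), and it uses Python's `re` module in place of `regexp_replace`.

```python
import re

ABSTRACT_REGEX = re.compile(r"(Publisher|Abstract|Summary|BACKGROUND|INTRODUCTION)")


def clean_abstracts(records):
    """Apply the Unknown -> None, fallback-to-title, regex removal and de-duplication steps."""
    cleaned, seen = [], set()
    for record in records:
        abstract = record.get("abstract")
        if abstract == "Unknown":
            abstract = None
        if abstract is None:
            abstract = record.get("title")  # coalesce(abstract, title)
        if abstract is not None:
            abstract = ABSTRACT_REGEX.sub("", abstract)
        if abstract in seen:
            continue  # duplicate abstract: drop the row
        seen.add(abstract)
        cleaned.append({**record, "abstract": abstract})
    return cleaned


records = [
    {"title": "T1", "abstract": "Abstract Viral load in bats"},
    {"title": "T2", "abstract": "Unknown"},
    {"title": "T3", "abstract": " Viral load in bats"},
    {"title": "T4", "abstract": None},
]
for row in clean_abstracts(records):
    print(row)
```

Removing a word such as `Abstract` leaves the surrounding whitespace in place, which is why the first and third records end up with identical abstracts here.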

### Tagging Key Words

In [None]:
SARS_COV_2_DATE = '2019-11-30'

#### Virus

In [None]:
def tag_virus(spark_df: SparkDataframe,
              abstract_column='abstract',
              virus_search: str = None) -> SparkDataframe:
    """
    Creates a new column that indicates whether the article is about viruses
    :param spark_df: Spark Dataframe
    :param abstract_column: Text column to search for references
    :param virus_search: regular expression to be applied
    :return: Dataframe with new boolean column indicating presence of virus terms
    """
    if virus_search is None:
        virus_search = ".*(virus|viruses|viral)"
    viral_cond = col(abstract_column).rlike(virus_search)
    return spark_df.withColumn('virus_related', coalesce(viral_cond, lit(False)))

#### Coronavirus

In [None]:
def tag_coronavirus(spark_df: SparkDataframe, abstract_column='abstract', corona_regex: str = None):
    """
    Creates a column that indicates whether the article is about coronavirus
    :param spark_df: Spark Dataframe
    :param abstract_column: Text column to search for terms
    :param corona_regex: Regular expression to be applied
    :return: Dataframe with new boolean column indicating presence of coronavirus terms
    """
    if corona_regex is None:
        corona_regex = ".*corona"

    # Build the condition from the regex string so that a caller-supplied pattern also works
    corona_cond = col(abstract_column).rlike(corona_regex)
    return spark_df.withColumn('corona_related', coalesce(corona_cond, lit(False)))

#### SARS

In [None]:
def tag_sars(spark_df: SparkDataframe, abstract_column='abstract', sars_regex: str = None):
    """
    Creates a column that indicates whether the article is about SARS (and not COVID)
    :param spark_df: Spark Dataframe, which must already contain the 'covid_related' column
    :param abstract_column: Text column to search for terms
    :param sars_regex: Regular expression to be applied
    :return: Dataframe with new boolean column indicating presence of SARS terms
    """
    if sars_regex is None:
        sars_regex = ".*sars"

    sars_cond = col(abstract_column).rlike(sars_regex)
    sars_not_covid = ~(col('covid_related')) & (sars_cond)
    return spark_df.withColumn('sars_related', coalesce(sars_not_covid, lit(False)))

#### COVID

In [None]:
def tag_covid(spark_df: SparkDataframe,
              date_column: str = 'publish_time',
              abstract_column: str = 'abstract',
              covid_terms: str = None):
    """
    Creates a boolean column to flag whether the article is about COVID
    :param spark_df: Spark Dataframe
    :param date_column: Column with date of article
    :param abstract_column: Abstract Column
    :param covid_terms: List of covid terms to be used as a regular expression
    :return: Spark Dataframe with new boolean column
    """
    if covid_terms is None:
        covid_terms = ['covid', 'sars-?n?cov-?2', '2019-ncov', 'novel coronavirus', 'sars coronavirus 2']

    covid_search = f".*({'|'.join(covid_terms)})"

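    # Articles published after SARS_COV_2_DATE, or with no publish date at all
    since_covid = ((col(date_column) > SARS_COV_2_DATE) | (isnull(col(date_column))))
    # Note: because of the OR, every article in since_covid is tagged, with or without a term match
    covid_term_match = since_covid | col(abstract_column).rlike(covid_search)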
    wuhan_outbreak = since_covid & col(abstract_column).rlike('.*(wuhan|hubei)')
    covid_match = covid_term_match | wuhan_outbreak
    spark_df = spark_df.withColumn('covid_related', coalesce(covid_match, lit(False)))
    return spark_df
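
To see what the assembled pattern matches, the sketch below builds the same `covid_search` string and tests it on a few abstracts. It uses Python's `re.search` as a stand-in for Spark's `rlike`, on the assumption that both look for the pattern anywhere in the string and are case-sensitive by default; the helper `matches` and its `ignore_case` switch are hypothetical and not part of `tag_covid`.

```python
import re

covid_terms = ['covid', 'sars-?n?cov-?2', '2019-ncov', 'novel coronavirus', 'sars coronavirus 2']
covid_search = f".*({'|'.join(covid_terms)})"


def matches(text, pattern=covid_search, ignore_case=False):
    """Return True when the pattern is found anywhere in the text."""
    flags = re.IGNORECASE if ignore_case else 0
    return re.search(pattern, text, flags) is not None


print(covid_search)
print(matches("a study of sars-cov-2 spike protein"))             # lowercase term: match
print(matches("SARS-CoV-2 spike protein"))                        # mixed case: no match
print(matches("SARS-CoV-2 spike protein", ignore_case=True))      # match once case is ignored
```

Since the default terms are all lowercase, abstracts that only use capitalized spellings are not matched by the term search itself. Lowercasing the text first, or prefixing the pattern with `(?i)`, would be one way to change that if it is not the intended behavior.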

In [None]:
df = tag_covid(spark_df=df)
df = tag_virus(spark_df=df)
df = tag_coronavirus(spark_df=df)
df = tag_sars(spark_df=df)

In [None]:
df.toPandas().head(2)

### Formatting dates and Filling Nulls

In [None]:
def format_date(spark_df: SparkDataframe, date_column='publish_time') -> SparkDataframe:
    """
    Formats date column to a single format
    :param spark_df: Spark Dataframe
    :param date_column: Column to be formatted
    :return: Dataframe with formatted dates column
    """
    spark_df = spark_df.withColumn(date_column, when(isnull(col(date_column)), '').
                                   otherwise(to_date(col(date_column))))
    return spark_df

In [None]:
def fill_nulls(spark_df: SparkDataframe,
               columns=('authors', 'abstract')) -> SparkDataframe:
    """
    Replaces null values in the given columns with empty strings
    :param spark_df: Spark Dataframe
    :param columns: Columns in which nulls are replaced
    :return: Spark Dataframe with empty string instead of None
    """
    return spark_df.na.fill('', list(columns))

In [None]:
df = format_date(spark_df=df)
df = fill_nulls(spark_df=df)

## Running Spark Jobs on EMR Serverless

The Spark job above will run on EMR Serverless. This AWS service works similarly to Lambda: you pay only for the processing and storage you use.

The main advantage is that there is no requirement to set up clusters and, therefore, no need to pay for idle time. This makes EMR Serverless ideal for ad-hoc jobs.

### Setting up Environment

In order to run EMR Serverless we need to:

- Upload our locally created modules to AWS so our Spark Job can access them
- Upload our Python Virtual Environment with any custom frameworks we may require
- Upload our data to S3 so it is accessible by the Spark Job

#### Change the variables below to reflect your environment

In [2]:
AWS_PROFILE = 'projectpro'
AWS_REGION = 'us-east-1'

In [3]:
S3_BUCKET = 'project-pro-emr-serverless123'
S3_LOGS_BUCKET = 'projectpro-emr-serverless-logs'
aws_access_key_id = 'Enter your access key id'
aws_secret_access_key = 'Enter your secret key'
APPLICATION_ID = None

#### We need to create an EMR role to be used by the submitted jobs; you can create a role with the following rules

In [4]:
EMR_JOB_ROLE_ARN = 'arn:aws:iam::143176219551:role/emr-new-role'
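
As a starting point for that role, the sketch below builds two policy documents as Python dictionaries: a trust policy that lets the EMR Serverless service assume the role, and an S3 access policy for the data and log buckets. The helper `s3_access_policy` and the exact list of S3 actions are assumptions for illustration; your job may need more (for example AWS Glue permissions if it reads or writes catalog tables) or less, so treat it as a template to review rather than the policy used in this course.

```python
import json

# Bucket names as set in the cells above
S3_BUCKET = 'project-pro-emr-serverless123'
S3_LOGS_BUCKET = 'projectpro-emr-serverless-logs'

# Trust policy: allows the EMR Serverless service to assume the job role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "emr-serverless.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}


def s3_access_policy(buckets):
    """Build a policy granting read/write access to the given buckets and their objects."""
    resources = []
    for bucket in buckets:
        resources += [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"]
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
            "Resource": resources,
        }],
    }


print(json.dumps(trust_policy, indent=2))
print(json.dumps(s3_access_policy([S3_BUCKET, S3_LOGS_BUCKET]), indent=2))
```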

In [5]:
APPLICATION_NAME = 'COVID19'

DEFAULT_INPUT_PATH = f's3://{S3_BUCKET}/data'
S3_OUT_PATH = f's3://{S3_BUCKET}/out/'
DEFAULT_INPUT_FORMAT = 'csv'

UPDATE_ENVIRONMENT = False
UPDATE_MODULES = True

ENVIRONMENT_PATH = f's3://{S3_BUCKET}/environment'
ENVIRONMENT_FILE = 'environment.tar.gz'
FULL_ENVIRONMENT_PATH = f'{ENVIRONMENT_PATH}/{ENVIRONMENT_FILE}'

MODULE_PATH = f's3://{S3_BUCKET}/modules'
MODULE_FILE = 'src.zip'
FULL_MODULE_PATH = f'{MODULE_PATH}/{MODULE_FILE}'

SCRIPT_PATH = f's3://{S3_BUCKET}/scripts'
SCRIPT_FILE = 'spark_job.py'
FULL_SCRIPT_PATH = f'{SCRIPT_PATH}/{SCRIPT_FILE}'

In [31]:
session = boto3.Session(region_name=AWS_REGION,
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key)
client = session.client("emr-serverless", region_name=AWS_REGION)

#### Creating Virtual Environment using Docker

The virtual environment we are going to upload needs to be built on an Amazon Linux image; otherwise it won't work, because the packaged Python interpreter and compiled dependencies must match the operating system that EMR Serverless runs on.

We use a Docker image with the command below to create an ``environment.tar.gz`` in our folder, which is then uploaded to S3

``docker build --output . .``

In [None]:
!ls

In [None]:
!ls ../

#### Uploading required files to S3

In [7]:
if UPDATE_MODULES:
    # Remove any stale archive before rebuilding it
    if os.path.isfile(MODULE_FILE):
        os.remove(MODULE_FILE)

    if SCRIPT_FILE is not None and SCRIPT_PATH is not None:
        # Zip the ../src package into MODULE_FILE (src.zip) in the current directory
        module_file_name, module_file_extension = os.path.splitext(MODULE_FILE)
        shutil.make_archive(module_file_name, module_file_extension.lstrip('.'), '../', 'src')
        wrangler.s3.upload(SCRIPT_FILE, FULL_SCRIPT_PATH, boto3_session=session)
        wrangler.s3.upload(MODULE_FILE, FULL_MODULE_PATH, boto3_session=session)

if UPDATE_ENVIRONMENT:
    if not os.path.isfile(f'../{ENVIRONMENT_FILE}'):
        raise FileNotFoundError('Build your environment first using Docker: '
                                'DOCKER_BUILDKIT=1 docker build --output . .')

    wrangler.s3.upload(f'../{ENVIRONMENT_FILE}', FULL_ENVIRONMENT_PATH, boto3_session=session)

In [8]:
# Create and start a new EMRServerless Spark Application
emr_serverless = EMRServerless(emr_client=client, application_id=APPLICATION_ID)
print("Creating and starting EMR Serverless Spark App")
emr_serverless.create_application(APPLICATION_NAME, "emr-6.6.0")
emr_serverless.start_application()

Creating and starting EMR Serverless Spark App


In [9]:
files = wrangler.s3.list_objects(f'{DEFAULT_INPUT_PATH}', boto3_session=session)
job_dict = {}
job_run_id = None

In [10]:
print(f"Running a total of {len(files)} EMR Serverless Jobs")

for ix, file in enumerate(sorted(files)):
    # Submit a Spark job
    try:
        job_run_id = emr_serverless.run_spark_job(
            name=APPLICATION_NAME,
            script_location=f"{FULL_SCRIPT_PATH}",
            venv_name="environment",
            venv_location=f'{FULL_ENVIRONMENT_PATH}',
            modules_location=f'{FULL_MODULE_PATH}',
            job_role_arn=EMR_JOB_ROLE_ARN,
            arguments=["PROD", file, DEFAULT_INPUT_FORMAT, S3_OUT_PATH],
            s3_bucket_name=S3_LOGS_BUCKET,
            wait=False
        )
        print(f"Submitting new Spark job num {ix} and id {job_run_id}")

    except Exception as e:
        print(f'Error while submitting job: \n{e}')

        # Cancel every job submitted so far, then re-raise the original error
        for submitted_job_id in job_dict.keys():
            emr_serverless.cancel_spark_job(job_id=submitted_job_id)
            print(f'Job {submitted_job_id} cancelled')

        raise

    job_dict[job_run_id] = False

Running a total of 1 EMR Serverless Jobs
Submitting new Spark job num 0 and id 00f76fursoe2b209
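
The `EMRServerless` class comes from this project's `src.processing.emr` module and is not shown here. As a rough idea of what a call like `run_spark_job` has to assemble, the sketch below builds the keyword arguments for boto3's `start_job_run` on the `emr-serverless` client. The function `build_job_run_request`, the `logs/` prefix and the sample application ID are assumptions for illustration; the real wrapper may set different Spark options.

```python
def build_job_run_request(application_id, name, script_location, venv_name, venv_location,
                          modules_location, job_role_arn, arguments, s3_bucket_name):
    """Assemble keyword arguments for the emr-serverless client's start_job_run call."""
    venv_python = f"./{venv_name}/bin/python"
    spark_params = " ".join([
        # Ship the packed virtual environment and unpack it as ./<venv_name> on each node
        f"--conf spark.archives={venv_location}#{venv_name}",
        # Make the zipped local modules importable by the job
        f"--conf spark.submit.pyFiles={modules_location}",
        # Point driver and executors at the Python interpreter inside the environment
        f"--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON={venv_python}",
        f"--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON={venv_python}",
        f"--conf spark.executorEnv.PYSPARK_PYTHON={venv_python}",
    ])
    return {
        "applicationId": application_id,
        "executionRoleArn": job_role_arn,
        "name": name,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": script_location,
                "entryPointArguments": list(arguments),
                "sparkSubmitParameters": spark_params,
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                # Assumed log prefix; adjust to your own bucket layout
                "s3MonitoringConfiguration": {"logUri": f"s3://{s3_bucket_name}/logs/"}
            }
        },
    }


request = build_job_run_request(
    application_id="example-application-id",  # hypothetical ID
    name="COVID19",
    script_location="s3://bucket/scripts/spark_job.py",
    venv_name="environment",
    venv_location="s3://bucket/environment/environment.tar.gz",
    modules_location="s3://bucket/modules/src.zip",
    job_role_arn="arn:aws:iam::123456789012:role/example-role",
    arguments=["PROD", "s3://bucket/data/metadata.csv", "csv", "s3://bucket/out/"],
    s3_bucket_name="logs-bucket",
)
print(request["jobDriver"]["sparkSubmit"]["sparkSubmitParameters"])
```

With a real client, the job would then be submitted with `client.start_job_run(**request)`, whose response carries the `jobRunId` that the polling loop uses.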


In [11]:
all_done = False
jobs_completed = 0
jobs_running = len(job_dict.keys())
jobs_failed = 0

while not all_done:

    for job_id in job_dict.keys():
        if job_dict[job_id]:
            continue  # already finished; skip it so it is not counted twice

        job_status = emr_serverless.get_job_run(job_id)
        job_state = job_status.get("state")
        job_done = job_state in [
            "SUCCESS",
            "FAILED",
            "CANCELLING",
            "CANCELLED",
        ]

        if job_done:
            job_dict[job_id] = True

            if job_state == "SUCCESS":
                jobs_completed += 1

            if job_state == "FAILED":
                jobs_failed += 1

            jobs_running -= 1

    all_done = all(job_dict.values())

    print(f"Jobs Running {jobs_running},"
          f"\nJobs Completed {jobs_completed},"
          f"\nJobs Failed {jobs_failed}")

    if all_done:
        break

    print('----')

    time.sleep(20)

print("Done! 👋")

Jobs Running 1,
Jobs Completed 0,
Jobs Failed 0
----
Jobs Running 0,
Jobs Completed 1,
Jobs Failed 0
Done! 👋


### Querying Data with Athena

In [40]:
sql = "SELECT * FROM covid19_emr_1out limit 10000"
athena_df = wrangler.athena.read_sql_query(sql, database='default', boto3_session=session)

In [41]:
athena_df.head(5)

| | publish_time | title | abstract | covid_related | virus_related | corona_related | sars_related | url |
|---|---|---|---|---|---|---|---|---|
| 0 | 2020-03-21 | Neurological Complications of Coronavirus Dise... | Coronavirus disease 2019 (COVID-19) is a pande... | True | True | False | False | https://doi.org/10.7759/cureus.7352; https://w... |
| 1 | 2022-04-27 | Myeloid-derived suppressor cells in COVID-19: ... | Coronavirus disease 2019 (COVID-19) is a poten... | True | True | False | False | https://www.ncbi.nlm.nih.gov/pubmed/35489643/;... |
| 2 | 2021-02-23 | Clinical significance of measuring serum cytok... | Coronavirus disease 2019 (COVID-19) is a rapid... | True | True | True | False | https://www.sciencedirect.com/science/article/... |
| 3 | 2022-05-04 | Therapeutic approaches and vaccination in figh... | Coronavirus disease 2019 (COVID-19) is a remar... | True | True | True | False | https://doi.org/10.1016/j.genrep.2022.101619; ... |
| 4 | 2021-01-01 | Arterial Thrombosis in an Asymptomatic COVID-1... | Coronavirus disease 2019 (COVID-19) is a sever... | True | True | True | False | |


In [45]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\DELL\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt.zip.


True

In [46]:
bm25_index = engine.BM25SearchEngine(athena_df)

Creating the BM25 index from the abstracts of the papers
Use index="text" if you want to index the texts of the paper instead
Finished Indexing in 44.0 seconds


In [47]:
results.SearchResults(bm25_index.search('COVID19 Infection')[:10])

In [48]:
results.SearchResults(bm25_index.search('Are COVID19 and Coronavirus related?')[:10])