## MapReduce Performance Processing - Analysing 33,000 Covid-19 Research Papers

An example of cutting measured query time by roughly 100x.

How do you query 100M+ rows of cloud data? Or find the time to read 33,000 Covid-19 research papers?


### Introducing the solution: MapReduce Multiprocessing

MapReduce is a well-established way to split your data into chunks and process them in parallel using all the processors in your system or cluster. The process goes something like this:

INPUT -> SPLIT -> MAP (apply a function with multiple processors) -> REDUCE (put back together & sort) -> OUTPUT
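
As a rough illustration of the pipeline above, here is a minimal serial sketch on a toy word list. The helper names (`split`, `map_chunk`, `reduce_counts`) are made up for this example and are not the functions used later in the notebook.

```python
from collections import Counter
from functools import reduce

def split(items, n_chunks):
    """SPLIT: cut a list into consecutive slices of roughly equal size."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]

def map_chunk(chunk):
    """MAP: count the words in one chunk."""
    return Counter(chunk)

def reduce_counts(left, right):
    """REDUCE: merge two partial counts into one."""
    return left + right

words = "virus cells virus protein cells virus".split()   # INPUT
chunks = split(words, 3)                                   # SPLIT
partials = map(map_chunk, chunks)                          # MAP (serial here)
total = reduce(reduce_counts, partials, Counter())         # REDUCE
print(total.most_common(2))                                # OUTPUT
```

Only the MAP step changes when the work is spread over several processors. SPLIT and REDUCE stay the same.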

This script uses data from Kaggle's "COVID-19 Open Research Dataset Challenge (CORD-19)" and demonstrates the power of multiprocessing.

https://www.kaggle.com/allen-institute-for-ai/CORD-19-research-challenge

In [1]:
import time
import multiprocessing
from multiprocessing import Pool
from functools import reduce
import re
import csv
import pandas as pd
import collections

### Let's try to summarise the contents of all our Covid-19 research papers.

First, let's take a look at the data. You'll see we have 33k valid research papers listed here.

With each paper we have the title, author, abstract and body.

In [3]:
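df = pd.read_csv("COVID_19_Open_Research_Dataset.csv")
# NOTE: dropna() returns a new DataFrame and the result is not assigned here, so df still
# contains rows with a missing abstract (see the non-null counts below). Those rows later
# appear as the string "nan", which is presumably why "nan" is in the stop-word list.
df.dropna(subset=['abstract'])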

print(df.info())
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33012 entries, 0 to 33011
Data columns (total 18 columns):
paper_id                       33012 non-null object
title                          29377 non-null object
abstract                       24539 non-null object
body_text                      33012 non-null object
cord_uid                       30183 non-null object
source_x                       30183 non-null object
doi                            29877 non-null object
pmcid                          15121 non-null object
pubmed_id                      23212 non-null float64
license                        30183 non-null object
publish_time                   30183 non-null object
authors                        29634 non-null object
journal                        29063 non-null object
Microsoft Academic Paper ID    296 non-null float64
WHO #Covidence                 420 non-null object
has_full_text                  30183 non-null object
full_text_file                 30183 non-null

Unnamed: 0,paper_id,title,abstract,body_text,cord_uid,source_x,doi,pmcid,pubmed_id,license,publish_time,authors,journal,Microsoft Academic Paper ID,WHO #Covidence,has_full_text,full_text_file,url
0,25621281691205eb015383cbac839182b838514f,SMARCA2-regulated host cell factors are requir...,The human interferon (IFN)-induced MxA protein...,Influenza A viruses (IAV) are severe human pat...,,,,,,,,,,,,,,
1,7db22f7f81977109d493a0edf8ed75562648e839,Recombinant Scorpine Produced Using SUMO Fusio...,"Scorpine, a small cationic peptide from the ve...",The oldest known scorpions lived around 430 mi...,ymp1pj3r,PMC,10.1371/journal.pone.0103456,PMC4113386,25068263.0,cc-by,2014-07-28,"Zhang, Chao; He, Xinlong; Gu, Yaping; Zhou, Hu...",PLoS One,,,True,comm_use_subset,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4...
2,a137eb51461b4a4ed3980aa5b9cb2f2c1cf0292a,The effect of inhibition of PP1 and TNFα signa...,Background: The complex interplay between vira...,The emergence of Severe Acute Respiratory Synd...,d79twl34,PMC,10.1186/s12918-016-0336-6,PMC5035469,27663205.0,cc-by,2016-09-23,"McDermott, Jason E.; Mitchell, Hugh D.; Gralin...",BMC Syst Biol,,,True,comm_use_subset,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5...
3,6c3e1a43f0e199876d4bd9ff787e1911fd5cfaa6,Review Article Microbial Agents as Putative In...,,Sjögren's syndrome (SS) is a connective tissue...,kg7c9a9h,PMC,10.1155/2019/8567364,PMC6339763,30723750.0,cc-by,2019-01-06,"Talotta, Rossella; Sarzi-Puttini, Piercarlo; A...",J Immunol Res,,,True,comm_use_subset,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6...
4,2ce201c2ba233a562ee605a9aa12d2719cfa2beb,A cluster of adenovirus type B55 infection in ...,Background: Human adenovirus type 55 is a re-e...,Human adenovirus (HAdV) is a common pathogen a...,sg7esn4p,PMC,10.1111/irv.12457,PMC5485872,28488368.0,cc-by,2017-06-26,"Yi, Lina; Zou, LiRong; Lu, Jing; Kang, Min; So...",Influenza Other Respir Viruses,,,True,comm_use_subset,https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5...


### How big is this data?

In [4]:
# Combine all the abstracts into one string, then split it into roughly as many parts as there are processors available.
data = df["abstract"].tolist()
data = str(data)
# Chunk length in characters. Note the precedence: this is (len(data) / cpu_count) - 1, not len(data) / (cpu_count - 1).
n = int(len(data)/multiprocessing.cpu_count() -1 )
parts = [data[i:i+n] for i in range(0, len(data), n)]

# Words I will want to remove later because they are purely grammatical and provide little insight
ENGLISH_STOP_WORDS = ["nan","et","al","i", "c", "n","a", "about", "above", "above", "across", "after", "afterwards", "again", "against", "all", "almost", "alone", "along", "already", "also","although","always","am","among", "amongst", "amoungst", "amount",  "an", "and", "another", "any","anyhow","anyone","anything","anyway", "anywhere", "are", "around", "as",  "at", "back","be","became", "because","become","becomes", "becoming", "been", "before", "beforehand", "behind", "being", "below", "beside", "besides", "between", "beyond", "bill", "both", "bottom","but", "by", "call", "can", "cannot", "cant", "co", "con", "could", "couldnt", "cry", "de", "describe", "detail", "do", "done", "down", "due", "during", "each", "eg", "eight", "either", "eleven","else", "elsewhere", "empty", "enough", "etc", "even", "ever", "every", "everyone", "everything", "everywhere", "except", "few", "fifteen", "fify", "fill", "find", "fire", "first", "five", "for", "former", "formerly", "forty", "found", "four", "from", "front", "full", "further", "get", "give", "go", "had", "has", "hasnt", "have", "he", "hence", "her", "here", "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "him", "himself", "his", "how", "however", "hundred", "ie", "if", "in", "inc", "indeed", "interest", "into", "is", "it", "its", "itself", "keep", "last", "latter", "latterly", "least", "less", "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill", "mine", "more", "moreover", "most", "mostly", "move", "much", "must", "my", "myself", "name", "namely", "neither", "never", "nevertheless", "next", "nine", "no", "nobody", "none", "noone", "nor", "not", "nothing", "now", "nowhere", "of", "off", "often", "on", "once", "one", "only", "onto", "or", "other", "others", "otherwise", "our", "ours", "ourselves", "out", "over", "own","part", "per", "perhaps", "please", "put", "rather", "re", "same", "see", "seem", "seemed", "seeming", "seems", "serious", "several", "she", "should", "show", "side", "since", 
"sincere", "six", "sixty", "so", "some", "somehow", "someone", "something", "sometime", "sometimes", "somewhere", "still", "such", "system", "take", "ten", "than", "that", "the", "their", "them", "themselves", "then", "thence", "there", "thereafter", "thereby", "therefore", "therein", "thereupon", "these", "they", "thick", "thin", "third", "this", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "top", "toward", "towards", "twelve", "twenty", "two", "un", "under", "until", "up", "upon", "us", "very", "via", "was", "we", "well", "were", "what", "whatever", "when", "whence", "whenever", "where", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will", "with", "within", "without", "would", "yet", "you", "your", "yours", "yourself", "yourselves", "the"]

# Number of characters
len(data)

37522171

#### 37.5 million characters to analyse!
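
One caveat with the slicing in the cell above: cutting at fixed character offsets can split a word across two chunks, so its two halves get counted as separate words. A minimal sketch of an alternative that only cuts at whitespace is shown here. `split_on_whitespace` is a hypothetical helper, not part of the original notebook.

```python
def split_on_whitespace(text, n_chunks):
    """Split text into roughly n_chunks pieces, cutting only at whitespace."""
    target = max(1, len(text) // n_chunks)
    chunks, start = [], 0
    while start < len(text):
        end = min(start + target, len(text))
        # Move the cut forward until it lands on whitespace or the end of the text.
        while end < len(text) and not text[end].isspace():
            end += 1
        chunks.append(text[start:end])
        start = end
    return chunks

text = "virus infection respiratory cells protein"
chunks = split_on_whitespace(text, 3)
print(chunks)
```

Chunks may come out slightly longer than the target length, but every word stays whole and joining the chunks gives back the original text.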

## 1. Analyse without multi-processing

We're going to time how long it takes my system to convert these characters into words and sort the words by abundance - using a single processor. 

We have split the data into chunks and will process them in series using the "mapper" function, then bring the results back together with the "reducer" function.

In [5]:
# The mapper is shown below (commented out) for reference. Multiprocessing in Jupyter requires worker functions to be imported from a module.
from My_Functions_covid import mapper

######---------Functions--------#######

# Meat of the function - cleans, filters and splits the characters into words and then finds the most common words. Performed in chunks. 

#def mapper(data):
#    lower = re.sub(r'[^\w\s]', '', data).lower()
#    clean_words = re.sub(r'[^a-zA-Z ]', '', lower)
#    output = []
#    for x in clean_words.split():
#        if x not in ENGLISH_STOP_WORDS:
#            output.append(x)
#    return collections.Counter(output).most_common(10)

# Brings chunks back together
def reducer(mapped):
    # DataFrame.append was removed in pandas 2.0, so build the per-chunk frames and concatenate them once.
    frames = [pd.DataFrame(list(dict(x).items())) for x in mapped]
    dataframe = pd.concat(frames)
    return dataframe.groupby(dataframe.columns[0]).sum().sort_values(dataframe.columns[0], ascending = False)

######---------Code--------#######

# NOTE: time.process_time() measures CPU time of this process only, not wall-clock time.
t0 = time.process_time()
clean = map(mapper, parts)

reduced = reducer(clean)
t1 = time.process_time()

print(t1-t0)
reduced.sort_values(reduced.columns[0], ascending = False).head(10)

18.0625


word,count
virus,23949
infection,17206
viral,16839
cells,15021
respiratory,12692
study,12645
protein,12578
disease,11360
patients,8531
,6368
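
Note that the mapper returns only each chunk's 10 most common words, so the reducer sums truncated lists. A word that is common overall but just misses a chunk's top 10 contributes nothing from that chunk. The toy sketch below shows the effect with a cut-off of 1. Both function names are made up for this illustration, and how much this matters for the real dataset is not measured here.

```python
from collections import Counter

chunk_a = ["virus"] * 3 + ["cells"] * 2
chunk_b = ["protein"] * 3 + ["cells"] * 2

def top_k_then_merge(chunks, k):
    """Keep only each chunk's k most common words before merging (lossy)."""
    total = Counter()
    for chunk in chunks:
        total.update(dict(Counter(chunk).most_common(k)))
    return total

def merge_then_top_k(chunks, k):
    """Merge the full counts first, then take the k most common (exact)."""
    total = Counter()
    for chunk in chunks:
        total.update(chunk)
    return total.most_common(k)

lossy = top_k_then_merge([chunk_a, chunk_b], 1)
exact = merge_then_top_k([chunk_a, chunk_b], 1)
print(lossy, exact)
```

Here "cells" is the most frequent word overall (4 occurrences) but is missing from the lossy result, because it is never the top word within a single chunk. Returning the full `Counter` from the mapper avoids this, at the cost of sending more data back from each worker.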


#### 18 second runtime!

This is a rather poor query speed for a business application.

## 2. Analyse with multi-processing

We are going to time the exact same analysis as above but this time process all the parts in parallel using multiple processors.

We have split the data into chunks and will process them in parallel using the "mapper" function, then bring the results back together with the "reducer" function.

INPUT -> SPLIT -> MAP (apply a function with multiple processors) -> REDUCE (put back together & sort) -> OUTPUT
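
For readers running this outside a notebook, the same pattern can be sketched as a standalone script. This is an illustrative sketch rather than the notebook's code: `count_words`, `merge_counts` and `parallel_count` are hypothetical names, and the worker here returns full counts instead of a top-10 list.

```python
import re
from collections import Counter
from multiprocessing import Pool

def count_words(text):
    """Worker function: defined at module top level so it can be pickled."""
    return Counter(re.findall(r"[a-z]+", text.lower()))

def merge_counts(partials):
    """Sum the per-chunk counters into one."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

def parallel_count(parts, processes=2):
    # The context manager shuts the worker processes down when the block exits.
    with Pool(processes=processes) as pool:
        return merge_counts(pool.map(count_words, parts))

if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block on platforms
    # that start workers by importing the main module.
    parts = ["Virus infects cells", "cells produce virus", "virus spreads"]
    print(parallel_count(parts).most_common(1))
```

`pool.map` blocks until every chunk has been processed and returns the results in the same order as `parts`.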

In [6]:
# Python's multiprocessing library allows you to create a pool of worker processes, here one per available processor.
pool = Pool(multiprocessing.cpu_count())

######---------Functions--------#######

# Meat of the function - cleans, filters and splits the characters into words and then finds the most common words. Performed in chunks. 

#def mapper(data):
#    lower = re.sub(r'[^\w\s]', '', data).lower()
#    clean_words = re.sub(r'[^a-zA-Z ]', '', lower)
#    output = []
#    for x in clean_words.split():
#        if x not in ENGLISH_STOP_WORDS:
#            output.append(x)
#    return collections.Counter(output).most_common(10)

# Brings chunks back together
def reducer(mapped):
    # DataFrame.append was removed in pandas 2.0, so build the per-chunk frames and concatenate them once.
    frames = [pd.DataFrame(list(dict(x).items())) for x in mapped]
    dataframe = pd.concat(frames)
    return dataframe.groupby(dataframe.columns[0]).sum().sort_values(dataframe.columns[0], ascending = False)

######---------Code--------#######

###### Notice how we are now telling the Mapper function to use our pool of processors

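# NOTE: time.process_time() counts CPU time of the parent process only; work done in the pool's worker processes is not included.
t0 = time.process_time()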
clean2 = pool.map(mapper, parts)

reduced2 = reducer(clean2)
t1 = time.process_time()

print(t1-t0)
reduced2.sort_values(reduced2.columns[0], ascending = False).head(10)

0.171875


word,count
virus,23949
infection,17206
viral,16839
cells,15021
respiratory,12692
study,12645
protein,12578
disease,11360
patients,8531
,6368


#### 0.17 second runtime!

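This represents a 100x improvement in the measured time (18.06 s down to 0.17 s) and is an acceptable query speed for business applications. Bear in mind that time.process_time() only counts CPU time in the parent process, so the work done by the pool's workers is excluded from the second figure; the wall-clock speedup from multiprocessing is limited by the number of cores available. You can see why companies like Google, AWS, M&S, British Airways etc. use it.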
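
When comparing serial and parallel runs, the choice of clock matters. `time.process_time()` returns the CPU time of the current process and excludes time spent sleeping or waiting, including time spent waiting on worker processes. `time.perf_counter()` measures elapsed wall-clock time. The sketch below uses `time.sleep` as a stand-in for waiting on workers. `measure` is a hypothetical helper written for this example.

```python
import time

def measure(fn):
    """Return (wall_seconds, cpu_seconds) for the calling process while running fn."""
    wall0, cpu0 = time.perf_counter(), time.process_time()
    fn()
    return time.perf_counter() - wall0, time.process_time() - cpu0

# Sleeping uses almost no CPU in this process, much like waiting on pool workers.
wall, cpu = measure(lambda: time.sleep(0.2))
print(f"wall-clock: {wall:.3f}s, process CPU: {cpu:.3f}s")
```

For an end-to-end comparison of the two approaches, timing both runs with `time.perf_counter()` gives figures that can be compared directly.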

#### Findings
A note on the findings from the Covid-19 papers: it's clear that most studies focus on viral infections of respiratory cells. In the counts above, "patients" appears roughly a third as often as "virus", which suggests that around 1/3 focus on the impact on patients.