# NIH Grant Tracking with Spark

## Phase I: Develop and Test

In this notebook, I explore the NIH grant tracking project with Spark. First, I develop and test the processing steps on a small subset of the grant abstracts stored in the local `Data/PRJABS/` folder.

In [1]:
%matplotlib inline
import matplotlib
import numpy as np
import pandas as pd
from datetime import datetime
import seaborn as sns
import os
import dill
import re
from tqdm import tqdm_notebook
import pickle
import matplotlib.pyplot as plt
from nltk.stem.porter import PorterStemmer
sns.set()
matplotlib.rcParams['figure.dpi'] = 144

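Start a local Spark context that uses all available cores (`local[*]`), together with an `SQLContext` for the DataFrame API: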

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

sc = SparkContext("local[*]", "temp")
sqlContext = SQLContext(sc)

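Define a helper that turns a path relative to the current directory into an absolute `file://` URI, so that Spark reads from the local file system rather than its default one (for example, HDFS):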

In [3]:
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)
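For reference, the standard library's `pathlib` can build the same kind of URI. This is a minimal alternative sketch, not the notebook's helper, and the name `localpath_uri` is hypothetical. Unlike the string concatenation above, `Path.resolve()` also follows symlinks and drops a trailing slash, and `as_uri()` percent-encodes characters such as spaces.

```python
from pathlib import Path

def localpath_uri(path):
    """Return an absolute file:// URI for a path relative to the current directory."""
    # resolve() makes the path absolute, which as_uri() requires
    return Path(path).resolve().as_uri()

print(localpath_uri('Data/test/'))
```

For a directory input, the missing trailing slash makes no difference to `sc.textFile`, which reads every file in the folder either way.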

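Prepare a test folder containing the first three abstract files (the 1985 grant abstracts):

In [28]:
import shutil
# start from an empty test folder (the original only created it when it already existed)
if os.path.exists('Data/test/'):
    shutil.rmtree('Data/test/')
os.mkdir('Data/test/')
# copy the first three abstract files, in name order, from the full data set
fns = os.listdir('Data/PRJABS/')
for fn in sorted(fns)[:3]:
    shutil.copyfile(os.path.join('Data/PRJABS/', fn),
                    os.path.join('Data/test/', fn))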

Load the abstracts in the test folder into Spark, clean the text, tokenize it, and stem the words:

In [29]:
abstract = sc.textFile(localpath('Data/test/'))

def text_cleaning(doc):
    '''Replace quotes, commas, parentheses, periods, colons, and runs of two
    or more spaces with a single space, then strip and lower-case the line.'''
    rm_list = r'\"|\,|\(|  +|\)|\.|\'|\:'  # raw string avoids invalid escapes
    doc = re.sub(rm_list, ' ', doc)
    return doc.strip().lower()

stemmer = PorterStemmer()
def stem(words):
    '''Stem each word and keep only stems longer than two characters.'''
    return [s for s in (stemmer.stem(w) for w in words) if len(s) > 2]

df_abs = (abstract.map(text_cleaning)
                  .filter(lambda doc: len(doc) > 0)
                  # drop the header line, which starts with 'app' after lower-casing
                  .filter(lambda line: not line.startswith('app'))
                  .map(lambda doc: [w for w in doc.split(' ') if len(w) > 0])
                  .map(stem)
                  # the first token is the application id, the rest are the words
                  .filter(lambda words: len(words) > 0)
                  .map(lambda words: (int(words[0]), words[1:]))
                  .filter(lambda doc: len(doc[1]) > 0)
                  .toDF(['Id', 'words']))
df_abs.show(5)

+-------+--------------------+
|     Id|               words|
+-------+--------------------+
|3091032|[the, primari, de...|
|3104974|[the, asthma, and...|
|3091425|[thi, program, pr...|
|3104981|[the, research, t...|
|3091436|[the, host-parasi...|
+-------+--------------------+
only showing top 5 rows
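The per-line cleaning and id splitting can be checked locally without Spark. The sketch below is a hypothetical helper, `parse_line`, that folds those steps into one function (stemming is left out). It assumes each line has the form `APPLICATION_ID,ABSTRACT_TEXT`; returning `None` for a line whose first token is not a number (for example, a continuation line of a multi-line abstract) is also an assumption, since `int()` in the chain above would raise an error on such a line.

```python
import re

# Same characters as in text_cleaning: quotes, commas, parentheses, periods,
# colons, and runs of two or more spaces.
RM_PATTERN = r'\"|\,|\(|  +|\)|\.|\'|\:'

def parse_line(line):
    """Turn one raw abstract line into (id, tokens), or None if it should be skipped."""
    doc = re.sub(RM_PATTERN, ' ', line).strip().lower()
    if not doc or doc.startswith('app'):  # empty line or header line
        return None
    tokens = [tok for tok in doc.split(' ') if tok]
    try:
        app_id = int(tokens[0])
    except ValueError:  # first token is not a numeric application id
        return None
    return app_id, tokens[1:]

print(parse_line('3091032,"The primary (goal): cells."'))
# (3091032, ['the', 'primary', 'goal', 'cells'])
```

In the RDD chain, it could be applied as `abstract.map(parse_line).filter(lambda x: x is not None)`, followed by stemming of the token lists.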



Build the Spark ML pipeline, which chains stop-word removal, term counting, TF-IDF weighting, and an LDA topic model (online optimizer, 10 topics):

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml import Pipeline

# stop-word removal -> term counts -> TF-IDF weights -> LDA (online optimizer)
# note: the words were stemmed earlier, so stemmed stop words such as 'thi'
# (from 'this') are not in the default list and pass through this stage
stop_words = StopWordsRemover(inputCol='words', outputCol='clean',
                              stopWords=StopWordsRemover.loadDefaultStopWords('english'))
countv = CountVectorizer(inputCol=stop_words.getOutputCol(), outputCol='tokens')
idf = IDF(inputCol=countv.getOutputCol(), outputCol='features')
# LDA reads the default 'features' column, i.e. the TF-IDF weights; Spark documents
# LDA input as word counts, which LDA(featuresCol='tokens', ...) would match
lda = LDA(maxIter=10, k=10, optimizer='online')

pipeline = Pipeline(stages=[stop_words, countv, idf, lda])
lda_model = pipeline.fit(df_abs)

# adds a 'topicDistribution' column with the per-document topic probabilities
labels = lda_model.transform(df_abs)
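To make the `CountVectorizer` and `IDF` stages concrete, here is a small pure-Python sketch on toy documents. It uses the smoothed formula documented for Spark's `IDF`, $\mathrm{IDF}(t) = \log\frac{m + 1}{\mathrm{df}(t) + 1}$ with $m$ documents, so a term that appears in every document gets weight zero. The function name `tfidf` and the toy tokens are hypothetical, and the vocabulary is sorted alphabetically for readability, whereas Spark orders it by corpus frequency.

```python
import math
from collections import Counter

def tfidf(docs):
    """Term-count vectors and Spark-style IDF weights for a list of token lists."""
    vocab = sorted({tok for doc in docs for tok in doc})
    index = {tok: i for i, tok in enumerate(vocab)}
    counts = []
    for doc in docs:
        vec = [0] * len(vocab)
        for tok, n in Counter(doc).items():
            vec[index[tok]] = n
        counts.append(vec)
    m = len(docs)
    # document frequency: number of documents containing each term
    df = [sum(1 for vec in counts if vec[i] > 0) for i in range(len(vocab))]
    idf = [math.log((m + 1) / (d + 1)) for d in df]
    # TF-IDF: each count scaled by its term's IDF weight
    weighted = [[c * w for c, w in zip(vec, idf)] for vec in counts]
    return vocab, counts, idf, weighted

vocab, counts, idf, weighted = tfidf([['cell', 'growth', 'cell'], ['cell', 'asthma']])
print(vocab)   # ['asthma', 'cell', 'growth']
print(counts)  # [[0, 2, 1], [1, 1, 0]]
```

Here `cell` occurs in both documents, so its IDF is $\log(3/3) = 0$ and it drops out of the weighted vectors entirely.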

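Evaluate the fit with the log perplexity of the LDA stage (Spark reports an upper bound; lower is better):

In [38]: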
lda_model.stages[3].logPerplexity(labels)

9.387032246261784
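The value above is a log perplexity. Assuming, as in Spark's implementation, that it is the per-token negative log-likelihood bound in natural-log units, exponentiating it gives the perplexity, which can be read as the effective number of equally likely words the model chooses among at each position. This number is computed on the training documents, so it is optimistic; a held-out split would give a fairer comparison between settings of `k`.

```python
import math

log_perplexity = 9.387032246261784  # value reported by logPerplexity above
perplexity = math.exp(log_perplexity)  # assumes natural-log units
print(round(perplexity))
```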

In [35]:
import shutil
# label each abstract with its most probable topic and save the labels
if os.path.exists('Data/Topics/'):
    shutil.rmtree('Data/Topics/')
(labels.select('Id', 'topicDistribution')
       .rdd
       .map(lambda x: (x[0], int(np.argmax(x[1]))))
       .saveAsTextFile(localpath('Data/Topics/labels/')))

# describe each topic by its top `wordnum` words
wordnum = 5
vocabulary = lda_model.stages[1].vocabulary
# broadcast the vocabulary to all nodes so term indices can be mapped to words
voc_bv = sc.broadcast(vocabulary)
(lda_model.stages[3].describeTopics(wordnum)
                    .rdd
                    .map(lambda x: (x[0], [voc_bv.value[i] for i in x[1]], x[2]))
                    .saveAsTextFile(localpath('Data/Topics/words/')))
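Both RDD transformations above reduce to plain Python functions, which makes them easy to check locally before running on a cluster. A minimal sketch: the function names and the toy vocabulary are hypothetical, the rows mimic the `(topic, termIndices, termWeights)` layout returned by `describeTopics`, and `max` with a key returns the first maximal index, matching `np.argmax` on ties.

```python
def topic_label(distribution):
    """Index of the most probable topic (ties resolved to the lowest index)."""
    return max(range(len(distribution)), key=lambda k: distribution[k])

def topic_words(described, vocabulary):
    """Map (topic, termIndices, termWeights) rows to (topic, words, weights)."""
    return [(topic, [vocabulary[i] for i in indices], weights)
            for topic, indices, weights in described]

# toy values standing in for the LDA output (hypothetical)
vocabulary = ['cell', 'protein', 'asthma', 'patient']
print(topic_label([0.1, 0.7, 0.2]))  # 1
print(topic_words([(0, [2, 3], [0.4, 0.3])], vocabulary))
# [(0, ['asthma', 'patient'], [0.4, 0.3])]
```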

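Test the standalone script `grant.py` (with `myutils.py` shipped to the executors via `--py-files`) on the local Hadoop file system (HDFS), then copy the output back to the local disk: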

In [None]:
!hadoop fs -rm -r hdfs:///user/vagrant/output/
!spark-submit --master local[*] --py-files myutils.py grant.py \
    hdfs:///user/vagrant/test/ hdfs:///user/vagrant/output/
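`spark-submit` passes everything after the script name to `grant.py` as command-line arguments, here the input and output folders; the same two positional arguments follow the `--` separator in the Dataproc submission in Phase II. The notebook does not show `grant.py`, so the following is a hypothetical skeleton of how such a script could read those paths, with the Spark work left as a placeholder. Only the argument parsing runs without Spark installed.

```python
import argparse
import sys

def parse_args(argv):
    """Read the input and output paths passed after the script name."""
    parser = argparse.ArgumentParser(description='LDA topics for NIH grant abstracts (sketch)')
    parser.add_argument('input_path', help='folder with the abstract files')
    parser.add_argument('output_path', help='folder for topic labels and words')
    return parser.parse_args(argv)

def main(argv=None):
    args = parse_args(sys.argv[1:] if argv is None else argv)
    # imported here so that parse_args can be tested without Spark installed
    from pyspark import SparkContext
    sc = SparkContext(appName='grant_tracking')
    try:
        abstracts = sc.textFile(args.input_path)
        # placeholder: cleaning, pipeline fitting, and saving to args.output_path
    finally:
        sc.stop()
```

In a real script, `main()` would be called under an `if __name__ == '__main__':` guard.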

In [64]:
!rm -rf output
!mkdir -p output
!hadoop fs -copyToLocal hdfs:///user/vagrant/output/* output/


## Phase II: Run the Full LDA Job on GCP

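The following commands are meant to be run in a **terminal** (drop the leading `!` there). They clear the old output, create a Dataproc cluster, submit the job, and delete the cluster.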

In [None]:
# delete the output folder if it already exists
!gsutil rm -r gs://camalot/output

Make sure that the initialization script `cluster_init.sh` installs the necessary packages, including `numpy` and `nltk`, on every node:

In [None]:
# one n1-standard-2 master and three n1-standard-4 workers, 500 GB boot disks
!gcloud dataproc clusters create cluster-1 --initialization-actions \
    gs://camalot/cluster_init.sh --region us-west2 --zone us-west2-a \
    --master-machine-type n1-standard-2 --master-boot-disk-size 500GB \
    --num-workers 3 \
    --worker-machine-type n1-standard-4 --worker-boot-disk-size 500GB \
    --num-worker-local-ssds 1 --project tdi2018-217422
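A quick way to confirm that the initialization action installed the required packages is to test whether they can be imported. This minimal sketch uses `importlib.util.find_spec`, which returns `None` for a top-level module that cannot be found; the helper name `missing_packages` is hypothetical.

```python
import importlib.util

def missing_packages(names):
    """Return the top-level package names that cannot be imported on this Python."""
    return [name for name in names if importlib.util.find_spec(name) is None]

print(missing_packages(['numpy', 'nltk']))
```

To check the executors rather than the driver, the same function could be mapped over a small RDD, for example `sc.parallelize(range(8), 8).map(lambda _: missing_packages(['numpy', 'nltk'])).collect()`; an empty list from every task suggests the packages are available on the workers.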

In [None]:
!gcloud dataproc jobs submit pyspark --cluster cluster-1 --region us-west2 \
    --py-files gs://camalot/datacourse/grant_tracking/myutils.py \
    gs://camalot/datacourse/grant_tracking/grant.py \
    -- gs://camalot/data/NIH/PRJABS/ gs://camalot/output/

In [None]:
# shut down cluster at the end
!gcloud dataproc clusters delete cluster-1 --region us-west2

In [None]:
sc.stop()