In [1]:
from __future__ import division, print_function

from Bio import Entrez, Medline

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
from pyspark.ml import Pipeline, feature as spark_ft, classification as spark_cls
from sklearn import metrics as skmetrics
import wordcloud

%matplotlib inline

In [2]:
# Package coordinates handed to Spark through `spark.jars.packages`.
packages = [
    'com.databricks:spark-xml_2.11:0.4.1',
    'JohnSnowLabs:spark-nlp:1.5.4'
]

# Local session on 4 cores; also puts the bundled sparknlp jar on the classpath.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local[4]')
    .appName('notebook')
    .config('spark.jars', 'pysparknlp-1.0.0/lib/sparknlp.jar')
    .config('spark.jars.packages', ','.join(packages))
    .getOrCreate()
)

In [3]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [4]:
def query(terms, num_docs=1000):
    """Fetch PubMed records matching ``terms`` and return them as a Spark DataFrame.

    The terms are joined with ``+`` into one search string, matching PubMed ids
    are looked up with ``Entrez.esearch`` and the records are then downloaded in
    MEDLINE text format with ``Entrez.efetch``.

    Note: NCBI asks callers to set ``Entrez.email`` before using the
    E-utilities; this function does not set it.

    Args:
        terms: iterable of search-term strings.
        num_docs: maximum number of ids requested from the search (``retmax``).

    Returns:
        A Spark DataFrame with the columns ``Title``, ``Authors`` (a list),
        ``Source``, ``Abstract`` and ``Topic`` (the joined search string).
        Missing text fields become empty strings and a missing author list
        becomes an empty list.
    """
    search_term = '+'.join(terms)
    print('Searching PubMed abstracts for documents containing term: ',search_term)

    handle = Entrez.esearch(db="pubmed", term=search_term, retmax=num_docs)
    try:
        search_result = Entrez.read(handle)
    finally:
        handle.close()
    idlist = search_result["IdList"]

    handle = Entrez.efetch(db="pubmed", id=idlist, rettype="medline", retmode="text")
    try:
        # Medline.parse yields records lazily, so consume them all before the
        # handle is closed. "?" marks a field that is absent from the record.
        data = [
            (rec.get("TI", "?"), rec.get("AU", "?"), rec.get("SO", "?"), rec.get("AB", "?"))
            for rec in Medline.parse(handle)
        ]
    finally:
        handle.close()

    df = pd.DataFrame(data=data, columns=['Title','Authors','Source','Abstract'])

    # Turn the "?" placeholders into NaN so they can be normalised per column.
    df = df.replace(r'^\?$', np.nan, regex=True)
    # Authors must always be a list so Spark can infer an array column.
    df['Authors'] = df['Authors'].apply(lambda x: x if isinstance(x, list) else [])
    df = df.fillna('')
    df['Topic'] = search_term

    return spark.createDataFrame(df)

In [5]:
# One entry per PubMed search; each entry is the list of terms given to query().
topics = [
    phrase.split()
    for phrase in (
        'type 1 diabetes',
        'creutzfeldt jakob disease',
        'post traumatic stress disorder',
        'heart disease',
    )
]

In [6]:
# Build the corpus: one PubMed query per topic, with a seeded random number of
# documents (200-999) requested for each, all unioned into a single DataFrame.
np.random.seed(123)

texts = None
for terms in topics:
    num_docs = np.random.randint(200, 1000)
    print('terms', terms, 'num_docs', num_docs)
    topic_docs = query(terms, num_docs)
    texts = topic_docs if texts is None else texts.union(topic_docs)

terms ['type', '1', 'diabetes'] num_docs 710
Searching PubMed abstracts for documents containing term:  type+1+diabetes


Email address is not specified.

To make use of NCBI's E-utilities, NCBI requires you to specify your
email address with each request.  As an example, if your email address
is A.N.Other@example.com, you can specify it as follows:
   from Bio import Entrez
   Entrez.email = 'A.N.Other@example.com'
In case of excessive usage of the E-utilities, NCBI will attempt to contact
a user at the email address provided before blocking access to the
E-utilities.


terms ['creutzfeldt', 'jakob', 'disease'] num_docs 565
Searching PubMed abstracts for documents containing term:  creutzfeldt+jakob+disease
terms ['post', 'traumatic', 'stress', 'disorder'] num_docs 582
Searching PubMed abstracts for documents containing term:  post+traumatic+stress+disorder
terms ['heart', 'disease'] num_docs 522
Searching PubMed abstracts for documents containing term:  heart+disease


In [7]:
# Total number of records fetched across all topics (empty abstracts included).
texts.count()

2379

In [8]:
# Preview the first rows of the fetched records.
texts.show()

+--------------------+--------------------+--------------------+--------------------+---------------+
|               Title|             Authors|              Source|            Abstract|          Topic|
+--------------------+--------------------+--------------------+--------------------+---------------+
|A luciferase immu...|[Ling Y, Jiang P,...|Clin Biochem. 201...|AIM: Luciferase i...|type+1+diabetes|
|Type 1 diabetes m...|[Singh RM, Howart...|Mol Cell Biochem....|There is much evi...|type+1+diabetes|
|Erratum. Validati...|[Sosenko JM, Skyl...|Diabetes Care. 20...|                    |type+1+diabetes|
|Macrovascular dis...|[Bjornstad P, Don...|Lancet Diabetes E...|Cardiovascular di...|type+1+diabetes|
|Insufficient evid...|[Brignardello-Pet...|J Am Dent Assoc. ...|                    |type+1+diabetes|
|Genetic risk scor...|[Thomas NJ, Jones...|Lancet Diabetes E...|                    |type+1+diabetes|
|Genetic risk scor...|[Leslie RD, Lernm...|Lancet Diabetes E...|                  

In [9]:
# Keep only records that have an abstract and tag each with a row id.
# monotonically_increasing_id() yields unique ids that are not guaranteed to be consecutive.
non_empty_texts = (
    texts
    .where('Abstract != ""')
    .withColumn('id', pyspark.sql.functions.monotonically_increasing_id())
)

In [10]:
# Number of records left after dropping those without an abstract.
non_empty_texts.count()

2180

In [11]:
# Preview the filtered records, now carrying the `id` column.
non_empty_texts.show()

+--------------------+--------------------+--------------------+--------------------+---------------+---+
|               Title|             Authors|              Source|            Abstract|          Topic| id|
+--------------------+--------------------+--------------------+--------------------+---------------+---+
|A luciferase immu...|[Ling Y, Jiang P,...|Clin Biochem. 201...|AIM: Luciferase i...|type+1+diabetes|  0|
|Type 1 diabetes m...|[Singh RM, Howart...|Mol Cell Biochem....|There is much evi...|type+1+diabetes|  1|
|Macrovascular dis...|[Bjornstad P, Don...|Lancet Diabetes E...|Cardiovascular di...|type+1+diabetes|  2|
|Association betwe...|[Ahola AJ, Forsbl...|Diabetes Res Clin...|AIMS: Depressive ...|type+1+diabetes|  3|
|Alpha-1 antitryps...|[Weir GC, Ehlers ...|Pediatr Diabetes....|OBJECTIVE: To det...|type+1+diabetes|  4|
|Considering Cultu...|[Rose M, Aronow L...|Curr Diab Rep. 20...|PURPOSE OF REVIEW...|type+1+diabetes|  5|
|Improved Murine-M...|[Racine JJ, Stewa...|Dia

In [12]:
# Encode the topic string of each record as a numeric class label.
label_indexer = spark_ft.StringIndexer(
    inputCol='Topic',
    outputCol='label'
)

In [13]:
# Fitted on the whole filtered corpus, before the train/test split, so both
# splits share the same topic -> label mapping.
label_indexer_model = label_indexer.fit(non_empty_texts)

In [14]:
# Inverse of the label indexer: turns the numeric `prediction` column back into
# the topic string, using the label order learned by label_indexer_model.
label_deindexer = spark_ft.IndexToString(
    inputCol='prediction',
    outputCol='pred_label',
    labels=label_indexer_model.labels
)

In [15]:
# Attach the numeric labels, then make a seeded 80/20 train/test split.
train, test = (
    label_indexer_model
    .transform(non_empty_texts)
    .randomSplit(weights=[0.8, 0.2], seed=123)
)

In [16]:
# Entry points of the two text pipelines. Both write to the same "document"
# column so that the shared NLP stages can be reused for abstracts and titles.
abstract_assembler = (
    DocumentAssembler()
    .setInputCol("Abstract")
    .setOutputCol("document")
)

title_assembler = (
    DocumentAssembler()
    .setInputCol("Title")
    .setOutputCol("document")
)

In [17]:
# Shared annotation chain: document -> sentence -> token -> stem -> normalized.
sentence_detector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
    .setUseAbbreviations(True)
)

tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

stemmer = (
    Stemmer()
    .setInputCols(["token"])
    .setOutputCol("stem")
)

normalizer = (
    Normalizer()
    .setInputCols(["stem"])
    .setOutputCol("normalized")
)

nlp_pipeline = Pipeline(stages=[
    sentence_detector,
    tokenizer,
    stemmer,
    normalizer,
])

In [18]:
# Finishers emit the "normalized" annotations as array columns: `ntokens` for
# abstracts and `title` for titles.
abstract_finisher = (
    Finisher()
    .setInputCols(["normalized"])
    .setOutputCols(["ntokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(True)
)

title_finisher = (
    Finisher()
    .setInputCols(["normalized"])
    .setOutputCols(["title"])
    .setOutputAsArray(True)
    .setCleanAnnotations(True)
)

In [19]:
# Remove English stop words from the abstract tokens (`ntokens` -> `text`).
# NOTE(review): this runs on already-stemmed tokens, and stemmed forms such as
# 'thi' and 'ar' are still present in the outputs shown further down -- confirm
# whether stop words should be removed before stemming instead.
stopWords = spark_ft.StopWordsRemover.loadDefaultStopWords('english')
sw_remover = spark_ft.StopWordsRemover(
    inputCol='ntokens',
    outputCol='text',
    stopWords=stopWords
)

In [20]:
# Abstracts: assemble -> shared NLP stages -> array of tokens -> stop-word removal.
abstract_pipeline = Pipeline(stages=[
    abstract_assembler,
    nlp_pipeline,
    abstract_finisher,
    sw_remover,
])

# Titles: same NLP stages, but no stop-word removal.
title_pipeline = Pipeline(stages=[
    title_assembler,
    nlp_pipeline,
    title_finisher,
])

# Abstracts are processed first, then titles.
preproc_pipeline = Pipeline(stages=[
    abstract_pipeline,
    title_pipeline,
])

In [21]:
# Fit the preprocessing on the training split only, then keep just the columns
# that the feature and model stages use.
preproc_model = preproc_pipeline.fit(train)
processed = (
    preproc_model
    .transform(train)
    .select('id', 'topic', 'title', 'text', 'label')
)

In [22]:
# Preview the preprocessed training rows (token arrays for title and text).
processed.show()

+---+---------------+--------------------+--------------------+-----+
| id|          topic|               title|                text|label|
+---+---------------+--------------------+--------------------+-----+
|  0|type+1+diabetes|[a, luciferas, im...|[aim, luciferas, ...|  0.0|
|108|type+1+diabetes|[a, typ, diabet, ...|[aimshypothesi, i...|  0.0|
|  7|type+1+diabetes|[annal, express, ...|[backgroundto, cl...|  0.0|
| 66|type+1+diabetes|[adipos, impact, ...|[object, central,...|  0.0|
| 94|type+1+diabetes|[administr, of, v...|[object, two, cas...|  0.0|
|122|type+1+diabetes|[alpha, cell, dys...|[typ, diabet, cha...|  0.0|
| 77|type+1+diabetes|[an, effect, trea...|[nanotechnologi, ...|  0.0|
| 91|type+1+diabetes|[analysi, of, pan...|[background, decr...|  0.0|
| 63|type+1+diabetes|[assess, the, nut...|[object, lowcarbo...|  0.0|
| 61|type+1+diabetes|[associ, between,...|[aim, investig, a...|  0.0|
|  3|type+1+diabetes|[associ, between,...|[aim, depress, mo...|  0.0|
|137|type+1+diabetes

In [23]:
# 100-dimensional embedding of the abstract tokens.
text2vec = spark_ft.Word2Vec(
    inputCol='text',
    outputCol='text_vec',
    vectorSize=100,
    minCount=5,
    windowSize=5,
    maxSentenceLength=30,
    seed=123
)

# 50-dimensional embedding of the title tokens.
title2vec = spark_ft.Word2Vec(
    inputCol='title',
    outputCol='title_vec',
    vectorSize=50,
    minCount=3,
    windowSize=5,
    maxSentenceLength=10,
    seed=123
)

# Concatenate both embeddings into the single `features` vector.
assembler = spark_ft.VectorAssembler(
    inputCols=['text_vec', 'title_vec'],
    outputCol='features'
)

feature_pipeline = Pipeline(stages=[
    text2vec,
    title2vec,
    assembler,
])

In [24]:
# Fit both Word2Vec models on the preprocessed training rows, then compute the
# assembled feature vectors for those same rows.
feature_model = feature_pipeline.fit(processed)
features = feature_model.transform(processed)

In [25]:
# Preview the training rows with their text, title and assembled feature vectors.
features.show()

+---+---------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
| id|          topic|               title|                text|label|            text_vec|           title_vec|            features|
+---+---------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+
|  0|type+1+diabetes|[a, luciferas, im...|[aim, luciferas, ...|  0.0|[0.00135010731583...|[-0.0093724036589...|[0.00135010731583...|
|108|type+1+diabetes|[a, typ, diabet, ...|[aimshypothesi, i...|  0.0|[-0.0172051572149...|[-0.0485772364307...|[-0.0172051572149...|
|  7|type+1+diabetes|[annal, express, ...|[backgroundto, cl...|  0.0|[0.03553665489388...|[-0.0297967683523...|[0.03553665489388...|
| 66|type+1+diabetes|[adipos, impact, ...|[object, central,...|  0.0|[0.01813366204482...|[-0.0657454569635...|[0.01813366204482...|
| 94|type+1+diabetes|[administr, of, v...|[object, two, cas...|  0.0|

In [26]:
# Reuse the abstract Word2Vec model already fitted as the first stage of
# feature_pipeline instead of training a second one on the same data. This
# skips a redundant fit and inspects the model that actually produced `text_vec`.
text2vec_model = feature_model.stages[0]

In [27]:
# Show the 10 words most similar to 'obes' in the abstract embedding
# (tokens are stemmed by the preprocessing pipeline).
text2vec_model.findSynonyms('obes', 10).show()

+-----------+------------------+
|       word|        similarity|
+-----------+------------------+
|       girl|0.8909512758255005|
|   leukemia|0.8899913430213928|
|       foot|0.8795618414878845|
|         dr|0.8758804798126221|
|recentonset|0.8757311701774597|
|osteoporosi|0.8751940131187439|
|     gestat|0.8743801116943359|
| neuropathi|0.8684095144271851|
|        cgl|0.8683326840400696|
|       modi|0.8677959442138672|
+-----------+------------------+



In [28]:
# Show the 10 words most similar to 'trauma' in the abstract embedding.
text2vec_model.findSynonyms('trauma', 10).show()

+------------+------------------+
|        word|        similarity|
+------------+------------------+
|      combat| 0.904567301273346|
|      experi|0.8929694890975952|
|  postdeploy|0.8746237754821777|
|  catastroph| 0.864286482334137|
|relationship|0.8641922473907471|
|         tbi|0.8591167330741882|
|         sud|0.8578217029571533|
|     symptom|  0.85660320520401|
|        ptss|0.8558170199394226|
|      buffer|0.8549688458442688|
+------------+------------------+



In [29]:
# Size the network from the upstream stages instead of hard-coding it. The input
# layer must match the assembled feature vector (abstract + title embeddings)
# and the output layer needs one unit per topic label. With the current
# settings this gives [150, 75, 4].
n_inputs = text2vec.getVectorSize() + title2vec.getVectorSize()
n_classes = len(label_indexer_model.labels)

mlpc = spark_cls.MultilayerPerceptronClassifier(
    maxIter=100, seed=123, layers=[n_inputs, 75, n_classes]
)

# The classifier is followed by the de-indexer, so predictions also come out as
# topic strings in `pred_label`.
model_pipeline = Pipeline(stages=[mlpc, label_deindexer])

In [30]:
# Train the classifier on the training feature vectors.
model = model_pipeline.fit(features)

In [31]:
# Score the held-out split with the models fitted on the training split:
# preprocessing -> feature vectors -> classifier.
test_processed = (
    preproc_model
    .transform(test)
    .select('id', 'topic', 'title', 'text', 'label')
)
test_features = feature_model.transform(test_processed)
preds = model.transform(test_features)

In [32]:
# Preview the test rows with their predicted label index and topic string.
preds.show()

+---+---------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+---------------+
| id|          topic|               title|                text|label|            text_vec|           title_vec|            features|prediction|     pred_label|
+---+---------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+---------------+
|115|type+1+diabetes|[a, physic, activ...|[background, thi,...|  0.0|[-0.0181661611564...|[-0.0456972203346...|[-0.0181661611564...|       0.0|type+1+diabetes|
| 47|type+1+diabetes|[a, plateau, in, ...|[object, describ,...|  0.0|[0.02216673717203...|[-0.0611507280264...|[0.02216673717203...|       0.0|type+1+diabetes|
| 73|type+1+diabetes|[a, quarter, of, ...|[individu, typ, d...|  0.0|[0.01657626631090...|[-0.0464318374563...|[0.01657626631090...|       0.0|type+1+diabetes|
| 69|type+1+diabetes|[allcaus, mortal,..

In [33]:
# Bring only the columns needed for evaluation back to the driver as pandas.
pred_df = (
    preds
    .select('title', 'text', 'label', 'prediction')
    .toPandas()
)

In [34]:
# First few test predictions next to their true labels.
pred_df.head()

Unnamed: 0,title,text,label,prediction
0,"[a, physic, activ, intervent, for, children, w...","[background, thi, studi, describ, develop, fea...",0.0,0.0
1,"[a, plateau, in, new, onset, typ, diabet, inci...","[object, describ, incid, preval, typ, diabet, ...",0.0,0.0
2,"[a, quarter, of, patient, with, typ, diabet, h...","[individu, typ, diabet, td, ar, increas, risk,...",0.0,0.0
3,"[allcaus, mortal, in, adult, with, and, withou...","[object, estim, agespecif, sexspecif, allcaus,...",0.0,0.0
4,"[alpha, antitrypsin, treatment, of, newonset, ...","[object, determin, safeti, pharmacokinet, alph...",0.0,0.0


In [35]:
# Label index -> topic string mapping learned by the StringIndexer.
list(enumerate(label_indexer_model.labels))

[(0, 'type+1+diabetes'),
 (1, 'post+traumatic+stress+disorder'),
 (2, 'heart+disease'),
 (3, 'creutzfeldt+jakob+disease')]

In [36]:
# Confusion matrix on the test split (rows = true topic, columns = predicted).
# The label ids are passed explicitly so the matrix always has one row and one
# column per topic, in indexer order, even if a topic is missing from the test
# labels or predictions. Otherwise its shape would not match the index/columns.
pd.DataFrame(
    data=skmetrics.confusion_matrix(
        pred_df['label'], pred_df['prediction'],
        labels=[float(i) for i in range(len(label_indexer_model.labels))]
    ),
    columns=['pred ' + l for l in label_indexer_model.labels],
    index=['true ' + l for l in label_indexer_model.labels]
)

Unnamed: 0,pred type+1+diabetes,pred post+traumatic+stress+disorder,pred heart+disease,pred creutzfeldt+jakob+disease
true type+1+diabetes,115,0,9,4
true post+traumatic+stress+disorder,1,98,5,0
true heart+disease,9,1,63,1
true creutzfeldt+jakob+disease,0,0,3,92


In [37]:
# Per-topic precision/recall/F1 on the test split. The label ids are passed
# explicitly so that `target_names` always lines up with the indexer's label
# order, even if a topic is missing from the test labels or predictions.
print(skmetrics.classification_report(
    pred_df['label'], pred_df['prediction'],
    labels=[float(i) for i in range(len(label_indexer_model.labels))],
    target_names=label_indexer_model.labels))

                                precision    recall  f1-score   support

               type+1+diabetes       0.92      0.90      0.91       128
post+traumatic+stress+disorder       0.99      0.94      0.97       104
                 heart+disease       0.79      0.85      0.82        74
     creutzfeldt+jakob+disease       0.95      0.97      0.96        95

                   avg / total       0.92      0.92      0.92       401

