## Download and Parse MEDLINE Abstracts
This notebook describes how to download and parse the publicly available MEDLINE abstracts. About 812 XML files are available on the FTP server, and each XML file contains about 30,000 document abstracts.
<ul>
<li> First, we download the MEDLINE XML files from the FTP server and store them in a local directory on the head node of the Spark cluster. </li>
<li> Next, we parse the XML files using a publicly available MEDLINE parser and store the parsed content as tab-separated files in the container associated with the Spark cluster. </li>
</ul>
<br>Note: This notebook is meant to be run on a Spark cluster. If you are running it through a Jupyter notebook, make sure to use the PySpark kernel.

#### Using the Parser 
Download and install the pubmed_parser library on the Spark cluster nodes. You can use the egg file available in the repo or produce the .egg file by running<br>
<b>python setup.py bdist_egg </b><br>
in the repository, and then add an import for it. The egg file can be read from blob storage: once it is ready, put it in the container associated with your Spark cluster.
<br>

#### Installing additional packages on Spark nodes
To install additional packages, use a script action from the Azure portal. See <a href = "https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-customize-cluster-linux">the HDInsight documentation on customizing clusters</a>.  <br>
Here's an example:
<br> To install unidecode, use a script action on your Spark cluster and
<br>add the following lines to your script file (.sh):
<br><b>#!/usr/bin/env bash
<br>/usr/bin/anaconda/bin/conda install unidecode</b>


In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import requests
import lxml
import pip
import unidecode #use script action to install it
import os

#Specify the path of the egg file
spark.sparkContext.addPyFile('wasb:///pubmed_parser-0.1-py2.7.egg')

sc

<pyspark.context.SparkContext object at 0x7f8e9a01dd50>

In [None]:
home_dir = os.getcwd()
medline_dir = os.path.join(home_dir, 'ftp.nlm.nih.gov', 'nlmdata','.medleasebaseline','gz')
os.listdir(home_dir)

<b>Download the files </b>

In [4]:
import subprocess
import os
home_dir = os.getcwd()
medline_dir = os.path.join(home_dir, 'ftp.nlm.nih.gov', 'nlmdata','.medleasebaseline','gz')
def download_xml_gz_files(num):
    """Download MEDLINE baseline file number `num` unless it is already on disk."""
    print("Download MEDLINE for the first time")
    for i in range(num, num + 1):
        remote_filename = 'ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/medline16n%04d.xml.gz' % i
        print('downloading %s .....' % remote_filename)
        local_filename = os.path.join(medline_dir, 'medline16n%04d.xml.gz' % i)

        # Skip the download if the file already exists locally.
        # wget -x recreates the host/path hierarchy that medline_dir points to.
        if not os.path.exists(local_filename):
            subprocess.call(['wget', '-x', remote_filename, '-r'])
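
The existence check above works because `wget -x` recreates the server's host and path hierarchy under the directory it is run from. A minimal sketch of how the remote URL and the local path line up follows; `medline_paths` is a hypothetical helper written for illustration and is not part of this notebook or of `pubmed_parser`.

```python
import os

# Values copied from the download cell above.
FTP_HOST = 'ftp.nlm.nih.gov'
FTP_PATH = ('nlmdata', '.medleasebaseline', 'gz')
FILE_PATTERN = 'medline16n%04d.xml.gz'


def medline_paths(index, base_dir):
    """Return (remote_url, local_path) for baseline file number `index`.

    Hypothetical helper, for illustration only.
    """
    file_name = FILE_PATTERN % index
    remote_url = 'ftp://%s/%s/%s' % (FTP_HOST, '/'.join(FTP_PATH), file_name)
    # wget -x mirrors host/path under the directory it is run from.
    local_path = os.path.join(base_dir, FTP_HOST, *(FTP_PATH + (file_name,)))
    return remote_url, local_path


remote, local = medline_paths(7, os.getcwd())
print(remote)
print(local)
```

Keeping both paths in one place makes it harder for the URL and the `os.path.exists` check to drift apart if the file naming changes.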

<b> Function to save files with text separated by the specified delimiter </b>

In [5]:
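def saveDfToCsv(df, tsvOutputDir, sep, includeHeader):
    """Write df as a single delimited file under tsvOutputDir, replacing any existing output."""
    # repartition(1) collects the data into one partition so that one part file is written.
    df.repartition(1).write.\
        format("com.databricks.spark.csv").\
        option("header", includeHeader).\
        option("delimiter", sep).\
        save(tsvOutputDir, mode='overwrite')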

<b> Parse the XMLs and save them as a Tab separated File </b><br>
There are 812 XML files in total, so downloading all of them takes a while. It's advisable to process them in batches of 50.
Downloading and parsing one file takes approximately 25-30 seconds.
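
One way to follow the batching advice is to split the file numbers into ranges of 50 and run the loop once per range. The sketch below is an illustration only: `batch_ranges` is a hypothetical helper, and the time estimate simply multiplies the file count by the 25-30 seconds per file quoted above.

```python
def batch_ranges(first, last, batch_size=50):
    """Yield (start, stop) pairs, stop exclusive, covering first..last inclusive."""
    start = first
    while start <= last:
        stop = min(start + batch_size, last + 1)
        yield start, stop
        start = stop


batches = list(batch_ranges(1, 812))
print('%d batches' % len(batches))
print('first batch: %s, last batch: %s' % (batches[0], batches[-1]))

# Rough wall-clock estimate from the per-file timing quoted above.
low_hours = 812 * 25 / 3600.0
high_hours = 812 * 30 / 3600.0
print('%.1f to %.1f hours for all files' % (low_hours, high_hours))
```

Each `(start, stop)` pair can then be used as `range(start, stop)` in place of the full range in the parsing loop, so a failed run only has to repeat one batch.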

In [6]:
import os
from glob import glob
import pubmed_parser as pp
from pyspark.sql import SparkSession
from pyspark.sql import Row  
from pyspark.sql.functions import regexp_replace          

home_dir = os.getcwd()
medline_dir = os.path.join(home_dir, 'ftp.nlm.nih.gov', 'nlmdata','.medleasebaseline','gz')

#Change 813 to a smaller number if you want to test. Downloading and Parsing 1 file takes ~25-30 seconds. 
for i in range(1, 813): 
    download_xml_gz_files(i)
    file_collection = [os.path.join(medline_dir,'medline16n%04d.xml.gz'%x)  
                       for x in range(i, i+1)]
    medline_files_rdd = sc.parallelize(file_collection, numSlices=6000)
    for x in file_collection:
        print('processing %s .....' % os.path.basename(x))
        dicts_out = pp.parse_medline_xml(x)
        parse_results_rdd = medline_files_rdd.\
            flatMap(lambda x: [Row(file_name = os.path.basename(x), **publication_dict) 
                               for publication_dict in dicts_out])
    
    #convert RDD into dataframe
    parse_results_df = parse_results_rdd.toDF()
    
    #Replace whitespace characters such as newlines in the affiliation field with spaces
    parse_results_df = parse_results_df.withColumn("affiliation", regexp_replace("affiliation", "[^\\S]", " "))
    
    tsvOutputDir = 'wasb:///medline_baseline/' + str(i)    
    saveDfToCsv(parse_results_df, tsvOutputDir, "\t", "true")

Download MEDLINE for the first time
downloading ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/medline16n0001.xml.gz .....
processing medline16n0001.xml.gz .....
Download MEDLINE for the first time
downloading ftp://ftp.nlm.nih.gov/nlmdata/.medleasebaseline/gz/medline16n0002.xml.gz .....
processing medline16n0002.xml.gz .....
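
The `regexp_replace` step in the parsing cell matters because a newline or tab inside a field would break the one-record-per-line, tab-separated layout of the output. The pattern `[^\S]` reads as "anything that is not a non-whitespace character", so it matches any whitespace. The sketch below shows the effect with Python's `re` module standing in for Spark's regex engine (the two agree on ASCII whitespace); `flatten_whitespace` and the sample string are made up for illustration.

```python
import re

# Same pattern as in the parsing cell: "not non-whitespace", i.e. any whitespace.
WHITESPACE = re.compile(r'[^\S]')


def flatten_whitespace(text):
    """Replace every whitespace character (newline, tab, ...) with a space."""
    return WHITESPACE.sub(' ', text)


# Made-up affiliation string, for illustration only.
raw = 'Dept. of Biology,\nExample University\tExample City'
clean = flatten_whitespace(raw)
print(clean)
```

Each whitespace character is replaced one for one, so the text keeps its length and consecutive whitespace characters become consecutive spaces rather than a single one.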