# Extract note text for a large set of notes from ElasticSearch

This notebook script:
1. Connects to an ElasticSearch instance.
2. Extracts a set of elements in an index in the ES instance. The index is based on text extracted from notes in Neptune.
3. Distributes the extracted elements in files in a particular folder structure on a Linux "local machine".

## Background
The extraction processing is brittle, and subject to many interruptions. A Jupyter notebook will not be able to extract more than around 25K notes from ES in the current configuration before something interrupts. The most likely form of interruption is a VPN timeout.

It is necessary to extract in well-defined batches so that we can keep track of progress.

## Output file structure
Output files will be distributed as follows:
1. Folder structure
- There will be subfolders of the output root corresponding to the numbers 0-9.
- Under each numbered folder will be subfolders for all study ids that end in the number.
- In each study id folder will be text files for all notes extracted for the study id.
2. Each file will be named with the STUDY_NOTE_CSN_ID.

For example, the note with STUDY_NOTE_CSN_ID 25 for patient with study ID 998 will be in a file named 25.txt in folder path _output root_/8/998.
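
The layout above can be sketched as a small path helper. This is an illustration only: the function name `note_output_path` and the root folder name `output` are hypothetical and do not appear in the notebook, which builds the same path inline with string concatenation.

```python
from pathlib import Path


def note_output_path(output_root, study_id, study_note_csn_id):
    """Return the text-file path for one note.

    Layout: <output_root>/<last digit of study id>/<study id>/<STUDY_NOTE_CSN_ID>.txt
    (hypothetical helper; the notebook builds this path inline).
    """
    study_id = str(study_id)
    return Path(output_root) / study_id[-1] / study_id / f"{study_note_csn_id}.txt"


# Note 25 for study ID 998 lands under folder 8, because 998 ends in 8.
print(note_output_path("output", "998", "25"))
```

Spreading study ID folders over ten numbered subfolders keeps any single directory from holding every study ID at once.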



## Assumptions
1. An active VPN channel has been established. (This script was intended to run in DBMI's Jupyter environment, so a user cannot start a Jupyter session without first establishing a VPN connection.)
2. You have an account with access to the Elastic Search instance.
3. File writes have been tested only to the server that hosts the Jupyter environment. Ideally, the script would write to a Globus collection.
4. The set of notes to extract is extremely large (millions of notes).
5. The notes to be extracted are specified in a spreadsheet with at least the following columns:
- STUDY_ID
- NOTE_CSN_ID
- STUDY_NOTE_ID
- STUDY_NOTE_CSN_ID
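
Before starting a long run, it may be worth checking that the input spreadsheet has the expected header. The following is a minimal sketch using only the standard library; the helper name `missing_columns` and the sample data are hypothetical and are not part of the notebook.

```python
import csv
import io

# Columns listed in the assumptions above.
REQUIRED_COLUMNS = ["STUDY_ID", "NOTE_CSN_ID", "STUDY_NOTE_ID", "STUDY_NOTE_CSN_ID"]


def missing_columns(csv_file):
    """Return the required columns that are absent from the CSV header row."""
    header = next(csv.reader(csv_file), [])
    return [col for col in REQUIRED_COLUMNS if col not in header]


# Hypothetical sample input that lacks the STUDY_NOTE_ID column.
sample = io.StringIO("STUDY_ID,NOTE_CSN_ID,STUDY_NOTE_CSN_ID\n998,12345,25\n")
print(missing_columns(sample))
```

For a real file, the same function can be called on `open(id_file, newline='')`.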


## To extract note text
1. In the cell marked **LOCAL ELASTICSEARCH CONFIGURATION**, follow the instructions to set up a local configuration file for connection to the ElasticSearch instance. You should not need to modify the code in the section.
2. Move to the cell marked **GENERATION PARAMETERS**.

- Edit the values of the variables that the script uses to extract a set of notes. These include the "ordinal range" variables **note_start** and **note_end**, which are zero-based row positions in the input CSV; both bounds are inclusive. For example, **note_start** = 25000 and **note_end** = 50000 will extract rows 25000 through 50000 of the input CSV.

- From the Run menu, select Run All Above Selected Cell.
3. In the cell marked **RUN SCRIPT**, run the script. 

The script will:
- write extracted output to the output folder path
- write log files for both all files and missing files


## To summarize work:
After running all batches, you can use the code in the **CONSOLIDATE LISTS FOR ALL BATCHES** cell.


## Utility code
The code blocks identified as **UTILITY CODE** contain utility functions for working with ElasticSearch. They should not need to be edited. Ideally, this utility code would be encapsulated in a Python class and imported.


## Logging
It is necessary to work in batches (e.g., 100K notes at a time) to avoid interruptions. It is also likely that some of the requested notes will not be in the ElasticSearch index. To keep track of both what *was* extracted and what *was not* extracted, the script will log progress for a batch in a spreadsheet. 

The spreadsheet will be named with format **NOTES_STATUS_{start}_{end}.csv**, where {start} and {end} correspond to the ordinal range of notes in the input spreadsheet. For example, a file named NOTES_STATUS_10000_20000.csv will track the status of extraction of the 10000th through the 20000th note.

The notebook will also build a file named **MISSING_{start}_{end}.csv**, containing only the values of STUDY_NOTE_CSN_ID for notes that were not extracted.

Finally, the script uses Python logging to record information on the overall run.
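
As a rough sketch of how a batch status file can be read back, the code below builds the status file name for a range and lists the notes whose status is not `extracted`. The column names and status values match those written by the **RUN SCRIPT** cell; the helper names `status_file_name` and `failed_note_ids` and the sample rows are hypothetical.

```python
import csv
import io


def status_file_name(start, end):
    """Name of the status spreadsheet for one batch (same pattern as RUN SCRIPT)."""
    return f"NOTES_STATUS_{start}_{end}.csv"


def failed_note_ids(status_csv):
    """Return STUDY_NOTE_CSN_ID values for rows whose status is not 'extracted'."""
    reader = csv.DictReader(status_csv)
    return [row["STUDY_NOTE_CSN_ID"] for row in reader if row["status"] != "extracted"]


# Hypothetical status rows for illustration only.
sample = io.StringIO(
    "STUDY_ID,STUDY_NOTE_CSN_ID,status,size,date,time\n"
    "998,25,extracted,1024,07/07/2022,10:00:00\n"
    "998,26,failed,0,07/07/2022,10:00:01\n"
)
print(status_file_name(10000, 20000))
print(failed_note_ids(sample))
```

The list returned by `failed_note_ids` should agree with the corresponding MISSING file for the same batch.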

## Parallelization
If you need to extract a very large number of notes, you can reduce extraction time by establishing multiple parallel streams. To do this:
1. Create multiple copies of this script.
2. In each copy of the script, edit the values of **logfile** and **logapp** in the **Python logging** cell. This will allow you to track progress of each instance of the script in separate logs.
3. Run each instance with a different ordinal range. For example, to extract 100,000 notes in 4 batches of 25K, use:
- 0 to 25000
- 25001 to 50000
- 50001 to 75000
- 75001 to 100000

Note that this form of parallelization requires discipline to keep track of what you've done, especially when working with a large number of files.
Fortunately, the actual extraction of a single note does not take long. In the worst case, just re-extract a batch.
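
One way to keep the ordinal ranges disciplined is to generate them instead of typing them by hand. The sketch below assumes zero-based row positions and inclusive bounds, as used by the loop in the **RUN SCRIPT** cell; the helper name `batch_ranges` is hypothetical. Because it produces equal-sized batches, its boundaries differ by one from the hand-written list above.

```python
def batch_ranges(first, last, batch_size):
    """Split the inclusive range first..last into consecutive inclusive ranges.

    Each (start, end) pair can be used as (note_start, note_end) in one
    copy of the script; the ranges never overlap.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    ranges = []
    start = first
    while start <= last:
        end = min(start + batch_size - 1, last)
        ranges.append((start, end))
        start = end + 1
    return ranges


# 100,000 rows (positions 0..99999) in 4 batches of 25,000.
for note_start, note_end in batch_ranges(0, 99999, 25000):
    print(note_start, note_end)
```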

In [None]:
# UTILITY CODE

import configparser
import re
import csv
from pathlib import Path

import pandas as pd
import os

# Elasticsearch python libs
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from ssl import create_default_context
from elasticsearch_dsl import Search

import time
from datetime import timedelta
from datetime import datetime



In [None]:
# UTILITY CODE

####################################################
# Helper functions, add any custom functions here
####################################################

# Replace each run of four or more spaces with a newline
def reformat_text(text):
    #return re.sub(r'[^\s]([ ]{5,})[^\s]', '<br>', text)
    return re.sub(r'[ ]{3}[ ]+', '\n', text)


def space_checker(text):
    SP_THRESHOLD = 6
    text_pos = 0
    sp_pos = 0
    sp_cnt = 0   # length of the current run of whitespace (must be initialized before the loop)
    for c in text:
        if c.isspace():
            if sp_cnt == 0:
                sp_pos = text_pos
            sp_cnt += 1
            if sp_cnt > SP_THRESHOLD:
                print('found space groups of threshold @ {},{}'.format(sp_pos, text_pos))
        else:
            sp_pos = 0
            sp_cnt = 0
        text_pos += 1
            
def get_single_report(id):
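    # build the query with the DSL: match on the Epic NOTE_CSN_ID
    index_name = LOCAL['elasticsearch']['INDEX']
    s = Search(using=es, index=index_name) \
            .query("match", NOTE_CSN_ID=id)
    
    # convert to a dictionary for the DSL
    body = s.to_dict()
    
    # do the search with the body; to_dict() does not carry the index, so pass it explicitly
    response = es.search(index=index_name, body=body)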
    
    hits = response['hits']['hits']   
    text = reformat_text(hits[0]['_source']['NOTE_TEXT'])   # experimental code
    #text = hits[0]['_source']['NOTE_TEXT']
    #space_checker(hits[0]['_source']['NOTE_TEXT'])     # experimental code

    return text

# parameters:
#    fname (file name without extension), report text, output directory, html flag (true/false)
def save_to_file(fname, text, outdir, html):
    
    if html:
        filename = '{}/{}.html'.format(outdir, fname)
    else:
        filename = '{}/{}.txt'.format(outdir, fname)
        
    # the with statement closes the file, so no explicit close() is needed
    with open(filename, 'w') as f:
        if html:
            f.write('<html>\n')
            f.write('<p>\n')
            
        # write the report text
        f.write(text)
        
        if html:
            f.write('</p>\n')
            f.write('</html>\n')
    
def get_file_size(fname, text, outdir, html):
    if html:
        filename = '{}/{}.html'.format(outdir, fname)
    else:
        filename = '{}/{}.txt'.format(outdir, fname)
    
    return os.path.getsize(filename)
    
       

In [None]:
# UTILITY CODE

##############################################################
# Load Elasticsearch settings from a common config file stored in the local file system of the R3 Jupyter host machine.
# Do not modify
###############################################################
config = configparser.ConfigParser()
config.read('/data/R3_Library/es_settings.ini')
COMMON_SETTINGS = config
CERT = COMMON_SETTINGS['cert']['CERT_FILE']

In [None]:
# LOCAL ELASTICSEARCH CONFIGURATION

# Create a local Kibana (ElasticSearch) configuration file with the following content:

#-----------content

#[cert]
#CERT_FILE=<path to the common certificate--e.g., '/data/R3_Library/Certs/notes01.revchain.crt'>

#[elasticsearch]
#INDEX=<custom index name or use 'neptune'>
#HOST=<host name for the Kibana instance--e.g., 'dbmi-neptune-notes-01.dbmi.pitt.edu'>
#PORT=<port>

#[user]
#USERNAME=<your username for the Kibana instance>
#PASSWORD=<your password for the Kibana instance>

#-----------content

# NOTE:  if you are using your own index you can specify that name in INDEX;  if you
#        have access to the full index use 'neptune'

#Indicate the path to the file. For example, you can store the file in the same Jupyter directory in which you run this notebook.
local_config_path = 'es_local_settings.ini'

config.read(local_config_path)
LOCAL = config

# set the ssl context to communicate via https
ssl_context = create_default_context(cafile=CERT)

# initialize the Elasticsearch client
es = Elasticsearch(COMMON_SETTINGS['elasticsearch']['HOST'],
		http_auth=(LOCAL['user']['USERNAME'], LOCAL['user']['PASSWORD']),
		scheme="https",
		port=COMMON_SETTINGS['elasticsearch']['PORT'],
		ssl_context=ssl_context
		)

# check for an active connection
if not es.ping():
    raise ValueError("Connection failed")

In [None]:
#UTILITY CODE
#Python logging.

#Assumptions:
#1. Log file will be written to application folder.
#2. The application folder contains the ini file for logging.
#3. The ini file content conforms to recommendations in https://www.datadoghq.com/blog/python-logging-best-practices/

import logging.config
log_dir = '' # Application folder
log_config = 'logging.ini' # Configures logging
logfile = 'NoteExporter3.log'
logapp = 'NoteExporter3'
logger = logging.getLogger(logapp)
logging.config.fileConfig(log_config, disable_existing_loggers=False, defaults={'log_file':logfile})

#Filter out elasticsearch info.
logger_es = logging.getLogger('elasticsearch')
logger_es.setLevel(logging.WARN)

def print_and_logger_info(message: str) -> None:
    print(message)
    logger.info(message)


In [None]:
#GENERATION PARAMETERS
#--------
#1. Full path to file of notes to extract.
id_file = '/data/R3_Library/2887_Hochheiser_Melanoma_Encounter/input/R3_2887_HOCHHEISER_ENCOUNTER_NOTES_2022_07_07.csv'

#2. Full paths to output directories
outdir = '/data/R3_Library/2887_Hochheiser_Melanoma_Encounter/output'
trackingdir = outdir + '/tracking'
missingdir = outdir + '/missing'
jupytertrackingdir = os.getcwd()+'/tracking'
jupytermissingdir = os.getcwd()+'/missing'

#3. Ordinal range of notes to extract in this batch.
note_start = 2450000
note_end=2475000

#Set read_input to False if doing repeated runs, so that the script will use the input already in memory.
read_input = True
#--------

In [None]:
# RUN SCRIPT

print_and_logger_info('************************')
print_and_logger_info('NOTE EXTRACTION STARTED')

#Output tracking files.
tracking_col_names =  ['STUDY_ID','STUDY_NOTE_CSN_ID','status', 'size','date','time']

dfTracking = pd.DataFrame(columns = tracking_col_names)
tracking_file = 'NOTES_STATUS_' + str(note_start) + '_' + str(note_end) +'.csv'
tracking_path = trackingdir + '/' + tracking_file

missing_col_names = ['STUDY_NOTE_CSN_ID']
dfmissing = pd.DataFrame(columns = missing_col_names)
missing_file = 'MISSING_' + str(note_start) + '_' + str(note_end) +'.csv'
missing_path = missingdir + '/' + missing_file

if read_input:
    print_and_logger_info(f'Reading input spreadsheet: {id_file}')
    csvreader = pd.read_csv(id_file, sep=',', dtype='str', na_filter=False)[['STUDY_ID','NOTE_CSN_ID','STUDY_NOTE_CSN_ID']]
else:
    print_and_logger_info(f'Using input spreadsheet data in memory for file: {id_file}')

print_and_logger_info(f'Total number of notes in spreadsheet: {len(csvreader.index)}')
print_and_logger_info(f'Range of notes to extract: {str(note_start)} to {str(note_end)}')
print_and_logger_info('Extracting notes')

#Track overall generation time.
start_time = time.time()


# Because of the large number of files, files will be distributed to the following output file folder structure:
# output
# --number from 0-9, corresponding to the last number of the study ID.
# |
#   --study ID
#   |
#     --study_note_csn_id.txt

#Create the output subfolders.
os.makedirs(outdir, exist_ok=True)
for i in range(0,10):
    numdir = outdir+"/"+str(i)
    os.makedirs(numdir, exist_ok=True)
    
#Create the tracking folders.
os.makedirs(trackingdir, exist_ok=True)
os.makedirs(missingdir, exist_ok=True)
    
percentdisplay = 0
for index, rows in csvreader.iterrows():
    
    if index > note_end:
        break
    if index >= note_start and index <= note_end:
        #Poor man's TQDM.
        percentdone = int(round(1 - (note_end-index)/(note_end-note_start),1)*100)
        if percentdone % 10 == 0 and percentdone <100:
            if percentdone != percentdisplay:
                percentdisplay = percentdone
                print(f'{percentdone}% complete')
        
        try:
            #Extract note text from Elastic Search, using the Epic NOTE_CSN_ID.
            text = get_single_report(rows['NOTE_CSN_ID'])
            
            #Create the output subfolder for the study id.
            thepath = outdir+"/"+rows['STUDY_ID'][-1]+"/"+rows['STUDY_ID']
            os.makedirs(thepath, exist_ok=True)
            
            #Write the extracted text.
            save_to_file(rows['STUDY_NOTE_CSN_ID'], text, thepath, False)
            filesize = get_file_size(rows['STUDY_NOTE_CSN_ID'], text, thepath, False)
            
            #Indicate the status.
            genstatus = 'extracted'
            
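        except Exception:
            #Catch Exception rather than using a bare except, so that a keyboard interrupt can still stop the run.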
            #Indicate the status.
            genstatus = 'failed'
            filesize = 0
            
            #Log the missing file.
            dfmissing.loc[len(dfmissing.index)] = [rows['STUDY_NOTE_CSN_ID']]
    
        #Log the status.
        dictStatus={'STUDY_ID':[rows['STUDY_ID']],'STUDY_NOTE_CSN_ID':[rows['STUDY_NOTE_CSN_ID']],'status':[genstatus],'size':[filesize],'date':[datetime.now().strftime("%m/%d/%Y")],'time':[datetime.now().strftime("%H:%M:%S")]}
        dfStatus = pd.DataFrame(data=dictStatus,columns=tracking_col_names)
        dfTracking = pd.concat([dfTracking,dfStatus])

    

#Write logs to two locations:
#1. In the output path (presumably in R3 data path)
#2. Locally to Jupyter (so it is not necessary to copy in order to view it easily.)
#   The local copies go to the tracking and missing subfolders of the Jupyter working directory.

os.makedirs(jupytertrackingdir, exist_ok=True)
os.makedirs(jupytermissingdir, exist_ok=True)
dfTracking.to_csv(tracking_path, mode='w', header=True, index=False)
dfTracking.to_csv(os.path.join(jupytertrackingdir, tracking_file), mode='w', header=True, index=False)
dfmissing.to_csv(missing_path, mode='w', header=True, index=False)
dfmissing.to_csv(os.path.join(jupytermissingdir, missing_file), mode='w', header=True, index=False)

#Log statistics for batch.
print_and_logger_info(f'Completed batch run for notes in range: {str(note_start)} to {str(note_end)}')
print_and_logger_info(f'Number of missing notes in this batch: {len(dfmissing.index)}')
elapsed_time = time.time() - start_time
print_and_logger_info(f'Extraction time for this batch: {"{:0>8}".format(str(timedelta(seconds=elapsed_time)))}')
totsize = dfTracking['size'].sum()
print_and_logger_info(f'Total size of files extracted for this batch: {totsize/1024:.2f} KB ({totsize/1024**2:.3f} MB)')
print_and_logger_info('DONE')



# Consolidate lists

Union all of the per-batch NOTES_STATUS and MISSING CSV files into single files.

In [None]:
def consolidate_list(listtype):
    
    tracking_col_names =  ['STUDY_ID','STUDY_NOTE_CSN_ID','status', 'size','date','time']
    missing_col_names = ['STUDY_NOTE_CSN_ID']

    if listtype =='missing':
        cols = missing_col_names
        listdir = missingdir
        jupyterlistdir = jupytermissingdir
        allfile = 'MISSING_ALL.csv'
        
    elif listtype == 'status':
        cols = tracking_col_names
        listdir = trackingdir
        jupyterlistdir = jupytertrackingdir
        allfile = 'NOTES_STATUS_ALL.csv'
    else:
        raise ValueError(f'Unknown list type: {listtype}')
    
    allpath = os.path.join(listdir,allfile)
    dfall = pd.DataFrame(columns = cols)

    #Append contents of each batch file in the list directory to the consolidated file.
    for path in sorted(os.listdir(listdir)):
        if path != allfile and path.endswith('.csv'):
            path_file = os.path.join(listdir,path)
            dflist = pd.read_csv(path_file, sep=',', dtype='str', na_filter=False)
            dfall = pd.concat([dfall,dflist],ignore_index=True).drop_duplicates()

    #Write consolidated file to the output path and to the Jupyter working directory.
    dfall.to_csv(allpath, mode='w', header=True, index=False)
    os.makedirs(jupyterlistdir, exist_ok=True)
    dfall.to_csv(os.path.join(jupyterlistdir,allfile), mode='w', header=True, index=False)
    
    return


In [None]:
# CONSOLIDATE LISTS FOR ALL BATCHES

# argument: 
#   missing - generate a 'MISSING_ALL.csv' file
#   status - generate a 'NOTES_STATUS_ALL.csv' file
consolidate_list('missing')