## Evidence Export (from Elasticsearch)

This notebook exports Elasticsearch (ES) records from the evidence data index into a Parquet dataset. Note that, compared to the evidence string files at the remote GS OT URLs, these records have undergone several normalizations and modifications, as well as scoring.

In [4]:
%run utils.py
import tqdm
import os
import os.path as osp
import json
import pandas as pd
import numpy as np
import more_itertools
from elasticsearch import helpers
import pyarrow as pa
# Destination directory of the parquet dataset; the export loop below writes
# one `batch=<n>` partition sub-directory per batch into it.
export_dir = '/lab/data/raw/evidence-data-stage-2.parquet'

In [5]:
# Client handle used for all Elasticsearch calls in this notebook.
# NOTE(review): get_es_client is not defined here; presumably it comes from
# the `%run utils.py` line in the first cell -- confirm.
es = get_es_client()

In [6]:
# Run a match-all query and display the first returned hit as a sample record
example_response = es.search(
    index='master_evidence-data',
    body={"query": {"match_all": {}}},
)
example_response['hits']['hits'][0]

{'_index': 'master_evidence-data',
 '_type': '_doc',
 '_id': '38c0851b3dba497d71ad645a01f498c3',
 '_score': 1.0,
 '_source': {'access_level': 'public',
  'disease': {'biosample': {'id': 'http://purl.obolibrary.org/obo/UBERON_0002116',
    'name': 'ileum'},
   'efo_info': {'efo_id': 'http://www.ebi.ac.uk/efo/EFO_0000384',
    'label': "Crohn's disease",
    'path': [['EFO_0000408', 'EFO_0000405', 'EFO_0003767', 'EFO_0000384'],
     ['EFO_0000408',
      'EFO_0000540',
      'EFO_0005140',
      'EFO_0003767',
      'EFO_0000384']],
    'therapeutic_area': {'codes': [], 'labels': []}},
   'id': 'EFO_0000384'},
  'evidence': {'comparison_name': "'Crohn's disease' vs 'non inflammatory bowel disease control' in 'A1b Paris classification'",
   'confidence_level': 'medium',
   'date_asserted': '2019-05-14T15:19:25Z',
   'evidence_codes': ['ECO_0000295'],
   'evidence_codes_info': [[{'eco_id': 'ECO_0000295',
      'label': 'RNA-seq evidence'}]],
   'experiment_overview': "RNA-seq of ileal biop

In [7]:
# Configure a scrolling scan over every document in the evidence data index
batch_size = 10000  # scroll page size; also reused as the export chunk size
query = {"query": {"match_all": {}}}
# 1626542 records as of 20191007
# NOTE: `res` is consumed by the export loop; re-run this cell before
# re-running the export, otherwise there is nothing left to iterate.
res = helpers.scan(
    es,
    query=query,
    index='master_evidence-data',
    scroll='1h',
    size=batch_size,
)

In [None]:
# Remove any existing export directory so the export starts from a clean slate.
# The interpolated path is quoted so it is always passed to the shell as a
# single argument (an unquoted path with spaces or glob characters would be
# split/expanded before reaching `rm -rf`).
! [ -d "$export_dir" ] && rm -rf "$export_dir"

In [None]:
# A fixed schema keeps every written chunk consistent; without it the
# resulting parquet dataset would be useless.
# Every field is stored as a string except the integer `batch` number.
schema = pa.schema([
    pa.field(name, pa.int64() if name == "batch" else pa.string())
    for name in (
        "access_level",
        "batch",
        "disease",
        "drug",
        "evidence",
        "id",
        "literature",
        "private",
        "scores",
        "sourceID",
        "target",
        "type",
        "unique_association_fields",
        "validated_against_schema_version",
    )
])

In [10]:
def to_value(v):
    """Return scalar values unchanged; serialize anything else to a JSON string.

    "Scalar" is whatever ``np.isscalar`` accepts (numbers, strings, booleans).
    Dicts and lists are not scalars, and neither is ``None``, so those are
    returned as their JSON text (``None`` becomes ``'null'``).
    """
    if np.isscalar(v):
        return v
    return json.dumps(v)

# Loop through batches and write each as a new chunk in the parquet dataset.
# The target column names and pandas dtypes depend only on `schema`, so they
# are resolved once here instead of on every batch.
schema_columns = [(f.name, f.type.to_pandas_dtype()) for f in schema]
schema_names = [name for name, _ in schema_columns]
for i, batch in enumerate(more_itertools.chunked(tqdm.tqdm(res), batch_size)):
    df = pd.DataFrame([r['_source'] for r in batch])
    # Non-scalar cell values are turned into JSON strings
    df = df.applymap(to_value)
    # The batch number is also the partition key of the dataset
    df['batch'] = i
    for name, dtype in schema_columns:
        # Add fields missing from this batch so every chunk has all columns
        if name not in df:
            df[name] = None
        # Missing values in object (string) columns become empty strings
        if dtype == np.object_:
            df[name] = df[name].fillna('')
        df[name] = df[name].astype(dtype)
    # Keep only the schema fields, in schema order; this selection fixes the
    # final column order, so no separate column sort is needed beforehand.
    df = df[schema_names]
    df.to_parquet(export_dir, index=False, partition_cols=['batch'])

1626543it [17:06, 1584.12it/s]


In [11]:
# Show the on-disk size of the export: one line per partition directory in
# human-readable units (-h), followed by a grand total (-c).
!du -ch $export_dir

18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=103
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=104
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=132
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=135
12M	/lab/data/raw/evidence-data-stage-2.parquet/batch=161
12M	/lab/data/raw/evidence-data-stage-2.parquet/batch=159
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=150
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=157
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=134
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=133
19M	/lab/data/raw/evidence-data-stage-2.parquet/batch=105
18M	/lab/data/raw/evidence-data-stage-2.parquet/batch=102
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=156
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=151
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=158
12M	/lab/data/raw/evidence-data-stage-2.parquet/batch=160
11M	/lab/data/raw/evidence-data-stage-2.parquet/batch=31
11M	/lab/data/r