Drop enormous cloudy SQL and do local Python
Remove 200 lines of SQL and add 100 lines of Python. Clouds are sexier than
local work, but data of this size is tractable locally. Related to ebmdatalab#121.

Don't extract the clinicaltrials.gov zipfile. We can read from it just as well
as we can from the filesystem, and save tons of disk space.
chadmiller committed Mar 14, 2018
1 parent b853d38 commit 657574b
Showing 3 changed files with 182 additions and 334 deletions.
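The gist of the new flow: stream each study's XML straight out of the downloaded archive and flatten it into one CSV row per trial, with no cloud round-trip. A minimal sketch of that idea, assuming a local copy of AllPublicXML.zip; the helper name stream_studies is illustrative and not part of this commit.

import zipfile
import xml.etree.ElementTree as ET

def stream_studies(zip_path):
    # Iterate over NCT*.xml members without extracting anything to disk.
    with zipfile.ZipFile(zip_path) as archive:
        for name in archive.namelist():
            if "NCT" in name and name.endswith(".xml"):
                yield name, archive.read(name)

for name, xml_bytes in stream_studies("AllPublicXML.zip"):
    print(name, ET.fromstring(xml_bytes).findtext("id_info/nct_id"))
    break  # just show the first study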
141 changes: 141 additions & 0 deletions extraction.py
@@ -0,0 +1,141 @@

import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
import xml.etree.ElementTree as ET


def st(node):
    """Safely strip strings but leave Nones alone."""
    if node is None or node.text is None: return None
    return node.text.strip()


def first_date(r, paths):
    """Render the first-nonnull value to a date, conservatively making day-less
    months into end-of-month dates."""
    # TODO(chad): Fancy date math is unnecessary. Change all date handling to
    # string handling and compare dates lexically. Worth 10 sec.
    for path in paths:
        val = r.find(path)
        if val is None: continue
        try:
            d = datetime.strptime(val.text.strip(), "%B %d, %Y")
            is_exact = True
        except ValueError:
            # No day, so push to end of that month.
            d = datetime.strptime(val.text.strip(), "%B %Y").replace(day=1) \
                + relativedelta(months=+1) + relativedelta(days=-1)
            is_exact = False
        return d, is_exact
    return None, None


def document_to_record(xml_bytes):
    r = ET.fromstring(xml_bytes)
    d = {}

    phase = st(r.find("phase"))
    if phase not in ('Phase 1/Phase 2', 'Phase 2', 'Phase 2/Phase 3', 'Phase 3', 'Phase 4', 'N/A'): return

    study_status = st(r.find("overall_status"))
    if study_status == 'Withdrawn': return

    primary_purpose = st(r.find("study_design_info/primary_purpose"))
    if primary_purpose == 'Device Feasibility': return

    now = datetime.now()

    study_type = st(r.find("study_type"))
    start_date, _ = first_date(r, ["start_date"])
    fda_reg_drug = st(r.find("oversight_info/is_fda_regulated_drug"))
    fda_reg_device = st(r.find("oversight_info/is_fda_regulated_device"))
    primary_completion_date, primary_completion_date_is_exact = first_date(r, ["primary_completion_date"])
    completion_date, completion_date_is_exact = first_date(r, ["completion_date"])
    available_completion_date = primary_completion_date or completion_date
    intervention = st(r.find("intervention"))
    location = st(r.find("location"))  # Needed for the pACT location check below.

    is_fda_regulated = fda_reg_drug == "Yes" or fda_reg_device == "Yes"  # Probably wrong.

    if study_type == 'Interventional' and \
            (fda_reg_drug == 'Yes' or fda_reg_device == 'Yes') and \
            primary_purpose != 'Device Feasibility' and \
            start_date is not None and start_date >= datetime(year=2017, month=1, day=18):
        act_flag = 1
    else:
        act_flag = 0

    # Dates can be absent; only compare them when present, so None never meets a datetime.
    if study_type == 'Interventional' and \
            intervention and re.search(r"Biological|Drug|Device|Genetic|Radiation", intervention) and \
            available_completion_date is not None and available_completion_date >= datetime(year=2017, month=1, day=18) and \
            start_date is not None and start_date < datetime(year=2017, month=1, day=18) and \
            ((fda_reg_drug == 'Yes' or fda_reg_device == 'Yes') or (is_fda_regulated and fda_reg_drug is None and fda_reg_device is None)) and \
            location and re.search(r"\b(?:United States|American Samoa|Guam|Northern Mariana Islands|Puerto Rico|U\.S\. Virgin Islands)\b", location):
        included_pact_flag = 1
    else:
        included_pact_flag = 0

    if included_pact_flag != 1 and act_flag != 1: return

    certificate_date, _ = first_date(r, ["disposition_first_submitted"])
    has_results = int(r.find("clinical_results") is not None)

    official_title = st(r.find("official_title"))
    brief_title = st(r.find("brief_title"))

    if (primary_completion_date is None or primary_completion_date < now) and \
            completion_date is not None and completion_date < now and \
            study_status in {'Not yet recruiting', 'Active, not recruiting', 'Recruiting', 'Enrolling by invitation', 'Unknown status', 'Available', 'Suspended'}:
        discrep_date_status = 1
    else:
        discrep_date_status = 0

    collaborators = " / ".join(
        "{0}={1}".format(i.find("agency").text, i.find("agency_class").text.replace("=", " eq "))
        for i in r.findall("sponsors/lead_sponsor") + r.findall("sponsors/collaborator"))


    d["nct_id"] = st(r.find("id_info/nct_id"))
    d["act_flag"] = act_flag
    d["included_pact_flag"] = included_pact_flag
    d["location"] = location
    d["exported"] = st(r.find("oversight_info/is_us_export"))
    d["phase"] = phase
    d["start_date"] = start_date.strftime("%Y-%m-%d")
    d["available_completion_date"] = available_completion_date.strftime("%Y-%m-%d") if available_completion_date else None
    d["legacy_fda_regulated"] = int(is_fda_regulated)
    d["primary_completion_date_used"] = int(primary_completion_date is not None)
    d["has_results"] = has_results
    d["results_submitted_date"], _ = first_date(r, ["results_first_submitted"])
    d["has_certificate"] = int(certificate_date is not None)
    d["certificate_date"] = certificate_date
    d["results_due"] = int(certificate_date is None or certificate_date + relativedelta(years=3, days=30) > now)

    d["defaulted_pcd_flag"] = int(primary_completion_date_is_exact is False)
    d["defaulted_cd_flag"] = int(completion_date_is_exact is False)
    #last_updated_date, --The last date the trial record was updated
    d["enrollment"] = st(r.find("enrollment"))
    d["study_status"] = study_status
    d["study_type"] = study_type
    d["collaborators"] = collaborators
    d["primary_purpose"] = primary_purpose
    d["sponsor"] = st(r.find("sponsors/lead_sponsor/agency"))
    d["sponsor_type"] = st(r.find("sponsors/lead_sponsor/agency_class"))

    d["fda_reg_drug"] = fda_reg_drug
    d["fda_reg_device"] = fda_reg_device

    d["url"] = st(r.find("required_header/url"))
    d["title"] = official_title or brief_title
    d["official_title"] = official_title
    d["brief_title"] = brief_title

    d["discrep_date_status"] = discrep_date_status
    d["late_cert"] = int(certificate_date is not None and available_completion_date is not None and certificate_date > available_completion_date + relativedelta(years=1))

    d["defaulted_date"] = not primary_completion_date_is_exact or not completion_date_is_exact

    d["condition"] = st(r.find("condition"))
    d["condition_mesh"] = st(r.find("condition_browse"))
    d["intervention"] = intervention
    d["intervention_mesh"] = st(r.find("intervention_browse"))

    return d
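The extractor can be exercised on a single study document in isolation. A quick sketch, assuming some NCT XML file is on disk; the filename below is made up.

import extraction

with open("NCT00000000.xml", "rb") as f:  # made-up filename
    record = extraction.document_to_record(f.read())

# document_to_record returns None for filtered-out trials (wrong phase, withdrawn,
# neither ACT nor pACT); otherwise a flat dict ready for csv.DictWriter.
if record:
    print(record["nct_id"], record["act_flag"], record["included_pact_flag"])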
151 changes: 41 additions & 110 deletions load_data.py
@@ -1,12 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import traceback

from bigquery import Client
from bigquery import TableExporter
from bigquery import wait_for_job
from bigquery import gen_job_name
import xmltodict
import sys
import os
import subprocess
import json
@@ -17,27 +12,47 @@
import requests
import contextlib
import re
from google.cloud.exceptions import NotFound
from xml.parsers.expat import ExpatError
import zipfile
from csv import DictWriter

import extraction


STORAGE_PREFIX = 'clinicaltrials/'
WORKING_VOLUME = '/mnt/volume-lon1-01/' # location with at least 10GB space
WORKING_DIR = WORKING_VOLUME + STORAGE_PREFIX
STORAGE_PREFIX = 'clinicaltrials-data/'
WORKING_VOLUME = os.environ.get('DATA_DIR', '/mnt/volume-lon1-01/') # location with at least 10GB space
WORKING_DIR = os.path.join(WORKING_VOLUME, STORAGE_PREFIX)

logging.basicConfig(filename='{}data_load.log'.format(WORKING_VOLUME), level=logging.DEBUG)

def raw_json_name():
    date = datetime.datetime.now().strftime('%Y-%m-%d')
    return "raw_clincialtrials_json_{}.csv".format(date)

def document_stream(zip_filename):
    with zipfile.ZipFile(zip_filename, 'r') as enormous_zipfile:
        for name in enormous_zipfile.namelist():
            if "NCT" not in name or not name.endswith(".xml"):
                continue
            yield name, enormous_zipfile.read(name)

def postprocessor(path, key, value):
    """Convert key names to something bigquery compatible
    """
    if key.startswith('#') or key.startswith('@'):
        key = key[1:]
    return key, value

def fabricate_csv(input_filename, output_filename):
    _, temp_output_filename = tempfile.mkstemp()

    columns = [
        'nct_id', 'act_flag', 'included_pact_flag', 'location', 'exported', 'phase',
        'start_date', 'available_completion_date', 'legacy_fda_regulated',
        'primary_completion_date_used', 'has_results', 'results_submitted_date',
        'has_certificate', 'certificate_date', 'results_due', 'study_status',
        'study_type', 'primary_purpose', 'fda_reg_drug', 'fda_reg_device', 'sponsor',
        'sponsor_type', 'url', 'title', 'condition', 'condition_mesh', 'intervention',
        'intervention_mesh',

        'brief_title', 'collaborators', 'defaulted_cd_flag', 'defaulted_date',
        'defaulted_pcd_flag', 'discrep_date_status', 'enrollment', 'late_cert',
        'official_title', ]
    with open(temp_output_filename, 'w') as out:
        csv = DictWriter(out, columns)
        csv.writeheader()
        for name, xmldoc in document_stream(input_filename):
            try:
                row = extraction.document_to_record(xmldoc)
                if row:
                    csv.writerow(row)
            except Exception:
                logging.exception("Couldn't get data from %s.", name)
                raise

    os.rename(temp_output_filename, output_filename)


def download_and_extract():
@@ -46,24 +61,9 @@ def download_and_extract():
"""
    logging.info("Downloading. This takes at least 30 mins on a fast connection!")
    url = 'https://clinicaltrials.gov/AllPublicXML.zip'

    # download and extract
    container = tempfile.mkdtemp(prefix=STORAGE_PREFIX.rstrip(os.sep), dir=WORKING_VOLUME)
    try:
        data_file = os.path.join(container, "data.zip")
        subprocess.check_call(["wget", "-q", "-O", data_file, url])
        # Can't "wget|unzip" in a pipe because zipfiles have index at end of file.
        with contextlib.suppress(OSError):
            shutil.rmtree(WORKING_DIR)
        subprocess.check_call(["unzip", "-q", "-o", "-d", WORKING_DIR, data_file])
    finally:
        shutil.rmtree(container)


def upload_to_cloud():
    # XXX we should periodically delete old ones of these
    logging.info("Uploading to cloud")
    subprocess.check_call(["gsutil", "cp", "{}{}".format(WORKING_DIR, raw_json_name()), "gs://ebmdatalab/{}".format(STORAGE_PREFIX)])

    data_file = os.path.join(WORKING_DIR, "clinicaltrialsgov-allxml.zip")
    subprocess.check_call(["wget", "-q", "-O", data_file, url])
    return data_file


def notify_slack(message):
@@ -83,82 +83,13 @@ def notify_slack(message):
)


def convert_to_json():
    logging.info("Converting to JSON...")
    dpath = WORKING_DIR + 'NCT*/'
    files = [x for x in sorted(glob.glob(dpath + '*.xml'))]
    start = datetime.datetime.now()
    completed = 0
    with open(WORKING_DIR + raw_json_name(), 'a') as f2:
        for source in files:
            logging.info("Converting %s", source)
            with open(source, 'rb') as f:
                try:
                    f2.write(
                        json.dumps(
                            xmltodict.parse(
                                f,
                                item_depth=0,
                                postprocessor=postprocessor)
                        ) + "\n")
                except ExpatError:
                    logging.warn("Unable to parse %s", source)

            completed += 1
            if completed % 100 == 0:
                elapsed = datetime.datetime.now() - start
                per_file = elapsed.seconds / completed
                remaining = int(per_file * (len(files) - completed) / 60.0)
                logging.info("%s minutes remaining", remaining)



def convert_and_download():
    logging.info("Executing SQL in cloud and downloading results...")
    storage_path = STORAGE_PREFIX + raw_json_name()
    schema = [
        {'name': 'json', 'type': 'string'},
    ]
    client = Client('clinicaltrials')
    tmp_client = Client('tmp_eu')
    table_name = 'current_raw_json'
    tmp_table = tmp_client.dataset.table("clincialtrials_tmp_{}".format(gen_job_name()))
    with contextlib.suppress(NotFound):
        table = client.get_table(table_name)
        table.gcbq_table.delete()

    table = client.create_storage_backed_table(
        table_name,
        schema,
        storage_path
    )
    sql_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'view.sql')
    with open(sql_path, 'r') as sql_file:
        job = table.gcbq_client.run_async_query(gen_job_name(), sql_file.read())
        job.destination = tmp_table
        job.use_legacy_sql = False
        job.write_disposition = 'WRITE_TRUNCATE'
        job.begin()

    # The call to .run_async_query() might return before results are actually ready.
    # See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#timeoutMs
    wait_for_job(job)
    t1_exporter = TableExporter(tmp_table, STORAGE_PREFIX + 'test_table-')
    t1_exporter.export_to_storage()

    #with tempfile.NamedTemporaryFile(mode='r+') as f:
    with open('/tmp/clinical_trials.csv', 'w') as f:
        t1_exporter.download_from_storage_and_unzip(f)


if __name__ == '__main__':
    with contextlib.suppress(OSError):
        os.remove("/tmp/clinical_trials.csv")
    try:
        download_and_extract()
        convert_to_json()
        upload_to_cloud()
        convert_and_download()
        enormous_zipfile = download_and_extract()
        fabricate_csv(enormous_zipfile, '/tmp/clinical_trials.csv')

        env = os.environ.copy()
        with open(os.environ.get("UPLOAD_SETTINGS", "/etc/profile.d/fdaaa_staging.sh")) as e:
            for k, _, v in re.findall(r"""^\s+export\s+([A-Z][A-Z0-9_]*)=([\"']?)(\S+|.*)\2""", e.read(), re.MULTILINE):
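The __main__ block reads upload settings out of a shell profile with a regular expression. A small sketch of what that regex captures, using made-up values rather than the real fdaaa_staging.sh:

import re

# Made-up settings content, only to show the shape the regex expects.
SETTINGS = '''
    export UPLOAD_BUCKET="example-bucket"
    export UPLOAD_TOKEN='abc123'
'''

pattern = r"""^\s+export\s+([A-Z][A-Z0-9_]*)=([\"']?)(\S+|.*)\2"""
for key, _, value in re.findall(pattern, SETTINGS, re.MULTILINE):
    print(key, value)  # UPLOAD_BUCKET example-bucket, then UPLOAD_TOKEN abc123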