## TODO/uncertainties

* Do we only care about nth-phase studies? Or is any study that progresses promising for stock price?
* If we need to run the query repeatedly rather than as one mammoth operation, refine the process (e.g. use the `fields` query param to reduce what we get back).

In [148]:
import requests
from datetime import datetime, timedelta

def get_clinical_trials(page_token=None, timeout=30):
    """Fetch one page of studies from the ClinicalTrials.gov v2 API.

    The query asks for studies that have results, sorted by
    ``LastUpdatePostDate``.

    Args:
        page_token: Continuation token taken from a previous page's
            ``nextPageToken``. ``None`` (or empty) requests the first page.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON response as a dict, or ``None`` if the request
        failed or the body was not valid JSON (a message is printed in
        either case).
    """
    # Base URL for the API
    url = "https://clinicaltrials.gov/api/v2/studies?aggFilters=results%3Awith&sort=LastUpdatePostDate"

    # Let requests append (and URL-encode) the token instead of splicing it
    # into the query string by hand.
    params = {"pageToken": page_token} if page_token else None

    try:
        response = requests.get(url, params=params, timeout=timeout)
        # Raise an exception for bad status codes
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error making request: {e}")
        return None

    # Decoding is handled separately: recent requests versions raise a JSON
    # error that is also a RequestException, which would otherwise be
    # reported as a request failure.
    try:
        return response.json()
    except ValueError as e:
        print(f"Error parsing JSON: {e}")
        return None
    
# Fetch the first page, then use its continuation token to fetch the second.
trials = get_clinical_trials()
# get_clinical_trials returns None on failure, and a page may carry no
# 'nextPageToken'; in either case there is no second page to request.
next_token = trials.get('nextPageToken') if trials else None
trials2 = get_clinical_trials(next_token) if next_token else None

In [146]:
# Convert a 'YYYY-MM-DD' string to a date object
def convert_date(date_string):
    """Parse a ``YYYY-MM-DD`` string and return the matching ``date``.

    Raises whatever ``datetime.strptime`` raises when the text does not
    match that format.
    """
    parsed = datetime.strptime(date_string, '%Y-%m-%d')
    return parsed.date()

def trial_date(trial):
    """Return the study's last-update post date as a date object.

    Reads ``protocolSection.statusModule.lastUpdatePostDateStruct.date``
    from the study dict and hands it to ``convert_date``.
    """
    status_module = trial['protocolSection']['statusModule']
    return convert_date(status_module['lastUpdatePostDateStruct']['date'])

def org_info(trial):
    """Return the study's organization record.

    This is the dict stored at
    ``protocolSection.identificationModule.organization``; the sample output
    in this notebook shows it carrying ``'fullName'`` and ``'class'`` keys
    (e.g. class ``'INDUSTRY'``).
    """
    identification = trial['protocolSection']['identificationModule']
    return identification['organization']

In [180]:
def fetch_all_trials(min_date=None):
    """Collect studies from the clinicaltrials.gov API page by page, back to a cutoff date.

    Pages are requested through ``get_clinical_trials`` and accumulated,
    starting with the first page, until one of the following happens:

    * the last study on a page was last updated before ``min_date``;
    * the page carries no ``nextPageToken``;
    * a token repeats (guard against looping);
    * a request fails (``get_clinical_trials`` returned ``None``).

    Unknown how long it takes to fetch 10 years worth - for regular use, we
    would need to refine the process, and we might need to chunk it to get
    and process all the data even once. The date cutoff assumes pages arrive
    newest-first; that ordering has not been verified here, so treat the
    cutoff as best-effort.

    Args:
        min_date: Cutoff as a ``'YYYY-MM-DD'`` string, or ``None`` to keep
            paging until the API runs out of pages.

    Returns:
        A list of study dicts. Whole pages are kept, so the final page may
        include studies older than ``min_date``.
    """
    # Only parse the cutoff when one was supplied; the default must not crash.
    cutoff = convert_date(min_date) if min_date else None

    trials_list = []
    seen_tokens = set()
    page = get_clinical_trials()

    while page:
        studies = page.get('studies') or []
        trials_list.extend(studies)

        # Stop once this page has reached back past the cutoff.
        if cutoff is not None and studies and trial_date(studies[-1]) < cutoff:
            break

        token = page.get('nextPageToken')
        if not token or token in seen_tokens:
            break
        seen_tokens.add(token)
        page = get_clinical_trials(token)

    return trials_list
        
# Collect the studies returned by fetch_all_trials, using 2024-12-19 as the
# cutoff date; later cells score and filter this list.
all_trials = fetch_all_trials(min_date='2024-12-19')

In [164]:
# trials['nextPageToken']
# trials['studies'][0]

## Study metrics

The trials don't have an obvious 'success/failure' mark, so I provisionally use a Claude-generated scoring system based on common data among the trials. 

For serious use, these metrics would need to be substantially refined - we might want to determine optimal metrics by an ML process that compares against stock prices, along with some basic sanity measures, like 'if more than n% of the people taking the treatment died, we mark it as an automatic failure'.

In [38]:
def safe_float(val, default=0.0):
    """Return ``float(val)``, or ``default`` when the conversion is rejected.

    Only ``ValueError`` and ``TypeError`` are treated as a failed conversion.
    """
    try:
        converted = float(val)
    except (TypeError, ValueError):
        converted = default
    return converted

def safe_int(val, default=0):
    """Return ``int(val)``, or ``default`` when the conversion is rejected.

    Only ``ValueError`` and ``TypeError`` are treated as a failed conversion,
    so a decimal string such as ``"7.5"`` yields ``default``.
    """
    try:
        converted = int(val)
    except (TypeError, ValueError):
        return default
    return converted

def score_primary_outcome(trial_data):
    """Score primary outcome achievement (max 0.4).

    Heuristic, per primary outcome measure:

    * If the outcome has analyses and some are of type ``NON_INFERIORITY``,
      its score is the fraction of those analyses whose ``ciLowerLimit``
      exceeds the margin quoted in ``nonInferiorityComment``. The margin is
      the first number found in the comment, taken with the sign as written.
      Analyses without a parseable margin count as not met.
    * Otherwise, if the outcome has classes, every category with at least
      one measurement contributes 1 when its first value is below 10, else
      ``value / 20``.

    The average over all contributions is scaled to 0.4 and capped there.

    Parameters:
    trial_data (dict): Clinical trial data in JSON format

    Returns:
    float: Score between 0 and 0.4. 0.2 is returned when the study has no
    primary outcomes or when the data cannot be read.
    """
    import re  # local import so this cell does not depend on other cells' imports

    # A signed decimal number that is not glued to a preceding word character,
    # so the hyphen in "non-inferiority" is never read as a minus sign and
    # separate numbers in the text are never fused together.
    number_pattern = re.compile(r'(?<!\w)[-+]?(?:\d+(?:\.\d+)?|\.\d+)')

    try:
        outcomes = trial_data['resultsSection']['outcomeMeasuresModule']['outcomeMeasures']
        primary_outcomes = [o for o in outcomes if o['type'] == 'PRIMARY']

        if not primary_outcomes:
            return 0.2

        total_score = 0
        total_measures = 0

        for outcome in primary_outcomes:
            if 'analyses' in outcome and outcome['analyses']:
                noninferior_analyses = [a for a in outcome['analyses']
                                        if a.get('nonInferiorityType') == 'NON_INFERIORITY']
                if noninferior_analyses:
                    met_criteria = 0
                    for analysis in noninferior_analyses:
                        if 'ciLowerLimit' in analysis and 'nonInferiorityComment' in analysis:
                            lower_bound = safe_float(analysis['ciLowerLimit'])
                            found = number_pattern.search(str(analysis['nonInferiorityComment']))
                            if found is None:
                                # No margin stated in the comment: cannot be judged as met.
                                continue
                            if lower_bound > float(found.group()):
                                met_criteria += 1
                    total_score += met_criteria / len(noninferior_analyses)
                    total_measures += 1

            elif 'classes' in outcome and outcome['classes']:
                for class_data in outcome['classes']:
                    if 'categories' in class_data:
                        for category in class_data['categories']:
                            measurements = category.get('measurements')
                            if not measurements:
                                # Skip categories with no data instead of letting an
                                # IndexError throw away the whole score.
                                continue
                            value = safe_float(measurements[0]['value'])
                            total_score += 1 if value < 10 else value / 20
                            total_measures += 1

        return min(0.4, (total_score / max(1, total_measures)) * 0.4)
    except Exception as e:
        print(f"Error scoring primary outcome: {e}")
        return 0.2

def score_safety(trial_data):
    """Score safety outcomes (max 0.3).

    Starts from 0.3 and, for every adverse-event group, deducts 0.1 for each
    of the following that applies: any deaths reported, any serious events
    reported, and an other-event rate (affected / at risk) above 0.1. The
    result is floored at 0. Studies without an adverse events module keep the
    full 0.3; 0.15 is returned if the data cannot be read.
    """
    try:
        remaining = 0.3
        results = trial_data.get('resultsSection', {})

        if 'adverseEventsModule' in results:
            adverse = results['adverseEventsModule']
            groups = adverse['eventGroups'] if 'eventGroups' in adverse else []

            for group in groups:
                if safe_int(group.get('deathsNumAffected', 0)) > 0:
                    remaining -= 0.1

                if safe_int(group.get('seriousNumAffected', 0)) > 0:
                    remaining -= 0.1

                affected = safe_int(group.get('otherNumAffected', 0))
                at_risk = safe_int(group.get('otherNumAtRisk', 1))

                # Both counts must be positive before a rate is computed.
                if affected > 0 and at_risk > 0 and affected / at_risk > 0.1:
                    remaining -= 0.1

        return max(0, remaining)
    except Exception as e:
        print(f"Error scoring safety: {e}")
        return 0.15

def score_study_execution(trial_data):
    """Score study execution (max 0.2).

    Up to 0.1 comes from the completion rate in the first participant-flow
    period: subjects counted under COMPLETED milestones divided by subjects
    counted under STARTED milestones. A rate of 0.85 or more earns the full
    0.1; lower rates earn a proportional share. A further 0.1 is always
    added as the protocol adherence score. Returns 0.1 if the data cannot be
    read.
    """
    try:
        completion_part = 0
        results = trial_data.get('resultsSection', {})

        if 'participantFlowModule' in results:
            flow = results['participantFlowModule']
            if 'periods' in flow and flow['periods']:
                first_period = flow['periods'][0]
                n_started = 0
                n_completed = 0

                for milestone in first_period.get('milestones', []):
                    kind = milestone['type']
                    if kind == 'STARTED':
                        n_started += sum(
                            safe_int(entry.get('numSubjects', 0))
                            for entry in milestone['achievements']
                        )
                    elif kind == 'COMPLETED':
                        n_completed += sum(
                            safe_int(entry.get('numSubjects', 0))
                            for entry in milestone['achievements']
                        )

                if n_started > 0:
                    rate = n_completed / n_started
                    completion_part = 0.1 if rate >= 0.85 else (rate / 0.85) * 0.1

        # Protocol adherence score
        return completion_part + 0.1
    except Exception as e:
        print(f"Error scoring execution: {e}")
        return 0.1

def score_secondary_outcomes(trial_data):
    """Score secondary outcomes (max 0.1).

    Every category of every secondary outcome that has at least one
    measurement contributes 1 when its first value is below 10, else
    ``value / 20``. The average contribution is scaled to 0.1 and capped
    there.

    Parameters:
    trial_data (dict): Clinical trial data in JSON format

    Returns:
    float: Score between 0 and 0.1. 0.05 is returned when the study has no
    secondary outcomes or when the data cannot be read.
    """
    try:
        outcomes = trial_data['resultsSection']['outcomeMeasuresModule']['outcomeMeasures']
        secondary_outcomes = [o for o in outcomes if o['type'] == 'SECONDARY']

        if not secondary_outcomes:
            return 0.05

        total_score = 0
        total_measures = 0

        for outcome in secondary_outcomes:
            for class_data in outcome.get('classes') or []:
                for category in class_data.get('categories') or []:
                    measurements = category.get('measurements')
                    if not measurements:
                        # Skip categories with no data instead of letting an
                        # IndexError throw away the whole score.
                        continue
                    value = safe_float(measurements[0]['value'])
                    total_score += 1 if value < 10 else value / 20
                    total_measures += 1

        return min(0.1, (total_score / max(1, total_measures)) * 0.1)
    except Exception as e:
        print(f"Error scoring secondary outcomes: {e}")
        return 0.05

def score_clinical_trial(trial_data):
    """
    Combine the four component scorers into one overall trial score.

    The total is the sum of the primary outcome, safety, execution and
    secondary outcome scores; all reported figures are rounded to three
    decimals.

    Parameters:
    trial_data (dict): Clinical trial data in JSON format

    Returns:
    dict: ``'total_score'`` plus a ``'components'`` dict holding the four
    individual scores. If scoring raises, a fixed fallback (total 0.5) is
    returned instead.
    """
    try:
        primary = score_primary_outcome(trial_data)
        safety = score_safety(trial_data)
        execution = score_study_execution(trial_data)
        secondary = score_secondary_outcomes(trial_data)

        # The total is rounded from the unrounded component values.
        overall = round(primary + safety + execution + secondary, 3)
        breakdown = {
            'primary_outcome': round(primary, 3),
            'safety': round(safety, 3),
            'execution': round(execution, 3),
            'secondary_outcomes': round(secondary, 3),
        }
        return {'total_score': overall, 'components': breakdown}
    except Exception as e:
        print(f"Error scoring trial: {e}")
        fallback = {
            'primary_outcome': 0.2,
            'safety': 0.15,
            'execution': 0.1,
            'secondary_outcomes': 0.05,
        }
        return {'total_score': 0.5, 'components': fallback}

In [48]:
# Try the scorers on a single study: index 9 of a freshly fetched first page.
study = get_clinical_trials()['studies'][9]
# Each component scorer is called on its own first; their return values are
# not captured here.
score_primary_outcome(study)
score_safety(study)
score_study_execution(study)
score_secondary_outcomes(study)
# Last expression of the cell: the combined score dict shown as the output below.
score_clinical_trial(study)

{'total_score': 0.659,
 'components': {'primary_outcome': 0.4,
  'safety': 0,
  'execution': 0.159,
  'secondary_outcomes': 0.1}}

## Filtering trials

We drop all trials that aren't classed as 'INDUSTRY'.

I use 0.8 as a minimum score for now, with the strong caveat that per above this algorithm would need serious refinement.

In [168]:
# Keep only trials run by an 'INDUSTRY' organization whose heuristic score is
# above 0.8. The organization check runs first so that trials which would be
# dropped anyway are never scored.
filtered_trials = []
for trial in all_trials:
    # .get: an organization record without a 'class' is treated as non-industry.
    if org_info(trial).get('class') != 'INDUSTRY':
        continue
    if score_clinical_trial(trial)['total_score'] > 0.8:
        filtered_trials.append(trial)

In [185]:
# Summarise each filtered trial as [organization full name, first-submit date].
successful_trials = []
for trial in filtered_trials:
    info = []
    info.append(org_info(trial)['fullName'])
    info.append(trial['protocolSection']['statusModule']['studyFirstSubmitDate'])
    successful_trials.append(info)
# Last expression of the cell: displays the summary list shown below.
successful_trials

[['Tandem Diabetes Care, Inc.', '2023-01-05'],
 ['Boston Scientific Corporation', '2022-05-27'],
 ['Phathom Pharmaceuticals, Inc.', '2023-10-24'],
 ['GlaxoSmithKline', '2020-09-04'],
 ['Rhaeos, Inc.', '2021-08-10']]

In [183]:
# Inspect the organization record of the first filtered trial (output below).
org_info(filtered_trials[0])
# filtered_trials[0]

{'fullName': 'Tandem Diabetes Care, Inc.', 'class': 'INDUSTRY'}