<a href="https://colab.research.google.com/github/StanleyLiangYork/DeepLearningForMalaria/blob/master/Continuous_Ranked_Probability_Score.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import math
import os.path

Given a deterministic observation t, make a CDF(Cumulative distribution function) out of it

In [0]:
def heavyside(thresholds, actual):
    result = [1 if t >= actual else 0 for t in thresholds]
    return result

Check annd make sure all probabilities are in [0,1] and the CDF should be non-decreasing?

In [0]:
def is_cdf_valid(case):
  if case[0] < 0 or case[0] > 1:
    return False

  for i in range(1, len(case)):
    if case[i] > 1 or case[i] < case[i-1]:
      return False
    
  return True

Calculates the Continuous Ranked Probability Score given:</p>


*   thresholds: a vector of threshold values
*   predictions: 2d array rows of [predictions P(y <= t) for each threshold]
*   actuals: a vector of real observations


In [0]:
def calc_crps(thresholds, predictions, actuals):
  nthresh = len(thresholds)
  ncases  = len(predictions)
  crps = 0

  for case, actual in zip(predictions, actuals):
    if (len(case) == nthresh) and is_cdf_valid(case):
      obscdf = heavyside(thresholds, actual)
      for fprob, oprob in zip(case, obscdf):
        crps = crps + (fprob - oprob)*(fprob - oprob)
    else:
      crps = crps + nthresh  # treat delta at each threshold as 1
  
  crps = crps / float(ncases * nthresh)
  return crps


Test the function

In [0]:
thresholds = [2., 4., 6., 8., 10.]
actuals = [3., 1.5, 7., 4.]

loop through the observation values, if threshold is greater, return 1, if not return 0 

In [16]:
pred = [heavyside(thresholds, actual)  for actual in actuals]
print(pred)

[[0, 1, 1, 1, 1], [1, 1, 1, 1, 1], [0, 0, 0, 1, 1], [0, 1, 1, 1, 1]]


In [21]:
crps = calc_crps(thresholds, pred, actuals)
print("Exact answer = {0}".format(crps))

Exact answer = 0.0


In [24]:
thresholds = [2., 4., 6., 8., 10.]
actuals = [3., 1.5, 7., 4.]
pred = [heavyside(thresholds, actual)  for actual in actuals]
pred[1] = [0, 0.3, 0.5, 1.0]
print(pred)
crps = calc_crps(thresholds, pred, actuals)
print("Invalid length for one case: answer = {0}".format(crps))

[[0, 1, 1, 1, 1], [0, 0.3, 0.5, 1.0], [0, 0, 0, 1, 1], [0, 1, 1, 1, 1]]
Invalid length for one case: answer = 0.25


In [25]:
thresholds = [2., 4., 6., 8., 10.]
actuals = [3., 1.5, 7., 4.]
pred = [heavyside(thresholds, actual)  for actual in actuals]
pred[1] = [0, 0.5, 0.3, 0.8, 1.0] # not a valid CDF, the correct one [0, 0.3, 0.5, 0.8, 1.0]
print(pred)
crps = calc_crps(thresholds, pred, actuals)
print("Invalid CDF for one case: answer = {0}".format(crps))

[[0, 1, 1, 1, 1], [0, 0.5, 0.3, 0.8, 1.0], [0, 0, 0, 1, 1], [0, 1, 1, 1, 1]]
Invalid CDF for one case: answer = 0.25


In [27]:
pred = [ [0.]*len(thresholds)  for actual in actuals]  
print(pred)
crps = calc_crps(thresholds, pred, actuals)
print("All zero predictions: answer = {0}".format(crps))
# all zero for actual=3 is wrong for thresholds 4,6,8,10 i.e. 4 thresholds

[[0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0]]
All zero predictions: answer = 0.75


In [29]:
pred = [ [1.]*len(thresholds)  for actual in actuals]  
print(pred)
crps = calc_crps(thresholds, pred, actuals)
print("All one predictions: answer = {0}".format(crps))

[[1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0], [1.0, 1.0, 1.0, 1.0, 1.0]]
All one predictions: answer = 0.25


In [0]:
def sigmoid(num, center):
  length = num
  return [ 1/(1 + math.exp(-(x-center))) for x in range(0,length) ]

In [40]:
thresholds = [2., 4., 6., 8., 10.]
actuals = [3., 1.5, 7., 4.]
pred = [ sigmoid(5, actual)  for actual in actuals] 
crps = calc_crps(thresholds, pred, actuals)
print("Sigmoids: answer = {0}".format(crps))

Sigmoids: answer = 0.3606027186161378
