In [1]:
import numpy as np

In [12]:
def test_wrank():
    # Identity
    x = np.array([1, 2, 3, 4, 5])
    assert (wrank(x) == np.array([1, 2, 3, 4, 5])).all()
    
    # General example
    x = np.array([2, 4, 6, 8, 10])
    assert (wrank(x) == np.array([1, 2, 3, 4, 5])).all()
    
    # Averages ties
    x = np.array([2, 4, 4, 8, 10])
    assert (wrank(x) == np.array([1, 2.5, 2.5, 4, 5])).all()
    
    # Weighted
    x = np.array([2, 4, 6, 8, 10])
    w = np.array([0.5, 1, 1, 1.5, 2])
    assert (wrank(x, w) == np.array([0.75, 1.5, 2.5, 3.75, 5.5])).all()
    
    # Disorderly + weighted
    x = np.array([3, 4, 6, 1, 2])
    w = np.array([0.5, 1, 1, 1.5, 2])
    assert(wrank(x, w) == np.array([4.25, 5., 6., 1.25, 3.])).all()
    
def wrank(x, w=None):
    """
    Access in R as wCorr:::wrank
    """
    if w is None:
        w = np.ones(len(x))
    else:
        w = np.array(w)
        
    # push that rank to all final tied units
    ord = np.argsort(x)
    rord = np.argsort(ord)
    xp = x[ord]  # x, permuted
    wp = w[ord]  # weights, permuted
    rnk = np.empty(len(x))
    
    # setup first iteration
    t1 = 0  # total weight of lower ranked elements
    i = 0   # index
    t2 = 0  # total weight of tied elements (including self)
    n = 0   # number of tied elements
    
    while i < len(x) - 1:
        t2 += wp[i]  # tied weight increases by this unit
        n += 1
        
        if xp[i+1] != xp[i]:   # the next one is not a tie
            # find the rank of all tied elements
            rnki = t1 + (1 + (t2 - 1)/2)
            
            # push that rank to all tied units
            for ii in range(n+1):  # (1, n+1)
                rnk[i-ii+1] = rnki
                
            # reset for next iteration
            t1 += t2
            t2 = 0
            n = 0
        
        i += 1
    
    # final row
    t2 += wp[i]
    rnki = t1 + (1 + (t2 - 1) / 2)  # final rank
    
    # push that rank to all final tied units
    for ii in range(n+1):
        rnk[i-ii] = rnki
        
    rnk = rnk[rord]
    
    return list(rnk)


test_wrank()


In [22]:
def test_contCorr():
    # Test with Pearson method
    x = [1, 2, 3, 4, 5]
    y = [2, 4, 6, 8, 10]
    w = [1, 1, 1, 1, 1]
    assert np.isclose(contCorr(x, y, w, method=['Pearson']), 1.0)

    # Test with Spearman method
    x = [1, 2, 3, 4, 5]
    y = [2, 4, 6, 8, 10]
    w = [1, 1, 1, 1, 1]
    assert np.isclose(contCorr(x, y, w, method=['Spearman']), 1.0)

    # Test with weighted data
    x = [1, 2, 3, 4, 5]
    y = [2, 4, 6, 8, 10]
    w = [0.5, 1, 1, 1.5, 2]
    assert np.isclose(contCorr(x, y, w, method=['Spearman']), 1.0)
    
    # Test with negatively correlated data
    x = [1, 2, 3, 4, 5]
    y = [3, 4, 6, 1, 2]
    w = [0.5, 1, 1, 1.5, 2]
    assert np.isclose(contCorr(x, y, w, method=['Pearson']), -0.5430768084288788)
    
    # Test with negatively correlated data
    x = np.array([1, 2, 3, 4, 5])
    y = np.array([3, 4, 6, 1, 2])
    w = np.array([0.5, 1, 1, 1.5, 2])
    assert np.isclose(contCorr(x, y, w, method=['Spearman']), -0.5555556)

    # Test with missing data
    x = [1, 2, 3, 4, np.nan]
    y = [2, 4, 6, 8, 10]
    w = [1, 1, 1, 1, 1]
    assert np.isnan(contCorr(x, y, w, method=['Pearson']))
    
    # Test with tied data
    x = np.array([1, 2, 3, 4, 5])
    y = np.array([3, 2, 1, 2, 4])
    w = np.array([0.5, 1, 1, 1.5, 2])
    assert np.isclose(contCorr(x, y, w, method=['Spearman']), 0.6516946235415335)
    

def contCorr(x, y, w, method=['Pearson', 'Spearman']):
    """
    R: wCorr:::contCorr
    """

    x = np.array(x, dtype=np.float64)
    y = np.array(y, dtype=np.float64)
    w = np.array(w, dtype=np.float64)

    pm = np.where(np.char.lower(np.array(method)) == np.char.lower(np.array(['Pearson', 'Spearman'])))
    if pm[0][0] == 1:
        # Spearman
        x = wrank(x, w)
        y = wrank(y, w)
        
    xb = np.sum(w * x) / np.sum(w)
    yb = np.sum(w * y) / np.sum(w)
    numerator = np.sum(w * (x - xb) * (y - yb))
    denom = np.sqrt(np.sum(w * (x - xb) ** 2) * np.sum(w * (y - yb) ** 2))
    return numerator / denom

test_contCorr()