In [99]:
import pandas as pd
import numpy as np
import math
hc = pd.read_csv("data/heart_2020_cleaned.csv")
hc.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 319795 entries, 0 to 319794
Data columns (total 18 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   HeartDisease      319795 non-null  object 
 1   BMI               319795 non-null  float64
 2   Smoking           319795 non-null  object 
 3   AlcoholDrinking   319795 non-null  object 
 4   Stroke            319795 non-null  object 
 5   PhysicalHealth    319795 non-null  float64
 6   MentalHealth      319795 non-null  float64
 7   DiffWalking       319795 non-null  object 
 8   Sex               319795 non-null  object 
 9   AgeCategory       319795 non-null  object 
 10  Race              319795 non-null  object 
 11  Diabetic          319795 non-null  object 
 12  PhysicalActivity  319795 non-null  object 
 13  GenHealth         319795 non-null  object 
 14  SleepTime         319795 non-null  float64
 15  Asthma            319795 non-null  object 
 16  KidneyDisease     31

In [100]:
def calculate_pearson_py(x, y):
#обчислює стандартний коефіцієнт кореляції Пірсона за формулою
    n = len(x)
    if n != len(y) or n < 2: return np.nan
        
    sx, sy, sxy, sx2, sy2 = 0.0, 0.0, 0.0, 0.0, 0.0

    # обчислення необхідних сум у циклі
    for i in range(n):
        x_val, y_val = x[i], y[i]
        sx += x_val
        sy += y_val
        sxy += x_val * y_val
        sx2 += x_val * x_val
        sy2 += y_val * y_val
            
    # формула Пірсона
    numerator = n * sxy - sx * sy
    denominator = math.sqrt((n * sx2 - sx * sx) * (n * sy2 - sy * sy))
   
    if denominator == 0.0:
        return np.nan
        
    return numerator / denominator

def calculate_partial_correlation_py(x, y, z):
#обчислює часткову кореляцію r_xy.z (виміряємо ступінь зв'язку між величинами x та у, коли вплив z усунено)
    
    # обчислення трьох парних кореляцій
    r_xy = calculate_pearson_py(x, y)
    r_xz = calculate_pearson_py(x, z)
    r_yz = calculate_pearson_py(y, z)
    
    if np.isnan(r_xy) or np.isnan(r_xz) or np.isnan(r_yz):
        return np.nan
        
    # обчислення часткової кореляції
    numerator = r_xy - (r_xz * r_yz)
    den_x = 1.0 - (r_xz * r_xz)
    den_y = 1.0 - (r_yz * r_yz)
    denominator = np.sqrt(den_x * den_y)
    
    if denominator == 0.0:
        return np.nan

    return numerator / denominator

In [101]:
# дані
x_arr = hc["SleepTime"].values.astype(np.float64)
y_arr = hc["MentalHealth"].values.astype(np.float64)
z_arr = hc["BMI"].values.astype(np.float64)

# застосування та вимірювання часу виконання
print("--- PYTHON ---")
%timeit calculate_partial_correlation_py(x_arr, y_arr, z_arr)

# фінальний результат
result = calculate_partial_correlation_py(x_arr, y_arr, z_arr)
print(f"\nРезультат: {result:.6f}")

--- PYTHON ---
625 ms ± 21.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Результат: -0.116790


In [102]:
%load_ext Cython

The Cython extension is already loaded. To reload it, use:
  %reload_ext Cython


In [103]:
%%cython
from libc.math cimport sqrt 
import cython
import numpy as np 

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)

cdef double pearson_c(double[:] x, double[:] y) except? -2.0:
#обчислює стандартний коефіцієнт кореляції Пірсона за формулою

    cdef Py_ssize_t n = x.shape[0]
    cdef Py_ssize_t i
    cdef double sx = 0.0, sy = 0.0, sxy = 0.0
    cdef double sx2 = 0.0, sy2 = 0.0
    cdef double numerator, denominator
    
    cdef double x_val, y_val 
    
    if n != y.shape[0] or n < 2:
        return -2.0

    #обчислення необхідних сум
    for i in range(n):  
        x_val = x[i]
        y_val = y[i]
        sx += x_val
        sy += y_val
        sxy += x_val * y_val
        sx2 += x_val * x_val
        sy2 += y_val * y_val
        
    #формула Пірсона
    numerator = n * sxy - sx * sy
    denominator = sqrt((n * sx2 - sx * sx) * (n * sy2 - sy * sy))
    
    if denominator == 0.0:
        return -2.0
        
    return numerator / denominator

cpdef double calculate_partial_correlation_cython(double[:] x, double[:] y, double[:] z):
#обчислює часткову кореляцію r_xy.z (виміряємо ступінь зв'язку між величинами x та у, коли вплив z усунено)
    
    cdef double r_xy, r_xz, r_yz
    cdef double numerator, denominator
    cdef double den_x, den_y
    
    # обчислення трьох попарних кореляцій
    r_xy = pearson_c(x, y)
    r_xz = pearson_c(x, z)
    r_yz = pearson_c(y, z)

    if r_xy == -2.0 or r_xz == -2.0 or r_yz == -2.0:
        return -2.0
        
    # обчислення часткової кореляції
    numerator = r_xy - (r_xz * r_yz)
    den_x = 1.0 - (r_xz * r_xz)
    den_y = 1.0 - (r_yz * r_yz)
    denominator = sqrt(den_x * den_y)
    
    if denominator == 0.0:
        return -2.0

    return numerator / denominator

In [104]:
# дані
x_arr = hc["SleepTime"].values.astype(np.float64)
y_arr = hc["MentalHealth"].values.astype(np.float64)
z_arr = hc["BMI"].values.astype(np.float64)

# застосування та вимірювання часу виконання
print("--- Cython ---")
%timeit calculate_partial_correlation_cython(x_arr, y_arr, z_arr)

# Результат
result = calculate_partial_correlation_cython(x_arr, y_arr, z_arr)
print(f"\nРезультат: {result:.6f}")

--- Cython ---
1.5 ms ± 68.1 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

Результат: -0.116790


In [105]:
import numpy as np
from numba import njit 
import math 


@njit 
def calculate_pearson_numba(x, y):
#обчислює стандартний коефіцієнт кореляції Пірсона за формулою
    n = len(x) 
    if n != len(y) or n < 2:
        return 2.0 
        
    sx, sy, sxy, sx2, sy2 = 0.0, 0.0, 0.0, 0.0, 0.0

   #обчислення необхідних сум
    for i in range(n):
        x_val, y_val = x[i], y[i]
        sx += x_val
        sy += y_val
        sxy += x_val * y_val
        sx2 += x_val * x_val
        sy2 += y_val * y_val
        
    #формула Пірсона        
    numerator = n * sxy - sx * sy
    denominator = math.sqrt((n * sx2 - sx * sx) * (n * sy2 - sy * sy)) 
    
    if denominator == 0.0: return 2.0  
    
    return numerator / denominator

@njit 
def calculate_partial_correlation_numba(x, y, z):
#обчислює часткову кореляцію r_xy.z (виміряємо ступінь зв'язку між величинами x та у, коли вплив z усунено)
   
    #обчислення трьох попарних кореляцій 
    r_xy = calculate_pearson_numba(x, y)
    r_xz = calculate_pearson_numba(x, z)
    r_yz = calculate_pearson_numba(y, z) # Коректна логіка y, z
    
    if r_xy == 2.0 or r_xz == 2.0 or r_yz == 2.0: return np.nan
        
    #обчислення часткової кореляції
    numerator = r_xy - (r_xz * r_yz)
    den_x = 1.0 - (r_xz * r_xz)
    den_y = 1.0 - (r_yz * r_yz)
    
    denominator = np.sqrt(den_x * den_y) 
    
    if denominator == 0.0: return np.nan

    return numerator / denominator

In [106]:
# дані
x_arr = hc["SleepTime"].values.astype(np.float64)
y_arr = hc["MentalHealth"].values.astype(np.float64)
z_arr = hc["BMI"].values.astype(np.float64)

# застосування та вимірювання часу виконання
print("--- Numba ---")
%timeit calculate_partial_correlation_numba(x_arr, y_arr, z_arr)

# Результат
result = calculate_partial_correlation_numba(x_arr, y_arr, z_arr)
print(f"\nРезультат: {result:.6f}")


--- Numba ---
1.39 ms ± 124 μs per loop (mean ± std. dev. of 7 runs, 1 loop each)

Результат: -0.116790


In [107]:
def measure(func, *args):
    start = time.time()
    res = func(*args)
    return res, time.time()-start

x = hc["SleepTime"].values.astype(np.float64)
y = hc["MentalHealth"].values.astype(np.float64)
z = hc["BMI"].values.astype(np.float64)

results = [
    ["Python", *measure(calculate_partial_correlation_py, x, y, z)],
    ["Cython", *measure(calculate_partial_correlation_cython, x, y, z)],
    ["Numba", *measure(calculate_partial_correlation_numba, x, y, z)]
]

# таблиця
df = pd.DataFrame(results, columns=["Метод","Часткова кореляція","Час (с)"])
df["Часткова кореляція"] = df["Часткова кореляція"].map("{:.5f}".format)
df["Час (с)"] = df["Час (с)"].map("{:.6f}".format)

print("\033[1m\n------- ПОРІВНЯННЯ -------\033[0m")
print(df.to_string(index=False, justify='center'))
# ---- Висновок ----
fastest = df.loc[df["Час (с)"].astype(float).idxmin(),"Метод"]
print(f"\033[1m\nНайшвидший метод:\033[0m {fastest}")

[1m
------- ПОРІВНЯННЯ -------[0m
Метод  Часткова кореляція Час (с) 
Python      -0.11679      0.632964
Cython      -0.11679      0.001341
 Numba      -0.11679      0.001289
[1m
Найшвидший метод:[0m Numba
