In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.optimize import curve_fit, minimize
from scipy.signal import find_peaks
import ruptures as rpt
from scipy.optimize import minimize, curve_fit

In [None]:

# Small hand-written series used to try out peakDetect below.
Y = np.array([
    0.3, 0.35, 0.4, 0.38, 0.37, 0.36, 0.39,
    0.42, 0.4, 0.38, 0.35, 0.37, 0.36,
])

def peakDetect(Y, threshold=0.01):
    """Return the indices of interior samples that stand out from both neighbours.

    A sample counts as a peak when it is strictly greater than the sample
    before and after it, and exceeds each of them by more than ``threshold``.
    The first and last samples are never reported.

    Args:
        Y: Indexable sequence of numbers.
        threshold: Minimum margin over each neighbour (exclusive).

    Returns:
        List of integer positions, in increasing order.
    """
    def _stands_out(idx):
        before, here, after = Y[idx - 1], Y[idx], Y[idx + 1]
        is_local_max = before < here > after
        return is_local_max and (here - before > threshold) and (here - after > threshold)

    return [idx for idx in range(1, len(Y) - 1) if _stands_out(idx)]

# Run the hand-rolled detector on the toy series, print what it found and
# mark the hits on a quick line plot.
peaks = peakDetect(Y, threshold=0.01)
print("Data", Y)
print("peaks:", peaks)

plt.figure(figsize=(10, 5))
plt.plot(Y)
plt.scatter(peaks, Y[peaks], color='red', label='Peaks', zorder=5)
plt.gca().set(title='Test Detected Peaks', xlabel='Time', ylabel='Value')
plt.legend()
plt.show()


In [None]:

# NOTE(review): `path` is empty in this notebook; set it to the soil-moisture
# CSV before running this cell.
path = ''

# Coordinates of the site to analyse, and how far a row's coordinates may be
# from them while still counting as that site.
SITE_LATITUDE = 41.948936
SITE_LONGITUDE = -93.687760
COORD_TOLERANCE = 1e-6

# Load the table, parse the 'date' column and use it as the index.
data = pd.read_csv(path, parse_dates=['date']).set_index('date')
data['sm'] = pd.to_numeric(data['sm'])

# Match coordinates with a small absolute tolerance rather than `==`: the
# floats parsed from the CSV are not guaranteed to be bit-identical to the
# literals above, and an exact comparison would then silently select no rows.
at_site = (
    np.isclose(data['latitude'], SITE_LATITUDE, rtol=0, atol=COORD_TOLERANCE)
    & np.isclose(data['longitude'], SITE_LONGITUDE, rtol=0, atol=COORD_TOLERANCE)
)
sm = data[at_site]

In [None]:

def findPeak(Y, height_threshold=0.05, distance=8):
    """Locate peaks in ``Y`` with ``scipy.signal.find_peaks``.

    Args:
        Y: 1-D sequence of samples.
        height_threshold: Passed as ``height`` (minimum peak height).
        distance: Passed as ``distance`` (minimum spacing between peaks,
            in samples).

    Returns:
        Array of peak positions; the properties dict returned by
        ``find_peaks`` is discarded.
    """
    return find_peaks(Y, height=height_threshold, distance=distance)[0]

def sm_with_peaks(sm, start=None, end=None):
    """Plot the 'sm' column of ``sm`` and mark the peaks found by ``findPeak``.

    Args:
        sm: Frame with an 'sm' column; its index is used as the time axis.
        start: Optional lower bound on the index (inclusive). Skipped when falsy.
        end: Optional upper bound on the index (inclusive). Skipped when falsy.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    window = sm
    if start:
        window = window[window.index >= start]
    if end:
        window = window[window.index <= end]

    timestamps = window.index
    moisture = window['sm'].values
    peak_idx = findPeak(moisture)

    plt.figure(figsize=(15, 6))
    plt.plot(timestamps, moisture, label="Soil Moisture", color='blue')
    plt.plot(timestamps[peak_idx], moisture[peak_idx], 'rx', label="Detected Peaks")
    plt.gca().set(
        title="Soil Moisture with Detected Peaks",
        xlabel="Time",
        ylabel="Soil Moisture",
    )
    plt.legend()
    plt.show()

# Plot the site series with detected peaks for the window 2021-01-01 .. 2021-04-03.
sm_with_peaks(sm, start='2021-01-01', end='2021-04-3')


In [None]:
# Thin the site series by keeping every second row; the cells below work on this.
sm_agg = sm.iloc[::2]
#using: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html
def findPeak_agg(sm, start=None, end=None, height=None, distance=1):
    """Find peaks in the 'sm' column of ``sm`` within an optional index window.

    Args:
        sm: Frame with an 'sm' column.
        start: Optional inclusive lower bound on the index. Skipped when falsy.
        end: Optional inclusive upper bound on the index. Skipped when falsy.
        height: Forwarded to ``scipy.signal.find_peaks``.
        distance: Forwarded to ``scipy.signal.find_peaks``.

    Returns:
        Array of peak positions. Positions are relative to the windowed
        frame, not to the ``sm`` that was passed in.
    """
    window = sm
    if start:
        window = window[window.index >= start]
    if end:
        window = window[window.index <= end]
    return find_peaks(window['sm'], height=height, distance=distance)[0]

start = '2021-06-01'
end = '2021-08-01'
height_threshold = 0.003

# Rows of sm_agg that fall inside [start, end], and the windowed frame itself.
in_window = (sm_agg.index >= start) & (sm_agg.index <= end)
smFocus = sm_agg[in_window]

# findPeak_agg reports positions relative to the windowed frame.
window_peaks = findPeak_agg(sm_agg, start=start, end=end, height=height_threshold)

# Later cells index the full sm_agg with `peaks`, so translate the window
# positions into sm_agg positions; otherwise the markers and fitted segments
# land on the wrong rows whenever the window does not start at row 0.
peaks = np.flatnonzero(in_window)[window_peaks]

plt.figure(figsize=(15, 6))
plt.plot(smFocus.index, smFocus['sm'], label="Soil Moisture", color='blue')
plt.plot(smFocus.index[window_peaks], smFocus['sm'].iloc[window_peaks], 'rx', label="Detected Peaks")
plt.title(f"Soil Moisture with Detected Peaks from {start} to {end}")
plt.xlabel("Time")
plt.ylabel("Soil Moisture")
plt.legend()
plt.show()


In [None]:
def expDecay(t, alpha_0, alpha_1, gamma_i):
    """Exponential decay ``alpha_0 + alpha_1 * exp(-gamma_i * t)``.

    Args:
        t: Scalar or array of times.
        alpha_0: Constant offset.
        alpha_1: Amplitude of the decaying term.
        gamma_i: Decay rate multiplying ``t`` in the exponent.

    Returns:
        Model value(s), same shape as ``t``.
    """
    decay = np.exp(-gamma_i * t)
    return alpha_0 + alpha_1 * decay

def nll(params, t, y):
    """Score an exponential-decay fit against observations.

    Args:
        params: ``(alpha_0, alpha_1, gamma_i)`` as accepted by ``expDecay``.
        t: Times at which the model is evaluated.
        y: Observed values, same length as ``t``.

    Returns:
        ``2 * (n * log(2*pi) + sum(residual**2))`` with ``n = len(t)``.
    """
    # NOTE(review): the scaling differs from the usual unit-variance Gaussian
    # negative log-likelihood by a constant factor; confirm this is intended.
    alpha_0, alpha_1, gamma_i = params
    residuals = y - expDecay(t, alpha_0, alpha_1, gamma_i)
    squared_error = np.sum(residuals ** 2)
    return 2 * (len(t) * np.log(2 * np.pi) + squared_error)

def fit_exponential(sm, peak_loc):
    """Fit ``expDecay`` to each stretch of data between consecutive peaks.

    Every segment starts at a peak and ends just before the next one; the
    last segment runs to the end of ``sm``. Segments with three or fewer
    samples are skipped. Segments whose fit fails are reported and skipped.

    Args:
        sm: Frame with an 'sm' column, indexed positionally via ``iloc``.
        peak_loc: Sequence of integer row positions of the peaks.

    Returns:
        ``(params_found, costs)`` where ``params_found`` is a list of
        ``(popt, start, end)`` tuples and ``costs`` holds the matching
        ``nll`` value for each fitted segment.
    """
    params_found = []
    costs = []
    moisture = sm['sm']

    # Pair each peak with the next one; len(sm) closes the final segment.
    bounds = list(peak_loc) + [len(sm)]
    for start, end in zip(bounds[:-1], bounds[1:]):
        t = np.arange(0, end - start)
        y = moisture.iloc[start:end].values
        if len(t) <= 3:
            continue
        try:
            popt, _ = curve_fit(expDecay, t, y, maxfev=10000)
        except (RuntimeError, ValueError) as e:
            # RuntimeError: the optimiser did not converge.
            # ValueError: curve_fit rejects NaN/inf in the data. Without this
            # a single gap in the series would abort all remaining segments.
            print(f"fit fail {start}: {e}")
            continue
        cost = nll(popt, t, y)
        params_found.append((popt, start, end))
        costs.append(cost)
        print(f"Cost for seg {start}: {cost}")
    return params_found, costs


# Fit a decay to every peak-to-peak segment of sm_agg and overlay the fits.
params_found, costs = fit_exponential(sm_agg, peaks)

plt.figure(figsize=(15, 6))
plt.plot(sm_agg.index, sm_agg['sm'], label="Soil Moisture", color='blue')
plt.plot(sm_agg.index[peaks], sm_agg['sm'].iloc[peaks], 'rx', label="Peaks")

# The loop targets are not called start/end on purpose: at notebook level that
# would overwrite the date-window strings of the same name set in an earlier cell.
for params, seg_start, seg_end in params_found:
    t_fit = np.arange(0, seg_end - seg_start)
    y_fit = expDecay(t_fit, *params)
    plt.plot(sm_agg.index[seg_start:seg_end], y_fit, color='orange')

plt.title("Soil Moisture with Peaks and Decays")
plt.xlabel("Time")
plt.ylabel("Soil Moisture")
plt.legend()
plt.show()


In [None]:
# Same overlay as the previous cell, with the fitted decays drawn dashed.
plt.figure(figsize=(15, 6))
plt.plot(sm_agg.index, sm_agg['sm'], label="Soil Moisture", color='blue')
plt.plot(sm_agg.index[peaks], sm_agg['sm'].iloc[peaks], 'rx', label="Peaks")

# The loop targets are not called start/end on purpose: at notebook level that
# would overwrite the date-window strings of the same name set in an earlier cell.
for params, seg_start, seg_end in params_found:
    t_fit = np.arange(0, seg_end - seg_start)
    y_fit = expDecay(t_fit, *params)
    plt.plot(sm_agg.index[seg_start:seg_end], y_fit, color='orange', linestyle='--')

plt.title("Soil Moisture with Detected Peaks and Fitted Decays")
plt.xlabel("Time")
plt.ylabel("Soil Moisture")
plt.legend()
plt.show()


In [None]:
# Show only the segments whose fit cost is below the mean cost over all fits.
avg_cost = np.mean(costs)

plt.figure(figsize=(15, 6))

# The loop targets are not called start/end on purpose: at notebook level that
# would overwrite the date-window strings of the same name set in an earlier cell.
for seg_no, ((params, seg_start, seg_end), seg_cost) in enumerate(zip(params_found, costs), start=1):
    if seg_cost < avg_cost:
        # Each kept segment is drawn against its own local time axis 0..n-1.
        t_fit = np.arange(0, seg_end - seg_start)
        y_actual = sm_agg['sm'].iloc[seg_start:seg_end].values
        plt.plot(t_fit, y_actual, label=f" seg {seg_no} (Cost: {seg_cost:.2f})")
        print(f" seg {seg_no}: Params = {params}, Cost = {seg_cost:.2f}")

plt.title(" segs with Cost < Average")
plt.xlabel("Time ")
plt.ylabel("Soil Moisture")
plt.legend()
plt.show()


In [None]:
def expDecay(t, alpha_0, alpha_1, gamma_i, tau_i=0):
    """Exponential decay with a log-parameterised rate.

    Computes ``alpha_0 + alpha_1 * exp(-exp(gamma_i) * (t - tau_i))``. Because
    the rate is ``exp(gamma_i)``, it is positive for any real ``gamma_i``.

    Args:
        t: Scalar or array of times.
        alpha_0: Constant offset.
        alpha_1: Amplitude of the decaying term.
        gamma_i: Logarithm of the decay rate.
        tau_i: Time origin subtracted from ``t`` (default 0).

    Returns:
        Model value(s), same shape as ``t``.
    """
    rate = np.exp(gamma_i)
    elapsed = t - tau_i
    return alpha_0 + alpha_1 * np.exp(-rate * elapsed)
def nll(params, Y, tau_i, tau_ip1, peak_loc=None):
    """Cost of describing ``Y[tau_i:tau_ip1]`` with an ``expDecay`` curve.

    The model is evaluated at local times ``1..n`` where
    ``n = tau_ip1 - tau_i``. The base cost is
    ``n * log(2*pi) + sum(residual**2)``.

    Args:
        params: ``(alpha_0, alpha_1, gamma_i)`` as accepted by ``expDecay``.
        Y: 1-D signal; only ``Y[tau_i:tau_ip1]`` is read.
        tau_i: Segment start position (inclusive).
        tau_ip1: Segment end position (exclusive).
        peak_loc: Optional iterable of peak positions. Each one that lies in
            ``[tau_i, tau_ip1)`` halves the cost.

    Returns:
        The (possibly scaled) cost as a float.
    """
    alpha_0, alpha_1, gamma_i = params
    n = tau_ip1 - tau_i
    local_t = np.arange(1, n + 1)
    fitted = expDecay(local_t, alpha_0, alpha_1, gamma_i)
    squared_error = np.sum((Y[tau_i:tau_ip1] - fitted) ** 2)
    cost = n * np.log(2 * np.pi) + squared_error

    if peak_loc is None:
        return cost

    # NOTE(review): halving per contained peak makes segments that span peaks
    # cheaper; confirm that is the intended direction of the adjustment.
    peaks_inside = sum(1 for peak in peak_loc if tau_i <= peak < tau_ip1)
    return cost * 0.5 ** peaks_inside

def extractParam(Y, tau_i, tau_ip1, peak_loc=None):
    """Fit ``expDecay`` to the segment ``Y[tau_i:tau_ip1]``.

    Args:
        Y: 1-D signal.
        tau_i: Segment start position (inclusive).
        tau_ip1: Segment end position (exclusive).
        peak_loc: Accepted for signature symmetry with ``nll``; not used here.

    Returns:
        The fitted ``(alpha_0, alpha_1, gamma_i)`` from ``curve_fit``, or the
        sentinel ``[1e6, 1e6, 1e6]`` when the optimiser fails to converge.
    """
    # Local times run 1..n, the same axis nll uses when it scores these
    # parameters. Fitting on 0..n-1 would score every segment against a curve
    # shifted by one sample.
    n = tau_ip1 - tau_i
    t = np.arange(1, n + 1)
    y = Y[tau_i:tau_ip1]
    initial_guess = [0.5, 0.5, 0.1]
    try:
        params, _ = curve_fit(expDecay, t, y, p0=initial_guess, maxfev=10000)
        print(f"seg {tau_i}-{tau_ip1}, has params: {params}")
        return params
    except RuntimeError as e:
        print(f"param fail {tau_i}-{tau_ip1}: {e}")
        # Deliberately extreme parameters so the failed segment scores badly.
        return [1e6, 1e6, 1e6]
    #adapted from: https://centre-borelli.github.io/ruptures-docs/user-guide/costs/costcustom/
class ExponentialCost(rpt.costs.CostL2):
    """Custom ruptures cost: how well an exponential decay explains a segment.

    For a segment ``[start, end)`` the cost is obtained by fitting the decay
    with ``extractParam`` and scoring the fit with ``nll``.

    Args:
        peak_loc: Optional peak positions, forwarded to ``extractParam`` and
            ``nll`` on every ``error`` call.
    """

    # The decay has three free parameters, so curve_fit needs at least three
    # samples; with fewer it raises a TypeError that nothing here catches.
    # ruptures search methods take the max of their own min_size and this one.
    min_size = 3

    def __init__(self, peak_loc=None):
        super().__init__()
        self.peak_loc = peak_loc

    def fit(self, signal):
        """Store ``signal`` as given (no reshaping) and return ``self``."""
        self.signal = signal
        return self

    def error(self, start, end):
        """Return the cost of the segment ``[start, end)``.

        Segments shorter than ``min_size`` cannot be fitted and cost ``inf``.
        """
        if end - start < self.min_size:
            print(f"seg too short to fit curve: {start}-{end}")
            return np.inf
        params = extractParam(self.signal, start, end, self.peak_loc)
        return nll(params, self.signal, start, end, self.peak_loc)

def findPeak_agg(sm, weeks_prior=0.2, distance=1, min_rise=0.003):
    """Flag samples that rise above the recent minimum and the preceding samples.

    Row ``i`` is reported when both hold:

    * its 'sm' value exceeds the minimum over the look-back window
      ``[t_i - weeks_prior, t_i]`` by more than ``min_rise``, and
    * it is strictly greater than each of the ``distance`` samples before it.

    The look-back window is a label slice on the index, so it includes row
    ``i`` itself. NOTE(review): the slice assumes a sorted datetime index.

    Args:
        sm: Frame with an 'sm' column and a datetime index.
        weeks_prior: Length of the look-back window, in weeks.
        distance: Number of immediately preceding samples the value must beat.
        min_rise: Required margin over the look-back minimum (default 0.003).

    Returns:
        List of integer row positions, in increasing order.
    """
    moisture = sm['sm']
    lookback = pd.Timedelta(weeks=weeks_prior)
    peaks = []

    for i in range(1, len(sm)):
        curr_time = sm.index[i]
        recent = moisture.loc[curr_time - lookback:curr_time]
        if recent.empty:
            continue

        current = moisture.iloc[i]
        if not current > recent.min() + min_rise:
            continue

        # `i >= distance` keeps the slice start non-negative. `>=` rather than
        # `>` so that row `distance`, whose look-back slice starts at row 0,
        # is not skipped.
        if i >= distance and (moisture.iloc[i - distance:i] < current).all():
            peaks.append(i)
            print(f" Peak at {i}, T: {curr_time}")

    print(f"num peaks: {len(peaks)}")
    return peaks

def main(start, end, step=8, pen=5, min_size=3):
    """Detect and plot changepoints in the site soil-moisture series.

    Reads the notebook-level frame ``sm``, restricts it to ``[start, end]``,
    keeps every ``step``-th row, finds peaks with ``findPeak_agg`` and runs
    PELT with ``ExponentialCost`` on the 'sm' values.

    Args:
        start: Lower bound of the index slice.
        end: Upper bound of the index slice.
        step: Subsampling stride applied after slicing (default 8).
        pen: Penalty passed to ``Pelt.predict`` (default 5).
        min_size: Minimum segment length passed to ``Pelt`` (default 3).

    Returns:
        None. Changepoints are printed and the figure is shown.
    """
    smFocus = sm.loc[start:end]
    sm_agg = smFocus.iloc[::step]
    Y = sm_agg['sm'].values
    peaks = findPeak_agg(sm_agg)

    cost = ExponentialCost(peak_loc=peaks)
    algo = rpt.Pelt(custom_cost=cost, min_size=min_size, jump=1).fit(Y)
    # Keep only breakpoints that are valid row positions; this drops the
    # closing breakpoint equal to len(Y) that ruptures puts at the end.
    ans = [cp for cp in algo.predict(pen=pen) if cp < len(sm_agg)]

    if len(ans) == 0:
        print("no cps")
    else:
        print(f"cps: {ans}")

    plt.figure(figsize=(10, 6))
    plt.plot(sm_agg.index, Y, label="Soil Moisture Data")
    plt.plot(sm_agg.index[peaks], Y[peaks], 'rx', label="Detected Peaks")
    for cp in ans:
        plt.axvline(sm_agg.index[cp], color="r", linestyle="--", label=f"Cp at {sm_agg.index[cp]}")
    plt.legend()
    plt.title(f"Changepoints on SM from {start} to {end}")
    plt.xlabel("Time")
    plt.ylabel("Soil Moisture")
    plt.show()

# Run the changepoint analysis for the window 2020-01-01 .. 2020-03-01.
main('2020-01-01', '2020-03-01')
