## Log Periodic Power Law (LPPL)

The LPPL model is described as below : 

$$y(t) = A + B(t_c − t)^{\alpha} + C(t_c − t)^{\alpha} cos[\omega ln(t_c − t) + \phi] \ \ \ \ (1)$$

where $y(t)$ is the oil price at time $t$, $\alpha$ is the exponential growth, $\omega$ is the control of the amplitude of the oscillations and A, B, C and $\phi$ are the parameters with no structural information.

Note that in Eq. (1), $t_c$ is a critical time or a turning point, to be predicated.

One main feature captured by Eq. (1) is the dampened yet accelerated oscillation in oil price. That is, when $t$
approaches $t_c$, the oscillations occur more frequently with a decreasing amplitude. In other words, $(t_c − t)^\alpha$ is the power law term, which describes the faster than exponential change of the prices owing to positive feedback mechanisms and $(_c − t)^\alpha cos[\omega ln(tc −t)+\phi]$ is the periodic term, which indicates a correction to the power law term and has the symmetry of discrete scale invariance. 

The most probable time of the turning point is when $t = t_c$, for $t ≥ t_c$.

In the above LPPL model, there are seven parameters that need to be optimized, including four nonlinear parameters, $t_c$, $\omega$, $\phi$ and $\alpha$; and three linear parameters, A, B and C. 

For simplicity, the linear parameters can be directly derived from the given nonlinear parameters by using least square method.

$$f_j = (t_c - t_j)^\alpha \ \ \ \ (2)$$

$$g_j = (t_c - t_j)^\alpha \cos[\omega ln(t_c - t_j) + \phi] \ \ \ \ (3)$$

where $t_j$ ($j = 1, 2, ... , J$) with $j$ as a time unit and $J$ is the total time units in an interval, the linear parameters can be calculated using the following equations:

$$
\begin{pmatrix}
A \\
B \\
C
\end{pmatrix}
=
[(V^T_{3 \times J} \dot V_{J \times 3})^{-1} (V^T_{3 \times J} \dot V_{J \times 1})]_{3 \times 1}
\ \ \ \ (4)$$

where 
$
V_{J \times 3} = 
\begin{pmatrix}
 1 & f_1 & g_1 \\ 
 1 & f_2 & g_2 \\ 
 . & . & . \\
 . & . & . \\
 . & . & . \\
 1 & f_J & g_J 
\end{pmatrix}_{J \times 3}
$
, which is a matrix with $J$ rows and 3 columns.


$
Y_{J \times 1} = 
\begin{pmatrix}
y(1) \\ 
y(2) \\ 
 . \\
 . \\
 . \\
y(J)
\end{pmatrix}_{J \times 1}
$
, which is a column vector with $J$ rows.

$$
\begin{pmatrix}
A \\
B \\
C
\end{pmatrix}
= 
\begin{pmatrix}
J & \sum_{j=1}^{J} f_j & \sum_{j=1}^{J} g_j \\
\sum_{j=1}^{J} f_j & \sum_{j=1}^{J} f_j^2 & \sum_{j=1}^{J} f_j g_j \\
\sum_{j=1}^{J} g_j & \sum_{j=1}^{J} f_j g_j & \sum_{j=1}^{J} g_j^2
\end{pmatrix}^{-1}
\begin{pmatrix}
\sum_{j=1}^{J} y(j) \\
\sum_{j=1}^{J} y(j) f_j \\
\sum_{j=1}^{J} y(j) g_j
\end{pmatrix}
\ \ \ \ (5)$$

This approach is proven to be very stable and able to yield good estimation of the linear parameters A, B and C.

linear parameters, $t_c$, $\omega$, $\phi$ and $\alpha$ proves to be more challenging. In fact, it can be proven that searching for the optimal values of the four nonlinear parameters in the LPPL model is an NP-hard problem.


In [1]:
import numpy as np
from numba import njit

@njit
def calculate_f_g(t_c, t, alpha, omega, phi):
    """
    Calculate the vectors f_j and g_j for a given t_c, alpha, omega, and phi.
    """
    f = (t_c - t) ** alpha
    g = f * np.cos(omega * np.log(t_c - t) + phi)
    return f, g

@njit
def calculate_linear_parameters(t, y, t_c, alpha, omega, phi):
    """
    Calculate the linear parameters A, B, and C using the LPPL model.
    """
    # Calculate f and g
    f, g = calculate_f_g(t_c, t, alpha, omega, phi)
    
    # Construct the V matrix
    V = np.column_stack((np.ones_like(f), f, g))
        
    # Compute the normal equations
    # (V_T @ V)^(-1) @ (V_T @ y)
    params = np.linalg.inv(V.T @ V) @ (V.T @ y)  # A, B, C
    
    return params  # [A, B, C]

# Exemple d'utilisation
if __name__ == "__main__":
    # Simulated data
    t = np.linspace(1, 100, 100)  # Time units
    y = np.random.rand(100)  # Random example prices

    # LPPL parameters
    t_c = 120
    alpha = 0.5
    omega = 10
    phi = 0.1

    # Calculate A, B, and C
    A, B, C = calculate_linear_parameters(t, y, t_c, alpha, omega, phi)
    print(f"A: {A}, B: {B}, C: {C}")

A: 0.40892615610661487, B: 0.00932241095608144, C: 0.007352676444861303


For this purpose, the multi-population genetic algorithm (MPGA) is employed to search for the optimal parameter values in the LPPL model. The MPGA is one of the most popular heuristic algorithms with the advantages of improving convergence rates and maintaining relatively low mean-square-errors.

So the framework of LPPL Model can be briefly summarized as follows:

**Step 1 :** 

A sample interval is selected for the prediction of a turning point in the future time horizon. The interval is at least four-year long after the previous major turning point in history.

**Step 2 :** 

The sample interval is further divided into over 100 subintervals to avoid the bias of specific sample interval and the impact of selecting a sample interval on the forecast result.

**Step 3 :** 

For each subinterval, the MPGA (multi-population genetic algorithm) is employed to optimize the parameters in the LPPL model. The optimized LPPL model is then used to predict a date in the future when a turning point will occur.

**Step 4 :** 

The Lomb periodogram analysis is conducted to statistically test the predicted turning points obtained by the LPPL models for all subintervals.

**Step 5 :** 

The turning points that are statistically validated by the Lomb periodogram analysis are considered as predicted turning points by the LPPL model

## Multi-Population Genetic Algorithm (MPGA)

the MPGA works on multiple populations with the objective to evaluate each subinterval. After the initial populations are produced, if the optimization criteria are not met, new populations are created and the search starts again. 

The first step of the MPGA is to generate multiple populations. Inspired by the biology concepts of crossover and mutation, each population in the MPGA can be muted into hundreds of chromosomes and each chromosome represents a feasible solution for the four nonlinear parameters in the LPPL model. Based on Eq. (1), the MPGA measures the fitness value of each of chromosomes (i.e., the four nonlinear parameters) generated from all the populations by computing the residual sum of squares (RSS) between the historical oil price at time t or y(t) and the results from the LPPL models:

$$
RSS_{m,n} = \sum_{t=1}^{J} \left( y(t) - A - B (t^{m,n}_c - t)^{\alpha^{m,n}} - C (t^{m,n}_c - t)^{\alpha^{m,n}} \cos \left[\omega^{m,n} \ln (t^{m,n}_c - t) + \phi^{m,n} \right] \right)^2
$$

where $RSS_{m,n}$ represents the fitness value (RSS) of the $n$-th chromosome in the $m$-th population; $t^{m,n}_c$, $\omega^{m,n}$, $\phi^{m,n}$ and $\alpha^{m,n}$ correspond to the $n$-th chromosome in the $m$-th population.

In [35]:
import numpy as np
from numba import njit

@njit
def predict_price(t, A, B, C, t_c, alpha, omega, phi):
    """
    Predict price using the LPPL model.
    
    Parameters:
    - t : ndarray : Time points
    - A, B, C : float : Linear parameters
    - t_c : float : Critical time
    - alpha : float : Power-law exponent
    - omega : float : Angular frequency
    - phi : float : Phase

    Returns:
    - predicted : ndarray : Predicted prices
    """
    # Calculate f and g
    f = (t_c - t) ** alpha
    g = f * np.cos(omega * np.log(t_c - t) + phi)
    
    # Predicted price using the LPPL model
    predicted = A + B * f + C * g
    
    return predicted

@njit
def calculate_rss(y, t, A, B, C, t_c, alpha, omega, phi):
    """
    Calculate the Residual Sum of Squares (RSS) for given parameters.
    
    Parameters:
    - y : ndarray : Observed data (prices)
    - t : ndarray : Time points
    - A, B, C : float : Linear parameters
    - t_c : float : Critical time
    - alpha : float : Power-law exponent
    - omega : float : Angular frequency
    - phi : float : Phase
    
    Returns:
    - rss : float : Residual Sum of Squares
    """
    # Predicted price using the LPPL model
    predicted = predict_price(t, A, B, C, t_c, alpha, omega, phi)
    
    # Residual Sum of Squares
    rss = np.sum((y - predicted) ** 2)
    
    return rss

@njit
def calculate_rss_population(y, t, linear_params, nonlinear_params):
    """
    Calculate the RSS for all chromosomes in a population.
    
    Parameters:
    - y : ndarray : Observed data (prices)
    - t : ndarray : Time points
    - linear_params : ndarray : Array of linear parameters [A, B, C]
    - nonlinear_params : ndarray : Array of nonlinear parameters [[t_c, alpha, omega, phi], ...]
    
    Returns:
    - rss_values : ndarray : RSS values for each chromosome
    """
    num_chromosomes = nonlinear_params.shape[0]
    rss_values = np.zeros(num_chromosomes)
    
    for i in range(num_chromosomes):
        t_c, alpha, omega, phi = nonlinear_params[i]
        A, B, C = linear_params[i]  # Assuming one set of linear params per chromosome
        rss_values[i] = calculate_rss(y, t, A, B, C, t_c, alpha, omega, phi)
    
    return rss_values

# Example Usage
if __name__ == "__main__":
    # Example data
    t = np.linspace(1, 100, 100)  # Time points
    y = np.random.rand(100)  # Observed data (random example)

    # Example parameters
    linear_params = np.array([
        [1.0, -0.5, 0.3],  # [A, B, C] for each chromosome
        [1.1, -0.4, 0.35]
    ])
    nonlinear_params = np.array([
        [120.0, 0.5, 10.0, 0.1],  # [t_c, alpha, omega, phi] for chromosome 1
        [130.0, 0.6, 8.0, 0.15]   # [t_c, alpha, omega, phi] for chromosome 2
    ])

    # Calculate RSS values
    rss_values = calculate_rss_population(y, t, linear_params, nonlinear_params)
    print("RSS values:", rss_values)

RSS values: [2024.01035365 2365.59910753]


For each generation, the minimum fitness value and its corresponding chromosome in each population ($\substack{\min \\ n} (RSS_{m,n})$ for each $m$), and the minimum fitness value in all populations ($\substack{\min \\ m,n} (RSS_{m,n})$) and its corresponding chromosome are recorded.

A new population will be generated by selection, crossover, mutation and re-inserting. Next, the chromosome whose fitness value is the smallest among the $m$-th population ($\substack{\min \\ n} (RSS_{m,n})$ for each $m$) will substitute the chromosome whose fitness value is the largest among the $m + 1$-th population ($\substack{\max \\ n} (RSS_{m,n})$ for each $m + 1$). 

This process is generally referred to as the immigration operation that combines individual populations into a unified entity. After the immigration operation, if the minimum fitness value in a new population ($\substack{\min \\ n} (RSS_{m,n})$) is less than the corresponding record (i.e., the minimum fitness value and its corresponding chromosome in this population), the records are updated; otherwise, the original records in this population remain. 

The record of the minimum fitness value of all populations ($\substack{\min \\ m,n} (RSS_{m,n})$) and the corresponding chromosome are processed in the same way. Finally, if the minimum fitness value of all populations does not change for a given number of consecutive iterations (set to 50 in our computation), or the total number of iterations reaches a given upper bound (set to 500 in our computation), the algorithm terminates and the latest minimum fitness value of all populations and its corresponding chromosome ($t_c$, $\omega$, $\phi$ and $\alpha$) are considered as the outputs of the MPGA. After obtaining the four nonlinear parameters optimized by the MPGA, the three linear parameters can be subsequently derived from Eq. (5). The corresponding LPPL model is thus established. 

In [21]:
import pandas as pd
import numpy as np

data = pd.read_csv('WTI_Spot_Price_daily.csv', skiprows=4)
y = np.array(data.iloc[:, 1])

In [36]:
import numpy as np
from numba import njit

@njit
def evaluate_population(y, t, linear_params, nonlinear_params):
    """
    Evaluate RSS for each chromosome in a population.
    """
    return calculate_rss_population(y, t, linear_params, nonlinear_params)

def initialize_population(pop_size, param_bounds):
    """
    Initialize a population with random chromosomes.
    """
    return np.random.uniform(
        low=[bounds[0] for bounds in param_bounds],
        high=[bounds[1] for bounds in param_bounds],
        size=(pop_size, len(param_bounds))
    )

def select_parents(population, fitness):
    """
    Perform selection based on fitness (e.g., tournament selection).
    """
    selected = []
    for _ in range(len(population)):
        i, j = np.random.choice(len(population), 2, replace=False)
        selected.append(population[i] if fitness[i] < fitness[j] else population[j])
    return np.array(selected)

def crossover(parents, crossover_rate):
    """
    Perform crossover to generate offspring.
    """
    offspring = []
    for i in range(0, len(parents), 2):
        if np.random.rand() < crossover_rate:
            crossover_point = np.random.randint(1, parents.shape[1])
            offspring.append(np.concatenate((parents[i, :crossover_point], parents[i + 1, crossover_point:])))
            offspring.append(np.concatenate((parents[i + 1, :crossover_point], parents[i, crossover_point:])))
        else:
            offspring.extend([parents[i], parents[i + 1]])
    return np.array(offspring)

def mutate(offspring, mutation_rate, param_bounds):
    """
    Apply mutation to offspring.
    """
    for i in range(len(offspring)):
        if np.random.rand() < mutation_rate:
            mutation_point = np.random.randint(offspring.shape[1])
            offspring[i, mutation_point] = np.random.uniform(
                param_bounds[mutation_point][0], param_bounds[mutation_point][1]
            )
    return offspring

def immigration_operation(populations, fitness_values):
    """
    Perform immigration between populations.
    """
    for m in range(len(populations) - 1):
        # Best chromosome in population m
        best_index = np.argmin(fitness_values[m])
        best_chromosome = populations[m][best_index]

        # Worst chromosome in population m+1
        worst_index = np.argmax(fitness_values[m + 1])
        populations[m + 1][worst_index] = best_chromosome
    return populations

# MPGA Parameters
pop_size = 50
num_populations = 5
param_bounds = [(100, 200), (0.1, 1), (1, 20), (0, 2 * np.pi)]  # [(t_c), (alpha), (omega), (phi)]
crossover_rate = 0.7
mutation_rate = 0.1
max_iterations = 500
stagnation_limit = 50

# Initialize populations
populations = [initialize_population(pop_size, param_bounds) for _ in range(num_populations)]
fitness_records = [np.inf] * num_populations
global_best_fitness = np.inf
global_best_chromosome = None

stagnation_counter = 0
iteration = 0

while iteration < max_iterations and stagnation_counter < stagnation_limit:
    fitness_values = []
    for m in range(num_populations):
        # Evaluate fitness for current population
        fitness = evaluate_population(y, t, linear_params, populations[m])
        fitness_values.append(fitness)

        # Update local best
        min_fitness = np.min(fitness)
        if min_fitness < fitness_records[m]:
            fitness_records[m] = min_fitness
            if min_fitness < global_best_fitness:
                global_best_fitness = min_fitness
                global_best_chromosome = populations[m][np.argmin(fitness)]

    # Check stagnation
    if global_best_fitness == fitness_records[0]:
        stagnation_counter += 1
    else:
        stagnation_counter = 0

    # Generate new populations with selection, crossover, mutation
    new_populations = []
    for m in range(num_populations):
        parents = select_parents(populations[m], fitness_values[m])
        offspring = crossover(parents, crossover_rate)
        new_population = mutate(offspring, mutation_rate, param_bounds)
        new_populations.append(new_population)

    # Perform immigration operation
    populations = immigration_operation(new_populations, fitness_values)
    iteration += 1

# Output the best solution
print("Best fitness:", global_best_fitness)
print("Best chromosome (t_c, alpha, omega, phi):", global_best_chromosome)

Best fitness: 9.380458988383156
Best chromosome (t_c, alpha, omega, phi): [166.5462943    0.18942639   1.7682024    3.72248052]


## Validation of turning point prediction using the Lomb periodogram analysis

In order to validate the turning points predicted by the LPPL models, it is necessary to determine whether the frequency ($\frac{\omega}{2\pi}$) obtained from the MPGA (note that $\omega$ is one of the optimal four nonlinear parameters) and the frequency of $y(t) − A − B(t_c − t)^\alpha$ are consistent. A test method called the Lomb periodogram analysis is adopted to detect periodic oscillations for $y(t) − A − B(t_c − t)^\alpha$ and calculate its frequency.

There is many existing methods but the Lomb periodogram analysis method is not only able to objectively evaluate the critical time tc or the turning point, but is also suitable for non-uniform time series.

The validation process starts with pre-setting the frequency series ($freq_i$) ($i = 1, 2, ... , M$) with $M$ as the length of the pre-given frequency series. For a given frequency $f$, the power spectral density $P(f)$ can be computed by the Lomb periodogram analysis as below:

$$
P(f) = \frac{1}{2\sigma^2} \left\{
\frac{\left( \sum_{j=1}^{J} (x_j - \bar{x}) \cos \left( 2\pi f (t_j - \tau) \right) \right)^2}{\sum_{j=1}^{J} \cos^2 \left( 2\pi f (t_j - \tau) \right)}
+
\frac{\left( \sum_{j=1}^{J} (x_j - \bar{x}) \sin \left( 2\pi f (t_j - \tau) \right) \right)^2}{\sum_{j=1}^{J} \sin^2 \left( 2\pi f (t_j - \tau) \right)}
\right\}
$$

where $$x_j = y_j - A - B(t_c - t_j)^\alpha$$ 

at times $t_j$ ($j = 1, 2, ..., J$); 

$$\bar{x} = \frac{1}{J} \sum_{j=1}^{J} x_j$$ 

and 

$$\sigma^2 = \frac{1}{J-1} \sum_{j=1}^{J}(x_j - \bar{x})^2$$ 

are respectively the mean and the variance of $x_j$. The time offset, $\tau$ is calculated by 

$$\tau = \frac{1}{4\pi f} \arctan \frac{\sum_{j=1}^{J}\sin(4\pi f t_j)}{\sum_{j=1}^{J}\cos(4\pi f t_j)}$$

Invalid values are then removed from the resulting $P(freq_i)$ series ($i = 1, 2, ... , M$). These invalid values include: 1) $P(f_{mpf})$ corresponding to the most probable frequency $(f_{mpf})$, which is caused by the random series, and inversely proportional to the length of the given frequency series ($J$), $f_{mpf} ≈ 1.5/J$; 2) the $P(freq_i)$ which is smaller than
the critical value that is calculated by $z = −ln(1 − (1 − p)^{1/M})$, at
the given statistical significance level of p. If there are no valid values
in the $P(freq_i)$ series, the Lomb periodogram rejects the conclusion.

In other words, the turning points predicted by the LPPL model are
not statistically valid. Otherwise the frequency corresponding to the
maximum valid values in the $P(freq_i)$ series is the result of the Lomb
periodogram test.

The Lomb periodogram analysis can be briefly summarized as
follows. First, an LPPL model corresponding to each subinterval is
obtained. 

The Lomb periodogram analysis then computes the frequency value based on the periodic oscillations of the LPPL model.
If the frequency value is close to the frequency $(\omega/2\pi)$ optimized
by the MPGA1, the Lomb periodogram analysis concludes that the
prediction by the LPPL model is effective. Otherwise, the predicted
turning points are invalid and are thus deleted. 

Eventually only the
turning points predicated by the LPPL models that pass the Lomb
periodogram test are recorded.

In [38]:
import numpy as np
from scipy.signal import lombscargle
from numba import njit

# Step 1: Load and Select Data
def load_data(file_path):
    """
    Load the historical price data from a CSV file or other source.
    """
    data = np.loadtxt(file_path, delimiter=',')  # Replace with appropriate loading logic
    return data

def select_interval(data, start_year, end_year):
    """
    Select a specific time interval for analysis.
    """
    return data[(data[:, 0] >= start_year) & (data[:, 0] <= end_year)]

# Step 2: Divide Interval into Subintervals
def divide_into_subintervals(data, num_subintervals):
    """
    Divide the selected interval into overlapping subintervals.
    """
    subintervals = []
    step_size = len(data) // num_subintervals
    for i in range(num_subintervals):
        start = i * step_size
        end = start + step_size
        subintervals.append(data[start:end])
    return subintervals

# Step 3: Apply MPGA to Optimize LPPL Parameters
def optimize_mpga(subinterval):
    """
    Use a Multi-Population Genetic Algorithm (MPGA) to optimize LPPL parameters.
    """
    # Placeholder: Implement MPGA logic (see MPGA code provided earlier)
    # Returns the optimized parameters: [t_c, alpha, omega, phi]
    pass

# Step 4: Validate with Lomb Periodogram
def validate_turning_points(predicted_tcs, time_series):
    """
    Use the Lomb-Scargle periodogram to validate predicted turning points.
    """
    # Convert turning points to frequencies for Lomb-Scargle
    frequencies = 1 / (predicted_tcs - np.min(predicted_tcs))
    power = lombscargle(time_series, frequencies, normalize=True)
    return power

# Step 5: Aggregate and Analyze Results
def aggregate_and_analyze(subintervals, predictions):
    """
    Aggregate predictions across all subintervals and analyze statistical significance.
    """
    # Identify clusters of predictions (group by proximity)
    validated_turning_points = []
    for t_c in predictions:
        # Apply clustering logic or statistical tests
        # Placeholder for validation
        validated_turning_points.append(t_c)
    return validated_turning_points

# Main Framework
def lppl_framework(file_path, start_year, end_year, num_subintervals):
    """
    Main function to execute the LPPL framework.
    """
    # Step 1: Load and Select Data
    data = load_data(file_path)
    selected_data = select_interval(data, start_year, end_year)

    # Step 2: Divide Data into Subintervals
    subintervals = divide_into_subintervals(selected_data, num_subintervals)

    # Step 3: Apply MPGA to Each Subinterval
    predictions = []
    for subinterval in subintervals:
        params = optimize_mpga(subinterval)
        predictions.append(params[0])  # Append predicted t_c

    # Step 4: Validate with Lomb Periodogram
    validated_points = validate_turning_points(predictions, selected_data[:, 0])

    # Step 5: Aggregate and Analyze Results
    turning_points = aggregate_and_analyze(subintervals, validated_points)
    return turning_points

# Example Usage
if __name__ == "__main__":
    file_path = "price_data.csv"  # Replace with your dataset
    start_year = 2004
    end_year = 2016
    num_subintervals = 120

    turning_points = lppl_framework(file_path, start_year, end_year, num_subintervals)
    print("Predicted Turning Points:", turning_points)

Unnamed: 0,Day,Cushing OK WTI Spot Price FOB Dollars per Barrel
0,12/9/2024,68.65
1,12/6/2024,68.58
2,12/5/2024,68.58
3,12/4/2024,68.81
4,12/3/2024,70.15
...,...,...
9801,01/8/1986,25.87
9802,01/7/1986,25.85
9803,01/6/1986,26.53
9804,01/3/1986,26.00


In [42]:
import pandas as pd
import numpy as np

data = pd.read_csv('WTI_Spot_Price_daily.csv', skiprows=4)
data.columns = ['Date', 'Price']
data['Date'] = pd.to_datetime(data['Date'])

data = data["Price"]


class LPPLModel:

    def __init__(self, data, optimizers):
        self.data = data
        self.optimizers = optimizers

        

In [None]:
from datetime import datetime, timedelta

def calculate_time_fraction(start_time, end_time):
    # Calculer la différence de temps
    time_difference = end_time - start_time
    
    # Convertir 3 semaines en jours
    three_weeks = timedelta(weeks=3)
    
    # Calculer la fraction de temps
    result = (time_difference.days() * 0.75) / three_weeks.total_seconds()
    
    return result

# Exemple d'utilisation
start_time = datetime(2023, 1, 1)
end_time = datetime(2024, 1, 22)
print(calculate_time_fraction(start_time, end_time))

TypeError: 'int' object is not callable

In [49]:
365 * 0.75 / 21

13.035714285714286

In [48]:
three_weeks = timedelta(weeks=3)

three_weeks.total_seconds()

1814400.0

### **Algorithm 1.** Pseudo-code of the MPGA to optimize nonlinear parameters.

1: Read historical data;

2: Set the start time and end time of the sample denoted as $time_{start}$ and $time_{end}$ respectively;

3: Predetermine the value ranges of the four nonlinear parameters, $t_c$, $\omega$, $\phi$ and $\alpha$ ($t_c$ is the day after sample to the future 10 years; $\omega$ is between 0 and 40; $\phi$ is between $0$ and $2\pi$; $\alpha$ is between $0.1$ and $0.9$);

4: Predetermine the number of all populations, and let it equal $10$;

5: Predetermine population size, and let it equal $100$;

6: Predetermine the up bound of the total loop denoted as $MaxGen$, and let it equal 500;

7: Predetermine the up bound that the minimum fitness value of all population does not change, denoted as $StopGen$, and let it equal $50$;

8: Predetermine the selection probability of each population, and let it equal 0.9;

9: Selecting the subintervals from the sample is as follows:

10: Predetermine moving step of start time of subintervals, denoted as $delta$, and let it equal the larger of $(time_{end}-time_{start}) \times 0.75 / (three weeks)$ and three weeks;

11: **for** $subinterval_{end} = time_{end}$ : - one week : $time_{end}-$ six weeks **do**

12: - **for** $subinterval_{start} = time_{start} : delta : time_{end} - (time_{end} - time_{start})/4$ **do**

13: - - Get the subinterval $[subinterval_{start} \ \ subinterval_{end}]$ from the sample $[time_{start} \ \ time_{end}]$, where $subinterval_{start}$ is the start time of this subinterval; $subinterval_{end}$ is the end time of this subinterval;

14: - - Randomly generate the crossover probability of each population between 0.001 and 0.05;

15: - - Randomly generate the mutation probability of each population between 0.001 and 0.05;

16: - - **for** each population **do**

17: - - - Initialize one population, which includes 100 chromosomes;

18: - - - Calculate the fitness value of each chromosome, according to Equation 6;

19: - - - Find the minimum fitness value of current population;

20: - - - Record this minimum fitness value, denoted as $bestObjV$, and its corresponding chromosome;

21: - - **end for**

22: - - Find and record the minimum fitness value of all populations;

23: - - Add two counters, one is used to record the current number of loop, denoted as $gen$, and the other is used to record the number that the minimum fitness value of all populations does not change, denoted as $gen0$. Let $gen$ equal 1, and let $gen0$ equal 0;

24: - - **while** $gen0 < StopGen$ and $gen <= MaxGen$ do

25: - - - **for** each population **do**

26: - - - - Perform the selection, crossover, and mutation and reinserting operation;

27: - - - **end for**

28: - - - Perform immigration operation;

29: - - - Find the minimum fitness value of each population and their corresponding chromosomes in the current loop;

30: - - - Find the minimum fitness value of all populations in the current loop, denoted as $newbestObjV$;

31: - - - **if** $newbestObjV < bestObjV$ **then**

32: - - - - $bestObjV = newbestObjV$;

33: - - - - $gen0 = 0$;

34: - - - **else**

35: - - - - $gen0 = gen0 + 1$

36: - - - **end if**

37: - - - $gen = gen + 1$;

38: - - **end while**

39: - - Save $bestObjV$ and its corresponding chromosome under the current subinterval;

40: - **end for**

41: **end for**

In [7]:
import numpy as np
from numba import njit

# ---- Paramètres prédéfinis ----
time_start = 2000.0    # année de début (exemple)
time_end = 2010.0      # année de fin (exemple)
num_populations = 10
population_size = 100
MaxGen = 500
StopGen = 50
selection_probability = 0.9

# Paramètres non linéaires : [t_c, omega, phi, alpha]
param_bounds = {
    "t_c": (time_end, time_end + 365*10),  # 10 ans après la fin du sample en jours
    "omega": (0, 40),
    "phi": (0, 2*np.pi),
    "alpha": (0.1, 0.9)
}

@njit
def predict_price(t, A, B, C, t_c, alpha, omega, phi):
    """
    Predict price using the LPPL model.
    
    Parameters:
    - t : ndarray : Time points
    - A, B, C : float : Linear parameters
    - t_c : float : Critical time
    - alpha : float : Power-law exponent
    - omega : float : Angular frequency
    - phi : float : Phase

    Returns:
    - predicted : ndarray : Predicted prices
    """
    # Calculate f and g
    f = (t_c - t) ** alpha
    g = f * np.cos(omega * np.log(t_c - t) + phi)
    
    # Predicted price using the LPPL model
    predicted = A + B * f + C * g
    
    return predicted

@njit
def calculate_rss(y, t, A, B, C, t_c, alpha, omega, phi):
    """
    Calculate the Residual Sum of Squares (RSS) for given parameters.
    
    Parameters:
    - y : ndarray : Observed data (prices)
    - t : ndarray : Time points
    - A, B, C : float : Linear parameters
    - t_c : float : Critical time
    - alpha : float : Power-law exponent
    - omega : float : Angular frequency
    - phi : float : Phase
    
    Returns:
    - rss : float : Residual Sum of Squares
    """
    # Predicted price using the LPPL model
    predicted = predict_price(t, A, B, C, t_c, alpha, omega, phi)
    
    # Residual Sum of Squares
    rss = np.sum((y - predicted) ** 2)
    
    return rss

@njit
def calculate_rss_population(y, t, linear_params, nonlinear_params):
    """
    Calculate the RSS for all chromosomes in a population.
    
    Parameters:
    - y : ndarray : Observed data (prices)
    - t : ndarray : Time points
    - linear_params : ndarray : Array of linear parameters [A, B, C]
    - nonlinear_params : ndarray : Array of nonlinear parameters [[t_c, alpha, omega, phi], ...]
    
    Returns:
    - rss_values : ndarray : RSS values for each chromosome
    """
    num_chromosomes = nonlinear_params.shape[0]
    rss_values = np.zeros(num_chromosomes)
    
    for i in range(num_chromosomes):
        t_c, alpha, omega, phi = nonlinear_params[i]
        A, B, C = linear_params[i]  # Assuming one set of linear params per chromosome
        rss_values[i] = calculate_rss(y, t, A, B, C, t_c, alpha, omega, phi)
    
    return rss_values

@njit
def calculate_f_g(t_c, t, alpha, omega, phi):
    """
    Calculate the vectors f_j and g_j for a given t_c, alpha, omega, and phi.
    """
    f = (t_c - t) ** alpha
    g = f * np.cos(omega * np.log(t_c - t) + phi)
    return f, g

@njit
def calculate_linear_parameters(t, y, t_c, alpha, omega, phi):
    """
    Calculate the linear parameters A, B, and C using the LPPL model.
    """
    # Calculate f and g
    f, g = calculate_f_g(t_c, t, alpha, omega, phi)
    
    # Construct the V matrix
    V = np.column_stack((np.ones_like(f), f, g))
        
    # Compute the normal equations
    # (V_T @ V)^(-1) @ (V_T @ y)
    params = np.linalg.inv(V.T @ V) @ (V.T @ y)  # A, B, C
    
    return params  # [A, B, C]

def load_data(file_path):
    # Charger les données : [temps, prix]
    data = np.loadtxt(file_path, delimiter=',')
    return data

def select_sample(data, time_start, time_end):
    # Sélectionne l’échantillon principal
    mask = (data[:,0] >= time_start) & (data[:,0] <= time_end)
    return data[mask]

def equation_6_fitness(chromosome, sub_data):
    """
    Calcule le RSS (fitness) pour un chromosome.
    chromosome = [t_c, omega, phi, alpha]
    LPPL: y(t) = A + B(t_c - t)^alpha + C(t_c - t)^alpha cos[omega ln(t_c - t) + phi]
    A, B, C sont déterminés par Eq (5) après avoir fixé t_c, omega, phi, alpha.
    Ici, on suppose que A,B,C sont optimisés en interne ou fournis via une fonction séparée.
    """
    # Implémentation simplifiée. Il faudra :
    # 1. Calculer f_j, g_j
    # 2. Résoudre Eq(5) pour obtenir A,B,C
    # 3. Calculer le RSS
    # Code placeholder :
    # TODO: Implémenter réellement cette fonction en fonction du LPPL complet.

    

    t_c, omega, phi, alpha = chromosome

    A, B, C = calculate_linear_parameters(t, y, t_c, alpha, omega, phi)



    return np.random.rand()  # Placeholder, retourner une valeur aléatoire temporaire

def initialize_population(param_bounds, population_size):
    low = [v[0] for v in param_bounds.values()]
    high = [v[1] for v in param_bounds.values()]
    return np.random.uniform(low, high, size=(population_size, len(param_bounds)))

def selection(population, fitness):
    # Sélection par tournoi simple
    selected = []
    for _ in range(len(population)):
        i, j = np.random.choice(len(population), 2, replace=False)
        selected.append(population[i] if fitness[i] < fitness[j] else population[j])
    return np.array(selected)

def crossover(parents, prob):
    offspring = []
    for i in range(0, len(parents), 2):
        p1, p2 = parents[i], parents[i+1]
        if np.random.rand() < prob:
            cp = np.random.randint(1, len(p1))
            child1 = np.concatenate((p1[:cp], p2[cp:]))
            child2 = np.concatenate((p2[:cp], p1[cp:]))
            offspring += [child1, child2]
        else:
            offspring += [p1, p2]
    return np.array(offspring)

def mutate(offspring, prob, param_bounds):
    keys = list(param_bounds.keys())
    for i in range(len(offspring)):
        if np.random.rand() < prob:
            mp = np.random.randint(len(param_bounds))
            low, high = param_bounds[keys[mp]]
            offspring[i, mp] = np.random.uniform(low, high)
    return offspring

def immigration_operation(populations, fitness_values):
    # Meilleur de la pop m remplace le pire de la pop m+1
    for m in range(len(populations) - 1):
        f = fitness_values[m]
        best_idx = np.argmin(f)
        best_chrom = populations[m][best_idx]

        # Calculer fitness pour population m+1
        f_next = fitness_values[m+1]
        worst_idx = np.argmax(f_next)
        populations[m+1][worst_idx] = best_chrom
    return populations

def generate_subintervals(data, time_start, time_end):
    """
    Génère les sous-intervalles suivant la logique du pseudo-code.
    - On calcule delta comme le max entre ((time_end - time_start)*0.75/21 jours) et 21 jours.
    - subinterval_end varie de time_end à time_end - 6 semaines (42 jours) par pas de 7 jours.
    - subinterval_start varie de time_start à time_end-(time_end-time_start)/4 par pas de delta.
    """
    three_weeks = 21.0  # jours
    six_weeks = 42.0
    one_week = 7.0
    total_days = (time_end - time_start)
    delta = max((total_days * 0.75) / three_weeks, three_weeks)

    subintervals = []
    # Pour simplifier, on considère data[:,0] en jours continus
    for sub_end in np.arange(time_end, time_end - six_weeks, -one_week):
        for sub_start in np.arange(time_start, time_end - total_days/4, delta):
            mask = (data[:,0] >= sub_start) & (data[:,0] <= sub_end)
            sub_data = data[mask]
            if len(sub_data) > 0:
                subintervals.append((sub_start, sub_end, sub_data))
    return subintervals

def mpga_optimization(file_path):
    # Etape 1 & 2
    data = load_data(file_path)
    sample = select_sample(data, time_start, time_end)

    # Génération des sous-intervalles
    subintervals = generate_subintervals(sample, time_start, time_end)

    # Paramètres globaux
    best_solutions = []

    # Boucle sur les sous-intervalles
    for (sub_start, sub_end, sub_data) in subintervals:
        # Définir le crossover et mutation probability par population
        crossover_prob = np.random.uniform(0.001, 0.05, size=num_populations)
        mutation_prob = np.random.uniform(0.001, 0.05, size=num_populations)

        # Initialiser les populations
        populations = [initialize_population(param_bounds, population_size) for _ in range(num_populations)]

        # Calculer fitness initial
        fitness_values = []
        bestObjV = np.inf
        bestChrom = None

        for m in range(num_populations):
            fit = np.array([equation_6_fitness(ch, sub_data) for ch in populations[m]])
            fitness_values.append(fit)
            # Minimum fitness pour cette pop
            local_min = np.min(fit)
            if local_min < bestObjV:
                bestObjV = local_min
                bestChrom = populations[m][np.argmin(fit)]

        # gen, gen0
        gen = 1
        gen0 = 0

        # Boucle du MPGA
        while gen0 < StopGen and gen <= MaxGen:
            # Opérations génétiques
            new_populations = []
            new_fitness_values = []

            for m in range(num_populations):
                # Sélection
                fit = fitness_values[m]
                selected = selection(populations[m], fit)
                # Crossover
                offspring = crossover(selected, crossover_prob[m])
                # Mutation
                mutated = mutate(offspring, mutation_prob[m], param_bounds)
                new_populations.append(mutated)

            # Immigration
            populations = immigration_operation(new_populations, fitness_values)

            # Recalculer fitness
            fitness_values = []
            for m in range(num_populations):
                fit = np.array([equation_6_fitness(ch, sub_data) for ch in populations[m]])
                fitness_values.append(fit)

            # Trouver le meilleur global du loop courant
            current_best = np.inf
            for m in range(num_populations):
                local_min = np.min(fitness_values[m])
                if local_min < current_best:
                    current_best = local_min

            if current_best < bestObjV:
                bestObjV = current_best
                gen0 = 0
            else:
                gen0 += 1

            gen += 1

        # Sauvegarder le résultat pour ce sous-intervalle
        best_solutions.append((sub_start, sub_end, bestObjV, bestChrom))

    return best_solutions

# Exemple d'utilisation
if __name__ == "__main__":
    file_path = "historical_data.csv"  # Adapté à votre contexte
    results = mpga_optimization(file_path)
    # "results" contiendra une liste de tuples: (sub_start, sub_end, bestObjV, bestChrom)
    print(results)

FileNotFoundError: historical_data.csv not found.