<a href="https://colab.research.google.com/github/adomfosugit/Oil-and-Gas-Decline-Curve-Analysis-and-WOR/blob/main/WOR_Analysis_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.optimize import curve_fit


class DeclineCurveAnalysis:
    def __init__(self, model_type="exponential"):
        """
        Initialize Decline Curve Analysis.
        Supported models: 'exponential', 'hyperbolic', 'harmonic'
        """
        self.model_type = model_type
        self.popt = None
        self.pcov = None
        self.r_squared = None
        self.qi, self.di, self.b = None, None, None

    # --- Arps Decline Models ---
    @staticmethod
    def exponential(t, qi, di):
        return qi * np.exp(-di * t)

    @staticmethod
    def hyperbolic(t, qi, di, b):
        return qi / ((1 + b * di * t) ** (1 / b))

    @staticmethod
    def harmonic(t, qi, di):
        return qi / (1 + di * t)

    def get_model(self):
        if self.model_type == "exponential":
            return self.exponential
        elif self.model_type == "hyperbolic":
            return self.hyperbolic
        elif self.model_type == "harmonic":
            return self.harmonic
        else:
            raise ValueError("Unsupported model type")

    # --- Fit Model ---
    def fit(self, time_data: np.ndarray, rate_data: np.ndarray):
        """
        Fit decline curve model to production data.
        Supports numeric or datetime inputs.
        """
        # Handle datetime inputs
        if np.issubdtype(type(time_data[0]), np.datetime64) or isinstance(time_data[0], pd.Timestamp):
            start_date = pd.to_datetime(time_data[0])
            time_data = (pd.to_datetime(time_data) - start_date).days.to_numpy()

        model = self.get_model()

        if self.model_type == "exponential":
            p0 = [rate_data[0], 0.01]  # qi, di
            bounds = (0, [np.inf, 1])
        elif self.model_type == "hyperbolic":
            p0 = [rate_data[0], 0.01, 0.5]  # qi, di, b
            bounds = (0, [np.inf, 1, 2])
        elif self.model_type == "harmonic":
            p0 = [rate_data[0], 0.01]  # qi, di
            bounds = (0, [np.inf, 1])

        self.popt, self.pcov = curve_fit(model, time_data, rate_data, p0=p0, bounds=bounds, maxfev=10000)

        # Store fitted parameters
        if self.model_type == "exponential":
            self.qi, self.di = self.popt
            self.b = 0
        elif self.model_type == "harmonic":
            self.qi, self.di = self.popt
            self.b = 1
        elif self.model_type == "hyperbolic":
            self.qi, self.di, self.b = self.popt

        # Compute R²
        residuals = rate_data - self.forecast(time_data)
        ss_res = np.sum(residuals ** 2)
        ss_tot = np.sum((rate_data - np.mean(rate_data)) ** 2)
        self.r_squared = 1 - (ss_res / ss_tot)

        return self.popt

    # --- Auto-fit Best Model ---
    def fit_best_model(self, time_data, rate_data):
        """
        Fit exponential, harmonic, and hyperbolic models,
        then select the best based on R².
        """
        results = {}
        models = ["exponential", "harmonic", "hyperbolic"]

        for model in models:
            temp = DeclineCurveAnalysis(model_type=model)
            try:
                temp.fit(time_data, rate_data)
                results[model] = {
                    "r2": temp.r_squared,
                    "popt": temp.popt,
                    "pcov": temp.pcov,
                    "qi": temp.qi,
                    "di": temp.di,
                    "b": temp.b
                }
            except Exception:
                continue

        if not results:
            raise ValueError("No model could be fitted to the data.")

        best_model = max(results, key=lambda k: results[k]["r2"])
        best = results[best_model]

        self.model_type = best_model
        self.popt = best["popt"]
        self.pcov = best["pcov"]
        self.qi = best["qi"]
        self.di = best["di"]
        self.b = best["b"]
        self.r_squared = best["r2"]

        return best_model, self.r_squared, self.popt

    # --- Forecast ---
    def forecast(self, time_forecast: np.ndarray):
        if self.popt is None:
            raise ValueError("Model must be fitted before forecasting")
        return self.get_model()(time_forecast, *self.popt)

    # --- EUR Calculation ---
    def calculate_eur(self, economic_limit: float = 50, max_time: int = 600):
        time_forecast = np.linspace(0, max_time, 1000)
        rates = self.forecast(time_forecast)
        mask = rates >= economic_limit
        if not np.any(mask):
            return 0
        return np.trapz(rates[mask], time_forecast[mask])

    # --- Detailed Plot ---
    def create_detailed_plot(self, time_data: np.ndarray, rate_data: np.ndarray,
                             economic_limit: float = 50) -> go.Figure:
        """
        Create a detailed 2x2 Plotly visualization.
        Supports both numeric time and datetime inputs.
        """
        if self.popt is None:
            raise ValueError("Model must be fitted before creating detailed plot")

        # Handle datetime
        if np.issubdtype(type(time_data[0]), np.datetime64) or isinstance(time_data[0], pd.Timestamp):
            start_date = pd.to_datetime(time_data[0])
            time_days = (pd.to_datetime(time_data) - start_date).days.to_numpy()
            forecast_days = np.linspace(0, max(time_days) * 2, 200)
            forecast_dates = start_date + pd.to_timedelta(forecast_days, unit='D')
            x_hist, x_forecast = time_data, forecast_dates
            use_dates = True
        else:
            time_days = time_data
            forecast_days = np.linspace(0, max(time_data) * 2, 200)
            x_hist, x_forecast = time_days, forecast_days
            use_dates = False

        forecast_rates = self.forecast(forecast_days)
        eur = self.calculate_eur(economic_limit)

        # Economic limit
        t_el = None
        if any(forecast_rates <= economic_limit):
            t_el_idx = np.where(forecast_rates <= economic_limit)[0][0]
            t_el = x_forecast[t_el_idx]

        # Subplots
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=("Production Decline", "Cumulative Production",
                            "Residual Analysis", "Parameter Sensitivity"),
            specs=[[{}, {}], [{}, {}]]
        )

        # Production decline
        fig.add_trace(go.Scatter(x=x_hist, y=rate_data, mode='markers',
                                 name='Historical Data', marker=dict(color='blue', size=6)), row=1, col=1)
        fig.add_trace(go.Scatter(x=x_forecast, y=forecast_rates, mode='lines',
                                 name='Forecast', line=dict(color='red', width=2)), row=1, col=1)

        if t_el is not None:
            fig.add_trace(go.Scatter(
                x=[t_el, t_el], y=[0, economic_limit * 1.1],
                mode='lines', name='Economic Limit',
                line=dict(color='green', width=2, dash='dash')
            ), row=1, col=1)

        # Cumulative production
        cumulative_hist = np.cumsum(rate_data) * (time_days[1] - time_days[0]) if len(time_days) > 1 else [0]
        cumulative_forecast = np.cumsum(forecast_rates) * (forecast_days[1] - forecast_days[0])

        fig.add_trace(go.Scatter(x=x_hist, y=cumulative_hist, mode='markers',
                                 name='Historical Cumulative', marker=dict(color='blue', size=6)), row=1, col=2)
        fig.add_trace(go.Scatter(x=x_forecast, y=cumulative_forecast, mode='lines',
                                 name='Forecast Cumulative', line=dict(color='purple', width=2)), row=1, col=2)

        # EUR annotation
        fig.add_annotation(xref="x2", yref="y2", x=x_forecast[-1], y=cumulative_forecast[-1] * 0.8,
                           text=f"EUR: {eur:.0f} bbl", showarrow=False,
                           bgcolor="rgba(255,255,255,0.8)", bordercolor="rgba(0,0,0,0.2)", borderwidth=1)

        # Residuals
        predicted_rates = self.forecast(time_days)
        residuals = rate_data - predicted_rates
        fig.add_trace(go.Scatter(x=x_hist, y=residuals, mode='markers', name='Residuals',
                                 marker=dict(color='orange', size=6)), row=2, col=1)
        fig.add_shape(type="line", x0=min(x_hist), x1=max(x_hist),
                      y0=0, y1=0, line=dict(color="black", dash="dash"), xref="x3", yref="y3")

        # Parameter sensitivity
        variations = {
            "qi +10%": [self.popt.copy(), "red"],
            "di +10%": [self.popt.copy(), "green"],
            "b +10%": [self.popt.copy(), "blue"]
        }

        for label, (params, color) in variations.items():
            idx = {"qi": 0, "di": 1, "b": 2 if self.model_type == "hyperbolic" else None}
            if "qi" in label:
                params[idx["qi"]] *= 1.1
            elif "di" in label:
                params[idx["di"]] *= 1.1
            elif "b" in label and idx["b"] is not None:
                params[idx["b"]] *= 1.1

            rates_var = self.get_model()(forecast_days, *params)
            fig.add_trace(go.Scatter(x=x_forecast, y=rates_var, mode="lines",
                                     name=label, line=dict(color=color, dash="dot")), row=2, col=2)

        # Layout
        fig.update_layout(title_text="Detailed Decline Curve Analysis",
                          height=900, showlegend=True, template="plotly_white")

        # Axes labels
        x_label = "Date" if use_dates else "Time (days)"
        fig.update_xaxes(title_text=x_label, row=1, col=1)
        fig.update_yaxes(title_text="Rate (bbl/day)", row=1, col=1)
        fig.update_xaxes(title_text=x_label, row=1, col=2)
        fig.update_yaxes(title_text="Cumulative (bbl)", row=1, col=2)
        fig.update_xaxes(title_text=x_label, row=2, col=1)
        fig.update_yaxes(title_text="Residuals", row=2, col=1)
        fig.update_xaxes(title_text=x_label, row=2, col=2)
        fig.update_yaxes(title_text="Rate Sensitivity", row=2, col=2)

        return fig


In [2]:
import numpy as np
import pandas as pd


np.random.seed(42)
q_i = 1500
D = 0.0015
economic_limit = 50


dates = pd.date_range(start="2020-01-01", end="2024-12-31", freq="D")
t_days = np.arange(len(dates))  # numeric time in days

true_production = q_i * np.exp(-D * t_days)

noise = np.random.normal(loc=0, scale=0.05, size=len(dates))
production = true_production * (1 + noise)
water  = noise  + q_i * np.exp(D * t_days)
production = np.clip(production, economic_limit, None)

data = pd.DataFrame({
    "Date": dates,
    "Production": production.round(2),
    "Water ": water.round(2)
})

data



Unnamed: 0,Date,Production,Water
0,2020-01-01,1537.25,1500.02
1,2020-01-02,1487.40,1502.24
2,2020-01-03,1543.94,1504.54
3,2020-01-04,1606.98,1506.84
4,2020-01-05,1473.57,1509.02
...,...,...,...
1822,2024-12-27,96.44,23068.42
1823,2024-12-28,94.22,23103.03
1824,2024-12-29,98.06,23137.75
1825,2024-12-30,99.24,23172.50


In [3]:
data = data.sort_values('Date')
start_date = data['Date'].iloc[0]
data['Time_days'] = (data['Date'] - start_date).dt.days


# Extract arrays for fitting
time_data = data['Time_days'].to_numpy()
rate_data = data['Production'].to_numpy()
time_data

array([   0,    1,    2, ..., 1824, 1825, 1826])

In [4]:
import plotly.graph_objects as go

# Ensure proper datetime
data['Date'] = pd.to_datetime(data['Date'], dayfirst=True)

fig = go.Figure()

# --- Primary Y-axis (Oil) ---
fig.add_trace(go.Scatter(
    x=data['Date'],
    y=data['Production'],
    mode='lines+markers',
    name='Oil Rate (BOPD)',
    line=dict(color='green'),
    yaxis='y1'
))

# --- Secondary Y-axis (Water) ---
fig.add_trace(go.Scatter(
    x=data['Date'],
    y=data['Water '],
    mode='lines+markers',
    name='Water Rate (BWPD)',
    line=dict(color='blue', dash='dot'),
    yaxis='y2'
))

# --- Layout ---
fig.update_layout(
    title='Production Trends Over Time',
    xaxis=dict(title='Date'),
    yaxis=dict(
        title='Oil Rate (BOPD)',
        titlefont=dict(color='green'),
        tickfont=dict(color='green')
    ),
    yaxis2=dict(
        title='Water Rate (BWPD)',
        titlefont=dict(color='blue'),
        tickfont=dict(color='blue'),
        anchor='x',
        overlaying='y',
        side='right'
    ),
    template='plotly_white',
    hovermode='x unified',
    legend=dict(x=0.02, y=0.98, bordercolor='gray', borderwidth=0.5)
)

fig.show()



In [5]:
# numeric days for fitting
import plotly.io as pio
time_data_days = (data['Date'] - data['Date'].iloc[0]).dt.days.to_numpy()
date_array = start_date + pd.to_timedelta(time_data_days, unit='D')
dca = DeclineCurveAnalysis(model_type="exponential")
fitted_params = dca.fit_best_model(time_data_days, rate_data)
fig = dca.create_detailed_plot(date_array, rate_data,economic_limit=50,)
pio.show(fig)


`trapz` is deprecated. Use `trapezoid` instead, or one of the numerical integration functions in `scipy.integrate`.


Discarding nonzero nanoseconds in conversion.



In [6]:
# Display fittefd params
print(fitted_params)
dca.r_squared

('hyperbolic', np.float64(0.9934570563687062), array([1.49702127e+03, 1.49451936e-03, 8.22743789e-14]))


np.float64(0.9934570563687062)

In [7]:
#WOR analysis with Python

In [8]:

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.linear_model import LinearRegression

class WORAnalysis:
    def __init__(self, data: pd.DataFrame):
        """
        WOR analysis class.
        Expects columns: 'Date', 'Production', 'Water ' (water produced).
        Optionally: 'Cumulative Oil'
        """
        self.data = data.copy()
        self.model = None
        self.fit_indices = None
        self.slope = None
        self.intercept = None

        # Ensure correct dtypes
        self.data['Date'] = pd.to_datetime(self.data['Date'], errors='coerce')
        self.data = self.data.sort_values('Date').reset_index(drop=True)

        # Compute cumulative oil if missing
        if 'Cumulative Oil' not in self.data.columns:
            self.data['Cumulative Oil'] = self.compute_cumulative_oil()
            print("Computed 'Cumulative Oil' from oil rate and time.")

        # Compute WOR and log(WOR)
        self.data['WOR'] = self.data['Water '] / self.data['Production']
        self.data['logWOR'] = np.log10(self.data['WOR'])

    def compute_cumulative_oil(self):
        """
        Compute cumulative oil from oil rate and date.
        Assumes oil rate is in barrels/day or barrels/month (time-based integration).
        """
        df = self.data.copy()

        # Compute time difference in days
        df['Days'] = df['Date'].diff().dt.days.fillna(0)

        # Assume oil rate is in barrels/day (if monthly data, Days ≈ 30 per step)
        df['Incremental Oil'] = df['Production'] * df['Days']

        # Compute cumulative oil (Np)
        cumulative_oil = df['Incremental Oil'].cumsum()

        return cumulative_oil

    def fit_trend(self, fit_range=None):
        """
        Fit log(WOR) vs Cumulative Oil within a specified range.

        fit_range:
            - tuple of (start_index, end_index) or
            - tuple of (start_date, end_date) as strings
        """
        df = self.data.copy()

        # Determine fit subset
        if fit_range is not None:
            start, end = fit_range

            # If given as dates
            if isinstance(start, str) and isinstance(end, str):
                mask = (df['Date'] >= pd.to_datetime(start)) & (df['Date'] <= pd.to_datetime(end))
            else:
                # Assume numeric indices
                mask = np.arange(len(df))
                mask = (mask >= start) & (mask <= end)

            df = df.loc[mask]
            self.fit_indices = df.index
        else:
            self.fit_indices = df.index

        # Drop any missing values
        df = df.dropna(subset=['Cumulative Oil', 'logWOR'])

        # Linear regression: log(WOR) = a + b * Np
        X = df[['Cumulative Oil']].to_numpy()
        y = df['logWOR'].to_numpy()

        model = LinearRegression().fit(X, y)
        self.model = model
        self.slope = model.coef_[0]
        self.intercept = model.intercept_

        return model

    def plot_results(self):
        """
        Plot WOR trend and fitted line.
        """
        df = self.data.copy()
        fig = go.Figure()

        # Scatter of all data
        fig.add_trace(go.Scatter(
            x=df['Cumulative Oil'], y=df['logWOR'],
            mode='markers', name='Data',
            marker=dict(color='blue', size=6)
        ))

        # Add fitted line
        if self.model is not None:
            fit_df = df.loc[self.fit_indices]
            x_fit = fit_df['Cumulative Oil']
            y_pred = self.model.predict(x_fit.to_numpy().reshape(-1, 1))

            fig.add_trace(go.Scatter(
                x=x_fit, y=y_pred,
                mode='lines', name='Fitted Trend',
                line=dict(color='red', width=2)
            ))

            # Add regression equation text
            eqn_text = f"log(WOR) = {self.slope:.4f} * Np + {self.intercept:.4f}"
            fig.add_annotation(
                x=0.05, y=0.95, xref="paper", yref="paper",
                text=eqn_text,
                showarrow=False,
                font=dict(size=14, color="black"),
                bgcolor="white"
            )

        fig.update_layout(
            title='WOR Diagnostic Plot (log(WOR) vs Cumulative Oil)',
            xaxis_title='Cumulative Oil Produced (bbl)',
            yaxis_title='log10(WOR)',
            template='plotly_white',
            height=600
        )

        return fig

    def __str__(self):
        if self.model is not None:
            return f"Regression Equation: log(WOR) = {self.slope:.4f} * Np + {self.intercept:.4f}"
        else:
            return "Model not yet fitted."



In [9]:
wor = WORAnalysis(data)
wor.data = wor.data.replace([np.inf, -np.inf], np.nan).dropna(subset=['logWOR', 'Cumulative Oil'])
wor.fit_trend()   #fit_range=('2023-01-01', '2024-01-01'))
fig = wor.plot_results()
fig.show()


Computed 'Cumulative Oil' from oil rate and time.
