# Table of Contents

- [Pre-Processing](#Pre)
    - [Imports](#Imports)
    - [Dataset Loading](#Loading)
    - [Data Examination](#Exam)
    - [Column Pruning](#Column)
    - [Temporal Filtering](#Temp)
    - [Texas Boundary Validation](#Texas)
    - [Calendar Features](#Calendar)
    - [Backfill Features](#Back)
    - [Rolling Statistics](#Rolling)
    - [Categorical Encoding](#Cat)
    - [Machine Learning Preparation](#Prep)
- [Machine Learning](#Learn)
    - [Retry for Speedup](#Retry)
- [Making Predictions](#Pred)
- [Forecasting and Final Calculations](#Conclusions)
    - [State Totals and Uncertainty](#State)
- [Results](#Results)

## Pre-Processing <a id="Pre"></a>

### Imports <a id="Imports"></a>

In [1]:
# Core data processing
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Geospatial processing
import geopandas as gpd
from shapely.geometry import Point

# Machine learning
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
from sklearn.preprocessing import LabelEncoder

# GPU acceleration (install with: pip install cudf-cu11 cuml-cu11)
GPU_AVAILABLE = True

# Visualization and progress
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import pickle
import joblib

# Set random seed for reproducibility
np.random.seed(42)

In [2]:
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

### Dataset Loading <a id="Loading"></a>

One version of load_parquet_datasets for most of the notebook that handles using pandas:

In [None]:
def load_parquet_datasets(file_paths):
    """
    Load multiple parquet files with error handling and initial inspection.
    
    Args:
        file_paths (dict): Dictionary with dataset names as keys and file paths as values
        
    Returns:
        dict: Dictionary containing loaded DataFrames
    """
    datasets = {}
    
    for name, path in file_paths.items():
        try:
            df = pd.read_parquet(path)
            print(f"  Successfully loaded {name} with pandas")
            print(f"  Shape: {df.shape}")
            datasets[name] = df
            
        except Exception as e:
            print(f"✗ Failed to load {name}: {str(e)}")
            
    return datasets

Another version of load_parquet_datasets which handles using dask for data that's difficult to hold in memory:

In [3]:
def load_parquet_datasets(file_paths, use_dask=False, batch_size=1_000_000):
    """
    Load multiple parquet files with memory-friendly options.
    """
    import pandas as pd
    import pyarrow.parquet as pq
    datasets = {}
    for name, path in file_paths.items():
        try:
            if use_dask:
                import dask.dataframe as dd
                df = dd.read_parquet(path)
                print(f"  Loaded {name} as Dask DataFrame")
            else:
                # Load just the first batch for inspection
                table = pq.ParquetFile(path)
                batch = next(table.iter_batches(batch_size=batch_size))
                df = batch.to_pandas()
                print(f"  Loaded first {len(df)} rows of {name} for inspection")
            print(f"  Shape: {df.shape}")
            datasets[name] = df
        except Exception as e:
            print(f"✗ Failed to load {name}: {str(e)}")
    return datasets

In [4]:
new_file_paths = {
    'joined_data': '../data/feature_data_13.parquet'
}

new_file_paths

{'joined_data': '../data/feature_data_13.parquet'}

In [5]:
joined_data = load_parquet_datasets(new_file_paths)['joined_data']

joined_data.info()

  Successfully loaded joined_data with pandas
  Shape: (16603790, 29)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16603790 entries, 0 to 16603789
Data columns (total 29 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   county                    object        
 6   sub_region                object        
 7   wellbore_type             object        
 8   Quality_Quant             float64       
 9   Production_Age            int32         
 10  Last_Report               datetime64[ms]
 11  Well_Production_Type      object        
 12  reservoir type            object        
 13  arps model                object        
 14  Production_Status         object        
 15  lat_surface               fl

### Data Examination <a id="Exam"></a>

In [7]:
def comprehensive_data_overview(datasets):
    """
    Provide comprehensive overview of all datasets including data types,
    missing values, and basic statistics.
    
    Args:
        datasets (dict): Dictionary of DataFrames to analyze
    """
    for name, df in datasets.items():
        print(f"\n{'='*60}")
        print(f"DATASET: {name.upper()}")
        print(f"{'='*60}")
        
        # Shape
        print(f"Shape: {df.shape}")
        
        # Column information
        print(f"\nColumn Overview:")
        print(f"{'Column':<25} {'Type':<15} {'Non-Null':<10} {'Unique':<10} {'Missing %':<10}")
        print("-" * 80)
        
        for col in df.columns:
            dtype = str(df[col].dtype)
            non_null = df[col].count()
            unique_count = df[col].nunique()
            missing_pct = (df[col].isnull().sum() / len(df)) * 100
            
            print(f"{col:<25} {dtype:<15} {non_null:<10} {unique_count:<10} {missing_pct:<10.1f}%")
        
        # Date/time columns detection
        potential_date_cols = [col for col in df.columns if 
                              any(keyword in col.lower() for keyword in 
                                  ['date', 'time', 'year', 'month', 'spud', 'completion'])]
        if potential_date_cols:
            print(f"\nPotential date/time columns: {potential_date_cols}")
            
        # Coordinate columns detection
        coord_cols = [col for col in df.columns if 
                     any(keyword in col.lower() for keyword in 
                         ['lat', 'lon', 'longitude', 'latitude', 'surface', 'bottom'])]
        if coord_cols:
            print(f"\nCoordinate columns: {coord_cols}")
        
        # Production-related columns (for well_production dataset)
        if 'production' in name.lower():
            prod_cols = [col for col in df.columns if 
                        any(keyword in col.lower() for keyword in 
                            ['oil', 'gas', 'water', 'bbl', 'mcf', 'prod', 'volume'])]
            if prod_cols:
                print(f"\nProduction columns: {prod_cols}")
        
        # Formation-related columns (for well_formations dataset)
        if 'formation' in name.lower():
            form_cols = [col for col in df.columns if 
                        any(keyword in col.lower() for keyword in 
                            ['formation', 'zone', 'reservoir', 'rock', 'geology', 'basin'])]
            if form_cols:
                print(f"\nFormation columns: {form_cols}")
        
        # Well properties columns
        if 'properties' in name.lower() or 'props' in name.lower():
            prop_cols = [col for col in df.columns if 
                        any(keyword in col.lower() for keyword in 
                            ['depth', 'length', 'horizontal', 'vertical', 'lateral', 'completion', 'wellbore', 'field', 'county', 'region'])]
            if prop_cols:
                print(f"\nWell property columns: {prop_cols}")

In [7]:
comprehensive_data_overview({'joined_data': joined_data})


DATASET: JOINED_DATA
Shape: (16603790, 29)

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
date_prod                 datetime64[ms]  16603790   472        0.0       %
api_no_10                 object          16603790   399913     0.0       %
gas_monthly               float64         16603790   130811     0.0       %
oil_monthly               float64         16603790   38712      0.0       %
start_date                datetime64[ns]  16603790   386        0.0       %
county                    object          16603790   225        0.0       %
sub_region                object          16603790   5          0.0       %
wellbore_type             object          16603790   3          0.0       %
Quality_Quant             float64         16603790   9          0.0       %
Production_Age            int32           16603790   472        0.0       %
Last_Report           

### Column Pruning <a id="Column"></a>

In [9]:
# Drop columns we won't use (high cardinality categoricals or unspecific locations)
columns_to_drop = ['county', 'sub_region', 'formation_standardized']
existing_columns_to_drop = [col for col in columns_to_drop if col in joined_data.columns]
joined_data = joined_data.drop(columns=existing_columns_to_drop)

print(f"After dropping columns: {joined_data.shape}")

After dropping columns: (16603790, 26)


### Temporal Filtering <a id="Temp"></a>

In [10]:
# Filter to spike period (January 2021 through November 2024)
start_date = pd.to_datetime('2021-01-01')
end_date = pd.to_datetime('2024-11-30')

joined_data = joined_data[(joined_data['date_prod'] >= start_date) & (joined_data['date_prod'] <= end_date)]
print(f"After temporal filtering: {joined_data.shape}")

# Display date range
print(f"Date range: {joined_data['date_prod'].min()} to {joined_data['date_prod'].max()}")

After temporal filtering: (16602611, 26)
Date range: 2021-01-01 00:00:00 to 2024-11-01 00:00:00


### Texas Boundary Validation <a id="Texas"></a>

In [13]:
def keep_texas_wells(df):
    """
    Remove wells whose surface lies outside Texas.
    """
    print("Validating Texas coordinates...")
    
    # Load Texas boundary
    texas_boundary_url = "https://raw.githubusercontent.com/johan/world.geo.json/master/countries/USA/TX.geo.json"
    texas_gdf = gpd.read_file(texas_boundary_url)
    texas_gdf = texas_gdf.to_crs(epsg=4326)
    texas_polygon = texas_gdf.unary_union
    
    def _inside(lat, lon):
        if pd.isna(lat) or pd.isna(lon):
            return False
        return Point(lon, lat).within(texas_polygon)
    
    # Check surface coordinates
    surface_mask = df.dropna(subset=['lat_surface', 'lon_surface']).apply(
        lambda r: _inside(r['lat_surface'], r['lon_surface']), axis=1
    )
    
    # Keep wells where the surface is in Texas
    valid_surface_apis = surface_mask[surface_mask].index
    
    print(f"Wells with valid surface coordinates: {len(valid_surface_apis):,}")
    
    return df.loc[valid_surface_apis]

In [19]:
def get_well_level_aggregation(df, api_col='api_no_10'):
    """
    Aggregate time series data to well-level (one row per well) for feature engineering.
    
    Parameters:
    -----------
    df : pandas.DataFrame
        Time series dataframe
    api_col : str
        Well API column name
        
    Returns:
    --------
    pandas.DataFrame : Well-level aggregated dataframe
    """
    well_level = df.groupby(api_col).agg({
        'gas_monthly': ['mean', 'median', 'max'],
        'oil_monthly': ['mean', 'median', 'max'],
        'horizontal_length': 'first',
        'measured_depth': 'first',
        'lat_surface': 'first',
        'lon_surface': 'first',
        'lat_bottomhole': 'first',
        'lon_bottomhole': 'first',
        'wellbore_type': 'first',
        'reservoir type': 'first',
        'arps model': 'first',
        'start_date': 'first',
        'basin': 'first',
        'formation_group': 'first'
    }).reset_index()

    well_level.columns = [
        'api_no_10',
        'gas_monthly_mean',
        'gas_monthly_median',
        'gas_monthly_max',
        'oil_monthly_mean',
        'oil_monthly_median',
        'oil_monthly_max',
        'horizontal_length',
        'measured_depth',
        'lat_surface',
        'lon_surface',
        'lat_bottomhole',
        'lon_bottomhole',
        'wellbore_type',
        'reservoir type',
        'arps model',
        'start_date',
        'basin',
        'formation_group'
                         ]
    
    return well_level

In [20]:
well_df = get_well_level_aggregation(joined_data, api_col='api_no_10')
well_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 399913 entries, 0 to 399912
Data columns (total 19 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   api_no_10           399913 non-null  object        
 1   gas_monthly_mean    399913 non-null  float64       
 2   gas_monthly_median  399913 non-null  float64       
 3   gas_monthly_max     399913 non-null  float64       
 4   oil_monthly_mean    399913 non-null  float64       
 5   oil_monthly_median  399913 non-null  float64       
 6   oil_monthly_max     399913 non-null  float64       
 7   horizontal_length   399913 non-null  float64       
 8   measured_depth      399913 non-null  float64       
 9   lat_surface         399913 non-null  float64       
 10  lon_surface         399913 non-null  float64       
 11  lat_bottomhole      399913 non-null  float64       
 12  lon_bottomhole      399913 non-null  float64       
 13  wellbore_type       399913 no

In [22]:
result_df = keep_texas_wells(well_df)
result_df.info()

Validating Texas coordinates...
Wells with valid surface coordinates: 399,039
<class 'pandas.core.frame.DataFrame'>
Index: 399039 entries, 0 to 399912
Data columns (total 19 columns):
 #   Column              Non-Null Count   Dtype         
---  ------              --------------   -----         
 0   api_no_10           399039 non-null  object        
 1   gas_monthly_mean    399039 non-null  float64       
 2   gas_monthly_median  399039 non-null  float64       
 3   gas_monthly_max     399039 non-null  float64       
 4   oil_monthly_mean    399039 non-null  float64       
 5   oil_monthly_median  399039 non-null  float64       
 6   oil_monthly_max     399039 non-null  float64       
 7   horizontal_length   399039 non-null  float64       
 8   measured_depth      399039 non-null  float64       
 9   lat_surface         399039 non-null  float64       
 10  lon_surface         399039 non-null  float64       
 11  lat_bottomhole      399039 non-null  float64       
 12  lon_bottomhol

In [23]:
api_list = result_df['api_no_10'].unique()
joined_data = joined_data[joined_data['api_no_10'].isin(api_list)]
print(f"After Texas validation: {joined_data.shape}")

After Texas validation: (16564703, 26)


In [24]:
joined_data.to_parquet('../data/preprocess_1.parquet')

### Calendar Features <a id="Calendar"></a>

In [4]:
new_file_paths = {
    'joined_data': '../data/preprocess_1.parquet'
}

new_file_paths

{'joined_data': '../data/preprocess_1.parquet'}

In [5]:
joined_data = load_parquet_datasets(new_file_paths)['joined_data']
joined_data.info()

  Successfully loaded joined_data with pandas
  Shape: (16564703, 26)
<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 26 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             object        
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Last_Report               datetime64[ms]
 9   Well_Production_Type      object        
 10  reservoir type            object        
 11  arps model                object        
 12  Production_Status         object        
 13  lat_surface               float64       
 14  lon_surface               float64       
 15  lat_bottomhole            float64

In [6]:
# Basic calendar features
joined_data['prod_year'] = joined_data['date_prod'].dt.year
joined_data['prod_month'] = joined_data['date_prod'].dt.month

# Seasonal features (sine/cosine pairs)
joined_data['month_sin'] = np.sin(2 * np.pi * joined_data['prod_month'] / 12)
joined_data['month_cos'] = np.cos(2 * np.pi * joined_data['prod_month'] / 12)

joined_data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 30 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             object        
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Last_Report               datetime64[ms]
 9   Well_Production_Type      object        
 10  reservoir type            object        
 11  arps model                object        
 12  Production_Status         object        
 13  lat_surface               float64       
 14  lon_surface               float64       
 15  lat_bottomhole            float64       
 16  lon_bottomhole            float64       
 17  basin      

### Backfill Features <a id="Back"></a>

In [7]:
def create_backfill_features_fast(df):
    """
    Efficiently create backfill features for each well using vectorized operations.
    """
    print("Creating backfill features (vectorized)...")
    df = df.sort_values(['api_no_10', 'date_prod']).copy()
    
    # Group by well
    grouped = df.groupby('api_no_10')
    
    # Shifted features
    df['last_year'] = grouped['date_prod'].shift(1).dt.year.fillna(-1).astype(int)
    df['last_month'] = grouped['date_prod'].shift(1).dt.month.fillna(-1).astype(int)
    df['last_gas'] = grouped['gas_monthly'].shift(1).fillna(-1)
    df['last_oil'] = grouped['oil_monthly'].shift(1).fillna(-1)
    
    # Months since last report
    prev_date = grouped['date_prod'].shift(1)
    df['months_since_last'] = ((df['date_prod'] - prev_date).dt.days / 30.44).round().fillna(-1).astype(int)
    
    return df

In [None]:
joined_data = create_backfill_features_fast(joined_data)
joined_data.info()

Creating backfill features (vectorized)...


This function freezes my computer, and a previous one wasn't fast enough to process all this time series data. Here is another one that might work by using batching:

In [7]:
def batch_iter(df, batch_size):
    """
    Yields batches of the dataframe, grouped by unique api_no_10 values.
    """
    unique_apis = df['api_no_10'].unique()
    for start in range(0, len(unique_apis), batch_size):
        batch_apis = unique_apis[start:start + batch_size]
        yield df[df['api_no_10'].isin(batch_apis)].copy()

def create_backfill_features_batched(df, batch_size=10000):
    """
    Create backfill features in batches to reduce memory usage.
    """
    print(f"Processing in batches of {batch_size} wells...")
    processed_batches = []
    batch_number = 1
    for batch_df in batch_iter(df, batch_size):
        print(f"Processing batch {batch_number} ({batch_df['api_no_10'].nunique()} wells, {len(batch_df)} rows)")
        batch_df = batch_df.sort_values(['api_no_10', 'date_prod'])
        grouped = batch_df.groupby('api_no_10')
        batch_df['last_year'] = grouped['date_prod'].shift(1).dt.year.fillna(-1).astype(int)
        batch_df['last_month'] = grouped['date_prod'].shift(1).dt.month.fillna(-1).astype(int)
        batch_df['last_gas'] = grouped['gas_monthly'].shift(1).fillna(-1)
        batch_df['last_oil'] = grouped['oil_monthly'].shift(1).fillna(-1)
        prev_date = grouped['date_prod'].shift(1)
        batch_df['months_since_last'] = ((batch_df['date_prod'] - prev_date).dt.days / 30.44).round().fillna(-1).astype(int)
        processed_batches.append(batch_df)
        batch_number += 1
    # Concatenate all processed batches
    result_df = pd.concat(processed_batches).sort_index()
    return result_df

In [8]:
joined_data = create_backfill_features_batched(joined_data)
joined_data.info()

Processing in batches of 10000 wells...
Processing batch 1 (10000 wells, 436982 rows)
Processing batch 2 (10000 wells, 436599 rows)
Processing batch 3 (10000 wells, 436660 rows)
Processing batch 4 (10000 wells, 436382 rows)
Processing batch 5 (10000 wells, 436013 rows)
Processing batch 6 (10000 wells, 436164 rows)
Processing batch 7 (10000 wells, 434300 rows)
Processing batch 8 (10000 wells, 435459 rows)
Processing batch 9 (10000 wells, 434428 rows)
Processing batch 10 (10000 wells, 434634 rows)
Processing batch 11 (10000 wells, 433542 rows)
Processing batch 12 (10000 wells, 433859 rows)
Processing batch 13 (10000 wells, 433264 rows)
Processing batch 14 (10000 wells, 433858 rows)
Processing batch 15 (10000 wells, 434111 rows)
Processing batch 16 (10000 wells, 433314 rows)
Processing batch 17 (10000 wells, 433006 rows)
Processing batch 18 (10000 wells, 431961 rows)
Processing batch 19 (10000 wells, 432668 rows)
Processing batch 20 (10000 wells, 431541 rows)
Processing batch 21 (10000 we

Now that we have superior backfilling columns, we can drop columns that give us unnecessary temporal information but most likely won't be as useful, such as "Last_Report"

In [9]:
joined_data.drop('Last_Report', axis=1, inplace=True)
joined_data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 34 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             object        
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Well_Production_Type      object        
 9   reservoir type            object        
 10  arps model                object        
 11  Production_Status         object        
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64       
 16  basin                     object        
 17  formation_g

In [10]:
joined_data.to_parquet('../data/preprocess_2.parquet')

### Rolling Statistics <a id="Rolling"></a>

In [11]:
def batch_iter(df, batch_size):
    """
    Yields batches of the dataframe, grouped by unique api_no_10 values.
    """
    unique_apis = df['api_no_10'].unique()
    for start in range(0, len(unique_apis), batch_size):
        batch_apis = unique_apis[start:start + batch_size]
        yield df[df['api_no_10'].isin(batch_apis)].copy()

def create_rolling_features_batched(df, batch_size=10000):
    """
    Efficiently create rolling statistics features in batches.
    """
    print(f"Processing rolling features in batches of {batch_size} wells...")
    processed_batches = []
    batch_number = 1
    for batch_df in batch_iter(df, batch_size):
        print(f"Processing batch {batch_number} ({batch_df['api_no_10'].nunique()} wells, {len(batch_df)} rows)")
        batch_df = batch_df.sort_values(['api_no_10', 'date_prod'])
        grouped = batch_df.groupby('api_no_10', group_keys=False)
        batch_df['gas_avg_3'] = grouped['gas_monthly'].rolling(window=3, min_periods=1).mean().reset_index(level=0, drop=True)
        batch_df['oil_avg_3'] = grouped['oil_monthly'].rolling(window=3, min_periods=1).mean().reset_index(level=0, drop=True)
        batch_df['gas_avg_6'] = grouped['gas_monthly'].rolling(window=6, min_periods=1).mean().reset_index(level=0, drop=True)
        batch_df['oil_avg_6'] = grouped['oil_monthly'].rolling(window=6, min_periods=1).mean().reset_index(level=0, drop=True)
        processed_batches.append(batch_df)
        batch_number += 1
    # Concatenate all processed batches and restore original order
    result_df = pd.concat(processed_batches).sort_index()
    return result_df

In [12]:
joined_data = create_rolling_features_batched(joined_data)
joined_data.info()

Processing rolling features in batches of 10000 wells...
Processing batch 1 (10000 wells, 436982 rows)
Processing batch 2 (10000 wells, 436599 rows)
Processing batch 3 (10000 wells, 436660 rows)
Processing batch 4 (10000 wells, 436382 rows)
Processing batch 5 (10000 wells, 436013 rows)
Processing batch 6 (10000 wells, 436164 rows)
Processing batch 7 (10000 wells, 434300 rows)
Processing batch 8 (10000 wells, 435459 rows)
Processing batch 9 (10000 wells, 434428 rows)
Processing batch 10 (10000 wells, 434634 rows)
Processing batch 11 (10000 wells, 433542 rows)
Processing batch 12 (10000 wells, 433859 rows)
Processing batch 13 (10000 wells, 433264 rows)
Processing batch 14 (10000 wells, 433858 rows)
Processing batch 15 (10000 wells, 434111 rows)
Processing batch 16 (10000 wells, 433314 rows)
Processing batch 17 (10000 wells, 433006 rows)
Processing batch 18 (10000 wells, 431961 rows)
Processing batch 19 (10000 wells, 432668 rows)
Processing batch 20 (10000 wells, 431541 rows)
Processing b

In [13]:
joined_data.to_parquet('../data/preprocess_3.parquet')

### Categorical Encoding <a id="Cat"></a>

In [15]:
categorical_cols = ['wellbore_type', 'Well_Production_Type', 'reservoir type', 
                   'arps model', 'Production_Status', 'basin', 'formation_group']

# Only process columns that exist in the dataframe
existing_categorical_cols = [col for col in categorical_cols if col in joined_data.columns]

# Create label encoders
label_encoders = {}
for col in existing_categorical_cols:
    le = LabelEncoder()
    joined_data[col] = joined_data[col].astype(str)  # Convert to string first
    joined_data[col] = le.fit_transform(joined_data[col])
    label_encoders[col] = le

print(f"Encoded {len(existing_categorical_cols)} categorical columns")

# Fill any remaining NaN values
numeric_cols = joined_data.select_dtypes(include=[np.number]).columns
joined_data[numeric_cols] = joined_data[numeric_cols].fillna(-1)

print(f"Final dataset shape: {joined_data.shape}")

Encoded 7 categorical columns
Final dataset shape: (16564703, 38)


In [16]:
joined_data.to_parquet('../data/preprocess_4.parquet')

### Machine Learning Preparation <a id="Prep"></a>

Forgot to turn operator cluster into a label

In [18]:
joined_data['operator_cluster'].value_counts()

operator_cluster
3.0    9049392
0.0    6932712
2.0     529380
1.0      53219
Name: count, dtype: int64

In [19]:
le = LabelEncoder()
joined_data['operator_cluster'] = joined_data['operator_cluster'].astype(str)  # Convert to string first
joined_data['operator_cluster'] = le.fit_transform(joined_data['operator_cluster'])
label_encoders['operator_cluster'] = le
label_encoders

{'wellbore_type': LabelEncoder(),
 'Well_Production_Type': LabelEncoder(),
 'reservoir type': LabelEncoder(),
 'arps model': LabelEncoder(),
 'Production_Status': LabelEncoder(),
 'basin': LabelEncoder(),
 'formation_group': LabelEncoder(),
 'operator_cluster': LabelEncoder()}

In [20]:
joined_data.info()

<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 38 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             int32         
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Well_Production_Type      int32         
 9   reservoir type            int32         
 10  arps model                int32         
 11  Production_Status         int32         
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64       
 16  basin                     int32         
 17  formation_g

In [21]:
joined_data['operator_cluster'].value_counts()

operator_cluster
3    9049392
0    6932712
2     529380
1      53219
Name: count, dtype: int64

In [22]:
joined_data.to_parquet('../data/preprocess_5.parquet')

In [23]:
# Define feature columns (exclude API, dates, and targets)
exclude_cols = ['api_no_10', 'date_prod', 'start_date', 'gas_monthly', 'oil_monthly']
feature_cols = [col for col in joined_data.columns if col not in exclude_cols]

print(f"Feature columns ({len(feature_cols)}):")
print(feature_cols)

# Create feature matrix and target vectors
X = joined_data[feature_cols].copy()
y_gas = joined_data['gas_monthly'].copy()
y_oil = joined_data['oil_monthly'].copy()

# Store API mapping for later use
api_mapping = joined_data[['api_no_10', 'date_prod']].copy()

print(f"Feature matrix shape: {X.shape}")
print(f"Target vectors shape: {y_gas.shape}")

Feature columns (33):
['wellbore_type', 'Quality_Quant', 'Production_Age', 'Well_Production_Type', 'reservoir type', 'arps model', 'Production_Status', 'lat_surface', 'lon_surface', 'lat_bottomhole', 'lon_bottomhole', 'basin', 'formation_group', 'horizontal_length', 'measured_depth', 'depth_tvd', 'operator_cluster', 'well_generation', 'well_density_1km', 'nearest_well_distance_km', 'prod_year', 'prod_month', 'month_sin', 'month_cos', 'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last', 'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6']
Feature matrix shape: (16564703, 33)
Target vectors shape: (16564703,)


In [24]:
# Hold out last 5 months (July-November 2024) as test set
# model shouldn't look at future data to simulate deployment into production
# so test set should be the latest months
test_start_date = pd.to_datetime('2024-07-01')
train_mask = joined_data['date_prod'] < test_start_date
test_mask = joined_data['date_prod'] >= test_start_date

# Create splits
X_train = X[train_mask]
y_gas_train = y_gas[train_mask]
y_oil_train = y_oil[train_mask]

X_test = X[test_mask]
y_gas_test = y_gas[test_mask]
y_oil_test = y_oil[test_mask]

print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Training date range: {joined_data[train_mask]['date_prod'].min()} to {joined_data[train_mask]['date_prod'].max()}")
print(f"Test date range: {joined_data[test_mask]['date_prod'].min()} to {joined_data[test_mask]['date_prod'].max()}")

Training set shape: (15558160, 33)
Test set shape: (1006543, 33)
Training date range: 2021-01-01 00:00:00 to 2024-06-01 00:00:00
Test date range: 2024-07-01 00:00:00 to 2024-11-01 00:00:00


## Machine Learning <a id="Learn"></a>

In [26]:
xgb_param_grid = {
    'max_depth': [4, 6, 8],
    'learning_rate': [0.03, 0.05, 0.1],
    'subsample': [0.6, 0.8, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0],
    'min_child_weight': [1, 3, 5],
    'n_estimators': [600, 800, 1000]
}

# Base XGBoost parameters
xgb_base_params = {
    'objective': 'reg:squarederror',
    'tree_method': 'hist',
    'device': 'cuda' if GPU_AVAILABLE else 'cpu',
    'random_state': 42,
    'n_jobs': -1
}

# Time series cross-validation
tscv = TimeSeriesSplit(n_splits=5, test_size=None, gap=1) #should split by one month

In [27]:
def train_xgb_model(X_train, y_train, target_name):
    print(f"Training XGBoost for {target_name}...")
    
    # Create base model
    xgb_model = xgb.XGBRegressor(**xgb_base_params)
    
    # Random search
    xgb_search = RandomizedSearchCV(
        xgb_model,
        xgb_param_grid,
        n_iter=25,
        cv=tscv,
        scoring='neg_mean_absolute_error',
        random_state=42,
        n_jobs=1,  # Use 1 job to avoid GPU memory issues
        verbose=1
    )
    
    # Fit the model
    xgb_search.fit(X_train, y_train)
    
    return xgb_search

In [28]:
xgb_gas_model = train_xgb_model(X_train, y_gas_train, "gas")

Training XGBoost for gas...
Fitting 5 folds for each of 25 candidates, totalling 125 fits


KeyboardInterrupt: 

Unfortunately, this model is going to take too long (it will take literally days to run with my current setup)
- there are too many features, I put a lot of effort into creating them but they won't work well with my particular setup
- we're fitting too many samples to the trees for hyperparameter tuning, we can use a representative subset for that
- we aren't implementing early stopping of unpromising models, which will waste time
- we are doing more random search iterations than we can afford, let's lower it down for this limited setup

### Retry for Speedup <a id="Retry"></a>

In [29]:
features_to_remove = [
    'wellbore_type',
    'lat_surface', 'lon_surface', 'lat_bottomhole', 'lon_bottomhole',
    'basin', 'operator_cluster', 'well_density_1km',
    'gas_avg_3', 'oil_avg_3'
]

# For gas model, remove 'oil_avg_6' as well
features_gas = [col for col in X.columns if col not in features_to_remove + ['oil_avg_6']]

# For oil model, remove 'gas_avg_6' as well
features_oil = [col for col in X.columns if col not in features_to_remove + ['gas_avg_6']]

# Create reduced feature sets
X_gas = X[features_gas]
X_oil = X[features_oil]

# Downsample (e.g., 500,000 rows; adjust as needed)
sample_frac = 500_000 / len(X_gas)
X_gas_sample = X_gas.sample(frac=sample_frac, random_state=42)
y_gas_sample = y_gas.loc[X_gas_sample.index]

X_oil_sample = X_oil.sample(frac=sample_frac, random_state=42)
y_oil_sample = y_oil.loc[X_oil_sample.index]

In [41]:
# Split for early stopping
tscv = TimeSeriesSplit(n_splits=5, gap=1)

# Reduced hyperparameter search space
xgb_param_grid = {
    'max_depth': [4, 6, 8],
    'learning_rate': [0.03, 0.05, 0.1],
    'subsample': [0.8, 1.0],
    'colsample_bytree': [0.8, 1.0],
    'min_child_weight': [1, 3],
    'n_estimators': [600, 800]
}

xgb_base_params = {
    'objective': 'reg:squarederror',
    'tree_method': 'hist',
    'device': 'cuda' if GPU_AVAILABLE else 'cpu',
    'random_state': 42,
    'n_jobs': -1,
    'early_stopping_rounds': 10,
    'eval_metric': 'mae',
    "verbose": False
}

# Only 5–10 iterations for speed
n_iter = 5

In [48]:
def train_xgb_model(X_train, y_train, target_name):
    print(f"Training XGBoost for {target_name}...")
    xgb_model = xgb.XGBRegressor(**xgb_base_params)
    xgb_search = RandomizedSearchCV(
        xgb_model,
        xgb_param_grid,
        n_iter=n_iter,
        cv=tscv,
        scoring='neg_mean_absolute_error',
        random_state=42,
        n_jobs=1,
        verbose=0
    )
    # Use callbacks for early stopping
    xgb_search.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train)],  # Required for early stopping, but CV will override this per fold
        verbose=False
    )
    return xgb_search

In [49]:
xgb_gas_model = train_xgb_model(X_gas_sample, y_gas_sample, "gas")

Training XGBoost for gas...


In [50]:
print(f"Best gas model MAE: {-xgb_gas_model.best_score_:.4f}")

Best gas model MAE: 524.7908


In [51]:
xgb_gas_model

0,1,2
,estimator,"XGBRegressor(...ree=None, ...)"
,param_distributions,"{'colsample_bytree': [0.8, 1.0], 'learning_rate': [0.03, 0.05, ...], 'max_depth': [4, 6, ...], 'min_child_weight': [1, 3], ...}"
,n_iter,5
,scoring,'neg_mean_absolute_error'
,n_jobs,1
,refit,True
,cv,TimeSeriesSpl...est_size=None)
,verbose,0
,pre_dispatch,'2*n_jobs'
,random_state,42

0,1,2
,objective,'reg:squarederror'
,base_score,
,booster,
,callbacks,
,colsample_bylevel,
,colsample_bynode,
,colsample_bytree,1.0
,device,'cuda'
,early_stopping_rounds,10
,enable_categorical,False


In [53]:
xgb_oil_model = train_xgb_model(X_oil_sample, y_oil_sample, "oil")

Training XGBoost for oil...


In [54]:
print(f"Best oil model MAE: {-xgb_oil_model.best_score_:.4f}")

Best oil model MAE: 100.2080


In [55]:
best_gas_model = xgb_gas_model.best_estimator_
best_oil_model = xgb_oil_model.best_estimator_
best_gas_model, best_oil_model

(XGBRegressor(base_score=None, booster=None, callbacks=None,
              colsample_bylevel=None, colsample_bynode=None,
              colsample_bytree=1.0, device='cuda', early_stopping_rounds=10,
              enable_categorical=False, eval_metric='mae', feature_types=None,
              feature_weights=None, gamma=None, grow_policy=None,
              importance_type=None, interaction_constraints=None,
              learning_rate=0.03, max_bin=None, max_cat_threshold=None,
              max_cat_to_onehot=None, max_delta_step=None, max_depth=6,
              max_leaves=None, min_child_weight=1, missing=nan,
              monotone_constraints=None, multi_strategy=None, n_estimators=800,
              n_jobs=-1, num_parallel_tree=None, ...),
 XGBRegressor(base_score=None, booster=None, callbacks=None,
              colsample_bylevel=None, colsample_bynode=None,
              colsample_bytree=1.0, device='cuda', early_stopping_rounds=10,
              enable_categorical=False, eval_met

In [57]:
# Save models
joblib.dump(best_gas_model, '../models/best_gas_model.pkl')
joblib.dump(best_oil_model, '../models/best_oil_model.pkl')
joblib.dump(label_encoders, '../models/label_encoders.pkl')

['../models/label_encoders.pkl']

Now that we have our best model parameters, we should train on the full training set and estimate performance using the test set

In [58]:
best_gas_model.fit(X_train, y_gas_train)

ValueError: Must have at least 1 validation dataset for early stopping.

In [59]:
xgb_gas_model.best_params_

{'subsample': 0.8,
 'n_estimators': 800,
 'min_child_weight': 1,
 'max_depth': 6,
 'learning_rate': 0.03,
 'colsample_bytree': 1.0}

In [60]:
new_gas_params = {
    'subsample': 0.8,
    'n_estimators': 800,
    'min_child_weight': 1,
    'max_depth': 6,
    'learning_rate': 0.03,
    'colsample_bytree': 1.0,
    'objective': 'reg:squarederror',
    'tree_method': 'hist',
    'device': 'cuda' if GPU_AVAILABLE else 'cpu',
    'random_state': 42,
    'n_jobs': -1,
    'eval_metric': 'mae',
    "verbose": False
}

final_gas_model = xgb.XGBRegressor(**new_gas_params)

In [61]:
final_gas_model.fit(X_train, y_gas_train)

0,1,2
,objective,'reg:squarederror'
,base_score,
,booster,
,callbacks,
,colsample_bylevel,
,colsample_bynode,
,colsample_bytree,1.0
,device,'cuda'
,early_stopping_rounds,
,enable_categorical,False


In [62]:
xgb_oil_model.best_params_

{'subsample': 0.8,
 'n_estimators': 800,
 'min_child_weight': 1,
 'max_depth': 6,
 'learning_rate': 0.03,
 'colsample_bytree': 1.0}

Same params for oil we can just run the gas params for it

In [63]:
final_oil_model = xgb.XGBRegressor(**new_gas_params)

In [64]:
final_oil_model.fit(X_train, y_oil_train)

0,1,2
,objective,'reg:squarederror'
,base_score,
,booster,
,callbacks,
,colsample_bylevel,
,colsample_bynode,
,colsample_bytree,1.0
,device,'cuda'
,early_stopping_rounds,
,enable_categorical,False


In [66]:
joblib.dump(final_gas_model, '../models/final_gas_model.pkl')
joblib.dump(final_oil_model, '../models/final_oil_model.pkl')

['../models/final_oil_model.pkl']

In [67]:
gas_pred = final_gas_model.predict(X_test)
oil_pred = final_oil_model.predict(X_test)

In [68]:
gas_mae = mean_absolute_error(y_gas_test, gas_pred)
oil_mae = mean_absolute_error(y_oil_test, oil_pred)
gas_mae, oil_mae

(514.4783804691635, 93.31968212236362)

A little better than the hyperparameter tuning models, which shows that our sampling strategy worked

In [69]:
gas_mape = mean_absolute_percentage_error(y_gas_test, gas_pred)
oil_mape = mean_absolute_percentage_error(y_oil_test, oil_pred)
gas_mape, oil_mape

(8.220877791622806e+16, 1.2086298113880788e+16)

These mape values aren't good but are also obviously nonsensical given the MAE. It might be because of zero values or near zero values in the dataset. With the proper libraries, I might implement smape in the future, as it seems to correct for some mape problems, but also has its own and ultimately isn't necessarily important right now

## Making Predictions <a id="Pred"></a>

In [71]:
# Create complete date range for each well
all_apis = joined_data['api_no_10'].unique()
date_range = pd.date_range(start='2021-01-01', end='2025-02-28', freq='MS')

# Create complete grid
forecast_data = []
for api in tqdm(all_apis, desc="Creating forecast grid"):
    for date in date_range:
        forecast_data.append({'api_no_10': api, 'date_prod': date})

forecast_df = pd.DataFrame(forecast_data)

forecast_df.info()

Creating forecast grid: 100%|████████████████████████████████████████████████| 399039/399039 [00:40<00:00, 9865.68it/s]


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 2 columns):
 #   Column     Dtype         
---  ------     -----         
 0   api_no_10  object        
 1   date_prod  datetime64[ns]
dtypes: datetime64[ns](1), object(1)
memory usage: 304.4+ MB


In [None]:
# Merge with existing data
forecast_df = forecast_df.merge(joined_data, on=['api_no_10', 'date_prod'], how='left')

That simple approach is likely to overload my memory based on recent memory, so let's try this instead:

In [72]:
chunk_size = 10000  # Adjust based on your available RAM

merged_chunks = []
for i in range(0, len(all_apis), chunk_size):
    api_chunk = all_apis[i:i+chunk_size]
    forecast_chunk = forecast_df[forecast_df['api_no_10'].isin(api_chunk)]
    df_chunk = joined_data[joined_data['api_no_10'].isin(api_chunk)]
    merged_chunk = forecast_chunk.merge(df_chunk, on=['api_no_10', 'date_prod'], how='left')
    merged_chunks.append(merged_chunk)
    del forecast_chunk, df_chunk, merged_chunk  # Free memory

# Concatenate all chunks
forecast_df = pd.concat(merged_chunks, ignore_index=True)

This took a very long time so isn't really recommended on systems like mine

In [73]:
forecast_df.to_parquet('../data/forecast_1.parquet')

In [5]:
forecast_df = load_parquet_datasets({'joined_data': '../data/forecast_1.parquet'})['joined_data']
forecast_df.info()

  Successfully loaded joined_data with pandas
  Shape: (19951950, 38)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 38 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   api_no_10                 object        
 1   date_prod                 datetime64[ns]
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             float64       
 6   Quality_Quant             float64       
 7   Production_Age            float64       
 8   Well_Production_Type      float64       
 9   reservoir type            float64       
 10  arps model                float64       
 11  Production_Status         float64       
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            fl

In [6]:
# Fill missing static features from the last known values
static_cols = ['lat_surface', 'lon_surface', 'lat_bottomhole', 'lon_bottomhole',
               'basin', 'formation_group', 'horizontal_length', 'measured_depth',
               'depth_tvd', 'operator_cluster', 'well_generation', 'well_density_1km',
               'nearest_well_distance_km', 'wellbore_type', 'Well_Production_Type',
               'reservoir type', 'arps model', 'Production_Status']

existing_static_cols = [col for col in static_cols if col in forecast_df.columns]

def batch_fill_static_features(df, static_cols, batch_size=10000):
    unique_apis = df['api_no_10'].unique()
    batches = []
    for start in range(0, len(unique_apis), batch_size):
        batch_apis = unique_apis[start:start + batch_size]
        batch_df = df[df['api_no_10'].isin(batch_apis)].copy()
        for col in static_cols:
            if col in batch_df.columns:
                batch_df[col] = batch_df.groupby('api_no_10')[col].ffill().bfill()
        batches.append(batch_df)
        del batch_df  # Free memory
    result_df = pd.concat(batches, ignore_index=True)
    return result_df

In [8]:
forecast_df = batch_fill_static_features(forecast_df, existing_static_cols, batch_size=10000)
comprehensive_data_overview({'forecast_df': forecast_df})


DATASET: FORECAST_DF
Shape: (19951950, 38)

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 object          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
gas_monthly               float64         16564703   129580     17.0      %
oil_monthly               float64         16564703   38707      17.0      %
start_date                datetime64[ns]  16564703   385        17.0      %
wellbore_type             float64         19951950   3          0.0       %
Quality_Quant             float64         16564703   9          17.0      %
Production_Age            float64         16564703   383        17.0      %
Well_Production_Type      float64         19951950   4          0.0       %
reservoir type            float64         19951950   3          0.0       %
arps model            

start_date and quality_quant are also static, prod age is easy to fill as are prod_year, prod_month, month_sin and month_cos, the rest are a little more complicated and gas and oil are what we are predicting

In [9]:
forecast_df.to_parquet('../data/forecast_2.parquet')

In [10]:
forecast_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 38 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   api_no_10                 object        
 1   date_prod                 datetime64[ns]
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             float64       
 6   Quality_Quant             float64       
 7   Production_Age            float64       
 8   Well_Production_Type      float64       
 9   reservoir type            float64       
 10  arps model                float64       
 11  Production_Status         float64       
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64       
 16  basin                     float64       
 17  format

In [11]:
forecast_df['date_prod'].describe()

count                         19951950
mean     2023-01-15 16:48:00.000002560
min                2021-01-01 00:00:00
25%                2022-01-01 00:00:00
50%                2023-01-16 12:00:00
75%                2024-02-01 00:00:00
max                2025-02-01 00:00:00
Name: date_prod, dtype: object

In [12]:
forecast_df['start_date'] = forecast_df.groupby('api_no_10')['start_date'].ffill().bfill()
forecast_df['Quality_Quant'] = forecast_df.groupby('api_no_10')['Quality_Quant'].ffill().bfill()
comprehensive_data_overview({'forecast_df': forecast_df})


DATASET: FORECAST_DF
Shape: (19951950, 38)

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 object          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
gas_monthly               float64         16564703   129580     17.0      %
oil_monthly               float64         16564703   38707      17.0      %
start_date                datetime64[ns]  19951950   385        0.0       %
wellbore_type             float64         19951950   3          0.0       %
Quality_Quant             float64         19951950   9          0.0       %
Production_Age            float64         16564703   383        17.0      %
Well_Production_Type      float64         19951950   4          0.0       %
reservoir type            float64         19951950   3          0.0       %
arps model            

In [13]:
date_diff = forecast_df['date_prod'] - forecast_df['start_date']
forecast_df['Production_Age'] = (date_diff.dt.days / 30.44).round().astype(int) + 1  # +1 so start month = 1
    
# Handle negative values (data quality issues)
forecast_df['Production_Age'] = np.maximum(forecast_df['Production_Age'], 1)

forecast_df['prod_year'] = forecast_df['date_prod'].dt.year
forecast_df['prod_month'] = forecast_df['date_prod'].dt.month
forecast_df['month_sin'] = np.sin(2 * np.pi * forecast_df['prod_month'] / 12)
forecast_df['month_cos'] = np.cos(2 * np.pi * forecast_df['prod_month'] / 12)

comprehensive_data_overview({'forecast_df': forecast_df})


DATASET: FORECAST_DF
Shape: (19951950, 38)

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 object          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
gas_monthly               float64         16564703   129580     17.0      %
oil_monthly               float64         16564703   38707      17.0      %
start_date                datetime64[ns]  19951950   385        0.0       %
wellbore_type             float64         19951950   3          0.0       %
Quality_Quant             float64         19951950   9          0.0       %
Production_Age            int32           19951950   386        0.0       %
Well_Production_Type      float64         19951950   4          0.0       %
reservoir type            float64         19951950   3          0.0       %
arps model            

In [14]:
forecast_df.to_parquet('../data/forecast_3.parquet')

In [15]:
def batch_iter(df, batch_size):
    unique_apis = df['api_no_10'].unique()
    for start in range(0, len(unique_apis), batch_size):
        batch_apis = unique_apis[start:start + batch_size]
        yield df[df['api_no_10'].isin(batch_apis)].copy()

def create_backfill_features_batched_skipna(df, batch_size=10000):
    print(f"Processing in batches of {batch_size} wells...")
    processed_batches = []
    batch_number = 1
    for batch_df in batch_iter(df, batch_size):
        print(f"Processing batch {batch_number} ({batch_df['api_no_10'].nunique()} wells, {len(batch_df)} rows)")
        batch_df = batch_df.sort_values(['api_no_10', 'date_prod'])
        for api, group in batch_df.groupby('api_no_10'):
            group = group.copy()
            # Identify valid (non-missing) production rows
            valid_idx = group['gas_monthly'].notna() & group['oil_monthly'].notna()
            # Prepare columns for last known values
            last_year = np.full(len(group), -1)
            last_month = np.full(len(group), -1)
            last_gas = np.full(len(group), -1.0)
            last_oil = np.full(len(group), -1.0)
            months_since_last = np.full(len(group), -1)
            last_valid_idx = -1
            for i in range(len(group)):
                if i > 0 and last_valid_idx != -1:
                    last_year[i] = group.iloc[last_valid_idx]['date_prod'].year
                    last_month[i] = group.iloc[last_valid_idx]['date_prod'].month
                    last_gas[i] = group.iloc[last_valid_idx]['gas_monthly']
                    last_oil[i] = group.iloc[last_valid_idx]['oil_monthly']
                    months_since_last[i] = int(round((group.iloc[i]['date_prod'] - group.iloc[last_valid_idx]['date_prod']).days / 30.44))
                if valid_idx.iloc[i]:
                    last_valid_idx = i
            group['last_year'] = last_year
            group['last_month'] = last_month
            group['last_gas'] = last_gas
            group['last_oil'] = last_oil
            group['months_since_last'] = months_since_last
            processed_batches.append(group)
        batch_number += 1
    result_df = pd.concat(processed_batches).sort_index()
    return result_df

In [16]:
forecast_df = create_backfill_features_batched_skipna(forecast_df, batch_size=10000)
comprehensive_data_overview({'forecast_df': forecast_df})

Processing in batches of 10000 wells...
Processing batch 1 (10000 wells, 500000 rows)


KeyboardInterrupt: 

This is too slow, we will need to take an approach of merging and backfilling

In [17]:
joined_data = load_parquet_datasets({'joined_data': '../data/preprocess_5.parquet'})['joined_data']
joined_data.info()

  Successfully loaded joined_data with pandas
  Shape: (16564703, 38)
<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 38 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   date_prod                 datetime64[ms]
 1   api_no_10                 object        
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             int32         
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Well_Production_Type      int32         
 9   reservoir type            int32         
 10  arps model                int32         
 11  Production_Status         int32         
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64

In [18]:
last_and_rolling_cols = [
    'api_no_10', 'date_prod',
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
bfill_helper = joined_data[last_and_rolling_cols].copy()
bfill_helper.to_parquet('../data/forecast_helper_1.parquet')

In [4]:
bfill_helper = load_parquet_datasets({'joined_data': '../data/forecast_helper_1.parquet'})['joined_data']
bfill_helper.info()

  Loaded first 1000000 rows of joined_data for inspection
  Shape: (1000000, 11)
<class 'pandas.core.frame.DataFrame'>
Index: 1000000 entries, 0 to 1002295
Data columns (total 11 columns):
 #   Column             Non-Null Count    Dtype         
---  ------             --------------    -----         
 0   api_no_10          1000000 non-null  object        
 1   date_prod          1000000 non-null  datetime64[ms]
 2   last_year          1000000 non-null  int32         
 3   last_month         1000000 non-null  int32         
 4   last_gas           1000000 non-null  float64       
 5   last_oil           1000000 non-null  float64       
 6   months_since_last  1000000 non-null  int32         
 7   gas_avg_3          1000000 non-null  float64       
 8   oil_avg_3          1000000 non-null  float64       
 9   gas_avg_6          1000000 non-null  float64       
 10  oil_avg_6          1000000 non-null  float64       
dtypes: datetime64[ms](1), float64(6), int32(3), object(1)
memory usag

I have now implemented the dask library in the "load_parquet_datasets" function to prevent my computer from continually freezing when I load forecast_df, hopefully it helps

In [5]:
forecast_df = load_parquet_datasets({'joined_data': '../data/forecast_3.parquet'}, use_dask=True)['joined_data']
forecast_df.info()

  Loaded joined_data as Dask DataFrame
  Shape: (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(124b6ae).size() // 38, dtype=int32>, 38)
<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 38 entries, api_no_10 to oil_avg_6
dtypes: datetime64[ns](2), float64(32), int32(3), string(1)

In [6]:
import dask.dataframe as dd

def batch_merge_and_backfill_dask(forecast_df, df_last, cols_to_fill, batch_size=10000):
    """
    Batch merge and backfill for Dask DataFrame (forecast_df) and pandas DataFrame (df_last).
    """
    # Get unique APIs as a pandas array (efficient, as Dask only computes the column)
    unique_apis = forecast_df['api_no_10'].drop_duplicates().compute().values
    merged_batches = []
    for start in range(0, len(unique_apis), batch_size):
        batch_apis = unique_apis[start:start + batch_size]
        # Subset forecast_df for this batch and compute to pandas
        forecast_batch = forecast_df[forecast_df['api_no_10'].isin(batch_apis)].compute()
        last_batch = df_last[df_last['api_no_10'].isin(batch_apis)].copy()
        # Sort by api_no_10 and date_prod
        forecast_batch = forecast_batch.sort_values(['api_no_10', 'date_prod'])
        last_batch = last_batch.sort_values(['api_no_10', 'date_prod'])
        # Merge
        merged = forecast_batch.merge(
            last_batch,
            on=['api_no_10', 'date_prod'],
            how='left',
            suffixes=('', '_fromlast')
        )
        # For each feature, fill NAs by backfilling within each well
        for col in cols_to_fill:
            merged[col] = merged[col].combine_first(merged[f"{col}_fromlast"])
            # Backfill (future to past) within each well
            merged[col] = merged.groupby('api_no_10')[col].bfill()
            # Forward fill to ensure all nulls are filled (optional, for edge cases)
            merged[col] = merged.groupby('api_no_10')[col].ffill()
            merged = merged.drop(columns=[f"{col}_fromlast"])
        merged_batches.append(merged)
    # Concatenate all batches (as pandas DataFrame)
    result_df = pd.concat(merged_batches, ignore_index=True)
    # Optionally, convert back to Dask DataFrame for further processing
    # result_ddf = dd.from_pandas(result_df, npartitions=forecast_df.npartitions)
    return result_df

In [7]:
# Usage
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
forecast_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill)

NameError: name 'batch_merge_and_backfill' is not defined

Even this function is overloading my memory, it froze my computer which wasn't saved here. Let's try another even more memory conscious version

In [6]:
import dask.dataframe as dd
import gc

def batch_merge_and_backfill_dask(
    forecast_df, df_last, cols_to_fill, batch_size=10000, 
    write_batches_to_disk=False, batch_output_prefix="batch_result"
):
    """
    Memory-efficient batch merge and backfill for Dask DataFrame (forecast_df) 
    and pandas DataFrame (df_last).
    Provides progress logging for batches and APIs.
    """
    unique_apis = forecast_df['api_no_10'].drop_duplicates().compute().values
    total_apis = len(unique_apis)
    num_batches = (total_apis + batch_size - 1) // batch_size  # ceil div

    print(f"✅ Total unique APIs: {total_apis}")
    print(f"✅ Batch size: {batch_size}")
    print(f"✅ Total batches: {num_batches}")

    batch_filepaths = []
    merged_batches = []

    for batch_num, start in enumerate(range(0, total_apis, batch_size), start=1):
        end = min(start + batch_size, total_apis)
        batch_apis = unique_apis[start:end]

        print(f"🔹 Processing batch {batch_num}/{num_batches} ({len(batch_apis)} APIs)...")

        # Subset and load only this batch into pandas
        forecast_batch = forecast_df[forecast_df['api_no_10'].isin(batch_apis)].compute()
        last_batch = df_last[df_last['api_no_10'].isin(batch_apis)].copy()

        forecast_batch = forecast_batch.sort_values(['api_no_10', 'date_prod'])
        last_batch = last_batch.sort_values(['api_no_10', 'date_prod'])

        # Merge
        merged = forecast_batch.merge(
            last_batch,
            on=['api_no_10', 'date_prod'],
            how='left',
            suffixes=('', '_fromlast')
        )

        # Backfill and forward fill for each feature
        for col in cols_to_fill:
            merged[col] = merged[col].combine_first(merged[f"{col}_fromlast"])
            merged[col] = merged.groupby('api_no_10')[col].bfill()
            merged[col] = merged.groupby('api_no_10')[col].ffill()
            merged = merged.drop(columns=[f"{col}_fromlast"])

        if write_batches_to_disk:
            filepath = f"{batch_output_prefix}_{batch_num}.parquet"
            merged.to_parquet(filepath, index=False)
            batch_filepaths.append(filepath)
            print(f"✅ Batch {batch_num} written to {filepath}")
            del merged
        else:
            merged_batches.append(merged)
            print(f"✅ Batch {batch_num} completed and kept in memory.")

        del forecast_batch, last_batch
        gc.collect()

    print(f"✅ All {num_batches} batches processed.")

    if write_batches_to_disk:
        print(f"🔄 Combining {len(batch_filepaths)} batch files into Dask DataFrame...")
        result_ddf = dd.read_parquet(batch_filepaths)
        return result_ddf
    else:
        print(f"🔄 Concatenating {len(merged_batches)} batches into final DataFrame...")
        result_df = pd.concat(merged_batches, ignore_index=True)
        return result_df

In [None]:
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
forecast_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill, batch_size=5000)
# trying batch size of 5000 since 10000 still froze my machine

Batch size of 5000 still did not work, it did not even get past 1 batch before freezing. I'll try much smaller just to see if I can get it to work at all

In [None]:
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
forecast_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill, batch_size=100)
# trying batch size of 100 to check if it works at all

It got through one batch (out of about 4000, quite slowly) and then already froze on the 2nd, so something is definitely wrong with the implementation in terms of memory handling at least

I'm going to save to disk while using dask and compute/use only the needed columns in the forecast_batch to save a lot of time in computation and merging

In [7]:
import dask.dataframe as dd
import gc

def batch_merge_and_backfill_dask(
    forecast_df, df_last, cols_to_fill, batch_size=10000, 
    write_batches_to_disk=False, batch_output_prefix="batch_result"
):
    """
    Memory-efficient batch merge and backfill for Dask DataFrame (forecast_df) 
    and pandas DataFrame (df_last).
    Provides progress logging for batches and APIs.
    """
    columns_needed = ['api_no_10', 'date_prod'] + cols_to_fill
    unique_apis = forecast_df['api_no_10'].drop_duplicates().compute().values
    total_apis = len(unique_apis)
    num_batches = (total_apis + batch_size - 1) // batch_size  # ceil div

    print(f"✅ Total unique APIs: {total_apis}")
    print(f"✅ Batch size: {batch_size}")
    print(f"✅ Total batches: {num_batches}")

    batch_filepaths = []
    merged_batches = []

    for batch_num, start in enumerate(range(0, total_apis, batch_size), start=1):
        end = min(start + batch_size, total_apis)
        batch_apis = unique_apis[start:end]

        print(f"🔹 Processing batch {batch_num}/{num_batches} ({len(batch_apis)} APIs)...")

        # Subset and load only this batch into pandas
        forecast_batch = forecast_df[forecast_df['api_no_10'].isin(batch_apis)][columns_needed].compute()
        last_batch = df_last[df_last['api_no_10'].isin(batch_apis)].copy()

        forecast_batch = forecast_batch.sort_values(['api_no_10', 'date_prod'])
        last_batch = last_batch.sort_values(['api_no_10', 'date_prod'])

        # Merge
        merged = forecast_batch.merge(
            last_batch,
            on=['api_no_10', 'date_prod'],
            how='left',
            suffixes=('', '_fromlast')
        )

        # Backfill and forward fill for each feature
        for col in cols_to_fill:
            merged[col] = merged[col].combine_first(merged[f"{col}_fromlast"])
            merged[col] = merged.groupby('api_no_10')[col].bfill()
            merged[col] = merged.groupby('api_no_10')[col].ffill()
            merged = merged.drop(columns=[f"{col}_fromlast"])

        if write_batches_to_disk:
            filepath = f"../batch_data/{batch_output_prefix}_{batch_num}.parquet"
            merged.to_parquet(filepath, index=False)
            batch_filepaths.append(filepath)
            print(f"✅ Batch {batch_num} written to {filepath}")
            del merged
        else:
            merged_batches.append(merged)
            print(f"✅ Batch {batch_num} completed and kept in memory.")

        del forecast_batch, last_batch
        gc.collect()

    print(f"✅ All {num_batches} batches processed.")

    if write_batches_to_disk:
        print(f"🔄 Combining {len(batch_filepaths)} batch files into Dask DataFrame...")
        result_ddf = dd.read_parquet(batch_filepaths)
        return result_ddf
    else:
        print(f"🔄 Concatenating {len(merged_batches)} batches into final DataFrame...")
        result_df = pd.concat(merged_batches, ignore_index=True)
        return result_df

In [8]:
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
forecast_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill, batch_size=1000, write_batches_to_disk=False)

✅ Total unique APIs: 399039
✅ Batch size: 1000
✅ Total batches: 400
🔹 Processing batch 1/400 (1000 APIs)...
✅ Batch 1 completed and kept in memory.
🔹 Processing batch 2/400 (1000 APIs)...
✅ Batch 2 completed and kept in memory.
🔹 Processing batch 3/400 (1000 APIs)...
✅ Batch 3 completed and kept in memory.
🔹 Processing batch 4/400 (1000 APIs)...
✅ Batch 4 completed and kept in memory.
🔹 Processing batch 5/400 (1000 APIs)...
✅ Batch 5 completed and kept in memory.
🔹 Processing batch 6/400 (1000 APIs)...
✅ Batch 6 completed and kept in memory.
🔹 Processing batch 7/400 (1000 APIs)...
✅ Batch 7 completed and kept in memory.
🔹 Processing batch 8/400 (1000 APIs)...
✅ Batch 8 completed and kept in memory.
🔹 Processing batch 9/400 (1000 APIs)...
✅ Batch 9 completed and kept in memory.
🔹 Processing batch 10/400 (1000 APIs)...
✅ Batch 10 completed and kept in memory.
🔹 Processing batch 11/400 (1000 APIs)...
✅ Batch 11 completed and kept in memory.
🔹 Processing batch 12/400 (1000 APIs)...
✅ Batch

In [9]:
forecast_df.to_parquet('../data/forecast_4_backfill.parquet')

Since this is a dask dataframe and we do need to know its properties to know whether the function succeeded, let's implement a new version of comprehensive_data_overview:

In [34]:
import dask.dataframe as dd

def comprehensive_data_overview_dask(datasets):
    """
    Provide comprehensive overview of all Dask datasets including dtypes,
    missing values, and basic stats.
    
    Args:
        datasets (dict): Dictionary of Dask DataFrames
        sample_rows (int): Number of rows to sample for numeric stats preview
    """
    for name, df in datasets.items():
        print(f"\n{'='*60}")
        print(f"DATASET: {name.upper()}")
        print(f"{'='*60}")

        # Shape
        shape = df.shape
        print(f"Shape: ({shape})")

        # Column information
        print(f"\nColumn Overview:")
        print(f"{'Column':<25} {'Type':<15} {'Non-Null':<10} {'Unique':<10} {'Missing %':<10}")
        print("-" * 80)

        # Compute stats for all columns
        counts = df.count()
        nunique = df.nunique()
        total_rows = shape[0]
        missing = df.isnull().sum()

        for col in df.columns:
            dtype = str(df[col].dtype)
            non_null = counts[col]
            unique_count = nunique[col]
            missing_pct = (missing[col] / total_rows) * 100

            print(f"{col:<25} {dtype:<15} {non_null:<10} {unique_count:<10} {missing_pct:<10.1f}%")

In [22]:
comprehensive_data_overview_dask({'backfill_df': forecast_df})


DATASET: BACKFILL_DF
Shape: ((19951950, 11))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 object          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
last_year                 float64         19951950   5          0.0       %
last_month                float64         19951950   13         0.0       %
last_gas                  float64         19951950   128036     0.0       %
last_oil                  float64         19951950   38278      0.0       %
months_since_last         float64         19951950   43         0.0       %
gas_avg_3                 float64         19951950   262000     0.0       %
oil_avg_3                 float64         19951950   95299      0.0       %
gas_avg_6                 float64         19951950   430300     0.0       %
oil_avg_6           

In [23]:
bfill_helper['date_prod'].describe()

count                       1000000
mean     2022-10-27 02:54:56.908000
min             2021-01-01 00:00:00
25%             2021-12-01 00:00:00
50%             2022-11-01 00:00:00
75%             2023-10-01 00:00:00
max             2024-11-01 00:00:00
Name: date_prod, dtype: object

Unfortunately I accidentally only loaded the first 1000000 values of bfill data, so it won't be very accurate, I'll need to load all of it this time using dask and modify my function accordingly, but at least I know it works.

!! I also shouldn't forget that I need to sort the original forecast_df values in the exact same way that I sort the filled features so that I know they match the correct api number and production date

In [4]:
bfill_helper = load_parquet_datasets({'joined_data': '../data/forecast_helper_1.parquet'}, use_dask=True)['joined_data']
bfill_helper.info()

  Loaded joined_data as Dask DataFrame
  Shape: (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(5910b21).size() // 11, dtype=int32>, 11)
<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 11 entries, api_no_10 to oil_avg_6
dtypes: datetime64[ms](1), float64(6), int32(3), string(1)

In [5]:
bfill_helper = bfill_helper.compute()
bfill_helper.info()

<class 'pandas.core.frame.DataFrame'>
Index: 16564703 entries, 0 to 16603789
Data columns (total 11 columns):
 #   Column             Dtype         
---  ------             -----         
 0   api_no_10          string        
 1   date_prod          datetime64[ms]
 2   last_year          int32         
 3   last_month         int32         
 4   last_gas           float64       
 5   last_oil           float64       
 6   months_since_last  int32         
 7   gas_avg_3          float64       
 8   oil_avg_3          float64       
 9   gas_avg_6          float64       
 10  oil_avg_6          float64       
dtypes: datetime64[ms](1), float64(6), int32(3), string(1)
memory usage: 1.4 GB


In [40]:
comprehensive_data_overview_dask({'bfill_helper': bfill_helper})


DATASET: BFILL_HELPER
Shape: ((16564703, 11))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          16564703   399039     0.0       %
date_prod                 datetime64[ms]  16564703   47         0.0       %
last_year                 int32           16564703   5          0.0       %
last_month                int32           16564703   13         0.0       %
last_gas                  float64         16564703   128036     0.0       %
last_oil                  float64         16564703   38278      0.0       %
months_since_last         int32           16564703   43         0.0       %
gas_avg_3                 float64         16564703   262000     0.0       %
oil_avg_3                 float64         16564703   95299      0.0       %
gas_avg_6                 float64         16564703   430300     0.0       %
oil_avg_6          

In [6]:
forecast_df = load_parquet_datasets({'joined_data': '../data/forecast_3.parquet'}, use_dask=True)['joined_data']
forecast_df.info()

  Loaded joined_data as Dask DataFrame
  Shape: (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(1407f7b).size() // 38, dtype=int32>, 38)
<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 38 entries, api_no_10 to oil_avg_6
dtypes: datetime64[ns](2), float64(32), int32(3), string(1)

In [43]:
len(forecast_df['api_no_10'].drop_duplicates().compute().values)

399039

In [44]:
sum(forecast_df[['api_no_10', 'date_prod']].compute().duplicated())

0

In [45]:
len(forecast_df['date_prod'].drop_duplicates().compute().values)

50

In [47]:
forecast_df['date_prod'].compute().dtype

dtype('<M8[ns]')

In [48]:
bfill_helper['date_prod'].dtype

dtype('<M8[ms]')

In [50]:
forecast_df['api_no_10'].dtype

string[pyarrow]

In [51]:
bfill_helper['api_no_10'].dtype

string[pyarrow]

In [36]:
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
backfill_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill, batch_size=1000, write_batches_to_disk=False)

✅ Total unique APIs: 399039
✅ Batch size: 1000
✅ Total batches: 400
🔹 Processing batch 1/400 (1000 APIs)...
✅ Batch 1 completed and kept in memory.
🔹 Processing batch 2/400 (1000 APIs)...
✅ Batch 2 completed and kept in memory.
🔹 Processing batch 3/400 (1000 APIs)...
✅ Batch 3 completed and kept in memory.
🔹 Processing batch 4/400 (1000 APIs)...
✅ Batch 4 completed and kept in memory.
🔹 Processing batch 5/400 (1000 APIs)...
✅ Batch 5 completed and kept in memory.
🔹 Processing batch 6/400 (1000 APIs)...
✅ Batch 6 completed and kept in memory.
🔹 Processing batch 7/400 (1000 APIs)...
✅ Batch 7 completed and kept in memory.
🔹 Processing batch 8/400 (1000 APIs)...
✅ Batch 8 completed and kept in memory.
🔹 Processing batch 9/400 (1000 APIs)...
✅ Batch 9 completed and kept in memory.
🔹 Processing batch 10/400 (1000 APIs)...
✅ Batch 10 completed and kept in memory.
🔹 Processing batch 11/400 (1000 APIs)...
✅ Batch 11 completed and kept in memory.
🔹 Processing batch 12/400 (1000 APIs)...
✅ Batch

In [37]:
backfill_df.to_parquet('../data/forecast_4_backfill.parquet')

In [38]:
comprehensive_data_overview_dask({'backfill_df': backfill_df})


DATASET: BACKFILL_DF
Shape: ((20000000, 11))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          20000000   1000       0.0       %
date_prod                 datetime64[ns]  20000000   50         0.0       %
last_year                 float64         20000000   5          0.0       %
last_month                float64         20000000   13         0.0       %
last_gas                  float64         20000000   7337       0.0       %
last_oil                  float64         20000000   2939       0.0       %
months_since_last         float64         20000000   4          0.0       %
gas_avg_3                 float64         20000000   11575      0.0       %
oil_avg_3                 float64         20000000   4893       0.0       %
gas_avg_6                 float64         20000000   14699      0.0       %
oil_avg_6           

Something looks very wrong here, there are 1000 api_no_10 as if only one batch of apis was processed

In [39]:
backfill_df['api_no_10'].nunique()

1000

The take column do have a subtle mismatch in datatype, I'm not sure that's the problem but I can't think of anything else so let's try fixing this and then running again

In [7]:
forecast_df['date_prod'] = forecast_df['date_prod'].astype('datetime64[ns]')
bfill_helper['date_prod'] = bfill_helper['date_prod'].astype('datetime64[ns]')

In [53]:
cols_to_fill = [
    'last_year', 'last_month', 'last_gas', 'last_oil', 'months_since_last',
    'gas_avg_3', 'oil_avg_3', 'gas_avg_6', 'oil_avg_6'
]
backfill_df = batch_merge_and_backfill_dask(forecast_df, bfill_helper, cols_to_fill, batch_size=1000, write_batches_to_disk=False)

✅ Total unique APIs: 399039
✅ Batch size: 1000
✅ Total batches: 400
🔹 Processing batch 1/400 (1000 APIs)...
✅ Batch 1 completed and kept in memory.
🔹 Processing batch 2/400 (1000 APIs)...
✅ Batch 2 completed and kept in memory.
🔹 Processing batch 3/400 (1000 APIs)...
✅ Batch 3 completed and kept in memory.
🔹 Processing batch 4/400 (1000 APIs)...
✅ Batch 4 completed and kept in memory.
🔹 Processing batch 5/400 (1000 APIs)...
✅ Batch 5 completed and kept in memory.
🔹 Processing batch 6/400 (1000 APIs)...
✅ Batch 6 completed and kept in memory.
🔹 Processing batch 7/400 (1000 APIs)...
✅ Batch 7 completed and kept in memory.
🔹 Processing batch 8/400 (1000 APIs)...
✅ Batch 8 completed and kept in memory.
🔹 Processing batch 9/400 (1000 APIs)...
✅ Batch 9 completed and kept in memory.
🔹 Processing batch 10/400 (1000 APIs)...
✅ Batch 10 completed and kept in memory.
🔹 Processing batch 11/400 (1000 APIs)...
✅ Batch 11 completed and kept in memory.
🔹 Processing batch 12/400 (1000 APIs)...
✅ Batch

In [54]:
comprehensive_data_overview_dask({'backfill_df': backfill_df})


DATASET: BACKFILL_DF
Shape: ((19951950, 11))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
last_year                 float64         19951950   5          0.0       %
last_month                float64         19951950   13         0.0       %
last_gas                  float64         19951950   128036     0.0       %
last_oil                  float64         19951950   38278      0.0       %
months_since_last         float64         19951950   43         0.0       %
gas_avg_3                 float64         19951950   262000     0.0       %
oil_avg_3                 float64         19951950   95299      0.0       %
gas_avg_6                 float64         19951950   430300     0.0       %
oil_avg_6           

In [55]:
backfill_df.to_parquet('../data/forecast_4_backfill.parquet')

This seems to have worked to solve the issue and now we properly included all backfill values for maximum accuracy! The problem was that when I load the backfill helper dataset in with dask and then compute it back into a pandas dataframe, it subtly changed the datasets of some columns (some didn't matter like api being a string instead of an object, but the datetime[ns] was converted to datetime[ms] and this was creating problems with merging its batches with those of forecast_df

Next I just need to sort the forecast_df values in the same way I sorted for backfill_df (realistically I can probably combine that step in the future), and then I'll be able to add these columns from backfill_df directly (number of rows should be the same and api and date prod should match for each row)

In case I want to re-create anything, I need to remember that the datasets I used to create backfill_df (saved as forecast_4_backfill.parquet) are forecast_df (saved as forecast_3.parquet) and bfill_helper (saved as forecast_helper_1.parquet)... and that is also the version of forecast_df I will be moving forward with:
- forecast_4.parquet = forecast_3.parquet(sorted) + forecast_4_backfill.parquet

In [8]:
backfill_df = load_parquet_datasets({'joined_data': '../data/forecast_4_backfill.parquet'}, use_dask=True)['joined_data']
backfill_df.info()

  Loaded joined_data as Dask DataFrame
  Shape: (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(8d1cbf6).size() // 11, dtype=int32>, 11)
<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 11 entries, api_no_10 to oil_avg_6
dtypes: datetime64[ns](1), float64(9), string(1)

In [9]:
backfill_df = backfill_df.compute()
backfill_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 11 columns):
 #   Column             Dtype         
---  ------             -----         
 0   api_no_10          string        
 1   date_prod          datetime64[ns]
 2   last_year          float64       
 3   last_month         float64       
 4   last_gas           float64       
 5   last_oil           float64       
 6   months_since_last  float64       
 7   gas_avg_3          float64       
 8   oil_avg_3          float64       
 9   gas_avg_6          float64       
 10  oil_avg_6          float64       
dtypes: datetime64[ns](1), float64(9), string(1)
memory usage: 1.7 GB


Now it's time to sort forecast_df in the same way that I sorted it when creating backfill_df, so I can properly add the backfill_df columns:

In [10]:
import dask.dataframe as dd
import gc

def batch_sort_forecast_df(
    forecast_df, batch_size=10000, batch_output_prefix="sorted_batch", write_batches_to_disk=False
):
    """
    Memory-efficient batch sorter for Dask DataFrame (forecast_df).
    Sorts by ['api_no_10', 'date_prod'] in the same batching as the merge/backfill.
    """
    unique_apis = forecast_df['api_no_10'].drop_duplicates().compute().values
    total_apis = len(unique_apis)
    num_batches = (total_apis + batch_size - 1) // batch_size

    print(f"✅ Total unique APIs: {total_apis}")
    print(f"✅ Batch size: {batch_size}")
    print(f"✅ Total batches: {num_batches}")

    batch_filepaths = []
    sorted_batches = []

    for batch_num, start in enumerate(range(0, total_apis, batch_size), start=1):
        end = min(start + batch_size, total_apis)
        batch_apis = unique_apis[start:end]

        print(f"🔹 Sorting batch {batch_num}/{num_batches} ({len(batch_apis)} APIs)...")

        # Subset and load this batch into pandas
        batch = forecast_df[forecast_df['api_no_10'].isin(batch_apis)].compute()

        # Sort
        batch = batch.sort_values(['api_no_10', 'date_prod'])

        if write_batches_to_disk:
            filepath = f"../batch_data/{batch_output_prefix}_{batch_num}.parquet"
            batch.to_parquet(filepath, index=False)
            batch_filepaths.append(filepath)
            print(f"✅ Sorted batch {batch_num} written to {filepath}")
            del batch
        else:
            sorted_batches.append(batch)
            print(f"✅ Sorted batch {batch_num} kept in memory.")

        gc.collect()

    print(f"✅ All {num_batches} batches sorted.")

    if write_batches_to_disk:
        print(f"🔄 Combining {len(batch_filepaths)} sorted batch files into Dask DataFrame...")
        sorted_ddf = dd.read_parquet(batch_filepaths)
        return sorted_ddf
    else:
        print(f"🔄 Concatenating {len(sorted_batches)} batches into final sorted DataFrame...")
        sorted_df = pd.concat(sorted_batches, ignore_index=True)
        return sorted_df

In [None]:
# I need to use the same respective parameters as I did to create backfill_df
forecast_df = batch_sort_forecast_df(forecast_df, batch_size=1000, write_batches_to_disk=False)

Even this function unfortunately doesn't work, even though it's only computing and sorting, it is freezing my computer and is unable to hold so much in memory (it manages to sort just one time, which in itself takes a long time, and freezes on the 2nd sort out of 400). But I noticed that the function worked fine before when I was sorting only a limited number of columns, so I'm thinking of re-creating the function to work with a limited subset of columns each time (it will always need api_no_10 and date_prod)

In [10]:
forecast_df.shape

(<dask_expr.expr.Scalar: expr=Assign(frame=ReadParquetFSSpec(1407f7b)).size() // 38, dtype=int32>,
 38)

In [11]:
forecast_df.shape[0].compute()

19951950

That is the same number as backfill_df, which is good

In [8]:
forecast_df.columns

Index(['api_no_10', 'date_prod', 'gas_monthly', 'oil_monthly', 'start_date',
       'wellbore_type', 'Quality_Quant', 'Production_Age',
       'Well_Production_Type', 'reservoir type', 'arps model',
       'Production_Status', 'lat_surface', 'lon_surface', 'lat_bottomhole',
       'lon_bottomhole', 'basin', 'formation_group', 'horizontal_length',
       'measured_depth', 'depth_tvd', 'operator_cluster', 'well_generation',
       'well_density_1km', 'nearest_well_distance_km', 'prod_year',
       'prod_month', 'month_sin', 'month_cos', 'last_year', 'last_month',
       'last_gas', 'last_oil', 'months_since_last', 'gas_avg_3', 'oil_avg_3',
       'gas_avg_6', 'oil_avg_6'],
      dtype='object')

In [9]:
len(forecast_df.columns) - 2

36

There are a total of 36 columns in forecast_df, which divides neatly by 9 (the exact number sorted effectively before), and we don't even need to sort the last batch since we already are going to replace it with backfill_df. So I'm going to try to create another version of the batch sort function which will only compute, and only sort, a max of 9 columns at once, and then will simply add all columns at the end

In [20]:
import dask.dataframe as dd
import gc

# new parameter!
def batch_sort_forecast_df(
    forecast_df, max_sort_columns=9, batch_size=10000, batch_output_prefix="sorted_batch", write_batches_to_disk=False
):
    """
    Memory-efficient batch sorter for Dask DataFrame (forecast_df).
    Sorts by ['api_no_10', 'date_prod'] in the same batching as the merge/backfill.
    """
    unique_apis = forecast_df['api_no_10'].drop_duplicates().compute().values
    total_apis = len(unique_apis)
    num_batches = (total_apis + batch_size - 1) // batch_size

    print(f"✅ Total unique APIs: {total_apis}")
    print(f"✅ Batch size: {batch_size}")
    print(f"✅ Total batches: {num_batches}")

    # this is a bit clumsy and non-pythonic but we know what we need in this specific case
    col_batch_1 = ['api_no_10', 'date_prod']
    col_batch_2 = ['api_no_10', 'date_prod']
    col_batch_3 = ['api_no_10', 'date_prod']
    col_batch_1.extend(forecast_df.columns[2:2+max_sort_columns])
    col_batch_2.extend(forecast_df.columns[2+max_sort_columns:2+max_sort_columns*2])
    col_batch_3.extend(forecast_df.columns[2+max_sort_columns*2:2+max_sort_columns*3])

    print(f"✅ Column Batch 1: {col_batch_1}")
    print(f"✅ Column Batch 2: {col_batch_2}")
    print(f"✅ Column Batch 3: {col_batch_3}")

    batch_filepaths = []
    sorted_batches_1 = []
    sorted_batches_2 = []
    sorted_batches_3 = []

    for batch_num, start in enumerate(range(0, total_apis, batch_size), start=1):
        end = min(start + batch_size, total_apis)
        batch_apis = unique_apis[start:end]

        print(f"🔹 Sorting batch {batch_num}/{num_batches} ({len(batch_apis)} APIs)...")

        # Subset and load this batch into pandas
        batch_1 = forecast_df[forecast_df['api_no_10'].isin(batch_apis)][col_batch_1].compute()
        batch_2 = forecast_df[forecast_df['api_no_10'].isin(batch_apis)][col_batch_2].compute()
        batch_3 = forecast_df[forecast_df['api_no_10'].isin(batch_apis)][col_batch_3].compute()

        # Sort
        batch_1 = batch_1.sort_values(['api_no_10', 'date_prod'])
        batch_2 = batch_2.sort_values(['api_no_10', 'date_prod'])
        batch_3 = batch_3.sort_values(['api_no_10', 'date_prod'])

        if write_batches_to_disk:
            filepath = f"../batch_data/{batch_output_prefix}_{batch_num}.parquet"
            batch.to_parquet(filepath, index=False)
            batch_filepaths.append(filepath)
            print(f"✅ Sorted batch {batch_num} written to {filepath}")
            del batch
        else:
            sorted_batches_1.append(batch_1)
            sorted_batches_2.append(batch_2)
            sorted_batches_3.append(batch_3)
            print(f"✅ Sorted batch {batch_num} kept in memory.")

        gc.collect()

    print(f"✅ All {num_batches} batches sorted.")

    if write_batches_to_disk:
        print(f"🔄 Combining {len(batch_filepaths)} sorted batch files into Dask DataFrame...")
        sorted_ddf = dd.read_parquet(batch_filepaths)
        return sorted_ddf
    else:
        print(f"🔄 Concatenating {len(sorted_batches_1)} batches into final sorted DataFrame...")
        sorted_df_1 = pd.concat(sorted_batches_1, ignore_index=True)
        sorted_df_2 = pd.concat(sorted_batches_2, ignore_index=True)
        sorted_df_3 = pd.concat(sorted_batches_3, ignore_index=True)
        for col in sorted_df_2.columns[2:]:
            sorted_df_1[col] = sorted_df_2[col]
        for col in sorted_df_3.columns[2:]:
            sorted_df_1[col] = sorted_df_3[col]
        return sorted_df_1

In [21]:
# I need to use the same respective parameters as I did to create backfill_df
forecast_df = batch_sort_forecast_df(forecast_df, batch_size=1000, write_batches_to_disk=False)

✅ Total unique APIs: 399039
✅ Batch size: 1000
✅ Total batches: 400
✅ Column Batch 1: ['api_no_10', 'date_prod', 'gas_monthly', 'oil_monthly', 'start_date', 'wellbore_type', 'Quality_Quant', 'Production_Age', 'Well_Production_Type', 'reservoir type', 'arps model']
✅ Column Batch 2: ['api_no_10', 'date_prod', 'Production_Status', 'lat_surface', 'lon_surface', 'lat_bottomhole', 'lon_bottomhole', 'basin', 'formation_group', 'horizontal_length', 'measured_depth']
✅ Column Batch 3: ['api_no_10', 'date_prod', 'depth_tvd', 'operator_cluster', 'well_generation', 'well_density_1km', 'nearest_well_distance_km', 'prod_year', 'prod_month', 'month_sin', 'month_cos']
🔹 Sorting batch 1/400 (1000 APIs)...
✅ Sorted batch 1 kept in memory.
🔹 Sorting batch 2/400 (1000 APIs)...
✅ Sorted batch 2 kept in memory.
🔹 Sorting batch 3/400 (1000 APIs)...
✅ Sorted batch 3 kept in memory.
🔹 Sorting batch 4/400 (1000 APIs)...
✅ Sorted batch 4 kept in memory.
🔹 Sorting batch 5/400 (1000 APIs)...
✅ Sorted batch 5 kept

In [22]:
forecast_df.to_parquet('../data/forecast_3_sorted.parquet')

That took about an hour and 20 minutes and I made an error the first time so I had to re-do the entire thing, but now that the data is loaded we should be good to go!

In [23]:
forecast_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 29 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   api_no_10                 string        
 1   date_prod                 datetime64[ns]
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             float64       
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Well_Production_Type      float64       
 9   reservoir type            float64       
 10  arps model                float64       
 11  Production_Status         float64       
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64       
 16  basin                     float64       
 17  format

It looks good so far, the right number of rows with all the right columns, and takes up as much memory as I already imagined it would. Now it's time to add the backfill columns

In [24]:
backfill_df = load_parquet_datasets({'joined_data': '../data/forecast_4_backfill.parquet'}, use_dask=True)['joined_data']
backfill_df.info()

  Loaded joined_data as Dask DataFrame
  Shape: (<dask_expr.expr.Scalar: expr=ReadParquetFSSpec(81259e1).size() // 11, dtype=int32>, 11)
<class 'dask.dataframe.dask_expr.DataFrame'>
Columns: 11 entries, api_no_10 to oil_avg_6
dtypes: datetime64[ns](1), float64(9), string(1)

In [25]:
backfill_df = backfill_df.compute()
backfill_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 11 columns):
 #   Column             Dtype         
---  ------             -----         
 0   api_no_10          string        
 1   date_prod          datetime64[ns]
 2   last_year          float64       
 3   last_month         float64       
 4   last_gas           float64       
 5   last_oil           float64       
 6   months_since_last  float64       
 7   gas_avg_3          float64       
 8   oil_avg_3          float64       
 9   gas_avg_6          float64       
 10  oil_avg_6          float64       
dtypes: datetime64[ns](1), float64(9), string(1)
memory usage: 1.7 GB


Let's check that the heads and tails of the dataframes are the same, for evidence that the columns line up properly

In [26]:
forecast_df.head(20)

Unnamed: 0,api_no_10,date_prod,gas_monthly,oil_monthly,start_date,wellbore_type,Quality_Quant,Production_Age,Well_Production_Type,reservoir type,arps model,Production_Status,lat_surface,lon_surface,lat_bottomhole,lon_bottomhole,basin,formation_group,horizontal_length,measured_depth,depth_tvd,operator_cluster,well_generation,well_density_1km,nearest_well_distance_km,prod_year,prod_month,month_sin,month_cos
0,4200131030,2021-01-01,0.0,2.0,1993-01-01,2.0,6.0,337,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,1,0.5,0.8660254
1,4200131030,2021-02-01,0.0,1.0,1993-01-01,2.0,6.0,338,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,2,0.8660254,0.5
2,4200131030,2021-03-01,0.0,2.0,1993-01-01,2.0,6.0,339,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,3,1.0,6.123234000000001e-17
3,4200131030,2021-04-01,0.0,2.0,1993-01-01,2.0,6.0,340,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,4,0.8660254,-0.5
4,4200131030,2021-05-01,0.0,1.0,1993-01-01,2.0,6.0,341,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,5,0.5,-0.8660254
5,4200131030,2021-06-01,0.0,2.0,1993-01-01,2.0,6.0,342,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,6,1.224647e-16,-1.0
6,4200131030,2021-07-01,0.0,1.0,1993-01-01,2.0,6.0,343,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,7,-0.5,-0.8660254
7,4200131030,2021-08-01,0.0,1.0,1993-01-01,2.0,6.0,344,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,8,-0.8660254,-0.5
8,4200131030,2021-09-01,0.0,1.0,1993-01-01,2.0,6.0,345,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,9,-1.0,-1.83697e-16
9,4200131030,2021-10-01,0.0,2.0,1993-01-01,2.0,6.0,346,3.0,1.0,2.0,1.0,31.628696,-95.472879,31.628696,-95.472879,2.0,16.0,2468.526123,570.0,564.3,0.0,1.0,310.0,0.069054,2021,10,-0.8660254,0.5


In [28]:
backfill_df[['api_no_10', 'date_prod']].head(20)

Unnamed: 0,api_no_10,date_prod
0,4200131030,2021-01-01
1,4200131030,2021-02-01
2,4200131030,2021-03-01
3,4200131030,2021-04-01
4,4200131030,2021-05-01
5,4200131030,2021-06-01
6,4200131030,2021-07-01
7,4200131030,2021-08-01
8,4200131030,2021-09-01
9,4200131030,2021-10-01


The beginning is the same

In [29]:
backfill_df[['api_no_10', 'date_prod']].tail(20)

Unnamed: 0,api_no_10,date_prod
19951930,4250130488,2023-07-01
19951931,4250130488,2023-08-01
19951932,4250130488,2023-09-01
19951933,4250130488,2023-10-01
19951934,4250130488,2023-11-01
19951935,4250130488,2023-12-01
19951936,4250130488,2024-01-01
19951937,4250130488,2024-02-01
19951938,4250130488,2024-03-01
19951939,4250130488,2024-04-01


In [30]:
forecast_df[['api_no_10', 'date_prod']].tail(20)

Unnamed: 0,api_no_10,date_prod
19951930,4250130488,2023-07-01
19951931,4250130488,2023-08-01
19951932,4250130488,2023-09-01
19951933,4250130488,2023-10-01
19951934,4250130488,2023-11-01
19951935,4250130488,2023-12-01
19951936,4250130488,2024-01-01
19951937,4250130488,2024-02-01
19951938,4250130488,2024-03-01
19951939,4250130488,2024-04-01


All the same dates and all the same indices, it looks solid. I'll just go ahead and check the whole columns since it should be too expensive:

In [31]:
sum(forecast_df['api_no_10'] == backfill_df['api_no_10'])

19951950

Exactly the same as the number of entries!

In [32]:
sum(forecast_df['date_prod'] == backfill_df['date_prod'])

19951950

In [33]:
for col in backfill_df.columns[2:]:
    forecast_df[col] = backfill_df[col]

forecast_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19951950 entries, 0 to 19951949
Data columns (total 38 columns):
 #   Column                    Dtype         
---  ------                    -----         
 0   api_no_10                 string        
 1   date_prod                 datetime64[ns]
 2   gas_monthly               float64       
 3   oil_monthly               float64       
 4   start_date                datetime64[ns]
 5   wellbore_type             float64       
 6   Quality_Quant             float64       
 7   Production_Age            int32         
 8   Well_Production_Type      float64       
 9   reservoir type            float64       
 10  arps model                float64       
 11  Production_Status         float64       
 12  lat_surface               float64       
 13  lon_surface               float64       
 14  lat_bottomhole            float64       
 15  lon_bottomhole            float64       
 16  basin                     float64       
 17  format

That is a monster of a dataframe

In [35]:
comprehensive_data_overview_dask({'forecast_df': forecast_df})


DATASET: FORECAST_DF
Shape: ((19951950, 38))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
gas_monthly               float64         16564703   129580     17.0      %
oil_monthly               float64         16564703   38707      17.0      %
start_date                datetime64[ns]  19951950   385        0.0       %
wellbore_type             float64         19951950   3          0.0       %
Quality_Quant             float64         19951950   9          0.0       %
Production_Age            int32           19951950   386        0.0       %
Well_Production_Type      float64         19951950   4          0.0       %
reservoir type            float64         19951950   3          0.0       %
arps model          

In [36]:
forecast_df.to_parquet('../data/forecast_4.parquet')

## Forecasting and Final Calculations <a id="Conclusions"></a>

In [37]:
missing_mask = forecast_df['gas_monthly'].isna()
print(f"Rows needing predictions: {missing_mask.sum():,}")

Rows needing predictions: 3,387,247


In [38]:
sum(forecast_df['oil_monthly'].isna())

3387247

In [39]:
# Define feature columns (exclude API, dates, and targets)
exclude_cols = ['api_no_10', 'date_prod', 'start_date', 'gas_monthly', 'oil_monthly']
feature_cols = [col for col in forecast_df.columns if col not in exclude_cols]

# Create feature matrix and target vectors
X_forecast = forecast_df[missing_mask][feature_cols].copy()
comprehensive_data_overview_dask({'X_forecast': X_forecast})


DATASET: X_FORECAST
Shape: ((3387247, 33))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
wellbore_type             float64         3387247    3          0.0       %
Quality_Quant             float64         3387247    9          0.0       %
Production_Age            int32           3387247    386        0.0       %
Well_Production_Type      float64         3387247    4          0.0       %
reservoir type            float64         3387247    3          0.0       %
arps model                float64         3387247    4          0.0       %
Production_Status         float64         3387247    3          0.0       %
lat_surface               float64         3387247    378656     0.0       %
lon_surface               float64         3387247    385128     0.0       %
lat_bottomhole            float64         3387247    382387     0.0       %
lon_bottomhole        

I accidentally wrote "and target vectors" in the comment above the creation of the forecast features, but that is just because I copied my code from before, it doesn't mean anything, there are no target vectors in this case since these are missing values, we can estimate the performance from what we did on the test set earlier

In [40]:
gas_model = joblib.load('../models/final_gas_model.pkl')
oil_model = joblib.load('../models/final_oil_model.pkl')

In [41]:
gas_model

0,1,2
,objective,'reg:squarederror'
,base_score,
,booster,
,callbacks,
,colsample_bylevel,
,colsample_bynode,
,colsample_bytree,1.0
,device,'cuda'
,early_stopping_rounds,
,enable_categorical,False


In [42]:
oil_model

0,1,2
,objective,'reg:squarederror'
,base_score,
,booster,
,callbacks,
,colsample_bylevel,
,colsample_bynode,
,colsample_bytree,1.0
,device,'cuda'
,early_stopping_rounds,
,enable_categorical,False


In [43]:
gas_predictions = gas_model.predict(X_forecast)

In [45]:
pd.Series(gas_predictions).describe()

count    3.387247e+06
mean     8.513797e+03
std      4.205036e+04
min     -2.296225e+04
25%      1.872266e+01
50%      1.015935e+02
75%      2.572326e+03
max      7.384224e+06
dtype: float64

In [47]:
sum(pd.Series(gas_predictions) <= 0)

162587

In [48]:
100*sum(pd.Series(gas_predictions) <= 0) / len(pd.Series(gas_predictions))

4.799974728739889

In [50]:
100*sum(forecast_df['gas_monthly'] == 0) / len(forecast_df['gas_monthly'])

20.609890261352902

There are some negative values there which we'll have to fix, **they'll just be interpreted as zeroes**, it's less than 5% of the data so hopefully not a problem considering how many zeroes there are, there are actually a lot more zero values in forecast_df for the gas_monthly values that we know, but that could also be happenstance since there isn't necessarily a random selection of missing values

In [52]:
gas_predictions = np.maximum(gas_predictions, 0)
pd.Series(gas_predictions).describe()

count    3.387247e+06
mean     8.531177e+03
std      4.204627e+04
min      0.000000e+00
25%      1.872266e+01
50%      1.015935e+02
75%      2.572326e+03
max      7.384224e+06
dtype: float64

In [53]:
forecast_df['gas_monthly'].describe()

count    1.656470e+07
mean     2.646018e+03
std      1.856230e+04
min      0.000000e+00
25%      1.000000e+00
50%      5.600000e+01
75%      8.210000e+02
max      1.614492e+07
Name: gas_monthly, dtype: float64

So the values towards the low end of our predictions tend to be higher than average whereas the values towards the highest end tend to be lower, this suggests to me that our model might over-correcting towards the mean, so that is one possible area where we could improve it

In [54]:
oil_predictions = oil_model.predict(X_forecast)

In [55]:
pd.Series(oil_predictions).describe()

count    3.387247e+06
mean     2.847601e+03
std      9.207719e+03
min     -3.608422e+03
25%      4.662048e+00
50%      4.286880e+01
75%      3.625202e+02
max      1.419534e+06
dtype: float64

In [56]:
sum(pd.Series(oil_predictions) <= 0)

131359

Similar amount of negative oil values, perhaps even fewer

In [57]:
oil_predictions = np.maximum(oil_predictions, 0)
pd.Series(oil_predictions).describe()

count    3.387247e+06
mean     2.850251e+03
std      9.206856e+03
min      0.000000e+00
25%      4.662048e+00
50%      4.286880e+01
75%      3.625202e+02
max      1.419534e+06
dtype: float64

In [58]:
forecast_df['oil_monthly'].describe()

count    1.656470e+07
mean     4.283009e+02
std      2.322347e+03
min      0.000000e+00
25%      1.000000e+00
50%      2.300000e+01
75%      1.060000e+02
max      1.391600e+06
Name: oil_monthly, dtype: float64

The oil model interestingly is a bit different, it doesn't over-correct towards the mean it just has a bit larger than average values, but quite close in many case

In [59]:
forecast_df.loc[missing_mask, 'gas_monthly'] = gas_predictions
forecast_df.loc[missing_mask, 'oil_monthly'] = oil_predictions

In [60]:
forecast_df.to_parquet('../well_forecasts_complete.parquet')

In [61]:
comprehensive_data_overview_dask({'forecast_df': forecast_df})


DATASET: FORECAST_DF
Shape: ((19951950, 38))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          19951950   399039     0.0       %
date_prod                 datetime64[ns]  19951950   50         0.0       %
gas_monthly               float64         19951950   471423     0.0       %
oil_monthly               float64         19951950   597728     0.0       %
start_date                datetime64[ns]  19951950   385        0.0       %
wellbore_type             float64         19951950   3          0.0       %
Quality_Quant             float64         19951950   9          0.0       %
Production_Age            int32           19951950   386        0.0       %
Well_Production_Type      float64         19951950   4          0.0       %
reservoir type            float64         19951950   3          0.0       %
arps model          

### State-Level Totals and Uncertainty <a id="State"></a>

In [62]:
target_period = forecast_df[
    (forecast_df['date_prod'] >= '2024-09-01') & 
    (forecast_df['date_prod'] <= '2024-11-30')
].copy()

comprehensive_data_overview_dask({'target_period': target_period})


DATASET: TARGET_PERIOD
Shape: ((1197117, 38))

Column Overview:
Column                    Type            Non-Null   Unique     Missing % 
--------------------------------------------------------------------------------
api_no_10                 string          1197117    399039     0.0       %
date_prod                 datetime64[ns]  1197117    3          0.0       %
gas_monthly               float64         1197117    81560      0.0       %
oil_monthly               float64         1197117    89233      0.0       %
start_date                datetime64[ns]  1197117    385        0.0       %
wellbore_type             float64         1197117    3          0.0       %
Quality_Quant             float64         1197117    9          0.0       %
Production_Age            int32           1197117    383        0.0       %
Well_Production_Type      float64         1197117    4          0.0       %
reservoir type            float64         1197117    3          0.0       %
arps model         

In [63]:
monthly_totals = target_period.groupby('date_prod').agg({
    'gas_monthly': 'sum',
    'oil_monthly': 'sum'
}).reset_index()

print("Monthly totals for Sep-Nov 2024:")
print(monthly_totals)

Monthly totals for Sep-Nov 2024:
   date_prod   gas_monthly   oil_monthly
0 2024-09-01  1.157746e+09  2.146556e+08
1 2024-10-01  1.164559e+09  2.121537e+08
2 2024-11-01  1.153913e+09  2.077197e+08


Let's consider the uncertainty:

In [72]:
n_bootstrap = 100
bootstrap_results = []
unique_apis = target_period['api_no_10'].unique()

print("Calculating uncertainty with bootstrap...")
for i in tqdm(range(n_bootstrap), desc="Bootstrap sampling"):
    # Sample wells with replacement
    sampled_apis = np.random.choice(unique_apis, 
                                   size=len(unique_apis), 
                                   replace=True)
    
    # Create bootstrap sample
    bootstrap_sample = []
    for api in sampled_apis:
        api_data = target_period[target_period['api_no_10'] == api]
        bootstrap_sample.append(api_data)
    
    bootstrap_df = pd.concat(bootstrap_sample, ignore_index=True)
    
    # Calculate totals for this bootstrap sample
    bootstrap_totals = bootstrap_df.groupby('date_prod').agg({
        'gas_monthly': 'sum',
        'oil_monthly': 'sum'
    }).reset_index()
    
    bootstrap_results.append(bootstrap_totals)

Calculating uncertainty with bootstrap...


Bootstrap sampling:   0%|                                                                      | 0/100 [02:38<?, ?it/s]


KeyboardInterrupt: 

Taking way too long as usual, let's take another approach:

In [73]:
def bootstrap_aggregate_fast(target_period, n_bootstrap=1000):
    unique_apis = target_period['api_no_10'].unique()
    n_apis = len(unique_apis)
    bootstrap_results = []

    for _ in tqdm(range(n_bootstrap), desc="Bootstrap sampling"):
        # Step 1: Sample API counts
        sampled_apis = np.random.choice(unique_apis, size=n_apis, replace=True)
        sampled_counts = pd.Series(sampled_apis).value_counts()

        # Step 2: Merge counts with data
        sampled_data = target_period[target_period['api_no_10'].isin(sampled_counts.index)].copy()
        sampled_data = sampled_data.merge(sampled_counts.rename('count'), left_on='api_no_10', right_index=True)

        # Step 3: Weight and aggregate
        sampled_data['gas_monthly_weighted'] = sampled_data['gas_monthly'] * sampled_data['count']
        sampled_data['oil_monthly_weighted'] = sampled_data['oil_monthly'] * sampled_data['count']

        bootstrap_totals = sampled_data.groupby('date_prod').agg({
            'gas_monthly_weighted': 'sum',
            'oil_monthly_weighted': 'sum'
        }).reset_index().rename(
            columns={'gas_monthly_weighted': 'gas_monthly', 'oil_monthly_weighted': 'oil_monthly'}
        )

        bootstrap_results.append(bootstrap_totals)

    return bootstrap_results

In [75]:
bootstrap_results = bootstrap_aggregate_fast(target_period, n_bootstrap=100)

Bootstrap sampling: 100%|████████████████████████████████████████████████████████████| 100/100 [05:09<00:00,  3.09s/it]


In [76]:
# Calculate confidence intervals
all_bootstrap = pd.concat(bootstrap_results)
confidence_intervals = all_bootstrap.groupby('date_prod').agg({
    'gas_monthly': ['mean', lambda x: np.percentile(x, 2.5), lambda x: np.percentile(x, 97.5)],
    'oil_monthly': ['mean', lambda x: np.percentile(x, 2.5), lambda x: np.percentile(x, 97.5)]
})

print("\nState totals with 95% confidence intervals:")
print(confidence_intervals)


State totals with 95% confidence intervals:
             gas_monthly                               oil_monthly  \
                    mean    <lambda_0>    <lambda_1>          mean   
date_prod                                                            
2024-09-01  1.157466e+09  1.139847e+09  1.179911e+09  2.146671e+08   
2024-10-01  1.164302e+09  1.146643e+09  1.184698e+09  2.120810e+08   
2024-11-01  1.153625e+09  1.136426e+09  1.173554e+09  2.075977e+08   

                                        
              <lambda_0>    <lambda_1>  
date_prod                               
2024-09-01  2.115205e+08  2.175992e+08  
2024-10-01  2.086922e+08  2.147655e+08  
2024-11-01  2.045415e+08  2.097808e+08  


In [78]:
confidence_intervals.columns

MultiIndex([('gas_monthly',       'mean'),
            ('gas_monthly', '<lambda_0>'),
            ('gas_monthly', '<lambda_1>'),
            ('oil_monthly',       'mean'),
            ('oil_monthly', '<lambda_0>'),
            ('oil_monthly', '<lambda_1>')],
           )

In [80]:
confidence_intervals.columns = [('gas_monthly',       'mean'),
            ('gas_monthly', '2.5%'),
            ('gas_monthly', '97.5%'),
            ('oil_monthly',       'mean'),
            ('oil_monthly', '2.5%'),
            ('oil_monthly', '97.5%')]
print("\nState totals with 95% confidence intervals:")
print(confidence_intervals)


State totals with 95% confidence intervals:
            (gas_monthly, mean)  (gas_monthly, 2.5%)  (gas_monthly, 97.5%)  \
date_prod                                                                    
2024-09-01         1.157466e+09         1.139847e+09          1.179911e+09   
2024-10-01         1.164302e+09         1.146643e+09          1.184698e+09   
2024-11-01         1.153625e+09         1.136426e+09          1.173554e+09   

            (oil_monthly, mean)  (oil_monthly, 2.5%)  (oil_monthly, 97.5%)  
date_prod                                                                   
2024-09-01         2.146671e+08         2.115205e+08          2.175992e+08  
2024-10-01         2.120810e+08         2.086922e+08          2.147655e+08  
2024-11-01         2.075977e+08         2.045415e+08          2.097808e+08  


In [81]:
monthly_totals.to_csv('../state_totals_sep_nov_2024.csv', index=False)
confidence_intervals.to_csv('../state_totals_with_uncertainty.csv')

## Results <a id="Results"></a>

In [74]:
print("Monthly totals for Sep-Nov 2024:")
print(monthly_totals)

Monthly totals for Sep-Nov 2024:
   date_prod   gas_monthly   oil_monthly
0 2024-09-01  1.157746e+09  2.146556e+08
1 2024-10-01  1.164559e+09  2.121537e+08
2 2024-11-01  1.153913e+09  2.077197e+08


This chart lets us know how we're doing: https://www.rrc.texas.gov/oil-and-gas/research-and-statistics/production-data/texas-monthly-oil-gas-production/

Actual Texas Gas values by MCF:
- Gas Sep: 1,065,447,457
- Gas Oct: 1,110,512,790
- Gas Nov: 1,068,563,172

On gas our predictions are pretty good, just a bit high, but right within the range of one billion MCF, and our predictions even replicate the slight spike in October

Actual Texas Oil values by Barrel:
- Oil Sep: 193,438,651
- Oil Oct: 201,262,381
- Oil Nov: 191,097,147

Also not bad predictions for oil, right around the ballpark of 200 million, the spike in the oil case is less accurate though

So overall I'd say my predictions easily pass the sanity check, they are within the correct range of values. With more advanced approaches they could probably be even more precise. I can calculate the percentage difference from the actual values:

In [66]:
100*abs(monthly_totals['gas_monthly'].iloc[0] - 1065447457) / 2 / (monthly_totals['gas_monthly'].iloc[0] + 1065447457)

2.0758040626671086

In [67]:
100*abs(monthly_totals['gas_monthly'].iloc[1] - 1110512790) / 2 / (monthly_totals['gas_monthly'].iloc[1] + 1110512790)

1.187789277325298

In [68]:
100*abs(monthly_totals['gas_monthly'].iloc[2] - 1068563172) / 2 / (monthly_totals['gas_monthly'].iloc[2] + 1068563172)

1.920142839634344

In [69]:
100*abs(monthly_totals['oil_monthly'].iloc[0] - 193438651) / 2 / (monthly_totals['oil_monthly'].iloc[0] + 193438651)

2.5995183873374628

In [70]:
100*abs(monthly_totals['oil_monthly'].iloc[1] - 201262381) / 2 / (monthly_totals['oil_monthly'].iloc[1] + 201262381)

1.3172346618675461

In [71]:
100*abs(monthly_totals['oil_monthly'].iloc[2] - 191097147) / 2 / (monthly_totals['oil_monthly'].iloc[2] + 191097147)

2.083986223404608

All within 3 percent of the actual value, often considerably closer than that (even near 1 percent away)