# Lotto XGBoost with 5-Number Buckets (Named Ranges)

This notebook:

1. Loads `Lotto5.xlsx` (NZ Lotto draw history).
2. Builds **5-number bucket features** with meaningful names, e.g. `bucket_1_5_count`, `bucket_6_10_count`, etc.
3. Adds simple extra features (Odd/Even, date parts, etc.).
4. Trains an **XGBoost multi-output regressor** (one target per winning number).
5. Performs 5-fold cross-validation.
6. Demonstrates generating candidate draws and predicting with the best model.
7. Saves the expanded dataset (with new features) as `Lotto5_Imputed.xlsx`.

Place this notebook in the same folder as `Lotto5.xlsx` and run all cells top to bottom.


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from deltalake import write_deltalake


from sklearn.model_selection import cross_validate
from sklearn.metrics import mean_squared_error, make_scorer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.multioutput import MultiOutputRegressor

from xgboost import XGBRegressor

np.random.seed(42)
pd.set_option('display.width', 200)
pd.set_option('display.max_columns', 100)

print('Libraries imported.')

In [None]:
# Path to your Excel file. Ensure Lotto5.xlsx is in the same directory as this notebook.
excel_path = "Lotto5.xlsx"

df = pd.read_excel(excel_path)
print("Data shape:", df.shape)
display(df.head())
print("\nColumns:", list(df.columns))

## Basic cleaning and helper columns

We:

- Ensure the winning number columns are numeric.
- Convert helper columns like `From Last`, `Same As Day`, `Odd`, `Even` to numeric (if present).
- Parse `Date` and add simple date features.


In [None]:
# Winning number columns (targets)
number_cols = [
    "Winning Number 1",
    "Winning Number 2",
    "Winning Number 3",
    "Winning Number 4",
    "Winning Number 5",
    "Winning Number 6",
]

# Ensure numeric for winning numbers
for col in number_cols:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    else:
        raise ValueError(f"Expected column '{col}' not found in dataframe.")

# Helper columns that may exist
helper_cols = ["From Last", "Same As Day", "Odd", "Even"]
for col in helper_cols:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

# Date handling
if "Date" in df.columns:
    df["Date"] = pd.to_datetime(df["Date"], errors="coerce")
    df["Year"] = df["Date"].dt.year.fillna(0).astype(int)
    df["Month"] = df["Date"].dt.month.fillna(0).astype(int)
    df["DayOfWeek"] = df["Date"].dt.dayofweek.fillna(0).astype(int)
else:
    df["Year"] = 0
    df["Month"] = 0
    df["DayOfWeek"] = 0

print("After basic cleaning:")
display(df.head())

## 5-number bucket features with named ranges

We map numbers as follows (for NZ Lotto 1–40):

- 1–5   → bucket index 0 → features: `bucket_1_5_count`, `bucket_1_5_present`
- 6–10  → bucket index 1 → `bucket_6_10_count`, ...
- 11–15 → bucket index 2
- 16–20 → bucket index 3
- 21–25 → bucket index 4
- 26–30 → bucket index 5
- 31–35 → bucket index 6
- 36–40 → bucket index 7

So you can immediately see which 5-number range each feature refers to.


In [None]:
def num_to_bucket(num: float, bucket_size: int = 5) -> float:
    """Map a lotto number to a 0-based bucket index of size `bucket_size`.
    Returns NaN for missing values.
    """
    if pd.isna(num):
        return np.nan
    return int((int(num) - 1) // bucket_size)

# Create per-number bucket columns (indices)
for col in number_cols:
    df[f"{col}_bucket"] = df[col].apply(num_to_bucket)

bucket_cols = [f"{col}_bucket" for col in number_cols]
max_bucket = int(df[bucket_cols].max().max())

# Infer max actual number from data (e.g. 40)
max_number = int(df[number_cols].max().max())
bucket_size = 5

# Build mapping from bucket index -> human-readable column names
bucket_index_to_count_col = {}
bucket_index_to_present_col = {}
bucket_count_cols = []
bucket_present_cols = []

for i in range(max_bucket + 1):
    low = i * bucket_size + 1
    high = min((i + 1) * bucket_size, max_number)
    count_name = f"bucket_{low}_{high}_count"
    present_name = f"bucket_{low}_{high}_present"
    bucket_index_to_count_col[i] = count_name
    bucket_index_to_present_col[i] = present_name
    bucket_count_cols.append(count_name)
    bucket_present_cols.append(present_name)

print("Bucket index to feature names:")
for i in range(max_bucket + 1):
    print(i, "->", bucket_index_to_count_col[i], ",", bucket_index_to_present_col[i])

display(df[bucket_cols].head())

In [None]:
# Compute bucket counts per draw using the named columns
def bucket_count_row(row):
    counts = np.zeros(max_bucket + 1, dtype=int)
    buckets = row[bucket_cols].values
    for b in buckets:
        if not pd.isna(b):
            b_int = int(b)
            if 0 <= b_int <= max_bucket:
                counts[b_int] += 1
    # Map counts into named columns
    data = {}
    for i in range(max_bucket + 1):
        data[bucket_index_to_count_col[i]] = counts[i]
    return pd.Series(data, index=bucket_count_cols)

df_bucket_counts = df.apply(bucket_count_row, axis=1)
df = pd.concat([df, df_bucket_counts], axis=1)

# Presence flags using named columns
for i in range(max_bucket + 1):
    count_col = bucket_index_to_count_col[i]
    present_col = bucket_index_to_present_col[i]
    df[present_col] = (df[count_col] > 0).astype(int)

# Bucket energy (weighted sum of bucket indices by count)
df["bucket_energy"] = 0
for i in range(max_bucket + 1):
    count_col = bucket_index_to_count_col[i]
    df["bucket_energy"] += i * df[count_col]

print("Bucket features created (named ranges):")
display(df.head())

In [None]:
# Save expanded dataset with new features
output_excel_path = "Lotto5_Imputed.xlsx"
df.to_excel(output_excel_path, index=False)
print(f"Saved dataset with new features to {output_excel_path}")

## Build feature matrix and target matrix

- **Targets**: the six winning numbers as a 6D regression target.
- **Features**: named bucket counts/presence, bucket energy, helper columns, and date parts.


In [None]:
target_cols = number_cols.copy()

# bucket_count_cols and bucket_present_cols already defined with meaningful names
candidate_feature_cols = (
    bucket_count_cols
    + bucket_present_cols
    + ["bucket_energy", "From Last", "Same As Day", "Odd", "Even", "Year", "Month", "DayOfWeek"]
)

# Keep only columns that exist in df (in case some helper cols are missing)
feature_cols = [c for c in candidate_feature_cols if c in df.columns]

print("Using feature columns:")
print(feature_cols)

X = df[feature_cols].values
y = df[target_cols].values

print("Feature matrix shape:", X.shape)
print("Target matrix shape:", y.shape)

## XGBoost model with cross-validation

We use:

- `SimpleImputer` to handle any missing feature values.
- `MultiOutputRegressor(XGBRegressor)` to predict all 6 numbers at once.
- 5-fold cross-validation with **negative MSE** scoring.


In [None]:
# Define base XGBoost regressor
xgb_reg = XGBRegressor(
    n_estimators=300,
    max_depth=4,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    objective="reg:squarederror",
    tree_method="hist",  # change to 'gpu_hist' if you have GPU XGBoost installed
    random_state=42,
)

model = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("regressor", MultiOutputRegressor(xgb_reg)),
    ]
)

scoring = make_scorer(mean_squared_error, greater_is_better=False)

cv_results = cross_validate(
    model,
    X,
    y,
    cv=5,
    scoring=scoring,
    return_estimator=True,
    n_jobs=-1,
)

test_scores = -cv_results["test_score"]  # convert back to positive MSE
print("Cross-validation MSE scores:", test_scores)
print("Mean CV MSE:", np.mean(test_scores))

best_model_index = np.argmin(test_scores)
best_model = cv_results["estimator"][best_model_index]
print("Best model index:", best_model_index)

## Predicting from candidate draws

To keep things simple, we:

1. Generate a **candidate draw** (6 random numbers 1–40, no replacement).
2. Build the same **bucket-based features** (with named ranges) for that draw.
3. Use the best cross-validated model to predict a 6D output.
4. Map predictions back into the 1–40 range (wrapping with modulo).

This is more for exploration / "pattern resonance" than real prediction.


In [None]:
def build_features_from_draw(draw_numbers, feature_columns, max_bucket_local=None, bucket_size_local=5):
    """Build a one-row feature DataFrame for a candidate draw using the same
    bucket logic and feature columns as the training data.
    """
    draw_numbers = np.array(draw_numbers, dtype=int)
    if max_bucket_local is None:
        max_bucket_local = max_bucket

    # bucket counts
    counts = np.zeros(max_bucket_local + 1, dtype=int)
    for n in draw_numbers:
        b = num_to_bucket(n, bucket_size=bucket_size_local)
        if 0 <= b <= max_bucket_local:
            counts[b] += 1

    row = {}

    # bucket counts and presence using named columns
    for i in range(max_bucket_local + 1):
        count_col = bucket_index_to_count_col[i]
        present_col = bucket_index_to_present_col[i]
        if count_col in feature_columns:
            row[count_col] = counts[i]
        if present_col in feature_columns:
            row[present_col] = int(counts[i] > 0)

    # bucket_energy
    if "bucket_energy" in feature_columns:
        row["bucket_energy"] = sum(i * counts[i] for i in range(max_bucket_local + 1))

    # helper + date features (set to neutral values)
    defaults = {
        "From Last": 0,
        "Same As Day": 0,
        "Odd": 0,
        "Even": 0,
        "Year": 0,
        "Month": 0,
        "DayOfWeek": 0,
    }
    for col, val in defaults.items():
        if col in feature_columns and col not in row:
            row[col] = val

    # ensure all feature_columns exist
    for col in feature_columns:
        if col not in row:
            row[col] = 0

    return pd.DataFrame([row], columns=feature_columns)

# Example: generate a few candidate draws and predict
num_predictions = 5
for i in range(num_predictions):
    candidate_numbers = np.sort(np.random.choice(np.arange(1, 41), size=6, replace=False))
    input_df = build_features_from_draw(candidate_numbers, feature_cols)
    pred = best_model.predict(input_df.values)[0]  # shape (6,)

    # Map predictions into 1–40 range and round
    predicted_numbers = ((np.round(pred).astype(int) - 1) % 40) + 1
    predicted_numbers = np.sort(predicted_numbers)

    print(f"Prediction set {i+1}:")
    print("  Candidate base numbers:", candidate_numbers)
    print("  Model predicted numbers:", predicted_numbers)
    print("-" * 60)

## Monte Carlo Lotto Simulator, Bucket Energy & Delta Export

This section:

- Reuses the existing 5-number bucket logic.
- Computes additional draw-level features like `draw_median`, `draw_mean`, `draw_min`, `draw_max`.
- Visualises **bucket energy** and **median** over time.
- Builds an **empirical + uniform blended Monte Carlo simulator** to generate many synthetic draws.
- Computes the same bucket features for synthetic draws.
- Optionally saves both real and synthetic feature tables to **Delta Lake** for ROAPI or other tools.


In [None]:
# Use the existing dataframe `df` and `number_cols` from earlier cells.
# If your raw draws dataframe has a different name, adjust `df_draws` accordingly.
df_draws = df  # alias for clarity

number_cols = number_cols  # ensure we use the same target columns

print("Using number columns:", number_cols)

def compute_bucket_features_for_draws(df_base: pd.DataFrame) -> pd.DataFrame:
    """Compute bucket count/presence, bucket_energy and draw-level stats for each draw."""
    df_feat = df_base.copy()

    # Map each number to a bucket index using existing num_to_bucket
    bucket_indices = df_feat[number_cols].applymap(num_to_bucket)

    # Count per bucket into the named bucket_*_count columns
    def row_to_bucket_counts(row):
        counts = np.zeros(max_bucket + 1, dtype=int)
        for b in row.values:
            if not pd.isna(b):
                b_int = int(b)
                if 0 <= b_int <= max_bucket:
                    counts[b_int] += 1
        data = {}
        for i in range(max_bucket + 1):
            data[bucket_index_to_count_col[i]] = counts[i]
        return pd.Series(data, index=bucket_count_cols)

    df_counts = bucket_indices.apply(row_to_bucket_counts, axis=1)
    df_feat[df_counts.columns] = df_counts

    # Presence flags using named columns
    for i in range(max_bucket + 1):
        count_col = bucket_index_to_count_col[i]
        present_col = bucket_index_to_present_col[i]
        df_feat[present_col] = (df_feat[count_col] > 0).astype(int)

    # Bucket energy: sum(bucket_index * count)
    df_feat["bucket_energy"] = df_counts.apply(
        lambda row: sum(i * row[bucket_index_to_count_col[i]] for i in range(max_bucket + 1)),
        axis=1,
    )

    # Draw-level stats from the actual numbers
    df_feat["draw_median"] = df_feat[number_cols].median(axis=1)
    df_feat["draw_mean"] = df_feat[number_cols].mean(axis=1)
    df_feat["draw_min"] = df_feat[number_cols].min(axis=1)
    df_feat["draw_max"] = df_feat[number_cols].max(axis=1)

    # Odd / Even counts
    df_feat["Odd"] = df_feat[number_cols].apply(lambda r: (r % 2 != 0).sum(), axis=1)
    df_feat["Even"] = df_feat[number_cols].apply(lambda r: (r % 2 == 0).sum(), axis=1)

    return df_feat

df_real_feat = compute_bucket_features_for_draws(df_draws)
print("Real feature table shape:", df_real_feat.shape)
df_real_feat.head()

In [None]:
# Visualisations: bucket_energy & draw_median over time, plus distributions
x_axis = df_real_feat["Date"] if "Date" in df_real_feat.columns else df_real_feat.index

plt.figure(figsize=(10, 4))
plt.plot(x_axis, df_real_feat["bucket_energy"])
plt.title("Bucket Energy Over Time (Real Draws)")
plt.xlabel("Draw")
plt.ylabel("Bucket Energy")
plt.tight_layout()
plt.show()

plt.figure(figsize=(10, 4))
plt.plot(x_axis, df_real_feat["draw_median"])
plt.title("Draw Median Over Time (Real Draws)")
plt.xlabel("Draw")
plt.ylabel("Median of Drawn Numbers")
plt.tight_layout()
plt.show()

plt.figure(figsize=(6, 4))
plt.hist(df_real_feat["bucket_energy"], bins=20)
plt.title("Distribution of Bucket Energy (Real Draws)")
plt.xlabel("Bucket Energy")
plt.ylabel("Frequency")
plt.tight_layout()
plt.show()

avg_counts = df_real_feat[bucket_count_cols].mean()
plt.figure(figsize=(10, 3))
plt.bar(range(len(avg_counts)), avg_counts.values)
plt.xticks(range(len(avg_counts)), avg_counts.index, rotation=45)
plt.title("Average Count per 5-Number Bucket (Real Draws)")
plt.ylabel("Avg Count per Draw")
plt.tight_layout()
plt.show()

In [None]:
# Empirical distribution of numbers from real draws
def compute_empirical_probs(df_base: pd.DataFrame, number_columns) -> pd.Series:
    nums = df_base[number_columns].values.ravel()
    values, counts = np.unique(nums, return_counts=True)
    n_min = int(nums.min())
    n_max = int(nums.max())
    all_numbers = np.arange(n_min, n_max + 1)

    freq = pd.Series(0, index=all_numbers, dtype=float)
    freq.loc[values] = counts
    p_emp = freq / freq.sum()
    return p_emp

p_empirical = compute_empirical_probs(df_draws, number_cols)
p_uniform = pd.Series(1.0 / len(p_empirical), index=p_empirical.index, dtype=float)

# Blend empirical with uniform so we expand the observed pattern distribution
alpha = 0.7  # 70% empirical, 30% uniform
p_blended = alpha * p_empirical + (1 - alpha) * p_uniform
p_blended = p_blended / p_blended.sum()

print("First few blended probabilities:")
print(p_blended.head())

def simulate_single_draw(draw_size=6, probs: pd.Series = p_blended):
    """Simulate one lotto draw by sampling without replacement from the blended distribution."""
    numbers = probs.index.to_numpy()
    p = probs.values
    sample = np.random.choice(numbers, size=draw_size, replace=False, p=p)
    return np.sort(sample)

def simulate_dataset(n_draws=100_000, draw_size=6, probs: pd.Series = p_blended):
    draws = [simulate_single_draw(draw_size, probs) for _ in range(n_draws)]
    df_sim = pd.DataFrame(draws, columns=number_cols[:draw_size])
    df_sim["SimDrawID"] = np.arange(1, n_draws + 1)
    # Synthetic dates purely for plotting/ordering
    df_sim["SimDate"] = pd.date_range("2000-01-01", periods=n_draws, freq="D")
    return df_sim

N_SYNTH_DRAWS = 50_000  # adjust as you like
df_synth = simulate_dataset(N_SYNTH_DRAWS)
print("Synthetic draws shape:", df_synth.shape)
df_synth.head()

In [None]:
# Compute the same bucket features for synthetic draws
df_synth_feat = compute_bucket_features_for_draws(df_synth)

print("Average bucket counts (real):")
print(df_real_feat[bucket_count_cols].mean().round(3))

print("\nAverage bucket counts (synthetic):")
print(df_synth_feat[bucket_count_cols].mean().round(3))

# Optional: synthetic bucket energy distribution
plt.figure(figsize=(6, 4))
plt.hist(df_synth_feat["bucket_energy"], bins=20)
plt.title("Distribution of Bucket Energy (Synthetic Draws)")
plt.xlabel("Bucket Energy")
plt.ylabel("Frequency")
plt.tight_layout()
plt.show()