### Precomputing sliding time-series windows of stationary wavelet transform (SWT) coefficients.

In [83]:
# ─── 1. Imports & Mode Selection ─────────────────────────────────────────────────
import os
import numpy as np
import pandas as pd
import torch
import pywt

# ─── Change only this line to either "train" or "test" ─────────────────────────────
mode = "test"   # options: "train" or "test"

# Validate with an explicit exception rather than `assert`: assert statements are
# removed when Python runs with -O, which would let an invalid mode slip through.
if mode not in ("train", "test"):
    raise ValueError("mode must be 'train' or 'test'")

print(f"▶️  Running wavelet conversion in **{mode.upper()}** mode\n")


▶️  Running wavelet conversion in **TEST** mode



In [84]:
# ─── 2. Build Input / Output Paths Based on Mode ─────────────────────────────────

# Base folders.
# The data root may be overridden with the WAVELET_DATA_DIR environment variable so the
# notebook is not tied to a single machine; when it is unset the original location is used.
data_folder = os.environ.get(
    "WAVELET_DATA_DIR",
    r"C:\Users\thoma\Desktop\Wavelet Diff\data",
)
wavelets_base = os.path.join(data_folder, "wavelets")

# Input CSVs (they already exist under Training Data / Testing Data)
input_csv = os.path.join(
    data_folder,
    "Training Data" if mode == "train" else "Testing Data",
    f"bitcoin_2010-07-29_2025-04-25_{mode}.csv",
)

# Output folder for this mode (“train wavelet” or “test wavelet”); created if missing.
output_dir = os.path.join(
    wavelets_base,
    "train wavelet" if mode == "train" else "test wavelet",
)
os.makedirs(output_dir, exist_ok=True)

# Wavelet and window parameters (shared by train and test runs)
wavelet_name = "db4"   # change as needed (e.g. "sym6", "coif3", etc.)
level = 4              # decomposition level
window_len = 70        # each window spans 70 days
step = 1               # sliding step

print("➤ Input CSV:", input_csv)
print("➤ Output folder:", output_dir)
print(f"➤ Wavelet = {wavelet_name}, level = {level}, window_len = {window_len}\n")


➤ Input CSV: C:\Users\thoma\Desktop\Wavelet Diff\data\Testing Data\bitcoin_2010-07-29_2025-04-25_test.csv
➤ Output folder: C:\Users\thoma\Desktop\Wavelet Diff\data\wavelets\test wavelet
➤ Wavelet = db4, level = 4, window_len = 70



In [85]:
# ─── 3. Load CSV & Extract “Close” + “Date” ──────────────────────────────────────
#
# CSV columns: Date (YYYY-MM-DD), Year, Month, Day, Close.
#
# The file is read WITHOUT `parse_dates` on purpose: when the "Date" column is absent,
# `read_csv(..., parse_dates=["Date"])` fails inside pandas before the explicit column
# check below gets a chance to report the problem with its own message.
full_df = pd.read_csv(input_csv, header=0)

# Ensure required columns exist
for col in ("Date", "Close"):
    if col not in full_df.columns:
        raise KeyError(f"CSV must contain a '{col}' column.")

# Parse the dates explicitly so that unparseable entries raise here.
full_df["Date"] = pd.to_datetime(full_df["Date"])

# 1. “Close” → float32 series of shape (num_days,)
series = full_df["Close"].astype(np.float32).to_numpy()

# A NaN would spread into neighbouring wavelet coefficients and into the
# normalisation statistics, so stop early instead of producing NaN features.
if np.isnan(series).any():
    raise ValueError("'Close' column contains missing (NaN) values; clean the CSV first.")

# 2. “Date” → datetime64 array of shape (num_days,)
dates = full_df["Date"].to_numpy()

# Reshape series to (num_days, 1) so num_features = 1
full_data = series.reshape(-1, 1)
num_days, num_features = full_data.shape  # num_features will be 1

print(f"✅ Loaded {mode.upper()}: {num_days} days × {num_features} feature (Close)")
print(f"✅ Loaded dates dtype = {dates.dtype}, length = {len(dates)}\n")


✅ Loaded TEST: 1077 days × 1 feature (Close)
✅ Loaded dates dtype = datetime64[ns], length = 1077



In [86]:
# ─── 4. Compute SWT at Level 4 with Padding (fixed unpacking) ─────────────────────
import pywt

# Depth this cell is written for. The band names produced below (cA4, cD1…cD4) are
# fixed, so the depth is pinned here; the `level` setting from the config cell is not read.
TARGET_LEVEL = 4
PAD_MULTIPLE = 2 ** TARGET_LEVEL  # = 16

# NOTE(review): the transform runs over the whole series before any windowing, so the
# coefficients inside one window may depend on samples outside that window
# (including later ones) — confirm this is acceptable for the downstream model.

# 1) Original series length
N = len(series)
if N == 0:
    raise ValueError("Cannot compute the SWT of an empty series.")

# 2) Compute how many samples to append so that (N + pad_len) % PAD_MULTIPLE == 0
pad_len = (-N) % PAD_MULTIPLE
if pad_len > 0:
    print(f"▶️ Series length = {N}. Padding {pad_len} samples to reach multiple of {PAD_MULTIPLE} → {N + pad_len}.\n")
    series_padded = np.pad(series, (0, pad_len), mode="symmetric")
    # Kept for parity with series_padded; no later cell visible here reads it.
    dates_padded  = np.pad(dates,  (0, pad_len), mode="edge")
else:
    print(f"▶️ Series length = {N} is already a multiple of {PAD_MULTIPLE}.\n")
    series_padded = series.copy()
    dates_padded  = dates.copy()

N_pad = len(series_padded)
print(f"▶️ After padding: N_pad = {N_pad}.  N_pad % {PAD_MULTIPLE} == {N_pad % PAD_MULTIPLE} (should be 0).")

# 3) Sanity check: pywt.swt is called on a 1D NumPy array of length N_pad
print(">>> series_padded type:", type(series_padded))
print(">>> series_padded dtype/shape:", series_padded.dtype, series_padded.shape)
if not isinstance(series_padded, np.ndarray) or series_padded.ndim != 1:
    raise ValueError("series_padded must be a 1D NumPy array before calling pywt.swt!")

# 4) Check max SWT level on the padded length
max_level = pywt.swt_max_level(N_pad)
print(f"▶️ pywt.swt_max_level(N_pad={N_pad}) = {max_level}\n")

# 5) Decide effective_level = min(TARGET_LEVEL, max_level).
#    With the padding above the fallback is not expected to trigger; it is kept as a guard.
effective_level = min(TARGET_LEVEL, max_level)
if effective_level < TARGET_LEVEL:
    print(f"⚠️ Even after padding, max_level = {max_level} < {TARGET_LEVEL}.  Using level = {effective_level}.\n")
else:
    print(f"▶️ Proceeding with SWT at level = {TARGET_LEVEL} on length = {N_pad}.\n")

# 6) Compute SWT at that effective_level.
#    With trim_approx=True, swt_list has length (effective_level + 1).
swt_list = pywt.swt(series_padded, wavelet=wavelet_name, level=effective_level, trim_approx=True)

# 7) Print the raw shape of each coefficient array.
#    swt_list[0] = cA_L,   swt_list[1] = cD_L,   swt_list[2] = cD_{L-1}, …, swt_list[L] = cD_1
print("▶️ Raw SWT output arrays:")
for i, coeff_array in enumerate(swt_list):
    if i == 0:
        print(f"   cA_{effective_level} shape = {coeff_array.shape}")
    else:
        level_num = effective_level - (i - 1)
        print(f"   cD_{level_num} shape = {coeff_array.shape}")
print()

# 8) Pull out cD1 … cD4 and cA4, substituting zeros for levels that were not computed.

def get_or_zero(arr_list, idx):
    """Return arr_list[idx] if it exists, else a zero array of shape (N_pad,)."""
    if idx < len(arr_list):
        return arr_list[idx]
    else:
        return np.zeros((N_pad,), dtype=series_padded.dtype)


def detail_index(k):
    """Position of cD_k inside swt_list ([cA_L, cD_L, …, cD_1] with L = effective_level).

    Returns an out-of-range index when level k was not computed (k > L), so that
    get_or_zero substitutes a zero band instead of handing back a different level.
    """
    if k > effective_level:
        return len(swt_list)
    return effective_level - k + 1


# Look each detail band up by its level rather than by a fixed position: fixed
# positions 1…4 are only correct when effective_level == 4. With fewer levels they
# would shift every band by one name and zero-fill cD1 instead of the missing coarse band.
cA4_padded = get_or_zero(swt_list, 0)                # coarsest approximation (cA_L)
cD4_padded = get_or_zero(swt_list, detail_index(4))
cD3_padded = get_or_zero(swt_list, detail_index(3))
cD2_padded = get_or_zero(swt_list, detail_index(2))
cD1_padded = get_or_zero(swt_list, detail_index(1))

# 9) Reshape each to (N_pad, 1)
coeffs_cA4 = cA4_padded.reshape(-1, 1)
coeffs_cD4 = cD4_padded.reshape(-1, 1)
coeffs_cD3 = cD3_padded.reshape(-1, 1)
coeffs_cD2 = cD2_padded.reshape(-1, 1)
coeffs_cD1 = cD1_padded.reshape(-1, 1)

print("✅ Completed SWT (padded) at level =", effective_level)
print("   • coeffs_cA4 shape =", coeffs_cA4.shape)
print("   • coeffs_cD4 shape =", coeffs_cD4.shape)
print("   • coeffs_cD3 shape =", coeffs_cD3.shape)
print("   • coeffs_cD2 shape =", coeffs_cD2.shape)
print("   • coeffs_cD1 shape =", coeffs_cD1.shape, "\n")


▶️ Series length = 1077. Padding 11 samples to reach multiple of 16 → 1088.

▶️ After padding: N_pad = 1088.  N_pad % 16 == 0 (should be 0).
>>> series_padded type: <class 'numpy.ndarray'>
>>> series_padded dtype/shape: float32 (1088,)
▶️ pywt.swt_max_level(N_pad=1088) = 6

▶️ Proceeding with SWT at level = 4 on length = 1088.

▶️ Raw SWT output arrays:
   cA_4 shape = (1088,)
   cD_4 shape = (1088,)
   cD_3 shape = (1088,)
   cD_2 shape = (1088,)
   cD_1 shape = (1088,)

✅ Completed SWT (padded) at level = 4
   • coeffs_cA4 shape = (1088, 1)
   • coeffs_cD4 shape = (1088, 1)
   • coeffs_cD3 shape = (1088, 1)
   • coeffs_cD2 shape = (1088, 1)
   • coeffs_cD1 shape = (1088, 1) 



In [87]:
# ─── 5. Slice 70-Day Windows & Collect Corresponding Dates ────────────────────────
#
# Slide a `window_len`-day window (stride = `step`) over the first `num_days` rows only,
# so the padded tail appended for the SWT is never windowed. For each start index i:
#   1) rows [i : i + window_len] of cD1…cD4 and cA4 form one (window_len, 5) array
#   2) dates[i : i + window_len] form one (window_len,) datetime64 array

# Fail with a clear message instead of an opaque "need at least one array to stack".
if num_days < window_len:
    raise ValueError(
        f"Series has {num_days} days, fewer than window_len = {window_len}; no windows can be built."
    )

# Band order along the last axis: [cD1, cD2, cD3, cD4, cA4]
band_matrix = np.concatenate(
    [coeffs_cD1, coeffs_cD2, coeffs_cD3, coeffs_cD4, coeffs_cA4], axis=1
)[:num_days]  # (num_days, 5)

# sliding_window_view appends the window axis last → (num_starts, 5, window_len).
# Keep every `step`-th start, move the window axis to the middle, and copy so the
# result is an ordinary writable array rather than a read-only strided view.
feat_view = np.lib.stride_tricks.sliding_window_view(band_matrix, window_len, axis=0)
all_feats = feat_view[::step].transpose(0, 2, 1).copy()  # (num_windows, window_len, 5)

date_view = np.lib.stride_tricks.sliding_window_view(dates, window_len)
all_dates_np = date_view[::step].copy()  # (num_windows, window_len)

# Actual number of windows produced (accounts for `step`).
num_windows = all_feats.shape[0]

print(f"✅ Stacked {mode.upper()} windows → features: {all_feats.shape}, dates: {all_dates_np.shape}\n")


✅ Stacked TEST windows → features: (1008, 70, 5), dates: (1008, 70)



In [88]:
# ─── 6. Z-Score Normalization per Channel ────────────────────────────────────────
#
# Every band is standardised with a mean / std pair fitted on the TRAIN split.
# TRAIN mode fits the statistics and stores them in its output folder; TEST mode
# reads them back from the train folder and applies them unchanged.
#
# Channel order in all_feats: [cD1, cD2, cD3, cD4, cA4] (detail → approx)

# float32 tensor built from the stacked windows
all_feats_tensor = torch.from_numpy(all_feats.astype(np.float32))

if mode != "train":  # i.e. mode == "test"
    # Statistics are expected to have been written by an earlier TRAIN run.
    train_stats_dir = os.path.join(wavelets_base, "train wavelet")
    means = torch.load(os.path.join(train_stats_dir, "wavelet_means.pt"))
    stds  = torch.load(os.path.join(train_stats_dir, "wavelet_stds.pt"))

    print("✅ Loaded TRAIN means and stds for TEST normalization (shape: [5,1]):")
else:
    # Per-channel statistics pooled over every window and every time step.
    eps = 1e-6
    means = all_feats_tensor.mean(dim=(0, 1))
    stds = all_feats_tensor.std(dim=(0, 1))

    # A (near-)constant channel would divide by ~0; use a unit std for it instead.
    stds = torch.where(stds < eps, torch.ones_like(stds), stds)

    # Stored as column vectors of shape (5, 1).
    means = means.view(5, 1)
    stds = stds.view(5, 1)

    torch.save(means, os.path.join(output_dir, "wavelet_means.pt"))
    torch.save(stds,  os.path.join(output_dir, "wavelet_stds.pt"))

    print("✅ Computed TRAIN means and stds (shape: [5,1]):")

# Summary lines shared by both modes
print("   Band order: [cD1, cD2, cD3, cD4, cA4]")
print(f"   • means = {means.squeeze().numpy()}")
print(f"   • stds  = {stds.squeeze().numpy()}\n")

# Broadcast the per-channel statistics over the window and time axes.
channel_means = means.view(1, 1, -1)
channel_stds = stds.view(1, 1, -1)
all_feats_norm = (all_feats_tensor - channel_means) / channel_stds

print(f"✅ Normalized {mode.upper()} features → shape = {all_feats_norm.shape}\n")

✅ Loaded TRAIN means and stds for TEST normalization (shape: [5,1]):
   Band order: [cD1, cD2, cD3, cD4, cA4]
   • means = [-6.3051544e-08 -4.2946709e-07 -1.9071674e-06 -9.6872409e-06
  1.2477184e-02]
   • stds  = [0.05016337 0.05399104 0.05013015 0.05003453 0.05762416]

✅ Normalized TEST features → shape = torch.Size([1008, 70, 5])



In [89]:
# ─── 7. Convert Dates to Int64 & Save Both (Norm. Feats + Dates) ─────────────────
#
# Two .pt files are written into this mode's output_dir:
#   • the normalized feature tensor (all_feats_norm)
#   • the window dates as an int64 tensor of nanoseconds since the epoch

# Dates: force nanosecond resolution, then reinterpret as int64 for torch.
dates_ns = all_dates_np.astype("datetime64[ns]")
all_dates_int = dates_ns.astype(np.int64)
all_dates_tensor = torch.from_numpy(all_dates_int)

# File names carry the mode so train and test artefacts never collide.
feats_filename = f"level4_swt_{mode}_windows_norm.pt"
dates_filename = f"level4_{mode}_window_dates.pt"

for tensor, filename in (
    (all_feats_norm, feats_filename),
    (all_dates_tensor, dates_filename),
):
    torch.save(tensor, os.path.join(output_dir, filename))

print(f"✅ Saved normalized {mode.upper()} features to:\n   {os.path.join(output_dir, feats_filename)}")
print(f"✅ Saved {mode.upper()} dates          to:\n   {os.path.join(output_dir, dates_filename)}\n")


✅ Saved normalized TEST features to:
   C:\Users\thoma\Desktop\Wavelet Diff\data\wavelets\test wavelet\level4_swt_test_windows_norm.pt
✅ Saved TEST dates          to:
   C:\Users\thoma\Desktop\Wavelet Diff\data\wavelets\test wavelet\level4_test_window_dates.pt



In [90]:
# ─── 8. Verification ─────────────────────────────────────────────────────────────
# Read both files back right after writing them and compare against the in-memory
# tensors, so a truncated or mismatched save is caught immediately.
loaded_feats_norm, loaded_dates = (
    torch.load(os.path.join(output_dir, fname))
    for fname in (feats_filename, dates_filename)
)

# Features: same shape and numerically close values
assert loaded_feats_norm.shape == all_feats_norm.shape, "Feature‐shape mismatch!"
assert torch.allclose(loaded_feats_norm, all_feats_norm), "Feature‐data mismatch!"

# Dates: same shape and exactly equal integers
assert loaded_dates.shape == all_dates_tensor.shape, "Dates‐shape mismatch!"
assert torch.equal(loaded_dates, all_dates_tensor), "Dates‐data mismatch!"

print(f"✅ Verified reload for {mode.upper()} normalized features:")
print(f"   • features_norm shape = {loaded_feats_norm.shape}")
print(f"   • dates           shape = {loaded_dates.shape}")


✅ Verified reload for TEST normalized features:
   • features_norm shape = torch.Size([1008, 70, 5])
   • dates           shape = torch.Size([1008, 70])
