# BigQuery Data Preparation and Feature Extraction for Liquid Neural Networks

This notebook demonstrates how to:
1. Extract and prepare data from BigQuery tables using BigFrames
2. Process features through Restricted Boltzmann Machines (RBMs)
3. Feed the RBM output into a CfC-based liquid neural network with LSTM neurons for gating
4. Implement a motor neuron that outputs a value to trigger deeper exploration
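As a rough illustration of step 2, here is a generic Bernoulli RBM trained with one step of contrastive divergence (CD-1) in NumPy. It is not the `OptimizedRBM` class imported below, whose API this notebook does not show. The class name, learning rate, and the assumption that inputs lie in [0, 1] are all illustrative.

```python
import numpy as np


def sigmoid(x: np.ndarray) -> np.ndarray:
    return 1.0 / (1.0 + np.exp(-x))


class BernoulliRBM:
    """Hypothetical minimal RBM; inputs are assumed to be scaled to [0, 1]."""

    def __init__(self, n_visible: int, n_hidden: int, lr: float = 0.1, seed: int = 0):
        self.rng = np.random.default_rng(seed)
        self.W = self.rng.normal(0.0, 0.01, size=(n_visible, n_hidden))
        self.b_v = np.zeros(n_visible)
        self.b_h = np.zeros(n_hidden)
        self.lr = lr

    def hidden_probs(self, v: np.ndarray) -> np.ndarray:
        return sigmoid(v @ self.W + self.b_h)

    def visible_probs(self, h: np.ndarray) -> np.ndarray:
        return sigmoid(h @ self.W.T + self.b_v)

    def cd1_step(self, v0: np.ndarray) -> float:
        """One CD-1 update on a batch; returns the reconstruction mean squared error."""
        h0 = self.hidden_probs(v0)                                  # positive phase
        h0_sample = (self.rng.random(h0.shape) < h0).astype(v0.dtype)
        v1 = self.visible_probs(h0_sample)                          # reconstruction
        h1 = self.hidden_probs(v1)                                  # negative phase
        n = v0.shape[0]
        self.W += self.lr * (v0.T @ h0 - v1.T @ h1) / n
        self.b_v += self.lr * (v0 - v1).mean(axis=0)
        self.b_h += self.lr * (h0 - h1).mean(axis=0)
        return float(np.mean((v0 - v1) ** 2))

    def transform(self, v: np.ndarray) -> np.ndarray:
        """Hidden-unit activation probabilities, used as extracted features."""
        return self.hidden_probs(v)
```

The hidden probabilities returned by `transform` are the kind of compressed features that step 3 would pass on to the liquid network.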

The pipeline is designed for terabyte-scale tables: the extractor reads and processes a table in chunks of `chunk_size` rows instead of loading it into memory all at once.
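A minimal sketch of the chunk bookkeeping, assuming the row count is known up front (the extractor logs an approximate count before processing). `chunk_plan` is a hypothetical helper, not part of `TerabyteFeatureExtractor`; it only shows how a row count, a `chunk_size`, and an optional `limit` turn into (offset, size) pairs. The 241-row test table used later, for example, fits in a single chunk of 100,000.

```python
from typing import Iterator, Optional, Tuple


def chunk_plan(total_rows: int, chunk_size: int, limit: Optional[int] = None) -> Iterator[Tuple[int, int]]:
    """Yield (offset, n_rows) pairs that cover min(total_rows, limit) rows in order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    rows = total_rows if limit is None else min(total_rows, limit)
    for offset in range(0, rows, chunk_size):
        # A caller that processes and discards each chunk before requesting the next
        # keeps peak memory proportional to chunk_size rather than to the table size.
        yield offset, min(chunk_size, rows - offset)


print(list(chunk_plan(241, 100_000, limit=10_000)))
print(list(chunk_plan(250_000, 100_000)))
```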

## Setup and Imports

In [1]:
# Install required packages if needed
# !pip install google-cloud-bigquery bigframes tensorflow ncps

In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import logging
import time
from typing import Dict, List, Optional, Tuple, Union, Any, Generator

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('bigquery_pipeline')

# Import our components
from ember_ml.nn.features.terabyte_feature_extractor import TerabyteFeatureExtractor, TerabyteTemporalStrideProcessor
from ember_ml.models.optimized_rbm import OptimizedRBM


# Import the integrated pipeline
from pipeline_demo import IntegratedPipeline

DEBUG: get_stats_module - Backend base module name: ember_ml.backend.mlx
DEBUG: get_stats_module - Constructed module name: ember_ml.backend.mlx.stats
DEBUG: get_stats_module - Successfully imported module: ember_ml.backend.mlx.stats


## BigQuery Connection Setup

Set up the connection to BigQuery. You can use service account credentials or application default credentials.

In [3]:
# Set your GCP project ID
PROJECT_ID = "massmkt-poc"  # Replace with your project ID

# Path to service account credentials (optional)
CREDENTIALS_PATH = '/Users/sydneybach/sydney-bach.json'  # Replace with path to credentials.json if needed

# BigQuery location
LOCATION = "US"

# Initialize the feature extractor
feature_extractor = TerabyteFeatureExtractor(
    project_id=PROJECT_ID,
    location=LOCATION,
    chunk_size=100000,
    max_memory_gb=16.0,
    verbose=True
)

# Set up BigQuery connection
feature_extractor.setup_bigquery_connection(CREDENTIALS_PATH)

2025-04-22 12:29:37,151 - terabyte_feature_extractor - INFO - Using mlx backend for computation
2025-04-22 12:29:37,152 - terabyte_feature_extractor - INFO - Initialized TerabyteFeatureExtractor with chunk_size=100000, max_memory_gb=16.0
2025-04-22 12:29:37,179 - terabyte_feature_extractor - INFO - BigQuery connection set up successfully


## Explore Available Tables

Let's list the datasets in your BigQuery project and the tables in the first dataset.

In [4]:
# Import BigQuery client
from google.cloud import bigquery

# Create client
client = bigquery.Client(project=PROJECT_ID)

# List datasets
datasets = list(client.list_datasets())
print(f"Datasets in project {PROJECT_ID}:")
for dataset in datasets:
    print(f"- {dataset.dataset_id}")

# Choose a dataset to explore
if datasets:
    dataset_id = datasets[0].dataset_id
    print(f"\nTables in dataset {dataset_id}:")
    tables = list(client.list_tables(dataset_id))
    for table in tables:
        print(f"- {table.table_id}")

Datasets in project massmkt-poc:
- 180601513
- 239703222
- 242584221
- 267229650
- BQML_Datasets
- BigQuery_Google_Ads
- Event_Data_Dictionary
- FieldOps_Reporting_Dataset
- MM_LP_Data
- MM_LP_POC
- Marketing_Cloud
- Monitored_Zipcodes_Expanded
- Partners
- QF_AIOPS
- QF_Activation_POC
- QF_BUY_FLOW_TRANSACTIONS
- QF_Shell_Account_CleanUp_Data
- Service_Appointment_History
- TEST1
- abandoned_jobs
- analytics_251783832
- analytics_379694883
- analytics_405473592
- analytics_424581992
- analytics_435146347
- analytics_451204749
- biwf_mysql_db
- confluent_sink
- connected_communities_dev
- connected_communities_prod
- contact_engine
- datafirst_prod
- datalake_ingestion_sandbox
- design_repair
- dev_sandbox
- dispatch_events
- dispatch_events_test
- ds_mmdldev_bluemarble
- ds_mmdldev_bluemarble_raw
- ds_mmdldev_bluemarble_stg
- ds_mmdldev_kafka_master
- ds_mmdldev_nokiahal_raw
- ds_mmdldev_pc360_raw
- ds_mmpoc_dev_consmobile_silver
- ds_mmpoc_master
- ds_mmpoc_master_uscentral
- dwh_sta
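Before choosing `TABLE_ID` and `LIMIT`, it can help to check how large each table is. `Client.list_tables` yields lightweight list items, so this sketch calls `Client.get_table` on each item to read `num_rows` and `num_bytes`. The helper name `summarize_tables` is hypothetical; the client methods and table attributes are those of `google-cloud-bigquery`.

```python
from typing import Any, List, Tuple


def summarize_tables(client: Any, dataset_id: str) -> List[Tuple[str, int, int]]:
    """Return (table_id, num_rows, num_bytes) for each table in a dataset, largest first.

    `client` is expected to be a google.cloud.bigquery.Client (or any object with
    the same list_tables/get_table methods).
    """
    summary = []
    for item in client.list_tables(dataset_id):
        table = client.get_table(item)  # fetch full metadata, including size counts
        # Views and some external tables may report None for these counts.
        summary.append((item.table_id, table.num_rows or 0, table.num_bytes or 0))
    return sorted(summary, key=lambda row: row[2], reverse=True)
```

With the client created above, `summarize_tables(client, "TEST1")` lists the tables in `TEST1` by size.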

## Extract Features from BigQuery

Now let's extract features from a BigQuery table. Replace `TABLE_ID` with the table you want to use.

In [5]:
# Set the table ID
TABLE_ID = "TEST1.ctl_modem_speedtest_event"  # Replace with your table ID

# Set the target column (optional)
TARGET_COLUMN = 'downloadLatency'  # Replace with your target column if needed

# Set a limit for testing (remove for full dataset)
LIMIT = 10000

# Define a function to process each chunk
def process_chunk(chunk_df):
    # If it's a BigFrames DataFrame, convert it to pandas
    if hasattr(chunk_df, 'to_pandas') and callable(getattr(chunk_df, 'to_pandas')):
        # Convert BigFrames DataFrame to pandas DataFrame
        return chunk_df.to_pandas()
    
    # If it's already a pandas DataFrame, return it
    if isinstance(chunk_df, pd.DataFrame):
        return chunk_df
    
    # Otherwise, try to convert to pandas DataFrame
    try:
        return pd.DataFrame(chunk_df)
    except Exception as e:
        print(f"Error converting to pandas DataFrame: {e}")
        # Return empty DataFrame as fallback
        return pd.DataFrame()

# Extract features
result = feature_extractor.prepare_data(
    table_id=TABLE_ID,
    target_column=TARGET_COLUMN,
    limit=LIMIT,
    force_categorical_columns=[
        "eventType", "eventSource", "eventCategory", "eventPublisherId",
        "productClass", "downloadTestStatus", "uploadState", "uploadTestStatus",
        "wtn", "serialNumber"
    ]
)

if result is not None:
    train_df, val_df, test_df, train_features, val_features, test_features, scaler, imputer = result
    
    print(f"Train shape: {train_df.shape}")
    print(f"Validation shape: {val_df.shape}")
    print(f"Test shape: {test_df.shape}")
    print(f"Features: {train_features}")
else:
    print("Feature extraction failed")
    # Create empty variables to avoid NameError in subsequent cells
    train_df = pd.DataFrame()
    val_df = pd.DataFrame()
    test_df = pd.DataFrame()
    train_features = []
    val_features = []
    test_features = []

2025-04-22 12:29:41,258 - terabyte_feature_extractor - INFO - Starting data preparation for table TEST1.ctl_modem_speedtest_event
2025-04-22 12:29:41,259 - terabyte_feature_extractor - INFO - Limiting to 10000 rows (1 chunks)


2025-04-22 12:29:50,660 - terabyte_feature_extractor - INFO - Table TEST1.ctl_modem_speedtest_event has approximately 241 rows
2025-04-22 12:29:50,661 - terabyte_feature_extractor - INFO - Processing approximately 241 rows in 1 chunks of 100000
2025-04-22 12:29:50,661 - terabyte_feature_extractor - INFO - Processing chunk 1/1


2025-04-22 12:29:54,590 - terabyte_feature_extractor - INFO - Chunk 1/1 processed in 3.93s. Progress: 100.0%. Estimated time remaining: 0.00s
2025-04-22 12:29:54,591 - terabyte_feature_extractor - INFO - Current memory usage: 0.91 GB
2025-04-22 12:29:54,742 - terabyte_feature_extractor - INFO - Completed processing 1 chunks in 4.08s
2025-04-22 12:29:54,742 - terabyte_feature_extractor - ERROR - Failed to process data: result is not a DataFrame


Feature extraction failed
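Since `prepare_data` returned `None` here, the next cell prepares the data directly with BigFrames. Its one-hot step fits dummy columns on the training split and reindexes the validation and test splits to those columns (`fixed_dummy_columns`). A pandas-only sketch of that alignment on toy data; `one_hot_aligned` is a hypothetical helper name:

```python
from typing import List, Tuple

import pandas as pd


def one_hot_aligned(train: pd.DataFrame, other: pd.DataFrame,
                    categorical_cols: List[str]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """One-hot encode both frames, then force `other` onto train's dummy columns."""
    train_enc = pd.get_dummies(train, columns=categorical_cols, dtype=float)
    other_enc = pd.get_dummies(other, columns=categorical_cols, dtype=float)
    # Drop categories unseen in training; add missing training categories as zeros.
    other_enc = other_enc.reindex(columns=train_enc.columns, fill_value=0.0)
    return train_enc, other_enc


train = pd.DataFrame({"eventType": ["Create", "Update"], "downloadLatency": [12.0, 15.0]})
val = pd.DataFrame({"eventType": ["Create", "Delete"], "downloadLatency": [11.0, 30.0]})
train_enc, val_enc = one_hot_aligned(train, val, ["eventType"])
print(list(val_enc.columns))
```

A category seen only outside training (`Delete` here) becomes an all-zero row. Identifier-like columns such as `wtn` and `serialNumber` produce one column per distinct value, which is why the feature lists printed below are so long.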


In [6]:
import gc  # For memory management
from typing import Dict, List, Optional, Tuple

import bigframes.pandas as bf
import numpy as np  # For random number generation
import pandas as pd  # For struct flattening and one-hot encoding
import pyarrow.types as pat  # Explicit PyArrow types for dtype checks
from bigframes.ml.preprocessing import MaxAbsScaler  # BigFrames scaler
from sklearn.impute import SimpleImputer
PROJECT_ID = "massmkt-poc"  # @param {type:"string"}
REGION = "US"  # @param {type:"string"}

# Set BigQuery DataFrames options
# Note: The project option is not required in all environments.
# On BigQuery Studio, the project ID is automatically detected.
bf.options.bigquery.project = PROJECT_ID

# Note: The location option is not required.
# It defaults to the location of the first table or query
# passed to read_gbq(). For APIs where a location can't be
# auto-detected, the location defaults to the "US" location.
bf.options.bigquery.location = REGION

def prepare_bigquery_data_bf(
    project_id: str,
    table_id: str,
    target_column: Optional[str] = None,
    force_categorical_columns: Optional[List[str]] = None,
    drop_columns: Optional[List[str]] = None,
    high_null_threshold: float = 0.9,
    limit: Optional[int] = None,
    index_col: Optional[str] = None,
) -> Optional[Tuple[bf.DataFrame, bf.DataFrame, bf.DataFrame, List[str], List[str], List[str], MaxAbsScaler, SimpleImputer]]:
    """
    Prepares data from a BigQuery table using BigFrames.

    Args:
        project_id: GCP project ID.
        table_id: BigQuery table ID (dataset.table).
        target_column: Target variable name. If None, the last numeric column is used.
        force_categorical_columns: Columns always treated as categorical (one-hot encoded).
        drop_columns: Columns to drop.
        high_null_threshold: Fraction of nulls above which a column should be dropped.
            Not yet applied by this implementation.
        limit: Optional row limit for testing.
        index_col: Optional index column; a datetime index enables a temporal split.

    Returns:
        Tuple: (train_bf_df, val_bf_df, test_bf_df, train_features, val_features, test_features, scaler, imputer),
        or None if preparing any split fails.
    """

    # --- Session and Memory Cleanup ---
    bf.close_session()
    gc.collect()

    # Set fixed random seed for reproducibility
    np.random.seed(42)
    bf.options.bigquery.project = project_id

    if force_categorical_columns is None:
        force_categorical_columns = []
    if drop_columns is None:
        drop_columns = []

    bf_df = bf.read_gbq(table_id, index_col=index_col)
    gc.collect()

    # --- Data Type Conversion ---
    numeric_columns = ['downloadThroughput', 'downloadLatency', 'uploadThroughput']
    for col in numeric_columns:
        if col in bf_df.columns:
            bf_df[col] = bf_df[col].astype('Float64')
    if 'eventTimestamp' in bf_df.columns:
        bf_df['eventTimestamp'] = bf_df['eventTimestamp'].astype('datetime64[ns]')
    # --- End data type conversion ---

    if limit:
        bf_df = bf_df.head(limit)
    print("Schema (bf_df.dtypes):", bf_df.dtypes)

    # --- Robust Numeric and Datetime Type Detection ---
    def is_numeric_type(col_type) -> bool:
        try:
            if pd.api.types.is_numeric_dtype(col_type):
                return True
            if hasattr(col_type, "id"):
                return pat.is_integer(col_type) or pat.is_floating(col_type)
            return False
        except TypeError as e:
            print(f"Error checking numeric type for {col_type}: {e}")
            return False

    def is_datetime_type(col_type) -> bool:
        try:
            if pd.api.types.is_datetime64_any_dtype(col_type):
                return True
            if hasattr(col_type, "id"):
                return pat.is_timestamp(col_type) or pat.is_date(col_type)
            return False
        except TypeError as e:
            print(f"Error checking datetime type for {col_type}: {e}")
            return False

    schema = bf_df.dtypes
    numeric_cols: List[str] = []
    datetime_cols: List[str] = []
    categorical_cols: List[str] = []
    struct_cols: List[str] = []
    boolean_cols: List[str] = []

    for col_name, col_type in schema.items():
        if is_numeric_type(col_type):
            numeric_cols.append(col_name)
        elif is_datetime_type(col_type):
            datetime_cols.append(col_name)
        elif hasattr(bf, 'BooleanDtype') and col_type == bf.BooleanDtype():
            boolean_cols.append(col_name)
        elif hasattr(bf, 'StringDtype') and (col_type == bf.StringDtype() or col_name in force_categorical_columns):
            categorical_cols.append(col_name)
        elif "STRUCT" in str(col_type).upper():
            struct_cols.append(col_name)

    # Prepare a single split. Validation/test calls pass the training fixed_dummy_columns so their one-hot columns match training.
    def prepare_dataframe(bf_df_split, is_train=False, scaler=None, imputer=None,
                          high_null_cols_train=None,
                          fixed_dummy_columns: Optional[List[str]] = None
                         ) -> Optional[Tuple[bf.DataFrame, List[str], MaxAbsScaler, SimpleImputer, List[str], Optional[List[str]]]]:
        try:
            print(f"Preparing dataframe (is_train={is_train})...")

            # Filter drop_columns if any
            cols_to_drop = [col for col in drop_columns if col in bf_df_split.columns]
            print(f"Columns to drop (after filtering): {cols_to_drop}")
            if cols_to_drop:
                bf_df_split = bf_df_split.drop(columns=cols_to_drop)

            # Flatten STRUCT columns
            for struct_col in struct_cols:
                if struct_col in bf_df_split.columns:
                    print(f"Flattening struct column: {struct_col}")
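                    exploded = bf_df_split[struct_col].struct.explode().add_prefix(f"{struct_col}_")
                    bf_df_split = bf_df_split.drop(columns=[struct_col]).join(exploded)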

            # Convert datetime columns
            for col in datetime_cols:
                if col in bf_df_split.columns:
                    print(f"Converting to timestamp: {col}")
                    bf_df_split[col] = bf.to_datetime(bf_df_split[col])

            # Determine target column
            _target_column = target_column
            if _target_column is None:
                eligible = [col for col in numeric_cols if col in bf_df_split.columns and col not in categorical_cols]
                if eligible:
                    _target_column = eligible[-1]
                    print(f"WARNING: No target specified. Using '{_target_column}' as target.")
                else:
                    raise ValueError("No suitable target column found.")

            # Initial features: numeric (excluding target) plus boolean columns
            features = [col for col in numeric_cols if col != _target_column and col in bf_df_split.columns]
            features.extend(boolean_cols)
            print(f"Initial features: {features}")

            # One-hot encode categorical columns
            if categorical_cols:
                print(f"Encoding categorical columns: {categorical_cols}")
                df_pd = bf_df_split.to_pandas()
                df_dummy = pd.get_dummies(df_pd, columns=categorical_cols, drop_first=False)
                df_dummy.columns = df_dummy.columns.str.replace(r'[^0-9a-zA-Z_]', '_', regex=True)
                if is_train:
                    fixed_dummy = list(df_dummy.columns)
                else:
                    if fixed_dummy_columns is not None:
                        df_dummy = df_dummy.reindex(columns=fixed_dummy_columns, fill_value=0)
                    fixed_dummy = fixed_dummy_columns
                bf_df_split = bf.DataFrame(df_dummy)
            else:
                print("No categorical columns to encode.")
                fixed_dummy = fixed_dummy_columns

            # Add cyclical features for datetime columns. Each divisor is the full period
            # (24 hours, 7 days, 31 days, 12 months) so a cycle's last value does not map
            # to the same angle as its first.
            two_pi = 2 * np.pi
            for col in datetime_cols:
                if col in bf_df_split.columns:
                    print(f"Adding cyclical features for: {col}")
                    dt = bf_df_split[col].dt
                    bf_df_split[col + '_sin_hour'] = np.sin(two_pi * dt.hour / 24.0)
                    bf_df_split[col + '_cos_hour'] = np.cos(two_pi * dt.hour / 24.0)
                    bf_df_split[col + '_sin_dayofweek'] = np.sin(two_pi * dt.dayofweek / 7.0)
                    bf_df_split[col + '_cos_dayofweek'] = np.cos(two_pi * dt.dayofweek / 7.0)
                    bf_df_split[col + '_sin_dayofmonth'] = np.sin(two_pi * (dt.day - 1) / 31.0)
                    bf_df_split[col + '_cos_dayofmonth'] = np.cos(two_pi * (dt.day - 1) / 31.0)
                    bf_df_split[col + '_sin_month'] = np.sin(two_pi * (dt.month - 1) / 12.0)
                    bf_df_split[col + '_cos_month'] = np.cos(two_pi * (dt.month - 1) / 12.0)

            print(f"After datetime feature engineering, columns: {bf_df_split.columns.tolist()}")

            # Re-assess numeric columns
            current_numeric = [col for col, dtype in bf_df_split.dtypes.items() if is_numeric_type(dtype)]
            features = [col for col in current_numeric if col != _target_column and col in bf_df_split.columns]
            print(f"Final features: {features}")

            # Impute missing values (median) and scale numeric features; fit on training data only.
            high_null_cols_to_drop_list = []
            if is_train:
                train_pd = bf_df_split[features + [_target_column]].to_pandas()
                imputer = SimpleImputer(strategy='median')
                train_pd[features] = imputer.fit_transform(train_pd[features])
                scaler = MaxAbsScaler()
                train_pd[features] = scaler.fit_transform(train_pd[features])
                # Update numeric features: drop then join updated ones.
                updated_numeric = bf.DataFrame(train_pd[features], index=train_pd.index)
                bf_df_split = bf_df_split.drop(columns=features).join(updated_numeric)
                print("Scaler and imputer fitted and applied on training data.")
            else:
                if high_null_cols_train is None:
                    raise ValueError("High null columns from training must be provided for validation/test sets.")
                high_null_cols_to_drop_list = high_null_cols_train
                temp_pd = bf_df_split[features + [_target_column]].to_pandas()
                temp_pd[features] = imputer.transform(temp_pd[features])
                temp_pd[features] = scaler.transform(temp_pd[features])
                updated_numeric = bf.DataFrame(temp_pd[features], index=temp_pd.index)
                bf_df_split = bf_df_split.drop(columns=features).join(updated_numeric)
                print("Scaler and imputer applied on validation/test data.")

            # (Optional) Drop any high-null columns
            high_null_cols_to_drop_final = [col for col in high_null_cols_to_drop_list if col in bf_df_split.columns]
            print(f"Columns to drop due to high nulls: {high_null_cols_to_drop_final}")
            if high_null_cols_to_drop_final:
                bf_df_split = bf_df_split.drop(columns=high_null_cols_to_drop_final)
            features = [col for col in features if col not in high_null_cols_to_drop_final]
            print(f"Features after dropping high null columns: {features}")

            gc.collect()
            print("Dataframe prepared successfully. Returning split.")
            return bf_df_split, features, scaler, imputer, high_null_cols_to_drop_final, fixed_dummy
        except Exception as e:
            print(f"ERROR in prepare_dataframe (is_train={is_train}): {e}")
            import traceback
            traceback.print_exc()
            return None

    # --- Split Data ---
    if index_col and pd.api.types.is_datetime64_any_dtype(bf_df.index):
        print("Performing temporal split...")
        split_date = bf_df.index.quantile(0.8)
        train_bf_df = bf_df[bf_df.index <= split_date]
        temp_bf_df = bf_df[bf_df.index > split_date]
        print(f"Temporal split date: {split_date}")
    else:
        print("Performing random split...")
        bf_df['__split_rand'] = bf.Series(np.random.rand(len(bf_df.index)).tolist(), index=bf_df.index)
        train_bf_df = bf_df[bf_df['__split_rand'] <= 0.8].drop(columns=['__split_rand'])
        # Drop the helper column before splitting further so it cannot reach the val/test features.
        temp_bf_df = bf_df[bf_df['__split_rand'] > 0.8].drop(columns=['__split_rand'])
        print("Random split ratios: 80/20 (train/temp)")

    # Manually split temp_bf_df into validation and test sets.
    temp_bf_df['__split_rand2'] = bf.Series(np.random.rand(len(temp_bf_df.index)).tolist(), index=temp_bf_df.index)
    val_bf_df = temp_bf_df[temp_bf_df['__split_rand2'] <= 0.5].drop(columns=['__split_rand2'])
    test_bf_df = temp_bf_df[temp_bf_df['__split_rand2'] > 0.5].drop(columns=['__split_rand2'])
    print("Validation/test split ratios: 50/50 from temp")

    # --- Process Splits ---
    res_train = prepare_dataframe(train_bf_df, is_train=True)
    if res_train is None:
        return None
    train_bf_df_processed, train_features, scaler, imputer, high_null_cols_train, fixed_dummy_columns = res_train

    res_val = prepare_dataframe(val_bf_df, is_train=False, scaler=scaler, imputer=imputer, high_null_cols_train=high_null_cols_train, fixed_dummy_columns=fixed_dummy_columns)
    if res_val is None:
        return None
    val_bf_df_processed, val_features, _, _, _, _ = res_val

    res_test = prepare_dataframe(test_bf_df, is_train=False, scaler=scaler, imputer=imputer, high_null_cols_train=high_null_cols_train, fixed_dummy_columns=fixed_dummy_columns)
    if res_test is None:
        return None
    test_bf_df_processed, test_features, _, _, _, _ = res_test

    gc.collect()
    return train_bf_df_processed, val_bf_df_processed, test_bf_df_processed, train_features, val_features, test_features, scaler, imputer


def flatten_struct(df: pd.DataFrame, struct_col_name: str) -> pd.DataFrame:
    """
    Flattens a simple struct column in a Pandas DataFrame.
    """
    struct_df = pd.json_normalize(df[struct_col_name])
    struct_df = struct_df.add_prefix(f"{struct_col_name}_")
    df = pd.concat([df.drop(columns=[struct_col_name]), struct_df], axis=1)
    return df


# --- Example Usage ---
project_id = "massmkt-poc"
table_id = "TEST1.ctl_modem_speedtest_event"
result = prepare_bigquery_data_bf(
    project_id,
    table_id,
    # target_column="downloadThroughput",  # Optional
    force_categorical_columns=[
        "eventType", "eventSource", "eventCategory", "eventPublisherId",
        "productClass", "downloadTestStatus", "uploadState", "uploadTestStatus",
        "wtn", "serialNumber"
    ],
    high_null_threshold=0.95,
    limit=1000,  # For testing
    index_col="eventTimestamp"  # A datetime index enables the temporal split; otherwise a random split is used
)

if result is not None:
    train_bf_df, val_bf_df, test_bf_df, train_features, val_features, test_features, scaler, imputer = result

    # Convert to pandas DataFrames only when necessary
    train_df = train_bf_df.to_pandas()
    val_df = val_bf_df.to_pandas()
    test_df = test_bf_df.to_pandas()

    print("Train DF Shape:", train_df.shape)
    print("Training Features:", train_features)
    print(f"BigFrames version: {bf.__version__}")
    print(f"Pandas version: {pd.__version__}")
    print(f"Scaler type: {type(scaler)}")
else:
    print("Error occurred during data preparation. Check traceback above.")


Schema (bf_df.dtypes): eventId               string[pyarrow]
eventType             string[pyarrow]
eventSource           string[pyarrow]
eventCategory         string[pyarrow]
eventPublisherId      string[pyarrow]
wtn                   string[pyarrow]
serialNumber          string[pyarrow]
productClass          string[pyarrow]
downloadThroughput            Float64
downloadLatency               Float64
downloadTestStatus    string[pyarrow]
uploadState           string[pyarrow]
uploadThroughput              Float64
uploadTestStatus      string[pyarrow]
dtype: object
Performing random split...
Random split ratios: 80/20 (train/temp)


Validation/test split ratios: 50/50 from temp
Preparing dataframe (is_train=True)...
Columns to drop (after filtering): []
Initial features: ['downloadThroughput', 'downloadLatency']
Encoding categorical columns: ['eventType', 'eventSource', 'eventCategory', 'eventPublisherId', 'wtn', 'serialNumber', 'productClass', 'downloadTestStatus', 'uploadState', 'uploadTestStatus']


After datetime feature engineering, columns: ['eventId', 'downloadThroughput', 'downloadLatency', 'uploadThroughput', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z37010089', 'serialNumber_C4000XG2007055551', 'serialNumbe

Scaler and imputer fitted and applied on training data.
Columns to drop due to high nulls: []
Features after dropping high null columns: ['downloadThroughput', 'downloadLatency', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS

After datetime feature engineering, columns: ['eventId', 'downloadThroughput', 'downloadLatency', 'uploadThroughput', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z37010089', 'serialNumber_C4000XG2007055551', 'serialNumbe

Scaler and imputer applied on validation/test data.
Columns to drop due to high nulls: []
Features after dropping high null columns: ['downloadThroughput', 'downloadLatency', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z

After datetime feature engineering, columns: ['eventId', 'downloadThroughput', 'downloadLatency', 'uploadThroughput', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z37010089', 'serialNumber_C4000XG2007055551', 'serialNumbe

Scaler and imputer applied on validation/test data.
Columns to drop due to high nulls: []
Features after dropping high null columns: ['downloadThroughput', 'downloadLatency', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z

Train DF Shape: (194, 88)
Training Features: ['downloadThroughput', 'downloadLatency', 'eventType_Create', 'eventSource_PC360', 'eventCategory_PC360_Event', 'eventPublisherId_PC360', 'wtn_1000247195', 'wtn_1000303624', 'wtn_1000332141', 'wtn_1100005591', 'wtn_1100020488', 'wtn_1100020668', 'wtn_1100020705', 'wtn_1100020834', 'wtn_1100021198', 'wtn_1500002566', 'wtn_1500017644', 'wtn_1500022989', 'wtn_1500035626', 'wtn_1500036835', 'wtn_1500040944', 'wtn_1500042692', 'wtn_1500056528', 'wtn_1500063674', 'wtn_1500067899', 'wtn_1500069551', 'wtn_1500069582', 'wtn_1500071356', 'wtn_1500071635', 'wtn_1500081777', 'wtn_1500102184', 'wtn_1500118760', 'wtn_1500119464', 'wtn_1500158163', 'wtn_1500160533', 'wtn_1500191274', 'wtn_1500198761', 'wtn_1500241380', 'wtn_1500280766', 'wtn_1500281624', 'wtn_1500316464', 'wtn_1500339563', 'wtn_6238761000', 'serialNumber_C4000BZS210Z48005293', 'serialNumber_C4000BZS220Z37010089', 'serialNumber_C4000XG2007055551', 'serialNumber_C4000XG2044189633', 'serialNu
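Cyclical encoding maps a periodic value onto the unit circle, so the divisor must be the full period (24 for hours, 7 for weekdays, 12 for months). Dividing by one less than the period makes the last value collide with the first: hour 23 / 23 gives an angle of 2π, the same point as hour 0. None of the columns in this table's schema are datetime (`eventTimestamp` is used as the index), so the cyclical step does not run in the output above. A pandas sketch on toy timestamps; `add_cyclical_features` is a hypothetical helper:

```python
import numpy as np
import pandas as pd


def add_cyclical_features(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Add sin/cos encodings of hour, day of week, day of month, and month for a datetime column."""
    ts = pd.to_datetime(df[col])
    out = df.copy()
    # (value, period) pairs; values are 0-based so each cycle spans [0, period).
    parts = {
        "hour": (ts.dt.hour, 24),
        "dayofweek": (ts.dt.dayofweek, 7),
        "dayofmonth": (ts.dt.day - 1, ts.dt.days_in_month),
        "month": (ts.dt.month - 1, 12),
    }
    for name, (value, period) in parts.items():
        angle = 2 * np.pi * value / period
        out[f"{col}_sin_{name}"] = np.sin(angle)
        out[f"{col}_cos_{name}"] = np.cos(angle)
    return out


df = pd.DataFrame({"eventTimestamp": pd.to_datetime(
    ["2025-04-22 00:00", "2025-04-22 23:00", "2025-04-22 12:00"])})
enc = add_cyclical_features(df, "eventTimestamp")
```

With the full period, hour 23 sits about 0.26 units from hour 0 on the unit circle (2·sin(π/24)), while hour 12 is diametrically opposite at distance 2.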

## Apply Temporal Stride Processing

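Now let's apply temporal stride processing to the extracted training features, using a 10-row window, stride perspectives of 1, 3, and 5, and 32 PCA components per perspective.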
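The internals of `TerabyteTemporalStrideProcessor` are not shown in this notebook. Assuming a stride perspective means "take windows of `window_size` consecutive rows starting every `stride` rows, flatten each window, and reduce with PCA", the core could look like this NumPy sketch. `stride_windows` and `pca_reduce` are hypothetical helpers, and exact SVD-based PCA stands in for the incremental PCA the processor is configured to use (incremental PCA fits batch by batch, so it never needs all windows in memory at once).

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def stride_windows(X: np.ndarray, window_size: int, stride: int) -> np.ndarray:
    """Flatten windows of `window_size` consecutive rows, starting every `stride` rows.

    X has shape (n_rows, n_features); the result has shape
    (n_windows, window_size * n_features) with n_windows = (n_rows - window_size) // stride + 1.
    """
    windows = sliding_window_view(X, window_shape=window_size, axis=0)  # (n - w + 1, f, w)
    windows = windows[::stride]
    # Put the time axis before the feature axis, then flatten each window row by row.
    return windows.transpose(0, 2, 1).reshape(windows.shape[0], -1)


def pca_reduce(Z: np.ndarray, n_components: int):
    """Project Z onto its top principal components via SVD.

    Returns the projected data and the explained-variance ratio of each kept component.
    """
    Zc = Z - Z.mean(axis=0)
    _, s, vt = np.linalg.svd(Zc, full_matrices=False)
    k = min(n_components, vt.shape[0])
    var = s ** 2
    total = var.sum()
    ratios = var[:k] / total if total > 0 else np.zeros(k)
    return Zc @ vt[:k].T, ratios


# Toy input: 194 rows x 20 features, window 10, strides 1/3/5, 32 components.
X = np.random.default_rng(0).random((194, 20))
for stride in (1, 3, 5):
    reduced, ratios = pca_reduce(stride_windows(X, 10, stride), 32)
    print(stride, reduced.shape, round(float(ratios.sum()), 3))
```

Larger strides give fewer, less-overlapping windows: 185, 62, and 37 windows for 194 rows at strides 1, 3, and 5.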

In [None]:
from ember_ml.models.stride_aware_cfc import (
    create_liquid_network_with_motor_neuron,
    create_lstm_gated_liquid_network,
    create_multi_stride_liquid_network
)
# Create temporal processor
temporal_processor = TerabyteTemporalStrideProcessor(
    window_size=10,
    stride_perspectives=[1, 3, 5],
    pca_components=32,
    batch_size=10000,
    use_incremental_pca=True,
    verbose=True
)

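# Define a generator to yield data in batches as dense float32 arrays.
# to_pandas() returns nullable dtypes (e.g. Float64, boolean); .values on a mix of them can
# produce an object array that the MLX backend rejects, so convert explicitly (pd.NA -> NaN).
def data_generator(df, features, batch_size=10000):
    for i in range(0, len(df), batch_size):
        yield df.iloc[i:i+batch_size][features].to_numpy(dtype=np.float32, na_value=np.nan)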

# Process data - make sure train_df and train_features are defined
if len(train_df) > 0 and len(train_features) > 0:
    stride_perspectives = temporal_processor.process_large_dataset(
        data_generator(train_df, train_features, batch_size=10000)
    )

    # Print stride perspective shapes
    for stride, data in stride_perspectives.items():
        print(f"Stride {stride}: shape {data.shape}")
    
    # Visualize explained variance for each stride
    explained_variances = [temporal_processor.get_explained_variance(stride) for stride in stride_perspectives.keys()]
    plt.figure(figsize=(10, 6))
    plt.bar(stride_perspectives.keys(), explained_variances)
    plt.xlabel('Stride Length')
    plt.ylabel('Explained Variance Ratio')
    plt.title('Explained Variance by Stride Length')
    plt.show()
else:
    print("Cannot process data: train_df or train_features is empty")

2025-04-22 12:31:59,506 - terabyte_feature_extractor - INFO - Using mlx backend for computation
2025-04-22 12:31:59,506 - terabyte_feature_extractor - INFO - Initialized TerabyteTemporalStrideProcessor with window_size=10, stride_perspectives=[1, 3, 5], batch_size=10000
2025-04-22 12:31:59,511 - terabyte_feature_extractor - INFO - Processing batch 1 with 194 rows


ValueError: Invalid type  ndarray received in array initialization.
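The ValueError above is raised while the first 194-row batch is being converted. One likely cause, which we have not confirmed against the extractor's source, is the batch dtype. After one-hot encoding, train_df mixes float columns with bool dummy columns, and pandas returns an object array from .values on such a frame. The following small pandas check uses made-up values:

```python
import numpy as np
import pandas as pd

# Toy frame mimicking the prepared data: a float column plus a one-hot (bool) column
df = pd.DataFrame({
    "downloadThroughput": [12.5, 30.1, 7.8],
    "eventType_Create": [True, False, True],
})

# Mixing float and bool columns makes .values fall back to an object array
raw = df.values
print(raw.dtype)  # object

# Casting explicitly yields a homogeneous float32 matrix (bools become 0.0 / 1.0)
clean = df.to_numpy(dtype=np.float32)
print(clean.dtype, clean.shape)  # float32 (3, 2)
```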

In [None]:
import bigframes.pandas as bf
from sklearn.impute import SimpleImputer
# scikit-learn's scaler: prepare_dataframe fits it on pandas data and assigns the numpy output
# back to pandas columns, which bigframes.ml.preprocessing.MaxAbsScaler does not support
from sklearn.preprocessing import MaxAbsScaler
from typing import List, Optional, Tuple
import pandas as pd  # For struct flattening and one-hot encoding
import warnings
import numpy as np  # For the random split and cyclical features
import gc  # For memory management

import pyarrow.types as pat  # PyArrow type predicates used for dtype detection

PROJECT_ID = "massmkt-poc"  # @param {type:"string"}
REGION = "US"  # @param {type:"string"}

# Set BigQuery DataFrames options
# Note: The project option is not required in all environments.
# On BigQuery Studio, the project ID is automatically detected.
bf.options.bigquery.project = PROJECT_ID

# Note: The location option is not required.
# It defaults to the location of the first table or query
# passed to read_gbq(). For APIs where a location can't be
# auto-detected, the location defaults to the "US" location.
bf.options.bigquery.location = REGION

In [None]:
def prepare_bigquery_data_bf(
    project_id: str,
    table_id: str,
    target_column: Optional[str] = None,
    force_categorical_columns: List[str] = None,
    drop_columns: List[str] = None,
    high_null_threshold: float = 0.9,
    limit: Optional[int] = None,
    index_col: Optional[str] = None,
) -> Optional[Tuple[bf.DataFrame, bf.DataFrame, bf.DataFrame, List[str], List[str], List[str], MaxAbsScaler, SimpleImputer]]:
    """
    Prepares data from a BigQuery table using BigFrames.

    Args:
        project_id: GCP project ID.
        table_id: BigQuery table ID (dataset.table).
        target_column: Target variable name. If None, the last numeric non-categorical column is used.
        force_categorical_columns: Columns always treated as categorical (one-hot encoded).
        drop_columns: Columns to drop.
        high_null_threshold: Drop feature columns whose fraction of nulls (0-1) in the training split exceeds this value.
        limit: Optional row limit for testing.
        index_col: Optional index column; a datetime index enables a temporal split.

    Returns:
        Tuple (train_bf_df, val_bf_df, test_bf_df, train_features, val_features, test_features, scaler, imputer),
        or None if preparation fails.
    """

    # --- Session and Memory Cleanup ---
    bf.close_session()
    gc.collect()

    # Set fixed random seed for reproducibility
    np.random.seed(42)
    bf.options.bigquery.project = project_id

    if force_categorical_columns is None:
        force_categorical_columns = []
    if drop_columns is None:
        drop_columns = []

    bf_df = bf.read_gbq(table_id, index_col=index_col)
    gc.collect()

    # --- Data Type Conversion ---
    numeric_columns = ['downloadThroughput', 'downloadLatency', 'uploadThroughput']
    for col in numeric_columns:
        if col in bf_df.columns:
            bf_df[col] = bf_df[col].astype('Float64')
    if 'eventTimestamp' in bf_df.columns:
        bf_df['eventTimestamp'] = bf_df['eventTimestamp'].astype('datetime64[ns]')
    # --- End data type conversion ---

    if limit:
        bf_df = bf_df.head(limit)
    print("Schema (bf_df.dtypes):", bf_df.dtypes)

    # --- Robust Numeric and Datetime Type Detection ---
    def is_numeric_type(col_type) -> bool:
        try:
            if pd.api.types.is_numeric_dtype(col_type):
                return True
            if hasattr(col_type, "id"):
                return pat.is_integer(col_type) or pat.is_floating(col_type)
            return False
        except TypeError as e:
            print(f"Error checking numeric type for {col_type}: {e}")
            return False

    def is_datetime_type(col_type) -> bool:
        try:
            if pd.api.types.is_datetime64_any_dtype(col_type):
                return True
            if hasattr(col_type, "id"):
                return pat.is_timestamp(col_type) or pat.is_date(col_type)
            return False
        except TypeError as e:
            print(f"Error checking datetime type for {col_type}: {e}")
            return False

    schema = bf_df.dtypes
    numeric_cols: List[str] = []
    datetime_cols: List[str] = []
    categorical_cols: List[str] = []
    struct_cols: List[str] = []
    boolean_cols: List[str] = []

    for col_name, col_type in schema.items():
        if col_name in force_categorical_columns:
            # Forced columns are categorical regardless of their stored type
            categorical_cols.append(col_name)
        elif is_numeric_type(col_type):
            numeric_cols.append(col_name)
        elif is_datetime_type(col_type):
            datetime_cols.append(col_name)
        elif hasattr(bf, 'BooleanDtype') and col_type == bf.BooleanDtype():
            boolean_cols.append(col_name)
        elif hasattr(bf, 'StringDtype') and col_type == bf.StringDtype():
            categorical_cols.append(col_name)
        elif "STRUCT" in str(col_type).upper():
            struct_cols.append(col_name)

    # Prepare one split. Validation/test reuse the training imputer, scaler, high-null columns and one-hot columns (fixed_dummy_columns).
    def prepare_dataframe(bf_df_split, is_train=False, scaler=None, imputer=None,
                          high_null_cols_train=None,
                          fixed_dummy_columns: Optional[List[str]] = None
                         ) -> Optional[Tuple[bf.DataFrame, List[str], MaxAbsScaler, SimpleImputer, List[str], Optional[List[str]]]]:
        try:
            print(f"Preparing dataframe (is_train={is_train})...")

            # Filter drop_columns if any
            cols_to_drop = [col for col in drop_columns if col in bf_df_split.columns]
            print(f"Columns to drop (after filtering): {cols_to_drop}")
            if cols_to_drop:
                bf_df_split = bf_df_split.drop(columns=cols_to_drop)

            # Flatten STRUCT columns
            for struct_col in struct_cols:
                if struct_col in bf_df_split.columns:
                    print(f"Flattening struct column: {struct_col}")
                    bf_df_split = bf_df_split.ml.flatten(struct_col)

            # Convert datetime columns
            for col in datetime_cols:
                if col in bf_df_split.columns:
                    print(f"Converting to timestamp: {col}")
                    bf_df_split[col] = bf_df_split[col].to_timestamp()

            # Determine target column
            _target_column = target_column
            if _target_column is None:
                eligible = [col for col in numeric_cols if col in bf_df_split.columns and col not in categorical_cols]
                if eligible:
                    _target_column = eligible[-1]
                    print(f"WARNING: No target specified. Using '{_target_column}' as target.")
                else:
                    raise ValueError("No suitable target column found.")

            # Initial features: numeric (excluding target) plus boolean columns
            features = [col for col in numeric_cols if col != _target_column and col in bf_df_split.columns]
            features.extend(boolean_cols)
            print(f"Initial features: {features}")

            # One-hot encode categorical columns
            present_categorical = [col for col in categorical_cols if col in bf_df_split.columns]
            if present_categorical:
                print(f"Encoding categorical columns: {present_categorical}")
                df_pd = bf_df_split.to_pandas()
                df_dummy = pd.get_dummies(df_pd, columns=present_categorical, drop_first=False)
                df_dummy.columns = df_dummy.columns.str.replace(r'[^0-9a-zA-Z_]', '_', regex=True)
                if is_train:
                    fixed_dummy = list(df_dummy.columns)
                else:
                    if fixed_dummy_columns is not None:
                        df_dummy = df_dummy.reindex(columns=fixed_dummy_columns, fill_value=0)
                    fixed_dummy = fixed_dummy_columns
                bf_df_split = bf.DataFrame(df_dummy)
            else:
                print("No categorical columns to encode.")
                fixed_dummy = fixed_dummy_columns

            # Add cyclical features for datetime columns
            for col in datetime_cols:
                if col in bf_df_split.columns:
                    print(f"Adding cyclical features for: {col}")
                    # Divide by the full period (24 hours, 7 weekdays, 31 days, 12 months) so the
                    # last value wraps around next to the first instead of colliding with it
                    bf_df_split[col + '_sin_hour'] = np.sin(2 * np.pi * bf_df_split[col].dt.hour / 24.0)
                    bf_df_split[col + '_cos_hour'] = np.cos(2 * np.pi * bf_df_split[col].dt.hour / 24.0)
                    bf_df_split[col + '_sin_dayofweek'] = np.sin(2 * np.pi * bf_df_split[col].dt.dayofweek / 7.0)
                    bf_df_split[col + '_cos_dayofweek'] = np.cos(2 * np.pi * bf_df_split[col].dt.dayofweek / 7.0)
                    bf_df_split[col + '_sin_dayofmonth'] = np.sin(2 * np.pi * bf_df_split[col].dt.day / 31.0)
                    bf_df_split[col + '_cos_dayofmonth'] = np.cos(2 * np.pi * bf_df_split[col].dt.day / 31.0)
                    bf_df_split[col + '_sin_month'] = np.sin(2 * np.pi * bf_df_split[col].dt.month / 12.0)
                    bf_df_split[col + '_cos_month'] = np.cos(2 * np.pi * bf_df_split[col].dt.month / 12.0)

            print(f"After datetime feature engineering, columns: {bf_df_split.columns.tolist()}")

            # Re-assess numeric columns
            current_numeric = [col for col, dtype in bf_df_split.dtypes.items() if is_numeric_type(dtype)]
            features = [col for col in current_numeric if col != _target_column and col in bf_df_split.columns]
            print(f"Final features: {features}")

            # Find high-null columns, then fit (train) or apply (validation/test) imputation and scaling
            high_null_cols_to_drop_list = []
            if is_train:
                train_pd = bf_df_split[features + [_target_column]].to_pandas()
                # Measure null fractions before imputation fills the gaps
                null_frac = train_pd[features].isna().mean()
                high_null_cols_to_drop_list = null_frac[null_frac > high_null_threshold].index.tolist()
                features = [col for col in features if col not in high_null_cols_to_drop_list]
                imputer = SimpleImputer(strategy='median')
                train_pd[features] = imputer.fit_transform(train_pd[features])
                scaler = MaxAbsScaler()
                train_pd[features] = scaler.fit_transform(train_pd[features])
                # Replace the numeric columns with their imputed and scaled versions
                updated_numeric = bf.DataFrame(train_pd[features], index=train_pd.index)
                bf_df_split = bf_df_split.drop(columns=features).join(updated_numeric)
                print("Scaler and imputer fitted and applied on training data.")
            else:
                if high_null_cols_train is None:
                    raise ValueError("High null columns from training must be provided for validation/test sets.")
                high_null_cols_to_drop_list = high_null_cols_train
                # Keep the same feature set the imputer and scaler were fitted on
                features = [col for col in features if col not in high_null_cols_to_drop_list]
                temp_pd = bf_df_split[features + [_target_column]].to_pandas()
                temp_pd[features] = imputer.transform(temp_pd[features])
                temp_pd[features] = scaler.transform(temp_pd[features])
                updated_numeric = bf.DataFrame(temp_pd[features], index=temp_pd.index)
                bf_df_split = bf_df_split.drop(columns=features).join(updated_numeric)
                print("Scaler and imputer applied on validation/test data.")

            # (Optional) Drop any high-null columns
            high_null_cols_to_drop_final = [col for col in high_null_cols_to_drop_list if col in bf_df_split.columns]
            print(f"Columns to drop due to high nulls: {high_null_cols_to_drop_final}")
            if high_null_cols_to_drop_final:
                bf_df_split = bf_df_split.drop(columns=high_null_cols_to_drop_final)
            features = [col for col in features if col not in high_null_cols_to_drop_final]
            print(f"Features after dropping high null columns: {features}")

            gc.collect()
            print("Dataframe prepared successfully. Returning split.")
            return bf_df_split, features, scaler, imputer, high_null_cols_to_drop_final, fixed_dummy
        except Exception as e:
            print(f"ERROR in prepare_dataframe (is_train={is_train}): {e}")
            import traceback
            traceback.print_exc()
            return None

    # --- Split Data ---
    if index_col and pd.api.types.is_datetime64_any_dtype(bf_df.index):
        print("Performing temporal split...")
        split_date = bf_df.index.quantile(0.8)
        train_bf_df = bf_df[bf_df.index <= split_date]
        temp_bf_df = bf_df[bf_df.index > split_date]
        print(f"Temporal split date: {split_date}")
    else:
        print("Performing random split...")
        bf_df['__split_rand'] = bf.Series(np.random.rand(len(bf_df.index)).tolist(), index=bf_df.index)
        train_bf_df = bf_df[bf_df['__split_rand'] <= 0.8].drop(columns=['__split_rand'])
        # Drop the helper column here so it never leaks into the validation/test features
        temp_bf_df = bf_df[bf_df['__split_rand'] > 0.8].drop(columns=['__split_rand'])
        print("Random split ratios: 80/20 (train/temp)")

    # Manually split temp_bf_df into validation and test sets.
    temp_bf_df['__split_rand2'] = bf.Series(np.random.rand(len(temp_bf_df.index)).tolist(), index=temp_bf_df.index)
    val_bf_df = temp_bf_df[temp_bf_df['__split_rand2'] <= 0.5].drop(columns=['__split_rand2'])
    test_bf_df = temp_bf_df[temp_bf_df['__split_rand2'] > 0.5].drop(columns=['__split_rand2'])
    print("Validation/test split ratios: 50/50 from temp")

    # --- Process Splits ---
    res_train = prepare_dataframe(train_bf_df, is_train=True)
    if res_train is None:
        return None
    train_bf_df_processed, train_features, scaler, imputer, high_null_cols_train, fixed_dummy_columns = res_train

    res_val = prepare_dataframe(val_bf_df, is_train=False, scaler=scaler, imputer=imputer, high_null_cols_train=high_null_cols_train, fixed_dummy_columns=fixed_dummy_columns)
    if res_val is None:
        return None
    val_bf_df_processed, val_features, _, _, _, _ = res_val

    res_test = prepare_dataframe(test_bf_df, is_train=False, scaler=scaler, imputer=imputer, high_null_cols_train=high_null_cols_train, fixed_dummy_columns=fixed_dummy_columns)
    if res_test is None:
        return None
    test_bf_df_processed, test_features, _, _, _, _ = res_test

    gc.collect()
    return train_bf_df_processed, val_bf_df_processed, test_bf_df_processed, train_features, val_features, test_features, scaler, imputer
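The cyclical datetime features in prepare_dataframe map each periodic field onto the unit circle by dividing by its period. The following standalone NumPy sketch, using a hypothetical helper called cyclical, shows why the divisor should be the full period (24 for hours). With 24, hour 23 lands next to hour 0. With 23, the two hours collapse onto the same point.

```python
import numpy as np


def cyclical(values, period):
    """Map a periodic integer feature onto the unit circle as (sin, cos)."""
    angle = 2 * np.pi * np.asarray(values, dtype=float) / period
    return np.sin(angle), np.cos(angle)


hours = np.arange(24)
sin_h, cos_h = cyclical(hours, period=24)

# With period 24, hour 23 is as close to hour 0 as hour 1 is
d_23_0 = np.hypot(sin_h[23] - sin_h[0], cos_h[23] - cos_h[0])
d_0_1 = np.hypot(sin_h[1] - sin_h[0], cos_h[1] - cos_h[0])
print(np.isclose(d_23_0, d_0_1))  # True

# With period 23, hour 0 and hour 23 become indistinguishable
sin_bad, cos_bad = cyclical(hours, period=23)
print(np.isclose(sin_bad[0], sin_bad[23]), np.isclose(cos_bad[0], cos_bad[23]))  # True True
```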

In [None]:
def flatten_struct(df: pd.DataFrame, struct_col_name: str) -> pd.DataFrame:
    """
    Flattens a simple struct (dict-valued) column in a Pandas DataFrame into prefixed columns.
    """
    struct_df = pd.json_normalize(df[struct_col_name].tolist())
    struct_df.index = df.index  # json_normalize returns a RangeIndex; realign before concatenating
    struct_df = struct_df.add_prefix(f"{struct_col_name}_")
    df = pd.concat([df.drop(columns=[struct_col_name]), struct_df], axis=1)
    return df

# --- Example Usage ---
project_id = "massmkt-poc"
table_id = "TEST1.ctl_modem_speedtest_event"
result = prepare_bigquery_data_bf(
    project_id,
    table_id,
    target_column="downloadLatency",  # Set target column to match the one in the feature extractor
    force_categorical_columns=[
        "eventType", "eventSource", "eventCategory", "eventPublisherId",
        "productClass", "downloadTestStatus", "uploadState", "uploadTestStatus",
        "wtn", "serialNumber"
    ],
    high_null_threshold=0.95,
    limit=1000,  # For testing
    index_col="eventTimestamp"  # Use datetime index for temporal split
)

if result is not None:
    train_bf_df, val_bf_df, test_bf_df, train_features, val_features, test_features, scaler, imputer = result

    # Convert to pandas DataFrames only when necessary
    train_df = train_bf_df.to_pandas()
    val_df = val_bf_df.to_pandas()
    test_df = test_bf_df.to_pandas()

    print("Train DF Shape:", train_df.shape)
    print("Training Features:", train_features)
    print(f"BigFrames version: {bf.__version__}")
    print(f"Pandas version: {pd.__version__}")
    print(f"Scaler type: {type(scaler)}")
else:
    print("Error occurred during data preparation. Check traceback above.")
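prepare_bigquery_data_bf learns the one-hot columns, the median imputer, and the scaler from the training split only. It then applies them unchanged to validation and test. The following toy pandas frame with made-up values shows the same pattern, using scikit-learn's SimpleImputer and MaxAbsScaler:

```python
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MaxAbsScaler

# Toy splits (made-up values); "Delete" never appears in training
train = pd.DataFrame({
    "eventType": ["Create", "Update", "Create"],
    "downloadThroughput": [1.0, -4.0, 2.0],
    "uploadThroughput": [np.nan, 2.0, 4.0],
})
val = pd.DataFrame({
    "eventType": ["Delete"],
    "downloadThroughput": [8.0],
    "uploadThroughput": [np.nan],
})
numeric = ["downloadThroughput", "uploadThroughput"]

# 1. One-hot encode the training split and remember its columns
train_enc = pd.get_dummies(train, columns=["eventType"])
dummy_columns = list(train_enc.columns)

# 2. Align validation to the training columns: unseen categories are dropped,
#    categories missing from validation are filled with 0
val_enc = pd.get_dummies(val, columns=["eventType"]).reindex(columns=dummy_columns, fill_value=0)

# 3. Fit the imputer (median) and scaler (max-abs) on training data only
imputer = SimpleImputer(strategy="median")
scaler = MaxAbsScaler()
train_X = scaler.fit_transform(imputer.fit_transform(train_enc[numeric]))

# 4. Reuse the training statistics on validation
val_X = scaler.transform(imputer.transform(val_enc[numeric]))
print(train_X)
print(val_X)
```

The validation value 8.0 scales to 2.0 because MaxAbsScaler reuses the training maximum of 4.0. Held-out values outside [-1, 1] are therefore expected and are not a bug.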

## Train Restricted Boltzmann Machine

Now let's train an RBM on the extracted features.
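This notebook does not show OptimizedRBM's update rule. For orientation, the following is a generic contrastive-divergence update (CD-1, matching k=1 in the cell below) for an RBM with sigmoid visible and hidden units, written in plain NumPy. It is a sketch, not OptimizedRBM's implementation. We have also not checked which visible-unit model use_binary_states=False selects.

```python
import numpy as np


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def cd1_step(v0, W, b_vis, b_hid, lr=0.01, rng=None):
    """One CD-1 update on a mini-batch v0 of shape (batch, n_visible).

    Returns the updated (W, b_vis, b_hid) and the batch's mean squared reconstruction error.
    """
    rng = np.random.default_rng() if rng is None else rng
    # Positive phase: hidden probabilities given the data
    h0_prob = sigmoid(v0 @ W + b_hid)
    # Sample binary hidden states to drive the reconstruction
    h0_sample = (rng.random(h0_prob.shape) < h0_prob).astype(v0.dtype)
    # Negative phase: reconstruct visible probabilities, then recompute hidden probabilities
    v1_prob = sigmoid(h0_sample @ W.T + b_vis)
    h1_prob = sigmoid(v1_prob @ W + b_hid)
    # Gradient estimate <v h>_data - <v h>_reconstruction, averaged over the batch
    batch = v0.shape[0]
    W = W + lr * (v0.T @ h0_prob - v1_prob.T @ h1_prob) / batch
    b_vis = b_vis + lr * (v0 - v1_prob).mean(axis=0)
    b_hid = b_hid + lr * (h0_prob - h1_prob).mean(axis=0)
    error = float(np.mean((v0 - v1_prob) ** 2))
    return W, b_vis, b_hid, error


rng = np.random.default_rng(42)
n_visible, n_hidden = 6, 4
W = rng.normal(scale=0.01, size=(n_visible, n_hidden))
b_vis = np.zeros(n_visible)
b_hid = np.zeros(n_hidden)
data = (rng.random((100, n_visible)) < 0.3).astype(float)  # toy binary data

errors = []
for epoch in range(5):
    for start in range(0, len(data), 20):
        W, b_vis, b_hid, err = cd1_step(data[start:start + 20], W, b_vis, b_hid, lr=0.1, rng=rng)
    errors.append(err)  # error of the last batch in each epoch
print(W.shape, errors)
```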

In [None]:
# Create RBM
if len(train_features) > 0:
    rbm = OptimizedRBM(
        n_visible=len(train_features),
        n_hidden=64,
        learning_rate=0.01,
        momentum=0.5,
        weight_decay=0.0001,
        batch_size=100,
        use_binary_states=False,
        use_gpu=True,
        verbose=True
    )

    # Define a generator to yield shuffled batches as float32 arrays
    def rbm_data_generator(data, features, batch_size=100):
        # Shuffle rows
        indices = np.random.permutation(len(data))
        data = data.iloc[indices]

        for i in range(0, len(data), batch_size):
            yield data.iloc[i:i+batch_size][features].to_numpy(dtype=np.float32)

    # Train RBM
    training_errors = rbm.train_in_chunks(
        rbm_data_generator(train_df, train_features, batch_size=100),
        epochs=10,
        k=1
    )

    # Plot training errors
    plt.figure(figsize=(10, 6))
    plt.plot(training_errors)
    plt.xlabel('Epoch')
    plt.ylabel('Reconstruction Error')
    plt.title('RBM Training Error')
    plt.show()
else:
    print("Cannot train RBM: train_features is empty")
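The training call above passes a single generator object together with epochs=10. This notebook does not show whether train_in_chunks re-reads its input on each epoch. If it does, note that a Python generator is exhausted after one pass, so later epochs would receive no data. A common workaround is to create a fresh generator per pass. The hypothetical make_batches helper below illustrates the behavior:

```python
import numpy as np
import pandas as pd


def make_batches(df, features, batch_size=100, seed=None):
    """Yield shuffled float32 batches. Hypothetical helper; call it again for each pass."""
    rng = np.random.default_rng(seed)
    shuffled = df.iloc[rng.permutation(len(df))]
    for start in range(0, len(shuffled), batch_size):
        yield shuffled.iloc[start:start + batch_size][features].to_numpy(dtype=np.float32)


df = pd.DataFrame({"a": np.arange(250, dtype=float), "b": np.ones(250)})

gen = make_batches(df, ["a", "b"], seed=0)
first_pass = sum(len(batch) for batch in gen)
second_pass = sum(len(batch) for batch in gen)  # the same generator is already exhausted
print(first_pass, second_pass)  # 250 0

# One fresh generator per epoch supplies the full data every time
epoch_sizes = [sum(len(batch) for batch in make_batches(df, ["a", "b"], seed=e)) for e in range(3)]
print(epoch_sizes)  # [250, 250, 250]
```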

## Extract RBM Features

Now let's extract features from the trained RBM.

In [None]:
# Define a generator to yield data in batches as float32 arrays
def feature_generator(data, features, batch_size=1000):
    for i in range(0, len(data), batch_size):
        yield data.iloc[i:i+batch_size][features].to_numpy(dtype=np.float32)

# Extract features if RBM is trained and train_features is not empty
if 'rbm' in locals() and len(train_features) > 0:
    # Use the training feature list for every split so the columns match the RBM's visible units
    train_rbm_features = rbm.transform_in_chunks(
        feature_generator(train_df, train_features, batch_size=1000)
    )

    val_rbm_features = rbm.transform_in_chunks(
        feature_generator(val_df, train_features, batch_size=1000)
    )

    test_rbm_features = rbm.transform_in_chunks(
        feature_generator(test_df, train_features, batch_size=1000)
    )

    print(f"Train RBM features shape: {train_rbm_features.shape}")
    print(f"Validation RBM features shape: {val_rbm_features.shape}")
    print(f"Test RBM features shape: {test_rbm_features.shape}")
else:
    print("Cannot extract RBM features: RBM not trained or train_features is empty")
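Chunked extraction only limits memory use, and it should not change the result. An RBM's hidden activations are computed row by row, so stacking the per-chunk outputs reproduces a full-batch transform, even when the last chunk is shorter. The following NumPy check uses a hypothetical stand-in for the chunked transform, with random weights in place of a trained RBM:

```python
import numpy as np


def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))


def transform_in_chunks_sketch(batches, W, b_hid):
    """Hypothetical stand-in: hidden probabilities per batch, stacked into one array."""
    return np.vstack([sigmoid(batch @ W + b_hid) for batch in batches])


rng = np.random.default_rng(0)
X = rng.random((2500, 8)).astype(np.float32)
W = rng.normal(scale=0.1, size=(8, 3)).astype(np.float32)
b_hid = np.zeros(3, dtype=np.float32)

# Chunks of 1000, 1000 and 500 rows
chunked = transform_in_chunks_sketch((X[i:i + 1000] for i in range(0, len(X), 1000)), W, b_hid)
full = sigmoid(X @ W + b_hid)
print(chunked.shape, np.allclose(chunked, full))  # (2500, 3) True
```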

## Visualize RBM Features

Let's visualize the RBM features to understand what patterns it has learned.

In [None]:
# Visualize RBM features if they exist
if 'train_rbm_features' in locals():
    # Visualize RBM feature distributions
    plt.figure(figsize=(12, 8))

    # Plot histograms for first 16 RBM features
    for i in range(min(16, train_rbm_features.shape[1])):
        plt.subplot(4, 4, i+1)
        plt.hist(train_rbm_features[:, i], bins=30, alpha=0.7)
        plt.title(f'Feature {i+1}')

    # Lay out the grid once, after all subplots exist
    plt.tight_layout()
    plt.suptitle('RBM Feature Distributions', y=1.02)
    plt.show()

    # Visualize feature correlations
    plt.figure(figsize=(10, 8))
    corr_matrix = np.corrcoef(train_rbm_features, rowvar=False)
    plt.imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
    plt.colorbar()
    plt.title('RBM Feature Correlations')
    plt.show()
else:
    print("Cannot visualize RBM features: train_rbm_features not available")
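A hidden unit that never changes (for example, one that is always off) has zero variance. np.corrcoef then returns NaN for its entire row and column, and those cells render blank in the heatmap. One option is to keep only columns with non-zero variance before computing correlations, as in this NumPy sketch on synthetic data:

```python
import warnings

import numpy as np

rng = np.random.default_rng(0)
features = rng.random((200, 4))
features[:, 2] = 0.0  # a hidden unit that never activates

with warnings.catch_warnings():
    warnings.simplefilter("ignore", RuntimeWarning)  # 0/0 in the normalization
    naive = np.corrcoef(features, rowvar=False)
print(np.isnan(naive).any())  # True

# Keep only columns with non-zero variance
active = features.std(axis=0) > 0
corr = np.corrcoef(features[:, active], rowvar=False)
print(corr.shape, np.isnan(corr).any())  # (3, 3) False
```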

## Create and Train Liquid Neural Network

Now let's create and train a liquid neural network with the RBM features.

In [None]:
# Create and train liquid neural network if RBM features exist
if 'train_rbm_features' in locals():
    # For demonstration, we'll create dummy targets
    # In a real application, you would use actual targets from your data
    train_targets = np.random.rand(len(train_rbm_features), 1)
    val_targets = np.random.rand(len(val_rbm_features), 1)
    test_targets = np.random.rand(len(test_rbm_features), 1)

    # Reshape RBM features for sequence input
    train_rbm_seq = train_rbm_features.reshape(train_rbm_features.shape[0], 1, train_rbm_features.shape[1])
    val_rbm_seq = val_rbm_features.reshape(val_rbm_features.shape[0], 1, val_rbm_features.shape[1])
    test_rbm_seq = test_rbm_features.reshape(test_rbm_features.shape[0], 1, test_rbm_features.shape[1])

    # Create liquid neural network
    liquid_network = create_liquid_network_with_motor_neuron(
        input_dim=train_rbm_features.shape[1],
        units=128,
        output_dim=1,
        sparsity_level=0.5,
        stride_length=1,
        time_scale_factor=1.0,
        threshold=0.5,
        adaptive_threshold=True,
        mixed_memory=True
    )

    # Set up callbacks
    callbacks = [
        # Early stopping
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        ),
        
        # Learning rate scheduling
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6
        ),
        
        # TensorBoard logging
        tf.keras.callbacks.TensorBoard(
            log_dir='./logs',
            histogram_freq=1
        )
    ]

    # Train liquid network
    history = liquid_network.fit(
        train_rbm_seq,
        train_targets,
        validation_data=(val_rbm_seq, val_targets),
        epochs=50,
        batch_size=32,
        callbacks=callbacks,
        verbose=1
    )

    # Plot training history
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Train')
    plt.plot(history.history['val_loss'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Train')
    plt.plot(history.history['val_mae'], label='Validation')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.title('Mean Absolute Error')
    plt.legend()

    plt.tight_layout()
    plt.show()
else:
    print("Cannot train liquid neural network: RBM features not available")
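The cell above reshapes each RBM vector into a length-1 sequence, so the recurrent cell sees only one time step per sample. If the rows are in time order, as they are after a temporal split, overlapping windows give the network several steps to integrate over. The following sketch uses a hypothetical helper, make_sequences, and aligns each window with the target at its last step. That alignment is one possible choice, not something the notebook prescribes.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def make_sequences(features, targets, seq_len):
    """Return (n - seq_len + 1, seq_len, n_features) windows and the target at each window's last step."""
    windows = sliding_window_view(features, window_shape=seq_len, axis=0)  # (n_windows, n_features, seq_len)
    windows = np.ascontiguousarray(windows.transpose(0, 2, 1))          # (n_windows, seq_len, n_features)
    return windows, targets[seq_len - 1:]


rng = np.random.default_rng(0)
rbm_features = rng.random((50, 64)).astype(np.float32)
targets = rng.random((50, 1)).astype(np.float32)
X_seq, y_seq = make_sequences(rbm_features, targets, seq_len=10)
print(X_seq.shape, y_seq.shape)  # (41, 10, 64) (41, 1)
```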

## Process Test Data and Analyze Motor Neuron Output

Now let's process the test data through the complete pipeline and analyze the motor neuron output.

In [None]:
# Process test data if liquid network is trained
if 'liquid_network' in locals() and 'test_rbm_seq' in locals():
    # Process test data
    outputs = liquid_network.predict(test_rbm_seq)

    # Extract motor neuron outputs and trigger signals
    if isinstance(outputs, list):
        motor_outputs = outputs[0]
        trigger_signals = outputs[1][0]  # First element is trigger
        threshold_values = outputs[1][1]  # Second element is threshold
    else:
        motor_outputs = outputs
        trigger_signals = (motor_outputs > 0.5).astype(float)
        threshold_values = np.full_like(trigger_signals, 0.5)

    # Print statistics
    print(f"Motor neuron output range: {motor_outputs.min():.4f} to {motor_outputs.max():.4f}")
    print(f"Trigger rate: {trigger_signals.mean():.4f}")

    # Plot motor neuron outputs and triggers
    plt.figure(figsize=(12, 8))

    plt.subplot(2, 1, 1)
    plt.plot(motor_outputs[:100], label='Motor Neuron Output')
    plt.plot(threshold_values[:100], 'r--', label='Threshold')
    plt.xlabel('Sample')
    plt.ylabel('Output Value')
    plt.title('Motor Neuron Output and Threshold')
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.plot(trigger_signals[:100], 'g', label='Trigger Signal')
    plt.axhline(y=trigger_signals.mean(), color='r', linestyle='--', 
               label=f'Trigger Rate: {trigger_signals.mean():.2f}')
    plt.xlabel('Sample')
    plt.ylabel('Trigger (0/1)')
    plt.title('Exploration Trigger Signals')
    plt.legend()

    plt.tight_layout()
    plt.show()
else:
    print("Cannot process test data: liquid network not trained or test_rbm_seq not available")
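The trigger rate follows directly from the threshold. The notebook does not document how adaptive_threshold adjusts the threshold. For comparison, the sketch below contrasts the fixed 0.5 threshold used in the fallback branch above with a quantile rule that targets a chosen trigger rate. The quantile rule is a hypothetical illustration, not the library's behavior, and the outputs are synthetic.

```python
import numpy as np


def trigger_from_outputs(outputs, threshold):
    """Binary trigger (1.0 where output exceeds threshold) and the resulting trigger rate."""
    triggers = (np.ravel(outputs) > threshold).astype(float)
    return triggers, float(triggers.mean())


rng = np.random.default_rng(0)
motor = rng.random(1000)  # synthetic motor neuron outputs in [0, 1)

# Fixed threshold, as in the fallback branch above
_, fixed_rate = trigger_from_outputs(motor, 0.5)

# Threshold at the 90th percentile, so about 10% of samples trigger deeper exploration
q_threshold = np.quantile(motor, 0.9)
_, q_rate = trigger_from_outputs(motor, q_threshold)
print(round(fixed_rate, 3), round(q_rate, 3))
```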

## Analyze Triggered Samples

Let's analyze the samples that triggered deeper exploration.

In [None]:
# Analyze triggered samples if motor outputs and test RBM features exist
if 'motor_outputs' in locals() and 'test_rbm_features' in locals() and 'trigger_signals' in locals():
    # Get indices of triggered samples (ravel handles (n, 1) outputs)
    trigger_flat = np.ravel(trigger_signals)
    triggered_indices = np.where(trigger_flat == 1)[0]
    non_triggered_indices = np.where(trigger_flat == 0)[0]

    print(f"Number of triggered samples: {len(triggered_indices)}")
    print(f"Number of non-triggered samples: {len(non_triggered_indices)}")

    # Compare RBM features for triggered vs. non-triggered samples
    if len(triggered_indices) > 0 and len(non_triggered_indices) > 0:
        # Calculate mean features
        triggered_mean = np.mean(test_rbm_features[triggered_indices], axis=0)
        non_triggered_mean = np.mean(test_rbm_features[non_triggered_indices], axis=0)

        # Calculate feature difference
        feature_diff = triggered_mean - non_triggered_mean

        # Plot feature difference
        plt.figure(figsize=(12, 6))
        plt.bar(range(len(feature_diff)), feature_diff)
        plt.xlabel('RBM Feature')
        plt.ylabel('Difference (Triggered - Non-triggered)')
        plt.title('Feature Difference Between Triggered and Non-triggered Samples')
        plt.axhline(y=0, color='r', linestyle='--')
        plt.show()

        # Plot feature distributions for top 3 differentiating features
        top_features = np.argsort(np.abs(feature_diff))[-3:]
        
        plt.figure(figsize=(15, 5))
        for i, feature_idx in enumerate(top_features):
            plt.subplot(1, 3, i+1)
            plt.hist(test_rbm_features[triggered_indices, feature_idx], bins=20, alpha=0.5, label='Triggered')
            plt.hist(test_rbm_features[non_triggered_indices, feature_idx], bins=20, alpha=0.5, label='Non-triggered')
            plt.xlabel(f'Feature {feature_idx}')
            plt.ylabel('Count')
            plt.title(f'Feature {feature_idx} Distribution')
            plt.legend()
        
        plt.tight_layout()
        plt.show()
else:
    print("Cannot analyze triggered samples: motor_outputs, test_rbm_features, or trigger_signals not available")
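The comparison above can be packaged as a small function that uses boolean masks instead of index arrays. The function below, trigger_feature_difference, is a hypothetical helper. It returns the mean difference per feature and the indices of the largest absolute differences, largest first. We test it on synthetic data in which one feature is deliberately shifted for the triggered rows.

```python
import numpy as np


def trigger_feature_difference(features, triggers, top_k=3):
    """Mean feature difference (triggered - non-triggered) and the top_k features by |difference|."""
    mask = np.ravel(triggers) == 1
    if mask.all() or not mask.any():
        raise ValueError("Need both triggered and non-triggered samples")
    diff = features[mask].mean(axis=0) - features[~mask].mean(axis=0)
    top = np.argsort(np.abs(diff))[-top_k:][::-1]  # largest absolute difference first
    return diff, top


rng = np.random.default_rng(0)
feats = rng.normal(size=(500, 8))
trig = np.zeros((500, 1))
trig[:100] = 1
feats[:100, 5] += 3.0  # make feature 5 clearly different for triggered rows

diff, top = trigger_feature_difference(feats, trig)
print(top, round(float(diff[5]), 2))
```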

## Save Models

Let's save the trained models for future use.

In [None]:
# Create directory for models
os.makedirs('./models', exist_ok=True)

# Save RBM if it exists
if 'rbm' in locals():
    rbm.save('./models/rbm.npy')
    print("RBM saved to ./models/rbm.npy")
else:
    print("Cannot save RBM: not trained")

# Save liquid network if it exists
if 'liquid_network' in locals():
    liquid_network.save('./models/liquid_network')
    print("Liquid network saved to ./models/liquid_network")
else:
    print("Cannot save liquid network: not trained")

## Using the Integrated Pipeline

Now let's demonstrate how to use the integrated pipeline for a more streamlined workflow.

In [None]:
# Create integrated pipeline
pipeline = IntegratedPipeline(
    project_id=PROJECT_ID,
    rbm_hidden_units=64,
    cfc_units=128,
    lstm_units=32,
    stride_perspectives=[1, 3, 5],
    sparsity_level=0.5,
    threshold=0.5,
    use_gpu=True,
    verbose=True
)

# Initialize feature extractor
pipeline.initialize_feature_extractor(CREDENTIALS_PATH)

# Extract features
train_features_pipeline, val_features_pipeline, test_features_pipeline = pipeline.extract_features(
    table_id=TABLE_ID,
    target_column=TARGET_COLUMN,
    limit=LIMIT
)

# Check if features were extracted successfully
if train_features_pipeline is not None:
    # Apply temporal processing
    train_temporal = pipeline.apply_temporal_processing(train_features_pipeline)

    # Train RBM
    pipeline.train_rbm(train_features_pipeline, epochs=10)

    # Extract RBM features
    train_rbm_features_pipeline = pipeline.extract_rbm_features(train_features_pipeline)
    val_rbm_features_pipeline = pipeline.extract_rbm_features(val_features_pipeline)
    test_rbm_features_pipeline = pipeline.extract_rbm_features(test_features_pipeline)

    # Create dummy targets for demonstration
    train_targets_pipeline = np.random.rand(len(train_rbm_features_pipeline), 1)
    val_targets_pipeline = np.random.rand(len(val_rbm_features_pipeline), 1)

    # Train liquid network
    pipeline.train_liquid_network(
        features=train_rbm_features_pipeline,
        targets=train_targets_pipeline,
        validation_data=(val_rbm_features_pipeline, val_targets_pipeline),
        epochs=50,
        batch_size=32,
        network_type='lstm_gated'
    )

    # Process test data
    motor_outputs_pipeline, trigger_signals_pipeline = pipeline.process_data(test_rbm_features_pipeline)

    # Print results
    print(f"Processed {len(test_rbm_features_pipeline)} test samples")
    print(f"Motor neuron output range: {motor_outputs_pipeline.min():.4f} to {motor_outputs_pipeline.max():.4f}")
    print(f"Trigger rate: {trigger_signals_pipeline.mean():.4f}")

    # Save models
    pipeline.save_model('./models')

    # Print pipeline summary
    print(pipeline.summary())
else:
    print("Pipeline feature extraction failed")

## Conclusion

In this notebook, we've demonstrated how to:

1. Extract and prepare data from BigQuery tables using our terabyte-scale feature extractor
2. Apply temporal stride processing to capture patterns at different time scales
3. Train a Restricted Boltzmann Machine to learn latent representations
4. Feed the RBM output into a CfC-based liquid neural network with LSTM neurons for gating
5. Implement a motor neuron that outputs a value to trigger deeper exploration
6. Analyze the results to understand which samples trigger deeper exploration

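The chunked components let this pipeline scale toward terabyte-sized tables. These are the terabyte feature extractor, the temporal stride processor, and the RBM's train_in_chunks and transform_in_chunks. The BigFrames preparation helper, by contrast, materializes each split with to_pandas() for one-hot encoding, imputation, and scaling. Run it with limit or on a sample of the table.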