## Download the Numerai Crypto Datasets

In [None]:
# Install required packages (IPython shell magics; run once per environment)
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null ## we use jdk 11
!pip install -U h2o
!pip install pyspark==3.5.0
!pip install numerapi requests pyarrow fastparquet
!pip install findspark
# Install H2O Sparkling Water (the PySparkling build for Spark 3.5, matching the pyspark pin above)
!pip install h2o-pysparkling-3.5
# Install torch for GPU distribution
!pip install torch
# Optional (disabled): downgrade NumPy to the latest 1.x version.
# NOTE(review): if re-enabled, quote the requirement ("numpy<2"); unquoted, the shell treats `<2` as input redirection.
#!pip install numpy<2

# GPU dataframe / boosting stack (cudf wheels are fetched from the NVIDIA package index)
!pip install pyarrow xgboost
!pip install cudf-cu12 --extra-index-url=https://pypi.nvidia.com
!pip install rmm-cu12 pylibcudf-cu12

# Verify that numpy, scipy and scikit-learn import without errors after the installs above
!python -c "import numpy; import scipy; from sklearn.base import BaseEstimator; print('Modules imported successfully')"


Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com
Modules imported successfully


In [None]:
import findspark
# findspark.init() is called before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Smoke test: create a local Spark session bound to the loopback address and
# stop it again immediately. Nothing is computed with this session.
spark = SparkSession.builder \
    .appName("MinimalExample") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .master("local[*]") \
    .getOrCreate()
spark.stop()

In [None]:
# Cell 1: Imports and Advanced Setup Functions
import os
import sys
import socket
import subprocess
import traceback
import warnings
import tempfile
import numpy as np
import requests
import h2o
import gc
import time
from functools import reduce
from pyspark.sql import DataFrame

# Enhanced GPU and Library Imports
try:
    import pynvml
    NVML_AVAILABLE = True
except ImportError:
    NVML_AVAILABLE = False

try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False

import builtins
from h2o.automl import H2OAutoML
from h2o.exceptions import H2ODependencyWarning
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType

# Warning configuration: silence H2O dependency warnings and any warning whose
# message matches the GPU / Arrow patterns below.
# NOTE(review): the message patterns are broad ('.*GPU.*' matches every warning
# text containing "GPU"), so unrelated warnings may be hidden as well.
warnings.filterwarnings('ignore', category=H2ODependencyWarning)
warnings.filterwarnings('ignore', message='.*GPU.*')
warnings.filterwarnings('ignore', message='.*Arrow.*')

def get_system_resources():
    """Build the run configuration, sizing memory/executor settings from the GPU count.

    The GPU count is read through NVML when ``pynvml`` was importable
    (``NVML_AVAILABLE``). Three situations are distinguished:

    * GPUs detected: memory is ``min(gpu_count * 60, 360)`` GB, the GPU memory
      reserve is ``max(8, gpu_count * 4)`` GB and one executor is used per GPU.
    * NVML missing, or NVML reports zero GPUs: CPU-only defaults
      (``'180g'`` memory, ``'8g'`` reserve, one executor).
    * NVML raised during detection: same as CPU-only but with ``'224g'`` memory.

    Returns:
        dict: Configuration with the static tuning keys plus ``gpu_count``,
        ``memory``, ``gpu_memory_reserve`` and ``executor_instances``.
        ``executor_instances`` is always at least 1.
    """
    config = {
        'limit_features': False,
        'features_per_group': 50,
        'max_runtime_secs': 1200,
        'max_runtime_secs_combined': 2400,
        'max_models': 100,
        'nfolds': 5,
        'batch_size': 1000,
        'chunk_size': 10000
    }

    # gpu_count must be bound on every path: the fallback branches read it too.
    gpu_count = 0
    detection_failed = False

    if NVML_AVAILABLE:
        try:
            pynvml.nvmlInit()
            gpu_count = pynvml.nvmlDeviceGetCount()
        except Exception as e:
            print(f"GPU detection error: {e}")
            gpu_count = 0
            detection_failed = True

    config['gpu_count'] = gpu_count

    if gpu_count > 0:
        # Scale memory with the number of GPUs, capped at 360 GB.
        # builtins.min/max are used explicitly to avoid PySpark function conflicts.
        total_memory = builtins.min(gpu_count * 60, 360)
        config['memory'] = f'{total_memory}g'
        config['gpu_memory_reserve'] = f'{builtins.max(8, gpu_count * 4)}g'
        config['executor_instances'] = gpu_count
    else:
        # No usable GPU information: never emit '0g' memory or 0 executors,
        # both of which would produce an invalid Spark configuration downstream.
        config['memory'] = '224g' if detection_failed else '180g'
        config['gpu_memory_reserve'] = '8g'
        config['executor_instances'] = 1

    return config

# Module-level configuration dict, computed once when this cell runs.
# Other functions in this notebook read it via CONFIG[...] / CONFIG.get(...).
CONFIG = get_system_resources()

def get_local_network_config():
    """Detect the local IPv4 address and hostname for cluster configuration.

    A UDP (SOCK_DGRAM) socket is connected to 8.8.8.8:80 only so that
    ``getsockname()`` reports the local address chosen for that route.
    The socket is managed by a ``with`` block so it is closed even when
    ``connect``/``getsockname`` raises.

    Returns:
        dict: ``local_ip`` (str), ``hostname`` (str) and ``available_ports``
        (list of int). On any error a loopback fallback is returned
        (``127.0.0.1`` / ``localhost`` / ``[54321]``) and the error is printed.
    """
    try:
        # Prefer IPv4 local address
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            local_ip = probe.getsockname()[0]

        hostname = socket.gethostname()

        return {
            'local_ip': local_ip,
            'hostname': hostname,
            'available_ports': [54321, 8080, 7077]  # Common distributed computing ports
        }
    except Exception as e:
        print(f"Network configuration error: {e}")
        return {
            'local_ip': '127.0.0.1',
            'hostname': 'localhost',
            'available_ports': [54321]
        }


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import h2o
from pyspark.sql.functions import col
from h2o.automl import H2OAutoML

# Initialize a local Spark session (8g driver / 4g executor memory, dynamic allocation off)
spark = SparkSession.builder \
   .appName("CryptoDataProcessing") \
   .config("spark.driver.memory", "8g") \
   .config("spark.executor.memory", "4g") \
   .config("spark.dynamicAllocation.enabled", "false") \
   .master("local[*]") \
   .getOrCreate()

# Start (or connect to) a local H2O cluster
h2o.init()

# Load the YIEDL dataset from the current working directory.
# NOTE(review): this cell never downloads "yiedl_latest.parquet"; the file must
# already exist on disk (a later cell in this notebook downloads it), and
# `h2o_frame` is not used again in this cell.
h2o_frame = h2o.import_file("yiedl_latest.parquet")

import numerapi
import os

# NumerAPI client created without credentials
napi = numerapi.NumerAPI()

# Download training targets and the live universe into the current working directory
napi.download_dataset("crypto/v1.0/train_targets.parquet", os.getcwd() + "/numerai_train_targets.parquet")
napi.download_dataset("crypto/v1.0/live_universe.parquet", os.getcwd() + "/numerai_live_universe.parquet")

# Load both downloaded parquet files into H2O frames
train = h2o.import_file("numerai_train_targets.parquet")
live = h2o.import_file("numerai_live_universe.parquet")

def feature_engineering(frame):
    """Add polynomial and pairwise-product columns to ``frame`` in place.

    For each of ``pvm``, ``sentiment`` and ``onchain`` that is present in
    ``frame.columns``, ``<name>_squared`` and ``<name>_cubed`` are added.
    When all three are present, the three pairwise products
    (``pvm_sentiment``, ``pvm_onchain``, ``sentiment_onchain``) are added too.

    Args:
        frame: Object exposing ``columns`` and item get/set by column name.

    Returns:
        The same ``frame`` object that was passed in (it is mutated, not copied).
    """
    base_names = ('pvm', 'sentiment', 'onchain')
    power_suffixes = (('squared', 2), ('cubed', 3))
    product_pairs = (
        ('pvm', 'sentiment'),
        ('pvm', 'onchain'),
        ('sentiment', 'onchain'),
    )

    for name in base_names:
        if name not in frame.columns:
            continue
        for suffix, exponent in power_suffixes:
            frame[f'{name}_{suffix}'] = frame[name]**exponent

    absent = [name for name in base_names if name not in frame.columns]
    if not absent:
        for left, right in product_pairs:
            frame[f'{left}_{right}'] = frame[left] * frame[right]

    return frame

# feature_engineering mutates and returns its argument, so enhanced_train/enhanced_live
# refer to the same frames as train/live.
enhanced_train = feature_engineering(train)
enhanced_live = feature_engineering(live)

# AutoML run: at most 20 models, fixed seed, time budget of 26000 seconds.
aml = H2OAutoML(max_models=20, seed=1, max_runtime_secs=26000)
# Every column except 'target' and 'era' is used as a predictor.
# NOTE(review): identifier-like columns, if present, are therefore included as features — confirm this is intended.
aml.train(x=[col for col in enhanced_train.columns if col not in ['target', 'era']],
         y='target',
         training_frame=enhanced_train)

print(aml.leaderboard)


# Cleanup: shut down the H2O cluster and stop the Spark session created in this cell
h2o.cluster().shutdown()
spark.stop()

Checking whether there is an H2O instance running at http://localhost:54321..... not found.
Attempting to start a local H2O server...
  Java Version: openjdk version "1.8.0_432"; OpenJDK Runtime Environment (build 1.8.0_432-8u432-ga~us1-0ubuntu2~22.04-ga); OpenJDK 64-Bit Server VM (build 25.432-bga, mixed mode)
  Starting server from /home/knight1/numerai_EH/lib/python3.10/site-packages/h2o/backend/bin/h2o.jar
  Ice root: /tmp/tmp6n7fpyg0
  JVM stdout: /tmp/tmp6n7fpyg0/h2o_knight1_started_from_python.out
  JVM stderr: /tmp/tmp6n7fpyg0/h2o_knight1_started_from_python.err
  Server is running at http://127.0.0.1:54321
Connecting to H2O server at http://127.0.0.1:54321 ... successful.


0,1
H2O_cluster_uptime:,01 secs
H2O_cluster_timezone:,Europe/Amsterdam
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.46.0.6
H2O_cluster_version_age:,2 months and 22 days
H2O_cluster_name:,H2O_from_python_knight1_l1z605
H2O_cluster_total_nodes:,1
H2O_cluster_free_memory:,26.63 Gb
H2O_cluster_total_cores:,20
H2O_cluster_allowed_cores:,20


Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
AutoML progress: |███████

In [None]:
from pyspark.sql import SparkSession
from pysparkling.conf import H2OConf
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.pipeline import Pipeline
from h2o.automl import H2OAutoML
from pysparkling import H2OContext


from pysparkling import H2OContext
from pysparkling.conf import H2OConf

import h2o
from pyspark.sql import SparkSession

# Start (or connect to) a local H2O cluster
h2o.init()

import os

# Local Spark session; the default filesystem is forced to the local disk (file:///)
# and the hdfs scheme is mapped to Hadoop's LocalFileSystem.
spark = SparkSession.builder \
    .appName("CryptoDataProcessing") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "220g") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .config("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .master("local[*]") \
    .getOrCreate()

# Helper function to download files
def download_file(url, output_filename, timeout=60):
    """Download ``url`` and write the response body to ``output_filename``.

    Args:
        url (str): Address to fetch with an HTTP GET.
        output_filename (str): Path of the file to (over)write.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        bool: True when the server answered 200 and the file was written,
        False otherwise (the failure, including the status code, is printed).
        Connection errors raised by ``requests`` are not caught here.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        # Report the status code so a failed download can be diagnosed
        print(f"Failed to download file (HTTP status {response.status_code})")
        return False

    with open(output_filename, 'wb') as file:
        file.write(response.content)
    print(f"File downloaded successfully as {output_filename}")
    return True

# Use H2O directly instead of H2OContext
# Download YIEDL crypto latest dataset into the current working directory
url = 'https://api.yiedl.ai/yiedl/v1/downloadDataset?type=latest'
output_filename = 'yiedl_latest.parquet'
download_file(url, output_filename)

# Absolute path of the downloaded file (used to build the file:// URI below)
absolute_path = os.path.abspath("yiedl_latest.parquet")

# Create H2O configuration (internal cluster mode).
# NOTE(review): in this part of the cell h2o_conf is only configured; it is not passed to an H2OContext here.
h2o_conf = H2OConf()
h2o_conf.setInternalClusterMode()

# Read Parquet file directly into an H2O Frame (not a Spark DataFrame)
h2o_frame = h2o.import_file(f"file://{absolute_path}")

# Print H2O Frame
#print(h2o_frame)

# Stop Spark session; any later code that needs Spark has to obtain a new session
spark.stop()



# Advanced feature preprocessing
def advanced_feature_preprocessing(df):
    """Add per-symbol rolling statistics, lag differences and rank features.

    All windows are partitioned by ``symbol``. The rolling frames are ordered by
    ``date``; ``rowsBetween(-k, 0)`` spans the current row plus the ``k``
    preceding rows, so the ``*_5`` means cover up to 6 rows and the ``-10``
    frames up to 11 rows.

    Args:
        df (DataFrame): Spark DataFrame with ``symbol``, ``date``, ``pvm``,
            ``sentiment`` and ``onchain`` columns.

    Returns:
        DataFrame: ``df`` with these columns appended, in order:
        ``pvm_rolling_mean_5``, ``pvm_rolling_mean_10``,
        ``sentiment_rolling_mean_5``, ``onchain_rolling_mean_5``,
        ``pvm_rsi``, ``sentiment_rsi`` (value minus its 14-row lag, divided by 14),
        ``pvm_volatility``, ``sentiment_volatility`` (rolling stddev) and
        ``pvm_percentile``, ``sentiment_percentile`` (percent rank within symbol).
    """
    by_symbol = Window.partitionBy('symbol')

    # Build each ordered frame once instead of repeating the spec per column.
    by_symbol_date = by_symbol.orderBy('date')
    trailing_5 = by_symbol_date.rowsBetween(-5, 0)
    trailing_10 = by_symbol_date.rowsBetween(-10, 0)

    enhanced_df = (df
        .withColumn('pvm_rolling_mean_5', F.avg('pvm').over(trailing_5))
        .withColumn('pvm_rolling_mean_10', F.avg('pvm').over(trailing_10))
        .withColumn('sentiment_rolling_mean_5', F.avg('sentiment').over(trailing_5))
        .withColumn('onchain_rolling_mean_5', F.avg('onchain').over(trailing_5))
        .withColumn('pvm_rsi', F.expr('(pvm - lag(pvm, 14) over (partition by symbol order by date)) / 14'))
        .withColumn('sentiment_rsi', F.expr('(sentiment - lag(sentiment, 14) over (partition by symbol order by date)) / 14'))
        .withColumn('pvm_volatility', F.stddev('pvm').over(trailing_10))
        .withColumn('sentiment_volatility', F.stddev('sentiment').over(trailing_10))
        .withColumn('pvm_percentile', F.percent_rank().over(by_symbol.orderBy('pvm')))
        .withColumn('sentiment_percentile', F.percent_rank().over(by_symbol.orderBy('sentiment')))
    )

    return enhanced_df

# Add advanced polynomial features
def add_advanced_polynomial_features(df):
    """Append polynomial, log and pairwise interaction columns.

    For each of ``pvm``, ``sentiment`` and ``onchain`` the columns
    ``<name>_squared``, ``<name>_cubed`` and ``<name>_log``
    (``log(abs(value) + 1)``) are added. For each pair of those features the
    product (``<a>_<b>_interaction``) and absolute difference
    (``<a>_<b>_diff``) are added.

    Args:
        df (DataFrame): Spark DataFrame containing the three base columns.

    Returns:
        DataFrame: ``df`` with the 15 derived columns appended.
    """
    for name in ('pvm', 'sentiment', 'onchain'):
        df = df.withColumn(f"{name}_squared", F.pow(F.col(name), 2))
        df = df.withColumn(f"{name}_cubed", F.pow(F.col(name), 3))
        df = df.withColumn(f"{name}_log", F.log(F.abs(F.col(name)) + 1))

    feature_pairs = (
        ('pvm', 'sentiment'),
        ('pvm', 'onchain'),
        ('sentiment', 'onchain'),
    )
    for left, right in feature_pairs:
        df = df.withColumn(f"{left}_{right}_interaction", F.col(left) * F.col(right))
        df = df.withColumn(f"{left}_{right}_diff", F.abs(F.col(left) - F.col(right)))

    return df

# Preprocess and enhance the dataset.
# The feature helpers above use Spark window/column functions, so they need a
# Spark DataFrame rather than the H2OFrame loaded earlier. The Spark session
# created at the top of this cell has already been stopped, so a new one is
# obtained here and the parquet file is read with Spark.
# NOTE(review): the helpers expect `symbol`, `date`, `pvm`, `sentiment` and
# `onchain` columns — confirm the downloaded dataset provides them.
spark = SparkSession.builder \
    .appName("CryptoDataProcessing") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .master("local[*]") \
    .getOrCreate()
spark_df = spark.read.parquet(f"file://{absolute_path}")

preprocessed_df = advanced_feature_preprocessing(spark_df)
enhanced_df = add_advanced_polynomial_features(preprocessed_df)

# Convert Spark DataFrame to H2O Frame through a Sparkling Water context
# (previously `hc` was never created, so this line raised NameError).
hc = H2OContext.getOrCreate(h2o_conf)
h2o_frame = hc.asH2OFrame(enhanced_df)

# Initialize H2O AutoML
aml = H2OAutoML(
    max_models=20,
    seed=1,
    max_runtime_secs=600  # 10 minutes
)

# Train models using AutoML
target_column = "target"  # Replace with your actual target column
if target_column not in h2o_frame.columns:
    raise ValueError(f"Target column '{target_column}' not found in the training frame")

# Select predictors by name: after feature engineering the target is no longer
# the last column, so slicing with [:-1] would not exclude it.
feature_columns = [name for name in h2o_frame.columns if name != target_column]
aml.train(x=feature_columns,
          y=target_column,
          training_frame=h2o_frame)

# Display the H2O leaderboard
leaderboard = aml.leaderboard
print(leaderboard)

# Stop H2O and Spark sessions
h2o.cluster().shutdown()
spark.stop()

Checking whether there is an H2O instance running at http://localhost:54321. connected.


0,1
H2O_cluster_uptime:,1 hour 55 mins
H2O_cluster_timezone:,Europe/Amsterdam
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.46.0.6
H2O_cluster_version_age:,2 months and 22 days
H2O_cluster_name:,H2O_from_python_knight1_5e51as
H2O_cluster_total_nodes:,1
H2O_cluster_free_memory:,24.85 Gb
H2O_cluster_total_cores:,20
H2O_cluster_allowed_cores:,20


KeyboardInterrupt: 

In [None]:
# Cell 2: Core Processing Functions with Enhanced Error Handling
def analyze_feature_columns(df, prefix, sample_size=1000):
    """
    Find the numeric index range of the non-empty feature columns for a prefix.

    A column qualifies when its name starts with ``prefix + '_'``, does not end
    with ``'_array'`` and the part after the prefix contains another ``'_'``.
    The feature number is the integer after the last underscore; columns whose
    suffix is not an integer are reported and skipped. Non-null counts are
    computed in batched aggregations (one Spark job per batch of columns)
    instead of one Spark job per column.

    Args:
        df (DataFrame): Spark DataFrame to analyze
        prefix (str): Feature column prefix
        sample_size (int): Currently unused; kept for interface compatibility

    Returns:
        tuple or None: ``(min_number, max_number, valid_column_count)`` over
        the columns with at least one non-null value, or None when no column
        qualifies or an error occurs (the error is printed with a traceback).
    """
    try:
        marker = prefix + '_'

        # Get feature columns with robust filtering
        feature_cols = sorted([
            col for col in df.columns
            if col.startswith(marker) and
            not col.endswith('_array') and
            '_' in col.split(marker)[1]
        ])

        if not feature_cols:
            print(f"No features found for prefix: {prefix}")
            return None

        # Non-null counts for the first five columns, printed as a quick sanity check
        counts = df.select([
            F.count(F.when(F.col(col).isNotNull(), True)).alias(col)
            for col in feature_cols[:5]
        ]).collect()[0].asDict()

        print(f"\n{prefix} feature sample counts: {counts}")

        # Parse the numeric suffixes first so that only parseable columns are counted
        numbered_cols = []
        for col in feature_cols:
            try:
                numbered_cols.append((col, int(col.split('_')[-1])))
            except (ValueError, IndexError) as e:
                print(f"Skipping invalid column {col}: {e}")
                continue

        # Count non-null values for many columns per Spark job. The batch size
        # only bounds the width of a single aggregation; it does not affect results.
        count_batch_size = 200
        valid_numbers = []
        for start in range(0, len(numbered_cols), count_batch_size):
            batch = numbered_cols[start:start + count_batch_size]
            row = df.select([
                F.count(F.when(F.col(col).isNotNull(), True)).alias(col)
                for col, _ in batch
            ]).collect()[0]
            for (_, num), row_count in zip(batch, row):
                if row_count > 0:
                    valid_numbers.append(num)

        if valid_numbers:
            # Use builtins to avoid PySpark function conflicts
            min_num = builtins.min(valid_numbers)
            max_num = builtins.max(valid_numbers)
            return min_num, max_num, len(valid_numbers)

        return None

    except Exception as e:
        print(f"Comprehensive error in feature column analysis: {e}")
        traceback.print_exc()
        return None

def get_feature_ranges(df):
    """
    Collect the (min, max) feature-number range for each known feature prefix.

    Runs analyze_feature_columns for 'pvm', 'sentiment' and 'onchain'. A prefix
    that yields no result, or whose analysis raises, is reported and left out.

    Args:
        df (DataFrame): Spark DataFrame containing features

    Returns:
        dict: Maps each prefix with valid features to its ``(min, max)`` tuple

    Raises:
        ValueError: If no prefix produced a valid range
    """
    ranges_by_prefix = {}

    for prefix in ('pvm', 'sentiment', 'onchain'):
        try:
            analysis = analyze_feature_columns(df, prefix)
            if not analysis:
                print(f"No valid features found for {prefix}")
                continue

            lowest, highest, valid_count = analysis
            ranges_by_prefix[prefix] = (lowest, highest)
            print(f"Found {valid_count} valid {prefix} features: using range {lowest} to {highest}")
        except Exception as e:
            print(f"Error processing {prefix} features: {str(e)}")
            traceback.print_exc()

    if ranges_by_prefix:
        return ranges_by_prefix

    raise ValueError("No valid feature ranges found across all prefixes")

def process_feature_group(df, feature_prefix, start_idx, end_idx):
    """
    Assemble a range of numbered feature columns into a vector column.

    Column names are built as ``<feature_prefix>_<NNNN>`` (index zero-padded to
    four digits) for every index in ``[start_idx, end_idx]``; names missing
    from ``df`` are ignored. The existing columns are processed in batches of
    ``CONFIG['batch_size']``: nulls become 0.0, values are cast to double and a
    VectorAssembler writes ``features_<prefix>_<start_idx>_<end_idx>``.

    Args:
        df (DataFrame): Input Spark DataFrame (must contain 'date' and 'symbol')
        feature_prefix (str): Prefix for feature columns
        start_idx (int): Starting feature index
        end_idx (int): Ending feature index (inclusive)

    Returns:
        DataFrame: 'date', 'symbol', the cleaned feature columns and the
        assembled vector column

    Raises:
        ValueError: If none of the expected columns exist in ``df``
    """
    print(f"\nProcessing {feature_prefix} features {start_idx} to {end_idx}")

    cols = [f'{feature_prefix}_{str(i).zfill(4)}' for i in range(start_idx, end_idx + 1)]
    available = set(df.columns)
    existing_cols = [col for col in cols if col in available]

    if not existing_cols:
        raise ValueError(f"No valid columns found for {feature_prefix}")

    print(f"Found {len(existing_cols)} columns for {feature_prefix}")

    # GPU memory monitoring
    if NVML_AVAILABLE:
        monitor_gpu_memory()

    try:
        # Distributed batch processing
        result = None
        batch_size = CONFIG.get('batch_size', 1000)
        output_col = f"features_{feature_prefix}_{start_idx}_{end_idx}"

        for i in range(0, len(existing_cols), batch_size):
            # Slicing already clamps at the end of the list
            batch_cols = existing_cols[i:i + batch_size]

            # Batch processing with repartitioning
            batch_df = df.select(['date', 'symbol'] + batch_cols)
            batch_df = batch_df.repartition(200)

            # Null handling and type conversion in a single projection.
            # Calling withColumn once per column grows the query plan with
            # every call; one select produces the same columns in one step.
            batch_df = batch_df.select(
                'date',
                'symbol',
                *[
                    F.when(F.col(col).isNull(), 0.0)
                     .otherwise(F.col(col).cast("double"))
                     .alias(col)
                    for col in batch_cols
                ]
            )

            # Vector assembler for batch
            assembler = VectorAssembler(
                inputCols=batch_cols,
                outputCol=output_col,
                handleInvalid="keep"
            )

            batch_df = assembler.transform(batch_df)

            # Union or set result.
            # NOTE(review): when more than one batch is produced, the batches
            # hold different feature columns but are stacked row-wise under the
            # same vector column name — confirm whether a join on
            # ('date', 'symbol') was intended instead.
            result = batch_df if result is None else result.unionAll(batch_df)

            # Memory cleanup
            cleanup_memory()

        # Final GPU memory check
        if NVML_AVAILABLE:
            monitor_gpu_memory()

        return result

    except Exception as e:
        print(f"Error processing {feature_prefix}: {str(e)}")
        cleanup_memory()
        raise

def monitor_gpu_memory():
    """
    Print memory usage (and utilization, when available) for every GPU via NVML.

    Does nothing except print a notice when pynvml could not be imported.
    NVML is initialised at the start and shut down at the end of each call.
    Errors are reported with a printed message instead of being raised.
    """
    if not NVML_AVAILABLE:
        print("NVML not available for GPU monitoring.")
        return

    try:
        pynvml.nvmlInit()
        gpu_count = pynvml.nvmlDeviceGetCount()

        print("\n--- GPU Memory Monitor ---")
        for i in range(gpu_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            name = pynvml.nvmlDeviceGetName(handle)
            # Depending on the pynvml version the name is bytes or str;
            # calling .decode() on a str would abort the whole report.
            if isinstance(name, bytes):
                name = name.decode(errors="replace")
            memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)

            print(f"GPU {i} ({name}):")
            print(f"  Memory Used: {memory_info.used / 1024**3:.2f} GB")
            print(f"  Memory Free: {memory_info.free / 1024**3:.2f} GB")
            print(f"  Memory Total: {memory_info.total / 1024**3:.2f} GB")

            # Optional: Get compute utilization if available
            try:
                utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
                print(f"  GPU Utilization: {utilization.gpu}%")
                print(f"  Memory Utilization: {utilization.memory}%")
            except Exception:
                # Utilization is best-effort; skip it if the query fails
                pass

    except Exception as e:
        print(f"GPU monitoring error: {e}")
    finally:
        try:
            pynvml.nvmlShutdown()
        except Exception:
            # Shutdown fails when initialisation never succeeded; nothing to clean up
            pass

In [None]:
# Cell 1: Environment Initialization Functions
def initialize_spark_session():
    """
    Create the local Spark session used for distributed feature processing.

    Driver host/bind address come from get_local_network_config(); memory, GPU
    reserve and executor count come from the module-level CONFIG. Shuffle
    partitions and default parallelism are set to twice the executor count.
    The executor count is clamped to at least 1 so that those two settings can
    never be 0.

    Returns:
        SparkSession or None: Configured Spark session, or None when creation
        failed (the error is printed with a traceback)
    """
    try:
        # Comprehensive environment configuration
        network_config = get_local_network_config()

        # Read once and guard against 0/None
        executor_instances = builtins.max(1, int(CONFIG.get('executor_instances', 1) or 1))
        parallelism = executor_instances * 2

        spark = (SparkSession.builder
            .appName("DistributedFeatureProcessing")
            .config("spark.driver.host", network_config['local_ip'])
            .config("spark.driver.bindAddress", network_config['local_ip'])

            # Memory and Resource Configurations
            .config("spark.driver.memory", CONFIG['memory'])
            .config("spark.executor.memory", CONFIG['memory'])

            # GPU Specific Configurations
            .config("spark.executor.instances", executor_instances)
            .config("spark.executor.resource.gpu.amount", "1")
            .config("spark.rapids.sql.concurrentGpuTasks", "1")
            .config("spark.rapids.memory.gpu.pool", "ARENA")
            .config("spark.rapids.memory.gpu.reserve", CONFIG['gpu_memory_reserve'])

            # Advanced Distributed Processing
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.dynamicAllocation.enabled", "false")
            .config("spark.sql.shuffle.partitions", parallelism)

            # Performance Tuning
            .config("spark.sql.files.maxPartitionBytes", "128m")
            .config("spark.default.parallelism", parallelism)

            .master("local[*]")
            .getOrCreate())

        # Reduce logging noise
        spark.sparkContext.setLogLevel("ERROR")

        print("Spark session created with advanced GPU support.")
        return spark

    except Exception as e:
        print(f"Spark session initialization error: {e}")
        traceback.print_exc()
        return None

def main():
    """
    Entry point: create the Spark session, report readiness, always clean up.

    Any exception raised while setting up is printed with its traceback rather
    than propagated. A session that was created is stopped before returning.
    """
    spark = None
    try:
        # Initialize Spark
        spark = initialize_spark_session()
        if spark:
            # Additional initialization and processing steps would go here
            print("Distributed GPU processing environment is ready.")
        else:
            print("Failed to create Spark session.")

    except Exception as e:
        print(f"Main execution error: {e}")
        traceback.print_exc()

    finally:
        # Cleanup: only stop a session that actually exists
        if spark:
            spark.stop()

if __name__ == "__main__":
    main()

Spark session created with advanced GPU support.
Distributed GPU processing environment is ready.


In [None]:
import torch
import os
import time

def clear_gpu_memory():
    """Release PyTorch's cached GPU memory and reset the peak-memory statistics.

    Does nothing (and prints nothing) when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return

    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()
    print("GPU memory cleared.")

def allocate_and_compute_on_gpus(batch=1024, rows=1024, inner=2048, iterations=10):
    """Allocate two float32 tensors on each GPU and time a batched matmul.

    For every visible GPU, in turn, the function allocates operands of shape
    ``(batch, rows, inner)`` and ``(batch, inner, rows)``, multiplies them
    ``iterations`` times and prints memory usage before and after. The operand
    shapes are compatible for batched matrix multiplication (the last dimension
    of the first equals the second-to-last of the second). With the defaults
    each operand is 8 GB and the result is 4 GB.

    Args:
        batch (int): Batch dimension shared by both operands
        rows (int): Rows of the first operand / columns of the second
        inner (int): Shared inner dimension
        iterations (int): Number of timed multiplications per GPU

    Returns:
        None. Results are printed; a RuntimeError on one GPU (e.g. out of
        memory) is reported and the next GPU is tried.
    """
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        print("No GPUs detected.")
        return

    print(f"Detected {num_gpus} GPUs.")

    # float32 uses 4 bytes per element
    operand_gb = batch * rows * inner * 4 / 1024**3

    for gpu_id in range(num_gpus):
        device = torch.device(f"cuda:{gpu_id}")
        torch.cuda.set_device(device)

        # Clear GPU memory before allocation
        clear_gpu_memory()

        print(f"Allocating 2 x {operand_gb:.2f} GB of memory on GPU {gpu_id}...")
        tensor1 = tensor2 = result = None
        try:
            tensor1 = torch.randn((batch, rows, inner), device=device, dtype=torch.float32)
            tensor2 = torch.randn((batch, inner, rows), device=device, dtype=torch.float32)
            # Additional memory is used during computation for the (batch, rows, rows) result

            print(f"Memory allocated on GPU {gpu_id}.")

            # Verify memory usage before computation
            memory_allocated = torch.cuda.memory_allocated(device) / 1024**3  # Convert to GB
            memory_reserved = torch.cuda.memory_reserved(device) / 1024**3  # Convert to GB
            print(f"GPU {gpu_id} memory usage (before computation): Allocated = {memory_allocated:.2f} GB, Reserved = {memory_reserved:.2f} GB")

            # Perform a compute-heavy operation (matrix multiplication) in a loop
            print(f"Performing matrix multiplication on GPU {gpu_id}...")
            start_time = time.time()
            for _ in range(iterations):
                # Drop the previous result first so two results never coexist
                result = None
                result = torch.matmul(tensor1, tensor2)  # Batched matrix multiplication
                torch.cuda.synchronize()  # Wait for the operation to complete
            compute_time = time.time() - start_time
            print(f"Matrix multiplication completed on GPU {gpu_id} in {compute_time:.2f} seconds.")

            # Verify memory usage after computation
            memory_allocated = torch.cuda.memory_allocated(device) / 1024**3  # Convert to GB
            memory_reserved = torch.cuda.memory_reserved(device) / 1024**3  # Convert to GB
            print(f"GPU {gpu_id} memory usage (after computation): Allocated = {memory_allocated:.2f} GB, Reserved = {memory_reserved:.2f} GB")
        except RuntimeError as e:
            print(f"Failed to allocate memory or perform computation on GPU {gpu_id}: {e}")
        finally:
            # Free memory on success and on failure alike
            del tensor1, tensor2, result
            torch.cuda.empty_cache()

if __name__ == "__main__":
    # Set environment variable to reduce fragmentation
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
    allocate_and_compute_on_gpus()

Detected 3 GPUs.
GPU memory cleared.
Allocating 8 GB of memory on GPU 0...
Memory allocated on GPU 0.
GPU 0 memory usage (before computation): Allocated = 16.00 GB, Reserved = 16.02 GB
Performing matrix multiplication on GPU 0...
Failed to allocate memory or perform computation on GPU 0: Expected size for first two dimensions of batch2 tensor to be: [1024, 1024] but got: [1024, 2048].
GPU memory cleared.
Allocating 8 GB of memory on GPU 1...
Memory allocated on GPU 1.
GPU 1 memory usage (before computation): Allocated = 16.00 GB, Reserved = 16.02 GB
Performing matrix multiplication on GPU 1...
Failed to allocate memory or perform computation on GPU 1: Expected size for first two dimensions of batch2 tensor to be: [1024, 1024] but got: [1024, 2048].
GPU memory cleared.
Allocating 8 GB of memory on GPU 2...
Memory allocated on GPU 2.
GPU 2 memory usage (before computation): Allocated = 16.00 GB, Reserved = 16.02 GB
Performing matrix multiplication on GPU 2...
Failed to allocate memory or

In [None]:
import torch
#spark.stop()
def clear_gpu_memory():
    """Clear cached GPU memory and reset peak-memory statistics on every GPU.

    Each GPU is selected through a ``torch.cuda.device`` context, so the
    device that was current before the call is current again afterwards.
    Prints one line per GPU, or a notice when CUDA is not available.
    """
    if not torch.cuda.is_available():
        print("No GPUs detected.")
        return

    for gpu_id in range(torch.cuda.device_count()):
        # The context manager restores the previously selected device on exit,
        # unlike a bare set_device() which leaves the last GPU selected.
        with torch.cuda.device(gpu_id):
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats()
        print(f"GPU {gpu_id} memory cleared.")

# Clear GPU memory
clear_gpu_memory()

GPU 0 memory cleared.
GPU 1 memory cleared.
GPU 2 memory cleared.


In [None]:
# Cell 2: Core Processing Functions with Enhanced Error Handling

def analyze_feature_columns(df, prefix, sample_size=1000):
    """
    Find the numeric index range of the non-empty feature columns for a prefix.

    A column qualifies when its name starts with ``prefix + '_'``, does not end
    with ``'_array'`` and the part after the prefix contains another ``'_'``.
    The feature number is the integer after the last underscore; columns whose
    suffix is not an integer are reported and skipped. Non-null counts are
    computed in batched aggregations (one Spark job per batch of columns)
    instead of one Spark job per column.

    Args:
        df (DataFrame): Spark DataFrame to analyze
        prefix (str): Feature column prefix
        sample_size (int): Currently unused; kept for interface compatibility

    Returns:
        tuple or None: ``(min_number, max_number, valid_column_count)`` over
        the columns with at least one non-null value, or None when no column
        qualifies or an error occurs (the error is printed with a traceback).
    """
    try:
        marker = prefix + '_'

        # Get feature columns with robust filtering
        feature_cols = sorted([
            col for col in df.columns
            if col.startswith(marker) and
            not col.endswith('_array') and
            '_' in col.split(marker)[1]
        ])

        if not feature_cols:
            print(f"No features found for prefix: {prefix}")
            return None

        # Non-null counts for the first five columns, printed as a quick sanity check
        counts = df.select([
            F.count(F.when(F.col(col).isNotNull(), True)).alias(col)
            for col in feature_cols[:5]
        ]).collect()[0].asDict()

        print(f"\n{prefix} feature sample counts: {counts}")

        # Parse the numeric suffixes first so that only parseable columns are counted
        numbered_cols = []
        for col in feature_cols:
            try:
                numbered_cols.append((col, int(col.split('_')[-1])))
            except (ValueError, IndexError) as e:
                print(f"Skipping invalid column {col}: {e}")
                continue

        # Count non-null values for many columns per Spark job. The batch size
        # only bounds the width of a single aggregation; it does not affect results.
        count_batch_size = 200
        valid_numbers = []
        for start in range(0, len(numbered_cols), count_batch_size):
            batch = numbered_cols[start:start + count_batch_size]
            row = df.select([
                F.count(F.when(F.col(col).isNotNull(), True)).alias(col)
                for col, _ in batch
            ]).collect()[0]
            for (_, num), row_count in zip(batch, row):
                if row_count > 0:
                    valid_numbers.append(num)

        if valid_numbers:
            # Use builtins to avoid PySpark function conflicts
            min_num = builtins.min(valid_numbers)
            max_num = builtins.max(valid_numbers)
            return min_num, max_num, len(valid_numbers)

        return None

    except Exception as e:
        print(f"Comprehensive error in feature column analysis: {e}")
        traceback.print_exc()
        return None

def get_feature_ranges(df):
    """
    Collect the (min, max) feature-number range for each known feature prefix.

    Runs analyze_feature_columns for 'pvm', 'sentiment' and 'onchain'. A prefix
    that yields no result, or whose analysis raises, is reported and left out.

    Args:
        df (DataFrame): Spark DataFrame containing features

    Returns:
        dict: Maps each prefix with valid features to its ``(min, max)`` tuple

    Raises:
        ValueError: If no prefix produced a valid range
    """
    ranges_by_prefix = {}

    for prefix in ('pvm', 'sentiment', 'onchain'):
        try:
            analysis = analyze_feature_columns(df, prefix)
            if not analysis:
                print(f"No valid features found for {prefix}")
                continue

            lowest, highest, valid_count = analysis
            ranges_by_prefix[prefix] = (lowest, highest)
            print(f"Found {valid_count} valid {prefix} features: using range {lowest} to {highest}")
        except Exception as e:
            print(f"Error processing {prefix} features: {str(e)}")
            traceback.print_exc()

    if ranges_by_prefix:
        return ranges_by_prefix

    raise ValueError("No valid feature ranges found across all prefixes")

def process_feature_group(df, feature_prefix, start_idx, end_idx):
    """
    Assemble a range of numbered feature columns into a vector column.

    Column names are built as ``<feature_prefix>_<NNNN>`` (index zero-padded to
    four digits) for every index in ``[start_idx, end_idx]``; names missing
    from ``df`` are ignored. The existing columns are processed in batches of
    ``CONFIG['batch_size']``: nulls become 0.0, values are cast to double and a
    VectorAssembler writes ``features_<prefix>_<start_idx>_<end_idx>``.

    Args:
        df (DataFrame): Input Spark DataFrame (must contain 'date' and 'symbol')
        feature_prefix (str): Prefix for feature columns
        start_idx (int): Starting feature index
        end_idx (int): Ending feature index (inclusive)

    Returns:
        DataFrame: 'date', 'symbol', the cleaned feature columns and the
        assembled vector column

    Raises:
        ValueError: If none of the expected columns exist in ``df``
    """
    print(f"\nProcessing {feature_prefix} features {start_idx} to {end_idx}")

    cols = [f'{feature_prefix}_{str(i).zfill(4)}' for i in range(start_idx, end_idx + 1)]
    available = set(df.columns)
    existing_cols = [col for col in cols if col in available]

    if not existing_cols:
        raise ValueError(f"No valid columns found for {feature_prefix}")

    print(f"Found {len(existing_cols)} columns for {feature_prefix}")

    # GPU memory monitoring
    if NVML_AVAILABLE:
        monitor_gpu_memory()

    try:
        # Distributed batch processing
        result = None
        batch_size = CONFIG.get('batch_size', 1000)
        output_col = f"features_{feature_prefix}_{start_idx}_{end_idx}"

        for i in range(0, len(existing_cols), batch_size):
            # Slicing already clamps at the end of the list
            batch_cols = existing_cols[i:i + batch_size]

            # Batch processing with repartitioning
            batch_df = df.select(['date', 'symbol'] + batch_cols)
            batch_df = batch_df.repartition(200)

            # Null handling and type conversion in a single projection.
            # Calling withColumn once per column grows the query plan with
            # every call; one select produces the same columns in one step.
            batch_df = batch_df.select(
                'date',
                'symbol',
                *[
                    F.when(F.col(col).isNull(), 0.0)
                     .otherwise(F.col(col).cast("double"))
                     .alias(col)
                    for col in batch_cols
                ]
            )

            # Vector assembler for batch
            assembler = VectorAssembler(
                inputCols=batch_cols,
                outputCol=output_col,
                handleInvalid="keep"
            )

            batch_df = assembler.transform(batch_df)

            # Union or set result.
            # NOTE(review): when more than one batch is produced, the batches
            # hold different feature columns but are stacked row-wise under the
            # same vector column name — confirm whether a join on
            # ('date', 'symbol') was intended instead.
            result = batch_df if result is None else result.unionAll(batch_df)

            # Memory cleanup
            cleanup_memory()

        # Final GPU memory check
        if NVML_AVAILABLE:
            monitor_gpu_memory()

        return result

    except Exception as e:
        print(f"Error processing {feature_prefix}: {str(e)}")
        cleanup_memory()
        raise


In [None]:
import torch
# Sanity check of the GPUs PyTorch can see (the count is expected to be 3)
gpu_total = torch.cuda.device_count()
print(gpu_total)
if not torch.cuda.is_available():
    print("No GPUs available.")
else:
    print(f"Number of GPUs available: {gpu_total}")
    for gpu_idx in range(gpu_total):
        print(f"GPU {gpu_idx}: {torch.cuda.get_device_name(gpu_idx)}")

In [None]:
import cupy as cp

# Report how many CUDA devices CuPy can enumerate
device_total = cp.cuda.runtime.getDeviceCount()
print(f"Number of GPUs: {device_total}")

# Smoke test: build a small CuPy array and scale it
x = cp.array([1, 2, 3])
doubled = x * 2
print(doubled)

In [None]:
import cupy as cp

# Enumerate the CUDA devices visible to CuPy
device_total = cp.cuda.runtime.getDeviceCount()
print(f"Number of GPUs: {device_total}")

# Run a tiny computation inside each device context to confirm it is usable
for i in range(device_total):
    with cp.cuda.Device(i):
        device_name = cp.cuda.runtime.getDeviceProperties(i)['name']
        print(f"GPU {i}: {device_name}")
        x = cp.array([1, 2, 3])
        print(f"GPU {i} test: {x * 2}")

Number of GPUs: 3
GPU 0: b'NVIDIA GeForce RTX 3090'
GPU 0 test: [2 4 6]
GPU 1: b'NVIDIA GeForce RTX 3090'
GPU 1 test: [2 4 6]
GPU 2: b'NVIDIA RTX A5000'
GPU 2 test: [2 4 6]


In [None]:
import h2o
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import socket
import os
import time
import uuid

def get_local_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and the local address the OS
    picked for it is read back.

    Returns:
        str: The detected address, or "127.0.0.1" when the probe fails with a
        socket/OS error (for example, no network route).
    """
    try:
        # The context manager closes the socket even when connect() fails
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        # Socket failures are OSError subclasses; anything else (e.g.
        # KeyboardInterrupt) is deliberately not swallowed.
        return "127.0.0.1"

def ensure_directory(path):
    """Create ``path`` (and any missing parents) if needed and return it.

    Args:
        path: Directory path to create.

    Returns:
        The same ``path`` that was passed in.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs
    os.makedirs(path, exist_ok=True)
    return path

def create_spark_session():
    """Stop any previously created Spark session and start a fresh local one.

    The driver host and bind address are both set to the address returned by
    ``get_local_ip()``.

    Returns:
        SparkSession: The newly obtained session.
    """
    host_ip = get_local_ip()

    # Tear down a session left over from an earlier cell before reconfiguring
    previous = SparkSession._instantiatedSession
    if previous is not None:
        previous.stop()

    settings = {
        "spark.master": "local[*]",
        "spark.app.name": "SparkH2O",
        "spark.driver.memory": "4g",
        "spark.executor.memory": "4g",
        "spark.driver.host": host_ip,
        "spark.driver.bindAddress": host_ip,
        "spark.network.timeout": "3600s",
        "spark.executor.heartbeatInterval": "120s",
    }
    conf = SparkConf()
    for key, value in settings.items():
        conf.set(key, value)

    builder = SparkSession.builder.config(conf=conf)
    return builder.getOrCreate()

def init_h2o(cluster_name):
    """Start a local H2O cluster named ``cluster_name`` and return its connection.

    Progress and cluster status are printed along the way. Any failure is
    reported on stdout and re-raised.

    Args:
        cluster_name: Name passed to ``h2o.init`` for the new cluster.

    Returns:
        The object returned by ``h2o.connection()``.
    """
    try:
        print("Starting H2O cluster...")

        ice_root = ensure_directory("/tmp/h2o_ice_root")
        print(f"Created ice_root directory at {ice_root}")

        # The LAN address is only reported in messages; H2O itself is
        # started on and bound to localhost below.
        local_ip = get_local_ip()
        print(f"Local IP: {local_ip}")

        launch_options = dict(
            port=54321,
            ip="localhost",
            nthreads=-1,
            ice_root=ice_root,
            start_h2o=True,
            enable_assertions=False,
            bind_to_localhost=True,
            max_mem_size="4G",
            strict_version_check=False,
            name=cluster_name,
            jvm_custom_args=['-Xmx4g'],
        )
        h2o.init(**launch_options)

        connection = h2o.connection()
        cluster_url = connection.base_url
        print(f"\nH2O cluster URL: {cluster_url}")
        print(f"H2O connection IP: {local_ip}")

        print("\nCluster Status:")
        h2o.cluster().show_status()

        print("H2O cluster started successfully")
        return connection

    except Exception as exc:
        print(f"Failed to start H2O: {exc}")
        raise

def spark_to_h2o(spark_df):
    """Convert a Spark DataFrame to an H2O Frame via a temporary CSV file.

    The Spark DataFrame is collected to pandas, written to a uniquely named
    temporary CSV, and imported with ``h2o.import_file``. The temporary file
    is removed whether or not the import succeeds.

    Args:
        spark_df: Spark DataFrame to convert (must fit in driver memory,
            because ``toPandas()`` is used).

    Returns:
        The frame returned by ``h2o.import_file``.

    Raises:
        Exception: Any conversion/import error is printed and re-raised.
    """
    import tempfile

    temp_path = None
    try:
        # A unique file name avoids collisions between concurrent conversions,
        # which the previous fixed "/tmp/temp_data.csv" path could not.
        fd, temp_path = tempfile.mkstemp(prefix="spark_to_h2o_", suffix=".csv")
        os.close(fd)
        spark_df.toPandas().to_csv(temp_path, index=False)
        return h2o.import_file(temp_path)
    except Exception as e:
        print(f"Error converting Spark DataFrame to H2O Frame: {str(e)}")
        raise
    finally:
        # Clean up on success and on failure alike
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)

def h2o_to_spark(h2o_frame, spark):
    """Turn an H2O Frame into a Spark DataFrame, going through pandas.

    Args:
        h2o_frame: Object exposing ``as_data_frame()``.
        spark: Session object exposing ``createDataFrame()``.

    Returns:
        Whatever ``spark.createDataFrame`` returns for the intermediate frame.

    Raises:
        Exception: Any error is printed and then re-raised unchanged.
    """
    try:
        intermediate = h2o_frame.as_data_frame()
        return spark.createDataFrame(intermediate)
    except Exception as exc:
        print(f"Error converting H2O Frame to Spark DataFrame: {exc}")
        raise

def main():
    """Start a local H2O cluster and a Spark session, then smoke-test H2O.

    Steps: locate the bundled ``h2o.jar``, kill stale H2O processes, start a
    uniquely named H2O cluster, create the Spark session, and build a small
    H2O test frame.

    Returns:
        tuple: ``(spark, connection)`` for use by later cells.

    Raises:
        Exception: Any setup failure is printed and re-raised after a
        best-effort shutdown of H2O and removal of the ice_root directory.
    """
    import shutil

    ice_root = "/tmp/h2o_ice_root"
    try:
        # Get H2O driver path
        h2o_jar = os.path.join(os.path.dirname(h2o.__file__), 'backend', 'bin', 'h2o.jar')
        if not os.path.exists(h2o_jar):
            raise Exception(f"H2O driver JAR not found at {h2o_jar}")

        print(f"Using H2O JAR: {h2o_jar}")
        os.environ["H2O_DRIVER_JAR"] = h2o_jar

        # Kill any existing H2O processes so the fixed port is free
        os.system("pkill -f h2o.jar")
        time.sleep(2)  # Wait for processes to be killed

        # Generate a unique cluster name
        cluster_name = f"h2o_spark_cluster_{str(uuid.uuid4())[:8]}"
        print(f"Using cluster name: {cluster_name}")

        # Initialize H2O first
        connection = init_h2o(cluster_name)

        # Then create Spark session
        spark = create_spark_session()

        print("\nSetup complete!")
        print(f"H2O Connection Info: {connection.base_url}")
        print(f"Spark Session Info: {spark.sparkContext.applicationId}")

        # Test the connection
        print("\nTesting H2O connection...")
        test_frame = h2o.create_frame(rows=10, cols=5)
        print("H2O test frame shape:", test_frame.shape)

        return spark, connection

    except Exception as e:
        print(f"Error: {str(e)}")
        # Best-effort teardown; a failed shutdown must not mask the real error
        try:
            h2o.cluster().shutdown()
        except Exception as shutdown_error:
            print(f"H2O shutdown during cleanup failed: {shutdown_error}")
        # Remove the ice_root only on failure: on success the cluster returned
        # to the caller is still running and was started with this directory.
        shutil.rmtree(ice_root, ignore_errors=True)
        raise

if __name__ == "__main__":
    main()

In [None]:
from xgboost.spark import SparkXGBRegressor

def train_xgboost(spark, train_df, test_df, label_col="label", num_workers=3):
    """Train a distributed XGBoost regressor on GPUs and show test predictions.

    Args:
        spark: Spark session (unused here; kept for interface compatibility).
        train_df: Spark DataFrame holding the features and the label column.
        test_df: Spark DataFrame to score with the fitted model.
        label_col: Name of the target column. Defaults to "label".
        num_workers: Number of XGBoost workers (one per GPU). Defaults to 3.

    Returns:
        The fitted model returned by ``SparkXGBRegressor.fit``.

    Raises:
        ValueError: If ``train_df`` has no column other than ``label_col``.
    """
    # NOTE(review): every non-label column is used as a feature; non-numeric
    # columns (ids, dates) presumably have to be dropped by the caller - confirm.
    feature_cols = [name for name in train_df.columns if name != label_col]
    if not feature_cols:
        raise ValueError(f"No feature columns found besides '{label_col}'")

    xgb_regressor = SparkXGBRegressor(
        features_col=feature_cols,
        label_col=label_col,
        num_workers=num_workers,
        device="cuda",  # Run training on GPU
        # "gpu_hist" is deprecated since XGBoost 2.0; GPU training is
        # selected with tree_method="hist" together with device="cuda".
        tree_method="hist",
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse"
    )

    # Train the model
    model = xgb_regressor.fit(train_df)

    # Score the test set and print a sample of the predictions
    predictions = model.transform(test_df)
    predictions.show()

    return model

In [None]:
from xgboost.spark import SparkXGBRegressor

def train_xgboost(spark, train_df, test_df, label_col="label", num_workers=3):
    """Train a distributed XGBoost regressor on GPUs and show test predictions.

    Args:
        spark: Spark session (unused here; kept for interface compatibility).
        train_df: Spark DataFrame holding the features and the label column.
        test_df: Spark DataFrame to score with the fitted model.
        label_col: Name of the target column. Defaults to "label".
        num_workers: Number of XGBoost workers (one per GPU). Defaults to 3.

    Returns:
        The fitted model returned by ``SparkXGBRegressor.fit``.

    Raises:
        ValueError: If ``train_df`` has no column other than ``label_col``.
    """
    # NOTE(review): every non-label column is used as a feature; non-numeric
    # columns (ids, dates) presumably have to be dropped by the caller - confirm.
    feature_cols = [name for name in train_df.columns if name != label_col]
    if not feature_cols:
        raise ValueError(f"No feature columns found besides '{label_col}'")

    xgb_regressor = SparkXGBRegressor(
        features_col=feature_cols,
        label_col=label_col,
        num_workers=num_workers,
        device="cuda",  # Run training on GPU
        # "gpu_hist" is deprecated since XGBoost 2.0; GPU training is
        # selected with tree_method="hist" together with device="cuda".
        tree_method="hist",
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse"
    )

    # Train the model
    model = xgb_regressor.fit(train_df)

    # Score the test set and print a sample of the predictions
    predictions = model.transform(test_df)
    predictions.show()

    return model

In [None]:
from xgboost.spark import SparkXGBRegressor

def train_xgboost(spark, train_df, test_df, label_col="label", num_workers=3):
    """Train a distributed XGBoost regressor on GPUs and show test predictions.

    Args:
        spark: Spark session (unused here; kept for interface compatibility).
        train_df: Spark DataFrame holding the features and the label column.
        test_df: Spark DataFrame to score with the fitted model.
        label_col: Name of the target column. Defaults to "label".
        num_workers: Number of XGBoost workers (one per GPU). Defaults to 3.

    Returns:
        The fitted model returned by ``SparkXGBRegressor.fit``.

    Raises:
        ValueError: If ``train_df`` has no column other than ``label_col``.
    """
    # NOTE(review): every non-label column is used as a feature; non-numeric
    # columns (ids, dates) presumably have to be dropped by the caller - confirm.
    feature_cols = [name for name in train_df.columns if name != label_col]
    if not feature_cols:
        raise ValueError(f"No feature columns found besides '{label_col}'")

    xgb_regressor = SparkXGBRegressor(
        features_col=feature_cols,
        label_col=label_col,
        num_workers=num_workers,
        device="cuda",  # Run training on GPU
        # "gpu_hist" is deprecated since XGBoost 2.0; GPU training is
        # selected with tree_method="hist" together with device="cuda".
        tree_method="hist",
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse"
    )

    # Train the model
    model = xgb_regressor.fit(train_df)

    # Score the test set and print a sample of the predictions
    predictions = model.transform(test_df)
    predictions.show()

    return model

In [None]:
from pyspark.sql import SparkSession

def create_spark_session():
    """
    Build (or reuse) the local Spark session for the H2O/Spark workflow.

    Returns the session obtained from ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName("H2O-Spark Integration").master("local[*]")
    resource_settings = (
        ("spark.executor.memory", "200g"),
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
    )
    for option, value in resource_settings:
        builder = builder.config(option, value)
    return builder.getOrCreate()

def spark_to_h2o(spark_df):
    """Build an H2O Frame from a Spark DataFrame by way of ``toPandas()``."""
    return h2o.H2OFrame(spark_df.toPandas())

def h2o_to_spark(h2o_frame, spark):
    """Build a Spark DataFrame from an H2O Frame by way of ``as_data_frame()``."""
    return spark.createDataFrame(h2o_frame.as_data_frame())

In [None]:
def main():
    """Run the demo workflow: Spark + H2O setup, data load, XGBoost training.

    Loads train/test parquet files, round-trips them through H2O frames,
    trains the GPU XGBoost model, and always tears down whatever was started.

    Raises:
        Exception: Any failure is printed and re-raised after cleanup.
    """
    import uuid

    # Track what was actually started so cleanup never touches unbound names
    spark = None
    h2o_started = False

    try:
        # Create Spark session
        spark = create_spark_session()

        # Initialize H2O; init_h2o requires a cluster name
        init_h2o(f"h2o_spark_cluster_{uuid.uuid4().hex[:8]}")
        h2o_started = True

        # Load data into Spark DataFrame
        train_data_path = "path/to/train_data.parquet"
        test_data_path = "path/to/test_data.parquet"
        train_df = spark.read.parquet(train_data_path)
        test_df = spark.read.parquet(test_data_path)

        # Convert Spark DataFrame to H2O Frame (optional)
        h2o_train_frame = spark_to_h2o(train_df)
        h2o_test_frame = spark_to_h2o(test_df)

        # Train XGBoost model with GPU support
        xgb_model = train_xgboost(spark, train_df, test_df)

        # Convert H2O Frame back to Spark DataFrame (optional)
        spark_train_df = h2o_to_spark(h2o_train_frame, spark)
        spark_test_df = h2o_to_spark(h2o_test_frame, spark)

        print("Workflow completed successfully")

    except Exception as e:
        print(f"Error: {str(e)}")
        raise
    finally:
        # Shut down only what was started; a failed H2O shutdown must not
        # prevent Spark from being stopped or mask the original error.
        if h2o_started:
            try:
                h2o.cluster().shutdown()
            except Exception as shutdown_error:
                print(f"H2O shutdown failed: {shutdown_error}")
        if spark is not None:
            spark.stop()

if __name__ == "__main__":
    main()

In [None]:
# Cell 2: Core Processing Functions

def analyze_feature_columns(df, prefix, sample_size=1000):
    """
    Find the numeric index range of non-empty ``<prefix>_<number>`` columns.

    Columns are selected when their name starts with ``prefix + '_'`` and
    does not end with ``'_array'``. A column counts as valid when its last
    ``_``-separated token is an integer and it has at least one non-null row.

    Args:
        df: Spark DataFrame to inspect.
        prefix: Feature family prefix (e.g. 'pvm').
        sample_size: Unused; kept for backward compatibility.

    Returns:
        tuple | None: ``(min_number, max_number, valid_count)``, or None when
        no matching or no valid column exists.
    """
    # Local import: guarantees the built-in min/max even if PySpark functions
    # with the same names were star-imported into the notebook namespace.
    import builtins

    feature_cols = sorted(
        col for col in df.columns
        if col.startswith(prefix + '_') and not col.endswith('_array')
    )

    if not feature_cols:
        return None

    # Count non-null values for ALL columns in a single aggregation job
    # instead of launching one Spark job per column.
    non_null_counts = df.select([
        F.count(F.when(F.col(col).isNotNull(), True)).alias(col)
        for col in feature_cols
    ]).collect()[0].asDict()

    sample_counts = {col: non_null_counts[col] for col in feature_cols[:5]}
    print(f"\n{prefix} feature sample counts: {sample_counts}")

    # Keep the numeric suffix of every column that has data
    valid_numbers = []
    for col in feature_cols:
        try:
            num = int(col.split('_')[-1])
        except ValueError:
            continue  # suffix is not a number; not an indexed feature
        if non_null_counts[col] > 0:
            valid_numbers.append(num)

    if not valid_numbers:
        return None

    return builtins.min(valid_numbers), builtins.max(valid_numbers), len(valid_numbers)

def get_feature_ranges(df):
    """
    Map each feature family ('pvm', 'sentiment', 'onchain') to its index range.

    Families for which ``analyze_feature_columns`` returns nothing, or raises,
    are reported and left out of the result.

    Returns:
        dict: ``{prefix: (min_number, max_number)}``.

    Raises:
        ValueError: If no family produced a range.
    """
    ranges_by_prefix = {}

    for prefix in ('pvm', 'sentiment', 'onchain'):
        try:
            summary = analyze_feature_columns(df, prefix)
            if not summary:
                print(f"No valid features found for {prefix}")
                continue
            lowest, highest, n_valid = summary
            ranges_by_prefix[prefix] = (lowest, highest)
            print(f"Found {n_valid} valid {prefix} features: using range {lowest} to {highest}")
        except Exception as exc:
            # One failing family must not stop the others from being analysed
            print(f"Error processing {prefix} features: {str(exc)}")
            traceback.print_exc()

    if len(ranges_by_prefix) == 0:
        raise ValueError("No valid feature ranges found")

    return ranges_by_prefix

def process_feature_group(df, feature_prefix, start_idx, end_idx):
    """Clean one feature family and assemble it into a single vector column.

    Columns named ``<feature_prefix>_<NNNN>`` (zero-padded to 4 digits) for
    indices ``start_idx..end_idx`` that exist in ``df`` are null-filled with
    0.0, cast to double, and assembled into
    ``features_<feature_prefix>_<start_idx>_<end_idx>``.

    All columns are handled in one projection. The previous batch loop
    row-stacked batches holding *different* columns with ``unionAll``, which
    matches columns by position, so any run with more than one batch either
    failed or mixed unrelated features. For a single batch the output is
    unchanged.

    Args:
        df: Source Spark DataFrame containing 'date', 'symbol' and features.
        feature_prefix: Feature family prefix (e.g. 'pvm').
        start_idx: First feature index (inclusive).
        end_idx: Last feature index (inclusive).

    Returns:
        DataFrame: 'date', 'symbol', the cleaned feature columns, and the
        assembled vector column.

    Raises:
        ValueError: If none of the expected columns exist in ``df``.
    """
    print(f"\nProcessing {feature_prefix} features {start_idx} to {end_idx}")

    cols = [f'{feature_prefix}_{str(i).zfill(4)}' for i in range(start_idx, end_idx + 1)]
    existing_cols = [col for col in cols if col in df.columns]

    if not existing_cols:
        raise ValueError(f"No valid columns found for {feature_prefix}")

    print(f"Found {len(existing_cols)} columns for {feature_prefix}")
    monitor_gpu_memory()

    try:
        # Null handling and type conversion expressed as ONE select; chaining
        # withColumn once per feature creates a very deep query plan.
        cleaned_features = [
            F.when(F.col(col).isNull(), 0.0)
             .otherwise(F.col(col).cast("double"))
             .alias(col)
            for col in existing_cols
        ]
        result = df.select(F.col('date'), F.col('symbol'), *cleaned_features)
        result = result.repartition(200)

        assembler = VectorAssembler(
            inputCols=existing_cols,
            outputCol=f"features_{feature_prefix}_{start_idx}_{end_idx}",
            handleInvalid="keep"
        )
        result = assembler.transform(result)

        cleanup_memory()
        monitor_gpu_memory()
        return result

    except Exception as e:
        print(f"Error processing {feature_prefix}: {str(e)}")
        cleanup_memory()
        raise

def load_data_in_chunks(spark, path):
    """Load the parquet dataset at ``path`` and repartition it for the GPUs.

    The previous implementation built every "chunk" from the same
    ``orderBy('date').limit(chunk_size)`` query, so the union contained the
    first ``chunk_size`` rows repeated once per chunk instead of the whole
    dataset. The dataset is now read once as a single DataFrame.

    Args:
        spark: Active Spark session.
        path: Parquet path/URI to read.

    Returns:
        DataFrame | None: The full dataset repartitioned into
        ``num_gpus * 3`` partitions, or None when loading fails or the
        dataset is empty (the error is printed with a traceback).
    """
    print(f"\nLoading data from {path}...")

    try:
        source_df = spark.read.parquet(path)

        # Get total row count
        total_rows = source_df.count()
        print(f"Total rows to process: {total_rows}")
        if total_rows == 0:
            # Callers treat None as "nothing loaded"
            raise ValueError(f"No rows found in {path}")

        # Get GPU count; NVML failures fall through to the default below
        num_gpus = 0
        if NVML_AVAILABLE:
            try:
                pynvml.nvmlInit()
                num_gpus = pynvml.nvmlDeviceGetCount()
            except pynvml.NVMLError as nvml_error:
                print(f"Could not query GPU count via NVML: {nvml_error}")
        if num_gpus < 1:
            # Same fallback value as before; a positive count is required
            # because repartition() rejects zero partitions.
            num_gpus = 3

        result = source_df.repartition(num_gpus * 3)
        print(f"Loaded {total_rows} rows into {num_gpus * 3} partitions")
        monitor_gpu_memory()
        return result

    except Exception as e:
        print(f"Error loading data: {str(e)}")
        traceback.print_exc()
        return None

def convert_to_h2o_frames(results, chunk_size=10000):
    """Convert per-feature-type Spark DataFrames to H2O Frames.

    For every entry the ``features_<type>_1_50_array`` column is expanded to
    exactly 50 numeric columns named ``<type>_0001`` .. ``<type>_0050``
    (missing values become None), rows with an empty date or symbol are
    dropped, and the result is uploaded as an ``h2o.H2OFrame``.

    Rows are streamed once with ``toLocalIterator()``. The previous paging
    used ``limit(chunk_size).offset(i * chunk_size)``, which is empty for
    every page after the first, and paging an unordered DataFrame across
    separate jobs is not guaranteed to be consistent anyway.

    Args:
        results: Mapping ``feature_type -> Spark DataFrame``.
        chunk_size: Unused; kept for backward compatibility.

    Returns:
        dict: ``feature_type -> H2OFrame`` for every type converted
        successfully; failures are printed with a traceback and skipped.
    """
    h2o_frames = {}
    n_features = 50  # width implied by the "_1_50_array" column name

    for feature_type, result_df in results.items():
        try:
            feature_col = f"features_{feature_type}_1_50_array"

            records = []
            selected = result_df.select("date", "symbol", feature_col)
            for row in selected.toLocalIterator():
                date = str(row['date']) if row['date'] is not None else ''
                symbol = str(row['symbol']) if row['symbol'] is not None else ''
                features = row[feature_col]

                if features is None:
                    features = [np.nan] * n_features
                else:
                    # Pad short arrays with NaN and truncate long ones
                    features = (list(features) + [np.nan] * n_features)[:n_features]

                features = [None if pd.isna(f) else float(f) for f in features]
                records.append([date, symbol] + features)

            if records:
                columns = ['date', 'symbol'] + [
                    f'{feature_type}_{i+1:04d}'
                    for i in range(len(records[0]) - 2)
                ]

                pandas_df = pd.DataFrame(records, columns=columns)
                pandas_df = pandas_df[
                    (pandas_df['date'] != '') &
                    (pandas_df['symbol'] != '')
                ]

                h2o_frame = h2o.H2OFrame(pandas_df)
                h2o_frames[feature_type] = h2o_frame
                print(f"Successfully created H2O frame for {feature_type}")

        except Exception as e:
            print(f"Error importing {feature_type} to H2O Frame: {e}")
            traceback.print_exc()

    return h2o_frames

def run_enhanced_automl(h2o_frames, target_strategy='random'):
    """Run H2O AutoML per feature group and on the combined features.

    Steps: add a synthetic binary target to every frame, build a combined
    frame, train AutoML on each group and on the combined frame, build an
    AUC-weighted ensemble of the per-group leaders, and collect variable
    importances.

    Args:
        h2o_frames: Mapping ``feature_type -> H2OFrame`` with 'date' and
            'symbol' columns plus features.
        target_strategy: 'random', 'symbol_based' or 'date_based'.

    Returns:
        dict with keys 'individual_models', 'combined_model',
        'ensemble_predictions' and 'feature_analysis'.

    Raises:
        ValueError: If ``target_strategy`` is unknown.
    """
    def add_target_column(h2o_frames, target_strategy='random'):
        """Add target column to H2O frames."""
        # Shallow copy: the H2OFrame objects themselves are shared with the
        # caller and receive the 'target' column in place.
        frames_with_target = h2o_frames.copy()

        for feature_type, h2o_frame in frames_with_target.items():
            num_rows = h2o_frame.nrows

            if target_strategy == 'random':
                target = h2o.H2OFrame(np.random.randint(0, 2, size=(num_rows, 1)),
                                    column_names=['target'])
                target = target.asfactor()
            elif target_strategy == 'symbol_based':
                target = h2o.H2OFrame(
                    h2o_frame['symbol']
                        .apply(lambda x: 1 if '$' in str(x) else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            elif target_strategy == 'date_based':
                target = h2o.H2OFrame(
                    h2o_frame['date']
                        .apply(lambda x: 1 if str(x)[:4] > '2020' else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            else:
                raise ValueError(f"Unknown target strategy: {target_strategy}")

            h2o_frame['target'] = target
            frames_with_target[feature_type] = h2o_frame

        return frames_with_target

    def create_combined_frame(frames_with_target):
        """Create a combined H2O frame with features from all groups.

        Returns None (after printing the error) when the frame cannot be built.
        """
        print("\nCreating combined feature frame...")
        try:
            # Get the first frame to use as base
            base_frame = next(iter(frames_with_target.values()))

            # Convert base frame to pandas first
            base_pd = base_frame.as_data_frame()

            # Initialize combined data with core columns
            combined_data = base_pd[['date', 'symbol']].copy()

            # Add target column
            target_pd = base_frame['target'].as_data_frame()
            combined_data['target'] = target_pd

            # Add features from each group with prefixes.
            # NOTE(review): groups are concatenated by row position, which
            # assumes all frames share the same row order - confirm.
            for feature_type, h2o_frame in frames_with_target.items():
                feature_cols = [col for col in h2o_frame.columns
                              if col not in ['date', 'symbol', 'target']]

                if not feature_cols:
                    print(f"Warning: No feature columns found for {feature_type}")
                    continue

                # Convert feature columns to pandas
                feature_df = h2o_frame[feature_cols].as_data_frame()

                # Add prefix to column names
                feature_df.columns = [f"{feature_type}_{col}" for col in feature_cols]

                print(f"\nAdding {len(feature_cols)} features from {feature_type}")
                print("Sample of new feature names:", list(feature_df.columns)[:3])

                # Join with combined data
                combined_data = pd.concat([combined_data, feature_df], axis=1)

            print(f"\nFinal combined frame shape: {combined_data.shape}")
            print(f"Total features: {combined_data.shape[1] - 3}")

            # Convert back to H2O Frame
            combined_h2o = h2o.H2OFrame(combined_data)
            combined_h2o['target'] = combined_h2o['target'].asfactor()

            return combined_h2o

        except Exception as e:
            print(f"Error in create_combined_frame: {str(e)}")
            traceback.print_exc()
            return None

    def run_automl_for_frame(train, valid, name, max_runtime_secs=1200):
        """Run AutoML for one train/validation pair.

        Returns a dict with the AutoML object, leader, leaderboard,
        validation performance, CV metrics and variable importance, or None
        when training fails.
        """
        print(f"\nRunning AutoML for {name}")

        train['target'] = train['target'].asfactor()
        valid['target'] = valid['target'].asfactor()

        predictors = [col for col in train.columns
                     if col not in ['date', 'symbol', 'target']]

        aml = H2OAutoML(
            max_runtime_secs=max_runtime_secs,
            seed=1234,
            sort_metric='AUC',
            max_models=CONFIG['max_models'],
            stopping_metric='AUC',
            nfolds=CONFIG['nfolds'],
            keep_cross_validation_predictions=True,
            verbosity='debug'
        )

        try:
            aml.train(
                x=predictors,
                y='target',
                training_frame=train,
                validation_frame=valid
            )

            best_model = aml.leader
            performance = best_model.model_performance(valid)

            # Optional diagnostics: not every leader type exposes them, so a
            # failure here only produces a warning.
            try:
                cv_metrics = None
                if hasattr(best_model, 'cross_validation_metrics'):
                    try:
                        cv_metrics = best_model.cross_validation_metrics()
                    except Exception:
                        print(f"Warning: Could not get cross validation metrics for {name}")

                variable_importance = None
                if hasattr(best_model, 'varimp'):
                    try:
                        variable_importance = best_model.varimp()
                    except Exception:
                        print(f"Warning: Could not get variable importance for {name}")

                model_analysis = {
                    'automl': aml,
                    'best_model': best_model,
                    'leaderboard': aml.leaderboard,
                    'performance': performance,
                    'cv_metrics': cv_metrics,
                    'variable_importance': variable_importance
                }
            except Exception as e:
                print(f"Warning: Error in model analysis for {name}: {str(e)}")
                model_analysis = {
                    'automl': aml,
                    'best_model': best_model,
                    'leaderboard': aml.leaderboard,
                    'performance': performance,
                    'cv_metrics': None,
                    'variable_importance': None
                }

            print(f"\nBest {name} Model Performance:")
            print(performance)
            print("\nConfusion Matrix:")
            print(performance.confusion_matrix())
            print(f"\n{name} Leaderboard:")
            print(aml.leaderboard)

            return model_analysis

        except Exception as e:
            print(f"Error in AutoML for {name}: {e}")
            traceback.print_exc()
            return None

    # Add target column to all frames
    frames_with_target = add_target_column(h2o_frames, target_strategy)

    # Create combined frame
    combined_frame = create_combined_frame(frames_with_target)

    # Results dictionary for all models
    all_results = {
        'individual_models': {},
        'combined_model': None,
        'ensemble_predictions': None,
        'feature_analysis': {}
    }

    # Run AutoML for each feature type
    for feature_type, h2o_frame in frames_with_target.items():
        train, valid = h2o_frame.split_frame(ratios=[0.8], seed=1234)
        result = run_automl_for_frame(train, valid, feature_type)

        if result:
            all_results['individual_models'][feature_type] = result

    # Run AutoML on combined features. valid_combined stays None when the
    # combined frame could not be built, so the ensemble step can detect it.
    valid_combined = None
    if combined_frame is not None:
        train_combined, valid_combined = combined_frame.split_frame(ratios=[0.8], seed=1234)
        combined_result = run_automl_for_frame(
            train_combined,
            valid_combined,
            "combined_features",
            max_runtime_secs=CONFIG['max_runtime_secs_combined']
        )

        if combined_result:
            all_results['combined_model'] = combined_result

    # Create ensemble predictions
    if all_results['individual_models'] and valid_combined is None:
        # Previously this path crashed with a NameError on valid_combined
        print("\nSkipping ensemble predictions: combined validation frame is unavailable")
    elif all_results['individual_models']:
        print("\nCreating ensemble predictions...")
        ensemble_predictions = {}
        weights = {}

        # Calculate weights based on model performance
        for feature_type, result in all_results['individual_models'].items():
            auc = result['performance'].auc()
            weights[feature_type] = auc

        # Normalize weights
        total_weight = sum(weights.values())
        weights = {k: v/total_weight for k, v in weights.items()}

        # Get predictions from all models.
        # NOTE(review): the per-group models were trained on un-prefixed
        # column names while valid_combined uses "<type>_<col>" names -
        # confirm the leaders can actually score this frame.
        for feature_type, result in all_results['individual_models'].items():
            predictions = result['best_model'].predict(valid_combined)
            ensemble_predictions[feature_type] = predictions

        # Calculate weighted ensemble predictions
        weighted_preds = None
        for feature_type, preds in ensemble_predictions.items():
            if weighted_preds is None:
                weighted_preds = preds * weights[feature_type]
            else:
                weighted_preds += preds * weights[feature_type]

        all_results['ensemble_predictions'] = {
            'predictions': weighted_preds,
            'weights': weights
        }

    # Analyze feature importance across all models
    feature_importance = {}
    for feature_type, result in all_results['individual_models'].items():
        if result['variable_importance'] is not None:
            feature_importance[feature_type] = result['variable_importance']

    if all_results['combined_model'] and all_results['combined_model']['variable_importance'] is not None:
        feature_importance['combined'] = all_results['combined_model']['variable_importance']

    all_results['feature_analysis'] = feature_importance

    return all_results

## Old code for deepseek.com

In [None]:
from pysparkling import H2OContext

import torch
import pynvml

def clear_gpu_memory():
    """Empty PyTorch's CUDA cache and print per-GPU memory usage via NVML.

    For every GPU the used memory is printed, the compute mode is reset to
    the default, and the used memory is printed again. Errors are reported on
    stdout and never raised.
    """
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            print("PyTorch GPU cache cleared.")

        # The former `if pynvml:` test was always true for an imported module,
        # so its "pynvml not available." branch could never run.
        pynvml.nvmlInit()
        try:
            device_count = pynvml.nvmlDeviceGetCount()
            for i in range(device_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
                print(f"Before Cleanup - GPU {i}: Used {mem_info.used / 1024**3:.2f} GB")
                # Reset compute mode (if necessary)
                try:
                    pynvml.nvmlDeviceSetComputeMode(handle, pynvml.NVML_COMPUTEMODE_DEFAULT)
                except pynvml.NVMLError as e:
                    print(f"Could not reset compute mode for GPU {i}: {str(e)}")
                mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
                print(f"After Cleanup - GPU {i}: Used {mem_info.used / 1024**3:.2f} GB")
        finally:
            # Pair every nvmlInit with a shutdown so NVML is released
            pynvml.nvmlShutdown()
    except Exception as e:
        print(f"Error during GPU memory cleanup: {str(e)}")


In [None]:
'''
%%time
# Cell 3: Main Execution

def analyze_results(enhanced_results):
    """Plot model performance and feature importance from the AutoML results.

    Args:
        enhanced_results: Dict with 'individual_models' and optionally
            'combined_model' and 'feature_analysis'. Nothing is done when it
            is empty or None.

    Errors during plotting are printed with a traceback, not raised.
    """
    if not enhanced_results:
        print("No results to analyze")
        return

    try:
        # Model performance comparison
        aucs = []
        accuracies = []
        model_names = []

        # Gather metrics from individual models.
        # NOTE(review): confirm that performance.accuracy() returns a scalar
        # suitable for a bar height.
        for feature_type, results in enhanced_results['individual_models'].items():
            aucs.append(results['performance'].auc())
            accuracies.append(results['performance'].accuracy())
            model_names.append(feature_type)

        # Add combined model metrics if available
        if enhanced_results.get('combined_model'):
            aucs.append(enhanced_results['combined_model']['performance'].auc())
            accuracies.append(enhanced_results['combined_model']['performance'].accuracy())
            model_names.append('combined')

        # Plot performance metrics on a single figure (the extra
        # plt.figure() call used to leave an empty figure behind)
        x = np.arange(len(model_names))
        width = 0.35

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.bar(x - width/2, aucs, width, label='AUC')
        ax.bar(x + width/2, accuracies, width, label='Accuracy')

        ax.set_ylabel('Score')
        ax.set_title('Model Performance Comparison')
        ax.set_xticks(x)
        ax.set_xticklabels(model_names)
        ax.legend()

        plt.tight_layout()
        plt.show()

        # Feature importance visualization
        if enhanced_results.get('feature_analysis'):
            for model_type, importance in enhanced_results['feature_analysis'].items():
                if importance is None:
                    continue
                if isinstance(importance, pd.DataFrame):
                    imp_df = importance
                else:
                    imp_df = pd.DataFrame(importance)
                # Draw onto an explicit Axes so the title lands on the plot
                # rather than on a separate empty figure
                fig, ax = plt.subplots(figsize=(10, 6))
                imp_df.head(10).plot(kind='barh', ax=ax)
                ax.set_title(f'{model_type} Feature Importance')
                plt.tight_layout()
                plt.show()
    except Exception as e:
        print(f"Error in result analysis: {str(e)}")
        traceback.print_exc()

def main():
    """Main execution function with memory management."""
    spark = None
    h2o_conn = None
    ice_dir = None
    cached_dfs = []

    try:
        # Setup environment
        if not setup_environment():
            print("Environment setup failed.")
            return None, None, None, None, None

        # Create Spark session and initialize H2O
        spark = create_spark_session()
        h2o_conn, ice_dir = initialize_h2o()

        if not h2o_conn:
            print("Failed to initialize H2O.")
            return None, None, None, None, None

        # Load data in chunks
        print("\nLoading data...")
        absolute_path = os.path.abspath('yiedl_latest.parquet')
        df = load_data_in_chunks(spark, "file://" + absolute_path)

        if df is None:
            print("Failed to load data.")
            return None, None, None, None, None

        df.cache()
        cached_dfs.append(df)

        # Get feature ranges
        feature_types = get_feature_ranges(df)
        if not feature_types:
            print("No features found to process.")
            return None, None, None, None, None

        print("\nDetected feature ranges:", feature_types)

        # Process features
        results = {}
        for feature_type, (start, end) in feature_types.items():
            try:
                result_df = process_feature_group(df, feature_type, start, end)
                result_df.cache()
                cached_dfs.append(result_df)
                results[feature_type] = result_df

                feature_col = f"features_{feature_type}_{start}_{end}_array"
                print(f"\n{feature_type.upper()} Features Sample:")
                result_df.select("date", "symbol", feature_col).show(5, truncate=False)

            except Exception as e:
                print(f"Error processing {feature_type}: {str(e)}")
                continue

        if not results:
            print("No features were successfully processed.")
            return None, None, None, None, None

        # Convert to H2O frames
        print("\nConverting to H2O frames...")
        h2o_frames = convert_to_h2o_frames(results)

        if not h2o_frames:
            print("Failed to create H2O frames.")
            return None, None, None, None, None

        # Run Enhanced AutoML
        print("\nRunning Enhanced AutoML with combined features and ensemble predictions...")
        enhanced_results = run_enhanced_automl(h2o_frames, target_strategy='random')

        if enhanced_results:
            print("\nModel Performance Summary:")

            # Individual model results
            for feature_type, model_results in enhanced_results['individual_models'].items():
                print(f"\n{feature_type.upper()} Model:")
                print(f"AUC: {model_results['performance'].auc():.4f}")
                print(f"Accuracy: {model_results['performance'].accuracy():.4f}")
                if model_results.get('cv_metrics'):
                    print(f"CV AUC: {model_results['cv_metrics'].auc():.4f}")

            # Combined model results
            if enhanced_results.get('combined_model'):
                print("\nCombined Model:")
                combined_perf = enhanced_results['combined_model']['performance']
                print(f"AUC: {combined_perf.auc():.4f}")
                print(f"Accuracy: {combined_perf.accuracy():.4f}")

            # Ensemble results
            if enhanced_results.get('ensemble_predictions'):
                print("\nEnsemble Weights:")
                for model_type, weight in enhanced_results['ensemble_predictions']['weights'].items():
                    print(f"{model_type}: {weight:.4f}")

        return spark, h2o_conn, results, h2o_frames, enhanced_results

    except Exception as e:
        print(f"Processing error: {str(e)}")
        traceback.print_exc()
        return None, None, None, None, None

    finally:
        # Cleanup
        print("\nPerforming cleanup...")
        cleanup_memory()

        # Unpersist cached DataFrames
        for cached_df in cached_dfs:
            try:
                if cached_df is not None:
                    cached_df.unpersist()
            except Exception as e:
                print(f"Error unpersisting DataFrame: {str(e)}")

        # Cleanup H2O and remove ice directory
        try:
            if h2o.connection():
                print("Shutting down H2O cluster...")
                h2o.cluster().shutdown()
            if ice_dir and os.path.exists(ice_dir):
                print(f"Removing H2O ice directory: {ice_dir}")
                import shutil
                shutil.rmtree(ice_dir)
        except Exception as e:
            print(f"H2O cleanup error: {str(e)}")

        # Stop Spark
        try:
            if spark:
                spark.stop()
                print("Spark session stopped successfully")
        except Exception as e:
            print(f"Error stopping Spark: {str(e)}")

if __name__ == "__main__":
    # Run main process
    spark, h2o_conn, results, h2o_frames, enhanced_results = main()

    # Analyze results if available
    if enhanced_results:
        analyze_results(enhanced_results)
'''

'\n%%time\n# Cell 3: Main Execution\n\ndef analyze_results(enhanced_results):\n    """Analyze and visualize model results."""\n    if not enhanced_results:\n        print("No results to analyze")\n        return\n    \n    try:\n        # Create performance comparison plots\n        plt.figure(figsize=(12, 6))\n        \n        # Model performance comparison\n        aucs = []\n        accuracies = []\n        model_names = []\n        \n        # Gather metrics from individual models\n        for feature_type, results in enhanced_results[\'individual_models\'].items():\n            aucs.append(results[\'performance\'].auc())\n            accuracies.append(results[\'performance\'].accuracy())\n            model_names.append(feature_type)\n        \n        # Add combined model metrics if available\n        if enhanced_results.get(\'combined_model\'):\n            aucs.append(enhanced_results[\'combined_model\'][\'performance\'].auc())\n            accuracies.append(enhanced_results[\'

### Old cells below, kept for research using claude.ai and ChatGPT

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
import h2o
from h2o.automl import H2OAutoML
import traceback
import numpy as np
import os
import sys
import socket
import subprocess
import seaborn as sns
import pandas as pd


def setup_environment():
    """
    Make sure every package this pipeline relies on can be imported.

    Each required package is imported by name (dashes mapped to underscores);
    a package whose import fails is installed with pip through the current
    interpreter.

    Returns:
        bool: True when every package was importable or got installed,
        False if anything raised along the way.
    """
    print("Setting up Python environment...")

    # Packages the pipeline needs at runtime
    required_packages = ('pyspark', 'h2o', 'numpy', 'pandas')

    def _ensure_available(package):
        """Import one package, falling back to a pip install."""
        module_name = package.replace('-', '_')
        try:
            __import__(module_name)
        except ImportError:
            print(f"Installing {package}...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
        else:
            print(f"{package} is already installed.")

    try:
        for package in required_packages:
            _ensure_available(package)
    except Exception as e:
        print(f"Environment setup error: {e}")
        return False

    return True

def get_local_ip():
    """
    Retrieve the local IP address.

    A UDP socket is connected to 8.8.8.8:80 and the local address bound for
    that socket is read back.

    Returns:
        str: The socket's local address, or "127.0.0.1" if anything fails.
    """
    try:
        # The context manager closes the socket on every path; previously a
        # failing connect()/getsockname() skipped close() and leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "127.0.0.1"

def create_spark_session():
    """
    Build (or reuse) the local SparkSession used by this pipeline.

    The driver host and bind address are both set to the address reported by
    get_local_ip(); every other setting is a fixed value listed below.

    Returns:
        SparkSession: The session returned by ``getOrCreate()``.
    """
    from pyspark.sql import SparkSession

    driver_address = get_local_ip()

    # Applied to the builder in this order, after appName/master
    settings = (
        ("spark.driver.host", driver_address),
        ("spark.driver.bindAddress", driver_address),
        ("spark.sql.execution.arrow.pyspark.enabled", "false"),
        ("spark.driver.memory", "180g"),
        ("spark.executor.memory", "180g"),
        ("spark.network.timeout", "3600s"),
        ("spark.executor.heartbeatInterval", "120s"),
        ("spark.sql.session.timeZone", "UTC"),
    )

    builder = SparkSession.builder.appName("SparkH2ODataProcessor").master("local[*]")
    for key, value in settings:
        builder = builder.config(key, value)

    return builder.getOrCreate()

def initialize_h2o_context():
    """
    Call ``h2o.init`` for localhost:54321 and hand back the H2O connection.

    Returns:
        The object returned by ``h2o.connection()``, or None when
        initialisation or the connection lookup raised (the error and its
        traceback are printed).
    """
    init_options = {
        "ip": "localhost",
        "port": 54321,
        "strict_version_check": False,
        "max_mem_size": "180G",
    }

    try:
        print("Initializing H2O...")
        h2o.init(**init_options)
        connection = h2o.connection()
    except Exception as e:
        print(f"H2O initialization error: {e}")
        traceback.print_exc()
        return None

    return connection

def process_feature_group(df, feature_prefix, start_idx, end_idx):
    """
    Assemble one numbered feature group into a vector column and an array column.

    Columns named ``{feature_prefix}_{NNNN}`` (index zero-padded to four
    digits) for every index in [start_idx, end_idx] are cast to double and
    combined with a VectorAssembler; the vector is additionally exposed as an
    array<double> column through a Python UDF.

    Args:
        df: Spark DataFrame holding 'date', 'symbol' and the feature columns.
        feature_prefix (str): Prefix of the feature columns (e.g. 'pvm').
        start_idx (int): First feature index, inclusive.
        end_idx (int): Last feature index, inclusive.

    Returns:
        DataFrame with 'date', 'symbol', the casted feature columns,
        ``features_{prefix}_{start}_{end}`` (vector) and
        ``features_{prefix}_{start}_{end}_array`` (list of doubles).
    """
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql.types import ArrayType, DoubleType

    # Generate column names
    cols = [f'{feature_prefix}_{str(i).zfill(4)}' for i in range(start_idx, end_idx + 1)]
    vector_col = f"features_{feature_prefix}_{start_idx}_{end_idx}"

    # Cast every feature in ONE projection. Calling withColumn once per
    # feature adds a projection per call and can produce very large plans.
    df_subset = df.select(
        'date',
        'symbol',
        *[F.col(col_name).cast("double").alias(col_name) for col_name in cols]
    )

    # Vector to array conversion function (null-safe)
    def vector_to_array(vector):
        return vector.toArray().tolist() if vector is not None else None
    vector_to_array_udf = F.udf(vector_to_array, ArrayType(DoubleType()))

    # handleInvalid="keep" is passed so invalid inputs do not abort assembly
    assembler = VectorAssembler(
        inputCols=cols,
        outputCol=vector_col,
        handleInvalid="keep"
    )

    # Transform and add array column
    result = assembler.transform(df_subset).withColumn(
        f"{vector_col}_array",
        vector_to_array_udf(vector_col)
    )

    return result

def _find_feature_array_column(columns, feature_type):
    """
    Locate the ``features_{feature_type}_{start}_{end}_array`` column.

    Returns:
        tuple: (column_name, start, end). When no column matches, the legacy
        ``features_{feature_type}_1_50_array`` name with range 1..50 is
        returned so a missing column still fails in ``select`` as before.
    """
    import re

    pattern = re.compile(rf"^features_{re.escape(feature_type)}_(\d+)_(\d+)_array$")
    for name in columns:
        found = pattern.match(name)
        if found:
            start, end = int(found.group(1)), int(found.group(2))
            if end >= start:
                return name, start, end
    return f"features_{feature_type}_1_50_array", 1, 50


def convert_to_h2o_frames(results):
    """
    Convert the per-group Spark DataFrames into H2O frames.

    For every feature type the array column produced by process_feature_group
    is collected to the driver, expanded into one column per feature
    (``{feature_type}_{NNNN}``), and loaded into an H2OFrame via pandas. The
    feature range is read from the array column's name instead of being
    assumed to be 1..50, so groups built with other ranges convert correctly.

    Args:
        results (dict): feature type -> Spark DataFrame with 'date', 'symbol'
            and a ``features_{type}_{start}_{end}_array`` column.

    Returns:
        dict: feature type -> H2OFrame. Feature types whose H2O import raised
        are omitted (the error is printed).
    """
    import pandas as pd

    h2o_frames = {}
    for feature_type, result_df in results.items():
        feature_col, start_idx, end_idx = _find_feature_array_column(
            result_df.columns, feature_type
        )
        width = end_idx - start_idx + 1

        # Collect rows and prepare data (pulls the whole group to the driver)
        rows = result_df.select("date", "symbol", feature_col).collect()

        # Create column names matching the source feature numbering
        columns = ['date', 'symbol'] + [
            f'{feature_type}_{i:04d}' for i in range(start_idx, end_idx + 1)
        ]

        # Prepare data for conversion
        data_rows = []
        for row in rows:
            # Missing keys become '' so the rows can be filtered out below
            date = str(row['date']) if row['date'] is not None else ''
            symbol = str(row['symbol']) if row['symbol'] is not None else ''
            features = row[feature_col]

            # Pad with NaN / truncate so every row has exactly `width` values
            if features is None:
                features = [np.nan] * width
            else:
                features = (features + [np.nan] * width)[:width]

            # Missing values are passed on as None, everything else as float
            features = [
                None if pd.isna(f) else float(f)
                for f in features
            ]

            data_rows.append([date, symbol] + features)

        # Convert to Pandas DataFrame
        pandas_df = pd.DataFrame(data_rows, columns=columns)

        # Remove any rows with completely empty date or symbol
        pandas_df = pandas_df[
            (pandas_df['date'] != '') &
            (pandas_df['symbol'] != '')
        ]

        try:
            # Try H2O import with minimal parameters
            h2o_frame = h2o.H2OFrame(pandas_df)
            h2o_frames[feature_type] = h2o_frame
        except Exception as e:
            print(f"Error importing {feature_type} to H2O Frame: {e}")
            traceback.print_exc()

    return h2o_frames

def run_enhanced_automl(h2o_frames, target_strategy='random'):
    """
    Run AutoML per feature group and on the combined features, then blend.

    Steps:
      1. Attach a binary 'target' column to every frame (see target_strategy).
      2. Build one combined frame holding every group's features.
      3. Run H2OAutoML on an 80/20 split of each group frame and of the
         combined frame.
      4. Weight each group model by its validation AUC and blend the models'
         predictions on the combined validation split.
      5. Collect variable importances.

    Args:
        h2o_frames (dict): feature type -> H2OFrame with 'date', 'symbol' and
            feature columns. The frame objects are modified: a 'target'
            column is assigned on each of them.
        target_strategy (str): 'random', 'symbol_based' or 'date_based'.

    Returns:
        dict: keys 'individual_models', 'combined_model',
        'ensemble_predictions' and 'feature_analysis'. 'combined_model' is
        None when the combined frame or its AutoML run failed;
        'ensemble_predictions' is None when there is no group model or no
        combined validation split to predict on.

    Raises:
        ValueError: if target_strategy is not one of the supported values.
    """
    def add_target_column(h2o_frames, target_strategy='random'):
        """Add target column to H2O frames."""
        # Shallow copy: the dict is new, the H2OFrame objects are shared
        frames_with_target = h2o_frames.copy()

        for feature_type, h2o_frame in frames_with_target.items():
            num_rows = h2o_frame.nrows

            if target_strategy == 'random':
                # Unseeded coin flip, drawn independently for every group
                target = h2o.H2OFrame(np.random.randint(0, 2, size=(num_rows, 1)),
                                    column_names=['target'])
                target = target.asfactor()
            elif target_strategy == 'symbol_based':
                target = h2o.H2OFrame(
                    h2o_frame['symbol']
                        .apply(lambda x: 1 if '$' in str(x) else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            elif target_strategy == 'date_based':
                target = h2o.H2OFrame(
                    h2o_frame['date']
                        .apply(lambda x: 1 if str(x)[:4] > '2020' else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            else:
                raise ValueError(f"Unknown target strategy: {target_strategy}")

            h2o_frame['target'] = target
            frames_with_target[feature_type] = h2o_frame

        return frames_with_target

    def create_combined_frame(frames_with_target):
        """Create a combined H2O frame with features from all groups.

        Returns None (after printing the traceback) if anything fails.
        """
        print("\nCreating combined feature frame...")
        try:
            # Get the first frame to use as base
            base_frame = next(iter(frames_with_target.values()))

            # Convert base frame to pandas first
            base_pd = base_frame.as_data_frame()

            # Initialize combined data with core columns
            combined_data = base_pd[['date', 'symbol']].copy()

            # The combined target is the FIRST group's target column
            target_pd = base_frame['target'].as_data_frame()
            combined_data['target'] = target_pd

            # Add features from each group with prefixes
            for feature_type, h2o_frame in frames_with_target.items():
                feature_cols = [col for col in h2o_frame.columns
                              if col not in ['date', 'symbol', 'target']]

                if not feature_cols:
                    print(f"Warning: No feature columns found for {feature_type}")
                    continue

                # Convert feature columns to pandas
                feature_df = h2o_frame[feature_cols].as_data_frame()

                # Add prefix to column names
                feature_df.columns = [f"{feature_type}_{col}" for col in feature_cols]

                print(f"\nAdding {len(feature_cols)} features from {feature_type}")
                print("Sample of new feature names:", list(feature_df.columns)[:3])

                # Positional (index-based) concat, no join on date/symbol
                combined_data = pd.concat([combined_data, feature_df], axis=1)

            print(f"\nFinal combined frame shape: {combined_data.shape}")
            print(f"Total features: {combined_data.shape[1] - 3}")

            # Convert back to H2O Frame
            combined_h2o = h2o.H2OFrame(combined_data)
            combined_h2o['target'] = combined_h2o['target'].asfactor()

            return combined_h2o

        except Exception as e:
            print(f"Error in create_combined_frame: {str(e)}")
            traceback.print_exc()
            return None

    def run_automl_for_frame(train, valid, name, max_runtime_secs=1200):
        """Run AutoML for a specific frame with comprehensive settings.

        Returns a dict describing the leader model, or None on any error.
        """
        print(f"\nRunning AutoML for {name}")

        train['target'] = train['target'].asfactor()
        valid['target'] = valid['target'].asfactor()

        predictors = [col for col in train.columns
                     if col not in ['date', 'symbol', 'target']]

        aml = H2OAutoML(
            max_runtime_secs=max_runtime_secs,
            seed=1234,
            sort_metric='AUC',
            max_models=100,
            stopping_metric='AUC',
            nfolds=5,
            keep_cross_validation_predictions=True,
            verbosity='debug'
        )

        try:
            aml.train(
                x=predictors,
                y='target',
                training_frame=train,
                validation_frame=valid
            )

            best_model = aml.leader
            performance = best_model.model_performance(valid)

            # Enhanced model analysis
            # NOTE(review): confirm H2OAutoML exposes cross_validation_metrics();
            # if it does not, the AttributeError is caught below and this
            # whole run is reported as failed (None).
            model_analysis = {
                'automl': aml,
                'best_model': best_model,
                'leaderboard': aml.leaderboard,
                'performance': performance,
                'cv_metrics': aml.cross_validation_metrics(),
                'variable_importance': best_model.varimp() if hasattr(best_model, 'varimp') else None
            }

            print(f"\nBest {name} Model Performance:")
            print(performance)
            print("\nConfusion Matrix:")
            print(performance.confusion_matrix())
            print(f"\n{name} Leaderboard:")
            print(aml.leaderboard)

            return model_analysis

        except Exception as e:
            print(f"Error in AutoML for {name}: {e}")
            traceback.print_exc()
            return None

    # Add target column to all frames
    frames_with_target = add_target_column(h2o_frames, target_strategy)

    # Create combined frame
    combined_frame = create_combined_frame(frames_with_target)

    # Results dictionary for all models
    all_results = {
        'individual_models': {},
        'combined_model': None,
        'ensemble_predictions': None,
        'feature_analysis': {}
    }

    # Run AutoML for each feature type
    for feature_type, h2o_frame in frames_with_target.items():
        train, valid = h2o_frame.split_frame(ratios=[0.8], seed=1234)
        result = run_automl_for_frame(train, valid, feature_type)

        if result:
            all_results['individual_models'][feature_type] = result

    # Stays None when the combined frame could not be built; the ensemble
    # step below needs it and used to fail with an unbound-name error.
    valid_combined = None

    # Run AutoML on combined features
    if combined_frame is not None:
        train_combined, valid_combined = combined_frame.split_frame(ratios=[0.8], seed=1234)
        combined_result = run_automl_for_frame(
            train_combined,
            valid_combined,
            "combined_features",
            max_runtime_secs=2400
        )

        if combined_result:
            all_results['combined_model'] = combined_result

    # Create ensemble predictions
    if all_results['individual_models'] and valid_combined is not None:
        print("\nCreating ensemble predictions...")
        ensemble_predictions = {}
        weights = {}

        # Calculate weights based on model performance
        for feature_type, result in all_results['individual_models'].items():
            auc = result['performance'].auc()
            weights[feature_type] = auc

        # Normalize weights so they sum to 1
        total_weight = sum(weights.values())
        weights = {k: v/total_weight for k, v in weights.items()}

        # Get predictions from all models
        # NOTE(review): the combined frame names features
        # "{feature_type}_{col}", while each group model was trained on the
        # un-prefixed names - verify the models actually find their
        # predictors in valid_combined.
        for feature_type, result in all_results['individual_models'].items():
            predictions = result['best_model'].predict(valid_combined)
            ensemble_predictions[feature_type] = predictions

        # Calculate weighted ensemble predictions
        # NOTE(review): the whole prediction frame is scaled, not only a
        # probability column - confirm that is intended.
        weighted_preds = None
        for feature_type, preds in ensemble_predictions.items():
            if weighted_preds is None:
                weighted_preds = preds * weights[feature_type]
            else:
                weighted_preds += preds * weights[feature_type]

        all_results['ensemble_predictions'] = {
            'predictions': weighted_preds,
            'weights': weights
        }
    elif all_results['individual_models']:
        print("\nSkipping ensemble predictions: no combined validation frame is available.")

    # Analyze feature importance across all models
    feature_importance = {}
    for feature_type, result in all_results['individual_models'].items():
        if result['variable_importance'] is not None:
            feature_importance[feature_type] = result['variable_importance']

    if all_results['combined_model'] and all_results['combined_model']['variable_importance'] is not None:
        feature_importance['combined'] = all_results['combined_model']['variable_importance']

    all_results['feature_analysis'] = feature_importance

    return all_results

def main():
    """
    Run the full pipeline: environment check, Spark + H2O start-up, feature
    processing, H2O conversion and enhanced AutoML.

    Returns:
        tuple: (spark, h2o_conn, results, h2o_frames, enhanced_results) on
        success, or five Nones when a step failed. The ``finally`` block
        shuts H2O down and stops Spark before the function returns, so the
        returned ``spark`` and ``h2o_conn`` handles refer to stopped services.
    """
    # Verify and set up environment
    if not setup_environment():
        print("Environment setup failed.")
        return None, None, None, None, None

    # Create Spark session
    spark = create_spark_session()

    # Initialize H2O context
    h2o_conn = initialize_h2o_context()

    try:
        # Test Spark connection
        print("Testing Spark connection...")
        test_df = spark.createDataFrame([(1, "test")], ["id", "value"])
        test_df.show()

        # Check H2O connection
        if not h2o_conn:
            print("Failed to initialize H2O.")
            return None, None, None, None, None

        # Load data
        print("\nLoading data...")
        absolute_path = os.path.abspath('yiedl_latest.parquet')
        df = spark.read.parquet("file://" + absolute_path)

        # Process different feature types: prefix -> (first index, last index)
        feature_types = {
            'pvm': (1, 50),
            'sentiment': (1, 50),
            'onchain': (1, 50)
        }

        results = {}
        for feature_type, (start, end) in feature_types.items():
            print(f"\nProcessing {feature_type} features {start}-{end}...")
            results[feature_type] = process_feature_group(df, feature_type, start, end)

            feature_col = f"features_{feature_type}_{start}_{end}_array"
            print(f"\n{feature_type.upper()} Features Sample:")
            results[feature_type].select("date", "symbol", feature_col).show(5, truncate=False)

            print(f"\nStats for {feature_type}:")
            results[feature_type].select(F.count("*").alias("count")).show()

        # Convert to H2O Frames
        h2o_frames = convert_to_h2o_frames(results)

        # Run Enhanced AutoML
        print("\nRunning Enhanced AutoML with combined features and ensemble predictions...")
        enhanced_results = run_enhanced_automl(h2o_frames, target_strategy='random')

        # Print comprehensive results summary
        if enhanced_results:
            print("\nEnhanced AutoML Results Summary:")

            # Individual model results
            for feature_type, model_results in enhanced_results['individual_models'].items():
                print(f"\n{feature_type.upper()} Model Performance:")
                print(f"AUC: {model_results['performance'].auc()}")
                print(f"Accuracy: {model_results['performance'].accuracy()}")
                print("\nTop Models:")
                print(model_results['leaderboard'].head().as_data_frame())

            # Combined model results
            if enhanced_results['combined_model']:
                print("\nCombined Features Model Performance:")
                combined_perf = enhanced_results['combined_model']['performance']
                print(f"AUC: {combined_perf.auc()}")
                print(f"Accuracy: {combined_perf.accuracy()}")
                print("\nTop Combined Models:")
                print(enhanced_results['combined_model']['leaderboard'].head().as_data_frame())

            # Ensemble results
            if enhanced_results['ensemble_predictions']:
                print("\nEnsemble Model Weights:")
                for model_type, weight in enhanced_results['ensemble_predictions']['weights'].items():
                    print(f"{model_type}: {weight:.4f}")

            # Feature importance analysis
            if enhanced_results['feature_analysis']:
                print("\nFeature Importance Summary:")
                for model_type, importance in enhanced_results['feature_analysis'].items():
                    if importance is not None:
                        print(f"\n{model_type.upper()} Top Important Features:")
                        if isinstance(importance, pd.DataFrame):
                            print(importance.head())
                        else:
                            print(importance)

        return spark, h2o_conn, results, h2o_frames, enhanced_results

    except Exception as e:
        print(f"Processing error: {e}")
        traceback.print_exc()
        return None, None, None, None, None

    finally:
        # Each service is cleaned up in its own try block: previously a
        # failing H2O shutdown (e.g. H2O never connected) skipped
        # spark.stop() and left the Spark session running.
        if h2o_conn:
            try:
                h2o.cluster().shutdown()
            except Exception as cleanup_error:
                print(f"Cleanup error: {cleanup_error}")

        if spark:
            try:
                spark.stop()
            except Exception as cleanup_error:
                print(f"Cleanup error: {cleanup_error}")

def analyze_enhanced_results(enhanced_results):
    """
    Plot the enhanced AutoML results.

    Draws one grouped bar chart comparing AUC and accuracy of every group
    model (plus the combined model when present), then one horizontal bar
    chart per entry of ``enhanced_results['feature_analysis']`` showing its
    first ten rows.

    Args:
        enhanced_results (dict): Result of run_enhanced_automl with keys
            'individual_models', 'combined_model' and 'feature_analysis'.

    Returns:
        None. Prints a message and returns early when there is nothing to plot.
    """
    if not enhanced_results:
        print("No enhanced results to analyze")
        return

    def _as_scalar(metric):
        """Reduce a metric to one float.

        Threshold metrics may arrive as [[threshold, value]] pairs rather
        than plain numbers; the value of the first pair is used.
        """
        if isinstance(metric, (list, tuple)):
            if not metric:
                return float('nan')
            first = metric[0]
            return float(first[1]) if isinstance(first, (list, tuple)) else float(first)
        return float(metric)

    # Gather (name, result) pairs: individual models first, combined last
    entries = list(enhanced_results['individual_models'].items())
    if enhanced_results['combined_model']:
        entries.append(('combined', enhanced_results['combined_model']))

    model_names = [name for name, _ in entries]
    aucs = [_as_scalar(result['performance'].auc()) for _, result in entries]
    accuracies = [_as_scalar(result['performance'].accuracy()) for _, result in entries]

    # Plot performance metrics. A single subplots() call creates the figure;
    # an extra plt.figure() beforehand only left an empty figure behind.
    x = np.arange(len(model_names))
    width = 0.35

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(x - width/2, aucs, width, label='AUC')
    ax.bar(x + width/2, accuracies, width, label='Accuracy')

    ax.set_ylabel('Score')
    ax.set_title('Model Performance Comparison')
    ax.set_xticks(x)
    ax.set_xticklabels(model_names)
    ax.legend()

    plt.tight_layout()
    plt.show()

    # Feature importance visualization
    if enhanced_results['feature_analysis']:
        for model_type, importance in enhanced_results['feature_analysis'].items():
            if importance is None:
                continue

            # Convert to DataFrame if not already, keep the first ten rows
            if isinstance(importance, pd.DataFrame):
                imp_df = importance.head(10)
            else:
                imp_df = pd.DataFrame(importance).head(10)

            # A non-numeric first column is used as the bar labels;
            # otherwise bars are labelled by row position.
            if len(imp_df.columns) > 1:
                first_col = imp_df.columns[0]
                if not pd.api.types.is_numeric_dtype(imp_df[first_col]):
                    imp_df = imp_df.set_index(first_col)

            # Pass the axes explicitly so pandas draws into this figure
            # instead of opening a second one.
            fig, ax = plt.subplots(figsize=(10, 6))
            imp_df.plot(kind='barh', ax=ax)
            ax.set_title(f'{model_type} Feature Importance')
            plt.tight_layout()
            plt.show()

# Run the entire process
if __name__ == "__main__":
    # main() returns a 5-tuple; every element is None when a step failed.
    spark, h2o_conn, results, h2o_frames, enhanced_results = main()

    # Only plot when AutoML produced a non-empty results dictionary.
    if enhanced_results:
        analyze_enhanced_results(enhanced_results)

In [None]:
def analyze_feature_group_models(feature_group_results, h2o_frames):
    """
    Analyze and combine results from different feature group models

    Args:
        feature_group_results (dict): feature type -> result object whose
            ``.model`` attribute is the trained model for that group
        h2o_frames (dict): feature type -> H2O frame used for training; the
            first frame's 'target' column supplies the true labels

    Returns:
        dict: Combined analysis with keys 'model_metrics',
        'feature_importance', 'cross_group_correlations',
        'ensemble_predictions', 'ensemble_metrics' and 'summary_report';
        None if any step raised (the error is logged and the traceback printed).
    """
    def _scalar(metric):
        """Reduce a metric to one float.

        Threshold metrics may arrive as [[threshold, value]] pairs rather
        than plain numbers; the value of the first pair is used.
        """
        if isinstance(metric, (list, tuple)):
            if not metric:
                return float('nan')
            first = metric[0]
            return float(first[1]) if isinstance(first, (list, tuple)) else float(first)
        return float(metric)

    try:
        logger.info("Analyzing feature group model results...")

        combined_analysis = {
            'model_metrics': {},
            'feature_importance': {},
            'cross_group_correlations': {},
            'ensemble_predictions': None
        }

        # Analyze each feature group's model performance
        for feature_type, results in feature_group_results.items():
            # Compute the performance object once instead of once per metric
            perf = results.model.model_performance()
            model_metrics = {
                'auc': _scalar(perf.auc()),
                'logloss': _scalar(perf.logloss()),
                'accuracy': _scalar(perf.accuracy()),
                'precision': _scalar(perf.precision()),
                'recall': _scalar(perf.recall())
            }
            combined_analysis['model_metrics'][feature_type] = model_metrics

            # Get feature importance if available (best effort)
            try:
                feature_importance = results.model.varimp(use_pandas=True)
                combined_analysis['feature_importance'][feature_type] = feature_importance
            except Exception:
                logger.warning(f"Could not extract feature importance for {feature_type}")

        # Calculate cross-group prediction correlations
        predictions = {}
        for feature_type, results in feature_group_results.items():
            pred = results.model.predict(h2o_frames[feature_type])
            predictions[feature_type] = pred.as_data_frame()['predict']

        # Convert predictions to pandas DataFrame for correlation analysis
        pred_df = pd.DataFrame(predictions)
        combined_analysis['cross_group_correlations'] = pred_df.corr()

        # Create ensemble predictions using weighted voting
        weights = {
            feature_type: metrics['auc']  # Use AUC as weight
            for feature_type, metrics in combined_analysis['model_metrics'].items()
        }

        # Normalize weights
        total_weight = sum(weights.values())
        weights = {k: v/total_weight for k, v in weights.items()}

        # Calculate weighted ensemble predictions on plain float arrays
        ensemble_pred = np.zeros(len(next(iter(predictions.values()))))
        for feature_type, preds in predictions.items():
            ensemble_pred += weights[feature_type] * np.asarray(preds, dtype=float)

        combined_analysis['ensemble_predictions'] = (ensemble_pred > 0.5).astype(int)

        # Calculate ensemble metrics. The labels are flattened to 1-D first:
        # comparing the 1-D predictions with the one-column DataFrame does
        # not line the rows up element by element.
        true_labels = h2o_frames[next(iter(h2o_frames))]['target'].as_data_frame()
        true_values = np.asarray(true_labels).reshape(-1)
        ensemble_metrics = {
            'accuracy': float(np.mean(combined_analysis['ensemble_predictions'] == true_values)),
            'weighted_auc_score': sum(weights[ft] * metrics['auc']
                                    for ft, metrics in combined_analysis['model_metrics'].items())
        }
        combined_analysis['ensemble_metrics'] = ensemble_metrics

        # Generate summary report
        best_group = max(combined_analysis['model_metrics'].items(),
                         key=lambda x: x[1]['auc'])[0]
        report = f"""
Feature Group Model Analysis Summary:
-----------------------------------
Number of feature groups analyzed: {len(feature_group_results)}
Best performing group: {best_group}
Ensemble weighted AUC: {ensemble_metrics['weighted_auc_score']:.4f}
Ensemble accuracy: {ensemble_metrics['accuracy']:.4f}

Individual Group Performances:
"""
        for feature_type, metrics in combined_analysis['model_metrics'].items():
            report += f"\n{feature_type}:"
            report += f"\n  AUC: {metrics['auc']:.4f}"
            report += f"\n  Accuracy: {metrics['accuracy']:.4f}"
            report += f"\n  Recall: {metrics['recall']:.4f}"

        combined_analysis['summary_report'] = report
        logger.info("Model analysis completed successfully")

        return combined_analysis

    except Exception as e:
        logger.error(f"Error in analyze_feature_group_models: {e}")
        traceback.print_exc()
        return None

# Usage example:
# assumes `feature_group_results` (per-group results whose `.model` is the
# trained H2O model) and `h2o_frames` (the matching H2O frames) were created
# by earlier cells - neither is defined in this cell.

analysis_results = analyze_feature_group_models(feature_group_results, h2o_frames)

if analysis_results:
    print(analysis_results['summary_report'])

    # Plot feature importance for each group
    for feature_type, importance_df in analysis_results['feature_importance'].items():
        plt.figure(figsize=(10, 6))
        # Draw into the figure created above; without ax= pandas opens its
        # own figure and the sized one is rendered empty.
        importance_df.plot(kind='bar', x='variable', y='relative_importance', ax=plt.gca())
        plt.title(f'Feature Importance - {feature_type}')
        plt.tight_layout()
        plt.show()

    # Plot correlation heatmap
    plt.figure(figsize=(8, 6))
    sns.heatmap(analysis_results['cross_group_correlations'],
                annot=True, cmap='coolwarm', center=0)
    plt.title('Cross-Group Prediction Correlations')
    plt.tight_layout()
    plt.show()

In [None]:
import json
import logging
import os
import socket
import subprocess
import sys
import traceback
from typing import Dict, Any

import h2o
import numpy as np
import pandas as pd
from h2o.automl import H2OAutoML
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession, Window, functions as F
from pyspark.sql.types import ArrayType, DoubleType

# Configure logging for the helpers defined in this cell
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Read the settings used below (package list, Spark, H2O, model and
# cross-validation options) from config.json in the working directory
with open('config.json') as f:
    config = json.load(f)

def setup_environment():
    """
    Make sure every package listed in config['required_packages'] is importable.

    Each package name is tried as a module name (dashes replaced by
    underscores); packages that fail to import are installed with pip through
    the current interpreter.

    Returns:
        bool: True when every package was importable or installed,
              False if anything raised along the way
    """
    logger.info("Setting up Python environment...")

    try:
        for package in config['required_packages']:
            module_name = package.replace('-', '_')
            try:
                __import__(module_name)
            except ImportError:
                logger.info(f"Installing {package}...")
                subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            else:
                logger.info(f"{package} is already installed.")
    except Exception as e:
        logger.error(f"Environment setup error: {e}")
        return False

    return True

def get_local_ip():
    """
    Retrieve the local IP address.

    A UDP socket is connected to 8.8.8.8:80 and the address the socket was
    bound to is read back with getsockname().

    Returns:
        str: The local address, or "127.0.0.1" if anything fails
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises, so the failure path does not leak a descriptor.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        # Best-effort helper: fall back to loopback rather than fail
        return "127.0.0.1"

def create_spark_session():
    """
    Build (or reuse) a SparkSession from the 'spark' section of the config.

    The driver host and bind address are both set to the address returned by
    get_local_ip(); every other option is read from config['spark'].

    Returns:
        SparkSession: Configured Spark session
    """
    logger.info("Creating Spark session...")

    driver_address = get_local_ip()
    settings = config['spark']

    builder = SparkSession.builder \
        .appName(settings['app_name']) \
        .master(settings['master'])

    # Applied in this order, one .config() call per entry
    spark_options = (
        ("spark.driver.host", driver_address),
        ("spark.driver.bindAddress", driver_address),
        ("spark.sql.execution.arrow.pyspark.enabled", settings['arrow_enabled']),
        ("spark.driver.memory", settings['driver_memory']),
        ("spark.executor.memory", settings['executor_memory']),
        ("spark.network.timeout", settings['network_timeout']),
        ("spark.executor.heartbeatInterval", settings['heartbeat_interval']),
        ("spark.sql.session.timeZone", settings['timezone']),
    )
    for option_name, option_value in spark_options:
        builder = builder.config(option_name, option_value)

    return builder.getOrCreate()

def initialize_h2o_context():
    """
    Start or attach to H2O using the 'h2o' section of the config.

    Returns:
        The object returned by h2o.connection() on success, or None when
        initialization raised (the error is logged and the traceback printed)
    """
    logger.info("Initializing H2O context...")

    try:
        settings = config['h2o']
        init_kwargs = {
            'ip': settings['ip'],
            'port': settings['port'],
            'strict_version_check': False,
            'max_mem_size': settings['max_mem_size'],
        }
        h2o.init(**init_kwargs)
        connection = h2o.connection()
    except Exception as e:
        logger.error(f"H2O initialization error: {e}")
        traceback.print_exc()
        return None

    return connection

def validate_data(df):
    """
    Run basic quality checks on the input DataFrame.

    Fails when any column listed in config['required_columns'] is absent.
    Null values are counted per column and only reported as a warning.

    Args:
        df (pyspark.sql.DataFrame): Input DataFrame

    Returns:
        bool: True if validation passes, False on missing columns or on any error
    """
    logger.info("Validating input data...")

    try:
        # Required columns must all be present
        absent = [name for name in config['required_columns'] if name not in df.columns]
        if absent:
            logger.error(f"Missing required columns: {absent}")
            return False

        # One aggregate per column: count of rows where that column is null
        null_counters = []
        for c in df.columns:
            null_counters.append(F.count(F.when(F.col(c).isNull(), c)).alias(c))
        missing_counts = df.select(null_counters).collect()[0].asDict()

        columns_with_nulls = [c for c, n_null in missing_counts.items() if n_null > 0]
        if columns_with_nulls:
            logger.warning(f"Missing values detected: {missing_counts}")

        # Additional data quality checks can be added here

    except Exception as e:
        logger.error(f"Data validation error: {e}")
        return False

    return True

def process_feature_group(df, feature_prefix, start_idx, end_idx):
    """
    Process a group of features and return transformed DataFrame.

    Selects 'date', 'symbol' and the columns '<prefix>_<NNNN>' for
    start_idx..end_idx (inclusive, zero-padded to four digits), casts the
    feature columns to double, assembles them into a vector column
    'features_<prefix>_<start>_<end>' and adds a plain array copy of it in
    'features_<prefix>_<start>_<end>_array'.

    Args:
        df (pyspark.sql.DataFrame): Input DataFrame
        feature_prefix (str): Prefix shared by the feature columns
        start_idx (int): First feature index (inclusive)
        end_idx (int): Last feature index (inclusive)

    Returns:
        pyspark.sql.DataFrame: The selected columns plus the vector and array columns
    """
    logger.info(f"Processing {feature_prefix} features {start_idx}-{end_idx}...")

    # Generate column names
    cols = [f'{feature_prefix}_{str(i).zfill(4)}' for i in range(start_idx, end_idx + 1)]
    vector_col = f"features_{feature_prefix}_{start_idx}_{end_idx}"

    # Select and cast in a single projection. Calling withColumn once per
    # feature adds one projection per call and inflates the query plan.
    df_subset = df.select(
        'date', 'symbol',
        *[F.col(col_name).cast("double").alias(col_name) for col_name in cols]
    )

    # Vector to array conversion UDF
    vector_to_array_udf = F.udf(lambda vector: vector.toArray().tolist() if vector is not None else None, ArrayType(DoubleType()))

    # handleInvalid="keep" keeps rows with invalid (e.g. null) feature values
    # instead of raising or dropping them
    assembler = VectorAssembler(
        inputCols=cols,
        outputCol=vector_col,
        handleInvalid="keep"
    )

    # Transform and add array column
    result = assembler.transform(df_subset).withColumn(
        f"{vector_col}_array",
        vector_to_array_udf(vector_col)
    )

    return result

def engineer_features(df):
    """
    Perform feature engineering on the input DataFrame.

    Adds trailing rolling means and standard deviations of the 'pvm',
    'sentiment' and 'onchain' columns, computed per 'symbol' in 'date' order.

    Args:
        df (pyspark.sql.DataFrame): Input DataFrame

    Returns:
        pyspark.sql.DataFrame: DataFrame with engineered features
    """
    logger.info("Performing feature engineering...")

    per_symbol = Window.partitionBy('symbol').orderBy('date')

    # rowsBetween() is inclusive at both ends, so an n-row trailing window that
    # ends on the current row starts n - 1 rows back. The previous bounds
    # (-5, 0) and (-10, 0) averaged 6 and 11 rows for the "_5" and "_10" features.
    last_5_rows = per_symbol.rowsBetween(-4, 0)
    last_10_rows = per_symbol.rowsBetween(-9, 0)

    # Rolling window features
    df = df \
        .withColumn('pvm_rolling_mean_5', F.avg('pvm').over(last_5_rows)) \
        .withColumn('pvm_rolling_mean_10', F.avg('pvm').over(last_10_rows)) \
        .withColumn('sentiment_rolling_mean_5', F.avg('sentiment').over(last_5_rows)) \
        .withColumn('onchain_rolling_mean_5', F.avg('onchain').over(last_5_rows))

    # Volatility measures
    # NOTE(review): this window is unchanged and spans 11 rows (current + 10
    # previous); the feature names do not state a length -- confirm the intent.
    volatility_rows = per_symbol.rowsBetween(-10, 0)
    df = df \
        .withColumn('pvm_volatility', F.stddev('pvm').over(volatility_rows)) \
        .withColumn('sentiment_volatility', F.stddev('sentiment').over(volatility_rows))

    # Additional feature engineering techniques can be added here

    return df

def convert_to_h2o_frames(results, start_idx=1, end_idx=50):
    """
    Convert Spark DataFrames to H2O Frames with robust handling.

    For each feature type the array column
    'features_<feature_type>_<start_idx>_<end_idx>_array' is collected to the
    driver, expanded into one column per feature index and loaded into an
    H2OFrame. Arrays are padded with missing values or truncated to exactly
    end_idx - start_idx + 1 entries; rows with an empty date or symbol are dropped.

    Args:
        results (dict): Maps feature type to a Spark DataFrame that has
            'date', 'symbol' and the array column described above
        start_idx (int): First feature index (inclusive). Defaults to 1.
        end_idx (int): Last feature index (inclusive). Defaults to 50.

    Returns:
        dict: Maps feature type to its H2OFrame. Feature types whose
            conversion failed are logged and left out.

    Raises:
        ValueError: If end_idx is smaller than start_idx
    """
    logger.info("Converting Spark DataFrames to H2O Frames...")

    if end_idx < start_idx:
        raise ValueError(f"end_idx ({end_idx}) must be >= start_idx ({start_idx})")
    n_features = end_idx - start_idx + 1

    h2o_frames = {}
    for feature_type, result_df in results.items():
        feature_col = f"features_{feature_type}_{start_idx}_{end_idx}_array"

        # Collect rows and prepare data
        rows = result_df.select("date", "symbol", feature_col).collect()

        # One output column per feature index, zero-padded to four digits
        columns = ['date', 'symbol'] + [
            f'{feature_type}_{i:04d}' for i in range(start_idx, end_idx + 1)
        ]

        # Prepare data for conversion
        data_rows = []
        for row in rows:
            date = str(row['date']) if row['date'] is not None else ''
            symbol = str(row['symbol']) if row['symbol'] is not None else ''

            # Pad or truncate so every row has exactly n_features values
            features = list(row[feature_col] or [])
            features = (features + [np.nan] * n_features)[:n_features]

            # Missing values become None, everything else a plain float
            features = [
                None if pd.isna(value) else float(value)
                for value in features
            ]

            data_rows.append([date, symbol] + features)

        # Convert to Pandas DataFrame
        pandas_df = pd.DataFrame(data_rows, columns=columns)

        # Remove rows with empty date or symbol
        pandas_df = pandas_df[
            (pandas_df['date'] != '') &
            (pandas_df['symbol'] != '')
        ]

        try:
            h2o_frame = h2o.H2OFrame(pandas_df)
            h2o_frames[feature_type] = h2o_frame
        except Exception as e:
            # Keep converting the remaining feature types
            logger.error(f"Error importing {feature_type} to H2O Frame: {e}")
            traceback.print_exc()

    return h2o_frames

def create_model_pipeline(feature_cols):
    """
    Create a machine learning pipeline with multiple classifiers.

    Stages, in order: VectorAssembler -> StandardScaler ->
    RandomForestClassifier -> GBTClassifier. Both classifiers read
    'scaled_features' and use 'target' as the label.

    The random forest writes to 'rf_rawPrediction', 'rf_probability' and
    'rf_prediction'. The GBT stage keeps the default output columns, so the
    default 'prediction' / 'rawPrediction' columns come from the GBT model.

    Args:
        feature_cols (list): List of feature column names

    Returns:
        Pipeline: Machine learning pipeline
    """
    logger.info("Creating model pipeline...")

    # Vector Assembler
    assembler = VectorAssembler(
        inputCols=feature_cols,
        outputCol="features"
    )

    # Feature scaling
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled_features"
    )

    # Classifiers
    # Two classifiers in one pipeline cannot both use the default output
    # columns: the second stage would find 'rawPrediction', 'probability' and
    # 'prediction' already present. The random forest therefore gets its own.
    rf_classifier = RandomForestClassifier(
        labelCol="target",
        featuresCol="scaled_features",
        numTrees=config['random_forest']['num_trees'],
        rawPredictionCol="rf_rawPrediction",
        probabilityCol="rf_probability",
        predictionCol="rf_prediction"
    )

    gbt_classifier = GBTClassifier(
        labelCol="target",
        featuresCol="scaled_features",
        maxIter=config['gradient_boosting']['max_iter']
    )

    # Pipeline (the two classifiers stay the last two stages, in this order)
    pipeline = Pipeline(stages=[
        assembler,
        scaler,
        rf_classifier,
        gbt_classifier
    ])

    return pipeline

def evaluate_models(models, test_data):
    """
    Score each trained model on the test data.

    Args:
        models (dict): Maps model name to a fitted model exposing transform()
        test_data (pyspark.sql.DataFrame): Test data with a 'target' column

    Returns:
        dict: Maps model name to {'auc': ..., 'accuracy': ...}
    """
    logger.info("Evaluating models...")

    auc_evaluator = BinaryClassificationEvaluator(labelCol="target")

    def _score(model):
        """Return the metric dict for one model."""
        scored = model.transform(test_data)
        auc = auc_evaluator.evaluate(scored)
        # Accuracy = share of rows whose prediction equals the label
        n_correct = scored.filter(scored.prediction == scored.target).count()
        n_total = scored.count()
        return {
            'auc': auc,
            'accuracy': n_correct / n_total
        }

    return {model_name: _score(model) for model_name, model in models.items()}

def tune_hyperparameters(pipeline, train_data, evaluator):
    """
    Perform hyperparameter tuning using cross-validation.

    The grid varies numTrees on the second-to-last pipeline stage and maxIter
    on the last one, using the value lists from the config.

    Args:
        pipeline (Pipeline): Model pipeline whose last two stages are the
            random-forest and gradient-boosting classifiers, in that order
        train_data (pyspark.sql.DataFrame): Training data
        evaluator (pyspark.ml.evaluation.Evaluator): Evaluation metric

    Returns:
        tuple: (best model, dict mapping each tuned parameter name to the
            value from the best-scoring grid entry)
    """
    logger.info("Performing hyperparameter tuning...")

    # On an unfitted Pipeline, `stages` is a Param rather than a list, so the
    # stage objects have to be read through getStages().
    stages = pipeline.getStages()
    rf_stage, gbt_stage = stages[-2], stages[-1]

    param_grid = ParamGridBuilder() \
        .addGrid(rf_stage.numTrees, config['random_forest']['num_trees_grid']) \
        .addGrid(gbt_stage.maxIter, config['gradient_boosting']['max_iter_grid']) \
        .build()

    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=config['cross_validation']['num_folds']
    )

    cv_model = cv.fit(train_data)

    # avgMetrics holds one score per grid entry, in grid order. The best entry
    # is the highest or lowest score depending on the evaluator's metric. The
    # fitted pipeline model's own param map does not carry the tuned values.
    metrics = list(cv_model.avgMetrics)
    pick = max if evaluator.isLargerBetter() else min
    best_index = pick(range(len(metrics)), key=metrics.__getitem__)
    best_params = {param.name: value for param, value in param_grid[best_index].items()}

    return cv_model.bestModel, best_params

def run_automl(h2o_frames, target_strategy='random'):
    """
    Run AutoML on H2O frames with different target strategies.

    NOTE(review): only the nested add_target_column helper is defined here.
    Nothing calls it and no AutoML run is started, so this function currently
    just logs and returns None. The rest of the body appears to be missing --
    confirm against the intended implementation.
    """
    logger.info("Running AutoML...")

    def add_target_column(h2o_frames, target_strategy='random'):
        """
        Add a 'target' column to each H2O frame and return the mapping.

        Supported strategies: 'random' (0/1 drawn with np.random.randint),
        'symbol_based' (1 when the symbol contains '$') and 'date_based'
        (1 when the first four characters of the date sort after '2020').

        Raises:
            ValueError: If target_strategy is not one of the above
        """
        # Shallow copy: the frame objects themselves are shared with the
        # caller's mapping, so they receive the 'target' column as well.
        frames_with_target = h2o_frames.copy()

        for feature_type, h2o_frame in frames_with_target.items():
            num_rows = h2o_frame.nrows

            if target_strategy == 'random':
                target = h2o.H2OFrame(
                    np.random.randint(0, 2, size=(num_rows, 1)),
                    column_names=['target']
                ).asfactor()
            elif target_strategy == 'symbol_based':
                target = h2o.H2OFrame(
                    h2o_frame['symbol']
                        .apply(lambda x: 1 if '$' in str(x) else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            elif target_strategy == 'date_based':
                target = h2o.H2OFrame(
                    h2o_frame['date']
                        .apply(lambda x: 1 if str(x)[:4] > '2020' else 0)
                        .as_data_frame()
                ).set_names(['target']).asfactor()
            else:
                raise ValueError(f"Unknown target strategy: {target_strategy}")

            h2o_frame['target'] = target
            frames_with_target[feature_type] = h2o_frame

        # Was `return frames_with`, an undefined name (NameError when reached)
        return frames_with_target

In [None]:
%%time

# Initialize H2O
import h2o
from pysparkling import *
hc = H2OContext.getOrCreate()

# Convert Spark DataFrames to H2O Frames
train_h2o = hc.asH2OFrame(train_processed)
live_h2o = hc.asH2OFrame(live_processed)

# Initialize H2O AutoML
aml = H2OAutoML(
    max_models=20,
    seed=1,
    max_runtime_secs=600  # 600 s = 10 min; use 3600 for a 1-hour run
)

# Train models using AutoML
target_column = "target"  # Replace with your actual target column
# Pick predictors by name rather than by position: `columns[:-1]` only
# excludes the target when it happens to be the last column.
feature_columns = [c for c in train_h2o.columns if c != target_column]
aml.train(x=feature_columns,
          y=target_column,
          training_frame=train_h2o)

# Get model performance (measured on the training frame itself)
print(aml.leader.model_performance(train_h2o))

# Make predictions on live data
predictions = aml.leader.predict(live_h2o)

# Save predictions; overwrite mode lets the cell be re-run without failing
# on an existing predictions.parquet
predictions_df = h2o.as_list(predictions)
predictions_spark = spark.createDataFrame(predictions_df)
predictions_spark.write.mode("overwrite").parquet("predictions.parquet")

# Stop H2O and Spark sessions
h2o.cluster().shutdown()
spark.stop()

## Download the Numerai Crypto Datasets

In [None]:
# Fetch the Numerai crypto training targets into the current working directory
import os

train_targets_path = f"{os.getcwd()}/numerai_train_targets.parquet"
napi.download_dataset(
    filename="crypto/v1.0/train_targets.parquet",
    dest_path=train_targets_path,
)

In [None]:
# Fetch the Numerai live crypto universe into the current working directory
import os

live_universe_path = f"{os.getcwd()}/numerai_live_universe.parquet"
napi.download_dataset(
    filename="crypto/v1.0/live_universe.parquet",
    dest_path=live_universe_path,
)

## Import and Display the Numerai Crypto Datasets

In [None]:
import pandas as pd

In [None]:
# Load the Numerai training targets (they are displayed in the next cell)
df_numerai_targets = pd.read_parquet("numerai_train_targets.parquet")

In [None]:
# Display the Numerai training targets
display(df_numerai_targets)

In [None]:
# Load and display the Numerai live universe
# (read from the file written by the live-universe download cell above)
df_numerai_universe = pd.read_parquet("numerai_live_universe.parquet")
display(df_numerai_universe)

## Download YIEDL Crypto Datasets

In [None]:
# Helper Function
import requests

def download_file(url, output_filename, chunk_size=1024 * 1024, timeout=(10, 300)):
    """
    Download a URL to a local file.

    The response body is streamed to disk in chunks instead of being held in
    memory, so large archives can be downloaded safely.

    Args:
        url (str): Address to download from
        output_filename (str): Path of the file to write
        chunk_size (int): Number of bytes requested per chunk
        timeout: Passed to requests.get as its timeout. The default waits up
            to 10 s to connect and up to 300 s for data.

    Returns:
        None. A status message is printed; on a non-200 response nothing is
        written.
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print("Failed to download file")
            return

        with open(output_filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:  # skip empty chunks
                    file.write(chunk)

    print(f"File downloaded successfully as {output_filename}")

In [None]:
# Download YIEDL crypto latest dataset to current directory
# (saved as yiedl_latest.parquet, which a later cell loads with pd.read_parquet)
url = 'https://api.yiedl.ai/yiedl/v1/downloadDataset?type=latest'
output_filename = 'yiedl_latest.parquet'
download_file(url, output_filename)

In [None]:
# Download YIEDL crypto historical dataset to current directory
# NOTE: it is a huge file in zip format; the next cell unzips it
url = 'https://api.yiedl.ai/yiedl/v1/downloadDataset?type=historical'
output_filename = 'yiedl_historical.zip'
download_file(url, output_filename)

In [None]:
# Unzip and rename the file: `unzip -p` writes the extracted content to stdout,
# and the redirect stores it as yiedl_historical.parquet.
# NOTE(review): assumes the archive holds exactly one parquet file -- confirm.
!unzip -p yiedl_historical.zip > yiedl_historical.parquet

## Import and Display the YIEDL Crypto Datasets

In [None]:
# Load the YIEDL historical crypto dataset (it is displayed in a later cell)
df_yield_historical = pd.read_parquet("yiedl_historical.parquet",
                                      engine = "pyarrow",
                                      dtype_backend = "numpy_nullable")

In [None]:
# Check the column dtypes of the historical dataset
df_yield_historical.dtypes

In [None]:
# Display the YIEDL historical dataset
display(df_yield_historical)

In [None]:
# Load the YIEDL latest crypto dataset (it is displayed in a later cell)
df_yield_latest = pd.read_parquet("yiedl_latest.parquet",
                                  engine = "pyarrow",
                                  dtype_backend = "numpy_nullable")

In [None]:
# Check the column dtypes of the latest dataset
df_yield_latest.dtypes

In [None]:
# Display the YIEDL latest dataset
display(df_yield_latest)