In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
from urllib.parse import urlparse
import time
import os
import numpy as np
import math
from datetime import datetime, timedelta
# import holidays
import warnings
import argparse

In [2]:
import sagemaker
import boto3
from sagemaker import get_execution_role

region = boto3.Session().region_name

session = sagemaker.Session()

# You can modify the following to use a bucket of your choosing
bucket = session.default_bucket()
prefix = "sagemaker/autopilot-water-demand-prediction"

role = get_execution_role()

# This is the client we will use to interact with SageMaker Autopilot
sm = boto3.Session().client(service_name="sagemaker", region_name=region)



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
from io import StringIO

s3 = boto3.client("s3")

def list_csv_files(bucket_name, key_path):
    # List objects within the specified bucket and prefix
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=key_path)
    # Filter out the CSV files
    csv_files = [content['Key'] for content in response.get('Contents', []) if content['Key'].endswith('.csv')]
    return csv_files

def read_csv_files_to_dataframes(bucket_name, csv_files):
    dataframes = []
    for key in csv_files:
        # Get the object from S3
        obj = s3.get_object(Bucket=bucket_name, Key=key)
        # Read the CSV file content
        data = obj['Body'].read().decode('utf-8')
        # Convert to DataFrame
        df = pd.read_csv(StringIO(data))
        dataframes.append(df)
    return dataframes

# Example usage
bucket_name = 'niwa-water-demand-modelling'
key_path_rf = 'WWL/bias_adjusted_data/'
key_path_temp = 'WWL/cmip6/'
csv_files_rf = list_csv_files(bucket_name, key_path_rf)
csv_files_temp = list_csv_files(bucket_name, key_path_temp)

In [4]:
csv_files_rf

['WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_hist_adjusted_1960_2014.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp126_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp245_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp370_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp585_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_hist_adjusted_1960_2014.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp126_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp245_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp370_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp585_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/AWI-CM-1-1-MR/Birch Lane_AWI-CM-1-1-MR_hist_adjusted_1960_2014.csv',
 'WWL/bias_adju

In [5]:
len(csv_files_rf)

60

In [6]:
csv_files_temp

['WWL/cmip6/tas_ACCESS-CM2_hist_BirchLane.csv',
 'WWL/cmip6/tas_ACCESS-CM2_hist_PinehavenStream.csv',
 'WWL/cmip6/tas_ACCESS-CM2_ssp_BirchLane.csv',
 'WWL/cmip6/tas_ACCESS-CM2_ssp_PinehavenStream.csv',
 'WWL/cmip6/tas_AWI-CM-1-1-MR_hist_BirchLane.csv',
 'WWL/cmip6/tas_AWI-CM-1-1-MR_hist_PinehavenStream.csv',
 'WWL/cmip6/tas_AWI-CM-1-1-MR_ssp_BirchLane.csv',
 'WWL/cmip6/tas_AWI-CM-1-1-MR_ssp_PinehavenStream.csv',
 'WWL/cmip6/tas_CNRM-CM6-1_hist_BirchLane.csv',
 'WWL/cmip6/tas_CNRM-CM6-1_hist_PinehavenStream.csv',
 'WWL/cmip6/tas_CNRM-CM6-1_ssp_BirchLane.csv',
 'WWL/cmip6/tas_CNRM-CM6-1_ssp_PinehavenStream.csv',
 'WWL/cmip6/tas_EC-Earth3_hist_BirchLane.csv',
 'WWL/cmip6/tas_EC-Earth3_hist_PinehavenStream.csv',
 'WWL/cmip6/tas_EC-Earth3_ssp_BirchLane.csv',
 'WWL/cmip6/tas_EC-Earth3_ssp_PinehavenStream.csv',
 'WWL/cmip6/tas_GFDL-ESM4_hist_BirchLane.csv',
 'WWL/cmip6/tas_GFDL-ESM4_hist_PinehavenStream.csv',
 'WWL/cmip6/tas_GFDL-ESM4_ssp_BirchLane.csv',
 'WWL/cmip6/tas_GFDL-ESM4_ssp_Pinehave

In [7]:
def prep_rainfall(df):
    '''
    preprocess rainfall data
    '''
    df['hour'] = df['Time'].dt.hour
    df['wday'] = df['Time'].dt.dayofweek
    df['month'] = df['Time'].dt.month
    df['mday'] = df['Time'].dt.days_in_month
    df["doy"] = df['Time'].dt.dayofyear
    df['Rainlag1'] = df['Rainfall'].shift(1)
    df['Rainlag2'] = df['Rainfall'].shift(2)
    df['Rainlag1'] = df['Rainlag1'].bfill()
    df['Rainlag2'] = df['Rainlag2'].bfill()
    df['Rain_L3HR'] = df.loc[:,'Rainfall'].rolling(window=3).sum()
    df['Rain_L6HR'] = df.loc[:,'Rainfall'].rolling(window=6).sum()
    df['Rain_L12HR'] = df.loc[:,'Rainfall'].rolling(window=12).sum()
    df['Rain_L24HR'] = df.loc[:,'Rainfall'].rolling(window=24).sum()
    df['Rain_L48HR'] = df.loc[:,'Rainfall'].rolling(window=48).sum()
    df['Rain_L3HR'] = df['Rain_L3HR'].bfill()
    df['Rain_L6HR'] = df['Rain_L6HR'].bfill()
    df['Rain_L12HR'] = df['Rain_L12HR'].bfill()
    df['Rain_L24HR'] = df['Rain_L24HR'].bfill()
    df['Rain_L48HR'] = df['Rain_L48HR'].bfill()
    
    df['sin_hour'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['cos_hour'] = np.cos(2 * np.pi * df['hour'] / 24)
    
    # Typical value between 0.8 and 0.95
    k_api = 0.85
    
    # Initialize API and Soil Moisture as columns
    df['API'] = 0
    df['SoilMoisture'] = 0
    
    # Calculate Antecedent Precipitation Index (API)
    for i in range(1, len(df)):
        df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']
    
    # Soil Moisture Index (very simplified: saturates at some threshold)
    # Arbitrary max bucket size (mm)
    max_storage = 100
    # Moisture decay rate
    soil_decay = 0.95
    # Initial condition
    df.at[0, 'SoilMoisture'] = min(max_storage, df.at[0, 'Rainfall'])
    
    for i in range(1, len(df)):
        df.at[i, 'SoilMoisture'] = min(
            max_storage,
            df.at[i - 1, 'SoilMoisture'] * soil_decay + df.at[i - 1, 'Rainfall']
        )

    return df

In [8]:
# Extract exp_name and rf_name from each string
temp_extracts = []
for path in csv_files_temp:
    x = path.split('.csv')[0]
    exp_name = '_'.join(x.split('/')[-1].split('_')[1:3])  # model + scenario
    rf_name = x.split('/')[-1].split('_')[-1] # location
    rf_name = rf_name.replace('Stream', '')
    print({'exp_name': exp_name, 'rf_name': rf_name})
    temp_extracts.append({'exp_name': exp_name, 'rf_name': rf_name})

{'exp_name': 'ACCESS-CM2_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'ACCESS-CM2_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'ACCESS-CM2_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'ACCESS-CM2_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'AWI-CM-1-1-MR_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'AWI-CM-1-1-MR_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'AWI-CM-1-1-MR_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'AWI-CM-1-1-MR_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'CNRM-CM6-1_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'CNRM-CM6-1_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'CNRM-CM6-1_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'CNRM-CM6-1_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'EC-Earth3_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'EC-Earth3_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'EC-Earth3_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'EC-Earth3_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'GFDL-ESM4_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'GFDL-ESM4_hist', 'rf_name': 'Pineha

In [None]:
import re
import s3fs
import sagemaker
import boto3
import pandas as pd
import numpy as np
import io
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
from sagemaker import get_execution_role
from sagemaker import AutoML
from urllib.parse import urlparse
import threading
import gc
from botocore.config import Config
import logging
import os

# Configure logging to suppress AWS container logs
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress specific AWS/SageMaker loggers
logging.getLogger('sagemaker').setLevel(logging.ERROR)
logging.getLogger('sagemaker-containers').setLevel(logging.ERROR)
logging.getLogger('boto3').setLevel(logging.ERROR)
logging.getLogger('botocore').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.ERROR)
logging.getLogger('s3transfer').setLevel(logging.ERROR)

# Set environment variables to suppress container logs
os.environ['SAGEMAKER_ENABLE_CLOUDWATCH_METRICS'] = 'false'
os.environ['SAGEMAKER_SUBMIT_DIRECTORY'] = '/tmp'

def get_csv_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:].strip("/")
    
    # Use thread-local S3 client with optimized config
    if not hasattr(get_csv_from_s3, '_thread_local'):
        get_csv_from_s3._thread_local = threading.local()
    
    if not hasattr(get_csv_from_s3._thread_local, 's3_client'):
        # Optimized boto3 config for connection pooling
        config = Config(
            max_pool_connections=50,
            retries={'max_attempts': 3},
            region_name=boto3.Session().region_name
        )
        get_csv_from_s3._thread_local.s3_client = boto3.client('s3', config=config)
    
    try:
        response = get_csv_from_s3._thread_local.s3_client.get_object(
            Bucket=bucket_name, 
            Key=f"{prefix}/{file_name}"
        )
        return response['Body'].read().decode('utf-8')
    except Exception as e:
        logger.error(f"Error reading {file_name} from S3: {e}")
        raise

# Global configuration
bucket_name = 'niwa-water-demand-modelling'
region = boto3.Session().region_name
session = sagemaker.Session()
bucket = session.default_bucket()
prefix = "sagemaker/autopilot-water-demand-prediction"
role = get_execution_role()

rf_dict = {
    'Pinehaven': ['PCDUpperHutt'],
    'BirchLane': ['PCDStokesValley', 'PCDLowerHutt']
}

model_lst = ['CUR', 'ABI', 'ABJ', 'ABL', 'ABK']

class OptimizedBatchTransform:
    def __init__(self, max_workers=5, max_concurrent_transforms=3):
        """
        Reduced concurrency to prevent resource exhaustion
        """
        self.max_workers = max_workers
        self.max_concurrent_transforms = max_concurrent_transforms
        self.active_transforms = 0
        self.transform_lock = threading.Lock()
        self.completed_jobs = 0
        self.failed_jobs = 0
        
        # Configure boto3 session with connection pooling
        self.boto_config = Config(
            max_pool_connections=30,
            retries={'max_attempts': 3, 'mode': 'adaptive'},
            region_name=region
        )
        
    def get_thread_local_clients(self):
        """Get thread-local AWS clients to avoid connection issues"""
        if not hasattr(self, '_thread_local'):
            self._thread_local = threading.local()
        
        if not hasattr(self._thread_local, 'sm_client'):
            self._thread_local.sm_client = boto3.client(
                'sagemaker', 
                config=self.boto_config
            )
            self._thread_local.s3_client = boto3.client(
                's3', 
                config=self.boto_config
            )
        
        return self._thread_local.sm_client, self._thread_local.s3_client
        
    def process_single_file_model(self, file_data, job_name, temp_extracts, csv_files_temp):
        """Process a single file with a single model"""
        try:
            path, rf_name, exp_name, scen_name = file_data
            
            logger.info(f"Starting {job_name} for {rf_name}_{exp_name}")
            
            # Find matching temperature file
            temp_key = None
            for i, x in enumerate(temp_extracts):
                exp_name_x = x['exp_name']
                rf_name_x = x['rf_name']
                if rf_name_x.lower() == rf_name.lower() and exp_name_x in exp_name:
                    temp_key = csv_files_temp[i]
                    break
            
            if not temp_key:
                logger.warning(f"No temperature key found for {rf_name}, {exp_name}")
                return None
            
            # Process data with memory management
            try:
                df_result = self.prepare_and_process_data(
                    path, temp_key, rf_name, exp_name, scen_name, job_name
                )
                
                if df_result is not None:
                    self.completed_jobs += 1
                    logger.info(f"✓ Completed {job_name} for {rf_name}_{exp_name} ({self.completed_jobs} total)")
                    return df_result
                else:
                    self.failed_jobs += 1
                    logger.error(f"✗ Failed {job_name} for {rf_name}_{exp_name}")
                    return None
                    
            except Exception as e:
                self.failed_jobs += 1
                logger.error(f"✗ Error in data processing for {job_name}: {str(e)}")
                return None
            finally:
                # Force garbage collection to free memory
                gc.collect()
                
        except Exception as e:
            self.failed_jobs += 1
            logger.error(f"✗ Error processing {job_name} for {rf_name}_{exp_name}: {str(e)}")
            return None
    
    def prepare_and_process_data(self, path, temp_key, rf_name, exp_name, scen_name, job_name):
        """Prepare data and run batch transform with resource management"""
        
        # Data preparation (implement your existing logic here)
        try:
            # You'll need to implement these functions from your original code
            df_rf = read_csv_files_to_dataframes(bucket_name, [path])[0]
            df_rf.rename(columns={'time': 'Time', '0': 'Rainfall'}, inplace=True)
            df_rf = df_rf.set_index(pd.to_datetime(df_rf["Time"], format="%Y-%m-%d %H:%M:%S"))
            df_rf = df_rf.resample('h').sum(['Rainfall'])
            
            df_temp = read_csv_files_to_dataframes(bucket_name, [temp_key])[0]
            df_temp['Dry bulb degC'] = df_temp[scen_name] - 273
            df_temp = df_temp[['date', 'Dry bulb degC']]
            df_temp['date'] = pd.to_datetime(df_temp['date'], format="%Y-%m-%d %H:%M:%S")
            df_temp = df_temp.set_index('date')

            df_rf_1 = prep_rainfall(df_rf[['Rainfall']].reset_index())
            df = df_rf_1.set_index('Time').join(df_temp['Dry bulb degC'])
            df = df.reset_index()

            columns = [e for e in df.columns if e not in ["Time", "sin_hour", "cos_hour"]]
            test_data = df[columns]
            
            # Clean up intermediate dataframes
            del df_rf, df_temp, df_rf_1
            gc.collect()
            
        except Exception as e:
            logger.error(f"Data preparation failed for {job_name}: {e}")
            return None
        
        # Upload test data with unique filename
        test_file = f"{rf_name}_{exp_name}_{job_name}.csv"
        
        try:
            test_data.to_csv(test_file, index=False, header=False)
            test_data_s3_path = session.upload_data(path=test_file, key_prefix=prefix)
            
            # Clean up local file
            if os.path.exists(test_file):
                os.remove(test_file)
                
        except Exception as e:
            logger.error(f"Data upload failed for {job_name}: {e}")
            return None
        
        # Wait for available transform slot with timeout
        max_wait_time = 300  # 5 minutes max wait
        wait_start = time.time()
        
        while True:
            with self.transform_lock:
                if self.active_transforms < self.max_concurrent_transforms:
                    self.active_transforms += 1
                    break
            
            if time.time() - wait_start > max_wait_time:
                logger.error(f"Timeout waiting for transform slot for {job_name}")
                return None
                
            time.sleep(10)  # Wait 10 seconds before checking again
        
        try:
            result = self.run_batch_transform_with_retry(
                job_name, test_data_s3_path, test_file, df, rf_name, exp_name
            )
            return result
        finally:
            with self.transform_lock:
                self.active_transforms -= 1
    
    def run_batch_transform_with_retry(self, job_name, test_data_s3_path, test_file, df, rf_name, exp_name, max_retries=0):
        """Run batch transform with retry logic"""
        
        sm_client, s3_client = self.get_thread_local_clients()
        
        for attempt in range(max_retries + 1):
            try:
                return self._run_single_transform(
                    job_name, test_data_s3_path, test_file, df, rf_name, exp_name, sm_client
                )
            except Exception as e:
                if attempt < max_retries:
                    wait_time = (attempt + 1) * 30  # Exponential backoff
                    logger.warning(f"Transform attempt {attempt + 1} failed for {job_name}, retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                else:
                    logger.error(f"All transform attempts failed for {job_name}: {e}")
                    raise
    
    def _run_single_transform(self, job_name, test_data_s3_path, test_file, df, rf_name, exp_name, sm_client):
        """Run a single batch transform job"""
        
        auto_ml_job_name = f"automl-{job_name}"
        
        try:
            best_candidate = sm_client.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)["BestCandidate"]
            best_candidate_name = best_candidate["CandidateName"]
            
            # Create AutoML instance with reduced logging
            automl = AutoML.attach(auto_ml_job_name=auto_ml_job_name)
            
            s3_transform_output_path = f"s3://{bucket}/{prefix}/cmip6-results/"
            model_name = "{0}-model".format(best_candidate_name)
            
            # Suppress SageMaker model creation logs
            with self.suppress_stdout():
                model = automl.create_model(
                    name=model_name,
                    candidate=best_candidate,
                )
            
            output_path = s3_transform_output_path + best_candidate_name + "/"
            
            # Conservative instance settings with reduced logging
            transformer = model.transformer(
                instance_count=1,
                instance_type="ml.m5.xlarge",
                assemble_with="Line",
                strategy="SingleRecord",
                output_path=output_path,
                env={
                    "SAGEMAKER_MODEL_SERVER_TIMEOUT": "180",
                    "SAGEMAKER_MODEL_SERVER_WORKERS": "1",
                    # Suppress container logs
                    "SAGEMAKER_ENABLE_CLOUDWATCH_METRICS": "false",
                    "PYTHONPATH": "/opt/ml/code",
                    "SAGEMAKER_PROGRAM": "inference.py",
                    "SAGEMAKER_SUBMIT_DIRECTORY": "/opt/ml/code"
                },
            )
        
            # Run transform job with suppressed output
            logger.info(f"Starting batch transform for {job_name}...")
            
            with self.suppress_stdout():
                transformer.transform(
                    data=test_data_s3_path,
                    split_type="Line",
                    content_type="text/csv",
                    wait=True,
                    logs=False,  # Disable CloudWatch logs
                    model_client_config={
                        "InvocationsTimeoutInSeconds": 100,
                        "InvocationsMaxRetries": 2
                    },
                )
            
            logger.info(f"Batch transform completed for {job_name}")
            
            # Get predictions
            pred_csv = get_csv_from_s3(transformer.output_path, f"{test_file}.out")
            predictions = pd.read_csv(io.StringIO(pred_csv), header=None)

            df_result = df.copy()
            df_result[job_name] = predictions
            df_result[job_name] = np.where(df_result[job_name] < 0, 0, df_result[job_name])
            
            # Upload results
            pred_file = f"{rf_name}_{exp_name}_{job_name}_pred_data.csv"
            os.makedirs('cmip6', exist_ok=True)
            df_result[['Time', job_name]].to_csv(f'cmip6/{pred_file}', index=False, header=True)
            pred_data_s3_path = session.upload_data(path=f'cmip6/{pred_file}', key_prefix=prefix + "/cmip6-results")
            
            # Clean up local file
            if os.path.exists(f'cmip6/{pred_file}'):
                os.remove(f'cmip6/{pred_file}')
            
            return {
                'job_name': job_name,
                'rf_name': rf_name,
                'exp_name': exp_name,
                'output_path': pred_data_s3_path,
                'status': 'completed'
            }
            
        except Exception as e:
            logger.error(f"Transform job failed for {job_name}: {str(e)}")
            raise
    
    @staticmethod
    def suppress_stdout():
        """Context manager to suppress stdout temporarily"""
        import sys
        from contextlib import contextmanager
        
        @contextmanager
        def suppress():
            with open(os.devnull, "w") as devnull:
                old_stdout = sys.stdout
                old_stderr = sys.stderr
                try:
                    sys.stdout = devnull
                    sys.stderr = devnull
                    yield
                finally:
                    sys.stdout = old_stdout
                    sys.stderr = old_stderr
        
        return suppress()

def prepare_all_jobs(csv_files_rf, temp_extracts, csv_files_temp):
    """Prepare all job combinations with better organization"""
    all_jobs = []
    
    for path in csv_files_rf:
        match = re.search(r'/([^/_]+(?: [^/_]+)*)_([^/_]+_[^/_]+)_adjusted', path)
        if match:
            rf_name = match.group(1).replace(' ', '')
            exp_name = match.group(2)
            scen_name = exp_name.split('_')[-1]
            if scen_name == 'hist':
                scen_name = 'historical'
            
            # Get job names for this location
            if rf_name in rf_dict:
                job_prefixes = rf_dict[rf_name]
                for job_prefix in job_prefixes:
                    for job_suffix in model_lst:
                        job_name = f'{job_prefix}{job_suffix}'
                        all_jobs.append({
                            'file_data': (path, rf_name, exp_name, scen_name),
                            'job_name': job_name,
                            'priority': len(job_name)
                        })
    
    return all_jobs

def run_conservative_parallel_processing(csv_files_rf, temp_extracts, csv_files_temp):
    """
    Conservative parallel processing to avoid resource exhaustion
    """
    all_jobs = prepare_all_jobs(csv_files_rf, temp_extracts, csv_files_temp)
    
    # Very conservative settings
    processor = OptimizedBatchTransform(
        max_workers=3,
        max_concurrent_transforms=2
    )
    
    logger.info(f"Starting conservative processing of {len(all_jobs)} total jobs...")
    logger.info(f"Using {processor.max_workers} workers with max {processor.max_concurrent_transforms} concurrent transforms")
    
    results = []
    start_time = time.time()
    
    # Process in smaller batches to manage memory
    batch_size = 20
    
    for i in range(0, len(all_jobs), batch_size):
        batch_jobs = all_jobs[i:i+batch_size]
        logger.info(f"Processing batch {i//batch_size + 1}/{(len(all_jobs)-1)//batch_size + 1} ({len(batch_jobs)} jobs)")
        
        with ThreadPoolExecutor(max_workers=processor.max_workers) as executor:
            # Submit batch jobs
            future_to_job = {
                executor.submit(
                    processor.process_single_file_model, 
                    job['file_data'], 
                    job['job_name'], 
                    temp_extracts, 
                    csv_files_temp
                ): job for job in batch_jobs
            }
            
            # Process completed jobs in this batch
            for future in as_completed(future_to_job):
                job = future_to_job[future]
                try:
                    result = future.result(timeout=1800)  # 30 minute timeout per job
                    if result:
                        results.append(result)
                except concurrent.futures.TimeoutError:
                    logger.error(f"Job {job['job_name']} timed out")
                    results.append({
                        'job_name': job['job_name'],
                        'status': 'timeout',
                        'error': 'Job timed out after 30 minutes'
                    })
                except Exception as e:
                    logger.error(f"Job {job['job_name']} generated an exception: {e}")
                    results.append({
                        'job_name': job['job_name'],
                        'status': 'failed',
                        'error': str(e)
                    })
        
        # Progress update and memory cleanup between batches
        completed = len([r for r in results if r.get('status') == 'completed'])
        failed = len([r for r in results if r.get('status') in ['failed', 'timeout']])
        
        elapsed_time = time.time() - start_time
        logger.info(f"Batch complete. Overall progress: {len(results)}/{len(all_jobs)} jobs processed")
        logger.info(f"Completed: {completed}, Failed: {failed}, Time elapsed: {elapsed_time:.1f}s")
        
        # Force garbage collection between batches
        gc.collect()
        
        # Brief pause between batches to let resources settle
        if i + batch_size < len(all_jobs):
            time.sleep(10)
    
    # Final summary
    completed = len([r for r in results if r.get('status') == 'completed'])
    failed = len([r for r in results if r.get('status') in ['failed', 'timeout']])
    total_time = time.time() - start_time
    
    logger.info(f"\n" + "="*50)
    logger.info(f"PROCESSING COMPLETE!")
    logger.info(f"Total time: {total_time/3600:.2f} hours")
    logger.info(f"Completed: {completed}")
    logger.info(f"Failed: {failed}")
    logger.info(f"Total: {len(results)}")
    logger.info(f"Success rate: {completed/len(results)*100:.1f}%")
    logger.info(f"Average time per job: {total_time/len(results):.1f} seconds")
    logger.info("="*50)
    
    return results

# Usage example with conservative settings
if __name__ == "__main__":
    # Conservative approach - uncomment to use
    results = run_conservative_parallel_processing(csv_files_rf[2:], temp_extracts, csv_files_temp)
    
    # If you want to process just a few jobs first as a test:
    # test_jobs = csv_files_rf[:5]  # First 5 files only
    # results = run_conservative_parallel_processing(test_jobs, temp_extracts, csv_files_temp)

  df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']
  df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']
  df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']
  df.at[i, 'SoilMoisture'] = min(
  df.at[i, 'SoilMoisture'] = min(
  df.at[i, 'SoilMoisture'] = min(


model name: automl-PCDStokesValleyABIWcTRrT5-001-806c58a4-model, model name length: 51


model name: automl-PCDStokesValleyABJclOr83M-001-b5ec385a-model, model name length: 51


..........................................................

.

  df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']


...[34m2025-06-27 01:38:59,152 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2025-06-27 01:38:59,155 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[34m2025-06-27 01:38:59,155 INFO - sagemaker-containers - nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[35m2025-06-27 01:38:59,152 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[35m2025-06-27 01:38:59,155 INFO - sagemaker-containers - No GPUs detected (normal if no gpus installed)[0m
[35m2025-06-27 01:38:59,155 INFO - sagemaker-containers - nginx config: [0m
[35mworker_processes auto;[0m
[35mdaemon off;[0m
[35mpid /tmp/nginx.pid;[0m
[35merror_log  /dev/stderr;[0m
[35mworker_rlimit_nofile 4096;[0m
[35mevents {
  worker_connections 2048;[0m
[34m}[0

  df.at[i, 'SoilMoisture'] = min(


[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 24 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 22 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +0000] "POST /invocations HTTP/1.1" 200 22 "-" "Go-http-client/1.1"[0m
[32m169.254.255.130 - - [27/Jun/2025:01:39:15 +

In [10]:
csv_files_rf[2:]

['WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp245_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp370_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Birch Lane_ACCESS-CM2_ssp585_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_hist_adjusted_1960_2014.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp126_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp245_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp370_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/ACCESS-CM2/Pinehaven_ACCESS-CM2_ssp585_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/AWI-CM-1-1-MR/Birch Lane_AWI-CM-1-1-MR_hist_adjusted_1960_2014.csv',
 'WWL/bias_adjusted_data/AWI-CM-1-1-MR/Birch Lane_AWI-CM-1-1-MR_ssp126_adjusted_2015_2100.csv',
 'WWL/bias_adjusted_data/AWI-CM-1-1-MR/Birch Lane_AWI-CM-1-1-MR_ssp245_adjusted_2015_2100.csv',
 

In [None]:
import re
import s3fs
import sagemaker
import boto3
from sagemaker import get_execution_role
from sagemaker import AutoML

def get_csv_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:].strip("/")
    s3 = boto3.resource("s3")
    obj = s3.Object(bucket_name, "{}/{}".format(prefix, file_name))
    return obj.get()["Body"].read().decode("utf-8")

bucket_name = 'niwa-water-demand-modelling'
fs = s3fs.S3FileSystem(anon=False)
region = boto3.Session().region_name
session = sagemaker.Session()
bucket = session.default_bucket()
prefix = "sagemaker/autopilot-water-demand-prediction"
role = get_execution_role()

rf_dict = {
    'Pinehaven': ['PCDUpperHutt'],
    'BirchLane': ['PCDStokesValley', 'PCDLowerHutt']
}

model_lst = ['CUR', 'ABI', 'ABJ', 'ABL']

# Extract exp_name and rf_name from each string
temp_extracts = []
for path in csv_files_temp:
    x = path.split('.csv')[0]
    exp_name = '_'.join(x.split('/')[-1].split('_')[1:3])  # model + scenario
    rf_name = x.split('/')[-1].split('_')[-1] # location
    rf_name = rf_name.replace('Stream', '')
    print({'exp_name': exp_name, 'rf_name': rf_name})
    temp_extracts.append({'exp_name': exp_name, 'rf_name': rf_name})

# Extract exp_name and rf_name from each string
for path in csv_files_rf:
    match = re.search(r'/([^/_]+(?: [^/_]+)*)_([^/_]+_[^/_]+)_adjusted', path)
    if match:
        rf_name = match.group(1)
        rf_name = rf_name.replace(' ', '')
        exp_name = match.group(2)
        scen_name = exp_name.split('_')[-1]
        if scen_name == 'hist':
            scen_name = 'historical'
        
        # look for temp extracts info
        for i, x in enumerate(temp_extracts):
            exp_name_x = x['exp_name']
            rf_name_x = x['rf_name']
            if rf_name_x.lower() == rf_name.lower() and exp_name_x in exp_name:
                temp_key = csv_files_temp[i]
                print({'rf_name': rf_name, 'exp_name': exp_name, 'scen_name': scen_name})
                print(f'temp key_file: {temp_key}')
                break
        df_rf = read_csv_files_to_dataframes(bucket_name, [path])[0]
        df_rf.rename(columns={'time': 'Time', '0': 'Rainfall'}, inplace=True)
        df_rf = df_rf.set_index(pd.to_datetime(df_rf["Time"], format="%Y-%m-%d %H:%M:%S"))
        df_rf = df_rf.resample('h').sum(['Rainfall'])
        df_temp = read_csv_files_to_dataframes(bucket_name, [temp_key])[0]
        df_temp['Dry bulb degC'] = df_temp[scen_name] - 273
        df_temp = df_temp[['date', 'Dry bulb degC']]
        df_temp['date'] = pd.to_datetime(df_temp['date'], format="%Y-%m-%d %H:%M:%S")
        df_temp = df_temp.set_index('date')

        df_rf_1 = prep_rainfall(df_rf[['Rainfall']].reset_index())
        df = df_rf_1.set_index('Time').join(df_temp['Dry bulb degC'])
        df = df.reset_index()

        columns = [e for e in df.columns if e not in ["Time", "sin_hour", "cos_hour"]]
        test_data = df[columns] # Features
        test_file = f"{rf_name}_{exp_name}.csv"
        test_data.to_csv(test_file, index=False, header=False)
        test_data_s3_path = session.upload_data(path=test_file, key_prefix=prefix)
        print("Test data uploaded to: " + test_data_s3_path)

        # find suitable job names
        job_names = []
        job_prefixes = rf_dict[rf_name]
        for job_prefix in job_prefixes:
            for job_suffix in model_lst:
                job_names.append(f'{job_prefix}{job_suffix}')
        print("all job names: ", job_names)

        for job_name in job_names:
            print("running job: ", job_name)
            # This is the client we will use to interact with SageMaker Autopilot
            sm = boto3.Session().client(service_name="sagemaker", region_name=region)
            auto_ml_job_name = f"automl-{job_name}"
            print("AutoMLJobName: " + auto_ml_job_name)
            best_candidate = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)["BestCandidate"]
            best_candidate_name = best_candidate["CandidateName"]
            
            print("\n")
            print("CandidateName: " + best_candidate_name)
            print(
                "FinalAutoMLJobObjectiveMetricName: "
                + best_candidate["FinalAutoMLJobObjectiveMetric"]["MetricName"]
            )
            print(
                "FinalAutoMLJobObjectiveMetricValue: "
                + str(best_candidate["FinalAutoMLJobObjectiveMetric"]["Value"])
            )
        
            automl = AutoML.attach(auto_ml_job_name=auto_ml_job_name)
            
            s3_transform_output_path = "s3://{}/{}/cmip6-results/".format(bucket, prefix)
            
            model_name = "{0}-model".format(best_candidate_name)
            
            model = automl.create_model(
                name=model_name,
                candidate=best_candidate,
            )
            
            output_path = s3_transform_output_path + best_candidate_name + "/"
            
            transformer = model.transformer(
                instance_count=1,
                instance_type="ml.m5.xlarge",
                assemble_with="Line",
                strategy="SingleRecord",
                output_path=output_path,
                env={"SAGEMAKER_MODEL_SERVER_TIMEOUT": "100", "SAGEMAKER_MODEL_SERVER_WORKERS": "1"},
            )
        
            transformer.transform(
                data=test_data_s3_path,
                split_type="Line",
                content_type="text/csv",
                wait=False,
                model_client_config={"InvocationsTimeoutInSeconds": 80, "InvocationsMaxRetries": 1},
            )
            
            print("Starting transform job {}".format(transformer._current_job_name))
        
            ## Wait for jobs to finish
            pending_complete = True
            batch_job_name = transformer._current_job_name
            
            while pending_complete:
                pending_complete = False
            
                description = sm.describe_transform_job(TransformJobName=batch_job_name)
                if description["TransformJobStatus"] not in ["Failed", "Completed"]:
                    pending_complete = True
            
                print("{} transform job is running.".format(batch_job_name))
                time.sleep(60)
            
            print("\nCompleted.")
            
            job_status = sm.describe_transform_job(TransformJobName=batch_job_name)["TransformJobStatus"]
            
            if job_status == "Completed":
                pred_csv = get_csv_from_s3(transformer.output_path, "{}.out".format(test_file))
                predictions = pd.read_csv(io.StringIO(pred_csv), header=None)
    
                df[job_name] = predictions
                df[job_name] = np.where(df[job_name] < 0, 0, df[job_name])
                pred_to_move = df.pop(job_name)
                df.insert(1, pred_to_move.name, pred_to_move)
        
            ### Upload the dataset to S3
            pred_file = f"{rf_name}_{exp_name}_{job_name}_pred_data.csv"
            df[['Time', job_name]].to_csv(f'cmip6/{pred_file}', index=False, header=True)
            pred_data_s3_path = session.upload_data(path=f'cmip6/{pred_file}', key_prefix=prefix + f"/cmip6-results")
            print("Full pred results uploaded to: " + pred_data_s3_path)


{'exp_name': 'ACCESS-CM2_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'ACCESS-CM2_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'ACCESS-CM2_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'ACCESS-CM2_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'AWI-CM-1-1-MR_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'AWI-CM-1-1-MR_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'AWI-CM-1-1-MR_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'AWI-CM-1-1-MR_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'CNRM-CM6-1_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'CNRM-CM6-1_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'CNRM-CM6-1_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'CNRM-CM6-1_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'EC-Earth3_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'EC-Earth3_hist', 'rf_name': 'Pinehaven'}
{'exp_name': 'EC-Earth3_ssp', 'rf_name': 'BirchLane'}
{'exp_name': 'EC-Earth3_ssp', 'rf_name': 'Pinehaven'}
{'exp_name': 'GFDL-ESM4_hist', 'rf_name': 'BirchLane'}
{'exp_name': 'GFDL-ESM4_hist', 'rf_name': 'Pineha

  df.at[i, 'API'] = k_api * df.at[i - 1, 'API'] + df.at[i - 1, 'Rainfall']
  df.at[i, 'SoilMoisture'] = min(


Test data uploaded to: s3://sagemaker-ap-southeast-2-851725470721/sagemaker/autopilot-water-demand-prediction/BirchLane_ACCESS-CM2_hist.csv
all job names:  ['PCDStokesValleyCUR', 'PCDStokesValleyABI', 'PCDStokesValleyABJ', 'PCDStokesValleyABL', 'PCDLowerHuttCUR', 'PCDLowerHuttABI', 'PCDLowerHuttABJ', 'PCDLowerHuttABL']
running job:  PCDStokesValleyCUR
AutoMLJobName: automl-PCDStokesValleyCUR


CandidateName: automl-PCDStokesValleyCURzNNb5wX-001-0bfb2f08
FinalAutoMLJobObjectiveMetricName: validation:mse
FinalAutoMLJobObjectiveMetricValue: 0.0


Starting transform job automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDStokesValleyCURzNNb5wX-001-0b-2025-06-26-09-50-07-856 transform job is running.
automl-PCDSto

Starting transform job automl-PCDStokesValleyABLB9dEzBy-001-37-2025-06-26-12-56-32-439
automl-PCDStokesValleyABLB9dEzBy-001-37-2025-06-26-12-56-32-439 transform job is running.
automl-PCDStokesValleyABLB9dEzBy-001-37-2025-06-26-12-56-32-439 transform job is running.
automl-PCDStokesValleyABLB9dEzBy-001-37-2025-06-26-12-56-32-439 transform job is running.


In [58]:
df.dropna()

Unnamed: 0,Time,Rainfall,hour,wday,month,mday,doy,Rainlag1,Rainlag2,Rain_L3HR,Rain_L6HR,Rain_L12HR,Rain_L24HR,Rain_L48HR,sin_hour,cos_hour,API,SoilMoisture,Dry bulb degC
1,2015-01-01 01:00:00,0.0,1,3,1,31,1,0.0,0.0,0.0,0.0,0.0,0.000000,17.503797,0.258819,0.965926,0.000000,0.000000,18.23334
2,2015-01-01 02:00:00,0.0,2,3,1,31,1,0.0,0.0,0.0,0.0,0.0,0.000000,17.503797,0.500000,0.866025,0.000000,0.000000,18.83430
3,2015-01-01 03:00:00,0.0,3,3,1,31,1,0.0,0.0,0.0,0.0,0.0,0.000000,17.503797,0.707107,0.707107,0.000000,0.000000,18.86465
4,2015-01-01 04:00:00,0.0,4,3,1,31,1,0.0,0.0,0.0,0.0,0.0,0.000000,17.503797,0.866025,0.500000,0.000000,0.000000,18.64935
5,2015-01-01 05:00:00,0.0,5,3,1,31,1,0.0,0.0,0.0,0.0,0.0,0.000000,17.503797,0.965926,0.258819,0.000000,0.000000,17.66574
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
745100,2099-12-31 20:00:00,0.0,20,3,12,31,365,0.0,0.0,0.0,0.0,0.0,46.231087,59.292310,-0.866025,0.500000,2.969079,22.200667,17.22397
745101,2099-12-31 21:00:00,0.0,21,3,12,31,365,0.0,0.0,0.0,0.0,0.0,44.157857,59.292310,-0.707107,0.707107,2.523717,21.090633,18.37833
745102,2099-12-31 22:00:00,0.0,22,3,12,31,365,0.0,0.0,0.0,0.0,0.0,41.740886,59.292310,-0.500000,0.866025,2.145159,20.036102,19.26114
745103,2099-12-31 23:00:00,0.0,23,3,12,31,365,0.0,0.0,0.0,0.0,0.0,39.268081,59.292310,-0.258819,0.965926,1.823385,19.034297,20.01312


In [48]:
df_rf

Unnamed: 0,time,Rainfall
0,2015-01-01 00:30:00,0.000000
1,2015-01-01 01:30:00,0.000000
2,2015-01-01 02:30:00,0.000000
3,2015-01-01 03:30:00,0.000000
4,2015-01-01 04:30:00,0.000000
...,...,...
753355,2100-12-31 19:30:00,0.000000
753356,2100-12-31 20:30:00,0.000000
753357,2100-12-31 21:30:00,0.000000
753358,2100-12-31 22:30:00,1.029932


In [52]:
df_rf

Unnamed: 0_level_0,Rainfall
time,Unnamed: 1_level_1
2015-01-01 00:00:00,0.000000
2015-01-01 01:00:00,0.000000
2015-01-01 02:00:00,0.000000
2015-01-01 03:00:00,0.000000
2015-01-01 04:00:00,0.000000
...,...
2100-12-31 19:00:00,0.000000
2100-12-31 20:00:00,0.000000
2100-12-31 21:00:00,0.000000
2100-12-31 22:00:00,1.029932


In [49]:
df_temp

Unnamed: 0,date,Dry bulb degC
0,2015-01-01 01:00:00,18.23334
1,2015-01-01 02:00:00,18.83430
2,2015-01-01 03:00:00,18.86465
3,2015-01-01 04:00:00,18.64935
4,2015-01-01 05:00:00,17.66574
...,...,...
745099,2099-12-31 20:00:00,17.22397
745100,2099-12-31 21:00:00,18.37833
745101,2099-12-31 22:00:00,19.26114
745102,2099-12-31 23:00:00,20.01312


In [53]:
df_temp

Unnamed: 0_level_0,Dry bulb degC
date,Unnamed: 1_level_1
2015-01-01 01:00:00,18.23334
2015-01-01 02:00:00,18.83430
2015-01-01 03:00:00,18.86465
2015-01-01 04:00:00,18.64935
2015-01-01 05:00:00,17.66574
...,...
2099-12-31 20:00:00,17.22397
2099-12-31 21:00:00,18.37833
2099-12-31 22:00:00,19.26114
2099-12-31 23:00:00,20.01312
