# DS210 - Optimizing Portfolio to Adapt to Regime Change
## Deep Learning Model - SageMaker Training Framework
### Elaine edits marked as "## ELAINE" in code

Only includes the SageMaker Framework code, plus the local multi-GPU (`mp.spawn` / DistributedDataParallel) training cell it builds on.

In [69]:
%%time
def _batch_to_device(batch_item, device):
    """Return ``(features, target)`` from one batch dict, moved onto ``device``."""
    return batch_item['features'].to(device), batch_item['target'].to(device)


def _mean_loss(loss_sum, n_batches, is_distributed, device):
    """Average a summed per-batch loss; in distributed mode, across all ranks.

    Reducing across ranks gives every process the same number, so the
    early-stopping decision in ``train_model`` is identical on all ranks.  If
    ranks decided independently, one could ``break`` while the others kept
    calling ``backward()`` and the DDP gradient all-reduce would hang.

    Raises:
        ValueError: if there were no batches to average over.
    """
    if is_distributed:
        stats = torch.tensor([loss_sum, float(n_batches)], dtype=torch.float64, device=device)
        dist.all_reduce(stats, op=dist.ReduceOp.SUM)
        loss_sum, n_batches = stats[0].item(), stats[1].item()
    if n_batches == 0:
        raise ValueError('cannot average a loss over an empty DataLoader')
    return loss_sum / n_batches


def train_model(rank, model_class, model_args, learning_rate, batch_size, train_dataset, val_dataset,
                criterion, num_epochs, patience,
                device, is_distributed, world_size, train_loss_ll, val_loss_ll):
    """Train a model with Adam and early stopping, recording per-epoch losses.

    Works both as the ``mp.spawn`` target (one process per GPU, DDP) and as a
    plain single-process call.

    Args:
        rank: Index of this process (``mp.spawn`` passes it as the first
            argument).  In distributed mode it is the process-group rank and
            CUDA device id; in both modes it is the slot written in
            ``train_loss_ll`` / ``val_loss_ll``.
        model_class: Model constructor.
        model_args: Constructor arguments: a dict is passed as keywords
            (``model_class(**model_args)``), anything else positionally
            (``model_class(*model_args)``).
        learning_rate: Adam learning rate.
        batch_size: Per-process batch size.
        train_dataset, val_dataset: Datasets whose items are dicts with
            ``'features'`` and ``'target'`` tensors.
        criterion: Loss function called as ``criterion(pred, target)``.
        num_epochs: Maximum number of epochs.
        patience: Stop after this many consecutive epochs without a
            validation-loss improvement.
        device: Device for single-process training; ``None`` picks CUDA when
            available, else CPU.  Ignored in distributed mode (``rank`` selects
            the GPU).
        is_distributed: If True, join an NCCL process group and wrap the
            model in ``DistributedDataParallel``.
        world_size: Total number of processes in the group (distributed only).
        train_loss_ll, val_loss_ll: Indexable containers (e.g. ``Manager().list``)
            with at least ``rank + 1`` slots; slot ``rank`` receives this
            process's list of per-epoch average losses.

    Returns:
        The trained, unwrapped model with the best-validation weights loaded.
        (``mp.spawn`` discards it; single-process callers can use it directly.)
    """
    if is_distributed:
        # rank is the process index handed over by mp.spawn.
        # NOTE(review): the default env:// init needs MASTER_ADDR / MASTER_PORT set.
        dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
        torch.cuda.set_device(rank)
        device = torch.device('cuda', rank)
    elif device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    try:
        if isinstance(model_args, dict):
            model = model_class(**model_args)
        else:
            model = model_class(*model_args)
        model = model.to(device)

        if is_distributed:
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
            # DistributedSampler shuffles by default; time-series order must be kept.
            train_sampler = torch.utils.data.DistributedSampler(train_dataset, shuffle=False)
            val_sampler = torch.utils.data.DistributedSampler(val_dataset, shuffle=False)
        else:
            train_sampler = val_sampler = None
        # Never shuffle: the data is a time series.
        train_dataloader = DataLoader(train_dataset, batch_size, shuffle=False, sampler=train_sampler)
        val_dataloader = DataLoader(val_dataset, batch_size, shuffle=False, sampler=val_sampler)

        # The bare module, so saved/loaded state_dict keys have no 'module.' prefix.
        base_model = model.module if is_distributed else model
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        is_main = not is_distributed or rank == 0  # print from one process only

        train_loss_l_process = []
        val_loss_l_process = []
        best_loss = np.inf
        patience_counter = 0
        best_model_state = None

        for epoch in range(num_epochs):
            # Must be set every epoch: the validation pass below switches to eval().
            model.train()
            train_loss_sum = 0.0
            for batch_item in train_dataloader:
                x_batch, y_batch = _batch_to_device(batch_item, device)
                optimizer.zero_grad()
                loss = criterion(model(x_batch), y_batch)
                loss.backward()
                optimizer.step()
                train_loss_sum += loss.item()
            avg_train_loss_ep = _mean_loss(train_loss_sum, len(train_dataloader), is_distributed, device)
            train_loss_l_process.append(avg_train_loss_ep)
            if is_main:
                print(f'Epoch {epoch+1}/{num_epochs}, Train loss: {avg_train_loss_ep:.3f}')

            model.eval()
            val_loss_sum = 0.0
            with torch.no_grad():
                for batch_item in val_dataloader:
                    x_batch, y_batch = _batch_to_device(batch_item, device)
                    val_loss_sum += criterion(model(x_batch), y_batch).item()
            avg_val_loss_ep = _mean_loss(val_loss_sum, len(val_dataloader), is_distributed, device)
            val_loss_l_process.append(avg_val_loss_ep)
            if is_main:
                print(f'Epoch {epoch+1}/{num_epochs}, Val loss: {avg_val_loss_ep:.3f}')

            if avg_val_loss_ep < best_loss:
                best_loss = avg_val_loss_ep
                patience_counter = 0
                # Clone: state_dict() returns live tensors that the optimizer keeps
                # updating in place, so an uncopied "best" state would end up
                # identical to the final state.
                best_model_state = {k: v.detach().clone() for k, v in base_model.state_dict().items()}
            else:
                patience_counter += 1

            if patience_counter >= patience:
                if is_main:
                    print('Early stopping triggered')
                break  # validation loss has not improved for `patience` epochs

        if best_model_state is not None:
            base_model.load_state_dict(best_model_state)

        # Record this process's loss curves in its own slot.
        train_loss_ll[rank] = train_loss_l_process
        val_loss_ll[rank] = val_loss_l_process
    finally:
        if is_distributed:
            dist.destroy_process_group()

    # Saving is left to the caller, e.g. torch.save(model.state_dict(), path).
    return base_model

# NOTE: mp.spawn starts children with the "spawn" method, so each child re-imports
# __main__ and looks up `train_model` by name. A function defined in a notebook
# cell cannot be found that way (hence the AttributeError traceback below), so
# `train_model` and its helpers must live in an importable .py module, with this
# launch code under `if __name__ == "__main__":`.

# Train model with logging - parallelism: one process per visible GPU.
world_size = torch.cuda.device_count()
manager = mp.Manager()
# Shared per-process loss curves; process `rank` writes slot `rank`.
train_loss_ll = manager.list([[] for _ in range(world_size)])
val_loss_ll = manager.list([[] for _ in range(world_size)])

# launch processes
model_dl_class = ReturnPredictionModel_Transformer
model_args = {  # passed as model_class(**model_args)
    'input_dim': len(feature_cols),
    'lstm_dim': 64,
    'lstm_layers': 10,
    'transformer_layers': 3,
    'num_heads': 8,
    'attn_dropout': 0.1,
    'forward_dim': 50,
    'hidden_dims_l': [32, 16],
    'dropout': 0.3,
    'output_dim': len(tgt_cols)
}

learning_rate = 0.0002
batch_size = 48

mp.spawn(train_model, args=(model_dl_class, model_args, learning_rate, batch_size, train_dataset_dl, val_dataset_dl,
                criterion, num_epochs,  patience,
                device, is_distributed, world_size, train_loss_ll, val_loss_ll),
                nprocs=world_size)

# Process the results from all processes: average each epoch's loss across ranks.
# Manager ListProxy objects do not expose __iter__, so pandas does not treat them
# as list-like; copy them to plain lists first.
train_loss_l = pd.DataFrame(list(train_loss_ll)).mean().tolist()
val_loss_l = pd.DataFrame(list(val_loss_ll)).mean().tolist()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/conda/lib/python3.11/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'train_model' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [None]:
# Display the number of CUDA devices found (the process count passed to mp.spawn).
world_size

In [None]:
# Trial and error: command-line argument specs for the training entry point.
# Each entry is (flag, kwargs), shaped for argparse's add_argument(flag, **kwargs).
# NOTE(review): several defaults differ from the estimator hyperparameters further
# down (e.g. batch_size 48 vs 12, forward_dim 64 vs 5); presumably the estimator
# values override these when SageMaker passes them as CLI flags — confirm.
args_list = [
        ('--model_dir', {'type': str, 'default': 's3://capstone-general/NN-related/dl_model/', 
            'help':'put in model directory before model checkpoint name and rebal freq'}), 
        ('--model_filename',{'type':str, 'default':'model_dl_gen_sm_0', 'help':'put in wanted model file name'}),
        ('--batch_size', {'type':int, 'default': 48, 'help':'put in batch size for loader'}),
        ('--learning_rate', {'type':float, 'default':0.0002, 'help':'put in learning rate'}),
        ('--input_dim', {'type':int, 'default':9, 'help':'input_dim'}),
        ('--lstm_dim', {'type':int, 'default':64, 'help':'lstm_dim'}),
        ('--lstm_layers', {'type':int, 'default':5, 'help':'lstm_layers'}),
        ('--transformer_layers', {'type':int, 'default':3, 'help':'transformer_layers'}),
        ('--num_heads', {'type':int, 'default':4, 'help':'num_heads'}),
        ('--attn_dropout', {'type':float, 'default':0.1, 'help':'attn_dropout'}),
        ('--forward_dim', {'type':int, 'default':64, 'help':'forward_dim'}),
        # String-encoded list; the consumer must parse it (e.g. with json.loads).
        ('--hidden_dims_l', {'type':str, 'default':'[128,32]', 'help':'hidden_dims_l'}),
        ('--dropout', {'type':float, 'default':0.3, 'help':'dropout'}),
        ('--output_dim', {'type':int, 'default':1, 'help':'output_dim'}),
        ('--num_epochs', {'type':int, 'default':30, 'help':'num_epochs'}),
        ('--patience', {'type':int, 'default':5, 'help':'patience'}),
        ('--seed', {'type':int, 'default':42, 'help':'seed'}),
]

### Use SageMaker Framework (in addition to what's already in this notebook)

In [6]:
import sagemaker
from sagemaker.pytorch import PyTorch
import s3fs

# SageMaker session: the client for SageMaker API calls and the default S3 bucket.
# NOTE(review): it is not passed to the estimator, which then creates its own
# session — confirm whether this object is still needed.
sagemaker_session = sagemaker.Session()
# IAM role that training jobs launched from this notebook run under.
role = sagemaker.get_execution_role()
# File-system style access (ls / open) to s3:// paths.
fs = s3fs.S3FileSystem()

In [15]:
%%time
# Configure and launch the SageMaker training job.
s3_folder = 's3://capstone-general/NN-related/dl_model/'
model_checkpoint = 'dl_model_checkpoint0'
rebal_freq = 'D'
# <s3_folder><checkpoint>/<rebal_freq>/ — forwarded to the script as model_dir.
model_content_full_path = f'{s3_folder}{model_checkpoint}/{rebal_freq}/'

# Forwarded to dl_train_trf.py as command-line hyperparameters.
train_hyperparameters = dict(
    model_dir=model_content_full_path,
    model_filename='model_dl_trf_sm_0',
    batch_size=12,  ## ELAINE - SET AS 1/4 DESIRED BATCH SIZE FOR GRAD ACCUMULATION
    learning_rate=0.0002,
    input_dim=9,
    lstm_dim=64,
    lstm_layers=5,
    transformer_layers=3,
    num_heads=4,
    attn_dropout=0.1,
    forward_dim=5,
    hidden_dims_l='[128,32]',
    dropout=0.3,
    output_dim=1,
    num_epochs=30,
    patience=5,
    seed=42,
)

estimator = PyTorch(
    entry_point="dl_train_trf.py",
    role=role,
    dependencies=["requirements.txt"],  # !CHANGE! (separately) create req.txt from dependencies
    py_version="py39",  # Python version of the training container ## ELAINE
    framework_version="1.13.1",  # PyTorch version ## ELAINE
    instance_count=1,
    instance_type="ml.g5.12xlarge",  # also tried: ml.c5.xlarge, ml.p4d.24xlarge
    hyperparameters=train_hyperparameters,
    # NOTE(review): SageMaker documents the smdistributed data-parallel library for
    # p3/p4/p5 instance types; confirm ml.g5.12xlarge is supported, or use the
    # "pytorchddp" / "torch_distributed" launcher instead.
    distribution={"smdistributed": {"dataparallel": {"enabled": True}}},  ## ELAINE
    # output_path="s3://capstone-general/text-models/output",  # !CHANGE! where to put model.tar.gz
)

# Start training
estimator.fit()

INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker:Creating training-job with name: pytorch-training-2025-03-03-04-08-09-029


2025-03-03 04:08:09 Starting - Starting the training job
2025-03-03 04:08:09 Pending - Training job waiting for capacity......
2025-03-03 04:09:06 Pending - Preparing the instances for training...
2025-03-03 04:09:40 Downloading - Downloading input data...
2025-03-03 04:09:50 Downloading - Downloading the training image...............
2025-03-03 04:12:37 Training - Training image download completed. Training in progress...[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
  "cipher": algorithms.TripleDES,[0m
  "class": algorithms.TripleDES,[0m
[34m2025-03-03 04:12:57,916 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2025-03-03 04:12:57,957 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2025-03-03 04:12:57,971 sagemaker_pytorch_container.training INFO     Block until all host DNS lookups succeed.[0m


In [17]:
# Get the trained weights from S3 (the models/ folder under the model_dir used for
# training; presumably written there by dl_train_trf.py) into the working directory.
!aws s3 cp s3://capstone-general/NN-related/dl_model/dl_model_checkpoint0/D/models/model.pth ./model.pth

download: s3://capstone-general/NN-related/dl_model/dl_model_checkpoint0/D/models/model.pth to ./model.pth


In [18]:
import tarfile

# Package the model for SageMaker PyTorch inference. For PyTorch >= 1.2 serving
# containers, the weights sit at the archive root and the inference script plus
# its requirements go under code/:
#   model.tar.gz
#   |-- model.pth
#   `-- code/
#       |-- inference.py
#       `-- requirements.txt
with tarfile.open('model.tar.gz', "w:gz") as tar:
    tar.add('model.pth')
    tar.add('inference.py', arcname='code/inference.py')
    tar.add('requirements.txt', arcname='code/requirements.txt')

In [None]:
# place model.tar.gz in bucket
!aws s3 cp /model.tar.gz s3://capstone-general/NN-related/dl_model/dl_model_checkpoint0/D/models/model.tar.gz .

In [None]:
# Get the results from the estimator for later processing.

### Hyperparameter optimization (HPO) using SageMaker — not yet attempted as of 2025-03-02

In [None]:
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter

# Search space. Keys must match the training script's argument names (see
# args_list / the estimator hyperparameters), otherwise the script receives
# unknown flags.
hyperparameter_ranges = {
    'batch_size': IntegerParameter(24, 128),
    'learning_rate': ContinuousParameter(0.00005, 0.0005),
    'lstm_dim': IntegerParameter(48, 128),
    'lstm_layers': IntegerParameter(2, 12),
    'transformer_layers': IntegerParameter(1, 10),
    # NOTE(review): if attention runs over lstm_dim features, lstm_dim must be
    # divisible by num_heads; independent integer ranges will often violate that.
    'num_heads': IntegerParameter(2, 10),
    'attn_dropout': ContinuousParameter(0.1, 0.6),  # was 'att_dropout', not a script argument
    'forward_dim': IntegerParameter(2, 10),
    # hidden_dims_l is a string-encoded list (e.g. '[128,32]'); left at its default.
    # The former 'hidden_dim' range was dropped: it is not a script argument.
    # Only one 'dropout' entry: the duplicate key previously overrode this
    # range with (0.4, 0.6) being silently discarded; (0.1, 0.5) was the effective one.
    'dropout': ContinuousParameter(0.1, 0.5),
}

# A custom training script has no built-in metrics, so the tuner must be told how
# to scrape the objective from the job logs.
# NOTE(review): assumes dl_train_trf.py prints 'Val loss: <x>' like train_model in
# this notebook — confirm against the script.
metric_definitions = [{'Name': 'validation:L1Loss', 'Regex': r'Val loss: ([0-9.]+)'}]

# Create the hyperparameter tuner
tuner = HyperparameterTuner(
    estimator,
    objective_metric_name='validation:L1Loss',
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=metric_definitions,
    max_jobs=10,
    max_parallel_jobs=1,
    objective_type='Minimize',  # the objective is a loss: lower is better
)

# Launch the hyperparameter tuning job
tuner.fit()
