# AWS Simulation → Reinforcement Learning Pipeline

This notebook launches a transient **Dask** cluster on **AWS EC2**, runs the social-simulation parameter sweep, and then trains the reinforcement-learning choice model. It mirrors `pipeline.py`, but lets you tweak parameters and inspect intermediate artefacts interactively.

**Prerequisites**

- An AWS account with IAM permissions to create EC2 instances and S3 objects.
- `~/.aws/config` and `~/.aws/credentials` configured on the JupyterLab host.
- `pip install "dask-cloudprovider[aws]" s3fs boto3` inside JupyterLab (the `aws` extra is required for `EC2Cluster`).

---

In [None]:
import os, re, contextlib, configparser, pathlib, sys, json, time
from platform import python_version

import dask
from dask_cloudprovider.aws import EC2Cluster
from dask.distributed import Client

def get_aws_credentials(profile='default', config_path=None, credentials_path=None):
    """Return the settings of one AWS profile as a dict of environment variables.

    The profile is read from the AWS config file and the shared credentials
    file. Every key is upper-cased (``aws_access_key_id`` -> ``AWS_ACCESS_KEY_ID``),
    and credentials-file values win when both files define the same key.
    A ``region`` entry is exported as ``AWS_REGION`` and also as
    ``AWS_DEFAULT_REGION``, the region variable documented by boto3. Code
    running on the cluster with these variables therefore picks it up.

    Parameters
    ----------
    profile : str
        Profile name. In the config file, non-default profiles live under
        ``[profile <name>]``; in the credentials file, under ``[<name>]``.
    config_path, credentials_path : str or path-like, optional
        Override the default ``~/.aws/config`` / ``~/.aws/credentials``.

    Returns
    -------
    dict[str, str]
        Empty when neither file defines the profile. Missing files are ignored.
    """
    config_path = config_path or os.path.expanduser('~/.aws/config')
    credentials_path = credentials_path or os.path.expanduser('~/.aws/credentials')
    # The AWS config file prefixes non-default profiles with "profile ".
    config_section = profile if profile == 'default' else f'profile {profile}'

    def _read_section(path, section):
        # A fresh parser per file keeps same-named sections of the two files
        # from being merged together.
        parser = configparser.RawConfigParser()
        parser.read(path)  # configparser silently skips unreadable/missing files
        return parser.items(section) if parser.has_section(section) else []

    env = {
        key.upper(): value
        for key, value in [*_read_section(config_path, config_section),
                           *_read_section(credentials_path, profile)]
    }
    region = env.pop('REGION', None)
    if region is not None:
        env['AWS_REGION'] = region
        env.setdefault('AWS_DEFAULT_REGION', region)
    return env


In [None]:
# --- User-adjustable knobs ---------------------------------------------------
# Cluster
N_WORKERS = 7                 # e.g. 15
INSTANCE_TYPE = 'r5.large'    # e.g. 'c6i.xlarge'
USE_SPOT = False              # True to save cost if interruption OK

# Reinforcement learning
ALGORITHM = 'PPO'             # 'PPO' or 'DQN'
TIMESTEPS = 150_000
MAX_ITERATIONS = 15

# S3: use plain ASCII only. Bucket names may contain just lowercase letters,
# digits, '.' and the ASCII hyphen '-'. A pasted typographic hyphen
# (e.g. U+2011) makes the bucket name invalid.
USE_S3 = False
S3_BUCKET = 'my-sim-bucket'
S3_PREFIX = 'sim-rl'

# Local directory for all artefacts, resolved to an absolute path.
OUTPUT_DIR = pathlib.Path('output').resolve()
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


In [None]:
# Export the local AWS profile to every EC2 instance as environment variables,
# and have the image pip-install s3fs when it boots.
env_vars = {**get_aws_credentials(), 'EXTRA_PIP_PACKAGES': 's3fs'}

# Match the worker image to this kernel's dask release and Python major.minor,
# e.g. 'daskdev/dask:<dask version>-py3.11'.
py_tag = '-py' + '.'.join(python_version().split('.')[:2])
dask_tag = 'daskdev/dask:' + dask.__version__ + py_tag

# NOTE(review): security=False disables dask's TLS between client, scheduler
# and workers, yet the workers carry the AWS credentials above. Confirm the
# instances' security group restricts who can reach the scheduler.
cluster = EC2Cluster(
    n_workers=N_WORKERS,
    instance_type=INSTANCE_TYPE,
    spot=USE_SPOT,
    docker_image=dask_tag,
    env_vars=env_vars,
    security=False,
)
client = Client(cluster)
cluster  # rich display of the running cluster

In [None]:
def _simulate(filename):
    """Run the parameter sweep on a Dask worker and return the result file's bytes.

    The task executes on a remote EC2 worker, whose filesystem is not the
    notebook host's. The sweep therefore writes into a worker-local temporary
    directory, and the raw bytes are shipped back as the task result so the
    notebook can save them locally.

    Parameters
    ----------
    filename : str
        Bare file name (no directory) to pass to ``run_parameter_sweep``.

    Returns
    -------
    bytes
        Contents of the file written by ``run_parameter_sweep``.
    """
    import os
    import tempfile
    from simulation import run_parameter_sweep

    with tempfile.TemporaryDirectory() as scratch_dir:
        remote_path = os.path.join(scratch_dir, filename)
        run_parameter_sweep(output_filename=remote_path)
        # Read as bytes so no text encoding is assumed.
        with open(remote_path, 'rb') as fh:
            return fh.read()


# The daskdev/dask image (plus EXTRA_PIP_PACKAGES=s3fs) does not include this
# project's code, so ship the module to the workers before submitting.
# NOTE(review): assumes `simulation` is a single simulation.py beside this
# notebook. If it is a package, zip it and upload the .zip instead.
SIMULATION_MODULE = pathlib.Path('simulation.py')
if SIMULATION_MODULE.exists():
    client.upload_file(SIMULATION_MODULE.as_posix())

simulation_json = OUTPUT_DIR / 'simulation_results_sweep.json'
future = client.submit(_simulate, simulation_json.name, pure=False)
simulation_json.write_bytes(future.result())
print(f'Simulation results saved to {simulation_json}')

In [None]:
if USE_S3:
    import boto3  # only needed when uploading, so imported lazily here

    # Re-derive the local file from its bare name so this cell is idempotent.
    # After the first run, `simulation_json` is an s3:// string, not a Path.
    local_json = OUTPUT_DIR / pathlib.PurePosixPath(str(simulation_json)).name
    key = f"{S3_PREFIX}/{local_json.name}"
    s3 = boto3.client('s3')
    s3.upload_file(local_json.as_posix(), S3_BUCKET, key)
    # NOTE(review): the RL cell passes this URI as --input; confirm that
    # reinforment_learning can read s3:// paths.
    simulation_json = f's3://{S3_BUCKET}/{key}'
    print('Uploaded to', simulation_json)

In [None]:
# `reinforment_learning` (sic) is the module's import name; keep the spelling.
from reinforment_learning import main as rl_main

rl_output = OUTPUT_DIR / 'rl_results'
rl_output.mkdir(parents=True, exist_ok=True)

# main() parses its options from the command line, so build a fake argv for
# the call. Swap it in only for the call and restore it in `finally`, so a
# failed run cannot leave the kernel's sys.argv overwritten for later cells.
rl_argv = [
    'reinforment_learning.py',
    '--input', str(simulation_json),  # local path, or s3:// URI when USE_S3
    '--output-dir', str(rl_output),
    '--algorithm', ALGORITHM,
    '--timesteps', str(TIMESTEPS),
    '--max-iterations', str(MAX_ITERATIONS),
]
kernel_argv = sys.argv
sys.argv = rl_argv
try:
    rl_main()
finally:
    sys.argv = kernel_argv
print('RL training complete; results in', rl_output)

In [None]:
# Disconnect the local client first, then terminate the EC2 instances so
# they stop accruing cost.
for resource in (client, cluster):
    resource.close()
print('Cluster shut down.')