In [1]:
import matplotlib.pyplot as plt
import seaborn as sns


import numpy as np
import pandas as pd

In [2]:
from rate_pricing.model_config import ModelConfigs
from rate_pricing.training import Trainer, TrainingData, ModelMetrics, TrainingChannel
from rate_pricing.config import settings

In [3]:
model_configs = ModelConfigs(collect=True, include_inactive=True)
model_config = model_configs["default"]

In [4]:
input_data = "s3://pps-preprod-data-lake-data-science/tables/mab_training_dataset/requested_on=2024-04-03/"
test_data = None

In [19]:
df = pd.read_parquet(
    input_data,
    engine='pyarrow',  # or use 'fastparquet'
    storage_options={"profile": 'Prep_DS'}
)

In [17]:
df.head()

Unnamed: 0,request_id,requested_at,affiliate_group_id,sell_rate_usd,sell_currency,profit_estimate_usd,total_nightly_rate,default_supplier_adjustment,default_supplier_percent_adjustment,global_adjustment,global_percent_adjustment,property_adjustment,tax,tax_recovery_fee,tax_recovery_fee_offset,fees_collected_at_booking,total,converted,recorded_at
0,1063bfd9f58fc15bc6f5155d,2024-04-03 07:56:46,525ebb7f701c6366050000e7,0.0,AUD,0.0,1230.15,0.0,33.759998,27.58,272.119995,0.0,125.129997,22.530001,0.0,0.0,1711.27,False,2024-04-04 07:22:20.462
1,107d1872869aef7cc827b740,2024-04-03 07:49:52,5ed59db3881860372aeb319b,0.0,EUR,0.0,770.78,0.0,55.5,0.0,64.75,0.0,154.160004,0.0,0.0,0.0,1045.19,False,2024-04-04 07:22:20.462
2,10a14822a1caea82eaf944d6,2024-04-03 07:59:26,62e83fa60a31a612b967ae0d,0.0,USD,0.0,1586.16,0.0,106.099998,0.0,0.0,0.0,149.710007,0.0,0.0,89.11,1931.08,False,2024-04-04 07:22:20.462
3,10daa8849925f12a0c036b42,2024-04-03 07:20:38,636aa622b84f5e00538a4f88,0.0,USD,0.0,10291.47,0.0,0.0,0.0,-252.139999,0.0,1916.109985,399.420013,0.0,0.0,12354.86,False,2024-04-04 07:22:20.462
4,10fef0f26fff6234335ba228,2024-04-03 07:38:38,5ed59e14881860372aeb777c,0.0,EUR,0.0,518.35,0.0,67.029999,0.0,-9.5,0.0,53.790001,3.79,0.0,0.0,633.46,False,2024-04-04 07:22:20.462


In [5]:
training_data_path = (
    model_config.data.s3_url.format(bucket=settings.training_data_bucket)
    if input_data is None
    else input_data
)

In [6]:
training_data = TrainingData(
    train_path=training_data_path,
    data_config=model_config.data,
    
)

In [11]:
data = training_data.read(TrainingChannel.TRAIN, profile="Prep_DS")

In [7]:
import d3rlpy
import os 

DATA_DIRECTORY = "d3rlpy_data"
DROPBOX_URL = "https://www.dropbox.com/s"
CARTPOLE_URL = f"{DROPBOX_URL}/uep0lzlhxpi79pd/cartpole_v1.1.0.h5?dl=1"
url = CARTPOLE_URL
file_name = "cartpole_replay_v1.1.0.h5"
data_path = os.path.join(DATA_DIRECTORY, file_name)


In [2]:
import argparse
import numpy as np

from ray.rllib.policy.sample_batch import convert_ma_batch_to_sample_batch
from ray.rllib.algorithms import cql as cql 
from ray.rllib.algorithms import DQN    
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.offline.estimators import (
    ImportanceSampling,
    WeightedImportanceSampling,
    DirectMethod,
    DoublyRobust,
)
from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel

  from .autonotebook import tqdm as notebook_tqdm
2024-04-18 11:51:41,240	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2024-04-18 11:51:42,433	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [3]:
parser = argparse.ArgumentParser()
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.",
)
parser.add_argument(
    "--stop-iters", type=int, default=5, help="Number of iterations to train."
)
parser.add_argument(
    "--stop-reward", type=float, default=50.0, help="Reward at which we stop training."
)


_StoreAction(option_strings=['--stop-reward'], dest='stop_reward', nargs=None, const=None, default=50.0, type=<class 'float'>, choices=None, required=False, help='Reward at which we stop training.', metavar=None)

In [41]:
#args = parser.parse_args()

    # See rllib/tuned_examples/cql/pendulum-cql.yaml for comparison.
config = (
        cql.CQLConfig()
        .framework(framework="torch")
        .rollouts(num_rollout_workers=0)
        .debugging(log_level="INFO")
        .environment("Pendulum-v1", normalize_actions=True)
        .offline_data(
            input_config={
                "paths": ["tests/data/pendulum/enormous.zip"],
                "format": "json",
            }
        )
        .evaluation(
            evaluation_num_workers=1,
            evaluation_interval=1,
            evaluation_duration=10,
            evaluation_duration_unit="episodes",
            evaluation_config={
                "input": "sampler",
                "postprocess_inputs": False,
                "postprocess_outputs": False,
            },
 

        
        
        )
    )
    # evaluation_parallel_to_training should be False b/c iterations are very long
    # and this would cause evaluation to lag one iter behind training.

    # Check, whether we can learn from the given file in `num_iterations`
    # iterations, up to a reward of `min_reward`.
num_iterations = 5
min_reward = -300

# Test for torch framework (tf not implemented yet).

cql_algorithm = cql.CQL(config=config)

2024-04-18 13:02:49,288	INFO policy.py:1272 -- Policy (worker=local) running on CPU.
2024-04-18 13:02:49,289	INFO torch_policy.py:183 -- Found 0 visible cuda devices.
2024-04-18 13:02:49,297	INFO cql_torch_policy.py:89 -- Current iteration = 0


2024-04-18 13:02:49,308	INFO util.py:118 -- Using connectors:
2024-04-18 13:02:49,309	INFO util.py:119 --     AgentConnectorPipeline
        ObsPreprocessorConnector
        StateBufferConnector
        ViewRequirementAgentConnector
2024-04-18 13:02:49,309	INFO util.py:120 --     ActionConnectorPipeline
        ConvertToNumpyConnector
        NormalizeActionsConnector
        ImmutableActionsConnector
2024-04-18 13:02:49,309	INFO rollout_worker.py:1758 -- Built policy map: <PolicyMap lru-caching-capacity=100 policy-IDs=['default_policy']>
2024-04-18 13:02:49,310	INFO rollout_worker.py:1759 -- Built preprocessor map: {'default_policy': None}
2024-04-18 13:02:49,310	INFO rollout_worker.py:560 -- Built filter map: defaultdict(<class 'ray.rllib.utils.filter.NoFilter'>, {})
2024-04-18 13:02:52,066	INFO worker_set.py:324 -- Inferred observation/action spaces from remote worker (local worker has no env): {'default_policy': (Box([-1. -1. -8.], [1. 1. 8.], (3,), float32), Box(-2.0, 2.0, (1,), f

In [None]:
learnt = False
for i in range(num_iterations):
    print(f"Iter {i}")
    eval_results = cql_algorithm.train().get("evaluation")
    if eval_results:
        print("... R={}".format(eval_results["episode_reward_mean"]))
        # Learn until some reward is reached on an actual live env.
        if eval_results["episode_reward_mean"] >= min_reward:
            # Test passed gracefully.
            
            print("Test passed after {} iterations.".format(i))
            quit(0)
            learnt = True
            break


In [16]:
from ray.rllib.execution.rollout_ops import (
    synchronous_parallel_sample,
)

torch, _ = try_import_torch()


In [17]:
cql_policy = cql_algorithm.get_policy()
cql_model = cql_policy.model

In [19]:
from ray.data import read_json 
reader = read_json("small_evl/output-2024-04-18_11-49-47_worker-0_0.json")
reader.take(1)[0]['obs'][0:3]

2024-04-18 12:25:02,679	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-18_11-52-57_167737_73576/logs
2024-04-18 12:25:02,680	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=1]

                                                
[A

[[0.4835519790649414, -0.8753156661987305, 0.0943986102938652],
 [0.4569772481918335, -0.8894783854484558, -0.6022850275039673],
 [0.4007592797279358, -0.9161834120750427, -1.2449687719345093]]

In [28]:
from ray.rllib.offline.estimators.importance_sampling import ImportanceSampling
estimates_per_episode = ImportanceSampling.estimate_on_single_step_samples(batch=reader.take(1)[0] )

2024-04-18 12:33:55,721	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-18_11-52-57_167737_73576/logs
2024-04-18 12:33:55,722	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> LimitOperator[limit=1]

                                                
[A

TypeError: ImportanceSampling.estimate_on_single_step_samples() missing 1 required positional argument: 'self'

In [43]:
batch = synchronous_parallel_sample(worker_set=cql_algorithm.workers)
batch = convert_ma_batch_to_sample_batch(batch)
obs = torch.from_numpy(batch["obs"])
# Pass the observations through our model to get the
# features, which then to pass through the Q-head.
model_out, _ = cql_model({"obs": obs})
# The estimated Q-values from the (historic) actions in the batch.
q_values_old = cql_model.get_q_values(
    model_out, torch.from_numpy(batch["actions"])
)[0]
# The estimated Q-values for the new actions computed by our policy.
actions_new = cql_policy.compute_actions_from_input_dict({"obs": obs})[0]
q_values_new = cql_model.get_q_values(model_out, torch.from_numpy(actions_new))[0]
print(f"Q-val batch={q_values_old.detach().numpy()}")
print(f"Q-val policy={q_values_new.detach().numpy()}")

Q-val batch=[[-0.00762615]]
Q-val policy=[[-0.00702446]]


In [35]:
import json
import pandas as pd
file_path = 'small_evl/output-2024-04-18_11-49-47_worker-0_0.json'

# Open the file and load the data
data_list = []

# Open the file and read line by line
with open(file_path, 'r') as file:
    for line in file:
        # Load each line as a JSON object and append to list
        json_obj = json.loads(line)
        data_list.append(json_obj)

# Normalize and concatenate all data into a single DataFrame
df = pd.concat([pd.json_normalize(obj) for obj in data_list], ignore_index=True)

df.to_csv('small_evl/output-2024-04-18_11-49-47_worker-0_0.csv', index=False)