In [1]:
import os
import ray
if os.getcwd() != "C:/Users/Jerome/PycharmProjects/FASTgres/fastgres":
    os.chdir("C:/Users/Jerome/PycharmProjects/FASTgres/fastgres")

print(os.getcwd())

from workloads.workload import Workload
from query_encoding.query import Query
from query_encoding.encoded_query import EncodedQuery, EncodingInformation, Context
from baseline.database_connection import DatabaseConnection
from models.experience import Experience
from baseline.experiment import Experiment
from definitions import PG_STACK_OVERFLOW
from models.train_model import Settings
import random
import numpy as np
import baseline.utility as u
from tqdm.notebook import tqdm
import time

workload = Workload("workloads/queries/stack/", "stack")
workload.load_queries()

C:\Users\Jerome\PycharmProjects\FASTgres\fastgres


KeyError: 'DBConnections'

In [2]:
@ray.remote
def build_query(q, w):
    return Query(q, w)

In [3]:
# query_list = list()
# t0 = time.time()
# for query_name in tqdm(workload.queries):
#     query = build_query.remote(query_name, workload)
#     query_list.append(query)
# vals = ray.get(query_list)
# query_dict = dict(list(zip(workload.queries, vals)))
# t1 = time.time()

In [4]:

from models.sync_model import Synchronizer

@ray.remote
class TrainingPhase:

    def __init__(self, experiment: Experiment):
        self.training_queries = experiment.training_queries
        self.enc_info = experiment.encoding_info
        self.db_conn = experiment.db_connection
        self.archive = experiment.archive
        self.models = None
        self.workload = experiment.workload
        self.settings = experiment.settings

    def train(self):
        print("Setting query information")
        global_experience = Experience(self.archive)

        query_list = list()
        t0 = time.time()
        for query_name in tqdm(self.training_queries):
            query = build_query.remote(query_name, workload)
            query_list.append(query)
        vals = ray.get(query_list)
        query_dict = dict(list(zip(workload.queries, vals)))
        t1 = time.time()
        print(f"Query building time: {t1-t0}")

        for query_name in self.training_queries:
            global_experience.add_experience(query_dict[query_name])

        print("Training context models")
        context_queries = global_experience.sort_by_context()
        context_models = dict()
        for context_set in tqdm(context_queries):
            context = Context()
            context.add_context(context_set)
            local_experience = Experience(self.archive)
            local_experience.add_experiences(context_queries[context_set])
            synchronizer = Synchronizer(context, self.db_conn, self.enc_info, local_experience, self.settings)
            synchronizer.build_model()
            context_models[context_set] = synchronizer
        self.models = context_models
        print(f"Finished training context models. Totalling {len(list(self.models.keys()))} models.")
        return self.models

    def save_models(self, save_path: str):
        if self.models is not None:
            u.save_pickle(self.models, save_path)
        else:
            raise ValueError("Trying to save None model")

@ray.remote
class TestingPhase:

    def __init__(self, experiment: Experiment, context_models: dict):
        self.experiment = experiment
        self.archive = experiment.archive
        self.workload = experiment.workload
        self.testing_queries = experiment.testing_queries
        self.encoding_info = experiment.encoding_info
        self.context_models = context_models
        self.predictions = {'initial': {}, 'final': {}}

    def test(self):
        print("Starting testing phase")
        for query_name in self.testing_queries[:10]:
            query = Query(query_name, self.workload)
            context_set = query.context
            context = Context()
            context.add_context(context_set)
            encoded_query = EncodedQuery(context, query, self.encoding_info)
            encoded_query.encode_query()

            synchronizer: Synchronizer = self.context_models[context_set]
            prediction = synchronizer.model.predict(encoded_query.encoded_query)
            print(f"Query {query.name[:8]} with prediction: {prediction}")
            self.predictions['initial'][query_name] = prediction

            # no retraining at the moment
            # synchronizer.pre_execution()
            # execution_time = synchronizer.run_query(query, HintSet(prediction), self.workload, use_timeout=True)
            # global_reduction = synchronizer.post_execution(encoded_query, execution_time)
            # for c_set, sync in self.context_models.items():
            #     sync.reduce_cooldown(global_reduction)

In [5]:
ray.init()

2023-12-14 09:44:15,347	INFO worker.py:1673 -- Started a local Ray instance.


0,1
Python version:,3.9.13
Ray version:,2.8.1


In [6]:
database_connection = DatabaseConnection(PG_STACK_OVERFLOW, "stack_connection")

encoding_info = EncodingInformation(database_connection, "database_statistics/stack/", workload)
encoding_info.load_encoding_info()

print("Prepass done")

print(os.getcwd())
archive_path = "archives/stack/12.4/stack_eval_dict_ax2.json"
train_size = 0.9
seed = 29
percentile = 99
absolute = 1.0
settings = Settings(seed, percentile, absolute)

# Setting random seeds for the run
np.random.seed(settings.seed)
random.Random(settings.seed)

print("Setting up experiment.")

experiment = Experiment(workload, encoding_info, database_connection,
                        archive_path, train_size, settings)

print("Starting Training.")

t0 = time.time()
obj_ref = TrainingPhase.remote(experiment)
# training_phase = ray.get(training_phase)
model_obj_ref = obj_ref.train.remote()
models = ray.get(model_obj_ref)
t1 = time.time()
print(f"Distributed training time: {t1-t0}s.")

# testing_phase = TestingPhase(experiment, training_phase.models)
# testing_phase.test()


Prepass done
C:\Users\Jerome\PycharmProjects\FASTgres\fastgres
Setting up experiment.
Starting Training.
[36m(TrainingPhase pid=22292)[0m Setting query information
[36m(TrainingPhase pid=22292)[0m   0%|          | 0/5571 [00:00<?, ?it/s]
[36m(TrainingPhase pid=22292)[0m Query building time: 49.53076434135437


RayTaskError(KeyError): [36mray::TrainingPhase.train()[39m (pid=22292, ip=127.0.0.1, actor_id=e13ba6826ae7358c4891f28301000000, repr=<__main__.TrainingPhase object at 0x0000012B85676A30>)
  File "python\ray\_raylet.pyx", line 1675, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 1615, in ray._raylet.execute_task.function_executor
  File "C:\Users\Jerome\PycharmProjects\FASTgres\venv\lib\site-packages\ray\_private\function_manager.py", line 726, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Jerome\PycharmProjects\FASTgres\venv\lib\site-packages\ray\util\tracing\tracing_helper.py", line 467, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Jerome\AppData\Local\Temp\ipykernel_21980\3568875257.py", line 30, in train
KeyError: 'q8-016.sql'

In [10]:
ray.shutdown()