Skip to content

Commit

Permalink
Feature / Job cache plugin (#414)
Browse files Browse the repository at this point in the history
* Move job cache framework into lib-orch

* Use plugin manager to create the job cache

* Add jobCache section to sample config files

* Rename CacheEntry

* Consistent naming for cache entry getters

* Clean up job cache API

* Start adding some basic tests for the job cache interface

* Do not increment cache revision on open ticket (only on modifying operations)

* Test stubs for job cache ticket mgmt

* Move local cache implementation of cache test suite into its own test class

* Job cache tests for openNewTicket()

* Job cache fixes needed for test suite

* Work on job cache tests

* Let local executor unit test work on windows

* Include run config for lib orch unit tests

* Finish tests for openTicket()

* Add a test to ensure transient members are not stored in the cache

* Add a test to check caching for different data types

* Change executor API to use Java Serializable for job state, rather than protobuf

* Change local batch executor to match updates in executor API

* Change SSH batch executor to match updates in executor API

* Cache corruption error class

* Update job cache interface

* Rename CacheTicket for job cache

* Update local job cache impl to match changes in cache API

* Update job cache test suite for API changes

* Update executor basic test suite for API changes

* Update orch service implementation after job cache changes

* Update orch API method impl for checkJob

* New LocalBatchState without protobuf

* Remove protobuf dependency from -lib-orch

* Fix a warning in CacheTicket

* Compile time type safety on the job cache

* Fix imports for orch service main class

* Add tests for closing cache tickets

* Add illegal arg check for closing a ticket

* Make superseded() return true after ticket expiry time

* Handle duplicate tickets for new items in local job cache

* Fix some IDEA warnings

* Remove protobuf dependency for SSH executor

* No longer necessary to force versions of transitive dependencies for the SSH executor (this is managed automatically in settings.gradle)

* Start sketching out tests for cache CRUD operations

* Introduce the concept of cache manager to create strongly-typed cache instances

* Update plugin framework to give out cache manager instances rather than typed cache instances

* Use the new cache manager to set up the job cache in the orchestrator

* Fill in some unit tests for the job cache

* Fail for tests not added yet

* Job cache testing work

* New exception for invalid cache operations

* Test stubs for cache CRUD operations

* Job cache test cases for readEntry()

* First pass on CRUD unit tests for job cache

* Rename job cache CRUD operations

* Valid entry status in job cache

* No expiry for missing cache tickets

* Fix expected exception types for missing / new tickets

* Fix remaining tests for job cache

* Bump dependency versions for compliance

* Fix use info and executor state in job cache

* Force update of Nimbus JWT version for Azure SDK

* Increase timeout on compliance jobs
  • Loading branch information
martin-traverse committed Feb 24, 2024
1 parent 4ed4e3d commit f1204c4
Show file tree
Hide file tree
Showing 40 changed files with 3,449 additions and 651 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/compliance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
platform_compliance:

runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 60

steps:

Expand Down Expand Up @@ -75,7 +75,7 @@ jobs:
python_runtime_compliance:

runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 60

steps:

Expand Down Expand Up @@ -134,7 +134,7 @@ jobs:
web_api_compliance:

runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 60

steps:

Expand Down
4 changes: 4 additions & 0 deletions dev/config/trac-devlocal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ executor:
venvPath: C:\Dev\trac\venv


jobCache:
protocol: LOCAL


instances:

meta:
Expand Down
13 changes: 13 additions & 0 deletions dev/ide/idea/runConfigurations/Unit_tests___lib_orch.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Unit tests: -lib-orch" type="JUnit" factoryName="JUnit">
<module name="tracdap.tracdap-lib-orch.test" />
<option name="MAIN_CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="TEST_OBJECT" value="tags" />
<option name="VM_PARAMETERS" value="-ea --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true" />
<tag value="!integration &amp; !slow" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
5 changes: 5 additions & 0 deletions dist/template/etc/trac-platform.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ repositories:
properties:
repoUrl: https://github.com/finos/tracdap


executor:
protocol: LOCAL
properties:
# Set this to the venv of your local TRAC execution environment
venvPath: /path/to/venv


jobCache:
protocol: LOCAL


instances:

meta:
Expand Down
7 changes: 5 additions & 2 deletions gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ext {
h2_version = '2.1.214'
mysql_version = '8.2.0'
mariadb_version = '3.0.8'
postgresql_version = '42.6.0'
postgresql_version = '42.7.2'
sqlserver_version = '9.4.1.jre11' // Update to SqlServer 10.x driver series is a breaking change
oracle_version = '19.14.0.0'

Expand All @@ -68,7 +68,10 @@ ext {

aws_sdk_version = '2.21.11'
gcp_sdk_version = '26.26.0'
azure_sdk_version = '1.2.19'
azure_sdk_version = '1.2.20'

// Temporary override to address CVE-2023-52428 (remove once Azure SDK updates this dependency)
azure_nimbus_version = '9.37.3'

// AWS SDK uses Reactive Streams
// Only the latest version has a good license (MIT-0, prior versions were Creative Commons)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package org.finos.tracdap.common.auth.internal;

import java.io.Serializable;

public class UserInfo {

public class UserInfo implements Serializable {

private final static long serialVersionUID = 1L;

private String userId;
private String displayName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Accenture Global Solutions Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.finos.tracdap.common.exception;


/**
* Thrown when a cache entry has become corrupt
*
* <p>This error indicates that an individual cache entry has become corrupt and is no longer
* readable. TRAC will try to handle this error so that individual corrupt entries do not
* pollute the cache and stop it from functioning. For cache query operations corrupt
* entries will be returned with no value, cacheError() will return an ECacheCorruption error.
* Calls to getEntry() will throw this exception if the entry is corrupt.
*
* <p>One cause of cache corruption is jobs in the cache that reference old class files.
* This could happen after the upgrade of an executor plugin, if there are incompatible
* changes in the executor state class. Although TRAC has error handling to clean up affected
* jobs, executors should take care when updating their state classes to avoid this issue.</p>
*/
public class ECacheCorruption extends ECache {

public ECacheCorruption(String message, Throwable cause) {
super(message, cause);
}

public ECacheCorruption(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Accenture Global Solutions Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.finos.tracdap.common.exception;

/**
* Job already exists in the orchestrator service
*/
public class ECacheDuplicate extends ECache {

public ECacheDuplicate(String message, Throwable cause) {
super(message, cause);
}

public ECacheDuplicate(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Accenture Global Solutions Limited
* Copyright 2023 Accenture Global Solutions Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,27 +14,19 @@
* limitations under the License.
*/

syntax = 'proto3';
package tracdap.exec.ssh;
package org.finos.tracdap.common.exception;

option java_package = "org.finos.tracdap.plugins.exec.ssh";
option java_multiple_files = true;

/**
* A request was made to perform an illegal cache operation.
*/
public class ECacheOperation extends ECache {

message SshBatchState {

string remoteHost = 1;
int32 remotePort = 2;
string batchUser = 3;

string batchDir = 4;
repeated string volumes = 5;

int64 pid = 6;
}


message SshExecutorState {

public ECacheOperation(String message, Throwable cause) {
super(message, cause);
}

public ECacheOperation(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class PluginServiceInfo {
/** Standard service types for execution services **/
public static final String EXECUTION_SERVICE_TYPE = "EXECUTION";

/** Standard service types for execution services **/
public static final String JOB_CACHE_SERVICE_TYPE = "JOB_CACHE";

/** Standard service type for the metadata DAL **/
public static final String METADATA_DAL_SERVICE_TYPE = "METADATA_DAL";

Expand All @@ -72,6 +75,7 @@ public class PluginServiceInfo {
Map.entry("org.finos.tracdap.common.storage.IFileStorage", FILE_STORAGE_SERVICE_TYPE),
Map.entry("org.finos.tracdap.common.storage.IDataStorage", DATA_STORAGE_SERVICE_TYPE),
Map.entry("org.finos.tracdap.common.exec.IBatchExecutor", EXECUTION_SERVICE_TYPE),
Map.entry("org.finos.tracdap.common.cache.IJobCacheManager", JOB_CACHE_SERVICE_TYPE),
Map.entry("org.finos.tracdap.svc.meta.dal.IMetadataDal", METADATA_DAL_SERVICE_TYPE));

private final Class<?> serviceClass;
Expand Down
30 changes: 1 addition & 29 deletions tracdap-libs/tracdap-lib-orch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

plugins {
id "java-library"
id "com.google.protobuf"
}

apply from: "${rootProject.projectDir.path}/gradle/base-java.gradle"
Expand All @@ -34,33 +33,6 @@ dependencies {
testImplementation project(':tracdap-lib-test')
}

protobuf {

protoc {
artifact = "com.google.protobuf:protoc:$proto_version"
}

generateProtoTasks.generatedFilesBaseDir = "${project.buildDir}/generated-sources"

generateProtoTasks {
//noinspection GroovyAssignabilityCheck
all().each { task -> task.builtins { java {} } }
}
}

sourceSets {
main {
java {
// Include generated code from protoc
// Including the proto src dir lets autocomplete work in the IDE
srcDirs "${project.buildDir}/generated-sources/main/java"
srcDirs "${projectDir}/src/main/proto"
}
}
}



// Compiling dependent targets does not require processResources to be executed by default
// E.g. running tests from the IDE will not trigger processResources - this can cause confusion!
// This dependency ensures resources are always processed, even for partial builds
Expand All @@ -76,7 +48,7 @@ configurations {
}
}

task testJar(type: Jar) {
tasks.register('testJar', Jar) {
archiveClassifier.set('test')
from sourceSets.test.output
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,38 @@
* limitations under the License.
*/

package org.finos.tracdap.svc.orch.cache;
package org.finos.tracdap.common.cache;

import org.finos.tracdap.common.exception.ECache;

public class CacheQueryResult<TValue> {

public final class CacheEntry<TValue> {

private final String key;
private final int revision;
private final String status;
private final TValue value;
private final ECache cacheError;

public static <TValue>
CacheEntry<TValue> forValue(String key, int revision, String status, TValue value) {

return new CacheEntry<>(key, revision, status, value, null);
}

public static <TValue>
CacheEntry<TValue> error(String key, int revision, String status, ECache cacheError) {

return new CacheEntry<>(key, revision, status, null, cacheError);
}

private CacheEntry(String key, int revision, String status, TValue value, ECache cacheError) {

public CacheQueryResult(String key, int revision, String status, TValue value) {
this.key = key;
this.revision = revision;
this.status = status;
this.value = value;
this.cacheError = cacheError;
}

public String key() {
Expand All @@ -39,11 +56,24 @@ public int revision() {
return revision;
}

public String getStatus() {
public String status() {
return status;
}

public TValue value() {

// Attempting to access the value for an invalid entry - throw the error
if (cacheError != null)
throw cacheError;

return value;
}

public boolean cacheOk() {
return cacheError == null;
}

public ECache cacheError() {
return cacheError;
}
}
Loading

0 comments on commit f1204c4

Please sign in to comment.