Skip to content

Commit

Permalink
API-553 Compact serialization and Generic Records (#1172)
Browse files Browse the repository at this point in the history
  • Loading branch information
srknzl committed Apr 22, 2022
1 parent 5a07045 commit e1123a5
Show file tree
Hide file tree
Showing 111 changed files with 13,372 additions and 1,040 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
"scripts": {
"clean": "rimraf lib *.jar *.log *.xml coverage",
"compile": "tsc",
"precompile": "rimraf lib",
"test": "node scripts/test-runner.js all",
"test:unit": "node scripts/test-runner.js unit",
"test:integration": "node scripts/test-runner.js integration",
Expand Down
2 changes: 1 addition & 1 deletion scripts/code-sample-checker.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ exports.main = async (cluster) => {
console.log(`Will run ${JSON.stringify(files)}`);

const numberOfFiles = files.length;
let counter = 0;
let counter = 1;
for (const file of files) {
// start and terminate for each code sample to avoid map name clashes
const member = await RC.startMember(cluster.id);
Expand Down
4 changes: 2 additions & 2 deletions scripts/download-rc.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const HZ_VERSION = '5.1-SNAPSHOT';
const HZ_TEST_VERSION = '5.1-SNAPSHOT';
const HZ_VERSION = '5.1';
const HZ_TEST_VERSION = '5.1';
const HAZELCAST_TEST_VERSION = HZ_TEST_VERSION;
const HAZELCAST_VERSION = HZ_VERSION;
const HAZELCAST_ENTERPRISE_VERSION = HZ_VERSION;
Expand Down
3 changes: 1 addition & 2 deletions scripts/test-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ let testProcess;
let runTests = true;
let CLASSPATH = `hazelcast-remote-controller-${HAZELCAST_RC_VERSION}.jar${PATH_SEPARATOR}`
+ `hazelcast-${HAZELCAST_TEST_VERSION}-tests.jar${PATH_SEPARATOR}`
+ `hazelcast-sql-${HAZELCAST_VERSION}.jar${PATH_SEPARATOR}`
+ 'test/javaclasses';
+ `hazelcast-sql-${HAZELCAST_VERSION}.jar${PATH_SEPARATOR}`;

if (HAZELCAST_ENTERPRISE_KEY) {
CLASSPATH = `hazelcast-enterprise-${HAZELCAST_ENTERPRISE_VERSION}.jar${PATH_SEPARATOR}`
Expand Down
44 changes: 26 additions & 18 deletions src/HazelcastClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import {ClientMessage} from './protocol/ClientMessage';
import {Connection} from './network/Connection';
import {ConnectionRegistryImpl} from './network/ConnectionRegistry';
import {SqlService, SqlServiceImpl} from './sql/SqlService';
import {SchemaService} from './serialization/compact/SchemaService';

/**
* Hazelcast client instance. When you want to use Hazelcast's distributed
Expand Down Expand Up @@ -129,6 +130,8 @@ export class HazelcastClient {
private readonly sqlService: SqlService;
/** @internal */
private shutdownPromise: Promise<void> | undefined;
/** @internal */
private readonly schemaService: SchemaService;

/** @internal */
constructor(config?: ClientConfigImpl, failoverConfig?: ClientFailoverConfigImpl) {
Expand All @@ -140,26 +143,28 @@ export class HazelcastClient {
this.loadBalancer = this.initLoadBalancer();
this.failoverConfig = failoverConfig;
this.errorFactory = new ClientErrorFactory();
this.serializationService = new SerializationServiceV1(this.config.serialization);
this.instanceName = this.config.instanceName || 'hz.client_' + this.id;
this.loggingService = new LoggingService(this.config.customLogger,
this.config.properties['hazelcast.logging.level'] as string);
const logger = this.loggingService.getLogger();
this.schemaService = new SchemaService(() => this.invocationService, logger);
this.serializationService = new SerializationServiceV1(this.config.serialization, this.schemaService);
this.instanceName = this.config.instanceName || 'hz.client_' + this.id;
this.nearCacheManager = new NearCacheManager(
this.config,
this.serializationService
);
this.partitionService = new PartitionServiceImpl(
this.loggingService.getLogger(),
logger,
this.serializationService
);
this.lifecycleService = new LifecycleServiceImpl(
this.config.lifecycleListeners,
this.loggingService.getLogger()
logger
);
this.clusterFailoverService = this.initClusterFailoverService();
this.clusterService = new ClusterService(
this.config,
this.loggingService.getLogger(),
logger,
this.clusterFailoverService
);
this.connectionRegistry = new ConnectionRegistryImpl(
Expand All @@ -171,17 +176,19 @@ export class HazelcastClient {
);
this.invocationService = new InvocationService(
this.config,
this.loggingService.getLogger(),
logger,
this.partitionService as PartitionServiceImpl,
this.errorFactory,
this.lifecycleService,
this.connectionRegistry
this.connectionRegistry,
this.schemaService,
this.serializationService
);
this.connectionManager = new ConnectionManager(
this,
this.instanceName,
this.config,
this.loggingService.getLogger(),
logger,
this.partitionService,
this.serializationService,
this.lifecycleService,
Expand All @@ -192,15 +199,15 @@ export class HazelcastClient {
this.connectionRegistry
);
this.listenerService = new ListenerService(
this.loggingService.getLogger(),
logger,
this.config.network.smartRouting,
this.connectionManager,
this.invocationService
);
this.lockReferenceIdGenerator = new LockReferenceIdGenerator();
this.proxyManager = new ProxyManager(
this.config,
this.loggingService.getLogger(),
logger,
this.invocationService,
this.listenerService,
this.partitionService,
Expand All @@ -209,7 +216,8 @@ export class HazelcastClient {
() => this.getRepairingTask(),
this.clusterService,
this.lockReferenceIdGenerator,
this.connectionRegistry
this.connectionRegistry,
this.schemaService
);
this.statistics = new Statistics(
this.loggingService.getLogger(),
Expand All @@ -220,14 +228,14 @@ export class HazelcastClient {
this.connectionManager
);
this.clusterViewListenerService = new ClusterViewListenerService(
this.loggingService.getLogger(),
logger,
this.connectionManager,
this.partitionService as PartitionServiceImpl,
this.clusterService,
this.invocationService
);
this.cpSubsystem = new CPSubsystemImpl(
this.loggingService.getLogger(),
logger,
this.instanceName,
this.invocationService,
this.serializationService
Expand Down Expand Up @@ -304,12 +312,11 @@ export class HazelcastClient {
const clientMessage = ClientGetDistributedObjectsCodec.encodeRequest();
let localDistributedObjects: Set<string>;
let responseMessage: ClientMessage;
return this.invocationService.invokeOnRandomTarget(clientMessage)
return this.invocationService.invokeOnRandomTarget(clientMessage, x => x)
.then((resp) => {
responseMessage = resp;
return this.proxyManager.getDistributedObjects();
})
.then((distributedObjects) => {
}).then((distributedObjects) => {
localDistributedObjects = new Set<string>();
distributedObjects.forEach((obj) => {
localDistributedObjects.add(obj.getServiceName() + NAMESPACE_SEPARATOR + obj.getName());
Expand Down Expand Up @@ -491,7 +498,6 @@ export class HazelcastClient {

/**
* Returns a service to execute distributed SQL queries.
* The service is in beta state. Behavior and API might be changed in future releases.
*
* @returns SQL service
*
Expand Down Expand Up @@ -562,7 +568,9 @@ export class HazelcastClient {

/** @internal */
sendStateToCluster(): Promise<void> {
return this.proxyManager.createDistributedObjectsOnCluster();
return this.schemaService.sendAllSchemas().then(() => {
return this.proxyManager.createDistributedObjectsOnCluster();
});
}

/** @internal */
Expand Down
16 changes: 14 additions & 2 deletions src/PartitionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import {ILogger} from './logging/ILogger';
import {Connection} from './network/Connection';
import {ClientOfflineError, UUID} from './core';
import {ClientOfflineError, HazelcastSerializationError, SchemaNotReplicatedError, UUID} from './core';
import {SerializationService} from './serialization/SerializationService';

/**
Expand Down Expand Up @@ -47,6 +47,7 @@ export interface PartitionService {
* @param key
* @returns the partition id.
* @throws {@link ClientOfflineError} if the partition table has not arrived yet.
* @throws {@link HazelcastSerializationError} if key cannot be serialized
*/
getPartitionId(key: any): number;

Expand Down Expand Up @@ -106,7 +107,18 @@ export class PartitionServiceImpl implements PartitionService {
if (typeof key === 'object' && 'getPartitionHash' in key) {
partitionHash = key.getPartitionHash();
} else {
partitionHash = this.serializationService.toData(key).getPartitionHash();
try {
partitionHash = this.serializationService.toData(key).getPartitionHash();
} catch (e) {
if (e instanceof SchemaNotReplicatedError) {
throw new HazelcastSerializationError(
'Cannot compute partition id of compact objects whose schema is not replicated in the cluster. ' +
'To replicate its schema to the cluster, you can use this object in any async operation where ' +
'it is serialized.'
, e
);
}
}
}
return Math.abs(partitionHash) % this.partitionCount;
}
Expand Down
55 changes: 55 additions & 0 deletions src/codec/ClientFetchSchemaCodec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* eslint-disable max-len */
import {BitsUtil} from '../util/BitsUtil';
import {FixSizedTypesCodec} from './builtin/FixSizedTypesCodec';
import {ClientMessage, Frame, PARTITION_ID_OFFSET} from '../protocol/ClientMessage';
import * as Long from 'long';
import {Schema} from '../serialization/compact/Schema';
import {SchemaCodec} from './custom/SchemaCodec';
import {CodecUtil} from './builtin/CodecUtil';

// hex: 0x001400
const REQUEST_MESSAGE_TYPE = 5120;
// hex: 0x001401
// RESPONSE_MESSAGE_TYPE = 5121

const REQUEST_SCHEMA_ID_OFFSET = PARTITION_ID_OFFSET + BitsUtil.INT_SIZE_IN_BYTES;
const REQUEST_INITIAL_FRAME_SIZE = REQUEST_SCHEMA_ID_OFFSET + BitsUtil.LONG_SIZE_IN_BYTES;

/** @internal */
export class ClientFetchSchemaCodec {
static encodeRequest(schemaId: Long): ClientMessage {
const clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);

const initialFrame = Frame.createInitialFrame(REQUEST_INITIAL_FRAME_SIZE);
FixSizedTypesCodec.encodeLong(initialFrame.content, REQUEST_SCHEMA_ID_OFFSET, schemaId);
clientMessage.addFrame(initialFrame);
clientMessage.setMessageType(REQUEST_MESSAGE_TYPE);
clientMessage.setPartitionId(-1);

return clientMessage;
}

static decodeResponse(clientMessage: ClientMessage): Schema {
// empty initial frame
clientMessage.nextFrame();

return CodecUtil.decodeNullable(clientMessage, SchemaCodec.decode);
}
}
45 changes: 45 additions & 0 deletions src/codec/ClientSendAllSchemasCodec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* eslint-disable max-len */
import {BitsUtil} from '../util/BitsUtil';
import {ClientMessage, Frame, PARTITION_ID_OFFSET} from '../protocol/ClientMessage';
import {Schema} from '../serialization/compact/Schema';
import {SchemaCodec} from './custom/SchemaCodec';
import {ListMultiFrameCodec} from './builtin/ListMultiFrameCodec';

// hex: 0x001500
const REQUEST_MESSAGE_TYPE = 5376;
// hex: 0x001501
// RESPONSE_MESSAGE_TYPE = 5377

const REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_OFFSET + BitsUtil.INT_SIZE_IN_BYTES;

/** @internal */
export class ClientSendAllSchemasCodec {
static encodeRequest(schemas: Schema[]): ClientMessage {
const clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);

const initialFrame = Frame.createInitialFrame(REQUEST_INITIAL_FRAME_SIZE);
clientMessage.addFrame(initialFrame);
clientMessage.setMessageType(REQUEST_MESSAGE_TYPE);
clientMessage.setPartitionId(-1);

ListMultiFrameCodec.encode(clientMessage, schemas, SchemaCodec.encode);
return clientMessage;
}
}
44 changes: 44 additions & 0 deletions src/codec/ClientSendSchemaCodec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* eslint-disable max-len */
import {BitsUtil} from '../util/BitsUtil';
import {ClientMessage, Frame, PARTITION_ID_OFFSET} from '../protocol/ClientMessage';
import {Schema} from '../serialization/compact/Schema';
import {SchemaCodec} from './custom/SchemaCodec';

// hex: 0x001300
const REQUEST_MESSAGE_TYPE = 4864;
// hex: 0x001301
// RESPONSE_MESSAGE_TYPE = 4865

const REQUEST_INITIAL_FRAME_SIZE = PARTITION_ID_OFFSET + BitsUtil.INT_SIZE_IN_BYTES;

/** @internal */
export class ClientSendSchemaCodec {
static encodeRequest(schema: Schema): ClientMessage {
const clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);

const initialFrame = Frame.createInitialFrame(REQUEST_INITIAL_FRAME_SIZE);
clientMessage.addFrame(initialFrame);
clientMessage.setMessageType(REQUEST_MESSAGE_TYPE);
clientMessage.setPartitionId(-1);

SchemaCodec.encode(clientMessage, schema);
return clientMessage;
}
}
Loading

0 comments on commit e1123a5

Please sign in to comment.