NIFI-7549 Adding Hazelcast based DistributedMapCacheClient support #4349

Closed
simonbence wants to merge 2 commits into apache:master from simonbence:NIFI-7549
Conversation


simonbence (Contributor) commented Jun 18, 2020

NIFI-7549

The PR contains my proposal for Hazelcast support for DistributedMapCacheClient. In general, I followed the patterns I found in the existing implementations; in cases where the behaviour was not explicitly documented, it follows those implementations, mainly the ones that were added together with the feature itself (I consider them the most relevant and accurate implementations).

As for the organisation of the implementation, I split the feature into three "layers", and the package structure follows this as well. At the bottom there is the HazelcastCache interface and its implementation. This layer is responsible for communicating directly with Hazelcast (via a provided connection) and for hiding the details of the underlying data structure. The current implementation is based on IMap, but there is the possibility to change or extend this. Also, the interface of the map-like data structure changed heavily between Hazelcast 3.x and 4.x; if support for older versions were needed, wrapping the logic this way could help to avoid the changes sprawling.
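The bottom layer described above might be sketched roughly as follows. All names here are hypothetical (the PR's actual interface is HazelcastCache), and a plain ConcurrentHashMap stands in for the Hazelcast IMap so the sketch stays self-contained:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the bottom "cache" layer: it hides the backing
// data structure (an IMap in the actual PR) behind a small byte[]-based API.
interface ByteCache {
    byte[] get(String key);
    void put(String key, byte[] value);
    boolean putIfAbsent(String key, byte[] value); // true if the value was stored
    boolean remove(String key);
}

// Stand-in implementation backed by a ConcurrentHashMap; the real
// implementation would delegate these calls to a Hazelcast IMap instead.
final class MapBackedCache implements ByteCache {
    private final ConcurrentMap<String, byte[]> storage = new ConcurrentHashMap<>();

    @Override
    public byte[] get(String key) {
        return storage.get(key);
    }

    @Override
    public void put(String key, byte[] value) {
        storage.put(key, value);
    }

    @Override
    public boolean putIfAbsent(String key, byte[] value) {
        return storage.putIfAbsent(key, value) == null;
    }

    @Override
    public boolean remove(String key) {
        return storage.remove(key) != null;
    }
}
```

Because callers only see the narrow byte[]-based interface, swapping the backing structure (or supporting the 3.x API) stays contained in this one layer.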

The layer above is the "cache manager" (HazelcastCacheManager). It is responsible for creating the cache instances and maintaining the connection. Currently there are two implementations: one which starts an embedded Hazelcast instance for easy usage, and one which connects to a Hazelcast cluster running outside NiFi. The embedded one provides limited configuration capability, but it can serve effectively as a local cache. The "standalone" one can join any non-enterprise Hazelcast cluster. Note: I looked into how to connect to a secured Hazelcast, but as far as I found, that is part of the enterprise package; for now, supporting it was not part of my intent. This layer should hide every Hazelcast-specific interface and implementation.
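The shape of that manager layer could look something like this (hypothetical names again; only the API shape is modeled, with an in-memory stand-in instead of an embedded Hazelcast instance):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "cache manager" layer: it owns the connection
// lifecycle and hands out named caches. The actual PR has two variants
// (embedded instance vs. client to an external cluster); this stand-in
// models only the contract, not Hazelcast itself.
interface CacheManager extends AutoCloseable {
    Map<String, byte[]> getCache(String name); // the real type would wrap an IMap

    @Override
    void close();
}

final class EmbeddedLikeManager implements CacheManager {
    private final Map<String, Map<String, byte[]>> caches = new ConcurrentHashMap<>();

    @Override
    public Map<String, byte[]> getCache(String name) {
        // Create the named cache lazily on first request.
        return caches.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
    }

    @Override
    public void close() {
        // The real manager would shut down the Hazelcast instance or client here.
        caches.clear();
    }
}
```

The client layer above never touches Hazelcast directly; it only asks the manager for a named cache, which is what keeps Hazelcast-specific types out of the upper layer.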

The top layer is the actual DistributedMapCacheClient implementation. It depends on both lower layers, as the manager is needed to acquire the cache it works with. All the NiFi-specific logic lives here. The AtomicDistributedMapCacheClient methods are supported as well. The revision handling this introduces applies to all entries: a long-based version is attached to every entry.
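One plausible way to attach a long-based version to every entry (a sketch under my own assumptions, not necessarily the exact encoding the PR uses) is to prefix the serialized payload with the version number and compare it on update:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: each stored entry carries a long version prefix
// followed by the serialized payload. An optimistic update is accepted only
// when the caller's expected revision matches the stored one.
final class VersionedEntry {
    static final long STARTING_VERSION = 1L; // assumed starting value

    // Encode version + payload into a single byte array.
    static byte[] encode(long version, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(version)
                .put(payload)
                .array();
    }

    static long version(byte[] stored) {
        return ByteBuffer.wrap(stored).getLong();
    }

    static byte[] payload(byte[] stored) {
        final ByteBuffer buffer = ByteBuffer.wrap(stored);
        buffer.getLong(); // skip the version prefix
        final byte[] result = new byte[buffer.remaining()];
        buffer.get(result);
        return result;
    }

    // Optimistic check: replacing is allowed only when the expected revision
    // matches the revision of the currently stored entry.
    static boolean canReplace(byte[] stored, long expectedRevision) {
        return stored != null && version(stored) == expectedRevision;
    }
}
```

With this scheme the AtomicDistributedMapCacheClient's replace operation becomes a compare of the stored version against the caller-supplied revision, performed under the per-key lock.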

Please share your thoughts on the proposal; I hope it will be useful for the community!

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on JDK 8?
  • Have you verified that the full build is successful on JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

simonbence (Contributor, Author) commented:

Please hold on, I would like to add some changes

public static final PropertyDescriptor HAZELCAST_CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("hazelcast-cluster-name")
.displayName("Hazelcast Cluster Name")
.description("Name of the embedded Hazelcast instance's cluster")
Contributor:

embedded in this abstract class?

public static final PropertyDescriptor HAZELCAST_INSTANCE_NAME = new PropertyDescriptor.Builder()
.name("hazelcast-instance-name")
.displayName("Hazelcast Instance Name")
.description("Name of the embedded Hazelcast instance")
Contributor:

embedded in this abstract class?

final NetworkConfig networkConfig = config.getNetworkConfig();
networkConfig.setPort(context.getProperty(HAZELCAST_PORT).asInteger());

if (context.getProperty(HAZELCAST_PORT_COUNT).isSet()) {
Contributor:

Shouldn't port count and port increment be allowed to be configured even when using the default port (i.e. context.getProperty(HAZELCAST_PORT).isSet() is false)?

.name("hazelcast-instance-name")
.displayName("Hazelcast Instance Name")
.description("Name of the embedded Hazelcast instance")
.required(true)
Contributor:

I think it shouldn't be necessary for the user to be bothered with this.
We could default it to the uuid of the service for example.

.name("hazelcast-cluster-name")
.displayName("Hazelcast Cluster Name")
.description("Name of the embedded Hazelcast instance's cluster")
.required(false)
Contributor:

The default cluster name is "dev". Not sure if we want to leave it that way.

}

void lock() {
repository.lock(key);
Contributor:

The lock behind the scenes is reentrant so extra care should be taken so that the number of lock and unlock calls are consistent. I'd move this repository.lock(key) into the constructor.
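The reviewer's reentrancy point can be illustrated with a plain java.util.concurrent ReentrantLock (a stand-in here; the PR's lock is Hazelcast's, but the hold-count behaviour being warned about is the same idea): every lock() call must be balanced by an unlock(), otherwise the lock remains held.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration of the reentrancy concern: a reentrant lock counts its
// holds, so calling lock() twice requires calling unlock() twice before
// any other thread can acquire it.
final class ReentrancyDemo {
    static int holdCountAfterDoubleLock() {
        final ReentrantLock lock = new ReentrantLock();
        lock.lock();
        lock.lock();                         // re-entered: hold count is now 2
        final int count = lock.getHoldCount();
        lock.unlock();
        lock.unlock();                       // both unlocks are needed to fully release
        return count;
    }
}
```

This is why moving the single lock(key) call into the constructor (paired with the unlock in close()) keeps the acquire/release counts trivially balanced.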


@Override
public void close() throws IOException {
getLogger().debug("Closing HazelcastMapCacheClient");
Contributor:

Suggested change:
- getLogger().debug("Closing HazelcastMapCacheClient");
+ getLogger().debug("Closing " + this.getClass().getSimpleName());

try(final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) {
final byte[] oldValue = cache.get(key);

if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_VERSION)) {
Contributor:

When does entry.getRevision().get() < STARTING_VERSION resolve to true?

simonbence closed this Jul 13, 2020