Skip to content

Commit

Permalink
Merge cf92a37 into 655ecc1
Browse files Browse the repository at this point in the history
  • Loading branch information
bullet-tooth committed Jun 4, 2020
2 parents 655ecc1 + cf92a37 commit f40093c
Show file tree
Hide file tree
Showing 21 changed files with 676 additions and 63 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -10,6 +10,8 @@ branches:
# … and release branches, e.g. jlc/v0.2.0-release
- /^.+-release$/
- core-1.0
# Temporary enable
- async-migrations

addons:
sonarcloud:
Expand Down
5 changes: 4 additions & 1 deletion exonum-java-binding/CHANGELOG.md
Expand Up @@ -16,8 +16,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added

- Java 14 support. (#1509)
- Asynchronous data migration support. (#1555, #1558)

### Changed
- `ServiceModule`'s can be package-private now. (#1557)

## [0.10.0] - 2020-04-03

Expand Down
Expand Up @@ -49,6 +49,9 @@ public abstract class ServiceArtifactId {

/**
* Returns the artifact version of this service (e.g., "1.2.0").
* The value should have a valid <b>Semantic versioning</b> format.
*
* @see <a href="https://semver.org/">Semantic versioning</a>
*/
public abstract String getVersion();

Expand Down
6 changes: 6 additions & 0 deletions exonum-java-binding/core/pom.xml
Expand Up @@ -111,6 +111,12 @@
<artifactId>pf4j</artifactId>
</dependency>

<dependency>
<groupId>com.github.zafarkhaja</groupId>
<artifactId>java-semver</artifactId>
<version>${java-semver.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Expand Up @@ -18,7 +18,9 @@

import com.exonum.binding.common.runtime.ServiceArtifactId;
import com.exonum.binding.core.service.ServiceModule;
import com.exonum.binding.core.service.migration.MigrationScript;
import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.function.Supplier;


Expand All @@ -41,8 +43,17 @@ abstract class LoadedServiceDefinition {
*/
public abstract Supplier<ServiceModule> getModuleSupplier();

/**
* Returns {@linkplain MigrationScript migration script} suppliers for performing asynchronous
* migration of the service.
* It will always return the same number of scripts, but different instances.
*/
public abstract List<Supplier<MigrationScript>> getMigrationScripts();

static LoadedServiceDefinition newInstance(ServiceArtifactId artifactId,
Supplier<ServiceModule> serviceModuleSupplier) {
return new AutoValue_LoadedServiceDefinition(artifactId, serviceModuleSupplier);
Supplier<ServiceModule> serviceModuleSupplier,
List<Supplier<MigrationScript>> migrationScripts) {
return new AutoValue_LoadedServiceDefinition(artifactId, serviceModuleSupplier,
migrationScripts);
}
}
Expand Up @@ -23,6 +23,7 @@

import com.exonum.binding.common.runtime.ServiceArtifactId;
import com.exonum.binding.core.service.ServiceModule;
import com.exonum.binding.core.service.migration.MigrationScript;
import com.google.common.base.MoreObjects;
import com.google.inject.Inject;
import java.nio.file.Path;
Expand All @@ -34,6 +35,7 @@
import java.util.TreeMap;
import java.util.function.Supplier;
import org.pf4j.Extension;
import org.pf4j.ExtensionPoint;
import org.pf4j.PluginManager;
import org.pf4j.PluginState;

Expand Down Expand Up @@ -122,9 +124,10 @@ private void startPlugin(String pluginId) throws ServiceLoadingException {
/** Loads the service definition from the already loaded plugin with the given id. */
private LoadedServiceDefinition loadDefinition(String pluginId) throws ServiceLoadingException {
ServiceArtifactId artifactId = extractServiceId(pluginId);
Supplier<ServiceModule> serviceModuleSupplier = findServiceModuleSupplier(pluginId);
var serviceModuleSupplier = findServiceModuleSupplier(pluginId);
var migrationScripts = findMigrationScripts(pluginId);
LoadedServiceDefinition serviceDefinition =
LoadedServiceDefinition.newInstance(artifactId, serviceModuleSupplier);
LoadedServiceDefinition.newInstance(artifactId, serviceModuleSupplier, migrationScripts);

assert !loadedServices.containsKey(artifactId);
loadedServices.put(artifactId, serviceDefinition);
Expand Down Expand Up @@ -153,11 +156,30 @@ private Supplier<ServiceModule> findServiceModuleSupplier(String pluginId)
checkServiceModules(pluginId, extensionClasses);

Class<? extends ServiceModule> serviceModuleClass = extensionClasses.get(0);
return createReflectiveExtensionSupplier(pluginId, serviceModuleClass);
}

private List<Supplier<MigrationScript>> findMigrationScripts(String pluginId)
throws ServiceLoadingException {
List<Class<? extends MigrationScript>> classes = pluginManager
.getExtensionClasses(MigrationScript.class, pluginId);

List<Supplier<MigrationScript>> suppliers = new ArrayList<>(classes.size());
for (var extension : classes) {
suppliers.add(createReflectiveExtensionSupplier(pluginId, extension));
}

return suppliers;
}

private <T extends ExtensionPoint> Supplier<T> createReflectiveExtensionSupplier(
String pluginId,
Class<? extends T> extensionClass) throws ServiceLoadingException {
try {
return new ReflectiveModuleSupplier(serviceModuleClass);
return new ReflectiveExtensionSupplier<>(extensionClass);
} catch (NoSuchMethodException | IllegalAccessException e) {
String message = String.format("Cannot load a plugin (%s): module (%s) is not valid",
pluginId, serviceModuleClass);
String message = String.format("Cannot load a plugin (%s): extension (%s) is not valid",
pluginId, extensionClass);
throw new ServiceLoadingException(message, e);
}
}
Expand Down
Expand Up @@ -16,60 +16,58 @@

package com.exonum.binding.core.runtime;

import com.exonum.binding.core.service.ServiceModule;
import com.google.common.base.MoreObjects;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.function.Supplier;

// todo: such implementation is nicer in terms of error handling (it happens upfront), but
// does not allow package-private modules until Java 9 with MethodHandles#privateLookupIn
// [ECR-3008, ECR-521]
import org.pf4j.ExtensionPoint;

/**
* A reflective supplier of service modules that instantiates them with a no-arg constructor.
*/
public final class ReflectiveModuleSupplier implements Supplier<ServiceModule> {
public final class ReflectiveExtensionSupplier<T extends ExtensionPoint> implements Supplier<T> {

private final Class<? extends ServiceModule> moduleClass;
private final MethodHandle moduleConstructor;
private final Class<? extends T> extensionClass;
private final MethodHandle extensionConstructor;

/**
* Creates a module supplier for a given service module class.
*
* @throws NoSuchMethodException if the constructor of given service module class does not exist
* @throws IllegalAccessException if accessing the no-arg module constructor failed
*/
public ReflectiveModuleSupplier(Class<? extends ServiceModule> moduleClass)
public ReflectiveExtensionSupplier(Class<? extends T> extensionClass)
throws NoSuchMethodException, IllegalAccessException {
this.moduleClass = moduleClass;
MethodHandles.Lookup lookup = MethodHandles.lookup();
this.extensionClass = extensionClass;
MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(extensionClass,
MethodHandles.lookup());
MethodType mt = MethodType.methodType(void.class);
moduleConstructor = lookup.findConstructor(moduleClass, mt);
extensionConstructor = lookup.findConstructor(extensionClass, mt);
}

@Override
public ServiceModule get() {
return newServiceModule();
public T get() {
return initializeExtension();
}

private ServiceModule newServiceModule() {
@SuppressWarnings("unchecked")
private T initializeExtension() {
try {
return (ServiceModule) moduleConstructor.invoke();
return (T) extensionConstructor.invoke();
} catch (Throwable throwable) {
String message = String
.format("Cannot instantiate a service module of class %s using constructor %s",
moduleClass, moduleConstructor);
.format("Cannot instantiate extension of class %s using constructor %s",
extensionClass, extensionConstructor);
throw new IllegalStateException(message, throwable);
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("moduleClass", moduleClass)
.add("moduleConstructor", moduleConstructor)
.add("extensionClass", extensionClass)
.add("extensionConstructor", extensionConstructor)
.toString();
}
}
Expand Up @@ -19,6 +19,9 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;

import com.exonum.binding.common.crypto.PublicKey;
import com.exonum.binding.common.hash.HashCode;
Expand All @@ -28,23 +31,29 @@
import com.exonum.binding.core.service.BlockCommittedEventImpl;
import com.exonum.binding.core.service.ExecutionContext;
import com.exonum.binding.core.service.ExecutionException;
import com.exonum.binding.core.service.migration.MigrationScript;
import com.exonum.binding.core.storage.database.Snapshot;
import com.exonum.binding.core.transport.Server;
import com.exonum.messages.core.runtime.Errors.ErrorKind;
import com.exonum.messages.core.runtime.Lifecycle.InstanceStatus;
import com.exonum.messages.core.runtime.Lifecycle.InstanceStatus.Simple;
import com.github.zafarkhaja.semver.Version;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -587,4 +596,99 @@ private void checkStoppedService(Integer serviceId) {
Optional<ServiceWrapper> findService(String name) {
return Optional.ofNullable(services.get(name));
}

/**
* Returns migration script for the given artifact to perform asynchronous data migration.
*
* @param artifactId Java service artifact id
* @param dataVersion base data version migrate from
* @return migration script instance or {@link Optional#empty()} if there is no scripts found
* @throws IllegalArgumentException if the provided artifact is not deployed
* @throws IllegalStateException if scripts aren't singular i.e. there are more then one script
* for the same target version;
* Or migration scripts too old i.e. base data version is greater then max target version
* specified in migration scripts;
* Or found a script which requires data version greater the provided
*/
public Optional<MigrationScript> migrate(ServiceArtifactId artifactId, String dataVersion) {
try {
synchronized (lock) {
LoadedServiceDefinition serviceDefinition = serviceLoader.findService(artifactId)
.orElseThrow(() -> new IllegalArgumentException(
"Migration called on unknown artifactId: " + artifactId));

if (serviceDefinition.getMigrationScripts().isEmpty()) {
logger.info("Service data migration is not required, skipping."
+ " No scripts found for the given artifact {}", artifactId);
return Optional.empty();
}

Version artifactVersion = Version.valueOf(artifactId.getVersion());
Version baseDataVersion = Version.valueOf(dataVersion);

List<MigrationScript> scripts = serviceDefinition.getMigrationScripts()
.stream()
.map(Supplier::get)
.collect(Collectors.toList());

checkScriptVersionsUnique(scripts);
checkScriptsCompatibility(scripts, baseDataVersion);

Optional<MigrationScript> nextLinearScript = scripts
.stream()
.filter(m -> Version.valueOf(m.targetVersion()).lessThanOrEqualTo(artifactVersion))
.min(Comparator.comparing(script -> Version.valueOf(script.targetVersion())));

nextLinearScript.flatMap(MigrationScript::minSupportedVersion)
.ifPresent(minSupportedVersion -> {
if (Version.valueOf(minSupportedVersion).lessThan(baseDataVersion)) {
throw new IllegalStateException(
String.format("Migration script requires at least %s "
+ "data version, but actual is %s", minSupportedVersion, baseDataVersion));
}
});
logger.info("Performing service migration from {} version data for the given artifact {}"
+ " using script {}", baseDataVersion, artifactId, nextLinearScript);
return nextLinearScript;
}
} catch (Exception e) {
logger.error("Failed to perform migration for the given artifact {} and base data version {}",
artifactId, dataVersion);
throw e;
}
}

private void checkScriptsCompatibility(List<MigrationScript> scripts, Version baseDataVersion) {
Optional<Version> maxTargetVersion = scripts.stream()
.map(MigrationScript::targetVersion)
.map(Version::valueOf)
.max(Comparator.naturalOrder());

maxTargetVersion.ifPresent(v -> {
if (v.lessThan(baseDataVersion)) {
throw new IllegalStateException(String.format("Scripts too old. "
+ "Base data version is %s, but migration scripts are up to %s only",
baseDataVersion, v));
}
}
);
}

private void checkScriptVersionsUnique(List<MigrationScript> scripts) {
Map<Version, Long> targetVersionsCount = scripts.stream()
.map(MigrationScript::targetVersion)
.map(Version::valueOf)
.collect(groupingBy(identity(), counting()));

targetVersionsCount.entrySet()
.stream()
.filter(e -> e.getValue() > 1)
.findAny()
.ifPresent(duplicate -> {
throw new IllegalStateException(String.format("Migration scripts should be singular, "
+ "but duplications found: %s scripts for the same version %s",
duplicate.getValue(), duplicate.getKey()));
});
}

}
Expand Up @@ -24,6 +24,7 @@
import com.exonum.binding.core.proxy.Cleaner;
import com.exonum.binding.core.proxy.CloseFailuresException;
import com.exonum.binding.core.service.ExecutionException;
import com.exonum.binding.core.service.migration.MigrationScript;
import com.exonum.binding.core.storage.database.Snapshot;
import com.exonum.messages.core.runtime.Base.ArtifactId;
import com.exonum.messages.core.runtime.Base.InstanceSpec;
Expand Down Expand Up @@ -186,6 +187,23 @@ void updateServiceStatus(byte[] instanceSpec, byte[] instanceStatus) {
serviceRuntime.updateInstanceStatus(javaInstanceSpec, status);
}

/**
* Returns migration script for the given artifact to the native to perform asynchronous data
* migration.
*
* @param artifactId bytes representation of the Java service artifact id as a serialized message
* @param dataVersion base data version migrate from
* @return migration script instance or {@code null} if there is no scripts found
* @see ServiceRuntime#migrate(ServiceArtifactId, String)
*/
MigrationScript migrate(byte[] artifactId, String dataVersion) {
ArtifactId artifact = parseArtifact(artifactId);
ServiceArtifactId javaServiceArtifact = ServiceArtifactId.fromProto(artifact);

return serviceRuntime.migrate(javaServiceArtifact, dataVersion)
.orElse(null);
}

private static ServiceInstanceSpec parseInstanceSpec(byte[] instanceSpec) {
try {
InstanceSpec spec = InstanceSpec.parseFrom(instanceSpec);
Expand Down

0 comments on commit f40093c

Please sign in to comment.