Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.client;

/**
* Thin client services facade.
*/
public interface ClientServices {
/**
* Gets the cluster group to which this {@code ClientServices} instance belongs.
*
* @return Cluster group to which this {@code ClientServices} instance belongs.
*/
public ClientClusterGroup clusterGroup();

/**
* Gets a remote handle on the service.
* <p>
* Note: There are no guarantees that each method invocation for the same proxy will always contact the same remote
* service (on the same remote node).
*
* @param name Service name.
* @param svcItf Interface for the service.
* @return Proxy over remote service.
*/
public <T> T serviceProxy(String name, Class<? super T> svcItf);

/**
* Gets a remote handle on the service with timeout.
* <p>
* Note: There are no guarantees that each method invocation for the same proxy will always contact the same remote
* service (on the same remote node).
*
* @param name Service name.
* @param svcItf Interface for the service.
* @param timeout If greater than 0 created proxy will wait for service availability only specified time,
* and will limit remote service invocation time.
* @return Proxy over remote service.
*/
public <T> T serviceProxy(String name, Class<? super T> svcItf, long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,24 @@ public interface IgniteClient extends AutoCloseable {
* @return Client cluster facade.
*/
public ClientCluster cluster();

/**
* Gets {@code services} facade over all cluster nodes started in server mode.
*
* @return Services facade over all cluster nodes started in server mode.
*/
public ClientServices services();

/**
* Gets {@code services} facade over nodes within the cluster group. All operations
* on the returned {@link ClientServices} instance will only include nodes from
* the specified cluster group.
*
* Note: In some cases there will be additional requests for each service invocation from client to server
* to resolve cluster group.
*
* @param grp Cluster group.
* @return {@code Services} functionality over given cluster group.
*/
public ClientServices services(ClientClusterGroup grp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ enum ClientOperation {
/** Get nodes info by IDs. */CLUSTER_GROUP_GET_NODE_INFO(5101),

/** Execute compute task. */COMPUTE_TASK_EXECUTE(6000),
/** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true);
/** Finished compute task notification. */COMPUTE_TASK_FINISHED(6001, true),

/** Invoke service. */SERVICE_INVOKE(7000);

/** Code. */
private final int code;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.client.thin;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.client.ClientClusterGroup;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.platform.PlatformServiceMethod;

/**
* Implementation of {@link ClientServices}.
*/
class ClientServicesImpl implements ClientServices {
/** Channel. */
private final ReliableChannel ch;

/** Binary marshaller. */
private final ClientBinaryMarshaller marsh;

/** Utils for serialization/deserialization. */
private final ClientUtils utils;

/** Cluster group. */
private final ClientClusterGroupImpl grp;

/** Constructor. */
ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl grp) {
this.ch = ch;
this.marsh = marsh;
this.grp = grp;

utils = new ClientUtils(marsh);
}

/** {@inheritDoc} */
@Override public ClientClusterGroup clusterGroup() {
return grp;
}

/** {@inheritDoc} */
@Override public <T> T serviceProxy(String name, Class<? super T> svcItf) {
return serviceProxy(name, svcItf, 0);
}

/** {@inheritDoc} */
@Override public <T> T serviceProxy(String name, Class<? super T> svcItf, long timeout) {
A.notNullOrEmpty(name, "name");
A.notNull(svcItf, "svcItf");

return (T)Proxy.newProxyInstance(svcItf.getClassLoader(), new Class[] {svcItf},
new ServiceInvocationHandler<>(name, timeout, grp));
}

/**
* Gets services facade over the specified cluster group.
*
* @param grp Cluster group.
*/
ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
A.notNull(grp, "grp");

return new ClientServicesImpl(ch, marsh, grp);
}

/**
* Service invocation handler.
*/
private class ServiceInvocationHandler<T> implements InvocationHandler {
/** Flag "Has parameter types" mask. */
private static final byte FLAG_PARAMETER_TYPES_MASK = 0x02;

/** Service name. */
private final String name;

/** Timeout. */
private final long timeout;

/** Cluster group. */
private final ClientClusterGroupImpl grp;

/**
* @param name Service name.
* @param timeout Timeout.
*/
private ServiceInvocationHandler(String name, long timeout, ClientClusterGroupImpl grp) {
this.name = name;
this.timeout = timeout;
this.grp = grp;
}

/** {@inheritDoc} */
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
Collection<UUID> nodeIds = grp.nodeIds();

if (nodeIds != null && nodeIds.isEmpty())
throw new ClientException("Cluster group is empty.");

return ch.service(ClientOperation.SERVICE_INVOKE,
req -> writeServiceInvokeRequest(req, nodeIds, method, args),
res -> utils.readObject(res.in(), false)
);
}
catch (ClientError e) {
throw new ClientException(e);
}
}

/**
* @param ch Payload output channel.
*/
private void writeServiceInvokeRequest(
PayloadOutputChannel ch,
Collection<UUID> nodeIds,
Method method,
Object[] args
) {
ch.clientChannel().protocolCtx().checkFeatureSupported(ProtocolBitmaskFeature.SERVICE_INVOKE);

try (BinaryRawWriterEx writer = utils.createBinaryWriter(ch.out())) {
writer.writeString(name);
writer.writeByte(FLAG_PARAMETER_TYPES_MASK); // Flags.
writer.writeLong(timeout);

if (nodeIds == null)
writer.writeInt(0);
else {
writer.writeInt(nodeIds.size());

for (UUID nodeId : nodeIds) {
writer.writeLong(nodeId.getMostSignificantBits());
writer.writeLong(nodeId.getLeastSignificantBits());
}
}

PlatformServiceMethod ann = method.getDeclaredAnnotation(PlatformServiceMethod.class);

writer.writeString(ann != null ? ann.value() : method.getName());

Class<?>[] paramTypes = method.getParameterTypes();

if (F.isEmpty(args))
writer.writeInt(0);
else {
writer.writeInt(args.length);

assert args.length == paramTypes.length : "args=" + args.length + ", types=" + paramTypes.length;

for (int i = 0; i < args.length; i++) {
writer.writeInt(marsh.context().typeId(paramTypes[i].getName()));
writer.writeObject(args[i]);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.client.thin;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryReaderHandles;
import org.apache.ignite.internal.binary.BinarySchema;
import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
Expand Down Expand Up @@ -527,6 +529,13 @@ void writeObject(BinaryOutputStream out, Object obj) {
out.writeByteArray(marsh.marshal(obj));
}

/**
* @param out Output stream.
*/
BinaryRawWriterEx createBinaryWriter(BinaryOutputStream out) {
return new BinaryWriterExImpl(marsh.context(), out, BinaryThreadLocalContext.get().schemaHolder(), null);
}

/** Read Ignite binary object from input stream. */
<T> T readObject(BinaryInputStream in, boolean keepBinary) {
if (keepBinary)
Expand Down Expand Up @@ -590,7 +599,7 @@ private Object[] unwrapArray(Object[] arr, BinaryReaderHandles hnds) {
if (BinaryUtils.knownArray(arr))
return arr;

Object[] res = new Object[arr.length];
Object[] res = (Object[])Array.newInstance(arr.getClass().getComponentType(), arr.length);

for (int i = 0; i < arr.length; i++)
res[i] = unwrapBinary(arr[i], hnds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum ProtocolBitmaskFeature {
CLUSTER_STATES(2),

/** Cluster groups. */
CLUSTER_GROUPS(4);
CLUSTER_GROUPS(4),

/** Invoke service methods. */
SERVICE_INVOKE(5);

/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.client.thin;

import java.util.EnumSet;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;

/**
* Protocol Context.
Expand Down Expand Up @@ -45,6 +46,17 @@ public boolean isFeatureSupported(ProtocolBitmaskFeature feature) {
return features.contains(feature);
}

/**
* Check that feature is supported by the server.
*
* @param feature Feature.
* @throws ClientFeatureNotSupportedByServerException If feature is not supported by the server.
*/
public void checkFeatureSupported(ProtocolBitmaskFeature feature) throws ClientFeatureNotSupportedByServerException {
if (!isFeatureSupported(feature))
throw new ClientFeatureNotSupportedByServerException(feature);
}

/**
* @return {@code true} if protocol version feature supported.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.client.ClientClusterGroup;
import org.apache.ignite.client.ClientCompute;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.client.ClientTransactions;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;
Expand Down Expand Up @@ -78,6 +79,9 @@ public class TcpIgniteClient implements IgniteClient {
/** Cluster facade. */
private final ClientClusterImpl cluster;

/** Services facade. */
private final ClientServicesImpl services;

/** Marshaller. */
private final ClientBinaryMarshaller marsh;

Expand Down Expand Up @@ -115,6 +119,8 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
cluster = new ClientClusterImpl(ch, marsh);

compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());

services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -228,6 +234,16 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
return cluster;
}

/** {@inheritDoc} */
@Override public ClientServices services() {
return services;
}

/** {@inheritDoc} */
@Override public ClientServices services(ClientClusterGroup grp) {
return services.withClusterGroup((ClientClusterGroupImpl)grp);
}

/**
* Initializes new instance of {@link IgniteClient}.
*
Expand Down
Loading