Skip to content

Commit

Permalink
Implement TConfiguration for java
Browse files Browse the repository at this point in the history
  • Loading branch information
zeshuai007 committed Sep 1, 2020
1 parent 021cb27 commit ff1a8dc
Show file tree
Hide file tree
Showing 62 changed files with 2,106 additions and 1,186 deletions.
13 changes: 6 additions & 7 deletions compiler/cpp/src/thrift/generate/t_java_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3861,19 +3861,18 @@ void t_java_generator::generate_deserialize_container(ostream& out,
// Declare variables, read header
if (ttype->is_map()) {
indent(out) << "org.apache.thrift.protocol.TMap " << obj
<< " = new org.apache.thrift.protocol.TMap("
<< " = iprot.readMapBegin("
<< type_to_enum(((t_map*)ttype)->get_key_type()) << ", "
<< type_to_enum(((t_map*)ttype)->get_val_type()) << ", "
<< "iprot.readI32());" << endl;
<< type_to_enum(((t_map*)ttype)->get_val_type()) << ");" << endl;
} else if (ttype->is_set()) {
indent(out) << "org.apache.thrift.protocol.TSet " << obj
<< " = new org.apache.thrift.protocol.TSet("
<< type_to_enum(((t_set*)ttype)->get_elem_type()) << ", iprot.readI32());"
<< " = iprot.readSetBegin("
<< type_to_enum(((t_set*)ttype)->get_elem_type()) << ");"
<< endl;
} else if (ttype->is_list()) {
indent(out) << "org.apache.thrift.protocol.TList " << obj
<< " = new org.apache.thrift.protocol.TList("
<< type_to_enum(((t_list*)ttype)->get_elem_type()) << ", iprot.readI32());"
<< " = iprot.readListBegin("
<< type_to_enum(((t_list*)ttype)->get_elem_type()) << ");"
<< endl;
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/java/src/org/apache/thrift/TAsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public interface TAsyncProcessor {
/**
* Process a single frame.
* <b>Note:</b> Implementations must call fb.responseReady() once processing
* is complete
*
Expand Down
101 changes: 101 additions & 0 deletions lib/java/src/org/apache/thrift/TConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.thrift;


public class TConfiguration {
public static final int DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
public static final int DEFAULT_MAX_FRAME_SIZE = 16384000; // this value is used consistently across all Thrift libraries
public static final int DEFAULT_RECURSION_DEPTH = 64;

private int maxMessageSize;
private int maxFrameSize;
private int recursionLimit;

public TConfiguration() {
this(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_RECURSION_DEPTH);
}
public TConfiguration(int maxMessageSize, int maxFrameSize, int recursionLimit) {
this.maxFrameSize = maxFrameSize;
this.maxMessageSize = maxMessageSize;
this.recursionLimit = recursionLimit;
}

public int getMaxMessageSize() {
return maxMessageSize;
}

public int getMaxFrameSize() {
return maxFrameSize;
}

public int getRecursionLimit() {
return recursionLimit;
}

public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

public void setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}

public void setRecursionLimit(int recursionLimit) {
this.recursionLimit = recursionLimit;
}

public static final TConfiguration DEFAULT = new Builder().build();

public static TConfiguration.Builder custom() {
return new Builder();
}

public static class Builder {
private int maxMessageSize ;
private int maxFrameSize;
private int recursionLimit ;

Builder() {
super();
this.maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
this.recursionLimit = DEFAULT_RECURSION_DEPTH;
}

public Builder setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
return this;
}

public Builder setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
return this;
}

public Builder setRecursionLimit(int recursionLimit) {
this.recursionLimit = recursionLimit;
return this;
}

public TConfiguration build() {
return new TConfiguration(maxMessageSize, maxFrameSize, recursionLimit);
}
}
}
13 changes: 7 additions & 6 deletions lib/java/src/org/apache/thrift/TDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.thrift.protocol.TProtocolUtil;
import org.apache.thrift.protocol.TType;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TTransportException;

/**
* Generic utility for easily deserializing objects from a byte array or Java
Expand All @@ -42,7 +43,7 @@ public class TDeserializer {
/**
* Create a new TDeserializer that uses the TBinaryProtocol by default.
*/
public TDeserializer() {
public TDeserializer() throws TTransportException {
this(new TBinaryProtocol.Factory());
}

Expand All @@ -52,8 +53,8 @@ public TDeserializer() {
*
* @param protocolFactory Factory to create a protocol
*/
public TDeserializer(TProtocolFactory protocolFactory) {
trans_ = new TMemoryInputTransport();
public TDeserializer(TProtocolFactory protocolFactory) throws TTransportException {
trans_ = new TMemoryInputTransport(new TConfiguration());
protocol_ = protocolFactory.getProtocol(trans_);
}

Expand Down Expand Up @@ -105,19 +106,19 @@ public void deserialize(TBase base, String data, String charset) throws TExcepti

/**
* Deserialize only a single Thrift object (addressed by recursively using field id)
* from a byte record.
* from a byte record.
* @param tb The object to read into
* @param bytes The serialized object to read from
* @param fieldIdPathFirst First of the FieldId's that define a path tb
* @param fieldIdPathRest The rest FieldId's that define a path tb
* @throws TException
* @throws TException
*/
public void partialDeserialize(TBase tb, byte[] bytes, TFieldIdEnum fieldIdPathFirst, TFieldIdEnum ... fieldIdPathRest) throws TException {
try {
if (locateField(bytes, fieldIdPathFirst, fieldIdPathRest) != null) {
// if this line is reached, iprot will be positioned at the start of tb.
tb.read(protocol_);
}
}
} catch (Exception e) {
throw new TException(e);
} finally {
Expand Down
25 changes: 13 additions & 12 deletions lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
*
*/
public class TNonblockingMultiFetchClient {

private static final Logger LOGGER = LoggerFactory.getLogger(
TNonblockingMultiFetchClient.class);

Expand All @@ -86,7 +86,7 @@ public class TNonblockingMultiFetchClient {
// time limit for fetching data from all servers (in second)
private int fetchTimeoutSeconds;

// store request that will be sent to servers
// store request that will be sent to servers
private ByteBuffer requestBuf;
private ByteBuffer requestBufDuplication;

Expand All @@ -104,7 +104,7 @@ public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer,
this.fetchTimeoutSeconds = fetchTimeoutSeconds;
this.requestBuf = requestBuf;
this.servers = servers;

stats = new TNonblockingMultiFetchStats();
recvBuf = null;
}
Expand All @@ -128,7 +128,7 @@ public synchronized ByteBuffer getRequestBuf() {
if (requestBufDuplication == null) {
requestBufDuplication = requestBuf.duplicate();
}
return requestBufDuplication;
return requestBufDuplication;
}
}

Expand Down Expand Up @@ -171,7 +171,7 @@ public synchronized ByteBuffer[] fetch() {
task.cancel(true);
LOGGER.error("Exception during fetch", ee);
} catch (TimeoutException te) {
// attempt to cancel execution of the task.
// attempt to cancel execution of the task.
task.cancel(true);
LOGGER.error("Timeout for fetch", te);
}
Expand Down Expand Up @@ -207,10 +207,10 @@ public void run() {
// buffer for receiving response from servers
recvBuf = new ByteBuffer[numTotalServers];
// buffer for sending request
ByteBuffer sendBuf[] = new ByteBuffer[numTotalServers];
long numBytesRead[] = new long[numTotalServers];
int frameSize[] = new int[numTotalServers];
boolean hasReadFrameSize[] = new boolean[numTotalServers];
ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers];
long[] numBytesRead = new long[numTotalServers];
int[] frameSize = new int[numTotalServers];
boolean[] hasReadFrameSize = new boolean[numTotalServers];

try {
selector = Selector.open();
Expand Down Expand Up @@ -240,10 +240,11 @@ public void run() {
} catch (Exception e) {
stats.incNumConnectErrorServers();
LOGGER.error("Set up socket to server {} error", server, e);

// free resource
if (s != null) {
try {s.close();} catch (Exception ex) {}
}
}
if (key != null) {
key.cancel();
}
Expand All @@ -253,7 +254,7 @@ public void run() {
// wait for events
while (stats.getNumReadCompletedServers() +
stats.getNumConnectErrorServers() < stats.getNumTotalServers()) {
// if the thread is interrupted (e.g., task is cancelled)
// if the thread is interrupted (e.g., task is cancelled)
if (Thread.currentThread().isInterrupted()) {
return;
}
Expand Down Expand Up @@ -380,4 +381,4 @@ public void close() {
}
}
}
}
}
8 changes: 5 additions & 3 deletions lib/java/src/org/apache/thrift/TSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransportException;

/**
* Generic utility for easily serializing objects into a byte array or Java
Expand All @@ -42,7 +43,7 @@ public class TSerializer {
/**
* This transport wraps that byte array
*/
private final TIOStreamTransport transport_ = new TIOStreamTransport(baos_);
private final TIOStreamTransport transport_;

/**
* Internal protocol used for serializing objects.
Expand All @@ -52,7 +53,7 @@ public class TSerializer {
/**
* Create a new TSerializer that uses the TBinaryProtocol by default.
*/
public TSerializer() {
public TSerializer() throws TTransportException {
this(new TBinaryProtocol.Factory());
}

Expand All @@ -62,7 +63,8 @@ public TSerializer() {
*
* @param protocolFactory Factory to create a protocol
*/
public TSerializer(TProtocolFactory protocolFactory) {
public TSerializer(TProtocolFactory protocolFactory) throws TTransportException {
transport_ = new TIOStreamTransport(new TConfiguration(), baos_);
protocol_ = protocolFactory.getProtocol(transport_);
}

Expand Down
8 changes: 4 additions & 4 deletions lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
Expand Down Expand Up @@ -98,19 +98,19 @@ protected boolean isFinished() {
protected long getStartTime() {
return startTime;
}

protected long getSequenceId() {
return sequenceId;
}

public TAsyncClient getClient() {
return client;
}

public boolean hasTimeout() {
return timeout > 0;
}

public long getTimeoutTimestamp() {
return timeout + startTime;
}
Expand Down
Loading

0 comments on commit ff1a8dc

Please sign in to comment.