Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

FLUME-286: DFO mode does not detect network failure

  • Loading branch information...
commit d9b316c534f0f85b4942ede8d764a5453123e65f 1 parent ccfaf6b
Jonathan Hsieh jmhsieh authored
7 RELEASENOTES
@@ -15,6 +15,13 @@ configurations or node maps written by masters from <0.9.2
15 15 installations. There is no upgrade path yet to preserve both node maps
16 16 and configurations, but one is planned for the 0.9.2 release.
17 17
  18 +The bug that caused FLUME-286 has been fixed for Thrift RPCs but not
  19 +for Avro-based RPC mechanisms. Prior to the fix, the Thrift
  20 +version would not recover when a downstream RPC server had a network
  21 +partition such as a wire cut or power down. Unlike killing a server,
  22 +these situations provided no failure feedback. The Avro version
  23 +currently does not properly detect or recover from these situations so
  24 +the DFO mode can lose data. This is reported as issue FLUME-313.
18 25
19 26 Flume 0.9.1 Update 1 (CDH3b3) Release Notes
20 27 ===========================================
13 conf/flume-conf.xml
@@ -290,5 +290,18 @@ configuration values placed in flume-site.xml. -->
290 290 <value>/tmp/flume-${user.name}-zk</value>
291 291 <description>The base directory in which the ZBCS stores data.</description>
292 292 </property>
  293 +
  294 +
  295 + <!-- ================================================== -->
  296 + <!-- Thrift RPC settings ============================== -->
  297 + <!-- ================================================== -->
  298 +
  299 + <property>
  300 + <name>flume.thrift.socket.timeout.ms</name>
  301 + <value>10000</value>
  302 + <description>Milliseconds with no transmissions before thrift
  303 + client times out a connection</description>
  304 + </property>
  305 +
293 306
294 307 </configuration>
4 src/java/com/cloudera/flume/agent/ThriftMasterRPC.java
@@ -36,6 +36,7 @@
36 36
37 37 import com.cloudera.flume.conf.thrift.ThriftFlumeClientServer;
38 38 import com.cloudera.flume.conf.FlumeConfigData;
  39 +import com.cloudera.flume.conf.FlumeConfiguration;
39 40 import com.cloudera.flume.conf.thrift.ThriftFlumeClientServer.Client;
40 41 import com.cloudera.flume.handlers.endtoend.AckListener;
41 42 import com.cloudera.flume.handlers.endtoend.CollectorAckListener;
@@ -65,7 +66,8 @@
65 66 ThriftMasterRPC(String masterHostname, int masterPort) throws IOException {
66 67 Preconditions.checkState(masterClient == null,
67 68 "client already initialized -- double init not allowed");
68   - TTransport masterTransport = new TSocket(masterHostname, masterPort);
  69 + int timeout = FlumeConfiguration.get().getThriftSocketTimeoutMs();
  70 + TTransport masterTransport = new TSocket(masterHostname, masterPort, timeout);
69 71 TProtocol protocol = new TBinaryProtocol(masterTransport);
70 72 try {
71 73 masterTransport.open();
20 src/java/com/cloudera/flume/conf/FlumeConfiguration.java
@@ -52,8 +52,11 @@
52 52
53 53 /**
54 54 * Returns the 'FLUME_HOME' location. Taken in order of precedence:
  55 + *
55 56 * - Java system property 'flume.home'
  57 + *
56 58 * - $FLUME_HOME in the environment.
  59 + *
57 60 * - null if neither of these are set.
58 61 */
59 62 public static String getFlumeHome() {
@@ -71,9 +74,13 @@ public static String getFlumeHome() {
71 74
72 75 /**
73 76 * Returns the 'FLUME_CONF_DIR' location. Taken in order of precedence:
  77 + *
74 78 * - Java system property 'flume.conf.dir'
  79 + *
75 80 * - $FLUME_CONF_DIR in the environment
  81 + *
76 82 * - getFlumeHome()/conf
  83 + *
77 84 * - ./conf
78 85 */
79 86 public static String getFlumeConfDir() {
@@ -168,6 +175,7 @@ protected FlumeConfiguration(boolean loadDefaults) {
168 175 public static final String POLLER_QUEUESIZE = "flume.poller.queuesize";
169 176 public static final String THRIFT_QUEUESIZE = "flume.thrift.queuesize";
170 177 public static final String THRIFT_CLOSE_MAX_SLEEP = "flume.thrift.close.maxsleep";
  178 + public static final String THRIFT_SOCKET_TIMEOUT_MS = "flume.thrift.socket.timeout.ms";
171 179 public static final String INSISTENTOPEN_INIT_BACKOFF = "flume.inisistentOpen.init.backoff";
172 180 public static final String HISTORY_DEFAULTPERIOD = "flume.countHistory.period";
173 181 public static final String HISTORY_MAXLENGTH = "flume.history.maxlength";
@@ -550,6 +558,10 @@ public int getThriftQueueSize() {
550 558 return getInt(THRIFT_QUEUESIZE, 1000);
551 559 }
552 560
  561 + public int getThriftSocketTimeoutMs() {
  562 + return getInt(THRIFT_SOCKET_TIMEOUT_MS, 10000);
  563 + }
  564 +
553 565 /**
554 566 * Initial backoff in millis after a failed open attempt in an insistentOpen
555 567 * decorator
@@ -702,7 +714,6 @@ public int getReportServerPort() {
702 714 return getInt(REPORT_SERVER_PORT, DEFAULT_REPORT_SERVER_PORT);
703 715 }
704 716
705   -
706 717 /**
707 718 * This returns the type of RPC mechanism (Thrift or Avro) chosen for the
708 719 * FlumeReportServer.
@@ -716,11 +727,12 @@ public String getReportServerRPC() {
716 727 }
717 728 }
718 729 // defaulting to Thrift with a polite warning
719   - LOG.warn("flume.report.server.rpc.type incorrectly defined, should be either"
720   - + " \"THRIFT\" or \"AVRO\". Defaulting to \"THRIFT\"");
  730 + LOG.warn("flume.report.server.rpc.type incorrectly defined, "
  731 + + "should be either \"THRIFT\" or \"AVRO\". "
  732 + + "Defaulting to \"THRIFT\"");
721 733 return RPC_TYPE_THRIFT;
722 734 }
723   -
  735 +
724 736 /**
725 737 * If MASTER_HEARTBEAT_PORT is set, we use that as our heartbeat port. If not,
726 738 * we look at the list of server:port pairs in MASTER_HEARTBEAT_SERVERS, in
5 src/java/com/cloudera/flume/handlers/thrift/ThriftEventSink.java
@@ -94,13 +94,14 @@ public void close() throws IOException {
94 94 public void open() throws IOException {
95 95
96 96 try {
  97 + int timeout = FlumeConfiguration.get().getThriftSocketTimeoutMs();
97 98 if (nonblocking) {
98 99 // non blocking must use "Framed transport"
99   - transport = new TSocket(host, port);
  100 + transport = new TSocket(host, port, timeout);
100 101 stats = new TStatsTransport(transport);
101 102 transport = new TFramedTransport(stats);
102 103 } else {
103   - transport = new TSocket(host, port);
  104 + transport = new TSocket(host, port, timeout);
104 105 stats = new TStatsTransport(transport);
105 106 transport = stats;
106 107 }

0 comments on commit d9b316c

Please sign in to comment.
Something went wrong with that request. Please try again.