From eb519bce12e9408543b63ac6f5b3264f0edeff00 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 5 Sep 2014 20:07:35 +0900 Subject: [PATCH] TAJO-1029: TAJO_PULLSERVER_STANDALONE should be false in default tajo-env.sh --- .../org/apache/tajo/worker/TajoWorker.java | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 1 - tajo-dist/src/main/bin/tajo-daemon.sh | 1 + tajo-dist/src/main/conf/tajo-env.sh | 2 +- .../tajo/pullserver/TajoPullServer.java | 2 +- .../pullserver/TajoPullServerService.java | 51 +++++++++---------- 6 files changed, 29 insertions(+), 30 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 8e6118d84a..584c60e623 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -207,7 +207,7 @@ public void init(Configuration conf) { addService(tajoWorkerManagerService); if(!yarnContainerMode) { - if(taskRunnerMode && !TajoPullServerService.isStandaloneMode()) { + if(taskRunnerMode && !TajoPullServerService.isStandalone()) { pullService = new TajoPullServerService(); addService(pullService); } diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index b07ba96b91..346fa69043 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -103,7 +103,6 @@ void setTestingFlagProperties() { } void initPropertiesAndConfigs() { - System.setProperty("TAJO_PULLSERVER_STANDALONE", "false"); if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName())); diff --git a/tajo-dist/src/main/bin/tajo-daemon.sh b/tajo-dist/src/main/bin/tajo-daemon.sh index ff3a764a0c..48790e991e 100755 --- a/tajo-dist/src/main/bin/tajo-daemon.sh +++ b/tajo-dist/src/main/bin/tajo-daemon.sh @@ -94,6 +94,7 @@ fi # some variables export TAJO_LOGFILE=tajo-$TAJO_IDENT_STRING-$command-$HOSTNAME.log export TAJO_ROOT_LOGGER_APPENDER="${TAJO_ROOT_LOGGER_APPENDER:-DRFA}" +export TAJO_PULLSERVER_STANDALONE="${TAJO_PULLSERVER_STANDALONE:-false}" log=$TAJO_LOG_DIR/tajo-$TAJO_IDENT_STRING-$command-$HOSTNAME.out pid=$TAJO_PID_DIR/tajo-$TAJO_IDENT_STRING-$command.pid diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh index bd14af6961..92d00bd8bb 100755 --- a/tajo-dist/src/main/conf/tajo-env.sh +++ b/tajo-dist/src/main/conf/tajo-env.sh @@ -77,4 +77,4 @@ export TAJO_WORKER_STANDBY_MODE=true # export HIVE_JDBC_DRIVER_DIR= # Tajo PullServer mode. the default mode is standalone mode -export TAJO_PULLSERVER_STANDALONE=true \ No newline at end of file +# export TAJO_PULLSERVER_STANDALONE=false \ No newline at end of file diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java index 7d7065eaed..d030eedef6 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java @@ -60,7 +60,7 @@ public void start() { public static void main(String[] args) throws Exception { StringUtils.startupShutdownMessage(PullServer.class, args, LOG); - if (!TajoPullServerService.isStandaloneMode()) { + if (!TajoPullServerService.isStandalone()) { LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'"); return; } diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 150ac85b55..f7bc489827 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.mapred.FadvisedChunkedFile; import org.apache.hadoop.mapred.FadvisedFileRegion; @@ -122,6 +124,15 @@ public class TajoPullServerService extends AbstractService { public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + private static boolean STANDALONE = false; + + static { + String standalone = System.getenv("TAJO_PULLSERVER_STANDALONE"); + if (!StringUtils.isEmpty(standalone)) { + STANDALONE = standalone.equalsIgnoreCase("true"); + } + } + @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") static class ShuffleMetrics implements ChannelFutureListener { @Metric({"OutputBytes","PullServer output in bytes"}) @@ -245,48 +256,41 @@ public synchronized void start() { sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); - if (isStandaloneMode()) { + if (STANDALONE) { File pullServerPortFile = getPullServerPortFile(); if (pullServerPortFile.exists()) { pullServerPortFile.delete(); } pullServerPortFile.getParentFile().mkdirs(); LOG.info("Write PullServerPort to " + pullServerPortFile); + FileOutputStream out = null; try { - FileOutputStream out = new FileOutputStream(pullServerPortFile); + out = new FileOutputStream(pullServerPortFile); out.write(("" + port).getBytes()); - out.close(); } catch (Exception e) { LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile + ", " + e.getMessage(), e); System.exit(-1); + } finally { + IOUtils.closeStream(out); } } LOG.info("TajoPullServerService started: port=" + port); } + public static boolean isStandalone() { + return STANDALONE; + } + private static File getPullServerPortFile() { String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR"); - if (pullServerPortInfoFile == null || pullServerPortInfoFile.isEmpty()) { + if (StringUtils.isEmpty(pullServerPortInfoFile)) { pullServerPortInfoFile = "/tmp"; } - return new File(pullServerPortInfoFile + "/pullserver.port"); } - public static boolean isStandaloneMode() { - String mode = System.getenv("TAJO_PULLSERVER_STANDALONE"); - if (mode == null || mode.trim().isEmpty()) { - mode = System.getProperty("TAJO_PULLSERVER_STANDALONE"); - } - - if (mode == null || mode.trim().isEmpty()) { - return true; - } else { - return mode.equalsIgnoreCase("true"); - } - } - + // TODO change to get port from master or tajoConf public static int readPullServerPort() { FileInputStream in = null; try { @@ -299,16 +303,11 @@ public static int readPullServerPort() { byte[] buf = new byte[1024]; int readBytes = in.read(buf); return Integer.parseInt(new String(buf, 0, readBytes)); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + } catch (IOException e) { + LOG.fatal(e.getMessage(), e); return -1; } finally { - if (in != null) { - try { - in.close(); - } catch (IOException e) { - } - } + IOUtils.closeStream(in); } }