Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Travis issues about timeout and the socket problem in the Sync module #940

Merged
merged 2 commits into from Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -140,7 +140,7 @@ matrix:
- mvn -version
# Output something every 10 minutes or Travis kills the job
- while sleep 540; do echo "=====[ $SECONDS seconds still running ]====="; done &
- travis_wait 20 mvn -B clean integration-test
- travis_wait 40 mvn -B clean integration-test
# Killing background sleep loop
- kill %1

Expand Down
Expand Up @@ -20,17 +20,20 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JDBCServiceEventHandler;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.db.sync.thrift.SyncServiceEventHandler;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand All @@ -54,6 +57,11 @@ public class SyncServerManager implements IService {

private Thread syncServerThread;

//we add this latch for avoiding in some ITs, the syncService is not startup but the IT has finished.
private CountDownLatch startLatch;
//stopLatch is also for letting the IT know whether the socket is closed.
private CountDownLatch stopLatch;

private SyncServerManager() {
}

Expand All @@ -80,10 +88,18 @@ public void start() throws StartupException {
"Sync server failed to start because IP white list is null, please set IP white list.");
return;
}
startLatch = new CountDownLatch(1);
stopLatch = new CountDownLatch(1);
conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
syncServerThread = new SyncServiceThread();
syncServerThread = new SyncServiceThread(startLatch, stopLatch);
syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
syncServerThread.start();
try {
startLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StartupException(this.getID().getName(), e.getMessage());
}
logger.info("Sync server has started.");
}

Expand All @@ -95,6 +111,12 @@ public void stop() {
if (conf.isSyncEnable()) {
FileLoaderManager.getInstance().stop();
((SyncServiceThread) syncServerThread).close();
try {
stopLatch.await();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
}

Expand All @@ -115,9 +137,14 @@ private class SyncServiceThread extends Thread {
private TProtocolFactory protocolFactory;
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
//we add this latch for avoiding in some ITs, the syncService is not startup but the IT has finished.
private CountDownLatch threadStartLatch;
private CountDownLatch threadStopLatch;

public SyncServiceThread() {
public SyncServiceThread(CountDownLatch startLatch, CountDownLatch stopLatch) {
processor = new SyncService.Processor<>(new SyncServiceImpl());
this.threadStartLatch = startLatch;
this.threadStopLatch = stopLatch;
}

@Override
Expand All @@ -138,6 +165,7 @@ public void run() {
poolArgs.protocolFactory(protocolFactory);
poolArgs.processor(processor);
poolServer = new TThreadPoolServer(poolArgs);
poolServer.setServerEventHandler(new SyncServiceEventHandler(threadStartLatch));
poolServer.serve();
} catch (TTransportException e) {
logger.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
Expand All @@ -146,8 +174,12 @@ public void run() {
logger.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
} finally {
close();
if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
threadStopLatch.countDown();
}
logger.info("{}: close TThreadPoolServer and TServerSocket for {}",
IoTDBConstant.GLOBAL_DB_NAME, getID().getName());

}
}

Expand Down
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.sync.thrift;

import java.util.concurrent.CountDownLatch;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

public class SyncServiceEventHandler implements TServerEventHandler {

private CountDownLatch startLatch;

public SyncServiceEventHandler(CountDownLatch startLatch) {
this.startLatch = startLatch;
}

@Override
public void preServe() {
startLatch.countDown();
}

@Override
public ServerContext createContext(TProtocol input, TProtocol output) {
return null;
}

@Override
public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {

}

@Override
public void processContext(ServerContext serverContext, TTransport inputTransport,
TTransport outputTransport) {

}
}