Skip to content

Commit

Permalink
Merge pull request #101 from apache/Sync-Reconstruct
Browse files Browse the repository at this point in the history
[IOTDB-51]Reimplement post-back module, rename it to synchronization module
  • Loading branch information
fanhualta committed Mar 22, 2019
2 parents 7dd3722 + 0bbf2f3 commit 59d1ae8
Show file tree
Hide file tree
Showing 37 changed files with 1,910 additions and 1,708 deletions.
Expand Up @@ -29,7 +29,7 @@ set IOTDB_CONF=%IOTDB_HOME%\conf
set IOTDB_LOGS=%IOTDB_HOME%\logs


if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.postback.sender.FileSenderImpl
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl
if NOT DEFINED JAVA_HOME goto :err

@REM -----------------------------------------------------------------------------
Expand All @@ -46,7 +46,7 @@ set CLASSPATH="%IOTDB_HOME%\lib"

REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable.
for %%i in ("%IOTDB_HOME%\lib\*.jar") do call :append "%%i"
set CLASSPATH=%CLASSPATH%;postBackClient
set CLASSPATH=%CLASSPATH%;SyncClient
goto okClasspath

:append
Expand Down
Expand Up @@ -47,8 +47,8 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done

MAIN_CLASS=org.apache.iotdb.db.postback.sender.FileSenderImpl
MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl

"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=postBackClient -cp "$CLASSPATH" "$MAIN_CLASS"
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=SyncClient -cp "$CLASSPATH" "$MAIN_CLASS"

exit $?
Expand Up @@ -19,5 +19,5 @@

@echo off

wmic process where (commandline like "%%postBackClient%%" and not name="wmic.exe") delete
rem ps ax | grep -i 'postBackClient' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
wmic process where (commandline like "%%SyncClient%%" and not name="wmic.exe") delete
rem ps ax | grep -i 'SyncClient' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
Expand Up @@ -19,12 +19,12 @@
#


PIDS=$(ps ax | grep -i 'postBackClient' | grep java | grep -v grep | awk '{print $1}')
PIDS=$(ps ax | grep -i 'SyncClient' | grep java | grep -v grep | awk '{print $1}')

if [ -z "$PIDS" ]; then
echo "No post back Client to stop"
echo "No sync client to stop"
exit 1
else
kill -s TERM $PIDS
echo "close PostBackClient"
echo "close sync client"
fi
14 changes: 7 additions & 7 deletions iotdb/iotdb/conf/iotdb-engine.properties
Expand Up @@ -201,20 +201,20 @@ schema_manager_cache_size=300000
# Generally the default value 4MB is enough.
max_log_entry_size=4194304

# IoTDB postBack server properties
# IoTDB sync server properties
# Whether to allow to post back, the default allowed
is_postback_enable=true
is_sync_enable=true

# PostBack server port address
postback_server_port=5555
# Sync server port address
sync_server_port=5555

# White IP list of Postback client.
# White IP list of Sync client.
# Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
# If there are more than one IP segment, please separate them by commas
# The default is to allow all IP to postback
# The default is to allow all IP to sync
IP_white_list=0.0.0.0/0

# Choose a postBack strategy of merging historical data:
# Choose a sync strategy of loading historical data:
#1. It's more likely to update historical data, please choose "true".
#2. It's more likely not to update historical data or you don't know exactly, please choose "false".
update_historical_data_possibility=false
Expand Up @@ -17,16 +17,19 @@
# under the License.
#

# Sync server port address
server_ip=127.0.0.1
# PostBack server port address

# Sync client port
server_port=5555
# PostBack client port
client_port=6666

# The cycle time of post data back to receiver, the unit of time is second
upload_cycle_in_seconds=600
# Set bufferWrite data absolute path of IoTDB

# Set bufferWrite data absolute path of IoTDB
# It needs to be set with iotdb_schema_directory, they have to belong to the same IoTDB
# iotdb_bufferWrite_directory = D:\\iotdb\\data\\data\\settled

# Set schema file absolute path of IoTDB
# It needs to be set with iotdb_bufferWrite_directory, they have to belong to the same IoTDB
# iotdb_schema_directory = D:\\iotdb\\data\\system\\schema\\mlog.txt
Expand Up @@ -34,7 +34,10 @@ public enum ThreadName {
FLUSH_SERVICE("Flush-ServerServiceImpl"),
WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"),
WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"),
INDEX_SERVICE("Index-ServerServiceImpl");
INDEX_SERVICE("Index-ServerServiceImpl"),
SYNC_CLIENT("Sync-Client"),
SYNC_SERVER("Sync-Server"),
SYNC_MONITOR("Sync-Monitor");

private String name;

Expand Down
24 changes: 12 additions & 12 deletions iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Expand Up @@ -253,13 +253,13 @@ public class IoTDBConfig {
*/
private int maxLogEntrySize = 4 * 1024 * 1024;
/**
* Is this IoTDB instance a receiver of postback or not.
* Is this IoTDB instance a receiver of sync or not.
*/
private boolean isPostbackEnable = true;
private boolean isSyncEnable = true;
/**
* If this IoTDB instance is a receiver of postback, set the server port.
* If this IoTDB instance is a receiver of sync, set the server port.
*/
private int postbackServerPort = 5555;
private int syncServerPort = 5555;
/*
* Set the language version when loading file including error information, default value is "EN"
*/
Expand Down Expand Up @@ -768,20 +768,20 @@ public void setMaxLogEntrySize(int maxLogEntrySize) {
this.maxLogEntrySize = maxLogEntrySize;
}

public boolean isPostbackEnable() {
return isPostbackEnable;
public boolean isSyncEnable() {
return isSyncEnable;
}

public void setPostbackEnable(boolean postbackEnable) {
isPostbackEnable = postbackEnable;
public void setSyncEnable(boolean syncEnable) {
isSyncEnable = syncEnable;
}

public int getPostbackServerPort() {
return postbackServerPort;
public int getSyncServerPort() {
return syncServerPort;
}

public void setPostbackServerPort(int postbackServerPort) {
this.postbackServerPort = postbackServerPort;
public void setSyncServerPort(int syncServerPort) {
this.syncServerPort = syncServerPort;
}

public String getLanguageVersion() {
Expand Down
24 changes: 11 additions & 13 deletions iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
Expand Up @@ -190,15 +190,15 @@ private void loadProps() {
properties.getProperty("overflow_file_size_threshold",
Long.toString(conf.getOverflowFileSizeThreshold())).trim()));

conf.setPostbackEnable(Boolean
.parseBoolean(properties.getProperty("is_postback_enable",
Boolean.toString(conf.isPostbackEnable()))));
conf.setPostbackServerPort(Integer
.parseInt(properties.getProperty("postback_server_port",
Integer.toString(conf.getPostbackServerPort())).trim()));
conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
conf.setSyncServerPort(Integer
.parseInt(properties.getProperty("sync_server_port",
Integer.toString(conf.getSyncServerPort())).trim()));
conf.setUpdate_historical_data_possibility(Boolean.parseBoolean(
properties.getProperty("update_historical_data_possibility",
Boolean.toString(conf.isPostbackEnable()))));
Boolean.toString(conf.isSyncEnable()))));
conf.setIpWhiteList(properties.getProperty("IP_white_list", conf.getIpWhiteList()));

if (conf.getMemThresholdWarning() <= 0) {
Expand Down Expand Up @@ -257,12 +257,10 @@ private void loadProps() {
} finally {
// update all data seriesPath
conf.updatePath();
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
LOGGER.error("Fail to close config file input stream because ", e);
}
try {
inputStream.close();
} catch (IOException e) {
LOGGER.error("Fail to close config file input stream because ", e);
}
}
}
Expand Down
Expand Up @@ -78,6 +78,7 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
import org.apache.iotdb.db.query.reader.IReader;
import org.apache.iotdb.db.sync.conf.Constans;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
Expand Down Expand Up @@ -838,9 +839,8 @@ public void appendFile(TsFileResource appendFile, String appendFilePath)
appendFile.getFilePath()));
}
if (!originFile.renameTo(targetFile)) {
LOGGER.warn("File renaming failed when appending new file. Origin: {}, target: {}",
originFile.getPath(),
targetFile.getPath());
LOGGER.warn("File renaming failed when appending new file. Origin: {}, Target: {}",
originFile.getPath(), targetFile.getPath());
}
// append the new tsfile
this.newFileNodes.add(appendFile);
Expand Down Expand Up @@ -886,8 +886,10 @@ private void getOverlapFiles(TsFileResource appendFile, TsFileResource tsFileRes
tsFileResource.getEndTime(entry.getKey()) >= entry.getValue()
&& tsFileResource.getStartTime(entry.getKey()) <= appendFile
.getEndTime(entry.getKey())) {
String relativeFilePath = "postback" + File.separator + uuid + File.separator + "backup"
+ File.separator + tsFileResource.getRelativePath();
String relativeFilePath =
Constans.SYNC_SERVER + File.separatorChar + uuid + File.separatorChar
+ Constans.BACK_UP_DIRECTORY_NAME
+ File.separatorChar + tsFileResource.getRelativePath();
File newFile = new File(
Directories.getInstance().getTsFileFolder(tsFileResource.getBaseDirIndex()),
relativeFilePath);
Expand Down
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.exception;

public class SyncConnectionException extends Exception {


private static final long serialVersionUID = -6661904365503849681L;

public SyncConnectionException(String message) {
super(message);
}

public SyncConnectionException(String message, Throwable cause) {
super(message, cause);
}

public SyncConnectionException(Throwable cause) {
super(cause);
}

}

0 comments on commit 59d1ae8

Please sign in to comment.