Skip to content

Commit

Permalink
change how a service's isqlmap is configured. i think this way is a b…
Browse files Browse the repository at this point in the history
…it cleaner.
  • Loading branch information
chenson42 committed Jan 20, 2012
1 parent b690001 commit d40ea2c
Show file tree
Hide file tree
Showing 26 changed files with 303 additions and 314 deletions.
11 changes: 10 additions & 1 deletion symmetric/symmetric-assemble/TODO.txt
Expand Up @@ -43,7 +43,16 @@ DONE = +
+ What to do with SqlTemplate? Move sqltemplate into dbdialect
* Change the default server mode to multiserver
+ Clean up symmetric.messages
* SqlMap create constants for columns and table names
* SqlMap create constants for columns and table names

Performance Improvement Opportunities
* Pluggable data loaders.
* Sync based on updated column values (timestamp or flag)
* Transaction Log Parsing (vs. triggers)
* Optimize triggers (don't capture old data. option to capture only primary keys)
* Routing optimizations - allow a channel to always sync to everybody. Update sym_outgoing_batch to have a PK of batch_id, node_id.
* Web Sockets protocol
* HttpClient (pool connections)

Documentation
* Change getting started to instruct user to update properties in the engines directory (for multiserver mode)
Expand Down
Expand Up @@ -25,14 +25,14 @@
import java.util.Date;
import java.util.List;

import org.jumpmind.db.sql.ISqlMap;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRouterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,16 +50,16 @@ public class DataGapDetector implements IDataToRouteGapDetector {

private ISymmetricDialect symmetricDialect;

private ISqlMap sqlMap;
private IRouterService routerService;

public DataGapDetector() {
}

public DataGapDetector(IDataService dataService, IParameterService parameterService,
ISymmetricDialect symmetricDialect, ISqlMap sqlMap) {
ISymmetricDialect symmetricDialect, IRouterService routerService) {
this.dataService = dataService;
this.parameterService = parameterService;
this.sqlMap = sqlMap;
this.routerService = routerService;
this.symmetricDialect = symmetricDialect;
}

Expand All @@ -80,7 +80,7 @@ public void beforeRouting() {
.getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
for (final DataGap dataGap : gaps) {
final boolean lastGap = dataGap.equals(gaps.get(gaps.size() - 1));
String sql = sqlMap.getSql("selectDistinctDataIdFromDataEventUsingGapsSql");
String sql = routerService.getSql("selectDistinctDataIdFromDataEventUsingGapsSql");
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
Object[] params = new Object[] { dataGap.getStartId(), dataGap.getEndId() };
lastDataId = -1;
Expand Down
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.TimeUnit;

import org.hsqldb.types.Types;
import org.jumpmind.db.sql.ISqlMap;
import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTemplate;
Expand All @@ -45,6 +44,7 @@
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.util.AppUtils;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,14 +75,14 @@ public class DataGapRouteReader implements IDataToRouteReader {

protected ISymmetricDialect symmetricDialect;

protected ISqlMap sqlMap;
protected IRouterService routerService;

protected boolean reading = true;

public DataGapRouteReader(ISqlMap sqlMap, ChannelRouterContext context,
public DataGapRouteReader(IRouterService routerService, ChannelRouterContext context,
IDataService dataService, ISymmetricDialect symmetricDialect,
IParameterService parameterService) {
this.sqlMap = sqlMap;
this.routerService = routerService;
this.symmetricDialect = symmetricDialect;
this.dataQueue = new LinkedBlockingQueue<Data>(
symmetricDialect != null ? symmetricDialect.getRouterDataPeekAheadCount() : 1000);
Expand Down Expand Up @@ -259,7 +259,7 @@ protected String qualifyUsingDataGaps(List<DataGap> dataGaps, int numberOfGapsTo
}

protected String getSql(String sqlName, Channel channel) {
String select = sqlMap.getSql(sqlName);
String select = routerService.getSql(sqlName);
if (!channel.isUseOldDataToRoute()) {
select = select.replace("d.old_data", "''");
}
Expand Down
Expand Up @@ -23,9 +23,11 @@
import java.util.List;
import java.util.Map;

import org.jumpmind.db.sql.ISqlReadCursor;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
Expand Down Expand Up @@ -131,7 +133,6 @@ public interface IDataService {

public long findMaxDataId();

public String getDataSelectSql(long batchId, long startDataId, String channelId,
boolean descending);
public ISqlReadCursor<Data> selectDataFor(Batch batch);

}
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;

import org.jumpmind.db.sql.ISqlMap;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
Expand All @@ -38,7 +37,7 @@
*
* @since 2.0
*/
public interface IRouterService {
public interface IRouterService extends IService {

public long routeData();

Expand All @@ -59,7 +58,6 @@ public interface IRouterService {

public void stop ();

public void destroy();
public void destroy();

public ISqlMap getSqlMap();
}
Expand Down
Expand Up @@ -32,4 +32,6 @@ public interface IService {
*/
public void synchronize(Runnable runnable);

public String getSql(String... keys);

}
Expand Down
Expand Up @@ -16,11 +16,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License. */


package org.jumpmind.symmetric.service.impl;

* under the License.
*/

package org.jumpmind.symmetric.service.impl;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
Expand All @@ -35,94 +35,93 @@
import org.jumpmind.symmetric.service.IService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract public class AbstractService implements IService {

protected final Logger log = LoggerFactory.getLogger(getClass());

protected IParameterService parameterService;


abstract public class AbstractService implements IService {

protected final Logger log = LoggerFactory.getLogger(getClass());

protected IParameterService parameterService;

protected ISymmetricDialect symmetricDialect;
protected ISqlTemplate sqlTemplate;


protected ISqlTemplate sqlTemplate;

protected String tablePrefix;

private ISqlMap sqlMap;

public AbstractService(IParameterService parameterService, ISymmetricDialect symmetricDialect) {
this.symmetricDialect = symmetricDialect;
this.parameterService = parameterService;
this.tablePrefix = parameterService.getTablePrefix();
this.sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
this.symmetricDialect = symmetricDialect;
this.parameterService = parameterService;
this.tablePrefix = parameterService.getTablePrefix();
this.sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
}

protected void setSqlMap(ISqlMap sqlMap) {
this.sqlMap = sqlMap;
}

public ISqlTemplate getJdbcTemplate() {
return symmetricDialect.getPlatform().getSqlTemplate();
}

synchronized public void synchronize(Runnable runnable) {
runnable.run();
}


synchronized public void synchronize(Runnable runnable) {
runnable.run();
}

protected boolean isSet(Object value) {
if (value != null && value.toString().equals("1")) {
return true;
} else {
return false;
}
}

@SuppressWarnings("unchecked")
protected SQLException unwrapSqlException(Throwable e) {
List<Throwable> exs = ExceptionUtils.getThrowableList(e);
for (Throwable throwable : exs) {
if (throwable instanceof SQLException) {
return (SQLException) throwable;
}
}
return null;
}

public ISqlMap getSqlMap() {
if (sqlMap == null) {
sqlMap = createSqlMap();

@SuppressWarnings("unchecked")
protected SQLException unwrapSqlException(Throwable e) {
List<Throwable> exs = ExceptionUtils.getThrowableList(e);
for (Throwable throwable : exs) {
if (throwable instanceof SQLException) {
return (SQLException) throwable;
}
}
return sqlMap;
return null;
}

abstract protected ISqlMap createSqlMap();

protected Map<String,String> createSqlReplacementTokens() {

protected Map<String, String> createSqlReplacementTokens() {
return createSqlReplacementTokens(this.tablePrefix);
}
protected static Map<String,String> createSqlReplacementTokens(String tablePrefix) {
Map<String,String> map = new HashMap<String, String>();
}

protected static Map<String, String> createSqlReplacementTokens(String tablePrefix) {
Map<String, String> map = new HashMap<String, String>();
map.put("prefixName", tablePrefix);
return map;
}

}

public String getSql(String... keys) {
return getSqlMap().getSql(keys);
}

if (sqlMap != null) {
return sqlMap.getSql(keys);
} else {
return null;
}
}

public IParameterService getParameterService() {
return parameterService;
}

public ISymmetricDialect getSymmetricDialect() {
return symmetricDialect;
}

public String getTablePrefix() {
return tablePrefix;
}

protected void close(ISqlTransaction transaction) {
if (transaction != null) {
transaction.close();
}
}

}

}
Expand Up @@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.List;

import org.jumpmind.db.sql.AbstractSqlMap;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.BatchInfo;
Expand Down Expand Up @@ -52,12 +51,8 @@ public AcknowledgeService(IParameterService parameterService,
super(parameterService, symmetricDialect);
this.outgoingBatchService = outgoingBatchService;
this.registrationService = registrationService;
}

@Override
protected AbstractSqlMap createSqlMap() {
return new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens());
setSqlMap(new AcknowledgeServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}

public void ack(final BatchInfo batch) {
Expand Down
Expand Up @@ -36,7 +36,6 @@
import java.util.Map;

import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.sql.AbstractSqlMap;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.UniqueKeyException;
Expand All @@ -56,6 +55,8 @@ public class ClusterService extends AbstractService implements IClusterService {

public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) {
super(parameterService, dialect);
setSqlMap(new ClusterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}

public void initLockTable() {
Expand Down Expand Up @@ -155,9 +156,4 @@ public void clearInfiniteLock(String action) {
}
}

@Override
protected AbstractSqlMap createSqlMap() {
return new ClusterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens());
}
}

0 comments on commit d40ea2c

Please sign in to comment.