Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
32ea9a9
IGNITE-13582 WAL force rollover timeout introduced (#8902)
nizhikov Mar 23, 2021
d028cef
Merge branch 'master' into ignite-cdc
nizhikov Mar 23, 2021
db8bb81
IGNITE-13596 Flag to distinguish DataRecord on primary and backup add…
nizhikov Mar 25, 2021
e579b1e
IGNITE-14360 Refactor FileLockHolder for reusage (#8905)
nizhikov Mar 26, 2021
398a6db
Merge branch 'master' into ignite-cdc
nizhikov Mar 26, 2021
f590a18
IGNITE-14353 Ability to specify postfix for IgniteLogger instead of n…
nizhikov Mar 31, 2021
38e270e
Merge branch 'master' into ignite-cdc
nizhikov Mar 31, 2021
ab7bf2b
IGNITE-14435 Refactor PdsConsistentIdProcessor for reusage (#8946)
nizhikov Mar 31, 2021
b8723fb
Merge branch 'ignite-cdc' of https://github.com/apache/ignite into ig…
nizhikov Mar 31, 2021
7018fef
Merge branch 'master' into ignite-cdc
nizhikov Apr 1, 2021
d318245
IGNITE-13581 compilation fix.
nizhikov Apr 1, 2021
00631f1
Merge branch 'master' into ignite-cdc
nizhikov Apr 13, 2021
c430c51
Merge branch 'master' into ignite-cdc
nizhikov Apr 28, 2021
7f10e44
Merge branch 'master' into ignite-cdc
nizhikov Apr 28, 2021
2a3cdad
Merge branch 'master' into ignite-cdc
nizhikov May 10, 2021
dec2e64
IGNITE-13596 Calculation of primary flag based on tx flags (#9092)
nizhikov May 14, 2021
92e6ff1
ignite-cdc Code review fixes.
nizhikov May 14, 2021
197f3e2
ignite-cdc Code review fixes.
nizhikov May 14, 2021
f364a88
ignite-cdc Code review fixes.
nizhikov May 15, 2021
a9d09f4
ignite-cdc Code review fixes.
nizhikov May 17, 2021
16bea82
Revert "IGNITE-14353 Ability to specify postfix for IgniteLogger inst…
nizhikov May 17, 2021
d181fbb
Merge branch 'master' into ignite-cdc
nizhikov May 20, 2021
1b2de82
ignite-cdc revert Logger changes
nizhikov May 20, 2021
4204deb
IGNITE-14353 Ability to specify application name for IgniteLogger (#9…
nizhikov May 20, 2021
4f1c0d9
Merge branch 'master' into ignite-cdc
nizhikov May 25, 2021
4b369f7
Merge branch 'master' into ignite-cdc
nizhikov May 26, 2021
d8fa305
Merge branch 'master' into ignite-cdc
nizhikov Jun 1, 2021
896f013
Merge branch 'master' into ignite-cdc
nizhikov Jun 6, 2021
7acff01
IGNITE-13581 Capture Data Change implementation (#8909)
nizhikov Jun 7, 2021
a2236a6
IGNITE-13581: Minor fix.
nizhikov Jun 7, 2021
975595c
IGNITE-13581: Minor fix.
nizhikov Jun 8, 2021
53d207c
IGNITE-13581: Minor fix.
nizhikov Jun 10, 2021
8f2b78b
IGNITE-13581: Minor fix.
nizhikov Jun 15, 2021
8f568e5
IGNITE-13581: Minor fix.
nizhikov Jun 18, 2021
a2b3a33
Merge branch 'master' into ignite-cdc
nizhikov Jun 22, 2021
374f31e
ChangeDataCapture -> Cdc (#9190)
nizhikov Jun 23, 2021
2ad5e39
ChangeDataCapture -> Cdc
nizhikov Jun 23, 2021
fe923c5
Improve logging
nizhikov Jun 25, 2021
ce2a1bd
Improve logging
nizhikov Jun 25, 2021
f7176d0
Improve logging
nizhikov Jun 28, 2021
a61013c
Improve logging
nizhikov Jun 29, 2021
ad9cf6f
Merge branch 'master' into ignite-cdc
nizhikov Jun 29, 2021
6536248
fix javadoc
nizhikov Jun 30, 2021
cead6e6
NPE fix
nizhikov Jun 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions bin/ignite-cdc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

export MAIN_CLASS="org.apache.ignite.startup.cmdline.CdcCommandLineStartup"

if [ "${IGNITE_HOME:-}" = "" ];
then IGNITE_HOME_TMP="$(dirname "$(cd "$(dirname "$0")"; "pwd")")";
else IGNITE_HOME_TMP=${IGNITE_HOME};
fi

${IGNITE_HOME_TMP}/bin/ignite.sh "$@"
4 changes: 2 additions & 2 deletions config/ignite-log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
<Routing name="FILE">
<Routes pattern="$${sys:nodeId}">
<Route>
<RollingFile name="Rolling-${sys:nodeId}" fileName="${sys:IGNITE_HOME}/work/log/ignite-${sys:nodeId}.log"
filePattern="${sys:IGNITE_HOME}/work/log/ignite-${sys:nodeId}-%i-%d{yyyy-MM-dd}.log.gz">
<RollingFile name="Rolling-${sys:nodeId}" fileName="${sys:IGNITE_HOME}/work/log/${sys:appId}-${sys:nodeId}.log"
filePattern="${sys:IGNITE_HOME}/work/log/${sys:appId}-${sys:nodeId}-%i-%d{yyyy-MM-dd}.log.gz">
<PatternLayout pattern="[%d{ISO8601}][%-5p][%t][%c{1}]%notEmpty{[%markerSimpleName]} %m%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true" />
Expand Down
2 changes: 1 addition & 1 deletion config/java.util.logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ java.util.logging.ConsoleHandler.level=INFO
# under `$IGNITE_HOME/work/log/` directory. The placeholder `%{id8}` is a truncated node ID.
#
org.apache.ignite.logger.java.JavaLoggerFileHandler.formatter=org.apache.ignite.logger.java.JavaLoggerFormatter
org.apache.ignite.logger.java.JavaLoggerFileHandler.pattern=ignite-%{id8}.%g.log
org.apache.ignite.logger.java.JavaLoggerFileHandler.pattern=%{app}-%{id8}.%g.log
org.apache.ignite.logger.java.JavaLoggerFileHandler.level=INFO
org.apache.ignite.logger.java.JavaLoggerFileHandler.limit=10485760
org.apache.ignite.logger.java.JavaLoggerFileHandler.count=10
4 changes: 2 additions & 2 deletions docs/_docs/code-snippets/xml/log4j2-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
<Routing name="FILE">
<Routes pattern="$${sys:nodeId}">
<Route>
<RollingFile name="Rolling-${sys:nodeId}" fileName="${sys:IGNITE_HOME}/work/log/ignite-${sys:nodeId}.log"
filePattern="${sys:IGNITE_HOME}/work/log/ignite-${sys:nodeId}-%i-%d{yyyy-MM-dd}.log.gz">
<RollingFile name="Rolling-${sys:nodeId}" fileName="${sys:IGNITE_HOME}/work/log/${sys:appId}-${sys:nodeId}.log"
filePattern="${sys:IGNITE_HOME}/work/log/${sys:appId}-${sys:nodeId}-%i-%d{yyyy-MM-dd}.log.gz">
<PatternLayout pattern="[%d{ISO8601}][%-5p][%t][%c{1}]%notEmpty{[%markerSimpleName]} %m%n"/>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;

import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.parseSubFolderName;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.parseSubFolderName;

/**
* Test for new and old style persistent storage folders generation and compatible startup of current ignite version
Expand Down Expand Up @@ -198,12 +198,12 @@ private void assertNodeIndexesInFolder(Integer... indexes) throws IgniteCheckedE
* @throws IgniteCheckedException if failed.
*/
@NotNull private Set<Integer> getAllNodeIndexesInFolder() throws IgniteCheckedException {
final File curFolder = new File(U.defaultWorkDirectory(), PdsConsistentIdProcessor.DB_DEFAULT_FOLDER);
final File curFolder = new File(U.defaultWorkDirectory(), PdsFolderResolver.DB_DEFAULT_FOLDER);
final Set<Integer> indexes = new TreeSet<>();
final File[] files = curFolder.listFiles(PdsConsistentIdProcessor.DB_SUBFOLDERS_NEW_STYLE_FILTER);
final File[] files = curFolder.listFiles(PdsFolderResolver.DB_SUBFOLDERS_NEW_STYLE_FILTER);

for (File file : files) {
final PdsConsistentIdProcessor.FolderCandidate uid
final PdsFolderResolver.FolderCandidate uid
= parseSubFolderName(file, log);

if (uid != null)
Expand All @@ -223,7 +223,7 @@ private void assertPdsDirsDefaultExist(String subDirName) throws IgniteCheckedEx
assertDirectoryExist(DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, subDirName);
assertDirectoryExist(PersistentStoreConfiguration.DFLT_WAL_STORE_PATH, subDirName);
assertDirectoryExist(PersistentStoreConfiguration.DFLT_WAL_ARCHIVE_PATH, subDirName);
assertDirectoryExist(PdsConsistentIdProcessor.DB_DEFAULT_FOLDER, subDirName);
assertDirectoryExist(PdsFolderResolver.DB_DEFAULT_FOLDER, subDirName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,8 @@ private void corruptDataEntry(
new GridCacheVersion(),
0L,
partId,
updateCntr
updateCntr,
DataEntry.EMPTY_FLAGS
);

GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2638,7 +2638,8 @@ private void corruptDataEntry(
new GridCacheVersion(),
0L,
partId,
updateCntr
updateCntr,
DataEntry.EMPTY_FLAGS
);

GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.shared().database();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cache;

import java.io.Serializable;
import java.util.Map;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.lang.IgniteExperimental;

/**
* Entry event order.
* Two concurrent updates of the same entry can be ordered based on {@link CacheEntryVersion} comparsion.
* Greater value means that event occurs later.
*
* @see CacheConflictResolutionManager
* @see GridCacheVersionManager#dataCenterId(byte)
*/
@IgniteExperimental
public interface CacheEntryVersion extends Comparable<CacheEntryVersion>, Serializable {
/**
* Order of the update. Value is an incremental counter value. Scope of counter is node.
* @return Version order.
*/
public long order();

/** @return Node order on which this version was assigned. */
public int nodeOrder();

/**
* Cluster id is a value to distinguish updates in case user wants to aggregate and sort updates from several
* Ignite clusters. {@code clusterId} id can be set for the node using
* {@link GridCacheVersionManager#dataCenterId(byte)}.
*
* @return Cluster id.
*/
public byte clusterId();

/** @return Topology version plus number of seconds from the start time of the first grid node. */
public int topologyVersion();

/**
* If source of the update is "local" cluster then {@code null} will be returned.
* If updated comes from the other cluster using {@link IgniteInternalCache#putAllConflict(Map)}
* then entry version for other cluster.
* @return Replication version.
* @see IgniteInternalCache#putAllConflict(Map)
* @see IgniteInternalCache#removeAllConflict(Map)
*/
public CacheEntryVersion otherClusterVersion();
}
101 changes: 101 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/cdc/CdcConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cdc;

import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;

/**
* This class defines {@link CdcMain} runtime configuration.
* Configuration is passed to {@link CdcMain} constructor.
*/
@IgniteExperimental
public class CdcConfiguration {
/** */
private static final int DFLT_LOCK_TIMEOUT = 1000;

/** */
private static final long DFLT_CHECK_FREQ = 1000L;

/** */
private static final boolean DFLT_KEEP_BINARY = true;

/** Change Data Capture consumer. */
private CdcConsumer consumer;

/** Keep binary flag.<br>Default value {@code true}. */
private boolean keepBinary = DFLT_KEEP_BINARY;

/**
* {@link CdcMain} acquire file lock on startup to ensure exclusive consumption.
* This property specifies amount of time to wait for lock acquisition.<br>
* Default is {@code 1000 ms}.
*/
private long lockTimeout = DFLT_LOCK_TIMEOUT;

/**
* CDC application periodically scans {@link DataStorageConfiguration#getCdcWalPath()} folder to find new WAL segments.
* This frequency specify amount of time application sleeps between subsequent checks when no new files available.
* Default is {@code 1000 ms}.
*/
private long checkFreq = DFLT_CHECK_FREQ;

/** @return CDC consumer. */
public CdcConsumer getConsumer() {
return consumer;
}

/** @param consumer CDC consumer. */
public void setConsumer(CdcConsumer consumer) {
this.consumer = consumer;
}

/** @return keep binary value. */
public boolean isKeepBinary() {
return keepBinary;
}

/** @param keepBinary keep binary value. */
public void setKeepBinary(boolean keepBinary) {
this.keepBinary = keepBinary;
}

/** @return Amount of time to wait for lock acquisition. */
public long getLockTimeout() {
return lockTimeout;
}

/** @param lockTimeout Amount of time to wait for lock acquisition. */
public void setLockTimeout(long lockTimeout) {
this.lockTimeout = lockTimeout;
}

/** @return Amount of time application sleeps between subsequent checks when no new files available. */
public long getCheckFrequency() {
return checkFreq;
}

/**
* @param checkFreq Amount of time application sleeps between subsequent checks when no new
* files available.
*/
public void setCheckFrequency(long checkFreq) {
this.checkFreq = checkFreq;
}
}
77 changes: 77 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cdc;

import java.util.Iterator;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;

/**
* Consumer of WAL data change events.
* This consumer will receive data change events during {@link CdcMain} application invocation.
* The lifecycle of the consumer is the following:
* <ul>
* <li>Start of the consumer {@link #start()}.</li>
* <li>Notification of the consumer by the {@link #onEvents(Iterator)} call.</li>
* <li>Stop of the consumer {@link #stop()}.</li>
* </ul>
*
* In case consumer implementation wants to user {@link IgniteLogger}, please, use, {@link LoggerResource} annotation:
* <pre>
* public class ChangeDataCaptureConsumer implements ChangeDataCaptureConsumer {
* &#64;LoggerResource
* private IgniteLogger log;
*
* ...
* }
* </pre>
*
* Note, consumption of the {@link CdcEvent} will be started from the last saved offset.
* The offset of consumptions is saved on the disk every time {@link #onEvents(Iterator)} returns {@code true}.
*
* @see CdcMain
* @see CdcEvent
* @see CacheEntryVersion
*/
@IgniteExperimental
public interface CdcConsumer {
/**
* Starts the consumer.
*/
public void start();

/**
* Handles entry changes events.
* If this method return {@code true} then current offset will be stored
* and ongoing notifications after CDC application fail/restart will be started from it.
*
* @param events Entry change events.
* @return {@code True} if current offset should be saved on the disk
* to continue from it in case any failures or restart.
*/
public boolean onEvents(Iterator<CdcEvent> events);

/**
* Stops the consumer.
* This methods can be invoked only after {@link #start()}.
*/
public void stop();
}
Loading