Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
aa58950
SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration (#1489)
dxichen May 6, 2021
c117a68
SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async com…
dxichen May 7, 2021
c85aade
SAMZA-2591: Async Commit [3/3]: Container restore lifecycle (#1491)
dxichen May 12, 2021
7cc4eaa
SAMZA-2657: Blob Store as backend for Samza State backup and restore …
shekhars-li May 25, 2021
c2b77f0
Fix StorageConfig bug from newly introduced BlobStoreRestore factory …
shekhars-li May 28, 2021
354620f
SnapshotIndexSerde serialization fixes
dxichen Jun 2, 2021
48a8785
Checkpoint-tool changes for checkpoint v2
dxichen Jun 1, 2021
2063155
HOT FIX: KafkaConsumerProxy without any registered TopicPartitions
dxichen Jun 4, 2021
271ef64
Fix async cleanup TaskMetric
dxichen Jun 7, 2021
7d61a7d
Added lastCommitNs and commitTimedOut metrics, updated default timeou…
shekhars-li Jun 8, 2021
7f5ff3a
Updated log line in BlobStoreUtil
shekhars-li Jun 11, 2021
35c0dbc
Checkpoint version config precedence list
dxichen Jun 18, 2021
7676da9
Per store restore manager config precendence
dxichen Jun 30, 2021
5379f97
Job level state backend configs
dxichen Jun 30, 2021
0697457
Fixed case where there are no stores to backup for CSM
dxichen Jul 28, 2021
e74c241
Added tests for CSM for non configured stores
dxichen Jul 29, 2021
ad408d5
Cleanup of TODOs
dxichen Aug 2, 2021
785d506
checkstype cleanup
dxichen Aug 3, 2021
3c8a462
checkstyle for samza-api
dxichen Aug 3, 2021
65b5df6
Test fix for failing job: only backup persistent stores
shekhars-li Aug 3, 2021
48699c4
Merge pull request #1512 from dxichen/checkpoint-restore-precendence-…
prateekm Aug 5, 2021
e975b86
Merge branch 'master' into state-backend-async-commit
dxichen Aug 6, 2021
c95d828
Merge pull request #1513 from dxichen/merge-state-backend-async-commit
prateekm Aug 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ project(":samza-core_$scalaSuffix") {
compile "org.apache.commons:commons-lang3:$commonsLang3Version"
compile "commons-io:commons-io:$commonsIoVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
yarnVersion = "2.7.1"
zkClientVersion = "0.11"
zookeeperVersion = "3.4.13"
failsafeVersion = "1.1.0"
failsafeVersion = "2.4.0"
jlineVersion = "3.8.2"
jnaVersion = "4.5.1"
couchbaseClientVersion = "2.7.2"
Expand Down
57 changes: 15 additions & 42 deletions samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,27 @@

package org.apache.samza.checkpoint;

import java.util.Map;
import org.apache.samza.system.SystemStreamPartition;

import java.util.Collections;
import java.util.Map;

/**
* A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
* It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
* of restarting a failed container within a running job.
*/
public class Checkpoint {
private final Map<SystemStreamPartition, String> offsets;

public interface Checkpoint {
/**
* Constructs a new checkpoint based off a map of Samza stream offsets.
* @param offsets Map of Samza streams to their current offset.
* Gets the version number of the Checkpoint
* @return Short indicating the version number
*/
public Checkpoint(Map<SystemStreamPartition, String> offsets) {
this.offsets = offsets;
}
short getVersion();

/**
* Gets a unmodifiable view of the current Samza stream offsets.
* @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
* Gets an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
* The returned value differs based on the Checkpoint version:
* <ol>
* <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
* as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
* <li>For {@link CheckpointV2} returns the input offsets only.</li>
* </ol>
*
* @return an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
*/
public Map<SystemStreamPartition, String> getOffsets() {
return Collections.unmodifiableMap(offsets);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Checkpoint)) return false;

Checkpoint that = (Checkpoint) o;

if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;

return true;
}

@Override
public int hashCode() {
return offsets != null ? offsets.hashCode() : 0;
}

@Override
public String toString() {
return "Checkpoint [offsets=" + offsets + "]";
}
Map<SystemStreamPartition, String> getOffsets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,64 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.checkpoint;

import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.annotation.InterfaceStability;


/**
* Checkpoint ID has the format: [currentTimeMillis, last 6 digits of nanotime], separated by a dash.
* This is to avoid conflicts, e.g when requesting frequent manual commits.
*
* It is expected that persistent stores use the {@link #toString()} representation of the checkpoint id
* It is expected that persistent stores use the {@link #serialize()} representation of the checkpoint id
* as the store checkpoint directory name.
*/
@InterfaceStability.Unstable
public class CheckpointId {
public class CheckpointId implements Comparable<CheckpointId> {
public static final String SEPARATOR = "-";

private final long millis;
private final long nanos;
private final long nanoId;

public CheckpointId(long millis, long nanos) {
private CheckpointId(long millis, long nanoId) {
this.millis = millis;
this.nanos = nanos;
this.nanoId = nanoId;
}

public static CheckpointId create() {
return new CheckpointId(System.currentTimeMillis(), System.nanoTime() % 1000000);
}

public static CheckpointId fromString(String checkpointId) {
public static CheckpointId deserialize(String checkpointId) {
if (StringUtils.isBlank(checkpointId)) {
throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
}
String[] parts = checkpointId.split(SEPARATOR);
return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
try {
String[] parts = checkpointId.split(SEPARATOR);
return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(String.format(
"Could not deserialize CheckpointId: %s", checkpointId), ex);
}
}

public long getMillis() {
return millis;
}

public long getNanos() {
return nanos;
public long getNanoId() {
return nanoId;
}

@Override
public String toString() {
return String.format("%s%s%s", millis, SEPARATOR, nanos);
/**
* Serialization of {@link CheckpointId} as part of task checkpoints, in conjunction with {@link #deserialize(String)}.
* @return the String representation of this {@link CheckpointId}.
*/
public String serialize() {
return String.format("%s%s%s", getMillis(), SEPARATOR, getNanoId());
}

@Override
Expand All @@ -72,11 +82,22 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
CheckpointId that = (CheckpointId) o;
return millis == that.millis &&
nanos == that.nanos;
nanoId == that.nanoId;
}

@Override
public int hashCode() {
return Objects.hash(millis, nanos);
return Objects.hash(millis, nanoId);
}

@Override
public int compareTo(CheckpointId that) {
if (this.millis != that.millis) return Long.compare(this.millis, that.millis);
else return Long.compare(this.nanoId, that.nanoId);
}

@Override
public String toString() {
return String.format("%s%s%s", millis, SEPARATOR, nanoId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.checkpoint;

import org.apache.samza.system.SystemStreamPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
 * Version 1 of the {@link Checkpoint} format: a single map of {@link SystemStreamPartition} to the
 * most recent offset recorded for it.
 * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
 * of restarting a failed container within a running job.
 */
public class CheckpointV1 implements Checkpoint {
  public static final short CHECKPOINT_VERSION = 1;

  private final Map<SystemStreamPartition, String> offsets;

  /**
   * Constructs a new checkpoint based off a map of Samza stream offsets.
   *
   * <p>The map is stored by reference (no defensive copy is made), so later changes made to it by the
   * caller remain visible through {@link #getOffsets()}. A {@code null} map is accepted here, but
   * {@link #getOffsets()} will then throw a {@link NullPointerException}.
   *
   * @param offsets Map of Samza streams to their current offset.
   */
  public CheckpointV1(Map<SystemStreamPartition, String> offsets) {
    this.offsets = offsets;
  }

  /**
   * Gets the version number of this checkpoint format.
   *
   * @return {@link #CHECKPOINT_VERSION}, i.e. {@code 1}.
   */
  @Override
  public short getVersion() {
    return CHECKPOINT_VERSION;
  }

  /**
   * Gets an unmodifiable view of the Samza stream offsets this checkpoint was constructed with.
   *
   * @return an unmodifiable view of a Map of Samza streams to their recorded offsets.
   * @throws NullPointerException if this checkpoint was constructed with a {@code null} map.
   */
  @Override
  public Map<SystemStreamPartition, String> getOffsets() {
    return Collections.unmodifiableMap(offsets);
  }

  /**
   * Two {@link CheckpointV1} instances are equal when their offset maps are equal.
   * Instances of any other class (including other {@link Checkpoint} implementations) are never equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    CheckpointV1 that = (CheckpointV1) o;

    return Objects.equals(offsets, that.offsets);
  }

  @Override
  public int hashCode() {
    return Objects.hash(offsets);
  }

  @Override
  public String toString() {
    return "CheckpointV1 [CHECKPOINT_VERSION=" + CHECKPOINT_VERSION + ", offsets=" + offsets + "]";
  }
}
123 changes: 123 additions & 0 deletions samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.checkpoint;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Objects;
import org.apache.samza.system.SystemStreamPartition;

import java.util.Map;

/**
 * Version 2 of the {@link Checkpoint} format. In addition to the input offsets it carries a
 * {@link CheckpointId} and, kept separately from the input offsets, the state checkpoint markers
 * of the local stores for each state backend factory.
 * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
 * of restarting a failed container within a running job.
 */
public class CheckpointV2 implements Checkpoint {
  public static final short CHECKPOINT_VERSION = 2;

  private final CheckpointId checkpointId;
  private final Map<SystemStreamPartition, String> inputOffsets;
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  /**
   * Constructs the checkpoint with separated input and state offsets.
   *
   * <p>Both maps are copied into immutable maps. The copy of {@code stateCheckpoints} is shallow: the
   * inner per-store maps are kept by reference. {@code checkpointId} is not validated here.
   *
   * @param checkpointId {@link CheckpointId} associated with this checkpoint
   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
   * @param stateCheckpoints Map of state backend factory name to map of local state store names
   *                         to state checkpoints
   * @throws IllegalArgumentException if {@code inputOffsets} or {@code stateCheckpoints} is null
   */
  public CheckpointV2(CheckpointId checkpointId,
      Map<SystemStreamPartition, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpoints) {
    Preconditions.checkArgument(inputOffsets != null,
        "inputOffsets for CheckpointV2 must not be null");
    Preconditions.checkArgument(stateCheckpoints != null,
        "stateCheckpoints for CheckpointV2 must not be null");
    this.checkpointId = checkpointId;
    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);
  }

  /**
   * Gets the version number of this checkpoint format.
   *
   * @return {@link #CHECKPOINT_VERSION}, i.e. {@code 2}.
   */
  @Override
  public short getVersion() {
    return CHECKPOINT_VERSION;
  }

  /**
   * Gets the checkpoint id for the checkpoint
   * @return The timestamp based checkpoint identifier associated with the checkpoint
   */
  public CheckpointId getCheckpointId() {
    return checkpointId;
  }

  /**
   * Gets an unmodifiable view of the current input {@link SystemStreamPartition} offsets.
   * @return An unmodifiable map of input {@link SystemStreamPartition}s to their recorded offsets.
   */
  @Override
  public Map<SystemStreamPartition, String> getOffsets() {
    return inputOffsets;
  }

  /**
   * Gets the state checkpoint markers for all stores for each configured state backend.
   *
   * Note: We don't add this method to the {@link Checkpoint} interface since it is difficult
   * to implement it for {@link CheckpointV1} without changing the underlying serialization format -
   * the changelog SSP offsets are serialized in the same way as input offsets, and at
   * deserialization time we don't have enough information (e.g. configs) to decide whether a
   * particular entry is for an input SSP or a changelog SSP.
   *
   * @return Map of state backend factory name to map of local state store names to state checkpoint markers
   */
  public Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }

  /**
   * Two {@link CheckpointV2} instances are equal when their checkpoint ids, input offsets and
   * state checkpoint markers are all equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    CheckpointV2 that = (CheckpointV2) o;

    // Null-safe comparison: the constructor does not reject a null checkpointId, and hashCode()
    // and toString() already tolerate one, so equals() must not throw for such instances either.
    return Objects.equals(checkpointId, that.checkpointId) &&
        Objects.equals(inputOffsets, that.inputOffsets) &&
        Objects.equals(stateCheckpointMarkers, that.stateCheckpointMarkers);
  }

  @Override
  public int hashCode() {
    return Objects.hash(checkpointId, inputOffsets, stateCheckpointMarkers);
  }

  @Override
  public String toString() {
    return "CheckpointV2 [CHECKPOINT_VERSION=" + CHECKPOINT_VERSION + ", checkpointId=" + checkpointId +
        ", inputOffsets=" + inputOffsets + ", stateCheckpointMarkers=" + stateCheckpointMarkers + "]";
  }
}
Loading