@@ -204,16 +204,16 @@ public Builder toRequestBuilder() {
                     new StreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset(), StreamState.OPENED));
             case NODE_EPOCH_EXPIRED:
             case NODE_EPOCH_NOT_EXIST:
-                LOGGER.error("Node epoch expired or not exist: {}, code: {}", req, code);
+                LOGGER.error("Node epoch expired or not exist, stream {}, epoch {}, code: {}", streamId, epoch, code);
                 throw code.exception();
             case STREAM_NOT_EXIST:
             case STREAM_FENCED:
             case STREAM_INNER_ERROR:
-                LOGGER.error("Unexpected error while opening stream: {}, code: {}", req, code);
+                LOGGER.error("Unexpected error while opening stream: {}, epoch {}, code: {}", streamId, epoch, code);
                 throw code.exception();
             case STREAM_NOT_CLOSED:
             default:
-                LOGGER.error("Error while opening stream: {}, code: {}, retry later", req, code);
+                LOGGER.error("Error while opening stream: {}, epoch {}, code: {}, retry later", streamId, epoch, code);
                 return ResponseHandleResult.withRetry();
         }
     });
@@ -685,6 +685,7 @@ object ElasticLog extends Logging {
     val meta = new ElasticStreamSegmentMeta()
     meta.baseOffset(baseOffset)
     meta.streamSuffix(suffix)
+    meta.createTimestamp(time.milliseconds())
     val segment: ElasticLogSegment = ElasticLogSegment(dir, meta, streamSliceManager, config, time, logSegmentManager.logSegmentEventListener())
     var metaSaveCf: CompletableFuture[Void] = CompletableFuture.completedFuture(null)
     if (suffix.equals("")) {
@@ -317,12 +317,13 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
   }
 
   def timeWaitedForRoll(now: Long, messageTimestamp: Long): Long = {
-    // Load the timestamp of the first message into memory
-    loadFirstBatchTimestamp()
-    rollingBasedTimestamp match {
-      case Some(t) if t >= 0 => messageTimestamp - t
-      case _ => now - created
+    val createTime = if (meta.createTimestamp() > 0) {
+      meta.createTimestamp()
+    } else {
+      created
     }
+    // To avoid reading the log, we use the create time of the segment instead of the time of the first batch.
+    now - createTime
   }
 
   def getFirstBatchTimestamp(): Long = {
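Taken together with the meta.createTimestamp(time.milliseconds()) call added in the ElasticLog hunk above, the roll wait is now measured against the segment's persisted creation time (wall clock) rather than the timestamp of its first batch; the in-memory `created` value remains as a fallback for metadata written before the field existed (the `> 0` check), and `messageTimestamp` is still accepted but no longer influences the result. A minimal standalone sketch of that computation, using hypothetical names rather than the real classes:

// Illustrative sketch only; mirrors the fallback logic in timeWaitedForRoll above.
// createTimestampMs stands in for meta.createTimestamp(); 0 is assumed to mean "not recorded".
def timeWaitedSketch(nowMs: Long, createTimestampMs: Long, createdMs: Long): Long = {
  val createTime = if (createTimestampMs > 0) createTimestampMs else createdMs
  nowMs - createTime
}

// A segment created at t = 1000 ms with the timestamp persisted has waited 60 s at t = 61000 ms.
assert(timeWaitedSketch(nowMs = 61000L, createTimestampMs = 1000L, createdMs = 1000L) == 60000L)
// Older metadata without the field (0) falls back to the in-memory creation time.
assert(timeWaitedSketch(nowMs = 61000L, createTimestampMs = 0L, createdMs = 5000L) == 56000L)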
@@ -89,6 +89,7 @@ class ElasticLogSegmentTest {
     segments.clear()
     logDir = TestUtils.tempDir()
     Context.enableTestMode()
+    ElasticLogManager.enable(true)
     ElasticLogManager.init(kafkaConfig, "fake_cluster_id")
   }
 
@@ -671,6 +671,7 @@ class ElasticLogTest {
                 logDirFailureChannel: LogDirFailureChannel = logDirFailureChannel,
                 clusterId: String = "test_cluster"): ElasticLog = {
     Context.enableTestMode()
+    ElasticLogManager.enable(true)
     ElasticLogManager.init(kafkaConfig, clusterId)
     ElasticLogManager.getOrCreateLog(dir = dir,
       config = config,
@@ -64,6 +64,7 @@ class ElasticUnifiedLogTest {
     val props = TestUtils.createSimpleEsBrokerConfig()
     config = KafkaConfig.fromProps(props)
     Context.enableTestMode()
+    ElasticLogManager.enable(true)
     ElasticLogManager.init(config, clusterId)
   }
 
@@ -442,14 +443,9 @@ class ElasticUnifiedLogTest {
     log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
     assertEquals(5, log.numberOfSegments, "A new segment should have been rolled out")
 
-    // move the wall clock beyond log rolling time
-    mockTime.sleep(log.config.segmentMs + 1)
-    log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0)
-    assertEquals(5, log.numberOfSegments, "Log should not roll because the roll should depend on timestamp of the first message.")
-
     val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     log.appendAsLeader(recordWithExpiredTimestamp, leaderEpoch = 0)
-    assertEquals(6, log.numberOfSegments, "Log should roll because the timestamp in the message should make the log segment expire.")
+    assertEquals(5, log.numberOfSegments, "Log should not roll since the log rolling is based on wall clock.")
 
     val numSegments = log.numberOfSegments
     mockTime.sleep(log.config.segmentMs + 1)
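The rewritten assertions express the same policy from the test's side: a record carrying an old timestamp no longer forces a roll, and only advancing the wall clock past the configured roll interval does (as the surviving mockTime.sleep(log.config.segmentMs + 1) lines exercise). A rough sketch of that decision rule, assuming segmentMs is the configured time-based roll interval and ignoring jitter and the size-based conditions:

// Illustrative only: time-based roll decision under the wall-clock policy.
def shouldRollOnTime(nowMs: Long, segmentCreateMs: Long, segmentMs: Long): Boolean =
  nowMs - segmentCreateMs > segmentMs

// Record timestamps no longer matter; only clock advancement beyond segmentMs triggers a time-based roll.
assert(!shouldRollOnTime(nowMs = 1000L, segmentCreateMs = 0L, segmentMs = 3600000L))
assert(shouldRollOnTime(nowMs = 3600001L, segmentCreateMs = 0L, segmentMs = 3600000L))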