Wait on close (#71)
* close by first stop all

* close by first stop all

* v2.3.1

* stopped writer

* small

* dont finish file on close..

* fix test

* test

* synchronized

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
ohadbitt and ohbitton committed Jun 13, 2022
1 parent 324e48c commit 8421d38
Showing 7 changed files with 206 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -9,7 +9,7 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
- <version>3.0.0</version>
+ <version>3.0.1</version>
<properties>
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
FileWriter.java
@@ -59,6 +59,7 @@ public class FileWriter implements Closeable {
private final IngestionProperties.DataFormat format;
private BehaviorOnError behaviorOnError;
private boolean shouldWriteAvroAsBytes = false;
private boolean stopped = false;

/**
* @param basePath - This is path to which to write the files to.
@@ -131,6 +132,11 @@ void finishFile(Boolean delete) throws IOException, DataException {
recordWriter.commit();
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();

// We may have been waiting on the lock when the task suddenly stopped; in that case we should not ingest anymore
if (stopped) {
return;
}
try {
onRollCallback.accept(currentFile);
} catch (ConnectException e) {
@@ -148,6 +154,7 @@ void finishFile(Boolean delete) throws IOException, DataException {
}
} else {
outputStream.close();
currentFile = null;
}
}

@@ -162,16 +169,19 @@ private void handleErrors(String message, Exception e) {
}

private void dumpFile() throws IOException {
- countingStream.close();
- currentFileDescriptor = null;
- boolean deleted = currentFile.file.delete();
- if (!deleted) {
- log.warn("couldn't delete temporary file. File exists: " + currentFile.file.exists());
- }
+ SourceFile temp = currentFile;
+ currentFile = null;
+ if (temp != null) {
+ countingStream.close();
+ currentFileDescriptor = null;
+ boolean deleted = temp.file.delete();
+ if (!deleted) {
+ log.warn("couldn't delete temporary file. File exists: " + temp.file.exists());
+ }
+ }
}

- public void rollback() throws IOException {
+ public synchronized void rollback() throws IOException {
if (countingStream != null) {
countingStream.close();
if (currentFile != null && currentFile.file != null) {
@@ -180,16 +190,19 @@ public void rollback() throws IOException {
}
}

- public void close() throws IOException, DataException {
- if (timer != null) {
- timer.cancel();
- }
+ @Override
+ public synchronized void close() throws IOException {
+ stop();
+ }

- // Flush last file, updating index
- finishFile(true);
+ public synchronized void stop() throws DataException {
+ stopped = true;

- // Setting to null so subsequent calls to close won't write it again
- currentFile = null;
+ if (timer != null) {
+ Timer temp = timer;
+ timer = null;
+ temp.cancel();
+ }
}

// Set shouldDestroyTimer to true if the current running task should be cancelled
@@ -218,10 +231,15 @@ public void run() {
void flushByTimeImpl() {
// Flush time interval gets the write lock so that it won't starve
try(AutoCloseableLock ignored = new AutoCloseableLock(reentrantReadWriteLock.writeLock())) {
if (stopped) {
return;
}

// Lock before the check so that if a writing process just flushed this won't ingest empty files
if (isDirty()) {
finishFile(true);
}

resetFlushTimer(false);
} catch (Exception e) {
String fileName = currentFile == null ? "no file created yet" : currentFile.file.getName();
@@ -231,7 +249,7 @@ void flushByTimeImpl() {
}
}

- public synchronized void writeData(SinkRecord record) throws IOException, DataException {
+ public void writeData(SinkRecord record) throws IOException, DataException {
if (flushError != null) {
throw new ConnectException(flushError);
}
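flushByTimeImpl above takes the write lock through try (AutoCloseableLock ignored = new AutoCloseableLock(reentrantReadWriteLock.writeLock())). The AutoCloseableLock helper itself is not shown in this diff; a minimal sketch of such a wrapper, assuming it does nothing more than acquire the given Lock on construction and release it on close:

    import java.util.concurrent.locks.Lock;

    // Hypothetical sketch of the AutoCloseableLock helper used by flushByTimeImpl.
    // Assumption: it acquires eagerly and releases via try-with-resources.
    class AutoCloseableLock implements AutoCloseable {
        private final Lock lock;

        AutoCloseableLock(Lock lock) {
            this.lock = lock;
            lock.lock(); // constructing the guard blocks until the lock is acquired
        }

        @Override
        public void close() {
            lock.unlock(); // released automatically when the try block exits
        }
    }

Acquiring in the constructor and releasing in close() means the write lock is freed even when finishFile throws inside the try block, which matters because the timer-driven flush runs concurrently with record writes and shutdown.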
KustoSinkTask.java
@@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;


/**
@@ -347,13 +348,21 @@ public void open(Collection<TopicPartition> partitions) {

@Override
public void close(Collection<TopicPartition> partitions) {
log.warn("Closing writers in KustoSinkTask");
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// First stop so that no more ingestions trigger from timer flushes
partitions.forEach((TopicPartition tp) -> writers.get(tp).stop());
for (TopicPartition tp : partitions) {
try {
writers.get(tp).close();
// TODO: if we still get duplicates from rebalance - consider keeping writers aside - we might
// just get the same topic partition again
writers.remove(tp);
assignment.remove(tp);
} catch (ConnectException e) {
log.error("Error closing writer for {}.", tp, e);
log.error("Error closing topic partition for {}.", tp, e);
} finally {
countDownLatch.countDown();
}
}
}
@@ -404,6 +413,11 @@ private static boolean isStreamingPolicyEnabled (
@Override
public void stop() {
log.warn("Stopping KustoSinkTask");
// First stop so that no more ingestions trigger from timer flushes
for (TopicPartitionWriter writer : writers.values()) {
writer.stop();
}

for (TopicPartitionWriter writer : writers.values()) {
writer.close();
}
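Both close(Collection) and stop() above shut writers down in two phases: first flip every writer's stopped flag so that timer-driven flushes become no-ops, then close each writer. A sketch of that ordering, using a hypothetical Writer interface in place of TopicPartitionWriter:

    // Hypothetical sketch of the two-phase shutdown introduced by this commit.
    // Writer stands in for TopicPartitionWriter; the names are illustrative only.
    interface Writer {
        void stop();  // phase 1: set the stopped flag; timer flushes now return early
        void close(); // phase 2: cancel the timer and release file resources
    }

    class TwoPhaseShutdown {
        static void shutdown(Iterable<Writer> writers) {
            // Stop every writer first, so that while one writer is blocked in
            // close(), no other writer's flush timer can still trigger an ingestion.
            for (Writer w : writers) {
                w.stop();
            }
            for (Writer w : writers) {
                w.close();
            }
        }
    }

Stopping all writers before closing any of them is what makes the stopped check added to FileWriter.finishFile effective: a flush already waiting on the lock sees the flag and returns without ingesting.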
Expand Down Expand Up @@ -453,7 +467,6 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
throw new ConnectException("Topic Partition not configured properly. " +
"verify your `topics` and `kusto.tables.topics.mapping` configurations");
}

Long lastCommittedOffset = writers.get(tp).lastCommittedOffset;

if (lastCommittedOffset != null) {
TopicPartitionWriter.java
@@ -239,7 +239,7 @@ void open() {
void close() {
try {
fileWriter.rollback();
- // fileWriter.close(); TODO ?
+ fileWriter.close();
} catch (IOException e) {
log.error("Failed to rollback with exception={}", e);
}
@@ -257,6 +257,10 @@ void close() {
}
}

void stop() {
fileWriter.stop();
}

static String getTempDirectoryName(String tempDirPath) {
String tempDir = "kusto-sink-connector-" + UUID.randomUUID().toString();
Path path = Paths.get(tempDirPath, tempDir);
FileWriterTest.java
@@ -106,15 +106,15 @@ public void testGzipFileWriter() throws IOException {
Assertions.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);

// close current file
- fileWriter.close();
+ fileWriter.rotate(54L);
Assertions.assertEquals(5, files.size());

List<Long> sortedFiles = new ArrayList<>(files.values());
sortedFiles.sort((Long x, Long y) -> (int) (y - x));
Assertions.assertEquals(sortedFiles, Arrays.asList((long) 108, (long) 108, (long) 108, (long) 108, (long) 54));

- // make sure folder is clear once done
- Assertions.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
+ // make sure folder is clear once done - with only the new file
+ Assertions.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
}

@Test
@@ -142,7 +142,8 @@ public void testGzipFileWriterFlush() throws IOException, InterruptedException {
Thread.sleep(1000);

Assertions.assertEquals(0, files.size());
- fileWriter.close();
+ fileWriter.rotate(10L);
+ fileWriter.stop();
Assertions.assertEquals(1, files.size());

String path2 = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2_2").toString();
@@ -151,7 +152,7 @@ public void testGzipFileWriterFlush() throws IOException, InterruptedException {
Assertions.assertTrue(mkdirs);

Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
- // Expect one file to be ingested as flushInterval had changed
+ // Expect one file to be ingested as flushInterval had changed and is shorter than sleep time
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, new ReentrantReadWriteLock(), ingestionProps.getDataFormat(), BehaviorOnError.FAIL);

String msg2 = "Second Message";
@@ -167,7 +168,7 @@ public void testGzipFileWriterFlush() throws IOException, InterruptedException {

// make sure folder is clear once done
fileWriter2.close();
- Assertions.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
+ Assertions.assertEquals(1, Objects.requireNonNull(folder.listFiles()).length);
}

@Test
Expand Down Expand Up @@ -244,7 +245,7 @@ class Offsets {
}});

// make sure folder is clear once done
- fileWriter2.close();
+ fileWriter2.stop();
Assertions.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
}

KustoSinkTaskTest.java
@@ -1,6 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;

import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -9,6 +11,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -17,11 +20,14 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

- import static org.junit.jupiter.api.Assertions.assertEquals;
- import static org.junit.jupiter.api.Assertions.assertThrows;
- import static org.mockito.Mockito.doNothing;
- import static org.mockito.Mockito.spy;
+ import static org.junit.jupiter.api.Assertions.*;
+ import static org.mockito.Mockito.*;

public class KustoSinkTaskTest {
File currentDirectory;
@@ -37,7 +43,7 @@ public final void before() {

@AfterEach
public final void after() {
- currentDirectory.delete();
+ boolean delete = currentDirectory.delete();
}

@Test
@@ -123,4 +129,108 @@ public void getTable() {
Assertions.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
}
}

@Test
public void closeTaskAndWaitToFinish() {
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
kustoSinkTaskSpy.start(configs);
ArrayList<TopicPartition> tps = new ArrayList<>();
tps.add(new TopicPartition("topic1", 1));
tps.add(new TopicPartition("topic2", 2));
tps.add(new TopicPartition("topic2", 3));
kustoSinkTaskSpy.open(tps);

// Clean fast close
long l1 = System.currentTimeMillis();
kustoSinkTaskSpy.close(tps);
long l2 = System.currentTimeMillis();

assertTrue(l2-l1 < 1000);

// Check close time when one close takes time to close
TopicPartition tp = new TopicPartition("topic2", 4);
IngestClient mockedClient = mock(IngestClient.class);
TopicIngestionProperties props = new TopicIngestionProperties();

props.ingestionProperties = kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties;

TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, new KustoSinkConfig(configs), false, null, null);
TopicPartitionWriter writerSpy = spy(writer);
long sleepTime = 2 * 1000;
Answer<Void> answer = invocation -> {
Thread.sleep(sleepTime);
return null;
};

doAnswer(answer).when(writerSpy).close();
kustoSinkTaskSpy.open(tps);
writerSpy.open();
tps.add(tp);
kustoSinkTaskSpy.writers.put(tp, writerSpy);

kustoSinkTaskSpy.close(tps);
long l3 = System.currentTimeMillis();
System.out.println("l3-l2 " + (l3-l2));
assertTrue(l3-l2 > sleepTime && l3-l2 < sleepTime + 1000);
}

@Test
public void precommitDoesntCommitNewerOffsets() throws InterruptedException {
HashMap<String, String> configs = KustoSinkConnectorConfigTest.setupConfigs();
configs.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, "100");
KustoSinkTask kustoSinkTask = new KustoSinkTask();
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
kustoSinkTaskSpy.start(configs);
ArrayList<TopicPartition> tps = new ArrayList<>();
TopicPartition topic1 = new TopicPartition("topic1", 1);
tps.add(topic1);
kustoSinkTaskSpy.open(tps);
TopicPartitionWriter topicPartitionWriter = kustoSinkTaskSpy.writers.get(topic1);
IngestClient mockedClient = mock(IngestClient.class);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties;
TopicPartitionWriter topicPartitionWriterSpy = spy(new TopicPartitionWriter(topic1, mockedClient, props, new KustoSinkConfig(configs), false, null, null));
topicPartitionWriterSpy.open();
kustoSinkTaskSpy.writers.put(topic1,topicPartitionWriterSpy);

ExecutorService executor = Executors.newSingleThreadExecutor();
final Stopped stoppedObj = new Stopped();
AtomicInteger offset = new AtomicInteger(1);
Runnable insertExec = () -> {
while (!stoppedObj.stopped) {
List<SinkRecord> records = new ArrayList<>();

records.add(new SinkRecord("topic1", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), offset.getAndIncrement()));
kustoSinkTaskSpy.put(new ArrayList<>(records));
try {
Thread.sleep(10);
} catch (InterruptedException ignore) {
}
}
};
Future<?> runner = executor.submit(insertExec);
Thread.sleep(500);
stoppedObj.stopped = true;
runner.cancel(true);
int current = offset.get();
HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(topic1, new OffsetAndMetadata(current));

Map<TopicPartition, OffsetAndMetadata> returnedOffsets = kustoSinkTaskSpy.preCommit(offsets);
kustoSinkTaskSpy.close(tps);

// Subtract one because preCommit adds one
Assertions.assertEquals(returnedOffsets.get(topic1).offset() - 1,topicPartitionWriterSpy.lastCommittedOffset);
Thread.sleep(500);
// No ingestion occurs even after waiting
Assertions.assertEquals(returnedOffsets.get(topic1).offset() - 1,topicPartitionWriterSpy.lastCommittedOffset);
}

static class Stopped {
boolean stopped;
}
}
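The offset arithmetic in precommitDoesntCommitNewerOffsets follows the Kafka convention that a committed offset names the next record to consume, which is why preCommit reports lastCommittedOffset + 1 and the test subtracts one before comparing. A small illustration with made-up values:

    // Illustrative values only: Kafka commits "the next offset to read".
    public class OffsetConventionExample {
        public static void main(String[] args) {
            long lastCommittedOffset = 41L;                // last record safely ingested
            long offsetToCommit = lastCommittedOffset + 1; // what preCommit reports: 42
            // This is why the test subtracts one before comparing with lastCommittedOffset.
            System.out.println(offsetToCommit - 1 == lastCommittedOffset); // true
        }
    }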
