Skip to content

Commit

Permalink
Remove SimpleMemoryPipelineChannelTest.assertFetchRecordsTimeoutCorre…
Browse files Browse the repository at this point in the history
…ctly (#29651)
  • Loading branch information
sandynz authored Jan 4, 2024
1 parent a74cdc1 commit d164a4c
Showing 1 changed file with 3 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,26 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTaskAckCallback;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

class MemoryPipelineChannelTest {

@SneakyThrows(InterruptedException.class)
@Test
void assertZeroQueueSizeWorks() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(0, records -> {

});
MemoryPipelineChannel channel = new MemoryPipelineChannel(0, new InventoryTaskAckCallback(new AtomicReference<>()));
List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition()));
Thread thread = new Thread(() -> channel.push(records));
thread.start();
assertThat(channel.fetch(1, 500L), is(records));
thread.join();
}

@Test
void assertFetchRecordsTimeoutCorrectly() {
MemoryPipelineChannel channel = new MemoryPipelineChannel(10, records -> {

});
long startMillis = System.currentTimeMillis();
channel.fetch(1, 1L);
long delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta);
startMillis = System.currentTimeMillis();
channel.fetch(1, 500L);
delta = System.currentTimeMillis() - startMillis;
assertTrue(delta >= 500 && delta < 750, "Delta is not in [500,750) : " + delta);
}
}

0 comments on commit d164a4c

Please sign in to comment.