Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed May 21, 2024
1 parent b80ff14 commit 41f6b35
Showing 1 changed file with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,24 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

setEmittedAtToNull(actualMessages)

val expectedMessages = airbyteMessagesReadOneColumn
val expectedMessages: MutableList<AirbyteMessage> = airbyteMessagesReadOneColumn

expectedMessages.addFirst(AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage(
AirbyteStreamStatusHolder(AirbyteStreamNameNamespacePair(streamName(), defaultNamespace), AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)
))

expectedMessages.addLast(AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage(
AirbyteStreamStatusHolder(AirbyteStreamNameNamespacePair(streamName(), defaultNamespace), AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
))
setTraceEmittedAtToNull(actualMessages)
setTraceEmittedAtToNull(expectedMessages)

Assertions.assertEquals(expectedMessages.size, actualMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessages))
Assertions.assertTrue(actualMessages.containsAll(expectedMessages))
}

protected open val airbyteMessagesReadOneColumn: List<AirbyteMessage>
protected open val airbyteMessagesReadOneColumn: MutableList<AirbyteMessage>
get() {
val expectedMessages =
testMessages
Expand Down Expand Up @@ -765,10 +776,19 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
.count()
.toInt(),
)
val expectedMessages = getExpectedAirbyteMessagesSecondSync(namespace)
val expectedMessages: MutableList<AirbyteMessage> = getExpectedAirbyteMessagesSecondSync(namespace)

setEmittedAtToNull(actualMessagesSecondSync)

expectedMessages.addFirst(AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage(
AirbyteStreamStatusHolder(AirbyteStreamNameNamespacePair(configuredCatalog.streams[0].stream.name, defaultNamespace), AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)
))

expectedMessages.addLast(AirbyteTraceMessageUtility.makeStreamStatusTraceAirbyteMessage(
AirbyteStreamStatusHolder(AirbyteStreamNameNamespacePair(configuredCatalog.streams[0].stream.name, defaultNamespace), AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
))
setTraceEmittedAtToNull(actualMessagesSecondSync)
setTraceEmittedAtToNull(expectedMessages)
Assertions.assertEquals(expectedMessages.size, actualMessagesSecondSync.size)
Assertions.assertTrue(expectedMessages.containsAll(actualMessagesSecondSync))
Assertions.assertTrue(actualMessagesSecondSync.containsAll(expectedMessages))
Expand All @@ -788,7 +808,7 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {

protected open fun getExpectedAirbyteMessagesSecondSync(
namespace: String?
): List<AirbyteMessage> {
): MutableList<AirbyteMessage> {
val expectedMessages: MutableList<AirbyteMessage> = ArrayList()
expectedMessages.add(
AirbyteMessage()
Expand Down

0 comments on commit 41f6b35

Please sign in to comment.