Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fixed test for gcs, csv, json-local, mongodb and meilisearch destination #6134

Merged
merged 5 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.9",
"dockerImageTag": "0.2.10",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.9
dockerImageTag: 0.2.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,9 @@ public void testSyncWithLargeRecordBatch(String messagesFilename, String catalog
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());

final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(1000, messages).stream().flatMap(List::stream).collect(Collectors.toList());
final List<AirbyteMessage> largeNumberRecords = Collections.nCopies(400, messages).stream().flatMap(List::stream).collect(Collectors.toList());
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
}

Expand Down Expand Up @@ -972,7 +971,11 @@ private List<AirbyteMessage> runSync(JsonNode config, List<AirbyteMessage> messa
final AirbyteDestination destination = getDestination();

destination.start(destinationConfig, jobRoot);
messages.forEach(message -> Exceptions.toRuntime(() -> destination.accept(message)));
try {
messages.forEach(message -> Exceptions.toRuntime(() -> destination.accept(message)));
} catch (Exception e) {
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.error(e.getMessage());
}
destination.notifyEndOfStream();

List<AirbyteMessage> destinationOutput = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
JsonNode streamSchema)
throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());

final Optional<Path> streamOutput =
allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
allOutputs.stream()
.filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".csv"))
.findFirst();

assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER;

/**
* When adding a new GCS destination acceptance test, extend this class and do the following:
* <li>Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}</li>
Expand Down Expand Up @@ -107,6 +109,7 @@ protected List<S3ObjectSummary> getAllSyncedObjects(String streamName, String na
.listObjects(config.getBucketName(), outputPrefix)
.getObjectSummaries()
.stream()
.filter(o -> o.getKey().contains(NAME_TRANSFORMER.convertStreamName(streamName) + "/"))
.sorted(Comparator.comparingLong(o -> o.getLastModified().getTime()))
.collect(Collectors.toList());
LOGGER.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
final Optional<Path> streamOutput = allOutputs.stream()
.filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
.filter(path -> path.getFileName().toString().endsWith(new StandardNameTransformer().getRawTableName(streamName) + ".jsonl"))
.findFirst();

assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.9
LABEL io.airbyte.version=0.2.10
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,8 +86,10 @@ public class MeiliSearchDestination extends BaseConnector implements Destination
private static final Logger LOGGER = LoggerFactory.getLogger(MeiliSearchDestination.class);

private static final int MAX_BATCH_SIZE = 10000;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSSSSSSS");

public static final String AB_PK_COLUMN = "_ab_pk";
public static final String AB_EMITTED_AT_COLUMN = "_ab_emitted_at";

@Override
public AirbyteConnectionStatus check(JsonNode config) {
Expand Down Expand Up @@ -164,6 +169,7 @@ private static RecordWriter recordWriterFunction(final Map<String, Index> indexN
.stream()
.map(AirbyteRecordMessage::getData)
.peek(o -> ((ObjectNode) o).put(AB_PK_COLUMN, Names.toAlphanumericAndUnderscore(UUID.randomUUID().toString())))
.peek(o -> ((ObjectNode) o).put(AB_EMITTED_AT_COLUMN, LocalDateTime.now().format(FORMATTER)))
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList()));
final String s = index.addDocuments(json);
LOGGER.info("add docs response {}", s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -101,10 +102,12 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
final Index index = meiliSearchClient.index(Names.toAlphanumericAndUnderscore(streamName));
final String responseString = index.getDocuments();
final JsonNode response = Jsons.deserialize(responseString);
return MoreStreams.toStream(response.iterator())
return MoreStreams.toStream(response.iterator())
// strip out the airbyte primary key because the test cases only expect the data, no the airbyte
// metadata column.
.peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_PK_COLUMN))
.sorted(Comparator.comparing(o -> o.get(MeiliSearchDestination.AB_EMITTED_AT_COLUMN).asText()))
.peek(r -> ((ObjectNode) r).remove(MeiliSearchDestination.AB_EMITTED_AT_COLUMN))
andriikorotkov marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
}

Expand Down