🎉 Source Facebook-Marketing: add asset_feed_spec to ad creatives stream (#8962)
bazarnov committed Dec 21, 2021
1 parent 7c0d195 commit 1029945
Showing 21 changed files with 610 additions and 54 deletions.
@@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.29",
"dockerImageTag": "0.2.30",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
@@ -175,7 +175,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.29
dockerImageTag: 0.2.30
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
@@ -1428,7 +1428,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.29"
- dockerImage: "airbyte/source-facebook-marketing:0.2.30"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
@@ -33,9 +33,10 @@ protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(JsonNode js
}

/**
* BigQuery might have different structure of the Temporary table.
* If this method returns TRUE, temporary table will have only three common Airbyte attributes.
* In case of FALSE, temporary table structure will be in line with Airbyte message JsonSchema.
* BigQuery might have different structure of the Temporary table. If this method returns TRUE,
* temporary table will have only three common Airbyte attributes. In case of FALSE, temporary table
* structure will be in line with Airbyte message JsonSchema.
*
* @return use default AirbyteSchema or build using JsonSchema
*/
@Override
@@ -176,9 +176,10 @@ protected Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> getUp
}

/**
* BigQuery might have different structure of the Temporary table.
* If this method returns TRUE, temporary table will have only three common Airbyte attributes.
* In case of FALSE, temporary table structure will be in line with Airbyte message JsonSchema.
* BigQuery might have different structure of the Temporary table. If this method returns TRUE,
* temporary table will have only three common Airbyte attributes. In case of FALSE, temporary table
* structure will be in line with Airbyte message JsonSchema.
*
* @return use default AirbyteSchema or build using JsonSchema
*/
protected boolean isDefaultAirbyteTmpTableSchema() {
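
The Javadoc repeated in the two hunks above describes a switch between two temporary-table layouts. As a rough illustration only (the class below and its wiring are assumptions, not connector code; the three `_airbyte_*` column names follow Airbyte's usual convention):

```java
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;

public class TmpTableSchemaSketch {

  // Returning TRUE means the temporary table keeps only the three common
  // Airbyte attributes; FALSE means it mirrors the stream's JsonSchema.
  protected boolean isDefaultAirbyteTmpTableSchema() {
    return true;
  }

  // Rough shape of the default (TRUE) temporary table.
  Schema defaultAirbyteTmpTableSchema() {
    return Schema.of(
        Field.of("_airbyte_ab_id", StandardSQLTypeName.STRING),
        Field.of("_airbyte_emitted_at", StandardSQLTypeName.TIMESTAMP),
        Field.of("_airbyte_data", StandardSQLTypeName.STRING));
  }
}
```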
@@ -16,7 +16,8 @@
import org.slf4j.LoggerFactory;

/**
* The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a corresponding uploader.
* The class formats incoming JsonSchema and AirbyteRecord in order to be inline with a
* corresponding uploader.
*/
public abstract class BigQueryRecordFormatter {

@@ -60,11 +61,10 @@ protected void logFieldFail(String error, String fieldName) {
public void printAndCleanFieldFails() {
if (!mapOfFailedFields.isEmpty()) {
mapOfFailedFields.forEach(
(error, fieldNames) ->
LOGGER.warn(
"Field(s) fail with error {}. Fields : {} ",
error,
String.join(", ", fieldNames)));
(error, fieldNames) -> LOGGER.warn(
"Field(s) fail with error {}. Fields : {} ",
error,
String.join(", ", fieldNames)));
mapOfFailedFields.clear();
} else {
LOGGER.info("No field fails during record format.");
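
For context, here is a self-contained sketch of the grouped-failure logging pattern that `printAndCleanFieldFails()` above relies on. The backing map type and the body of `logFieldFail` are assumptions, since only the reporting side is visible in this diff:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FieldFailureLogSketch {

  private static final Logger LOGGER = LoggerFactory.getLogger(FieldFailureLogSketch.class);

  // Failures are grouped by error message so each distinct error is logged once,
  // with every affected field name attached to it.
  private final Map<String, Set<String>> mapOfFailedFields = new HashMap<>();

  protected void logFieldFail(final String error, final String fieldName) {
    mapOfFailedFields.computeIfAbsent(error, key -> new HashSet<>()).add(fieldName);
  }

  public void printAndCleanFieldFails() {
    if (!mapOfFailedFields.isEmpty()) {
      mapOfFailedFields.forEach(
          (error, fieldNames) -> LOGGER.warn(
              "Field(s) fail with error {}. Fields : {} ",
              error,
              String.join(", ", fieldNames)));
      mapOfFailedFields.clear();
    } else {
      LOGGER.info("No field fails during record format.");
    }
  }
}
```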
@@ -18,9 +18,8 @@
import java.util.concurrent.TimeUnit;

/**
* Default BigQuery formatter.
* Represents default Airbyte schema (three columns).
* Note! Default formatter is used inside Direct uploader.
* Default BigQuery formatter. Represents default Airbyte schema (three columns). Note! Default
* formatter is used inside Direct uploader.
*/
public class DefaultBigQueryRecordFormatter extends BigQueryRecordFormatter {

@@ -38,8 +37,7 @@ public JsonNode formatRecord(AirbyteRecordMessage recordMessage) {
return Jsons.jsonNode(Map.of(
JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT, getEmittedAtField(recordMessage),
JavaBaseConstants.COLUMN_NAME_DATA, getData(recordMessage))
);
JavaBaseConstants.COLUMN_NAME_DATA, getData(recordMessage)));
}

protected Object getEmittedAtField(AirbyteRecordMessage recordMessage) {
@@ -9,8 +9,7 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;

/**
* Formatter for GCS Avro uploader.
* Contains specific filling of default Airbyte attributes.
* Formatter for GCS Avro uploader. Contains specific filling of default Airbyte attributes.
*/
public class GcsAvroBigQueryRecordFormatter extends DefaultBigQueryRecordFormatter {

@@ -27,4 +26,5 @@ protected Object getEmittedAtField(AirbyteRecordMessage recordMessage) {
protected Object getData(AirbyteRecordMessage recordMessage) {
return StandardNameTransformer.formatJsonPath(recordMessage.getData()).toString();
}

}
@@ -8,9 +8,8 @@
import io.airbyte.integrations.destination.StandardNameTransformer;

/**
* Formatter for GCS CSV uploader.
* Contains specific filling of default Airbyte attributes.
* Note! That it might be extended during CSV GCS integration.
* Formatter for GCS CSV uploader. Contains specific filling of default Airbyte attributes. Note!
* That it might be extended during CSV GCS integration.
*/
public class GcsCsvBigQueryRecordFormatter extends DefaultBigQueryRecordFormatter {

@@ -15,8 +15,6 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class AvroRecordFactory {
@@ -172,7 +172,11 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
return assembler.endRecord();
}

Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType, final JsonNode fieldDefinition, final boolean appendExtraProps, final boolean addStringToLogicalTypes) {
Schema getSingleFieldType(final String fieldName,
final JsonSchemaType fieldType,
final JsonNode fieldDefinition,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
Preconditions
.checkState(fieldType != JsonSchemaType.NULL, "Null types should have been filtered out");

@@ -201,7 +205,8 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
}
case COMBINED -> {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
final List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get(), appendExtraProps, addStringToLogicalTypes);
final List<Schema> unionTypes =
getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get(), appendExtraProps, addStringToLogicalTypes);
fieldSchema = Schema.createUnion(unionTypes);
}
case ARRAY -> {
@@ -210,7 +215,8 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
LOGGER.warn("Source connector provided schema for ARRAY with missed \"items\", will assume that it's a String type");
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items, appendExtraProps, addStringToLogicalTypes));
fieldSchema =
Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items, appendExtraProps, addStringToLogicalTypes));
} else if (items.isArray()) {
final List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes);
arrayElementTypes.add(0, NULL_SCHEMA);
@@ -227,7 +233,10 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType
return fieldSchema;
}

List<Schema> getSchemasFromTypes(final String fieldName, final ArrayNode types, final boolean appendExtraProps, final boolean addStringToLogicalTypes) {
List<Schema> getSchemasFromTypes(final String fieldName,
final ArrayNode types,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
return MoreIterators.toList(types.elements())
.stream()
.flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> {
@@ -245,7 +254,10 @@ List<Schema> getSchemasFromTypes(final String fieldName, final ArrayNode types,
/**
* @param fieldDefinition - Json schema field definition. E.g. { type: "number" }.
*/
Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinition, final boolean appendExtraProps, final boolean addStringToLogicalTypes) {
Schema getNullableFieldTypes(final String fieldName,
final JsonNode fieldDefinition,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
// Filter out null types, which will be added back in the end.
final List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition)
.stream()
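
The hunk above reflows the JSON-schema-to-Avro conversion helpers. Two of the cases it handles are easy to show standalone; the sketch below is illustrative, with assumed class and method names, and uses plain `org.apache.avro.Schema` calls rather than the converter's internals:

```java
import java.util.List;
import org.apache.avro.Schema;

public class AvroTypeFallbackSketch {

  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);

  // ARRAY fallback: when the source schema omits "items", assume nullable strings,
  // matching the warning logged in the diff above.
  static Schema arrayWithMissingItems() {
    return Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
  }

  // COMBINED (oneOf/anyOf) case: the member schemas become a single Avro union;
  // null handling is done separately in the converter, per the diff.
  static Schema combinedAsUnion(final List<Schema> memberSchemas) {
    return Schema.createUnion(memberSchemas);
  }
}
```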
@@ -42,7 +42,8 @@ public static Connection getConnection(final JsonNode config) throws SQLExceptio
// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
properties.put("application", "airbyte");
// Needed for JDK17 - see https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");

return DriverManager.getConnection(connectUrl, properties);
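
A minimal, self-contained sketch of the connection setup touched above. The URL format, credential property names, and helper signature are assumptions for illustration; the two driver properties are the ones documented in this hunk:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class SnowflakeConnectionSketch {

  static Connection connect(final String account, final String user, final String password)
      throws SQLException {
    final String connectUrl = String.format("jdbc:snowflake://%s.snowflakecomputing.com", account);
    final Properties properties = new Properties();
    properties.put("user", user);
    properties.put("password", password);
    // Identify Airbyte traffic to Snowflake (partnership & optimization opportunities).
    properties.put("application", "airbyte");
    // Work around the JDK 17 Arrow result-set issue by forcing JSON query results.
    properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");
    return DriverManager.getConnection(connectUrl, properties);
  }
}
```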
@@ -13,11 +13,10 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

@@ -39,18 +38,22 @@ public AirbyteConnectionStatus check(JsonNode config) {
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}
}

private static void attemptSQLCreateAndDropStages(String outputSchema, JdbcDatabase database, SnowflakeSQLNameTransformer namingResolver, SnowflakeStagingSqlOperations sqlOperations) throws Exception {
private static void attemptSQLCreateAndDropStages(String outputSchema,
JdbcDatabase database,
SnowflakeSQLNameTransformer namingResolver,
SnowflakeStagingSqlOperations sqlOperations)
throws Exception {

// verify we have permissions to create/drop stage
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
String stageName = namingResolver.getStageName(outputSchema, outputTableName);;
sqlOperations.createStageIfNotExists(database, stageName);
sqlOperations.dropStageIfExists(database,stageName);
// verify we have permissions to create/drop stage
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
String stageName = namingResolver.getStageName(outputSchema, outputTableName);;
sqlOperations.createStageIfNotExists(database, stageName);
sqlOperations.dropStageIfExists(database, stageName);
}

@Override
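
A rough standalone version of the stage permission check above, issuing the Snowflake SQL directly over JDBC rather than through the connector's SqlOperations helpers. The stage-name prefix comes from the diff; everything else is an assumption:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.UUID;

public class StagePermissionCheckSketch {

  // Create and immediately drop a uniquely named stage to verify that the
  // configured role is allowed to manage stages before any data is staged.
  static void attemptSqlCreateAndDropStages(final Connection connection, final String outputSchema)
      throws SQLException {
    final String outputTableName = "_airbyte_connection_test_"
        + UUID.randomUUID().toString().replaceAll("-", "");
    final String stageName = outputSchema + "." + outputTableName;
    try (Statement statement = connection.createStatement()) {
      statement.execute(String.format("CREATE STAGE IF NOT EXISTS %s", stageName));
      statement.execute(String.format("DROP STAGE IF EXISTS %s", stageName));
    }
  }
}
```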
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.29
LABEL io.airbyte.version=0.2.30
LABEL io.airbyte.name=airbyte/source-facebook-marketing
@@ -99,7 +99,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```
python -m pytest integration_tests -p integration_tests.acceptance
docker build . --no-cache -t airbyte/source-facebook-marketing:dev \
&& python -m pytest -p source_acceptance_test.plugin
```
To run your integration tests with docker

