From 4952373e554034749ff3729c953610994199c12d Mon Sep 17 00:00:00 2001 From: Daniel Jimenez Date: Sun, 29 Apr 2018 10:01:34 +0200 Subject: [PATCH] NIFI-4731: BQ Processors and GCP library update. --- .../nifi-gcp-processors/pom.xml | 10 +- .../processors/gcp/AbstractGCPProcessor.java | 3 +- .../bigquery/AbstractBigQueryProcessor.java | 122 ++++++++ .../gcp/bigquery/BigQueryAttributes.java | 80 ++++++ .../nifi/processors/gcp/bigquery/BqUtils.java | 84 ++++++ .../gcp/bigquery/PutBigQueryBatch.java | 269 ++++++++++++++++++ .../gcp/storage/AbstractGCSProcessor.java | 20 +- .../processors/gcp/storage/ListGCSBucket.java | 11 +- .../org.apache.nifi.processor.Processor | 3 +- .../gcp/bigquery/AbstractBQTest.java | 96 +++++++ .../gcp/bigquery/AbstractBigQueryIT.java | 79 +++++ .../gcp/bigquery/PutBigQueryBatchIT.java | 137 +++++++++ .../gcp/bigquery/PutBigQueryBatchTest.java | 153 ++++++++++ .../gcp/storage/ListGCSBucketTest.java | 1 - .../nifi-gcp-services-api/pom.xml | 2 +- 15 files changed, 1048 insertions(+), 22 deletions(-) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index 4fa1924c3fee..c09c72ea2d95 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -63,7 +63,7 @@ com.google.cloud - google-cloud-storage + google-cloud-core com.google.code.findbugs @@ -71,6 +71,14 @@ + + com.google.cloud + google-cloud-storage + + + com.google.cloud + google-cloud-bigquery + com.google.cloud google-cloud-pubsub diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java index 0da6c62468ce..e52f4d31211e 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java @@ -43,7 +43,8 @@ public abstract class AbstractGCPProcessor< .Builder().name("gcp-project-id") .displayName("Project ID") .description("Google Cloud Project ID") - .required(true) + .required(false) 
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
new file mode 100644
index 000000000000..b52a552fd194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for creating processors that connect to the GCP BigQuery service.
+ */
+public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+    static final int BUFFER_SIZE = 65536;
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder().name("success")
+                    .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
+                    .build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+            .displayName("Dataset")
+            .description(BigQueryAttributes.DATASET_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+            .displayName("Table Name")
+            .description(BigQueryAttributes.TABLE_NAME_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+            .required(true)
+            .defaultValue("5 minutes")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .build();
+    }
+
+    @Override
+    protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
+
+        BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
+
+        if (!StringUtils.isBlank(projectId)) {
+            builder.setProjectId(projectId);
+        }
+
+        return builder
+                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .build();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
new file mode 100644
index 000000000000..380f7017fcef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+/**
+ * Attributes associated with the BigQuery processors
+ */
+public class BigQueryAttributes {
+    private BigQueryAttributes() {}
+
+    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
+
+    public static final String DATASET_ATTR = "bq.dataset";
+    public static final String DATASET_DESC = "BigQuery dataset";
+
+    public static final String TABLE_NAME_ATTR = "bq.table.name";
+    public static final String TABLE_NAME_DESC = "BigQuery table name";
+
+    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery table schema";
+
+    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
+    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+
+
+    // Batch Attributes
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
+
+    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
+
+    public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
+    public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
+
+    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
+    public static final String JOB_END_TIME_DESC = "Time load job ended";
+
+    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
+    public static final String JOB_START_TIME_DESC = "Time load job started";
+
+    public static final String JOB_LINK_ATTR = "bq.job.link";
+    public static final String JOB_LINK_DESC = "API Link to load job";
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
new file mode 100644
index 000000000000..f7f5d665860b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ *
+ */
+public class BqUtils {
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
+    }.getType();
+
+    public static Field mapToField(Map fMap) {
+        String typeStr = fMap.get("type").toString();
+        String nameStr = fMap.get("name").toString();
+        String modeStr = fMap.get("mode").toString();
+        LegacySQLTypeName type = null;
+
+        if (typeStr.equals("BOOLEAN")) {
+            type = LegacySQLTypeName.BOOLEAN;
+        } else if (typeStr.equals("STRING")) {
+            type = LegacySQLTypeName.STRING;
+        } else if (typeStr.equals("BYTES")) {
+            type = LegacySQLTypeName.BYTES;
+        } else if (typeStr.equals("INTEGER")) {
+            type = LegacySQLTypeName.INTEGER;
+        } else if (typeStr.equals("FLOAT")) {
+            type = LegacySQLTypeName.FLOAT;
+        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+            type = LegacySQLTypeName.TIMESTAMP;
+        } else if (typeStr.equals("RECORD")) {
+            type = LegacySQLTypeName.RECORD;
+        }
+
+        return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
+    }
+
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        } else {
+            Gson gson = new Gson();
+            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+            return Schema.of(BqUtils.listToFields(fields));
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
new file mode 100644
index 000000000000..99c7f2a159e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.RetryOption; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableDataWriteChannel; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import com.google.common.collect.ImmutableList; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; +import org.apache.nifi.processors.gcp.storage.PutGCSObject; +import org.apache.nifi.util.StringUtils; +import org.threeten.bp.Duration; +import org.threeten.bp.temporal.ChronoUnit; + +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * A processor for batch loading data into a Google BigQuery table + */ + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"google", "google cloud", "bq", "bigquery"}) +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.") +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class}) + +@WritesAttributes({ + @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), + @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC), + @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, 
description = BigQueryAttributes.MAX_BADRECORDS_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC) +}) + +public class PutBigQueryBatch extends AbstractBigQueryProcessor { + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) + .displayName("Load file type") + .description(BigQueryAttributes.SOURCE_TYPE_DESC) + .required(true) + .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType()) + .defaultValue(FormatOptions.avro().getType()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR) + .displayName("Ignore Unknown Values") + .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) + .displayName("Create Disposition") + .description(BigQueryAttributes.CREATE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name()) + .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR) + .displayName("Write Disposition") + .description(BigQueryAttributes.WRITE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name()) + .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.MAX_BADRECORDS_ATTR) + .displayName("Max Bad Records") + .description(BigQueryAttributes.MAX_BADRECORDS_DESC) + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private Schema schemaCache = null; + + public PutBigQueryBatch() { + + } + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.builder() + .addAll(super.getSupportedPropertyDescriptors()) + .add(DATASET) + .add(TABLE_NAME) + .add(TABLE_SCHEMA) + 
.add(SOURCE_TYPE) + .add(CREATE_DISPOSITION) + .add(WRITE_DISPOSITION) + .add(MAXBAD_RECORDS) + .add(IGNORE_UNKNOWN) + .build(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Map attributes = new HashMap<>(); + + final BigQuery bq = getCloudService(); + + final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); + final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + final TableId tableId; + if (StringUtils.isEmpty(projectId)) { + tableId = TableId.of(dataset, tableName); + } else { + tableId = TableId.of(projectId, dataset, tableName); + } + + final String fileType = context.getProperty(SOURCE_TYPE).getValue(); + + String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue(); + Schema schema = BqUtils.schemaFromString(schemaString); + + WriteChannelConfiguration writeChannelConfiguration = + WriteChannelConfiguration.newBuilder(tableId) + .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) + .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) + .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) + .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) + .setSchema(schema) + .setFormatOptions(FormatOptions.of(fileType)) + .build(); + + TableDataWriteChannel writer = bq.writer(writeChannelConfiguration); + + try { + session.read(flowFile, rawIn -> { + ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn); + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + while (readableByteChannel.read(byteBuffer) >= 0) { + byteBuffer.flip(); + writer.write(byteBuffer); + byteBuffer.clear(); + } + }); + + writer.close(); + + Job job = writer.getJob(); + PropertyValue property = context.getProperty(READ_TIMEOUT); + Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS); + Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS); + job = job.waitFor(RetryOption.totalTimeout(duration)); + + if (job != null) { + attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); + attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); + attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); + attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); + + boolean jobError = (job.getStatus().getError() != null); + + if (jobError) { + attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); + attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); + 
attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); + } else { + // in case it got looped back from error + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR); + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR); + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); + } + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + + if (jobError) { + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } else { + session.transfer(flowFile, REL_SUCCESS); + } + } + + } catch (Exception ex) { + getLogger().log(LogLevel.ERROR, ex.getMessage(), ex); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java index bed596bc3bb2..762acf430899 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java @@ -16,10 +16,8 @@ */ package org.apache.nifi.processors.gcp.storage; -import com.google.api.client.http.HttpTransport; import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.gax.retrying.RetrySettings; -import com.google.auth.http.HttpTransportFactory; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.Storage; @@ -71,7 +69,7 @@ public List getSupportedPropertyDescriptors() { @Override protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) { - final String projectId = context.getProperty(PROJECT_ID).getValue(); + final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger(); final String proxyHost = context.getProperty(PROXY_HOST).getValue(); @@ -79,20 +77,18 @@ protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredent StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() .setCredentials(credentials) - .setProjectId(projectId) .setRetrySettings(RetrySettings.newBuilder() .setMaxAttempts(retryCount) .build()); + if (!projectId.isEmpty()) { + storageOptionsBuilder.setProjectId(projectId); + } + if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) { - storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(new HttpTransportFactory() { - @Override - public HttpTransport create() { - return new NetHttpTransport.Builder() - .setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort))) - .build(); - } - }).build()); + storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(() -> new NetHttpTransport.Builder() + .setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort))) + .build()).build()); } return storageOptionsBuilder.build(); } diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java index e018814c0cef..01293cfe3263 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java @@ -150,8 +150,8 @@ public class ListGCSBucket extends AbstractGCSProcessor { .displayName("Bucket") .description(BUCKET_DESC) .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder() @@ -159,7 +159,8 @@ public class ListGCSBucket extends AbstractGCSProcessor { .displayName("Prefix") .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').") .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder() @@ -242,9 +243,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final long startNanos = System.nanoTime(); - final String bucket = context.getProperty(BUCKET).getValue(); + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); - final String prefix = context.getProperty(PREFIX).getValue(); + final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 249d19eb15c7..4ff9dfb2f162 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -17,4 +17,5 @@ org.apache.nifi.processors.gcp.storage.FetchGCSObject org.apache.nifi.processors.gcp.storage.DeleteGCSObject org.apache.nifi.processors.gcp.storage.ListGCSBucket org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub -org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub \ No newline at end of file +org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub +org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java new file mode 100644 index 000000000000..e424a3a82b6e --- /dev/null +++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.bigquery; + + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; + +/** + * Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials. 
+ */ +public abstract class AbstractBQTest { + private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project"); + private static final Integer RETRIES = 9; + + static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + + public static TestRunner buildNewRunner(Processor processor) throws Exception { + final GCPCredentialsService credentialsService = new GCPCredentialsControllerService(); + + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.addControllerService("gcpCredentialsControllerService", credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcpCredentialsControllerService"); + runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID); + runner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES)); + + runner.assertValid(credentialsService); + + return runner; + } + + public abstract AbstractBigQueryProcessor getProcessor(); + + protected abstract void addRequiredPropertiesToRunner(TestRunner runner); + + @Mock + protected BigQuery bq; + + @Test + public void testBiqQueryOptionsConfiguration() throws Exception { + reset(bq); + final TestRunner runner = buildNewRunner(getProcessor()); + + final AbstractBigQueryProcessor processor = getProcessor(); + final GoogleCredentials mockCredentials = mock(GoogleCredentials.class); + + final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(), + mockCredentials); + + assertEquals("Project IDs should match", + PROJECT_ID, options.getProjectId()); + + assertEquals("Retry counts should match", + RETRIES.intValue(), options.getRetrySettings().getMaxAttempts()); + + assertSame("Credentials should be configured correctly", + mockCredentials, options.getCredentials()); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java new file mode 100644 index 000000000000..52327d4b2474 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.gcp.GCPIntegrationTests; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNull; + +@Category(GCPIntegrationTests.class) +public abstract class AbstractBigQueryIT { + + static final String CONTROLLER_SERVICE = "GCPCredentialsService"; + protected static BigQuery bigquery; + protected static Dataset dataset; + protected static TestRunner runner; + + @BeforeClass + public static void beforeClass() { + dataset = null; + BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder() + .build(); + bigquery = bigQueryOptions.getService(); + + DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build(); + dataset = bigquery.create(datasetInfo); + } + + @AfterClass + public static void afterClass() { + bigquery.delete(dataset.getDatasetId(), BigQuery.DatasetDeleteOption.deleteContents()); + } + + protected static void validateNoServiceExceptionAttribute(FlowFile flowFile) { + assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR)); + assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR)); + assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR)); + } + + protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException { + final Map propertiesMap = new HashMap<>(); + final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService(); + + runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap); + runner.enableControllerService(credentialsControllerService); + runner.assertValid(credentialsControllerService); + + return runner; + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java new file mode 100644 index 000000000000..8686213e6993 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.bigquery.FormatOptions; +import org.apache.nifi.processors.gcp.AbstractGCPProcessor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +public class PutBigQueryBatchIT extends AbstractBigQueryIT { + + private static final String TABLE_SCHEMA_STRING = "[\n" + + " {\n" + + " \"description\": \"field 1\",\n" + + " \"mode\": \"REQUIRED\",\n" + + " \"name\": \"field_1\",\n" + + " \"type\": \"STRING\"\n" + + " },\n" + + " {\n" + + " \"description\": \"field 2\",\n" + + " \"mode\": \"REQUIRED\",\n" + + " \"name\": \"field_2\",\n" + + " \"type\": \"STRING\"\n" + + " },\n" + + " {\n" + + " \"description\": \"field 3\",\n" + + " \"mode\": \"NULLABLE\",\n" + + " \"name\": \"field_3\",\n" + + " \"type\": \"STRING\"\n" + + " }\n" + + "]"; + + @Before + public void setup() { + runner = TestRunners.newTestRunner(PutBigQueryBatch.class); + } + + @Test + public void PutBigQueryBatchSmallPayloadTest() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + runner = setCredentialsControllerService(runner); + runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName); + runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType()); + runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING); + + String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n"; + + runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))); + runner.run(1); + for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) { + validateNoServiceExceptionAttribute(flowFile); + } + runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1); + } + + @Test + public void PutBigQueryBatchBadRecordTest() throws Exception { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + runner = setCredentialsControllerService(runner); + runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName); + runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType()); + runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING); + + String str = "{\"field_1\":\"Daniel is great\"}\r\n"; + + runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))); + 
runner.run(1); + runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1); + } + + @Test + public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException { + String methodName = Thread.currentThread().getStackTrace()[1].getMethodName(); + runner = setCredentialsControllerService(runner); + runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName); + runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType()); + runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING); + + // Allow one bad record to deal with the extra line break. + runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1)); + + String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." + + " The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" + + " for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" + + " them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." + + " Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n"; + Path tempFile = Files.createTempFile(methodName, ""); + try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) { + + for (int i = 0; i < 2; i++) { + for (int ii = 0; ii < 1_000_000; ii++) { + writer.write(str); + } + writer.flush(); + } + writer.flush(); + } + + runner.enqueue(tempFile); + runner.run(1); + for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) { + validateNoServiceExceptionAttribute(flowFile); + } + runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java new file mode 100644 index 000000000000..7ec5aa97e17e --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.RetryOption; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDataWriteChannel; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.apache.nifi.util.TestRunner; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link PutBigQueryBatch}. + */ +public class PutBigQueryBatchTest extends AbstractBQTest { + private static final String TABLENAME = "test_table"; + private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]"; + private static final String SOURCE_TYPE = FormatOptions.json().getType(); + private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); + private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name(); + private static final String MAXBAD_RECORDS = "0"; + private static final String IGNORE_UNKNOWN = "true"; + private static final String READ_TIMEOUT = "5 minutes"; + + @Mock + BigQuery bq; + + @Mock + Table table; + + @Mock + Job job; + + @Mock + JobStatus jobStatus; + + @Mock + JobStatistics stats; + + @Mock + TableDataWriteChannel tableDataWriteChannel; + + @Before + public void setup() throws Exception { + super.setup(); + reset(bq); + reset(table); + reset(job); + reset(jobStatus); + reset(stats); + } + + @Override + public AbstractBigQueryProcessor getProcessor() { + return new PutBigQueryBatch() { + @Override + protected BigQuery getCloudService() { + return bq; + } + }; + } + + @Override + protected void addRequiredPropertiesToRunner(TestRunner runner) { + runner.setProperty(PutBigQueryBatch.DATASET, DATASET); + runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME); + runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA); + runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE); + runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION); + runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION); + runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS); + runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN); + runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT); + } + + @Test + public void testSuccessfulLoad() throws Exception { + when(table.exists()).thenReturn(Boolean.TRUE); + when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job); + when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel); + when(tableDataWriteChannel.getJob()).thenReturn(job); + when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job); + when(job.getStatus()).thenReturn(jobStatus); + when(job.getStatistics()).thenReturn(stats); + + when(stats.getCreationTime()).thenReturn(0L); + when(stats.getStartTime()).thenReturn(1L); + when(stats.getEndTime()).thenReturn(2L); + + final TestRunner runner = buildNewRunner(getProcessor()); + addRequiredPropertiesToRunner(runner); + 
runner.assertValid(); + + runner.enqueue("{ \"data\": \"datavalue\" }"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS); + } + + + @Test + public void testFailedLoad() throws Exception { + when(table.exists()).thenReturn(Boolean.TRUE); + when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job); + when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel); + when(tableDataWriteChannel.getJob()).thenReturn(job); + when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class); + when(job.getStatus()).thenReturn(jobStatus); + when(job.getStatistics()).thenReturn(stats); + + when(stats.getCreationTime()).thenReturn(0L); + when(stats.getStartTime()).thenReturn(1L); + when(stats.getEndTime()).thenReturn(2L); + + final TestRunner runner = buildNewRunner(getProcessor()); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + runner.enqueue("{ \"data\": \"datavalue\" }"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java index eca81bbaa27f..e17cf4bf932a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.gcp.storage; - import com.google.api.gax.paging.Page; import com.google.cloud.storage.Acl; import com.google.cloud.storage.Blob; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 8f8979dc3d67..0a0c65ef7755 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,7 +33,7 @@ com.google.auth google-auth-library-oauth2-http - 0.6.0 + 0.9.0 com.google.code.findbugs