From 58d69b6ff04b57e6f59e6eb9581fb07a64687b54 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Fri, 12 Jan 2018 17:51:50 -0800 Subject: [PATCH 1/8] bq processors --- .../nifi-gcp-processors/pom.xml | 6 + .../bigquery/AbstractBigQueryProcessor.java | 67 ++++ .../gcp/bigquery/BigQueryAttributes.java | 67 ++++ .../nifi/processors/gcp/bigquery/BqUtils.java | 82 ++++ .../gcp/bigquery/PutBigQueryBatch.java | 309 +++++++++++++++ .../gcp/bigquery/PutBigQueryStream.java | 353 ++++++++++++++++++ 6 files changed, 884 insertions(+) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index d2088e0390c5..ff2aec318ee1 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -83,6 +83,12 @@ json 1.8 + + org.apache.beam + beam-sdks-java-core + 0.6.0 + jar + diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java new file mode 100644 index 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.gcp.bigquery;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.RetryParams;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;

/**
 * Base class for processors that connect to the Google Cloud BigQuery service.
 *
 * <p>Provides the shared success/failure relationships and builds the
 * {@link BigQueryOptions} client configuration from the common GCP properties
 * (project id, credentials, retry count) inherited from
 * {@link AbstractGCPProcessor}.</p>
 */
public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {

    /** FlowFiles are routed here after a successful BigQuery operation. */
    public static final Relationship REL_SUCCESS =
            new Relationship.Builder().name("success")
                    .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
                    .build();

    /** FlowFiles are routed here when the BigQuery operation fails. */
    public static final Relationship REL_FAILURE =
            new Relationship.Builder().name("failure")
                    .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
                    .build();

    // Kept public (and lower-case, despite convention) because subclasses in this
    // bundle statically import it by this name.
    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // Only the common GCP properties here; subclasses append their own.
        return ImmutableList.<PropertyDescriptor>builder()
                .addAll(super.getSupportedPropertyDescriptors())
                .build();
    }

    /**
     * Builds the BigQuery client options from the processor configuration.
     *
     * @param context     process context supplying PROJECT_ID and RETRY_COUNT
     * @param credentials resolved Google OAuth2 credentials
     * @return BigQueryOptions with project, credentials and retry parameters applied
     */
    @Override
    protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
        final String projectId = context.getProperty(PROJECT_ID).getValue();
        // asInteger() replaces the manual Integer.valueOf(getValue()) round-trip.
        final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();

        return BigQueryOptions.newBuilder()
                .setCredentials(credentials)
                .setProjectId(projectId)
                .setRetryParams(RetryParams.newBuilder()
                        .setRetryMaxAttempts(retryCount)
                        .setRetryMinAttempts(retryCount)
                        .build())
                .build();
    }
}
/**
 * FlowFile attribute keys and property/attribute description strings shared by
 * the BigQuery processors ({@code PutBigQueryBatch}, {@code PutBigQueryStream}).
 *
 * <p>{@code *_ATTR} constants are the attribute/property names; {@code *_DESC}
 * constants are the human-readable descriptions shown in the NiFi UI.</p>
 */
public class BigQueryAttributes {

    // Constants holder — never instantiated.
    private BigQueryAttributes() {
    }

    public static final String DATASET_ATTR = "bq.dataset";
    public static final String DATASET_DESC = "BigQuery dataset";

    public static final String TABLE_NAME_ATTR = "bq.table.name";
    public static final String TABLE_NAME_DESC = "BigQuery table name";

    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
    // Fixed copy-paste: previously repeated "BigQuery table name".
    public static final String TABLE_SCHEMA_DESC = "BigQuery schema for the table";

    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";

    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
    public static final String JOB_ERROR_MSG_DESC = "Load job error message";

    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";

    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";

    // Stream Attributes
    public static final String BATCH_SIZE_ATTR = "bq.batch.size";
    public static final String BATCH_SIZE_DESC = "BigQuery number of rows to insert at a time";

    public static final String MAX_ROW_SIZE_ATTR = "bq.row.size";
    public static final String MAX_ROW_SIZE_DESC = "BigQuery has a limit (1MB) on max size of a row, for streaming";

    public static final String TABLE_CACHE_RESET_ATTR = "bq.cache.reset";
    public static final String TABLE_CACHE_RESET_DESC = "How often to reset table info cache";

    // Batch Attributes
    public static final String SOURCE_FILE_ATTR = "bq.load.file";
    public static final String SOURCE_FILE_DESC = "URL of file to load (gs://[bucket]/[key])";

    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";

    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";

    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";

    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";

    public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
    public static final String JOB_CREATE_TIME_DESC = "Time of load job creation";

    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
    public static final String JOB_END_TIME_DESC = "Time load job ended";

    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
    public static final String JOB_START_TIME_DESC = "Time load job started";

    public static final String JOB_LINK_ATTR = "bq.job.link";
    public static final String JOB_LINK_DESC = "API Link to load job";
}
Field.Type.string(); + } else if(typeStr.equals("BYTES")) { + type = Field.Type.bytes(); + } else if(typeStr.equals("INTEGER")) { + type = Field.Type.integer(); + } else if(typeStr.equals("FLOAT")) { + type = Field.Type.floatingPoint(); + } else if(typeStr.equals("TIMESTAMP") || typeStr.equals("DATE") || + typeStr.equals("TIME") || typeStr.equals("DATETIME")) { + type = Field.Type.timestamp(); + } else if(typeStr.equals("RECORD")) { + List m_fields = (List) fMap.get("fields"); + type = Field.Type.record(listToFields(m_fields)); + } + + return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build(); + } + + public static List listToFields(List m_fields) { + List fields = new ArrayList(m_fields.size()); + for(Map m : m_fields) { + fields.add(mapToField(m)); + } + + return fields; + } + + public static Schema schemaFromString(String schemaStr) { + Gson gson = new Gson(); + List fields = gson.fromJson(schemaStr, gsonSchemaType); + + return Schema.of(BqUtils.listToFields(fields)); + } + + public static T fromJsonString(String json, Class clazz) throws IOException { + JsonFactory JSON_FACTORY = Transport.getJsonFactory(); + + return JSON_FACTORY.fromString(json, clazz); + } + + public static TableSchema tableSchemaFromString(String schemaStr) throws IOException { + schemaStr = "{\"fields\":" + schemaStr + "}"; + + return fromJsonString(schemaStr, TableSchema.class); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java new file mode 100644 index 000000000000..954480c5800e --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -0,0 +1,309 @@ +package org.apache.nifi.processors.gcp.bigquery; + +import 
com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.InsertAllResponse; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobConfiguration; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.LoadJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import static org.apache.nifi.processors.gcp.AbstractGCPProcessor.PROJECT_ID; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_FAILURE; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.relationships; +import static org.apache.nifi.processors.gcp.bigquery.PutBigQueryStream.DATASET; +import static org.apache.nifi.processors.gcp.bigquery.PutBigQueryStream.TABLE_NAME; +import static org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor.REL_SUCCESS; +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; +import org.apache.nifi.processors.gcp.storage.FetchGCSObject; +import org.apache.nifi.processors.gcp.storage.ListGCSBucket; +import org.apache.nifi.processors.gcp.storage.PutGCSObject; + +/** + * A processor for batch loading data into a Google BigQuery table + */ + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"google", "google cloud", "bq", "bigquery"}) +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.") +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class, 
PutBigQueryStream.class}) + +@WritesAttributes({ + @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), + @WritesAttribute(attribute = BigQueryAttributes.SOURCE_FILE_ATTR, description = BigQueryAttributes.SOURCE_FILE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC), + @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = 
BigQueryAttributes.JOB_ERROR_LOCATION_DESC) +}) + +public class PutBigQueryBatch extends AbstractBigQueryProcessor { + + public static final PropertyDescriptor DATASET = new PropertyDescriptor + .Builder().name(BigQueryAttributes.DATASET_ATTR) + .displayName("Dataset") + .description(BigQueryAttributes.DATASET_DESC) + .required(true) + .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor + .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR) + .displayName("Table Name") + .description(BigQueryAttributes.TABLE_NAME_DESC) + .required(true) + .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor + .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR) + .displayName("Table Schema") + .description(BigQueryAttributes.TABLE_SCHEMA_DESC) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SOURCE_FILE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.SOURCE_FILE_ATTR) + .displayName("Load file path") + .description(BigQueryAttributes.SOURCE_FILE_DESC) + .required(true) + .defaultValue("gs://${gcs.path}") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) + .displayName("Load file type") + .description(BigQueryAttributes.SOURCE_FILE_DESC) + .required(true) + .allowableValues(FormatOptions.json().getType(), FormatOptions.csv().getType()) + .defaultValue(FormatOptions.json().getType()) + 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR) + .displayName("Ignore Unknown Values") + .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) + .displayName("Create Disposition") + .description(BigQueryAttributes.CREATE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name()) + .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR) + .displayName("Write Disposition") + .description(BigQueryAttributes.WRITE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name()) + .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.MAX_BADRECORDS_ATTR) + .displayName("Max Bad Records") + .description(BigQueryAttributes.MAX_BADRECORDS_DESC) + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private final static Type gsonParseType = new TypeToken>(){}.getType(); + + private Schema schemaCache = null; + + public PutBigQueryBatch() 
{ + + } + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.builder() + .addAll(super.getSupportedPropertyDescriptors()) + .add(DATASET) + .add(TABLE_NAME) + .add(TABLE_SCHEMA) + .add(SOURCE_FILE) + .add(SOURCE_TYPE) + .add(CREATE_DISPOSITION) + .add(WRITE_DISPOSITION) + .add(MAXBAD_RECORDS) + .add(IGNORE_UNKNOWN) + .build(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + + if(schemaCache == null) { + String schemaStr = context.getProperty(TABLE_SCHEMA).getValue(); + schemaCache = BqUtils.schemaFromString(schemaStr); + + getLogger().info("Enabled StreamIntoBigQuery"); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flow = session.get(); + if (flow == null) { + return; + } + + final Map attributes = new HashMap<>(); + + final BigQuery bq = getCloudService(); + + final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(flow).getValue(); + final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(flow).getValue(); + final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flow).getValue(); + final TableId tableId = TableId.of(projectId, dataset_str, tablename_str); + + final String fileType = context.getProperty(SOURCE_TYPE).getValue(); + final String jsonFile = context.getProperty(SOURCE_FILE).evaluateAttributeExpressions(flow).getValue(); + + LoadJobConfiguration configuration = LoadJobConfiguration.newBuilder(tableId, jsonFile, FormatOptions.of(fileType)) + 
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) + .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) + .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) + .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) + .setSchema(schemaCache) + .build(); + + Job job = bq.create(JobInfo.of(configuration)); + try { + job = job.waitFor(); + } catch (Throwable ex) { + getLogger().log(LogLevel.ERROR, ex.getMessage(), ex); + + flow = session.penalize(flow); + + session.transfer(flow, REL_FAILURE); + + return; + } + + boolean jobError = (job.getStatus().getError() != null); + + attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); + attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); + attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); + + attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); + + if(jobError) { + attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); + attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); + attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); + } else { + // in case it got looped back from error + flow = session.removeAttribute(flow, "bq.job.error.message"); + flow = session.removeAttribute(flow, "bq.job.error.reason"); + flow = session.removeAttribute(flow, "bq.job.error.location"); + } + + if (!attributes.isEmpty()) { + flow = session.putAllAttributes(flow, attributes); + } + + if(jobError) { + flow = session.penalize(flow); + session.transfer(flow, REL_FAILURE); + } else { + session.transfer(flow, REL_SUCCESS); + } + } + +} diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java new file mode 100644 index 000000000000..5187d8c61dec --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java @@ -0,0 +1,353 @@ +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.InsertAllResponse; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_FAILURE; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_SUCCESS; +import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.relationships; +import static org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor.REL_SUCCESS; +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; +import org.apache.nifi.processors.gcp.storage.FetchGCSObject; +import org.apache.nifi.processors.gcp.storage.ListGCSBucket; + +/** + * + * @author Mikhail Sosonkin (Synack Inc, Synack.com) + */ + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"google", "google cloud", "bq", "bigquery"}) +@CapabilityDescription("Streams flow files to a Google BigQuery table.") + +@WritesAttributes({ + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC) +}) + +public class PutBigQueryStream extends AbstractBigQueryProcessor { + public static final Relationship REL_ROW_TOO_BIG = + new Relationship.Builder().name("row_too_big") + .description("FlowFiles are 
routed to this relationship if the row size is too big.") + .build(); + + public static final PropertyDescriptor DATASET = new PropertyDescriptor + .Builder().name(BigQueryAttributes.DATASET_ATTR) + .displayName("Dataset") + .description(BigQueryAttributes.DATASET_DESC) + .required(true) + .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor + .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR) + .displayName("Table Name") + .description(BigQueryAttributes.TABLE_NAME_DESC) + .required(true) + .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor + .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR) + .displayName("Table Schema") + .description(BigQueryAttributes.TABLE_SCHEMA_DESC) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.BATCH_SIZE_ATTR) + .displayName("Max batch size") + .description(BigQueryAttributes.BATCH_SIZE_DESC) + .required(true) + .defaultValue("100") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_ROW_SIZE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.MAX_ROW_SIZE_ATTR) + .displayName("Max row size") + .description(BigQueryAttributes.MAX_ROW_SIZE_DESC) + .required(true) + .defaultValue("1 MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) + .displayName("Create 
Disposition") + .description(BigQueryAttributes.CREATE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name()) + .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TABLE_CACHE_RESET = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.TABLE_CACHE_RESET_ATTR) + .displayName("Table Cache Max Age") + .description(BigQueryAttributes.TABLE_CACHE_RESET_DESC) + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hours") + .build(); + + private final static Type gsonParseType = new TypeToken>(){}.getType(); + + public final static long MAX_REQ_SIZE = 9485760L; + + private Schema schemaCache = null; + private Set tableCache = Collections.synchronizedSet(new HashSet()); + private long lastTableCacheCheck = 0L; + private long tableCacheAge = 1 * 60 * 60 * 1000L; + + public PutBigQueryStream() { + + } + + @Override + public List getSupportedPropertyDescriptors() { + return ImmutableList.builder() + .addAll(super.getSupportedPropertyDescriptors()) + .add(DATASET) + .add(TABLE_NAME) + .add(TABLE_SCHEMA) + .add(BATCH_SIZE) + .add(MAX_ROW_SIZE) + .add(CREATE_DISPOSITION) + .add(TABLE_CACHE_RESET) + .build(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + public Set getRelationships() { + return Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_ROW_TOO_BIG))); + } + + private Table createTable(BigQuery bq, TableId tableId, Schema schema) { + TableInfo tableInfo = 
TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); + + Table table = bq.create(tableInfo); + + if(!table.exists()) { + throw new ProcessException("Unable to create table: " + tableId); + } + + getLogger().info("Created Table: {}", new Object[]{tableId}); + + return table; + } + + private void validateTables(BigQuery bq, Set tables, Schema schema, boolean create) throws ProcessException { + if(System.currentTimeMillis() - lastTableCacheCheck > tableCacheAge) { + lastTableCacheCheck = System.currentTimeMillis(); + tableCache.clear(); + + getLogger().info("Table Cache cleared"); + } + + for(TableId tid : tables) { + if(tableCache.contains(tid)) { + continue; + } + + Table table = bq.getTable(tid); + + if(table == null || !table.exists()) { + if(create) { + createTable(bq, tid, schema); + } else { + throw new ProcessException("Table doesn't exist and create disposition does not allow creation: " + tid); + } + } + + tableCache.add(tid); + } + } + + @OnStopped + public void onStopped(ProcessContext context) { + schemaCache = null; + } + + @Override + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + + if(schemaCache == null) { + String schemaStr = context.getProperty(TABLE_SCHEMA).getValue(); + schemaCache = BqUtils.schemaFromString(schemaStr); + + lastTableCacheCheck = System.currentTimeMillis(); + tableCache.clear(); + + tableCacheAge = context.getProperty(TABLE_CACHE_RESET).asTimePeriod(TimeUnit.MILLISECONDS); + + getLogger().info("Enabled StreamIntoBigQuery"); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + List flows = session.get(context.getProperty(BATCH_SIZE).asInteger()); + if (flows == null || flows.size() == 0) { + return; + } + + final Gson gson = new Gson(); + final long startNanos = System.nanoTime(); + final long max_row_size = context.getProperty(MAX_ROW_SIZE).asLong(); + + Map> binned = new HashMap(); + long totalSize = 0; + 
for(final FlowFile ff : flows) { + if(totalSize + ff.getSize() > MAX_REQ_SIZE) { + // total message size would be too large. + // limited at 10 MB, giving a buffer of 1MB + break; + } else if(ff.getSize() > max_row_size) { + // can't stream such large entries + session.transfer(ff, REL_ROW_TOO_BIG); + } + + final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(ff).getValue(); + final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(ff).getValue(); + final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(ff).getValue(); + + // put flows into table name bins + final TableId tableId = TableId.of(projectId, dataset_str, tablename_str); + + List tableFlows = binned.get(tableId); + if(tableFlows == null) { + tableFlows = new ArrayList(); + binned.put(tableId, tableFlows); + } + + tableFlows.add(ff); + totalSize += ff.getSize(); + } + + if(binned.isEmpty()) { + return; + } + + final BigQuery bq = getCloudService(); + + if(bq == null) { + throw new ProcessException("Unable to connect to BigQuery Service"); + } + + boolean createTable = context.getProperty(CREATE_DISPOSITION).getValue().equals(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()); + validateTables(bq, binned.keySet(), schemaCache, createTable); + + for(Entry> binnedFlows : binned.entrySet()) { + List bFlows = binnedFlows.getValue(); + InsertAllRequest.Builder insertBuilder = InsertAllRequest.newBuilder(binnedFlows.getKey()); + + for(final FlowFile ff : bFlows) { + try(InputStreamReader dataReader = new InputStreamReader(session.read(ff))) { + Map rowContent = gson.fromJson(dataReader, gsonParseType); + + insertBuilder = insertBuilder.addRow(rowContent); + } catch (IOException ioe) { + throw new ProcessException(ioe); + } + } + + + // do the actual insertion. 
+ InsertAllResponse response = bq.insertAll(insertBuilder.build()); + + Map> insertErrors = response.getInsertErrors(); + long i = 0; + for(FlowFile ff : bFlows) { + if(insertErrors.containsKey(i)) { + List errors = insertErrors.get(i); + + if(errors.size() > 0) { + BigQueryError err = insertErrors.get(i).get(0); + final Map attributes = new HashMap<>(); + + attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, err.getMessage()); + attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, err.getReason()); + attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, err.getLocation()); + + if (!attributes.isEmpty()) { + ff = session.putAllAttributes(ff, attributes); + } + } + + ff = session.penalize(ff); + session.transfer(ff, REL_FAILURE); + } else { + session.transfer(ff, REL_SUCCESS); + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + } + + ++i; + } + } + } + +} From 3a4749a7b467fff0188174bc256e53818d43b7ba Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Fri, 12 Jan 2018 19:19:28 -0800 Subject: [PATCH 2/8] initial tests --- .../org.apache.nifi.processor.Processor | 4 +- .../gcp/bigquery/AbstractBQTest.java | 103 ++++++++++++++++++ .../gcp/bigquery/PutBigQueryStreamTest.java | 81 ++++++++++++++ 3 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b5d5df79d0b6..fd84a09b1213 100644 --- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,4 +15,6 @@ org.apache.nifi.processors.gcp.storage.PutGCSObject org.apache.nifi.processors.gcp.storage.FetchGCSObject org.apache.nifi.processors.gcp.storage.DeleteGCSObject -org.apache.nifi.processors.gcp.storage.ListGCSBucket \ No newline at end of file +org.apache.nifi.processors.gcp.storage.ListGCSBucket +org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch +org.apache.nifi.processors.gcp.bigquery.PutBigQueryStream \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java new file mode 100644 index 000000000000..f570cdf970b6 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.bigquery; + + +import org.apache.nifi.processors.gcp.storage.*; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.testing.RemoteStorageHelper; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; + +/** + * Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials. 
+ */ +public abstract class AbstractBQTest { + private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project"); + private static final Integer RETRIES = 9; + + static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + + public static TestRunner buildNewRunner(Processor processor) throws Exception { + final GCPCredentialsService credentialsService = new GCPCredentialsControllerService(); + + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.addControllerService("gcpCredentialsControllerService", credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcpCredentialsControllerService"); + runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID); + runner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES)); + + runner.assertValid(credentialsService); + + return runner; + } + + public abstract AbstractBigQueryProcessor getProcessor(); + + protected abstract void addRequiredPropertiesToRunner(TestRunner runner); + + @Mock + protected BigQuery bq; + + @Test + public void testBiqQueryOptionsConfiguration() throws Exception { + reset(bq); + final TestRunner runner = buildNewRunner(getProcessor()); + + final AbstractBigQueryProcessor processor = getProcessor(); + final GoogleCredentials mockCredentials = mock(GoogleCredentials.class); + + final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(), + mockCredentials); + + assertEquals("Project IDs should match", + PROJECT_ID, options.getProjectId()); + + assertEquals("Retry counts should match", + RETRIES.intValue(), options.getRetryParams().getRetryMinAttempts()); + + assertEquals("Retry counts should match", + RETRIES.intValue(), options.getRetryParams().getRetryMaxAttempts()); + + 
assertSame("Credentials should be configured correctly", + mockCredentials, options.getCredentials()); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java new file mode 100644 index 000000000000..2c93f4e9215d --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.gcp.bigquery; + +import org.apache.nifi.processors.gcp.storage.*; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.JobInfo; +import org.apache.nifi.util.TestRunner; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.Mockito.reset; + +/** + * Unit tests for {@link PutBigQueryStream}. 
+ */ +public class PutBigQueryStreamTest extends AbstractBQTest { + private static final String TABLENAME = "test_table"; + private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]"; + private static final String BATCH_SIZE = "100"; + private static final String MAX_ROW_SIZE = "2 MB"; + private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); + private static final String TABLE_CACHE_RESET = "1 hours"; + + @Mock + BigQuery bq; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Override + public AbstractBigQueryProcessor getProcessor() { + return new PutBigQueryStream() { + @Override + protected BigQuery getCloudService() { + return bq; + } + }; + } + + @Override + protected void addRequiredPropertiesToRunner(TestRunner runner) { + runner.setProperty(PutBigQueryStream.DATASET, DATASET); + runner.setProperty(PutBigQueryStream.TABLE_NAME, TABLENAME); + runner.setProperty(PutBigQueryStream.TABLE_SCHEMA, TABLE_SCHEMA); + runner.setProperty(PutBigQueryStream.BATCH_SIZE, BATCH_SIZE); + runner.setProperty(PutBigQueryStream.MAX_ROW_SIZE, MAX_ROW_SIZE); + runner.setProperty(PutBigQueryStream.CREATE_DISPOSITION, CREATE_DISPOSITION); + runner.setProperty(PutBigQueryStream.TABLE_CACHE_RESET, TABLE_CACHE_RESET); + } + + @Test + public void testSuccessfulInsert() throws Exception { + reset(bq); + final TestRunner runner = buildNewRunner(getProcessor()); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + runner.enqueue("{ \"data\": \"datavalue\" }"); + + runner.run(); + } +} From 455ecbc8e1f8469a665ec95dfe04bb87d7743492 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Mon, 15 Jan 2018 15:01:15 -0800 Subject: [PATCH 3/8] one test complete --- .../gcp/bigquery/PutBigQueryStream.java | 15 +++++----- .../gcp/bigquery/AbstractBQTest.java | 4 +-- .../gcp/bigquery/PutBigQueryStreamTest.java | 30 +++++++++++++++++-- 3 
files changed, 36 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java index 5187d8c61dec..0d8dfd7e3e73 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java @@ -43,6 +43,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -189,8 +190,8 @@ private Table createTable(BigQuery bq, TableId tableId, Schema schema) { Table table = bq.create(tableInfo); - if(!table.exists()) { - throw new ProcessException("Unable to create table: " + tableId); + if(table == null || !table.exists()) { + throw new ProcessException("Unable to create table: " + table + " info: " + tableInfo); } getLogger().info("Created Table: {}", new Object[]{tableId}); @@ -230,10 +231,8 @@ public void onStopped(ProcessContext context) { schemaCache = null; } - @Override - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - + @OnScheduled + public void initSchema(ProcessContext context) { if(schemaCache == null) { String schemaStr = context.getProperty(TABLE_SCHEMA).getValue(); schemaCache = BqUtils.schemaFromString(schemaStr); @@ -256,7 +255,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final Gson gson = new Gson(); final long startNanos = System.nanoTime(); - final long 
max_row_size = context.getProperty(MAX_ROW_SIZE).asLong(); + final long max_row_size = context.getProperty(MAX_ROW_SIZE).asDataSize(DataUnit.B).longValue(); Map> binned = new HashMap(); long totalSize = 0; @@ -270,7 +269,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro session.transfer(ff, REL_ROW_TOO_BIG); } - final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(ff).getValue(); + final String projectId = context.getProperty(PROJECT_ID).getValue(); final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(ff).getValue(); final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(ff).getValue(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java index f570cdf970b6..86545cf1946f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java @@ -44,8 +44,8 @@ * Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials. 
*/ public abstract class AbstractBQTest { - private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project"); - private static final Integer RETRIES = 9; + static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project"); + static final Integer RETRIES = 9; static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java index 2c93f4e9215d..d49c2e0c858c 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java @@ -16,16 +16,26 @@ */ package org.apache.nifi.processors.gcp.bigquery; -import org.apache.nifi.processors.gcp.storage.*; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.util.Collections; import org.apache.nifi.util.TestRunner; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; /** * Unit tests for {@link PutBigQueryStream}. 
@@ -34,12 +44,18 @@ public class PutBigQueryStreamTest extends AbstractBQTest { private static final String TABLENAME = "test_table"; private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]"; private static final String BATCH_SIZE = "100"; - private static final String MAX_ROW_SIZE = "2 MB"; + private static final String MAX_ROW_SIZE = "1 MB"; private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); private static final String TABLE_CACHE_RESET = "1 hours"; @Mock BigQuery bq; + + @Mock + Table table; + + @Mock + InsertAllResponse response; @Before public void setup() throws Exception { @@ -70,12 +86,20 @@ protected void addRequiredPropertiesToRunner(TestRunner runner) { @Test public void testSuccessfulInsert() throws Exception { reset(bq); + reset(table); + reset(response); + + when(table.exists()).thenReturn(Boolean.TRUE); + when(response.getInsertErrors()).thenReturn(Collections.EMPTY_MAP); + when(bq.create(ArgumentMatchers.isA(TableInfo.class))).thenReturn(table); + when(bq.insertAll(ArgumentMatchers.isA(InsertAllRequest.class))).thenReturn(response); + final TestRunner runner = buildNewRunner(getProcessor()); addRequiredPropertiesToRunner(runner); runner.assertValid(); runner.enqueue("{ \"data\": \"datavalue\" }"); - + runner.run(); } } From ee140a13026de68a010527fc5014802b278e50ea Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Mon, 15 Jan 2018 15:08:35 -0800 Subject: [PATCH 4/8] clean up attributes --- .../gcp/bigquery/PutBigQueryBatch.java | 40 ------------------- .../gcp/bigquery/PutBigQueryStream.java | 11 ++++- 2 files changed, 10 insertions(+), 41 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 
954480c5800e..35c42a676ecf 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -1,75 +1,35 @@ package org.apache.nifi.processors.gcp.bigquery; -import com.google.api.services.bigquery.model.JobConfigurationLoad; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.InsertAllRequest; -import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobConfiguration; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.LoadJobConfiguration; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; -import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; import com.google.common.collect.ImmutableList; -import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import 
java.util.logging.Logger; -import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import static org.apache.nifi.processors.gcp.AbstractGCPProcessor.PROJECT_ID; import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_FAILURE; import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_SUCCESS; -import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.relationships; -import static org.apache.nifi.processors.gcp.bigquery.PutBigQueryStream.DATASET; -import static org.apache.nifi.processors.gcp.bigquery.PutBigQueryStream.TABLE_NAME; -import static org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor.REL_SUCCESS; import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; -import org.apache.nifi.processors.gcp.storage.FetchGCSObject; -import org.apache.nifi.processors.gcp.storage.ListGCSBucket; import 
org.apache.nifi.processors.gcp.storage.PutGCSObject; /** diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java index 0d8dfd7e3e73..506b0c3e3769 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java @@ -68,7 +68,16 @@ @CapabilityDescription("Streams flow files to a Google BigQuery table.") @WritesAttributes({ - @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC) + @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), + @WritesAttribute(attribute = BigQueryAttributes.BATCH_SIZE_ATTR, description = BigQueryAttributes.BATCH_SIZE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.MAX_ROW_SIZE_ATTR, description = BigQueryAttributes.MAX_ROW_SIZE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_CACHE_RESET_ATTR, description = BigQueryAttributes.TABLE_CACHE_RESET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), + 
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC) }) public class PutBigQueryStream extends AbstractBigQueryProcessor { From b6a02cd3558d33cb3f23847c94665a1830cb3d68 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Thu, 18 Jan 2018 17:19:44 -0800 Subject: [PATCH 5/8] batch test --- .../gcp/bigquery/PutBigQueryBatch.java | 2 +- .../gcp/bigquery/PutBigQueryBatchTest.java | 133 ++++++++++++++++++ .../gcp/bigquery/PutBigQueryStreamTest.java | 35 +++++ 3 files changed, 169 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 35c42a676ecf..76949003c4ba 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -206,7 +206,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro final BigQuery bq = getCloudService(); - final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(flow).getValue(); + final String projectId = context.getProperty(PROJECT_ID).getValue(); final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(flow).getValue(); final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flow).getValue(); final TableId tableId = TableId.of(projectId, dataset_str, tablename_str); diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java new file mode 100644 index 000000000000..9da30de5e851 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.InsertAllResponse; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.JobStatus; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.nifi.util.TestRunner; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link PutBigQueryBatch}. 
+ */ +public class PutBigQueryBatchTest extends AbstractBQTest { + private static final String TABLENAME = "test_table"; + private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]"; + private static final String SOURCE_FILE = "gs://bucket/what"; + private static final String SOURCE_TYPE = FormatOptions.json().getType(); + private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); + private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name(); + private static final String MAXBAD_RECORDS = "0"; + private static final String IGNORE_UNKNOWN = "true"; + + @Mock + BigQuery bq; + + @Mock + Table table; + + @Mock + Job job; + + @Mock + JobStatus jobStatus; + + @Mock + JobStatistics stats; + + @Before + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Override + public AbstractBigQueryProcessor getProcessor() { + return new PutBigQueryBatch() { + @Override + protected BigQuery getCloudService() { + return bq; + } + }; + } + + @Override + protected void addRequiredPropertiesToRunner(TestRunner runner) { + runner.setProperty(PutBigQueryBatch.DATASET, DATASET); + runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME); + runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA); + runner.setProperty(PutBigQueryBatch.SOURCE_FILE, SOURCE_FILE); + runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE); + runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION); + runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION); + runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS); + runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN); + } + + @Test + public void testSuccessfulLoad() throws Exception { + reset(bq); + reset(table); + reset(job); + reset(jobStatus); + reset(stats); + + when(table.exists()).thenReturn(Boolean.TRUE); + 
when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job); + when(job.waitFor()).thenReturn(job); + when(job.getStatus()).thenReturn(jobStatus); + when(job.getStatistics()).thenReturn(stats); + + when(stats.getCreationTime()).thenReturn(0L); + when(stats.getStartTime()).thenReturn(1L); + when(stats.getEndTime()).thenReturn(2L); + + final TestRunner runner = buildNewRunner(getProcessor()); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + runner.enqueue("{ \"data\": \"datavalue\" }"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_SUCCESS); + } +} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java index d49c2e0c858c..1db776dca01b 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.gcp.bigquery; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.JobInfo; @@ -26,7 +27,11 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.nifi.util.TestRunner; import org.junit.Before; import org.junit.Test; @@ -101,5 +106,35 @@ public void testSuccessfulInsert() throws Exception { runner.enqueue("{ \"data\": 
\"datavalue\" }"); runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_SUCCESS); + } + + @Test + public void testFailedInsert() throws Exception { + reset(bq); + reset(table); + reset(response); + + Map> resErrors = new HashMap(); + resErrors.put(0L, Arrays.asList(new BigQueryError("reason", "location", "message"))); + + when(table.exists()).thenReturn(Boolean.TRUE); + when(response.getInsertErrors()).thenReturn(resErrors); + when(bq.create(ArgumentMatchers.isA(TableInfo.class))).thenReturn(table); + when(bq.insertAll(ArgumentMatchers.isA(InsertAllRequest.class))).thenReturn(response); + + final TestRunner runner = buildNewRunner(getProcessor()); + addRequiredPropertiesToRunner(runner); + runner.assertValid(); + + runner.enqueue("{ \"data\": \"datavalue\" }"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_FAILURE); + runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); + runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_MSG_ATTR); + runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_REASON_ATTR); } } From 31b9c9c8e3a310e619f08d54a8deba31dfd0d8e8 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Wed, 21 Feb 2018 20:51:19 -0800 Subject: [PATCH 6/8] style and license fixups --- .../bigquery/AbstractBigQueryProcessor.java | 20 ++- .../gcp/bigquery/BigQueryAttributes.java | 49 +++++--- .../nifi/processors/gcp/bigquery/BqUtils.java | 40 +++--- .../gcp/bigquery/PutBigQueryBatch.java | 82 +++++++----- .../gcp/bigquery/PutBigQueryStream.java | 117 +++++++++--------- .../gcp/bigquery/AbstractBQTest.java | 4 - .../gcp/bigquery/PutBigQueryBatchTest.java | 37 ++---- .../gcp/bigquery/PutBigQueryStreamTest.java | 34 +++-- 8 files changed, 212 insertions(+), 171 deletions(-) diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java index 22b03467d207..3165ca4ebc22 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java @@ -1,7 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.gcp.bigquery; import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.HttpServiceOptions; import com.google.cloud.RetryParams; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; @@ -22,7 +38,7 @@ /** * * Base class for creating processors that connect to GCP BiqQuery service - * + * * @author Mikhail Sosonkin (Synack Inc) */ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor{ diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java index f71d8fa78313..3a5e0b41b49b 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.gcp.bigquery; /** @@ -8,60 +25,60 @@ private BigQueryAttributes() {} public static final String DATASET_ATTR = "bq.dataset"; public static final String DATASET_DESC = "BigQuery dataset"; - + public static final String TABLE_NAME_ATTR = "bq.table.name"; public static final String TABLE_NAME_DESC = "BigQuery table name"; public static final String TABLE_SCHEMA_ATTR = "bq.table.schema"; public static final String TABLE_SCHEMA_DESC = "BigQuery table name"; - + public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition"; public static final String CREATE_DISPOSITION_DESC = "Options for table creation"; - + public static final String JOB_ERROR_MSG_ATTR = "bq.error.message"; public static final String JOB_ERROR_MSG_DESC = "Load job error message"; - + public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason"; public static final String JOB_ERROR_REASON_DESC = "Load jon error reason"; - + public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location"; public static final String JOB_ERROR_LOCATION_DESC = "Load jon error location"; - + // Stream Attributes public static final String BATCH_SIZE_ATTR = "bq.batch.size"; public static final String BATCH_SIZE_DESC = "BigQuery number of rows to insert at a time"; - + public static final String MAX_ROW_SIZE_ATTR = "bq.row.size"; public static final String MAX_ROW_SIZE_DESC = "BigQuery has a limit (1MB) on max size of a row, for streaming"; public static final String TABLE_CACHE_RESET_ATTR = "bq.cache.reset"; public static final String TABLE_CACHE_RESET_DESC = "How often to reset table info cache"; - + // Batch Attributes public static final String SOURCE_FILE_ATTR = "bq.load.file"; public static final String SOURCE_FILE_DESC = "URL of file to load (gs://[bucket]/[key]"; - + public static final String SOURCE_TYPE_ATTR = "bq.load.type"; public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded"; - + public static final String 
IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown"; public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema"; public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition"; public static final String WRITE_DISPOSITION_DESC = "Options for writing to table"; - + public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords"; public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error"; - + public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time"; public static final String JOB_CREATE_TIME_DESC = "Time load job creation"; - + public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time"; public static final String JOB_END_TIME_DESC = "Time load job ended"; - + public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time"; public static final String JOB_START_TIME_DESC = "Time load job started"; - + public static final String JOB_LINK_ATTR = "bq.job.link"; public static final String JOB_LINK_DESC = "API Link to load job"; -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java index d601af1034e2..c622a23ed6cf 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java @@ -1,8 +1,20 @@ /* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ + package org.apache.nifi.processors.gcp.bigquery; import com.google.api.client.json.JsonFactory; @@ -24,13 +36,13 @@ */ public class BqUtils { private final static Type gsonSchemaType = new TypeToken>(){}.getType(); - + public static Field mapToField(Map fMap) { String typeStr = fMap.get("type").toString(); String nameStr = fMap.get("name").toString(); String modeStr = fMap.get("mode").toString(); Field.Type type = null; - + if(typeStr.equals("BOOLEAN")) { type = Field.Type.bool(); } else if(typeStr.equals("STRING")) { @@ -41,8 +53,8 @@ public static Field mapToField(Map fMap) { type = Field.Type.integer(); } else if(typeStr.equals("FLOAT")) { type = Field.Type.floatingPoint(); - } else if(typeStr.equals("TIMESTAMP") || typeStr.equals("DATE") || - typeStr.equals("TIME") || typeStr.equals("DATETIME")) { + } else if(typeStr.equals("TIMESTAMP") || typeStr.equals("DATE") + || typeStr.equals("TIME") || typeStr.equals("DATETIME")) { type = Field.Type.timestamp(); } else if(typeStr.equals("RECORD")) { List m_fields = (List) fMap.get("fields"); @@ -51,32 +63,32 @@ public static Field mapToField(Map fMap) { return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build(); } - + public static List listToFields(List m_fields) { List fields = new 
ArrayList(m_fields.size()); for(Map m : m_fields) { fields.add(mapToField(m)); } - + return fields; } - + public static Schema schemaFromString(String schemaStr) { Gson gson = new Gson(); List fields = gson.fromJson(schemaStr, gsonSchemaType); return Schema.of(BqUtils.listToFields(fields)); } - + public static T fromJsonString(String json, Class clazz) throws IOException { JsonFactory JSON_FACTORY = Transport.getJsonFactory(); - + return JSON_FACTORY.fromString(json, clazz); } - + public static TableSchema tableSchemaFromString(String schemaStr) throws IOException { schemaStr = "{\"fields\":" + schemaStr + "}"; - + return fromJsonString(schemaStr, TableSchema.class); } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 76949003c4ba..7735438aad7f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.gcp.bigquery; import com.google.cloud.bigquery.BigQuery; @@ -61,7 +78,7 @@ }) public class PutBigQueryBatch extends AbstractBigQueryProcessor { - + public static final PropertyDescriptor DATASET = new PropertyDescriptor .Builder().name(BigQueryAttributes.DATASET_ATTR) .displayName("Dataset") @@ -71,7 +88,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR) .displayName("Table Name") @@ -81,7 +98,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR) .displayName("Table Schema") @@ -89,7 +106,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor SOURCE_FILE = new PropertyDescriptor .Builder().name(BigQueryAttributes.SOURCE_FILE_ATTR) .displayName("Load file path") @@ -99,7 +116,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) .displayName("Load file type") @@ -109,7 +126,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .defaultValue(FormatOptions.json().getType()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) 
.build(); - + public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder() .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR) .displayName("Ignore Unknown Values") @@ -119,7 +136,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .allowableValues("true", "false") .defaultValue("true") .build(); - + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) .displayName("Create Disposition") @@ -129,7 +146,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder() .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR) .displayName("Write Disposition") @@ -139,7 +156,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder() .name(BigQueryAttributes.MAX_BADRECORDS_ATTR) .displayName("Max Bad Records") @@ -150,13 +167,13 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .build(); private final static Type gsonParseType = new TypeToken>(){}.getType(); - + private Schema schemaCache = null; public PutBigQueryBatch() { - + } - + @Override public List getSupportedPropertyDescriptors() { return ImmutableList.builder() @@ -172,7 +189,7 @@ public List getSupportedPropertyDescriptors() { .add(IGNORE_UNKNOWN) .build(); } - + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -182,38 +199,38 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String 
.dynamic(true) .build(); } - + @Override public void onScheduled(ProcessContext context) { super.onScheduled(context); - + if(schemaCache == null) { String schemaStr = context.getProperty(TABLE_SCHEMA).getValue(); schemaCache = BqUtils.schemaFromString(schemaStr); - + getLogger().info("Enabled StreamIntoBigQuery"); } } - + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { FlowFile flow = session.get(); if (flow == null) { return; } - + final Map attributes = new HashMap<>(); - + final BigQuery bq = getCloudService(); - + final String projectId = context.getProperty(PROJECT_ID).getValue(); final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(flow).getValue(); final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flow).getValue(); final TableId tableId = TableId.of(projectId, dataset_str, tablename_str); - + final String fileType = context.getProperty(SOURCE_TYPE).getValue(); final String jsonFile = context.getProperty(SOURCE_FILE).evaluateAttributeExpressions(flow).getValue(); - + LoadJobConfiguration configuration = LoadJobConfiguration.newBuilder(tableId, jsonFile, FormatOptions.of(fileType)) .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) @@ -221,28 +238,28 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) .setSchema(schemaCache) .build(); - + Job job = bq.create(JobInfo.of(configuration)); try { job = job.waitFor(); } catch (Throwable ex) { getLogger().log(LogLevel.ERROR, ex.getMessage(), ex); - + flow = session.penalize(flow); - + session.transfer(flow, REL_FAILURE); - + return; } - + boolean jobError = (job.getStatus().getError() != null); - + 
attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); - + attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); - + if(jobError) { attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); @@ -253,11 +270,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro flow = session.removeAttribute(flow, "bq.job.error.reason"); flow = session.removeAttribute(flow, "bq.job.error.location"); } - + if (!attributes.isEmpty()) { flow = session.putAllAttributes(flow, attributes); } - + if(jobError) { flow = session.penalize(flow); session.transfer(flow, REL_FAILURE); @@ -265,5 +282,4 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro session.transfer(flow, REL_SUCCESS); } } - } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java index 506b0c3e3769..57951f0d23fc 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java @@ -1,14 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.gcp.bigquery; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; @@ -17,7 +31,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.io.IOException; -import java.io.InputStream; import java.io.InputStreamReader; import java.lang.reflect.Type; import java.util.ArrayList; @@ -30,33 +43,23 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import 
org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_FAILURE; import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.REL_SUCCESS; -import static org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor.relationships; -import static org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor.REL_SUCCESS; -import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; -import org.apache.nifi.processors.gcp.storage.FetchGCSObject; -import org.apache.nifi.processors.gcp.storage.ListGCSBucket; /** * @@ -85,7 +88,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { new Relationship.Builder().name("row_too_big") .description("FlowFiles are routed to this relationship if the row size is too big.") .build(); - + public static final PropertyDescriptor DATASET = new PropertyDescriptor .Builder().name(BigQueryAttributes.DATASET_ATTR) .displayName("Dataset") @@ -95,7 +98,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR) .displayName("Table Name") @@ -105,7 +108,7 @@ public class 
PutBigQueryStream extends AbstractBigQueryProcessor { .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR) .displayName("Table Schema") @@ -113,7 +116,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor .Builder().name(BigQueryAttributes.BATCH_SIZE_ATTR) .displayName("Max batch size") @@ -122,7 +125,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .defaultValue("100") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); - + public static final PropertyDescriptor MAX_ROW_SIZE = new PropertyDescriptor .Builder().name(BigQueryAttributes.MAX_ROW_SIZE_ATTR) .displayName("Max row size") @@ -131,7 +134,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .defaultValue("1 MB") .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .build(); - + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) .displayName("Create Disposition") @@ -141,7 +144,7 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - + public static final PropertyDescriptor TABLE_CACHE_RESET = new PropertyDescriptor.Builder() .name(BigQueryAttributes.TABLE_CACHE_RESET_ATTR) .displayName("Table Cache Max Age") @@ -152,18 +155,17 @@ public class PutBigQueryStream extends AbstractBigQueryProcessor { .build(); private final static Type gsonParseType = new TypeToken>(){}.getType(); - + public final static long MAX_REQ_SIZE = 9485760L; - + private Schema schemaCache = 
null; private Set tableCache = Collections.synchronizedSet(new HashSet()); private long lastTableCacheCheck = 0L; private long tableCacheAge = 1 * 60 * 60 * 1000L; - + public PutBigQueryStream() { - } - + @Override public List getSupportedPropertyDescriptors() { return ImmutableList.builder() @@ -177,7 +179,7 @@ public List getSupportedPropertyDescriptors() { .add(TABLE_CACHE_RESET) .build(); } - + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -187,42 +189,42 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String .dynamic(true) .build(); } - + @Override public Set getRelationships() { return Collections.unmodifiableSet( new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_ROW_TOO_BIG))); } - + private Table createTable(BigQuery bq, TableId tableId, Schema schema) { TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build(); Table table = bq.create(tableInfo); - + if(table == null || !table.exists()) { throw new ProcessException("Unable to create table: " + table + " info: " + tableInfo); } - + getLogger().info("Created Table: {}", new Object[]{tableId}); - + return table; } - + private void validateTables(BigQuery bq, Set tables, Schema schema, boolean create) throws ProcessException { if(System.currentTimeMillis() - lastTableCacheCheck > tableCacheAge) { lastTableCacheCheck = System.currentTimeMillis(); tableCache.clear(); - + getLogger().info("Table Cache cleared"); } - + for(TableId tid : tables) { if(tableCache.contains(tid)) { continue; } - + Table table = bq.getTable(tid); - + if(table == null || !table.exists()) { if(create) { createTable(bq, tid, schema); @@ -230,16 +232,16 @@ private void validateTables(BigQuery bq, Set tables, Schema schema, boo throw new ProcessException("Table doesn't exist and create disposition does not allow creation: " + tid); } } - + 
tableCache.add(tid); } } - + @OnStopped public void onStopped(ProcessContext context) { schemaCache = null; } - + @OnScheduled public void initSchema(ProcessContext context) { if(schemaCache == null) { @@ -254,14 +256,14 @@ public void initSchema(ProcessContext context) { getLogger().info("Enabled StreamIntoBigQuery"); } } - + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { List flows = session.get(context.getProperty(BATCH_SIZE).asInteger()); if (flows == null || flows.size() == 0) { return; } - + final Gson gson = new Gson(); final long startNanos = System.nanoTime(); final long max_row_size = context.getProperty(MAX_ROW_SIZE).asDataSize(DataUnit.B).longValue(); @@ -277,37 +279,37 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro // can't stream such large entries session.transfer(ff, REL_ROW_TOO_BIG); } - + final String projectId = context.getProperty(PROJECT_ID).getValue(); final String dataset_str = context.getProperty(DATASET).evaluateAttributeExpressions(ff).getValue(); final String tablename_str = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(ff).getValue(); - + // put flows into table name bins final TableId tableId = TableId.of(projectId, dataset_str, tablename_str); - + List tableFlows = binned.get(tableId); if(tableFlows == null) { tableFlows = new ArrayList(); binned.put(tableId, tableFlows); } - + tableFlows.add(ff); totalSize += ff.getSize(); } - + if(binned.isEmpty()) { return; } - + final BigQuery bq = getCloudService(); - + if(bq == null) { throw new ProcessException("Unable to connect to BigQuery Service"); } - + boolean createTable = context.getProperty(CREATE_DISPOSITION).getValue().equals(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()); validateTables(bq, binned.keySet(), schemaCache, createTable); - + for(Entry> binnedFlows : binned.entrySet()) { List bFlows = binnedFlows.getValue(); InsertAllRequest.Builder insertBuilder = 
InsertAllRequest.newBuilder(binnedFlows.getKey()); @@ -315,7 +317,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro for(final FlowFile ff : bFlows) { try(InputStreamReader dataReader = new InputStreamReader(session.read(ff))) { Map rowContent = gson.fromJson(dataReader, gsonParseType); - + insertBuilder = insertBuilder.addRow(rowContent); } catch (IOException ioe) { throw new ProcessException(ioe); @@ -331,25 +333,25 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro for(FlowFile ff : bFlows) { if(insertErrors.containsKey(i)) { List errors = insertErrors.get(i); - + if(errors.size() > 0) { BigQueryError err = insertErrors.get(i).get(0); final Map attributes = new HashMap<>(); - + attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, err.getMessage()); attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, err.getReason()); attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, err.getLocation()); - + if (!attributes.isEmpty()) { ff = session.putAllAttributes(ff, attributes); } } - + ff = session.penalize(ff); session.transfer(ff, REL_FAILURE); } else { session.transfer(ff, REL_SUCCESS); - + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); } @@ -357,5 +359,4 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } } } - } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java index 86545cf1946f..8122a77d9b1f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java @@ -17,14 +17,10 @@ package 
org.apache.nifi.processors.gcp.bigquery; -import org.apache.nifi.processors.gcp.storage.*; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; -import com.google.cloud.storage.testing.RemoteStorageHelper; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java index 9da30de5e851..671da9a395e1 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java @@ -17,25 +17,12 @@ package org.apache.nifi.processors.gcp.bigquery; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.InsertAllRequest; -import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatus; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TableId; -import 
com.google.cloud.bigquery.TableInfo; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.nifi.util.TestRunner; import org.junit.Before; import org.junit.Test; @@ -58,22 +45,22 @@ public class PutBigQueryBatchTest extends AbstractBQTest { private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name(); private static final String MAXBAD_RECORDS = "0"; private static final String IGNORE_UNKNOWN = "true"; - + @Mock BigQuery bq; - + @Mock Table table; - + @Mock Job job; - + @Mock JobStatus jobStatus; - + @Mock JobStatistics stats; - + @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); @@ -109,25 +96,25 @@ public void testSuccessfulLoad() throws Exception { reset(job); reset(jobStatus); reset(stats); - + when(table.exists()).thenReturn(Boolean.TRUE); when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job); when(job.waitFor()).thenReturn(job); when(job.getStatus()).thenReturn(jobStatus); when(job.getStatistics()).thenReturn(stats); - + when(stats.getCreationTime()).thenReturn(0L); when(stats.getStartTime()).thenReturn(1L); when(stats.getEndTime()).thenReturn(2L); - + final TestRunner runner = buildNewRunner(getProcessor()); addRequiredPropertiesToRunner(runner); runner.assertValid(); - + runner.enqueue("{ \"data\": \"datavalue\" }"); runner.run(); - + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_SUCCESS); } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java index 1db776dca01b..1b6d09457416 100644 --- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamTest.java @@ -21,11 +21,7 @@ import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; import java.util.Arrays; import java.util.Collections; @@ -52,14 +48,14 @@ public class PutBigQueryStreamTest extends AbstractBQTest { private static final String MAX_ROW_SIZE = "1 MB"; private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); private static final String TABLE_CACHE_RESET = "1 hours"; - + @Mock BigQuery bq; - + @Mock Table table; - - @Mock + + @Mock InsertAllResponse response; @Before @@ -93,48 +89,48 @@ public void testSuccessfulInsert() throws Exception { reset(bq); reset(table); reset(response); - + when(table.exists()).thenReturn(Boolean.TRUE); when(response.getInsertErrors()).thenReturn(Collections.EMPTY_MAP); when(bq.create(ArgumentMatchers.isA(TableInfo.class))).thenReturn(table); when(bq.insertAll(ArgumentMatchers.isA(InsertAllRequest.class))).thenReturn(response); - + final TestRunner runner = buildNewRunner(getProcessor()); addRequiredPropertiesToRunner(runner); runner.assertValid(); - + runner.enqueue("{ \"data\": \"datavalue\" }"); runner.run(); - + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_SUCCESS); } - + @Test public void testFailedInsert() throws Exception { reset(bq); reset(table); reset(response); - + Map> resErrors = new HashMap(); resErrors.put(0L, Arrays.asList(new 
BigQueryError("reason", "location", "message"))); - + when(table.exists()).thenReturn(Boolean.TRUE); when(response.getInsertErrors()).thenReturn(resErrors); when(bq.create(ArgumentMatchers.isA(TableInfo.class))).thenReturn(table); when(bq.insertAll(ArgumentMatchers.isA(InsertAllRequest.class))).thenReturn(response); - + final TestRunner runner = buildNewRunner(getProcessor()); addRequiredPropertiesToRunner(runner); runner.assertValid(); - + runner.enqueue("{ \"data\": \"datavalue\" }"); runner.run(); - + runner.assertAllFlowFilesTransferred(PutBigQueryStream.REL_FAILURE); runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_MSG_ATTR); runner.assertAllFlowFilesContainAttribute(PutBigQueryStream.REL_FAILURE, BigQueryAttributes.JOB_ERROR_REASON_ATTR); } -} +} \ No newline at end of file From 690ede563d1c151b726e9aca029488b5dbd50314 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Thu, 22 Feb 2018 18:52:29 -0800 Subject: [PATCH 7/8] removing author tags --- .../nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java | 2 -- .../java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java | 1 - .../apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java | 1 - 3 files changed, 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java index 3165ca4ebc22..89c5bb8fd51b 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java 
@@ -38,8 +38,6 @@ /** * * Base class for creating processors that connect to GCP BiqQuery service - * - * @author Mikhail Sosonkin (Synack Inc) */ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor{ public static final Relationship REL_SUCCESS = diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java index c622a23ed6cf..c175e5130726 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java @@ -32,7 +32,6 @@ /** * - * @author Mikhail Sosonkin (Synack Inc, Synack.com) */ public class BqUtils { private final static Type gsonSchemaType = new TypeToken>(){}.getType(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java index 57951f0d23fc..e4d893b8daf2 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java @@ -63,7 +63,6 @@ /** * - * @author Mikhail Sosonkin (Synack Inc, Synack.com) */ @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) From c893ae5857c3f94f84e3f0bc297ef443eeca39d8 Mon Sep 17 00:00:00 2001 From: Mikhail Sosonkin Date: Sun, 4 Mar 2018 21:43:47 -0800 Subject: [PATCH 8/8] using attr fields --- .../nifi/processors/gcp/bigquery/PutBigQueryBatch.java | 6 +++--- 1 file changed, 3 insertions(+), 
3 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 7735438aad7f..71cfc4d2caf1 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -266,9 +266,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); } else { // in case it got looped back from error - flow = session.removeAttribute(flow, "bq.job.error.message"); - flow = session.removeAttribute(flow, "bq.job.error.reason"); - flow = session.removeAttribute(flow, "bq.job.error.location"); + flow = session.removeAttribute(flow, BigQueryAttributes.JOB_ERROR_MSG_ATTR); + flow = session.removeAttribute(flow, BigQueryAttributes.JOB_ERROR_REASON_ATTR); + flow = session.removeAttribute(flow, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); } if (!attributes.isEmpty()) {