Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface Record {
boolean isDropUnknownFields();

/**
* Updates the Record's schema to to incorporate all of the fields in the given schema. If both schemas have a
* Updates the Record's schema to incorporate all of the fields in the given schema. If both schemas have a
* field with the same name but a different type, then the existing schema will be updated to have a
* {@link RecordFieldType#CHOICE} field with both types as choices. If two fields have the same name but different
* default values, then the default value that is already in place will remain the default value, unless the current
Expand Down
14 changes: 14 additions & 0 deletions nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;


/**
* Util class for schema manipulation
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand Down Expand Up @@ -63,10 +64,12 @@

/**
* A processor for batch loading data into a Google BigQuery table
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
Expand All @@ -79,6 +82,7 @@
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryBatch extends AbstractBigQueryProcessor {

private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
Expand Down Expand Up @@ -67,19 +68,27 @@
* output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
* method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
* higher.
*
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
@CapabilityDescription("Load data into Google BigQuery table using the streaming API. This processor "
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. "
+ "Load data into Google BigQuery table using the streaming API. This processor "
+ "is not intended to load large flow files as it will load the full content into memory. If "
+ "you need to insert large flow files, consider using PutBigQueryBatch instead.")
@SeeAlso({ PutBigQueryBatch.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryStreaming extends AbstractBigQueryProcessor {

private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");

public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.RECORD_READER_ATTR)
.displayName("Record Reader")
Expand All @@ -98,9 +107,6 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
.defaultValue("false")
.build();

private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
Expand Down Expand Up @@ -182,8 +188,8 @@ private Map<String, Object> convertMapRecord(Map<String, Object> map) {
if (obj instanceof MapRecord) {
result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
} else if (obj instanceof Object[]
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
for (Object mapr : ((Object[]) obj)) {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
Expand All @@ -198,7 +204,7 @@ private Map<String, Object> convertMapRecord(Map<String, Object> map) {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timeFormatter) );
result.put(key, dateTime.format(timeFormatter));
} else if (obj instanceof Date) {
result.put(key, obj.toString());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.gcp.bigquery.proto;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

/**
* Util class for protocol buffer messaging
*/
/**
 * Utility class for building protocol buffer {@link DynamicMessage} instances
 * from plain {@code Map<String, Object>} value maps, driven by a message
 * {@link Descriptors.Descriptor}.
 */
public class ProtoUtils {

    private ProtoUtils() {
        // utility class: no instances
    }

    /**
     * Recursively builds a {@link DynamicMessage} for the given descriptor from the supplied value map.
     * Map keys are matched against proto field names; absent (null) values are skipped, leaving the
     * field unset. Nested MESSAGE fields expect a {@code Map<String, Object>} value (or a collection /
     * array of such maps when repeated).
     *
     * @param descriptor descriptor of the message type to build
     * @param valueMap field name to value mapping; values for repeated fields may be a {@link Collection} or an Object array
     * @return the populated message
     */
    public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap) {
        final DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);

        for (final Descriptors.FieldDescriptor field : descriptor.getFields()) {
            final Object value = valueMap.get(field.getName());
            if (value == null) {
                // leave the field unset rather than failing on missing keys
                continue;
            }

            if (Descriptors.FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
                if (field.isRepeated()) {
                    for (final Object element : asCollection(value)) {
                        builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) element));
                    }
                } else {
                    builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value));
                }
            } else {
                if (field.isRepeated()) {
                    // coerce each element: the original code only coerced the (non-collection) value,
                    // so repeated INT64 fields fed with Integers would fail at addRepeatedField time
                    for (final Object element : asCollection(value)) {
                        builder.addRepeatedField(field, coerceScalar(field, element));
                    }
                } else {
                    builder.setField(field, coerceScalar(field, value));
                }
            }
        }

        return builder.build();
    }

    /** Normalizes a repeated-field value (Object array or Collection) into a Collection. */
    private static Collection<?> asCollection(final Object value) {
        return value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection<?>) value;
    }

    /**
     * Widens Integer values to Long for INT64 fields. Integer in the BigQuery table schema maps back
     * to INT64, which is represented as Long on the Java side:
     * https://developers.google.com/protocol-buffers/docs/proto3
     */
    private static Object coerceScalar(final Descriptors.FieldDescriptor field, final Object value) {
        if (value instanceof Integer && field.getType() == Descriptors.FieldDescriptor.Type.INT64) {
            return Long.valueOf((Integer) value);
        }
        return value;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/xhtml">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<head>
<meta charset="utf-8"/>
<title>PutBigQuery</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>

<h1>Streaming Versus Batching Data</h1>

<p>
    PutBigQuery is record-based and relies on the gRPC-based Write API using protocol buffers. The underlying stream supports both streaming and batching approaches.
</p>

<h3>Streaming</h3>
<p>
    With streaming, data appended to the stream is instantly available in BigQuery for reading. The number of records (rows) appended at once is configurable.
Only one stream is established per flow file so at the conclusion of the FlowFile processing the used stream is closed and a new one is opened for the next FlowFile.
Supports exactly once delivery semantics via stream offsets.
</p>

<h3>Batching</h3>
<p>
Similarly to the streaming approach one stream is opened for each FlowFile and records are appended to the stream. However data is not available in BigQuery until it is
committed by the processor at the end of the FlowFile processing.
</p>

<h1>Improvement opportunities</h1>
<p>
<ul>
    <li>The table has to exist on the BigQuery side; it is not created automatically</li>
    <li>The Write API supports multiple streams for parallel execution and transactionality across streams. This is not utilized at the moment, as it would be covered at the NiFi framework level.</li>
</ul>
</p>

<p>
The official <a href="https://cloud.google.com/bigquery/docs/write-api">Google Write API documentation</a> provides additional details.
</p>


</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.gcp.bigquery;


import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
Expand All @@ -26,30 +25,26 @@
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

/**
* Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials.
*/
@ExtendWith(MockitoExtension.class)
public abstract class AbstractBQTest {
private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
private static final Integer RETRIES = 9;

static final String DATASET = RemoteBigQueryHelper.generateDatasetName();

@BeforeEach
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}

public static TestRunner buildNewRunner(Processor processor) throws Exception {
final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();

Expand Down Expand Up @@ -84,13 +79,8 @@ public void testBiqQueryOptionsConfiguration() throws Exception {
final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(),
mockCredentials);

assertEquals("Project IDs should match",
PROJECT_ID, options.getProjectId());

assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());

assertSame("Credentials should be configured correctly",
mockCredentials, options.getCredentials());
assertEquals(PROJECT_ID, options.getProjectId(), "Project IDs should match");
assertEquals(RETRIES.intValue(), options.getRetrySettings().getMaxAttempts(), "Retry counts should match");
assertSame(mockCredentials, options.getCredentials(), "Credentials should be configured correctly");
}
}
Loading