Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable
.markAdvanced()
.withDocumentation("Name of the target project in BigQuery");

public static final ConfigProperty<String> BIGQUERY_SYNC_BILLING_PROJECT_ID = ConfigProperty
.key("hoodie.gcp.bigquery.sync.billing.project.id")
.noDefaultValue()
.sinceVersion("0.15.1")
.markAdvanced()
.withDocumentation("Name of the billing project id in BigQuery. By default it uses the "
+ "configuration from `hoodie.gcp.bigquery.sync.project_id` if this configuration is "
+ "not set. This can only be used with manifest file based approach");

public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME = ConfigProperty
.key("hoodie.gcp.bigquery.sync.dataset_name")
.noDefaultValue()
Expand Down Expand Up @@ -156,6 +165,8 @@ public static class BigQuerySyncConfigParams {

@Parameter(names = {"--project-id"}, description = "Name of the target project in BigQuery", required = true)
public String projectId;
@Parameter(names = {"--billing-project-id"}, description = "Name of the billing project in BigQuery. This can only be used with --use-bq-manifest-file", required = false)
public String billingProjectId;
@Parameter(names = {"--dataset-name"}, description = "Name of the target dataset in BigQuery", required = true)
public String datasetName;
@Parameter(names = {"--dataset-location"}, description = "Location of the target dataset in BigQuery", required = true)
Expand All @@ -181,6 +192,7 @@ public boolean isHelp() {
public TypedProperties toProps() {
final TypedProperties props = hoodieSyncConfigParams.toProps();
props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
props.setPropertyIfNonNull(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), billingProjectId);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(), datasetName);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(), datasetLocation);
props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(), hoodieSyncConfigParams.tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;

import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;

public class HoodieBigQuerySyncClient extends HoodieSyncClient {
Expand All @@ -66,6 +68,7 @@ public class HoodieBigQuerySyncClient extends HoodieSyncClient {

protected final BigQuerySyncConfig config;
private final String projectId;
private final String billingProjectId;
private final String bigLakeConnectionId;
private final String datasetName;
private final boolean requirePartitionFilter;
Expand All @@ -75,6 +78,7 @@ public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
this.billingProjectId = config.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID);
this.bigLakeConnectionId = config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
Expand All @@ -86,6 +90,7 @@ public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
this.billingProjectId = config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.bigquery = bigquery;
Expand Down Expand Up @@ -131,7 +136,7 @@ public void createOrUpdateTableUsingBqManifestFile(String tableName, String bqMa
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build();
JobId jobId = JobId.newBuilder().setProject(projectId).setRandomJob().build();
JobId jobId = JobId.newBuilder().setProject(billingProjectId).setRandomJob().build();
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

queryJob = queryJob.waitFor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
Expand All @@ -49,6 +50,7 @@ public class TestBigQuerySyncConfig {
public void testGetConfigs() {
Properties props = new Properties();
props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), "fooproject");
props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), "foobillingproject");
props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
Expand All @@ -61,6 +63,7 @@ public void testGetConfigs() {
props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), "true");
BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
assertEquals("fooproject", syncConfig.getString(BIGQUERY_SYNC_PROJECT_ID));
assertEquals("foobillingproject", syncConfig.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID));
assertEquals("foodataset", syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,27 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestHoodieBigQuerySyncClient {
private static final String PROJECT_ID = "test_project";
private static final String BILLING_PROJECT_ID = "test_billing_project";
private static final String MANIFEST_FILE_URI = "file:/manifest_file";
private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
private static final String TEST_TABLE = "test_table";
Expand All @@ -82,12 +87,38 @@ static void setupOnce() throws Exception {
@BeforeEach
void setup() {
properties = new Properties();
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
properties.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
properties.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), BILLING_PROJECT_ID);
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), TEST_DATASET);
properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), tempDir.toString());
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testCreateOrUpdateTableUsingManifestWithBillingProjectId(boolean setBillingProjectId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the new test here.

Properties props = new Properties();
props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
if (setBillingProjectId) {
props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), BILLING_PROJECT_ID);
}
props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), TEST_DATASET);
props.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), tempDir.toString());
props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
Job mockJob = mock(Job.class);
ArgumentCaptor<JobInfo> jobInfoCaptor = ArgumentCaptor.forClass(JobInfo.class);
when(mockBigQuery.create(jobInfoCaptor.capture())).thenReturn(mockJob);

HoodieBigQuerySyncClient syncClient = new HoodieBigQuerySyncClient(syncConfig, mockBigQuery);
Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
syncClient.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, SOURCE_PREFIX, schema);

assertEquals(
setBillingProjectId ? BILLING_PROJECT_ID : PROJECT_ID,
jobInfoCaptor.getValue().getJobId().getProject());
}

@Test
void createTableWithManifestFile_partitioned() throws Exception {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.bl_connection");
Expand Down