
Commit

Upgraded snowflake-kafka-connector to v1.9.4
nikhilerigila09 committed Jun 6, 2024
1 parent 15e22e3 commit dbe339c
Showing 5 changed files with 25 additions and 24 deletions.
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -47,7 +47,7 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
-<test.additional.args />
+<test.additional.args/>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
<testRetryCount>1</testRetryCount>
pulsar-snowflake-connector/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -112,7 +112,7 @@
<dependency>
<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
-<version>1.7.2</version>
+<version>1.9.4</version>
<scope>test</scope>
</dependency>
</dependencies>
SnowflakeSinkConnectorMock.java
@@ -17,28 +17,28 @@

import static org.mockito.Mockito.when;

-import com.snowflake.kafka.connector.internal.Logging;
+import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
-import com.snowflake.kafka.connector.internal.SnowflakeTelemetryService;
+import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.mockito.Mockito;

-@Slf4j
public class SnowflakeSinkConnectorMock extends SnowflakeSinkConnector {

+private KCLogger DYNAMIC_LOGGER;
final SnowflakeSinkConnector wrapped;
final SnowflakeConnectionService connMock;
final SnowflakeTelemetryService telemetryMock;

public SnowflakeSinkConnectorMock() {
log.info("[sf_mock] Creating SnowflakeSinkConnectorMock");
DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
this.DYNAMIC_LOGGER.info("[sf_mock] Creating SnowflakeSinkConnectorMock");
wrapped = new SnowflakeSinkConnector();
telemetryMock = Mockito.mock(SnowflakeTelemetryService.class);
connMock = Mockito.mock(SnowflakeConnectionService.class);
@@ -74,7 +74,7 @@ public ConfigDef config() {
@Override
public void start(Map<String, String> parsedConfig) {
Utils.checkConnectorVersion();
-log.info(Logging.logMessage("SnowflakeSinkConnector:start"));
+this.DYNAMIC_LOGGER.info("SnowflakeSinkConnector:start");

FieldUtils.writeDeclaredField(wrapped, "setupComplete", false, true);
FieldUtils.writeDeclaredField(wrapped, "connectorStartTime", System.currentTimeMillis(), true);
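Note on the pattern behind the changes above: between 1.7.2 and 1.9.4 the connector's internal Logging helper (with its static Logging.logMessage formatter) was replaced by KCLogger, and SnowflakeTelemetryService moved into the internal.telemetry package, as the changed imports show. That is why every log.info(Logging.logMessage(...)) call in the mocks becomes an instance-level KCLogger call. A minimal sketch of the before/after, assuming snowflake-kafka-connector 1.9.4 on the test classpath; the class and messages below are illustrative, not part of this repository:

import com.snowflake.kafka.connector.internal.KCLogger;

public class LoggingMigrationSketch {

  // 1.7.x style (removed by this commit): a Lombok @Slf4j logger plus the static
  // formatter, e.g. log.info(Logging.logMessage("SnowflakeSinkConnector:start"));

  // 1.9.x style (added by this commit): a per-instance KCLogger named after the class.
  private final KCLogger DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());

  public void start() {
    // KCLogger.info accepts an SLF4J-style format string with varargs, as the diff shows.
    this.DYNAMIC_LOGGER.info("SnowflakeSinkConnector[{}]:start", "example-connector");
  }
}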
SnowflakeSinkTaskMock.java
@@ -15,43 +15,44 @@
*/
package com.snowflake.kafka.connector;

+import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.snowflake.kafka.connector.internal.InternalUtilsAccessor;
-import com.snowflake.kafka.connector.internal.Logging;
+import com.snowflake.kafka.connector.internal.KCLogger;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1;
import com.snowflake.kafka.connector.internal.SnowflakeIngestionService;
import com.snowflake.kafka.connector.internal.SnowflakeIngestionServiceV1;
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
-import com.snowflake.kafka.connector.internal.SnowflakeTelemetryService;
-import com.snowflake.kafka.connector.internal.SnowflakeTelemetryServiceV1;
+import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
+import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryServiceV1;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.mockito.Mockito;

-@Slf4j
public class SnowflakeSinkTaskMock extends SnowflakeSinkTask {

+private final KCLogger DYNAMIC_LOGGER;
final SnowflakeSinkTask wrapped;
final SnowflakeConnectionService connMock;
final SnowflakeTelemetryService telemetryMock;
final SnowflakeIngestionService ingestionMock;

public SnowflakeSinkTaskMock() {
log.info("[sf_mock] Creating SnowflakeSinkTaskMock");
DYNAMIC_LOGGER = new KCLogger(this.getClass().getName());
this.DYNAMIC_LOGGER.info("[sf_mock] Creating SnowflakeSinkTaskMock");
wrapped = new SnowflakeSinkTask();
telemetryMock = Mockito.mock(SnowflakeTelemetryServiceV1.class);

@@ -97,8 +98,8 @@ public void start(Map<String, String> parsedConfig) {
long startTime = System.currentTimeMillis();

String id = parsedConfig.getOrDefault(Utils.TASK_ID, "-1");
FieldUtils.writeDeclaredField(wrapped, "id", id, true);
log.info(Logging.logMessage("SnowflakeSinkTask[ID:{}]:start", id));
FieldUtils.writeDeclaredField(wrapped, "taskConfigId", id, true);
this.DYNAMIC_LOGGER.info("SnowflakeSinkTask[TaskConfigID:{}]:start", id);

// generate topic to table map
Map<String, String> topic2table = new HashMap<>();
@@ -130,6 +131,7 @@ public void start(Map<String, String> parsedConfig) {
}

FieldUtils.writeDeclaredField(wrapped, "conn", connMock, true);
FieldUtils.writeDeclaredField(wrapped, "ingestionMethodConfig", SNOWPIPE, true);

SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(parsedConfig);

@@ -145,16 +147,15 @@

FieldUtils.writeDeclaredField(wrapped, "sink", sink, true);

-log.info(
-    Logging.logMessage(
-        "SnowflakeSinkTask[ID:{}]:start. Time: {} seconds",
-        id,
-        (System.currentTimeMillis() - startTime) / 1000));
+this.DYNAMIC_LOGGER.info(
+    "SnowflakeSinkTask[TaskConfigID:{}]:start. Time: {} seconds",
+    id,
+    (System.currentTimeMillis() - startTime) / 1000);
}

@Override
public void put(Collection<SinkRecord> collection) {
log.info("[sf_mock] put of {} items", collection.size());
this.DYNAMIC_LOGGER.info("[sf_mock] put of {} items", collection.size());
wrapped.put(collection);
}

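The reflective writes in this mock track two connector-side changes visible in the diff: the sink task's private id field is now taskConfigId, and the mock additionally sets an ingestionMethodConfig field to the statically imported SNOWPIPE value before delegating to the wrapped task. A small, self-contained sketch of how FieldUtils.writeDeclaredField behaves, using a hypothetical TargetTask in place of SnowflakeSinkTask and a plain String in place of the statically imported SNOWPIPE constant; it assumes only commons-lang3 on the classpath:

import org.apache.commons.lang3.reflect.FieldUtils;

public class FieldWriteSketch {

  // Hypothetical stand-in for SnowflakeSinkTask; the field names mirror the diff.
  static class TargetTask {
    private String taskConfigId;          // was "id" before the 1.9.4 upgrade
    private String ingestionMethodConfig; // simplified: the real field holds the SNOWPIPE constant
  }

  public static void main(String[] args) throws IllegalAccessException {
    TargetTask wrapped = new TargetTask();
    // writeDeclaredField(target, fieldName, value, forceAccess) writes a field declared
    // directly on the target's class; forceAccess=true bypasses private access.
    FieldUtils.writeDeclaredField(wrapped, "taskConfigId", "-1", true);
    FieldUtils.writeDeclaredField(wrapped, "ingestionMethodConfig", "SNOWPIPE", true);
    System.out.println(wrapped.taskConfigId + " / " + wrapped.ingestionMethodConfig);
  }
}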
snowflake-kafka-connector-shaded/pom.xml (4 changes: 2 additions & 2 deletions)
@@ -72,8 +72,8 @@
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"/>
</transformers>
</configuration>
</plugin>
