Make configuration changes

Ishiihara committed Dec 6, 2015
1 parent f11c277 commit f8acc01
Showing 12 changed files with 116 additions and 88 deletions.
18 changes: 0 additions & 18 deletions config/connect-schema-source.properties

This file was deleted.

@@ -13,7 +13,6 @@
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
-topics=test
+topics=test_hdfs
hdfs.url=hdfs://localhost:9000
hdfs.conf.dir=/etc/hadoop
flush.size=3
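
For reference, a sketch of a complete sink configuration using the renamed topic and the new-style keys. The key strings (format.class, partition.field.name, partition.duration.ms) are assumptions inferred from the renamed constants; the authoritative names live in HdfsSinkConnectorConfig.

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=test_hdfs
    hdfs.url=hdfs://localhost:9000
    hdfs.conf.dir=/etc/hadoop
    flush.size=3
    # Assumed key strings for the renamed configs:
    format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
    partition.field.name=int
    partition.duration.ms=3600000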
6 changes: 3 additions & 3 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -371,7 +371,7 @@ private void createDir(String dir) throws IOException {

@SuppressWarnings("unchecked")
private Format getFormat() throws ClassNotFoundException, IllegalAccessException, InstantiationException{
-return ((Class<Format>) Class.forName(connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CONFIG))).newInstance();
+return ((Class<Format>) Class.forName(connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG))).newInstance();
}

private String getPartitionValue(String path) {
@@ -400,8 +400,8 @@ private Partitioner createPartitioner(HdfsSinkConnectorConfig config)

private Map<String, Object> copyConfig(HdfsSinkConnectorConfig config) {
Map<String, Object> map = new HashMap<>();
-map.put(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG, config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG));
-map.put(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG, config.getLong(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG));
+map.put(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG, config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG));
+map.put(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG, config.getLong(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG));
map.put(HdfsSinkConnectorConfig.PATH_FORMAT_CONFIG, config.getString(HdfsSinkConnectorConfig.PATH_FORMAT_CONFIG));
map.put(HdfsSinkConnectorConfig.LOCALE_CONFIG, config.getString(HdfsSinkConnectorConfig.LOCALE_CONFIG));
map.put(HdfsSinkConnectorConfig.TIMEZONE_CONFIG, config.getString(HdfsSinkConnectorConfig.TIMEZONE_CONFIG));
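
The map built by copyConfig is what reaches each Partitioner's configure method, so the partitioner implementations below must switch to the renamed keys in lockstep. A minimal sketch of that wiring, assuming createPartitioner (truncated above) instantiates the class reflectively and then configures it; the real method body is not shown in this diff.

    // Sketch only: the actual createPartitioner body is truncated above.
    private Partitioner createPartitioner(HdfsSinkConnectorConfig config)
        throws ClassNotFoundException, IllegalAccessException, InstantiationException {
      @SuppressWarnings("unchecked")
      Class<? extends Partitioner> partitionerClass = (Class<? extends Partitioner>)
          Class.forName(config.getString(HdfsSinkConnectorConfig.PARTITIONER_CLASS_CONFIG));
      Partitioner partitioner = partitionerClass.newInstance();
      // copyConfig now carries PARTITION_FIELD_NAME_CONFIG and
      // PARTITION_DURATION_MS_CONFIG, matching what the partitioners read.
      partitioner.configure(copyConfig(config));
      return partitioner;
    }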
145 changes: 96 additions & 49 deletions src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java

Large diffs are not rendered by default.
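
Since the HdfsSinkConnectorConfig diff is not rendered, here is a hedged sketch of how the renamed keys might be declared with Kafka's ConfigDef. The key strings and documentation text are assumptions inferred from the constant names; only the constant renames themselves are confirmed by the hunks below.

    // Hypothetical declarations; the real ones are in the unrendered diff.
    public static final String FORMAT_CLASS_CONFIG = "format.class";
    public static final String PARTITION_FIELD_NAME_CONFIG = "partition.field.name";
    public static final String PARTITION_DURATION_MS_CONFIG = "partition.duration.ms";

    static ConfigDef config = new ConfigDef()
        .define(FORMAT_CLASS_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
            "The format class to use when writing data to HDFS.")
        .define(PARTITION_FIELD_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM,
            "The name of the record field used by FieldPartitioner.")
        .define(PARTITION_DURATION_MS_CONFIG, ConfigDef.Type.LONG, ConfigDef.Importance.MEDIUM,
            "The partition duration in milliseconds used by TimeBasedPartitioner.");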

@@ -37,7 +37,7 @@ public class FieldPartitioner implements Partitioner {

@Override
public void configure(Map<String, Object> config) {
-fieldName = (String) config.get(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG);
+fieldName = (String) config.get(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG);
partitionFields.add(new FieldSchema(fieldName, TypeInfoFactory.stringTypeInfo.toString(), ""));
}

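
The renamed PARTITION_FIELD_NAME_CONFIG supplies the fieldName used to build Hive-style partition directories. A minimal illustration of the field=value layout that the tests below assert on; encodePartition here is a hypothetical helper, not the connector's API.

    // Hypothetical helper illustrating the directory naming convention.
    static String encodePartition(String fieldName, Object value) {
      return fieldName + "=" + String.valueOf(value);  // e.g. "int=16"
    }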
@@ -60,9 +60,9 @@ public static long getPartition(long timeGranularityMs, long timestamp, DateTime

@Override
public void configure(Map<String, Object> config) {
-long partitionDurationMs = (long) config.get(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG);
+long partitionDurationMs = (long) config.get(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG);
if (partitionDurationMs < 0) {
-throw new ConfigException(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG,
+throw new ConfigException(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG,
partitionDurationMs, "Partition duration needs to be a positive.");
}

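
The value read from PARTITION_DURATION_MS_CONFIG becomes the timeGranularityMs in getPartition, whose signature appears in the hunk header above. A sketch of the usual bucketing computation, assuming Joda-Time's DateTimeZone as the truncated signature suggests:

    // Floor the time-zone-adjusted timestamp to the start of its bucket,
    // then convert back to UTC millis.
    public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
      long adjusted = timeZone.convertUTCToLocal(timestamp);
      long partitionedTime = (adjusted / timeGranularityMs) * timeGranularityMs;
      return timeZone.convertLocalToUTC(partitionedTime, false);
    }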
@@ -45,7 +45,7 @@ public void setUp() throws Exception {
protected Map<String, String> createProps() {
Map<String, String> props = super.createProps();
props.put(HdfsSinkConnectorConfig.STORAGE_CLASS_CONFIG, MemoryStorage.class.getName());
-props.put(HdfsSinkConnectorConfig.FORMAT_CONFIG, MemoryFormat.class.getName());
+props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, MemoryFormat.class.getName());
return props;
}

@@ -148,7 +148,7 @@ public void testHiveIntegrationFieldPartitionerAvro() throws Exception {
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG, "true");
props.put(HdfsSinkConnectorConfig.PARTITIONER_CLASS_CONFIG, FieldPartitioner.class.getName());
-props.put(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG, "int");
+props.put(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG, "int");

HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);
DataWriter hdfsWriter = new DataWriter(config, context, avroData);
@@ -185,7 +185,7 @@ public void testHiveIntegrationFieldPartitionerAvro() throws Exception {
assertEquals(expectedColumnNames, actualColumnNames);


-String partitionFieldName = config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG);
+String partitionFieldName = config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG);
String directory1 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(16);
String directory2 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(17);
String directory3 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(18);
@@ -65,7 +65,7 @@ public class TopicPartitionWriterTest extends TestWithMiniDFSCluster {
public void setUp() throws Exception {
super.setUp();

-Format format = ((Class<Format>) Class.forName(connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CONFIG))).newInstance();
+Format format = ((Class<Format>) Class.forName(connectorConfig.getString(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG))).newInstance();
writerProvider = format.getRecordWriterProvider();
schemaFileReader = format.getSchemaFileReader(avroData);
extension = writerProvider.getExtension();
@@ -116,7 +116,7 @@ public void testWriteRecordFieldPartitioner() throws Exception {
Partitioner partitioner = new FieldPartitioner();
partitioner.configure(config);

-String partitionField = (String) config.get(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG);
+String partitionField = (String) config.get(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG);

TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION, storage, writerProvider, partitioner, connectorConfig, context, avroData);
@@ -172,7 +172,7 @@ public void testWriteRecordTimeBasedPartition() throws Exception {
topicPartitionWriter.close();


-long partitionDurationMs = (Long) config.get(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG);
+long partitionDurationMs = (Long) config.get(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG);
String pathFormat = (String) config.get(HdfsSinkConnectorConfig.PATH_FORMAT_CONFIG);
String timeZoneString = (String) config.get(HdfsSinkConnectorConfig.TIMEZONE_CONFIG);
long timestamp = System.currentTimeMillis();
@@ -191,8 +191,8 @@ public void testWriteRecordTimeBasedPartition() throws Exception {

private Map<String, Object> createConfig() {
Map<String, Object> config = new HashMap<>();
-config.put(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG, "int");
-config.put(HdfsSinkConnectorConfig.PARTITION_DURATION_CONFIG, TimeUnit.HOURS.toMillis(1));
+config.put(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG, "int");
+config.put(HdfsSinkConnectorConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.HOURS.toMillis(1));
config.put(HdfsSinkConnectorConfig.PATH_FORMAT_CONFIG, "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/");
config.put(HdfsSinkConnectorConfig.LOCALE_CONFIG, "en");
config.put(HdfsSinkConnectorConfig.TIMEZONE_CONFIG, "America/Los_Angeles");
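
Given the config above (one-hour buckets, the quoted path format, locale en, America/Los_Angeles), the directory the truncated test presumably asserts on can be reproduced with Joda-Time. A hedged sketch, not the test's exact code:

    import java.util.Locale;
    import java.util.concurrent.TimeUnit;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;

    DateTimeZone zone = DateTimeZone.forID("America/Los_Angeles");
    long bucketed = TimeBasedPartitioner.getPartition(
        TimeUnit.HOURS.toMillis(1), System.currentTimeMillis(), zone);
    String directory = DateTimeFormat
        .forPattern("'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/")
        .withLocale(new Locale("en"))
        .withZone(zone)
        .print(bucketed);  // e.g. "year=2015/month=12/day=06/hour=10/"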
@@ -42,7 +42,7 @@ public class DataWriterParquetTest extends TestWithMiniDFSCluster {
@Override
protected Map<String, String> createProps() {
Map<String, String> props = super.createProps();
-props.put(HdfsSinkConnectorConfig.FORMAT_CONFIG, ParquetFormat.class.getName());
+props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
return props;
}

@@ -46,7 +46,7 @@ public class HiveIntegrationParquetTest extends HiveTestBase {
protected Map<String, String> createProps() {
Map<String, String> props = super.createProps();
props.put(HdfsSinkConnectorConfig.SHUTDOWN_TIMEOUT_CONFIG, "10000");
-props.put(HdfsSinkConnectorConfig.FORMAT_CONFIG, ParquetFormat.class.getName());
+props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
return props;
}

@@ -151,7 +151,7 @@ public void testHiveIntegrationFieldPartitionerParquet() throws Exception {
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.HIVE_INTEGRATION_CONFIG, "true");
props.put(HdfsSinkConnectorConfig.PARTITIONER_CLASS_CONFIG, FieldPartitioner.class.getName());
-props.put(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG, "int");
+props.put(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG, "int");

HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);
DataWriter hdfsWriter = new DataWriter(config, context, avroData);
@@ -188,7 +188,7 @@ public void testHiveIntegrationFieldPartitionerParquet() throws Exception {
assertEquals(expectedColumnNames, actualColumnNames);


-String partitionFieldName = config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_CONFIG);
+String partitionFieldName = config.getString(HdfsSinkConnectorConfig.PARTITION_FIELD_NAME_CONFIG);
String directory1 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(16);
String directory2 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(17);
String directory3 = TOPIC + "/" + partitionFieldName + "=" + String.valueOf(18);
@@ -47,7 +47,7 @@ public class ParquetHiveUtilTest extends HiveTestBase {
@Override
protected Map<String, String> createProps() {
Map<String, String> props = super.createProps();
-props.put(HdfsSinkConnectorConfig.FORMAT_CONFIG, ParquetFormat.class.getName());
+props.put(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
return props;
}

