
Custom 'path.format' of 'FieldPartitioner' #397

Closed
counter2015 opened this issue Dec 13, 2018 · 5 comments

@counter2015

counter2015 commented Dec 13, 2018

I'm new to the HDFS connector. In my case, I want to use Kafka to do something like log collection. I'm using the HDFS connector to move my data from Kafka to HDFS, and I'm using FieldPartitioner with field names.

For example , here is my sample data:

{"type":"type1", "time":"2018-12-13", "msg":"anything"}
{"type":"type2", "time":"2018-12-13", "msg":"anything"}

I want to store them in HDFS under a path like "path-to/type/time",
for example "/tmp/type1/2018-12-13".

However, the data ended up under "/tmp/type=type1/time=2018-12-13".
Here is my configuration of the HDFS connector:

{
  "name": "hdfs-sink-f1",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "t1",
    "hdfs.url": "hdfs://<host-name>:<port>/<path-to>",
    "flush.size": "1",
    "format.class": "io.confluent.connect.hdfs.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "partition.field.name": "type,time"
  }
}

I tried to use "path.format", but it didn't work. In the "HDFS Connector Configuration Options" doc, it says:

path.format
This configuration is used to set the format of the data directories when partitioning with TimeBasedPartitioner.
The format set in this configuration converts the Unix timestamp to proper directories strings.
For example, if you set path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH, the data directories will have the format /year=2015/month=12/day=07/hour=15/.
Type: string
Default: “”
Importance: medium

So is it not possible to set the path format of "FieldPartitioner" through "path.format"?

Is there any easy way to configure the connector to achieve my goal?

Thank you!

@OneCricketeer

OneCricketeer commented Jan 16, 2019

The reason for the key=value format is that this is how Hive (and Spark) prefer to read partitions for SQL and predicate push-down access. It's mostly just extra metadata. There currently isn't a configuration to exclude the "field=" prefix.

is it unable to set the format of "FieldPartitioner" through "path.format"

Yes. As the docs say, path.format applies when partitioning with TimeBasedPartitioner; you can look at the source of FieldPartitioner to see that it doesn't use the path.format config property.
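
For reference, the path-building logic in FieldPartitioner looks roughly like the sketch below (simplified, not the exact source; field names come from partition.field.name). Each configured field is always written as field=value, which is why the prefix can't be removed through configuration.

// Simplified sketch of FieldPartitioner's partition encoding (not the exact source).
// Field names come from the partition.field.name config, e.g. ["type", "time"].
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.List;

public class FieldPartitionerSketch {

    public String encodePartition(SinkRecord record, List<String> fieldNames) {
        Struct value = (Struct) record.value();
        StringBuilder path = new StringBuilder();
        for (String field : fieldNames) {
            if (path.length() > 0) {
                path.append("/");
            }
            // Always "<field>=<value>" -- there is no config hook to change this.
            path.append(field).append("=").append(value.get(field));
        }
        return path.toString(); // e.g. "type=type1/time=2018-12-13"
    }
}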

@counter2015
Author

@Cricket007 Thanks for your reply.

In my situation, a lot of downstream application code depends on the HDFS path format. The former way of collecting logs was through Flume, which then sent them to Kafka and HDFS.

If only this feature could be added.

@OneCricketeer

I still think Flume (or Filebeat or Fluent Bit) are necessary tools. Running a JVM to tail a log file is rather unnecessary, in my opinion. Plus, I haven't used the spooldir connector enough to say whether it's a reasonable option for tailing logs... My point here is that Flume itself has an HDFS sink, which is one option; otherwise, Hortonworks supports NiFi for this use case, and Cloudera seems to partner with StreamSets.

Overall, I'm saying this feature doesn't currently exist, but if you're comfortable with Java, adding JARs to the connector classpath isn't too difficult.
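
As an illustration only (the class and package names below are hypothetical, and this assumes the FieldPartitioner in kafka-connect-storage-common exposes a public encodePartition(SinkRecord) you can override), a custom partitioner could reuse the stock encoding and strip the "field=" prefix:

// Hypothetical custom partitioner that drops the "field=" prefix from each path segment.
// Build against kafka-connect-storage-common, place the JAR on the connector's plugin path,
// and point "partitioner.class" at this class.
package com.example.partitioner;

import io.confluent.connect.storage.partitioner.FieldPartitioner;
import org.apache.kafka.connect.sink.SinkRecord;

public class ValueOnlyFieldPartitioner<T> extends FieldPartitioner<T> {

    @Override
    public String encodePartition(SinkRecord sinkRecord) {
        // Parent produces e.g. "type=type1/time=2018-12-13"; keep only the values.
        String encoded = super.encodePartition(sinkRecord);
        StringBuilder path = new StringBuilder();
        for (String segment : encoded.split("/")) {
            if (path.length() > 0) {
                path.append("/");
            }
            int eq = segment.indexOf('=');
            path.append(eq >= 0 ? segment.substring(eq + 1) : segment);
        }
        return path.toString(); // e.g. "type1/2018-12-13"
    }
}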

@counter2015
Author

My initial idea was to replace Flume with a Kafka connector in order to reduce the maintenance cost of the system. At the same time, the impact of the replacement on the original system should be minimized.

I thought at first that all I had to do was modify the FileStreamSource connector and configure the HDFS connector, but now it's much more complicated than I thought.

In this case, it seems the source code has to be modified to meet this requirement, along with implementing tail -F, monitoring log rotation, and fault-tolerant processing…

Thank you for your patient reply and explanation.😃

I will close the issue then.

@OneCricketeer

OneCricketeer commented Jan 16, 2019

Note: This project is a sink to HDFS. It cannot read your log files. The FileStreamSourceConnector also doesn't monitor new files or tail them. Perhaps you are looking for this source connector - https://github.com/jcustenborder/kafka-connect-spooldir

Or one of the other tools I mentioned (Filebeat and Fluent Bit), which, as I pointed out, are more lightweight than Flume but still require a config file. Or you can try NiFi and MiNiFi, or StreamSets and SDC Edge, for a GUI / WYSIWYG approach to data collection, optionally into Kafka, or directly into HDFS/Hive using those tools.
