
Support avro ingestion for realtime & hadoop batch indexing #1858

Merged · 1 commit merged on Jan 5, 2016

Conversation

zhaown (Contributor) commented Oct 25, 2015

Apache Avro™ is a schema-based data serialization system, well integrated with the Hadoop ecosystem. The object schema is stored together with the encoded objects in an Object Container File, so decoding is straightforward and natively supported by Hadoop. Adding Avro ingestion support to Druid's Hadoop indexing is not complicated.

When sending encoded Avro objects to a stream such as Kafka, you don't want to send the schema with every object (the overhead is usually too big), but you still need the schema to decode them, so people use a schema repository. On the producer side you register the schema with the repository before sending, get a schema id, and send the id along with the encoded data to the stream. On the consumer side you extract the schema id from the stream message, look up the schema in the repository, and use it to decode the data.
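
For illustration, here is a minimal sketch of that producer/consumer flow in Java. The SchemaRepoClient interface, its register/lookup methods, and the 4-byte schema-id prefix are assumptions for illustration only, not the actual schema-repo or Druid API:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Assumed repository client: register a schema and get an id, or look a schema up by id.
interface SchemaRepoClient
{
  int register(String subject, Schema schema);

  Schema lookup(String subject, int schemaId);
}

class SchemaRepoFlow
{
  // Producer side: register the writer schema, then prepend its id to each encoded record.
  static byte[] encode(SchemaRepoClient repo, String topic, GenericRecord record) throws IOException
  {
    int schemaId = repo.register(topic, record.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  // Consumer side: read the id, fetch the writer schema from the repository, then decode the payload.
  static GenericRecord decode(SchemaRepoClient repo, String topic, byte[] message) throws IOException
  {
    ByteBuffer buf = ByteBuffer.wrap(message);
    Schema writerSchema = repo.lookup(topic, buf.getInt());
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
    return reader.read(
        null,
        DecoderFactory.get().binaryDecoder(message, buf.position(), message.length - buf.position(), null)
    );
  }
}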

This PR treats an Avro object as a Map<String, Object>; in fact, in early Avro versions GenericRecord did extend Map<String, Object>. For Hadoop batch ingestion, it uses a custom AvroValueInputFormat, AvroHadoopInputRowParser, and AvroParseSpec. If you need a reader schema that differs from the writer schema, you can set it in tuningConfig#jobProperties under the name avro.schema.input.value or avro.schema.path.input.value; the former is the schema as JSON text while the latter is an HDFS path to the reader schema file, and if both are set, the former is used. NOTE: the reader schema applies to all input files. The spec file will look like:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "parser": {
        "type": "avro_hadoop",
        "parsSpec": {
          "format": "timeAndDims",
          "timestampSpec": {},
          "dimensionsSpec": {}
        }
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "",
        "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat"
      }
    },
    "tuningConfig": {
       "jobProperties": {
          "avro.schema.path.input.value": "/path/to/my/schema.avsc",
          "avro.schema.input.value": "my_schema_JSON_text"
      }
    }
  }
}

For realtime ingestion, a schema repository is needed; this PR uses schema-repo. I'm aware there are two schema repository implementations, the other being schema-registry from Confluent, probably written by the same team. The latter is easier to use but is not in Maven Central, so I chose the former. This extension should be easy to extend to communicate with other kinds of schema registry servers. The spec file will look like:

{
  "dataSchema": {
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_repo",
        "subjectAndIdConverter": {
          "type": "avro_1124",
          "topic": "${YOUR_TOPIC}"
        },
        "schemaRepository": {
          "type": "avro_1124_rest_client",
          "url":"${YOUR_SCHEMA_REPO_END_POINT}",
        }
      },
      "parsSpec": {
        "format": "timeAndDims",
        "timestampSpec": {},
        "dimensionsSpec": {}
      }
    }
  }
}

Fixes #1844

zhaown (Contributor, Author) commented Oct 25, 2015

On the subject of wrapping an Avro object as a Map<String, Object>, I'm thinking maybe Druid could support nested dimensions/metrics. They could be expressed like level_0_name.level_1_name: in InputRow#getDimension() and InputRow#getLongMetric(), it can check whether the full name is in the keySet; if not, check whether level_0_name is in the keySet and map.get("level_0_name") is a map, and recurse.

So if there is a top-level field named "level_0_name.level_1_name", Druid uses its value as the dimension/metric; if there isn't such a top-level field but there is a nested field {"level_0_name": {"level_1_name": "nested_value"}}, Druid uses the nested_value.

The downside is the ambiguity, but I think in some scenarios it's acceptable?
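
To make the idea concrete, here is a rough sketch of that recursive lookup; the class and method names are my illustration, not existing Druid code:

import java.util.Map;

class NestedFieldLookup
{
  // Resolve a dotted name like "level_0_name.level_1_name" against a Map-backed row:
  // an exact top-level key wins (hence the ambiguity), otherwise descend one level per
  // dot as long as the intermediate values are maps, and recurse on the remainder.
  @SuppressWarnings("unchecked")
  static Object resolve(Map<String, Object> row, String dimension)
  {
    if (row.containsKey(dimension)) {
      return row.get(dimension);
    }
    int dot = dimension.indexOf('.');
    if (dot < 0) {
      return null;
    }
    Object child = row.get(dimension.substring(0, dot));
    if (child instanceof Map) {
      return resolve((Map<String, Object>) child, dimension.substring(dot + 1));
    }
    return null;
  }
}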

will-lauer (Contributor)

At least for batch indexing, it would be ideal if an Avro reader schema could be passed in as part of the parse spec (I would prefer passing a filename rather than embedding the entire schema directly in the parse spec). That way, if the schema of the Avro files changes over time, the files can still be ingested together and provide a consistent view. If no reader schema is specified, the schema embedded in the Avro file should be used. Something like:

"parser": {
    "type": "avro-hadoop",
        "parsSpec": {
            "format": "avro",
            "readerSchema": "/path/to/my/schema.avsc"
            "timestampSpec": {},
            "dimensionsSpec": {}
        }
    }
}

return doParse(record, parseSpec, dimensions);
}
catch (IOException e) {
throw new RuntimeException("Fail to decode avro message!", e);
Contributor

Is this always a hard failure? Meaning, is it possible to skip bad values?

zhaown (Contributor, Author)

The IOException comes from InputStream#read(), usually because of corrupt data or a schema/data mismatch, which makes the whole record unreadable and the partially read data meaningless. So I think it's impossible to skip a bad field within one row (Avro record). And for a schema/data mismatch, there is a better way than skipping the bad field: set a proper reader schema that matches the writer schema.

Member

Should this be ParseException instead of RuntimeException?

fjy added this to the 0.9.0 milestone Oct 26, 2015
zhaown (Contributor, Author) commented Oct 27, 2015

@will-lauer Yes, you are right, reader schema should be supported.

But it should not be one schema per parseSpec or parser, but per pathSpec, because you could need one parser to parse Avro data in different directories with different schemas, like pv/impr/click. I currently can't find a decent way to do this.

Adding custom properties to PathSpec seems a bit like overkill?

zhaown (Contributor, Author) commented Oct 27, 2015

@drcrallen the CI build failed on JDK 8 while it succeeded on JDK 7, and the failing module (druid-server) is unrelated to this PR. Does it need a rebuild?

drcrallen (Contributor)

The failing test: AnnouncerTest.testSanity:99 » test timed out after 60000 milliseconds

Some of the tests that use ZooKeeper have unpredictable problems in Travis CI. Please close and then reopen the PR to restart the tests.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.data.input;

drcrallen (Contributor)

@zhaown FYI, a few of these files will need newlines at the end of the files.

</plugin>
</plugins>
</build>
</project>
Contributor

Nit: in general Druid files always end with a newline; can you do the same in all the files?

Contributor

There are multiple files which need that. I just left a comment in the main thread of the PR.

if (readerSchema == null) {
  String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
  if (StringUtils.isNotBlank(schemaFilePath)) {
    log.info(String.format("Using file: %s as reader schema.", schemaFilePath));
Contributor

com.metamx.common.Logger.info has the formatter built in, so String.format isn't needed.
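
A sketch of the suggested change, assuming log is the Metamx Logger the comment refers to:

// Before: String.format is redundant
log.info(String.format("Using file: %s as reader schema.", schemaFilePath));
// After: the Logger applies the format arguments itself
log.info("Using file: %s as reader schema.", schemaFilePath);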

zhaown (Contributor, Author) commented Nov 15, 2015

@himanshug @codingwhatever Please check whether the commit is OK. There is still something to be done in the unit tests; I'll fix them later, I have to go now... The main mess is testing serde with the avro-1124-related classes: if you init an Avro1124RESTRepositoryClient with an endpoint URL, it will actually try to communicate with that endpoint...

PS: I think io.druid.TestObjectMapper is not packaged in druid-api.jar, so I have to use and configure my own ObjectMapper.


@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesParser.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesParser.class)
Contributor

Can you call it "schemaRepo"? That would be consistent with other names used across Druid.

himanshug (Contributor)

Can you please add documentation for this module under docs/content/development/avro-extensions.md and link it as an experimental feature in docs/content/toc.textile?

codingwhatever

@zhaown The schema reading logic and the union/array logic for hadoop ingestion look correct now.

fjy (Contributor) commented Nov 24, 2015

@zhaown if we can get some docs, I'm 👍

fjy (Contributor) commented Dec 18, 2015

@zhaown can we finish this one off?

zhaown (Contributor, Author) commented Dec 20, 2015

@fjy and all, really sorry for the disappearance, I've been busy on another project. I think this needs some field tests because the code has changed a bit from what I tested initially. I'll try to add docs and do the tests this week.

zhaown (Contributor, Author) commented Dec 28, 2015

@fjy Added some docs, please check if they are OK.

@@ -91,6 +91,54 @@ If `type` is not included, the parser defaults to `string`.
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |

### Avro Stream Parser
Contributor

@zhaown can you include a full spec that works for avro ingestion?

fjy (Contributor) commented Dec 28, 2015

I'm 👍 . @himanshug ?

@@ -137,6 +137,7 @@ Is a type of inputSpec where a static path to where the data files are located i
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|paths|Array of String|A String of input paths indicating where the raw data is located.|yes|
|inputFormat|String|The input format of the data files.|no|
Contributor

Can you say that the default is TextInputFormat?

himanshug (Contributor)

@zhaown I'm assuming nothing changed other than the addition of documentation. 👍 besides the minor comments. Please squash your commits. Thanks for the contribution; this will be very useful to many.

fjy (Contributor) commented Dec 30, 2015

I can merge this once the documentation changes are addressed.

fjy (Contributor) commented Dec 31, 2015

@zhaown For information about squashing your commits, please see: https://github.com/druid-io/druid/blob/master/CONTRIBUTING.md

zhaown (Contributor, Author) commented Jan 2, 2016

@fjy @himanshug I've done some docs refinement; please check whether the grammar and everything else is OK. If it is, I'll squash my commits.

zhaown force-pushed the avro-module branch 2 times, most recently from 7e11ca1 to 48f068c on January 2, 2016
fjy (Contributor) commented Jan 4, 2016

👍 Please squash your commits

@himanshug any more comments?

himanshug (Contributor)

👍 ready to merge after squashing

zhaown (Contributor, Author) commented Jan 5, 2016

Commits squashed, thank you guys.

fjy added a commit that referenced this pull request Jan 5, 2016
Support avro ingestion for realtime & hadoop batch indexing
fjy merged commit d413808 into apache:master on Jan 5, 2016