Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python Client] Python client support using custom Avro schema definition #12516

Merged

Conversation

gaoran10
Copy link
Contributor

@gaoran10 gaoran10 commented Oct 28, 2021

Motivation

Currently, the Python client didn't support using schema definition to generate AvroSchema, so users couldn't use the schema definition file in the Python client.

Modifications

Add a new init-param schema_definition for AvroSchema to support initializing the AvroSchema by an Avro schema definition.

class AvroSchema(Schema):
    def __init__(self, record_cls, schema_definition=None):
        if record_cls is None and schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

        if record_cls is not None:
            self._schema = record_cls.schema()
        else:
            self._schema = schema_definition
        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')

How to use

Assume that there is a company Avro schema definition file company.avsc like this.

{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": ["null", "string"]},
                {"name": "age", "type": ["null", "int"]}
            ]
        }}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}

Users could load schema from file by avro.schema or fastavro.schema

refer to load_schema or Avro Schema

schema_definition = load_schema("examples/company.avsc")
# schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()
avro_schema = AvroSchema(None, schema_definition=schema_definition)

producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)

company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)

msg = consumer.receive()
# Users could get a dict object by `value()` method.
msg.value()

Verifying this change

Add new tests to verify encode and decode using AvroSchema generated by the Avro schema definition.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below and label this PR (if you have committer privilege).

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@github-actions
Copy link

@gaoran10:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@codelipenghui codelipenghui merged commit 85575f4 into apache:master Oct 30, 2021
@merlimat merlimat added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Oct 30, 2021
merlimat pushed a commit that referenced this pull request Oct 30, 2021
…tion (#12516)

### Motivation

Currently, the Python client didn't support using schema definition to generate `AvroSchema`, so users couldn't use the schema definition file in the Python client.

### Modifications

Add a new init-param `schema_definition` for `AvroSchema`  to support initializing the `AvroSchema` by an Avro schema definition.

```
class AvroSchema(Schema):
    def __init__(self, record_cls, schema_definition=None):
        if record_cls is None and schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

        if record_cls is not None:
            self._schema = record_cls.schema()
        else:
            self._schema = schema_definition
        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
```


### How to use

Assume that there is a company Avro schema definition file `company.avsc` like this.
```
{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": ["null", "string"]},
                {"name": "age", "type": ["null", "int"]}
            ]
        }}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
```

Users could load schema from file by `avro.schema` or `fastavro.schema`
> refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html)
```
schema_definition = load_schema("examples/company.avsc")
# schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()
avro_schema = AvroSchema(None, schema_definition=schema_definition)

producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)

company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)

msg = consumer.receive()
# Users could get a dict object by `value()` method.
msg.value()
```
@gaoran10 gaoran10 deleted the gaoran/python-custom-avro-schema branch November 1, 2021 00:29
@Anonymitaet Anonymitaet added doc-complete Your PR changes impact docs and the related docs have been already added. and removed doc-label-missing labels Nov 24, 2021
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request Nov 29, 2021
…tion (apache#12516)

### Motivation

Currently, the Python client didn't support using schema definition to generate `AvroSchema`, so users couldn't use the schema definition file in the Python client.

### Modifications

Add a new init-param `schema_definition` for `AvroSchema`  to support initializing the `AvroSchema` by an Avro schema definition.

```
class AvroSchema(Schema):
    def __init__(self, record_cls, schema_definition=None):
        if record_cls is None and schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

        if record_cls is not None:
            self._schema = record_cls.schema()
        else:
            self._schema = schema_definition
        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
```


### How to use

Assume that there is a company Avro schema definition file `company.avsc` like this.
```
{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": ["null", "string"]},
                {"name": "age", "type": ["null", "int"]}
            ]
        }}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
```

Users could load schema from file by `avro.schema` or `fastavro.schema`
> refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html)
```
schema_definition = load_schema("examples/company.avsc")
# schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()
avro_schema = AvroSchema(None, schema_definition=schema_definition)

producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)

company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)

msg = consumer.receive()
# Users could get a dict object by `value()` method.
msg.value()
```
codelipenghui pushed a commit that referenced this pull request Dec 20, 2021
…tion (#12516)

### Motivation

Currently, the Python client didn't support using schema definition to generate `AvroSchema`, so users couldn't use the schema definition file in the Python client.

### Modifications

Add a new init-param `schema_definition` for `AvroSchema`  to support initializing the `AvroSchema` by an Avro schema definition.

```
class AvroSchema(Schema):
    def __init__(self, record_cls, schema_definition=None):
        if record_cls is None and schema_definition is None:
            raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")

        if record_cls is not None:
            self._schema = record_cls.schema()
        else:
            self._schema = schema_definition
        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
```

### How to use

Assume that there is a company Avro schema definition file `company.avsc` like this.
```
{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record",
            "name": "Employee",
            "fields": [
                {"name": "name", "type": ["null", "string"]},
                {"name": "age", "type": ["null", "int"]}
            ]
        }}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
```

Users could load schema from file by `avro.schema` or `fastavro.schema`
> refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html)
```
schema_definition = load_schema("examples/company.avsc")
# schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()
avro_schema = AvroSchema(None, schema_definition=schema_definition)

producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)

company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)

msg = consumer.receive()
# Users could get a dict object by `value()` method.
msg.value()
```

(cherry picked from commit 85575f4)
@codelipenghui codelipenghui added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Dec 20, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life doc-complete Your PR changes impact docs and the related docs have been already added. release/2.8.2 release/2.9.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants