[Doc] add explanations for Avro schema in Python client (#12914)
Anonymitaet committed Nov 22, 2021
1 parent 32b697d commit 093c230
Showing 1 changed file with 192 additions and 55 deletions.
247 changes: 192 additions & 55 deletions site2/docs/client-libraries-python.md
@@ -169,61 +169,6 @@ client.close()

## Schema

### Declare and validate schema

You can declare a schema by passing a class that inherits
from `pulsar.schema.Record` and defines the fields as
class variables. For example:

```python
from pulsar.schema import *

class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()
```

With this simple schema definition, you can create producer, consumer, and reader instances that refer to it.

```python
producer = client.create_producer(
    topic='my-topic',
    schema=AvroSchema(Example))

producer.send(Example(a='Hello', b=1))
```

After creating the producer, the Pulsar broker validates that the existing topic schema is indeed of "Avro" type and that the format is compatible with the schema definition of the `Example` class.

If there is a mismatch, an exception occurs in the producer creation.

Once a producer is created with a certain schema definition,
it will only accept objects that are instances of the declared
schema class.

Similarly, for a consumer/reader, the consumer will return an
object, instance of the schema record class, rather than the raw
bytes:

```python
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
```

### Supported schema types

You can use different builtin schema types in Pulsar. All the definitions are in the `pulsar.schema` package.
@@ -327,6 +272,198 @@ The schema definition is like this.
}
```

### Declare and validate schema

You can send messages using `BytesSchema`, `StringSchema`, `AvroSchema`, and `JsonSchema`.

When a producer is created, the Pulsar broker validates that the existing topic schema is of the expected type and that its format is compatible with the schema definition of the class. If the topic schema is incompatible with the schema definition, producer creation fails with an exception.

Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.

Similarly, a consumer or reader returns an object that is an instance of the schema record class rather than raw bytes.

**Example**

```python
# Assumes an `Example` record class that declares the fields a, b, and c.
consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
```
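The rule that a producer only accepts instances of the declared schema class can be pictured without a broker. The following is a library-free sketch, not the Pulsar implementation: `Example` here is a plain dataclass stand-in, and `TypedEncoder` is a hypothetical helper invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class Example:
    """Stand-in for a record class; this is not a pulsar.schema.Record."""
    a: str
    b: int


class TypedEncoder:
    """Hypothetical helper that only accepts instances of one declared class."""

    def __init__(self, record_cls):
        self.record_cls = record_cls

    def encode(self, obj):
        # Reject anything that is not an instance of the declared class.
        if not isinstance(obj, self.record_cls):
            raise TypeError("expected {}, got {}".format(
                self.record_cls.__name__, type(obj).__name__))
        return repr(obj).encode("utf-8")


encoder = TypedEncoder(Example)
accepted = encoder.encode(Example(a="Hello", b=1))  # an Example instance passes

try:
    encoder.encode({"a": "Hello", "b": 1})  # a plain dict is not an Example
except TypeError as err:
    rejected = str(err)
```

The point of the sketch is only that the check happens on the sending side, before any bytes leave the client.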

<!--DOCUSAURUS_CODE_TABS-->

<!--BytesSchema-->

You can send byte data using a `BytesSchema`.

**Example**

```python
producer = client.create_producer(
    'bytes-schema-topic',
    schema=BytesSchema())
producer.send(b"Hello")

consumer = client.subscribe(
    'bytes-schema-topic',
    'sub',
    schema=BytesSchema())
msg = consumer.receive()
data = msg.value()
```

<!--StringSchema-->

You can send string data using a `StringSchema`.

**Example**

```python
producer = client.create_producer(
    'string-schema-topic',
    schema=StringSchema())
producer.send("Hello")

consumer = client.subscribe(
    'string-schema-topic',
    'sub',
    schema=StringSchema())
msg = consumer.receive()
# Avoid naming the variable `str`, which would shadow the built-in type.
text = msg.value()
```
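The practical difference between sending bytes and sending strings is the encoding step. The sketch below uses only the standard library and assumes that string payloads travel as UTF-8 bytes; that encoding is an assumption made for illustration, not something stated above.

```python
# Assumption: a string payload is carried as UTF-8 bytes.
text = "héllo"
payload = text.encode("utf-8")      # str -> bytes before sending
restored = payload.decode("utf-8")  # bytes -> str on the receiving side

# Arbitrary bytes need no encoding step and may not be valid text at all.
raw = b"\x00\x01Hello"
```

Note that `len(payload)` is larger than `len(text)` here because `é` occupies two bytes in UTF-8.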

<!--AvroSchema-->

You can declare an `AvroSchema` using one of the following methods.

#### Method 1: Record

You can declare an `AvroSchema` by passing a class that inherits
from `pulsar.schema.Record` and defines the fields as
class variables.

**Example**

```python
from pulsar.schema import *

class Example(Record):
    a = Integer()
    b = Integer()

producer = client.create_producer(
    'avro-schema-topic',
    schema=AvroSchema(Example))
r = Example(a=1, b=2)
producer.send(r)

consumer = client.subscribe(
    'avro-schema-topic',
    'sub',
    schema=AvroSchema(Example))
msg = consumer.receive()
e = msg.value()
```

#### Method 2: JSON definition

You can declare an `AvroSchema` from an Avro schema definition written in JSON.

**Example**

Below is an `AvroSchema` defined using a JSON file (_company.avsc_).

```json
{
"doc": "this is doc",
"namespace": "example.avro",
"type": "record",
"name": "Company",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "address", "type": ["null", "string"]},
{"name": "employees", "type": ["null", {"type": "array", "items": {
"type": "record",
"name": "Employee",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "int"]}
]
}}]},
{"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
]
}
```
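Since an Avro schema definition is ordinary JSON, it can be inspected with the standard library before it is handed to any Avro tooling. The sketch below inlines the same definition so that it is self-contained; `is_nullable` is a hypothetical helper name.

```python
import json

# Same definition as company.avsc, inlined so the sketch is self-contained.
COMPANY_AVSC = """
{
  "doc": "this is doc",
  "namespace": "example.avro",
  "type": "record",
  "name": "Company",
  "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "address", "type": ["null", "string"]},
    {"name": "employees", "type": ["null", {"type": "array", "items": {
      "type": "record",
      "name": "Employee",
      "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "age", "type": ["null", "int"]}
      ]
    }}]},
    {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
  ]
}
"""

schema = json.loads(COMPANY_AVSC)


def is_nullable(field_type):
    # A union is written as a JSON list; it is nullable if "null" is a branch.
    return isinstance(field_type, list) and "null" in field_type


field_names = [f["name"] for f in schema["fields"]]
nullable = {f["name"]: is_nullable(f["type"]) for f in schema["fields"]}
full_name = schema["namespace"] + "." + schema["name"]
```

Every top-level field in this definition is a union with `"null"`, so each one may be omitted or set to `None` in a message.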

You can load a schema definition from a file by using [`avro.schema`](http://avro.apache.org/docs/current/gettingstartedpython.html) or [`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).

If you use the "JSON definition" method to declare an `AvroSchema`, note the following points:

- You need to use a [Python dict](https://developers.google.com/edu/python/dict-files) to produce and consume messages, unlike the "Record" method, which uses record instances.

- When creating an `AvroSchema` object, set the `_record_cls` parameter (the first argument) to `None`.

**Example**

```python
from fastavro.schema import load_schema
from pulsar.schema import *

# Assumes `client` (a pulsar.Client) and `topic` (a topic name) already exist.
schema_definition = load_schema("examples/company.avsc")
# Pass None as the record class because the schema comes from the JSON file.
avro_schema = AvroSchema(None, schema_definition=schema_definition)
producer = client.create_producer(
    topic=topic,
    schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)

i = 0  # illustrative index used to vary the field values
company = {
    "name": "company-name" + str(i),
    "address": 'xxx road xxx street ' + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {
        "industry": "software" + str(i),
        "scale": ">100",
        "funds": "1000000.0"
    }
}
producer.send(company)
msg = consumer.receive()
# The `value()` method returns a dict object.
msg.value()
```
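Because messages are plain dicts in this mode, a quick shape check before sending can catch mistakes early. The following is a library-free sketch: `matches` is a hypothetical helper, it covers only the Avro types that appear in _company.avsc_ (null, string, int, array, map, record, and unions), and it is not how Pulsar or fastavro validate data.

```python
def matches(value, avro_type):
    """Return True if `value` fits `avro_type` (small subset of Avro only)."""
    if isinstance(avro_type, list):  # union: any branch may match
        return any(matches(value, branch) for branch in avro_type)
    if isinstance(avro_type, dict):
        kind = avro_type["type"]
        if kind == "array":
            return isinstance(value, list) and all(
                matches(item, avro_type["items"]) for item in value)
        if kind == "map":
            return isinstance(value, dict) and all(
                isinstance(k, str) and matches(v, avro_type["values"])
                for k, v in value.items())
        if kind == "record":
            # A missing key is treated as None, so nullable fields may be omitted.
            return isinstance(value, dict) and all(
                matches(value.get(f["name"]), f["type"])
                for f in avro_type["fields"])
        return matches(value, kind)  # e.g. {"type": "string"}
    if avro_type == "null":
        return value is None
    if avro_type == "string":
        return isinstance(value, str)
    if avro_type == "int":
        return isinstance(value, int) and not isinstance(value, bool)
    return False  # types outside the subset are not handled


employee_type = {
    "type": "record", "name": "Employee",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "age", "type": ["null", "int"]},
    ],
}
company_type = {
    "type": "record", "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees",
         "type": ["null", {"type": "array", "items": employee_type}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]},
    ],
}

good = {
    "name": "company-name0",
    "address": "xxx road xxx street 0",
    "employees": [{"name": "user0", "age": 20}],
    "labels": {"industry": "software0", "scale": ">100", "funds": "1000000.0"},
}
bad = dict(good, labels={"funds": 1000000.0})  # map values must be strings

ok_good = matches(good, company_type)
ok_bad = matches(bad, company_type)
```

The `bad` dict fails because `labels` is declared as a map of strings, which is why the example above sends `"funds"` as the string `"1000000.0"` rather than as a number.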

<!--JsonSchema-->

#### Record

You can declare a `JsonSchema` by passing a class that inherits
from `pulsar.schema.Record` and defines the fields as class variables. This works the same way as `AvroSchema`; the only difference is that you use `JsonSchema` instead of `AvroSchema` when defining the schema type, as shown below. For details on declaring an `AvroSchema` with a record, see [Method 1: Record](client-libraries-python.md#method-1-record).

```python
# Assumes an `Example` record class defined as in the AvroSchema record example.
producer = client.create_producer(
    'json-schema-topic',
    schema=JsonSchema(Example))
consumer = client.subscribe(
    'json-schema-topic',
    'sub',
    schema=JsonSchema(Example))
```
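To get a feel for what a JSON-encoded record looks like on the wire, here is a standard-library sketch. It assumes that the record's fields are written as a JSON object encoded in UTF-8; `Example` is a dataclass stand-in rather than a `pulsar.schema.Record`, and the exact bytes a real client produces may differ.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Example:
    """Stand-in for a record with two integer fields (not a pulsar Record)."""
    a: int
    b: int


# Encode: object -> dict -> JSON text -> UTF-8 bytes.
payload = json.dumps(asdict(Example(a=1, b=2))).encode("utf-8")

# Decode: UTF-8 bytes -> JSON text -> dict -> object.
decoded = Example(**json.loads(payload.decode("utf-8")))
```

Unlike Avro's binary encoding, the payload stays human-readable, which can help when debugging messages.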

<!--END_DOCUSAURUS_CODE_TABS-->

## End-to-end encryption

[End-to-end encryption](https://pulsar.apache.org/docs/en/next/cookbooks-encryption/#docsNav) allows applications to encrypt messages at producers and decrypt messages at consumers.