KREST-10286 Example python code for produce streaming V3 API #1163

Merged

msn-tldr
Member

@msn-tldr msn-tldr commented May 15, 2023

This example uses the HTTP connection in full-duplex mode.

The following is the output of streaming_produce_v3_main.py writing 5 records. It demonstrates that the connection is used in full-duplex mode, i.e. the record receipt for the 1st record is read even before the 2nd record is written to the connection.

Establishing connection with headers: {'Content-Type': 'application/json'}
Waiting for http-connection to be established.
Sleeping for 1 second, before producing record #1
Connection established, will read responses.
Sleeping for 1 second, before producing record #2
Writing a record #1 with json b'{"value": {"type": "STRING", "data": "value_0"}, "key": {"type": "STRING", "data": "key_0"}}'
Http-stream has status-code 200
Receipt for record #1 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":468,\"timestamp\":\"2023-05-16T11:02:08.322Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #3
Writing a record #2 with json b'{"value": {"type": "STRING", "data": "value_1"}, "key": {"type": "STRING", "data": "key_1"}}'
Receipt for record #2 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":469,\"timestamp\":\"2023-05-16T11:02:09.323Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #4
Writing a record #3 with json b'{"value": {"type": "STRING", "data": "value_2"}, "key": {"type": "STRING", "data": "key_2"}}'
Receipt for record #3 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":470,\"timestamp\":\"2023-05-16T11:02:10.334Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Sleeping for 1 second, before producing record #5
Writing a record #4 with json b'{"value": {"type": "STRING", "data": "value_3"}, "key": {"type": "STRING", "data": "key_3"}}'
Writing a record #5 with json b'{"value": {"type": "STRING", "data": "value_4"}, "key": {"type": "STRING", "data": "key_4"}}'
Receipt for record #4 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":471,\"timestamp\":\"2023-05-16T11:02:11.338Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
Receipt for record #5 is ******
"{\"error_code\":200,\"cluster_id\":\"EV-5o5e3SViiGP0hpgKn1g\",\"topic_name\":\"topic_1\",\"partition_id\":0,\"offset\":472,\"timestamp\":\"2023-05-16T11:02:12.337Z\",\"key\":{\"type\":\"STRING\",\"size\":5},\"value\":{\"type\":\"STRING\",\"size\":7}}"
No more records to produce, exiting __record_generator
Done producing-records, exiting __produce_records
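
For readers without the script at hand, the record payloads in the log above can be reproduced with a few lines of Python. This is only a sketch: `build_record`, `record_generator` and `encode_chunk` are hypothetical names, not the ones used in streaming_produce_v3_main.py, and the chunk framing shown is the generic HTTP/1.1 chunked transfer coding that a streaming request body would typically rely on.

```python
import json


def build_record(index):
    """Return one record as JSON bytes, shaped like the payloads in the log."""
    record = {
        "value": {"type": "STRING", "data": f"value_{index}"},
        "key": {"type": "STRING", "data": f"key_{index}"},
    }
    return json.dumps(record).encode("utf-8")


def record_generator(count):
    """Yield `count` records one at a time (hypothetical helper)."""
    for index in range(count):
        yield build_record(index)


def encode_chunk(payload):
    """Frame bytes as one HTTP/1.1 chunk: hex length, CRLF, data, CRLF."""
    return f"{len(payload):x}".encode("ascii") + b"\r\n" + payload + b"\r\n"


# A zero-length chunk marks the end of a chunked request body.
LAST_CHUNK = b"0\r\n\r\n"
```

Because each record is its own chunk, the writer can pause between records (as the 1-second sleeps in the log do) without ending the request.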

By contrast, the example (and its output) in the comment linked below demonstrates that an idiomatic HTTP request-response exchange is half-duplex, i.e. all records must be written to the wire before any record receipts can be read.
#1164 (comment)
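
The half-duplex versus full-duplex distinction can be illustrated without any HTTP at all. The sketch below is not taken from the PR: it uses a local `socket.socketpair()` with a stand-in server thread that answers every line with a receipt, and a writer that refuses to send its second record until the first receipt has been read. That ordering is only possible when reads and writes share the connection concurrently. All names here (`run_full_duplex_demo`, `serve`, `write_records`) are hypothetical.

```python
import socket
import threading


def run_full_duplex_demo(num_records=2):
    """Return the ordered list of write/read events on one duplex stream."""
    client, server = socket.socketpair()
    events = []
    events_lock = threading.Lock()
    first_receipt_read = threading.Event()

    def log(event):
        with events_lock:
            events.append(event)

    def serve():
        # Stand-in server: one receipt line per newline-terminated record.
        with server, server.makefile("rb") as requests:
            for line in requests:
                server.sendall(b"receipt:" + line)

    def write_records():
        for i in range(num_records):
            if i == 1:
                # Wait for a receipt while the request is still open.
                first_receipt_read.wait(timeout=5)
            log(f"write {i}")
            client.sendall(f"record_{i}\n".encode())
        client.shutdown(socket.SHUT_WR)  # end of the request body

    server_thread = threading.Thread(target=serve)
    writer_thread = threading.Thread(target=write_records)
    server_thread.start()
    writer_thread.start()

    # The main thread reads receipts while the writer is still producing.
    with client.makefile("rb") as receipts:
        for i, _line in enumerate(receipts):
            log(f"read {i}")
            first_receipt_read.set()

    writer_thread.join()
    server_thread.join()
    client.close()
    return events
```

With two records the events come out as `write 0`, `read 0`, `write 1`, `read 1`. A half-duplex client, which finishes writing before it starts reading, could never observe `read 0` ahead of `write 1`.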

@msn-tldr msn-tldr marked this pull request as ready for review May 15, 2023 18:02
@msn-tldr msn-tldr requested a review from a team as a code owner May 15, 2023 18:02
@trnguyencflt
Member

@msn-tldr what is the motivation for copying the code from http_parser? Why can't we use the library directly?

Could you also share the original problem (with example code) that requires us to resort to low-level socket handling?

@msn-tldr
Member Author

msn-tldr commented May 17, 2023

@trnguyencflt Thanks for suggesting the http.client stdlib package. I had looked at popular Python HTTP libraries such as urllib3 and aiohttp. Both use their own HttpResponse implementation with custom chunk-parsing logic instead of http.client. This led me to believe that Python's stdlib doesn't expose "public" modules for low-level HTTP parsing, so I looked into other open-source low-level HTTP parsers. I am pleasantly surprised that it does, and using it significantly reduced the LOC of the example.
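
As a rough illustration of what http.client offers here (a sketch, not the code merged in this PR), `http.client.HTTPResponse` can be wrapped around an already connected socket and will decode the status line, headers and chunked framing itself. Constructing `HTTPResponse` directly and calling `begin()` mirrors what http.client does internally and is an assumption about how one might use it; `read_receipts`, `canned_response` and `demo` are hypothetical names, and the canned response stands in for a real REST Proxy.

```python
import http.client
import json
import socket


def read_receipts(sock):
    """Yield one decoded JSON receipt per line of an HTTP response on sock."""
    response = http.client.HTTPResponse(sock, method="POST")
    try:
        response.begin()  # parses the status line and the headers
        if response.status != 200:
            raise RuntimeError(f"unexpected status {response.status}")
        while True:
            line = response.readline()  # chunk framing is handled for us
            if not line:
                break
            if line.strip():
                yield json.loads(line)
    finally:
        response.close()


def canned_response(receipts):
    """Build a fake chunked response, one receipt per chunk (test data only)."""
    head = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n"
    body = b""
    for receipt in receipts:
        data = json.dumps(receipt).encode("utf-8") + b"\r\n"
        body += f"{len(data):x}".encode("ascii") + b"\r\n" + data + b"\r\n"
    return head + body + b"0\r\n\r\n"


def demo():
    """Feed a canned response through a local socket pair and parse it."""
    ours, theirs = socket.socketpair()
    with ours, theirs:
        theirs.sendall(canned_response([
            {"error_code": 200, "offset": 468},
            {"error_code": 200, "offset": 469},
        ]))
        return list(read_receipts(ours))
```

Since `readline()` returns as soon as a complete line has arrived, receipts can be consumed one by one while the request body is still being written on the same socket.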

Now run the main driver script:
`python3 streaming_produce_v3_main.py`

**This scipt will work with python-version >= 3.0**
Member

typo scipt -> script

Member

@trnguyencflt trnguyencflt left a comment

LGTM, just a minor comment on typo

@msn-tldr
Member Author

The CI job failure is due to a missing downstream dependency, which is unrelated to this change:

 [ERROR] Failed to execute goal on project control-center: Could not resolve dependencies for project io.confluent.controlcenter:control-center:jar:7.5.0-99999: Could not find artifact io.confluent:ce-kafka-rest-extensions:jar:7.5.0-99999 in confluent-codeartifact-central (https://confluent-519856050701.dp.confluent.io/maven/maven-public/) -> [Help 1]

@msn-tldr msn-tldr merged commit 789c496 into confluentinc:master May 17, 2023
2 of 3 checks passed
@msn-tldr msn-tldr deleted the python_code_produce_v3_streaming branch May 17, 2023 12:17