-
Notifications
You must be signed in to change notification settings - Fork 7
/
async_streaming.py
168 lines (130 loc) · 5.25 KB
/
async_streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
About
=====
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, in
asynchronous mode.
Specific to the asynchronous mode of SQLAlchemy is the streaming of results:
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
> and provides an async/await API, such as an async iterator.
>
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used.
The corresponding SQLAlchemy dialect identifiers are::
# PostgreSQL protocol on port 5432, using `asyncpg`
crate+asyncpg://crate@localhost:5432/doc
# PostgreSQL protocol on port 5432, using `psycopg`
crate+psycopg://crate@localhost:5432/doc
Synopsis
========
::
# Run CrateDB
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
# Use PostgreSQL protocol, with `asyncpg`
python async_streaming.py asyncpg
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
python async_streaming.py psycopg
# Use with both variants
python async_streaming.py asyncpg psycopg
"""
import asyncio
import sys
import typing as t
from functools import lru_cache
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine
metadata = sa.MetaData()
table = sa.Table(
"t1",
metadata,
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
sa.Column("name", sa.String),
)
class AsynchronousTableStreamingExample:
"""
Demonstrate reading streamed results when using the CrateDB SQLAlchemy
dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
- https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html
"""
def __init__(self, dsn: str):
self.dsn = dsn
@property
@lru_cache
def engine(self):
"""
Provide an SQLAlchemy engine object.
"""
return create_async_engine(self.dsn, echo=True)
async def run(self):
"""
Run the whole recipe.
"""
await self.create_and_insert()
await self.read_buffered()
await self.read_streaming()
async def create_and_insert(self):
"""
Create table schema, completely dropping it upfront, and insert a few records.
"""
# conn is an instance of AsyncConnection
async with self.engine.begin() as conn:
# to support SQLAlchemy DDL methods as well as legacy functions, the
# AsyncConnection.run_sync() awaitable method will pass a "sync"
# version of the AsyncConnection object to any synchronous method,
# where synchronous IO calls will be transparently translated for
# await.
await conn.run_sync(metadata.drop_all, checkfirst=True)
await conn.run_sync(metadata.create_all)
# for normal statement execution, a traditional "await execute()"
# pattern is used.
await conn.execute(
table.insert(),
[{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
)
# CrateDB specifics to flush/synchronize the write operation.
await conn.execute(sa.text("REFRESH TABLE t1;"))
async def read_buffered(self):
"""
Read data from the database, in buffered mode.
"""
async with self.engine.connect() as conn:
# the default result object is the
# sqlalchemy.engine.Result object
result = await conn.execute(table.select())
# the results are buffered so no await call is necessary
# for this case.
print(result.fetchall())
async def read_streaming(self):
"""
Read data from the database, in streaming mode.
"""
async with self.engine.connect() as conn:
# for a streaming result that buffers only segments of the
# result at time, the AsyncConnection.stream() method is used.
# this returns a sqlalchemy.ext.asyncio.AsyncResult object.
async_result = await conn.stream(table.select())
# this object supports async iteration and awaitable
# versions of methods like .all(), fetchmany(), etc.
async for row in async_result:
print(row)
async def run_example(dsn: str):
example = AsynchronousTableStreamingExample(dsn)
# Run a basic conversation.
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
await example.run()
def run_drivers(drivers: t.List[str]):
for driver in drivers:
if driver == "asyncpg":
dsn = "crate+asyncpg://crate@localhost:5432/doc"
elif driver == "psycopg":
dsn = "crate+psycopg://crate@localhost:5432/doc"
else:
raise ValueError(f"Unknown driver: {driver}")
asyncio.run(run_example(dsn))
if __name__ == "__main__":
drivers = sys.argv[1:]
if not drivers:
raise ValueError("Please select driver")
run_drivers(drivers)