Skip to content

Helo 快速上手指南

at7h edited this page Oct 6, 2020 · 21 revisions

Helo 是一个简单的小型低级别异步(asyncio) Python ORM。它非常直白,非常容易上手使用。

Helo 可以在你的异步应用中帮助你轻松的构建出富有表达力的常用 SQL 语句,你只需以友好的对象化 API 来操作数据,而不用关心 SQL 语句编写、数据处理等细节。

  • 支持的版本: Python 3.7+
  • 目前仅支持 MySQL, 版本 5.7+
  • 与 web 框架的结合,目前支持:
  • 目前不支持表关系操作

请注意:使用异步 ORM 并不完全意味着可以使你的应用变快,而且有可能会使你的应用变得复杂。继续之前,你可以先阅读下 SQLAlchemy 的作者 Mike Bayer 的 这篇博客文章

本篇上手指南主要介绍以下方面:

开始使用

使用 helo, 首先你需要引入 helo 并使用 helo.G 实例化一个全局变量,假定称其为 db:

当然,提前是你已经获取安装了 helo。

import helo

db = helo.G()

db 是一个全局单例对象,下面的介绍中我们将多次使用到它。

模型声明

使用 helo 声明模型很简单,只需从 helo.Model 继承即可,下面给出几个简单的模型声明的例子:

class Person(helo.Model):
    id = helo.BigAuto()
    name = helo.VarChar(length=45, null=False)

class User(Person):
    email = helo.Email(default='')
    password = helo.VarChar(length=100, null=False)
    create_at = helo.Timestamp(default=helo.ON_CREATE)

    class Meta:
        indexes = [helo.K('idx_ep', ['email', 'password'])]

class Employee(Person):
    department = helo.Smallint()
    salary = helo.Float(default=0)

class Post(helo.Model):
    id = helo.Auto(comment='auto increment pk')
    title = helo.VarChar(length=100)
    content = helo.Text(encoding=helo.ENCODING.UTF8MB4)
    author = helo.Int(default=0)
    create_at = helo.Timestamp(default=helo.ON_CREATE)
    update_at = helo.Timestamp(default=helo.ON_UPDATE)

    class Meta:
        indexes = [
            helo.K('idx_title', 'title'),
            helo.K('idx_author', 'author'),
        ]

内部类 Meta 可用于指定 db_name, table_name, engine, indexes, charset, comment 等 Table 的元属性。

class Meta:
    db = 'db_name'
    name = 'table_name'
    engine = helo.ENGINE.innodb
    charset = helo.ENCODING.utf8mb4
    indexes = []
    comment = 'table comment'

其中 table_name 默认为 model 类名的 snake_case 风格名称,engine 默认为 InnoDBcharset 默认为 utf8mb4

建立连接

前面的模型声明只是定义了模型与真实表结构的映射关系,并非实际在数据库中创建了这些表结构。为此,我们需要先使用 helo 来与数据库建立连接,这里我们创建一个 MySQL 的数据库实例:

>>> await db.bind('mysql://user:pwd@localhost:3306/helo')

或者传递配置参数:

>>> await db.bind(user='user', password='pwd', db='helo')

如果你设置了环境变量 HELO_DATABASE_URL,那么你不用再传递 url:

>>> await db.bind()

如果你想自定义 KEY 的值,可以在初始化 db 时通过 env_key 参数来设置

 db = helo.G(env_key="YOUR_ENV_KEY")

bind 实际上为我们创建了一个数据库连接池:

>>> db.state
{'minsize': 1, 'maxsize': 15, 'size': 1, 'freesize': 1}

bind 给我们提供了很多关键字参数来允许我们自定义设置,详见 helo.db.Pool 类。例如:

>>> await db.bind('mysql://user:pwd@127.0.0.1:3306/db', maxsize=10, connect_timeout=15)

已经创建的连接池对象将是一个全局的单例对象,也就是说如果你已经为你的应用程序调用 bind 绑定了数据库,在此之前如果你没有使用 unbind 进行解绑,你将不能再继续使用 bind 再次绑定另一个数据库,否则你将会得到一个 helo.err.DuplicateBinding 错误。

如果你需要显式地断开与数据库的连接,关闭连接池,可以使用 unbind:

>>> await db.unbind()

在小型的脚本中你可以使用 db.binder 来自动处理上下文:

>>> async with db.binder():
...     pass

数据操作

与数据库建立了连接之后,我们需要在数据库创建我们的表,以便于接下来进行数据的操作。

在真正的应用中,数据库表的设计创建与维护是单独分开,一般由专门的 DBA 来管理,当然 helo 也提供了基础的 DDL 支持。

下面我们在数据库创建它们:

>>> await db.create_tables([User, Employee, Post])

在应用项目中,我们通常将所有的模型声明单独放在一个模块中,在此假设模块名为 models,则可以使用 create_all 为模块中所有的 model 创建表:

>>> from your.application import models
>>> await db.create_all(models)

当然你也可以使用 Modelcreate 方法来单独创建:

>>> await User.create()

Helo 提供了基本的操作数据库中数据的能力,支持丰富的可组合的逻辑运算表达,你可以轻松的完成富有表达力的 queries,以实现通过对象化的 API 来构建你想要的 SQL 语句(DML 和 DQL)的能力。

下面示例基本的增删改查的操作。

使用 helo 你可以有多种插入数据的方式选择,我们从创建一个 User 对象开始:

user = User(name='at7h', password='1111')
print(user.name, user.password)  # at7h, 1111
# Now user.id is None, because it is not saved to the database
assert user.id is None

此时的 user 仅是内存中的一个对象,你需要通过 save 方法持久化到数据库中:

user_id = await user.save()
assert user_id == user.id == 1

我们可以修改它,并保存更改:

user.name = 'at8h'
user.email = 'g@at7h.com'
user_id = await user.save()
assert user_id == user.id == 1

请注意: 目前 save 操作是通过 MySQL REPLACE 语句实现,其根据对象的 PRIMARY KEY 属性或 UNIQUE KEY 属性的值来决定是否插入新行,请谨慎使用!该实现计划在后续版本中优化。

推荐使用下面几种方式来插入数据。

方法 add, madd 可以用来添加单条或多条数据,它们是 insert, minsert 的简单快捷方式:

user_id = await User.add(name='bobo', password='2222')
# Or: user_id = await User.add({'name': 'bobo', 'password': '2222'})
print(user_id)  # 2

users = [{'name': 'mingz', 'password': '3333'},
         {'name': 'xy69z', 'password': '4444'}]
# Or using user object list:
# users = [User(name='mingz', password='3333'),
#          User(name='xy69z', password='4444')]
count = await User.madd(users)
print(count)  # 2

方法 insertminsert 是最正确的数据插入姿势,它们可以胜任多种数据形式,它们将返回一个 Insert 对象,要执行此操作,请不要忘了写 do() 哦 😉:

ret = await User.insert(name='poper', password='5555').do()
# Or: ret = await User.insert({'name': 'bingo', 'password': '8888'}).do()
assert ret.affected == 1
assert ret.last_id == 5
print(ret)  # (1, 5)

# Inserting multiple
employees = [
    {'name': 'at7h', 'department': 1},
    {'name': 'bobo', 'department': 2},
]
ret = await Employee.minsert(employees).do()
print(ret)  # (2, 1)

# Specify row tuples columns the tuple values correspond to
posts = [
    ('post1', 1),
    ('post2', 2),
]
ret = await Post.minsert(
    posts, columns=[Post.title, Post.author]
).do()
print(ret)  # (2, 1)

使用 insert_from 支持表间数据填充:

select = User.select(User.name).where(User.id.in_([3, 4, 5]))
ret = await Employee.insert_from(select, [Employee.name]).do()
print(ret)  # (3, 3)

Helo 也有多种获取数据的方式选择,如简单获取单条数据可以使用 get 方法:

# By id
user = await User.get(1)
assert isinstance(user, User)
print(user.id, user.name, user.password)  # 1, at7h, 1111

# Or by query
assert (await User.get(User.name == user.name)) == user

获取多条数据可以使用 mget 方法:

# By id list
uid_list = [1, 2, 3]
users = await User.mget(uid_list)
print(users.count)  # 3
print(users)  # [<User object at 1>, <User object at 2>, <User object at 3>]
# Specify columns
users = await User.mget(uid_list, columns=[User.id, User.name])
assert users[0].password is None

# Or by query
users = await User.mget((User.id < 2) | (User.name == 'mingz'))
print(users)  # [<User object at 1>, <User object at 3>]

同样的,方法 getmget 也是 select 的简单快捷版本,其只适合于已知主键或查询条件比较简单的场景,更多的时候我们还是需要使用 select

使用 select 方法可以帮助你以对象化 API 的方式轻松的构造你的 DQL,其支持丰富的可组合的逻辑条件表达式。

users = await User.select().order_by(
    User.id.desc()
).limit(3).offset(2).all()
print(users) # [<User object at 5>, <User object at 4>, <User object at 3>]

方法 all() 以及下面提到的 get(), first(), rows(), paginate() 等方法类似于上面提到的 do(),都用于驱动执行此次查询,不要忘了哦。

比如我需要知道有没有使用 gmail 邮箱的用户:

is_exist = await User.select().where(
    User.email.endswith('gmail.com')
).exist()
print(is_exist)  # False

比如我想知道 2019 年 7 月以来共新增了多少用户:

user_count = await User.select().where(
    User.create_at > datetime(2019, 7, 1)
).count()
print(user_count)  # 4

再比如我们需要分页的获取今年写了 Python(title) 相关文章的用户:

users = await User.select().where(
    User.id.in_(
        Post.select(Post.author).where(
            Post.update_at > datetime(2019, 1, 1),
            Post.title.contains('Python')
        ).order_by(
            Post.update_at.desc()
        )
    )
).paginate(1, 10)
print(users)  # [<User object at 1>]

再比如我们想知道每个用户都写了多少篇文章:

user_posts = await User.select(
    User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
).join(
    Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
).group_by(
    User.name
).rows(100)
print(user_posts)  # [{'name': 'at7h', 'posts': 1}]

如上所示,我们可以通过 helo.F 来使用 SQL 函数,比如我需要计算出每个月所有雇员薪资的总和:

salary_sum = await Employee.select(
    helo.F.SUM(Employee.salary).as_('salary_sum')
).scalar()
print(salary_sum)  # 30000.0

接下来,让我们尝试对数据库中的数据做一些修改操作。

比如你要更改某一位雇员的薪资 😋 :

ret = await Employee.update(salary=20000).where(
    Employee.name == 'at7h'
).do()
print(ret.affected)  # 1

或者,整体涨工资啦 👏:

ret = await Employee.update(
    salary=Employee.salary + 1000
).where(
    (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
).do()

最后我们来尝试删除表中的数据。

第一种方式,你可以使用 model 对象的 remove 方法来删除它对应于数据库中这一行的数据:

user = User(name='at7h', password='1111')
await user.save()
user = await User.get(user_id)
print(user.id)  # 1
await user.remove()
user = await User.get(user_id)
print(user)  # None

另一种更为通常的方式是使用 delete 方法:

ret = await Post.delete().where(
    Post.create_at < datetime(2010, 1, 1)
).limit(
    100
).do()

请注意: 永远不要忘记写 where 子句,是不是整个表都不想要了 😟 ?

Replace

另外,helo 支持 MySQL REPLACE 语句,提供了 replacemreplace 两个方法,其用法与 insertminsert 类似。当然,在使用它们之前你需要了解 MySQL REPLACE 语句的工作原理。

Quart 应用

如果你正在使用 quart, 一个最小的应用示例是:

import quart
import helo

app = quart.Quart(__name__)
app.config["HELO_DATABASE_URL"] = "mysql://user:password@127.0.0.1:3306/db"

db = helo.G(app)


@app.route('/api/users')
async def users():
    await User.insert(
        name='at7h', email='g@test.com', password='xxxx'
    ).do()
    user_list = await User.select().all(False)
    return quart.jsonify(user_list)


app.run()

此时你不需要再显示的执行 db.bind,binding 操作将会在你应用的第一个请求之前自动完成。

启动此服务:

$ curl http://127.0.0.1:5000/api/users
[{"email":"g@test.com","id":1,"name":"at7h","password":"xxxx"}]

其他

Model Iteration

Helo 中的 ModelSelect 都支持迭代,helo 会自动帮你处理分页问题,以避免频繁的 IO 操作和过大的数据量获取。

async for post in Post:
    print(post)
# <Post object at 1>
# <Post object at 2>
# <Post object at 3>
# <Post object at 4>

users = User.select().where(User.id < 5).order_by(User.id.desc())
async for user in users:
    print(user)
# <User object at 4>
# <User object at 3>
# <User object at 2>
# <User object at 1>

Row Type

当你使用 select 获取数据时,helo 默认会将行数据包装成为对应的 Model 对象,但是,当你使用了 helo.F 函数和 join 时可能会放弃加载到 Model 对象而使用原始的 helo.adict 字典。当然,你也可以通过 wrap 参数来显式指定使用字典类型的 row type。在大型项目中,这可能会显著提高速度并减少内存的使用。

users = await User.select(User.id, User.name).limit(2).all(wrap=False)
print(users)  # [{'id': 1, 'name': 'at7h'}, {'id': 2, 'name': 'bobo'}]
assert users[0].name == 'at7h'

employee = await Employee.select().order_by(
    Employee.salary.desc()
).first(False)
print(employee)
# {'id': 1, 'name': 'at7h', 'department': 1, 'salary': 15000.0}

SQL

执行 SQL

有时你可能迫不得已想要执行一些原始的 SQL 语句,那么你可以使用 db.raw 函数来实现。

await db.raw("SELECT * FROM `user` WHERE `id` < %s;", params=[10])

查看 SQL

为方便调试,有时候需要查看执行的 SQL,在 helo 中,你可以:

第一种方式,在初始化 db 时,设置 debugTrue 即可,这样你将在日志输出中看到执行过的所有 SQL 语句。

db = helo.G(debug=True)

第二种方式主要是方便学习和调试,你可以使用 repr 函数(或在 REPR 环境中)和 str 函数来查看 Insert, Update, Select 等对象,我们拿上面的示例来举个例子:

>>> q1 = Employee.update(
...     salary=Employee.salary + 1000
... ).where(
...    (Employee.department.in_([1, 2])) | (Employee.name == 'at7h')
... )
>>> q1
Query(UPDATE `employee` SET `salary` = (`salary` + %s) WHERE ((`department` IN %s) OR (`name` = %s)); % ((1000.0,), (1, 2), 'at7h'))

>>> q2 = User.select(
...     User.name, helo.F.COUNT(helo.SQL('1')).as_('posts')
... ).join(
...     Post, helo.JOINTYPE.LEFT, on=(User.id == Post.author)
... ).group_by(
...     User.name
... )
>>> print(q2)
SELECT `t1`.`name`, COUNT(1) AS `posts` FROM `user` AS `t1` LEFT JOIN `post` AS `t2` ON (`t1`.`id` = `t2`.`author`) GROUP BY `t1`.`name`; % ()

👋

至此,本篇上手指南就到此结束,感兴趣的同学可以关注后续的使用文档。

十分欢迎大家的使用,问题可随时与我交流,欢迎以任何形式提出任何问题或建议。

感谢 🤝