# 使用 PyMySQL 操作 MySQL

## 安装

[PyMySQL](https://github.com/PyMySQL/PyMySQL) 是一个纯 Python 实现的 MySQL 客户端操作库，支持事务、存储过程、批量执行等。

PyMySQL 遵循 Python 数据库 API v2.0 规范，并包含了 pure-Python MySQL 客户端库。

In [None]:
!pip install PyMySQL

## 基本命令

### 数据库连接

In [1]:
import pymysql

try:
    conn = pymysql.connect(host='localhost',
                           port=3306,
                           user='root',
                           password='root',
                           db='myemployees',
                           charset='utf8')
except pymysql.Error as e:
    print("Error %d: %s" % (e.args[0],e.args[1]))

更多参数及其含义请见：https://pymysql.readthedocs.io/en/latest/modules/connections.html

### 数据库操作

- cursor.execute(sql, args) 执行单条 SQL

In [2]:
# 使用 cursor() 方法获取操作游标
cursor = conn.cursor()

# 使用 execute 方法执行 SQL 语句
cursor.execute('SHOW DATABASES;')

# 使用 fetchall()、fetchone() 方法获取数据
rest = cursor.fetchall()

print(rest)

(('demo',), ('information_schema',), ('myemployees',), ('mysql',), ('performance_schema',), ('sys',))


- executemany(sql, args) 批量执行 SQL

## 操作案例

### 查询数据-DQL(Data Query Language)

- **数据查询 - 核心命令**

```mysql
cursor.execute('SELECT * FROM `users`')  # 执行查询 SQL
cursor.fetchone()  # 获取单条数据
cursor.fetchmany(3) # 获取前N条数据
cursor.fetchall()  # 获取所有数据
```

- **数据查询 - SQL语句**

```mysql
SELECT 
    selection_list /*要查询的列名称*/
FROM 
    table_list /*要查询的表名称*/
WHERE 
    row_condition /*行条件*/
GROUP BY 
    grouping_columns /*对结果分组*/
HAVING 
    group_row_condition /*分组后的行条件*/
ORDER BY
    sorting_columns /*对结果排序*/
LIMIT 
    offset_start, row_count /*结果限定*/

```

In [3]:
import pymysql

class MysqlSearch(object):
    
    def __init__(self):
        self.get_conn()
    
    def get_conn(self):
        try:
            self.conn = pymysql.connect(
                host='localhost',
                port=3306,
                user='root',
                password='root',
                db='myemployees',
                charset='utf8'
            )
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
            
    def close_conn(self):
        try:
            if self.conn:
                self.conn.close()
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
            
    def get_one(self):
        '''
        单条查询：查询单条数据
        '''
        # 准备 SQL
        sql = 'SELECT * FROM `employees` WHERE `salary` > %s ;'
        # 执行 SQL
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, (15000,))
            rest = dict(zip([k[0] for k in cursor.description], cursor.fetchone()))
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
        # 关闭连接
        cursor.close()
        self.close_conn()
        # 返回数据
        return rest
        
    def get_page(self, page, page_size):
        '''
        分页查询：查询第 page 页
        '''
        # 准备 SQL
        offset = (page - 1) * page_size
        sql = 'SELECT * FROM `employees` WHERE `salary` > %s LIMIT %s,%s;'
        # 执行 SQL
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, (10000, offset, page_size))
            rest = [dict(zip([k[0] for k in cursor.description], row)) 
                    for row in cursor.fetchall()]
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
        # 关闭连接
        cursor.close()
        self.close_conn()
        return rest
        
def main():
    obj = MysqlSearch()
    #rest = obj.get_one()
    #print(rest)
    rest = obj.get_page(1, 10)
    for item in rest:
        print(item)
        print('---------')

if __name__ == '__main__':
    main()

{'employee_id': 100, 'first_name': 'Steven', 'last_name': 'K_ing', 'email': 'SKING', 'phone_number': '515.123.4567', 'job_id': 'AD_PRES', 'salary': 24000.0, 'commission_pct': None, 'manager_id': None, 'department_id': 90, 'hiredate': datetime.datetime(1992, 4, 3, 0, 0)}
---------
{'employee_id': 101, 'first_name': 'Neena', 'last_name': 'Kochhar', 'email': 'NKOCHHAR', 'phone_number': '515.123.4568', 'job_id': 'AD_VP', 'salary': 17000.0, 'commission_pct': None, 'manager_id': 100, 'department_id': 90, 'hiredate': datetime.datetime(1992, 4, 3, 0, 0)}
---------
{'employee_id': 102, 'first_name': 'Lex', 'last_name': 'De Haan', 'email': 'LDEHAAN', 'phone_number': '515.123.4569', 'job_id': 'AD_VP', 'salary': 17000.0, 'commission_pct': None, 'manager_id': 100, 'department_id': 90, 'hiredate': datetime.datetime(1992, 4, 3, 0, 0)}
---------
{'employee_id': 108, 'first_name': 'Nancy', 'last_name': 'Greenberg', 'email': 'NGREENBE', 'phone_number': '515.124.4569', 'job_id': 'FI_MGR', 'salary': 12000

### 增删改数据-DML(Data Manipulation Language)

- **事务处理 - 核心命令**

```mysql
connection.begin()    # 开启事务
connection.commit()   # 提交修改
connection.rollback() # 回滚事务
```

- **增删改数据 - SQL命令**

```mysql
# 插入数据
INSERT INTO 表名(列名1,列名2, …) VALUES(值1, 值2) /*所有字符串数据必须使用单引号*/
# 修改数据
UPDATE 表名 SET 列名1=值1, … 列名n=值n [WHERE 条件] /*单表修改语法*/
UPDATE 表1 [inner] john 表2 on 表 SET 列名1=新值1,列名2=新值2 [where 筛选条件] /*多表修改语法*/
# 删除数据
DELETE FROM 表名 [WHERE 条件] /*单表删除语法*/
DELETE FROM 表1 别名1 INNER JOIN 表2 别名2 on 连接条件 [AND 筛选条件] /*多表删除语法*/
```

In [4]:
import pymysql

class MysqlOperate(object):
    
    def __init__(self):
        self.get_conn()
    
    def get_conn(self):
        try:
            self.conn = pymysql.connect(
                host='localhost',
                port=3306,
                user='root',
                password='root',
                db='myemployees',
                charset='utf8'
            )
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
            
    def close_conn(self):
        try:
            if self.conn:
                self.conn.close()
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
            
    def add_one(self):
        '''
        增加一条数据
        '''
        try:
            # 准备 SQL
            sql = (
                "INSERT INTO `jobs`(`job_id`,`job_title`,`min_salary`,`max_salary`) VALUE"
                "(%s, %s, %s, %s);"
            )
            # 执行 SQL，提交数据到数据库
            cursor = self.conn.cursor()
            cursor.execute(sql, ('hhd','ddd',4444,5555))
            cursor.execute(sql, ('hhf','ddd',4444,5555))
            # 提交事务
            self.conn.commit()
            # 关闭连接
            cursor.close()
        except pymysql.Error as e:
            print("Error %d: %s" % (e.args[0],e.args[1]))
            # self.conn.commit() # 提交错误发生前的命令，错误命令及其之后的命令不提交
            self.conn.rollback() # 有一条命令错误，则全部命令不提交
        self.close_conn()
        
def main():
    obj = MysqlOperate()
    obj.add_one()

if __name__ == '__main__':
    main()

# 使用 SQLAlchemy 操作 MySQL

## ORM 简介

面向对象编程和关系型数据库，都是目前最流行的技术，但是它们的模型是不一样的。

面向对象编程把所有实体看成对象（object），关系型数据库则是采用实体之间的关系（relation）连接数据。很早就有人提出，关系也可以用对象表达，这样的话，就能使用面向对象编程，来操作关系型数据库。

**简单说，ORM 就是通过实例对象的语法，完成关系型数据库的操作的技术，是"对象-关系映射"（Object/Relational Mapping） 的缩写。**

ORM 把数据库映射成对象。

- 数据库的表（table） --> 类（class）
- 记录（record，行数据）--> 对象（object）
- 字段（field）--> 对象的属性（attribute）

## 安装

In [None]:
!pip install sqlalchemy

## 基本命令

[SQLAlchemy 1.3 文档教程](https://docs.sqlalchemy.org/en/13/orm/tutorial.html?highlight=tutorial)

|概念|对应数据库概念|说明|
|:---|:---|:---|
|Engine|连接||
|Session|连接池、事务|由此开始查询|
|Model|表|类定义和表定义类似，类实例本质上是其中一行|
|Colume|列|在各个地方支持运算符操作|
|Query|若干行|可链式操作添加条件|

### 连接数据库

`create_engine()` 函数中的数据库 urls 用于和数据 API 进行交流，根据配置文件的不同调用不同的数据库 API，从而实现对数据库的操作，如：

**数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名**

- MySQL-Python

```md
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
```

- pymysql

```md
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
```

- MySQL-Connector

```md
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
```

更多详见：https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls

In [5]:
from sqlalchemy import create_engine

engine = create_engine('mysql+pymysql://root:root@localhost/myemployees')

- `create_engine()` 会返回一个数据库引擎
- `mysql+mysqldb` 指定使用 MySQL-Python 来连接
- 使用用户名 `root` 和密码 `root` 来连接数据库
- 数据库链接地址此处是 `localhost`，127.0.0.1
- `myemployees` 是数据库名

### 声明表结构

[数据类型声明 - 官方文档](https://docs.sqlalchemy.org/en/13/core/type_basics.html)

In [6]:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# 声明基类
ModelBase = declarative_base()

# 类定义 == 表定义
class jobs(ModelBase):
    
    __tablename__ = 'jobs'
    
    job_id = Column(String(10), primary_key=True)
    job_title = Column(String(35), nullable=False)
    min_salary = Column(Integer, nullable=False)
    max_salary = Column(Integer, nullable=False)

- **Model** 等同于数据库的一张表
- **Column** 显然就是这张表的一列

## 操作案例

### 增

- session.add()
- session.commit()

In [10]:
import warnings
warnings.filterwarnings("ignore")

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# 创建连接
engine = create_engine('mysql+pymysql://root:root@localhost:3306/myemployees')


# 类定义 == 表定义

ModelBase = declarative_base()  # 声明基类

class jobs(ModelBase):
    
    __tablename__ = 'jobs'
    
    job_id = Column(String(10), primary_key=True)
    job_title = Column(String(35), nullable=False)
    min_salary = Column(Integer, nullable=False)
    max_salary = Column(Integer, nullable=False)


# 事务 == SQL 操作

Session = sessionmaker(bind=engine)

class OrmSession(object):
    
    def __init__(self):
        self.session = Session()
        
    def add_one(self):
        '''新增一条记录''' 
        jobs_obj = jobs(
            job_id="Hhn", 
            job_title="好人", 
            min_salary=4444, 
            max_salary=5555
        )
        self.session.add(jobs_obj)
        self.session.commit()
        
    def add_more(self):
        '''新增多条记录''' 
        jobs_obj = jobs(
            job_id="Hhn", 
            job_title="好人", 
            min_salary=4444, 
            max_salary=5555
        )
        self.session.add_all([
            jobs(job_id="Hhn", job_title="好人", min_salary=4444, max_salary=5555),
            jobs(job_id="Hhm", job_title="好人", min_salary=4444, max_salary=5555),
            jobs(job_id="Hhd", job_title="好人", min_salary=4444, max_salary=5555)
        ])
        self.session.commit()

    
def main():
    obj = OrmSession()
    #rest = obj.add_one()
    #print(rest.job_id)
    obj.add_more()

    
if __name__ == '__main__':
    main()

### 查

- session.query(table_name).get/filter/filter_by/order_by()

In [18]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# 创建连接
engine = create_engine('mysql+pymysql://root:root@localhost:3306/myemployees')


# 类定义 == 表定义

ModelBase = declarative_base()  # 声明基类

class jobs(ModelBase):
    
    __tablename__ = 'jobs'
    
    job_id = Column(String(10), primary_key=True)
    job_title = Column(String(35), nullable=False)
    min_salary = Column(Integer, nullable=False)
    max_salary = Column(Integer, nullable=False)


# 事务 == SQL 操作

Session = sessionmaker(bind=engine)

class OrmSession(object):
    
    def __init__(self):
        self.session = Session()
        
    def get_one(self):
        '''查询一条记录''' 
        return self.session.query(jobs).get("Hhn") # 只适用于主键

    def get_more(self):
        '''查询多条记录，更多命令见官方教程 https://docs.sqlalchemy.org/en/13/orm/tutorial.html?highlight=tutorial''' 
        return self.session.query(jobs).filter_by(min_salary=4444)


def main():
    obj = OrmSession()
    #rest = obj.get_one()
    #if rest:
    #    print('ID: {0} => {1}'.format(rest.job_id, rest.job_title))
    #else:
    #    print("Not exist.")
    rest = obj.get_more()
    if rest:
        print(rest.count())
        for jobs_obj in rest:
            print('ID: {0} => {1}'.format(jobs_obj.job_id, jobs_obj.job_title))
    else:
        print("Not exist.")
        
    
if __name__ == '__main__':
    main()

3
ID: Hhd => 好人
ID: Hhm => 好人
ID: Hhn => 好人


### 改

In [23]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# 创建连接
engine = create_engine('mysql+pymysql://root:root@localhost:3306/myemployees')


# 类定义 == 表定义

ModelBase = declarative_base()  # 声明基类

class jobs(ModelBase):
    
    __tablename__ = 'jobs'
    
    job_id = Column(String(10), primary_key=True)
    job_title = Column(String(35), nullable=False)
    min_salary = Column(Integer, nullable=False)
    max_salary = Column(Integer, nullable=False)


# 事务 == SQL 操作

Session = sessionmaker(bind=engine)

class OrmSession(object):
    
    def __init__(self):
        self.session = Session()
        
    def update_data(self):
        '''修改数据'''
        data_list = self.session.query(jobs).filter_by(min_salary=4444)
        for item in data_list:
            item.job_title = "test"
            self.session.add(item)
        self.session.commit()


def main():
    obj = OrmSession()
    obj.update_data()
        
    
if __name__ == '__main__':
    main()

### 删

- session.delete()  
- session.commit()

In [25]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

# 创建连接
engine = create_engine('mysql+pymysql://root:root@localhost:3306/myemployees')


# 类定义 == 表定义

ModelBase = declarative_base()  # 声明基类

class jobs(ModelBase):
    
    __tablename__ = 'jobs'
    
    job_id = Column(String(10), primary_key=True)
    job_title = Column(String(35), nullable=False)
    min_salary = Column(Integer, nullable=False)
    max_salary = Column(Integer, nullable=False)


# 事务 == SQL 操作

Session = sessionmaker(bind=engine)

class OrmSession(object):
    
    def __init__(self):
        self.session = Session()
        
    def delete_data(self):
        '''删除数据'''
        data_list = self.session.query(jobs).filter_by(min_salary=4444)
        for item in data_list:
            self.session.delete(item)
        self.session.commit()


def main():
    obj = OrmSession()
    obj.delete_data()
        
    
if __name__ == '__main__':
    main()