$$0

In [35]:
import sqlite3

# python数据库访问

## sqlite3模块

sqlite3 — DB-API 2.0 interface for SQLite databases: https://docs.python.org/3.6/library/sqlite3.html;

sqlite3不需要独立的数据库服务器, 并且可以在连接数据库的同时创建该数据库(如果该数据库未存在);   
可以把sqlite3中的数据库当做一个普通文件来理解;

**第一次简单使用**

在创建连接对象(Connect)的同时创建数据库文件

In [9]:
conn = sqlite3.connect('data/example.db')

创建游标对象(Cursor)

In [10]:
c = conn.cursor()

创建表

In [22]:
# c.execute('''CREATE TABLE stocks
#              (date text, trans text, symbol text, qty real, price real)''')

插入一行数据

In [12]:
c.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")

<sqlite3.Cursor at 0x51775e0>

提交改变

In [13]:
conn.commit()

使用完数据库之后关闭连接, 防止信息丢失

In [14]:
conn.close()

**第二次简单使用**

In [18]:
import sqlite3
conn = sqlite3.connect('data/example.db')
c = conn.cursor()

使用python变量向数据库传递值: SQL语句中的每个`?`都必须由元组中的一个元素替代;

In [27]:
t = ('RHAT',)
c.execute('SELECT * FROM stocks WHERE symbol=?', t)
# 调取一个匹配行
c.fetchone()

('2006-01-05', 'BUY', 'RHAT', 100.0, 35.14)

> 可以把游标看做是一个迭代器;

In [20]:
# 一次传入多个记录
purchases = [('2006-03-28', 'BUY', 'IBM', 1000, 45.00),
             ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00),
             ('2006-04-06', 'SELL', 'IBM', 500, 53.00),]
c.executemany('INSERT INTO stocks VALUES (?,?,?,?,?)', purchases)


<sqlite3.Cursor at 0x5177570>

In [32]:
c.execute("SELECT * FROM stocks WHERE symbol='IBM'")
# 调取所有行
c.fetchall()

[('2006-03-28', 'BUY', 'IBM', 1000.0, 45.0),
 ('2006-04-06', 'SELL', 'IBM', 500.0, 53.0)]

$$1

### 模块函数和常数

 实验: 一个最小化的SQLite壳

In [40]:
import sqlite3

con = sqlite3.connect(":memory:")

con.isolation_level = None
cur = con.cursor()

buffer = ""

print("Enter your SQL commands to execute in sqlite3.")
print("Enter a blank line to exit.")

while True:
    line = input()
    if line == "":
        break
    buffer += line
    if sqlite3.complete_statement(buffer):
        try:
            buffer = buffer.strip()
            cur.execute(buffer)

            if buffer.lstrip().upper().startswith("SELECT"):
                print(cur.fetchall())
        except sqlite3.Error as e:
            print("An error occurred:", e.args[0])
        buffer = ""

con.close()

### 连接对象

**c.create_function**(name, num_params, func):   
* 创建可以在SQL声明(SQL statement)中使用的自定义函数;  
* name: 函数名;  
* num_params: 函数所需的参数个数;  
* func: 普通的Python函数;  

In [49]:
import sqlite3
import hashlib

def md5sum(t):
    return hashlib.md5(t).hexdigest()

con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
cur = con.cursor()
cur.execute("select md5(?)", (b"foo",))
print(cur.fetchone()[0])

acbd18db4cc2f85cedef654fccc4a4d8


**create_aggregate**(name, num_params, aggregate_class):  
* 自定义聚类函数(aggregate function), 通过重复调用step()方法进行聚合, 然后调用finalize()获得最终值;
* aggregate_class: 聚类函数的类名;
* 聚类函数必须包含三个方法:
    * __init__(self): 不带参数的初始化; 
    * step(params): 参数个数由num_params决定;      
    * finalize(self): 获取最终值;      

In [54]:
import sqlite3

class MySum:
    def __init__(self):
        self.count = 0

    def step(self, value):
        self.count += value

    def finalize(self):
        return self.count

con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)") #创建只包含一个列(列名是i)的表test
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print(cur.fetchone()[0])

3


**create_collation**(name, func):  
* 创建一个自定义校验函数(collation);
* func(a,b): a<b(-1), a=b(0), a>b(1);

In [56]:
import sqlite3

def collate_reverse(string1, string2):
    if string1 == string2:
        return 0
    elif string1 < string2:
        return 1
    else:
        return -1

con = sqlite3.connect(":memory:")
con.create_collation("reverse", collate_reverse)

cur = con.cursor()
cur.execute("create table test(x)")
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse")
for row in cur:
    print(row)
con.close()

('b',)
('a',)


row_factory: 

In [58]:
import sqlite3
# 获得结果的游标对象, 原始结果行
def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print(cur.fetchone()["a"])

1


text_factory

In [59]:
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()

AUSTRIA = "\xd6sterreich"

# by default, rows are returned as Unicode
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert row[0] == AUSTRIA

# but we can make sqlite3 always return bytestrings ...
con.text_factory = bytes
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()
assert type(row[0]) is bytes
# the bytestrings will be encoded in UTF-8, unless you stored garbage in the
# database ...
assert row[0] == AUSTRIA.encode("utf-8")

 iterdump(): 

In [None]:
# %%pass # Convert file existing_db.db to SQL dump file dump.sql
# import sqlite3

# con = sqlite3.connect('existing_db.db')
# with open('dump.sql', 'w') as f:
#     for line in con.iterdump():
#         f.write('%s\n' % line)

### 游标对象

execute(sql, parameters);  
问号占位符和命名占位符;

In [60]:
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table people (name_last, age)")

who = "Yeltsin"
age = 72

# This is the qmark style:
cur.execute("insert into people values (?, ?)", (who, age))

# And this is the named style:
cur.execute("select * from people where name_last=:who and age=:age", {"who": who, "age": age})

print(cur.fetchone())

('Yeltsin', 72)


executemany(sql, seq_of_parameters)

In [61]:
import sqlite3

class IterChars:
    def __init__(self):
        self.count = ord('a')

    def __iter__(self):
        return self

    def __next__(self):
        if self.count > ord('z'):
            raise StopIteration
        self.count += 1
        return (chr(self.count - 1),) # this is a 1-tuple

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")

theIter = IterChars()
cur.executemany("insert into characters(c) values (?)", theIter)

cur.execute("select c from characters")
print(cur.fetchall())

[('a',), ('b',), ('c',), ('d',), ('e',), ('f',), ('g',), ('h',), ('i',), ('j',), ('k',), ('l',), ('m',), ('n',), ('o',), ('p',), ('q',), ('r',), ('s',), ('t',), ('u',), ('v',), ('w',), ('x',), ('y',), ('z',)]


上例的生成器版本

In [62]:
import sqlite3
import string

def char_generator():
    for c in string.ascii_lowercase:
        yield (c,)

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("create table characters(c)")

cur.executemany("insert into characters(c) values (?)", char_generator())

cur.execute("select c from characters")
print(cur.fetchall())

[('a',), ('b',), ('c',), ('d',), ('e',), ('f',), ('g',), ('h',), ('i',), ('j',), ('k',), ('l',), ('m',), ('n',), ('o',), ('p',), ('q',), ('r',), ('s',), ('t',), ('u',), ('v',), ('w',), ('x',), ('y',), ('z',)]


executescript(sql_script):一次调用执行多重SQL查询;

In [None]:
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.executescript("""
    create table person(
        firstname,
        lastname,
        age
    );

    create table book(
        title,
        author,
        published
    );

    insert into book(title, author, published)
    values (
        'Dirk Gently''s Holistic Detective Agency',
        'Douglas Adams',
        1987
    );
    """)

connection

In [63]:
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.connection == con

True

### 行对象

In [64]:
conn = sqlite3.connect(":memory:")
c = conn.cursor()
c.execute('''create table stocks
(date text, trans text, symbol text,
 qty real, price real)''')
c.execute("""insert into stocks
          values ('2006-01-05','BUY','RHAT',100,35.14)""")
conn.commit()
c.close()

A Row instance serves as a highly optimized row_factory for Connection objects.

In [65]:
conn.row_factory = sqlite3.Row
c = conn.cursor()
c.execute('select * from stocks')
r = c.fetchone()
type(r)

sqlite3.Row

In [66]:
tuple(r)

('2006-01-05', 'BUY', 'RHAT', 100.0, 35.14)

In [67]:
len(r)

5

In [68]:
r[2]

'RHAT'

In [69]:
r.keys()

['date', 'trans', 'symbol', 'qty', 'price']

In [70]:
r['qty']

100.0

In [71]:
for member in r:
...     print(member)

2006-01-05
BUY
RHAT
100.0
35.14


### 异常

### SQLite和Python类型

#### 简介

#### 使用适配器在SQLite数据库中存储额外的Python类  

To use other Python types with SQLite, you must adapt them to one of the sqlite3 module’s supported types for SQLite: one of NoneType, int, float, str, bytes.

 two ways to enable the sqlite3 module to adapt a custom Python type 

让你的对象适应自己

In [72]:
class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

In [73]:
import sqlite3

class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __conform__(self, protocol):
        if protocol is sqlite3.PrepareProtocol:
            return "%f;%f" % (self.x, self.y)

con = sqlite3.connect(":memory:")
cur = con.cursor()

p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print(cur.fetchone()[0])

4.000000;-3.200000


注册一个可调用的适配器

In [74]:
import sqlite3

class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

def adapt_point(point):
    return "%f;%f" % (point.x, point.y)

sqlite3.register_adapter(Point, adapt_point)

con = sqlite3.connect(":memory:")
cur = con.cursor()

p = Point(4.0, -3.2)
cur.execute("select ?", (p,))
print(cur.fetchone()[0])

4.000000;-3.200000


The sqlite3 module has two default adapters for Python’s built-in datetime.date and datetime.datetime types. 

In [75]:
import sqlite3
import datetime
import time

def adapt_datetime(ts):
    return time.mktime(ts.timetuple())

sqlite3.register_adapter(datetime.datetime, adapt_datetime)

con = sqlite3.connect(":memory:")
cur = con.cursor()

now = datetime.datetime.now()
cur.execute("select ?", (now,))
print(cur.fetchone()[0])

1540195497.0


#### 将SQLite值转换成标准的Python类型

In [76]:
def convert_point(s):
    x, y = map(float, s.split(b";"))
    return Point(x, y)

In [77]:
import sqlite3

class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y

    def __repr__(self):
        return "(%f;%f)" % (self.x, self.y)

def adapt_point(point):
    return ("%f;%f" % (point.x, point.y)).encode('ascii')

def convert_point(s):
    x, y = list(map(float, s.split(b";")))
    return Point(x, y)

# Register the adapter
sqlite3.register_adapter(Point, adapt_point)

# Register the converter
sqlite3.register_converter("point", convert_point)

p = Point(4.0, -3.2)

#########################
# 1) Using declared types
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
cur = con.cursor()
cur.execute("create table test(p point)")

cur.execute("insert into test(p) values (?)", (p,))
cur.execute("select p from test")
print("with declared types:", cur.fetchone()[0])
cur.close()
con.close()

#######################
# 1) Using column names
con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(p)")

cur.execute("insert into test(p) values (?)", (p,))
cur.execute('select p as "p [point]" from test')
print("with column names:", cur.fetchone()[0])
cur.close()
con.close()

with declared types: (4.000000;-3.200000)
with column names: (4.000000;-3.200000)


#### 默认适配器和转换器

 use date/timestamps from Python 

In [83]:
import sqlite3
import datetime

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
cur = con.cursor()
cur.execute("create table test(d date, ts timestamp)")

today = datetime.date.today()
now = datetime.datetime.now()

cur.execute("insert into test(d, ts) values (?, ?)", (today, now))
cur.execute("select d, ts from test")
row = cur.fetchone()
print(today, "=>", row[0], type(row[0]))
print(now, "=>", row[1], type(row[1]))

cur.execute('select current_date as "d [date]", current_timestamp as "ts [timestamp]"')
row = cur.fetchone()
print("current_date", row[0], type(row[0]))
print("current_timestamp", row[1], type(row[1]))

ValueError: not enough values to unpack (expected 2, got 1)

### 控制事物

### 更有效的使用sqlite3

This way, you can execute a SELECT statement and iterate over it directly using only a single call on the Connection object.

#### 使用快捷方法

Rows wrapped with this class can be accessed both by index (like tuples) and case-insensitively by name:

In [80]:
import sqlite3

persons = [
    ("Hugo", "Boss"),
    ("Calvin", "Klein")
    ]

con = sqlite3.connect(":memory:")

# Create the table
con.execute("create table person(firstname, lastname)")

# Fill the table
con.executemany("insert into person(firstname, lastname) values (?, ?)", persons)

# Print the table contents
for row in con.execute("select firstname, lastname from person"):
    print(row)

print("I just deleted", con.execute("delete from person").rowcount, "rows")

('Hugo', 'Boss')
('Calvin', 'Klein')
I just deleted 2 rows


#### 通过名字访问列而不是索引

In [84]:
import sqlite3

con = sqlite3.connect(":memory:")
con.row_factory = sqlite3.Row

cur = con.cursor()
cur.execute("select 'John' as name, 42 as age")
for row in cur:
    assert row[0] == row["name"]
    assert row["name"] == row["nAmE"]
    assert row[1] == row["age"]
    assert row[1] == row["AgE"]

#### 将连接放入上下文管理器

In [85]:
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table person (id integer primary key, firstname varchar unique)")

# Successful, con.commit() is called automatically afterwards
with con:
    con.execute("insert into person(firstname) values (?)", ("Joe",))

# con.rollback() is called after the with block finishes with an exception, the
# exception is still raised and must be caught
try:
    with con:
        con.execute("insert into person(firstname) values (?)", ("Joe",))
except sqlite3.IntegrityError:
    print("couldn't add Joe twice")

couldn't add Joe twice


#### 常见问题

多线程