Skip to content

Commit

Permalink
add python examples
Browse files Browse the repository at this point in the history
Signed-off-by: zhu733756 <zhuhan@cestc.cn>
  • Loading branch information
zhu733756 committed May 17, 2022
1 parent 041a9e1 commit 6239a39
Show file tree
Hide file tree
Showing 15 changed files with 545 additions and 1 deletion.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -6,4 +6,7 @@ dist/
**/.mvn/wrapper/maven-wrapper.jar
**/.mvn/wrapper/MavenWrapperDownloader.java
**/src/main/**/target/
**/src/test/**/target/
**/src/test/**/target/

__pycache__
.vscode
69 changes: 69 additions & 0 deletions samples/python/README.md
@@ -0,0 +1,69 @@
# dbpack-samples

### Step0: Clone dbpack
```shell
git clone git@github.com:cectc/dbpack.git
cd dbpack
```

### Step1: Setup etcd

### Step2: Setup mysql, initialize the database with the following sql script
```
./samples/go/scripts/order.sql
./samples/go/scripts/product.sql
```

### Step3: run dbpack
```bash
make build-local

vim ./samples/python/config1.yaml
# update distributed_transaction.etcd_config.endpoints

vim ./samples/python/config2.yaml
# update data_source_cluster.dsn
# update distributed_transaction.etcd_config.endpoints

vim ./samples/python/config3.yaml
# update data_source_cluster.dsn
# update distributed_transaction.etcd_config.endpoints

./dist/dbpack start --config samples/python/config1.yaml

./dist/dbpack start --config samples/python/config2.yaml

./dist/dbpack start --config samples/python/config3.yaml
```

### Step4: setup python requirements
```bash
cd samples/python
pip3 install -r requirements.txt
```

### Step5: setup aggregation client
```bash
cd samples/python/aggregation

python3 app.py
```

### Step6: setup order client
```bash
cd samples/python/order

python3 app.py
```

### Step7: setup product client
```bash
cd samples/python/product

python3 app.py
```

### Step8: access and test
```
curl -XPOST http://localhost:13000/v1/order/create
```
Empty file.
66 changes: 66 additions & 0 deletions samples/python/aggregation/app.py
@@ -0,0 +1,66 @@
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

@app.route('/v1/order/create', methods=['POST'])
def create_1():
return create_so(rollback=False)

@app.route('/v1/order/create2', methods=['POST'])
def create_2():
return create_so(rollback=True)

def create_so(rollback=True):
xid = request.headers.get("x_dbpack_xid")

so_items = [dict(
product_sysno=1,
product_name="apple iphone 13",
original_price=6799,
cost_price=6799,
deal_price=6799,
quantity=2,
)]

so_master = [dict(
buyer_user_sysno = 10001,
seller_company_code = "SC001",
receive_division_sysno = 110105,
receive_address = "beijing",
receive_zip = "000001",
receive_contact = "scott",
receive_contact_phone = "18728828296",
stock_sysno = 1,
payment_type = 1,
so_amt = 6999 * 2,
status = 10,
appid = "dk-order",
so_items = so_items,
)]

success = (jsonify(dict(success=True, message="success")), 200)
failed = (jsonify(dict(success=False, message="failed")), 400)
headers = {
"Content-Type": "application/json",
"xid": xid
}

so_req = dict(req=so_master)
resp1 = requests.post("http://localhost:3001/createSo", headers=headers, json=so_req, timeout=30)
if resp1.status_code == 400:
return failed

ivt_req = dict(req=[dict(product_sysno= 1, qty=2)])
resp2 = requests.post("http://localhost:3002/allocateInventory", headers=headers, json=ivt_req, timeout=30)
if resp2.status_code == 400:
return failed

if rollback:
print("rollback")
return failed

return success

if __name__ == "__main__":
app.run(port=3000)
27 changes: 27 additions & 0 deletions samples/python/config1.yaml
@@ -0,0 +1,27 @@
listeners:
- protocol_type: http
socket_address:
address: 0.0.0.0
port: 13000
config:
backend_host: localhost:3000
filters:
- HttpDistributedTransaction

filters:
- name: HttpDistributedTransaction
conf:
appid: aggregationSvc
transaction_infos:
- request_path: "/v1/order/create"
timeout: 60000
- request_path: "/v1/order/create2"
timeout: 60000

distributed_transaction:
appid: aggregationSvc
retry_dead_threshold: 130000
rollback_retry_timeout_unlock_enable: true
etcd_config:
endpoints:
- localhost:2379
43 changes: 43 additions & 0 deletions samples/python/config2.yaml
@@ -0,0 +1,43 @@
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13307
config:
users:
dksl: "123456"
server_version: "8.0.27"
executor: redirect

executors:
- name: redirect
mode: sdb
config:
data_source_ref: product

filters:
- name: MysqlDistributedTransaction
conf:
appid: productSvc
lock_retry_interval: 100ms
lock_retry_times: 15

data_source_cluster:
- name: product
capacity: 30
max_capacity: 100
idle_timeout: 60s
dsn: root:123456@tcp(127.0.0.1:3306)/product?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
filters:
- MysqlDistributedTransaction


distributed_transaction:
appid: productSvc
retry_dead_threshold: 130000
rollback_retry_timeout_unlock_enable: true
etcd_config:
endpoints:
- localhost:2379
42 changes: 42 additions & 0 deletions samples/python/config3.yaml
@@ -0,0 +1,42 @@
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13308
config:
users:
dksl: "123456"
server_version: "8.0.27"
executor: redirect

executors:
- name: redirect
mode: sdb
config:
data_source_ref: order

filters:
- name: MysqlDistributedTransaction
conf:
appid: orderSvc
lock_retry_interval: 100ms
lock_retry_times: 15

data_source_cluster:
- name: order
capacity: 30
max_capacity: 100
idle_timeout: 60s
dsn: root:123456@tcp(127.0.0.1:3306)/order?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
filters:
- MysqlDistributedTransaction

distributed_transaction:
appid: orderSvc
retry_dead_threshold: 130000
rollback_retry_timeout_unlock_enable: true
etcd_config:
endpoints:
- localhost:2379
Empty file.
69 changes: 69 additions & 0 deletions samples/python/order/app.py
@@ -0,0 +1,69 @@
from flask import Flask, jsonify, request
from datetime import datetime

import random

import mysql.connector


app = Flask(__name__)

conn = mysql.connector.connect(
host="127.0.0.1",
port="13308",
user="dksl",
password="123456",
database="order",
)

cursor = conn.cursor(prepared=True)

insert_so_master = "INSERT /*+ XID('{xid}') */ INTO order.so_master({keys}) VALUES ({placeholders})"
insert_so_item = "INSERT /*+ XID('{xid}') */ INTO order.so_item({keys}) VALUES ({placeholders})"

@app.route('/createSo', methods=['POST'])
def create_so():
xid = request.headers.get('xid')
reqs = request.get_json()
if xid and "req" in reqs:
for res in reqs["req"]:
res["sysno"] = next_id()
res["so_id"] = res["sysno"]
res["order_date"] = datetime.now()
res_keys = [str(k) for k,v in res.items() if k != "so_items" and str(v) != ""]
so_master = insert_so_master.format(
xid=xid,
keys=", ".join(res_keys),
placeholders=", ".join(["%s"] * len(res_keys)),
)

try:
cursor.execute(so_master, tuple(res.get(k, "") for k in res_keys))
except Exception as e:
print(e.args)

so_items = res["so_items"]
for item in so_items:
item["sysno"] = next_id()
item["so_sysno"] = res["sysno"]
item_keys = [str(k) for k,v in item.items() if str(v) != "" ]
so_item = insert_so_item.format(
xid=xid,
keys=", ".join(item_keys),
placeholders=", ".join(["%s"] * len(item_keys)),
)
try:
cursor.execute(so_item, tuple(item.get(k, "") for k in item_keys))
except Exception as e:
print(e.args)

return jsonify(dict(success=True, message="success")), 200

return jsonify(dict(success=False, message="failed")), 400

def next_id():
return random.randrange(0, 9223372036854775807)


if __name__ == '__main__':
app.run(host="0.0.0.0", port=3001)
Empty file.
35 changes: 35 additions & 0 deletions samples/python/product/app.py
@@ -0,0 +1,35 @@
from flask import Flask, jsonify, request
import mysql.connector


app = Flask(__name__)

conn = mysql.connector.connect(
host="127.0.0.1",
port="13307",
user="dksl",
password="123456",
database="product",
)

cursor = conn.cursor(prepared=True)

allocate_inventory_sql = "update /*+ XID(%s) */ product.inventory set available_qty = available_qty - %s, allocated_qty = allocated_qty + %s where product_sysno = %s and available_qty >= %s;"

@app.route('/allocateInventory', methods=['POST'])
def create_so():
xid = request.headers.get('xid')
reqs = request.get_json()
if xid and "req" in reqs:
for res in reqs["req"]:
try:
cursor.execute(allocate_inventory_sql, (xid, res["qty"], res["qty"], res["product_sysno"], res["qty"]))
except Exception as e:
print(e.args)

return jsonify(dict(success=True, message="success")), 200

return jsonify(dict(success=False, message="failed")), 400

if __name__ == '__main__':
app.run(host="0.0.0.0", port=3002)
3 changes: 3 additions & 0 deletions samples/python/requirements.txt
@@ -0,0 +1,3 @@
Flask==2.1.2
mysql_connector_repackaged==0.3.1
requests==2.22.0

0 comments on commit 6239a39

Please sign in to comment.