# Python 金融資料的處理應用

> 實作：以動態圖表呈現串流金融資料

[郭耀仁](https://hahow.in/@tonykuoyj?tr=tonykuoyj) | yaojenkuo@ntu.edu.tw | April 2024

## 進階 Python 觀念

## 關於進階 Python 觀念

- 資源管理器。
- 裝飾器。
- 非同步 I/O

## 資源管理器

- 資源的管理例如管理開啟的檔案或 Websockets 等，最主要的問題在於必須確保這些開啟的資源在使用完之後，有確實被關閉。
- 如果忘記關閉這些資源，就會造成程式執行上的效能問題，甚至出現錯誤。
- Python 提供 `with` 語法，可讓程式設計者更容易管理這些開啟的資源。

## 傳統開啟檔案的方式

如果在使用檔案的過程中發生了例外（Exception），造成程式中斷時，這個開啟的檔案就沒有被關閉。

```python
# 開啟檔案
f = open(filename)

# 使用檔案的過程

# 關閉檔案
f.close()
```

## 以 `with` 開啟一個檔案

- 在使用 `with` 開啟檔案時，會將開啟的檔案用 `f` 變數參照。
- 這個變數只有在 `with` 的範圍內可以使用，而離開這個範圍時變數就會自動被關閉，回收相關的資源。

```python
with open(filename) as f:
  # 使用檔案的過程
```

## 裝飾器

- 裝飾器（Decorator）是一種設計模式（Design Pattern），可以簡化大量函數中相同的程式碼並重複使用。
- 裝飾器的作用是為已經存在的函數添加額外的功能。

## 一個將累加印出來的函數

In [1]:
def print_cum_sum():
    cum_sum = 0
    for i in range(10):
        cum_sum += i
        print(cum_sum)

print_cum_sum()

0
1
3
6
10
15
21
28
36
45


## 一個計算函數執行所需時間的函數

In [2]:
import time

def timing(func):
    t_start = time.perf_counter()
    func()
    t_stop = time.perf_counter()
    print(f"Elapsed time(secs): {t_stop - t_start}")

## 計算 `print_cum_sum()` 執行所需時間

In [3]:
timing(print_cum_sum)

0
1
3
6
10
15
21
28
36
45
Elapsed time(secs): 0.002172488020732999


## 裝飾器的動機

- 如何在不更動函數執行語法的前提下，仍能計算 `print_cum_sum()` 執行所需時間？
- 意即不要將 `print_cum_sum()` 更改為 `timing(print_cum_sum)`

In [4]:
def print_cum_sum():
    cum_sum = 0
    for i in range(10):
        cum_sum += i
        print(cum_sum)

def timing(func):
    def wrapper():
        t_start = time.perf_counter()
        func()
        t_stop = time.perf_counter()
        print(f"Elapsed time(secs): {t_stop - t_start}")
    return wrapper

print_cum_sum = timing(print_cum_sum)
print_cum_sum()

0
1
3
6
10
15
21
28
36
45
Elapsed time(secs): 0.0011555659584701061


## 將倒數第二段註解掉即可恢復「未加入計時」的功能

In [5]:
def print_cum_sum():
    cum_sum = 0
    for i in range(10):
        cum_sum += i
        print(cum_sum)

def timing(func):
    def wrapper():
        t_start = time.perf_counter()
        func()
        t_stop = time.perf_counter()
        print(f"Elapsed time(secs): {t_stop - t_start}")
    return wrapper

#print_cum_sum = timing(print_cum_sum)
print_cum_sum()

0
1
3
6
10
15
21
28
36
45


## 利用裝飾器實現不更動函數執行語法的前提下添加額外的功能

In [6]:
def timing(func):
    def wrapper():
        t_start = time.perf_counter()
        func()
        t_stop = time.perf_counter()
        print(f"Elapsed time(secs): {t_stop - t_start}")
    return wrapper

@timing
def print_cum_sum():
    cum_sum = 0
    for i in range(10):
        cum_sum += i
        print(cum_sum)

print_cum_sum()

0
1
3
6
10
15
21
28
36
45
Elapsed time(secs): 0.00043658295180648565


## 利用裝飾器實現不更動函數執行語法的前提下添加額外的功能（續）

- `@` 開頭那一段的意思就是將 `print_cum_sum()` 傳入 `timing()` 後，再將傳回的物件重新命名為 `print_cum_sum()`
- 可以把 `@timing` 視為加上 `timing()` 函數功能的意思。
- 利用裝飾器完全不需要更動原本執行 `print_cum_sum()` 的程式。

## 非同步 I/O

- Python 在對任一段程式進行請求的時候，都會等到回應之後再進行下一段程式。
- 非同步 I/O 指的是執行過程不會等待回應，而是繼續執行下面的程式碼，讓後續流程作為事件（Event），並透過輪詢（polling）與回調 （callback）觸發執行後續程式碼。
- Python 以 `asyncio` 模組來實現並行（Concurrency），並且使用 `async` 和 `await` 兩個關鍵字撰寫相關程式。

## 非同步 I/O 的動機

- 程式會遇到的瓶頸可分為：計算瓶頸（CPU）與等待瓶頸。
- 如何在「非」計算瓶頸的時候提升程式執行效率？例如：網路請求或檔案讀寫。

In [7]:
def a_requests_need_two_seconds(i):
    print(f"The request #{i} begins")
    time.sleep(2)
    print(f"The request #{i} ends")

t_start = time.perf_counter()
for i in range(1, 6):
    a_requests_need_two_seconds(i)
t_stop = time.perf_counter()
print(f"Elapsed time(secs): {t_stop - t_start}")

The request #1 begins
The request #1 ends
The request #2 begins
The request #2 ends
The request #3 begins
The request #3 ends
The request #4 begins
The request #4 ends
The request #5 begins
The request #5 ends
Elapsed time(secs): 10.020244781975634


## 以 `asyncio` 模組打破等待瓶頸

`await` 告訴 CPU 這個函數很慢，不需要等它執行完畢，請 CPU 先跳去執行其他的事情，在這個函數結束時再回來處理。

```python
# Run with a script instead of jupyter notebook
import time
import asyncio

async def a_request_needs_two_seconds(i):
    print(f"The request #{i} begins")
    await asyncio.sleep(2)
    print(f"The request #{i} ends")

t_start = time.perf_counter()
loop = asyncio.get_event_loop()
tasks = [loop.create_task(a_request_needs_two_seconds(i)) for i in range(1, 6)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
t_stop = time.perf_counter()
print(f"Elapsed time(secs): {t_stop - t_start}")
```

## 取得串流金融資料

## 全年無休的串流金融資料

- 以比特幣兌美元（BTC-USD）為範例。
- BINANCE 幣安是全球性的加密貨幣交易所，為超過 100 種加密貨幣提供交易服務的平台，2018年初以來，幣安在交易量方面被認為是全世界上最大的加密貨幣交易所。

## BINANCE Websocket Market Streams

- WebSocket 是一種網路傳輸協定，使得客戶端和伺服器之間的資料交換變得更加簡單，允許伺服器端主動向客戶端推播資料。
- 在 WebSocket API 中瀏覽器和伺服器只需要完成一次連線，兩者之間就可以建立永續的連接並進行雙向資料傳輸。
- BINANCE Websocket Market Streams 連線端點：wss://fstream.binance.com/ws

## 比特幣兌美元（BTC-USD）交易資料

連線端點：wss://fstream.binance.com/ws/btcusdt@aggTrade

```python
# Run with a script instead of jupyter notebook
import asyncio
from websockets import connect
import json

async def connect_to_binance_web_market_stream(url):
    async with connect(url) as websocket:
        while True:
            data_str = await websocket.recv()
            data_dict = json.loads(data_str)
            print(data_dict)
        
url = "wss://fstream.binance.com/ws/btcusdt@aggTrade"
asyncio.run(connect_to_binance_web_market_stream(url))
```

## 建立一個 SQLite 資料庫儲存比特幣兌美元（BTC-USD）交易資料

```python
# Run with a script instead of jupyter notebook
import sqlite3

conn = sqlite3.connect("agg_trade.db")
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS btcusdt;")
create_table_statement = """
CREATE TABLE btcusdt (
    id UNSIGNED BIG INT PRIMARY KEY,
    trade_time UNSIGNED BIG INT,
    price float,
    quantity float
)
"""
cursor.execute(create_table_statement)
cursor.execute("CREATE INDEX idx_trade_time ON btcusdt(trade_time);")
conn.commit()
conn.close()
```

## 運用 SQLite 資料庫作為即時交易資料的緩衝

```python
async def insert_data_into_db(url):
    async with connect(url) as websocket:
        data_dicts = []
        while True:
            data_str = await websocket.recv()
            data_dict = json.loads(data_str)
            data_to_append = (data_dict["a"], data_dict["T"], data_dict["p"], data_dict["q"])
            data_dicts.append(data_to_append)
            print(data_dict)
            if len(data_dicts) > 10:
                print("Inserting into DB...")
                async with aiosqlite.connect("agg_trade.db") as db:
                    await db.executemany("""INSERT INTO btcusdt (id, trade_time, price, quantity) VALUES (?, ?, ?, ?)""", data_dicts)
                    await db.commit()
                data_dicts = []
```

## 整合串流金融資料擷取與緩衝

```python
# btcusdt_agg_trade_stream.py
import sqlite3
import asyncio
from websockets import connect
import json
import aiosqlite

conn = sqlite3.connect("agg_trade.db")
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS btcusdt;")
create_table_statement = """
CREATE TABLE btcusdt (
    id UNSIGNED BIG INT PRIMARY KEY,
    trade_time UNSIGNED BIG INT,
    price float,
    quantity float
)
"""
cursor.execute(create_table_statement)
cursor.execute("CREATE INDEX idx_trade_time ON btcusdt(trade_time);")
conn.commit()
conn.close()
```

## 整合串流金融資料擷取與緩衝（續）

```python
async def insert_data_into_db(url):
    async with connect(url) as websocket:
        data_dicts = []
        while True:
            data_str = await websocket.recv()
            data_dict = json.loads(data_str)
            data_to_append = (data_dict["a"], data_dict["T"], data_dict["p"], data_dict["q"])
            data_dicts.append(data_to_append)
            print(data_dict)
            if len(data_dicts) > 10:
                print("Inserting into DB...")
                async with aiosqlite.connect("agg_trade.db") as db:
                    await db.executemany("""INSERT INTO btcusdt (id, trade_time, price, quantity) VALUES (?, ?, ?, ?)""", data_dicts)
                    await db.commit()
                data_dicts = []
                
url = "wss://fstream.binance.com/ws/btcusdt@aggTrade"
asyncio.run(insert_data_into_db(url))
```

## 整合串流金融資料擷取與緩衝（續）

- 執行 `python btcusdt_agg_trade_stream.py`
- 以 SQLiteStudio 檢查 SQLite 資料庫：<https://github.com/pawelsalawa/sqlitestudio/releases>

## 視覺化串流金融資料

## Plotly Dash

- Dash 是以 Plotly.js、React.js 與 Flask 為基礎建構的視覺化模組，不僅支援 Python，也支援 R、Julia 等語言。
- Dash 可以用來建置網頁應用程式，利於呈現資料視覺化與儀表板 Dashboard。Dash 除了支援 Plotly 的豐富圖表之外，也有下拉式選單、按鈕以及其他網頁介面功能可使用。
- <https://dash.plotly.com>

## 建構一個簡易前端頁面

```python
# Run with a script instead of jupyter notebook
from dash import html, dcc, Output, Input, Dash

update_frequency = 200
app = Dash(__name__)
app.layout = html.Div([
    html.H1(id="show_value"),
    dcc.Interval(id="update", interval=update_frequency)
])
if __name__ == "__main__":
    app.run_server(debug=True)
```

## 以裝飾器更新 `H1` 的數值

```python
import sqlite3

@app.callback(Output("show_value", "children"), [Input("update", "n_intervals")])
def update_data(n_intervals):
    conn = sqlite3.connect("agg_trade.db")
    cursor = conn.cursor()
    sql_statement = """
    SELECT *
      FROM btcusdt
     ORDER BY trade_time DESC
     LIMIT 1;
    """
    list_of_tuples = cursor.execute(sql_statement).fetchall()
    latest_price = list_of_tuples[0][2]
    return latest_price
```

## 建構一個簡易前端頁面（續）

```python
# Run with a script instead of jupyter notebook
from dash import html, dcc, Output, Input, Dash

update_frequency = 200
figure = dict(data=[{'x': [], 'y': []}], 
              layout=dict(xaxis=dict(autorange=True), yaxis=dict(autorange=True)))
app = dash.Dash()
app.layout = html.Div([
    dcc.Graph(id='graph', figure=figure),
    dcc.Interval(id="interval")]
)
if __name__ == "__main__":
    app.run_server(debug=True)
```

## 以裝飾器更新 `Graph` 的數值

```python
import sqlite3
from datetime import datetime

@app.callback(Output('graph', 'extendData'), [Input('interval', 'n_intervals')])
def update_data(n_intervals):
    conn = sqlite3.connect("agg_trade.db")
    cursor = conn.cursor()
    sql_statement = """
    SELECT *
      FROM btcusdt
     ORDER BY trade_time DESC
     LIMIT 1;
    """
    list_of_tuples = cursor.execute(sql_statement).fetchall()
    latest_trade_time = list_of_tuples[0][1]
    latest_trade_time_dt = datetime.fromtimestamp(latest_trade_time / 1000)
    latest_price = list_of_tuples[0][2]
    dict_to_return = {
        "x": [[latest_trade_time_dt]],
        "y": [[latest_price]]
    }
    return (dict_to_return), [0], 100
```

## 整合串流金融資料擷取、緩衝與視覺化

```python
# stream_data_line.py
from dash import html, dcc, Output, Input, Dash
import sqlite3
from datetime import datetime

update_frequency = 200
figure = dict(data=[{'x': [], 'y': []}], 
              layout=dict(xaxis=dict(autorange=True), yaxis=dict(autorange=True)))
app = Dash()
app.layout = html.Div([
    dcc.Graph(id='graph', figure=figure),
    dcc.Interval(id="interval")]
)
```

## 整合串流金融資料擷取、緩衝與視覺化

```python
@app.callback(Output('graph', 'extendData'), [Input('interval', 'n_intervals')])
def update_data(n_intervals):
    conn = sqlite3.connect("agg_trade.db")
    cursor = conn.cursor()
    sql_statement = """
    SELECT *
      FROM btcusdt
     ORDER BY trade_time DESC
     LIMIT 1;
    """
    list_of_tuples = cursor.execute(sql_statement).fetchall()
    latest_trade_time = list_of_tuples[0][1]
    latest_trade_time_dt = datetime.fromtimestamp(latest_trade_time / 1000)
    latest_price = list_of_tuples[0][2]
    dict_to_return = {
        "x": [[latest_trade_time_dt]],
        "y": [[latest_price]]
    }
    return (dict_to_return), [0], 100

if __name__ == "__main__":
    app.run_server(debug=True)
```

## 整合串流金融資料擷取、緩衝與視覺化（續）

- 執行 `python btcusdt_agg_trade_stream.py`
- 執行 `python stream_data_line.py`
- 前往 `http://127.0.0.1:8050` 檢視。