A context is the foundation of every OpenCL operation: it defines a compute environment that holds devices, memory objects, programs, and kernels.

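What a context does

	1.	Manages devices: a context contains one or more compute devices (such as a CPU or GPU). Every device-related resource and operation is defined and managed within a context.
	2.	Manages memory objects: buffers and images are created in a context. Memory objects can be shared by the devices in the same context, and they hold the data that kernels operate on.
	3.	Manages programs and kernels: a program (one or more kernel functions) is built within a context. A kernel is a function that runs on a device, and it is created from a program in that context.

A context can be thought of as a container for all OpenCL resources: it provides the overall environment in which computation takes place.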

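OpenCL Command Queue

A command queue is another core OpenCL concept. It is the channel through which the host submits commands to a device for execution.

What a command queue does

	1.	Submits commands: every kind of command reaches the device through a command queue, including kernel launches, memory copies, and synchronization commands.
	2.	Controls execution order: a default (in-order) queue executes commands in the order they were submitted. A queue created with the out-of-order property may reorder them, in which case events express the dependencies between commands.
	3.	Executes asynchronously: enqueueing a command returns control to the host, which can continue with other work while the device executes the submitted commands in the background.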

In [6]:
import pyopencl as cl
import numpy as np


platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
context = cl.Context([device])
queue = cl.CommandQueue(context)


a = np.array([1, 2, 3, 4, 5], dtype=np.float32)
b = np.array([10, 20, 30, 40, 50], dtype=np.float32)


c = np.empty_like(a)


mf = cl.mem_flags
a_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a)
b_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b)
c_buf = cl.Buffer(context, mf.WRITE_ONLY, c.nbytes)

#Build the OpenCL program that contains the vec_add kernel
program = cl.Program(context, """
__kernel void vec_add(__global const float *a,
                      __global const float *b,
                      __global float *c) {
    int gid = get_global_id(0);
    c[gid] = a[gid] + b[gid];
}
""").build()


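#Launch one work-item per element (global size = a.shape, local size chosen by the runtime)
program.vec_add(queue, a.shape, None, a_buf, b_buf, c_buf)

#Copy the result back to the host (copies involving host memory block by default)
cl.enqueue_copy(queue, c, c_buf)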


print('a:', a)
print('b:', b)
print('c:', c)

a: [1. 2. 3. 4. 5.]
b: [10. 20. 30. 40. 50.]
c: [11. 22. 33. 44. 55.]


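Within a single (in-order) queue, commands run one after another in submission order; the parallelism comes from the work-items of each kernel, which execute simultaneously.
Commands in different queues have no implied ordering, so when more than one queue is created they may execute concurrently if the device supports it.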

In [7]:
import numpy as np
import pyopencl as cl

#create OpenCL context and device
platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
context = cl.Context([device])

#create two command queues on the same context
#PROFILING_ENABLE records timestamps for each command so execution time can be measured
queue1 = cl.CommandQueue(context, properties=cl.command_queue_properties.PROFILING_ENABLE)
queue2 = cl.CommandQueue(context, properties=cl.command_queue_properties.PROFILING_ENABLE)

#create input data
vector_length = 10
a1 = np.random.rand(vector_length).astype(np.float32)
b1 = np.random.rand(vector_length).astype(np.float32)
c1 = np.empty_like(a1)

a2 = np.random.rand(vector_length).astype(np.float32)
b2 = np.random.rand(vector_length).astype(np.float32)
c2 = np.empty_like(a2)

#create OpenCL memory buffers
mf = cl.mem_flags#flags that describe how each buffer is accessed and initialized
a1_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a1)#read-only on the device, initialized with a copy of host array a1
b1_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b1)
c1_buf = cl.Buffer(context, mf.WRITE_ONLY, c1.nbytes)

a2_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a2)
b2_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b2)
c2_buf = cl.Buffer(context, mf.WRITE_ONLY, c2.nbytes)

#write OpenCL kernel
program = cl.Program(context, """
__kernel void vec_add(__global const float *a,
                      __global const float *b,
                      __global float *c,
                      const unsigned int vector_length) {
    int gid = get_global_id(0);
    if (gid < vector_length) {
        c[gid] = a[gid] + b[gid];
    }
}
""").build()#no loop is needed: each work-item adds one pair of elements

#execute kernel: each queue processes one vector pair
global_work_size = (vector_length,)
event1 = program.vec_add(queue1, global_work_size, None, a1_buf, b1_buf, c1_buf, np.uint32(vector_length))
event2 = program.vec_add(queue2, global_work_size, None, a2_buf, b2_buf, c2_buf, np.uint32(vector_length))

#copy results from device to host
event1_copy = cl.enqueue_copy(queue1, c1, c1_buf, wait_for=[event1])
event2_copy = cl.enqueue_copy(queue2, c2, c2_buf, wait_for=[event2])

#wait for all events to complete
cl.wait_for_events([event1_copy, event2_copy])

#print results
print("Input A1:", a1)
print("Input B1:", b1)
print("Output C1 (A1 + B1):", c1)

print("\nInput A2:", a2)
print("Input B2:", b2)
print("Output C2 (A2 + B2):", c2)

#measure time (profiling timestamps are in nanoseconds)
start_time1 = event1.profile.start
end_time1 = event1.profile.end
duration1_ns = end_time1 - start_time1

start_time2 = event2.profile.start
end_time2 = event2.profile.end
duration2_ns = end_time2 - start_time2

#span from the earliest kernel start to the latest kernel end
total_duration_ns = max(end_time1, end_time2) - min(start_time1, start_time2)

#1 ms = 1e6 ns
print(f"Execution time for queue1: {duration1_ns / 1e6} ms")
print(f"Execution time for queue2: {duration2_ns / 1e6} ms")
print(f"Total Duration: {total_duration_ns / 1e6} ms")

Input A1: [0.31239662 0.409874   0.5589953  0.11191899 0.6037267  0.35863647
 0.0052537  0.36380172 0.18253312 0.5283162 ]
Input B1: [0.9121277  0.13430215 0.99507695 0.62651473 0.28930062 0.04872453
 0.4056671  0.8061638  0.23065333 0.20252344]
Output C1 (A1 + B1): [1.2245243  0.54417616 1.5540723  0.7384337  0.8930273  0.407361
 0.4109208  1.1699655  0.41318643 0.7308396 ]

Input A2: [0.16111054 0.39645186 0.35572973 0.60758543 0.6685644  0.01852382
 0.86826426 0.15966025 0.34373823 0.51897097]
Input B2: [0.0048693  0.06192825 0.84782743 0.4327213  0.95231867 0.23369873
 0.72776264 0.9471239  0.11362476 0.749018  ]
Output C2 (A2 + B2): [0.16597983 0.4583801  1.2035571  1.0403067  1.620883   0.25222254
 1.5960269  1.1067841  0.45736298 1.2679889 ]
Execution time for queue1: 0.069 ms
Execution time for queue2: 0.014125 ms
Total Duration: 0.47475 ms
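
One way to read these figures is to check whether the two kernels' start/end intervals actually overlap. The following is a minimal sketch in plain Python, assuming the inputs are nanosecond timestamps like those returned by `event.profile.start` and `event.profile.end`. The helper names and the sample timestamps are made up for illustration.

```python
def overlap_ns(start1, end1, start2, end2):
    """Return how many nanoseconds two [start, end] intervals overlap."""
    return max(0, min(end1, end2) - max(start1, start2))


def summarize(start1, end1, start2, end2):
    """Summarize two profiled events; inputs and outputs are in nanoseconds."""
    return {
        "span_ns": max(end1, end2) - min(start1, start2),   # first start to last end
        "busy_ns": (end1 - start1) + (end2 - start2),       # sum of both durations
        "overlap_ns": overlap_ns(start1, end1, start2, end2),
    }


# Hypothetical timestamps (not taken from the run above)
serial = summarize(0, 100, 300, 350)      # second kernel starts after the first ends
concurrent = summarize(0, 100, 20, 110)   # the two kernels overlap in time

print(serial)
print(concurrent)
```

When the span is much larger than the sum of the two durations, as in the output above (about 0.47 ms against 0.069 ms + 0.014 ms), the kernels most likely ran one after the other with idle time in between, which suggests that launch overhead dominates at this problem size.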


In [9]:
import numpy as np
import pyopencl as cl

#create OpenCL context and device
platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
context = cl.Context([device])

#create multiple command queues
queue1 = cl.CommandQueue(context, properties=cl.command_queue_properties.PROFILING_ENABLE) # Use profiling to observe time
queue2 = cl.CommandQueue(context, properties=cl.command_queue_properties.PROFILING_ENABLE)

#create input data for matrix multiplication
matrix_size = 1024
a1 = np.random.rand(matrix_size, matrix_size).astype(np.float32)
b1 = np.random.rand(matrix_size, matrix_size).astype(np.float32)
c1 = np.empty_like(a1)

a2 = np.random.rand(matrix_size, matrix_size).astype(np.float32)
b2 = np.random.rand(matrix_size, matrix_size).astype(np.float32)
c2 = np.empty_like(a2)

#create OpenCL memory buffers
mf = cl.mem_flags #flags that describe how each buffer is accessed and initialized
a1_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a1) #read-only on the device, initialized with a copy of host array a1
b1_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b1)
c1_buf = cl.Buffer(context, mf.WRITE_ONLY, c1.nbytes)

a2_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=a2)
b2_buf = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=b2)
c2_buf = cl.Buffer(context, mf.WRITE_ONLY, c2.nbytes)

#write OpenCL kernel for matrix multiplication
program = cl.Program(context, """
__kernel void mat_mul(__global const float *a,
                      __global const float *b,
                      __global float *c,
                      const unsigned int matrix_size) {
    int row = get_global_id(0);
    int col = get_global_id(1);
    float sum = 0.0f;
    for (int i = 0; i < matrix_size; i++) {
        sum += a[row * matrix_size + i] * b[i * matrix_size + col];
    }
    c[row * matrix_size + col] = sum;
}
""").build() # One work-item per output element; the loop inside the kernel computes that element's dot product

#execute kernel: each queue processes one matrix pair
global_work_size = (matrix_size, matrix_size)
event1 = program.mat_mul(queue1, global_work_size, None, a1_buf, b1_buf, c1_buf, np.uint32(matrix_size))
event2 = program.mat_mul(queue2, global_work_size, None, a2_buf, b2_buf, c2_buf, np.uint32(matrix_size))

#copy results from device to host
event1_copy = cl.enqueue_copy(queue1, c1, c1_buf, wait_for=[event1])
event2_copy = cl.enqueue_copy(queue2, c2, c2_buf, wait_for=[event2])

#wait for all events to complete
cl.wait_for_events([event1_copy, event2_copy])

#calculate total execution time
event1_start = event1.profile.start
event1_end = event1.profile.end
event1_duration = (event1_end - event1_start) * 1e-6 # Convert nanoseconds to milliseconds

event2_start = event2.profile.start
event2_end = event2.profile.end
event2_duration = (event2_end - event2_start) * 1e-6 # Convert nanoseconds to milliseconds

total_duration_start = min(event1_start, event2_start)
total_duration_end = max(event1_end, event2_end)
total_duration = (total_duration_end - total_duration_start) * 1e-6 # Convert nanoseconds to milliseconds

print(f"Event 1 Duration: {event1_duration} ms")
print(f"Event 2 Duration: {event2_duration} ms")
print(f"Total Duration: {total_duration} ms")

Event 1 Duration: 129.01887499999998 ms
Event 2 Duration: 127.947791 ms
Total Duration: 129.44424999999998 ms
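
To make the kernel's index arithmetic concrete, the sketch below mimics a single `mat_mul` work-item in plain Python on flat, row-major arrays and compares the result with NumPy's `@` operator. It is an illustration on a tiny matrix, not a replacement for the GPU run. The function name `mat_mul_work_item` and the size `n = 4` are choices made for this example.

```python
import numpy as np


def mat_mul_work_item(a_flat, b_flat, n, row, col):
    """Mimic one work-item of the mat_mul kernel on flat, row-major arrays."""
    total = 0.0
    for i in range(n):
        total += a_flat[row * n + i] * b_flat[i * n + col]
    return total


n = 4  # small size so the pure-Python loops finish instantly
rng = np.random.default_rng(0)
a = rng.random((n, n), dtype=np.float32)
b = rng.random((n, n), dtype=np.float32)

# ravel() yields the same row-major layout that the OpenCL buffer receives
a_flat, b_flat = a.ravel(), b.ravel()

c = np.empty((n, n), dtype=np.float32)
for row in range(n):        # on the device, these two loops are the 2-D global range
    for col in range(n):
        c[row, col] = mat_mul_work_item(a_flat, b_flat, n, row, col)

matches = np.allclose(c, a @ b, rtol=1e-5)
print("matches NumPy:", matches)
```

The same comparison (`np.allclose(c1, a1 @ b1, ...)`) can be applied to the arrays copied back from the device to check the GPU result.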


In [2]:
%pip install python-binance

Collecting python-binance
  Downloading python_binance-1.0.19-py2.py3-none-any.whl.metadata (11 kB)
Collecting aiohttp (from python-binance)
  Downloading aiohttp-3.9.5-cp311-cp311-macosx_10_9_x86_64.whl.metadata (7.5 kB)
Collecting pycryptodome (from python-binance)
  Downloading pycryptodome-3.20.0-cp35-abi3-macosx_10_9_x86_64.whl.metadata (3.4 kB)
Collecting aiosignal>=1.1.2 (from aiohttp->python-binance)
  Using cached aiosignal-1.3.1-py3-none-any.whl.metadata (4.0 kB)
Collecting frozenlist>=1.1.1 (from aiohttp->python-binance)
  Downloading frozenlist-1.4.1-cp311-cp311-macosx_10_9_x86_64.whl.metadata (12 kB)
Collecting multidict<7.0,>=4.5 (from aiohttp->python-binance)
  Downloading multidict-6.0.5-cp311-cp311-macosx_10_9_x86_64.whl.metadata (4.2 kB)
Collecting yarl<2.0,>=1.0 (from aiohttp->python-binance)
  Downloading yarl-1.9.4-cp311-cp311-macosx_10_9_x86_64.whl.metadata (31 kB)
Downloading python_binance-1.0.19-py2.py3-none-any.whl (69 kB)

In [3]:
import os

from binance.client import Client

# Read credentials from the environment instead of hard-coding them in the notebook.
# BINANCE_API_KEY / BINANCE_API_SECRET are assumed variable names, not a library convention.
api_key = os.environ["BINANCE_API_KEY"]
api_secret = os.environ["BINANCE_API_SECRET"]

# Create a Binance client instance
client = Client(api_key, api_secret)

# Get the latest price of a symbol
symbol = 'BTCUSDT'
price = client.get_symbol_ticker(symbol=symbol)
print(f"The latest price of {symbol} is {price['price']}")

The latest price of BTCUSDT is 58180.01000000
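
API credentials are safer kept out of the notebook itself. The following is a small sketch of loading them from environment variables, assuming the hypothetical variable names `BINANCE_API_KEY` and `BINANCE_API_SECRET`. The helper `load_credentials` is not part of python-binance.

```python
import os


def load_credentials(env=os.environ):
    """Return (api_key, api_secret) read from environment variables.

    BINANCE_API_KEY and BINANCE_API_SECRET are names chosen for this sketch.
    """
    try:
        return env["BINANCE_API_KEY"], env["BINANCE_API_SECRET"]
    except KeyError as missing:
        raise RuntimeError(
            f"Set the {missing.args[0]} environment variable first"
        ) from None


# Demonstration with a stand-in mapping so no real secret appears in the notebook
demo_env = {"BINANCE_API_KEY": "demo-key", "BINANCE_API_SECRET": "demo-secret"}
api_key, api_secret = load_credentials(demo_env)
print(api_key, api_secret)
```

In real use, `load_credentials()` is called without arguments and the two values are passed to `Client(api_key, api_secret)`. Any key that has already appeared in a shared notebook is best revoked and replaced.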


In [4]:
import os

import pyopencl as cl
import numpy as np
import pandas as pd
from binance.client import Client

# Initialize the Binance client
# Credentials come from environment variables (assumed names) rather than being hard-coded
api_key = os.environ["BINANCE_API_KEY"]
api_secret = os.environ["BINANCE_API_SECRET"]
client = Client(api_key, api_secret)

# Example: fetch market data
def get_market_data(symbol='BTCUSDT'):
    klines = client.get_klines(symbol=symbol, interval=Client.KLINE_INTERVAL_1MINUTE, limit=100)
    data = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'])
    data['timestamp'] = pd.to_datetime(data['timestamp'], unit='ms')
    data.set_index('timestamp', inplace=True)
    return data[['open', 'high', 'low', 'close', 'volume']].astype(float)

# Initialize the OpenCL environment
def init_opencl():
    platform = cl.get_platforms()[0]
    device = platform.get_devices()[0]
    context = cl.Context([device])
    queue = cl.CommandQueue(context)
    return context, queue, device

# Example OpenCL kernel code
kernel_code = """
__kernel void process_data(__global const float *input, __global float *output, const unsigned int n) {
    int i = get_global_id(0);
    if (i < n) {
        // Simple processing: square each value
        output[i] = input[i] * input[i];
    }
}
"""

# Process the data
def process_with_opencl(data):
    context, queue, device = init_opencl()
    program = cl.Program(context, kernel_code).build()
    mf = cl.mem_flags

    # Flatten the DataFrame into a 1-D float32 NumPy array
    input_data = data.values.flatten().astype(np.float32)
    n = input_data.size

    # Create the OpenCL buffers
    input_buffer = cl.Buffer(context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=input_data)
    output_buffer = cl.Buffer(context, mf.WRITE_ONLY, input_data.nbytes)

    # Run the OpenCL kernel, one work-item per value
    program.process_data(queue, (n,), None, input_buffer, output_buffer, np.uint32(n))

    # Read the result back to the host
    output_data = np.empty_like(input_data)
    cl.enqueue_copy(queue, output_data, output_buffer).wait()

    # Reshape the result back into a DataFrame with the original columns and index
    output_df = pd.DataFrame(output_data.reshape(data.shape), columns=data.columns, index=data.index)
    return output_df

if __name__ == "__main__":
    market_data = get_market_data()
    print("Raw data:")
    print(market_data.head())

    processed_data = process_with_opencl(market_data)
    print("Processed data:")
    print(processed_data.head())

Raw data:
                         open      high       low     close    volume
timestamp                                                            
2024-07-12 17:07:00  57982.00  58037.43  57981.99  58018.00  10.29655
2024-07-12 17:08:00  58018.00  58040.00  58006.38  58040.00   6.48215
2024-07-12 17:09:00  58040.00  58070.00  58040.00  58055.00   7.06893
2024-07-12 17:10:00  58054.99  58090.00  58046.00  58090.00   6.90224
2024-07-12 17:11:00  58089.99  58130.92  58089.99  58130.92  15.55394
Processed data:
                             open          high           low         close  \
timestamp                                                                     
2024-07-12 17:07:00  3.361912e+09  3.368343e+09  3.361911e+09  3.366088e+09   
2024-07-12 17:08:00  3.366088e+09  3.368642e+09  3.364740e+09  3.368642e+09   
2024-07-12 17:09:00  3.368642e+09  3.372125e+09  3.368642e+09  3.370383e+09   
2024-07-12 17:10:00  3.370382e+09  3.374448e+09  3.369338e+09  3.374448e+09   
2024-07-12 17:11:00  3.
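
The kernel squares the prices in 32-bit floating point, and the results (about 3.4e9) are far above 2^24, the limit up to which float32 represents every integer exactly. The sketch below uses NumPy alone, as a stand-in for the kernel's arithmetic, to estimate how much precision is lost compared with float64. The prices are copied from the first row of the output above.

```python
import numpy as np

# Prices taken from the first row of the output above
prices = np.array([57982.00, 58037.43, 57981.99, 58018.00], dtype=np.float64)

exact = prices ** 2                              # float64 reference
on_device = prices.astype(np.float32) ** 2       # same arithmetic as a float kernel
abs_error = np.abs(on_device.astype(np.float64) - exact)

# float32 keeps 24 significant bits, so representable values near 3.36e9 are 256 apart
spacing = np.spacing(np.float32(3.36e9))

print("max abs error:", abs_error.max())
print("float32 spacing near 3.36e9:", spacing)
```

If the squared values feed later calculations that need more than about seven significant digits, a `double` kernel (where the device supports it) or a float64 computation on the host may be the safer choice.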

In [None]:
import time
import requests
from typing import List, Dict
import json
import pyopencl as cl
import torch
import numpy as np


class OrderBookClient:
    def __init__(self, symbols: List[str], polling_interval: int = 5):
        self._symbols: List[str] = symbols
        self._polling_interval = polling_interval
        self._lookup_snapshot_id: Dict[str, int] = dict()
        self._lookup_update_id: Dict[str, int] = dict()
        self._bids = {}
        self._asks = {}

        self._closed = False

        self.context, self.queue = self.setup_opencl()

    def _polling(self):
        while not self._closed:
            for symbol in self._symbols:
                self.get_snapshot(symbol)
            time.sleep(self._polling_interval)

    def _log_message(self, msg: str) -> None:
        print(msg)
        return

    def get_snapshot(self, symbol: str):
        snapshot_url = f"https://api.binance.com/api/v3/depth?symbol={symbol}&limit=1000"
        # A timeout keeps the polling loop from hanging on a stalled connection
        response = requests.get(snapshot_url, timeout=10)
        response.raise_for_status()
        content = response.text
        data = json.loads(content)
        self._lookup_snapshot_id[symbol] = data["lastUpdateId"]
        # Each level is [price, quantity]; the API returns both as strings
        self._bids[symbol] = np.array(data['bids'], dtype=float)
        self._asks[symbol] = np.array(data['asks'], dtype=float)
        self._log_message(content)
        self._update_order_book(symbol)
        return

    def _update_order_book(self, symbol):
        bids = self._bids[symbol]
        asks = self._asks[symbol]

        # Data processing (currently a pass-through)
        self._bids[symbol], self._asks[symbol] = self.process_data(bids, asks)

        # Parallel processing with OpenCL
        processed_bids = self.parallel_processing(self._bids[symbol])
        processed_asks = self.parallel_processing(self._asks[symbol])

        # Column-wise mean with PyTorch
        bid_mean = self.pytorch_processing(processed_bids)
        ask_mean = self.pytorch_processing(processed_asks)

        print(f'Symbol: {symbol}, Bid Mean: {bid_mean}, Ask Mean: {ask_mean}')

    def setup_opencl(self):
        platform = cl.get_platforms()[0]
        device = platform.get_devices()[0]
        context = cl.Context([device])
        queue = cl.CommandQueue(context)
        return context, queue

    def process_data(self, bids, asks):
        # Sorting, grouping, and aggregation would go here; currently a pass-through
        return bids, asks

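    def parallel_processing(self, data):
        kernel_code = """
        __kernel void square(__global float *data) {
            int id = get_global_id(0);
            data[id] *= data[id];
        }
        """
        program = cl.Program(self.context, kernel_code).build()
        # The kernel works on 32-bit floats, so make a float32 copy of the float64 input;
        # the copy also leaves the stored order book unchanged
        host = np.array(data, dtype=np.float32, order="C")
        data_buf = cl.Buffer(self.context, cl.mem_flags.READ_WRITE | cl.mem_flags.COPY_HOST_PTR, hostbuf=host)
        # The kernel only reads get_global_id(0), so launch a 1-D range with one
        # work-item per element even though the array has shape (levels, 2)
        program.square(self.queue, (host.size,), None, data_buf)
        cl.enqueue_copy(self.queue, host, data_buf).wait()
        return host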

    def pytorch_processing(self, data):
        tensor = torch.tensor(data)
        return torch.mean(tensor, dim=0)

    def start(self) -> bool:
        self._polling()
        return True

    def stop(self) -> bool:
        self._closed = True
        return True


def main():
    symbols = ["BTCUSDT", "ETHUSDT"]
    orderbook_client = OrderBookClient(symbols, polling_interval=5)
    try:
        orderbook_client.start()
    except KeyboardInterrupt:
        orderbook_client.stop()

if __name__ == '__main__':
    main()
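
The snapshot is parsed with `dtype=float`, which is float64 in NumPy, while an OpenCL `float` is 32 bits wide, so the host array has to be converted before its bytes are handed to a `float *` kernel. A small NumPy-only sketch of the difference, using made-up order-book levels:

```python
import numpy as np

# Made-up levels in the same [price, quantity] string format as the depth endpoint
levels = np.array([["58180.01", "0.5"], ["58180.00", "1.25"]], dtype=float)
print(levels.dtype, levels.shape)   # float64, 2-D

# What a `float *` kernel would see if the float64 bytes were uploaded unchanged:
# twice as many 32-bit values, none of them the intended numbers
misread = levels.view(np.float32)

# An explicit conversion gives the kernel the values it expects, as a flat array
converted = np.array(levels, dtype=np.float32).ravel()

print("float32 values found in the raw bytes:", misread.size)
print("converted:", converted)
```

The flat length `converted.size` is also the natural 1-D global work size for a kernel that indexes with `get_global_id(0)` only.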

In [1]:
import numpy as np
import pyopencl as cl

class GPUProcessing:
    def __init__(self):
        self.context, self.queue = self.setup_opencl()

    def setup_opencl(self):
        platform = cl.get_platforms()[0]
        device = platform.get_devices()[0]
        context = cl.Context([device])
        queue = cl.CommandQueue(context)
        return context, queue

    def parallel_process(self, data):
        mf = cl.mem_flags
        # The kernel reads 32-bit floats from contiguous memory; a float64 column
        # slice such as bids[:, 0] is neither, so convert it first
        data = np.ascontiguousarray(data, dtype=np.float32)
        data_buf = cl.Buffer(self.context, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=data)
        output_buf = cl.Buffer(self.context, mf.WRITE_ONLY, data.nbytes)
        prg = cl.Program(self.context, '''
        __kernel void double_data(__global const float *data, __global float *output) {
            int i = get_global_id(0);
            output[i] = data[i] * 2;
        }
        ''').build()
        # One work-item per element
        prg.double_data(self.queue, (data.size,), None, data_buf, output_buf)
        result = np.empty_like(data)
        cl.enqueue_copy(self.queue, result, output_buf).wait()
        return result

In [2]:
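import numpy as np
import requests


class OrderBookClient:
    def __init__(self, symbols, api_url, rate_limiter):
        self.symbols = symbols
        self.api_url = api_url
        self.rate_limiter = rate_limiter
        self.snapshots = {}  # latest snapshot per symbol (used by the placeholder store_data)
        self.gpu_processor = GPUProcessing()  # Initialize the GPU processor

    def validate_data(self, data, symbol):
        # Placeholder (not in the original notebook): check that the fields used below exist
        for key in ("lastUpdateId", "bids", "asks"):
            if key not in data:
                raise ValueError(f"{symbol}: snapshot is missing '{key}'")

    def store_data(self, data, symbol):
        # Placeholder (not in the original notebook): keep the latest snapshot in memory
        self.snapshots[symbol] = data

    def get_snapshot(self, symbol):
        self.rate_limiter.wait()
        url = f"{self.api_url}/api/v3/depth?symbol={symbol}&limit=1000"
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            self.validate_data(data, symbol)
            self.store_data(data, symbol)
            self.parallel_process_data(data)  # Process the data in parallel
            return data
        else:
            raise Exception(f"API request failed with status {response.status_code}: {response.text}")

    def parallel_process_data(self, data):
        # Convert the levels to NumPy arrays and process them on the GPU
        bids = np.array(data['bids'], dtype=float)
        asks = np.array(data['asks'], dtype=float)
        processed_bids = self.gpu_processor.parallel_process(bids[:, 0])  # Price column only
        processed_asks = self.gpu_processor.parallel_process(asks[:, 0])
        print("Processed Bids:", processed_bids)
        print("Processed Asks:", processed_asks)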
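
The client calls `self.rate_limiter.wait()`, and a later cell constructs `RateLimiter(10, 60)`, but no such class is defined in this notebook. The sketch below is one possible implementation, assuming the two arguments mean "at most 10 calls per 60 seconds". That reading, and the injectable `clock` and `sleep` parameters, are assumptions made for this example.

```python
import time
from collections import deque


class RateLimiter:
    """Allow at most `max_calls` calls to wait() per `period` seconds (sliding window)."""

    def __init__(self, max_calls, period, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock      # injectable so the limiter can be tested without waiting
        self._sleep = sleep
        self._calls = deque()    # timestamps of recent calls, oldest first

    def wait(self):
        now = self._clock()
        # Forget calls that have already left the window
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            # Sleep until the oldest call leaves the window
            self._sleep(self.period - (now - self._calls[0]))
            now = self._clock()
            self._calls.popleft()
        self._calls.append(now)


limiter = RateLimiter(10, 60)
limiter.wait()   # the first call returns immediately
print("calls recorded:", len(limiter._calls))
```

With this class in scope, `OrderBookClient(['BTCUSDT'], 'https://api.binance.com', RateLimiter(10, 60))` can be constructed as in the test cell.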

In [5]:
import unittest

class TestOrderBookClient(unittest.TestCase):
    def test_parallel_processing(self):
        client = OrderBookClient(['BTCUSDT'], 'https://api.binance.com', RateLimiter(10, 60))
        data = client.get_snapshot('BTCUSDT')
        client.parallel_process_data(data)
        # Assertions that check the processed result can be added here

if __name__ == '__main__':
    unittest.main()

usage: ipykernel_launcher.py [-h] [-v] [-q] [--locals] [-f] [-c] [-b]
                             [-k TESTNAMEPATTERNS]
                             [tests ...]
ipykernel_launcher.py: error: argument -f/--failfast: ignored explicit argument '/Users/bryanwang/Library/Jupyter/runtime/kernel-v2-774SYm9XVvF9YJR.json'


AttributeError: 'tuple' object has no attribute 'tb_frame'
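
The error above arises because `unittest.main()` parses the Jupyter kernel's own command line (the `-f <connection file>` argument) as if it were unittest options. A minimal sketch of a notebook-friendly way to run a test case is shown below. To stay independent of the network and the GPU it tests a stand-in NumPy function, `double_prices`, on made-up data. Both the function and the data are hypothetical.

```python
import unittest

import numpy as np


def double_prices(levels):
    """Stand-in for the GPU step: double the price column of [[price, qty], ...]."""
    prices = np.array(levels, dtype=np.float32)[:, 0]
    return prices * 2


class TestDoublePrices(unittest.TestCase):
    def test_doubles_price_column(self):
        snapshot = {"bids": [["100.0", "1.0"], ["99.5", "2.0"]]}  # made-up data
        doubled = double_prices(snapshot["bids"])
        np.testing.assert_allclose(doubled, [200.0, 199.0])


# Load and run the test case explicitly so that no command-line arguments are parsed
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDoublePrices)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print("tests passed:", result.wasSuccessful())
```

A common alternative in notebooks is `unittest.main(argv=["ignored"], exit=False)`, which skips the kernel's arguments and keeps the kernel alive after the tests finish.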