普通算子测试

In [None]:
class Operator:
    """Base class for the pull-based query operators defined below.

    An operator hands out one result per call to ``next`` and signals
    exhaustion by returning None.
    """

    def __init__(self):
        # Upstream operator; the base class starts without one.
        self.child = None

    def next(self):
        """Return the next result; the base implementation has none to give."""
        return None

class Scan(Operator):
    """Leaf operator that walks an in-memory sequence one element per call."""

    def __init__(self, data):
        super().__init__()
        self.data = data
        # Position of the element the next call to ``next`` will return.
        self.index = 0

    def next(self):
        """Return the element under the cursor and advance; None when exhausted."""
        if self.index >= len(self.data):
            return None
        current = self.data[self.index]
        self.index += 1
        return current

class Join(Operator):
    """Nested-loop join of two child operators.

    For every tuple of the left child the right child is replayed from the
    start, and each (left, right) pair accepted by ``join_condition`` is
    emitted as one merged dict.
    """

    def __init__(self, left_child, right_child, join_condition):
        super().__init__()
        self.left_child = left_child
        self.right_child = right_child
        self.join_condition = join_condition
        # Outer tuple currently being matched; None means "fetch a new one".
        self.left_tuple = None
        # Most recent tuple pulled from the right child.
        self.right_tuple = None

    def next(self):
        """Return the next joined tuple, or None once the left child is exhausted.

        On key collisions the right tuple's value wins, because it is
        unpacked last into the merged dict.
        """
        while True:
            if self.left_tuple is None:
                outer = self.left_child.next()
                if outer is None:
                    return None
                self.left_tuple = outer
                # Rewind the inner side for the new outer tuple. This relies
                # on the right child exposing a Scan-style ``index`` cursor.
                self.right_child.index = 0

            inner = self.right_child.next()
            self.right_tuple = inner
            if inner is None:
                # Inner side exhausted: move on to the next outer tuple.
                self.left_tuple = None
            elif self.join_condition(self.left_tuple, inner):
                return {**self.left_tuple, **inner}

class Projection(Operator):
    """Operator that keeps only the requested keys of each child tuple."""

    def __init__(self, child, columns):
        super().__init__()
        self.child = child
        self.columns = columns

    def next(self):
        """Return the child's next tuple reduced to ``self.columns``.

        Returns None as soon as the child returns None.
        """
        row = self.child.next()
        if row is None:
            return None
        return {name: row[name] for name in self.columns}

class Selection(Operator):
    """Operator that passes through only the child tuples accepted by a predicate."""

    def __init__(self, child, predicate):
        super().__init__()
        self.child = child
        self.predicate = predicate

    def next(self):
        """Return the next child tuple for which ``predicate`` is truthy.

        Rejected tuples are skipped; None is returned once the child is
        exhausted.
        """
        row = self.child.next()
        while row is not None:
            if self.predicate(row):
                return row
            row = self.child.next()
        return None

# Sample data
data1 = [
    {'id': 1, 'name': 'John', 'age': 25},
    {'id': 2, 'name': 'Jane', 'age': 30},
    {'id': 3, 'name': 'Bob', 'age': 35}
]

data2 = [
    {'id': 1, 'city': 'New York'},
    {'id': 2, 'city': 'London'},
    {'id': 3, 'city': 'Paris'}
]

# # Build the query plan (join variant, kept for reference)
# scan1 = Scan(data1)
# scan2 = Scan(data2)
# join = Join(scan1, scan2, lambda t1, t2: t1['id'] == t2['id'])
# projection = Projection(join, ['name', 'age', 'city'])
# selection = Selection(projection, lambda t: t['age'] <= 30)

# # Execute the query plan
# while True:
#     row = selection.next()
#     if row is None:
#         break
#     print(row)

# Active plan: project name/age out of data1, then keep rows with age <= 30.
scan0 = Scan(data1)
projection0 = Projection(scan0, ['name','age'])
selection0 = Selection(projection0, lambda t: t['age'] <= 30)

# Pull results until the plan reports exhaustion with None. The loop variable
# is named `row` so the builtin `tuple` is not rebound at module level.
while True:
    row = selection0.next()
    if row is None:
        break
    print(row)

创建ONNX模型操作张量测试

In [None]:
import numpy as np
import onnxruntime as ort
from onnx import helper, TensorProto

# Build a custom ONNX model
def create_model(min_val, max_val):
    """Build an ONNX model that zeroes every input element outside (min_val, max_val).

    The graph computes
    ``Where(And(Greater(input, min), Less(input, max)), input, zero)``,
    so elements strictly between the two bounds pass through and all others
    become 0.0.

    Args:
        min_val: Exclusive lower bound, stored as the scalar initializer 'min'.
        max_val: Exclusive upper bound, stored as the scalar initializer 'max'.

    Returns:
        The model object produced by ``helper.make_model``.
    """
    float_type = TensorProto.FLOAT

    def scalar(name, value):
        # Rank-0 float initializer holding a single constant.
        return helper.make_tensor(name, float_type, [], [value])

    # Constants referenced by name from the nodes below.
    initializers = [
        scalar('min', min_val),
        scalar('max', max_val),
        scalar('zero', 0.0),
    ]

    # Nodes in execution order: two comparisons, their conjunction, the select.
    nodes = [
        helper.make_node('Greater', ['input', 'min'], ['greater_output']),
        helper.make_node('Less', ['input', 'max'], ['less_output']),
        helper.make_node('And', ['greater_output', 'less_output'], ['and_output']),
        helper.make_node('Where', ['and_output', 'input', 'zero'], ['output']),
    ]

    # Input and output are rank-2 float tensors with both dimensions left open.
    graph_inputs = [helper.make_tensor_value_info('input', float_type, [None, None])]
    graph_outputs = [helper.make_tensor_value_info('output', float_type, [None, None])]

    graph_def = helper.make_graph(
        nodes,
        'clip_model',
        graph_inputs,
        graph_outputs,
        initializers,
    )
    return helper.make_model(graph_def, producer_name='clip_model')

# Build the ONNX model for the open interval (min_val, max_val)
min_val = 0.2
max_val = 0.4
model_def = create_model(min_val, max_val)

# Serialize the model to bytes and load it straight into an inference session
model_bytes = model_def.SerializeToString()
session = ort.InferenceSession(model_bytes)

# Random float32 input of shape (3, 4)
input_data = np.random.rand(3, 4).astype(np.float32)
print("Input data:", input_data, sep="\n")

# Run inference; the graph has a single output
output_data = session.run(None, {'input': input_data})[0]
print("Output data:", output_data, sep="\n")


调用arrow格式表测试

In [None]:
import pyarrow as pa
import numpy as np
import onnxruntime as ort

# Build an Arrow table with one int, one float and one string column
schema = pa.schema([
    ('int_column', pa.int64()),
    ('float_column', pa.float64()),
    ('string_column', pa.string())
])
data = [
    pa.array([1, 2, 3, 4, 5]),
    pa.array([6.0, 7.0, 8.0, 9.0, 10.0]),
    pa.array(['a', 'b', 'c', 'd', 'e'])
]
table = pa.Table.from_arrays(data, schema=schema)

# Convert every column to NumPy and stack the columns side by side.
# NOTE(review): with the string column present the stacked array is presumably
# object-dtype, so the float32 cast in the commented-out code below would need
# that column dropped first - confirm before enabling it.
tensor_data = [column.to_numpy() for column in table.columns]
tensor = np.stack(tensor_data, axis=1)
print(tensor)
# # Create an ONNX session and run the model
# session = ort.InferenceSession('path/to/your/model.onnx')
# input_name = session.get_inputs()[0].name
# output_name = session.get_outputs()[0].name

# result = session.run([output_name], {input_name: tensor.astype(np.float32)})[0]

# print(result)


普通算子无join查询测试

In [None]:
class Scan():
    """Leaf operator that walks an in-memory sequence one element per call."""

    def __init__(self, data):
        self.data = data
        # Position of the element the next call to ``next`` will return.
        self.index = 0

    def next(self):
        """Return the element under the cursor and advance; None when exhausted."""
        if self.index >= len(self.data):
            return None
        current = self.data[self.index]
        self.index += 1
        return current

class Projection():
    """Operator that keeps only the requested keys of each child tuple."""

    def __init__(self, child, columns):
        self.child = child
        self.columns = columns

    def next(self):
        """Return the child's next tuple reduced to ``self.columns``.

        Returns None as soon as the child returns None.
        """
        row = self.child.next()
        if row is None:
            return None
        return {name: row[name] for name in self.columns}

class Selection():
    """Operator that passes through only the child tuples accepted by a predicate."""

    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate

    def next(self):
        """Return the next child tuple for which ``predicate`` is truthy.

        Rejected tuples are skipped; None is returned once the child is
        exhausted.
        """
        row = self.child.next()
        while row is not None:
            if self.predicate(row):
                return row
            row = self.child.next()
        return None

# Sample data
data1 = [
    {'id': 1, 'name': 'John', 'age': 25},
    {'id': 2, 'name': 'Jane', 'age': 30},
    {'id': 3, 'name': 'Bob', 'age': 35}
]

data2 = [
    {'id': 1, 'city': 'New York'},
    {'id': 2, 'city': 'London'},
    {'id': 3, 'city': 'Paris'}
]

# Build the query plan: project name/age out of data1, keep rows with age <= 30
scan0 = Scan(data1)
projection0 = Projection(scan0, ['name','age'])
selection0 = Selection(projection0, lambda t: t['age'] <= 30)

# Execute the query plan until it reports exhaustion with None. The loop
# variable is named `row` so the builtin `tuple` is not rebound at module level.
while True:
    row = selection0.next()
    if row is None:
        break
    print(row)


张量算子无join测试

In [14]:
import pyarrow as pa
import numpy as np
import torch
import torch.nn as nn
import onnxruntime as ort
import onnx

class Scan():
    """Leaf operator that emits ``data`` one row at a time as a NumPy array.

    ``data`` must support ``len()`` and ``slice(offset, length).to_pandas()``.
    """

    def __init__(self, data):
        self.data = data
        # Offset of the row the next call to ``next`` will return.
        self.index = 0

    def next(self):
        """Return the current row with a leading axis added, or None when exhausted."""
        if self.index >= len(self.data):
            return None
        # Take a one-row slice and go through pandas to get its values as an array.
        one_row = self.data.slice(self.index, 1)
        values = one_row.to_pandas().iloc[0].to_numpy()
        # Prepend an axis so downstream operators receive a single-row 2-D array.
        values = values[np.newaxis, :]
        self.index += 1
        return values

class ProjectionModel(nn.Module):
    """Module whose forward pass selects the entries ``proj`` along the second axis."""

    def __init__(self, proj):
        super().__init__()
        # Column indices to keep.
        self.proj = proj

    def forward(self, x):
        """Return ``x`` restricted to the columns listed in ``self.proj``."""
        selected = x[:, self.proj]
        return selected

class Projection():
    """Operator that projects each child tensor onto ``columns`` through ONNX Runtime.

    The projection is expressed as a ``ProjectionModel``, exported to
    "projection_model.onnx" and executed with an ONNX Runtime session. The
    export and session creation happen once per distinct input shape instead
    of on every ``next`` call, because neither depends on the row values.
    """

    def __init__(self, child, columns):
        self.child = child
        self.columns = columns
        # Input shape -> ready-to-run ONNX Runtime session.
        self._sessions = {}

    def _session_for(self, shape):
        """Return the cached session for ``shape``, exporting the model on first use."""
        session = self._sessions.get(shape)
        if session is None:
            model = ProjectionModel(self.columns)
            # Convert the model to ONNX; the dummy input only fixes the traced shape.
            dummy_input = torch.randn(*shape)
            torch.onnx.export(model, dummy_input, "projection_model.onnx", input_names=["input"], output_names=["output"])
            session = ort.InferenceSession("projection_model.onnx")
            self._sessions[shape] = session
        return session

    def next(self):
        """Return the projected float32 array for the child's next tensor, or None."""
        tensor = self.child.next()
        if tensor is None:
            return None
        tensor = tensor.astype(np.float32)
        # Run the model with ONNX Runtime and return its single output.
        ort_session = self._session_for(tensor.shape)
        ort_outputs = ort_session.run(None, {"input": tensor})
        return ort_outputs[0]

class SelectionModel(nn.Module):
    """Module whose forward pass is the result of applying ``predicate`` to the input."""

    def __init__(self, predicate):
        super().__init__()
        # Callable mapping an input tensor to a mask.
        self.predicate = predicate

    def forward(self, x):
        """Return ``predicate(x)`` unchanged."""
        return self.predicate(x)

class Selection():
    """Operator that keeps the child tensors whose predicate mask is true.

    The predicate is wrapped in a ``SelectionModel``, exported to
    "selection_model.onnx" and evaluated with ONNX Runtime. The export and
    session creation happen once per distinct input shape instead of on every
    row, because neither depends on the row values.
    """

    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate
        # Input shape -> ready-to-run ONNX Runtime session.
        self._sessions = {}

    def _session_for(self, shape):
        """Return the cached session for ``shape``, exporting the model on first use."""
        session = self._sessions.get(shape)
        if session is None:
            model = SelectionModel(self.predicate)
            # The dummy input only fixes the traced shape.
            dummy_input = torch.randn(*shape)
            torch.onnx.export(model, dummy_input, "selection_model.onnx", input_names=["input"], output_names=["output"])
            session = ort.InferenceSession("selection_model.onnx")
            self._sessions[shape] = session
        return session

    def next(self):
        """Return the next float32 child tensor accepted by the predicate, or None.

        Only the first element of the mask is inspected, which matches the
        single-row tensors this operator is fed in this cell.
        """
        while True:
            tensor = self.child.next()
            if tensor is None:
                return None
            tensor = tensor.astype(np.float32)
            ort_session = self._session_for(tensor.shape)
            ort_outputs = ort_session.run(None, {"input": tensor})
            if ort_outputs[0][0]:
                return tensor

# Sample data
data = [
    {'id': 1, 'gender': 0, 'age': 38, 'length': 177.2},
    {'id': 2, 'gender': 1, 'age': 40, 'length': 178.8},
    {'id': 3, 'gender': 0, 'age': 35, 'length': 175.5}
]
data0 = pa.Table.from_pylist(data)
# Build the query plan: keep columns 0, 2 and 3, then filter on column 1 of
# the projected tensor (presumably 'age', given the key order above).
scan0 = Scan(data0)
# print(scan0.next())
projection0 = Projection(scan0, [0, 2, 3])
selection0 = Selection(projection0, lambda t: t[:, 1] < 38)
# selection0 = Selection(projection0, lambda t: t[:, 2] == 177.2)

# Execute the query plan until it reports exhaustion with None. The loop
# variable is named `row` so the builtin `tuple` is not rebound at module level.
while True:
    row = selection0.next()
    if row is None:
        break
    print(row)



verbose: False, log level: Level.ERROR

verbose: False, log level: Level.ERROR

verbose: False, log level: Level.ERROR

verbose: False, log level: Level.ERROR

verbose: False, log level: Level.ERROR

verbose: False, log level: Level.ERROR

[[  3.   35.  175.5]]


测试model

In [None]:
import torch
import torch.nn as nn
import numpy as np
import onnxruntime as ort

class PredicateModel(nn.Module):
    """Module that filters a tensor with a caller-supplied predicate."""

    def __init__(self):
        super().__init__()

    def forward(self, x, predicate):
        """Return ``x`` indexed by the mask that ``predicate(x)`` produces."""
        # Evaluate the predicate on the tensor, then keep what the mask selects.
        return x[predicate(x)]

# Create the model instance
model = PredicateModel()

# Example input and the predicate to apply to it
x = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=torch.float32)
predicate = lambda x: (x[:, 1] > 4) & (x[:, 2] < 8)


class _BoundPredicateModel(nn.Module):
    """Wrap a two-argument model so the predicate is captured, not passed as an input."""

    def __init__(self, model, predicate):
        super().__init__()
        self.model = model
        self.predicate = predicate

    def forward(self, x):
        return self.model(x, self.predicate)


# Convert the PyTorch model to ONNX. A Python callable cannot be an ONNX graph
# input, so the predicate is bound into the wrapper and traced into the graph;
# the tensor `x` is the only exported input.
dummy_input = (x,)
torch.onnx.export(
    _BoundPredicateModel(model, predicate),
    dummy_input,
    'predicate_model.onnx',
    input_names=['x'],
    output_names=['filtered_x'],
    opset_version=11,
)

# Run the model with ONNX Runtime
ort_session = ort.InferenceSession('predicate_model.onnx')

# Prepare the input feed (keyed by the exported input name)
input_data = {
    'x': x.numpy(),
}

# Run the ONNX model
ort_outputs = ort_session.run(None, input_data)

# Fetch the result
filtered_x = ort_outputs[0]

print("Original tensor:")
print(x)
print("Filtered tensor:")
print(filtered_x)


In [None]:
import numpy as np
import torch
import torch.nn as nn
import onnxruntime as ort

# Define the model
class SimpleModel(nn.Module):
    """Module whose forward pass keeps only the columns in ``columns_to_keep``."""

    def __init__(self, columns_to_keep):
        super().__init__()
        # Column indices to retain along the second axis.
        self.columns_to_keep = columns_to_keep

    def forward(self, x):
        """Return ``x`` restricted to ``self.columns_to_keep`` along the second axis."""
        kept = x[:, self.columns_to_keep]
        return kept

# Instantiate the model
columns_to_keep = [1, 2]  # keep the second and third columns
model = SimpleModel(columns_to_keep)

# Export the model to ONNX; the dummy input only needs the shape of `t`
t = np.array([[1, 0, 25, 177.2]], dtype=np.float32)
dummy_input = torch.randn(*t.shape)
torch.onnx.export(
    model,
    dummy_input,
    "simple_model.onnx",
    input_names=["input"],
    output_names=["output"],
)

# Load the exported model into ONNX Runtime
ort_session = ort.InferenceSession("simple_model.onnx")

# Prepare the input and run the model
input_data = np.array([[1, 0, 25, 177.2]], dtype=np.float32)
ort_inputs = {"input": input_data}
ort_outputs = ort_session.run(None, ort_inputs)

# Print the single output
output_data = ort_outputs[0]
print("Output:", output_data)
