In [None]:
import pyarrow as pa
import pandas as pd
# Build a small pandas frame, wrap it in an Arrow table and show its first batch.
df = pd.DataFrame(
    {
        'n_legs': [2, 4, 5, 100],
        'animals': ["Flamingo", "Horse", "Brittle stars", "Centipede"],
    }
)
table = pa.Table.from_pandas(df)
first_batch = table.to_batches()[0]
print(first_batch)


# Create a zero-row RecordBatch with the table's schema: every column name is
# mapped to an empty list and the schema supplies the column types.
recBatchOutSchema = table.schema
recBatchOutColumns = table.column_names
recBatchOutDict = {column_name: [] for column_name in recBatchOutColumns}
emptyRecBatchOut = pa.RecordBatch.from_pydict(
    recBatchOutDict, schema=recBatchOutSchema)

print(emptyRecBatchOut.column(0))

In [None]:
import pyarrow as pa

# Build two RecordBatch objects that share the same columns.

# Column data for the first RecordBatch
data1 = [
    pa.array([1, 2, 3]),
    pa.array(['a', 'b', 'c'])
]

# First RecordBatch
record_batch1 = pa.RecordBatch.from_arrays(data1, names=['id', 'letter'])

# Column data for the second RecordBatch
data2 = [
    pa.array([4, 5, 6]),
    pa.array(['d', 'e', 'f'])
]

# Second RecordBatch
record_batch2 = pa.RecordBatch.from_arrays(data2, names=['id', 'letter'])

# The list of RecordBatches to merge
record_batch_list = [record_batch1, record_batch2]

# Stack all RecordBatches into one Table
table = pa.Table.from_batches(record_batch_list)

# Collapse the Table back into a single RecordBatch without leaving Arrow.
# combine_chunks() concatenates each column's chunks into one chunk, so for
# this non-empty table to_batches() yields one batch holding every row.
# (A pandas round trip would copy the data again, attach pandas metadata to
# the schema and could change column types.)
# Note: the combined data still has to fit in memory.
combined_record_batch = table.combine_chunks().to_batches()[0]

# combined_record_batch is now a single RecordBatch containing all the data


In [None]:
from tqdm import tqdm
import time

# Example: a simple loop with a progress bar.
# No explicit `total` is passed: the previous total=10 contradicted the 100
# iterations of range(100), so the bar ran past its end. tqdm takes the
# length from the iterable itself.
for i in tqdm(range(100), desc="test"):
    # Simulate the task
    time.sleep(0.1)  # stand-in for the time the real task would take


In [None]:
import pyarrow as pa
import numpy as np
import pyarrow.compute as pc

# Two int64 columns of equal length: 1..N_ROWS and a constant 2.
N_ROWS = 100_000_000

array1 = pa.array(np.arange(1, N_ROWS + 1, dtype=np.int64))
# np.full builds the constant column directly as a NumPy int64 buffer instead
# of first materialising an N_ROWS-element Python list.
array2 = pa.array(np.full(N_ROWS, 2, dtype=np.int64))

# NOTE: despite its name, `table` is a RecordBatch; the name is kept because
# test() below reads it as a module-level variable.
table = pa.RecordBatch.from_arrays([array1, array2], names=['a', 'b'])

def test(column_name='a', n=3):
    """Split the module-level ``table`` into ``n`` RecordBatches by value modulo ``n``.

    Row ``r`` goes to bucket ``table[column_name][r] % n``.

    The previous implementation computed ``hash(value.as_py()) % n`` one
    element at a time in Python. For non-negative integers below CPython's
    hash modulus (2**61 - 1 on 64-bit builds) ``hash(v) == v``, so the
    vectorised modulo used here produces the same partition for such data.
    NOTE(review): negative or larger values would be bucketed differently from
    the hash-based version; confirm that is acceptable before reusing this on
    other columns.

    Args:
        column_name: Name of the integer column to partition on.
        n: Number of buckets / RecordBatches to produce.

    Returns:
        A list of ``n`` RecordBatches; element ``i`` holds the rows whose
        column value modulo ``n`` equals ``i``.
    """
    # View the Arrow column as a NumPy array. to_numpy() defaults to
    # zero-copy-only and therefore raises if the column contains nulls.
    column_values = table.column(column_name).to_numpy()

    # Bucket index for every row, computed in one vectorised pass.
    bucket_ids = np.mod(column_values, n)

    # One boolean mask per bucket selects that bucket's rows.
    record_batches = [table.filter(pa.array(bucket_ids == i)) for i in range(n)]
    return record_batches


from memory_profiler import memory_usage

# Record memory samples while test() runs. The target is spelled out in
# memory_usage's (function, args, kwargs) tuple form with no arguments.
profile_target = (test, (), {})
mem_usage = memory_usage(profile_target)

print(mem_usage)


In [6]:
import pyarrow as pa

# Create a custom memory pool.
#
# pa.MemoryPool is a wrapper around a native Arrow pool and is not meant to be
# subclassed from Python: Arrow performs allocations in native code and never
# calls a Python-level `Allocate` override, the base class exposes no
# `Allocate` method for super() to reach, and a subclass that skips the base
# constructor ends up without an initialised native pool. Use the factory
# functions pyarrow provides instead.
def make_tracking_pool(parent=None):
    """Return a memory pool that forwards to ``parent`` but keeps its own statistics.

    Args:
        parent: The pool that actually performs the allocations. Defaults to
            ``pa.default_memory_pool()``; any other pyarrow pool (for example
            ``pa.mimalloc_memory_pool()``) can be passed instead.

    Returns:
        A ``pa.MemoryPool`` created with ``pa.proxy_memory_pool(parent)``.
    """
    if parent is None:
        parent = pa.default_memory_pool()
    return pa.proxy_memory_pool(parent)


# Backward-compatible alias so existing `MyMemoryPool()` calls keep working.
MyMemoryPool = make_tracking_pool

# Instantiate the custom memory pool.
# (pa.logging_memory_pool(parent) is an alternative that logs every
# allocation to stderr instead of only counting bytes.)
custom_memory_pool = MyMemoryPool()

# Create an array using the custom memory pool
array = pa.array([1, 2, 3, 4, 5], memory_pool=custom_memory_pool)

# Check that the array was allocated through the custom pool: its own byte
# counter should now be greater than zero.
print("bytes allocated via custom pool:", custom_memory_pool.bytes_allocated())
