In [1]:
import pyarrow.dataset as ds
import polars as pl
from datetime import datetime
%load_ext memory_profiler
import time



In [2]:
# Open the trade_taq2019 Arrow file as a pyarrow dataset; the query
# functions in the following cells take this object as their input.
data = ds.dataset(
    "~/internal/data/trade_taq2019.arrow",
    format="arrow",
)

In [3]:
def window(data):
    """Build the filtered "window" query over ``data``.

    ``data`` is passed to ``pl.scan_ds`` (with ``allow_pyarrow_filter=True``)
    and the resulting scan is filtered to rows where:

    * ``Symbol`` equals ``"AAPL"``,
    * ``Sale_Condition`` matches the pattern ``"O"``, and
    * ``Time`` lies between 2019-10-07 09:30 and 2019-10-07 16:00.

    Returns the filtered query object without collecting it; the caller is
    expected to call ``.collect()`` on the result.
    """
    # Wrap the dataset in a Polars scan.
    print("data into scan...")
    scanned = pl.scan_ds(data, allow_pyarrow_filter=True)

    # Assemble the three predicates, then combine them into one filter.
    print("running window query...")
    start = datetime(2019, 10, 7, 9, 30)
    end = datetime(2019, 10, 7, 16, 0)
    symbol_matches = pl.col("Symbol") == "AAPL"
    condition_matches = pl.col("Sale_Condition").str.contains("O")
    time_in_window = pl.col("Time").is_between(start, end)

    return scanned.filter(symbol_matches & condition_matches & time_in_window)

query = window(data)

result=query.collect()
result

%timeit %memit

data into scan...
running window query...
peak memory: 4668.07 MiB, increment: 0.00 MiB
peak memory: 4633.32 MiB, increment: -26.45 MiB
peak memory: 4047.43 MiB, increment: -347.94 MiB
peak memory: 2127.42 MiB, increment: -728.03 MiB
peak memory: 1202.44 MiB, increment: -92.51 MiB
peak memory: 1186.02 MiB, increment: 0.00 MiB
peak memory: 1186.06 MiB, increment: 0.00 MiB
peak memory: 1186.06 MiB, increment: 0.00 MiB
313 ms ± 64.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [4]:
def agg(data):
    """Build the per-minute traded-volume query over ``data``.

    ``data`` is passed to ``pl.scan_ds`` (with ``allow_pyarrow_filter=True``),
    sorted on ``Time``, and then grouped with ``groupby_dynamic`` using
    ``Time`` as the index column, 1-minute ``every``/``period`` settings and
    ``Symbol`` as the ``by`` key. Each group sums ``Trade_Volume`` into a
    column named ``Total_vol``.

    Returns the query object without collecting it.
    """
    scanned = pl.scan_ds(data, allow_pyarrow_filter=True)

    # Sort on the index column before the dynamic grouping.
    print("Sorting on time first...")
    time_sorted = scanned.sort("Time")

    # Aggregate traded volume per window and symbol.
    print("running query now")
    total_volume = pl.col("Trade_Volume").sum().alias("Total_vol")
    windows = time_sorted.groupby_dynamic(
        index_column="Time",
        every="1m",
        period="1m",
        by="Symbol",
    )

    return windows.agg([total_volume])

query = agg(data)
result = query.collect()
result
%timeit %memit 


Sorting on time first...
running query now
peak memory: 1284.57 MiB, increment: -45.26 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
peak memory: 1284.58 MiB, increment: 0.00 MiB
272 ms ± 4.04 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
def newagg(data):
    """Build the average price/volume query grouped by symbol and exchange.

    ``data`` is passed to ``pl.scan_ds`` (with ``allow_pyarrow_filter=True``)
    and grouped by ``Symbol`` and ``Exchange``. Each group takes the mean of
    ``Trade_Price`` (as ``"avg price"``) and of ``Trade_Volume`` (as
    ``"avg volume"``); the grouped output is then sorted on ``"avg price"``
    with ``reverse=True``.

    Returns the query object without collecting it.
    """
    scanned = pl.scan_ds(data, allow_pyarrow_filter=True)
    print ("running query...")

    # One mean expression per output column.
    averages = [
        pl.col("Trade_Price").mean().alias("avg price"),
        pl.col("Trade_Volume").mean().alias("avg volume"),
    ]
    per_venue = scanned.groupby(["Symbol", "Exchange"]).agg(averages)

    return per_venue.sort("avg price", reverse=True)
    
query = newagg(data)
result = query.collect()
result
%timeit %memit 


running query...
peak memory: 2062.24 MiB, increment: -122.69 MiB
peak memory: 1720.03 MiB, increment: -157.34 MiB
peak memory: 1581.11 MiB, increment: -58.12 MiB
peak memory: 1466.73 MiB, increment: -11.58 MiB
peak memory: 1462.02 MiB, increment: 0.00 MiB
peak memory: 1462.02 MiB, increment: 0.00 MiB
peak memory: 1460.13 MiB, increment: 0.00 MiB
peak memory: 1455.33 MiB, increment: -4.80 MiB
279 ms ± 13.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
# Data ingest of the bigger dataset.
# Note: this rebinds `data`, so cells run after this one see the
# quote_taq2019 file instead of whatever `data` referred to before.
data = ds.dataset(
    "~/internal/data/quote_taq2019.arrow",
    format="arrow",
)


In [7]:
def bagg(data):
    """Build the hourly mid-price query over ``data``.

    ``data`` is passed to ``pl.scan_ds``, sorted on ``Time``, and grouped
    with ``groupby_dynamic`` using ``Time`` as the index column, 1-hour
    ``every``/``period`` settings and ``Symbol`` as the ``by`` key. Each
    group produces:

    * ``mid``: 0.5 times the mean of ``Bid_Price + Offer_Price``,
    * ``min_time``: the minimum ``Time`` in the group,
    * ``max_time``: the maximum ``Time`` in the group.

    The grouped output is sorted on ``Time`` and returned without being
    collected.
    """
    scanned = pl.scan_ds(data)

    # Sort on the index column before the dynamic grouping.
    print("sorting on time")
    time_sorted = scanned.sort("Time")

    # Assemble the per-window aggregations, then group and sort.
    print("running big query")
    bid_plus_offer = pl.col("Bid_Price") + pl.col("Offer_Price")
    aggregations = [
        (0.5 * bid_plus_offer.mean()).alias("mid"),
        pl.col("Time").min().alias("min_time"),
        pl.col("Time").max().alias("max_time"),
    ]
    hourly = time_sorted.groupby_dynamic(
        index_column="Time",
        every="1h",
        period="1h",
        by="Symbol",
    ).agg(aggregations)

    return hourly.sort("Time")

query = bagg(data)
result = query.collect()
result


%timeit %memit

sorting on time
running big query
peak memory: 1475.67 MiB, increment: 0.00 MiB
peak memory: 1475.67 MiB, increment: 0.00 MiB
peak memory: 1475.67 MiB, increment: 0.00 MiB
peak memory: 1475.67 MiB, increment: 0.00 MiB
peak memory: 1475.67 MiB, increment: 0.00 MiB
peak memory: 1475.68 MiB, increment: 0.00 MiB
peak memory: 1475.68 MiB, increment: 0.00 MiB
peak memory: 1475.68 MiB, increment: 0.00 MiB
289 ms ± 7.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
