In [1]:
import ibis
from ibis import _

import ibis.expr.operations as ops
import ibis.expr.rules as rlz
import ibis.expr.datatypes as dt
import sqlalchemy as sa

In [2]:
ibis.options.interactive = True

In [4]:
con = ibis.duckdb.connect("/data/ticks.ddb")
ticks = con.tables.ticks

In [85]:
ticks.head()

Unnamed: 0,time,symbol,price,day_volume
0,2022-04-06 23:59:58,SOL/USD,112.940002,2077074
1,2022-04-06 23:59:57,USDT/USD,1.0001,154953973
2,2022-04-06 23:59:57,ADA/USD,1.053,25820067
3,2022-04-06 23:59:57,NEAR/USD,15.267,3768144
4,2022-04-06 23:59:56,AVAX/USD,82.93,221013


In [17]:
class Range(ops.Value):
    start = rlz.timestamp
    stop = rlz.timestamp
    step = rlz.interval
    
    output_dtype = dt.Array(dt.timestamp)
    output_shape = rlz.Shape.COLUMNAR
    
    
@ibis.duckdb.add_operation(Range)
def compile_range(t, expr):
    op = expr.op()
    start = t.translate(op.start)
    stop = t.translate(op.stop)
    step = t.translate(op.step)
    return sa.func.generate_series(start, stop, step)


def ibis_range(start, stop, step):
    return Range(start, stop, step).to_expr()

In [80]:
min_max = ticks.aggregate(min=_.time.min(), max=_.time.max())
buckets = ibis_range(
    min_max.min.truncate('m'),
    (min_max.max + ibis.interval(minutes=1)).truncate('m'),
    ibis.interval(minutes=1)
).name("bucket").unnest().to_projection()
buckets.head()

Unnamed: 0,bucket
0,2022-04-05 12:05:00
1,2022-04-05 12:06:00
2,2022-04-05 12:07:00
3,2022-04-05 12:08:00
4,2022-04-05 12:09:00


In [79]:
ibis.util.psql(buckets)

SELECT
  UNNEST(GENERATE_SERIES(DATE_TRUNC(?, min), DATE_TRUNC(?, t0.max + INTERVAL '1 minute' ), INTERVAL '1 minute' )) AS bucket
FROM (
  SELECT
    MIN(t1.time) AS min,
    MAX(t1.time) AS max
  FROM ticks AS t1
) AS t0


In [70]:
buckets.count()

43202

In [71]:
class ArgMinMax(ops.Reduction):
    arg = rlz.any
    key = rlz.any
    
    output_dtype = rlz.dtype_like("arg")
    
    
class ArgMax(ArgMinMax):
    pass


class ArgMin(ArgMinMax):
    pass


def argmin(x, y):
    return ArgMin(x, y).to_expr()


def argmax(x, y):
    return ArgMax(x, y).to_expr()


@ibis.duckdb.add_operation(ArgMin)
def compile_argmin(t, expr):
    op = expr.op()
    arg = t.translate(op.arg)
    key = t.translate(op.key)
    return sa.func.arg_min(arg, key)
    
    
@ibis.duckdb.add_operation(ArgMax)
def compile_argmax(t, expr):
    op = expr.op()
    arg = t.translate(op.arg)
    key = t.translate(op.key)
    return sa.func.arg_max(arg, key)

In [86]:
expr = (
    (b := buckets)
        .left_join(
            t := ticks.filter([_.time.between('2022-04-10', '2022-04-17'), _.symbol == "BTC/USD"]),
            [
                t.time >= b.bucket,
                t.time < b.bucket + ibis.interval(minutes=1)
            ]
        ).aggregate(
            by=_.bucket,
            open=lambda t: argmin(t.price, t.time),
            high=_.price.max(),
            low=_.price.min(),
            close=lambda t: argmax(t.price, t.time),
        ).sort_by(_.bucket)
)
ibis.util.psql(expr)
expr

SELECT
  bucket,
  ARG_MIN(price, time) AS open,
  MAX(price) AS high,
  MIN(price) AS low,
  ARG_MAX(price, time) AS close
FROM (
  SELECT
    UNNEST(GENERATE_SERIES(DATE_TRUNC(?, min), DATE_TRUNC(?, t2.max + INTERVAL '1 minute' ), INTERVAL '1 minute' )) AS bucket
  FROM (
    SELECT
      MIN(t3.time) AS min,
      MAX(t3.time) AS max
    FROM ticks AS t3
  ) AS t2
) AS t0
LEFT OUTER JOIN (
  SELECT
    t2.time AS time,
    t2.symbol AS symbol,
    t2.price AS price,
    t2.day_volume AS day_volume
  FROM ticks AS t2
  WHERE
    t2.time BETWEEN CAST(? AS TEXT) AND CAST(? AS TEXT)
    AND t2.symbol = CAST(? AS TEXT)
) AS t1
  ON t1.time >= t0.bucket
  AND t1.time < t0.bucket + INTERVAL '1 minute' 
GROUP BY
  bucket
ORDER BY
  bucket


Unnamed: 0,bucket,open,high,low,close
0,2022-04-10 00:00:00,42771.601562,42788.000000,42756.101562,42785.398438
1,2022-04-10 00:01:00,42770.500000,42782.101562,42764.699219,42772.101562
2,2022-04-10 00:02:00,42768.699219,42772.101562,42722.699219,42723.199219
3,2022-04-10 00:03:00,42753.199219,42753.199219,42717.398438,42717.398438
4,2022-04-10 00:04:00,42726.101562,42755.101562,42726.101562,42755.101562
...,...,...,...,...,...
9995,2022-04-16 22:37:00,40483.398438,40483.398438,40478.398438,40480.101562
9996,2022-04-16 22:38:00,40482.101562,40482.101562,40471.300781,40479.601562
9997,2022-04-16 22:39:00,40495.898438,40499.199219,40463.898438,40472.199219
9998,2022-04-16 22:40:00,40471.699219,40472.199219,40468.601562,40471.500000


In [73]:
df = expr.execute()

In [74]:
import plotly.graph_objects as go

In [75]:
fig = go.Figure(data=[go.Candlestick(x=df['bucket'],
                open=df['open'],
                high=df['high'],
                low=df['low'],
                close=df['close'])])
fig.show()