In [2]:
import random
import time
from datetime import datetime, timedelta
from pathlib import Path

import polars as pl
from deltalake import DeltaTable

# Local directory of the unpartitioned test Delta table written and read below.
PATH = Path("./my-test-timelake")

In [3]:
def create_sample_data(
    num_rows: int = 1_000_000,
    asset_ids: "list[str] | None" = None,
    start_date: datetime = datetime(2023, 1, 1),
    seed: "int | None" = None,
) -> pl.DataFrame:
    """Build a synthetic hourly price/volume frame for a set of assets.

    Every asset gets ``num_rows // len(asset_ids)`` rows, so when ``num_rows``
    is not a multiple of the number of assets the result holds slightly fewer
    rows than requested. All assets share the same hourly timestamps starting
    at ``start_date``; prices and volumes follow a deterministic trend plus
    random noise.

    Args:
        num_rows: Total number of rows to generate across all assets.
        asset_ids: Asset identifiers to generate data for. Defaults to
            ``["AAPL", "MSFT", "GOOG", "TSLA"]``.
        start_date: Timestamp of the first row of every asset.
        seed: Optional seed for the noise. When ``None`` the module-level
            ``random`` generator is used, so a prior ``random.seed(...)`` call
            still controls the output.

    Returns:
        A frame with columns ``date``, ``asset_id``, ``price`` and ``volume``,
        with the assets concatenated in the order given.

    Raises:
        ValueError: If ``asset_ids`` is empty.
        AssertionError: If ``num_rows`` is smaller than the number of assets.
    """
    if asset_ids is None:
        asset_ids = ["AAPL", "MSFT", "GOOG", "TSLA"]
    if not asset_ids:
        # Without this guard the integer division below fails with ZeroDivisionError.
        raise ValueError("asset_ids must contain at least one asset id")
    assert num_rows >= len(asset_ids), (
        "num_rows must be at least equal to the number of asset_ids"
    )

    # Fall back to the global generator so an external random.seed() keeps working.
    rng = random if seed is None else random.Random(seed)
    rows_per_asset = num_rows // len(asset_ids)

    # The timestamps are identical for every asset, so build them only once.
    dates = [start_date + timedelta(hours=i) for i in range(rows_per_asset)]

    frames = []
    for asset_id in asset_ids:
        # Linear drift + 24-step cycle + uniform noise, rounded to cents.
        prices = [
            round(100 + i * 0.05 + (i % 24) * 0.3 + rng.uniform(-1, 1), 2)
            for i in range(rows_per_asset)
        ]
        # Linear drift + 10-step cycle + integer noise.
        volumes = [
            1000 + i * 2 + (i % 10) * 50 + rng.randint(-20, 20)
            for i in range(rows_per_asset)
        ]
        frames.append(
            pl.DataFrame(
                {
                    "date": dates,
                    "asset_id": [asset_id] * rows_per_asset,
                    "price": prices,
                    "volume": volumes,
                }
            )
        )

    return pl.concat(frames)


In [4]:
# Seed the global RNG so the generated sample is identical on every run.
random.seed(42)
df = create_sample_data()
df.head(5)

date,asset_id,price,volume
datetime[μs],str,f64,i64
2023-01-01 00:00:00,"""AAPL""",99.99,990
2023-01-01 01:00:00,"""AAPL""",99.81,1054
2023-01-01 02:00:00,"""AAPL""",99.78,1122
2023-01-01 03:00:00,"""AAPL""",101.46,1149
2023-01-01 04:00:00,"""AAPL""",100.84,1195


In [5]:
# Check the generated frame's dimensions as (rows, columns).
df.shape

(1000000, 4)

### We briefly test the Polars code


In [6]:
# Persist the sample frame as a Delta table at PATH, overwriting any existing table.
df.write_delta(PATH, mode="overwrite")

In [7]:
# Open the Delta table stored at PATH.
dt = DeltaTable(PATH)

In [8]:
# Display the table handle; from here it can be opened in Data Wrangler.
dt

DeltaTable()

In [9]:
# Inspect the schema stored in the Delta table.
dt.schema()

Schema([Field(date, PrimitiveType("timestamp_ntz"), nullable=True), Field(asset_id, PrimitiveType("string"), nullable=True), Field(price, PrimitiveType("double"), nullable=True), Field(volume, PrimitiveType("long"), nullable=True)])

In [10]:
# List the table's partitions.
dt.partitions()

[]

In [11]:
# Time a full read of the table. perf_counter() is a monotonic, high-resolution
# clock intended for measuring short durations, unlike wall-clock time.time().
start = time.perf_counter()
pl.read_delta(dt)
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.009 seconds


In [12]:
# Compact the table's files; the call returns a dict of metrics.
dt.optimize.compact()

{'numFilesAdded': 1,
 'numFilesRemoved': 4,
 'filesAdded': '{"avg":9196174.0,"max":9196174,"min":9196174,"totalFiles":1,"totalSize":9196174}',
 'filesRemoved': '{"avg":4474110.5,"max":4474666,"min":4473499,"totalFiles":4,"totalSize":17896442}',
 'partitionsOptimized': 1,
 'numBatches': 980,
 'totalConsideredFiles': 4,
 'totalFilesSkipped': 0,
 'preserveInsertionOrder': True}

In [13]:
# Time a full read again, now that the table has been compacted.
# perf_counter() is monotonic and high-resolution, unlike wall-clock time.time().
start = time.perf_counter()
pl.read_delta(dt)
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.067 seconds


In [14]:
# Time a read of a single column. `columns` is documented as a list of column
# names, so pass a list rather than a bare string.
start = time.perf_counter()
pl.read_delta(dt, columns=["date"])
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.012 seconds


In [15]:
# When we don't push down the filtering
start = time.time()
df = pl.read_delta(dt)
df.filter(df["date"].cast(pl.Date) == pl.lit("2023-01-01").cast(pl.Date))
end = time.time()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.058 seconds


In [16]:
# This does not seem to work, perhaps because we have no partitioning
start = time.time()
pl.read_delta(
    dt,
    delta_table_options={
        "predicate": [
            pl.col("date").cast(pl.Date) == pl.lit("2023-01-01").cast(pl.Date)
        ]
    },
)
end = time.time()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.022 seconds




### We try a new partitioned table


In [17]:
# Write the same frame to a second Delta table, partitioned by asset, and open it.
PATH_PARTITIONED = Path("./my-test-timelake-partitioned")
partitioned_write_options = {"partition_by": ["asset_id"]}
df.write_delta(
    PATH_PARTITIONED,
    mode="overwrite",
    delta_write_options=partitioned_write_options,
)

dt_partitioned = DeltaTable(PATH_PARTITIONED)

In [18]:
# Time a full read of the partitioned table.
# perf_counter() is monotonic and high-resolution, unlike wall-clock time.time().
start = time.perf_counter()
df = pl.read_delta(
    dt_partitioned,
)
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.009 seconds


In [19]:
# Time reading a single partition through deltalake's PyArrow reader,
# then converting the Arrow table to polars.
start = time.perf_counter()
pl.from_arrow(dt_partitioned.to_pyarrow_table(partitions=[("asset_id", "=", "AAPL")]))
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.061 seconds


In [20]:
# Time reading a single partition via pl.read_delta, passing the partition
# filter through the PyArrow options.
start = time.perf_counter()
df = pl.read_delta(
    dt_partitioned,
    pyarrow_options={"partitions": [("asset_id", "=", "AAPL")]},
    use_pyarrow=True,
)
end = time.perf_counter()
print(f"Read completed in {end - start:.3f} seconds")

Read completed in 0.010 seconds


In [21]:
# Re-check the schema of the first (unpartitioned) table for comparison.
dt.schema()

Schema([Field(date, PrimitiveType("timestamp_ntz"), nullable=True), Field(asset_id, PrimitiveType("string"), nullable=True), Field(price, PrimitiveType("double"), nullable=True), Field(volume, PrimitiveType("long"), nullable=True)])

In [22]:
# Partition columns recorded in the first table's metadata.
dt.metadata().partition_columns

[]

In [24]:
# List the partition values present in the partitioned table.
dt_partitioned.partitions()

[{'asset_id': 'GOOG'},
 {'asset_id': 'MSFT'},
 {'asset_id': 'TSLA'},
 {'asset_id': 'AAPL'}]

### We test upserts (merge with a conditional delete)


In [3]:
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Build a small target table. mode="overwrite" keeps this cell re-runnable:
# with the default mode the write raises once the "tmp" table already exists.
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
write_deltalake("tmp", data, mode="overwrite")
dt = DeltaTable("tmp")

# Source rows keyed on x; `deleted` flags which matched target rows to remove.
new_data = pa.table({"x": [2, 3], "deleted": [False, True]})

# Merge on x and delete only the matched rows whose source row is flagged deleted.
(
    dt.merge(
        source=new_data,
        predicate="target.x = source.x",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_delete(predicate="source.deleted = true")
    .execute()
)
dt.to_pandas().sort_values("x", ignore_index=True)

Unnamed: 0,x,y
0,1,4
1,2,5
