# pgpq

Convert PyArrow RecordBatches to Postgres' native binary format.
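
At its core, the library exposes a single encoder: you build an `ArrowToPostgresBinaryEncoder` from an Arrow schema, then stream `write_header()`, one `write_batch()` per record batch, and `finish()` into a binary `COPY`. A minimal sketch of that flow, using a small in-memory record batch made up purely for illustration:

```python
import io

import pyarrow as pa
from pgpq import ArrowToPostgresBinaryEncoder

# a tiny, made-up record batch just to show the encoding flow
batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
    schema=pa.schema([("id", pa.int64()), ("name", pa.string())]),
)

encoder = ArrowToPostgresBinaryEncoder(batch.schema)

# collect the COPY payload in memory; in practice you write these chunks
# straight into a psycopg COPY, as the examples below show
buffer = io.BytesIO()
buffer.write(encoder.write_header())
buffer.write(encoder.write_batch(batch))
buffer.write(encoder.finish())
```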

## Usage

### Copying a dataset to PostgreSQL using psycopg

```python
"""Example for README.md"""
from tempfile import mkdtemp
import psycopg
import pyarrow.dataset as ds
import requests
from pgpq import ArrowToPostgresBinaryEncoder

# let's get some example data
tmpdir = mkdtemp()
with open(f"{tmpdir}/yellow_tripdata_2023-01.parquet", mode="wb") as f:
    resp = requests.get(
        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    )
    resp.raise_for_status()
    f.write(resp.content)

# load an arrow dataset
# arrow can load datasets from partitioned parquet files locally or in S3/GCS
# it handles buffering, matching globs, etc.
dataset = ds.dataset(tmpdir)

# create an encoder object which will do the encoding
# and give us the expected Postgres table schema
encoder = ArrowToPostgresBinaryEncoder(dataset.schema)
# get the expected Postgres destination schema
# note that this is _not_ the same as the incoming arrow schema
# and not necessarily the schema of your permanent table
# instead it's the schema of the data that will be sent over the wire
# which for example does not have timezones on any timestamps
pg_schema = encoder.schema()
# assemble ddl for a temporary table
# it's often a good idea to bulk load into a temp table to:
# (1) Avoid indexes
# (2) Stay in-memory as long as possible
# (3) Be more flexible with types
#     (you can't load a SMALLINT into a BIGINT column without casting)
cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]
ddl = f"CREATE TEMP TABLE data ({','.join(cols)})"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            for batch in dataset.to_batches():
                copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
        # load into your actual table, possibly doing type casts
        # (a sketch of that INSERT follows after this code block)
        # cursor.execute("INSERT INTO \"table\" SELECT * FROM data")
```
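
To finish the pattern described in the comments above, you would run an `INSERT ... SELECT` from the temp table into your permanent table while still on the same connection (temp tables are session-local), casting any columns whose wire types differ from the destination types. A hedged sketch that continues the cursor block above; the `trips` table and the column names are placeholders, not something this example creates:

```python
# still inside `with conn.cursor() as cursor:` from the example above;
# "trips", "col_a" and "col_b" stand in for your own table and columns
cursor.execute(
    """
    INSERT INTO trips ("col_a", "col_b")
    SELECT "col_a", CAST("col_b" AS BIGINT)
    FROM data
    """
)
```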

### Defining field encoders

```python
"""Showcase defining encoders for fields."""
import pgpq
import psycopg
import pyarrow as pa
from pgpq import encoders
from pgpq import schema


data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['{"age": 33, "name": "alice"}', '{"age": 24, "name": "bob"}', "{}", "null"]),
]
arrow_schema = pa.schema([("id", pa.int64()), ("properties", pa.string())])
record_batch = pa.RecordBatch.from_arrays(data, schema=arrow_schema)

encoder = pgpq.ArrowToPostgresBinaryEncoder(record_batch.schema)
pg_schema_with_text_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_text_properties.columns
] == [("id", "INT8"), ("properties", "TEXT")]

# To produce a different PostgreSQL schema, override the default encoders pgpq generates:
# * 'id' keeps the default INT8 (BIGINT) encoding.
# * 'properties' is encoded as JSONB instead of TEXT.
field_encoders = {
    "id": encoders.Int64EncoderBuilder(pa.field("id", pa.int64())),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_with_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

ddl = """
CREATE TABLE id_properties (
    id INT8, -- Alternative: BIGINT
    properties JSONB
)
"""

# Without the right encoders, PostgreSQL would reject the following COPY with a
# binary-format error: the 'properties' column must arrive as JSONB, not TEXT.
with psycopg.connect("postgres://posthog:posthog@localhost:5432/posthog") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)

        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())

# The 'id' field already matches our schema, so we can use the default encoder for it,
# but we still need to encode 'properties' as JSONB.
# `infer_encoder` returns the default encoder builder for a given field
# (a sketch after this example shows how to combine it with overrides for wider schemas).
field_encoders = {
    "id": pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(record_batch.field("id")),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_inferred_id_and_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_inferred_id_and_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:

        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())
```
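
For wider schemas it can be tedious to spell every encoder out by hand. One way to combine `infer_encoder` with a single override is to start from the defaults for every field and replace only the ones you care about. This composition is an assumption built from the calls shown above (it continues with `record_batch`, `encoders` and `schema` from the example), not a separate pgpq API:

```python
# start from the default encoder for every Arrow field, then override only
# the fields whose PostgreSQL type should differ from the default
field_encoders = {
    field.name: pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(field)
    for field in record_batch.schema
}
field_encoders["properties"] = encoders.StringEncoderBuilder.new_with_output(
    pa.field("properties", pa.string()), schema.Jsonb()
)
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(
    record_batch.schema, field_encoders
)
```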