In [3]:
import pandas as pd

# Source of the sample data and the labels for its columns, in file order.
IRIS_CSV_URL = "https://docs.dagster.io/assets/iris.csv"
IRIS_COLUMNS = [
    "sepal_length_cm",
    "sepal_width_cm",
    "petal_length_cm",
    "petal_width_cm",
    "species",
]

# Supplying `names=` makes pandas treat the first line as data rather than a header.
dataframe = pd.read_csv(IRIS_CSV_URL, names=IRIS_COLUMNS)

# Preview the first ten rows as a plain NumPy array.
dataframe.to_numpy()[:10]

array([[5.1, 3.5, 1.4, 0.2, 'Iris-setosa'],
       [4.9, 3.0, 1.4, 0.2, 'Iris-setosa'],
       [4.7, 3.2, 1.3, 0.2, 'Iris-setosa'],
       [4.6, 3.1, 1.5, 0.2, 'Iris-setosa'],
       [5.0, 3.6, 1.4, 0.2, 'Iris-setosa'],
       [5.4, 3.9, 1.7, 0.4, 'Iris-setosa'],
       [4.6, 3.4, 1.4, 0.3, 'Iris-setosa'],
       [5.0, 3.4, 1.5, 0.2, 'Iris-setosa'],
       [4.4, 2.9, 1.4, 0.2, 'Iris-setosa'],
       [4.9, 3.1, 1.5, 0.1, 'Iris-setosa']], dtype=object)

In [9]:
from sqlalchemy import create_engine
from sqlalchemy import text

import fsspec
import os
from pyarrow import parquet, Table

# Warehouse directory shared by the schema location and the Parquet file, so
# the two cannot drift apart.
WAREHOUSE_DIR = "s3a://warehouse/notebooks"

# Object-store connection settings. Credentials are read from the environment;
# the fallbacks are the local-development values this notebook was written
# against. Override them via env vars instead of committing real secrets.
S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "http://localhost:4566")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minio")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minio123")

engine = create_engine(
    "trino://trino@localhost:8080/hive", connect_args={"user": "trino"}
)

# Create the `public` schema rooted at the warehouse directory. The context
# manager guarantees the connection is closed. The explicit commit matches the
# CREATE TABLE cell; a SQLAlchemy 2.x connection rolls back on close otherwise.
with engine.connect() as conn:
    conn.execute(
        text(f"CREATE SCHEMA IF NOT EXISTS public WITH (LOCATION='{WAREHOUSE_DIR}/')")
    )
    conn.commit()

# https://s3fs.readthedocs.io/en/latest/api.html#s3fs.core.S3FileSystem
fs = fsspec.filesystem(
    protocol="s3a",
    key=S3_ACCESS_KEY,
    secret=S3_SECRET_KEY,
    endpoint_url=S3_ENDPOINT_URL,
)

# Build the object URL with an explicit "/": os.path.join would insert "\\" on
# Windows and produce an invalid S3 key.
file_path = f"{WAREHOUSE_DIR}/iris.parquet"
fs.makedirs(WAREHOUSE_DIR, exist_ok=True)

# Write the frame as Parquet into the warehouse directory; the next cell exposes
# it to Trino as an external table. dataframe.to_sql() is not used because it
# doesn't handle atomic writes or support `with`:
# https://pandas.pydata.org/docs/user_guide/io.html#insertion-method
# preserve_index=False keeps the pandas index from being written as an extra column.
parquet.write_table(
    Table.from_pandas(dataframe, preserve_index=False), file_path, filesystem=fs
)

In [30]:
from sqlalchemy import text
import os
from pyarrow import Schema

# Map column types pandas -> Arrow -> Trino so the external table's DDL matches
# the Parquet file written in the previous cell. The schema is derived from the
# whole frame (not a single column), excluding the pandas index just as the
# Parquet write does.
schema = Schema.from_pandas(dataframe, preserve_index=False)

# Arrow type name (as produced by str(arrow_type)) -> Trino/Hive column type.
# Extend this table when the frame gains columns of other types.
map_arrow_trino_types = {
    "string": "VARCHAR",
    "large_string": "VARCHAR",
    "double": "DOUBLE",
    "float": "REAL",
    "int64": "BIGINT",
    "int32": "INTEGER",
    "int16": "SMALLINT",
    "int8": "TINYINT",
    "bool": "BOOLEAN",
}

# Fail with a readable message naming the offending columns instead of a bare
# KeyError from inside the join below.
unmapped = {
    name: str(dtype)
    for name, dtype in zip(schema.names, schema.types)
    if str(dtype) not in map_arrow_trino_types
}
if unmapped:
    raise TypeError(f"No Trino type mapping for column types: {unmapped}")

columns = ", ".join(
    f"{name} {map_arrow_trino_types[str(dtype)]}"
    for name, dtype in zip(schema.names, schema.types)
)
print(columns)

# Recreate the table over the directory that holds the Parquet file, so Trino
# reads the data in place.
with engine.connect() as conn:
    conn.execute(text("DROP TABLE IF EXISTS public.iris"))
    conn.execute(text(f"""
        CREATE TABLE public.iris ( {columns} )
        WITH (
            format = 'PARQUET',
            external_location = '{os.path.dirname(file_path)}'
        )
    """))
    conn.commit()


sepal_length_cm DOUBLE, sepal_width_cm DOUBLE, petal_length_cm DOUBLE, petal_width_cm DOUBLE, species VARCHAR
