In [1]:
import functools
from pathlib import Path

import duckdb
import pandas as pd
import polars as pl
from result import Err, Ok, Result


class DuckDB:
    """Thin convenience wrapper around a DuckDB database file.

    Each decorated method opens its own short-lived connection and cursor
    (see ``connect_to_duckdb``) and reports its outcome as a ``Result``:
    ``Ok(value)`` on success, ``Err(message)`` on failure.
    """

    # Default database file, used when no path is passed to the constructor.
    DB = r"./data/input/flight-delays.duckdb"

    # Name under which an in-memory DataFrame is temporarily registered while
    # it is copied into a table. A dedicated name avoids clashing with a real
    # table or view that happens to be called "df".
    _STAGING_VIEW = "__duckdb_wrapper_staging__"

    def __init__(self, db=DB):
        """Remember which database file the instance operates on.

        Args:
            db: Path of the DuckDB database file; defaults to the class-level ``DB``.
        """
        self.DB = db

    def connect_to_duckdb(func):
        """Decorator that injects a fresh cursor and converts exceptions to ``Err``.

        The wrapped method is called as ``func(self, cursor, *args, **kwargs)``;
        callers therefore omit the ``cursor`` argument. The connection and the
        cursor are closed when the call finishes.
        """

        @functools.wraps(func)  # keep the wrapped method's name and docstring
        def wrapper(self, *args, **kwargs) -> Result:
            try:
                with duckdb.connect(self.DB) as conn:
                    with conn.cursor() as cursor:
                        return func(self, cursor, *args, **kwargs)
            except duckdb.Error as e:
                return Err(f"Database error: {e}")
            except Exception as e:
                return Err(f"An unexpected error occurred: {e}")

        return wrapper

    @connect_to_duckdb
    def show_duckdb_table(self, cursor, table) -> Result:
        """Print the relation for ``table`` to stdout.

        Args:
            cursor: Injected by ``connect_to_duckdb``.
            table: Name of the table to print.

        Returns:
            ``Ok(None)`` once the table has been printed; failures are turned
            into ``Err`` by the decorator.
        """
        print(cursor.table(table))
        # Return an explicit Ok so every path honours the declared Result type.
        return Ok(None)

    @connect_to_duckdb
    def fetch_duckdb_data(self, cursor, sql) -> Result:
        """Run ``sql`` and return the result set as a polars DataFrame.

        Args:
            cursor: Injected by ``connect_to_duckdb``.
            sql: SQL text to execute. It is passed to DuckDB unchanged.

        Returns:
            ``Ok(polars.DataFrame)`` on success, ``Err(message)`` on a DuckDB error.
        """
        try:
            result = cursor.sql(sql)
            return Ok(result.pl())
        except duckdb.Error as e:
            return Err(f"Error fetching data: {e}")

    @connect_to_duckdb
    def show_duckdb_tables(self, cursor) -> Result:
        """List the tables in the database.

        Args:
            cursor: Injected by ``connect_to_duckdb``.

        Returns:
            ``Ok(rows)`` with the rows returned by ``SHOW TABLES``, or
            ``Err(message)`` on a DuckDB error.
        """
        try:
            cursor.execute("SHOW TABLES;")
            tables = cursor.fetchall()
            return Ok(tables)
        except duckdb.Error as e:
            return Err(f"Error retrieving tables: {e}")

    @connect_to_duckdb
    def write_data_to_duckdb(self, cursor, data) -> Result:
        """Create (or replace) a table from the source described by ``data``.

        Args:
            cursor: Injected by ``connect_to_duckdb``.
            data: Mapping with the keys
                ``"type"``  - one of ``"csv"``, ``"pandas"``, ``"polars"``, ``"excel"``;
                ``"table"`` - name of the destination table;
                ``"path"``  - source file (csv / excel);
                ``"data"``  - source DataFrame (pandas / polars);
                ``"sheet_name"`` - optional Excel sheet (defaults to ``0``).

        Returns:
            ``Ok(message)`` on success, ``Err(message)`` otherwise.

        An existing table is replaced only when the new data has been loaded
        successfully; a missing file, an invalid DataFrame or an unsupported
        type leaves the current table untouched.
        """
        data_type = data.get("type")
        table_name = data.get("table")

        # A missing name would otherwise be interpolated as the identifier "None".
        if not isinstance(table_name, str) or not table_name.strip():
            return Err(f"Invalid table name: {table_name!r}")

        # NOTE(review): table_name is interpolated into SQL unquoted, exactly as
        # supplied by the caller. It must come from trusted code, not user input.
        try:
            if data_type == "csv":
                return self._write_csv(cursor, data, table_name)
            elif data_type == "pandas":
                return self._write_pandas(cursor, data, table_name)
            elif data_type == "polars":
                return self._write_polars(cursor, data, table_name)
            elif data_type == "excel":
                return self._write_excel(cursor, data, table_name)
            else:
                return Err(f"Unsupported data type: {data_type}")

        except duckdb.Error as e:
            return Err(f"Error writing data to DuckDB: {e}")
        except FileNotFoundError as e:
            return Err(f"File not found: {e}")

    @staticmethod
    def _as_path(value):
        """Return ``value`` as a ``Path``, or ``None`` when it is missing or empty."""
        return Path(value) if value else None

    def _create_table_from_frame(self, cursor, table_name, frame) -> None:
        """Copy ``frame`` into ``table_name``, replacing any existing table."""
        cursor.register(self._STAGING_VIEW, frame)
        try:
            cursor.execute(
                f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM {self._STAGING_VIEW};"
            )
        finally:
            # Always drop the temporary registration, even if the copy failed.
            cursor.unregister(self._STAGING_VIEW)

    def _write_csv(self, cursor, data, table_name) -> Result:
        """Load the CSV file at ``data["path"]`` into ``table_name``."""
        path = self._as_path(data.get("path"))
        if path is None or not path.exists():
            return Err(f"CSV file not found: {path}")

        # Double any single quote so the path stays a valid SQL string literal.
        escaped_path = str(path).replace("'", "''")
        cursor.execute(
            f"CREATE OR REPLACE TABLE {table_name} AS "
            f"SELECT * FROM read_csv_auto('{escaped_path}');"
        )
        return Ok(f"CSV file '{path.name}' has been successfully written to DuckDB.")

    def _write_pandas(self, cursor, data, table_name) -> Result:
        """Load the pandas DataFrame at ``data["data"]`` into ``table_name``."""
        df = data.get("data")
        if not isinstance(df, pd.DataFrame):
            return Err("Invalid pandas DataFrame provided.")

        self._create_table_from_frame(cursor, table_name, df)
        return Ok("pandas DataFrame has been successfully written to DuckDB.")

    def _write_polars(self, cursor, data, table_name) -> Result:
        """Load the polars DataFrame at ``data["data"]`` into ``table_name``."""
        df = data.get("data")
        if not isinstance(df, pl.DataFrame):
            return Err("Invalid polars DataFrame provided.")

        # Convert the polars DataFrame to a pandas DataFrame before registering it.
        self._create_table_from_frame(cursor, table_name, df.to_pandas())
        return Ok("polars DataFrame has been successfully written to DuckDB.")

    def _write_excel(self, cursor, data, table_name) -> Result:
        """Load one sheet of the Excel file at ``data["path"]`` into ``table_name``."""
        path = self._as_path(data.get("path"))
        sheet_name = data.get("sheet_name", 0)
        if path is None or not path.exists():
            return Err(f"Excel file not found: {path}")

        df = pd.read_excel(path, sheet_name=sheet_name)
        self._create_table_from_frame(cursor, table_name, df)
        return Ok(f"Excel sheet '{sheet_name}' from '{path.name}' has been written to DuckDB.")


def write_data(duck: DuckDB, base_path: Path = Path("./data/input/archive/")) -> Result:
    """Load the sample data sets into the database behind ``duck``.

    Every entry is attempted even if an earlier one fails, and each outcome
    message is printed as it happens.

    Args:
        duck: Wrapper whose ``write_data_to_duckdb`` method performs each load.
        base_path: Directory containing the CSV / Excel input files.

    Returns:
        ``Ok(messages)`` with one success message per entry when every load
        succeeded, otherwise ``Err(errors)`` with the collected error messages.
    """
    base_path = Path(base_path)
    input_data_list = [
        {"type": "csv", "path": base_path / "airlines.csv", "table": "airlines"},
        {"type": "csv", "path": base_path / "airports.csv", "table": "airports"},
        {"type": "csv", "path": base_path / "flights.csv", "table": "flights"},
        # NOTE(review): this entry targets the same "airports" table as
        # airports.csv above, so a successful Excel load replaces the
        # CSV-derived table - confirm that this is intended.
        {
            "type": "excel",
            "path": base_path / "airports.xlsx",
            "table": "airports",
            "sheet_name": "Sheet1",
        },
        {
            "type": "pandas",
            "data": pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]}),
            "table": "sample_pandas",
        },
        {
            "type": "polars",
            "data": pl.DataFrame({"col1": [3, 4], "col2": ["x", "y"]}),
            "table": "sample_polars",
        },
    ]

    messages = []
    errors = []
    for data in input_data_list:
        # Write one data set and report its outcome immediately.
        result = duck.write_data_to_duckdb(data)
        if result.is_ok():
            message = result.unwrap()
            messages.append(message)
            print(message)
        else:
            error = result.unwrap_err()
            errors.append(error)
            print(error)

    # Give callers a real Result so failures can be detected programmatically.
    return Err(errors) if errors else Ok(messages)

In [2]:
duck = DuckDB()
# Show the list of tables stored in the database, one row per line.
result = duck.show_duckdb_tables()
if not result.is_ok():
    # Listing failed: show the error message instead of the table names.
    print(result.unwrap_err())
else:
    tables = result.unwrap()
    for table in tables:
        print(table)

('airlines',)
('flights',)
('sample_pandas',)
('sample_polars',)


In [3]:
# Print the "flights" table through the wrapper's show_duckdb_table helper.
duck.show_duckdb_table("flights")

┌───────┬───────┬───────┬─────────────┬─────────┬───────────────┬─────────────┬────────────────┬─────────────────────┬─────────────────────┬────────────────┬─────────────────┬──────────┬────────────┬────────────────┬──────────────┬──────────┬──────────┬───────────┬─────────┬───────────────────┬──────────────┬───────────────┬──────────┬───────────┬─────────────────────┬──────────────────┬────────────────┬───────────────┬─────────────────────┬───────────────┐
│ YEAR  │ MONTH │  DAY  │ DAY_OF_WEEK │ AIRLINE │ FLIGHT_NUMBER │ TAIL_NUMBER │ ORIGIN_AIRPORT │ DESTINATION_AIRPORT │ SCHEDULED_DEPARTURE │ DEPARTURE_TIME │ DEPARTURE_DELAY │ TAXI_OUT │ WHEELS_OFF │ SCHEDULED_TIME │ ELAPSED_TIME │ AIR_TIME │ DISTANCE │ WHEELS_ON │ TAXI_IN │ SCHEDULED_ARRIVAL │ ARRIVAL_TIME │ ARRIVAL_DELAY │ DIVERTED │ CANCELLED │ CANCELLATION_REASON │ AIR_SYSTEM_DELAY │ SECURITY_DELAY │ AIRLINE_DELAY │ LATE_AIRCRAFT_DELAY │ WEATHER_DELAY │
│ int64 │ int64 │ int64 │    int64    │ varchar │     int64     │   varchar 

In [4]:
# Fetch the whole "flights" table as a polars DataFrame.
result = duck.fetch_duckdb_data("SELECT * FROM flights;")
if result.is_ok():
    df = result.unwrap()
else:
    # Fail loudly: later cells use `df`, which would otherwise be undefined
    # or silently left over from an earlier run of the kernel.
    raise RuntimeError(result.unwrap_err())

In [5]:
# Preview the first rows of the DataFrame assigned to `df` in the previous cell.
df.head()

YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
i64,i64,i64,i64,str,i64,str,str,str,str,str,i64,i64,str,i64,i64,i64,i64,str,i64,str,str,i64,i64,i64,str,i64,i64,i64,i64,i64
2015,1,1,4,"""AS""",98,"""N407AS""","""ANC""","""SEA""","""0005""","""2354""",-11,21,"""0015""",205,194,169,1448,"""0404""",4,"""0430""","""0408""",-22,0,0,,,,,,
2015,1,1,4,"""AA""",2336,"""N3KUAA""","""LAX""","""PBI""","""0010""","""0002""",-8,12,"""0014""",280,279,263,2330,"""0737""",4,"""0750""","""0741""",-9,0,0,,,,,,
2015,1,1,4,"""US""",840,"""N171US""","""SFO""","""CLT""","""0020""","""0018""",-2,16,"""0034""",286,293,266,2296,"""0800""",11,"""0806""","""0811""",5,0,0,,,,,,
2015,1,1,4,"""AA""",258,"""N3HYAA""","""LAX""","""MIA""","""0020""","""0015""",-5,15,"""0030""",285,281,258,2342,"""0748""",8,"""0805""","""0756""",-9,0,0,,,,,,
2015,1,1,4,"""AS""",135,"""N527AS""","""SEA""","""ANC""","""0025""","""0024""",-1,11,"""0035""",235,215,199,1448,"""0254""",5,"""0320""","""0259""",-21,0,0,,,,,,
