In [31]:
import duckdb
from pathlib import Path
import pyarrow as pa
import pandas as pd  # Ensure pandas is imported

class DuckDBWrapper:
    """
    Convenience wrapper around a single DuckDB connection.

    Registers local Parquet/CSV/JSON files as views, runs SQL that returns
    pandas DataFrames, and exports results to Parquet, CSV or JSON-lines files.
    """

    # Lower-cased file extension -> DuckDB table function used to read it.
    _READERS = {
        ".parquet": "read_parquet",
        ".csv": "read_csv_auto",
        ".json": "read_json_auto",
    }
    # Output formats accepted by export().
    _EXPORT_TYPES = ("parquet", "csv", "json")

    def __init__(self, duckdb_path=None, enable_httpfs=True):
        """
        Initialize a DuckDB connection.

        Args:
            duckdb_path (str | Path | None): If provided, a persistent DuckDB
                database at this path is used; otherwise an in-memory database.
            enable_httpfs (bool): Run ``INSTALL httpfs`` / ``LOAD httpfs`` so
                remote paths can be read (default True, the previous behaviour).
                NOTE(review): INSTALL presumably downloads the extension on
                first use and may fail offline — pass False for local-only work.
        """
        database = str(duckdb_path) if duckdb_path else ":memory:"
        self.con = duckdb.connect(database=database, read_only=False)
        # Names passed to register_data(), in first-registration order, no duplicates.
        self.registered_tables = []

        if enable_httpfs:
            # httpfs is only needed for remote paths; local files do not require it.
            self.con.execute("INSTALL httpfs;")
            self.con.execute("LOAD httpfs;")

    @staticmethod
    def _quote_literal(value):
        """Return ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def register_data(self, paths, table_names):
        """
        Register local data files (Parquet, CSV, JSON) in DuckDB as views.

        The reader is chosen from the (case-insensitive) file extension. The path
        string is passed unchanged to the DuckDB reader, so glob patterns such as
        ``dir/*.parquet`` can be used. Views are created with CREATE OR REPLACE,
        so re-running a registration is safe. All paths are validated before any
        view is created, so a bad extension leaves the catalog untouched.

        Args:
            paths (iterable): Paths (str or Path) to data files or glob patterns.
            table_names (iterable): View names, one per path, in the same order.

        Raises:
            ValueError: If the counts differ or an extension is unsupported.
        """
        paths = list(paths)
        table_names = list(table_names)
        if len(paths) != len(table_names):
            raise ValueError("The number of paths must match the number of table names.")

        # Validate everything first so a failure cannot leave partial registrations.
        specs = []
        for path, table_name in zip(paths, table_names):
            path_str = str(path)
            file_extension = Path(path_str).suffix.lower()
            reader = self._READERS.get(file_extension)
            if reader is None:
                raise ValueError(f"Unsupported file type '{file_extension}' for file: {path_str}")
            specs.append((table_name, reader, path_str))

        for table_name, reader, path_str in specs:
            # The path is escaped as a SQL literal so quotes in file names cannot break the query.
            query = (
                f"CREATE OR REPLACE VIEW {table_name} AS "
                f"SELECT * FROM {reader}({self._quote_literal(path_str)})"
            )
            self.con.execute(query)
            if table_name not in self.registered_tables:
                self.registered_tables.append(table_name)

    def run_query(self, sql_query, params=None):
        """
        Run a SQL query on the connection.

        Args:
            sql_query (str): The SQL query string to execute.
            params (list | tuple | None): Optional values bound to ``?``
                placeholders in ``sql_query``.

        Returns:
            pandas.DataFrame: Query result as a pandas DataFrame.
        """
        if params is None:
            return self.con.execute(sql_query).fetchdf()
        return self.con.execute(sql_query, params).fetchdf()

    def _construct_path(self, path, base_path, file_name, extension):
        """
        Resolve the output file path.

        An explicit ``path`` wins. Otherwise the result is
        ``<base_path or current dir>/<file_name or "output">.<extension>``, so
        neither ``base_path`` nor ``file_name`` is silently ignored when given alone.
        """
        if path:
            return Path(path)
        directory = Path(base_path) if base_path else Path()
        stem = file_name if file_name else "output"
        return directory / f"{stem}.{extension}"

    @staticmethod
    def _to_dataframe(result):
        """
        Convert a supported query result to a pandas DataFrame.

        Raises:
            ValueError: If ``result`` is not a DataFrame, does not expose a
                callable ``to_pandas`` (e.g. pyarrow.Table), and does not expose
                a callable ``df`` (e.g. DuckDB relations).
        """
        # Check DataFrame first: attribute lookup on a DataFrame also matches column names.
        if isinstance(result, pd.DataFrame):
            return result
        if callable(getattr(result, "to_pandas", None)):
            return result.to_pandas()
        if callable(getattr(result, "df", None)):
            return result.df()
        raise ValueError("Unsupported result type. Must be a Pandas DataFrame, Arrow Table, or DuckDB query result.")

    def export(self, result, file_type, path=None, base_path=None, file_name=None, with_header=True):
        """
        Export a query result to a file.

        Handles pandas DataFrames, Arrow Tables and DuckDB query results.

        Args:
            result (any): Query result to export.
            file_type (str): 'parquet', 'csv' or 'json' (case-insensitive); also
                used as the file extension when ``path`` is not given.
            path (str): Full path to the file (optional; overrides the others).
            base_path (str): Output directory (optional; defaults to the cwd).
            file_name (str): File name without extension (optional; defaults to "output").
            with_header (bool): Include a header row for CSV files (default: True).

        Raises:
            ValueError: For an unsupported ``file_type`` or ``result`` type.
        """
        file_type = file_type.lower()
        if file_type not in self._EXPORT_TYPES:
            raise ValueError("file_type must be one of 'parquet', 'csv', or 'json'.")

        # Convert before touching the filesystem so a bad input leaves no stray directories.
        dataframe = self._to_dataframe(result)

        full_path = self._construct_path(path, base_path, file_name, file_type)
        full_path.parent.mkdir(parents=True, exist_ok=True)

        if file_type == "parquet":
            dataframe.to_parquet(full_path, index=False)
        elif file_type == "csv":
            dataframe.to_csv(full_path, index=False, header=with_header)
        else:  # "json": one JSON object per line
            dataframe.to_json(full_path, orient="records", lines=True)

        print(f"File written to: {full_path}")

    def show_tables(self):
        """Print the names and types of tables/views in the ``main`` schema, sorted by name."""
        query = """
        SELECT table_name, table_type
        FROM information_schema.tables
        WHERE table_schema = 'main'
        ORDER BY table_name
        """
        result_df = self.run_query(query)
        print(result_df)

    def show_schema(self, table_name):
        """
        Print the columns and data types of ``table_name`` in declaration order.

        Args:
            table_name (str): Name of the table whose schema is to be displayed.
        """
        # table_name is bound as a parameter instead of being spliced into the SQL text.
        query = """
        SELECT
            table_name,
            column_name,
            data_type
        FROM
            information_schema.columns
        WHERE
            table_name = ?
        ORDER BY
            ordinal_position
        """
        result_df = self.run_query(query, [table_name])
        print(result_df)

    def close(self):
        """Close the underlying DuckDB connection."""
        self.con.close()


In [32]:
# Create the wrapper on an in-memory DuckDB database (pass duckdb_path=... to persist it).
con = DuckDBWrapper(duckdb_path=None)

In [33]:
from pathlib import Path

# NOTE(review): assumes the notebook runs one directory below the repo root; adjust if not.
repo_root = Path.cwd().resolve().parent
mta_data_dir = repo_root / "data" / "opendata" / "nyc" / "mta"

# Each dataset lives in a directory named exactly like its view and holds one or more
# .parquet files. Deriving the paths from the names keeps the two lists from drifting apart.
table_names = [
    "mta_operations_statement",
    "mta_hourly_subway_socrata",
    "mta_daily_ridership",
    "mta_bus_wait_time",
    "daily_weather_asset",
    "hourly_weather_asset",
    "mta_bus_speeds",
]
paths = [mta_data_dir / name / "*.parquet" for name in table_names]

# Register all datasets as views in DuckDB
con.register_data(paths, table_names)

In [21]:
# List the tables/views in DuckDB's `main` schema (should include the views registered earlier).
con.show_tables()


                  table_name table_type
0        daily_weather_asset       VIEW
1       hourly_weather_asset       VIEW
2             mta_bus_speeds       VIEW
3          mta_bus_wait_time       VIEW
4        mta_daily_ridership       VIEW
5  mta_hourly_subway_socrata       VIEW
6   mta_operations_statement       VIEW


In [28]:
# Inspect the column names and DuckDB types of the daily ridership view.
con.show_schema(table_name="mta_daily_ridership")

             table_name                             column_name data_type
0   mta_daily_ridership                                    date      DATE
1   mta_daily_ridership                 subways_total_ridership    DOUBLE
2   mta_daily_ridership                subways_pct_pre_pandemic    DOUBLE
3   mta_daily_ridership                   buses_total_ridership    DOUBLE
4   mta_daily_ridership                  buses_pct_pre_pandemic    DOUBLE
5   mta_daily_ridership                    lirr_total_ridership    DOUBLE
6   mta_daily_ridership                   lirr_pct_pre_pandemic    DOUBLE
7   mta_daily_ridership             metro_north_total_ridership    DOUBLE
8   mta_daily_ridership            metro_north_pct_pre_pandemic    DOUBLE
9   mta_daily_ridership               access_a_ride_total_trips    DOUBLE
10  mta_daily_ridership          access_a_ride_pct_pre_pandemic    DOUBLE
11  mta_daily_ridership           bridges_tunnels_total_traffic    DOUBLE
12  mta_daily_ridership        bridges

In [34]:
# Sanity check: total number of rows in the mta_hourly_subway_socrata view.
# A plain string (no placeholders, so no f-prefix); the alias gives the result
# column, and therefore the exported CSV header, a readable name.
query = """
SELECT count(*) AS row_count
FROM mta_hourly_subway_socrata
"""

result = con.run_query(query)
result


   count_star()
0      67763465


In [None]:
# Export settings: write `result` as <repo_root>/data/exports/row_count.csv.
file_type = "csv"
file_name = "row_count"

# Recomputed here so this cell does not depend on an earlier cell's state.
repo_root = Path.cwd().resolve().parents[0]
base_path = repo_root / "data" / "exports"

con.export(result, file_type=file_type, base_path=base_path, file_name=file_name)

File written to: /home/christianocean/mta/data/exports/row_count.json
