In [None]:
## Import Block 
## SYSTEM SETUP & HELPER FUNCTIONS (You are free to edit as needed)
# --- 🛠️ SYSTEM SETUP & HELPER FUNCTIONS (Do not edit unless necessary) ---

# 1. Imports
import duckdb
import requests
import glob
import os
import gc
import shutil
import time
from pathlib import Path

# Detect Environment
# IPython being importable does not mean the code is running inside it (for
# example a plain `python script.py` on a machine that has IPython installed),
# so an active interactive shell is required before HTML output is enabled.
try:
    from IPython import get_ipython
    from IPython.display import display, HTML
    IN_NOTEBOOK = get_ipython() is not None
except ImportError:
    IN_NOTEBOOK = False

# 2. Database Wrapper Class
class DuckDBWrapper:
    """Convenience wrapper around a single DuckDB connection.

    Attributes:
        db_path: Resolved path of the database file, or None for an
            in-memory database.
        con: The live DuckDB connection, or None while disconnected.
        registered_tables: Names of the views created through
            register_data_view, in registration order.
    """

    # Markers looked for in a path, in priority order, and the DuckDB table
    # function used to read a matching file.
    _READERS = (
        (".parquet", "read_parquet"),
        (".csv", "read_csv_auto"),
        (".json", "read_json_auto"),
    )
    # File types accepted by export().
    _EXPORT_FORMATS = ("parquet", "csv", "json")

    def __init__(self, duckdb_path=None):
        """Store the target database path; no connection is opened yet."""
        self.db_path = Path(duckdb_path).resolve() if duckdb_path else None
        self.con = None
        self.registered_tables = []

    def connect(self):
        """Open the connection (if not already open) and load httpfs.

        Connects to ``db_path`` when one was given, otherwise to an in-memory
        database. Any failure is printed and re-raised.
        """
        if self.con:
            return
        try:
            if self.db_path:
                self.con = duckdb.connect(str(self.db_path), read_only=False)
            else:
                self.con = duckdb.connect(database=':memory:', read_only=False)
            self.con.execute("INSTALL httpfs; LOAD httpfs;")
        except Exception as e:
            print(f"‚ùå Connection Failed: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    def close(self):
        """Close the connection if one is open; errors while closing are ignored."""
        if not self.con:
            return
        try:
            self.con.close()
        except Exception:
            # Best effort: a failing close must not prevent the reset below.
            pass
        finally:
            self.con = None

    @classmethod
    def _reader_for(cls, path_str):
        """Return the DuckDB reader function name for *path_str*, or None."""
        lowered = path_str.lower()
        # The file name is checked first so that a directory such as
        # 'raw.parquet.d/' cannot override the file's own extension. The whole
        # path is the fallback so patterns like 'dataset.parquet/*' still work.
        for candidate in (os.path.basename(lowered), lowered):
            for marker, reader in cls._READERS:
                if marker in candidate:
                    return reader
        return None

    def register_data_view(self, paths, table_names):
        """Create or replace one view per Parquet/CSV/JSON path.

        Args:
            paths: File paths or glob patterns (str or Path).
            table_names: View name for each entry of ``paths``.

        Raises:
            ValueError: If the two sequences differ in length.

        Paths that match nothing on disk, and paths with no recognised file
        type, are skipped. A failure on one view is printed and does not stop
        the remaining registrations.
        """
        if not self.con:
            self.connect()
        if len(paths) != len(table_names):
            raise ValueError("Length mismatch")

        for path, table_name in zip(paths, table_names):
            path_str = str(path)
            if not glob.glob(path_str) and not os.path.exists(path_str):
                continue
            reader = self._reader_for(path_str)
            if reader is None:
                continue
            # Double any single quote so the path stays one SQL string literal.
            path_literal = path_str.replace("'", "''")
            query = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {reader}('{path_literal}')"
            try:
                self.con.execute(query)
                if table_name not in self.registered_tables:
                    self.registered_tables.append(table_name)
            except Exception as e:
                print(f"‚ùå Error registering {table_name}: {e}")

    def run_query(self, sql_query, show_results=False):
        """Execute SQL and return the result as a polars DataFrame.

        Args:
            sql_query: SQL text to execute.
            show_results: When True the result is displayed instead of
                returned: the first 1000 rows as a scrollable HTML table when
                IN_NOTEBOOK is set, otherwise a short terminal table.

        Returns:
            The DataFrame, or None when ``show_results`` is True or when the
            query failed (the error is printed, not raised).
        """
        if not self.con:
            self.connect()
        import polars as pl

        try:
            arrow_table = self.con.execute(sql_query).arrow()
            df = pl.DataFrame(arrow_table)

            if show_results:
                if IN_NOTEBOOK:
                    # Rendering goes through pandas for its HTML table output.
                    pdf = df.head(1000).to_pandas()
                    table_html = pdf.to_html(index=False, border=0, classes=["dataframe"])
                    scrollable_div = f"""
                    <div style="max-height: 400px; overflow-y: auto; overflow-x: auto; border: 1px solid #444;">
                        <style>.dataframe thead th {{ position: sticky; top: 0; background: #222; color: white; }}</style>
                        {table_html}
                    </div>
                    """
                    display(HTML(scrollable_div))
                else:
                    self._print_simple_table(df)
                # Returning None stops a notebook from echoing the frame again.
                return None
            return df
        except Exception as e:
            print(f"‚ùå Query Failed: {e}")
            return None

    def show_tables(self):
        """Print the name and type of every table/view in the 'main' schema."""
        df = self.run_query(
            "SELECT table_name, table_type FROM information_schema.tables WHERE table_schema='main'",
            show_results=False,
        )
        if df is not None:
            self._print_fancy_table(df, title="üìÇ Database Assets")

    def show_schema(self, table_name):
        """Print the column names and data types of ``table_name``."""
        # Double any single quote so the name stays one SQL string literal.
        name_literal = str(table_name).replace("'", "''")
        query = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{name_literal}'"
        df = self.run_query(query, show_results=False)
        if df is not None:
            self._print_fancy_table(df, title=f"üìã Schema: {table_name}")

    def _print_fancy_table(self, df, title):
        """Render every row of ``df`` as a bordered rich table with a title."""
        from rich.console import Console
        from rich.table import Table
        from rich import box

        console = Console()
        table = Table(
            title=title,
            title_style="bold bright_yellow",
            header_style="bold bright_white",
            box=box.ROUNDED,
            show_lines=True,
            border_style="bright_black",
        )
        for col in df.columns:
            table.add_column(col, style="bright_cyan", justify="left")
        for row in df.iter_rows(named=True):
            table.add_row(*[str(v) for v in row.values()])
        console.print(table)

    def _print_simple_table(self, df):
        """Render the first 10 rows of ``df`` as a compact rich table."""
        from rich.console import Console
        from rich.table import Table
        from rich import box

        console = Console()
        table = Table(title="Query Results", box=box.SIMPLE, show_lines=False)
        for col in df.columns:
            table.add_column(col, style="dim", no_wrap=True, overflow="ellipsis", max_width=30)
        for row in df.head(10).iter_rows(named=True):
            # None is shown as an empty cell rather than the text "None".
            table.add_row(*[str(v) if v is not None else "" for v in row.values()])
        console.print(table)

    def export(self, data, file_name, file_type="csv", output_dir="../data/exports"):
        """Write a query result or DataFrame to ``output_dir/file_name.file_type``.

        Args:
            data: SQL text (executed through run_query) or an existing
                DataFrame exposing ``height`` and the ``write_*`` methods used
                below.
            file_name: Output file name without extension.
            file_type: One of "parquet", "csv" or "json" (written as
                newline-delimited JSON).
            output_dir: Destination folder, created if missing.

        Nothing is raised: unknown formats, empty results and write failures
        are reported with a printed message.
        """
        # Reject an unsupported format before running the query or creating
        # the output folder.
        if file_type not in self._EXPORT_FORMATS:
            print(f"‚ùå Unknown format: {file_type}")
            return

        if isinstance(data, str):
            print(f"‚è≥ Running query for export: '{file_name}'...")
            df = self.run_query(data, show_results=False)
        else:
            df = data
        if df is None or df.height == 0:
            print("‚ö†Ô∏è Export skipped (Empty/None)")
            return

        full_path = Path(output_dir) / f"{file_name}.{file_type}"
        full_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            if file_type == "parquet":
                df.write_parquet(str(full_path))
            elif file_type == "csv":
                df.write_csv(str(full_path))
            else:
                df.write_ndjson(str(full_path))
            print(f"‚úÖ Exported {df.height} rows to: {full_path}")
        except Exception as e:
            print(f"‚ùå Write failed: {e}")

# 3. Project Helper Functions
def setup_database_environment(db_path, fresh_start=False):
    """Create the database folder, optionally reset the file, and connect.

    Args:
        db_path: Location of the DuckDB database file.
        fresh_start: When True and the file exists, the old file is renamed
            to ``<name>.old`` (rather than deleted) and its ``<name>.wal``
            sidecar is removed before connecting.

    Returns:
        A connected DuckDBWrapper for ``db_path``.
    """
    db_path = Path(db_path).resolve()
    db_path.parent.mkdir(parents=True, exist_ok=True)

    # 1. Handle Fresh Start via RENAME (Avoids Lock Issues)
    if fresh_start and db_path.exists():
        print(f"üßπ Fresh Start: Resetting {db_path.name}...")
        gc.collect()  # Garbage collect old connections

        # Sidecar names are built by appending to the full file name, so they
        # are also correct for databases that do not end in '.duckdb'.
        trash_path = db_path.with_name(db_path.name + ".old")
        wal_path = db_path.with_name(db_path.name + ".wal")

        try:
            trash_path.unlink()
        except OSError:
            # No previous trash file, or it cannot be removed; the move below
            # reports any real problem.
            pass

        try:
            shutil.move(str(db_path), str(trash_path))
            if wal_path.exists():
                wal_path.unlink()
            print("   ‚úì Old database moved to trash (Connection Reset).")
        except OSError as e:
            print(f"‚ùå Warning: Could not move old DB: {e}. Attempting direct overwrite.")

    # 2. Connect
    con = DuckDBWrapper(duckdb_path=db_path)
    con.connect()
    print(f"üîå Connected to: {db_path}")
    return con

def download_and_cache_data(file_list, base_url, data_dir):
    """Download each file into ``data_dir`` unless a non-empty copy exists.

    Args:
        file_list: File names, relative to ``base_url``.
        base_url: URL prefix; a trailing slash is tolerated.
        data_dir: Local cache folder, created if missing.

    Returns:
        (paths, names): local paths of the files that are available and the
        matching table names (the file name without its last extension).
        Files that still fail after three attempts are left out.
    """
    data_dir = Path(data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)
    base_url = str(base_url).rstrip("/")
    max_attempts = 3
    paths, names = [], []
    print("\nüöÄ Checking Data Assets...")
    for filename in file_list:
        local_path = data_dir / filename
        table_name = Path(filename).stem
        if local_path.exists() and local_path.stat().st_size > 0:
            print(f"üìÇ Cached: '{table_name}'")
            paths.append(local_path)
            names.append(table_name)
            continue

        url = f"{base_url}/{filename}"
        # Download next to the target and rename at the end, so an interrupted
        # transfer can never be mistaken for a cached file on the next run.
        partial_path = local_path.with_name(local_path.name + ".part")
        local_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"‚¨áÔ∏è  Downloading '{filename}'...")
        for attempt in range(1, max_attempts + 1):
            try:
                with requests.get(url, stream=True, headers={'Connection': 'close'}, timeout=(10, 60)) as r:
                    r.raise_for_status()
                    with open(partial_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                partial_path.replace(local_path)
            except Exception as e:
                if partial_path.exists():
                    partial_path.unlink()
                if attempt < max_attempts:
                    time.sleep(2)
                else:
                    print(f"‚ùå Failed {filename}: {e}")
            else:
                print(f"‚úÖ Saved to {local_path}")
                paths.append(local_path)
                names.append(table_name)
                time.sleep(1)
                break
    return paths, names

def process_local_files(file_list):
    """
    Resolve each given path and keep the ones that are existing regular files.
    Args:
        file_list: List of file paths (strings).
    Returns:
        paths (list[Path]): resolved paths of the files that were found.
        names (list[str]): matching names, each the file name without its
            last extension.
    """
    print("\nüîç Scanning Local Files...")
    found_paths, found_names = [], []
    for entry in file_list:
        resolved = Path(entry).resolve()
        if not (resolved.exists() and resolved.is_file()):
            print(f"‚ùå Not Found: {entry}")
            continue
        print(f"‚úÖ Found: {resolved.name}")
        found_paths.append(resolved)
        # e.g. 'data.csv' is registered under the name 'data'
        found_names.append(resolved.stem)
    return found_paths, found_names

def run_project_sql_pipeline(con, sql_folder, export_folder, output_format="parquet"):
    """Run every .sql file in a folder, export each result, then register them.

    Args:
        con: Object providing ``export(data, file_name, file_type, output_dir)``
            and ``register_data_view(paths, table_names)``.
        sql_folder: Folder whose ``*.sql`` files are run in sorted name order.
        export_folder: Folder that receives one output file per SQL file,
            named after the SQL file's stem.
        output_format: File type passed to ``con.export``; defaults to Parquet.

    NOTE: registration covers every file in ``export_folder`` with the chosen
    extension, including files that were already there before this run.
    """
    sql_dir = Path(sql_folder)
    export_dir = Path(export_folder)
    print(f"\nüöÄ Starting SQL Pipeline (Output: {output_format})...")
    for sql_path in sorted(sql_dir.glob("*.sql")):
        print(f"‚ñ∂Ô∏é Running {sql_path.name}...")
        try:
            # Explicit UTF-8 so the SQL reads the same on every platform,
            # whatever the locale's default encoding is.
            sql_text = sql_path.read_text(encoding="utf-8")
            con.export(sql_text, sql_path.stem, output_format, export_dir)
        except Exception as e:
            print(f"‚ùå Failed {sql_path.name}: {e}")
    extension = f".{output_format}"
    result_paths = sorted(export_dir.glob(f"*{extension}"))
    if result_paths:
        con.register_data_view(result_paths, [p.stem for p in result_paths])
        print(f"‚úì Pipeline finished. {len(result_paths)} processed tables registered.")
    else:
        print("‚ö†Ô∏è Pipeline finished but no output files were found.")

# Confirmation printed once the definitions above have been executed.
print("‚úÖ System Loaded: Happy Hacking.")

In [None]:
# Set up the ingest: database location, download cache and source files.

FRESH_START = True  # True: an existing database file is moved aside and a new one is created
DB_PATH     = "../data/duckdb/test.duckdb"
DATA_DIR    = "../data/opendata"
BASE_URL    = "https://fastopendata.org/dssg-safestreets"

# Files to download from BASE_URL into DATA_DIR. Each becomes a view named
# after the file without its extension.
TARGET_FILES = [
    "nyc_speed_cameras_historic.parquet",
    "test1_nyc_speed_cameras.json",
    "test2_nyc_speed_cameras.csv",
    "test3_nyc_speed_cameras.csv",
    "nyc_traffic_violations_historic.parquet",
    "test1_nyc_traffic_violations.json",
    "test2_nyc_traffic_violations.csv",
    "test3_nyc_traffic_violations.csv"
]

# --- 🚀 EXECUTION ---
# 1. Initialize the database (reset first when FRESH_START is True)
con = setup_database_environment(DB_PATH, fresh_start=FRESH_START)

# 2. Download the data (files already cached are reused) and register the views
file_paths, table_names = download_and_cache_data(TARGET_FILES, BASE_URL, DATA_DIR)
con.register_data_view(file_paths, table_names)

# 3. Show what we have
con.show_tables()

In [None]:
# Register local files
# 1. List the local files to expose as views
MY_LOCAL_FILES = [
    "../seeds/point_values.csv",

]

# 2. Connect to the database (fresh_start is left at its default, False, so
#    an existing database file is kept)
con = setup_database_environment(DB_PATH)

# 3. Keep the files that exist and register each one as a view
file_paths, table_names = process_local_files(MY_LOCAL_FILES)
con.register_data_view(file_paths, table_names)

# 4. Verify results
con.show_tables()

In [None]:
# Query example: count the rows of one view.
# With show_results=True the result is displayed and run_query returns None.
query = """
SELECT count(*) as total_rows 
FROM nyc_speed_cameras_historic
"""
con.run_query(query, show_results=True)

In [None]:
# Query with SQL; show_results=True enables the scrollable table
query = """



SELECT * FROM nyc_speed_cameras_historic limit 1000




"""
con.run_query(query, show_results=True)

In [None]:
# Query with SQL; show_results=True enables the scrollable table
query = """



SELECT * FROM nyc_traffic_violations_historic  limit 1000




"""
con.run_query(query, show_results=True)

In [None]:
# Query tables with SQL and keep the result (show_results is left at its
# default, False, so run_query returns the DataFrame instead of displaying it).

# A plain string, not an f-string: there is nothing to interpolate, and any
# literal braces added to the SQL later would otherwise be read as placeholders.
query = """

SELECT * from nyc_speed_cameras_historic limit 20000

"""

result = con.run_query(query)

print(result)


In [None]:
# Show the column names and data types of one registered table
con.show_schema("nyc_speed_cameras_historic")

In [None]:
# Show the tables and views currently present in the 'main' schema
con.show_tables()


In [None]:
# Export tables as CSV, JSON, or Parquet

# 1. Define the SQL (or use an existing DataFrame variable)
export_sql = """
SELECT * 
FROM test1_nyc_traffic_violations  

"""

# 2. Run & save in one call. No output_dir is given, so the file goes to the
#    default folder, "../data/exports".
con.export(
    data=export_sql, 
    file_name="my_traffic_subset", 
    file_type="csv"
)

# Optional: You can specify a different folder if needed
# con.export(export_sql, "my_subset", "parquet", output_dir="my_custom_folder")

In [None]:
# Run a SQL pipeline: every .sql file in ./sql is executed and exported as CSV
# to <parent of the working directory>/data/exports.
repo_root = Path.cwd().resolve().parent
sql_folder = Path.cwd() / "sql"
export_folder = repo_root / "data" / "exports"
run_project_sql_pipeline(con, sql_folder, export_folder, output_format="csv")

# Verify that the new tables (e.g., ticket_summary) are now available
con.show_tables()