## ‚òÄÔ∏è Hypertable Guide

In [None]:
!pip install psycopg2-binary
!pip install sqlalchemy

import psycopg2
from sqlalchemy import create_engine, text
import os

def quick_test():
    config = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'port': os.getenv('DB_PORT', '5432'),
        'user': os.getenv('DB_USER', 'postgres'),
        'password': os.getenv('DB_PASSWORD', 'postgres'),
        'database': os.getenv('DB_NAME', 'postgres')
    }
    
    # Test 1: Direct psycopg2 connection
    try:
        conn = psycopg2.connect(**config)
        cur = conn.cursor()
        cur.execute("SELECT version();")
        print(f"‚úÖ psycopg2: {cur.fetchone()[0].split(',')[0]}")
        conn.close()
    except Exception as e:
        print(f"‚ùå psycopg2 failed: {e}")
    
    # Test 2: SQLAlchemy connection
    try:
        engine = create_engine(f"postgresql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}")
        with engine.connect() as conn:
            result = conn.execute(text("SELECT current_database(), current_user;"))
            db, user = result.fetchone()
            print(f"‚úÖ SQLAlchemy: Connected to {db} as {user}")
    except Exception as e:
        print(f"‚ùå SQLAlchemy failed: {e}")

quick_test()

### Step 1: Enable TimescaleDB extension

Ensure the extension is installed and enabled in your database.

```sql
-- Create the extension in your database
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
```
```sql
-- (Optional) Turn off telemetry
ALTER SYSTEM SET timescaledb.telemetry_level=off;
```

In [None]:
from sqlalchemy import create_engine, text

# Update with your Docker Postgres credentials
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

try:
    with engine.connect() as conn:
        print("üîß Step 1: Enabling TimescaleDB extension...")
        
        # Enable TimescaleDB extension
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;"))
        conn.commit()
        print("‚úÖ TimescaleDB extension enabled")

except Exception as e:
    print(f"‚ùå Error: {e}")

### Step 2: Create a Regular Table

Define your table schema as you would in standard SQL. A NOT NULL timestamp column is mandatory for creating a hypertable.

```sql
-- A wide table design
CREATE TABLE vehicle_telemetry (
    time TIMESTAMPTZ NOT NULL,  -- This will be the partitioning column
    vehicle_id TEXT NOT NULL,
    engine_temp DOUBLE PRECISION,
    rpm INTEGER,
    fuel_level DOUBLE PRECISION,
    location TEXT
);
```

In [None]:
from sqlalchemy import create_engine, text

# Update these credentials for your Docker Postgres
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

with engine.connect() as conn:

    print("üóëÔ∏è Dropping table and all dependent objects...")
        
    # Use CASCADE to drop table and all dependent objects (continuous aggregates, etc.)
    conn.execute(text("DROP TABLE IF EXISTS vehicle_telemetry CASCADE;"))
    conn.commit()
    print("‚úÖ Table and all dependencies dropped!")

    print("üîß Step 2: Create a Regular Table...")
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS vehicle_telemetry (
            time TIMESTAMPTZ NOT NULL,
            vehicle_id TEXT NOT NULL,
            engine_temp DOUBLE PRECISION,
            rpm INTEGER,
            fuel_level DOUBLE PRECISION,
            location TEXT
        );
    """))
    conn.commit()
    print("‚úÖ vehicle_telemetry table created!")

### Step 3: Convert to a Hypertable

A hypertable automatically partitions your data by time into "chunks" for better performance and management.

```sql
-- Convert the standard table into a hypertable
SELECT create_hypertable('vehicle_telemetry', 'time');
```

In [None]:
from sqlalchemy import create_engine, text

# Update with your Docker Postgres credentials
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

try:
    with engine.connect() as conn:
        print("üîß Step 3: Convert to a Hypertable...")

        # Convert to hypertable
        result = conn.execute(text("SELECT create_hypertable('vehicle_telemetry', 'time');"))
        conn.commit()
        
        # Check if successful
        hypertable_status = result.fetchone()[0]
        print(f"‚úÖ Hypertable creation result: {hypertable_status}")
        
        # Verify hypertable was created
        verify_result = conn.execute(text("""
            SELECT hypertable_name, num_chunks 
            FROM timescaledb_information.hypertables 
            WHERE hypertable_name = 'vehicle_telemetry';
        """))
        
        hypertable_info = verify_result.fetchone()
        if hypertable_info:
            print(f"üéâ Success! Hypertable '{hypertable_info[0]}' created with {hypertable_info[1]} chunks")
        else:
            print("‚ùå Hypertable creation may have failed - not found in hypertables list")
            
except Exception as e:
    print(f"‚ùå Error: {e}")
    print("üí° Make sure:")
    print("   - TimescaleDB extension is installed")
    print("   - vehicle_telemetry table exists")
    print("   - You have the necessary permissions")

### Step 4: Insert and Query Data

Interact with your hypertable using standard SQL.

```sql
-- Insert data
INSERT INTO vehicle_telemetry (time, vehicle_id, engine_temp, rpm)
VALUES (NOW(), 'truck_123', 85.2, 2200);

-- Query recent data
SELECT * FROM vehicle_telemetry
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
```

In [None]:
from sqlalchemy import create_engine, text

# Simple version without pandas
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

with engine.connect() as conn:
    print("üìù Step 4a: Inserting sample data...")

    # Insert data
    conn.execute(text("""
            INSERT INTO vehicle_telemetry (time, vehicle_id, engine_temp, rpm, fuel_level, location)
            VALUES 
                (NOW(), 'truck_123', 85.2, 2200, 75.5, 'New York'),
                (NOW() - INTERVAL '60 minutes', 'truck_456', 92.1, 1800, 60.0, 'Boston'),
                (NOW() - INTERVAL '120 minutes', 'truck_123', 87.8, 2500, 72.3, 'New York');
        """))
    conn.commit()
    print("‚úÖ Data inserted!")
    
    # Query data
    print("\nüîç Step4b: Querying recent data...")
    result = conn.execute(text("SELECT * FROM vehicle_telemetry WHERE time > NOW() - INTERVAL '1 hour' ORDER BY time DESC"))
    
    # Simple print
    print("üìã Recent data:")
    for row in result:
        print(f"Time: {row[0]}, Vehicle: {row[1]}, Temp: {row[2]}, RPM: {row[3]}")

## ‚ö° Optimizations

### 1. Accelerate Queries with Continuous Aggregates

For dashboards and frequently run aggregate queries, use continuous aggregates.

```sql
-- Create a continuous aggregate for hourly averages
CREATE MATERIALIZED VIEW vehicle_hourly_summary
WITH (timescaledb.continuous) AS
SELECT
    vehicle_id,
    time_bucket(INTERVAL '1 hour', time) AS bucket,  -- Groups data into 1-hour buckets
    AVG(engine_temp) AS avg_temp,
    STDDEV(engine_temp) AS temp_variability,
    AVG(rpm) AS avg_rpm
FROM vehicle_telemetry
GROUP BY vehicle_id, bucket;

-- Query the pre-computed aggregate
SELECT * FROM vehicle_hourly_summary
WHERE bucket >= NOW() - INTERVAL '24 hours';
```

In [None]:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
with engine.connect() as conn:
    conn.commit()  # Exit transaction block

    # Drop if exists (this can run in transaction)
    conn.execute(text("DROP MATERIALIZED VIEW IF EXISTS vehicle_hourly_summary;"))
    conn.commit()

In [None]:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
with engine.connect() as conn:
    conn.commit()  # Exit transaction block

    # Create continuous aggregate without data first
    conn.execute(text("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS vehicle_hourly_summary
        WITH (timescaledb.continuous) AS
        SELECT
            vehicle_id,
            time_bucket(INTERVAL '1 hour', time) AS bucket,
            AVG(engine_temp) AS avg_temp,
            STDDEV(engine_temp) AS temp_variability,
            AVG(rpm) AS avg_rpm
        FROM vehicle_telemetry
        GROUP BY vehicle_id, bucket
        WITH NO DATA;
    """))
    conn.commit()

In [None]:
# from sqlalchemy import create_engine, text
# engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
# with engine.connect() as conn:
#     conn.commit()  # Exit transaction block
    
#     # Refresh to populate data
#     conn.execute(text("CALL refresh_continuous_aggregate('vehicle_hourly_summary', NULL, NULL);"))
#     conn.commit()

In [None]:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")
with engine.connect() as conn:
    conn.commit()  # Exit transaction block
    
    # Query results
    result = conn.execute(text("SELECT * FROM vehicle_hourly_summary WHERE bucket >= NOW() - INTERVAL '24 hours';"))
    for row in result: print(row)

### 2. Save Space with Native Compression

Enable compression to significantly reduce storage footprint as your data grows.

```sql
-- Enable compression on the hypertable
ALTER TABLE vehicle_telemetry SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'vehicle_id'
);

-- Add a policy to compress chunks older than 6 days
ADD COMPRESSION POLICY vehicle_telemetry
INTERVAL '6 days';
```

In [None]:
from sqlalchemy import create_engine, text

# Update with your Docker Postgres credentials
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

try:
    with engine.connect() as conn:
        print("üóúÔ∏è Step 1: Enabling compression on hypertable...")
        
        # Enable compression with vehicle_id as segmentby column
        conn.execute(text("""
            ALTER TABLE vehicle_telemetry SET (
                timescaledb.compress,
                timescaledb.compress_segmentby = 'vehicle_id'
            );
        """))
        conn.commit()
        print("‚úÖ Compression enabled on vehicle_telemetry!")
        
        print("‚è∞ Step 2: Adding compression policy (6 days)...")
        
        # Add compression policy with if_not_exists to avoid duplicate errors
        try:
            conn.execute(text("""
                SELECT add_compression_policy('vehicle_telemetry', INTERVAL '6 days', if_not_exists => true);
            """))
            conn.commit()
            print("‚úÖ Compression policy added!")
        except Exception as policy_error:
            if "already exists" in str(policy_error):
                print("‚úÖ Compression policy already exists - skipping")
            else:
                raise policy_error
        
        print("\nüîç Step 3: Verifying compression setup...")
        
        # Check compression settings
        result = conn.execute(text("""
            SELECT 
                hypertable_name,
                *
            FROM timescaledb_information.compression_settings 
            WHERE hypertable_name = 'vehicle_telemetry';
        """))
        
        comp_settings = result.fetchone()
        if comp_settings:
            print("üìä Compression Settings:")
            print(f"   - Table: {comp_settings[0]}")
            print(f"   - Enabled: {comp_settings[1]}")
            print(f"   - Segment By: {comp_settings[2]}")
        
        # Check active compression policies
        result = conn.execute(text("""
            SELECT 
                application_name,
                schedule_interval,
                config
            FROM timescaledb_information.jobs
            WHERE hypertable_name = 'vehicle_telemetry'
            AND proc_name = 'policy_compression';
        """))
        
        policies = result.fetchall()
        if policies:
            print(f"‚úÖ Active Compression Policies: {len(policies)}")
            for policy in policies:
                print(f"   - Job: {policy[0]}")
                print(f"   - Schedule: {policy[1]}")
                print(f"   - Config: {policy[2]}")
        
        # Check chunk compression status
        result = conn.execute(text("""
            SELECT 
                COUNT(*) as total_chunks,
                SUM(CASE WHEN is_compressed THEN 1 ELSE 0 END) as compressed_chunks
            FROM timescaledb_information.chunks 
            WHERE hypertable_name = 'vehicle_telemetry';
        """))
        
        chunks = result.fetchone()
        if chunks:
            print(f"üì¶ Chunk Status: {chunks[1]}/{chunks[0]} compressed")
            
        print("\nüéâ Compression setup verified and active!")
        
except Exception as e:
    print(f"‚ùå Error: {e}")

### 3. Automate Data Retention

Automatically drop old data to manage storage costs.

```sql
-- Add a policy to drop data older than 7 days
ADD RETENTION POLICY vehicle_telemetry
INTERVAL '7 days';
```

In [None]:
from sqlalchemy import create_engine, text

# Update with your Docker Postgres credentials
engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

try:
    with engine.connect() as conn:
        print("üóëÔ∏è Adding retention policy (7 days)...")
        
        # Add retention policy to drop data older than 7 days
        conn.execute(text("""
            SELECT add_retention_policy('vehicle_telemetry', INTERVAL '7 days');
        """))
        conn.commit()
        print("‚úÖ Retention policy added! Data older than 7 days will be automatically dropped.")
        
        # Verify the policy was created
        result = conn.execute(text("""
            SELECT 
                hypertable_name,
                schedule_interval,
                config
            FROM timescaledb_information.jobs
            WHERE hypertable_name = 'vehicle_telemetry'
            AND proc_name = 'policy_retention';
        """))
        
        policies = result.fetchall()
        if policies:
            print("‚úÖ Retention Policy Active:")
            for policy in policies:
                print(f"   - Table: {policy[0]}")
                print(f"   - Schedule: {policy[1]}")
                print(f"   - Config: {policy[2]}")
        else:
            print("‚ùå No retention policy found")
            
except Exception as e:
    if "already exists" in str(e):
        print("‚úÖ Retention policy already exists!")
    else:
        print(f"‚ùå Error: {e}")