# Instal libraries

In [36]:
#!pip3 install psycopg2-binary

# Import libraries

In [None]:
#import os
import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
import polars as pl
#import json
from psycopg2 import OperationalError, sql
from contextlib import contextmanager
#from dotenv import load_dotenv

# Functions

In [2]:
DB_NAME="testdb"
DB_USER="admin"
DB_PASSWORD="your_password"
DB_HOST="localhost"
DB_PORT="6432"

In [3]:
@contextmanager
def get_db_connection():
    """Context manager for PostgreSQL database connection."""
    conn = None
    try:
        # Establish the connection using environment variables
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        # Yield the connection to be used in the 'with' block
        yield conn
    except OperationalError as e:
        print(f"An error occurred while connecting to the database: {e}")
        raise
    finally:
        if conn:
            conn.close()

In [4]:
def test_db_connection():
    """Test function to verify database connection."""
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                # Execute a simple query to test the connection
                cur.execute(sql.SQL("SELECT 1"))
                result = cur.fetchone()
                if result:
                    print("Database connection successful.")
                else:
                    print("Failed to retrieve data from the database.")
    except Exception as e:
        print(f"Test failed: {e}")

In [5]:
def load_and_prepare_data(parquet_file_path: str) -> pl.DataFrame:
    """
    Load and process the Parquet data file, expanding the `full_vehicleInfo` column.
    """
    # Read the Parquet file
    df = pl.read_parquet(parquet_file_path)

    # Cast 'full_vehicleInfo' to Struct type and unnest
    return df.with_columns(
        pl.col("full_vehicleInfo").cast(pl.Struct)
    ).unnest("full_vehicleInfo")

In [6]:
def insert_data_into_db(df: pl.DataFrame, table_name: str):
    """
    Insert data from a Polars DataFrame into a PostgreSQL table.
    """
    # Convert Polars DataFrame to list of tuples
    records = df.to_dicts()
    
    # Use the first record to generate column names dynamically
    columns = list(records[0].keys())
    rows = [tuple(record.values()) for record in records]

    # Database connection
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        with conn.cursor() as cur:
            # Create an insert query dynamically
            insert_query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s"
            execute_values(cur, insert_query, rows)
            conn.commit()
            print(f"{len(rows)} records successfully inserted into {table_name}.")
    except Exception as e:
        print(f"An error occurred while inserting data: {e}")
    finally:
        if conn:
            conn.close()

In [7]:
def map_polars_to_postgres_types(polars_dtype):
    """
    Map Polars data types to PostgreSQL data types.
    """
    type_mapping = {
        pl.Int32: "INTEGER",
        pl.Int64: "BIGINT",
        pl.Float32: "REAL",
        pl.Float64: "DOUBLE PRECISION",
        pl.Utf8: "TEXT",
        pl.Boolean: "BOOLEAN",
        pl.Date: "DATE",
        pl.Datetime: "TIMESTAMP",
        pl.List: "JSONB",  # If lists are used, JSONB is a good fit
    }
    return type_mapping.get(polars_dtype, "TEXT")  # Default to TEXT for unknown types

In [8]:
def create_table_from_df(table_name: str, df: pl.DataFrame):
    """
    Create a PostgreSQL table based on the schema of a Polars DataFrame.
    """
    # Generate column definitions based on DataFrame schema
    columns = [
        f"{col_name} {map_polars_to_postgres_types(dtype)}"
        for col_name, dtype in zip(df.columns, df.dtypes)
    ]
    columns_sql = ", ".join(columns)
    
    # Construct CREATE TABLE statement
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {columns_sql}
    );
    """
    
    # Connect to PostgreSQL and execute the query
    conn = None
    try:
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        with conn.cursor() as cur:
            cur.execute(create_table_query)
            conn.commit()
            print(f"Table '{table_name}' created successfully.")
    except Exception as e:
        print(f"An error occurred while creating the table: {e}")
    finally:
        if conn:
            conn.close()

In [9]:
def retrieve_data_from_db(query: str, params: tuple = ()) -> list:
    """
    Retrieve data from the PostgreSQL database based on a query.

    Args:
        query (str): The SQL query to execute.
        params (tuple): Parameters for the SQL query.

    Returns:
        list: A list of dictionaries containing the query results.
    """
    conn = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Execute the query
            cur.execute(query, params)
            # Fetch all results
            # Convert the results to a Polars DataFrame
            return pl.DataFrame(cur.fetchall())
    except Exception as e:
        print(f"An error occurred while retrieving data: {e}")
        return []
    finally:
        if conn:
            conn.close()

In [10]:
def update_record_in_db(table_name: str, update_values: dict, condition: str, condition_params: tuple):
    """
    Update records in a PostgreSQL table.

    Args:
        table_name (str): The name of the table to update.
        update_values (dict): A dictionary of column-value pairs to update.
        condition (str): The WHERE clause to specify which records to update.
        condition_params (tuple): Parameters for the WHERE clause.

    Returns:
        int: The number of rows affected.
    """
    conn = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        with conn.cursor() as cur:
            # Generate the SET part of the SQL query dynamically
            set_clause = ", ".join([f"{col} = %s" for col in update_values.keys()])
            update_query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
            
            # Combine the values to update and the WHERE clause parameters
            query_params = tuple(update_values.values()) + condition_params
            
            # Execute the update query
            cur.execute(update_query, query_params)
            conn.commit()
            
            # Return the number of rows affected
            return cur.rowcount
    except Exception as e:
        print(f"An error occurred while updating the record: {e}")
        return 0
    finally:
        if conn:
            conn.close()

In [11]:
def delete_record_from_db(table_name: str, condition: str, condition_params: tuple) -> int:
    """
    Delete records from a PostgreSQL table.

    Args:
        table_name (str): The name of the table from which to delete records.
        condition (str): The WHERE clause to specify which records to delete.
        condition_params (tuple): Parameters for the WHERE clause.

    Returns:
        int: The number of rows affected (deleted).
    """
    conn = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        with conn.cursor() as cur:
            # Construct the DELETE query
            delete_query = f"DELETE FROM {table_name} WHERE {condition}"
            
            # Execute the DELETE query
            cur.execute(delete_query, condition_params)
            conn.commit()
            
            # Return the number of rows deleted
            return cur.rowcount
    except Exception as e:
        print(f"An error occurred while deleting the record: {e}")
        return 0
    finally:
        if conn:
            conn.close()

# Main

## Test connection

In [17]:
test_db_connection()

Database connection successful.


## Prepare data

In [18]:
parquet_file_path = "../Data/Transform/Small/data.parquet"
table_name = "vehicles"

In [19]:
df = load_and_prepare_data(parquet_file_path)

## Create table

In [20]:
create_table_from_df(table_name, df)

Table 'vehicles' created successfully.


## Insert data

In [21]:
insert_data_into_db(df, table_name)

1000 records successfully inserted into vehicles.


# Testing

## Test 1 - fetch data from db

In [53]:
data = retrieve_data_from_db(query = "SELECT * FROM vehicles WHERE vehicle_year = %s;", params = (1999,))
print(data)

shape: (25, 9)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ vin       ┆ license_p ┆ vehicle_m ┆ vehicle_m ┆ … ┆ vehicle_c ┆ vehicle_m ┆ vehicle_y ┆ vehicle_ │
│ ---       ┆ late      ┆ ake       ┆ odel      ┆   ┆ ategory   ┆ ake_model ┆ ear_make_ ┆ year_mak │
│ str       ┆ ---       ┆ ---       ┆ ---       ┆   ┆ ---       ┆ ---       ┆ model     ┆ e_model_ │
│           ┆ str       ┆ str       ┆ str       ┆   ┆ str       ┆ str       ┆ ---       ┆ cat      │
│           ┆           ┆           ┆           ┆   ┆           ┆           ┆ str       ┆ ---      │
│           ┆           ┆           ┆           ┆   ┆           ┆           ┆           ┆ str      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 82HFE9767 ┆ WU37 WRN  ┆ Mitsubish ┆ Montero   ┆ … ┆ SUV       ┆ Mitsubish ┆ 1999 Mits ┆ 1999 Mit │
│ U326DEZ2  ┆           ┆ i         ┆           ┆   ┆           ┆ i Montero 

In [54]:
data = retrieve_data_from_db("SELECT * FROM vehicles;", "")
with pl.Config(tbl_cols=-1):
    print(data)

shape: (1_000, 9)
┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ vin      ┆ license_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ │
│ ---      ┆ plate    ┆ make     ┆ model    ┆ year     ┆ category ┆ make_mod ┆ year_mak ┆ year_mak │
│ str      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ el       ┆ e_model  ┆ e_model_ │
│          ┆ str      ┆ str      ┆ str      ┆ i64      ┆ str      ┆ ---      ┆ ---      ┆ cat      │
│          ┆          ┆          ┆          ┆          ┆          ┆ str      ┆ str      ┆ ---      │
│          ┆          ┆          ┆          ┆          ┆          ┆          ┆          ┆ str      │
╞══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ 82HFE976 ┆ WU37 WRN ┆ Mitsubis ┆ Montero  ┆ 1999     ┆ SUV      ┆ Mitsubis ┆ 1999 Mit ┆ 1999 Mit │
│ 7U326DEZ ┆          ┆ hi       ┆          ┆          ┆          ┆ hi   

## Test 2 - update the record and get the updated version

In [55]:
data = retrieve_data_from_db(query = "SELECT * FROM vehicles WHERE vin = %s;", params = ("82HFE9767U326DEZ2",))
with pl.Config(tbl_cols=-1):
    print(data)

shape: (1, 9)
┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ vin      ┆ license_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ │
│ ---      ┆ plate    ┆ make     ┆ model    ┆ year     ┆ category ┆ make_mod ┆ year_mak ┆ year_mak │
│ str      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ el       ┆ e_model  ┆ e_model_ │
│          ┆ str      ┆ str      ┆ str      ┆ i64      ┆ str      ┆ ---      ┆ ---      ┆ cat      │
│          ┆          ┆          ┆          ┆          ┆          ┆ str      ┆ str      ┆ ---      │
│          ┆          ┆          ┆          ┆          ┆          ┆          ┆          ┆ str      │
╞══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ 82HFE976 ┆ WU37 WRN ┆ Mitsubis ┆ Montero  ┆ 1999     ┆ SUV      ┆ Mitsubis ┆ 1999 Mit ┆ 1999 Mit │
│ 7U326DEZ ┆          ┆ hi       ┆          ┆          ┆          ┆ hi       

In [56]:
table_name = "vehicles"
update_values = {"vehicle_make": "Toyota", "vehicle_model": "Camry"}
condition = "vin = %s"
condition_params = ("82HFE9767U326DEZ2",)
rows_updated = update_record_in_db(table_name, update_values, condition, condition_params)
print(f"Number of rows updated: {rows_updated}")

Number of rows updated: 1


In [57]:
data = retrieve_data_from_db(query = "SELECT * FROM vehicles WHERE vin = %s;", params = ("82HFE9767U326DEZ2",))
with pl.Config(tbl_cols=-1):
    print(data)

shape: (1, 9)
┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ vin      ┆ license_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ │
│ ---      ┆ plate    ┆ make     ┆ model    ┆ year     ┆ category ┆ make_mod ┆ year_mak ┆ year_mak │
│ str      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ el       ┆ e_model  ┆ e_model_ │
│          ┆ str      ┆ str      ┆ str      ┆ i64      ┆ str      ┆ ---      ┆ ---      ┆ cat      │
│          ┆          ┆          ┆          ┆          ┆          ┆ str      ┆ str      ┆ ---      │
│          ┆          ┆          ┆          ┆          ┆          ┆          ┆          ┆ str      │
╞══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ 82HFE976 ┆ WU37 WRN ┆ Toyota   ┆ Camry    ┆ 1999     ┆ SUV      ┆ Mitsubis ┆ 1999 Mit ┆ 1999 Mit │
│ 7U326DEZ ┆          ┆          ┆          ┆          ┆          ┆ hi       

## Test 3 - Get historical data to see updates

## Test 4 - delete the record

In [58]:
data = retrieve_data_from_db(query = "SELECT * FROM vehicles WHERE vin = %s;", params = ("82HFE9767U326DEZ2",))
with pl.Config(tbl_cols=-1):
    print(data)

shape: (1, 9)
┌──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┬──────────┐
│ vin      ┆ license_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ ┆ vehicle_ │
│ ---      ┆ plate    ┆ make     ┆ model    ┆ year     ┆ category ┆ make_mod ┆ year_mak ┆ year_mak │
│ str      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ ---      ┆ el       ┆ e_model  ┆ e_model_ │
│          ┆ str      ┆ str      ┆ str      ┆ i64      ┆ str      ┆ ---      ┆ ---      ┆ cat      │
│          ┆          ┆          ┆          ┆          ┆          ┆ str      ┆ str      ┆ ---      │
│          ┆          ┆          ┆          ┆          ┆          ┆          ┆          ┆ str      │
╞══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╪══════════╡
│ 82HFE976 ┆ WU37 WRN ┆ Toyota   ┆ Camry    ┆ 1999     ┆ SUV      ┆ Mitsubis ┆ 1999 Mit ┆ 1999 Mit │
│ 7U326DEZ ┆          ┆          ┆          ┆          ┆          ┆ hi       

In [59]:
table_name = "vehicles"
condition = "vin = %s"
condition_params = ("82HFE9767U326DEZ2",)
rows_deleted = delete_record_from_db(table_name, condition, condition_params)
print(f"Number of rows deleted: {rows_deleted}")

Number of rows deleted: 1


In [60]:
data = retrieve_data_from_db(query = "SELECT * FROM vehicles WHERE vin = %s;", params = ("82HFE9767U326DEZ2",))
with pl.Config(tbl_cols=-1):
    print(data)

shape: (0, 0)
┌┐
╞╡
└┘
