# In [None]:
import polars as pl
import json
import psycopg2
from psycopg2.extras import execute_values

# PostgreSQL location (schema, table) where Soda data-quality check results are stored.
SCHEMA_NAME: str = "soda_checks"
TABLE_NAME: str = "data_quality_checks"


class DataQualityCheckLoader:
    """Load Soda scan results from a JSON file into a PostgreSQL table.

    One psycopg2 connection is opened on construction and reused by
    :meth:`ensure_schema_and_table` and :meth:`insert_records`. Each of those
    runs in its own transaction: committed on success, rolled back on error,
    so the shared connection is never left in an aborted state. The loader can
    also be used as a context manager, which closes the connection on exit.

    NOTE(review): ``schema`` and ``table`` are interpolated into the SQL text
    unquoted, so they must be trusted, valid PostgreSQL identifiers.
    """

    def __init__(self, host, db_name, username, password, schema, table):
        """Store the connection settings and open the PostgreSQL connection.

        Args:
            host: ``"hostname"`` or ``"hostname:port"``; the port defaults to 5432.
            db_name: Name of the database to connect to.
            username: Login role.
            password: Password for ``username``.
            schema: Schema that holds the results table (created if missing).
            table: Results table name (created if missing).
        """
        self.host = host
        self.db_name = db_name
        self.username = username
        self.password = password
        self.schema = schema
        self.table = table
        self.conn = self.create_pg_connection()

    def create_pg_connection(self):
        """Open and return a psycopg2 connection built from the stored settings."""
        # "host:port" -> host and port; a bare "host" falls back to port 5432.
        host_split = self.host.split(':')
        return psycopg2.connect(
            host=host_split[0],
            port=host_split[1] if len(host_split) > 1 else 5432,
            dbname=self.db_name,
            user=self.username,
            password=self.password
        )

    def ensure_schema_and_table(self):
        """Create the schema and the results table if they do not already exist."""
        # ``with self.conn`` wraps the statements in one transaction (commit on
        # success, rollback on error); it does NOT close the connection.
        with self.conn, self.conn.cursor() as cur:
            cur.execute(f"CREATE SCHEMA IF NOT EXISTS {self.schema};")
            # NOTE(review): ``id`` holds the Soda check ``identity``. If that
            # identity is stable across scans (confirm), ON CONFLICT (id) DO
            # NOTHING in insert_records keeps only the first result per check;
            # a key such as (id, timestamp) would be needed to keep history.
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.schema}.{self.table} (
                    id TEXT PRIMARY KEY,
                    data_source TEXT,
                    table_name TEXT,
                    check_name TEXT,
                    column_name TEXT,
                    metric_id TEXT[],
                    outcome TEXT,
                    value DOUBLE PRECISION,
                    timestamp TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                );
            """)

    @staticmethod
    def load_json_to_records(json_path):
        """Parse a Soda scan-result JSON file into row dicts for :meth:`insert_records`.

        Each entry in the top-level ``"checks"`` list becomes one record, and
        every record carries the scan's top-level ``"dataTimestamp"``. Keys
        that are missing (or null) in the JSON map to ``None``.

        Args:
            json_path: Path to the JSON file, read as UTF-8.

        Returns:
            list[dict]: One dict per check, keyed by the table's column names.
        """
        with open(json_path, "r", encoding="utf-8") as f:
            soda_json = json.load(f)
        timestamp = soda_json.get("dataTimestamp")
        # ``or`` guards against keys that are present but explicitly null.
        return [
            {
                "id": check.get("identity"),
                "data_source": check.get("dataSource"),
                "table_name": check.get("table"),
                "check_name": check.get("name"),
                "column_name": check.get("column"),
                "metric_id": check.get("metrics"),
                "outcome": check.get("outcome"),
                "value": (check.get("diagnostics") or {}).get("value"),
                "timestamp": timestamp,
            }
            for check in soda_json.get("checks") or []
        ]

    def insert_records(self, records):
        """Insert records into the results table, skipping ids that already exist.

        Args:
            records: Iterable of dicts containing every table column as a key
                (as produced by :meth:`load_json_to_records`).

        Raises:
            KeyError: If a record lacks one of the column keys.

        The insert runs in a single transaction that is rolled back on error.
        The printed count is the number of rows actually inserted; rows whose
        ``id`` already exists are skipped by ``ON CONFLICT`` and reported
        separately.
        """
        if not records:
            print("No records to insert.")
            return
        columns = ("id", "data_source", "table_name", "check_name", "column_name",
                   "metric_id", "outcome", "value", "timestamp")
        values = [tuple(rec[col] for col in columns) for rec in records]
        insert_sql = f"""
            INSERT INTO {self.schema}.{self.table} ({', '.join(columns)})
            VALUES %s
            ON CONFLICT (id) DO NOTHING
            RETURNING id;
        """
        with self.conn, self.conn.cursor() as cur:
            # fetch=True gathers the RETURNING rows from every page that
            # execute_values sends, so only really-inserted rows are counted.
            inserted = execute_values(cur, insert_sql, values, fetch=True)
        skipped = len(values) - len(inserted)
        print(f"Inserted {len(inserted)} records into {self.schema}.{self.table} "
              f"({skipped} skipped as already present).")

    def close(self):
        """Close the PostgreSQL connection if one was opened."""
        if self.conn is not None:
            self.conn.close()

    def __enter__(self):
        """Return the loader itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection; exceptions are not suppressed."""
        self.close()
        return False


import os

# Connection settings come from the standard libpq environment variables when
# they are set, falling back to the local development values, so credentials do
# not have to be hardcoded in the notebook.
dq_loader = DataQualityCheckLoader(
    host=f"{os.environ.get('PGHOST', 'localhost')}:{os.environ.get('PGPORT', '5432')}",
    db_name=os.environ.get('PGDATABASE', 'mydb'),
    username=os.environ.get('PGUSER', 'myuser'),
    password=os.environ.get('PGPASSWORD', 'mypassword'),
    schema=SCHEMA_NAME,
    table=TABLE_NAME,
)

# Create the target schema/table if needed, then load and insert the Soda scan results.
dq_loader.ensure_schema_and_table()
records = dq_loader.load_json_to_records("wordline_velocity_dq_results.json")
dq_loader.insert_records(records)

# In [None]:
import pandas as pd
import random
import uuid
from datetime import datetime, timedelta

def generate_fake_data(n=30, seed=None):
    """Return a DataFrame of ``n`` synthetic data-quality check results.

    The columns mirror the results table:
    - ``id``: first 8 characters of a UUID4 string.
    - ``data_source``, ``table_name``, ``check_name``, ``outcome``: drawn from
      fixed choices.
    - ``column_name``: may be None.
    - ``metric_id``: a single ``"metric-<uuid>"`` string, not a list.
    - ``value``: a float in [0, 20] rounded to 2 decimals.
    - ``timestamp``: a naive datetime 0-96 hours after 2025-06-01 09:00.

    Args:
        n: Number of rows. ``0`` yields an empty frame that still has all columns.
        seed: Optional seed for reproducible output. When ``None`` (default), the
            global ``random`` module and ``uuid.uuid4`` are used.

    Returns:
        pandas.DataFrame with one row per fake check result.
    """
    base_timestamp = datetime(2025, 6, 1, 9, 0)
    # The module-level ``random`` keeps the unseeded/globally-seeded behaviour;
    # a private Random instance makes seeded runs independent of global state.
    rng = random if seed is None else random.Random(seed)

    def fake_uuid():
        # uuid4() draws from os.urandom and ignores any seed, so derive UUIDs
        # from ``rng`` when reproducibility was requested.
        if seed is None:
            return uuid.uuid4()
        return uuid.UUID(int=rng.getrandbits(128), version=4)

    columns = ["id", "data_source", "table_name", "check_name", "column_name",
               "metric_id", "outcome", "value", "timestamp"]
    rows = [
        {
            "id": str(fake_uuid())[:8],
            "data_source": rng.choice(['ods', 'dsa', 'dtm']),
            "table_name": rng.choice(['table1', 'table2', 'table3', 'table4', 'table5']),
            "check_name": rng.choice([
                "row_count > 0",
                "Schema Check",
                "check_last_payment_date",
                "missing_count(\"productId\") = 0"
            ]),
            "column_name": rng.choice([None, "productId", "lastPaymentDate", "amount", "status", "customerId"]),
            "metric_id": f"metric-{fake_uuid()}",
            "outcome": rng.choice(['pass', 'fail', 'warn']),
            "value": round(rng.uniform(0.0, 20.0), 2),
            "timestamp": base_timestamp + timedelta(hours=rng.randint(0, 96)),
        }
        for _ in range(n)
    ]
    # Explicit columns keep the schema (and order) even when ``rows`` is empty.
    return pd.DataFrame(rows, columns=columns)

df_fake = generate_fake_data(100)

# Reshape the fake rows into the record dicts DataQualityCheckLoader.insert_records
# expects; ``metric_id`` is wrapped in a list because the table column is TEXT[].
fake_records = [
    {**row, "metric_id": [row["metric_id"]]}
    for row in df_fake.to_dict(orient="records")
]

dq_loader.insert_records(fake_records)

# In [None]:
# Single source of truth for the connection settings (PostgreSQL URI);
# psycopg2 accepts a libpq connection URI directly as the DSN.
conn_str = "postgresql://myuser:mypassword@localhost:5432/mydb"
conn = psycopg2.connect(conn_str)

# Read back the results table, using the same schema/table names as the loader.
query = f"SELECT * FROM {SCHEMA_NAME}.{TABLE_NAME}"

# Read the table with Polars over the DB-API (psycopg2) connection.
df = pl.read_database(query, connection=conn)

print(df.head())