In [28]:
from Extractor.csv_extractor import CSVExtractor
from Extractor.database_connector import DatabaseConnector
from Extractor.api_extractor import APIExtractor
from Extractor.s3_extractor import PublicS3Extractor
# from Extractor.main_extractor import MainExtractor
from Extractor.json_extractor import JSONExtractor



In [29]:
from dotenv import load_dotenv
import yaml
import os

In [30]:
# adjust this path to where your .env actually lives
load_dotenv(r".env")

config_path = r"F:\Data Engineering\Virtual Environment\config.yaml"
with open(config_path) as f:
    raw = f.read()

In [31]:
# expands ${DB_HOST}, ${DB_USER}, etc. using os.environ
expanded = os.path.expandvars(raw)
config = yaml.safe_load(expanded)

# sanity check: should be a dict-of-dicts
print("CONFIG:", config)
print("TYPE of config['database']:", type(config['database']))

CONFIG: {'database': {'host': 'localhost', 'database': 'advancedsql', 'user': 'postgres', 'password': 'Harshana123', 'port': 5432}, 's3': {'bucket_name': 'fde-bucket-my', 'region': 'us-east-1', 'files': {'JSON/products.json': 'lnd_products_json', 'JSON/sales.json': 'lnd_sales_json', 'CSV/customers.csv': 'lnd_customers_csv', 'CSV/products.csv': 'lnd_products_csv', 'CSV/sales.csv': 'lnd_sales_csv'}}, 'api': {'endpoints': {'https://dummyjson.com/products': 'lnd_products_api', 'https://dummyjson.com/users': 'lnd_users_api'}}, 'schemas': {'landing_schema': 'landing', 'staging_schema': 'staging', 'transform_schema': 'transform', 'target_schema': 'target'}, 'entities': {'products': {'staging_view': 'stg_products', 'temp_table': 'tmp_products', 'target_table': 'dim_products'}, 'users': {'staging_view': 'stg_users', 'temp_table': 'tmp_users', 'target_table': 'dim_users'}, 'sales': {'staging_view': 'stg_sales', 'temp_table': 'tmp_sales', 'target_table': 'fact_sales'}}}
TYPE of config['database']

In [32]:
# 3) instantiate your connector with that dict
db = DatabaseConnector(config)

In [33]:
# # 4) now test psycopg2
print("\n--- psycopg2 test ---")
conn = db.get_connection()
cur  = conn.cursor()

# # Test1: Get PostgreSQL version
cur.execute("SELECT version();")
print("Postgres version:", cur.fetchone()[0])


# 2. List all tables in landing schema
cur.execute("""
    SELECT table_name
      FROM information_schema.tables
     WHERE table_schema = 'landing';
""")
print("Tables in landing schema:", cur.fetchall())

# 3. show the structure of one of the tables
cur.execute("""
    SELECT column_name, data_type
      FROM information_schema.columns
     WHERE table_schema = 'landing' 
    AND table_name = 'lnd_customers_csv';
""")
print("Table structure:", cur.fetchall())

cur.close()
conn.close()




--- psycopg2 test ---
Postgres version: PostgreSQL 17.5 on x86_64-windows, compiled by msvc-19.44.35209, 64-bit
Tables in landing schema: [('lnd_products_json',), ('lnd_sales_json',), ('lnd_customers_csv',), ('lnd_products_csv',), ('lnd_sales_csv',), ('lnd_products_api',), ('lnd_users_api',)]
Table structure: [('id', 'integer'), ('customer_key', 'character varying'), ('gender', 'character varying'), ('name', 'character varying'), ('city', 'character varying'), ('state_code', 'character varying'), ('state', 'character varying'), ('zip_code', 'character varying'), ('country', 'character varying'), ('continent', 'character varying'), ('birthday', 'date'), ('loaded_at', 'timestamp without time zone')]


In [35]:
from urllib.parse import quote_plus
from sqlalchemy import create_engine, text, MetaData, inspect

print("\n--- SQLAlchemy engine test ---")

# Encode password with special characters
password = quote_plus("Harshana123")

# Create engine with encoded password
engine = create_engine(
    f"postgresql+psycopg2://postgres:{password}@localhost:5432/advancedsql",
    echo=False  # Set to True if you want to see raw SQL logs
)

# Test connection
with engine.connect() as conn:
    dbname = conn.execute(text("SELECT current_database();")).scalar()
    print("✅ Connected to database:", dbname)

# Reflect all tables in the 'landing' schema
metadata = MetaData()
metadata.reflect(bind=engine, schema="landing")

# Print all table names in schema
print("Tables in 'landing' schema:", metadata.tables.keys())

# Fully qualified table name
table_key = "landing.lnd_customers_csv"

# Check if the table exists in reflected metadata
if table_key in metadata.tables:
    my_table = metadata.tables[table_key]
    print("✅ Table reflected successfully:", my_table)
else:
    print("❌ Table 'lnd_customers_csv' not found in schema 'landing'")

# Use Inspector to get column details
inspector = inspect(engine)
columns = inspector.get_columns('lnd_customers_csv', schema='landing')
print("\n📋 Columns in lnd_customers_csv:")
for col in columns:
    print(f"- {col['name']} ({col['type']})")  



--- SQLAlchemy engine test ---
✅ Connected to database: advancedsql
Tables in 'landing' schema: dict_keys(['landing.lnd_products_json', 'landing.lnd_sales_json', 'landing.lnd_customers_csv', 'landing.lnd_products_csv', 'landing.lnd_sales_csv', 'landing.lnd_products_api', 'landing.lnd_users_api'])
✅ Table reflected successfully: landing.lnd_customers_csv

📋 Columns in lnd_customers_csv:
- id (INTEGER)
- customer_key (VARCHAR(50))
- gender (VARCHAR(20))
- name (VARCHAR(255))
- city (VARCHAR(100))
- state_code (VARCHAR(50))
- state (VARCHAR(100))
- zip_code (VARCHAR(20))
- country (VARCHAR(100))
- continent (VARCHAR(50))
- birthday (DATE)
- loaded_at (TIMESTAMP)
