In [6]:
from sqlalchemy import create_engine
import sqlalchemy
# Your database connection details
user = 'dbadmin'
password = 'dbadminpw'
host = 'localhost'
port = 5433
db = 'sqldatabase'

sqlalchemy_engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{db}")

In [7]:
with sqlalchemy_engine.connect() as conn:
    conn.execute(sqlalchemy.text("CREATE SCHEMA IF NOT EXISTS bronze"))
    conn.execute(sqlalchemy.text("CREATE SCHEMA IF NOT EXISTS silver"))
    conn.commit() # Commit the changes if you're not in an auto-commit context
    print("Schemas 'bronze' and 'silver' created (or already existed).")

OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5433 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5433 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [8]:
from pyspark.sql import SparkSession

In [9]:
import os
from pyspark.sql import SparkSession

# --- IMPORTANT: Configure Spark Session with JDBC Driver ---
jdbc_driver_path = "/Users/mac/Learningnewthings/data-engineering/postgresql-42.6.0.jar" # Verify this path on your Mac

spark = SparkSession.builder \
    .appName("ETL CSV to Postgres Local") \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

jdbc_url = f"jdbc:postgresql://{host}:{port}/{db}"

# This is the path to your datasets folder ON YOUR MAC
host_datasets_path = "/Users/mac/Learningnewthings/data-engineering/projectFirst/datasets" # Verify this path on your Mac


file_table_map = {
    "cust_info.csv": "crm_cust_info",
    "prd_info.csv": "crm_prd_info",
    "sales_details.csv": "crm_sales_details",
    "CUST_AZ12.csv": "erp_cust_info",
    "LOC_A101.csv": "erp_loc_info",
    "PX_CAT_G1V2.csv": "erp_px_cat"
}

folders = ["source_crm", "source_erp"]

# Optional: Create schemas... (omitted for brevity here, assuming it works)

print('Starting CSV ingestion...')
# --- DEBUG PRINT: Verify initial paths ---
print(f"DEBUG: host_datasets_path = {host_datasets_path}")
print(f"DEBUG: folders = {folders}")


for folder in folders:
    # --- DEBUG PRINT: Verify 'folder' value ---
    print(f"\nDEBUG: Current 'folder' in outer loop: {folder}")

    # This is the expected path to a directory like /datasets/source_crm or /datasets/source_erp
    full_path = os.path.join(host_datasets_path, folder)

    # --- DEBUG PRINT: Verify 'full_path' before os.listdir ---
    print(f"DEBUG: 'full_path' before os.listdir: {full_path}")


    # Check if the directory exists before listing its contents
    if not os.path.isdir(full_path):
        print(f"Warning: Directory not found on host: {full_path}. Skipping.")
        continue

    # This is the line where the error occurs, meaning 'full_path' is NOT a directory at this point
    for file in os.listdir(full_path):
        # --- DEBUG PRINT: Verify 'file' value ---
        print(f"  DEBUG: Found 'file' in directory: {file}")

        if file.endswith('.csv') and file in file_table_map:
            table = file_table_map[file]
            file_path = os.path.join(full_path, file) # This correctly builds path to a CSV file
            print(f"    → File path: {file_path}")
            print(f"    Ingesting {file} to bronze.{table}")

            try:
                df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)

                df.write \
                    .format('jdbc') \
                    .option('url', jdbc_url) \
                    .option('dbtable', f'bronze.{table}') \
                    .option('user', user) \
                    .option('password', password) \
                    .option('driver', 'org.postgresql.Driver') \
                    .mode('overwrite') \
                    .save()
                print(f"    Successfully ingested {file} to bronze.{table}")
            except Exception as e:
                print(f"    Error ingesting {file}: {e}")

print('All CSVs ingestion process completed (check database for actual data).')

Starting CSV ingestion...
DEBUG: host_datasets_path = /Users/mac/Learningnewthings/data-engineering/projectFirst/datasets
DEBUG: folders = ['source_crm', 'source_erp']

DEBUG: Current 'folder' in outer loop: source_crm
DEBUG: 'full_path' before os.listdir: /Users/mac/Learningnewthings/data-engineering/projectFirst/datasets/source_crm
  DEBUG: Found 'file' in directory: cust_info.csv
    → File path: /Users/mac/Learningnewthings/data-engineering/projectFirst/datasets/source_crm/cust_info.csv
    Ingesting cust_info.csv to bronze.crm_cust_info
    Error ingesting cust_info.csv: An error occurred while calling o228.save.
: org.postgresql.util.PSQLException: Connection to localhost:5433 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)
	at org.postg

In [2]:
import os
user = os.environ['PG_USER']
password = os.environ['PG_PASSWORD']
host = os.environ['PG_HOST']
port = os.environ['PG_PORT']
db = os.environ['PG_DB']

KeyError: 'PG_USER'