In [2]:
# etl_pipeline.py
import os

import pandas as pd
from sqlalchemy import create_engine

# Take the connection URL from the environment so the database password is not
# committed with the notebook. The fallback URL deliberately omits the password:
# libpq then looks it up via PGPASSWORD or the user's password file.
DATABASE_URL = os.environ.get(
    "TELCO_DATABASE_URL",
    "postgresql+psycopg2://postgres@localhost:5433/telco_project",
)
engine = create_engine(DATABASE_URL)

# Smoke-test the connection. The context manager closes the connection even if
# something inside the block raises.
try:
    with engine.connect():
        print("Connection successful!")
except Exception as e:
    # f-string so the actual error text is shown (previously printed "{e}" literally)
    print(f"Connection failed with error: {e}")

Connection successful!


In [10]:
# Load each raw CSV extract into its staging table.
# Keys are "<schema>.<table>"; values are the CSV paths to read.
# if_exists='replace' means the target table is rebuilt on every run.
staging_sources = {
    "staging.telco_raw": "telco_data/telco_clean.csv",
    "staging.support_calls_raw": "telco_data/support_calls.csv",
    "staging.payments_raw": "telco_data/payments.csv",
}

for qualified_name, csv_path in staging_sources.items():
    schema_name, table_name = qualified_name.split('.')
    raw_frame = pd.read_csv(csv_path)
    n_rows = len(raw_frame)

    raw_frame.to_sql(
        table_name,
        engine,
        schema=schema_name,
        if_exists='replace',
        index=False,
    )
    print(f"Loaded {n_rows} rows into {qualified_name}")

Loaded 7032 rows into staging.telco_raw
Loaded 2987 rows into staging.support_calls_raw
Loaded 84384 rows into staging.payments_raw


In [17]:
# Run transformations (SQL files stored separately, executed from Python)
from sqlalchemy import text

# Transformation scripts, executed in this order.
TRANSFORM_SCRIPTS = [
    "sql/customers.sql",
    "sql/support_features.sql",
    "sql/payment_features.sql",
    "sql/customer_features.sql",
]

# engine.begin() runs everything in one transaction: it commits when the block
# exits normally and rolls back if any statement raises.
with engine.begin() as conn:
    for script in TRANSFORM_SCRIPTS:
        # Explicit encoding so the result does not depend on the platform default.
        with open(script, encoding="utf-8") as f:
            sql_content = f.read()

        # Execute each file as one multi-statement string rather than splitting
        # on ";". A naive split breaks any statement whose text contains a
        # semicolon (string literals, comments, $$-quoted function bodies).
        if sql_content.strip():  # skip blank files; nothing to execute
            conn.execute(text(sql_content))

print("ETL pipeline completed. Tables available in analytics schema.")

ETL pipeline completed. Tables available in analytics schema.


In [18]:
# Alternative method: run the same scripts through a raw DBAPI cursor.
# NOTE(review): this re-executes exactly the scripts run by the previous cell,
# so "Run All" applies every transformation twice — keep only one of the two.

# Get a raw (DBAPI-level) connection from the engine
raw_conn = engine.raw_connection()
try:
    cur = raw_conn.cursor()
    try:
        for script in ["sql/customers.sql", "sql/support_features.sql", "sql/payment_features.sql", "sql/customer_features.sql"]:
            with open(script, encoding="utf-8") as f:
                # Each file's full text is passed to a single execute() call.
                cur.execute(f.read())
    finally:
        # Release the cursor even when a script fails.
        cur.close()

    # Commit only after every script succeeded.
    raw_conn.commit()
except Exception:
    # Undo the partially applied scripts before surfacing the error.
    raw_conn.rollback()
    raise
finally:
    raw_conn.close()