In [1]:
import os

import duckdb

**Create DuckDB connection**

In [2]:
# Open an in-memory DuckDB database; nothing is persisted to disk.
conn = duckdb.connect(database=":memory:")

**Install and load the Postgres extension in DuckDB**

In [None]:
# Download the postgres extension, then load it into this session.
# execute() returns the connection, so the two statements can be chained.
conn.execute("INSTALL postgres;").execute("LOAD postgres;")

**Connect DuckDB with Postgres**

Replace with your Postgres credentials

In [None]:
# Connection settings are read from the standard libpq environment variables
# so that no credentials are stored in the notebook. PGPORT is optional
# (defaults to 5432); the others must be set before running this cell,
# otherwise a KeyError names the missing variable.
pg_settings = {
    "dbname": os.environ["PGDATABASE"],
    "user": os.environ["PGUSER"],
    "password": os.environ["PGPASSWORD"],
    "host": os.environ["PGHOST"],
    "port": os.environ.get("PGPORT", "5432"),
}

# Build a libpq keyword/value connection string. Each value is single-quoted
# with backslash escapes so passwords containing spaces, quotes or
# backslashes are passed through intact.
pg_dsn = " ".join(
    "{}='{}'".format(key, value.replace("\\", "\\\\").replace("'", "\\'"))
    for key, value in pg_settings.items()
)

# The DSN is embedded in a SQL string literal, so single quotes are doubled.
pg_dsn_literal = pg_dsn.replace("'", "''")

# `pg_db` becomes the catalog alias for the attached Postgres database.
conn.execute(f"ATTACH '{pg_dsn_literal}' AS pg_db (TYPE POSTGRES);")

`pg_db` is the connection alias. It acts as a catalog inside DuckDB, so you can query Postgres tables through it like this:

In [None]:
# conn.execute("SELECT * FROM pg_db.table_name")

## **Extract**

**Creating DuckDB Tables from CSV files**

In [None]:
# Names shared by the raw CSV files and the tables built from them.
tables = [
    "students",
    "teachers",
    "classes",
    "courses",
    "enrollments",
    "grades",
]

In [None]:
# Directory holding one raw CSV file per table; change it here if the data moves.
RAW_DATA_DIR = "data/raw_data"

# Create (or rebuild) one DuckDB table per CSV file, named after the file.
for table in tables:
    csv_path = f"{RAW_DATA_DIR}/{table}.csv"

    conn.execute(f"""
    CREATE OR REPLACE TABLE
        {table}
    AS SELECT
        *
    FROM
        read_csv_auto('{csv_path}');
    """)

    print(f"{table} table created successfully")

## **Transform**

**Students Table**

In [None]:
# Inspect the column names and types of the raw students table.
students_query = (
    conn
    .execute("DESCRIBE students")
    .df()
)
students_query

Casting `student_id` from `BIGINT` to `INTEGER`: 8 bytes -> 4 bytes

Casting `age` from `BIGINT` to `SMALLINT`: 8 bytes -> 2 bytes

In [None]:
# Build students_clean: student_id is cast to INTEGER and age to SMALLINT;
# every other column is carried over unchanged after those two.
students_clean_sql = """
CREATE OR REPLACE TABLE 
    students_clean
AS SELECT
    CAST(student_id AS INTEGER) AS student_id,
    CAST(age AS SMALLINT) AS age,
    * EXCLUDE (student_id, age)
FROM
    students;
"""
conn.execute(students_clean_sql)

**Teachers Table**

In [None]:
# Inspect the column names and types of the raw teachers table.
teachers_query = (
    conn
    .execute("DESCRIBE teachers")
    .df()
)
teachers_query

Casting `teacher_id` from `BIGINT` to `INTEGER`

In [None]:
# Build teachers_clean: teacher_id is cast to INTEGER and placed first;
# the remaining columns are carried over unchanged.
teachers_clean_sql = """
CREATE OR REPLACE TABLE 
    teachers_clean
AS SELECT
    CAST(teacher_id AS INTEGER) AS teacher_id,
    * EXCLUDE (teacher_id)
FROM
    teachers;
"""
conn.execute(teachers_clean_sql)

**Classes Table**

In [None]:
# Inspect the column names and types of the raw classes table.
classes_query = (
    conn
    .execute("DESCRIBE classes")
    .df()
)
classes_query

Casting `class_id`, `course_id`, `teacher_id` from `BIGINT` to `INTEGER`

In [None]:
# Build classes_clean: the three id columns are cast to INTEGER and placed
# first; the remaining columns are carried over unchanged.
classes_clean_sql = """
CREATE OR REPLACE TABLE 
    classes_clean
AS SELECT
    CAST(class_id AS INTEGER) AS class_id,
    CAST(course_id AS INTEGER) AS course_id,
    CAST(teacher_id AS INTEGER) AS teacher_id,
    * EXCLUDE (class_id, course_id, teacher_id)
FROM
    classes;
"""
conn.execute(classes_clean_sql)

**Courses Table**

In [None]:
# Inspect the column names and types of the raw courses table.
courses_query = (
    conn
    .execute("DESCRIBE courses")
    .df()
)
courses_query

In [None]:
# Build courses_clean: course_id is cast to INTEGER (first column) and
# credits to SMALLINT. Because credits is selected after the `* EXCLUDE`
# list, it ends up as the last column of the new table.
courses_clean_sql = """
CREATE OR REPLACE TABLE 
    courses_clean
AS SELECT
    CAST(course_id AS INTEGER) AS course_id,
    * EXCLUDE (course_id, credits),
    CAST(credits AS SMALLINT) AS credits
FROM
    courses;
"""
conn.execute(courses_clean_sql)

**Grades Table**

In [None]:
# Inspect the column names and types of the raw grades table.
grades_query = (
    conn
    .execute("DESCRIBE grades")
    .df()
)
grades_query

Casting `grade_id` and `enrollment_id` from `BIGINT` to `INTEGER`

In [None]:
# Build grades_clean: grade_id and enrollment_id are cast to INTEGER and
# placed first; the remaining columns are carried over unchanged.
grades_clean_sql = """
CREATE OR REPLACE TABLE 
    grades_clean
AS SELECT
    CAST(grade_id AS INTEGER) AS grade_id,
    CAST(enrollment_id AS INTEGER) AS enrollment_id,
    * EXCLUDE (grade_id, enrollment_id)
FROM
    grades;
"""
conn.execute(grades_clean_sql)

**Enrollments Table**

In [None]:
# Inspect the column names and types of the raw enrollments table.
enrollments_query = (
    conn
    .execute("DESCRIBE enrollments")
    .df()
)
enrollments_query

In [None]:
# Build enrollments_clean: the three id columns are cast to INTEGER and
# placed first; the remaining columns are carried over unchanged.
enrollments_clean_sql = """
CREATE OR REPLACE TABLE 
    enrollments_clean
AS SELECT
    CAST(enrollment_id AS INTEGER) AS enrollment_id,
    CAST(student_id AS INTEGER) AS student_id,
    CAST(class_id AS INTEGER) AS class_id,
    * EXCLUDE (enrollment_id, student_id, class_id)
FROM
    enrollments;
"""
conn.execute(enrollments_clean_sql)

## **Load**

Loading tables with clean data into Postgres

In [None]:
# Publish every cleaned table to Postgres inside a single transaction, so a
# failure part-way through cannot leave Postgres with a mix of refreshed and
# stale tables.
conn.execute("BEGIN TRANSACTION;")
try:
    for table in tables:
        clean_table_name = f"{table}_clean"  # students_clean, teachers_clean, etc...

        conn.execute(f"""
        CREATE OR REPLACE TABLE
            pg_db.{table}
        AS SELECT
            *
        FROM
            {clean_table_name};
        """)
except Exception:
    # Undo any tables already replaced in this run, then surface the error.
    # The connection is left open on purpose so the cell can be re-run.
    conn.execute("ROLLBACK;")
    raise
else:
    conn.execute("COMMIT;")
    # Report success only once the whole batch has been committed.
    for table in tables:
        print(f"[{table}] table loaded successfully into Postgres")

conn.close()