In [192]:
import psycopg2
from psycopg2.errorcodes import UNIQUE_VIOLATION

After performing some queries and multiple joins we are going to transform the ERD schema (Entity Relationship Diagram) into a star schema, in order to simplify data analysis

<img src='./img/erd_schema.png' width=40% height = 30%> <img src='./img/dvd_star_schema.png' width=40% height = 30%>

After creatin de schema we procede to create the tables in the dvd database.

In [193]:
conn = psycopg2.connect('host=localhost dbname=dvdrental user=postgres password=root')
conn.set_session(autocommit=True)

cur = conn.cursor()

In [194]:
# We are going to create the tables from the star_schema.
dimcustomer_create = ('''
                      CREATE TABLE IF NOT EXISTS dimcustomer(
                        customer_key SERIAL PRIMARY KEY,
                        customer_id INT NOT NULL,
                        fist_name VARCHAR NOT NULL,
                        last_name VARCHAR NOT NULL,
                        email VARCHAR,
                        address VARCHAR NOT NULL,
                        address2 VARCHAR,
                        phone VARCHAR,
                        city VARCHAR  NOT NULL,
                        district VARCHAR NOT NULL,
                        postal_code VARCHAR,
                        country VARCHAR NOT NULL,
                        create_date DATE NOT NULL,
                        active INT NOT NULL                        
                    )''')
cur.execute(dimcustomer_create)

In [195]:
dimfilm_create = ('''
                  CREATE TABLE IF NOT EXISTS dimfilm(
                    film_key SERIAL PRIMARY KEY,
                    film_id INT NOT NULL,
                    title VARCHAR NOT NULL,
                    description TEXT,
                    release_year YEAR,
                    language VARCHAR NOT NULL,
                    length INT,
                    rating MPAA_RATING NOT NULL,
                    category VARCHAR NOT NULL,
                    rental_rate VARCHAR NOT NULL,
                    rental_duration INT NOT NULL,
                    special_features VARCHAR, 
                    actor_first_name VARCHAR,
                    actor_last_name VARCHAR                  
                )''')
cur.execute(dimfilm_create)
                  

In [196]:
dimstaff_create = ('''
                   CREATE TABLE IF NOT EXISTS dimstaff (
                       staff_key SERIAL PRIMARY KEY,
                       staff_id  INT NOT NULL,
                       first_name VARCHAR NOT NULL,
                       last_name VARCHAR NOT NULL,
                       address VARCHAR NOT NULL,
                       address2 VARCHAR,
                       district VARCHAR NOT NULL,
                       city VARCHAR NOT NULL,
                       posta_code VARCHAR,
                       country VARCHAR NOT NULL,
                       email VARCHAR,
                       phone VARCHAR NOT NULL,
                       working_store VARCHAR NOT NULL,
                       ussername VARCHAR NOT NULL,
                       password  VARCHAR NOT NULL             
                )''')
cur.execute(dimstaff_create)

In [197]:
dimstore_create = ('''
                    CREATE TABLE IF NOT EXISTS dimstore(
                        store_key SERIAL PRIMARY KEY,
                        store_id INT NOT NULL,
                        manager_first_name VARCHAR NOT NULL,
                        manager_last_name VARCHAR NOT NULL,
                        adress VARCHAR NOT NULL,
                        adress2 VARCHAR,
                        district VARCHAR NOT NULL,
                        city VARCHAR NOT NULL,
                        posta_code VARCHAR NOT NULL,
                        country VARCHAR NOT NULL,
                        phone VARCHAR NOT NULL
                )''')
cur.execute(dimstore_create)

In [198]:
dimdate_create = ('''
                  CREATE TABLE IF NOT EXISTS dimdate(
                      date_key SERIAL PRIMARY KEY,
                      payment_date DATE NOT NULL,
                      year INT NOT NULL,
                      month INT NOT NULL,
                      day INT NOT NULL,
                      week INT NOT NULL,
                      is_weekend BOOLEAN                      
                )''')
cur.execute(dimdate_create)

In [199]:
dimrental_create = ('''
                    CREATE TABLE IF NOT EXISTS dimrental(
                        rental_key SERIAL PRIMARY KEY,
                        rental_id INT NOT NULL,
                        title VARCHAR NOT NULL,
                        rental_date DATE NOT NULL,
                        return_date DATE
                )''')
cur.execute(dimrental_create)

In [200]:
# Creating our fact table, wich will reference all my dimension tables
factsales_create = ('''
                    CREATE TABLE IF NOT EXISTS factsales(
                      sales_key SERIAL PRIMARY KEY,
                      customer_key INT REFERENCES dimcustomer (customer_key),
                      store_key INT REFERENCES dimstore (store_key),
                      staff_key INT REFERENCES dimstaff (staff_key),
                      film_key INT REFERENCES dimfilm (film_key),
                      date_key INT REFERENCES dimdate (date_key),
                      rental_key INT REFERENCES dimrental (rental_key),
                      sales_amount  NUMERIC,
                      UNIQUE (customer_key, store_key, staff_key, film_key, date_key, rental_key)
                )''')
cur.execute(factsales_create)

Now im going to instert data into the tables that i just created. For that im going to use pgAdmin4 to execute queries. Used queries are going to be written below, but they are not going to be executed by psycopg2 cursor.

In [201]:
try:
	dimdate_query = ('''INSERT INTO dimdate (date_key, payment_date, year, month, day, week, is_weekend)
						SELECT
							DISTINCT(TO_CHAR(payment_date :: DATE, 'yyyMMDD') :: INT),
							DATE(payment_date) as payment_date,
							EXTRACT(YEAR FROM payment_date) as year,
							EXTRACT(MONTH FROM payment_date) as MONTH,
							EXTRACT(DAY FROM payment_date) as DAY,
							EXTRACT(WEEK FROM payment_date) as week,
							CASE  
								WHEN EXTRACT(isodow FROM payment_date) = 6 THEN True
								WHEN EXTRACT(isodow FROM payment_date) = 7 THEN True
								ELSE False
							END
						FROM payment
					''')
	cur.execute(dimdate_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [202]:
try:
    dimcustomer_query = ('''INSERT INTO dimcustomer (customer_key, customer_id, fist_name, last_name, 
                                                    email, address, address2, phone, city, district, 
                                                    postal_code, country, create_date, active)
                            SELECT 
                                c.customer_id as customer_key,
                                c.customer_id,
                                c.first_name,
                                c.last_name,
                                c.email,
                                a.address,
                                a.address2,
                                a.phone,
                                cit.city,
                                a.district,
                                a.postal_code,
                                coun.country,
                                c.create_date,
                                c.active
                            FROM customer AS c
                            INNER JOIN address AS a
                                ON c.address_id = a.address_id
                            INNER JOIN city as cit
                                ON a.city_id = cit.city_id
                            INNER JOIN country as coun
                                ON cit.country_id = coun.country_id
                        ''')
    
    cur.execute(dimcustomer_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [203]:
try:
    dimstore_query = ('''INSERT INTO dimstore (store_key, store_id, manager_first_name, 
					  							manager_last_name, adress, adress2, district, 
					  							city, posta_code, country, phone)
						SELECT 
							s.store_id as store_key,
							s.store_id,
							st.first_name,
							st.last_name,
							a.address,
							a.address2,
							a.district,
							c.city,
							a.postal_code,
							coun.country,
							a.phone
						FROM store as s
						INNER JOIN staff as st
							ON s.store_id = st.store_id
						INNER JOIN address as a
							on s.address_id = a.address_id
						INNER JOIN city as c
							ON a.city_id = c.city_id
						INNER JOIN country as coun
							ON c.country_id = coun.country_id
					''')
    cur.execute(dimstore_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [204]:
try:
    dimrental_query = ('''INSERT INTO dimrental (rental_key, rental_id, title, rental_date, return_date)
							SELECT
								r.rental_id as rental_key,
								r.rental_id,
								f.title,
								r.rental_date,
								r.return_date
							FROM rental AS r
							INNER JOIN inventory AS i ON i.inventory_id = r.inventory_id
							INNER JOIN film AS f ON f.film_id = i.film_id
						''')
    cur.execute(dimrental_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [205]:
try:
    dimfilm_query = ('''INSERT INTO dimfilm (film_key, film_id, title, description, release_year, 
					 							language, length, rating, category, rental_rate,rental_duration, 
					 							special_features, actor_first_name, actor_last_name)
						SELECT
							ROW_NUMBER() OVER() AS film_key,
							f.film_id,
							f.title,
							f.description,
							f.release_year,
							l.name,
							f.length,
							f.rating,
							cat.name,
							f.rental_rate,
							f.rental_duration,
							f.special_features,
							ac.first_name,
							ac.last_name
						FROM film as f
						INNER JOIN language AS l ON l.language_id = f.language_id
						INNER JOIN film_category AS fc ON fc.film_id = f.film_id
						INNER JOIN category AS cat ON cat.category_id = fc.category_id
						RIGHT JOIN film_actor AS fa ON fa.film_id = f.film_id
						RIGHT JOIN actor AS ac ON ac.actor_id = fa.actor_id
					''')
    cur.execute(dimfilm_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [206]:
try:
    dimstaff_query = ('''INSERT INTO dimstaff (staff_key, staff_id, first_name, last_name, address,
					                            address2, district, city, posta_code, country, email,
					                            phone, working_store, ussername, password)
                        SELECT
                            s.staff_id as staff_key,
                            s.staff_id,
                            s.first_name,
                            s.last_name,
                            a.address,
                            a.address2,
                            a.district,
                            c.city,
                            a.postal_code,
                            coun.country,
                            s.email,
                            a.phone,
                            s.store_id,
                            s.username,
                            s.password
                        FROM staff as s
                        INNER JOIN address AS a ON a.address_id = s.address_id 
                        INNER JOIN city AS c ON c.city_id = a.city_id
                        INNER JOIN country AS coun ON coun.country_id = c.country_id
                    ''')
    cur.execute(dimstaff_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


In [209]:
# Now im going to insert data into the fact table
try:
    factsales_query = ('''INSERT INTO factsales (customer_key, store_key, staff_key, film_key, date_key ,rental_key, sales_amount)
                            SELECT
                                p.customer_id,
                                c.store_id,
                                p.staff_id,
                                i.film_id,
                                TO_CHAR(p.payment_date :: DATE, 'yyyMMDD') :: INT,
                                p.rental_id,
                                p.amount
                            FROM payment AS p
                            INNER JOIN customer AS c ON c.customer_id = p.customer_id
                            INNER JOIN rental AS r ON r.rental_id = p.rental_id
                            INNER JOIN inventory AS i ON i.inventory_id = r.inventory_id
                        ''')
    cur.execute(factsales_query)
 
except psycopg2.Error as e:
    if psycopg2.errors.lookup(UNIQUE_VIOLATION):
        print('Error: Records you are trying to load are already in the table') 

    else:
        print('Error: Undefined error has happened')
        print(e)

Error: Records you are trying to load are already in the table


# Bibliography

- https://www.postgresqltutorial.com/postgresql-getting-started/postgresql-sample-database/
- https://www.postgresql.org/docs/8.1/functions-formatting.html
- https://www.folkstalk.com/2022/09/copy-value-from-one-column-to-another-postgres-with-code-examples.html