## Data Engineering Exercise with Pagila data


In [1]:
%load_ext sql

In [2]:
DB_ENDPOINT = "127.0.0.1"
DB = 'Pagila2'
DB_USER = 'student'
DB_PASSWORD = 'student'
DB_PORT = '5432'

# postgresql://username:password@host:port/database
conn_string = "postgresql://{}:{}@{}:{}/{}" \
                        .format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB_PORT, DB)

print(conn_string)


postgresql://student:student@127.0.0.1:5432/Pagila2


In [3]:
%sql $conn_string

In [None]:
## Explore the 3 Normal Form Schema od Pagila

TODO: Explain 1NF 2NF and 3NF

<img src="images/pagila-3nf.png"/>

### What data sizes are we looking at?

In [None]:
nStores = %sql select count(*) from store;
nFilms = %sql select count(*) from film;
nCustomers = %sql select count(*) from customer;
nRentals = %sql select count(*) from rental;
nPayment = %sql select count(*) from payment;
nStaff = %sql select count(*) from staff;
nCity = %sql select count(*) from city;
nCountry = %sql select count(*) from country;

print("nFilms\t\t=", nFilms[0][0])
print("nCustomers\t=", nCustomers[0][0])
print("nRentals\t=", nRentals[0][0])
print("nPayment\t=", nPayment[0][0])
print("nStaff\t\t=", nStaff[0][0])
print("nStores\t\t=", nStores[0][0])
print("nCities\t\t=", nCity[0][0])
print("nCountry\t\t=", nCountry[0][0])

### What time period are we talking about?

In [None]:
%%sql 
select min(payment_date) as start, max(payment_date) as end from payment;

## Where do events in this database occur?

In [None]:
%%sql
select district, COUNT(district)
from address 
group by district
order by count(district) desc
limit 10;

### Insights 
1. Top Grossing Movies

In [None]:
%%sql
select film_id, title, release_year, rental_rate, rating  from film limit 5;

In [None]:
%%sql
select * from payment limit 10;

In [None]:
%%sql
select * from inventory limit 5;

In [None]:
%%sql
SELECT f.title, p.amount, p.payment_date, p.customer_id                                            
FROM payment p
JOIN rental r    ON ( p.rental_id = r.rental_id )
JOIN inventory i ON ( r.inventory_id = i.inventory_id )
JOIN film f ON ( i.film_id = f.film_id)
limit 5;

In [None]:
%%sql
SELECT f.title, SUM(p.amount)                      
FROM payment p
JOIN rental r ON (p.rental_id = r.rental_id)
JOIN inventory i ON (r.inventory_id = i.inventory_id)
JOIN film f ON (i.film_id = f.film_id)
GROUP BY f.title
ORDER BY SUM(p.amount) DESC
LIMIT 10


In [None]:
%%sql
SELECT p.customer_id, p.rental_id, p.amount, ci.city                            
FROM payment p
JOIN customer c  ON ( p.customer_id = c.customer_id )
JOIN address a ON ( c.address_id = a.address_id )
JOIN city ci ON ( a.city_id = ci.city_id )
order by p.payment_date
limit 10;

In [None]:
%%sql
SELECT ci.city as City, SUM(p.amount) as Revenue
FROM payment p
JOIN customer c ON(p.customer_id = c.customer_id)
JOIN address a ON ( c.address_id = a.address_id )
JOIN city ci ON ( a.city_id = ci.city_id )
GROUP BY ci.city
ORDER BY SUM(p.amount) DESC
LIMIT 10

In [None]:
%%sql
SELECT sum(p.amount) as revenue, EXTRACT(month FROM p.payment_date) as month
from payment p
group by month
order by revenue desc
limit 10;

In [None]:
%%sql
SELECT f.title, p.amount, p.customer_id, ci.city, p.payment_date,EXTRACT(month FROM p.payment_date) as month
FROM payment p
JOIN rental r    ON ( p.rental_id = r.rental_id )
JOIN inventory i ON ( r.inventory_id = i.inventory_id )
JOIN film f ON ( i.film_id = f.film_id)
JOIN customer c  ON ( p.customer_id = c.customer_id )
JOIN address a ON ( c.address_id = a.address_id )
JOIN city ci ON ( a.city_id = ci.city_id )
order by p.payment_date
limit 10;

In [None]:
%%sql
SELECT f.title, ci.city,EXTRACT(month FROM p.payment_date) as month,SUM(p.amount) as Revenue
FROM payment p
JOIN rental r ON (p.rental_id = r.rental_id)
JOIN inventory i ON ( r.inventory_id = i.inventory_id )
JOIN film f ON ( i.film_id = f.film_id)
JOIN customer c ON (p.customer_id = c.customer_id)
JOIN  address a ON (c.address_id = a.address_id)
JOIN city ci ON (a.city_id = ci.city_id)
GROUP BY f.title, ci.city, EXTRACT(month FROM p.payment_date)
ORDER BY SUM(p.amount) DESC
LIMIT 10

### Star Schema - Entity Relationship Diagram

<img src="images/pagila-star.png" width="50%"/>

Create the dimDate Dimension

In [5]:
%%sql
CREATE SCHEMA IF NOT EXISTS starSchema;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.


[]

In [38]:
%%sql
DROP TABLE starSchema.factSales;
DROP TABLE starSchema.dimDate CASCADE;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.
Done.


[]

In [37]:
%%sql
TRUNCATE TABLE starSchema.factSales;
TRUNCATE TABLE starSchema.dimDate;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.


NotSupportedError: (psycopg2.errors.FeatureNotSupported) cannot truncate a table referenced in a foreign key constraint
DETAIL:  Table "factsales" references "dimdate".
HINT:  Truncate table "factsales" at the same time, or use TRUNCATE ... CASCADE.

[SQL: TRUNCATE TABLE starSchema.dimDate;]
(Background on this error at: https://sqlalche.me/e/14/tw8g)

In [40]:
%%sql
CREATE TABLE IF NOT EXISTS starSchema.dimDate (
    date_key SERIAL PRIMARY KEY,
    date DATE NOT NULL,
    year INTEGER NOT NULL GENERATED ALWAYS AS(EXTRACT(YEAR FROM date)) STORED,
    quarter INTEGER NOT NULL GENERATED ALWAYS AS (EXTRACT(QUARTER FROM date))STORED,
    month INTEGER NOT NULL GENERATED ALWAYS AS (EXTRACT(MONTH FROM date))STORED,
    day INTEGER NOT NULL GENERATED ALWAYS AS (EXTRACT(DAY FROM date))STORED,
    week INTEGER NOT NULL GENERATED ALWAYS AS (EXTRACT(WEEK FROM date))STORED,
    is_weekend BOOLEAN NOT NULL GENERATED ALWAYS AS (((EXTRACT(DOW FROM date))= 0) OR (EXTRACT(DOW FROM date)= 6)OR (EXTRACT(DOW FROM date)= 7)) STORED
);

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.


[]

In [9]:
%%sql
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name   = 'dimdate'

 * postgresql://student:***@127.0.0.1:5432/Pagila2
8 rows affected.


column_name,data_type
date_key,integer
date,date
year,integer
quarter,integer
month,integer
day,integer
week,integer
is_weekend,boolean


In [14]:
%%sql
CREATE TABLE IF NOT EXISTS starSchema.dimCustomer
(
  customer_key SERIAL PRIMARY KEY,
  customer_id  smallint NOT NULL,
  first_name   varchar(45) NOT NULL,
  last_name    varchar(45) NOT NULL,
  email        varchar(50),
  address      varchar(50) NOT NULL,
  address2     varchar(50),
  district     varchar(20) NOT NULL,
  city         varchar(50) NOT NULL,
  country      varchar(50) NOT NULL,
  postal_code  varchar(10),
  phone        varchar(20) NOT NULL,
  active       smallint NOT NULL,
  create_date  timestamp NOT NULL,
  start_date   date NOT NULL,
  end_date     date NOT NULL
);

CREATE TABLE IF NOT EXISTS starSchema.dimMovie
(
  movie_key          SERIAL PRIMARY KEY,
  film_id            smallint NOT NULL,
  title              varchar(255) NOT NULL,
  description        text,
  release_year       year,
  language           varchar(20) NOT NULL,
  original_language  varchar(20),
  rental_duration    smallint NOT NULL,
  length             smallint NOT NULL,
  rating             varchar(5) NOT NULL,
  special_features   varchar(60) NOT NULL
);
CREATE TABLE IF NOT EXISTS starSchema.dimStore
(
  store_key           SERIAL PRIMARY KEY,
  store_id            smallint NOT NULL,
  address             varchar(50) NOT NULL,
  address2            varchar(50),
  district            varchar(20) NOT NULL,
  city                varchar(50) NOT NULL,
  country             varchar(50) NOT NULL,
  postal_code         varchar(10),
  manager_first_name  varchar(45) NOT NULL,
  manager_last_name   varchar(45) NOT NULL,
  start_date          date NOT NULL,
  end_date            date NOT NULL
);

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.
Done.
Done.


[]

In [41]:
%%sql
CREATE TABLE IF NOT EXISTS starSchema.factSales
(
  sales_key SERIAL PRIMARY Key,
  date_key INTEGER NOT NULL REFERENCES starSchema.dimDate(date_key) ON DELETE RESTRICT,
  customer_key INTEGER NOT NULL REFERENCES starSchema.dimCustomer(customer_key) ON DELETE RESTRICT,
  movie_key INTEGER NOT NULL REFERENCES starSchema.dimMovie(movie_key) ON DELETE RESTRICT,
  store_key INTEGER NOT NULL REFERENCES starSchema.dimStore(store_key) ON DELETE RESTRICT,
  sales_amount MONEY NOT NULL
    
);

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.


[]

In [16]:
%%sql
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name   = 'factsales'

 * postgresql://student:***@127.0.0.1:5432/Pagila2
6 rows affected.


column_name,data_type
sales_key,integer
date_key,integer
customer_key,integer
movie_key,integer
store_key,integer
sales_amount,money


In [17]:
%%sql
DROP TABLE dimCustomer;
DROP TABLE dimMovie;
DROP TABLE dimStore;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
Done.
Done.
Done.


[]

In [42]:
%%sql
INSERT INTO starSchema.dimDate(date_key, date)
SELECT DISTINCT(TO_CHAR(payment_date :: DATE, 'yyyyMMDD')::integer) AS date_key,
       date(payment_date) 
FROM payment;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
40 rows affected.


[]

In [29]:
%%sql
INSERT INTO starSchema.dimCustomer (customer_key, customer_id, first_name,last_name,email,
                                    address,address2,district,city,country,
                                    postal_code,phone,active,create_date,
                                    start_date,end_date)
SELECT c.customer_id AS customer_key,
c.customer_id, c.first_name,
c.last_name, c.email,
a.address, a.address2,
a.district, ci.city,
co.country,a.postal_code,
a.phone, 
c.active, 
c.create_date,
now() AS start_date,
now() AS end_date
FROM customer c
JOIN address a ON (c.address_id=a.address_id)
JOIN city ci ON (a.city_id = ci.city_id)
JOIN country co ON (ci.country_id = co.country_id)

 * postgresql://student:***@127.0.0.1:5432/Pagila2
599 rows affected.


[]

In [31]:
%%sql
INSERT INTO starSchema.dimMovie (movie_key,film_id,title,description,
                                 release_year,language,original_language,rental_duration,
                                 length,rating,special_features)
SELECT f.film_id as movie_key,
f.film_id, f.title, f.description, f.release_year, 
l.name AS language,
orig_lang.name AS original_language, 
f.rental_duration, f.length, f.rating, f.special_features
FROM film f
JOIN language l ON (f.language_id=l.language_id)
LEFT JOIN language orig_lang ON (f.original_language_id = orig_lang.language_id);

 * postgresql://student:***@127.0.0.1:5432/Pagila2
1000 rows affected.


[]

In [35]:
%%sql 
INSERT INTO starSchema.dimStore (store_key, store_id,address,address2,district,city,country,
                                 manager_first_name, manager_last_name,start_date, end_date)
SELECT s.store_id AS store_key,
s.store_id,
a.address,
a.address2,
a.district, ci.city,
co.country,
sf.first_name AS manager_first_name,
sf.last_name AS manager_last_name,
now() AS start_date,
now() AS end_date
FROM store s
JOIN address a ON (s.address_id = a.address_id)
JOIN city ci ON (a.city_id = ci.city_id)
JOIN country co ON (ci.country_id = co.country_id)
JOIN staff sf ON (s.manager_staff_id = sf.staff_id )

 * postgresql://student:***@127.0.0.1:5432/Pagila2
2 rows affected.


[]

In [44]:
%%sql 
INSERT INTO starSchema.factSales (date_key, 
                                  customer_key, movie_key, 
                                  store_key, sales_amount)
SELECT TO_CHAR(p.payment_date :: DATE, 'yyyyMMDD')::integer AS date_key,
p.customer_id AS customer_key,
i.film_id AS movie_key,
i.store_id AS store_key,
p.amount AS sales_amount
FROM payment p
JOIN rental r    ON ( p.rental_id = r.rental_id )
JOIN inventory i ON ( r.inventory_id = i.inventory_id );



 * postgresql://student:***@127.0.0.1:5432/Pagila2
16049 rows affected.


[]

In [46]:
%%time
%%sql
SELECT movie_key, date_key, customer_key, sales_amount
FROM starSchema.factSales 
limit 5;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
5 rows affected.
Wall time: 7 ms


movie_key,date_key,customer_key,sales_amount
870,20200124,269,£1.99
651,20200125,269,£0.99
818,20200128,269,£6.99
249,20200129,269,£0.99
159,20200129,269,£4.99


In [47]:
%%time
%%sql
SELECT dimMovie.title, dimDate.month, dimCustomer.city, sum(sales_amount) as revenue
FROM starSchema.factSales 
JOIN starSchema.dimMovie    on (starSchema.dimMovie.movie_key      = starSchema.factSales.movie_key)
JOIN starSchema.dimDate     on (starSchema.dimDate.date_key         = starSchema.factSales.date_key)
JOIN starSchema.dimCustomer on (starSchema.dimCustomer.customer_key = starSchema.factSales.customer_key)
group by (dimMovie.title, dimDate.month, dimCustomer.city)
order by dimMovie.title, dimDate.month, dimCustomer.city, revenue desc;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
15992 rows affected.
Wall time: 230 ms


title,month,city,revenue
ACADEMY DINOSAUR,1,Celaya,£0.99
ACADEMY DINOSAUR,1,Cianjur,£1.99
ACADEMY DINOSAUR,2,San Lorenzo,£0.99
ACADEMY DINOSAUR,2,Sullana,£1.99
ACADEMY DINOSAUR,2,Udaipur,£0.99
ACADEMY DINOSAUR,3,Almirante Brown,£1.99
ACADEMY DINOSAUR,3,Goinia,£0.99
ACADEMY DINOSAUR,3,Kaliningrad,£0.99
ACADEMY DINOSAUR,3,Kurashiki,£0.99
ACADEMY DINOSAUR,3,Livorno,£0.99


In [48]:
%%time
%%sql
SELECT f.title, EXTRACT(month FROM p.payment_date) as month, ci.city, sum(p.amount) as revenue
FROM payment p
JOIN rental r    ON ( p.rental_id = r.rental_id )
JOIN inventory i ON ( r.inventory_id = i.inventory_id )
JOIN film f ON ( i.film_id = f.film_id)
JOIN customer c  ON ( p.customer_id = c.customer_id )
JOIN address a ON ( c.address_id = a.address_id )
JOIN city ci ON ( a.city_id = ci.city_id )
group by (f.title, month, ci.city)
order by f.title, month, ci.city, revenue desc;

 * postgresql://student:***@127.0.0.1:5432/Pagila2
15992 rows affected.
Wall time: 141 ms


title,month,city,revenue
ACADEMY DINOSAUR,1,Celaya,0.99
ACADEMY DINOSAUR,1,Cianjur,1.99
ACADEMY DINOSAUR,2,San Lorenzo,0.99
ACADEMY DINOSAUR,2,Sullana,1.99
ACADEMY DINOSAUR,2,Udaipur,0.99
ACADEMY DINOSAUR,3,Almirante Brown,1.99
ACADEMY DINOSAUR,3,Goinia,0.99
ACADEMY DINOSAUR,3,Kaliningrad,0.99
ACADEMY DINOSAUR,3,Kurashiki,0.99
ACADEMY DINOSAUR,3,Livorno,0.99
