Imports section:

In [252]:
import pandas as pd
import os
import numpy as np
from sqlalchemy import create_engine,text
import numpy as np

In [253]:
db_username = os.getenv("DB_USERNAME")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")

engine = create_engine(
    f"postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
)

table_names = ['all_steam', 'genres', 'mdn_play_time','metacritic_review','play_time_by_player','regions','time_to_beat']

Creates Data Warehouse during initial load:

In [254]:
creation_query = f"""CREATE SCHEMA IF NOT EXISTS dwh;
DO
$body$
BEGIN
-- ORANGE BLOCK
CREATE TABLE IF NOT EXISTS dwh.dim_os(
	id SERIAL PRIMARY KEY,
	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_score_source(
	id SERIAL PRIMARY KEY,
	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_platform(
	id SERIAL PRIMARY KEY,
	name VARCHAR(255) UNIQUE
);
-- LIGHT BLUE BLOCK
CREATE TABLE IF NOT EXISTS dwh.dim_game(
	id BIGSERIAL PRIMARY KEY,
	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_date(
	id BIGINT PRIMARY KEY,
	year SMALLINT,
	month SMALLINT,
	day SMALLINT
);
--GREEN BLOCK
CREATE TABLE IF NOT EXISTS dwh.dim_region(
	id SERIAL PRIMARY KEY,
	current_name VARCHAR(255) UNIQUE, 
	previous_name VARCHAR(255)
);
--VIOLET BLOCK 
CREATE TABLE IF NOT EXISTS dwh.dim_technologies(
	id BIGSERIAL PRIMARY KEY,
 	name VARCHAR(255) UNIQUE,
	type VARCHAR(255)
);
CREATE TABLE IF NOT EXISTS dwh.dim_publisher(
	id BIGSERIAL PRIMARY KEY,
 	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_genre(
	id SERIAL PRIMARY KEY,
 	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_developer(
	id BIGSERIAL PRIMARY KEY,
 	name VARCHAR(255) UNIQUE
);
CREATE TABLE IF NOT EXISTS dwh.dim_difficulty(
	id SERIAL PRIMARY KEY,
 	name VARCHAR(255) UNIQUE
);
--FACT BLOCK
CREATE TABLE IF NOT EXISTS dwh.fact_players(
	id BIGSERIAL PRIMARY KEY,
	game_id INT REFERENCES dwh.dim_game(id),
	region_id INT REFERENCES dwh.dim_region(id),
	players NUMERIC(7,2)
);
CREATE TABLE IF NOT EXISTS dwh.fact_score(
	id BIGSERIAL PRIMARY KEY,
	game_id BIGINT REFERENCES dwh.dim_game(id),
	score_source_id INT REFERENCES dwh.dim_score_source(id),
	score NUMERIC(5,2) CHECK (score <= 100.00)
);
CREATE TABLE IF NOT EXISTS dwh.fact_gameplay(
	id BIGSERIAL PRIMARY KEY,
	game_id BIGINT REFERENCES dwh.dim_game(id),
	release_date BIGINT REFERENCES dwh.dim_date(id),
	achievements_count INT, 
	avg_play_time NUMERIC(7,2),
	publisher_id BIGINT REFERENCES dwh.dim_publisher(id),
	mdn_play_time NUMERIC(7,2),
	difficulty_id INT REFERENCES dwh.dim_difficulty(id),
	hltb_single NUMERIC(7,2),
	hltb_complete NUMERIC(7,2),
	peak_players INT CHECK (peak_players >= 0),
	peak_date BIGINT REFERENCES dwh.dim_date(id),
	owners INT CHECK (owners >= 0)
);
--GRAY BLOCK
CREATE TABLE IF NOT EXISTS dwh.scr_os(
	game_id BIGINT REFERENCES dwh.dim_game(id),
	os_id INT REFERENCES dwh.dim_os(id),
	CONSTRAINT scr_os_pkey PRIMARY KEY(game_id,os_id)
);
CREATE TABLE IF NOT EXISTS dwh.scr_pltf(
	game_id BIGINT REFERENCES dwh.dim_game(id),
	platform_id INT REFERENCES dwh.dim_platform(id),
	CONSTRAINT scr_pltf_pkey PRIMARY KEY(game_id,platform_id)
);
CREATE TABLE IF NOT EXISTS dwh.gmp_genr(
	game_id BIGINT REFERENCES dwh.dim_game(id),
	genre_id INT REFERENCES dwh.dim_genre(id),
	CONSTRAINT gmp_genr_pkey PRIMARY KEY(game_id,genre_id)
);
CREATE TABLE IF NOT EXISTS dwh.gmp_dev(
	game_id BIGINT REFERENCES dwh.dim_game(id),
	developer_id BIGINT REFERENCES dwh.dim_developer(id),
	CONSTRAINT gmp_dev_pkey PRIMARY KEY(game_id,developer_id)
);
END
$body$
LANGUAGE plpgsql"""

with engine.connect() as connection:
    connection.execute(text(creation_query))
    connection.commit()

Load game dimension:

In [255]:
query = f"""
INSERT INTO dwh.dim_game (name)
SELECT DISTINCT game FROM (
    SELECT game FROM stage.all_steam
    UNION
    SELECT game FROM stage.genres
    UNION
    SELECT game FROM stage.mdn_play_time
    UNION
    SELECT game FROM stage.metacritic_review
    UNION
    SELECT game FROM stage.play_time_by_player
    UNION
    SELECT game FROM stage.regions
    UNION
    SELECT game FROM stage.time_to_beat
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load operational system (os) dimension:

In [256]:
query = f"""
INSERT INTO dwh.dim_os (name)
SELECT DISTINCT os FROM (
    SELECT os FROM stage.mdn_play_time
    UNION
    SELECT os FROM stage.time_to_beat
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load score_source dimension:

In [257]:
query = """
SELECT table_name, column_name
FROM information_schema.columns
WHERE column_name LIKE '%score%' 
OR column_name LIKE '%rating%' 
AND table_schema = 'stage'
"""

with engine.connect() as connection:
    result = connection.execute(text(query))
    columns = result.fetchall()

for table_name, column_name in columns:

    if '_uscore' in column_name:
        source = column_name.split('_uscore')[0]
    elif 'score' in column_name:
        source = column_name.split('score')[0]
    elif '_rating' in column_name:
        source = column_name.split('_rating')[0]
    elif 'rating' in column_name:
        source = 'steam'
    else:
        continue
  
    if source.strip() == '':
            continue
    
    query = f"""
    INSERT INTO dwh.dim_score_source (name)
    VALUES ('{source}')
    ON CONFLICT DO NOTHING
    """

    with engine.connect() as connection:
        result = connection.execute(text(query))
        connection.commit()

Load region dimension:

In [258]:
query = """
SELECT table_name, column_name
FROM information_schema.columns
WHERE column_name LIKE '%players%' 
AND table_schema = 'stage'
"""

with engine.connect() as connection:
    result = connection.execute(text(query))
    columns = result.fetchall()

for table_name, column_name in columns:
    source = column_name.split('_players')[0]

    query = f"""
    INSERT INTO dwh.dim_region (current_name)
    VALUES ('{source}')
    ON CONFLICT DO NOTHING
    """

    with engine.connect() as connection:
        result = connection.execute(text(query))
        connection.commit()

 

Load publisher dimension:

In [259]:
query = f"""
INSERT INTO dwh.dim_publisher (name)
SELECT DISTINCT publisher FROM (
    SELECT publisher FROM stage.mdn_play_time WHERE publisher IS NOT NULL
    UNION
    SELECT publisher FROM stage.all_steam WHERE publisher IS NOT NULL
    UNION
    SELECT publisher FROM stage.regions WHERE publisher IS NOT NULL
    UNION
    SELECT publisher FROM stage.time_to_beat WHERE publisher IS NOT NULL
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load developer dimension:

In [260]:
query = f"""
INSERT INTO dwh.dim_genre (name)
SELECT DISTINCT genre FROM (
    SELECT genre FROM stage.genres WHERE genre IS NOT NULL
    UNION
    SELECT genre FROM stage.mdn_play_time WHERE genre IS NOT NULL
    UNION
    SELECT genre FROM stage.regions WHERE genre IS NOT NULL
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load genre dimension:

In [261]:
query = f"""
INSERT INTO dwh.dim_developer (name)
SELECT DISTINCT developer FROM (
    SELECT developer FROM stage.genres WHERE developer IS NOT NULL
    UNION
    SELECT developer FROM stage.mdn_play_time WHERE developer IS NOT NULL
    UNION
    SELECT developer FROM stage.all_steam WHERE developer IS NOT NULL
    UNION
    SELECT developer FROM stage.time_to_beat WHERE developer IS NOT NULL
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load difficulty dimension:

In [262]:
query = f"""
INSERT INTO dwh.dim_difficulty (name)
SELECT DISTINCT gfq_difficulty FROM (
    SELECT gfq_difficulty FROM stage.time_to_beat WHERE gfq_difficulty IS NOT NULL
) AS games
ON CONFLICT (name) DO NOTHING
"""

with engine.connect() as connection:
    connection.execute(text(query))
    connection.commit()

Load date dimension:

In [263]:
query = f"""
SELECT DISTINCT date FROM (
    SELECT release_date AS date FROM stage.genres WHERE release_date IS NOT NULL
    UNION
    SELECT release_date AS date FROM stage.mdn_play_time WHERE release_date IS NOT NULL
    UNION
    SELECT release_date AS date FROM stage.all_steam WHERE release_date IS NOT NULL
    UNION
    SELECT all_time_peak_date AS date FROM stage.all_steam WHERE all_time_peak_date IS NOT NULL
    UNION
    SELECT release_date AS date FROM stage.regions WHERE release_date IS NOT NULL
    UNION
    SELECT release_date AS date FROM stage.regions WHERE release_date IS NOT NULL
) AS release_date
"""

df = pd.read_sql_query(query, engine)

df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['id'] = df['year'].astype(str) + df['month'].astype(str).str.zfill(2) + df['day'].astype(str).str.zfill(2)
df = df.drop(columns=['date'])

with engine.connect() as connection:
    query = text("""
        INSERT INTO dwh.dim_date (id, year, month, day)
        VALUES (:id, :year, :month, :day)
    """)
    for index, row in df.iterrows():
        connection.execute(query, {'id': row['id'], 'year': row['year'], 'month': row['month'], 'day': row['day']})
    connection.commit()


Load players fact:

In [264]:
with engine.connect() as connection:
    query = text("""
        INSERT INTO dwh.fact_players (game_id, region_id, players)
        SELECT dg.id AS game_id, dr.id AS region_id,
               CASE
                   WHEN dr.current_name = 'na' THEN sr.na_players
                   WHEN dr.current_name = 'eu' THEN sr.eu_players
                   WHEN dr.current_name = 'jp' THEN sr.jp_players
                   WHEN dr.current_name = 'other' THEN sr.other_players
                   WHEN dr.current_name = 'global' THEN sr.global_players
               END AS players
        FROM stage.regions AS sr
        JOIN dwh.dim_game AS dg ON dg.name = sr.game
        JOIN dwh.dim_region AS dr ON dr.current_name IN ('na', 'eu', 'jp', 'other', 'global');
    """)
    connection.execute(query)
    connection.commit()

Load score fact:

In [266]:
with engine.connect() as connection:
    query = text("""
        INSERT INTO dwh.fact_score (game_id, score_source_id, score)
SELECT DISTINCT ON (dg.id, ss.id)
       dg.id AS game_id,
       ss.id AS score_source_id,
       CASE
           WHEN ss.name = 'gfq' THEN ttb.gfq_rating
           WHEN ss.name = 'igdb' THEN ttb.igdb_uscore
           WHEN ss.name = 'meta' THEN mr.metascore
           WHEN ss.name = 'steam' THEN als.rating
       END AS score
FROM stage.time_to_beat AS ttb
JOIN dwh.dim_game AS dg ON dg.name = ttb.game
JOIN stage.metacritic_review AS mr ON dg.name = mr.game
JOIN stage.all_steam AS als ON dg.name = als.game                 
JOIN dwh.dim_score_source AS ss ON ss.name IN ('steam','meta','gfq', 'igdb')
ORDER BY dg.id, ss.id, score DESC;
    """)
    connection.execute(query)
    connection.commit()