In [2]:
import pandas as pd
from sqlalchemy import create_engine, text

# General setup

In [3]:
# Create the connection to PostgreSQL
engine = create_engine('postgresql://postgres:123456@localhost:5432/chess_openings')

# Simple query to verify the connection
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT current_database();"))
        db_name = result.fetchone()[0]
        print(f"Successfully connected to the database: {db_name}")
except Exception as e:
    print(f"Connection error: {e}")


Successfully connected to the database: chess_openings


## Generate fields for the raw_data table

In [6]:
try:
    df = pd.read_csv('chess_games.csv')
    column_names = df.columns.tolist()
    print(column_names)
except FileNotFoundError:
    print("File not found")

: 

In [4]:
# SQL query to create the raw_data table
create_raw_data_sql = """
CREATE TABLE IF NOT EXISTS raw_data (
    event VARCHAR(50),
    white VARCHAR(255),
    black VARCHAR(255),
    result VARCHAR(10),
    utcdate DATE,
    utctime TIME,
    whiteelo INT,
    blackelo INT,
    whiteratingdiff INT,
    blackratingdiff INT,
    eco VARCHAR(10),
    opening VARCHAR(255),
    timecontrol VARCHAR(50),
    termination VARCHAR(50),
    an TEXT
);
"""

try:
    with engine.connect() as connection:
        connection.execute(text(create_raw_data_sql))
        connection.commit()
        print("Table 'raw_data' created successfully in database: chess_openings")
except Exception as e:
    print(f"An error occurred while creating the table: {e}")


Table 'raw_data' created successfully in database: chess_openings


In [5]:
# Check if the table exists
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT * FROM information_schema.tables WHERE table_name = 'raw_data';"))
        if result.rowcount > 0:
            print("The table 'raw_data' exists in the database.")
        else:
            print("The table 'raw_data' was not created successfully.")
except Exception as e:
    print(f"Error checking the table existence: {e}")


The table 'raw_data' exists in the database.


### Populate the table
Use of chunks for better insertion

In [None]:
#! DO NOT EXECUTE ANOTHER TIME -- TAKES TOO LONG

# Data insertion to the raw_data table
total_rows = sum(1 for line in open('chess_games.csv')) - 1 
print(f"Total rows: {total_rows}")

chunk_size = 100000

# Load the CSV file 
for chunk in pd.read_csv('chess_games.csv', chunksize=chunk_size, low_memory=False):
    chunk.columns = [col.lower() for col in chunk.columns]
    try:
        with engine.connect() as connection:
            chunk.to_sql('raw_data', con=connection, if_exists='append', index=False)
    except Exception as e:
        print(f"An error occurred while inserting data: {e}")

In [6]:
# Basic query to verify data insertion
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM raw_data;"))
        count = result.fetchone()[0]
        print(f"Total rows inserted into 'raw_data': {count}")
except Exception as e:
    print(f"Error verifying data insertion: {e}")

Total rows inserted into 'raw_data': 6256184


### Column renaming 

In [11]:
rename_columns_sql = """
ALTER TABLE raw_data
RENAME COLUMN utcdate TO utc_date;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN utctime TO utc_time;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN whiteelo TO white_elo;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN blackelo TO black_elo;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN whiteratingdiff TO white_rating_diff;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN blackratingdiff TO black_rating_diff;
"""

rename_columns_sql += """
ALTER TABLE raw_data
RENAME COLUMN timecontrol TO time_control;
"""

try:
    with engine.connect() as connection:
        connection.execute(text(rename_columns_sql))
        connection.commit()
        print("Columns renamed successfully.")
except Exception as e:
    print(f"An error occurred while renaming columns: {e}")


Columns renamed successfully.


## Create reconciled dimension tables for the second layer of the DW architecture

In [None]:
create_reconciled_players_sql = """
CREATE TABLE IF NOT EXISTS reconciled_players (
    player_id SERIAL PRIMARY KEY,
    player_name VARCHAR(255),
    player_elo INT
);
"""

create_reconciled_openings_sql = """
CREATE TABLE IF NOT EXISTS reconciled_openings (
    opening_id SERIAL PRIMARY KEY,
    opening_name VARCHAR(255),
    opening_eco VARCHAR(10)
);
"""

create_reconciled_results_sql = """
CREATE TABLE IF NOT EXISTS reconciled_results (
    result_id SERIAL PRIMARY KEY,
    result_name VARCHAR(10)
);
"""

try:
    with engine.connect() as connection:
        connection.execute(text(create_reconciled_players_sql))
        connection.execute(text(create_reconciled_openings_sql))
        connection.execute(text(create_reconciled_results_sql))
        connection.commit()
        print("Dimension tables created successfully.")
except Exception as e:
    print(f"An error occurred while creating dimension tables: {e}")


Dimension tables created successfully.


In [None]:
# Query to verify the creation of dimension tables
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT table_name FROM information_schema.tables WHERE table_name IN ('reconciled_players', 'reconciled_openings', 'reconciled_results');"))
        tables = [row[0] for row in result.fetchall()]
        if tables:
            print(f"Dimension tables created: {', '.join(tables)}")
        else:
            print("No dimension tables were created.")
except Exception as e:
    print(f"Error verifying dimension tables: {e}")

Dimension tables created: reconciled_players, reconciled_openings, reconciled_results


In [30]:
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE cleaned_data_temp RESTART IDENTITY CASCADE"))
    connection.commit()
    print("cleaned_data_temp table emptied and ID counter reset to 1")

cleaned_data_temp table emptied and ID counter reset to 1


In [None]:
#! DO NOT EXECUTE ANOTHER TIME -- TAKES TOO LONG

# Data cleaning process before inserting them into the reconciled tables
chunk_size = 10000
cleaned_chunks = []

print("Starting data cleaning process...")

for i, chunk in enumerate(pd.read_sql('SELECT * FROM raw_data', engine, chunksize=chunk_size)):
    print(f"Processing chunk {i+1}...")
    
    # 1. Drop rows with null values on key columns
    initial_count = len(chunk)
    chunk = chunk.dropna(subset=['white', 'black', 'white_elo', 'black_elo', 'eco', 'opening', 'result'])
    after_nulls = len(chunk)
    
    # 2. Drop duplicate rows
    chunk = chunk.drop_duplicates()
    after_duplicates = len(chunk)
    
    # 3. Normalize player names
    chunk['white'] = chunk['white'].str.strip().str.lower()
    chunk['black'] = chunk['black'].str.strip().str.lower()
    
    # 4. Filter only standardized results
    valid_results = ['1-0', '0-1', '1/2-1/2']
    chunk = chunk[chunk['result'].isin(valid_results)]
    after_results = len(chunk)
    
    # 5. Normalize ECO code
    chunk['eco'] = chunk['eco'].str.strip().str.upper()
    
    # 6. Normalize openings
    chunk['opening'] = chunk['opening'].str.strip().str.title()
    
    # 7. Clean and normalize event field
    chunk['event'] = chunk['event'].str.strip().str.lower()
    chunk.loc[chunk['event'].str.contains('blitz', na=False), 'event'] = 'Blitz'
    chunk.loc[chunk['event'].str.contains('bullet', na=False), 'event'] = 'Bullet'
    chunk.loc[chunk['event'].str.contains('classical', na=False), 'event'] = 'Classical'
    chunk.loc[chunk['event'].str.contains('correspondence', na=False), 'event'] = 'Correspondence'
    
    # 8. Consider only normal termination
    chunk['termination'] = chunk['termination'].astype(str).str.strip()
    chunk = chunk[chunk['termination'] == 'Normal']

    print(f"  Initial: {initial_count:,} → After nulls: {after_nulls:,} → After duplicates: {after_duplicates:,} → After results filter: {after_results:,}")
    
    # Store cleaned chunk temporarily in database
    try:
        with engine.connect() as connection:
            chunk.to_sql('cleaned_data_temp', con=connection, if_exists='append', index=False)
    except Exception as e:
        print(f"Error storing cleaned chunk {i+1}: {e}")

print("Data cleaning completed!")

Starting data cleaning process...
Processing chunk 1...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 2...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 3...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 4...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 5...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 6...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 7...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Processing chunk 8...
  Initial: 10,000 → After nulls: 10,000 → After duplicates: 10,000 → After results filter: 10,000
Proces

In [15]:
# Basic query to verify data insertion into cleaned_data_temp
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM cleaned_data_temp;"))
        count = result.fetchone()[0]
        print(f"Total rows inserted into 'cleaned_data_temp': {count}")
except Exception as e:
    print(f"Error verifying data insertion: {e}")

# Compare it to raw_data
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM raw_data;"))
        count = result.fetchone()[0]
        print(f"Total rows on 'raw_data': {count}")
except Exception as e:
    print(f"Error verifying data insertion: {e}")

Total rows inserted into 'cleaned_data_temp': 6254840
Total rows on 'raw_data': 6256184


In [18]:
# Clear dimension tables before populating
print("Clearing dimension tables...")

try:
    with engine.connect() as connection:
        connection.execute(text("TRUNCATE TABLE reconciled_players, reconciled_openings, reconciled_results RESTART IDENTITY CASCADE;"))
        connection.commit()
    print("Dimension tables cleared successfully!")
except Exception as e:
    print(f"Error clearing tables: {e}")

Clearing dimension tables...
Dimension tables cleared successfully!


In [None]:
# Populate reconciled_players
print("Populating players dimension...")

chunk_size = 10000
total_players_inserted = 0
unique_players_set = set()

try:
    # Process cleaned_data_temp in chunks
    for i, chunk in enumerate(pd.read_sql('SELECT white, white_elo, black, black_elo FROM cleaned_data_temp', engine, chunksize=chunk_size)):
        print(f"Processing chunk {i+1} for players dimension...")
        
        # Extract white players from current chunk
        players_white = chunk[['white', 'white_elo']].copy()
        players_white.columns = ['player_name', 'player_elo']
        
        # Extract black players from current chunk
        players_black = chunk[['black', 'black_elo']].copy()
        players_black.columns = ['player_name', 'player_elo']
        
        # Combine white and black players from this chunk
        chunk_players = pd.concat([players_white, players_black]).drop_duplicates()
        
        # Filter out players we've already seen (to avoid duplicates across chunks)
        new_players = []
        for _, player in chunk_players.iterrows():
            player_key = (player['player_name'], player['player_elo'])
            if player_key not in unique_players_set:
                unique_players_set.add(player_key)
                new_players.append(player)
        
        if new_players:
            new_players_df = pd.DataFrame(new_players)
            
            # Insert new players into reconciled_players
            with engine.connect() as connection:
                new_players_df.to_sql('reconciled_players', con=connection, if_exists='append', index=False, method='multi')
            
            total_players_inserted += len(new_players_df)
            print(f"  Inserted {len(new_players_df):,} new players from chunk {i+1}")
        else:
            print(f"  No new players found in chunk {i+1}")

    print(f"\nPlayers dimension populated! Total unique players inserted: {total_players_inserted:,}")

except Exception as e:
    print(f"Error populating players dimension: {e}")

In [27]:
# Simple verification for reconciled_players
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM reconciled_players"))
        count = result.fetchone()[0]
        print(f"reconciled_players: {count} records inserted.")
except Exception as e:
    print(f"Error: {e}")

reconciled_players: 6276377 records inserted.


In [None]:
# POpulate reconciled_openings

try:
    with engine.connect() as connection:
        connection.execute(text("""
            INSERT INTO reconciled_openings (opening_eco, opening_name)
            SELECT DISTINCT eco, opening
            FROM cleaned_data_temp
            WHERE eco IS NOT NULL AND opening IS NOT NULL
            ORDER BY eco, opening
        """))
        connection.commit()
        count = connection.execute(text("SELECT COUNT(*) FROM reconciled_openings")).fetchone()[0]
        print(f"reconciled_openings: {count} records inserted starting from ID 1")
except Exception as e:
    print(f"Error populating openings dimension: {e}")

Table columns:
('opening_id', 'integer', 'NO', "nextval('reconciled_openings_opening_id_seq'::regclass)")
('opening_name', 'character varying', 'YES', None)
('opening_eco', 'character varying', 'YES', None)

Constraints:
('PRIMARY KEY', 'opening_id')


In [19]:
# Quick verification for reconciled_openings
with engine.connect() as connection:
    df = pd.read_sql("SELECT DISTINCT * FROM reconciled_openings LIMIT 10", connection)
print(df)

   opening_id                                       opening_name opening_eco
0        1421  French Defense: Tarrasch Variation, Chistyakov...         C07
1        1365                  French Defense: Pelikan Variation         C00
2        2679            Semi-Slav Defense: Semi-Meran Variation         D47
3        1572                  Bishop'S Opening: Lopez Variation         C23
4        1082    Sicilian Defense: Closed Variation, Traditional         B25
5        2836            Bogo-Indian Defense: Vitolins Variation         E11
6         687  Nimzowitsch Defense: Kennedy Variation, Rieman...         B00
7         209              King'S Indian Attack: Keres Variation         A07
8        2432                    Queen'S Pawn Game: Torre Attack         D03
9        2335  Ruy Lopez: Morphy Defense, Breyer Defense, Zai...         C95


In [23]:
print("Populating results dimension...")

try:
    with engine.connect() as connection:
        connection.execute(text("""
            INSERT INTO reconciled_results (result)
            SELECT DISTINCT result
            FROM cleaned_data_temp
            WHERE result IS NOT NULL
            ORDER BY result
        """))
        connection.commit()
        
except Exception as e:
    print(f"Error populating results dimension: {e}")

Populating results dimension...


In [24]:
# Simple verification for reconciled_results
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM reconciled_results"))
        count = result.fetchone()[0]
        print(f"reconciled_results: {count} records inserted.")
except Exception as e:
    print(f"Error: {e}")

reconciled_results: 3 records inserted.


In [36]:
# Fact table creation
create_fact_games_sql = """
CREATE TABLE IF NOT EXISTS fact_games (
    game_id SERIAL PRIMARY KEY,
    white_player_id INT NOT NULL REFERENCES reconciled_players(player_id),
    black_player_id INT NOT NULL REFERENCES reconciled_players(player_id),
    opening_id INT NOT NULL REFERENCES reconciled_openings(opening_id),
    result_id INT NOT NULL REFERENCES reconciled_results(result_id),
    event TEXT,
    utc_date DATE,
    utc_time TIME,
    white_elo INT,
    black_elo INT,
    white_rating_diff INT,
    black_rating_diff INT,
    time_control TEXT,
    termination TEXT,
    an TEXT
);
"""

try:
    with engine.connect() as connection:
        connection.execute(text(create_fact_games_sql))
        connection.commit()
    print("fact_games table created successfully.")
except Exception as e:
    print(f"Error creating fact_games table: {e}")

fact_games table created successfully.


In [None]:
with engine.connect() as connection:
    connection.execute(text("TRUNCATE TABLE fact_games RESTART IDENTITY CASCADE"))
    connection.commit()

print("Populating fact_games table...")

try:
    with engine.connect() as connection:
        connection.execute(text("""
            INSERT INTO fact_games (
                white_player_id,
                black_player_id,
                opening_id,
                result_id,
                event,
                utc_date,
                utc_time,
                white_elo,
                black_elo,
                white_rating_diff,
                black_rating_diff,
                time_control,
                termination,
                an
            )
            SELECT
                wp.player_id AS white_player_id,
                bp.player_id AS black_player_id,
                o.opening_id,
                r.result_id,
                c.event,
                c.utc_date,
                c.utc_time,
                c.white_elo,
                c.black_elo,
                c.white_rating_diff,
                c.black_rating_diff,
                c.time_control,
                c.termination,
                c.an
            FROM cleaned_data_temp c
            JOIN reconciled_players wp
                ON c.white = wp.player_name AND c.white_elo = wp.player_elo
            JOIN reconciled_players bp
                ON c.black = bp.player_name AND c.black_elo = bp.player_elo
            JOIN reconciled_openings o
                ON c.eco = o.opening_eco AND c.opening = o.opening_name
            JOIN reconciled_results r
                ON c.result = r.result
        """))
        connection.commit()
        count = connection.execute(text("SELECT COUNT(*) FROM fact_games")).fetchone()[0]
        print(f"fact_games: {count} records inserted starting from ID 1")
except Exception as e:
    print(f"Error populating fact_games table: {e}")

Populating fact_games table...
fact_games: 6254840 records inserted starting from ID 1


In [21]:
# Quick verification for fact_games
with engine.connect() as connection:
    df = pd.read_sql("SELECT * FROM fact_games ORDER BY game_id LIMIT 10", connection)
print(df)

   game_id  white_player_id  black_player_id  opening_id  result_id  \
0        1              569            10319         940          1   
1        2              569           560820        1967          1   
2        3              970            10711         394          2   
3        4             1225            10956        1141          2   
4        5             1225          4289990        1225          2   
5        6             2548            12222        2987          1   
6        7             3108            12755        1531          1   
7        8             3126          2616133        1360          1   
8        9             3126           301773         912          2   
9       10             3126          1398318        1921          1   

                event    utc_date  utc_time  white_elo  black_elo  \
0              Blitz   2016-06-30  22:02:48       1485       1902   
1              Blitz   2016-07-04  22:02:43       1485       1624   
2          

In [24]:
# Check data integrity in fact_games
print("=== DATA INTEGRITY CHECK ===")

with engine.connect() as connection:
    # Check for missing white_player_id
    missing_white = connection.execute(text("""
        SELECT COUNT(*) AS missing_white_players
        FROM fact_games f
        LEFT JOIN reconciled_players p ON f.white_player_id = p.player_id
        WHERE p.player_id IS NULL
    """)).fetchone()[0]
    
    # Check for missing black_player_id
    missing_black = connection.execute(text("""
        SELECT COUNT(*) AS missing_black_players
        FROM fact_games f
        LEFT JOIN reconciled_players p ON f.black_player_id = p.player_id
        WHERE p.player_id IS NULL
    """)).fetchone()[0]
    
    # Check for missing opening_id
    missing_openings = connection.execute(text("""
        SELECT COUNT(*) AS missing_openings
        FROM fact_games f
        LEFT JOIN reconciled_openings o ON f.opening_id = o.opening_id
        WHERE o.opening_id IS NULL
    """)).fetchone()[0]
    
    # Check for missing result_id
    missing_results = connection.execute(text("""
        SELECT COUNT(*) AS missing_results
        FROM fact_games f
        LEFT JOIN reconciled_results r ON f.result_id = r.result_id
        WHERE r.result_id IS NULL
    """)).fetchone()[0]
    
    print(f"Missing white players: {missing_white}")
    print(f"Missing black players: {missing_black}")
    print(f"Missing openings: {missing_openings}")
    print(f"Missing results: {missing_results}")
    
    if missing_white + missing_black + missing_openings + missing_results == 0:
        print("All integrity checks passed!")

=== DATA INTEGRITY CHECK ===
Missing white players: 0
Missing black players: 0
Missing openings: 0
Missing results: 0
All integrity checks passed!
