# Data Warehouse Project
<p style="text-align: left"><i>Gallo Giovanni 252308</i></p>

# ETL process

In this notebook, we perform a complete ETL process from a database accessible via DBeaver.
The transformations include:
- Dropping unnecessary columns.
- Converting `birthdate` to `age`.
- Combining `firstname` and `lastname` into `fullname`
- Extracting a date dimension from `dateuct` and inserting `dateutc_id` into `matchEvent`.
- Extracting a event dimension from `<eventname, action, modifier>` and inserting `event_id` into `matchEvent`.

### Imports and Utility functions

In [37]:
import pandas as pd
from sqlalchemy import create_engine, text

def check_db_counts(engine):
    with engine.connect() as conn:
        # club_df
        result = conn.execute(text("SELECT COUNT(*) FROM club"))
        club_rows = result.scalar()
        expected = 142
        if club_rows >= expected:
            print(f"✔️ Verified: {club_rows} rows in club (expected {expected})")
        else:
            print(f"⚠️ Warning: Only {club_rows} rows in club (expected {expected})")

        # player_df
        result = conn.execute(text("SELECT COUNT(*) FROM player"))
        player_rows = result.scalar()
        expected = 3603
        if player_rows >= expected:
            print(f"✔️ Verified: {player_rows} rows in player (expected {expected})")
        else:
            print(f"⚠️ Warning: Only {player_rows} rows in player (expected {expected})")

        # referee_df
        result = conn.execute(text("SELECT COUNT(*) FROM referee"))
        referee_rows = result.scalar()
        expected = 626
        if referee_rows >= expected:
            print(f"✔️ Verified: {referee_rows} rows in referee (expected {expected})")
        else:
            print(f"⚠️ Warning: Only {referee_rows} rows in referee (expected {expected})")

        # match_df
        result = conn.execute(text("SELECT COUNT(*) FROM match"))
        match_rows = result.scalar()
        expected = 1340
        if match_rows >= expected:
            print(f"✔️ Verified: {match_rows} rows in match (expected {expected})")
        else:
            print(f"⚠️ Warning: Only {match_rows} rows in match (expected {expected})")

        # matchEvent_df
        result = conn.execute(text("SELECT COUNT(*) FROM matchEvent"))
        matchEvent_rows = result.scalar()
        expected = 2090774
        if matchEvent_rows >= expected:
            print(f"✔️ Verified: {matchEvent_rows} rows in matchEvent (expected {expected})")
        else:
            print(f"⚠️ Warning: Only {matchEvent_rows} rows in matchEvent (expected {expected})")

### Database Connections

In [38]:
engine_extraction = create_engine('postgresql://postgres:postgres@localhost:5432/reconciledDatabase')
engine_loading = create_engine('postgresql://postgres:postgres@localhost:5432/datawarehouse')

##### This code is used to verify that all data has been imported completely and correctly.

In [39]:
check_db_counts(engine_extraction)

✔️ Verified: 142 rows in club (expected 142)
✔️ Verified: 3603 rows in player (expected 3603)
✔️ Verified: 626 rows in referee (expected 626)
✔️ Verified: 1340 rows in match (expected 1340)
✔️ Verified: 2090774 rows in matchEvent (expected 2090774)


## **Data Extraction**

The `referee` table is not extracted because it does not exist in the final database, having been dropped during operations and attribute tree analysis. <br>
(Pruning operation)

In [40]:
player_df = pd.read_sql("SELECT * FROM player", con=engine_extraction)
club_df = pd.read_sql("SELECT * FROM club", con=engine_extraction)
match_df = pd.read_sql("SELECT * FROM match", con=engine_extraction)
matchEvent_df = pd.read_sql("SELECT * FROM matchevent", con=engine_extraction)

## **Data Transformation**

### Drop columns

In this phase, all columns corresponding to attributes removed during the attribute tree operations are dropped. These attributes were deemed unnecessary for the analysis and excluded through a grafting operation.

In [41]:
player_df = player_df.drop(columns=['foot'], errors='ignore')
player_df.head()

Unnamed: 0,id,firstname,lastname,birthdate,country,position,height
0,0,Harun,Tekin,1989-06-17,Turkey,Goalkeeper,187.0
1,1,Malang,Sarr,1999-01-23,France,Defender,182.0
2,2,Over,Mandanda,1998-10-26,France,Goalkeeper,176.0
3,3,Alfred John Momar,N'Diaye,1990-03-06,France,Midfielder,187.0
4,4,Ibrahima,Konaté,1999-05-25,France,Defender,192.0


In [42]:
match_df = match_df.drop(columns=['venue', 'season', 'referee_id'], errors='ignore')
match_df.head()

Unnamed: 0,id,dateutc,competition,home_club,away_club,winner,goal_by_home_club,goal_by_away_club
0,0,2018-05-20 18:45:00,Italian first division,Lazio,Internazionale,Internazionale,2,3
1,1,2018-05-20 18:45:00,Italian first division,Sassuolo,Roma,Roma,0,1
2,2,2018-05-20 16:00:00,Italian first division,Cagliari,Atalanta,Cagliari,1,0
3,3,2018-05-20 16:00:00,Italian first division,Chievo,Benevento,Chievo,1,0
4,4,2018-05-20 16:00:00,Italian first division,Udinese,Bologna,Udinese,1,0


In [43]:
matchEvent_df = matchEvent_df.drop(columns=['eventsec', 'is_success'], errors='ignore')
matchEvent_df.head()

Unnamed: 0,id,club_id,match_id,player_id,matchperiod,eventname,action,modifier,x_begin,x_end,y_begin,y_end
0,26588,81,364,178,1H,Others on the ball,Touch,opportunity,97,95.0,58,68.0
1,67599,57,339,1838,1H,Others on the ball,Touch,opportunity,88,91.0,32,49.0
2,106637,47,319,1572,2H,Others on the ball,Touch,opportunity,92,87.0,44,0.0
3,209035,63,253,1261,2H,Others on the ball,Touch,opportunity,92,93.0,58,57.0
4,233767,38,250,913,1H,Others on the ball,Touch,opportunity,89,89.0,59,71.0


### Convert `birthdate` to `age`

We perform attribute derivation by calculating age from birthdate, then removing the original birthdate column.

In [44]:
today = pd.to_datetime('today')
birthdates = pd.to_datetime(player_df['birthdate'])

age = today.year - birthdates.dt.year - (
    (today.month < birthdates.dt.month) | 
    ((today.month == birthdates.dt.month) & (today.day < birthdates.dt.day))
)

player_df.insert(3, 'age', age)
player_df = player_df.drop(columns=['birthdate'])

player_df.head()

Unnamed: 0,id,firstname,lastname,age,country,position,height
0,0,Harun,Tekin,36,Turkey,Goalkeeper,187.0
1,1,Malang,Sarr,26,France,Defender,182.0
2,2,Over,Mandanda,26,France,Goalkeeper,176.0
3,3,Alfred John Momar,N'Diaye,35,France,Midfielder,187.0
4,4,Ibrahima,Konaté,26,France,Defender,192.0


### Combine `firstname` and `lastname` into `fullname`

In [45]:
fullname = player_df['firstname'] + ' ' + player_df['lastname']
player_df.insert(1, 'fullname', fullname)
player_df = player_df.drop(columns=['firstname', 'lastname'])
player_df.head()

Unnamed: 0,id,fullname,age,country,position,height
0,0,Harun Tekin,36,Turkey,Goalkeeper,187.0
1,1,Malang Sarr,26,France,Defender,182.0
2,2,Over Mandanda,26,France,Goalkeeper,176.0
3,3,Alfred John Momar N'Diaye,35,France,Midfielder,187.0
4,4,Ibrahima Konaté,26,France,Defender,192.0


### Extracting a date dimension from `dateuct` and inserting `id_date` into `match_event`.

We want normalize the match table by extracting its dateutc column into a separate dimension table named `dateutc`. This transformation aligns the data with a star schema structure. The new dateutc table stores unique timestamps along with their corresponding year, month, day, hour (including minutes), and weekday name. We then establish a foreign key relationship by adding a `dateutc_id` to the matchevent table, allowing direct access to temporal information.

In [46]:
# Valori unique di dateutc
dateutc_df = match_df[['dateutc']].drop_duplicates().reset_index(drop=True)
dateutc_df.insert(0, 'id', dateutc_df.index)

# Dizionario (mappa) da match_id a data (da match_df)
match_id_to_data = dict(zip(match_df['id'], match_df['dateutc']))

# Dizionario (mappa) da data a dateutc_id (da dateutc_df)
data_to_dateutc_id = dict(zip(dateutc_df['dateutc'], dateutc_df['id']))

# Applico le due mappe in cascata
matchEvent_df.insert(4, 'dateutc_id', matchEvent_df['match_id'].map(match_id_to_data).map(data_to_dateutc_id))

# Spezzo la data nella colonne previste per la tabella dateutc
dateutc_df['year'] = dateutc_df['dateutc'].dt.year
dateutc_df['month'] = dateutc_df['dateutc'].dt.month
dateutc_df['day'] = dateutc_df['dateutc'].dt.day
dateutc_df['hour'] = dateutc_df['dateutc'].dt.strftime('%H:%M')
dateutc_df['dayOfWeek'] = dateutc_df['dateutc'].dt.day_name()

# Elimino le colonne ridondanti
match_df = match_df.drop(columns=['dateutc'])
dateutc_df = dateutc_df.drop(columns=['dateutc'])

### Extracting Event Dimension from MatchEvent Data into a `event` Table

We aim to extract the `eventname`, `action`, and `modifier` columns from the `matchEvent` table to create a new `event` table, representing a normalized dimension for future aggregations. Then, we insert the corresponding 'event_id' into matchEvent to follow the star schema structure.


In [47]:
event_df = matchEvent_df[['eventname', 'action', 'modifier']].drop_duplicates()
event_df.insert(0, 'id', event_df.index)

# Chiave di confronto in entrambi i dataframe: una tupla delle 3 colonne
matchEvent_df['tripla'] = list(zip(matchEvent_df['eventname'], matchEvent_df['action'], matchEvent_df['modifier']))
event_df['tripla'] = list(zip(event_df['eventname'], event_df['action'], event_df['modifier']))

# Dizionario che mappa le triple all'id corrispondente in event_df
tripla_to_id = dict(zip(event_df['tripla'], event_df['id']))

# Mappatura a matchEvent_df per riempire 'event_id'
matchEvent_df.insert(5, 'event_id', matchEvent_df['tripla'].map(tripla_to_id))

# Elimino le colonne ridondanti
matchEvent_df.drop(columns=['tripla', 'eventname', 'action', 'modifier'], inplace=True)
event_df.drop(columns=['tripla'], inplace=True)


Check that the structure of the new and modified tables aligns with the database schema.


In [48]:
match_df.head()

Unnamed: 0,id,competition,home_club,away_club,winner,goal_by_home_club,goal_by_away_club
0,0,Italian first division,Lazio,Internazionale,Internazionale,2,3
1,1,Italian first division,Sassuolo,Roma,Roma,0,1
2,2,Italian first division,Cagliari,Atalanta,Cagliari,1,0
3,3,Italian first division,Chievo,Benevento,Chievo,1,0
4,4,Italian first division,Udinese,Bologna,Udinese,1,0


In [49]:
dateutc_df.head()

Unnamed: 0,id,year,month,day,hour,dayOfWeek
0,0,2018,5,20,18:45,Sunday
1,1,2018,5,20,16:00,Sunday
2,2,2018,5,20,13:00,Sunday
3,3,2018,5,19,13:00,Saturday
4,4,2018,5,13,18:45,Sunday


In [50]:
event_df.head()

Unnamed: 0,id,eventname,action,modifier
0,0,Others on the ball,Touch,opportunity
49,49,Others on the ball,Touch,missed ball
65,65,Duel,Air duel,won
329,329,Offside,,
395,395,Foul,Foul,


In [51]:
matchEvent_df.head()

Unnamed: 0,id,club_id,match_id,player_id,dateutc_id,event_id,matchperiod,x_begin,x_end,y_begin,y_end
0,26588,81,364,178,181,0,1H,97,95.0,58,68.0
1,67599,57,339,1838,168,0,1H,88,91.0,32,49.0
2,106637,47,319,1572,159,0,2H,92,87.0,44,0.0
3,209035,63,253,1261,127,0,2H,92,93.0,58,57.0
4,233767,38,250,913,124,0,1H,89,89.0,59,71.0


In [52]:
club_df.head()

Unnamed: 0,id,name,officialname,country
0,0,Newcastle United,Newcastle United FC,England
1,1,Celta de Vigo,Real Club Celta de Vigo,Spain
2,2,Espanyol,Reial Club Deportiu Espanyol,Spain
3,3,Deportivo Alavés,Deportivo Alavés,Spain
4,4,Levante,Levante UD,Spain


## **Data Loading**

Now that the transformation phase is complete, we can proceed with the load phase, loading the data into the database structured according to the star schema defined during the design phase.

In [53]:
club_df.to_sql('club', engine_loading, if_exists='append', index=False)
dateutc_df.to_sql('dateutc', engine_loading, if_exists='append', index=False)
event_df.to_sql('event', engine_loading, if_exists='append', index=False)
player_df.to_sql('player', engine_loading, if_exists='append', index=False)
match_df.to_sql('match', engine_loading, if_exists='append', index=False)
matchEvent_df.to_sql('matchevents', engine_loading, if_exists='append', index=False)

print("All dataframes uploaded successfully.")

All dataframes uploaded successfully.
