# Part 1

### Task 1 - Examine the sample data, determine which rows don't follow the business rules and clean the data.

##### Detect and clean rows that break the 5 business rules:

1. The players are not allowed to make any transactions before they are KYC verified.
2. Multiple players can use the same affiliate code, but each code redeemed should match 1:1 to the
player with the affiliate ID.
3. An affiliate redeemed must be linked to a player.
4. Players may or may not use an affiliate code to join.
5. The IDs of all tables are unique.

In [1]:
# Import necessary libraries

import os
import random
from datetime import timedelta

import numpy as np
import pandas as pd
from pandas import Timedelta



# Seed both random number generators (stdlib `random` and NumPy's global one)
# so every random operation in this notebook is repeatable from run to run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [2]:
# Define root path
# The default is the original author's local data directory. Set the
# ASSESSMENT_DATA_DIR environment variable to point the notebook at another
# copy of the sample CSVs without editing this cell.
ROOT = os.environ.get(
    "ASSESSMENT_DATA_DIR",
    '/Users/augustocardosoagostini/Desktop/Outros Documentos/Documentos Gerais/trainings/dbt_airflow/project/analytical-assessment/technical-assessment/data',
)

# Load the three raw sample tables; these frames are never modified afterwards
df_players = pd.read_csv(f"{ROOT}/Sample_data_-_Technical_Interview_-_players.csv")
df_affiliates = pd.read_csv(f"{ROOT}/Sample_data_-_Technical_Interview_-_affiliates.csv")
df_transactions = pd.read_csv(f"{ROOT}/Sample_data_-_Technical_Interview_-_transactions.csv")

# Check the shape of the dataframes
print("players:", df_players.shape)
print("affiliates:", df_affiliates.shape)
print("transactions:", df_transactions.shape)

players: (11, 6)
affiliates: (10, 4)
transactions: (10, 5)


In [3]:
# Work on copies so the raw frames loaded earlier stay exactly as read from disk
df_players_clean, df_affiliates_clean, df_transactions_clean = (
    raw_frame.copy() for raw_frame in (df_players, df_affiliates, df_transactions)
)

# Standardize column names: trim surrounding whitespace and lower-case them
for frame in (df_players_clean, df_affiliates_clean, df_transactions_clean):
    frame.columns = [name.strip().lower() for name in frame.columns]

# Parse every timestamp column; errors='coerce' turns unparseable values into NaT
# instead of raising, so the offending rows are kept
timestamp_columns = (
    (df_players_clean, 'created_at'),
    (df_players_clean, 'updated_at'),
    (df_affiliates_clean, 'redeemed_at'),
    (df_transactions_clean, 'timestamp'),
)
for frame, column in timestamp_columns:
    frame[column] = pd.to_datetime(frame[column], errors='coerce')

# affiliate_id becomes a nullable integer (pandas Int64) so missing values stay missing
df_players_clean['affiliate_id'] = df_players_clean['affiliate_id'].astype('Int64')

# Log of rule violations: a list of DataFrames, each carrying a 'reason' column
exceptions = []

In [4]:
# Rule 5: IDs of all tables are unique
#
# For each table: log every row that shares its id with another row, then keep
# a single row per id in the cleaned frame.

# transactions.id unique
tx_dup_ids = df_transactions_clean[df_transactions_clean['id'].duplicated(keep=False)].sort_values('id')
if not tx_dup_ids.empty:
    tx_dup_ids['reason'] = 'duplicate_transaction_id'
    exceptions.append(tx_dup_ids)
    # Keep the latest by timestamp for duplicates.
    # na_position='first' sorts timestamps that failed to parse (NaT) to the front,
    # so keep='last' never prefers a row without a valid timestamp over one with it.
    # The stable sort keeps the original row order when timestamps tie.
    df_transactions_clean = (
        df_transactions_clean
        .sort_values('timestamp', na_position='first', kind='mergesort')
        .drop_duplicates('id', keep='last')
        .reset_index(drop=True)
    )

# players.id unique
player_dup = df_players_clean[df_players_clean['id'].duplicated(keep=False)].sort_values(['id','updated_at'])
if not player_dup.empty:
    player_dup['reason'] = 'duplicate_player_id'
    exceptions.append(player_dup)
    # Business rule: keep the most recently updated record
    # (same NaT-first, stable ordering as for transactions above)
    df_players_clean = (
        df_players_clean
        .sort_values('updated_at', na_position='first', kind='mergesort')
        .drop_duplicates('id', keep='last')
        .reset_index(drop=True)
    )

# affiliates.id unique
# .copy() because a column is added below; without it the assignment targets a
# slice of df_affiliates_clean and pandas may emit SettingWithCopyWarning
aff_dup = df_affiliates_clean[df_affiliates_clean['id'].duplicated(keep=False)].copy()
if not aff_dup.empty:
    aff_dup['reason'] = 'duplicate_affiliate_id'
    exceptions.append(aff_dup)
    # No ordering column is used here: the last occurrence in the current row order is kept
    df_affiliates_clean = df_affiliates_clean.drop_duplicates('id', keep='last').reset_index(drop=True)

print("Shapes after deduplication for players, affiliates, and transactions:")
print(df_players_clean.shape, df_affiliates_clean.shape, df_transactions_clean.shape)


Shapes after deduplication for players, affiliates, and transactions:
(10, 6) (10, 4) (10, 5)


In [5]:
# Rule 1: No transactions before player is KYC verified

# Join transaction -> player.created_at and player.is_kyc_approved
# indicator=True adds a '_merge' column telling whether a matching player row was found
player_meta = df_players_clean[['id','is_kyc_approved','created_at']].rename(columns={'id':'player_id','created_at':'player_created_at','is_kyc_approved':'player_is_kyc'})
tx_with_meta = df_transactions_clean.merge(player_meta, on='player_id', how='left', indicator=True)

# True where the transaction's player_id matched a row in the players table
has_player = tx_with_meta['_merge'] == 'both'

# 1a: transactions referencing a player_id that is absent from the players table.
# This is a referential-integrity problem rather than one of the five listed rules:
# without a player row neither the KYC status nor the creation date can be checked.
missing_player_tx = tx_with_meta[tx_with_meta['_merge']=='left_only'].copy()
if not missing_player_tx.empty:
    missing_player_tx['reason'] = 'tx_missing_player'
    exceptions.append(missing_player_tx)

# 1b: transactions where player exists but is not KYC verified -> violate rule 1
# Only an explicit True counts as verified; a missing KYC flag is treated as
# "not verified" instead of silently passing the check.
kyc_verified = tx_with_meta['player_is_kyc'].eq(True)
tx_by_unkyc = tx_with_meta[has_player & ~kyc_verified].copy()
if not tx_by_unkyc.empty:
    tx_by_unkyc['reason'] = 'tx_by_unkyc_player'
    exceptions.append(tx_by_unkyc)

# 1c: transactions with timestamp < player's created_at (player couldn't have trans before account creation)
tx_before_created = tx_with_meta[has_player & (tx_with_meta['timestamp'] < tx_with_meta['player_created_at'])].copy()
if not tx_before_created.empty:
    tx_before_created['reason'] = 'tx_before_player_creation'
    exceptions.append(tx_before_created)


# Conservative cleaning choices:
# - Drop transactions referencing missing players (assume we cannot assign it to an invented player). -> remove from df_transactions_clean.
# - Drop transactions by not KYC verified players (business rule forbids it). Put them into exceptions and remove.
# - Drop transactions before player creation (assume data error, cannot fix). Put them into exceptions and remove.
# A transaction can fail 1b and 1c at the same time; it is then logged once per reason
# but removed only once, because the ids are collected in a set.

# Implementing changes:
# Remove all transactions that break these rules from the clean dataset
bad_tx_ids = set(missing_player_tx['id'].tolist()) \
           | set(tx_by_unkyc['id'].tolist()) \
           | set(tx_before_created['id'].tolist())

df_transactions_clean = df_transactions_clean[~df_transactions_clean['id'].isin(bad_tx_ids)].reset_index(drop=True)

In [6]:
# Rules 2 and 3 (affiliate redemption logic)

# How the rules are applied here:
# - Several players may reference the same affiliate code.
# - An affiliate whose redeemed_at is set must be referenced by exactly ONE player.
# - A redeemed affiliate referenced by no player, or by more than one player,
#   is recorded as a violation.

# Number of players referencing each affiliate (indexed by affiliate_id)
player_aff_counts = (
    df_players_clean
    .groupby('affiliate_id')['id']
    .count()
    .rename('player_count')
)

# Attach the counts to the affiliates; affiliates that no player references get 0
aff_with_counts = df_affiliates_clean.merge(
    player_aff_counts,
    how='left',
    left_on='id',
    right_on='affiliate_id',
)
aff_with_counts = aff_with_counts.fillna({'player_count': 0})

# Identify violations among redeemed affiliates
is_redeemed = aff_with_counts['redeemed_at'].notna()
linked_players = aff_with_counts['player_count']
viol_redeemed_no_player = aff_with_counts[is_redeemed & (linked_players == 0)].copy()
viol_redeemed_multi_players = aff_with_counts[is_redeemed & (linked_players > 1)].copy()

# Log each non-empty violation set under its own reason
violation_sets = (
    (viol_redeemed_no_player, 'redeemed_affiliate_not_linked_to_player'),
    (viol_redeemed_multi_players, 'redeemed_affiliate_has_multiple_players'),
)
for violations, reason in violation_sets:
    if not violations.empty:
        violations['reason'] = reason
        exceptions.append(violations)

# An invalid redemption (no player or multiple players) is undone by clearing redeemed_at;
# the affiliate row itself is kept
bad_aff_ids = set(viol_redeemed_no_player['id']) | set(viol_redeemed_multi_players['id'])
invalid_redemption = df_affiliates_clean['id'].isin(bad_aff_ids)
df_affiliates_clean.loc[invalid_redemption, 'redeemed_at'] = pd.NaT


In [7]:
# Rule 4: players may or may not use affiliate code

# A missing affiliate_id is allowed. The only thing validated here is that every
# affiliate_id that IS set points to an existing row in the affiliates table.
missing_aff_ids = set(df_players_clean['affiliate_id'].dropna().astype(int)) - set(df_affiliates_clean['id'].astype(int))
if missing_aff_ids:
    # Players whose affiliate_id points to a non-existent affiliate
    dangling_aff_mask = df_players_clean['affiliate_id'].isin(list(missing_aff_ids))

    # Record in exceptions BEFORE nulling, so that the log
    # - contains only the affected players (not every player without an affiliate,
    #   which rule 4 explicitly allows), and
    # - still shows the offending affiliate_id value.
    rec = df_players_clean[dangling_aff_mask].copy()
    rec['reason'] = 'player_affiliate_missing_after_validation'
    exceptions.append(rec)

    # Conservative action: set those affiliate_id to NA (we don't invent affiliate rows)
    df_players_clean.loc[dangling_aff_mask, 'affiliate_id'] = pd.NA


In [8]:
# Final checks and exceptions summary (the CSV files are written in a later cell)

# Ensure transactions' player_id refers to an actual player (log and remove any that remain)
tx_player_known = df_transactions_clean['player_id'].isin(df_players_clean['id'])
# .copy() because a 'reason' column is added to the selection below
tx_missing_players = df_transactions_clean[~tx_player_known].copy()
if not tx_missing_players.empty:
    tx_missing_players['reason'] = 'tx_post_clean_missing_player'
    exceptions.append(tx_missing_players)
    df_transactions_clean = df_transactions_clean[tx_player_known]

# Ensure amounts > 0
# One mask drives both the log and the removal, so every removed row is logged.
# A missing (NaN) amount is not > 0 and is therefore logged and removed as well.
amount_is_positive = df_transactions_clean['amount'] > 0
bad_amounts = df_transactions_clean[~amount_is_positive].copy()
if not bad_amounts.empty:
    bad_amounts['reason'] = 'non_positive_amount'
    exceptions.append(bad_amounts)
    df_transactions_clean = df_transactions_clean[amount_is_positive]

# summary
print("Final shapes after cleaning")
print("players:", df_players_clean.shape)
print("affiliates:", df_affiliates_clean.shape)
print("transactions:", df_transactions_clean.shape)

# create df_exceptions if any
# The logged frames come from different tables, so the combined frame holds the
# union of their columns; columns a row does not have are left empty.
if exceptions:
    df_exceptions = pd.concat(exceptions, ignore_index=True, sort=False)
else:
    df_exceptions = pd.DataFrame(columns=['reason'])

# Show top violations by reason
if not df_exceptions.empty:
    print("\nExceptions summary (counts by reason):")
    print(df_exceptions['reason'].value_counts())
else:
    print("No exceptions recorded.")


Final shapes after cleaning
players: (10, 6)
affiliates: (10, 4)
transactions: (3, 5)

Exceptions summary (counts by reason):
reason
tx_before_player_creation    6
duplicate_player_id          2
tx_by_unkyc_player           2
Name: count, dtype: int64


In [9]:
# Persist the cleaned tables and the exceptions log as CSV files in the data
# directory (ROOT) so they can be inspected outside the notebook.
# The row index is not written.
csv_outputs = {
    "players_cleaned_from_notebook.csv": df_players_clean,
    "affiliates_cleaned_from_notebook.csv": df_affiliates_clean,
    "transactions_cleaned_from_notebook.csv": df_transactions_clean,
    "exceptions_from_notebook.csv": df_exceptions,
}
for csv_name, frame_to_write in csv_outputs.items():
    frame_to_write.to_csv(f"{ROOT}/{csv_name}", index=False)

print("Wrote cleaned data and exceptions to CSV files.")


Wrote cleaned data and exceptions to CSV files.


### Task 2 - Write a Python script that extends the sample data for these tables according to the provided schema to 1000 rows.

In [10]:
# See the Python script part_1_task_2__expand_clean_data.py in the /scripts directory.