<a href="https://colab.research.google.com/github/doctorsmylie/mtg-draft-agent/blob/main/deck_classification/deck_eval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

INIT CONFIG

In [None]:
# Configure Drive or Jupyter notebook -- only runs when first loaded
# The guard skips all setup when a global named CONFIG_DONE already exists in
# this kernel (presumably set by project_path.ipynb -- TODO confirm), so the
# repository is not cloned a second time when another notebook %run's this one.
if "CONFIG_DONE" not in globals():
    # Need to mount drive and clone repo to access data and functions
    try:
        # `drive` is not used in this cell; the import doubles as a probe for
        # the google.colab package, whose absence raises ModuleNotFoundError.
        from google.colab import drive  # type: ignore

        IN_COLAB = True

        # clone repo
        # The clone lands in the current working directory and %cd then moves
        # into it, so the relative %run below resolves inside the repository.
        !git clone https://github.com/doctorsmylie/mtg-draft-agent
        %cd mtg-draft-agent

    except ModuleNotFoundError:
        # google.colab is not installed: treat this as a local Jupyter session.
        IN_COLAB = False

    # Finish configuration -- also configures notebook outside of Colab
    %run "project_path.ipynb"
else:
    print("Config done before loading deck_eval.ipynb")

Cloning into 'mtg-draft-agent'...
remote: Enumerating objects: 232, done.[K
remote: Counting objects: 100% (14/14), done.[K
remote: Compressing objects: 100% (10/10), done.[K
remote: Total 232 (delta 2), reused 10 (delta 2), pack-reused 218 (from 1)[K
Receiving objects: 100% (232/232), 11.97 MiB | 27.17 MiB/s, done.
Resolving deltas: 100% (122/122), done.
/content/mtg-draft-agent
Starting config...
Running in Colab? Yes

Configuring Google Colab...
Mounting Drive...
Mounted at /content/mtg-draft-agent/drive
BASE_PATH =  /content/mtg-draft-agent
DATA_FOLDER = /content/mtg-draft-agent/drive/MyDrive/Erdos25/MTGdraft
BASE_PATH == os.getcwd(): True

Configuration done


### Load deck data and column name lists

In [None]:
# Execute the data-import notebook. Later cells use names that are not defined
# anywhere in this notebook (pl, np, DATA_PATH, deck_atributes); presumably
# they come from this notebook or from project_path.ipynb -- TODO confirm.
%run "17landsdataimport.ipynb"

Load Cluster Data

In [None]:
def load_cluster_data(file_path):
  """
  Read a parquet file of cluster data into a polars DataFrame.

  Args:
    file_path: Location of the parquet file; handed unchanged to
      ``pl.read_parquet``.

  Returns:
    The polars DataFrame produced by ``pl.read_parquet``.
  """
  return pl.read_parquet(file_path)

In [None]:
# Load the cluster table that the distance and win-rate functions below take
# as their `cluster_data` argument.
# NOTE(review): DATA_PATH is not defined in this notebook, and the config
# output prints DATA_FOLDER rather than DATA_PATH -- confirm where it is set.
cluster_data=load_cluster_data(DATA_PATH + '/cluster_data.parquet')

### Load Scaler and PCA

In [None]:
import pickle

def load_scaler(file_path):
    """Unpickle and return the scaler object stored at ``file_path``.

    The file is opened in binary mode and is closed again before the function
    returns.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading, so
    only point this at files you trust.
    """
    with open(file_path, 'rb') as pickled_file:
        return pickle.load(pickled_file)

def load_pca_model(file_path):
    """Unpickle and return the PCA model object stored at ``file_path``.

    The file is opened in binary mode and is closed again before the function
    returns.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading, so
    only point this at files you trust.
    """
    with open(file_path, 'rb') as pickled_file:
        return pickle.load(pickled_file)

In [None]:
# Load the pickled scaler and PCA model. The transform helpers defined further
# down use these two names as default argument values, and Python binds
# defaults when the `def` runs, so this cell must run before that one.
scaler = load_scaler(DATA_PATH + '/scaler.pkl')
pca_model = load_pca_model(DATA_PATH + '/pca_model.pkl')

### Scale/PCA/Normalization transform

In [None]:
def normalize_data(data):
  """
  Scale every row of a 2-D array to unit Euclidean (L2) length.

  Args:
    data: A 2-D numpy array, or anything ``np.array`` can convert into one.
      The norm is taken along axis 1, so 1-D input is rejected by NumPy.

  Returns:
    A numpy array of the same shape in which each row has been divided by
    its own L2 norm. A row whose norm is 0 is divided by zero, so NumPy
    yields nan for that row (and emits a RuntimeWarning).

  Raises:
    TypeError: If ``data`` cannot be converted to a numpy array. TypeError
      is a subclass of Exception, so callers that catch Exception still
      work, and the message is unchanged.
  """
  if not isinstance(data, np.ndarray):
    try:
      data = np.array(data)
    except Exception as err:
      # Catch Exception instead of using a bare ``except`` so that
      # KeyboardInterrupt/SystemExit are not swallowed, and chain the
      # original error so the real cause stays visible in the traceback.
      raise TypeError('data must be arraylike') from err

  # keepdims=True leaves the norms as a column vector so the division
  # broadcasts across each row.
  row_norms = np.linalg.norm(data, axis=1, keepdims=True)
  return data / row_norms

def scale_transform(data, scaler=scaler):
  """Return ``data`` after passing it through ``scaler.transform``.

  Args:
    data: Input handed unchanged to ``scaler.transform``.
    scaler: Object exposing ``transform``. Defaults to the module-level
      ``scaler`` as it was bound when this function was defined.
  """
  scaled = scaler.transform(data)
  return scaled

def pca_transform(data, pca_model=pca_model):
  """Return ``data`` after passing it through ``pca_model.transform``.

  Args:
    data: Input handed unchanged to ``pca_model.transform``.
    pca_model: Object exposing ``transform``. Defaults to the module-level
      ``pca_model`` as it was bound when this function was defined.
  """
  projected = pca_model.transform(data)
  return projected

def scale_pca_transform(data, scaler=scaler, pca_model=pca_model):
  """Apply ``scaler.transform`` and then ``pca_model.transform`` to ``data``.

  Args:
    data: Input handed to ``scaler.transform``.
    scaler: Object exposing ``transform``; defaults to the module-level
      ``scaler`` bound at definition time.
    pca_model: Object exposing ``transform``; defaults to the module-level
      ``pca_model`` bound at definition time.

  Returns:
    Whatever ``pca_model.transform`` returns for the scaled data.
  """
  scaled = scaler.transform(data)
  projected = pca_model.transform(scaled)
  return projected

def normalize_scale_pca_transform(data, scaler=scaler, pca_model=pca_model):
  """Row-normalize ``data``, then scale it, then apply the PCA transform.

  Args:
    data: Input handed to ``normalize_data``.
    scaler: Object exposing ``transform``; defaults to the module-level
      ``scaler`` bound at definition time.
    pca_model: Object exposing ``transform``; defaults to the module-level
      ``pca_model`` bound at definition time.

  Returns:
    Whatever ``pca_model.transform`` returns for the normalized, scaled data.
  """
  unit_rows = normalize_data(data)
  scaled = scaler.transform(unit_rows)
  projected = pca_model.transform(scaled)
  return projected

def select_colums(data, columns=deck_atributes):
  """Pick ``columns`` out of ``data`` and return them as a numpy array.

  Args:
    data: Object exposing ``select(columns)`` whose result has ``to_numpy()``
      (presumably a polars DataFrame -- TODO confirm).
    columns: Column selection passed to ``data.select``; defaults to the
      module-level ``deck_atributes`` bound at definition time.
  """
  subset = data.select(columns)
  return subset.to_numpy()

### Find closest clusters function

In [None]:
def find_5_closest_clusters_of_rows(data, cluster_data, n_closest_centers=5, columns=deck_atributes):
  """
  Find, for every row of ``data``, the nearest cluster centroids.

  Rows of ``data`` and of ``cluster_data`` are restricted to ``columns``,
  scaled to unit length with ``normalize_data`` and compared by squared
  Euclidean distance. For unit vectors a and b that distance equals
  2 * (1 - a.b), so all pairwise distances come from one matrix product.

  Args:
    data: DataFrame (supporting ``select(...).to_numpy()``) with one row per
      item to match.
    cluster_data: DataFrame with one row per cluster centroid. It must
      contain ``columns`` plus 'cube_1', 'cube_2', 'cube_3' and
      'cluster_number', and support ``with_row_index()``.
    n_closest_centers: Number of nearest clusters to return per row. Capped
      at the number of clusters available.
    columns: Feature columns used for the distance computation.

  Returns:
    Array of shape (n_rows, k, 6) with k = min(n_closest_centers, n_clusters).
    The last axis holds: row index of the cluster within ``cluster_data``,
    cube_1, cube_2, cube_3, cluster_number, squared distance. The k entries
    of a row are its k nearest clusters but are NOT sorted by distance.

  Raises:
    ValueError: If ``n_closest_centers`` is negative or ``cluster_data`` has
      no rows.
  """
  if n_closest_centers < 0:
    raise ValueError('n_closest_centers must not be negative')

  normalized_data = normalize_data(data.select(columns).to_numpy())
  # Select only the relevant columns from cluster_data for normalization
  normalized_clusters = normalize_data(cluster_data.select(columns).to_numpy())

  n_clusters = normalized_clusters.shape[0]
  if n_clusters == 0:
    raise ValueError('cluster_data must contain at least one cluster')
  # Never ask for more neighbours than there are clusters.
  n_keep = min(n_closest_centers, n_clusters)

  # Both sides are unit vectors, so |a - b|^2 = |a|^2 - 2 a.b + |b|^2
  # = 2 * (1 - a.b). Floating-point rounding can push a.b marginally above 1,
  # which would give a tiny negative "distance"; clamp those to 0.
  dot_product = normalized_data @ normalized_clusters.T
  squared_distance = np.maximum(2 * (1 - dot_product), 0)

  # Row index plus cube / cluster identifiers for every cluster.
  cluster_info = cluster_data.with_row_index().select(['index','cube_1', 'cube_2', 'cube_3', 'cluster_number']).to_numpy()

  # Partition around position n_keep - 1 so the first n_keep entries of each
  # row are its n_keep smallest distances. The original code hard-coded
  # position 4, which is only right when exactly five clusters are requested.
  kth = max(n_keep - 1, 0)
  closest_cluster_indices = np.argpartition(squared_distance, kth, axis=1)[:, :n_keep]

  # Distances matching the selected clusters, shape (n_rows, n_keep).
  distances = np.take_along_axis(squared_distance, closest_cluster_indices, axis=1)

  # Fancy indexing gives (n_rows, n_keep, 5); append the distance as a sixth
  # column. Doing this for all rows at once also handles empty ``data``.
  closest_info = cluster_info[closest_cluster_indices]
  return np.concatenate((closest_info, distances[:, :, np.newaxis]), axis=2)

### Adjusted win-rate (WR) calculation function

In [None]:
def calculate_adjusted_win_rate_inverse_squared_weights(data, cluster_data, n_closest_centers=5, columns=deck_atributes):
  """
  Estimate a win rate for every row of ``data`` from its nearest clusters.

  For each row the nearest clusters are looked up with
  ``find_5_closest_clusters_of_rows`` and their 'win_rate_adjusted' values are
  averaged with weights

      weight_i = total_matches_adjusted_i / squared_distance_i

  so larger and closer clusters count more.

  Args:
    data: DataFrame with one row per item to evaluate.
    cluster_data: DataFrame of clusters containing ``columns``,
      'win_rate_adjusted' and 'total_matches_adjusted' (plus the columns
      required by ``find_5_closest_clusters_of_rows``).
    n_closest_centers: Number of nearest clusters used per row.
    columns: Feature columns used for the distance computation.

  Returns:
    1-D float numpy array with one weighted win rate per row of ``data``.
    A row gets nan when none of its nearest clusters has a positive distance.
  """
  closest_clusters = find_5_closest_clusters_of_rows(data, cluster_data, columns=columns, n_closest_centers=n_closest_centers)

  # DataFrame.to_numpy() on a one-column frame is 2-D with shape (n, 1).
  # Flatten to 1-D: otherwise (k, 1) / (k,) broadcasts to a (k, k) matrix and
  # the distance factor cancels out of the weighted average entirely.
  win_rates_all = cluster_data.select('win_rate_adjusted').to_numpy().ravel()
  total_matches_all = cluster_data.select('total_matches_adjusted').to_numpy().ravel()

  weighted_win_rates = []
  for row_clusters in closest_clusters:
    # Column 0 is the cluster's row index in cluster_data; the last column is
    # the squared distance to it.
    cluster_indices = row_clusters[:, 0].astype(int)
    distances_squared = row_clusters[:, -1].astype(float)

    # Drop clusters at zero distance to avoid division by zero. Tiny negative
    # values caused by floating-point rounding are treated as zero as well.
    # nan distances are deliberately kept so they still propagate.
    usable = ~(distances_squared <= 0)
    if not usable.any():
      weighted_win_rates.append(np.nan) # Or some other indicator for no valid clusters
      continue

    cluster_indices = cluster_indices[usable]
    win_rates = win_rates_all[cluster_indices]
    total_matches = total_matches_all[cluster_indices]

    # Element-wise weights: one per remaining cluster.
    weights = total_matches / distances_squared[usable]

    weighted_win_rates.append(np.sum(win_rates * weights) / np.sum(weights))

  return np.array(weighted_win_rates, dtype=float)