In [None]:
###############################################

In [None]:
!pip install transformers
!pip install datasets
!pip install tiktoken
!pip install wandb
!pip install tqdm

In [None]:
import pandas as pd
data = pd.read_json('data/mtg_char/default-cards-20230313210730.json')

In [None]:
# peek at the data
print(data.shape)
data.head()

In [None]:
# filter out cards that are not in english
data = data[data['lang'] == 'en']
print(data.shape)
data.head()

In [None]:
# Keep only normal cards
data = data[data['layout'] == 'normal']

In [None]:
data.head()

In [None]:
# remove cards with duplicate names
data = data.drop_duplicates(subset=['name'])
print(data.shape)
data.head()

In [None]:
# now that we dont have any other languages, we can drop the lang column
data = data.drop(columns=['lang'])
print(data.shape)
data.head()

In [None]:
data = data.drop(columns=['mtgo_id', 'mtgo_foil_id', 'arena_id', 'tcgplayer_id', 'cardmarket_id', 'legalities', 'games', 'reserved', 'foil', 'nonfoil', 'finishes', 'oversized', 'promo', 'reprint', 'variation', 'artist', 'artist_ids', 'illustration_id', 'border_color', 'booster', 'story_spotlight', 'edhrec_rank', 'penny_rank','prices', 'promo_types', 'arena_id', 'preview', 'security_stamp', 'tcgplayer_etched_id', 'variation_of','released_at', 'set_id','set', 'set_uri', 'set_search_uri', 'scryfall_set_uri', 'rulings_uri', 'prints_search_uri', 'card_back_id', 'frame', 'related_uris'])

In [None]:
data = data.drop(columns=['object', 'id', 'oracle_id', 'uri', 'scryfall_uri', 'layout', 'highres_image', 'image_status', 'printed_name', 'card_faces', 'attraction_lights', 'color_indicator', 'color_indicator', 'life_modifier', 'hand_modifier', 'printed_type_line', 'printed_text', 'content_warning', 'flavor_name'])

In [None]:
data = data.drop(columns=['full_art', 'textless', 'all_parts', 'produced_mana', 'watermark', 'loyalty', 'frame_effects', 'digital', 'cmc' ])

In [None]:
data = data.drop(columns=['keywords', 'collector_number', 'colors', 'color_identity'])

In [None]:
print(data.shape)
data.head()

In [None]:
data['oracle_text'] = data.apply(lambda x: x['oracle_text'].replace(x['name'], '<card_name>'), axis=1)


In [None]:
print(data.shape)
data.head()

In [None]:
# Build a string representation of each card to be saved out as a text file.
# Card data will be represented as a string with the following format:
# "<|endoftext|>{card_name} | ?{mana_cost} | {type_line} | {rarity} | ?{oracle_text} | ?{flavor_text} | ?{power} | ?{toughness}<|endoftext|>"
# The ? indicates that the field may be empty.

# start of text token
sot = "<|startoftext|>"
# end of text token
eot = "<|endoftext|>"
# pad token
pad = "<|pad|>"

card_data = []

for index, row in data.iterrows():
    card_data.append(f" {eot} {row['name']} | {row['mana_cost']} | {row['type_line']} | {row['rarity']} | {row['oracle_text']} | {row['flavor_text']} | {row['power']} | {row['toughness']} {eot} \n")

# sample the first 10 cards
card_data[:10]


In [None]:
# Remove all new line characters from the data
card_data = [card.replace('\n', ' ') for card in card_data]

In [None]:
# save the card data to a text file
with open('data/mtg_char/mtg_card_data.txt', 'w') as f:
    f.write(''.join(card_data))

In [None]:
# write out the dataframe to a json file
data.to_json('data/mtg_char/mtg_card_data.json', orient='records', lines=True)

# Load the data into a dataset

In [None]:
# Create a dataset from the text file
from datasets import load_dataset
dataset = load_dataset('text', data_files='data/mtg_char/mtg_card_data.json')


In [None]:
# sample the first 10 cards
dataset['train'][:1]


In [None]:
dataset['train'].shape

In [None]:
# create a tokenizer

In [None]:
from transformers import GPT2Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', max_length=1024 , padding='max_length', pad_to_max_length=True, add_prefix_space=True, truncation=True, bos_token='<|endoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>', return_tensors='pt', mask_token='<|mask|>')


In [None]:
# tokenize the dataset
tokenized_dataset = dataset.map(lambda examples: tokenizer(examples['text'], add_special_tokens=True ), batched=True)



In [None]:
tokenized_dataset.shape

In [None]:
# split the dataset into train and validation
tokenized_dataset = tokenized_dataset['train'].train_test_split(test_size=0.2)


In [None]:
from transformers import DataCollatorForLanguageModeling

data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=True, mlm_probability=0.15
)

In [None]:
# create a model
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained('gpt2-medium')


In [None]:
# train the model
from transformers import Trainer, TrainingArguments
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=1,              # total # of training epochs
    per_device_train_batch_size=1,  # batch size per device during training
    per_device_eval_batch_size=1,   # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    logging_steps=10,
)


In [None]:
trainer = Trainer(
    model=model,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=tokenized_dataset['train'],         # training dataset
    eval_dataset=tokenized_dataset['test'],           # evaluation dataset
    data_collator=data_collator,         # data collator
)


In [None]:
from transformers.integrations import WandbCallback

# disable the wandb logger
trainer.remove_callback(WandbCallback)

In [None]:
trainer.train()

In [None]:
# peek into the tokenized dataset
tokenized_dataset['train'][0]






In [None]:
# Create a dataset from the text file
from datasets import load_dataset

dataset = load_dataset('text', data_files='data/mtg_char/mtg_card_data.json')

# sample the first 10 cards
dataset['train'][:1]

dataset['train'].shape
# create a tokenizer
from transformers import GPT2Tokenizer

tokenizer = GPT2Tokenizer.from_pretrained('gpt2', max_length=1024, padding='max_length', pad_to_max_length=True, add_prefix_space=True, truncation=True, bos_token='', eos_token='', pad_token='<pad>', return_tensors='pt', mask_token='')


# tokenize the dataset
def tokenize_function(examples):
    return tokenizer(examples['text'], add_special_tokens=True, padding='max_length', truncation=True, max_length=1024)


tokenized_dataset = dataset.map(tokenize_function, batched=True)

tokenized_dataset.shape
# split the dataset into train and validation
tokenized_dataset = tokenized_dataset['train'].train_test_split(test_size=0.2)

from transformers import DataCollatorForLanguageModeling

data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=True, mlm_probability=0.15
)
# create a model
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained('gpt2-medium')

# train the model
from transformers import Trainer, TrainingArguments

training_args = TrainingArguments(
    output_dir='./results',  # output directory
    num_train_epochs=1,  # total # of training epochs
    per_device_train_batch_size=1,  # batch size per device during training
    per_device_eval_batch_size=1,  # batch size for evaluation
    warmup_steps=500,  # number of warmup steps for learning rate scheduler
    weight_decay=0.01,  # strength of weight decay
    logging_dir='./logs',  # directory for storing logs
    logging_steps=10,
    device='cpu'
)

trainer = Trainer(
    model=model,  # the instantiated 🤗 Transformers model to be trained
    args=training_args,  # training arguments, defined above
    train_dataset=tokenized_dataset['train'],  # training dataset
    eval_dataset=tokenized_dataset['test'],  # evaluation dataset
    data_collator=data_collator,  # data collator
)

from transformers.integrations import WandbCallback

# disable the wandb logger
trainer.remove_callback(WandbCallback)

trainer.train()


In [None]:
# Create a dataset from the text file
from datasets import load_dataset
dataset = load_dataset('text', data_files='data/mtg_char/mtg_card_data.json')

# create a tokenizer
from transformers import GPT2Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', max_length=1024 , padding='max_length', pad_to_max_length=True, add_prefix_space=True, truncation=True, bos_token='', eos_token='', pad_token='<pad>', return_tensors='pt', mask_token='')

# tokenize the dataset
tokenized_dataset = dataset.map(lambda examples: tokenizer(examples['text'], add_special_tokens=True ), batched=True)

# split the dataset into train and validation
tokenized_dataset = tokenized_dataset['train'].train_test_split(test_size=0.2)

# create a model
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained('gpt2-medium')
model.resize_token_embeddings(len(tokenizer))

# train the model
from transformers import Trainer, TrainingArguments, DataCollatorForLanguageModeling
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=1,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
)

data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=True, mlm_probability=0.15
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['test'],
    data_collator=data_collator
)

from transformers.integrations import WandbCallback

# disable the wandb logger
trainer.remove_callback(WandbCallback)

trainer.train()


In [None]:
# Evalute the model
trainer.evaluate()


In [None]:
# save the model
trainer.save_model('mtg_card_model')


In [None]:
#sample the model output
from transformers import pipeline
unmasker = pipeline('fill-mask', model='mtg_card_model', tokenizer='gpt2')
unmasker('')




In [None]:
#generate the model output
from transformers import pipeline
generator = pipeline('text-generation', model='mtg_card_model', tokenizer='gpt2', max_length=1024, do_sample=True, top_k=50, top_p=0.95, num_return_sequences=1, repetition_penalty=1.0, temperature=1.0, no_repeat_ngram_size=2, bad_words_ids=None, pad_token_id=50256, length_penalty=1.0, num_beams=1, early_stopping=False, use_cache=True, num_beam_groups=1, diversity_penalty=0.0, prefix_allowed_tokens_fn=None, output_attentions=None, output_hidden_states=None, output_scores=None, return_dict_in_generate=None, forced_bos_token_id=None, forced_eos_token_id=None, remove_invalid_values=None, return_dict=None)
generator('')


In [None]:
# sample the first element of the dataset

dataset['train'][0]
dataset.shape

In [8]:
# create a function to get the base64 encoded art_crop image
import base64
import requests

def get_image_url(card):
    return card['image_uris']['art_crop']

#create a function to get the image from the url in base64 encoding
def get_image_base64(card):
    url = get_image_url(card)
    response = requests.get(url)
    return base64.b64encode(response.content).decode('utf-8')


In [None]:
# Try the function out on the first row of data
get_image_base64(data.iloc[0])

In [None]:
# Test the image classifier api

import requests

response = requests.post("https://persing-clip-interrogator.hf.space/run/clipit", json={
  "data": [
    "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAACklEQVR4nGMAAQAABQABDQottAAAAABJRU5ErkJggg==",
    "ViT-L (best for Stable Diffusion 1.*)",
    "best",
]}).json()

description = response["data"]


In [None]:
description

In [None]:
# create a function to get the description from the image

def get_description(image_base64):
    response = requests.post("https://persing-clip-interrogator.hf.space/run/clipit", json={
        "data": [
            "data:image/png;base64," + image_base64 ,
            "ViT-L (best for Stable Diffusion 1.*)",
            "fast",
        ]}).json()
    return response["data"]


In [None]:
# Test the function
get_description(get_image_base64(data.iloc[0]))

In [None]:
# add a column to the dataset with the art_description for the first 10 rows
data['art_description'] = data.iloc[:1].apply(lambda row: get_description(get_image_base64(row)), axis=1)


In [None]:
# time how long it takes to get the descriptions for the first 5 rows
import time
start = time.time()
data['art_description'] = data.iloc[:5].apply(lambda row: get_description(get_image_base64(row)), axis=1)
end = time.time()
print(end - start)



In [None]:
# get the size of the dataset
data.shape


In [None]:
# 135 seconds time the data size, then divide by 60*60 to get the hours
135*25438/(60*60)

In [None]:
# that was the time before I changed the api call to fast from best
12.5*25438/(60*60)

In [None]:
# thats still way too long, maybe reduce the resolution of the images


# Check the current size of the images
import requests
from io import BytesIO
from PIL import Image

url = get_image_url(data.iloc[0])
response = requests.get(url)
img = Image.open(BytesIO(response.content))
img.size


In [None]:
# I wonder how acurate the image classifier is with the lower resolution images
# Let's try it with 256x256 images

# resize the image
img = img.resize((256, 256))
img.size

In [None]:
# convert the image to base64
base64_image = base64.b64encode(BytesIO(response.content).getvalue()).decode('utf-8')

# get the description
description = get_description(base64_image)

In [None]:
description

In [None]:
# let's redefine the get_image_base64 function to resize the image to 256x256


# def get_image_base64(card):
#     url = get_image_url(card)
#     response = requests.get(url)
#     img = Image.open(BytesIO(response.content))
#     img = img.resize((256, 256))
#     img_byte_arr = BytesIO()
#     img.save(img_byte_arr, format='PNG')
#     img_byte_arr = img_byte_arr.getvalue()
#     return base64.b64encode(BytesIO(img_byte_arr).getvalue()).decode('utf-8')

In [None]:
# It looks like that still works, so let's try timing the first 5 rows again but this time resize the images to 256x256
start = time.time()
data['art_description'] = data.iloc[:5].apply(lambda row: get_description(get_image_base64(row)), axis=1)
end = time.time()
print(end - start)


In [None]:
# that actually took longer. I wonder if it's because the images are being resized. Let's try it again but this time resize the images before the api call

# here we will resize the images before the api call and store the base64 encoded image in the art_description column
data['art_description'] = data.iloc[:5].apply(lambda row: get_image_base64(row), axis=1)

# now we will get the description from the base64 encoded image and time it
start = time.time()
data['art_description'] = data.iloc[:5].apply(lambda row: get_description(row['art_description']), axis=1)
end = time.time()
print(end - start)

In [None]:
# That still took longer, I guess the model is already downsizing the images. So there is no point in downsizing the images before the api call

In [None]:
# let's see if all rows have a value for art_crop
data['image_uris'].isnull().sum()
# 0 rows have a null value for image_uris so now lets see if all of the image uris have an art_crop
data['image_uris'].apply(lambda x: x['art_crop'] if x is not None else None).isnull().sum()

# data['image_uris'][0]['art_crop']

In [None]:
# looks like we cant cut down on the number of descriptions we need to get maybe we can run this locally to avoid some costs


In [32]:
from transformers import pipeline

image_to_text = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")

image_to_text("https://ankur3107.github.io/assets/images/image-captioning-example.png")




[{'generated_text': 'a soccer game with a player jumping to catch the ball '}]

In [10]:
# let's try it on the first row of the dataset
image_to_text(get_image_url(new_data.iloc[0]))



[{'generated_text': 'a painting of a bird with a fish in its mouth '}]

In [28]:
# let's try it on the first 5 rows of the dataset and time it
import time
start = time.time()
new_data['art_description'] = new_data.iloc[:5].apply(lambda row: image_to_text(get_image_url(row))[0]['generated_text'], axis=1)
end = time.time()
print(end - start)




3.392888069152832


In [29]:
new_data['art_description'].head()

0       a painting of a bird with a fish in its mouth 
1        a woman in a costume is posing for a picture 
3       a painting of a woman in a bikini with a fish 
4        a river with a bunch of birds flying over it 
5    two women in a park with a painting of a man a...
Name: art_description, dtype: object

In [33]:
# lets isolate the first row and see what the model is returning
new_data.iloc[0]['art_description']

'a painting of a bird with a fish in its mouth '

In [None]:
# This model seems to create an issue with microsoft threat protection so I am restricted to running on my laptop at the moment

In [None]:
# lets load everything and see how far it gets in the night

In [None]:
# we want to batch the job up into 100 rows at a time, retreive the descriptions from the pipeline and store them in the 'art_description' column of the dataframe. After each batch we will save the dataframe to a json file so we can pick up where we left off if the job is interrupted.

for i in range(0, new_data.shape[0], 1000):
    start = time.time()
    new_data['art_description'] = new_data.iloc[i:i+1000].apply(lambda row: image_to_text(get_image_url(row))[0]['generated_text'], axis=1)
    end = time.time()
    print(end - start)
    new_data.to_json(f'./data/{i}.json')



632.1368598937988




In [None]:
# It ran overnight. I should have put a timer in there to see how long it took to run.
# looking at file creation times it looks like it ran from 10:00pm to 2:30am so 4.5 hours. Not too bad considering it was running on my laptop.


In [1]:
# The jupyter notebook seems to be having some issues displaying tables. so let's test loading in the data from the json files and compare it to the data variable
import pandas as pd

new_data = pd.read_json('./data/25400.json')

In [None]:
# now we can compare the dataframes
data.equals(new_data)

In [None]:
# looks like the dataframes are the same. So now I can confidently restart the kernel and load in the data from the json files

In [2]:
# lets peek at the first 5 rows of the art_description column
new_data.shape

(25438, 13)

In [5]:
new_data['art_description'].head()

0    None
1    None
3    None
4    None
5    None
Name: art_description, dtype: object