In [3]:
import os
from openai import OpenAI
import base64
import requests
import pathlib
import json
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
import matplotlib.image as mpimg
from random import shuffle
import random
import torch
import clip
from PIL import Image
from collections import defaultdict, Counter
import matplotlib.pyplot as plt

# GPT

In [2]:
# Read the OpenAI key from the environment; the value is None when the variable is unset.
api_key = os.environ.get('OPENAI_API_KEY')

In [3]:
# HTTP headers for direct REST calls to the OpenAI API: JSON body plus bearer-token auth.
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer {}".format(api_key),
}

In [4]:
# Instantiate the OpenAI SDK client with its default constructor arguments.
# NOTE(review): `client` is not referenced by any other cell visible here; the
# request helper below goes through `requests.post` with `headers` instead —
# confirm this cell is still needed.
client = OpenAI()

# Task definition

In [34]:
# --- Dataset location ---------------------------------------------------------
# Root directory that is scanned for per-match folders.
DATA_DIR = 'england_epl/2014-2015'
# Names of the per-match sub-folders whose frame pairs are processed.
FOLDER = (
    '1_frames_actions',
    '2_frames_actions',
)

# --- Task parameters ----------------------------------------------------------
# Action labels that are kept; pair folders with any other label are ignored.
ACTION = [
    "Indirect free-kick",
    "Throw-in",
    "Foul",
    "Shots off target",
    "Shots on target",
    "Goal",
    "Corner",
    "Direct free-kick",
    "Penalty",
]
# Offset added to the first frame's index to locate the second frame of a pair.
INTERVAL = 2
# NOTE(review): not referenced by the visible cells; the prompt is built inline per pair.
PROMPT = ''
# Pairs whose CLIP similarity is below this value are skipped.
CLIP_THRESHOLD = 0.78

In [6]:
# Load the CLIP ViT-L/14 model together with its matching image preprocessing transform.
# Prefer the GPU, but fall back to the CPU so the model can still be loaded on a
# machine without CUDA. Any code that moves inputs to a device must target the
# same device as the model.
clip_L, clip_L_preprocess = clip.load(
    "ViT-L/14",
    device='cuda' if torch.cuda.is_available() else 'cpu',
)

In [7]:
def compute_similarity(image1, image2, model=clip_L, preprocess=clip_L_preprocess):
    """Return the cosine similarity between the image embeddings of two images.

    Args:
        image1: First image, given as anything ``Image.open`` accepts (e.g. a path).
        image2: Second image, same form as ``image1``.
        model: Object exposing ``encode_image``; defaults to the module-level ``clip_L``.
        preprocess: Callable turning a PIL image into a tensor for ``model``;
            defaults to the module-level ``clip_L_preprocess``.

    Returns:
        float: Cosine similarity of the two feature vectors.
    """
    # Send the inputs to the device the model actually lives on instead of
    # assuming CUDA; keep the previous hard-coded value as the fallback.
    try:
        device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        device = 'cuda'

    def _encode(image_source):
        # The context manager releases the file handle as soon as the pixels
        # have been converted into a tensor.
        with Image.open(image_source) as image:
            batch = preprocess(image).unsqueeze(0).to(device)
        return model.encode_image(batch)[0]

    cos = torch.nn.CosineSimilarity(dim=0)
    # Pure inference: disable autograd so no graph is built for the two forward passes.
    with torch.no_grad():
        similarity = cos(_encode(image1), _encode(image2)).item()
    return similarity

In [13]:
def encode_image(image_path):
    """Read the file at ``image_path`` and return its bytes as a base64 text string."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

In [9]:
def gpt_inference_two(image_1, image_2, prompt, timeout=120):
    """Send a text prompt plus two images to the OpenAI chat-completions endpoint.

    Args:
        image_1: Path of the first image; it is attached to the message before ``image_2``.
        image_2: Path of the second image.
        prompt: Text instruction placed ahead of the two images.
        timeout: Seconds to wait for the HTTP response before ``requests`` gives up.

    Returns:
        str: The ``content`` of the first choice in the API response.

    Raises:
        requests.HTTPError: If the API answers with a non-success status code.
    """
    import mimetypes  # stdlib; imported locally because the import cell does not provide it

    def _image_part(image_path):
        # Label the data URL with the file's actual MIME type instead of always
        # claiming JPEG; JPEG stays the fallback when the type cannot be guessed.
        mime_type = mimetypes.guess_type(str(image_path))[0]
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:{mime_type};base64,{encode_image(image_path)}"
            }
        }

    payload = {
        "model": "gpt-4-turbo",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt
                    },
                    _image_part(image_1),
                    _image_part(image_2),
                ]
            }
        ],
        "max_tokens": 300
    }
    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=headers,
        json=payload,
        timeout=timeout,
    )
    # Surface API failures (bad key, rate limit, ...) with the server's own
    # message rather than a KeyError on the missing 'choices' field.
    if not response.ok:
        raise requests.HTTPError(
            f"OpenAI API request failed with HTTP {response.status_code}: {response.text}",
            response=response,
        )
    answer = response.json()['choices'][0]['message']['content']
    return answer

In [60]:
# Draw ten values, each 0 or 1, with random.randint.
tmp = [random.randint(0, 1) for _ in range(10)]
    

In [61]:
# Tally how many 0s and 1s the list `tmp` contains.
# NOTE(review): `Counter` is already imported in the first cell, so this import is redundant.
from collections import Counter
Counter(tmp)

In [43]:
# Walk DATA_DIR/<match>/<folder in FOLDER>/<frameIndex>_<action>/ and, for a pair
# folder whose action is in ACTION, compare frame <frameIndex>.png with the frame
# INTERVAL indices later: filter by CLIP similarity, randomise which frame is shown
# on the left, ask GPT which one happens first, and record the outcome in `results`.
results = []
f, axarr = plt.subplots(1, 2, figsize=(15, 15))
for match in os.scandir(DATA_DIR):
    if match.is_dir():
        for folder in os.scandir(match):
            if folder.name in FOLDER:
                pair_list = list(os.scandir(folder))
                shuffle(pair_list)
                #pair_list.sort(key=lambda x: x.name)
                for pair in pair_list:
                    # Pair folders are named "<frameIndex>_<action>"; skip entries
                    # that do not follow that pattern instead of raising IndexError.
                    name_parts = pair.name.split("_")
                    if len(name_parts) < 2:
                        continue
                    frame_id, action = name_parts[0], name_parts[1]
                    if action in ACTION:
                        record = {}
                        print(pair)
                        first = f'{pair.path}/{frame_id}.png'
                        # Keep the zero padding of the first index when building the second one.
                        second_index = "{:0{width}d}".format(int(frame_id) + INTERVAL, width=len(frame_id))
                        second = f'{pair.path}/{second_index}.png'
                        c_score = compute_similarity(first, second)
                        print(f'CLIP: {c_score}')
                        if c_score < CLIP_THRESHOLD:
                            continue

                        # Randomise the side of the earlier frame so the ground
                        # truth is not always the same option.
                        rand_1 = random.randint(0, 1)

                        if rand_1 == 0:
                            record['left'] = first
                            record['right'] = second
                            record['answer'] = 'Left'
                        else:
                            record['left'] = second
                            record['right'] = first
                            record['answer'] = 'Right'
                        record['CLIP'] = c_score
                        record['time'] = frame_id
                        record['action'] = action
                        prompt = f'''
                        These are two frames related to {action} in a soccer match. 
                        Which frame happens first. Please only return one option from (Left, Right, None) 
                        without any other words. If these two frames are exactly the same, select None. 
                        Otherwise, choose Left if the first frame happens first and select Right 
                        if the second frame happens first. 
                        '''
                        answer = gpt_inference_two(record['left'], record['right'], prompt)
                        record['gpt_answer'] = answer
                        # Previously the record was never stored, so the JSON dump
                        # of `results` was always an empty list.
                        results.append(record)
                        print(record)
                        axarr[0].imshow(mpimg.imread(record['left']))
                        axarr[1].imshow(mpimg.imread(record['right']))
                        # Stop after the first pair in this folder that passed the CLIP filter.
                        break
                # Only the first folder of a match whose name is in FOLDER is processed.
                # NOTE(review): confirm that one sample per match is intended and
                # these two `break`s are not leftover debugging.
                break
        
    

In [None]:
# Persist the collected pair records as JSON in the file "<DATA_DIR>.json".
with open(f'{DATA_DIR}.json', 'w') as fout:
    fout.write(json.dumps(results))

# Results

In [87]:
# Per-action accuracy: number of correct GPT answers divided by the number of
# evaluated pairs for that action.
# NOTE(review): `analysis` and `action_count` are produced by the results-loading
# cell further down, so that cell has to run before this one.
action_totals = Counter(action_count)  # counted once instead of once per action
acc_action = {action: analysis[action] / action_totals[action] for action in analysis}

In [88]:
# Display the per-action accuracy mapping.
acc_action

In [49]:

# Load every RESULTS/<league>/<year>/results_gpt.json file, then compute accuracy
# per league, correct-answer counts per action, and the overall accuracy. Only
# pairs whose GPT answer is exactly 'Left' or 'Right' are scored; the rest are
# collected in `error`.
RESULTS = 'results/soccernet/'
all_result = []
league_results = {}
for league in os.scandir(RESULTS):
    if not league.is_dir():
        # Plain files next to the league folders cannot be scanned as directories.
        continue
    league_pairs = []
    for year in os.scandir(f'{RESULTS}/{league.name}'):
        if year.is_dir():
            # `with` closes the file handle, which was previously left open.
            with open(f'{RESULTS}/{league.name}/{year.name}/results_gpt.json') as result_file:
                result = json.load(result_file)
            # Each value of the loaded mapping is a list of pair records;
            # flatten them once (linear) and reuse the result for both lists.
            year_pairs = [pair for group in result.values() for pair in group]
            all_result += year_pairs
            league_pairs += year_pairs
    league_results[league.name] = league_pairs


analysis = defaultdict(int)   # action -> number of correct GPT answers
action_count = []             # action of every scored pair
gt_count = []                 # ground-truth side of every scored pair
pred_count = []               # GPT-predicted side of every scored pair
error = []                    # pairs whose GPT answer is neither 'Left' nor 'Right'
league_acc = {}
league_count = {}
for league_name, league_list in league_results.items():
    count = 0
    correct = 0
    for pair in league_list:
        if pair['gpt_answer'] not in ('Left', "Right"):
            continue
        count += 1
        correct += pair['answer'] == pair['gpt_answer']

    # A league without any scored pair has no defined accuracy; report NaN
    # instead of raising ZeroDivisionError.
    league_acc[league_name] = correct / count if count else float('nan')
    league_count[league_name] = count

for pair in all_result:
    if pair['gpt_answer'] not in ('Left', "Right"):
        # print(pair)
        error.append(pair)
        continue
    analysis[pair['action']] += pair['answer'] == pair['gpt_answer']
    gt_count.append(pair['answer'])
    pred_count.append(pair['gpt_answer'])
    action_count.append(pair['action'])

# Overall accuracy over all scored pairs. `total_acc` was printed below without
# ever being defined, which raised NameError.
total_acc = sum(analysis.values()) / len(action_count) if action_count else float('nan')

print(total_acc)
print(len(error))
print(Counter(action_count))
print(league_acc)
print(league_count)
print(Counter(pred_count))
print(Counter(gt_count))

In [50]:
# Display the pairs whose GPT answer was neither 'Left' nor 'Right'.
error

In [46]:
# Per-action accuracy: number of correct GPT answers divided by the number of
# evaluated pairs for that action.
action_totals = Counter(action_count)  # counted once instead of once per action
acc_action = {action: analysis[action] / action_totals[action] for action in analysis}

In [47]:
# Display the per-action accuracy mapping.
acc_action

In [48]:
# Bar chart of the per-action accuracy, one bar per action in `acc_action`.
positions = range(len(acc_action))
plt.bar(positions, list(acc_action.values()), align='center')
plt.xticks(positions, list(acc_action.keys()), rotation=90)
plt.ylabel("Accuracy")
# Title typo fixed (was 'Engand').
# NOTE(review): `acc_action` is built from every league found under RESULTS —
# confirm that an England-only title matches the data being plotted.
plt.title('England')
plt.show()