In [1]:
!pip install openai
import requests
import openai
import numpy as np
import json
import random
import pandas as pd
import pickle
from pathlib import Path
from PIL import Image
from typing import List, Tuple
from io import BytesIO

import tensorflow as tf

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting openai
  Downloading openai-0.25.0.tar.gz (44 kB)
[K     |████████████████████████████████| 44 kB 3.5 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Collecting pandas-stubs>=1.1.0.11
  Downloading pandas_stubs-1.5.2.221124-py3-none-any.whl (146 kB)
[K     |████████████████████████████████| 146 kB 57.8 MB/s 
Collecting types-pytz>=2022.1.1
  Downloading types_pytz-2022.6.0.1-py3-none-any.whl (4.7 kB)
Building wheels for collected packages: openai
  Building wheel for openai (PEP 517) ... [?25l[?25hdone
  Created wheel for openai: filename=openai-0.25.0-py3-none-any.whl size=55880 sha256=b6b3ab9df5d188fd2f7500fe23d7e3afef93a833eb6a5550cef99aed2747b7fd
  Stored in directory: /root/.cache/pip/wheels/4b/92/33/6f57c7aae0b16875267999a50570e81f15eecec577ebe

In [11]:
DATA_DIR = Path("gdrive/MyDrive/data/")


In [2]:
from google.colab import drive

drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
labels = pd.read_csv("labels.csv")

In [4]:

def query_dalle(prompt, n=1, size="256x256"):
  """Request generated images for a text prompt from the OpenAI image endpoint.

  Args:
    prompt: Text description of the image to generate.
    n: Number of images to request for this prompt.
    size: Resolution string forwarded unchanged to the API. Defaults to
      "256x256", the value that used to be hard-coded here.

  Returns:
    Whatever `openai.Image.create` returns; the rest of the notebook reads
    the generated images from its "data" entry.
  """
  return openai.Image.create(
    prompt=prompt,
    n=n,
    size=size
  )

def read_json(path) -> dict:
    """Load and return the JSON document stored at `path`.

    The file is opened explicitly as UTF-8 so the result does not depend on
    the platform's default locale encoding.

    Raises:
        FileNotFoundError: if `path` does not exist.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)
      

# Read the API credentials from a local JSON file (so the key is not written in the notebook)
# and register the key with the openai client that query_dalle uses.
config = read_json("openai_config.json")
openai.api_key = config["key"]



In [9]:
# Plain-language skin descriptors for the ten skin-tone labels ("monk_1" ... "monk_10"),
# used when wording the image prompts.
MONK = {
    "monk_1": "very pale",
    "monk_2": "pale",
    "monk_3": "light",
    "monk_4": "olive",
    "monk_5": "light brown",
    "monk_6": "brown",
    "monk_7": "dark brown",
    "monk_8": "dark",
    "monk_9": "black",
    "monk_10": "very black"
}

def create_prompt(age, gender, skin_tone, rng=None) -> str:
  """Build the text prompt for one (age bucket, gender, skin tone) combination.

  Args:
    age: Age bucket written as "<min>_<max>" (e.g. "18_30"); a concrete age
      is drawn uniformly from that inclusive range.
    gender: Word inserted verbatim into the prompt (e.g. "male").
    skin_tone: Key of `MONK`; its descriptor is inserted into the prompt.
    rng: Optional `random.Random` instance used for the age draw. Pass a
      seeded instance to get reproducible prompts; by default the global
      `random` module state is used, as before.

  Returns:
    The finished prompt string.

  Raises:
    KeyError: if `skin_tone` is not a key of `MONK`.
    ValueError: if `age` is not of the form "<int>_<int>".
  """
  age_min, age_max = age.split("_")
  source = random if rng is None else rng
  new_age = source.randint(int(age_min), int(age_max))
  monk = MONK[skin_tone]
  return f"a face photo of a {new_age} year old, {monk}-skinned {gender}, photo-realistic"



In [10]:
import itertools

# Every (age bucket, gender, skin tone) combination: age buckets are the distinct non-null
# values of the labels table's "age" column; genders and skin tones are enumerated explicitly.
combis = list(itertools.product(labels["age"].dropna().unique(), ["male", "female"], MONK.keys()))
# One prompt per combination, keyed by the combination tuple.
prompts = {(age, gender, skin_tone): create_prompt(age=age, gender=gender, skin_tone=skin_tone) for age, gender, skin_tone in combis}

{('18_30', 'male', 'monk_1'): [1], ('18_30', 'male', 'monk_2'): [], ('18_30', 'male', 'monk_3'): [], ('18_30', 'male', 'monk_4'): [], ('18_30', 'male', 'monk_5'): [], ('18_30', 'male', 'monk_6'): [], ('18_30', 'male', 'monk_7'): [], ('18_30', 'male', 'monk_8'): [], ('18_30', 'male', 'monk_9'): [], ('18_30', 'male', 'monk_10'): [], ('18_30', 'female', 'monk_1'): [], ('18_30', 'female', 'monk_2'): [], ('18_30', 'female', 'monk_3'): [], ('18_30', 'female', 'monk_4'): [], ('18_30', 'female', 'monk_5'): [], ('18_30', 'female', 'monk_6'): [], ('18_30', 'female', 'monk_7'): [], ('18_30', 'female', 'monk_8'): [], ('18_30', 'female', 'monk_9'): [], ('18_30', 'female', 'monk_10'): [], ('0_17', 'male', 'monk_1'): [], ('0_17', 'male', 'monk_2'): [], ('0_17', 'male', 'monk_3'): [], ('0_17', 'male', 'monk_4'): [], ('0_17', 'male', 'monk_5'): [], ('0_17', 'male', 'monk_6'): [], ('0_17', 'male', 'monk_7'): [], ('0_17', 'male', 'monk_8'): [], ('0_17', 'male', 'monk_9'): [], ('0_17', 'male', 'monk_10'):

In [25]:
from tqdm import tqdm
import time

# Collect the "data" payload of the DALL-E response for every category tuple.
responses = {key: [] for key in prompts}
for cats, prompt in tqdm(prompts.items()):
  # At most two tries per prompt: a failed first call is retried once after a
  # 10 second pause, and a second failure aborts the whole run.
  for attempt in (1, 2):
    try:
      response = query_dalle(prompt)
    except openai.APIError as e:
      if attempt == 2:
        print("another one")
        raise e
      print("caught an error!")
      time.sleep(10)
    else:
      responses[cats].append(response["data"])
      break
  # Pause between prompts before issuing the next request.
  time.sleep(5)

100%|██████████| 80/80 [14:15<00:00, 10.70s/it]


In [35]:
def read_img(path, img_size: Tuple[int, int]=(224,224)) -> Image.Image:
    """Load an image, convert it to RGB and resize it.

    Args:
        path: Anything `PIL.Image.open` accepts; read_img_url passes an
            in-memory byte stream.
        img_size: Target size handed to `Image.resize`.

    Returns:
        A new RGB image of size `img_size`.
    """
    # convert() copies the pixel data into a new image, so the source can be
    # closed as soon as the block exits instead of lingering until GC.
    with Image.open(path) as img:
        return img.convert("RGB").resize(img_size)

def read_img_url(url: str, timeout: float = 30.0) -> Image.Image:
    """Download an image and return it resized via `read_img`.

    Args:
        url: Address of the image to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled download cannot hang the notebook indefinitely.

    Returns:
        The downloaded image as processed by `read_img` with its defaults.

    Raises:
        requests.HTTPError: if the server answers with an error status
            (otherwise the error page would be handed to PIL as image data).
        requests.Timeout: if the server does not respond within `timeout`.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return read_img(BytesIO(response.content))

# Download one image per category. Each value in `responses` is a list of "data" payloads,
# so [0][0] selects the first image entry of the first response; this raises IndexError
# for any category whose list is still empty.
imgs = {cat: read_img_url(respons[0][0]["url"]) for cat, respons in responses.items()}

In [43]:
img_array = np.array([np.array(img) for img in imgs.values()])
img_array.shape


(80, 224, 224, 3)

In [46]:
import pickle 
from pathlib import Path


# Persist the category -> image dict so the paid DALL-E requests need not be repeated.
with open(DATA_DIR / "dalle_img_dict.pkl", "wb") as f:
  pickle.dump(imgs, f)

In [3]:
def create_vgg16():
    """Build a VGG-16 feature extractor that accepts raw 224x224 RGB input.

    The ImageNet-pretrained network is loaded including its dense head, its
    final (classification) layer is dropped, and the VGG-16 input
    preprocessing is applied in front of the remaining layers.

    Returns:
        A `tf.keras.Model` mapping a (224, 224, 3) input to the output of the
        last retained layer.
    """
    # Pretrained network with the fully connected layers included.
    pretrained = tf.keras.applications.vgg16.VGG16(
        weights="imagenet",
        include_top=True,
        input_shape=(224, 224, 3),
        classes=1000,
        pooling=None,
        classifier_activation="softmax",
    )
    # Keep every layer except the last one (the classifier).
    headless = tf.keras.models.Sequential(pretrained.layers[:-1])
    raw_input = tf.keras.layers.Input(shape=(224, 224, 3))
    features = headless(tf.keras.applications.vgg16.preprocess_input(raw_input))
    return tf.keras.Model(raw_input, features)

In [5]:
vgg16 = create_vgg16()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


In [12]:
def read_pickle(path: Path):
  """Deserialize and return the object stored in the pickle file at `path`.

  Only use this on files you created yourself: unpickling can execute
  arbitrary code.
  """
  return pickle.loads(Path(path).read_bytes())

img_dict = read_pickle(DATA_DIR / "dalle_img_dict.pkl")

In [13]:
img_arr = np.array([np.array(img) for img in img_dict.values()])

In [15]:
# Run the generated images through the feature extractor; there must be one
# output row per generated image (80 category combinations).
dalle_preds = vgg16.predict(img_arr)
assert dalle_preds.shape[0] == 80



In [86]:
from scipy.spatial import KDTree
from collections import Counter
from typing import Dict, Tuple, List

def most_common(arr) -> int:
  """Return the most frequent value among non-negative integer codes.

  Args:
    arr: Array-like of non-negative integer codes. A single scalar (such as
      the 0-d result of indexing with one neighbour) is treated as a
      one-element array.

  Returns:
    The most frequent code as a plain `int`; ties go to the smallest code.

  Raises:
    ValueError: if `arr` is empty or contains negative values.
  """
  # atleast_1d: np.bincount rejects 0-d input, which is what a k=1 lookup yields.
  codes = np.atleast_1d(np.asarray(arr)).astype(int)
  return int(np.argmax(np.bincount(codes)))


def map_to_int(x: List[str]) -> Tuple[np.ndarray, Dict[str, int]]:
    """Encode a list of strings as integer codes.

    Codes are assigned in sorted order of the distinct strings, so the
    encoding is the same on every run (iterating a bare `set` of strings
    depends on per-process hash randomization).

    Args:
        x: The strings to encode.

    Returns:
        A tuple `(codes, mapping)`: `codes` is an array with one integer per
        element of `x`, and `mapping` maps each distinct string to its code.
    """
    mapping = {s: i for i, s in enumerate(sorted(set(x)))}
    return np.array([mapping[s] for s in x]), mapping

def invert_dict(d: dict) -> dict:
  """Return a new dict with the keys and values of `d` swapped.

  If several keys share a value, the key encountered last wins.
  """
  flipped = {}
  for key, value in d.items():
    flipped[value] = key
  return flipped

class DalleKNN:
  """Nearest-neighbour lookup from a feature vector to (age, gender, skin tone).

  Each reference vector passed to `fit` carries the label triple at the same
  position in `labels`; `find_match` returns, per attribute, the majority
  label among the k nearest reference vectors.
  """

  def __init__(self, labels: List[Tuple[str, str, str]]):
    """Encode the label triples as integer codes.

    Args:
      labels: One (age, gender, skin_tone) tuple per reference vector, in the
        same order as the rows later given to `fit`.
    """
    self.labels = labels
    self.ages, self.age_map = map_to_int([x[0] for x in self.labels])
    self.genders, self.gender_map = map_to_int([x[1] for x in self.labels])
    self.skin_tones, self.skin_tone_map = map_to_int([x[2] for x in self.labels])
    assert len(np.unique(self.skin_tones)) == 10, "mismatch!"
    # map_to_int yields label -> code; predictions need code -> label.
    self.age_map = invert_dict(self.age_map)
    self.gender_map = invert_dict(self.gender_map)
    self.skin_tone_map = invert_dict(self.skin_tone_map)

  def fit(self, dalle_preds: np.ndarray):
    """Index the reference vectors.

    Args:
      dalle_preds: One row per entry of `labels`, in the same order.

    Raises:
      ValueError: if the number of rows differs from the number of labels;
        neighbour indices would otherwise point at the wrong labels.
    """
    if len(dalle_preds) != len(self.labels):
      raise ValueError(
        f"got {len(dalle_preds)} reference vectors for {len(self.labels)} labels"
      )
    self.kdtree = KDTree(dalle_preds)

  def find_match(self, new_img, k=3) -> Tuple[str, str, str]:
    """Predict the label triple for a single feature vector.

    Args:
      new_img: One feature vector with the dimensionality used in `fit`.
      k: Number of nearest neighbours that vote.

    Returns:
      (age, gender, skin_tone), each decided by majority vote among the k
      neighbours (see `most_common` for how ties are resolved).
    """
    _, idx = self.kdtree.query(new_img, k=k)
    # With k=1 the query returns a scalar index; keep the votes 1-d.
    idx = np.atleast_1d(idx)
    age_pred = most_common(self.ages[idx])
    gender_pred = most_common(self.genders[idx])
    skin_tone_pred = most_common(self.skin_tones[idx])
    return self.age_map[age_pred], self.gender_map[gender_pred], self.skin_tone_map[skin_tone_pred]
  


In [56]:
# map_to_int returns (codes, mapping); most_common needs only the integer codes.
most_common(map_to_int([x[0] for x in cat_list])[0])

0

In [31]:
test_preds = np.load(DATA_DIR / "vgg_test_preds.npy")

In [87]:
# The dict keys are the (age, gender, skin_tone) tuples, in the same order as the
# images that produced dalle_preds.
cat_list = [cat for cat in img_dict]

# Smoke test: look up the first reference vector against the index built from all of them.
test_knn = DalleKNN(cat_list)
test_knn.fit(dalle_preds)
test_knn.find_match(dalle_preds[0])

('0_17', 'male', 'monk_1')

In [79]:
test_labels = pd.read_csv(DATA_DIR / "test_labels.csv", index_col=0)
test_imgs = np.load(DATA_DIR / "vgg_test_preds.npy")


In [88]:
# Predict (age, gender, skin tone) for every test feature vector.
# The predictions are later compared row-by-row with test_labels, so the two
# must line up; zip() would have hidden a length mismatch by truncating.
assert len(test_labels) == len(test_imgs), "test_labels and test_imgs differ in length"

age_preds = []
gender_preds = []
skin_preds = []

for test_img in test_imgs:
  agepred, genderpred, skinpred = test_knn.find_match(test_img)
  age_preds.append(agepred)
  gender_preds.append(genderpred)
  skin_preds.append(skinpred)


In [95]:
from sklearn.metrics import confusion_matrix, accuracy_score

In [None]:
def getScore(results):
    """Combine per-attribute accuracy and disparity into a single score.

    Args:
        results: Mapping with an 'accuracy' and a 'disparity' entry, each
            itself keyed by 'gender', 'age' and 'skin_tone'.

    Returns:
        2*acc_gender*(1 - disp_gender)
        + 4*acc_age*(1 - disp_age**2)
        + 10*acc_skin_tone*(1 - disp_skin_tone**5)
    """
    accuracy = results['accuracy']
    disparity = results['disparity']
    gender_term = 2*accuracy['gender']*(1-disparity['gender'])
    age_term = 4*accuracy['age']*(1-disparity['age']**2)
    skin_term = 10*accuracy['skin_tone']*(1-disparity['skin_tone']**5)
    return gender_term + age_term + skin_term

# NOTE(review): the cell used to end with a bare `results` expression indented by two
# spaces, which is an IndentationError, and `results` is not defined in any visible cell.
# Re-enable once the results mapping exists:
# results

In [None]:
#np.save("gdrive/MyDrive/data/dalle_imgs_preprocessed.npy", processed_imgs)
processed_imgs = np.load("gdrive/MyDrive/data/dalle_imgs_preprocessed.npy")

In [None]:
# NOTE(review): `new_model` is not defined at notebook level in any visible cell (it only
# exists as a local inside create_vgg16) — presumably left over from an earlier kernel
# session; confirm which model is meant before re-running.
preds = new_model.predict(processed_imgs)



In [None]:
preds.shape

(79, 4096)

In [None]:
import pickle

with open("gdrive/MyDrive/data/dalle_imgs.pkl", "wb") as f:
  pickle.dump(imgs, f)

In [None]:
# Now lets try to load all the (test) imgs
!unzip -q gdrive/MyDrive/data_bb1_img_recognition.zip -d .

In [None]:
# Read the label table (first CSV found under test/) and load every listed image at 224x224.
test_labels = pd.read_csv(next(Path("test").glob("*.csv")))
test_imgs = [tf.keras.utils.load_img(Path("test") / path, target_size=(224, 224)) for path in test_labels["name"]]
# NOTE(review): `preprocess` is not defined in any visible cell — confirm where it comes from.
test_preprocessed = [preprocess(np.asarray(img)) for img in test_imgs]

In [None]:
# NOTE(review): `new_model` is not defined at notebook level in any visible cell — confirm
# which model produced the saved test predictions before re-running.
test_preds = new_model.predict(np.array(test_preprocessed))



In [None]:
np.save("gdrive/MyDrive/data/vgg_test_preds.npy", test_preds)
np.save("gdrive/MyDrive/data/vgg_dalle_preds.npy", preds)


# Pedal to the metal

In [None]:
import pandas as pd
import numpy as np
import pickle

def read_pickle(path):
    """Deserialize and return the object stored in the pickle file at `path`.

    Only use this on files you created yourself: unpickling can execute
    arbitrary code.
    """
    with open(path, mode="rb") as handle:
        loaded = pickle.load(handle)
    return loaded

dalle_preds = np.load("../train/vgg_dalle_preds.npy")
test_preds = np.load("../train/vgg_test_preds.npy")
test_labels = pd.read_csv("../test/labels.csv", index_col=0)
dalle_imgs = read_pickle("../train/dalle_imgs.pkl")

In [None]:
from scipy.spatial import KDTree

# Index the rows of dalle_preds and, for every row of test_preds, look up its 5 nearest
# neighbours (distances in `dist`, row indices into dalle_preds in `ind`).
tree = KDTree(dalle_preds)
dist, ind = tree.query(test_preds, k=5)



In [None]:
# Draw one random test row labelled monk_10. No random_state is passed, so a different
# row may be selected on every run.
black_test = test_labels[test_labels["skin_tone"] == "monk_10"].sample(1)
black_test

Unnamed: 0,name,skin_tone,gender,age,real_face
1475,TEST1475.png,monk_10,male,0_17,1.0


In [None]:
test_