In [None]:
import csv
from PIL import Image  
import gdown
import os
import numpy as np
import tqdm
import json
import traceback
import multiprocessing

In [None]:
# Spreadsheet export that lists every NFT layer item together with its
# probability and image link.
csf_file = "/home/oleksandr/projects/upwork/esov-api/tmp/YouSov NFT randomization - NFT Randomization.csv"

# `rows` keeps every raw CSV record (each one a list of strings) for the
# parsing cells further down; each record is echoed as it is read.
rows = []
with open(csf_file, newline='') as csvfile:
    for record in csv.reader(csvfile):
        print(record)
        rows.append(record)

In [None]:
def download_image_gdown(url, img_name):
    """Return the image stored at ``img_name``, downloading it first if needed.

    Args:
        url: Link handed to ``gdown.download`` (called with ``fuzzy=True``
            and ``use_cookies=True``). A falsy value means "no image".
        img_name: Local file path used both as download target and as cache:
            when the file already exists no download is attempted.

    Returns:
        The result of ``Image.open(img_name)``, or ``None`` when ``url`` is
        falsy.
    """
    if url:
        already_cached = os.path.exists(img_name)
        if not already_cached:
            print(f"Downloading image {img_name} from {url}")
            gdown.download(url, img_name, quiet=False, fuzzy=True, use_cookies=True)
        # Opened whether the file was just fetched or was already on disk.
        return Image.open(img_name)
    return None
    

def parser_row(prefix, out_fld, color_idx = None, img_idx = 3, filename=None):
    """Parse the spreadsheet rows of one layer, fetch their images and dump metadata.

    Rows come from the module-level ``rows`` list (filled by the CSV-loading
    cell). A row is selected when ``prefix`` occurs in its first column; empty
    rows (which ``csv.reader`` yields for blank lines) are skipped.

    Args:
        prefix: Substring looked up in column 0; also used in image file names.
        out_fld: Output folder. Images go to ``{out_fld}/{filename}/`` and the
            metadata to ``{out_fld}/{filename}.json``.
        color_idx: Column index of the colour value, or ``None`` if the layer
            has no colour column.
        img_idx: Column index of the image link.
        filename: Base name of the image sub-folder and of the JSON file.
            Defaults to ``prefix`` when omitted.

    Returns:
        A list of dicts with keys ``caption``, ``probability``, ``name``,
        ``gdrive_path``, ``color`` and ``pillow_img`` (the opened image or
        ``None`` when the row has no image link).
    """
    if filename is None:
        # Without this the outputs would land in a folder literally named "None".
        filename = prefix

    print("Parsing rows with prefix:", prefix)
    # `r and ...` guards against empty rows, which have no column 0.
    rows_filtered = [r for r in rows if r and prefix in r[0]]

    out_imgs_folder = f"{out_fld}/{filename}"
    os.makedirs(out_imgs_folder, exist_ok=True)

    parsed = []
    for i, row in enumerate(rows_filtered):
        img_name = f"{out_imgs_folder}/{prefix}_{i}.png"
        parsed.append({
            "caption": row[0],
            # Percent string such as "12,5%" -> 0.125 (comma or dot decimals).
            "probability": float(row[1].replace("%", "").replace(",","."))/100,
            "name": row[2],
            "gdrive_path": row[img_idx],
            "color": None if color_idx is None else row[color_idx],
            "pillow_img": download_image_gdown(row[img_idx], img_name),
        })

    sum_probs = sum(r["probability"] for r in parsed)
    print("Sum of probabilities:", sum_probs)

    data = []
    for i, r in enumerate(parsed):
        has_image = r["pillow_img"] is not None
        data.append({
            "caption": r["caption"],
            "probability": r["probability"],
            "name": r["name"],
            "color": r["color"],
            "gdrive_path": r["gdrive_path"] if has_image else None,
            # Relative to `out_fld`; must use `filename` (the folder the image
            # was actually saved in), not `prefix`.
            "img_path": f"{filename}/{prefix}_{i}.png" if has_image else None,
        })

    json_filename = f"{out_fld}/{filename}.json"
    with open(json_filename, "w") as jsonfile:
        json.dump({
            "sum_probs": sum_probs,
            "prefix": prefix,
            "data": data,
        }, jsonfile, indent=4)
    return parsed

# Output root: receives one image sub-folder and one JSON file per layer.
folder_meta = "/home/oleksandr/projects/upwork/esov-api/tmp/out_meta"

# L1 and L2 keep the image link in the default column (3); L3 and L4 have a
# colour value in column 3 and the image link in column 4. L2 rows are matched
# by the "[" substring rather than by a layer label.
meta_L1 = parser_row(prefix="L1", out_fld=folder_meta, filename="L1")
meta_L2 = parser_row(prefix="[", out_fld=folder_meta, filename="L2")
meta_L3 = parser_row(prefix="L3", out_fld=folder_meta, color_idx=3, img_idx=4, filename="L3")
meta_L4 = parser_row(prefix="L4", out_fld=folder_meta, color_idx=3, img_idx=4, filename="L4")

In [None]:
def _read_meta_json(name):
    """Load ``{folder_meta}/{name}.json``, closing the file once it is parsed."""
    with open(f"{folder_meta}/{name}.json") as meta_file:
        return json.load(meta_file)


# Re-read the metadata written by the parsing cell, one dict per layer.
loaded_L1 = _read_meta_json("L1")
loaded_L2 = _read_meta_json("L2")
loaded_L3 = _read_meta_json("L3")
loaded_L4 = _read_meta_json("L4")

In [None]:
def load_prob_map(meta):
    """Group items into buckets of equal probability.

    Items are sorted by ascending ``"probability"`` and every distinct
    probability value gets one bucket. Note that each item dict in ``meta`` is
    mutated: it receives an ``"idx"`` key holding its position in the sorted
    order.

    Args:
        meta: Iterable of dicts, each with at least a ``"probability"`` key.
            May be empty.

    Returns:
        A dict with:
          * ``"buckets"``: ``{probability: bucket}`` where a bucket holds
            ``"probability"`` (sum over its items), ``"item_probability"``,
            ``"idx"`` (1-based creation order), ``"items"``
            (``{sorted index: item}``), ``"lower_bound"`` (the bucket's own
            item probability) and ``"upper_bound"`` (the item probability of
            the next bucket, or the bucket's own for the last one). These
            bounds are item probabilities, not cumulative ranges.
          * ``"sorted_meta"``: the items in ascending probability order.
          * ``"min_prob"`` / ``"max_prob"``: smallest / largest item
            probability, or ``None`` when ``meta`` is empty.
          * ``"sum_prob"``: sum of all item probabilities.
    """
    print("===Loading prob map")
    meta_sorted = sorted(meta, key=lambda x: x["probability"])
    buckets = {}
    buckets_count = 0
    # Guarded so that an empty input yields an empty map instead of IndexError.
    min_prob = meta_sorted[0]["probability"] if meta_sorted else None
    max_prob = meta_sorted[-1]["probability"] if meta_sorted else None
    sum_prob = 0
    last_idx = len(meta_sorted) - 1
    for i, p in enumerate(meta_sorted):
        prob = p["probability"]
        p["idx"] = i
        if prob not in buckets:
            buckets_count += 1
            buckets[prob] = {
                "probability": 0,
                "item_probability": prob,
                "idx": buckets_count,
                "items": {}
            }
        bucket = buckets[prob]
        sum_prob += prob
        bucket["probability"] += prob
        bucket["lower_bound"] = prob
        # Overwritten for every item, so the bucket's last item decides: its
        # successor in the sorted order belongs to the next bucket.
        bucket["upper_bound"] = meta_sorted[i+1]["probability"] if i < last_idx else prob
        bucket["items"][i] = p

    print(f"Total buckets: {buckets_count}")
    print(f"Sum of probabilities: {sum_prob}")
    print(f"Min prob: {min_prob}, max prob: {max_prob}")
    print("===Prob map loaded")
    return {
        "buckets": buckets,
        "sorted_meta": meta_sorted,
        "min_prob": min_prob,
        "max_prob": max_prob,
        "sum_prob": sum_prob,
    }

def get_random_item(buckets, random_val):
    """Pick a bucket by weighted roll, then a uniformly random item from it.

    ``random_val`` is mapped onto the cumulative probability mass of the
    buckets (in dict order), so each bucket is hit in proportion to its
    ``"probability"``. The roll is scaled by the total mass, which means the
    bucket probabilities do not have to sum to exactly 1. The per-bucket
    ``"lower_bound"``/``"upper_bound"`` values are not used here: they are
    item probabilities, not cumulative ranges, and comparing a roll against
    them can never select the highest-probability bucket.

    Args:
        buckets: ``{key: bucket}`` mapping; each bucket needs ``"probability"``
            and a non-empty ``"items"`` dict.
        random_val: Roll in the half-open range ``[0, 1)``.

    Returns:
        ``(item, bucket)``, or ``None`` when ``buckets`` is empty or
        ``random_val`` lies outside ``[0, 1)``.
    """
    if not buckets or not 0 <= random_val < 1:
        return None

    total = sum(b["probability"] for b in buckets.values())
    threshold = random_val * total

    cumulative = 0
    chosen = None
    for bucket in buckets.values():
        # Remember the bucket even without a break: if float rounding pushes
        # `threshold` up to `total`, the roll falls into the last bucket.
        chosen = bucket
        cumulative += bucket["probability"]
        if threshold < cumulative:
            break

    items = chosen["items"]
    keys = list(items)
    # Draw a position rather than a key so the key keeps its original type.
    rand_item_idx = keys[np.random.randint(len(keys))]
    return items[rand_item_idx], chosen
        
# One probability map per layer, built from the "data" list of its metadata JSON.
prob_map_L1 = load_prob_map(meta=loaded_L1["data"])
prob_map_L2 = load_prob_map(meta=loaded_L2["data"])
prob_map_L3 = load_prob_map(meta=loaded_L3["data"])
prob_map_L4 = load_prob_map(meta=loaded_L4["data"])

def benchmark(probmeta, times, label=""):
    """Roll items ``times`` times and compare hit rates with their probabilities.

    Each roll first draws a bucket weighted by the bucket's ``"probability"``
    and then a uniformly random item from that bucket. Afterwards the relative
    deviation (in percent) between each item's configured probability and its
    observed hit rate is printed, followed by min / max / average deviation.

    Any exception is caught and its traceback printed, so a failing layer does
    not stop the following benchmark calls.

    Args:
        probmeta: Mapping as returned by ``load_prob_map`` (uses ``"buckets"``,
            ``"sorted_meta"``, ``"min_prob"`` and ``"max_prob"``).
        times: Number of rolls.
        label: Name shown in the "Running benchmark" line.
    """
    try:
        print(f"Running benchmark {label}")
        buckets = probmeta["buckets"]
        sorted_meta = probmeta["sorted_meta"]
        min_prob = probmeta["min_prob"]
        max_prob = probmeta["max_prob"]
        hits = {s["idx"]: 0 for s in sorted_meta}
        print(f"Min prob: {min_prob}, max prob: {max_prob}")

        # Loop-invariant set-up, built once instead of on every roll.
        bucket_items = [list(b["items"].values()) for b in buckets.values()]
        weights = np.array([b["probability"] for b in buckets.values()], dtype=float)
        # np.random.choice rejects weights that do not sum to 1, which happens
        # when the spreadsheet percentages are rounded; normalise them.
        weights = weights / weights.sum()

        for _ in tqdm.tqdm(range(times)):
            candidates = bucket_items[np.random.choice(len(bucket_items), p=weights)]
            random_item = candidates[np.random.randint(len(candidates))]
            hits[random_item["idx"]] += 1

        # validate that each bucket hit correctly by its probability
        print(f"Validation, rolling {times} times")
        total_deviation = 0
        max_deviation = 0
        # Start from +inf (not 100) so the minimum is right even when every
        # deviation is above 100 %.
        min_deviation = float("inf")
        for p in sorted_meta:
            prob = p["probability"]
            caption = p["caption"]
            total_hits = hits[p["idx"]]
            hits_prob = total_hits / times
            if prob == 0:
                # A relative deviation is undefined for a zero probability:
                # report 0 when it was (correctly) never hit, +inf otherwise.
                deviation = 0.0 if total_hits == 0 else float("inf")
            else:
                deviation = (abs(prob-hits_prob)/prob)*100
            total_deviation += deviation
            max_deviation = max(max_deviation, deviation)
            min_deviation = min(min_deviation, deviation)

            print(f"Item name: {caption}, Hits {total_hits}/{times}  Probability: {prob}, Real Probability: {hits_prob}, deviation %: {deviation}")
        print(f"Min deviation %: {min_deviation}")
        print(f"Max deviation %: {max_deviation}")
        print(f"Average deviation %: {total_deviation/len(sorted_meta)}")
        print("Validation done")

    except Exception:
        # Best-effort: report the failure and let the caller continue.
        print(traceback.format_exc())

# Quick smoke run with fewer rolls:
# benchmark(prob_map_L1, 100000)

# Number of simulated rolls per layer.
rolls = 1_000_000
benchmark(probmeta=prob_map_L1, times=rolls, label="L1")
benchmark(probmeta=prob_map_L2, times=rolls, label="L2")
benchmark(probmeta=prob_map_L3, times=rolls, label="L3")
benchmark(probmeta=prob_map_L4, times=rolls, label="L4")
    



