In [None]:
import json

import query
import prompt as P
from parsers import json_parser, table_parser
from logger import Logger
from llm import ChatGPT
import defines as D
import data_formatter as DF

%load_ext autoreload
%autoreload 2

PROJECT_ID = "Enclave"
LORE = """A medieval fantasy world. Only one God exists (the god is non-gendered and called "the Old One") but people believes also in local beliefs. No undead. A single big nation (empire) on a unique continent the size of europe. """
GROUP_DESCRIPTION = "Near an ancient forest, far from the city, a small quiet and prosperous village surrounded by a lush valley. There are ruins in the forest, and the village is near a river. Mountains can be seen in the distance. The village was founded centuries ago by groups of scholars and adventurers who came to study a long forgotten event. The legacy of the ancient settlers still remains faintly in the village, and the ruins of the ancient forest remain a mystery. Villains, monsters and creatures are pretty rare in the area, so the adventurers are not many but the village is still protected by a small militia and trade is going well."
GROUP_DESCRIPTION = "Near a big calm lake, far from the city, a small quiet and prosperous village surrounded by a mountain range. The village was founded centuries ago by groups of strangers. The legacy of the ancient settlers still remains faintly in the village. Monsters and creatures are lurking in the mountains, so the villagers know how to defend themselves."
GROUP_DESCRIPTION = "Mistwood is an hamlet nestled within a dense forest shrouded in perpetual mist, this small village exudes an ethereal charm. Cottages with thatched roofs line cobblestone streets, adorned with colorful flowers. The villagers, known for their mystical abilities, harness the power of the forest. Luminescent creatures flit between trees, while wisps of enchantment linger in the air. A mystical well at the village center grants healing waters, drawing pilgrims seeking rejuvenation. The village thrives in harmony with nature, its inhabitants practicing ancient arts. Tales of hidden groves, magical artifacts, and the occasional fae encounter make this village a treasure trove of enchantment."
GROUP_DESCRIPTION = "Deep within the heart of the mountain, a small group of resilient dwarf miners toils tirelessly in the depths of the earth. Their home, Shadowvein hall is a testament to their unwavering dedication to their craft. Carved from the living rock, the hall is adorned with shimmering crystals and veins of precious ore, casting a warm, golden glow. The rhythmic sound of pickaxes echoes through the tunnels as the miners unearth rich veins of silver, gold, and gemstones. In the flickering light of the forges, skilled artisans shape the raw materials into intricate works of art, crafting exquisite jewelry and ornate weapons. Despite the dangers that lurk within the dark tunnels, these stout-hearted dwarves stand united, their camaraderie and resilience forming the bedrock of their enduring legacy."
GROUP_DESCRIPTION = "Concealed within the depths of a mountain forest, a small elven settlement lies hidden from prying eyes. Accessed only by winding paths and secret passages, this ethereal haven emanates an air of enchantment and mystique. Elven dwellings, intricately woven from living vines and adorned with vibrant blossoms, blend seamlessly with the surrounding flora. Soft sunlight filters through the dense canopy, casting dappled patterns upon moss-covered paths. The residents, graceful and attuned to nature, practice ancient elven arts, weaving intricate spells and communing with forest spirits. Echoes of elven songs and laughter drift through the glades, while hidden groves reveal shimmering pools where the moon's reflection reveals ancient prophecies. Silvanvale Refuge stands as a sanctuary of elven wisdom, a place where time seems to slow and the harmony of the natural world thrives undisturbed."
GROUP_DESCRIPTION = "Ravenmoor City: Rising like a foreboding monolith from the surrounding landscape, Ravenmoor City looms with an air of mystery and trepidation. Its towering spires pierce the darkened sky, casting elongated shadows over the sprawling suburbs. The city sprawls along the banks of a vast, murky lake, its waters reflecting the city's somber atmosphere. The cobblestone streets are littered with refuse and grime, intertwining with the stench of rot and decay that hangs heavy in the air."
GROUP_DESCRIPTION = "Nestled in the countryside near a vast lake, Lakeview Haven stands as a bustling town along a major trading route, attracting adventurers seeking glory amidst the perilous plains and forest infested with green-skinned creatures. While the town thrives with prosperity and vibrant markets, its suburban outskirts tell a different tale—dirty and dangerous alleyways where poverty-stricken residents struggle amidst dilapidated buildings, casting a shadow over the town's otherwise flourishing reputation as a beacon of hope in the wilderness."
GROUP_DESCRIPTION = "Deep within the heart of the formidable mountains lies The Forgotten Enclave, a resilient community eking out an existence within the vast ruins of a once-majestic city. Cut off from the outside world, these settlers and beggars have embraced their precarious existence, fashioning crude shelters amidst crumbling architecture and overgrown foliage. Silent whispers of a lost civilization echo through the labyrinthine streets, while weathered statues and dilapidated structures stand as testament to a grandeur long past. Life in The Forgotten Enclave is a constant struggle for survival, as residents scavenge for meager resources, relying on their resourcefulness and communal bonds to endure within this hauntingly beautiful, forgotten realm."
WORLD_TYPE = "Medieval fantasy"
INSTRUCTION = f"You are an assistant who generates on-demand data for a tabletop {WORLD_TYPE} role-playing game"
SCALE = "local"
TYPE = "small community"
POPULATION = 80
# SEED = 42
# ratio = {"human": 0.75, "elf": 0.05, "half-elf": 0.05, "dwarf": 0.1, "halfling": 0.025, "gnome": 0.025}
RACE_RATIO = {"human": 0.6, "half-elf": 0.15, "elf":0.05, "dwarf": 0.15, "halfling": 0.025, "gnome": 0.025}

Logger = Logger()

FORCE_HUMAN_READABLE_CACHE = True


with open("openai.key", "r") as f:
    api_key = f.readline().strip()
    api_id = f.readline().strip()
    
LLM = ChatGPT(Logger, INSTRUCTION, model_name="gpt-3.5-turbo", api_key=api_key, api_id=api_id)
Q = query.LLM_Query(LLM, Logger)


# Bootstrap
content = DF.bootstrap_content(
    PROJECT_ID, LORE, TYPE, SCALE, POPULATION, GROUP_DESCRIPTION, WORLD_TYPE)
prompt = P.prompt_bootstrap(content)
Logger.log(prompt, id=PROJECT_ID, type="bootstrap_prompt")

new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type="bootstrap", force_human_readable_cache=FORCE_HUMAN_READABLE_CACHE)
Logger.log_json(new_content, id=PROJECT_ID, type="bootstrap")

content = DF.format_bootstrap_query(content, new_content)

# Generate details
# TODO multithread
categories_to_details = ["customs", "resources", "history", "external_influences", "timeline", "sites", "anecdotes"]
for category in categories_to_details:
    # print(category)
    prompt = P.prompt_category(content, category)
    new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type=f"{category}", force_human_readable_cache=FORCE_HUMAN_READABLE_CACHE)
    Logger.log(prompt, id=PROJECT_ID, type=f"{category}_prompt")
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{category}")

    content = DF.format_category_query(content, new_content, category)


prompt = P.prompt_wk2(content)
Logger.log(prompt, id=PROJECT_ID, type=f"wk2_prompt")
new_content = Q.query(prompt, table_parser, id=PROJECT_ID, type="wk2", force_human_readable_cache=FORCE_HUMAN_READABLE_CACHE)
Logger.log_json(new_content, id=PROJECT_ID, type="wk2")
# content["workplaces"] = new_content
print(len(new_content))
sum_pop_needed = sum([int(v["population"]) for v in new_content])
print(sum_pop_needed)
content["workplaces"] = new_content


with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)


In [None]:
import random
import numpy as np
from numpy import random as npr
import json
import generator as G
import os
%load_ext autoreload
%autoreload 2
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.ERROR)

random.seed(42)
np.random.seed(42)


ind_filename = os.path.join("output", f"{PROJECT_ID}_individuals_pool.json")
group_filename = os.path.join("output", f"{PROJECT_ID}_groups_pool.json")
# print(ind_filename, group_filename)

individuals_pool = None
if os.path.exists(ind_filename):
    with open(ind_filename, "r") as infile:
        individuals_pool = json.load(infile)
        print("loading inds from file")

groups_pool = None
if os.path.exists(group_filename):
    with open(group_filename, "r") as infile:
        groups_pool = json.load(infile)
        print("loading groups from file")

# generate reference population (family and solos)
while (individuals_pool is None) or (groups_pool is None) or len(individuals_pool) < sum_pop_needed:
    print("Generating pools...")
    natives, outsiders, groups = G.get_native_populations(content)
    families = G.family_distribution(natives*1.4, RACE_RATIO)
    f_individuals = {f"{m['name']} {m['surname']}": m for g in families for m in g["members"]}
    solos = G.outsiders_distribution(outsiders*1.4, f_individuals, RACE_RATIO)
    s_individuals = {f"{m['name']} {m['surname']}": m for g in solos for m in g["members"]}
    individuals = {**f_individuals, **s_individuals}
    individuals_pool = individuals.copy()
    groups_pool = [*families.copy(), *solos.copy()]

with open(ind_filename, "w") as f:
    json.dump(individuals_pool, f, indent=4, sort_keys=True)

with open(group_filename, "w") as f:
    json.dump(groups_pool, f, indent=4, sort_keys=True)

print("Individuals pool", len(individuals_pool))
print("Groups pool", len(groups_pool))


# Distribute groups to activities
workplaces = content["workplaces"]
group_list = groups_pool.copy()

grp_per_wk_filename = os.path.join("output", f"{PROJECT_ID}_grp_per_wk.json")
grp_per_wk = None
if os.path.exists(grp_per_wk_filename):
    with open(grp_per_wk_filename, "r") as infile:
        grp_per_wk = json.load(infile)
        print("loading grp_per_wk from file")
else:
    grp_per_wk = {}
    for wk in workplaces:
        chosen_groups, chosen_details = G.preferential_group_selection(wk["composition"].lower(), wk["type"].lower(), wk["ages"].lower(), int(wk["population"]), group_list, individuals_pool)
        grp_per_wk[wk["name"]] = [c[0] for c in chosen_details]

    with open(grp_per_wk_filename, "w") as f:
        json.dump(grp_per_wk, f, indent=4, sort_keys=True)

for grp in groups_pool:
    grp["workplace"] = []

if grp_per_wk is not None:
    for k, v in grp_per_wk.items():
        for grp in groups_pool:
            if grp["name"] in v:
                grp["workplace"] = k


In [None]:
from threading import Thread
import time

threads = []
responses = [[] for _ in range(len(groups_pool))]
group_indexes = {}


def people_group_query(prompt, nam, index, res):

    Logger.log(prompt, id=PROJECT_ID, type=f"{nam}_ingroup_prompt")
    new_content = Q.query(prompt, table_parser, id=PROJECT_ID, type=f"{nam}_ingroup")
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{nam}_ingroup")
    res[index] = [*res[index], *new_content]


BATCH_SIZE = 20
for i, group in enumerate(groups_pool):
    group_indexes[group["name"]] = i
    for subset in range(0, len(group["members"]), BATCH_SIZE):
        group_to_query = group.copy()
        group_to_query["members"] = group_to_query["members"][subset:subset+BATCH_SIZE]
        grp_workplaces = [w for w in content["workplaces"] if w["name"] == group["workplace"]]
        workplace = grp_workplaces[0] if len(grp_workplaces) > 0 else None
        prompt = P.prompt_people_in_groups(content, group_to_query, workplace)
        nam = group["name"]
        cache_content = Q.get_cache_type("project", prompt, table_parser, id=PROJECT_ID, type=f"{nam}_ingroup")

        if cache_content is not None:
            responses[i] = [*responses[i], *cache_content]
            threads.append((i, None))
        else:
            threads.append(
                (i, Thread(target=people_group_query, args=[prompt, nam, i, responses])))
    # break

for i, thread in threads:
    if thread is None:
        continue
    print(f"Starting group query job {i} - {groups_pool[i]['name']}...")
    thread.start()
    time.sleep(40)

print(f"{len(threads)} group query job started...")

for i, thread in threads:
    if thread is None:
        continue
    thread.join()


print("Group query jobs done")
content["groups"] = {grp["name"]: grp for grp in groups_pool}
for k, v in group_indexes.items():
    content["groups"][k]["generated_members"] = responses[v]

# FORMAT / CLEAN RESPONSES
for k, v in content["groups"].items():
    for m in v["generated_members"]:
        found = 0
        for n in v["members"]:
            if n["name"] in m["name"]:
                found += 1
                n.update(m)
                break

    for m in v["members"]:
        if m['surname'] in m['name']:
            m['name'] = m['name'].replace(m['surname'], "").strip()
        m['fullname'] = f"{m['name']} {m['surname']}"

    if "generated_members" in v:
        del v["generated_members"]
    if "detailed_members" in v:
        del v["detailed_members"]
    


with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)


In [None]:

employed = {} # {workplace: [members]}
unemployed = {}
employed_filename = os.path.join("output", f"{PROJECT_ID}_employed.json")
unemployed_filename = os.path.join("output", f"{PROJECT_ID}_unemployed.json")

if os.path.exists(employed_filename) and os.path.exists(unemployed_filename):
    with open(employed_filename, "r") as infile:
        employed = json.load(infile)
    with open(unemployed_filename, "r") as infile:
        unemployed = json.load(infile)
else:
    
    for k, v in content["groups"].items():
        v["members"] = [m for m in v["members"] if "structure_preference" in m]
        
    individual_pool = {f"{member['fullname']}": member for _, group in content["groups"].items() for member in group["members"]}
    wk_group_members_pool = {
        wk['name']: {
            grp_name: {f"{member['fullname']}":member for member in grp["members"]}
            for grp_name, grp in content["groups"].items() if wk["name"] in grp["workplace"]
            } for wk in content["workplaces"]
    }
    
    
    for i, workplace in enumerate(content["workplaces"]):
        wk_employed = G.preferential_individual_selection(workplace, individual_pool, wk_group_members_pool)
        employed[workplace["name"]] = wk_employed
        
    unemployed = individual_pool.copy()
    
    with open(employed_filename, "w") as f:
        json.dump(employed, f, indent=4, sort_keys=True)
    with open(unemployed_filename, "w") as f:
        json.dump(unemployed, f, indent=4, sort_keys=True)

print("Employed and unemployed loaded", sum([len(v) for v in list(employed.values())]), len(unemployed))

In [None]:
from threading import Thread
import time

threads = []
responses = [[] for _ in content["workplaces"]]
wk_indexes = {}


def workplace_employees_query(wk, employees, i, responses):
    prompt = P.prompt_workplace_employees(content, wk, employees)
    nam = wk["name"]
    Logger.log(prompt, id=PROJECT_ID, type=f"{nam}_wk_emp_prompt")
    new_content = Q.query(prompt, table_parser, id=PROJECT_ID, type=f"{nam}_wk_emp", force_human_readable_cache=True)
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{nam}_wk_emp")
    responses[i] = new_content


for i, wk in enumerate(content["workplaces"]):
    wk_indexes[wk["name"]] = i
    employees = employed[wk["name"]]
    prompt = P.prompt_workplace_employees(content, wk, employees)
    nam = wk["name"]
    cache_content = Q.get_cache_type("project", prompt, table_parser, id=PROJECT_ID, type=f"{nam}_wk_emp_prompt")

    if cache_content is not None:
        responses[i] = [*responses[i], *cache_content]
        threads.append((i, None))
    else:
        threads.append((i, Thread(target=workplace_employees_query, args=[wk, employees, i, responses])))
    # break

for i, thread in threads:
    if thread is None:
        continue
    print(f"\nStarting group query job {i}", end="...")
    thread.start()
    time.sleep(0.2)
    if len(responses[i]) > 0:
        print(f"Cache found for {i}", end="")
        continue
    time.sleep(25)

print(f"{len(threads)} group query job started...")

for i, thread in threads:
    if thread is None:
        continue
    thread.join()

workplaces_employees = {k: responses[v] for k, v in wk_indexes.items()}
for wk, emp in workplaces_employees.items():
    for m in emp:
        m['fullname'] = m['name']
        del m['name']

content["employees"] = workplaces_employees
with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)


In [None]:
from threading import Thread
import time

threads = []
responses = [[] for _ in content["workplaces"]]
wk_indexes = {}


def workplace_details_query(wk, employees, i, responses):
    prompt = P.prompt_workplace_details(content, wk, employees)
    # print(prompt)
    nam = wk["name"]
    Logger.log(prompt, id=PROJECT_ID, type=f"{nam}_wk_dtls_prompt")
    new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type=f"{nam}_wk_dtls")
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{nam}_wk_dtls")
    responses[i] = new_content


for i, wk in enumerate(content["workplaces"]):
    wk_indexes[wk["name"]] = i
    employees = employed[wk["name"]]
    wk_employees = {e["fullname"]:e for e in workplaces_employees[wk["name"]]}
    to_delete = []
    for v in employees:
        if v["fullname"] not in wk_employees:
            to_delete.append(v)
            # print(v["fullname"])
        else:
            v.update(wk_employees[v["fullname"]])
    for v in to_delete:
        employees.remove(v)
    
    prompt = P.prompt_workplace_details(content, wk, employees)
    nam = wk["name"]
    cache_content = Q.get_cache_type("project", prompt, json_parser, id=PROJECT_ID, type=f"{nam}_wk_dtls")
    if cache_content is not None:
        responses[i] = cache_content
        threads.append((i, None))
    else:
        threads.append((i, Thread(target=workplace_details_query, args=[wk, employees, i, responses])))

for i, thread in threads:
    if thread is None:
        continue
    print(f"Starting group query job {i}")
    thread.start()
    time.sleep(25)

print(f"{len(threads)} group query job started...")

for i, thread in threads:
    if thread is None:
        continue
    thread.join()

# print(responses)
workplaces_details = {k: responses[v] for k, v in wk_indexes.items()}
content["wk_details"] = workplaces_details
with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)

In [None]:
with open(employed_filename, "w") as f:
    json.dump(employed, f, indent=4, sort_keys=True)

In [None]:
# format sites workplaces

for k, v in content["wk_details"].items():
    new_wk_sites = {}
    #test if sites is a list
    if isinstance(v["sites"], list):
        for obj in v["sites"]:
            # print(obj)
            if "keywords" in obj:
                key_names = [key for key in obj.keys() if key.find('name') != -1]
                if len(key_names) > 0:
                    # print(obj[key_names[0]], obj["keywords"])
                    new_wk_sites[obj[key_names[0]]] = obj["keywords"]
            elif len(obj.keys()) == 1:
                key_name = list(obj.keys())[0]
                # print(key_name, obj[key_name])
                new_wk_sites[key_name] = obj[key_name]
            else:
                for key in obj.keys():
                    print(key, obj[key])
                    new_wk_sites[key] = obj[key]
        v["sites"] = new_wk_sites
        


with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)


In [None]:
%load_ext autoreload
%autoreload 2

prompt = P.prompt_architecture_and_sites(content)
Logger.log(prompt, id=PROJECT_ID, type=f"architecture_prompt")
new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type="architecture", force_human_readable_cache=FORCE_HUMAN_READABLE_CACHE)
Logger.log_json(new_content, id=PROJECT_ID, type="architecture")
print(new_content)
content["architecture"] = new_content
with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)


In [None]:

threads = []
responses = [[] for _ in content["workplaces"]]
wk_indexes = {}


def workplace_sites_query(wk, wk_details, i, responses):
    prompt = P.prompt_sites(content, wk, wk_details)
    # print(prompt)
    nam = wk["name"]
    Logger.log(prompt, id=PROJECT_ID, type=f"{nam}_wk_site_prompt")
    new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type=f"{nam}_wk_site")
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{nam}_wk_site")
    responses[i] = new_content


for i, wk in enumerate(content["workplaces"]):
    wk_indexes[wk["name"]] = i
    wk_details = content["wk_details"][wk["name"]]
    
    prompt = P.prompt_sites(content, wk, wk_details)
    # print(prompt)
    nam = wk["name"]
    cache_content = Q.get_cache_type("project", prompt, json_parser, id=PROJECT_ID, type=f"{nam}_wk_site")
    if cache_content is not None:
        responses[i] = cache_content
        threads.append((i, None))
    else:
        threads.append((i, Thread(target=workplace_sites_query, args=[wk, wk_details, i, responses])))

for i, thread in threads:
    if thread is None:
        continue
    print(f"Starting group query job {i}")
    thread.start()
    time.sleep(25)

print(f"{len(threads)} group query job started...")

for i, thread in threads:
    if thread is None:
        continue
    thread.join()

# print(responses)
workplaces_details = {k: responses[v] for k, v in wk_indexes.items()}
content["wk_sites"] = workplaces_details
with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)



In [None]:
content_str = json.dumps(content, indent=4, sort_keys=True)
for k, v in content["wk_details"].items():
    content_str = content_str.replace(k, v["new_name"])
    # print(k, "|", v["new_name"])

with open(f"output/{PROJECT_ID}_renamed.json", "w") as f:
    f.write(content_str)
    
content = json.loads(content_str)

In [None]:
key_figures = []

for fname, family in content["groups"].items():
    for member in family["members"]:
        if "key_figure" not in member:
            pass
            # print("****", fname, member["fullname"], family["name"])
        elif member["key_figure"] == "yes":
            for wname, employees in content["employees"].items():
                if member["fullname"] in [e["fullname"] for e in employees]:
                    for workplace in content["workplaces"]:
                        if workplace["name"] == wname:
                            key_figures.append((member, employees, family, workplace))
            

for wname, employees in content["employees"].items():
    for mb in employees:
        if "key_figure" not in mb or mb["key_figure"] == "no":
            continue
        # print("-----", mb)
        member = None
        for fname, family in content["groups"].items():
            for fmember in family["members"]:
                if fmember["fullname"] == mb["fullname"]:
                    member = fmember
                    break
        if member is None:
            continue

        for workplace in content["workplaces"]:
            if workplace["name"] == wname:
                is_present = False
                for it in key_figures:
                    if it[0]["fullname"] == member["fullname"]:
                        is_present = True
                if not is_present:
                    key_figures.append((member, employees, family, workplace))

threads = []
responses = [[] for _ in key_figures]
mb_indexes = {}

print(len(key_figures), "key figures")

def member_details_query(mb, colleagues, family, workplace, i, responses):
    prompt = P.prompt_member_details(content, mb, colleagues, family, workplace)
    nam = mb["fullname"]
    Logger.log(prompt, id=PROJECT_ID, type=f"{nam}_member_details_prompt")
    new_content = Q.query(prompt, json_parser, id=PROJECT_ID, type=f"{nam}_member_details")
    Logger.log_json(new_content, id=PROJECT_ID, type=f"{nam}_member_details")
    responses[i] = new_content




for i, (mb, colleagues, family, workplace) in enumerate(key_figures):
    mb_indexes[mb["fullname"]] = i
    prompt = P.prompt_member_details(content, mb, colleagues, family, workplace)
    nam = mb["fullname"]
    cache_content = Q.get_cache_type("project", prompt, json_parser, id=PROJECT_ID, type=f"{nam}_member_details")
    if cache_content is not None:
        responses[i] = cache_content
        threads.append((i, None))
    else:
        threads.append((i, Thread(target=member_details_query, args=[mb, colleagues, family, workplace, i, responses])))


for i, thread in threads:
    if thread is None:
        continue
    print(f"Starting member query job {i}")
    thread.start()
    time.sleep(25)

print(f"{len(threads)} member query job started...")

for i, thread in threads:
    if thread is None:
        continue
    thread.join()

members_details = {k: responses[v] for k, v in mb_indexes.items()}
# print(members_details)
content["members_details"] = members_details
with open(f"output/{PROJECT_ID}.json", "w") as f:
    json.dump(content, f, indent=4, sort_keys=True)

In [None]:
from copy import deepcopy as dc

bak_content = dc(content)
rcontent = {}

rcontent["project_id"] = content["project_id"]
rcontent["name"] = content["name"]
rcontent["scale"] = content["scale"]
rcontent["size"] = content["size"]
rcontent["type"] = content["type"]
rcontent["structure"] = content["structure"]
rcontent["lore"] = content["lore"]

rcontent["details"] = {
    "description": content["details"],
    "anecdotes": content["anecdotes"],
    "architecture": content["architecture"], ## merge with sites
    "culture": content["culture"],
    "customs": content["customs"],
    "goals": content["goals"],
    "history": content["history"],
    "resources": content["resources"],
    "timeline": content["timeline"],
    "external_influences": content["external_influences"]
}

rcontent["details"]["architecture"]["details"] = content["sites"]["details"]
rcontent["details"]["architecture"]["keywords"] = content["sites"]["keywords"]
rcontent["details"]["architecture"]["style"] = rcontent["details"]["architecture"]["architecture"]
del rcontent["details"]["architecture"]["architecture"]


npcs = {}

rcontent["groups"] = {}
for k, v in content["groups"].items():
    # print(v)
    rcontent["groups"][k] = v.copy()
    rcontent["groups"][k]["members"] = []
    rcontent["groups"][k]["key_figures"] = []
    for member in v["members"]:
        npcs[member["fullname"]] = member.copy()
        rcontent["groups"][k]["members"].append(member["fullname"])
        if member.get("key_figure", "no") == "yes":
            rcontent["groups"][k]["key_figures"].append(member["fullname"])

wks = {}
for v in content["workplaces"]:
    k = v["name"]
    wks[k] = v.copy()
    wks[k]["employees"] = []
    wks[k]["key_figures"] = []
    for employee in content["employees"][k]:
        fullname = employee["fullname"]
        wks[k]["employees"].append(fullname)
        if fullname not in npcs:
            # check if a family is related to workspace and test name
            for fname, family in content["groups"].items():
                if k in family["workplace"]:
                    fullname = fullname + " " + fname
                    if fullname in npcs:
                        npcs[fullname]["job"] = employee.copy()
                    else:
                        print("Unable to find thus removing:", fullname, "in npcs for workplace:", k, "| group:", fname, "| generated:", fullname)
        else:
            npcs[fullname]["job"] = employee.copy()
            npcs[fullname]["job"]["workplace"] = k
            
        if employee.get("key_figure", "no") == "yes":
            wks[k]["key_figures"].append(fullname)
        
print("NPCs:", len(npcs))
print("NPCs with job:", len([k for k, v in npcs.items() if "job" in v]))
print("NPCs without job:", len([k for k, v in npcs.items() if "job" not in v]))


for k, w in content["wk_details"].items():
    v = w.copy()
    v["old_name"] = v["name"]
    del v["new_name"]
    del v["name"]
    wks[k].update(v)
    wks[k]["sites"] = content["wk_sites"][k]

rcontent["workplaces"] = wks

for wk in rcontent["workplaces"].values():
    if isinstance(wk["anecdotes"], list):
        wk["anecdotes"] = ". ".join(wk["anecdotes"])
    if isinstance(wk["plot"], list):
        wk["plot"] = ". ".join(wk["plot"])
    wk["keywords"] = wk["keywords"].replace('\"', '')

for fullname, npc in npcs.items():
    if "description" in npc:
        npc["short_description"] = npc["description"]
    npc["family"] = {
        "key_figure": npc.get("key_figure", "no"),
        "rank": npc.get("rank", npc["situation"]),
        "relationship": npc.get("relationship", ""),
        "situation": npc["situation"],
    }
    for k in ["key_figure", "rank", "relationship", "situation"]:
        if k in npc:
            del npc[k]
    if "job" in npc:
        if "working clothes" in npc["job"]:
            npc["job"]["working_clothes"] = npc["job"]["working clothes"]
            del npc["job"]["working clothes"]

for k, v in content["members_details"].copy().items():
    v["description"] = v["desc"]
    v["goals"] = v["goals_keywords"]
    del v["goals_keywords"]
    del v["desc"]
    del v["fullname"]
    npcs[k].update(v)
    
rcontent["npcs"] = npcs

with open(f"output/{PROJECT_ID}_r.json", "w") as f:
    json.dump(rcontent, f, indent=4, sort_keys=True)
    
content = bak_content

In [None]:

with open(f"output/{PROJECT_ID}_r.json", "r") as f:
    rcontent = json.load(f)
    
scrap_json = dc(rcontent)


def clean_json(json):
    if isinstance(json, dict):
        for k, v in json.items():
            if isinstance(v, str):
                if json[k] != "etc...":
                    json[k] = ""
            elif isinstance(v, list):
                json[k] = v[:1]
                clean_json(json[k])
            elif isinstance(v, dict):
                if not any([ki in v.keys() for ki in ["details", "keywords", "fullname", "description", "desc", "name"]]):                    
                    json[k] = {f"{k}_name_1": vv for kk, vv in v.items() if kk == list(v.keys())[-1]}
                    json[k][f"{k}_name_2"] = "etc..."
                clean_json(json[k])
            elif isinstance(v, int):
                pass
            else:
                print(k, v, "not a type")

    elif isinstance(json, list):
        for k, v in enumerate(json):
            if isinstance(v, str):
                if json[k] != "etc...":
                    json[k] = ""
            elif isinstance(v, list):
                json[k] = v[:1]
                clean_json(json[k])
            elif isinstance(v, dict):
                if not any([ki in v.keys() for ki in ["details", "keywords", "fullname", "description", "desc", "rank"]]):
                    json[k] = {f"{k}_name_1": vv for kk, vv in v.items() if kk == list(v.keys())[-1]}
                    json[k][f"{k}_name_2"] = "etc..."
                clean_json(json[k])
            elif isinstance(v, int):
                pass
            else:
                print(k, v, "not a type")

clean_json(scrap_json)
print(json.dumps(scrap_json))

In [None]:
import webuiapi
import os
import json
import random
from PIL import Image
import base64
import requests
api = webuiapi.WebUIApi(host='127.0.0.1', port=7860)


OUTPUT = f"output/{PROJECT_ID}"
folder_list = ["", "/portraits", "/portraits/prompts", "/sites", "/sites/prompts"]
for folder in folder_list:
    if not os.path.exists(OUTPUT + folder):
        os.makedirs(OUTPUT + folder)
    
    
def submit_post(url: str, data: dict):
    """
    Submit a POST request to the given URL with the given data.
    """
    return requests.post(url, data=json.dumps(data))


def save_encoded_image(b64_image: str, output_path: str):
    """
    Save the given image to the given output path.
    """
    with open(output_path, "wb") as image_file:
        image_file.write(base64.b64decode(b64_image))


def encode_image(image_path):
    with open(image_path, "rb") as i:
        b64 = base64.b64encode(i.read())
    return b64.decode("utf-8")


api.util_set_model("dreamlikeDiffusion10_10.ckpt [0aecbcfa2c]")
# api.util_set_model('elldrethSLucidMix_v10.safetensors [67abd65708]')
# api.util_set_model('realisticVisionV13_v13.safetensors [c35782bad8]')
# api.util_set_model("RPG-v4.safetensors [e04b020012]")
# api.util_set_model('realisticVisionV13_v13.safetensors [c35782bad8]')

# negative_prompt = "(clones), (((twins))), (((double persons))), couple, (double people), blur, ((dual face)), ((double face)), (((poorly drawn eyes))), (mutated eyes), (poorly drawn hand), (mutated hand), text, watermark, bad art, (playing card), card, ((out of frame)) (low resolution). blurred. ((3D render)) warped, ((watermark)), extra fingers, mutated hands, ((poorly drawn hands)), ((extra limbs)), cloned face, gross proportions, (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, (fused fingers)"

negative_prompt = "((3d)), ((asian)), low resolution, blurred, badhandv4:1.3, ((watermark)), extra fingers, mutated hands, ((poorly drawn hands)), ((extra limbs)), (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, ((fused fingers)), ((poorly drawn eyes)), (mutated eyes), (poorly drawn hand), (mutated hand), (clones), (((twins))), (((double person))), couple, (double people), blur, ((dual face)), ((double face)), ((two pairs of hears)), ((cloned hears))"

# , mutated eyes, ((poorly drawn eyes)), (low quality eyes):1.3
# (mutated eyes:1.2), 

# negative_prompt = "((out of frame)), ((3d)), ((asian)), low resolution, blurred, badhandv4:1.3, ((watermark)), extra fingers, mutated hands, ((poorly drawn hands)), ((extra limbs)), (malformed limbs), ((missing arms)), ((missing legs)), (((extra arms))), (((extra legs))), mutated hands, ((fused fingers)), ((poorly drawn eyes)), (mutated eyes), (poorly drawn hand), (mutated hand), (clones), (((twins))), (((double person))), couple, (double people), blur, ((dual face)), ((double face)), ((two pairs of hears)), ((cloned hears))"

def render_prompt(npc_name, npc, environment):
    clothes = npc["job"]["working_clothes"] if "job" in npc else npc["clothes"]
    job = "" if "job" not in npc else npc["job"]["job"]
    p = npc
    
    # prompt = f"""((heroic fantasy)), masterpiece, ((front full body view)) of a ({p["age_look"]}:1.3) ({p["gender"]}) ({p["race"]}) ({job}) with a (less than {p["beauty"]}:1.3 face) ({p["beauty"]}:1.3 appearance), {clothes}, {p["weight"]} weight, {p["height"]} size, {p["eyes"]} eyes, {p["hair"]} hair, {p["skin"]} skin, by greg rutkowski, ({p["age_look"]}:1.2 character), photorealistic, realistic eyes, detailed eyes, extremely detailed eyes, cinematic lighting, ((masterpiece)), sharp focus, ({environment} background)"""
    
    prompt = f"""((heroic fantasy)), masterpiece, fine art painting, rpg book illustration, ((front full body view)) of a ({p["age_look"]}:1.3) ({p["gender"]}) ({p["race"]}) ({job}) with a ({p["beauty"]} face):1.3 and a ({p["beauty"]}:1.3 appearance), poor and ragged {clothes}, {p["weight"]} weight, {p["height"]} size, (({p["beauty"]}:1.3 figure), {p["eyes"]} eyes, {p["hair"]} hair, {p["skin"]} skin, (({p["beauty"]}:1.3 character) by greg rutkowski, photorealistic, realistic eyes, realistic skin, (detailed eyes), (extremely detailed eyes), cinematic lighting, ((masterpiece)), sharp focus, ({environment} background), <lora:epiNoiseoffset_v2:1>"""
        
    # prompt = f"""masterpiece, best quality, ultra-detailed, fine art painting, photorealistic, ultra realistic, illustration, heroic fantasy, dungeons and dragons, ({p["age_look"]}):1.3 {p["gender"]}:1.1 ({p["race"]} race):1.3 ({job}) with a ({p["beauty"]} face):1.3, {clothes}, {p["weight"]} weight, {p["height"]} size, {p["eyes"]} eyes, {p["hair"]} hair, {p["skin"]} skin, ({p["age_look"]} character):1.3, by greg rutkowski and Thomas Benjamin Kennington, sharp focus, cinematic lighting, photorealistic:1.2,  <lora:epiNoiseoffset_v2:1>, {environment} background, matte, bokeh, fantasy background, realistic eyes, detailed eyes, ultra detailed, extremely detailed, 8 k, hdr, hd"""
    
    
    prompt_name = f"{OUTPUT}/portraits/prompts/{npc_name.strip()}.txt"

    with open(prompt_name, "w") as outfile:
        outfile.write(prompt)
        outfile.write("\n\n")
        outfile.write(negative_prompt)
            
    return prompt

def render(prompt, p):
    # print(prompt)
    seed = random.randint(0, 1000000)
    
    width = 576
    height = 768
    render_options = {}
    if "job" in p:
        render_options = {
            "enable_hr": True,
            "denoising_strength": 0.5,
            "hr_scale": 2,
            "hr_upscaler": "Latent",
            "hr_resize_x": width*2,
            "hr_resize_y": height*2,
        }
    
    
    img_names = [f"{p['fullname']}", f"{p['fullname'].strip()}", " ".join(p["fullname"].split(" ")[::-1]), " ".join(p["fullname"].split(" ")[::-1]).strip()]
    for img_name in img_names:
        img_path = f"{OUTPUT}/portraits/{img_name}.jpg"
        if os.path.exists(img_path):
            return
    
    print("Generating", p["fullname"])
    data = {"prompt": prompt,
            "negative_prompt": negative_prompt,
            # "init_images": [image],	# For img2img
            "sampler_name": 'DPM++ 2M Karras',
            "seed": -1,
            "cfg_scale": 4.5,
            "n_iter": 1,
            "steps": 40,
            "batch_size": 1,
            "width": width,
            "height": height,
            "restore_faces": True,
            **render_options
            
            # "alwayson_scripts": {
            #     "controlnet": {
            #         "args": [
            #             {
            #                 "input_image": pose_img,
            #                 # "module": "depth","model": "control_depth-fp16 [400750f6]"
            #                 "module": "none" if pose_on else "openpose", "model": "control_openpose-fp16 [9ca67cc5]",
            #             }
            #         ]
            #     }
            # }
            }
    
    query_url = 'http://127.0.0.1:7860/sdapi/v1/txt2img'
    response = submit_post(query_url, data)

    save_encoded_image(
        response.json()['images'][0], img_path)



for k, v in rcontent["npcs"].items():
    if "clothes" not in v:
        continue
    environment = None
    if "job" in npc:
        workplace = rcontent["workplaces"][npc["job"]["workplace"]]
        environment = workplace["keywords"]
    # print("** Generating", k)
    prompt = render_prompt(k, v, environment)
    render(prompt, v)


In [None]:
import webuiapi
import os
import json
import random
from PIL import Image
import base64
import requests
api = webuiapi.WebUIApi(host='127.0.0.1', port=7860)


def submit_post(url: str, data: dict):
    """
    Submit a POST request to the given URL with the given data.
    """
    return requests.post(url, data=json.dumps(data))


def save_encoded_image(b64_image: str, output_path: str):
    """
    Save the given image to the given output path.
    """
    with open(output_path, "wb") as image_file:
        image_file.write(base64.b64decode(b64_image))


def encode_image(image_path):
    with open(image_path, "rb") as i:
        b64 = base64.b64encode(i.read())
    return b64.decode("utf-8")


api.util_set_model('dreamlikeDiffusion10_10.ckpt [0aecbcfa2c]')
# api.util_set_model('elldrethSLucidMix_v10.safetensors [67abd65708]')
# api.util_set_model('realisticVisionV13_v13.safetensors [c35782bad8]')
# api.util_set_model("RPG-v4.safetensors [e04b020012]")
# api.util_set_model('realisticVisionV13_v13.safetensors [c35782bad8]')

negative_prompt = "flowers, ((3d)), text, watermark, people, man, woman, fog, blur, bad art, (playing card), card, ((out of frame)) painting. cartoon. (low resolution). blurred. (computer generated). ((3d model)), ugly, ((3D render)), warped, ((watermark)), plan, blueprint, map"

def render_building_prompt(site_name, site, wk_name, wk):
    local_desc = ", ".join(rcontent["details"]["architecture"]["style"])
    
    desc = ""
    if site["inherits_architecture"]:
        desc = ", ".join(site["architecture"])

    
    prompt = f"""an award winning high angle view of a ((({site["state"]} {site["details"]}))), ({wk["prosperity"]}), {wk['activity']}, ((({local_desc}))), medieval, {desc}, {site["type"]}:0.1 concept art, ((heroic fantasy)), (illustration), fine art, digital art, (cinematic lighting), high quality, high contrast, realistic lighting, center of frame, 4k textures, (intricate), elegant, highly detailed, sharp focus, 8k, hdr, epic, sense of awe, insane details, intricate details, hyperdetailed, harsh cinematic lights, outdoor atmosphere, <lora:epiNoiseoffset_v2:1>"""

    prompt_name = f"{OUTPUT}/sites/prompts/{site_name}_{wk_name}.txt"
    with open(prompt_name, "w") as outfile:
        outfile.write(prompt)
        outfile.write("\n\n")
        outfile.write(negative_prompt)
            
    return [prompt]


def render_global_prompts(content_name, archi, type):
    prompts = []
    site_names = []
    parent_name = []
    style = ", ".join(archi["style"])
    
    view_types = ["high angle view", "aerial view", "global view", "bird view", "street scene"]
    styles = ["", style, style, "", style]
    for view_type, style in zip(view_types, styles):
        prompt = f"""an award winning {view_type} of a ((({archi["global_view"]}))), medieval, {type}:1, ({style}):0.5, concept art, ((heroic fantasy)), (illustration), fine art, digital art, (cinematic lighting), high quality, high contrast, realistic lighting, center of frame, 4k textures, (intricate), elegant, highly detailed, sharp focus, 8k, hdr, epic, sense of awe, insane details, intricate details, hyperdetailed, harsh cinematic lights, outdoor atmosphere, <lora:epiNoiseoffset_v2:1>"""
        prompts.append(prompt)
        site_names.append(view_type)
        parent_name.append(content_name)
    
    for k, v in archi["sites_keywords"].items():
        desc = ", ".join(v)
        prompt = f"""an award winning high angle view of a (({k})), ({desc}), {site["type"]}:0.1 concept art, ((heroic fantasy)), (illustration), fine art, digital art, (cinematic lighting), high quality, high contrast, realistic lighting, center of frame, 4k textures, (intricate), elegant, highly detailed, sharp focus, 8k, hdr, epic, sense of awe, insane details, intricate details, hyperdetailed, harsh cinematic lights, outdoor atmosphere, <lora:epiNoiseoffset_v2:1>"""
        prompts.append(prompt)
        site_names.append(k)
        parent_name.append(content_name)

    prompt_name = f"{OUTPUT}/sites/prompts/{content_name}.txt"
    with open(prompt_name, "w") as outfile:
        for prompt in prompts:
            outfile.write(prompt)
            outfile.write("\n")
            outfile.write(negative_prompt)
            outfile.write("\n\n\n")
            
    return prompts, site_names, parent_name

def render_building(prompts, site_names, parent_names):
    seed = random.randint(0, 1000000)
    width = 720
    height = 512

    for prompt, site_name, parent_name in zip(prompts, site_names, parent_names):
        img_path = f"{OUTPUT}/sites/{site_name}_{parent_name}.jpg"
        if os.path.exists(img_path):
            # print(f"Image {site_name} already exists, skipping.")
            continue
        print(f"Generating {site_name} view for {parent_name}.")

        data = {"prompt": prompt,
                "negative_prompt": negative_prompt,
                # "init_images": [image],	# For img2img
                "sampler_name": 'DPM++ 2S a Karras',
                "seed": -1,
                "cfg_scale": 6,
                "n_iter": 1,
                "steps": 30,
                "batch_size": 1,
                "width": width,
                "height": height,
                "restore_faces": False,
                "enable_hr": True,
                "denoising_strength": 0.65,
                "hr_scale": 2,
                "hr_upscaler": "Latent",
                "hr_resize_x": width*2,
                "hr_resize_y": height*2,
                }
        query_url = 'http://127.0.0.1:7860/sdapi/v1/txt2img'
        response = submit_post(query_url, data)

        save_encoded_image(
            response.json()['images'][0], img_path)


for k, v in rcontent["workplaces"].items():
    for site_name, site in v["sites"].items():
        prompts = render_building_prompt(site_name, site, k, v)
        render_building(prompts, [site_name], [k])
        

prompts, site_names, parent_name = render_global_prompts(rcontent["name"], rcontent["details"]["architecture"], rcontent["type"])
render_building(prompts, site_names, parent_name)


In [None]:
import os
from jinja2 import Environment, FileSystemLoader
import json
from pathlib import Path
import random

PROJECT_ID = "Enclave"
OUTPUT = f"output/{PROJECT_ID}"

RELOAD_GLOBAL = True


if RELOAD_GLOBAL:
    if os.path.exists(f"{OUTPUT}/global.json"):
        with open(f"{OUTPUT}/global.json", "r") as infile:
            gcontent = json.load(infile)
else:
    gcontent = dc(rcontent)


groups_to_delete = []
for wkn, wk in gcontent["groups"].items():
    key_fig = None
    if len(wk["key_figures"]) > 0:
        key_fig = random.choice(wk["key_figures"])
    elif len(wk["members"]) > 0:
        key_fig = random.choice(wk["members"])
    else:
        print(f"Group {wkn} has no members or key figures, deleting.")
        groups_to_delete.append(wkn)
        continue

    wk["key_fig"] = key_fig
    wk["rtype"] = wk["type"].replace("_type", "")

    if wk["origin"] == "outsiders":
        prefix = wk["type"].replace("_type", "")
        if prefix == "solo":
            prefix = ""
        suffix = ""
        if len(wk["key_figures"]) > 0:
            suffix = wk["key_figures"][0].split(" ")[-1]
        else:
            suffix = wk["members"][0].split(" ")[-1]
        wk["name"] = f"{suffix} {prefix}"
    else:
        if wk["type"] == "family_type":
            wk["name"] = wk["name"] + " family"

# for g in groups_to_delete:
#     del gcontent["groups"][g]


to_delete = []
for npc_name, npc in gcontent["npcs"].items():
    img_names = [f"{npc['fullname']}", f"{npc['fullname'].strip()}", " ".join(npc["fullname"].split(" ")[::-1]), " ".join(npc["fullname"].split(" ")[::-1]).strip()]
    for img_name in img_names:
        img_path = f"portraits/{img_name}.jpg"
        if os.path.exists(OUTPUT + "/" + img_path):
            npc["img_path"] = img_path
            break
    if "img_path" not in npc:
        print(f"Could not find image for {npc_name}")
    
    
    if "clothes" not in npc:
        to_delete.append(npc_name)
for n in to_delete:
    del gcontent["npcs"][n]
    
gcontent["size"] = len(list(gcontent["npcs"].keys()))
gcontent["key_npcs"] = {}
gcontent["lo_npcs"] = {}
for npc_name, npc in gcontent["npcs"].items():
    # print(npc_name)
    key_npc = False
    for k, v in gcontent["groups"].items():
        if npc_name in v["key_figures"] or npc_name in v["members"]:
            npc["family"]["family_name"] = v["name"]
    
    if "key_figure" in npc:
        if npc["key_figure"] == "yes":
            key_npc = True
    if "job" in npc:
        if "key_figure" in npc["job"]:
            if npc["job"]["key_figure"] == "yes":
                key_npc = True
    if key_npc:
        gcontent["key_npcs"][npc_name] = npc
        # gcontent["key_npcs"][npc_name.strip()] = npc
    else:
        gcontent["lo_npcs"][npc_name] = npc
        # gcontent["key_npcs"][npc_name.strip()] = npc


for k, v in gcontent["workplaces"].items():
    for site_name, site in v["sites"].items():
        img_path = f"sites/{site_name}_{k}.jpg"
        site["img_path"] = img_path
        gcontent["workplaces"][k]["sites"][site_name] = site
        

for wkn, wk in gcontent["workplaces"].items():
    wk["rsite"] = list(wk["sites"].values())[0]["img_path"]

gcontent["details"]["architecture"]["sites_img"] = {}
for k, v in gcontent["details"]["architecture"]["sites_details"].items():
    img_path = f"sites/{k}_{gcontent['name']}.jpg"
    gcontent["details"]["architecture"]["sites_img"][k] = img_path

view_types = ["high angle view", "aerial view", "global view", "bird view", "street scene"]
gcontent["details"]["global_views"] = {}
for view_type in view_types:
    img_name = f"{view_type}_{gcontent['name']}"
    img_path = f"sites/{img_name}.jpg"
    gcontent["details"]["global_views"][view_type] = img_path

local_path = Path.cwd()
templates_dir = os.path.join(local_path, 'templates')
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template('place.html')
filename = os.path.join(f"output/{PROJECT_ID}", 'local_index.html')
# gcontent["folder_path"] = "file:///D:/shared/onedrive_cirad/OneDrive - Cirad/Self/ttrpg_content_generator/output/B"
gcontent["folder_path"] = "."
with open(filename, 'w') as fh:
    fh.write(template.render(
        **gcontent
    ))

with open(f"output/{PROJECT_ID}/global.json", "w") as f:
    json.dump(gcontent, f, indent=4, sort_keys=True)

In [None]:
# Generate PDF
import os
from jinja2 import Environment, FileSystemLoader
from pathlib import Path
import pdfkit
import json

local_path = Path.cwd()
templates_dir = os.path.join(local_path, 'templates')
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template('pdf.html')

# dir_list = os.listdir("output")
dir_list = [PROJECT_ID]
for d in dir_list:
    if not os.path.isdir(f"output/{d}"):
        continue
    
    if not os.path.exists(f"output/{d}/global.json"):
        print(f"output/{d}/global.json not found")
        continue
    with open(f"output/{d}/global.json") as ff:
        local_content = json.load(ff)
        filename = os.path.join(f"output/{d}", 'pdf.html')
        with open(filename, 'w') as fh:
            fh.write(template.render(
                **local_content
            ))

        path_to_wkhtmltopdf = r"D:\\shared\\onedrive_cirad\\OneDrive - Cirad\\Self\\ttrpg_content_generator\\pdf\\wkhtmltopdf.exe"
        path_to_file = os.path.join(f"output/{local_content['name']}.pdf")
        config = pdfkit.configuration(wkhtmltopdf=path_to_wkhtmltopdf)
        pdfkit.from_file(filename, output_path=path_to_file, configuration=config, options={"enable-local-file-access": "", "image-dpi": 150, "image-quality": 30, "lowquality": ""})

In [None]:
#upload all photo to imgbox and regenerate the web.json and index.html

import asyncio
import os
from jinja2 import Environment, FileSystemLoader
from pathlib import Path
import sys
import pyimgbox
import logging
import json
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.ERROR)


local_path = Path.cwd()
templates_dir = os.path.join(local_path, 'templates')
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template('place.html')

def save(lcontent):
    with open(f"output/{lcontent['project_id']}/web.json", "w") as f:
        json.dump(lcontent, f, indent=4, sort_keys=True)


async def upload_images(lcontent, gallery, pid):
    path = f"./output/{pid}/"
    for npc_name, npc in lcontent["npcs"].items():
        print(npc_name, end=" ")
        if "img_submission" in npc:
            print("skipped")
            continue
        
        img_path = path + npc["img_path"]
        submission = await gallery.upload(img_path)
        print(submission.image_url)
        npc["img_path"] = submission.image_url
        npc["img_submission"] = str(submission)
        
        if npc_name in lcontent["lo_npcs"]:
            lcontent["lo_npcs"][npc_name]["img_path"] = submission.image_url
            lcontent["lo_npcs"][npc_name]["img_submission"] = str(submission)
        elif npc_name in lcontent["key_npcs"]:
            lcontent["key_npcs"][npc_name]["img_path"] = submission.image_url
            lcontent["key_npcs"][npc_name]["img_submission"] = str(submission)
        save(lcontent)
        # break


    for wk_name, v in lcontent["workplaces"].items():
        for site_name, site in v["sites"].items():
            print(wk_name, site_name, end=" ")
            if "img_submission" in site:
                print("skipped")
                continue
            
            img_path = path + site["img_path"]
            submission = await gallery.upload(img_path)
            print(submission.image_url)
            site["img_path"] = submission.image_url
            site["img_submission"] = str(submission)
            save(lcontent)
            # break
        save(lcontent)
        # break

    for _, wk in lcontent["workplaces"].items():
        print("Setting rsite for", wk["name"])
        wk["rsite"] = list(wk["sites"].values())[0]["img_path"]
        save(lcontent)
        # break


    if "sites_submission" not in lcontent["details"]["architecture"]:
        lcontent["details"]["architecture"]["sites_submission"] = {}
    
    for k, v in lcontent["details"]["architecture"]["sites_img"].items():
        print(k, end=" ")
        if k in lcontent["details"]["architecture"]["sites_submission"]:
            print("skipped")
            continue
        
        img_path = path + v
        submission = await gallery.upload(img_path)
        print(submission.image_url)
        lcontent["details"]["architecture"]["sites_submission"][k] = submission
        lcontent["details"]["architecture"]["sites_img"][k] = submission.image_url
        save(lcontent)
        # break
    

    view_types = ["high angle view", "aerial view", "global view", "bird view", "street scene"]
    if "global_submissions" not in lcontent["details"]:
        lcontent["details"]["global_submissions"] = {}

    for view_type in view_types:
        print(view_type, end=" ")
        if view_type in lcontent["details"]["global_submissions"]:
            print("skipped")
            continue
        
        img_path = path + lcontent["details"]["global_views"][view_type]
        print(img_path)
        submission = await gallery.upload(img_path)
        print(submission.image_url)
        lcontent["details"]["global_submissions"][view_type] = submission
        lcontent["details"]["global_views"][view_type] = submission.image_url
        save(lcontent)
        # break
    
    return lcontent


async def cdn():
    # dir_list = os.listdir("output")
    # site_list = {}
    # for d in dir_list:
    d = "Enclave"
    # d = "P"
    if os.path.isdir(f"output/{d}"):
        print(f"processing the folder {d}")
        
        if not os.path.exists(f"output/{d}/global.json"):
            print(f"output/{d}/global.json not found")
            print(f"skipping the folder {d}")
            return
        
        try:
            with open(f"output/{d}/global.json") as ff:
                local_content = json.load(ff)
        except Exception as e:
            print(e)
            print(f"output/{d}/global.json not found")
        
        if os.path.exists(f"output/{d}/web.json"):
            try:
                with open(f"output/{d}/web.json") as ff:
                    local_content = json.load(ff)
                    print("Using web.json")
            except:
                print(f"output/{d}/web.json not found")
                
        print(local_content['name'])
        async with pyimgbox.Gallery(title=f"{local_content['name']}_gen_img") as gallery:
            try:
                await gallery.create()
            except ConnectionError as e:
                print('Gallery creation failed:', str(e))
            else:
                print('Gallery URL:', gallery.url)
                web_content = await upload_images(local_content, gallery, d)
                # print(web_content["npcs"])
                filename = os.path.join(f"output/{d}", 'index.html')
                with open(filename, 'w') as fh:
                    fh.write(template.render(
                    **web_content
                ))
            finally:
                await gallery.close()

loop = asyncio.get_event_loop()
loop.create_task(cdn())

In [None]:
# import os
# from jinja2 import Environment, FileSystemLoader
# from pathlib import Path

# local_path = Path.cwd()
# templates_dir = os.path.join(local_path, 'templates')
# env = Environment(loader=FileSystemLoader(templates_dir))
# template = env.get_template('place.html')

# d = "G"
# for d in ["B", "G", "N", "X", "Z"]:
#     with open(f"output/{d}/web.json") as f:
#         web_content = json.load(f)
#         filename = os.path.join(f"output/{d}", 'index.html')
#         with open(filename, 'w') as fh:
#             fh.write(template.render(
#                 **web_content
#             ))

In [None]:
dir_list = os.listdir("output")
site_list = {}
for d in dir_list:
    if os.path.isdir(f"output/{d}"):
        # test if index.html exists
        if os.path.exists(f"output/{d}/index.html"):
            local_content = json.load(open(f"output/{d}/web.json"))
            name = local_content["name"]
            site_name = list(local_content["details"]["architecture"]["sites_keywords"].keys())[0]
            desc = local_content["details"]["architecture"]["global_view"]
            img_path = {"view": local_content["details"]["global_views"]["global view"]}
            site_list[name] = {
                "id": d,
                "name": name,
                "desc": desc,
                "site": site_name,
                "size": local_content["size"],
                "img_path": img_path,
            }
            
jinja_obj = {
    "folder_path": ".",
    "sites": site_list,
}

print(jinja_obj)

local_path = Path.cwd()
templates_dir = os.path.join(local_path, 'templates')
env = Environment(loader=FileSystemLoader(templates_dir))
template = env.get_template('index.html')
filename = os.path.join(f"output", 'index.html')
with open(filename, 'w') as fh:
    fh.write(template.render(
        **jinja_obj
    ))

In [None]:
### Correct image silhouettes

import os
from os.path import join as pjoin

# folder = "openpose_db"
# dir_list = os.listdir(folder)
# cat = [[] for i in range(5)]
# for n in dir_list:
#     # get unique value for each word separated by _
#     if "boy" in n:
#         os.rename(pjoin(folder, n), pjoin(folder, n.replace("boy", "child")))
#         print("renaming", n)
    
#     words = n.split("_")[:-1]
#     for i, w in enumerate(words):
#         if w not in cat[i]:
#             cat[i].append(w)
# print(cat)


# pattern = "['dwarf', 'elf', 'gnome', 'human', 'half_elf', 'halfling']_['female', 'male']_['child', 'elderly', 'infant', 'middle aged', 'preteen', 'teenager', 'thirties', 'very old', 'young adult']_['fat', 'muscular', 'normal', 'skinny']_['normal', 'small', 'tall']"

import json

import query
import prompt as P
from parsers import json_parser, table_parser
from logger import Logger
from llm import ChatGPT
import defines as D
import data_formatter as DF
Logger = Logger()

FORCE_HUMAN_READABLE_CACHE = True

with open("openai.key", "r") as f:
    api_key = f.readline().strip()
    api_id = f.readline().strip()
    
LLM = ChatGPT(Logger, "Your an assistant to generate data for a medieval fantasy tabletop rpg game", model_name="gpt-3.5-turbo", api_key=api_key, api_id=api_id)
Q = query.LLM_Query(LLM, Logger)


instructions = """This is a list of rpg characters (heroic fantasy) and I want you to translate and reorganize terms.

I want you to reformat and modify the list to have a new one with the following columns:

fullname|age|size|body_type

For each columns the allowed values are as follows:
- for age: [infant, child, preteen, teenager, young adult, thirties, middle aged, very old, elderly]
- for size: [normal, small, tall]
- for body-type: [fat, muscular, normal, skinny]

Keep only the authorized values !

your list:

"""
dir_list = os.listdir("output")
site_list = {}
characs = ["fullname", "age", "age_look", "race", "gender", "beauty", "height", "weight"]
with open("output/characters_correction.txt", "w") as f:
    count = 1
    query_string = "|".join(characs) + "\n"
    for d in dir_list:
        if os.path.isdir(f"output/{d}"):
            local_content = json.load(open(f"output/{d}/global.json"))
            for name, npc in local_content["npcs"].items():
                # f.write("|".join([npc[c] for c in characs]) + "\n")
                query_string += "|".join([npc[c] for c in characs]) + "\n"

                if count % 100 == 0:
                    query_string += instructions
                    content, response = LLM.query(query_string, options={"id": "all", "type": "correction"})
                    f.write(content + "\n")
                    query_string = "|".join(characs) + "\n"
                
                count += 1
f.close()
            

In [53]:
from threading import Thread

def queue_thread():
    while True:
        for function in queue:
            function()
            time.sleep(40)
        time.sleep(0.5)

queue_thread = Thread(name="queue", target=lambda: queue_thread())