## img-text-img

此文件的作用是使用img2text，然后再拼接上自定义的风格词，然后再次出图片

In [1]:
import json
from urllib import request, parse
import random
import uuid
import urllib.request
import urllib.parse
import io
import os
import time
from PIL.PngImagePlugin import PngInfo
import hashlib
from urllib.error import URLError


## import json file

In [2]:
promptJson = 'api.json'

with open(promptJson) as f:
    prompt = json.load(f)

In [3]:
from enum import Enum, unique

@unique
class RunMode(Enum):
    LOCAL = 1
    POD_LOCAL = 2
    POD_REMOTE = 3

run_mode = RunMode.LOCAL
# 设置运行模式，来实现在本机，runpod本地，runpod远程的切换
server_address = "2bzezpggl7sjvi-3000.proxy.runpod.net/" if run_mode == RunMode.POD_REMOTE else "0.0.0.0:3000" if run_mode == RunMode.POD_LOCAL else "127.0.0.1:8188"
client_id = str(uuid.uuid4())


In [4]:
def changeImageSize(width, height):
    # 最小的宽高为本机896，runpod1024
    limited = 896 if run_mode == RunMode.LOCAL else 1024
    minSize = min(width, height)

    # 缩放宽高，使得最窄的边等于limited
    if minSize > limited:
        scale = minSize / limited
        widthScale = int(width / scale)
        heightScale = int(height / scale)
    else:
        scale = limited / minSize
        widthScale = int(width * scale)
        heightScale = int(height * scale)

    # 将两个值都除以64，然后取整,也就是说边长必须是64的倍数
    widthScale = int(widthScale/64) * 64
    heightScale = int(heightScale/64) * 64

    return widthScale, heightScale

def checkImageExsit(imageDir, imageName):
    for file in os.listdir(imageDir):
        if file.startswith(imageName):
            return True
    return False


def queue_prompt(prompt):
    while True:
        try:
            p = {"prompt": prompt, "client_id": client_id}
            data = json.dumps(p).encode('utf-8')
            req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
            return json.loads(urllib.request.urlopen(req, timeout=5).read())
        except URLError:
            print("连接超时，正在重试...")

def get_history(prompt_id):
    with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response:
        return json.loads(response.read())

def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response:
        return response.read()

def handle_whitespace(string: str):
    return string.strip().replace("\n", " ").replace("\r", " ").replace("\t", " ")

def parse_name(ckpt_name):
    path = ckpt_name
    filename = path.split("/")[-1]
    filename = filename.split(".")[:-1]
    filename = ".".join(filename)
    return filename


def calculate_sha256(file_path):
    sha256_hash = hashlib.sha256()

    with open(file_path, "rb") as f:
        # Read the file in chunks to avoid loading the entire file into memory
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)

    return sha256_hash.hexdigest()

def get_images(ws, prompt):
    prompt_id = queue_prompt(prompt)['prompt_id']
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message['type'] == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break #Execution is done
        else:
            continue #previews are binary data

    history = get_history(prompt_id)[prompt_id]
    for o in history['outputs']:
        for node_id in history['outputs']:
            node_output = history['outputs'][node_id]
            if 'images' in node_output:
                images_output = []
                for image in node_output['images']:
                    image_data = get_image(image['filename'], image['subfolder'], image['type'])
                    images_output.append(image_data)
                output_images[node_id] = images_output

    return output_images
    
def saveImages(result, imageDir,imageName,comment,previewImage=0): 
    metadata = PngInfo()
    metadata.add_text("parameters", comment)  
    k = 0 
    for node_id in result:    
        if k < previewImage:
            k += 1
            continue    
        for image_data in result[node_id]:            
            image = Image.open(io.BytesIO(image_data))
            imagePath = imageDir + "/" + imageName + f"_{k}" + ".png"
            image.save(imagePath,pnginfo=metadata) 
            k += 1

def saveTags(ws, prompt, textDir, originalImagName):
    prompt_id = queue_prompt(prompt)['prompt_id']
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message['type'] == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break #Execution is done
        else:
            continue #previews are binary data

    history = get_history(prompt_id)[prompt_id]
    for node_id in history['outputs']:
        node_output = history['outputs'][node_id]
        print(node_output)
        if 'tags' in node_output:
            tags = node_output['tags']
            with open(textDir + "/" + originalImagName + ".txt", "w") as f:
                for tag in tags:
                    f.write(tag + "\n")
    

In [5]:
# 子文件夹
subDir = "BoA"

# 原图文件夹
originalDir = f"f:/Poledriver/Paid/{subDir}" if run_mode != RunMode.POD_LOCAL else f"../../../image-inputs/{subDir}"
# 设置文本保存路径
textDir = originalDir + '-text'
# 如果文件夹不存在，创建文件夹
if not os.path.exists(textDir):
    os.mkdir(textDir)

In [6]:

from PIL import Image
%pip install sd-parsers
from sdparsers import ParserManager
import websocket

ws = websocket.WebSocket()
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))

batch_size = 4
# 遍历原图文件夹里所有的图片
for originalImag in os.listdir(originalDir):
        # 如果不是jpg或png文件，跳过
        if not (originalImag.endswith(".jpg") or originalImag.endswith(".png") or originalImag.endswith(".jpeg")):
            continue
        
        originalImagePath = originalDir + "/" + originalImag
        # 如果文件名包含_H或者_B，跳过
        if originalImag.find("_H") != -1 or originalImag.find("_B") != -1:
            print(f"{originalImag}包含_H或者_B,判定为mask文件，跳过")
            continue
        
        # 读取原图片名字，去掉后缀.png
        originalImagName = originalImag.split(".")[0]
        # 检测目标文件夹里是否已经存在该图片，如果存在，跳过
        if checkImageExsit(textDir, originalImagName):
            continue 
        
        prompt['44']['inputs']['image'] = originalImagePath
        
        # 连接api，生成并保存图片
        start_time = time.time()        
        # images = get_images(ws, prompt)
        saveTags(ws, prompt, textDir, originalImagName)                    
        
        end_time = time.time()
        print(f"耗时{end_time-start_time}秒")
        

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


{'tags': ['1girl, solo, long_hair, looking_at_viewer, smile, bangs, black_hair, bow, navel, sitting, closed_mouth, nipples, braid, hair_bow, nude, glasses, pussy, indoors, spread_legs, black_eyes, twin_braids, flat_chest, lips, loli, completely_nude, window, uncensored, black-framed_eyewear']}
{'text': ['1girl, solo, long_hair, looking_at_viewer, smile, bangs, black_hair, bow, navel, sitting, closed_mouth, nipples, braid, hair_bow, nude, glasses, pussy, indoors, spread_legs, black_eyes, twin_braids, flat_chest, lips, loli, completely_nude, window, uncensored, black-framed_eyewear']}
耗时1.674680471420288秒
{'tags': ['1girl, solo, long_hair, breasts, looking_at_viewer, smile, bangs, black_hair, navel, twintails, sitting, closed_mouth, nipples, collarbone, braid, nude, glasses, pussy, indoors, spread_legs, twin_braids, flat_chest, lips, parted_bangs, loli, completely_nude, arm_support, black-framed_eyewear, hair_tie, on_floor']}
{'text': ['1girl, solo, long_hair, breasts, looking_at_viewer,

In [7]:
import pygame
pygame.init()
my_sound = pygame.mixer.Sound('../../assets/sound/download-complete.wav')
my_sound.play()

pygame 2.5.2 (SDL 2.28.3, Python 3.10.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


<pygame.mixer.Channel at 0x201e369a770>