Installing the dependencies

In [None]:
!pip install fastapi
!pip install uvicorn
!pip install pydantic
!pip install requests
!pip install pypi-json
!pip install pyngrok
!pip install nest-asyncio

In [None]:
!pip install tensorflow-gpu==2.11.0
!pip install keras-cv

In [3]:
from fastapi import FastAPI
from pydantic import BaseModel
import json
import uvicorn
from pyngrok import ngrok
from fastapi.middleware.cors import CORSMiddleware
import nest_asyncio

In [4]:
import tensorflow as tf
from tensorflow import keras
import base64
import math
import numpy as np

In [5]:
print(tf.__version__)

2.11.0


In [6]:
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [None]:
!nvidia-smi

In [8]:
app = FastAPI()

In [9]:
origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

In [10]:
# https://docs.pydantic.dev/usage/types/
class model_input(BaseModel):
    context : list
    unconditional_context : list


In [None]:
%cd ../

In [None]:
!git lfs install
!git clone https://huggingface.co/keras-sd/tfs-diffusion-model

In [None]:
!git clone https://huggingface.co/keras-sd/tfs-decoder.git

In [9]:
#!rm -r tfs-decoder

In [14]:
diffusion_model = tf.saved_model.load('/tfs-diffusion-model')

In [15]:
decoder_model = tf.saved_model.load('/tfs-decoder')

In [None]:
from keras_cv.models.stable_diffusion.constants import _UNCONDITIONAL_TOKENS
from keras_cv.models.stable_diffusion.constants import _ALPHAS_CUMPROD
MAX_PROMPT_LENGTH = 77

In [17]:
# Using diffusion saved model

seed = None
batch_size = 1
img_height = 512
img_width = 512
img_height = round(img_height / 128) * 128
img_width = round(img_width / 128) * 128
num_steps = 25
unconditional_guidance_scale = 7.5 

def _get_initial_diffusion_noise(batch_size, seed):
    if seed is not None:
        return tf.random.stateless_normal(
            (batch_size, img_height // 8, img_width // 8, 4),
            seed=[seed, seed],
        )
    else:
        return tf.random.normal(
            (batch_size, img_height // 8, img_width // 8, 4)
        )

def _get_initial_alphas(timesteps):
    alphas = [_ALPHAS_CUMPROD[t] for t in timesteps]
    alphas_prev = [1.0] + alphas[:-1]

    return alphas, alphas_prev

def _get_timestep_embedding(timestep, batch_size, dim=320, max_period=10000):
    half = dim // 2
    freqs = tf.math.exp(
        -math.log(max_period) * tf.range(0, half, dtype=tf.float32) / half
    )
    args = tf.convert_to_tensor([timestep], dtype=tf.float32) * freqs
    embedding = tf.concat([tf.math.cos(args), tf.math.sin(args)], 0)
    embedding = tf.reshape(embedding, [1, -1])
    return tf.repeat(embedding, batch_size, axis=0)


In [18]:
@app.post('/diffusion_model_inference')
def diffusion_inference(input_data : model_input):
    
    inputs = input_data.json()
    input_dictionary = json.loads(inputs)
    context = input_dictionary['context']
    print(context)
    unconditional_context = input_dictionary['unconditional_context']
    print(unconditional_context)
    
    latent = _get_initial_diffusion_noise(batch_size, seed)

    # Iterative reverse diffusion stage
    timesteps = tf.range(1, 1000, 1000 // num_steps)
    alphas, alphas_prev = _get_initial_alphas(timesteps)
    progbar = keras.utils.Progbar(len(timesteps))
    iteration = 0
    for index, timestep in list(enumerate(timesteps))[::-1]:
        latent_prev = latent  # Set aside the previous latent vector
        t_emb = _get_timestep_embedding(timestep, batch_size)
        unconditional_latent = diffusion_model([latent, t_emb, unconditional_context], training = False)
        latent = diffusion_model([latent, t_emb, context], training = False)
        latent = unconditional_latent + unconditional_guidance_scale * (
            latent - unconditional_latent
        )
        a_t, a_prev = alphas[index], alphas_prev[index]
        pred_x0 = (latent_prev - math.sqrt(1 - a_t) * latent) / math.sqrt(a_t)
        latent = latent * math.sqrt(1.0 - a_prev) + math.sqrt(a_prev) * pred_x0
        iteration += 1
        progbar.update(iteration)
    # decoder stage
    print('decoder')
    decoded = decoder_model(latent, training=False)
    decoded = ((decoded + 1) / 2) * 255
    image = np.clip(decoded, 0, 255).astype("uint8")

    diffusion_result = {
        'output':np.array(image).tolist()
        }
    
    return json.dumps(diffusion_result)

In [None]:
ngrok_tunnel = ngrok.connect(8000)
print('Public URL:', ngrok_tunnel.public_url)
nest_asyncio.apply()
uvicorn.run(app, port=8000)