In [None]:
!git clone -b v2 https://github.com/aryanda1/voimo-backend2.git
!mv voimo-backend2/* .
!rm -rf voimo-backend2

In [None]:
from modelSetup import setup_environment,PathSetup,ModelSetup,upscaleSetup,downloadModel,createPaths
from types import SimpleNamespace

setup_environment()

from helpers.model_load import make_linear_decode, load_model, get_model_output_paths

downloadModel()

root = SimpleNamespace(**PathSetup())
root.models_path, root.output_path = get_model_output_paths(root)
root.__dict__.update(ModelSetup())
root.model, root.device = load_model(root, load_on_run_all=True, check_sha256=True, map_location=root.map_location)

upscaleSetup()

In [None]:
openai_api = input('Enter openai Api key')

In [None]:
from fastapi import FastAPI, Request, File, Form, BackgroundTasks
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
import sys
import subprocess,glob,base64,mimetypes
import os

from mainOld import VideoCreator
import time
app = FastAPI()
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

VideoCreator.root = root
VideoCreator.api_key = openai_api
tasks = {}

@app.get('/')
async def home():
  return {'mes':'Welcome!'}


def creating_instance(output_dir, theme, artist, option, upscale,timestring):
    video_creator = VideoCreator(output_dir, theme, artist, int(option), upscale,timestring)
    tasks[timestring]['instance'] = video_creator
    gen_sample_img(timestring)

def gen_sample_img(task_id):
    task = tasks[task_id]
    task['instance'].gen_samples()
    task['status']='samples_created'

def gen_vid(task_id):
    task = tasks[task_id]
    vid_path = task['instance'].create_video_from_music()
    task['status']='video_created'
    task['vid_path'] = vid_path

@app.post("/submit_form")
async def submit_form(background_tasks: BackgroundTasks,audio: bytes = File(...), artist: str = Form(''), theme: str = Form(''), option: str = Form(...), upscale: bool = Form(False)):
    # Save audio to current directory
    timestring = time.strftime('%Y%m%d%H%M%S')
    output_dir = os.path.join(root.output_path,timestring)
    createPaths(output_dir)
    with open(os.path.join(output_dir, "audio.wav"), "wb") as f:
        f.write(audio)

    tasks[timestring] = {'status':'samples_processing'}
    background_tasks.add_task(creating_instance,output_dir,theme,artist,option,upscale,timestring)
    return {"id": timestring}

@app.post('/gen_samples/{task_id}')
async def gen_samples(background_tasks: BackgroundTasks,task_id:str):
    if task_id in tasks and tasks[task_id]['status']!='samples_processing':
        task = tasks[task_id]
        task['status']='samples_processing'
        background_tasks.add_task(gen_sample_img,task_id)
        return {'message':'generating samples'}
    else:
        return {'error':'Task not found'}

@app.post('/gen_video/{task_id}')
async def gen_video(background_tasks: BackgroundTasks,task_id:str):
    if task_id in tasks and tasks[task_id]['status']=='samples_created':
        task = tasks[task_id]
        task['status']='video_processing'
        background_tasks.add_task(gen_vid,task_id)
        return {'message':'generating video'}
    else:
        return {'error':'Task not found'}

@app.post("/get_samples/{task_id}")
async def get_samples(task_id: str):
    if task_id in tasks:
        task = tasks[task_id]
        status = task["status"]
        if status != "samples_processing":
            directory = task['instance'].args.outdir
            img_files = glob.glob(f'{directory}/*.png')
            images = []
            for img_file in img_files:
                media_type, _ = mimetypes.guess_type(img_file)
                with open(img_file, "rb") as file:
                    content = base64.b64encode(file.read()).decode("utf-8")

                images.append({
                    "path": img_file,
                    "media_type": media_type,
                    "content": content
                })
            return images
        else:
            return {"status": status}
    else:
        return {'error':'Task not found'}


@app.post("/get_video/{task_id}")
async def get_video(task_id: str):
    # Retrieve the result of the task from the task dictionary using the task ID
    if task_id in tasks and tasks[task_id]['status'] in ['video_processing','video_created']:
        task = tasks[task_id]
        status = task["status"]
        if status == "video_created":
            video_path = task["vid_path"]
            print(video_path)
            filename = video_path.split('/')[-1]
            headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
            return FileResponse(video_path, headers=headers)
        else:
            return {"status": status}
    else:
        return {"error": "Task not found"}

In [None]:
ngrok_auth = input('Enter Ngrok auth token: ')

!ngrok authtoken $ngrok_auth

In [None]:
import nest_asyncio
from pyngrok import ngrok
import uvicorn

ngrok_tunnel = ngrok.connect(8000)
print('Public URL:', ngrok_tunnel.public_url)
nest_asyncio.apply()
uvicorn.run(app, port=8000, timeout_keep_alive=3600)