# Video Generation Script

This script takes input from [NorthCOVID-19](https://covid.datalab.science/) (see model [here](https://covid.datalab.science/model)) and produces a video animation (see sample_video.mp4) of the results. The file "sample_output.csv" is an example of what the output will look like, "sample_parameters.json" is an example saved file of the parameters used, and "sample_results.json" is an example of the results output to the website from the simulation.

In [None]:
# First, take care of the dependencies
!pip install moviepy
!pip install opencv-python
!pip install gdown
!pip install n2w

import gdown
import sys
import warnings
import os
from os.path import exists, join, expanduser

####################################################
# IF YOU ARE RUNNING THIS LOCALLY:
# Ensure 'PyTorch' version '>=1.5.1' is installed on your system
# If using Anaconda, do "conda install -c pytorch pytorch"
#
# If on Windows, install ''ffmpeg' and have it on your PATH
# https://ffmpeg.zeranoe.com/builds/
#
# If on Windows, install 'ImageMagick' and have it on your PATH
# https://imagemagick.org/script/download.php
####################################################

# Note: The lines below this will install ImageMagick on Linux (as well as Colab)
#       so ignore any errors if you're not on Linux.
!apt update &> /dev/null
!apt install imagemagick &> /dev/null
!apt install ffmpeg &> /dev/null
!pip3 install moviepy[optional] &> /dev/null
!pip3 install scipy==1.1.0 &> /dev/null
!sed -i '/<policy domain="path" rights="none" pattern="@\*"/d' /etc/ImageMagick-6/policy.xml

#########################################################
# Taken from sample notebooks provided by these authors
# https://github.com/tugstugi/pytorch-dc-tts
project_name = "pytorch-dc-tts"
if not exists(project_name):
    !git clone --quiet https://github.com/tugstugi/{project_name}
    !cd {project_name} && pip install -q -r requirements.txt

if not exists("ljspeech-text2mel.pth"):
    !gdown --id 1rXMp5wFAs51o-6GD5vomucnV2sjwHHDd --output ljspeech-text2mel.pth

if not exists("ljspeech-ssrn.pth"):
    !gdown --id 1gMwPSGM-Ci4MbtoOzaSRsdu-oSsBKH2i --output ljspeech-ssrn.pth

sys.path.append(project_name)
warnings.filterwarnings("ignore")
#########################################################

In [None]:
# Next, take care of the imports
import numpy as np
import pandas as pd
import n2w

import matplotlib
import matplotlib.pyplot as plt

import matplotlib.animation as animation
import moviepy.editor as mp
import cv2

from pathlib import Path
import glob
import math

import torch
from tqdm import *
from hparams import HParams as hp
from audio import save_to_wav
from models import Text2Mel, SSRN
from datasets.lj_speech import vocab, idx2char, get_test_data

In [None]:
# Read the output data
try:
    input_data = pd.read_csv('sample_output.csv')
except:
    # We're on colab
    !gdown --id 1m5Q7jTq4tVl4qW1MxRF0wO9lI-gl5p5w --output sample_output.csv
    !gdown --id 15N-sT4h687aOCsqDyvAwPJRzMttjxk7s --output sample_parameters.json
    !gdown --id 1_bKvB89svR5R2Fme-K_cSxoXWfPRkyRG --output sample_results.json
    !gdown --id 12V5q8T7vEzkLxxBojDIn3uxjrVqPw36B --output plain.jpg

    input_data = pd.read_csv('sample_output.csv')


# Read the parameters data
parameter_data = pd.read_json('sample_parameters.json', typ='series')

# Read the results data
results_data = pd.read_json('sample_results.json', typ='series')


# Create/clear the working directory
Path("output/working/").mkdir(parents=True, exist_ok=True)

files = glob.glob('output/*')
for f in files:
    try:
        os.remove(f)
    except:
        continue

files = glob.glob('output/working/*')
for f in files:
    try:
        os.remove(f)
    except:
        continue
        
# Output the important information we've read
max_day = int(math.ceil(results_data['infectionEndDays'] / 10.0) * 10)
print("-----INFECTION RELATED PARAMETERS-----")
print("The infectivity of the infection was: " + str(parameter_data['T']))
print("The contact rate of the infection was set to: " + str(parameter_data['C']))
print("The number of days it takes to recover from the infection: " + str(parameter_data['NU']))

print("\n-----POPULATION RELATED PARAMETERS-----")
print("The total number of individuals was: " +  str(parameter_data['N']))
print("The number of initially infected individuals was: " + str(parameter_data['NI']))

print("\n-----HOSPITAL RELATED PARAMETERS-----")
print("The maximum capacity of the ICU was set to: " + str(parameter_data['NC']))

print("\n-----FINAL DAY OUTPUTS-----")
print("The number of days it took the ICU to first reach peak capacity was: " + str(results_data['icuFullDays']))
print("The number of days it took for the infection to end was: " + str(results_data['infectionEndDays']))

print("\n-----FINAL COUNT OUTPUTS-----")
print("The total number of individuals that became infected was: " + str(results_data['totalInfected']))
print("The total number of individuals that died was: " + str(results_data['totalDead']))

In [None]:
# Create the introductory slide using matplotlib [Preetkumar and Neel]
videoSize = (1280, 720)
mainImage = cv2.imread('plain.jpg')
mainImage = cv2.resize(mainImage, videoSize)
centerImage = mainImage.copy()

# First Slide
font = cv2.FONT_HERSHEY_DUPLEX
fontScale = 2
text = "Visualization of NorthCOVID-19"
textsize = cv2.getTextSize(text, font, fontScale, 2)[0]
textX = int((centerImage.shape[1] - textsize[0]) / 2)
textY = int((centerImage.shape[0] + textsize[1]) / 2)
cv2.putText(centerImage, text, (textX, textY), font, fontScale, (0, 0, 0), 2)

# Second Slide
i=0
TitleText = ["The results in this video are based","on the following parameters:"]
for eachText in TitleText:
    fontType = cv2.FONT_HERSHEY_DUPLEX
    fontScale = 1.8
    eachSize = cv2.getTextSize(eachText, fontType, fontScale,2)[0]
    X = int((mainImage.shape[1] - eachSize[0])/2)
    cv2.putText(mainImage,eachText,(X,130+i),fontType, fontScale, (0,0,0),2,cv2.LINE_AA)
    i += 60

textArray = ["Initial population: " + format(int(parameter_data['N']), ","),
             "Initial infected: " + format(int(parameter_data['NI']), ","),
             "ICU capacity: " + format(int(parameter_data['NC']), ","),
             "Contact rate: " + str(round(parameter_data['C'], 1)),
             "Infectivity: " + str(round(100 * parameter_data['T'], 1)) + "%",
             "Illness duration: " + str(round(parameter_data['NU'], 1)) + " days"]
i=0
for eachText in textArray:
    fontType = cv2.FONT_HERSHEY_DUPLEX
    fontScale = 1.6
    eachSize = cv2.getTextSize(eachText, fontType, fontScale,2)[0]
    X = int((mainImage.shape[1] - eachSize[0])/2)
    cv2.putText(mainImage,eachText,(X,290+i),fontType, fontScale, (60,60,60),2,cv2.LINE_AA)
    i += 80

#Creating video writer.
out = cv2.VideoWriter('output/working/Introduction.mp4',cv2.VideoWriter_fourcc(*'MP4V'), 20, videoSize)

# For 1 second, take 20.
for i in range(60):
    out.write(centerImage)
for i in range(180):
    out.write(mainImage)
out.release()

In [None]:
# Create the initial numbers slide ('Infectious' column) [Preetkumar and Neel]
x = np.arange(0, results_data['infectionEndDays'] + 0.1, 0.1)
y = np.array(input_data[['Infectious']]).flatten()[:x.shape[0]]

fig, ax = plt.subplots(figsize=(1280/96, 720/96), dpi=96)

plt.xlabel('Day', color = '#072b57')
plt.ylabel('Individuals', color = '#072b57')
plt.title('Infectious Individuals', color = '#072b57')

line, = ax.plot(x,y,label='Infectious', color = '#FFA500')
plt.legend(loc='upper left')
ax.margins(0.05)
def init():  # only required for blitting to give a clean slate.
    line.set_data([], [])
    return line,

def animate(i):
    xdata = x[0:int(i*2) + 30]
    ydata = y[0:int(i*2) + 30]
    line.set_data(xdata, ydata)
    return line,

ani = animation.FuncAnimation(fig, animate, init_func=init,frames=(max_day * 5) + 90,interval=200)
writer = animation.FFMpegWriter(fps=30, metadata=dict(artist='Me'), bitrate=1800)
ani.save("output/working/Initial_Slide.mp4", writer=writer)

In [None]:
# Create the hospital slide ('Hospitalized', 'ICU', and 'Ward' columns) [Vinit and Parth]
x = np.arange(0, results_data['infectionEndDays'] + 0.1, 0.1)
y2 = np.array(input_data[['ICU']]).flatten()[:x.shape[0]]
y3 = np.array(input_data[['Ward']]).flatten()[:x.shape[0]]

fig, ax = plt.subplots(figsize=(1280/96, 720/96), dpi=96)

plt.xlabel('Day', color = '#072b57')
plt.ylabel('Individuals', color = '#072b57')
plt.title('Individuals in the Hospital', color = '#072b57')

line1, = ax.plot(x,y2,label='ICU', color = '#FF0000')
line2, = ax.plot(x,y3,label='Ward', color = '#800080')
plt.legend(loc='upper left')
ax.margins(0.05)
def init():  # only required for blitting to give a clean slate.
    line.set_data([], [])
    return line,

def animate(i):
    xdata = x[0:int(i*2) + 30]
    y2data = y2[0:int(i*2) + 30]
    y3data = y3[0:int(i*2) + 30]
    line1.set_data(xdata, y2data)
    line2.set_data(xdata, y3data)
    return line,

ani = animation.FuncAnimation(fig, animate, init_func=init,frames=(max_day * 5) + 90,interval=200)
writer = animation.FFMpegWriter(fps=30, metadata=dict(artist='Me'), bitrate=1800)
ani.save("output/working/Hospital_Slide.mp4", writer=writer)

In [None]:
# Create the deaths slide ('Recovered' and 'Death' columns) [Vaibhav and Pruthvi]
x = np.arange(0, results_data['infectionEndDays'] + 0.1, 0.1)
y2 = np.array(input_data[['Recovered']]).flatten()[:x.shape[0]]
y3 = np.array(input_data[['Death']]).flatten()[:x.shape[0]]

fig, ax = plt.subplots(figsize=(1280/96, 720/96), dpi=96)

plt.xlabel('Day', color = '#072b57')
plt.ylabel('Individuals', color = '#072b57')
plt.title('Recovered and Death Counts', color = '#072b57')

line1, = ax.plot(x,y2,label='Recovered', color = '#008000')
line2, = ax.plot(x,y3,label='Deaths', color = '#000000')
plt.legend(loc='upper left')
ax.margins(0.05)
def init():  # only required for blitting to give a clean slate.
    line.set_data([], [])
    return line,

def animate(i):
    xdata = x[0:int(i*2) + 30]
    y2data = y2[0:int(i*2) + 30]
    y3data = y3[0:int(i*2) + 30]
    line1.set_data(xdata, y2data)
    line2.set_data(xdata, y3data)
    return line,

ani = animation.FuncAnimation(fig, animate, init_func=init,frames=(max_day * 5) + 90,interval=200)
writer = animation.FFMpegWriter(fps=30, metadata=dict(artist='Me'), bitrate=1800)
ani.save("output/working/Deaths_Slide.mp4", writer=writer)

In [None]:
# Create the conclusion slide ('Hospitalized', 'ICU', 'Ward', 'Recovered', 'Death')
fig = plt.figure(figsize=(1280/96, 720/96), dpi=96)

position = np.arange(6) + .5 

plt.tick_params(axis = 'x', colors = '#072b57')
plt.tick_params(axis = 'y', colors = '#072b57')

max_heights = [results_data['totalInfected'],
               math.ceil(input_data['ICU'].max() + 1), 
               input_data['Ward'].max(),
               input_data['Recovered'].max(), 
               input_data['Death'].max()]
max_peak = max(max_heights)

speeds = [results_data['totalInfected'] / 120, 
          math.ceil(input_data['ICU'].max() + 1) / 120,
          input_data['Ward'].max() / 120,
          input_data['Recovered'].max() / 120,
          input_data['Death'].max() / 120]
heights = [0, 0, 0, 0, 0, 0]

rects = plt.bar(position, heights, align = 'center', width = 0.5, color=['#FFA500', '#FF0000', '#800080', '#008000', '#000000']) 
plt.xticks(position, ('Infected', 'ICU Peak', 'Ward Peak', 'Recovered', 'Deaths'))

plt.xlabel('Total Counts', color = '#072b57')
plt.ylabel('Individuals', color = '#072b57')
plt.title('Summarized Results After the Infection Ended in ' + format(int(results_data['infectionEndDays']), ",") + " Days", color = '#072b57')

rounded_peak = int(math.ceil(max(max_heights) / 1000.0)) * 1000
plt.ylim((0, rounded_peak))
plt.xlim((0,5))

rs = [r for r in rects]

def init():
    return rs

def animate(i):
    global rs, heights

    if all(map(lambda x: x==1, heights)):
        heights = [0, 0, 0, 0, 0, 0]
    else:
        heights = [min(h+s,mh) for h,s,mh in zip(heights,speeds,max_heights)]

    for h,r in zip(heights,rs):
        r.set_height(h)
    return rs

ani = animation.FuncAnimation(fig, animate, frames=(max_day * 2) + 450,interval=200, blit=True)
writer = animation.FFMpegWriter(fps=30, metadata=dict(artist='Me'), bitrate=1800)
ani.save("output/working/Conclusion_Slide.mp4", writer=writer)

In [None]:
# Define a method to create audio files from text
# Adapted from sample notebooks provided by these authors
# https://github.com/tugstugi/pytorch-dc-tts
def text_to_wav(model, ssrn, sentence_set, filename):
    count = 0
    file_set = []
    for sentence in sentence_set:
        sentence += "."
        normalized_sentence = "".join([c if c.lower() in vocab else '' for c in sentence])
        sentences = [normalized_sentence]
        max_N = len(normalized_sentence)
        L = torch.from_numpy(get_test_data(sentences, max_N))
        zeros = torch.from_numpy(np.zeros((1, hp.n_mels, 1), np.float32))
        Y = zeros
        A = None

        for t in range(hp.max_T):
            try:
                _, Y_t, A = model(L, Y, monotonic_attention=False)
            except:
                _, Y_t, A = model(L.long(), Y, monotonic_attention=False)
                
            Y = torch.cat((zeros, Y_t), -1)
            _, attention = torch.max(A[0, :, -1], 0)
            attention = attention.item()
            if L[0, attention] == vocab.index('E'):  # EOS
                break

        _, Z = ssrn(Y)
      
        Z = Z.cpu().detach().numpy()

        tmp_name = "output/working/" + filename + "_" + str(count) + ".wav"
        save_to_wav(Z[0, :, :].T, tmp_name)

        tmp_audio = mp.AudioFileClip(tmp_name)
        file_set.append(tmp_audio)
        count += 1
    
    # Concat all of the audio files into one audio
    final_clip = mp.concatenate_audioclips(file_set)
    final_clip.write_audiofile('output/working/' + filename + ".mp3")

In [None]:
# Initialize the pytorch model for text-to-speech processing
# Adapted from sample notebooks provided by these authors
# https://github.com/tugstugi/pytorch-dc-tts

# Make sure the pre-trained models are downloaded
if not exists("ljspeech-text2mel.pth"):
    !gdown --id 1rXMp5wFAs51o-6GD5vomucnV2sjwHHDd --output ljspeech-text2mel.pth

if not exists("ljspeech-ssrn.pth"):
    !gdown --id 1gMwPSGM-Ci4MbtoOzaSRsdu-oSsBKH2i --output ljspeech-ssrn.pth

# Initialize the models
torch.set_grad_enabled(False)
model = Text2Mel(vocab)
model.load_state_dict(torch.load("ljspeech-text2mel.pth").state_dict())
model = model.eval()

ssrn = SSRN()
ssrn.load_state_dict(torch.load("ljspeech-ssrn.pth").state_dict())
ssrn = ssrn.eval()

In [None]:
# Edit the initial slide video
clip = mp.VideoFileClip('output/working/Initial_Slide.mp4').crossfadein(1).crossfadeout(1)

text = mp.TextClip("Total Infected:\n" + format(int(results_data['totalInfected']), ","), fontsize=28).set_pos((850, 175)).set_start(clip.duration - 7).set_duration(7).crossfadein(1)

initial_slide = mp.CompositeVideoClip([clip, text])
initial_slide.set_fps(30)
initial_slide.write_videofile('output/working/Initial-Slide.mp4')
initial_slide = mp.VideoFileClip('output/working/Initial-Slide.mp4')

# Add audio to the video
text = ["This scenario started with " + n2w.convert(int(parameter_data['NI'])) + " infected individual" + ("s" if int(parameter_data['NI']) > 1 else ""),
        "Once the infection ended after " + n2w.convert(int(results_data['infectionEndDays'])) + " days, there was a total of " + n2w.convert(int(results_data['totalInfected'])) + " individuals that had become infected"]
text_to_wav(model, ssrn, text, "Initial")

audio_file = mp.AudioFileClip("output/working/Initial.mp3")
initial_slide = initial_slide.set_audio(audio_file)

In [None]:
# Edit the hospital slide video
clip = mp.VideoFileClip('output/working/Hospital_Slide.mp4').crossfadein(1).crossfadeout(1)

text = mp.TextClip("ICU Peak:\n" + format(math.ceil(input_data['ICU'].max() + 1), ",") + "\n\nWard Peak:\n" + format(int(input_data['Ward'].max()), ","), fontsize=28).set_pos((850, 175)).set_start(clip.duration - 7).set_duration(7).crossfadein(1)

hospital_slide = mp.CompositeVideoClip([clip, text])
hospital_slide.set_fps(30)
hospital_slide.write_videofile('output/working/Hospital-Slide.mp4')
hospital_slide = mp.VideoFileClip('output/working/Hospital-Slide.mp4')

# Add audio to the video
text = ["Over the course of the infection, the I see you capacity peaked at " + n2w.convert(math.ceil(input_data['ICU'].max() + 1)) + " individuals while the ward peaked at " + n2w.convert(int(input_data['Ward'].max())) + " individuals",
        "The I see you limit in this scenario was set to " + n2w.convert(int(parameter_data['NC'])) + " individuals"]
text_to_wav(model, ssrn, text, "Hospital")

audio_file = mp.AudioFileClip("output/working/Hospital.mp3")
hospital_slide = hospital_slide.set_audio(audio_file)

In [None]:
# Edit the deaths slide video
clip = mp.VideoFileClip('output/working/Deaths_Slide.mp4').crossfadein(1).crossfadeout(1)

text = mp.TextClip("Total Recovered:\n" + format(int(input_data['Recovered'].max()), ",") + "\n\nTotal Deaths:\n" + format(int(input_data['Death'].max()), ","), fontsize=28).set_pos((850, 175)).set_start(clip.duration - 7).set_duration(7).crossfadein(1)

deaths_slide = mp.CompositeVideoClip([clip, text])
deaths_slide.set_fps(30)
deaths_slide.write_videofile('output/working/Deaths-Slide.mp4')
deaths_slide = mp.VideoFileClip('output/working/Deaths-Slide.mp4')

# Add audio to the video
text = ["Out of the " + n2w.convert(int(results_data['totalInfected'])) + " individuals that became infected",
        "A total of " + n2w.convert(int(input_data['Recovered'].max())) + " individuals recovered from the infection while " + n2w.convert(int(input_data['Death'].max())) + " deaths were recorded"]
text_to_wav(model, ssrn, text, "Deaths")

audio_file = mp.AudioFileClip("output/working/Deaths.mp3")
deaths_slide = deaths_slide.set_audio(audio_file)

In [None]:
# Add audio to the introduction slide
introduction_slide = mp.VideoFileClip('output/working/Introduction.mp4')

text = ["Visualization of North COVID nine teen",
         "The results in this viddeo are based on the following parameters",
         "an initial population of " + n2w.convert(int(parameter_data['N'])),
         "Initial infected of " + n2w.convert(int(parameter_data['NI'])),
         "I see you capacity of " + n2w.convert(int(parameter_data['NC'])),
         "Contact rate of " + n2w.convert(int(parameter_data['C'])) + " point " + n2w.convert(int(10 * (round(parameter_data['C'], 1) - int(parameter_data['C'])))),
         "an infectivity of " + n2w.convert(int(100 * parameter_data['T'])) + " point " + n2w.convert(int(10 * (round(100 * parameter_data['T'], 1) - int(100 * parameter_data['T'])))) + " percent",
         "and an illness duration of " + n2w.convert(int(parameter_data['NU'])) + " point " + n2w.convert(int(10 * (round(parameter_data['NU'], 1) - int(parameter_data['NU'])))) + " days"]
text_to_wav(model, ssrn, text, "Introduction")

audio_file = mp.AudioFileClip("output/working/Introduction.mp3")
introduction_slide = introduction_slide.set_audio(audio_file)
introduction_slide = introduction_slide.set_duration(audio_file.duration)
introduction_slide = introduction_slide.crossfadein(1).crossfadeout(1)

In [None]:
# Add audio to the conclusion slide
conclusion_slide = mp.VideoFileClip('output/working/Conclusion_Slide.mp4').crossfadein(1).crossfadeout(3)

text = ["After the infection ended in " + n2w.convert(int(results_data['infectionEndDays'])) + " days the total infected peaked at " + n2w.convert(int(results_data['totalInfected'])),
         "the I see you peaked at " + n2w.convert(math.ceil(input_data['ICU'].max() + 1)),
         "the ward peaked at " + n2w.convert(int(input_data['Ward'].max())),
         "the recovered peaked at " + n2w.convert(int(input_data['Recovered'].max())),
         "and the deaths peaked at " + n2w.convert(int(input_data['Death'].max()))]
text_to_wav(model, ssrn, text, "Conclusion")

audio_file = mp.AudioFileClip("output/working/Conclusion.mp3")
conclusion_slide = conclusion_slide.set_audio(audio_file)

In [None]:
# Combine all videos and render the final product
clips = [introduction_slide,
         initial_slide,
         hospital_slide,
         deaths_slide,
         conclusion_slide]

final_clip = mp.concatenate_videoclips(clips, method="compose")
final_clip.set_fps(30)
final_clip.write_videofile('output/Final_Result.mp4')

In [None]:
# Clear the working folder
files = glob.glob('output/working/*')
for f in files:
    try:
        os.remove(f)
    except:
        continue