In [1]:
import torch
import cProfile
from time import time

In [2]:
a = torch.tensor([0.4, 0.3, 0.1])
a * 0.5

tensor([0.2000, 0.1500, 0.0500])

#### Testing if it is possible to implement main logic of FV scheme using torchscript and if there is any difference in running time

#### Pure PyTorch functions

In [13]:
def flux(rho, gamma):
    return gamma * rho * (1. - rho)

def d_flux(rho, gamma):
    '''
    Return the derivative of flux given density rho

    Should work both for scalar and vector rho 
    '''
    return gamma * (1. - 2.*rho)

def minmod(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''

    return torch.tensor(.5) * (torch.sign(a1) + torch.sign(a2)) * torch.minimum(torch.abs(a1), torch.abs(a2))

def maxmod(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''

    return torch.tensor(.5) * (torch.sign(a1) + torch.sign(a2)) * torch.maximum(torch.abs(a1), torch.abs(a2))

def superbee(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''
    return maxmod(minmod(2.*a2, a1), minmod(a2, 2.*a1))
    
# def MC():
#     pass

def slope(rho, limiter):
    left = rho[:-1]
    right = rho[1:]
    a = right - left
    # match limiter:
    #     case "minmod":
    #         ##############################################
    #         # Divide by dx or not?
    #         ##############################################
    #         return minmod(a[1:], a[:-1]) / dx
    #     case "maxmod":
    #         return maxmod(a[1:], a[:-1]) / dx
    #     case "superbee":
    #         return superbee(a[1:], a[:-1]) / dx
    if limiter == torch.tensor(1.0):
        return minmod(a[1:], a[:-1])
    elif limiter == torch.tensor(2.0):
        return maxmod(a[1:], a[:-1])
    elif limiter == torch.tensor(3.0):
        return superbee(a[1:], a[:-1])
    
def Rusanov_Flux(left, right, gamma):
    s = gamma * torch.max(torch.abs(d_flux(left, 1.)), torch.abs(d_flux(right, 1.)))
    
    # print(f"s: {s}")
    return 0.5*(flux(left, gamma) + flux(right, gamma)) - 0.5 * s * (right - left)

def L(rho, gamma):
    sigma = slope(rho, torch.tensor(1.0))
    left = rho[1:-1] + torch.tensor(.5)  * sigma
    right = rho[1:-1] - torch.tensor(.5) * sigma
    F = Rusanov_Flux(left[:-1], right[1:], gamma)
    L_out = torch.zeros_like(rho)
    L_out[2:-2] = - (F[1:] - F[:-1])
    return L_out

def SSP_RK(rho, gamma):
    # Need also alpha parameter
    rho_ = rho +  L(rho, gamma)
    rho__ = rho_ +  L(rho_,  gamma)
    rho_new = .5 * (rho + rho__)
    return rho_new

#### Equivalent functions in torchschipt

In [16]:
@torch.jit.script
def flux_ts(rho, gamma):
    return gamma * rho * (1. - rho)

@torch.jit.script
def d_flux_ts(rho, gamma):
    '''
    Return the derivative of flux given density rho

    Should work both for scalar and vector rho 
    '''
    return gamma * (1. - 2.*rho)
@torch.jit.script
def minmod_ts(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''

    return .5 * (torch.sign(a1) + torch.sign(a2)) * torch.minimum(torch.abs(a1), torch.abs(a2))

@torch.jit.script 
def maxmod_ts(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''

    return .5 * (torch.sign(a1) + torch.sign(a2)) * torch.maximum(torch.abs(a1), torch.abs(a2))

@torch.jit.script
def superbee_ts(a1, a2):
    '''
    Limiter that satisfies the TVD requirement
    '''
    return maxmod_ts(minmod_ts(2.*a2, a1), minmod_ts(a2, 2.*a1))
    
# def MC():
#     pass

@torch.jit.script
def slope_ts(rho, limiter):
    left = rho[:-1]
    right = rho[1:]
    a = right - left

    # match limiter:
    #     case "minmod":
    #         ##############################################
    #         # Divide by dx or not?
    #         ##############################################
    #         return minmod(a[1:], a[:-1]) / dx
    #     case "maxmod":
    #         return maxmod(a[1:], a[:-1]) / dx
    #     case "superbee":
    #         return superbee(a[1:], a[:-1]) / dx
    if limiter == torch.tensor(1.0):
        return minmod_ts(a[1:], a[:-1])
    elif limiter == torch.tensor(2.0):
        return maxmod_ts(a[1:], a[:-1])
    else:
        return superbee_ts(a[1:], a[:-1])
    

@torch.jit.script
def Rusanov_Flux_ts(left, right, gamma):
    s = gamma * torch.max(torch.abs(d_flux_ts(left, torch.tensor(1.))), 
                          torch.abs(d_flux_ts(right, torch.tensor(1.))))
    
    # print(f"s: {s}")
    return .5*(flux_ts(left, gamma) + flux_ts(right, gamma)) - .5 * s * (right - left)

@torch.jit.script
def L_ts(rho, gamma):
    sigma = slope_ts(rho, torch.tensor(1.0))
    left = rho[1:-1] +  .5 * sigma
    right = rho[1:-1] - .5 * sigma
    F = Rusanov_Flux_ts(left[:-1], right[1:], gamma)
    L_out = torch.zeros_like(rho)
    L_out[2:-2] = - (F[1:] - F[:-1])
    return L_out

@torch.jit.script
def SSP_RK_ts(rho, gamma):
    # Need also alpha parameter
    rho_ = rho +  L_ts(rho, gamma)
    rho__ = rho_ +  L_ts(rho_,  gamma)
    rho_new = .5 * (rho + rho__)
    return rho_new



In [18]:
print("Pure PyTorch:")
start = time()
for _ in range(10000):
    rho = torch.linspace(0, 1, 5000)
    gamma = torch.tensor(1.4, requires_grad=True)
    new_rho = SSP_RK(rho, gamma)
end = time()
print(f"Time taken: {end - start}")


print("TorchScript:")
start = time()
for _ in range(10000):
    rho = torch.linspace(0, 1, 5000)
    gamma = torch.tensor(1.4, requires_grad=True)
    new_rho = SSP_RK_ts(rho, gamma)

end = time()
print(f"Time taken: {end - start}")

Pure PyTorch:
Time taken: 12.308112859725952
TorchScript:
Time taken: 9.265597581863403


In [6]:
# Slight speedup...