# NASA template running the Ocean datasets from the cloud (SealStorage)
### Total size: ~80 TB compressed

### 10365 timesteps, 90 depths


In [1]:
import sys
import os
from OpenVisus import *
import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import matplotlib.image as mpimg
from matplotlib.animation import FuncAnimation
from ipywidgets import *
import matplotlib.animation as animation
import concurrent.futures
import threading
# NOTE(review): absolute path inside one user's home directory; adjust it when
# running the notebook on another machine.
plt.rcParams['animation.ffmpeg_path'] = '/home/sci/aashishp/anaconda3/lib/python3.9/site-packages/ffmpeg'

%matplotlib widget
from IPython.core.display import display, HTML

# Not referenced by the visible cells; presumably intended colour-scale
# limits -- TODO confirm.
vmin = -1
vmax = 1


  from IPython.core.display import display, HTML


In [2]:
from IPython.display import display, HTML
# Inject the notebook-wide CSS: full-width container plus the heights of the
# two custom slider classes.
for _style_rule in (
    "<style>.container { width:100% !important; }</style>",
    '<style>.custom-slider .bk-input-group {height: 400px;}</style>',
    '<style>.small-custom-slider .bk-input-group {height: 200px;}</style>',
):
    display(HTML(_style_rule))

#slider = pnw.FloatSlider(start=-0.5, end=0.5, value=0,  orientation='vertical', css_classes=["custom-slider"] )

In [3]:
import numpy as np

from bokeh.layouts import row, column, gridplot
from bokeh.models import ColumnDataSource
from bokeh.models.widgets import Slider, TextInput, Button
from bokeh.plotting import figure, output_notebook, show

# Configure Bokeh to render its output inline in this notebook.
output_notebook()

In [4]:
def blockPrinting(func):
    """Decorator that discards everything written to ``sys.stdout`` by *func*.

    While the wrapped function runs, ``sys.stdout`` points at the null
    device. Afterwards the stream that was installed *before* the call is put
    back (not ``sys.__stdout__``, which in a notebook kernel is a different
    object), and the null-device handle is closed. Both happen even when
    *func* raises.

    Args:
        func: the callable to silence.

    Returns:
        A wrapper with the same signature and return value as *func*.
    """
    # Local imports keep this block self-contained within the notebook cell.
    import contextlib
    import functools

    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        # The context managers restore stdout and close the handle on every
        # exit path, including exceptions.
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            # pass the return value of the method back
            return func(*args, **kwargs)

    return func_wrapper

In [5]:
def Assert(cond):
    """Raise a plain ``Exception("Assert failed")`` unless *cond* is truthy."""
    if cond:
        return
    raise Exception("Assert failed")

class CachedDataset(PyDataset):
    """Remote dataset read through a local-disk + remote multiplex access.

    ``createAccess`` builds the multiplex access (local idx file listed
    first, remote access second). ``startCaching`` reads every
    field/block/timestep through that access, optionally on a background
    thread, and reports progress to widgets registered via ``setProgress``.
    """

    # constructor
    def __init__(self, args):
        """Open the remote dataset described by *args*.

        Args:
            args: mapping with the keys ``"local"`` (path of the local idx
                file), ``"url"`` (remote dataset URL), ``"access"`` (type
                attribute of the remote access entry) and ``"description"``.
        """
        self.local_filename=os.path.abspath(args["local"]).replace("\\","/")
        self.remote_url=args["url"]
        self.remote_access_type = args["access"]
        self.description=args["description"]

        super().__init__(LoadDatasetCpp(self.remote_url))

        # total number of block reads a full caching pass performs
        self.num_blocks = len(self.getFields()) * self.getTotalNumberOfBlocks() * len(self.getTimesteps())
        self.num_blocks_cached = 0

        self.stop_thread=False
        self.thread=None

        self.progress=None
        self.progress_display=None

    def __del__(self):
        # __init__ can raise (missing key, failed load) before the thread
        # attributes exist; in that case there is nothing to stop.
        if "thread" in vars(self):
            self.stopCaching()

    # createAccess
    def createAccess(self, ):
        """Create the multiplex access (local disk, then remote) for block queries.

        Returns:
            The access object returned by ``createAccessForBlockQuery``.

        Raises:
            Exception: via ``Assert`` when the local idx file does not exist
                after the access has been created.
        """
        # '&' is escaped because the paths are embedded in XML attributes
        access_config="""
            <access type='multiplex'>
                    <access type='disk' chmod='rw' url='file://{}' />
                    <access type='{}' url='{}' chmod="r" /> 
            </access>  
        """.format(
            self.local_filename.replace("&","&amp;"),
            self.remote_access_type,
            self.remote_url.replace("&","&amp;"))

        access= self.createAccessForBlockQuery(StringTree.fromString(access_config))

        # at this point the cache is enabled with the new local idx file
        Assert(os.path.isfile(self.local_filename))

        return access

    # startCaching
    def startCaching(self, background=True):
        """Read every block of the dataset through the multiplex access.

        Args:
            background: when true (the default) the work is handed to a new
                thread running ``startCaching(False)`` and this call returns
                immediately; when false the blocks are read in the calling
                thread. The loop stops early once ``stopCaching`` has set
                ``stop_thread``.
        """
        if background:
            self.thread = threading.Thread(target=self.startCaching, args=(False,))
            self.stop_thread=False
            self.thread.start()
            return

        access=self.createAccess()

        access.beginRead()
        try:
            for field in self.getFields():
                for blockid in range(self.getTotalNumberOfBlocks()):
                    for time in self.getTimesteps().asVector():
                        # The returned buffer is not used; presumably reading
                        # through the multiplex access is what fills the local
                        # cache -- TODO confirm.
                        self.readBlock(blockid, field=field, time=time, access=access)

                        self.num_blocks_cached += 1
                        self.updateProgress()
                        if self.stop_thread:
                            return
        finally:
            # always balance beginRead(), on stop requests and read errors too
            access.endRead()

        self.thread=None

    # stopCaching
    def stopCaching(self):
        """Ask the caching loop to stop and wait for the worker thread, if any."""
        self.stop_thread=True
        # Take a snapshot: the worker clears self.thread itself when it runs
        # to completion, so re-reading the attribute after the check could
        # yield None.
        worker = self.thread
        if worker:
            worker.join()
            self.thread=None

    # getWidth
    def getWidth(self):
        """Return the first component of the logic box's second corner."""
        p2=self.getLogicBox()[1]
        return p2[0]

    # getHeight
    def getHeight(self):
        """Return the second component of the logic box's second corner."""
        p2=self.getLogicBox()[1]
        return p2[1]

    # getDepth
    def getDepth(self):
        """Return the third component of the logic box's second corner."""
        p2=self.getLogicBox()[1]
        return p2[2]

    # readSlice
    def readSlice(self,dir=0, slice=0,quality=-4, time=1, access=None):
        """Read one axis-aligned slice and return it as a 2-D array.

        Args:
            dir: axis held fixed (0 = x, 1 = y, 2 = z); the other two axes
                span the full dataset.
            slice: index along the fixed axis.
            quality: forwarded to ``read``.
            time: forwarded to ``read``.
            access: forwarded to ``read``.

        Returns:
            The data reshaped to its two dimensions that are larger than 1.

        Raises:
            ValueError: when the data read does not have exactly two
                dimensions larger than 1.
        """
        W,H,D=self.getWidth(), self.getHeight(), self.getDepth()
        x=[0,W] if dir!=0 else [slice,slice+1]
        y=[0,H] if dir!=1 else [slice,slice+1]
        z=[0,D] if dir!=2 else [slice,slice+1]
        ret=self.read(x=x, y=y,z=z, quality=quality,time=time,access=access)

        extents=[value for value in ret.shape if value>1]
        if len(extents)!=2:
            # same exception type as the tuple-unpacking failure this
            # replaces, but with a message that names the cause
            raise ValueError(
                "readSlice expected exactly two dimensions larger than 1, got shape {}".format(ret.shape))
        width,height=extents
        return ret.reshape([width,height])

    # readColumn
    def readXYColumn(self,Height, Depth,quality=-3, time=1, access=None):
        """Read the full-width line at y index *Height* and z index *Depth*.

        Returns:
            The array returned by ``read``, without reshaping.
        """
        x=[0,self.getWidth()]
        y=[Height,Height+1]
        z=[Depth ,Depth +1]
        return self.read(x=x, y=y,z=z, quality=quality,time=time,access=access)

    # setProgress
    def setProgress(self,progress, progress_display):
        """Register the widgets that ``updateProgress`` keeps up to date.

        Args:
            progress: object whose ``min``, ``max`` and ``value`` attributes
                are set (``max`` becomes the total block count).
            progress_display: object whose ``value`` receives the progress text.
        """
        self.progress=progress
        self.progress_display=progress_display
        self.progress.min=0
        self.progress.max =self.num_blocks

    # updateProgress
    def updateProgress(self):
        """Push the current cached-block count to the registered widgets."""
        if self.progress:
            self.progress.value = self.num_blocks_cached

        if self.progress_display:
            # an empty dataset has nothing to cache: report it as complete
            # instead of dividing by zero
            percent = 100 * self.num_blocks_cached/self.num_blocks if self.num_blocks else 100.0
            self.progress_display.value = (
                "Caching progress %.2f%% (%d/%d)" % (
                    percent,
                    self.num_blocks_cached,
                    self.num_blocks))

print("Utilities defined")

Utilities defined


In [6]:
# Registry of [dataset, access] pairs; the viewer selects one entry by index.
NasaOceanZone = []

# NOTE(review): the URL below embeds an access key and a secret key in plain
# text; consider loading them from the environment instead of the notebook.
NasaOceanlocal_cache = "./visus-cache/llc2160/visus.idx"
NasaOcean = dict(
    url="https://maritime.sealstorage.io/api/v0/s3/utah/nasa/dyamond/mit_output/llc2160_arco/visus.idx?access_key=AKIAQXOZFVQ7DTRF43AC&secret_key=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&endpoint_url=https://maritime.sealstorage.io/api/v0/s3",
    access="cloud",
    local=NasaOceanlocal_cache,
    description='SealStorage Cloud',
)

# Open the remote dataset, create its multiplex access and register the pair.
NasaOceandb = CachedDataset(NasaOcean)
NasaOceanaccess = NasaOceandb.createAccess()
NasaOceanZone += [[NasaOceandb, NasaOceanaccess]]

NasaOceanZone

[[<__main__.CachedDataset at 0x7fc581696a60>,
  <VisusDbPy.Access; proxy of <Swig Object of type 'std::shared_ptr< Visus::Access > *' at 0x7fc5816967e0> >]]

In [7]:
NasaOceanCurrentZone=0
 
@blockPrinting
def getHorizontalImage(depth, time, db=None, access=None, quality=-8):
    """Read one horizontal (fixed-z) slice as a 2-D array.

    Args:
        depth: requested z index; it is rounded down to an even index before
            the read (kept from the original code -- the reason is not
            visible here, TODO confirm).
        time: timestep forwarded to ``readSlice``.
        db: dataset to read from. When omitted, the entry of
            ``NasaOceanZone`` selected by ``NasaOceanCurrentZone`` at call
            time is used.
        access: access object to read through. When *db* is omitted and
            *access* is omitted too, the access stored next to the current
            zone's dataset is used.
        quality: forwarded to ``readSlice``; defaults to the previously
            hard-coded -8.

    Returns:
        Whatever ``db.readSlice(dir=2, ...)`` returns.
    """
    # Resolve the zone at call time (not at definition time) and only when
    # the caller did not supply a dataset, so explicit arguments are honoured.
    if db is None:
        zone_db, zone_access = NasaOceanZone[NasaOceanCurrentZone]
        db = zone_db
        if access is None:
            access = zone_access
    return db.readSlice(dir=2, slice=(depth//2)*2, access=access, time=time, quality=quality)


In [8]:
import colorsys
import matplotlib
from bokeh.models import LinearColorMapper, BasicTicker, ColorBar
from bokeh.palettes import *


# XY slice read at resolution -4; used here only to derive the low/high
# bounds of the colour mapper.
# NOTE(review): this module-level name shadows the builtin `slice`.
slice = NasaOceandb.getXYSlice(resolution=-4 ,resample_output=False)

# 256-entry palette: 128 colours sampled from Reds256 followed by 128 sampled
# from the reversed Blues256.
myPalette = linear_palette(Reds256, 128) + linear_palette(Blues256 [::-1], 128)
# Colour mapper used by the colour bar and by both image glyphs in modify_doc.
my_cmap1 = LinearColorMapper(palette=myPalette, low=np.min(slice), high=np.max(slice))
# # my_cmap1 = LinearColorMapper(palette="Turbo256", low=np.min(slice), high=np.max(slice))
# # mpl.colormaps[name]
# #my_cmap1 = LinearColorMapper(palette="seismic", low=np.min(slice), high=np.max(slice))


In [9]:
import numpy as np
from bokeh.plotting import figure, show


# Index into NasaOceanZone of the [dataset, access] pair to display.
NasaOceanCurrentZone = 0
#NasaOceanZone

# Synthetic 500x500 sin(x)*cos(y) pattern. Not referenced by the visible
# cells (modify_doc uses its own local `d`); presumably a placeholder image.
N = 500
x = np.linspace(0, 5, N)
y = np.linspace(0, 10, N)
xx, yy = np.meshgrid(x, y)
d = np.sin(xx)*np.cos(yy)
#myImage.data_source.data  = {"image" :[d]}
# Incremented once per completed asyncUpdate tick inside modify_doc.
counter = 0

# Here should be the code that returns an image2
def getImage(xDim,yDim):
    """Return a 500x500 synthetic pattern ``sin(x) * cos(y)``.

    x runs from 0 to *xDim* along the columns and y from 0 to *yDim* along
    the rows, each sampled at 500 evenly spaced points.
    """
    samples = 500
    column_wave = np.sin(np.linspace(0, xDim, samples))
    row_wave = np.cos(np.linspace(0, yDim, samples))
    # entry [i, j] is sin(x[j]) * cos(y[i]), the same layout np.meshgrid gives
    return column_wave[np.newaxis, :] * row_wave[:, np.newaxis]
    
# def getHorizontalImage(depth,time,db=NasaOceanZone[NasaOceanCurrentZone][0], access=NasaOceanZone[NasaOceanCurrentZone][1]): 
#     global NasaOceanZone, NasaOceanCurrentZone
#     db=NasaOceanZone[NasaOceanCurrentZone][0] 
#     access=NasaOceanZone[NasaOceanCurrentZone][1]
#     return db.readSlice(dir=2, slice=(depth//2)*2,access=access,time=time, quality=-2)
 
    
# Dataset extent along each axis, taken from the logic box of NasaOceandb.
dbWidth  = NasaOceandb.getWidth()
dbHeight = NasaOceandb.getHeight()
dbDepth  = NasaOceandb.getDepth()
# Shared state read and written by the callbacks defined inside modify_doc.
# needsRedraw: when true, the next asyncUpdate tick re-reads and redraws the images.
needsRedraw = True
# needToUpdatePanel: False, or the pending panel size in pixels (modify_doc sets it to 450).
needToUpdatePanel = True

# timeLatDepth: timestep shown by the right-hand plot during playback.
timeLatDepth = 1
# timeLatUpdate: True while playback is running.
timeLatUpdate = False

# timeDelta: timestep increment per playback tick (set to 10 by Play, 1 by Play slow).
timeDelta = 1

def modify_doc(doc):
    """Build the interactive ocean viewer and attach it to the Bokeh document.

    The layout holds two image plots of the same horizontal slice, sliders
    for longitude, latitude, depth and timestep, a panel-size slider and two
    play buttons. Widgets are not wired to per-event callbacks for data;
    instead a periodic callback (every 50 ms) polls their values and redraws
    once they have stopped changing.

    Args:
        doc: the Bokeh document to populate.

    Module-level state used: ``needToUpdatePanel``, ``needsRedraw``,
    ``timeLatDepth``, ``timeLatUpdate``, ``timeDelta``, ``counter``,
    ``NasaOceanZone``, ``NasaOceanCurrentZone``, ``dbDepth``, ``my_cmap1``.
    """
    global needToUpdatePanel, timeLatDepth, timeDelta, NasaOceanCurrentZone

    # Extent of the horizontal slice in plot coordinates and last timestep.
    xMin = 0
    xMax = 8640
    yMin = 0
    yMax = 6480
    lastTimestep = 10365
    scale = 0.5
    wDim = int((xMax-xMin)*scale)
    # Pending panel size in pixels; applied by update_panel on the first tick.
    needToUpdatePanel = 450

    # Set up plot
    plot = figure( height=wDim,  width=wDim,
                  title="Ocean Visualization",
                  x_axis_label='Longitude',
                  y_axis_label='Latitude',
                  x_range=[xMin, xMax], y_range=[yMin, yMax], toolbar_location=None)
    color_bar = ColorBar(color_mapper=my_cmap1, ticker= BasicTicker(),
                         location=(0,0))
    plot.add_layout(color_bar, 'right')

    plot.xgrid[0].grid_line_color=None
    plot.ygrid[0].grid_line_color=None

    plotLat  = figure( height=wDim,  width=wDim,
                      x_axis_label='longitude',
                      y_axis_label='latitude',
                       toolbar_location=None)
    plotLat.xgrid[0].grid_line_color=None
    plotLat.ygrid[0].grid_line_color=None

    # Both plots start from the same slice, so read it once and share it.
    initialImage = getHorizontalImage(0,0,db=NasaOceanZone[NasaOceanCurrentZone][0], access=NasaOceanZone[NasaOceanCurrentZone][1])
    theImage = plot.image(image=[initialImage], x=xMin, y=yMin, dw=xMax-xMin, dh=yMax-yMin,
                          color_mapper=my_cmap1, level="image")

    # Cross-hair lines: a wide white line with a thin orange line on top.
    line_color1="#f46d43"
    line_color="#ffffff"
    line_width=3
    line_width2=1
    xSource = ColumnDataSource()
    ySource = ColumnDataSource()
    plot.line('x', 'y', source=xSource, line_width=line_width,line_color=line_color)
    plot.line('x', 'y', source=ySource, line_width=line_width,line_color=line_color)
    plot.line('x', 'y', source=xSource, line_width=line_width2,line_color=line_color1)
    plot.line('x', 'y', source=ySource, line_width=line_width2,line_color=line_color1)
    # These sources are never filled by the code below, so the lines on the
    # right-hand plot have no data.
    latxSource = ColumnDataSource()
    latySource = ColumnDataSource()
    plotLat.line('x', 'y', source=latxSource, line_width=line_width,line_color=line_color)
    plotLat.line('x', 'y', source=latySource, line_width=line_width,line_color=line_color)
    plotLat.line('x', 'y', source=latxSource, line_width=line_width2,line_color=line_color1)
    plotLat.line('x', 'y', source=latySource, line_width=line_width2,line_color=line_color1)

    theImageLat = plotLat.image(image=[initialImage],
                                x=xMin, y=yMin, dw=xMax-xMin, dh=yMax-yMin,
                                color_mapper=my_cmap1, level="image")

    width  = 250

    panelSize = Slider(title="size of visualization panel below", value=1400, start=0, end=2800, step=1)

    xPosition = Slider(title="Long", value=500, start=0, end=xMax-1, step=1,width=width,
                       css_classes=["custom-slider"])
    yPosition = Slider(title="Lat", value=500, start=0, end=yMax-1, step=1, width=width,
                       css_classes=["custom-slider"])

    depth = Slider(title="Depth", value=0, start=0, end=dbDepth-1, step=1,width=width,
                       css_classes=["custom-slider"] )
    timeStep = Slider(title="Time", value=1, start=1, end=lastTimestep, step=1, width=width,
                       css_classes=["custom-slider"] )

    def setValues():
        # move the cross-hair on the left plot to the slider positions
        xSource.data = dict(x=[xPosition.value,xPosition.value], y=[yMin, yMax])
        ySource.data = dict(x=[xMin, xMax], y=[yPosition.value,yPosition.value])

    setValues()

    def update_panel():
        # Apply a pending panel size, if any, then clear the request.
        global needToUpdatePanel
        if not needToUpdatePanel:
            return
        plot.width      = needToUpdatePanel
        plot.height     = needToUpdatePanel
        plotLat.width   = needToUpdatePanel-10
        plotLat.height  = needToUpdatePanel-10
        display(HTML(" <style>.custom-slider .bk-input-group {height: " +str(int(needToUpdatePanel*.9)) +"px;}</style>" ))
        needToUpdatePanel    = False

    def request_update_panel(attrname, old, new):
        # only record the request; the periodic callback applies it
        global needToUpdatePanel
        needToUpdatePanel = new

    panelSize.on_change('value', request_update_panel)

    playSlider = Slider(start=1, end=lastTimestep, value=1, step=1, title="Timestep")

    playButton   = Button(label='► Play', width=60,button_type='success')
    playButtonS  = Button(label='► Play slow', width=60,button_type='success')
    pauseLabel   = '❚❚ Pause'

    def animate_update():
        # advance the playback timestep, wrapping to 1 after the last one
        global timeLatDepth
        timeLatDepth += timeDelta
        if timeLatDepth > lastTimestep:
            timeLatDepth = 1

    def makePlayToggle(button, otherButton, idleLabel, delta):
        # Build the click handler shared by both play buttons: start playback
        # from timestep 1 with the given step, or stop it. A click is ignored
        # while the other button is the one currently playing.
        def toggle():
            global timeLatDepth, timeLatUpdate, timeDelta
            if otherButton.label == pauseLabel:
                return
            if not timeLatUpdate:
                timeLatUpdate = True
                button.label = pauseLabel
                timeLatDepth = 1
                timeDelta = delta
            else:
                timeLatUpdate = False
                button.label = idleLabel
        return toggle

    playButton.on_click(makePlayToggle(playButton, playButtonS, '► Play', 10))
    playButtonS.on_click(makePlayToggle(playButtonS, playButton, '► Play slow', 1))

    title = Button(label="OpenViSUS streaming analysis of remote data from cloud (SealStorage): 'DYAMOND c1440 llc2160, Ocean Dataset", button_type='primary', height = 30)
    myWidgets = column(xPosition, yPosition, depth,timeStep)
    myGrid = column(title,panelSize,
                    row(myWidgets,plot,column(playSlider,row(plotLat, column(playButton,playButtonS)))))
    doc.add_root(myGrid)

    doc.title = "NASA demo"

    # Slider values seen on the previous tick; None until the first tick.
    lastSeen = None

    def asyncUpdate():
        nonlocal lastSeen
        global counter
        global needsRedraw, NasaOceanCurrentZone

        if timeLatUpdate :
            # playback: show the current timestep at depth 0 on the right plot
            playSlider.value = timeLatDepth
            theImageLat.data_source.data = {"image" :[getHorizontalImage(0,timeLatDepth)]}
            animate_update()

        # update panel size if needed
        update_panel()

        current = (timeStep.value, xPosition.value, yPosition.value, depth.value)
        if lastSeen is None:
            lastSeen = current

        # flush all events until steady: while the sliders are still moving,
        # only remember the new values and postpone the redraw
        if current != lastSeen:
            lastSeen = current
            needsRedraw = True
            return

        if needsRedraw:
            setValues()
            # both plots show the same slice, so read it from the dataset once
            image = getHorizontalImage(depth.value, timeStep.value)
            theImage.data_source.data     = {"image" :[image]}
            theImageLat.data_source.data  = {"image" :[image]}
            playSlider.value = timeStep.value

            needsRedraw = False

        counter +=1

    doc.add_periodic_callback(asyncUpdate, 50)


In [10]:
# show(modify_doc)

In [11]:
# If Bokeh rejects the websocket connection because of the port, replace the
# port in "localhost:8892" below with the port number indicated in the error
# message, then run this cell again.
import os 
os.environ["BOKEH_ALLOW_WS_ORIGIN"] = "localhost:8892"
show(modify_doc)