Skip to content

ProSnippets StreamLayers

UmaHarano edited this page May 6, 2024 · 10 revisions
Language:              C#  
Subject:               StreamLayers  
Contributor:           ArcGIS Pro SDK Team <arcgisprosdk@esri.com>  
Organization:          esri, http://www.esri.com  
Date:                  4/22/2024  
ArcGIS Pro:            3.3  
Visual Studio:         2022  
.NET Target Framework: .NET 6  

Create Stream Layer

Create Stream Layer with URI

//Must be on the QueuedTask
//Option 1: create the layer through creation params so that it is
//added to the map already turned off
var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var createParam = new FeatureLayerCreationParams(new Uri(url))
{
  IsVisible = false //turned off by default
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(createParam, map);

//Option 2: use the "original" create layer (will be visible by default)
Uri uri = new Uri(url);
streamLayer = LayerFactory.Instance.CreateLayer(uri, map) as StreamLayer;
//"as" yields null when the created layer is not a StreamLayer, so guard
//the call instead of dereferencing the result unconditionally
streamLayer?.SetVisibility(false);//turn off

Create a stream layer with a definition query

//Must be on the QueuedTask
var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";

//Definition query named "Runway" with the where clause RWY = '29L'
//(At 2.x this was set with
//  DefinitionFilter = new CIMDefinitionFilter()
//  {
//    DefinitionExpression = "RWY = '29L'",
//    Name = "Runway"
//  })
var runwayQuery = new DefinitionQuery(whereClause: "RWY = '29L'", name: "Runway");

//The layer is added visible, with the definition query already applied
var lyrCreateParam = new FeatureLayerCreationParams(new Uri(url))
{
  IsVisible = true,
  DefinitionQuery = runwayQuery
};

var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(lyrCreateParam, map);

Create a stream layer with a simple renderer

//Must be on QueuedTask!
var uri = new Uri(
  @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/LABus/StreamServer",
  UriKind.Absolute);

//Blue pushpin marker, size 12, used as the template for the simple renderer
var pushpin = SymbolFactory.Instance.ConstructPointSymbol(
  ColorFactory.Instance.BlueRGB, 12, SimpleMarkerStyle.Pushpin);

var createParams = new FeatureLayerCreationParams(uri)
{
  RendererDefinition = new SimpleRendererDefinition()
  {
    SymbolTemplate = pushpin.MakeSymbolReference()
  }
};
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(createParams, map);

Setting a unique value renderer for latest observations

//Must be on QueuedTask!
var url = @"https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";
var uri = new Uri(url, UriKind.Absolute);

//Add the layer turned off; it is turned on once the renderer is applied
var createParams = new FeatureLayerCreationParams(uri) { IsVisible = false };
var streamLayer = LayerFactory.Instance.CreateLayer<StreamLayer>(createParams, map);

//Define the unique value classes by hand -
//one for ACTYPE of 727, one for DC 9
var classes = new List<CIMUniqueValueClass>
{
  new CIMUniqueValueClass()
  {
    Values = new[] { new CIMUniqueValue() { FieldValues = new[] { "B727" } } },
    Visible = true,
    Label = "Boeing 727",
    Symbol = SymbolFactory.Instance.ConstructPointSymbol(
      ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
  },
  new CIMUniqueValueClass()
  {
    Values = new[] { new CIMUniqueValue() { FieldValues = new[] { "DC9" } } },
    Visible = true,
    Label = "DC 9",
    Symbol = SymbolFactory.Instance.ConstructPointSymbol(
      ColorFactory.Instance.GreenRGB, 10, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
  }
};

//Build the renderer on "ACTYPE": the classes go into a single group and
//any other value falls back to the grey "Others" default symbol
var uvr = new CIMUniqueValueRenderer()
{
  Fields = new[] { "ACTYPE" },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference(),
  Groups = new[]
  {
    new CIMUniqueValueGroup() { Classes = classes.ToArray() }
  }
};

//Apply the renderer (for current observations)
streamLayer.SetRenderer(uvr);
streamLayer.SetVisibility(true);//turn on the layer

Stream Layer settings and properties

Find all Stream Layers that are Track Aware

//Collect every stream layer in the active map that is track aware.
//MapView.Active can be null (no active map view), so short-circuit to an
//empty list there; Where(...) itself never returns null and needs no "?."
var trackAwareLayers = MapView.Active?.Map?.GetLayersAsFlattenedList()
                           .OfType<StreamLayer>()
                           .Where(sl => sl.IsTrackAware)
                           .ToList() ?? new List<StreamLayer>();

Determine the Stream Layer type

//spatial or non-spatial?
var isAttributeOnly = streamLayer.TrackType == TrackType.AttributeOnly;
if (!isAttributeOnly)
{
  //this must be a spatial stream layer
}
else
{
  //this is a non-spatial stream layer
}

Check the Stream Layer connection state

//Only start streaming when the connection is not already open
if (streamLayer.IsStreamingConnectionOpen == false)
{
  //Must be on QueuedTask!
  streamLayer.StartStreaming();
}

Start and stop streaming

//Must be on QueuedTask!
//Both calls are shown back to back for illustration only - call
//whichever one applies
//Start...
streamLayer.StartStreaming();
//Stop...
streamLayer.StopStreaming();

Delete all current and previous observations

//Must be on QueuedTask!
//Truncate must be called on the layer's feature class, not on the layer.
//The feature class is disposed when the using block exits
using (var rfc = streamLayer.GetFeatureClass())
{
  rfc.Truncate();
}

Get the Track Id Field

//The track id field name is only read when the layer is track aware
if (streamLayer.IsTrackAware)
{
  var trackIdFieldName = streamLayer.TrackIdFieldName;
  //TODO use the field name
}

Get The Track Type

//TODO deal with tracktype - one case per TrackType value
switch (streamLayer.TrackType)
{
  case TrackType.None:
    break;
  case TrackType.AttributeOnly:
    break;
  case TrackType.Spatial:
    break;
}

Set the Maximum Count of Previous Observations to be Stored in Memory

//Must be on QueuedTask
//Switch the expiration method to "maximum feature count" if it is not
//already in use...
var byCount = FeatureExpirationMethod.MaximumFeatureCount;
if (streamLayer.GetExpirationMethod() != byCount)
{
  streamLayer.SetExpirationMethod(byCount);
}
//...then set the Max Expiration Count
streamLayer.SetExpirationMaxCount(15);
//FYI
if (streamLayer.IsTrackAware)
{
  //MaxCount is per track! otherwise for the entire layer
}

Set the Maximum Age of Previous Observations to be Stored in Memory

//Must be on QueuedTask
//Switch the expiration method to "maximum feature age" if it is not
//already in use...
var byAge = FeatureExpirationMethod.MaximumFeatureAge;
if (streamLayer.GetExpirationMethod() != byAge)
{
  streamLayer.SetExpirationMethod(byAge);
}
//...then set the Max Expiration Age to 12 hours (max is 24 hours)
streamLayer.SetExpirationMaxAge(TimeSpan.FromHours(12));

//FYI
if (streamLayer.IsTrackAware)
{
  //MaxAge is per track! otherwise for the entire layer
}

Set Various Stream Layer properties via the CIM

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask
//get the CIM Definition. "as" yields null when the definition is not a
//CIMFeatureLayer, so bail out rather than dereference it
var def = streamLayer.GetDefinition() as CIMFeatureLayer;
if (def == null)
  return;
//set the number of previous observations to one less than the expiration
//max count, clamped at zero in case the max count is 0
def.PreviousObservationsCount = Math.Max(0, (int)streamLayer.GetExpirationMaxCount() - 1);
//set show previous observations and track lines to true
def.ShowPreviousObservations = true;
def.ShowTracks = true;
//commit the changes
streamLayer.SetDefinition(def);

Rendering

Defining a unique value renderer definition

//Red hexagon, size 10, used as the symbol template for the value classes
var templateSymbol = SymbolFactory.Instance.ConstructPointSymbol(
  ColorFactory.Instance.RedRGB, 10, SimpleMarkerStyle.Hexagon);

//Unique values are taken from the "ACTYPE" field, with ValuesLimit set to 5
var uvrDef = new UniqueValueRendererDefinition()
{
  ValueFields = new List<string> { "ACTYPE" },
  SymbolTemplate = templateSymbol.MakeSymbolReference(),
  ValuesLimit = 5
};
//Note: CreateRenderer can only create value classes based on
//the current events it has received
var generatedRenderer = streamLayer.CreateRenderer(uvrDef);
streamLayer.SetRenderer(generatedRenderer);

Setting a unique value renderer for latest observations

//Define the classes by hand to avoid using CreateRenderer(...)
//Class for ACTYPE "B727" - red hexagon, size 8
var uvcB727 = new CIMUniqueValueClass()
{
  Values = new[] { new CIMUniqueValue() { FieldValues = new[] { "B727" } } },
  Visible = true,
  Label = "Boeing 727",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(255, 0, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};

//Class for ACTYPE "DC9" - green hexagon, size 8
var uvcD9 = new CIMUniqueValueClass()
{
  Values = new[] { new CIMUniqueValue() { FieldValues = new[] { "DC9" } } },
  Visible = true,
  Label = "DC 9",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(0, 255, 0), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};

//Assign the classes to a group
var uvGrp = new CIMUniqueValueGroup()
{
  Classes = new[] { uvcB727, uvcD9 }
};

//Assign the group to the renderer; values without a class of their own
//use the grey "Others" default symbol
var UVrndr = new CIMUniqueValueRenderer()
{
  Fields = new[] { "ACTYPE" },
  Groups = new[] { uvGrp },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(185, 185, 185), 8, SimpleMarkerStyle.Hexagon).MakeSymbolReference()
};

//set the renderer. Depending on the current events received, the
//layer may or may not have events for each of the specified
//unique value classes
streamLayer.SetRenderer(UVrndr);

Setting a unique value renderer for previous observations

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
  return;
//Must be on QueuedTask!
//Define unique value classes same as we do for current observations
//or use "CreateRenderer(...)" to assign them automatically.
//The previous observation symbols here use size 4
CIMUniqueValueClass uvcB727Prev = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() {
    FieldValues = new string[] { "B727" } } },
  Visible = true,
  Label = "Boeing 727",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(255, 0, 0), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

CIMUniqueValueClass uvcD9Prev = new CIMUniqueValueClass()
{
  Values = new CIMUniqueValue[] { new CIMUniqueValue() {
    FieldValues = new string[] { "DC9" } } },
  Visible = true,
  Label = "DC 9",
  Symbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(0, 255, 0), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

//Assign the classes to a group
CIMUniqueValueGroup uvGrpPrev = new CIMUniqueValueGroup()
{
  Classes = new CIMUniqueValueClass[] { uvcB727Prev, uvcD9Prev }
};

//Assign the group to the renderer
var UVrndrPrev = new CIMUniqueValueRenderer()
{
  Fields = new string[] { "ACTYPE" },
  Groups = new CIMUniqueValueGroup[] { uvGrpPrev },
  UseDefaultSymbol = true,
  DefaultLabel = "Others",
  DefaultSymbol = SymbolFactory.Instance.ConstructPointSymbol(
    CIMColor.CreateRGBColor(185, 185, 185), 4, SimpleMarkerStyle.Hexagon)
    .MakeSymbolReference()
};

//Apply the renderer built above to the previous observations. It must be
//UVrndrPrev; no other renderer is defined in this snippet
streamLayer.SetRenderer(UVrndrPrev, FeatureRendererTarget.PreviousObservations);

Setting a simple renderer to draw track lines

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
{
  return;
}
//Must be on QueuedTask!
//Note: only a simple renderer with solid line symbol is supported for track 
//line renderer
var solidBlueLine = SymbolFactory.Instance.ConstructLineSymbol(
  ColorFactory.Instance.BlueRGB, 2, SimpleLineStyle.Solid);
var trackRenderer = new SimpleRendererDefinition()
{
  SymbolTemplate = solidBlueLine.MakeSymbolReference()
};
//Create the renderer from the definition and target it at the track lines
var trackLineRenderer = streamLayer.CreateRenderer(trackRenderer);
streamLayer.SetRenderer(trackLineRenderer, FeatureRendererTarget.TrackLines);

Check Previous Observation and Track Line Visibility

//The layer must be track aware and spatial for these settings
//to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
{
  return;
}
//Must be on QueuedTask
//Turn each one on only if it is currently off
if (streamLayer.AreTrackLinesVisible == false)
{
  streamLayer.SetTrackLinesVisibility(true);
}
if (streamLayer.ArePreviousObservationsVisible == false)
{
  streamLayer.SetPreviousObservationsVisibility(true);
}

Make Track Lines and Previous Observations Visible

//The layer must be track aware and spatial for these settings
//to have an effect
if (streamLayer.TrackType != TrackType.Spatial)
{
  return;
}

//Must be on QueuedTask

//Note: Setting PreviousObservationsCount larger than the 
//"SetExpirationMaxCount()" has no effect
streamLayer.SetPreviousObservationsCount(6);

//Turn track lines and previous observations on only if currently off
if (streamLayer.AreTrackLinesVisible == false)
{
  streamLayer.SetTrackLinesVisibility(true);
}
if (streamLayer.ArePreviousObservationsVisible == false)
{
  streamLayer.SetPreviousObservationsVisibility(true);
}

Retrieve the current observation renderer

//Must be on QueuedTask!
//GetRenderer() with no target argument - used here for the current
//observation renderer
var renderer = streamLayer.GetRenderer();

Retrieve the previous observation renderer

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
{
  return;
}
//Must be on QueuedTask!
//Pass the PreviousObservations target to get that renderer
var prev_renderer = streamLayer.GetRenderer(FeatureRendererTarget.PreviousObservations);

Retrieve the track lines renderer

//The layer must be track aware and spatial
if (streamLayer.TrackType != TrackType.Spatial)
{
  return;
}
//Must be on QueuedTask!
//Pass the TrackLines target to get that renderer
var track_renderer = streamLayer.GetRenderer(FeatureRendererTarget.TrackLines);

Subscribe and SearchAndSubscribe

Search And Subscribe for Streaming Data

await QueuedTask.Run(async () =>
{
  //Option 1: search and subscribe on the layer
  //query filter can be null to search and retrieve all rows
  //true means recycling cursor
  using (var layerCursor = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //waiting for new features to be streamed
    //default is no cancellation
    while (await layerCursor.WaitForRowsAsync())
    {
      //drain every row that is currently available
      while (layerCursor.MoveNext())
      {
        using (var row = layerCursor.Current)
        {
          //determine the origin of the row event
          var origin = row.GetRowSource();
          switch (origin)
          {
            case RealtimeRowSource.PreExisting:
              //pre-existing row at the time of subscribe
              break;
            case RealtimeRowSource.EventInsert:
              //row was inserted after subscribe
              break;
            case RealtimeRowSource.EventDelete:
              //row was deleted after subscribe
              break;
          }
        }
      }
    }
  }//row cursor is disposed. row cursor is unsubscribed

  //....or....
  //Option 2: use the feature class instead of the layer
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using (var fcCursor = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await fcCursor.WaitForRowsAsync())
      {
        //etc
      }
    }
  }
});

Search And Subscribe With Cancellation

await QueuedTask.Run(async () =>
{
  //Recycling cursor - 2nd param "true"
  //or streamLayer.Subscribe(qfilter, true) to just subscribe
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  //auto-cancel after 20 seconds. The token source is held in a "using" so
  //it is disposed even when an exception other than TaskCanceledException
  //escapes the wait loop
  using (var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20)))
  {
    //catch TaskCanceledException
    try
    {
      while (await rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            //etc
          }
        }
      }
    }
    catch (TaskCanceledException)
    {
      //Handle cancellation as needed
    }
  }//token source is disposed, then the row cursor
});

Explicitly Cancel WaitForRowsAsync

//somewhere in our code we create a CancellationTokenSource
var cancel = new CancellationTokenSource();
//...

//call cancel on the CancellationTokenSource anywhere in
//the add-in, assuming the CancellationTokenSource is in scope
if (SomeConditionForCancel)
  cancel.Cancel();//<-- will cancel the token

//Within QueuedTask we are subscribed! streamLayer.Subscribe() or SearchAndSubscribe()
try
{
  //TaskCanceledException will be thrown when the token is cancelled
  while (await rc.WaitForRowsAsync(cancel.Token))
  {
    //check for row events
    while (rc.MoveNext())
    {
      using (var row = rc.Current)
      {
        //etc
      }
    }
  }
}
catch (TaskCanceledException)
{
  //Handle cancellation as needed
}
finally
{
  //release the token source on every exit path, including when an
  //exception other than TaskCanceledException escapes the wait loop
  cancel.Dispose();
}

Realtime FeatureClass

Connect to a real-time feature class from a real-time datastore

var url = "https://geoeventsample1.esri.com:6443/arcgis/rest/services/AirportTraffics/StreamServer";

await QueuedTask.Run(() =>
{
  //Connection properties for a stream service at the given url
  var realtimeServiceConProp = new RealtimeServiceConnectionProperties(
                                   new Uri(url),
                                   RealtimeDatastoreType.StreamService
                                );
  using (var realtimeDatastore = new RealtimeDatastore(realtimeServiceConProp))
  {
    //A Realtime data store only contains **one** Realtime feature class (or table)
    var name = realtimeDatastore.GetTableNames().First();
    using (var realtimeFeatureClass = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass)
    {
      //"as" yields null when the single entry is a table rather than a
      //feature class - nothing to stream as a feature class in that case
      if (realtimeFeatureClass == null)
        return;

      //feature class, by default, is not streaming (opposite of the stream layer)
      realtimeFeatureClass.StartStreaming();
      //TODO use the feature class
      //...
    }
  }

});

Check if the Realtime Feature Class is Track Aware

//Get the feature class behind the layer, then its definition; both are
//disposed when their using blocks exit
using (var rfc = streamLayer.GetFeatureClass())
{
  using (var rfc_def = rfc.GetDefinition())
  {
    var isTrackAware = rfc_def.HasTrackIDField();
    if (isTrackAware)
    {
      //Track aware
    }
  }
}

Get the Track Id Field from the Realtime Feature class

//Must be on QueuedTask
//Get the feature class behind the layer, then its definition; both are
//disposed when their using blocks exit
using (var rfc = streamLayer.GetFeatureClass())
{
  using (var rfc_def = rfc.GetDefinition())
  {
    //the track id field is only read when the definition has one
    if (rfc_def.HasTrackIDField())
    {
      var trackIdField = rfc_def.GetTrackIDField();

    }
  }
}

Subscribe to Streaming Data

//Note: with feature class we can also use a System Task to subscribe and
//process rows
await QueuedTask.Run(async () =>
{
  // or var rfc = realtimeDatastore.OpenTable(name) as RealtimeFeatureClass
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    //subscribe, pre-existing rows are not searched
    using (var rc = rfc.Subscribe(qfilter, false))
    {
      SpatialQueryFilter spatialFilter = new SpatialQueryFilter();
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            switch (row.GetRowSource())
            {
              case RealtimeRowSource.EventInsert:
                //getting geometry from new events as they arrive
                Polygon poly = ((RealtimeFeature)row).GetShape() as Polygon;

                //using the geometry to select features from another feature layer
                spatialFilter.FilterGeometry = poly;//project poly if needed...
                countyFeatureLayer.Select(spatialFilter);
                continue;
              default:
                continue;
            }
          }                  
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});

Search Existing Data and Subscribe for Streaming Data

//Note we can use System Task with the Realtime feature class
//for subscribe
await System.Threading.Tasks.Task.Run(async () =>
// or use ... QueuedTask.Run()
{
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //non-recycling cursor - 2nd param "false"
    using (var rc = rfc.SearchAndSubscribe(qfilter, false))
    {
      //waiting for new features to be streamed
      //default is no cancellation
      while (await rc.WaitForRowsAsync())
      {
        //pre-existing rows will be retrieved that were searched
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            var row_source = row.GetRowSource();
            switch (row_source)
            {
              case RealtimeRowSource.EventDelete:
                //TODO - handle deletes
                break;
              case RealtimeRowSource.EventInsert:
                //TODO handle inserts
                break;
              case RealtimeRowSource.PreExisting:
                //TODO handle pre-existing rows
                break;
            }
          }
        }
      }
    }//row cursor is disposed. row cursor is unsubscribed
  }
});

Search And Subscribe With Cancellation

await System.Threading.Tasks.Task.Run(async () =>
// or use ... QueuedTask.Run()
{
  using (var rfc = streamLayer.GetFeatureClass())
  {
    //Recycling cursor - 2nd param "true"
    using (var rc = rfc.SearchAndSubscribe(qfilter, true))
    //auto-cancel after 20 seconds. The token source is held in a "using"
    //so it is disposed even when an exception other than
    //TaskCanceledException escapes the wait loop
    using (var cancel = new CancellationTokenSource(new TimeSpan(0, 0, 20)))
    {
      //catch TaskCanceledException
      try
      {
        while (await rc.WaitForRowsAsync(cancel.Token))
        {
          //check for row events
          while (rc.MoveNext())
          {
            using (var record = rc.Current)
            {
              //etc
            }
          }
        }
      }
      catch (TaskCanceledException)
      {
        //Handle cancellation as needed
      }
    }//token source is disposed, then the row cursor
  }
});

Home

ProSnippets: StreamLayers

Clone this wiki locally