In [1]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.transforms import trigger
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions

import google.auth
import json
import pandas as pd

import json
import datetime
from shapely import geometry

In [2]:
# Setting up the Beam pipeline options
options = pipeline_options.PipelineOptions()

# Sets the pipeline mode to streaming
options.view_as(pipeline_options.StandardOptions).streaming = True

# Auth
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

In [3]:
# Recording Duration
ib.options.recording_duration = "2m"

In [4]:
class EventParser(beam.DoFn):
    def process(self, element):
        self.element = element
        
        element = json.loads(element)
        
        parsed = {
            "coordinates":geometry.Point(element["position"]["geographic"]["coordinates"]),
            "eventTimeStamp":element["eventDateTime"]
        }
        
        yield parsed

In [5]:
def map_with_area(event, side_input):
    point = event["coordinates"]
    
    location = {
        "coordinates":point
    }
    
    for area in side_input:
        coordinate = area["coordinates"]
        name = area["name"]
        if(coordinate.contains(point)):
            location["name"] = name
            return [location]
        else:
            continue

In [6]:
with beam.Pipeline(options=options) as pipeline:
  
  # Defining Paths
  areas_path = "gs://de-team-bucket/data/areas_noZ.json"
  event_topic = "projects/garrido-ml-demos/topics/user-location"

  # Raw Collections
  areas = pipeline | "Read Areas" >> beam.io.ReadFromText(areas_path)
  events = pipeline | "Read Events" >> beam.io.ReadFromPubSub(event_topic) 

  # Parsing Areas (Batch)
  p_areas = (
     areas
     | "Load Areas" >> beam.Map(json.loads)
     | "Array to Polygon" >> beam.Map(lambda e: [e["properties"]["name"],
                            json.loads(e["geometry"])["coordinates"]])
     | "Flatten" >> beam.Map(lambda e: 
                               {
                                "name":e[0], 
                                "coordinates":geometry.Polygon(e[1][0])
                                })
  )

  # Parsing Events (Streaming)  
  p_point = (
    events
    | "Windowing Point (5 seconds)" >> beam.WindowInto(beam.window.FixedWindows(5))  
    | "Process Event" >> beam.ParDo(EventParser())
  )
  
  # Getting location (flat mapping with areas)
  p_location = (
      p_point
      | "Windowing Location (5 seconds)" >> beam.WindowInto(beam.window.FixedWindows(5))
      | beam.FlatMap(map_with_area, 
                     side_input=beam.pvalue.AsList(p_areas))
      | beam.Map(print)
  )

RuntimeError: Transform node AppliedPTransform([6]: Read Areas/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.