# Streaming Meetups Dashboard

The purpose of this notebook is to give an all-in-one demo of streaming data from the [meetup.com RSVP API](http://www.meetup.com/meetup_api/docs/stream/2/rsvps/#websockets), through a local [Spark Streaming job](http://spark.apache.org/streaming/), and into [declarative widgets](https://github.com/jupyter-incubator/declarativewidgets) in a dashboard layout.

On your first visit to this notebook, we recommend that you execute one cell at a time as you read along. Later, if you just want to see the demo, select *Cell > Run All* from the menu bar. Once you've run all of the cells, select *View > View Dashboard* and then click on the **Stream** toggle to start the data stream.

**Table of Contents**

1. [Create the Frontend Widgets](#Create-the-Widgets-Top) <span class="text-muted" style="float:right">topic histogram, filter entry, user card, global heatmap</span>
2. [Define the Spark Streaming Job](#Define-the-Spark-Streaming-Job-Top) <span style="float:right" class="text-muted">filter by topic, top topics, venue metadata</span>
3. [Publish Data to Widget Channels](#Publish-Data-to-Widget-Channels-Top)
4. [Connect to the Data Source](#Connect-to-the-Data-Source-Top) <span class="text-muted" style="float:right">"custom receiver", websocket connection, stream toggle</span>
5. [Arrange the Dashboard Layout](#Arrange-the-Dashboard-Layout-Top)

<div class="alert alert-info" role="alert" style="margin-top: 10px">
<p><strong>Note</strong></p>

<p>We've condensed all of the demo logic into a single notebook for educational purposes. If you want to turn this into a scalable, multi-tenant dashboard, you'll want to separate the stream processing portions from the dashboard view. That way, multiple dashboard instances can pull from the same processed data stream instead of recomputing it.</p>
</div>

### Initialize DeclarativeWidgets

In [1]:
%AddJar http://localhost:8888/nbextensions/urth_widgets/urth-widgets.jar

Starting download from http://localhost:8888/nbextensions/urth_widgets/urth-widgets.jar
Finished download of urth-widgets.jar


In [2]:
import urth.widgets._
initWidgets

#### Adding external dependencies

In [3]:
%AddDeps eu.piotrbuda scalawebsocket_2.10 0.1.1 --transitive

:: loading settings :: url = jar:file:/usr/local/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
:: resolving dependencies :: org.apache.toree#kernel;working
	confs: [default]
	found eu.piotrbuda#scalawebsocket_2.10;0.1.1 in central
	found com.ning#async-http-client;1.7.13 in central
	found io.netty#netty;3.6.3.Final in central
	found com.typesafe#scalalogging-log4j_2.10;1.0.1 in central
	found org.apache.logging.log4j#log4j-api;2.0-beta4 in central
	found org.apache.logging.log4j#log4j-core;2.0-beta4 in central
	found ch.qos.logback#logback-classic;1.0.9 in central
	found ch.qos.logback#logback-core;1.0.9 in central
:: resolution report :: resolve 569ms :: artifacts dl 16ms
	:: modules in use:
	ch.qos.logback#logback-classic;1.0.9 from central in [default]
	ch.qos.logback#logback-core;1.0.9 from central in [default]
	com.ning#async-http-client;1.7.13 from central in [default]
	com.typesafe#scalalogging-log4j_2.10;1.0.1 f

## Create the Widgets <span style="float: right; font-size: 0.5em"><a href="#Streaming-Meetups-Dashboard">Top</a></span>

We'll start by defining what we want to show in our dashboard, and use that definition to drive the stream processing we'll perform. Just keep in mind that none of the widgets we include here will render anything useful yet.

### Topic Bar Chart

Here we insert a `<urth-viz-chart>` to show the top 25 meetup topics by occurrence in the stream. Take note of the `<template>` element. We use it to specify that the HTML within will make use of a `counts` channel. We will put data on the `counts` channel later in this notebook.

In [4]:
%%html
<link rel="import" href="urth_components/urth-viz-chart/urth-viz-chart.html" is="urth-core-import">

<template is="urth-core-bind" channel="counts">
    <urth-viz-chart type='bar' datarows='[[counts.data]]' columns='[[counts.columns]]' rotatelabels='30'></urth-viz-chart>
</template>

### Topic Filter

Next we create an `<urth-core-function>` that binds the value of a `<paper-input>` widget to a Scala function that sets a variable on `MeetupApp`. The function sets a string that we'll use to filter the incoming events down to those pertaining to a certain topic.

Notice that the `<link>` tag here is different than what we specified above. `<urth-viz-chart>` is already loaded within the notebook, but here we are using a third-party [Polymer](https://www.polymer-project.org/1.0/) element which needs to download first. To handle that automatically, we specify `is="urth-core-import"` and set the [bower](http://bower.io/) package name as the `package` attribute value.

In [None]:
val set_topic_filter = (value: String) => {
    MeetupApp.topic_filter = value
}

In [None]:
%%html
<link rel="import" href="urth_components/paper-input/paper-input.html"
    is="urth-core-import" package="PolymerElements/paper-input">
    
<template is="urth-core-bind" channel="filter" id="filter-input">
    <urth-core-function auto
        id="set_topic_filter"
        ref="set_topic_filter"
        arg-value="{{topic_filter}}">
    </urth-core-function>
        
    <paper-input label="Filter" value="{{topic_filter}}"></paper-input>
</template>

### User Card

Now we add a simple `<paper-card>` element showing the name and photo of one user who RSVPed recently in the event stream. We add some custom styling and a bit of custom JavaScript in this case to format the datetime associated with the RSVP event.

In [5]:
%%html
<link rel="import" href="urth_components/paper-card/paper-card.html"
    is="urth-core-import" package="PolymerElements/paper-card">

<style is="custom-style">
    paper-card.meetups-card {
        max-width: 400px;
        width: 100%;
        
        --paper-card-header: {
            height: 100px;
            border-bottom: 1px solid #e8e8e8;
        };

        --paper-card-header-image: {
            height: 80px;
            width: 80px !important;
            margin: 10px;
            border-radius: 50px;
            width: auto;
            border: 10px solid white;
            box-shadow: 0 0 1px 1px #e8e8e8;
        };
        
        --paper-card-header-image-text: {
            left: auto;
            right: 0px;
            width: calc(100% - 130px);
            text-align: right;
            text-overflow: ellipsis;
            overflow: hidden;
        };
    }
    
    .meetups-card .card-content a {
        display: block;
        overflow: hidden;
        text-overflow: ellipsis;
        white-space: nowrap;
    }
</style>

<template is="urth-core-bind" channel="meetups" id="meetup-card">
    <paper-card
            class="meetups-card"
            heading="[[meetup.member.member_name]]"
            image="[[meetup.member.photo]]">
        <div class="card-content">
            <p><a href="[[meetup.event.event_url]]" target="_blank">[[meetup.event.event_name]]</a></p>
            <p>[[getPrettyTime(meetup.event.time)]]</p>
        </div>
    </paper-card>
</template>

<!-- see https://github.com/PolymerElements/iron-validator-behavior/blob/master/demo/index.html -->
<script>
    (function() {
        var dateStringOptions = {weekday:'long', year:'numeric', month: 'long', hour:'2-digit', minute:'2-digit', day:'numeric'};
        var locale = navigator.language || navigator.browserLanguage || navigator.systemLanguage || navigator.userLanguage;

        var scope = document.querySelector('template#meetup-card');
        scope.getPrettyTime = function(timestamp) {
            try {
                console.log('The date is', timestamp)
                var d = new Date(timestamp);
                return d.toLocaleDateString(locale, dateStringOptions);
            } catch(e){
                return ''
            }
        }
    })();
</script>

### Map Venues

Finally, we add a [WebGL globe](https://github.com/dataarts/webgl-globe) showing the location of meetup venues to which users are RSVPing in the stream. On the globe we render bars to represent the number of recent RSVPs in a geographic area.

In [None]:
%%html
<link rel="import" href="urth_components/webgl-globe/webgl-globe.html"
  is="urth-core-import" package="http://github.com/ibm-et/webgl-globe.git">

<template is="urth-core-bind" channel="venues">
    <webgl-globe data=[[venue_data]]></webgl-globe>
</template>

## Define the Spark Streaming Job <span style="float: right; font-size: 0.5em"><a href="#Streaming-Meetups-Dashboard">Top</a></span>

With the frontend widgets in mind, we'll now set up our Spark Streaming job to fulfill their data requirements. In this section, we'll define a set of functions that act on a `StreamingContext` or on `RDDs` from that context.

### Create Spark Contexts

Here we create a function that will initialize our Spark contexts. We'll use this function at the end of the notebook when we tie Spark to the data source and a final widget controlling the stream flow.

In [6]:
import scalawebsocket.WebSocket
import org.apache.spark.storage.StorageLevel    
import org.apache.spark.streaming.receiver.Receiver

class WebSocketReceiver(url: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER) extends Receiver[play.api.libs.json.JsValue](storageLevel) {
    @volatile private var webSocket: WebSocket = _

    def onStart() {
        try{
          val newWebSocket = WebSocket().open(url).onTextMessage({ msg: String => parseJson(msg) })
          setWebSocket(newWebSocket)
        } catch {
          case e: Exception => restart("Error starting WebSocket stream", e)
        }
    }

    def onStop() {
        setWebSocket(null)
    }

    private def setWebSocket(newWebSocket: WebSocket) = synchronized {
        if (webSocket != null) {
          webSocket.shutdown()
        }
        webSocket = newWebSocket
    }

    private def parseJson(jsonStr: String): Unit = {
        val json: play.api.libs.json.JsValue = play.api.libs.json.Json.parse(jsonStr)
        store(json)
    }
}

case class TopicCount(topic: String, count: Int)


object MeetupApp extends Serializable {    
    import play.api.libs.json._
    import org.apache.spark.storage.StorageLevel    
    import org.apache.spark.Logging
    import org.apache.spark.streaming._
    import org.apache.spark.sql.functions._
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.rdd.RDD
    import urth.widgets.WidgetChannels.channel
    import org.apache.spark.sql.SQLContext
    @transient var ssc:StreamingContext = _
    @transient val sqlContext:SQLContext = new SQLContext(sc)
    
    import sqlContext.implicits._
    
    var topic_filter = ""

    def create_streaming_context(sample_rate: Int): StreamingContext = {
        return new StreamingContext(sc, Seconds(sample_rate))
    }
    
    /**
        Creates a StreamingContext with a five-second batch interval and enables
        checkpointing, which updateStateByKey requires. Creates the events and
        topics DStreams, setting the widget channel publishing functions to iterate
        over RDDs in each. Starts the stream processing.
    */
    def start_stream() = {
        ssc = create_streaming_context(5)
        ssc.checkpoint("/tmp/checkpoint.checkpoint")
        val events = get_events(ssc, sample_event)
        get_topics(events, get_topic_counts)
        ssc.start()
    }

    /**
        Stops the streaming context, which also stops the websocket receiver.
        Leaves the kernel's SparkContext running so the stream can be restarted.
    */
    def shutdown_stream() = {
        ssc.stop(stopSparkContext = false)
    }
    
    /**
    Receives parsed events from the websocket receiver. Retains only those events
    that have at least one topic exactly matching the current topic_filter. Sends
    event RDDs to the for_each function. Returns the event DStream.
    */
    def get_events(ssc: StreamingContext, for_each: (RDD[play.api.libs.json.JsValue]) => Unit): org.apache.spark.streaming.dstream.DStream[play.api.libs.json.JsValue] = {

        val  all_events = ssc.receiverStream( new WebSocketReceiver("ws://stream.meetup.com/2/rsvps"))

        // Filter set of event
        val events = all_events.filter(retain_event)

        // Send event data to a widget channel. This will be covered below.
        events.foreachRDD(for_each)

        return events
    }
    
    /**
    Returns true if the user defined topic filter is blank or if at least one
    group topic in the event exactly matches the user topic filter string.
    */
    def retain_event(event: play.api.libs.json.JsValue):Boolean = {
        val topics = (event \ "group" \ "group_topics").as[play.api.libs.json.JsArray].value
        val isEmpty = topic_filter.trim == ""
        // exists returns false for an empty topic list, where reduce would throw
        val containsTopic = topics.exists(topic =>
            topic_filter.equals((topic \ "urlkey").as[play.api.libs.json.JsString].value))

        isEmpty || containsTopic   
    }
    
    /*
    Takes an RDD from the event DStream. Takes one event from the RDD, if any.
    Substitutes a placeholder photo if the member who RSVPed does not
    have one. Publishes the event metadata on the meetups channel.
    */
    def sample_event(rdd: org.apache.spark.rdd.RDD[play.api.libs.json.JsValue]):Unit = {
        // take(1) returns an empty array for an empty batch, so guard with headOption
        rdd.take(1).headOption.foreach { event =>
            // use a fallback photo for those members without one
            val default_event: play.api.libs.json.JsObject = play.api.libs.json.Json.parse("""{
                 "member" : {
                     "photo" : "http://photos4.meetupstatic.com/img/noPhoto_50.png"
                 }
             }
            """).as[play.api.libs.json.JsObject]
            // deepMerge keeps the default photo when the event's member object lacks one;
            // a shallow ++ would replace the whole member object
            val fixed_event = default_event.deepMerge(event.as[play.api.libs.json.JsObject])

            channel("meetups").set("meetup", fixed_event.value)
        }
    }
    
    /**
    Pulls group topics from meetup events. Counts each one once and updates
    the global topic counts seen since stream start. Sends topic count RDDs
    to the for_each function. Returns nothing new.
    */    
    def get_topics(events:DStream[JsValue], for_each: (RDD[((String,String),Int)]) => Unit){
        //Extract the group topic url keys and "namespace" them with the current topic filter
        val topics = events.
                        flatMap( (event: JsValue) => {
                          (event \ "group" \ "group_topics").as[JsArray].value
                        }).map((topic: JsValue) => {
                            val filter = if(topic_filter.equals("")) {
                                "*"
                            } else { 
                                topic_filter 
                            }
                            ((filter, (topic \ "urlkey").as[JsString].value), 1)      
                        })

        val topic_counts = topics.updateStateByKey(update_topic_counts)

        // Send topic data to a widget channel. This will be covered below.
        topic_counts.foreachRDD(for_each)
    }
    
    /**
    Sums the number of times a topic has been seen in the current sampling
    window. Then adds that to the number of times the topic has been
    seen in the past. Returns the new sum.
    */
    def update_topic_counts(new_values: Seq[Int], last_sum: Option[Int]): Option[Int] = {
        Some(new_values.sum + last_sum.getOrElse(0))
    }
        
    /**
    Takes an RDD from the topic DStream. Takes the top 25 topics by occurrence
    and publishes them in a Spark DataFrame on the counts channel.
    */
    def get_topic_counts(rdd: RDD[((String,String),Int)]){
        val filterStr = if (topic_filter.equals("")) "*" else topic_filter
        
        /*
         keep only those matching current filter
         and sort in descending order, taking top 25
        */
        val countDF = rdd.filter((x:((String,String),Int)) => filterStr.equals(x._1._1)).
                    map(tuple => TopicCount(tuple._1._2, tuple._2)).toDF().
                    sort($"count".desc).limit(25)

        channel("counts").set("counts", countDF)

    }
}

In [7]:
%%html
<template is="urth-core-bind" channel="meetups">
    Sample event is <span>{{meetup}}</span>
</template>

In [8]:
%%html
<template is="urth-core-bind" channel="counts">
    Topic counts are <span>{{counts}}</span>
</template>

In [9]:
MeetupApp.start_stream()

In [None]:
MeetupApp.shutdown_stream()

### Process Events

Next we create a function, `get_events`, to parse the RSVP stream JSON events and optionally filter them by topic. We define a few helper functions as well.
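
To make the filtering rule concrete, here is a minimal plain-Python sketch of the same logic as `retain_event`, run against a hand-written event rather than the live stream. The sample JSON is trimmed to the fields the filter reads, and passing `topic_filter` as a parameter instead of reading shared state is an assumption made to keep the example self-contained.

```python
import json

def retain_event(event, topic_filter):
    """Return True if the filter is blank or exactly matches one topic urlkey."""
    if not topic_filter.strip():
        return True
    topics = event.get('group', {}).get('group_topics', [])
    # any() is False for an empty topic list, so events without topics are dropped
    return any(topic.get('urlkey') == topic_filter for topic in topics)

# Hand-written sample, trimmed to the fields the filter reads
sample = json.loads('''{
    "group": {"group_topics": [{"urlkey": "expat"}, {"urlkey": "newintown"}]}
}''')

print(retain_event(sample, ''))        # blank filter keeps everything
print(retain_event(sample, 'expat'))   # exact urlkey match
print(retain_event(sample, 'python'))  # no matching topic
```

Note that the match is on the `urlkey` field, not the human-readable `topic_name`, so the filter text must be typed in its URL form.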

### Process Topics

Now we define `get_topics` to pull the URL key topic from every group sponsoring a meetup event in the stream and update the global topic counts.

In [None]:
def update_topic_counts(new_values, last_sum):
    '''
    Sums the number of times a topic has been seen in the current sampling
    window. Then adds that to the number of times the topic has been
    seen in the past. Returns the new sum.
    '''
    return sum(new_values) + (last_sum or 0)

def get_topics(events, for_each):
    '''
    Pulls group topics from meetup events. Counts each one once and updates
    the global topic counts seen since stream start. Sends topic count RDDs
    to the for_each function. Returns nothing new.
    '''
    # Extract the group topic url keys and "namespace" them with the current topic filter
    topics = (events
                .flatMap(lambda event: event['group']['group_topics'])
                .map(lambda topic: ((topic_filter if topic_filter else '*', topic['urlkey']), 1)))
    
    topic_counts = topics.updateStateByKey(update_topic_counts)

    # Send topic data to a widget channel. This will be covered below.
    topic_counts.foreachRDD(for_each)
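
The running totals can be hard to picture without a cluster, so the following sketch imitates one `updateStateByKey` step on a local dictionary. The `apply_batch` helper is hypothetical and exists only for illustration; Spark performs the grouping and state bookkeeping itself.

```python
from collections import defaultdict

def update_topic_counts(new_values, last_sum):
    # Same update rule as above: batch total plus the previous running total
    return sum(new_values) + (last_sum or 0)

def apply_batch(state, batch):
    """Mimic one updateStateByKey step on a local dict (illustration only)."""
    grouped = defaultdict(list)
    for key, value in batch:
        grouped[key].append(value)
    # The update function runs for every key that has prior state,
    # including keys absent from the current batch.
    keys = set(state) | set(grouped)
    return {key: update_topic_counts(grouped.get(key, []), state.get(key))
            for key in keys}

state = {}
state = apply_batch(state, [(('*', 'expat'), 1), (('*', 'expat'), 1), (('*', 'hiking'), 1)])
state = apply_batch(state, [(('*', 'hiking'), 1)])
print(state[('*', 'expat')], state[('*', 'hiking')])
```

Because the key includes the filter that was active when the event arrived, counts gathered under different filters accumulate separately.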

### Process Venues

Finally, we add `get_venues` to extract and relay venue metadata.

In [None]:
def get_venues(events, for_each):
    '''
    Pulls venue metadata from meetup events if it exists. Sends RDDs of
    venue dictionaries to the for_each function. Returns nothing new.
    '''
    venues = (events
        .filter(lambda event: 'venue' in event)
        .map(lambda event: event['venue']))
    
    # Send venue data to a widget channel
    venues.foreachRDD(for_each)

## Publish Data to Widget Channels <span style="float: right; font-size: 0.5em"><a href="#Streaming-Meetups-Dashboard">Top</a></span>

With both the frontend and Spark job definition in hand, we can now begin to link them. Three of our widgets are set to update when they receive data on the channels `counts`, `meetups`, and `venues`. We'll define functions that publish data on these channels here.

We'll eventually pass the `sample_event` function as the `for_each` parameter of the `get_events` function that we defined above.

Likewise, we'll eventually pass the `get_topic_counts` function as the `for_each` parameter value to the `get_topics` function that we defined above.

In [None]:
def get_topic_counts(rdd):
    '''
    Takes an RDD from the topic DStream. Takes the top 25 topics by occurrence
    and publishes them in a pandas DataFrame on the counts channel.
    '''
    #counts = rdd.takeOrdered(25, key=lambda x: -x[1])
    filterStr = topic_filter if topic_filter else '*'
    counts = (rdd
                .filter(lambda x: x[0][0] == filterStr) # keep only those matching current filter
                .takeOrdered(25, key=lambda x: -x[1]))  # sort in descending order, taking top 25
    if not counts:
        # If there are no data, the bar chart will error out. Instead,
        # we send a tuple whose count is zero.
        counts = [('NO DATA', 0)]
    else:
        # Drop the topic filter from the tuple
        counts = list(map(lambda x: (x[0][1], x[1]), counts))
    df = pd.DataFrame(counts)
    channel('counts').set('counts', df)
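
The selection step can be tried without Spark or pandas. This sketch applies the same filter, top-N, and fallback rules to a plain list, using `heapq.nsmallest` as a local stand-in for `takeOrdered`. The function name `top_topics` and the sample pairs are made up for the example.

```python
import heapq

def top_topics(pairs, topic_filter='', n=25):
    """Return up to n (topic, count) tuples for the active filter, highest count first."""
    filter_str = topic_filter if topic_filter else '*'
    matching = [pair for pair in pairs if pair[0][0] == filter_str]
    # Negating the count makes "smallest" mean "most frequent"
    top = heapq.nsmallest(n, matching, key=lambda x: -x[1])
    if not top:
        # Placeholder row so a chart always has something to draw
        return [('NO DATA', 0)]
    # Drop the topic filter from each key
    return [(key[1], count) for key, count in top]

pairs = [(('*', 'expat'), 3), (('*', 'hiking'), 7), (('python', 'pydata'), 5)]
print(top_topics(pairs, n=2))
print(top_topics(pairs, 'python'))
print(top_topics(pairs, 'golang'))
```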

And, finally, we'll pass the `aggregate_venues` function as the `for_each` parameter value to the `get_venues` function.

In [None]:
venue_data = []
lon_bins = np.linspace(-180, 180, 361)
lat_bins = np.linspace(-90, 90, 181)
scale=100

def aggregate_venues(rdd):
    '''
    Takes an RDD from the venues DStream. Builds a histogram of events by 
    latitude and longitude. Publishes the histogram as a list of three-tuples
    on the venues channel.
    
    Note: To improve scalability, this binning should be performed
    on the Spark workers, not collected and performed on the driver.
    '''
    global venue_data

    # create new lists from previous data and new incoming venues
    venues = rdd.collect()
    lats = [v[0] for v in venue_data] + [x['lat'] for x in venues]
    lons = [v[1] for v in venue_data] + [x['lon'] for x in venues]
    weights = [v[2] for v in venue_data] + ([1./scale] * len(venues))
    
    # create histogram from aggregate data
    density, _, _ = np.histogram2d(lats, lons, [lat_bins, lon_bins], weights=weights)
    venue_data = [[lat-90, lon-180, min(mag,1)]
                     for lat,dlats in enumerate(density)
                     for lon,mag in enumerate(dlats)
                     if mag > 0]
    
    channel('venues').set('venue_data', venue_data)
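
The docstring above notes that the binning would scale better on the Spark workers. One way to approach that, sketched here as an assumption rather than as this notebook's method, is to compute a one-degree bin key per venue so that the counting could be expressed as a map followed by a reduce by key. The helper name `venue_bin` is hypothetical, and a `Counter` stands in for the distributed reduce so the example runs locally.

```python
import math
from collections import Counter

def venue_bin(venue):
    """Map a venue to the lower-left corner of its one-degree (lat, lon) cell."""
    # Clamp so that lat == 90 or lon == 180 falls into the last cell,
    # as it does with the closed final bin of np.histogram2d
    lat = min(math.floor(venue['lat']), 89)
    lon = min(math.floor(venue['lon']), 179)
    return (lat, lon)

venues = [
    {'lat': 38.91, 'lon': -77.02},
    {'lat': 38.20, 'lon': -77.90},
    {'lat': 51.50, 'lon': -0.12},
]

# Local stand-in for mapping each venue to (bin, 1) and summing per bin
counts = Counter(venue_bin(v) for v in venues)
print(counts)
```

Only the per-bin totals would then need to reach the driver, instead of every venue record.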

## Connect to the Data Source <span style="float: right; font-size: 0.5em"><a href="#Streaming-Meetups-Dashboard">Top</a></span>

We're finally ready to connect the meetup.com RSVP websocket server to our Spark job. To do so, we want to write a [custom DStream receiver bridging the Websocket to a `SparkStreamingContext`](http://spark.apache.org/docs/latest/streaming-custom-receivers.html). Unfortunately, we can't write a [custom receiver in Python yet](http://spark.apache.org/docs/latest/streaming-programming-guide.html#custom-sources). We'll work around this limitation by dumping incoming Websocket messages to disk in a ring buffer, and using the Spark `textFileStream` API to read them.
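
The ring buffer idea can be sketched in a few lines. The class below is a hypothetical stand-in named `SimpleFileRing`, not the `FileRingReceiver` used later in this notebook, and it only demonstrates how cycling through a fixed set of file names caps disk usage. It ignores details that matter for Spark's file monitoring, such as moving completed files into the watched directory atomically.

```python
import os
import tempfile

class SimpleFileRing:
    """Hypothetical file-backed ring buffer (illustration only)."""

    def __init__(self, max_batches=3):
        self.queue = tempfile.mkdtemp()  # directory a file stream could watch
        self.max_batches = max_batches
        self.count = 0

    def put(self, message):
        # Reuse slot names in a cycle so at most max_batches files exist
        slot = self.count % self.max_batches
        path = os.path.join(self.queue, 'batch-{}.json'.format(slot))
        with open(path, 'w') as f:
            f.write(message + '\n')
        self.count += 1

    def destroy(self):
        # Remove every slot file, then the directory itself
        for name in os.listdir(self.queue):
            os.remove(os.path.join(self.queue, name))
        os.rmdir(self.queue)

ring = SimpleFileRing(max_batches=3)
for i in range(5):
    ring.put('{"rsvp_id": %d}' % i)
files = sorted(os.listdir(ring.queue))
print(files)
ring.destroy()
```

After five messages only three files remain, because the fourth and fifth writes overwrite the two oldest slots.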

We now define functions to start and stop the stream processing. We track the websocket client, `StreamingContext`, and ring buffer between start and stop calls.

In [None]:
def start_stream():
    '''
    Creates a websocket client that pumps events into a ring buffer queue. Creates
    a SparkStreamContext that reads from the queue. Creates the events, topics, and
    venues DStreams, setting the widget channel publishing functions to iterate over
    RDDs in each. Starts the stream processing.
    '''
    global conn_future
    global ssc
    global receiver
    
    receiver = FileRingReceiver(max_batches=100)  
    conn_future = websocket_connect('ws://stream.meetup.com/2/rsvps', on_message_callback=receiver.put)
    ssc = create_streaming_context(receiver.queue, 5)
    events = get_events(ssc, receiver.queue, sample_event)
    get_topics(events, get_topic_counts)
    get_venues(events, aggregate_venues)
    ssc.start()
    
def shutdown_stream():
    '''
    Shuts down the websocket, stops the streaming context, and cleans up the file ring.
    '''
    global conn_future
    global ssc
    global receiver
    
    conn_future.result().close()
    ssc.stop()
    receiver.destroy()

We give each of these functions a frontend `<urth-core-function>` representation. We bind these functions to a `<paper-toggle-button>` that starts and stops the stream processing.

In [None]:
%%html
<link rel="import" href="urth_components/paper-toggle-button/paper-toggle-button.html"
    is="urth-core-import" package="PolymerElements/paper-toggle-button#v1.0.10">
    
<template is="urth-core-bind">
    <urth-core-function id="streamFunc" ref="start_stream"></urth-core-function>
    <urth-core-function id="shutdownFunc" ref="shutdown_stream"></urth-core-function>
</template>

<style is="custom-style">
    paper-toggle-button {
        --default-primary-color: green;
    }
    
    paper-toggle-button:hover {
        cursor: pointer;
    }
        
    .toggle-btn-container {
        margin: 1em 0;
        text-align: right;
    }
    
    #stream-label {
        font-size: larger;
        margin: 0;
        padding: 0 0.5em;
    }
</style>

<div class="toggle-btn-container">
    <paper-toggle-button id="stream-btn"></paper-toggle-button>
    <label id="stream-label">Stream</label>
</div>

<script>
    $('#stream-btn').on('change', function() {
        if ($(this).attr('checked')) {
            // start streaming
            console.warn('Starting Spark Streaming');
            $('#streamFunc').get(0).invoke();
        } else {
            // stop streaming
            console.warn('Stopping Spark Streaming');
            $('#shutdownFunc').get(0).invoke();
        }
    });
</script>

## Arrange the Dashboard Layout <span style="float: right; font-size: 0.5em"><a href="#Streaming-Meetups-Dashboard">Top</a></span>

Before toggling the stream on/off switch, we should switch to dashboard view. Otherwise, we'll need to scroll up and down this notebook to see the widgets updating. For convenience, this notebook already contains metadata to position our widgets in a grid layout.

Select *View > View Dashboard* from the menu bar to see the dashboard view now. Then toggle the stream switch in the top right of the dashboard to begin stream processing. To return to the regular notebook view, select *View > Notebook*.

If you want to arrange the notebook cells differently, select *View > Layout Dashboard*. Then, hover your mouse over the main notebook / dashboard area. When you do, you'll see icons appear that allow you to:

- Drag cells to new locations
- Resize cells
- Show / hide cells in the dashboard view
- Flip to editing mode for a cell

Save the notebook to save your changes to the layout within the notebook file itself.

<div class="alert alert-info" role="alert" style="margin-top: 10px">
<p><strong>Note</strong></p>

<p>In a fresh notebook, the dashboard will only show cells with non-empty output. All other cells can be found in the <em>Hidden</em> section at the bottom of the dashboard layout page. You can quickly add all cell outputs or remove all cell outputs from the dashboard using the show / hide icons that appear in the notebook toolbar when you are in layout mode.</p>
</div>

# TESTING

In [None]:
start_stream()

In [None]:
%%html
<template is="urth-core-bind" channel="meetups">
    Sample event is <span>{{meetup}}</span>
</template>

In [None]:

val value = play.api.libs.json.Json.parse("""{
   "venue":{
      "venue_name":"Ozio Rooftop Lounge ",
      "lon":0,
      "lat":0,
      "venue_id":23666225
   },
   "visibility":"public",
   "response":"yes",
   "guests":0,
   "member":{
      "member_id":2846786,
      "photo":"http:\/\/photos4.meetupstatic.com\/photos\/member\/7\/4\/6\/0\/thumb_197129792.jpeg",
      "member_name":"Kristy"
   },
   "rsvp_id":1594217638,
   "mtime":1455301122999,
   "event":{
      "event_name":"EXPATS 'Saturday' HAPPY HOUR",
      "event_id":"fzzldlyvfbcc",
      "time":1455404400000,
      "event_url":"http:\/\/www.meetup.com\/Internationals\/events\/219588759\/"
   },
   "group":{
      "group_topics":[
         {
            "urlkey":"language-exchange",
            "topic_name":"Language Exchange"
         },
         {
            "urlkey":"international-professionals",
            "topic_name":"International Professionals"
         },
         {
            "urlkey":"expat",
            "topic_name":"Expat"
         },
         {
            "urlkey":"professional-networking",
            "topic_name":"Professional Networking"
         },
         {
            "urlkey":"professional-singles",
            "topic_name":"Single Professionals"
         },
         {
            "urlkey":"culture-exchange",
            "topic_name":"Culture Exchange"
         },
         {
            "urlkey":"global-oneness",
            "topic_name":"Global Oneness"
         },
         {
            "urlkey":"international-friends",
            "topic_name":"International Friends"
         },
         {
            "urlkey":"newintown",
            "topic_name":"New In Town"
         },
         {
            "urlkey":"international-travel",
            "topic_name":"International Travel"
         },
         {
            "urlkey":"intlrel",
            "topic_name":"International Relations"
         },
         {
            "urlkey":"global-citizenship",
            "topic_name":"Global Citizenship"
         }
      ],
      "group_city":"Washington",
      "group_country":"us",
      "group_id":1304426,
      "group_name":"DC International Professionals\/Expats Meetup",
      "group_lon":-77.02,
      "group_urlname":"Internationals",
      "group_state":"DC",
      "group_lat":38.91
   }
}
""")

In [None]:
shutdown_stream()