## Mining Live BMS Stream

In order to retrieve data from the live BMS stream we need to either tap into the websockets stream directly or grab html data from the updating page.

In [226]:
from bs4 import BeautifulSoup
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium import webdriver

import time

browser = webdriver.Chrome()
browser.get("https://api.usb.urbanobservatory.ac.uk/live/")

# Wait for the dynamically loaded elements to show up
WebDriverWait(browser, 10).until(
    EC.visibility_of_element_located((By.TAG_NAME, "li")))

time.sleep(2)

elements = browser.find_elements_by_tag_name("li")

text = [element.text for element in elements]

inner_html = [element.get_attribute("innerHTML") for element in elements]

browser.quit()


In [235]:
import json

def parse_log(html):
    log = {}
    log["value"] = html.find("span", class_ = "number").string
    log["timestamp"] = html.find("span", class_ = "timestamp").string
    log["unit"] = html.find_all("span")[1].text.split(log["value"])[1][:-1]
    log["title"] = html.find("a")["title"]
    log["metric"] = html.find("a").string
    log["location"] = html.find_all("span")[1].get_text().split(log["metric"] + " ")[1].split(" is now")[0]
    
    index_name = "general"
    
    if "in " in log["location"]:
        log["location"] = log["location"].split("in ")[1]
        index_name = "room"
        
    _id = log["title"] + "_" + log["timestamp"]
    log["value"] = float(log["value"])
    return (index_name, _id, json.dumps(log))#, indent=4, sort_keys=True)

In [236]:
from bs4 import BeautifulSoup

html_doc = inner_html[2]

soup = BeautifulSoup(html_doc, 'html.parser')

print(soup.prettify())

<span class="timestamp">
 2018-02-11T20:37:54.538Z
</span>
<span class="description">
 <span class="variable">
  <a title="Plt02'Meters'SwIncome'L32Namps">
   L3 to N
  </a>
 </span>
 (Amps) (Plt02'Meters'SwIncome'L32Namps) is now
 <span class="number">
  46.1
 </span>
 amperes.
</span>



In [237]:
print(parse_log(soup))

('general', "Plt02'Meters'SwIncome'L32Namps_2018-02-11T20:37:54.538Z", '{"value": 46.1, "timestamp": "2018-02-11T20:37:54.538Z", "unit": " amperes", "title": "Plt02\'Meters\'SwIncome\'L32Namps", "metric": "L3 to N", "location": "(Amps) (Plt02\'Meters\'SwIncome\'L32Namps)"}')


In [238]:
room_html = '<li>    <span class="timestamp">2018-02-11T12:26:22.172Z</span>     <span class="description"><span class="variable"><a title="LightingControl.03_RoomControl.342_WorkspaceZ4_3_049_BrightnessValue">Room Brightness</a></span> in <strong>Room 3.049 (Workspace Z4)</strong> is now <span class="number">14900</span> luxes.</span>  </li>'


room_soup = BeautifulSoup(room_html, 'html.parser')

print(room_soup.prettify())

<li>
 <span class="timestamp">
  2018-02-11T12:26:22.172Z
 </span>
 <span class="description">
  <span class="variable">
   <a title="LightingControl.03_RoomControl.342_WorkspaceZ4_3_049_BrightnessValue">
    Room Brightness
   </a>
  </span>
  in
  <strong>
   Room 3.049 (Workspace Z4)
  </strong>
  is now
  <span class="number">
   14900
  </span>
  luxes.
 </span>
</li>


In [239]:
print(parse_log(room_soup))

('room', 'LightingControl.03_RoomControl.342_WorkspaceZ4_3_049_BrightnessValue_2018-02-11T12:26:22.172Z', '{"value": 14900.0, "timestamp": "2018-02-11T12:26:22.172Z", "unit": " luxes", "title": "LightingControl.03_RoomControl.342_WorkspaceZ4_3_049_BrightnessValue", "metric": "Room Brightness", "location": "Room 3.049 (Workspace Z4)"}')


## Talking to ElasticSearch

Now that we have formatted data from the page we can try sending it to elasticsearch and visualizing it using Kibana.

In [240]:
# make sure ES is up and running
import requests

# This is the hostname for docker machine running from docker toolbox on windows 10
# It's very likely to be different for your set up so be sure to check where your elk docker service runs!
host = "192.168.99.100"

res = requests.get('http://' + host + ':9200')
res.json()

{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'xC2OucNfQJqcWfmrSUnzEQ',
 'name': 'KHIiGGd',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2018-01-26T18:22:55.523Z',
  'build_hash': 'af51318',
  'build_snapshot': False,
  'lucene_version': '7.1.0',
  'minimum_index_compatibility_version': '5.0.0',
  'minimum_wire_compatibility_version': '5.6.0',
  'number': '6.1.3'}}

In [241]:
#connect to our cluster
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': host, 'port': 9200}])

In [242]:
#index some test data
es.index(index='test-index', doc_type='test', id=1, body={'test': 'test'})

{'_id': '1',
 '_index': 'test-index',
 '_primary_term': 1,
 '_seq_no': 3,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'test',
 '_version': 4,
 'result': 'updated'}

In [265]:
from elasticsearch import client

mapping_json = { "mappings" : {
    "bms_log": {
          "properties": {
            "value": { "type": "double" },
            "timestamp": { "type": "date" },
            "unit": { "type": "keyword" },
            "title": { "type": "text",
                       "fields": {
                           "keyword": {
                               "type": "keyword",
                               "ignore_above": 200
                           }   
                       }
                     },
            "metric": { "type": "text" },
            "location": { "type": "text" }
            }
        }
    }
}

mapping_json = json.dumps(mapping_json)

# Connect to client
es_client = client.IndicesClient(es)

# Clean up miss-mapped indexes first
try:
    es_client.delete(index = ["general","room"])
except:
    pass

# Map indexes properly
es_client.create(index = "general", body = mapping_json)
es_client.create(index = "room", body = mapping_json)

DELETE http://192.168.99.100:9200/general,room [status:404 request:0.003s]


{'acknowledged': True, 'index': 'room', 'shards_acknowledged': True}

In [250]:
mapping_json

'{"mappings": {"bms_log": {"properties": {"value": {"type": "double"}, "timestamp": {"type": "date"}, "unit": {"type": "keyword"}, "title": {"type": "text"}, "metric": {"type": "text"}, "location": {"type": "text"}}}}}'

In [251]:
# index the clean data we just scraped
input = parse_log(room_soup)

es.index(index = input[0], doc_type = "bms_log", id = input[1], body = input[2])

{'_id': 'LightingControl.03_RoomControl.342_WorkspaceZ4_3_049_BrightnessValue_2018-02-11T12:26:22.172Z',
 '_index': 'room',
 '_primary_term': 1,
 '_seq_no': 0,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'bms_log',
 '_version': 1,
 'result': 'created'}

In [252]:
# index the clean data we just scraped
input = parse_log(soup)

es.index(index = input[0], doc_type = "bms_log", id = input[1], body = input[2])

{'_id': "Plt02'Meters'SwIncome'L32Namps_2018-02-11T20:37:54.538Z",
 '_index': 'general',
 '_primary_term': 1,
 '_seq_no': 0,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'bms_log',
 '_version': 1,
 'result': 'created'}

### Scaling Up!!

Now that we have managed to parse our data and input some basic indexes into elastic lets do so for a larger number of logs.

In [266]:
from bs4 import BeautifulSoup
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium import webdriver

import time

browser = webdriver.Chrome()
browser.get("https://api.usb.urbanobservatory.ac.uk/live/")

# Wait for the dynamically loaded elements to show up
WebDriverWait(browser, 10).until(
    EC.visibility_of_element_located((By.TAG_NAME, "li")))

time.sleep(2)

i = 0

successful = 0

while i < 100000:
    element = browser.find_element_by_tag_name("li")

    inner_html = BeautifulSoup(element.get_attribute("innerHTML"), 'html.parser')

    try:
        input = parse_log(inner_html)
        es.index(index = input[0], doc_type = "bms_log", id = input[1], body = input[2])
        successful += 1
    except:
        pass
   
    time.sleep(0.1)
    i+=1
    

browser.quit()

print(successful)

KeyboardInterrupt: 

In [267]:
new_mapping = { "mappings" : {
    "bms_log": {
          "properties": {
            "value": { "type": "double" },
            "timestamp": { "type": "date" },
            "unit": { "type": "keyword" },
            "title": { "type": "text",
                       "fields": {
                           "keyword": {
                               "type": "keyword",
                               "ignore_above": 200
                           }   
                       }
                     },
            "metric": { "type": "text" },
            "location": { "type": "text" }
            }
        }
    }
}

new_mapping = json.dumps(new_mapping)

# Connect to client
es_client = client.IndicesClient(es)

# Set new mapping
es_client.put_mapping(doc_type = "bms_log", body = new_mapping)

PUT http://192.168.99.100:9200/_mapping/bms_log [status:400 request:0.008s]


RequestError: TransportError(400, 'mapper_parsing_exception', 'Root mapping definition has unsupported parameters:  [mappings : {bms_log={properties={value={type=double}, timestamp={type=date}, unit={type=keyword}, title={type=text, fields={keyword={type=keyword, ignore_above=200}}}, metric={type=text}, location={type=text}}}}]')