# **Emerging Technologies - Assignment**

**Area 1**: IoT and mobile devices with a Google Cloud backend (or Firebase): set up the serverless backend for IoT (MQTT protocol) and mobile devices (iOS or Android, HTTP REST API) with a Firestore database.

**IoT (any VM, or a Linux/Windows/macOS machine) and any mobile device (web or phone)**: Here we use Google Colab as the VM and a web application as the mobile-device side.

**Simulated application**: The IoT device provides real-time sensor data to the cloud via MQTT. The mobile device can receive real-time updates or notifications from the backend (REST API plus WebSocket or notifications), send commands to the IoT device (e.g., set a value), and fetch historical data and plot it (via the REST API).

An overview of the steps followed in this project:

* Use Google Colab as the virtual machine for the IoT side.
* Use a Google Cloud Function REST API written in Node.js to send data from the VM to Google Pub/Sub through the MQTT protocol.
* Send the data from Google Pub/Sub to Google Firestore using an HTTP API.
* Access the data through a web app with GET, POST, PUT, and DELETE operations.
* Plot graphs from the historical temperature data.

# Install required packages
Most of them are already installed in Colab.

In [None]:
!pip install google-cloud-iot



In [None]:
!pip install google-api-python-client



In [None]:
!pip install google-cloud-storage



In [None]:
!pip install cryptography pyjwt paho-mqtt 



In [None]:
import paho.mqtt.client as mqtt
import jwt
import time
import ssl
import random
import os
import logging
import datetime
import argparse

# Setup Google Cloud Client

In [None]:
from google.colab import auth
auth.authenticate_user()

In [None]:
!gcloud config list

[component_manager]
disable_update_check = True
[compute]
gce_metadata_read_timeout_sec = 0
region = us-central1
zone = us-central1a
[core]
account = leelaalekhya.vedula@sjsu.edu
project = cmpeiot

Your active configuration is: [default]


In [None]:
!gcloud config set compute/region us-central1

Updated property [compute/region].


In [None]:
!gcloud config set compute/zone us-central1-a

Updated property [compute/zone].


In [None]:
!gcloud config list project

[core]
project = cmpeiot

Your active configuration is: [default]


In [None]:
!gcloud config set project 'cmpeiot'

Updated property [core/project].


In [None]:
!gcloud config list project

[core]
project = cmpeiot

Your active configuration is: [default]


# Mount Google Drive

In [None]:
from google.colab import drive
import os
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


Create service account credentials; see https://cloud.google.com/storage/docs/reference/libraries#client-libraries-usage-nodejs

In [None]:
myservicecredentials='/content/gdrive/"My Drive"/"Colab Notebooks"/Googlecerts'

In [None]:
!ls {myservicecredentials}

cmpeiot-92ac83152fbc.json  iotdevicenew  roots.pem


In [None]:
newdevicecredentials_path=os.path.join(myservicecredentials, 'iotdevicenew')
!ls {newdevicecredentials_path}

rsa_cert.pem  rsa_private.pem


In [None]:
myservicecredentials_path=os.path.join(myservicecredentials, 'cmpeiot-92ac83152fbc.json')

In [None]:
!cp {myservicecredentials_path} . #copy to the local directory

In [None]:
!cp -r {newdevicecredentials_path} .

In [None]:
rootcredentials_path=os.path.join(myservicecredentials, 'roots.pem')
!cp {rootcredentials_path} .

In [None]:
!ls

adc.json		   gdrive	 roots.pem
cmpeiot-92ac83152fbc.json  iotdevicenew  sample_data


In [None]:
GOOGLE_APPLICATION_CREDENTIALS='./cmpeiot-92ac83152fbc.json'

In [None]:
!echo $GOOGLE_APPLICATION_CREDENTIALS

./cmpeiot-92ac83152fbc.json
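
Note that the assignment above creates a Python variable. `!echo` can display it because IPython substitutes Python variables into `!` shell commands, but Google client libraries look for an environment variable of the same name. The following is a minimal sketch, assuming the key file was copied into the working directory as shown earlier; `key_path` is a name introduced here for illustration only.

```python
import os

# Path of the service account key copied into the working directory earlier.
# `key_path` is a hypothetical helper name, not part of the original notebook.
key_path = './cmpeiot-92ac83152fbc.json'

# Export the path so libraries that rely on Application Default Credentials
# can locate the key through the process environment.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path

print(os.environ.get('GOOGLE_APPLICATION_CREDENTIALS'))
```

Setting the variable through `os.environ` also makes it visible to any shell command started from the notebook afterwards.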


# Google IoT Defined functions

In [None]:
import paho.mqtt.client as mqtt
import jwt
import time
import ssl
import random
import os
import logging
import datetime
import argparse

from google.cloud import storage

In [None]:
# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

# The maximum backoff time before giving up, in seconds.
MAXIMUM_BACKOFF_TIME = 32

# Whether to wait with exponential backoff before publishing.
should_backoff = False

def create_jwt(project_id, private_key_file, algorithm):
    """Creates a JWT (https://jwt.io) to establish an MQTT connection.
        Args:
         project_id: The cloud project ID this device belongs to
         private_key_file: A path to a file containing either an RSA (RS256)
                 or EC (ES256) private key.
         algorithm: The signing algorithm to use. Either 'RS256' or 'ES256'
        Returns:
            A JWT generated from the given project_id and private key, which
            expires in 20 minutes. After 20 minutes, your client will be
            disconnected, and a new JWT will have to be generated.
        Raises:
            ValueError: If the private_key_file does not contain a known key.
        """

    token = {
        # The time that the token was issued at
        'iat': datetime.datetime.utcnow(),
        # The time the token expires.
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=20),
        # The audience field should always be set to the GCP project id.
        'aud': project_id
    }

    # Read the private key file.
    with open(private_key_file, 'r') as f:
        private_key = f.read()

    print('Creating JWT using {} from private key file {}'.format(
        algorithm, private_key_file))

    return jwt.encode(token, private_key, algorithm=algorithm)
# [END iot_mqtt_jwt]
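
To check what a token like the one from `create_jwt` carries, the claims segment can be decoded with the standard library alone. This is a sketch for inspection only: it does not verify the signature, and the token below is a hand-built stand-in with a hypothetical timestamp so the example runs without a key file. `b64url` and `decode_jwt_claims` are illustrative names, not part of PyJWT.

```python
import base64
import json


def b64url(data):
    """Base64url-encode bytes without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def decode_jwt_claims(token):
    """Return the claims of a JWT WITHOUT verifying its signature."""
    if isinstance(token, bytes):  # PyJWT releases before 2.0 return bytes
        token = token.decode('ascii')
    payload_segment = token.split('.')[1]
    # Restore the padding that base64url encoding strips.
    payload_segment += '=' * (-len(payload_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_segment))


# Hand-built, unsigned stand-in token (hypothetical issue time).
issued_at = 1602390206
header = b64url(json.dumps({'alg': 'RS256', 'typ': 'JWT'}).encode())
claims = b64url(json.dumps(
    {'iat': issued_at, 'exp': issued_at + 20 * 60, 'aud': 'cmpeiot'}).encode())
demo_token = '{}.{}.{}'.format(header, claims, 'signature-placeholder')

decoded = decode_jwt_claims(demo_token)
print(decoded['aud'], decoded['exp'] - decoded['iat'])
```

The difference between `exp` and `iat` is the token lifetime in seconds, which is what the refresh logic later in the notebook compares against.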

In [None]:
def error_str(rc):
    """Convert a Paho error to a human readable string."""
    return '{}: {}'.format(rc, mqtt.error_string(rc))


def on_connect(unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print('on_connect', mqtt.connack_string(rc))

    # After a successful connect, reset backoff time and stop backing off.
    global should_backoff
    global minimum_backoff_time
    should_backoff = False
    minimum_backoff_time = 1


def on_disconnect(unused_client, unused_userdata, rc):
    """Paho callback for when a device disconnects."""
    print('on_disconnect', error_str(rc))

    # Since a disconnect occurred, the next loop iteration will wait with
    # exponential backoff.
    global should_backoff
    should_backoff = True


def on_publish(unused_client, unused_userdata, unused_mid):
    """Paho callback when a message is sent to the broker."""
    print('on_publish')


def on_message(unused_client, unused_userdata, message):
    """Callback when the device receives a message on a subscription."""
    payload = str(message.payload.decode('utf-8'))
    print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
        payload, message.topic, str(message.qos)))
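
The `on_message` callback above only prints what it receives. If the mobile side sends commands such as "set value", the payload has to be parsed and applied. The sketch below assumes a JSON command of the form `{"set": {...}}`; that format, the `handle_command` function, and the state keys are all hypothetical and not defined by the original project.

```python
import json


def handle_command(payload_bytes, device_state):
    """Apply a JSON command such as {"set": {"interval_s": 5}} to device_state.

    Returns True if at least one setting was applied, False otherwise.
    """
    try:
        command = json.loads(payload_bytes.decode('utf-8'))
    except (UnicodeDecodeError, ValueError):
        return False  # payload is not valid UTF-8 JSON
    if not isinstance(command, dict):
        return False
    updates = command.get('set')
    if not isinstance(updates, dict):
        return False
    # Only accept keys the device already knows about.
    known = {key: value for key, value in updates.items() if key in device_state}
    device_state.update(known)
    return bool(known)


# Hypothetical device settings used only for this demonstration.
state = {'interval_s': 1, 'target_temp': 25.0}
applied = handle_command(b'{"set": {"interval_s": 5, "unknown": 1}}', state)
print(applied, state)
```

A function like this could be called from `on_message` with `message.payload`; unknown keys and malformed payloads are ignored rather than raising inside the network loop.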

In [None]:
def get_client(
        project_id, cloud_region, registry_id, device_id, private_key_file,
        algorithm, ca_certs, mqtt_bridge_hostname, mqtt_bridge_port):
    """Create our MQTT client. The client_id is a unique string that identifies
    this device. For Google Cloud IoT Core, it must be in the format below."""
    client_id = 'projects/{}/locations/{}/registries/{}/devices/{}'.format(
        project_id, cloud_region, registry_id, device_id)
    print('Device client_id is \'{}\''.format(client_id))

    client = mqtt.Client(client_id=client_id)

    # With Google Cloud IoT Core, the username field is ignored, and the
    # password field is used to transmit a JWT to authorize the device.
    client.username_pw_set(
        username='unused',
        password=create_jwt(
            project_id, private_key_file, algorithm))

    # Enable SSL/TLS support.
    client.tls_set(ca_certs=ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

    # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
    # describes additional callbacks that Paho supports. In this example, the
    # callbacks just print to standard out.
    client.on_connect = on_connect
    client.on_publish = on_publish
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # Connect to the Google MQTT bridge.
    client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

    # This is the topic that the device will receive configuration updates on.
    mqtt_config_topic = '/devices/{}/config'.format(device_id)

    # Subscribe to the config topic.
    client.subscribe(mqtt_config_topic, qos=1)

    # The topic that the device will receive commands on.
    mqtt_command_topic = '/devices/{}/commands/#'.format(device_id)

    # Subscribe to the commands topic with QoS 0 (at most once, no acknowledgement).
    print('Subscribing to {}'.format(mqtt_command_topic))
    client.subscribe(mqtt_command_topic, qos=0)

    return client
# [END iot_mqtt_config]

In [None]:
!ls

adc.json		   gdrive	 roots.pem
cmpeiot-92ac83152fbc.json  iotdevicenew  sample_data


In [None]:
GOOGLE_APPLICATION_CREDENTIALS='./cmpeiot-92ac83152fbc.json'
ROOT_CREDENTIALS='./roots.pem'

In [None]:
class Args:
  algorithm = 'RS256'
  ca_certs = '/content/roots.pem'
  cloud_region = 'us-central1'
  data = 'Hello there'
  device_id = 'cmpeiotdevice2'
  jwt_expires_minutes = 20
  listen_dur = 60
  message_type = 'event'
  mqtt_bridge_hostname = 'mqtt.googleapis.com'
  mqtt_bridge_port = 8883
  num_messages = 25
  private_key_file = '/content/iotdevicenew/rsa_private.pem'  # /content/gdrive/"My Drive"/CurrentWork/CMPE181Sp2020/Googlecerts/cmpe181dev1/rsa_private.pem
  project_id = 'cmpeiot'
  registry_id = 'CMPEIoT1'
  service_account_json = '/content/cmpeiot-92ac83152fbc.json'  # os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
  imagefolder_path = './data'


args=Args()

In [None]:
print(args.private_key_file)

/content/iotdevicenew/rsa_private.pem


In [None]:

def parse_command_line_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description=(
        'Example Google Cloud IoT Core MQTT device connection code.'))
    parser.add_argument(
        '--algorithm',
        choices=('RS256', 'ES256'),
        required=True,
        help='Which encryption algorithm to use to generate the JWT.')
    parser.add_argument(
        '--ca_certs',
        default='./roots.pem',
        help='CA root from https://pki.google.com/roots.pem')
    parser.add_argument(
        '--cloud_region', default='us-central1', help='GCP cloud region')
    parser.add_argument(
        '--data',
        default='Hello there',
        help='The telemetry data sent on behalf of a device')
    parser.add_argument(
        '--device_id', required=True, help='Cloud IoT Core device id')
    parser.add_argument(
        '--gateway_id', required=False, help='Gateway identifier.')
    parser.add_argument(
        '--jwt_expires_minutes',
        default=20,
        type=int,
        help='Expiration time, in minutes, for JWT tokens.')
    parser.add_argument(
        '--listen_dur',
        default=60,
        type=int,
        help='Duration (seconds) to listen for configuration messages')
    parser.add_argument(
        '--message_type',
        choices=('event', 'state'),
        default='event',
        help=('Indicates whether the message to be published is a '
              'telemetry event or a device state message.'))
    parser.add_argument(
        '--mqtt_bridge_hostname',
        default='mqtt.googleapis.com',
        help='MQTT bridge hostname.')
    parser.add_argument(
        '--mqtt_bridge_port',
        choices=(8883, 443),
        default=8883,
        type=int,
        help='MQTT bridge port.')
    parser.add_argument(
        '--num_messages',
        type=int,
        default=100,
        help='Number of messages to publish.')
    parser.add_argument(
        '--private_key_file',
        required=True,
        help='Path to private key file.')
    parser.add_argument(
        '--project_id',
        default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
        help='GCP cloud project name')
    parser.add_argument(
        '--registry_id', required=True, help='Cloud IoT Core registry id')
    parser.add_argument(
        '--service_account_json',
        default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
        help='Path to service account json file.')

    return parser.parse_args()
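
In a notebook, calling `parse_args()` with no arguments reads `sys.argv`, which holds the kernel's own arguments, so the `Args` class is used instead. An alternative is to pass an explicit list to `parse_args`. The sketch below uses a trimmed-down parser (`build_parser` is an illustrative name) rather than the full one above.

```python
import argparse


def build_parser():
    """Build a small parser with a few of the options used in this notebook."""
    parser = argparse.ArgumentParser(description='Trimmed-down demo parser.')
    parser.add_argument('--algorithm', choices=('RS256', 'ES256'), required=True)
    parser.add_argument('--device_id', required=True)
    parser.add_argument('--num_messages', type=int, default=100)
    return parser


# Passing a list avoids reading sys.argv, which in a notebook holds the
# kernel's arguments rather than ours.
demo_args = build_parser().parse_args([
    '--algorithm', 'RS256',
    '--device_id', 'cmpeiotdevice2',
    '--num_messages', '25',
])
print(demo_args.algorithm, demo_args.device_id, demo_args.num_messages)
```

The resulting namespace exposes the same attribute-style access as the `Args` class, so either can be handed to a function that reads `args.device_id` and similar fields.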




In [None]:
import random
import time
import datetime
import json

def read_sensor(count):
    """Simulate one sensor reading; values drift upward as `count` grows.

    Returns (temperature, humidity, zip code, latitude, longitude), with the
    numeric readings formatted as strings.
    """
    tempF = 20 + 0.2*count + (random.random() * 15)
    humidity = 60 + 0.3*count + (random.random() * 20)
    temp = '{0:0.2f}'.format(tempF)
    hum = '{0:0.2f}'.format(humidity)
    sensorZipCode = 95192
    # Jitter the coordinates slightly around a fixed base location.
    sensorLat = 37.3382082 + (random.random() / 100)
    sensorLong = -121.8863286 + (random.random() / 100)
    sensorLatf = '{0:0.6f}'.format(sensorLat)
    sensorLongf = '{0:0.6f}'.format(sensorLong)
    return (temp, hum, sensorZipCode, sensorLatf, sensorLongf)

def createJSON(reg_id, dev_id, timestamp, zipcode, latitude, longitude,
               temperature, humidity):
    """Serialize one telemetry reading to a JSON string."""
    # Parameter names avoid shadowing the built-in `zip`.
    data = {
      'registry_id' : reg_id,
      'device_id' : dev_id,
      'timecollected' : timestamp,
      'zipcode' : zipcode,
      'latitude' : latitude,
      'longitude' : longitude,
      'temperature' : temperature,
      'humidity' : humidity,
      # 'image_file' : img_file
    }

    json_str = json.dumps(data)
    return json_str
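
Because the readings are serialized as strings, anything that plots the temperature history has to convert them back to numbers first. Here is a minimal sketch of that step, assuming the consumer receives the JSON strings as produced by `createJSON`; `to_series` is an illustrative helper and the two sample messages are cut down to the fields it needs.

```python
import json
from datetime import datetime


def to_series(messages):
    """Turn JSON telemetry strings into time-sorted (datetime, temperature) pairs."""
    points = []
    for raw in messages:
        record = json.loads(raw)
        collected = datetime.strptime(record['timecollected'], '%Y-%m-%d %H:%M:%S')
        points.append((collected, float(record['temperature'])))
    return sorted(points)


# Sample messages deliberately given out of order.
sample = [
    '{"timecollected": "2020-10-11 04:23:27", "temperature": "22.36"}',
    '{"timecollected": "2020-10-11 04:23:26", "temperature": "32.72"}',
]
series = to_series(sample)
print([temp for _, temp in series])
```

The sorted pairs can be passed directly to a plotting library as x and y values.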


In [None]:
def mqtt_device_demo(args):
    """Connects a device, sends data, and receives data."""
    # [START iot_mqtt_run]
    global minimum_backoff_time
    global MAXIMUM_BACKOFF_TIME

    # Publish to the events or state topic based on the flag.
    sub_topic = 'events' if args.message_type == 'event' else 'state'

    mqtt_topic = '/devices/{}/{}'.format(args.device_id, sub_topic)

    jwt_iat = datetime.datetime.utcnow()
    jwt_exp_mins = args.jwt_expires_minutes
    client = get_client(
        args.project_id, args.cloud_region, args.registry_id,
        args.device_id, args.private_key_file, args.algorithm,
        args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port)

    # Publish num_messages messages to the MQTT bridge once per second.
    for i in range(1, args.num_messages + 1):
        # Process network events.
        client.loop()
        currentTime = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        (temp, hum, sensorZipCode, sensorLat, sensorLong) = read_sensor(i)
        # Build the JSON telemetry payload that is published further below.
        payloadJSON = createJSON(args.registry_id, args.device_id, currentTime,
                                 sensorZipCode, sensorLat, sensorLong, temp, hum)

        print('Publishing message {}/: \'{}\''.format(i, payloadJSON))
        

        # Wait if backoff is required.
        if should_backoff:
            # If backoff time is too large, give up.
            if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
                print('Exceeded maximum backoff time. Giving up.')
                break

            # Otherwise, wait and connect again.
            delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
            print('Waiting for {} before reconnecting.'.format(delay))
            time.sleep(delay)
            minimum_backoff_time *= 2
            client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)

        # Placeholder label; it is only printed. The message actually
        # published below is payloadJSON.
        payload = '{}/{}-payload-{}'.format(
            args.registry_id, args.device_id, i)
        print('Publishing message {}/{}: \'{}\''.format(
            i, args.num_messages, payload))
        # [START iot_mqtt_jwt_refresh]
        seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
        if seconds_since_issue > 60 * jwt_exp_mins:
            print('Refreshing token after {}s'.format(seconds_since_issue))
            jwt_iat = datetime.datetime.utcnow()
            client.loop()
           
            client.disconnect()
            client = get_client(
                args.project_id, args.cloud_region,
                args.registry_id, args.device_id, args.private_key_file,
                args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
                args.mqtt_bridge_port)
        # [END iot_mqtt_jwt_refresh]
        # Publish payloadJSON to the MQTT topic. qos=1 means at least once
        # delivery. Cloud IoT Core also supports qos=0 for at most once
        # delivery.
        client.publish(mqtt_topic, payloadJSON, qos=1)



        
        # Send events every second. State should not be updated as often
        time.sleep(1)
        # for i in range(0, 60):
        #     time.sleep(1)
        #     client.loop()
    # [END iot_mqtt_run]
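
The reconnect logic inside `mqtt_device_demo` is an exponential backoff with jitter. Isolated from the MQTT code, the sequence of waits it produces can be sketched as a generator; `backoff_delays` is an illustrative name and its defaults mirror `minimum_backoff_time = 1` and `MAXIMUM_BACKOFF_TIME = 32` from above.

```python
import random


def backoff_delays(initial=1, maximum=32, rng=random):
    """Yield reconnect delays in seconds.

    The base delay doubles after every attempt, up to 1 s of random jitter is
    added, and the sequence stops once the base delay exceeds `maximum`.
    """
    base = initial
    while base <= maximum:
        yield base + rng.randint(0, 1000) / 1000.0
        base *= 2


delays = list(backoff_delays())
print(['{:.3f}'.format(delay) for delay in delays])
```

With the defaults this gives six attempts with base delays of 1, 2, 4, 8, 16 and 32 seconds before the loop gives up. The jitter keeps many devices from reconnecting at the same instant.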

In [None]:
mqtt_device_demo(args)


Device client_id is 'projects/cmpeiot/locations/us-central1/registries/CMPEIoT1/devices/cmpeiotdevice2'
Creating JWT using RS256 from private key file /content/iotdevicenew/rsa_private.pem
Subscribing to /devices/cmpeiotdevice2/commands/#
Publishing message 1/: '{"registry_id": "CMPEIoT1", "device_id": "cmpeiotdevice2", "timecollected": "2020-10-11 04:23:26", "zipcode": 95192, "latitude": "37.340980", "longitude": "-121.885958", "temperature": "32.72", "humidity": "67.18"}'
Publishing message 1/25: 'CMPEIoT1/cmpeiotdevice2-payload-1'
on_connect Connection Accepted.
Publishing message 2/: '{"registry_id": "CMPEIoT1", "device_id": "cmpeiotdevice2", "timecollected": "2020-10-11 04:23:27", "zipcode": 95192, "latitude": "37.344453", "longitude": "-121.883747", "temperature": "22.36", "humidity": "72.90"}'
Publishing message 2/25: 'CMPEIoT1/cmpeiotdevice2-payload-2'
Publishing message 3/: '{"registry_id": "CMPEIoT1", "device_id": "cmpeiotdevice2", "timecollected": "2020-10-11 04:23:28", "zip

In [None]:
print(args.imagefolder_path) #upload some images to this folder

./data


The data has now been sent from the VM to Pub/Sub. Next, check the data in Pub/Sub and store it in Firestore so that a web app can access it through HTTP calls to the REST API.
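
The Pub/Sub-to-Firestore step is not shown in this notebook. As a rough sketch of its first half, the code below decodes a Pub/Sub event into a dictionary that could be stored as a document. It assumes the message arrives base64-encoded in a `data` field, as Pub/Sub-triggered background Cloud Functions deliver it. It is written in Python to match this notebook, although the project's function is described as Node.js. `pubsub_event_to_document` is an illustrative name, and the Firestore write itself is omitted because it needs project credentials.

```python
import base64
import json


def pubsub_event_to_document(event):
    """Decode a Pub/Sub event into a dict suitable for storing as a document."""
    raw = base64.b64decode(event['data']).decode('utf-8')
    record = json.loads(raw)
    # The device publishes numbers as strings; convert them so they can be
    # queried and plotted as numeric values.
    for field in ('temperature', 'humidity', 'latitude', 'longitude'):
        if field in record:
            record[field] = float(record[field])
    return record


# Simulated event built from a message shaped like the device output above.
message = json.dumps({'device_id': 'cmpeiotdevice2',
                      'timecollected': '2020-10-11 04:23:26',
                      'temperature': '32.72', 'humidity': '67.18'})
event = {'data': base64.b64encode(message.encode('utf-8')).decode('ascii')}
document = pubsub_event_to_document(event)
print(document)
```

Converting the numeric fields at ingestion time means the web app can request historical temperatures and plot them without further parsing.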