## Parquet to SensorThings Streamlike

In [1]:
import csv
import re
import math
import time
import random
import numpy as np
import sys
import json
import requests
import os
import Helper

In [2]:
# dirList: sorted list of dir names
dirList = os.listdir(Helper.Helper.DataHome + "outAllSortByTimeStampAndIDBigAll/")
dirList = [ x for x in dirList if "2016" in x ]
dirList = sorted(dirList, key= lambda myDir: int(myDir.split("=")[1]))



In [3]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
#spark conf
conf = ( SparkConf()
         .setMaster("local[*]")
         .setAppName('pyspark')
        )
ss = SparkSession.builder.config(conf=conf).getOrCreate()
sc = ss.sparkContext


In [4]:
#Common things
mytypes = ["Air-Temperature","Air-Humidity","Air-Pressure"]
urlHome = Helper.Helper.Frosts["dev01"] + ':8080/FROST-Server/v1.0'
urlThings = urlHome + '/Things'
urlSensors = urlHome + '/Sensors'
urlObservedProperty = urlHome + '/ObservedProperties'
urlDataStream = urlHome + '/Datastreams'
urlFoI = urlHome + '/FeaturesOfInterest'

In [35]:
myPrefix = "pfx6"

In [17]:
from pyspark.sql import functions as F

from datetime import tzinfo, timedelta, datetime

myDir = dirList[0]
inputDir = Helper.Helper.DataHome + "outAllSortByTimeStampAndIDBigAll/"+myDir
dataFileDF = ss.read.option("basepath",inputDir).parquet(inputDir)#+"TimeStamp=20160504/ID=I72406BI1")
dataFileDF = dataFileDF.withColumnRenamed("AitTemperature","AirTemperature")
dataFileDF = dataFileDF.filter("(not isnull(ID)) and ID !=''")
idDF = dataFileDF["ID","Latitude","Longitude"]

urlInst = Helper.Helper.Inst_URL
idDF = idDF.withColumn("LatLon",F.array("Latitude","Longitude"))
idDF = idDF.groupBy(idDF.ID).agg(F.first(idDF.LatLon).alias("LatLon"))

idDF = idDF.withColumn("ThUID",F.format_string("Things/%s/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))
idDF = idDF.withColumn("SensorUID",F.format_string("Sensors/%s/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))
idDF = idDF.withColumn("DsUID",F.format_string("Datastreams/%s/MS/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))


In [18]:
idDF.rdd.first()

Row(ID=u'IALFELDL8', LatLon=[u'51.982', u'9.847'], ThUID=u'Things/edu.teco.wang/WG-WeatherStation-IALFELDL8', SensorUID=u'Sensors/edu.teco.wang/WG-WeatherStation-IALFELDL8', DsUID=u'Datastreams/edu.teco.wang/WG-WeatherStation-IALFELDL8')

## Register ObservedProperties

In [36]:
#Observed Property
for mytype in mytypes:
    data ={
        "name": "Air-Temperature",
        "description": "The degree or intensity of heat present in the area",
        "definition": "http://www.qudt.org/qudt/owl/1.0.0/quantity/Instances.html#AreaTemperature",
        "@iot.id": myPrefix + "ObservedProperty/" + mytype
    }
    p = requests.post(urlObservedProperty, json.dumps(data))
    print(data)
    if (p.status_code  == 201):
        print(201)
    else:
        print("Error:", p.status_code)

    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

{'definition': 'http://www.qudt.org/qudt/owl/1.0.0/quantity/Instances.html#AreaTemperature', '@iot.id': 'pfx6ObservedProperty/Air-Temperature', 'name': 'Air-Temperature', 'description': 'The degree or intensity of heat present in the area'}
201
{'definition': 'http://www.qudt.org/qudt/owl/1.0.0/quantity/Instances.html#AreaTemperature', '@iot.id': 'pfx6ObservedProperty/Air-Humidity', 'name': 'Air-Temperature', 'description': 'The degree or intensity of heat present in the area'}
201
{'definition': 'http://www.qudt.org/qudt/owl/1.0.0/quantity/Instances.html#AreaTemperature', '@iot.id': 'pfx6ObservedProperty/Air-Pressure', 'name': 'Air-Temperature', 'description': 'The degree or intensity of heat present in the area'}
201


In [None]:
from pyspark.sql import functions as F

from datetime import tzinfo, timedelta, datetime

def registerEntities(row):
    #Thing
    
    data =  {
        "name": "WG-WeatherStation-" + row.ID,
        "description": "This is DWD-Sensor-" + row.ID,
        "@iot.id": myPrefix + row.ThUID,
        "Locations": [
        {
            "name": row.ThUID,
            "description": "This is the location of WG-Sensor-" + row.ID,
            "encodingType": "application/vnd.geo+json",
            "location": {
              "type": "Point",
              "coordinates": [float(row.LatLon[0]), float(row.LatLon[1])]
            },
            "@iot.id": myPrefix+"Location/" + row.ThUID 
        }
      ]
        }
    p = requests.post(urlThings, json.dumps(data))
    print("Thing:")
    print(data)
    if (p.status_code  == 201):
        print(201)
        
    else:
        print("Error:", p.status_code)

        for chunk in p.iter_content(chunk_size=128):
            print(chunk)

    #Sensor

    deviceAddr = "https://www.wunderground.com/personal-weather-station/dashboard?ID="+row.ID
    for mytype in mytypes:
        data = {
            "name": "WG-WeatherStation-" + row.ID + "-" + "" + mytype,
            "description": "This is a Sensor from Netatmo Weather Station",
            "encodingType": "application/pdf",
            "metadata": deviceAddr,
            "@iot.id": myPrefix+row.SensorUID +"-"+ mytype
        }
        p = requests.post(urlSensors, json.dumps(data))
        print("Sensor:")
        print(data)
        if (p.status_code  == 201):
            print(201)
            
            
        else:
            print("Error:", p.status_code)

        for chunk in p.iter_content(chunk_size=128):
            print(chunk)

    #DataStream

    for mytype in mytypes:
        data = {
          "name": "MS/LiveMeasure",
          "description": "Datastream for recording temperature",
          "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
          "@iot.id":myPrefix + row.DsUID + "-" + mytype + "/MS/LiveMeasure",
          "unitOfMeasurement": {
            "name": "Degree Celsius",
            "symbol": "degC",
            "definition": "http://www.qudt.org/qudt/owl/1.0.0/unit/Instances.html#DegreeCelsius"
          },
          "Thing":{"@iot.id":myPrefix+row.ThUID},
          "ObservedProperty":{"@iot.id":myPrefix+"ObservedProperty/" + mytype},
          "Sensor":{"@iot.id":myPrefix+row.SensorUID +"-"+ mytype}
        }
        p = requests.post(urlDataStream, json.dumps(data))
        print("DS:")
        print(data)
        if (p.status_code  == 201):
            print(201)
            
        else:
            print("Error:", p.status_code)

        for chunk in p.iter_content(chunk_size=128):
            print(chunk)

    # FoI
    data = {
      "name": "Weather Station-" + row.ID,
      "description": "Weather Station-" + row.ID,
      "encodingType": "application/vnd.geo+json",
        "@iot.id": myPrefix +"/FoI/" +myPrefix+ row.DsUID + "-Air-Temperature/MS/LiveMeasure/"+row.LatLon[0]+":"+row.LatLon[1],
      "feature": {
        "type": "Point",
        "coordinates": [float(row.LatLon[0]), float(row.LatLon[1])]
      }
    }
    p = requests.post(urlFoI, json.dumps(data))
    print("FOI:")
    print(data)
    if (p.status_code  == 201):
        print(201)
        
        
    else:
        print("Error:", p.status_code)

    for chunk in p.iter_content(chunk_size=128):
        print(chunk)




for myDir in dirList:
    inputDir = Helper.Helper.DataHome + "outAllSortByTimeStampAndIDBigAll/"+myDir
    dataFileDF = ss.read.option("basepath",inputDir).parquet(inputDir)#+"TimeStamp=20160504/ID=I72406BI1")
    dataFileDF = dataFileDF.withColumnRenamed("AitTemperature","AirTemperature")
    dataFileDF = dataFileDF.filter("(not isnull(ID)) and ID !=''")
    idDF = dataFileDF["ID","Latitude","Longitude"]
    
    urlInst = Helper.Helper.Inst_URL
    idDF = idDF.withColumn("LatLon",F.array("Latitude","Longitude"))
    idDF = idDF.groupBy(idDF.ID).agg(F.first(idDF.LatLon).alias("LatLon"))

    idDF = idDF.withColumn("ThUID",F.format_string("Thing/%s/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))
    idDF = idDF.withColumn("SensorUID",F.format_string("Sensor/%s/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))
    idDF = idDF.withColumn("DsUID",F.format_string("Datastream/%s/WG-WeatherStation-%s",F.lit(urlInst),idDF.ID))


    for hourInt in range(0,24):
        oneHourDataFileDF = dataFileDF.rdd.filter(lambda row: int(row.Time.split(':')[0]) == hourInt)
        print(oneHourDataFileDF.count())
        for row in oneHourDataFileDF.collect():
            

            class TZ(tzinfo):
                def utcoffset(self, dt): return timedelta(minutes=120)
            isoTime =datetime(int(row.Date.split('-')[0]),
                         int(row.Date.split("-")[1]),
                         int(row.Date.split("-")[2]),
                         int(row.Time.split(':')[0]),
                         int(row.Time.split(':')[1]),
                         int(row.Time.split(':')[2]),tzinfo=TZ()).isoformat()




            urlObs = urlHome+"/Datastreams('"+myPrefix+"Datastream/" + urlInst + "/WG-WeatherStation-" +row.ID+ "-" + "Air-Temperature/MS/LiveMeasure')/Observations"
            print(urlObs)
            data = {
              "phenomenonTime": isoTime,
              "resultTime" : isoTime,
              "result" : row.AirTemperature,
              "FeatureOfInterest": {
                  "@iot.id": myPrefix +"/FoI/" + myPrefix + "Datastream/"+urlInst +"/WG-WeatherStation-"+row.ID + "-Air-Temperature/MS/LiveMeasure/" +row.Latitude+":"+row.Longitude
              }
            }
            p = requests.post(urlObs, json.dumps(data))

            if (p.status_code  == 201):
                print(p.headers["location"])
            else:
                print("Error:", p.status_code)
                #print(data)
                #print(idDF.rdd.take(5))
                registerEntities(idDF.rdd.filter(lambda aRow:aRow.ID == row.ID).first())

            for chunk in p.iter_content(chunk_size=128):
                print(chunk)
    time.sleep(10)
    

14761
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3eee9c10-65db-11e8-801b-331ac83856e4')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3f008a06-65db-11e8-801b-f7d265f1555e')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3f03af6a-65db-11e8-801b-431b3ffebba2')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST

http://localhost:8080/FROST-Server/v1.0/Observations('3f6e016c-65db-11e8-801b-27be8b48ed3e')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWUPPERT89-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3f71e9e4-65db-11e8-801b-db686ec28951')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWUPPERT89-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3f74f85a-65db-11e8-801b-97af1b67a356')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWUPPERT89-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3f77ee3e-65db-11e8-801b-d36beb69a70e')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherS

http://localhost:8080/FROST-Server/v1.0/Observations('3ff147ac-65db-11e8-801b-93b7ddaf17a7')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMNCHEN108-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3ff48ade-65db-11e8-801b-77d62575115b')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMNCHEN108-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3ff91d10-65db-11e8-801b-1b3217438d81')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMNCHEN108-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('3ffcd93c-65db-11e8-801b-8b1a6d9f2b5b')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherS

http://localhost:8080/FROST-Server/v1.0/Observations('407312d2-65db-11e8-801b-b7cd7589433e')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-INIENHAG8-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('4077f54a-65db-11e8-801b-479d3c8da791')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-INORDRHE146-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('407bb0d6-65db-11e8-801b-17711fa72a1b')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-INORDRHE146-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('407f4610-65db-11e8-801b-176a12d580a5')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-Weather

http://localhost:8080/FROST-Server/v1.0/Observations('43d328b8-65db-11e8-801b-5ba6cc21e9a7')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IFAHRENB2-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('43d64048-65db-11e8-801b-6742c2c4a274')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMENDENS21-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IMENDENS21', 'Locations': [{'location': {'type': 'Point', 'coordinates': [51.416, 7.832]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IMENDENS21', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IMENDENS21', 'description': u'This is the location of WG-Sensor-IMENDENS21'}], 'name': u'WG-WeatherStation-IMENDENS21', 'descripti

FOI:
{'feature': {'type': 'Point', 'coordinates': [53.124, 13.608]}, '@iot.id': u'pfx6/FoI/pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMILMERS2-Air-Temperature/MS/LiveMeasure/53.124:13.608', 'encodingType': 'application/vnd.geo+json', 'name': u'Weather Station-IMILMERS2', 'description': u'Weather Station-IMILMERS2'}
201
No such entity 'Datastream' with id pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMILMERS2-Air-Temperature/MS/LiveMeasure
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMILMERS2-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('46907bc8-65db-11e8-801b-5b94e240c861')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMILMERS2-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('46967fd2-65db-11e8-801b-fb7444e93448')
http://smartaqnet-de

http://localhost:8080/FROST-Server/v1.0/Observations('4887a014-65db-11e8-801b-e3739ae5da62')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWEINHEI19-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('488a9c1a-65db-11e8-801b-8fe41f0a0f75')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWEINHEI19-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('488dc282-65db-11e8-801b-1f9304e73a7a')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IBAYERNE35-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IBAYERNE35', 'Locations': [{'location': {'type': 'Point', 'coordinates': [49.86, 9.844]}, '@iot.id': u'pfx6Location/Thing/e

http://localhost:8080/FROST-Server/v1.0/Observations('4ab3381c-65db-11e8-801b-3b773d6dee30')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IWARTHAU3-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('4ab67176-65db-11e8-801b-2f92ea10428f')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ITHRNICH2-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-ITHRNICH2', 'Locations': [{'location': {'type': 'Point', 'coordinates': [49.847, 6.842]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-ITHRNICH2', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-ITHRNICH2', 'description': u'This is the location of WG-Sensor-ITHRNICH2'}], 'name': u'WG-WeatherStation-ITHRNICH2', 'description': u

http://localhost:8080/FROST-Server/v1.0/Observations('4ca47532-65db-11e8-801b-272d5358a7d1')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ISANKTOS8-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-ISANKTOS8', 'Locations': [{'location': {'type': 'Point', 'coordinates': [48.912, 13.384]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-ISANKTOS8', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-ISANKTOS8', 'description': u'This is the location of WG-Sensor-ISANKTOS8'}], 'name': u'WG-WeatherStation-ISANKTOS8', 'description': u'This is DWD-Sensor-ISANKTOS8'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=ISANKTOS8', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-ISANKTOS8-Air-Temperature', 'name':

http://localhost:8080/FROST-Server/v1.0/Observations('4e26702c-65db-11e8-801b-437ce2743c3e')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMNCHEN22-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('4e2dce44-65db-11e8-801b-47ead1a2ec52')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-INORDRHE178-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-INORDRHE178', 'Locations': [{'location': {'type': 'Point', 'coordinates': [51.553, 8.557]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-INORDRHE178', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-INORDRHE178', 'description': u'This is the location of WG-Sensor-INORDRHE178'}], 'name': u'WG-WeatherStation-INORDRHE178', 'des

http://localhost:8080/FROST-Server/v1.0/Observations('504335e8-65db-11e8-801b-2b77c0a5406c')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IMNSTER23-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IMNSTER23', 'Locations': [{'location': {'type': 'Point', 'coordinates': [51.932, 7.673]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IMNSTER23', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IMNSTER23', 'description': u'This is the location of WG-Sensor-IMNSTER23'}], 'name': u'WG-WeatherStation-IMNSTER23', 'description': u'This is DWD-Sensor-IMNSTER23'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IMNSTER23', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-IMNSTER23-Air-Temperature', 'name': 

Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IGERLING13', 'Locations': [{'location': {'type': 'Point', 'coordinates': [48.789, 9.064]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IGERLING13', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IGERLING13', 'description': u'This is the location of WG-Sensor-IGERLING13'}], 'name': u'WG-WeatherStation-IGERLING13', 'description': u'This is DWD-Sensor-IGERLING13'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IGERLING13', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-IGERLING13-Air-Temperature', 'name': u'WG-WeatherStation-IGERLING13-Air-Temperature', 'description': 'This is a Sensor from Netatmo Weather Station'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IGERLING13', 'encodingType': 'application/pdf', '@iot.id': u'pfx6S

http://localhost:8080/FROST-Server/v1.0/Observations('54402688-65db-11e8-801b-238d11898931')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IBERNKAS5-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IBERNKAS5', 'Locations': [{'location': {'type': 'Point', 'coordinates': [49.917, 7.076]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IBERNKAS5', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IBERNKAS5', 'description': u'This is the location of WG-Sensor-IBERNKAS5'}], 'name': u'WG-WeatherStation-IBERNKAS5', 'description': u'This is DWD-Sensor-IBERNKAS5'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IBERNKAS5', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-IBERNKAS5-Air-Temperature', 'name': 

Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IUEDEM6', 'Locations': [{'location': {'type': 'Point', 'coordinates': [51.656, 6.364]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IUEDEM6', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IUEDEM6', 'description': u'This is the location of WG-Sensor-IUEDEM6'}], 'name': u'WG-WeatherStation-IUEDEM6', 'description': u'This is DWD-Sensor-IUEDEM6'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IUEDEM6', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-IUEDEM6-Air-Temperature', 'name': u'WG-WeatherStation-IUEDEM6-Air-Temperature', 'description': 'This is a Sensor from Netatmo Weather Station'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=IUEDEM6', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-Weather

http://localhost:8080/FROST-Server/v1.0/Observations('56788788-65db-11e8-801b-576605e5e167')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IESSEN116-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('567f7584-65db-11e8-801b-a77c9b970efb')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IRAUSCHE2-Air-Temperature/MS/LiveMeasure')/Observations
('Error:', 400)
Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IRAUSCHE2', 'Locations': [{'location': {'type': 'Point', 'coordinates': [50.892, 8.985]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-IRAUSCHE2', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-IRAUSCHE2', 'description': u'This is the location of WG-Sensor-IRAUSCHE2'}], 'name': u'WG-WeatherStation-IRAUSCHE2', 'description': u

FOI:
{'feature': {'type': 'Point', 'coordinates': [50.943, 6.923]}, '@iot.id': u'pfx6/FoI/pfx6Datastream/edu.teco.wang/WG-WeatherStation-IKLN303-Air-Temperature/MS/LiveMeasure/50.943:6.923', 'encodingType': 'application/vnd.geo+json', 'name': u'Weather Station-IKLN303', 'description': u'Weather Station-IKLN303'}
201
No such entity 'Datastream' with id pfx6Datastream/edu.teco.wang/WG-WeatherStation-IKLN303-Air-Temperature/MS/LiveMeasure
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IKLN303-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('5886c0d0-65db-11e8-801b-1f1765c0da52')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-IKLN303-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('588d58a0-65db-11e8-801b-27e59455dee7')
http://smartaqnet-dev01.teco.edu:8

DS:
{'observationType': 'http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement', '@iot.id': u'pfx6Datastream/edu.teco.wang/WG-WeatherStation-IGTTINGE35-Air-Pressure/MS/LiveMeasure', 'name': 'MS/LiveMeasure', 'unitOfMeasurement': {'definition': 'http://www.qudt.org/qudt/owl/1.0.0/unit/Instances.html#DegreeCelsius', 'symbol': 'degC', 'name': 'Degree Celsius'}, 'Thing': {'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-IGTTINGE35'}, 'ObservedProperty': {'@iot.id': 'pfx6ObservedProperty/Air-Pressure'}, 'Sensor': {'@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-IGTTINGE35-Air-Pressure'}, 'description': 'Datastream for recording temperature'}
201
FOI:
{'feature': {'type': 'Point', 'coordinates': [51.556, 9.948]}, '@iot.id': u'pfx6/FoI/pfx6Datastream/edu.teco.wang/WG-WeatherStation-IGTTINGE35-Air-Temperature/MS/LiveMeasure/51.556:9.948', 'encodingType': 'application/vnd.geo+json', 'name': u'Weather Station-IGTTINGE35', 'description': u'Weather Station-IGTTINGE35'}

Thing:
{'@iot.id': u'pfx6Thing/edu.teco.wang/WG-WeatherStation-INEURIED12', 'Locations': [{'location': {'type': 'Point', 'coordinates': [48.461, 7.806]}, '@iot.id': u'pfx6Location/Thing/edu.teco.wang/WG-WeatherStation-INEURIED12', 'encodingType': 'application/vnd.geo+json', 'name': u'Thing/edu.teco.wang/WG-WeatherStation-INEURIED12', 'description': u'This is the location of WG-Sensor-INEURIED12'}], 'name': u'WG-WeatherStation-INEURIED12', 'description': u'This is DWD-Sensor-INEURIED12'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=INEURIED12', 'encodingType': 'application/pdf', '@iot.id': u'pfx6Sensor/edu.teco.wang/WG-WeatherStation-INEURIED12-Air-Temperature', 'name': u'WG-WeatherStation-INEURIED12-Air-Temperature', 'description': 'This is a Sensor from Netatmo Weather Station'}
201
Sensor:
{'metadata': u'https://www.wunderground.com/personal-weather-station/dashboard?ID=INEURIED12', 'encodingType': 'application/pdf', '@iot.id': u'pfx6S

FOI:
{'feature': {'type': 'Point', 'coordinates': [52.013, 8.881]}, '@iot.id': u'pfx6/FoI/pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILEMGO6-Air-Temperature/MS/LiveMeasure/52.013:8.881', 'encodingType': 'application/vnd.geo+json', 'name': u'Weather Station-ILEMGO6', 'description': u'Weather Station-ILEMGO6'}
201
No such entity 'Datastream' with id pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILEMGO6-Air-Temperature/MS/LiveMeasure
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILEMGO6-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('5d7022bc-65db-11e8-801b-e735e5ce1d8c')
http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0/Datastreams('pfx6Datastream/edu.teco.wang/WG-WeatherStation-ILEMGO6-Air-Temperature/MS/LiveMeasure')/Observations
http://localhost:8080/FROST-Server/v1.0/Observations('5d7338c6-65db-11e8-801b-83bce95aec93')
http://smartaqnet-dev01.teco.edu:8

In [None]:
Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure
Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure
Datastream/edu.teco.wang/WG-WeatherStation-ILDINGHA4-Air-Temperature/MS/LiveMeasure
pfx6Datastream/edu.teco.wang/WG-WeatherStation-IHERBORN7-Air-Pressure/MS/LiveMeasure

In [None]:
urlObs = urlHome+"/Datastreams('"+myPrefix+"edu.teco.wang/"+row.ID+"/Datastreams/" + "Air-Temperature" + "-" + row.ID+"')/Observations"

p = requests.post(urlObs, json.dumps(
{
  "phenomenonTime": isoTime,
  "resultTime" : isoTime,
  "result" : row.AirTemperature,
  "FeatureOfInterest": {
      "@iot.id": myPrefix + urlInst+"/"+row.ID + "/FoI/"+row.ID+"/"+row.Latitude+":"+row.Longitude
  }
}
))

if (p.status_code  == 201):
    print(p.headers["location"])
else:
    print("Error:", p.status_code)

for chunk in p.iter_content(chunk_size=128):
    print(chunk)

In [None]:

#idDF = idDF.withColumn("FoIUID",F.format_string("%s/FeaturesOfInterest/%s",idDF.ThID,idDF.ID))
#idDF = idDF.withColumn("SensorUID",F.format_string("%s/Sensors/%s",idDF.ThID,idDF.ID))


In [None]:
row = idDF.collect()[0]

row

In [None]:
# Observations
dataFileDF.show(10)

In [None]:
row = dataFileDF.collect()[0]
row

In [None]:
from datetime import tzinfo, timedelta, datetime
for row in dataFileDF.collect():
    

    class TZ(tzinfo):
        def utcoffset(self, dt): return timedelta(minutes=120)
    isoTime =datetime(int(row.Date.split('-')[0]),
                 int(row.Date.split("-")[1]),
                 int(row.Date.split("-")[2]),
                 int(row.Time.split(':')[0]),
                 int(row.Time.split(':')[1]),
                 int(row.Time.split(':')[2]),tzinfo=TZ()).isoformat()
    
    
    
    
    urlObs = urlHome+"/Datastreams('edu.teco.wang/"+row.ID+"/Datastreams/" + "Air-Temperature" + "-" + row.ID+"')/Observations"
    data ={
      "phenomenonTime": isoTime,
      "resultTime" : isoTime,
      "result" : row.AirTemperature
    }
    p = requests.post(urlObs, json.dumps(data))
    if (p.status_code  == 201):
        print(p.headers["location"])
    else:
        print("Error:", p.status_code)

    for chunk in p.iter_content(chunk_size=128):
        print(chunk)


In [None]:
urlObs

In [None]:
p.headers["location"]

In [None]:
from datetime import tzinfo, timedelta, datetime
row = dataFileDF.first()
row.Date.split("-")[0]
class TZ(tzinfo):
    def utcoffset(self, dt): return timedelta(minutes=120)
datetime(int(row.Date.split('-')[0]),
         int(row.Date.split("-")[1]),
         int(row.Date.split("-")[2]),
         int(row.Time.split(':')[0]),
         int(row.Time.split(':')[1]),
         int(row.Time.split(':')[2]),tzinfo=TZ()).isoformat()

## Tryouts

In [None]:
row = idDF.collect()[1]
urlHome = 'http://smartaqnet.teco.edu:8080/FROST-Server/v1.0'
urlThings = urlHome + '/Things'
data =  {
        "name": "DWD-Sensor-noSens" + row[0],
        "description": "DWD_Sensor-" + row[0],
        "@iot.id": "DWD-Sensor/noSensblblb" + row[0]
    }
p = requests.post(urlThings, json.dumps(data))
if (p.status_code  == 201):
    print(201)
else:
    print("Error:", p.status_code)
    
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

In [None]:

idDF = idDF.groupBy(idDF.ID).agg(F.first(idDF.LatLon).alias("LatLon"))#.show(3)#.agg(F.first(idDF.Longitude))

In [None]:
urlHome = 'http://smartaqnet-dev01.teco.edu:8080/FROST-Server/v1.0'
#urlHome = 'http://smartaqnet-dev.teco.edu:8080/FROST-Server/v1.0'
urlThings = urlHome + '/Things'

In [None]:

#idDF2 = idDF.select("ID","LatLon").rdd.map(DfToSensorthings)

In [None]:
row = idDF.collect()[1]
import requests
urlHome = 'http://smartaqnet.teco.edu:8080/FROST-Server/v1.0'
urlThings = urlHome + '/Things'
sensorAddr = "https://www.wunderground.com/personal-weather-station/dashboard?ID="+row[0]
data =  {
        "name": "DWD-Sensor-" + row[0],
        "description": "DWD_Sensor-" + row[0],
        "@iot.id": "DWD-Sensor/" + row[0]
    }
p = requests.get(urlThings+"('DWD-Sensor/noSensblbl" + row[0]+"')")
if (p.status_code  == 200):
    print(200)
else:
    print("Error:", p.status_code)
    
for chunk in p.iter_content(chunk_size=128):
    print(chunk)

In [None]:
row

In [None]:
p = requests.post(urlThings, json.dumps(data))
if (p.status_code  == 201):
    print("Creation successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)
        

In [None]:
#Delete a thing

deleteID = urlThings + "('DWD-Sensors/IBREUNA2')"
print(deleteID)
p = requests.delete(urlThings)
if (p.status_code  == 201):
    print("Deletion successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

In [None]:
#Query a thing

getID = urlThings + "(\"teco.edu/Test-2\")"
print(getID)
p = requests.get(urlThings)
if (p.status_code  == 201):
    print("Get successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

In [None]:
p.content

In [None]:
sensors = idDF.collect()
for sensor in sensors:
    print(DfToSensorthingsPost(sensor))
    #self link

In [None]:
import requests

In [None]:
dataFileDF = ss.read.parquet(inputDir)
outDir = "./outDir/outAllAll5/"
print(csv.__file__)

In [None]:
validFileNames = [inputDir+f for f in os.listdir(inputDir) if ('.' not in f) and ("part-" in f)]
validFileNames

In [None]:
for aFile in validFileNames:
    with open(aFile) as csvfile:
        for row in csvfile:
            k = row[1:9]
            v = row[13:-3]+'\n'
            vNew = re.sub(';','\n',v)
            outFile = open(outDir+k,"w")
            outFile.write(vNew)
            outFile.close()


In [None]:
row = csvfile.readline()

In [None]:
k = row[1:9]
v = row[13:-3].split(";")
k,v

In [None]:
with open(inputFile) as csvfile:
    for row in csvfile:
        k = row[1:9]
        v = row[13:-3]+'\n'
        vNew = re.sub(';','\n',v)
        outFile = open(outDir+k,"w")
        outFile.write(vNew)
        outFile.close()

In [None]:
import re

print(vNew)

In [None]:
outFile = open(outDir+k,"w")
outFile.write(vNew)
outFile.close()

In [None]:
with open(inputFile,'rb') as csvfile:
    reader = csv.Reader(csvfile)
    row = reader.__next__()
row

In [None]:
row

In [None]:
all=string.maketrans('','')
nodigs=all.translate(all, string.digits)

In [None]:
class Del:
  def __init__(self, keep=string.digits):
    self.comp = dict((ord(c),c) for c in keep)
  def __getitem__(self, k):
    return self.comp.get(k)

In [None]:
with open(inputPath) as csvfile:
    reader = csv.DictReader(csvfile)
    for i in range(10):
        row = reader.__next__()
        
        lineID = row['id']
        timeStamp = row['time'].translate(Del())
        print(lineID, timeStamp)
        