## Kriging Use Case
Fetches wunderground data from Kafka, use them to predict air temperature. Predictions are validated with observations from deutsche Wetterdienst
How-tos:
    1. Get data from Kafka to Spark
    2. Get data as File from HDFS to Spark
    3. Manipulate SparkDF
    4. Do some computing: Kriging, in parallel with partitioning


In [1]:
import csv
import re
import math
import time
import random
import numpy as np
import sys
import os
from shapely.geometry import shape, Point 
import folium
from matplotlib.pyplot import axis
from numpy import shape

import copy
import matplotlib
import datetime
import matplotlib as mpl
import pandas as pd

from sklearn.gaussian_process.kernels import Matern
import matplotlib.colors as colors
import matplotlib.pyplot as plt


#    Spark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json 


#

## Set system env
 <pre>
 1. 2. Specify which python will be used by spark so that the installed packages can be distributed across the cluster in HDFS
 3. Include pacages for kafka-spark connection
 </pre>

In [2]:
## ChangeME
os.environ['PYSPARK_DRIVER_PYTHON'] = "/gpfs/smartdata/urdxb/.conda/envs/pySparkEnv/bin/ipython"
## ChangeME
os.environ['PYSPARK_PYTHON'] = "/gpfs/smartdata/urdxb/.conda/envs/pySparkEnv/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

## Create a sparksession

In [3]:
conf = SparkConf().setMaster("local[*]").setAppName('Kriging example')
ss = SparkSession.builder.config(conf=conf).getOrCreate()
sc = ss.sparkContext

## Convertion of Kafka Stream to DataStream
1. Create dataframe that readStream from Kafka. For more infos: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
2. Convert \$key and \$ value to string where the key is the uid of the observation and the value is the message in JSON
3. Extract the informations in the message


In [4]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# import helper.py, you may want to put the variables or functions there
import Helper

kafkaHome = "kafka-vm0.teco.edu"
brokerUrl = kafkaHome + ":9092"


In [5]:
# Declare a data frame that subscribes to the topic Observations and take the messages from the very begining (offset=0)
df = ss.readStream.format("kafka").option("kafka.bootstrap.servers", brokerUrl) \
        .option("subscribe", "Observations")\
        .option("startingOffsets","earliest")\
        .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [6]:
## Function that parses the value in the message to a Row in SparkSQL
def valToJSON(s):
    if s is not None:
        import ast
        d = ast.literal_eval(s)
        return d#json.loads(d)['result']
def getID(d):
    if d is not None:
        try:
            return json.loads(d)['Datastream']['@iot.id']
        except:
            return None
def getTime(d):
    if d is not None:
        try:
            return json.loads(d)['resultTime']
        except:
            return None

def getLat(d):
    if d is not None:
        try:
            latlon = json.loads(d)['FeatureOfInterest']['@iot.id']
        
            return latlon.split("/")[-1].split(":")[0]
        except:
            return None
def getLon(d):

    if d is not None:
        try:
            latlon = json.loads(d)['FeatureOfInterest']['@iot.id']
        
            return latlon.split("/")[-1].split(":")[1]
        except:
            return None
def getTemp(d):
    if d is not None:
        try:
            return json.loads(d)['result']
        except:
            return None

dateStr, timeStr = "2015-12-01","22:00:00"
        

myValToJSON = udf(valToJSON)
myGetID = udf(getID)
myGetTime = udf(getTime)
myGetLat = udf(getLat)
myGetLon = udf(getLon)
myGetTemp = udf(getTemp)

#df = df.select(df.key, myValToJSON("value").alias("value"))
df = df.select(df.key, df.value, myGetTime(df.value).alias("DateTimeString"))
df = df.filter(df.DateTimeString.contains(dateStr) & df.DateTimeString.contains("T"+ timeStr[0:2])).drop(df.DateTimeString)
#df = df.drop(df.DateTimeString)
df = df.groupBy(df.key).agg(F.last(df.value).alias("value"))

df = df.select(myGetID(df.value).alias("ID"),
               myGetTime(df.value).alias("Time"),
               myGetLat(df.value).alias("Latitude"),
               myGetLon(df.value).alias("Longitude"),
               myGetTemp(df.value).alias("AirTemperature"))

## Start the query based on the Spark dataframe and its processing (select,filter,group and so on)
q = df.writeStream.queryName('TemperatureMap').outputMode("complete").format("memory").start()


In [11]:
#
x = ss.sql("select * from TemperatureMap").rdd.collect()
if(len(x)!= 0):
    print(len(x))
    print(x[0])


## Perform partitioning
1. Transform the SparkDF to PandasDF
2. Partition according to the density of the weather stations

In [12]:
# Select the observations from streaming data as a SparkDF and convert it to a PandasDF
observations = pd.DataFrame(ss.sql("select Latitude,Longitude,AirTemperature from TemperatureMap").rdd.collect(),columns =['Latitude','Longitude','AirTemperature'] )
observations[['Latitude','Longitude','AirTemperature']] = observations[['Latitude','Longitude','AirTemperature']].apply(pd.to_numeric,errors="coerce")
observations["key"] = 0
oldNoKeys = 0
# Create a PandasDF for the purpose of partitioning
tileTable = observations.loc[:,['key','Latitude','Longitude']].groupby("key").agg(["min","max"])
krigingRange = [47.2, 55.1,  5.2 , 15.2]
tileTable.loc[0] =krigingRange
# Partition the observations according to their location (lat lon)
while(oldNoKeys != tileTable.shape[0]):
    oldNoKeys = tileTable.shape[0]
    print(oldNoKeys)
    #Create new Tiles
    for aIndex in tileTable.index:
        if(aIndex <= 0):
            newIndexs = [4 * aIndex - 1, 4*aIndex - 2, 4*aIndex - 3, 4*aIndex - 4]
            # 1 2
            # 3 4
            latMax = tileTable.loc[aIndex,"Latitude"]["max"]
            latMin = tileTable.loc[aIndex,"Latitude"]["min"]
            lonMax = tileTable.loc[aIndex,"Longitude"]["max"]
            lonMin = tileTable.loc[aIndex,"Longitude"]["min"]
            newTiles = [[latMin, (latMax + latMin)/2, lonMin, (lonMax + lonMin)/2],
                        [(latMax + latMin)/2, latMax, lonMin, (lonMax + lonMin)/2],
                        [latMin, (latMax + latMin)/2, (lonMax + lonMin)/2, lonMax],
                        [(latMax + latMin)/2, latMax, (lonMax + lonMin)/2, lonMax]]
            forceSplit = False
            split = True
            for aNewTile in newTiles:
                
                aCtr = observations.loc[(observations.Latitude >= aNewTile[0]) & (observations.Latitude <= aNewTile[1])
             & (observations.Longitude >= aNewTile[2]) & (observations.Longitude <= aNewTile[3])].shape[0]
                
                if( aCtr < 10):
                    split = False
                elif(aCtr >= 200):
                    forceSplit = True
                    split = True
                #print(str(aIndex)+" "+str(aCtr))
            
            if(forceSplit or split):
                for i in range(0,4):
                    #split remove old
                    tileTable.loc[newIndexs[i]] = newTiles[i]
                    observations.loc[(observations.Latitude >= newTiles[i][0]) & (observations.Latitude <= newTiles[i][1])
             & (observations.Longitude >= newTiles[i][2]) & (observations.Longitude <= newTiles[i][3]),"key"] = -newIndexs[i]
                    
                    #create new
            else:
                #keep old
                tileTable.loc[-aIndex] = tileTable.loc[aIndex]
            tileTable = tileTable.drop([aIndex])

1
4
16
46
112
148
157


In [13]:
# Stop the query if you want
q.stop()

## Draw partition lines
Check the result of partitioning, can also be skipped
1. Convert partioning result to a geoJSON
2. Plot using Folium

In [14]:
tileTable["P1"] = tileTable.Longitude["min"].map(lambda x: [x]) + tileTable.Latitude["min"].map(lambda x: [x])
tileTable["P2"] = tileTable.Longitude["min"].map(lambda x: [x]) + tileTable.Latitude["max"].map(lambda x: [x])
tileTable["P3"] = tileTable.Longitude["max"].map(lambda x: [x]) + tileTable.Latitude["max"].map(lambda x: [x])
tileTable["P4"] = tileTable.Longitude["max"].map(lambda x: [x]) + tileTable.Latitude["min"].map(lambda x: [x])
tileTable["Ps"] = tileTable.P1.map(lambda x: [x]) + tileTable.P2.map(lambda x: [x]) + tileTable.P3.map(lambda x: [x]) + tileTable.P4.map(lambda x: [x])
tileTable["Ps"] = tileTable.Ps.map(lambda x: [x])
#keys["JSONstr"] = keys.P1.map(lambda x:)

#tileTable[["idx","Ps"]].iloc[0].idx.values[0]
#tileTable[["idx","Ps"]].iloc[0].Ps.values[0]
#tileTable

In [15]:

myFeatures = []
for aKey in tileTable.index:
    aFeature = {"type":"Feature",
                "id":aKey,
                "properties":{"name":aKey},
                "geometry":{"type":"Polygon","coordinates":tileTable.loc[aKey].Ps.values[0]}}
    #print(json.dumps(aFeature))
    myFeatures.append(aFeature)
    #print(keys.loc[aKey].idx.values[0])

myMultiPolygon = {"type":"FeatureCollection","features":myFeatures}
myJSON = json.loads(json.dumps(myMultiPolygon))
myJSON

{u'features': [{u'geometry': {u'coordinates': [[[5.2, 47.2],
      [5.2, 49.175000000000004],
      [7.699999999999999, 49.175000000000004],
      [7.699999999999999, 47.2]]],
    u'type': u'Polygon'},
   u'id': 5,
   u'properties': {u'name': 5},
   u'type': u'Feature'},
  {u'geometry': {u'coordinates': [[[5.2, 53.125],
      [5.2, 55.1],
      [7.699999999999999, 55.1],
      [7.699999999999999, 53.125]]],
    u'type': u'Polygon'},
   u'id': 10,
   u'properties': {u'name': 10},
   u'type': u'Feature'},
  {u'geometry': {u'coordinates': [[[12.7, 47.2],
      [12.7, 49.175000000000004],
      [15.2, 49.175000000000004],
      [15.2, 47.2]]],
    u'type': u'Polygon'},
   u'id': 15,
   u'properties': {u'name': 15},
   u'type': u'Feature'},
  {u'geometry': {u'coordinates': [[[12.7, 49.175000000000004],
      [12.7, 51.150000000000006],
      [15.2, 51.150000000000006],
      [15.2, 49.175000000000004]]],
    u'type': u'Polygon'},
   u'id': 16,
   u'properties': {u'name': 16},
   u'type': u'

In [16]:
m = folium.Map([51, 9.], tiles='cartodbpositron', zoom_start=6)
folium.GeoJson(myJSON).add_to(m)
m

# Kriging on DWD
1. Read observations into a RDD from DWD as files
2. Parse and select the observations from a specific date and time
3. Assign the selected observations to a partition created before
4. Run kriging on each partition in parallel
5. Plot the result using foliium

## Read data from DWD

In [17]:
#sc = ss.sparkContext
# Read DWD Observations with some preprocessing to eliminate invalid data 
## ChangeME
dwdRdd = sc.wholeTextFiles(Helper.Helper.DWDData).flatMap(lambda (k,v):v.split("\n")).cache()
#dwdDf = ss.read.format("csv").load("data/DWDResult/xaa")
dwdRdd1516 = dwdRdd.filter(lambda x: len(x[1:-1].split(","))==6).cache()
dwdRdd1516 = dwdRdd1516.filter(lambda x: x[1:-1].split(",")[1][2:6]=="2015" or x[1:-1].split(",")[1][2:6]=="2016").cache()

In [18]:
dwdRdd1516 = dwdRdd1516.map(lambda x: x[1:-1].replace("'","").split(",")).cache()
from pyspark.sql.types import Row
def f(x):
    d = {}
    d["ID"] = x[0]
    d["Date"] = x[1].strip()
    d["Time"] = x[2].strip()
    d["Latitude"] = float(x[3].strip())
    d["Longitude"] = float(x[4].strip())
    d["AirTemperature"] = float(x[5].strip())
    return d
dwdDf = dwdRdd1516.map(lambda x: Row(**f(x))).toDF()

## Select the observations from a specific date and time

In [19]:

dwdDf.createOrReplaceTempView("table1")
smalldf = ss.sql("select * from table1 where Time = '{1}' and Date = '{0}'".format(dateStr, timeStr)).cache()

In [20]:
## Convert to Pandas DF

In [21]:
cols = ["AirTemperature","Date","ID","Latitude","Longitude","Time"]
aDF =pd.DataFrame.from_records(smalldf.rdd.collect(), columns = cols)


In [22]:
def assignToTile(latlon):
    for aIndex in tileTable.index:
        if(latlon[0] >= tileTable.loc[aIndex,"Latitude"]["min"]
           and latlon[0] <= tileTable.loc[aIndex,"Latitude"]["max"]
           and latlon[1] >= tileTable.loc[aIndex,"Longitude"]["min"]
           and latlon[1] <= tileTable.loc[aIndex,"Longitude"]["max"]):
            return int(aIndex)
aDF.loc[:,"key"] = aDF.loc[:,"Latitude"].map(lambda x: [x])+aDF.loc[:,"Longitude"].map(lambda x: [x])
#tempDf["idx"] = tempDf["idx"].map(assignToTile2)
aDF.loc[:,"key"] = aDF.loc[:,"key"].map(assignToTile)


## Assign the observations to partitions according the result of the partitioning

In [23]:
# A list to store the kriging result
sTotalResult = []
# A list of the tuple of WG-Observations, DWD-Observations, each tuple represents a partition
krigingPairs = []
for aIndex in tileTable.index.values:
    
    #aIndex = tileTable.index.values[0]
    aKnownPoints = observations.loc[observations.key == aIndex,["Latitude","Longitude","AirTemperature"]].dropna()
    aKrigingGrid = aDF.loc[aDF.key == aIndex,["Latitude","Longitude"]].apply(pd.to_numeric,errors="coerce")
    #aKrigingGrid["Observations"] =aDF.loc[aDF.key == aIndex,"AirTemperature"]
    krigingPairs.append((aKnownPoints,aKrigingGrid))


## Distribute the PandasDF across the cluster and run Kriging in parallel

In [25]:
krigingPairsRdd = ss.sparkContext.parallelize(krigingPairs)
def paralKriging(v):
    import pandas
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import Matern
    aKnownPoints = v[0]
    aKrigingGrid = v[1]
    if( len(aKnownPoints) > 2 and len(aKrigingGrid) > 0):
        kernel =   Matern(1.0, (1e-2, 1e-1))

        gp = GaussianProcessRegressor(kernel=kernel,n_restarts_optimizer=30)#,  normalize_y=True)
        gp.fit(aKnownPoints.values[:,0:2], aKnownPoints.values[:,2])

        y_pred,sigma = gp.predict(aKrigingGrid.values[:,0:2], return_std=True)
        aKrigingGrid["y_pred"] = y_pred
        aKrigingGrid["sigma"] = sigma
        return aKrigingGrid
    
krigingPairsRdd = krigingPairsRdd.map(paralKriging)

In [26]:
sTotalResultPd = pd.concat(krigingPairsRdd.collect())
sTotalResultPdSorted = sTotalResultPd.sort_index()
#sTotalResultPdSorted.loc[:,"y_pred"].loc[0]

In [27]:
sTotalResultPdSorted_WithObs = sTotalResultPdSorted
sTotalResultPdSorted_WithObs["Observations"] = aDF["AirTemperature"]
sTotalResultPdSorted_WithObs["ID"] = aDF["ID"]
#sTotalResultPdSorted_WithObs


In [75]:
sTotalResultPdSorted_WithObs.values[1]

array([47.8741, 8.0044, 3.658621455351808, 0.8261662994030426, 0.6,
       u'01346'], dtype=object)

In [29]:
data = sTotalResultPdSorted_WithObs.values[:,0:3].tolist()
from folium.plugins import HeatMap

m = folium.Map([48., 5.], tiles='stamentoner', zoom_start=6)

HeatMap(data).add_to(m)


m

In [30]:
m = folium.Map([51, 9.], tiles='cartodbpositron', zoom_start=6)
for aRow in sTotalResultPdSorted_WithObs.values:
    folium.map.Marker(
        [aRow[0], aRow[1]],
                    popup='ID:{0},Obs:{1},Pred:{2}.'.format(aRow[5],aRow[4],aRow[2])).add_to(m)
m

## Kriging + Visualization
1. Load deutschLandGeoJSON
2. Calculate the centroid of each json
3. Assign centroids to partitioned tiles
4. Apply Kriging in different partitions to predict the air temperature of each centroid in parallel
5. Use the predictions to represent the air temperature of each geojson and plot the result with Folium

In [66]:
#fname = "./deutschlandGeoJSON/1_deutschland/4_niedrig.geojson"
#fname = "./deutschlandGeoJSON/2_bundeslaender/4_niedrig.geojson"
#fname = "./deutschlandGeoJSON/3_regierungsbezirke/4_niedrig.geojson"
## ChangeME
fname = Helper.Helper.DeutschlandJSON+"/deutschlandGeoJSON/4_kreise/4_niedrig.geojson"
#fname = "./deutschlandGeoJSON/4_kreise/2_hoch.geojson"
deutschLandGeoJSON = json.loads(open(fname).read())

In [67]:
for aJSON in deutschLandGeoJSON['features']:
    #print(np.asarray(aJSON['geometry']['coordinates'][0]).shape)
    #print(aJSON['id'])
    if(len(np.asarray(aJSON['geometry']['coordinates'][0]).shape) != 2):
        aJSON[u'centroid'] = [np.asarray(aJSON['geometry']['coordinates'][0][0])[:,0].mean(),np.asarray(aJSON['geometry']['coordinates'][0][0])[:,1].mean()]
    else:
        aJSON[u'centroid'] = [np.asarray(aJSON['geometry']['coordinates'][0])[:,0].mean(),np.asarray(aJSON['geometry']['coordinates'][0])[:,1].mean()]
    
    #print(aJSON[u'id'],aJSON[u'centroid'][1],aJSON[u'centroid'][0])

In [68]:
deutschLandGeoJSONRdd = ss.sparkContext.parallelize(deutschLandGeoJSON['features'])

def addCentroid(aGeoJSON):
    import numpy as np
    if(len(np.asarray(aGeoJSON['geometry']['coordinates'][0]).shape) != 2):
        aGeoJSON[u'centroid'] = [np.asarray(aGeoJSON['geometry']['coordinates'][0][0])[:,0].mean(),np.asarray(aGeoJSON['geometry']['coordinates'][0][0])[:,1].mean()]
    else:
        aGeoJSON[u'centroid'] = [np.asarray(aGeoJSON['geometry']['coordinates'][0])[:,0].mean(),np.asarray(aGeoJSON['geometry']['coordinates'][0])[:,1].mean()]

    return (aGeoJSON[u'id'],aGeoJSON[u'centroid'][1],aGeoJSON[u'centroid'][0])

krigingGrid = deutschLandGeoJSONRdd.map(addCentroid).collect()
cols = ["Id","Latitude","Longitude"]
krigingGridDF = pd.DataFrame.from_records(krigingGrid, columns = cols)

In [69]:
def assignToTile(latlon):
    for aIndex in tileTable.index:
        if(latlon[0] >= tileTable.loc[aIndex,"Latitude"]["min"]
           and latlon[0] <= tileTable.loc[aIndex,"Latitude"]["max"]
           and latlon[1] >= tileTable.loc[aIndex,"Longitude"]["min"]
           and latlon[1] <= tileTable.loc[aIndex,"Longitude"]["max"]):
            return int(aIndex)
krigingGridDF.loc[:,"key"] = krigingGridDF.loc[:,"Latitude"].map(lambda x: [x])+krigingGridDF.loc[:,"Longitude"].map(lambda x: [x])
#tempDf["idx"] = tempDf["idx"].map(assignToTile2)
krigingGridDF.loc[:,"key"] = krigingGridDF.loc[:,"key"].map(assignToTile)

### Parallel with spark

In [70]:
sTotalResult = []
krigingPairs = []
for aIndex in tileTable.index.values:
    
    #aIndex = tileTable.index.values[0]
    aKnownPoints = observations.loc[observations.key == aIndex,["Latitude","Longitude","AirTemperature"]].dropna()
    aKrigingGrid = krigingGridDF.loc[krigingGridDF.key == aIndex,["Latitude","Longitude"]].apply(pd.to_numeric,errors="coerce")
    krigingPairs.append((aKnownPoints,aKrigingGrid))
krigingPairsRdd = ss.sparkContext.parallelize(krigingPairs)

In [71]:
def paralKriging(v):
    import pandas
    from sklearn.gaussian_process import GaussianProcessRegressor
    from sklearn.gaussian_process.kernels import Matern
    aKnownPoints = v[0]
    aKrigingGrid = v[1]
    if( len(aKnownPoints) > 2 and len(aKrigingGrid) > 0):
        kernel =   Matern(1.0, (1e-2, 1e-1))

        gp = GaussianProcessRegressor(kernel=kernel,n_restarts_optimizer=30)#,  normalize_y=True)
        gp.fit(aKnownPoints.values[:,0:2], aKnownPoints.values[:,2])

        y_pred,sigma = gp.predict(aKrigingGrid.values[:,0:2], return_std=True)
        aKrigingGrid.shape
        aKrigingGrid["y_pred"] = y_pred
        aKrigingGrid["sigma"] = sigma
        return aKrigingGrid
    
krigingPairsRdd = krigingPairsRdd.map(paralKriging)

In [72]:

sTotalResultPd = pd.concat(krigingPairsRdd.collect())
sTotalResultPdSorted = sTotalResultPd.sort_index()
#sTotalResultPdSorted.loc[:,"y_pred"].loc[0]

In [None]:
sTotalResultPdSorted

## Visualization with Folium

In [73]:
## sTotalResultPdSorted.loc[:,"y_pred"].values.min()


import branca.colormap as cm
linearCM = cm.LinearColormap(
    ['purple','blue','green','red','black'],
    index = [-7,0,5,10,21.5],
    vmin=-7, vmax=19
)

linearCM.caption="Air Temperature"

In [74]:
m = folium.Map([51, 9.], tiles='cartodbpositron', zoom_start=6)
folium.GeoJson(deutschLandGeoJSON,
              style_function=lambda feature: {
        'fillColor': linearCM(sTotalResultPdSorted.loc[:,"y_pred"].loc[feature['id']]),
        'color': 'black',
        'weight': 1
    }).add_to(m)
m.add_child(linearCM)
m