In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from elasticsearch import Elasticsearch, helpers
from pyspark.sql.window import Window
import requests
import numpy as np

# Session-wide settings. `spark` is not created in this notebook; it is
# presumably the session provided by the runtime (e.g. Databricks) — confirm.
# Use GMT as the SQL session time zone for timestamp parsing/formatting.
spark.conf.set("spark.sql.session.timeZone", "GMT")

# Publicly reachable Elasticsearch host used by every read/write helper below.
ES_HOST = 'da2019w-1019.eastus.cloudapp.azure.com'
# IF RUNNING FROM SAME ENV AS VMs:
# ES_HOST = '10.0.0.25'
# Module-level ES client shared by all helpers in this notebook.
# NOTE(review): `timeout` is handed to the elasticsearch-py client as-is;
# presumably seconds (~16.7 h) — confirm against the installed client version.
es = Elasticsearch([{'host': ES_HOST}], timeout=60000)

# Very long network/heartbeat timeouts, a 4g cap on results collected to the
# driver, and a 3600 s broadcast-join timeout.
# NOTE(review): spark.network.timeout, spark.executor.heartbeatInterval and
# spark.driver.maxResultSize are core Spark settings that normally only take
# effect when set before the SparkContext starts; depending on the Spark
# version, setting them at runtime is ignored or rejected — confirm.
spark.conf.set("spark.network.timeout", 10000000)
spark.conf.set("spark.executor.heartbeatInterval", 10000000)
spark.conf.set("spark.driver.maxResultSize", "4g")
# When a streaming query is started while another run with the same query id
# is active, stop the old run instead of failing.
spark.conf.set('spark.sql.streaming.stopActiveRunOnRestart', True)
spark.conf.set("spark.sql.broadcastTimeout", 3600)

## Helper Functions:

In [2]:
def transform_raw_data(raw_df, drop_id=True):
    """Flatten raw bus records into the column layout written to Elasticsearch.

    * ``timestamp`` is unwrapped from its ``{"$numberLong": ...}`` struct and
      cast to bigint.
    * ``coordinates`` is lifted out of the ``loc`` struct.
    * A fixed list of columns (including ``loc`` and ``direction``) is dropped.
    * ``_id`` is dropped as well unless ``drop_id`` is false.

    Args:
        raw_df: DataFrame shaped like ``SCHEMA`` (batch or streaming).
        drop_id: whether to remove the ``_id`` column.

    Returns:
        The flattened DataFrame.
    """
    discarded = (
        'loc', 'calendar', 'currentHour', 'systemTimestamp', 'dateType',
        'dateTypeEnum', 'direction', 'poiId', 'poiId2', 'anomaly',
        'probability', 'distanceCovered', 'ellapsedTime', 'vehicleSpeed',
        'justLeftStop', 'justStopped', 'angle', 'latitude', 'longitude',
    )
    epoch_millis = raw_df['timestamp']['$numberLong'].cast('bigint')
    point = raw_df['loc']['coordinates']

    flattened = (raw_df
                 .withColumn('timestamp', epoch_millis)
                 .withColumn('coordinates', point)
                 .drop(*discarded))

    return flattened.drop('_id') if drop_id else flattened

def read_elastic(index, query="", scroll_size="10000", array_field="", read_metadata=False):
    """Load an Elasticsearch index as a Spark DataFrame via elasticsearch-hadoop.

    Args:
        index: index name, used as the resource passed to ``load``.
        query: value for ``es.query`` (e.g. a JSON query-DSL string).
        scroll_size: value for ``es.scroll.size`` (documents per scroll request).
        array_field: value for ``es.read.field.as.array.include`` — field(s)
            the connector should read as arrays.
        read_metadata: value for ``es.read.metadata``; when true the connector
            exposes document metadata (used by callers to recover ``_id``).

    Returns:
        The DataFrame returned by the connector's ``load``.

    Raises:
        Exception: if ``index`` does not exist.
    """
    # Keyword argument: elasticsearch-py 8.x rejects positional API parameters,
    # while 7.x accepts both forms.
    if not es.indices.exists(index=index):
        raise Exception("Index doesn't exist!")

    return (spark.read
            .format("org.elasticsearch.spark.sql")
            .option("es.nodes.wan.only", "true")
            .option("es.port", "9200")
            .option("es.nodes", ES_HOST)
            .option("es.nodes.client.only", "false")
            .option("pushdown", "true")
            .option("es.query", query)
            .option("es.scroll.size", scroll_size)
            .option("es.scroll.keepalive", "120m")
            .option("es.read.field.as.array.include", array_field)
            .option("es.read.metadata", read_metadata)
            .load(index))

        
# Default index body used by write_to_elastic / write_stream when creating an
# index: one shard, no replicas, refresh_interval -1 (periodic refresh
# disabled), and explicit field mappings. `coordinates` is mapped as a
# geo_point and `timestamp` as an epoch_millis date.
# NOTE: the name is misspelled (intended "DEFAULT_SCHEMA"), but it is used as a
# default argument value elsewhere in this file, so it is kept as-is.
DEFUALT_SCEHMA = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval" : -1
    },
    "mappings": {
        "properties": {
            "actualDelay" : { "type": "long" },
            "filteredActualDelay" : { "type": "long" },
            "delay" : { "type": "long" },
            "areaId" : { "type": "long" },
            "areaId1" : { "type": "long" },
            "areaId2" : { "type": "long" },
            "areaId3" : { "type": "long" },
            "atStop" : { "type": "boolean" },
            "busStop" : { "type": "long" },
            "congestion" : { "type": "boolean" },
            "gridID" : { "type": "keyword" },
            "journeyPatternId" : { "type": "keyword" },
            "lineId" : { "type": "keyword" },
            "coordinates" : { "type": "geo_point" },
            "timestamp" : { "type": "date", "format" : "epoch_millis" },
            "vehicleId" : { "type": "long" },
            "direction" : { "type": "long" },
        }
    }
}

# Serialized ES query selecting documents whose atStop flag is true.
# NOTE(review): not referenced anywhere in the visible code.
AT_STOP_QUERY ="""{ "query": { "bool": { "must": { "term": { "atStop": "true" } } } } }"""

def write_to_elastic(df, index: str, settings=DEFUALT_SCEHMA, append=True):
    """Append a DataFrame to an Elasticsearch index, creating the index if needed.

    Args:
        df: Spark DataFrame to write.
        index: target index (also used as the connector's ``es.resource``).
        settings: index-creation body (settings + mappings). It only takes
            effect when the index is created. It is read, never mutated, so
            the shared default dict is safe.
        append: when False, an existing index is deleted first, so it is
            recreated from ``settings`` and ends up holding only ``df``.
    """
    # Keyword argument: elasticsearch-py 8.x rejects positional API parameters.
    # The exists() round-trip is only needed when we intend to overwrite.
    if not append and es.indices.exists(index=index):
        es.indices.delete(index=index)

    # A 400 response (e.g. the index already exists) is ignored.
    es.indices.create(index=index, ignore=400, body=settings)

    (df.write.format("org.elasticsearch.spark.sql")
        .option("es.resource", index)
        .option("es.nodes.wan.only", "true")
        .option("es.port", "9200")
        .option("es.nodes", ES_HOST)
        .option("es.nodes.client.only", "false")
        .mode("append")
        .save())

def write_stream(df, index: str, settings=DEFUALT_SCEHMA, mode=True, wait=True,
                 checkpoint_location=None):
    """Start a streaming query that appends ``df`` to Elasticsearch ``index``.

    Args:
        df: streaming DataFrame to sink.
        index: target index; created from ``settings`` if missing. It is also
            used in the query name ``"<index>_to_es"``.
        settings: index-creation body (settings + mappings).
        mode: unused; kept only for backward compatibility of the signature.
        wait: when true, block on ``awaitTermination()``; otherwise return as
            soon as the query has started.
        checkpoint_location: streaming checkpoint directory. It defaults to a
            per-index sub-directory of ``/tmp/Afik/Stream/``. Pass
            ``"/tmp/Afik/Stream/"`` to resume a checkpoint written by the old
            shared location.

    Returns:
        The started ``StreamingQuery`` when ``wait`` is false. Otherwise the
        function returns None after the query terminates.
    """
    # A 400 response (e.g. the index already exists) is ignored.
    es.indices.create(index=index, ignore=400, body=settings)

    if checkpoint_location is None:
        # Every streaming query needs its own checkpoint directory. Queries that
        # share one resolve to the same persisted query id and offsets. With
        # spark.sql.streaming.stopActiveRunOnRestart enabled, starting the
        # second query silently stops the first.
        checkpoint_location = f"/tmp/Afik/Stream/{index}/"

    query = (df.writeStream
             .outputMode("append")
             .queryName(f"{index}_to_es")
             .format("org.elasticsearch.spark.sql")
             .option("es.nodes.wan.only", "true")
             .option("checkpointLocation", checkpoint_location)
             .option("es.resource", index)
             .option("es.nodes", ES_HOST)
             .option("es.port", "9200")
             .start())

    if wait:
        query.awaitTermination()
        return None
    return query

def calculate_centroids(index, num_partitions=10, partition_size=5000):
    """Compute the geo-centroid of every ``busStop`` over its ``atStop`` documents.

    A ``terms`` aggregation on ``busStop`` is paged through with ES's
    partitioned ``include``. One search runs per partition, each returning at
    most ``partition_size`` buckets. A ``geo_centroid`` sub-aggregation runs on
    ``coordinates``.

    Args:
        index: ES index to aggregate over.
        num_partitions: number of term partitions (one search request each).
        partition_size: maximum number of ``busStop`` buckets per partition.

    Returns:
        Spark DataFrame with ``busStop`` (long) and ``coordinates``
        (``[lon, lat]`` array of doubles). Stops whose bucket has no centroid
        location are omitted.
    """
    centroids = []
    for partition in range(num_partitions):
        search = {
            "size": 0,
            "query": {"bool": {"must": {"term": {"atStop": "true"}}}},
            "aggs": {
                "busStop": {
                    "terms": {
                        "field": "busStop",
                        "size": partition_size,
                        "include": {
                            "partition": partition,
                            "num_partitions": num_partitions,
                        },
                    },
                    "aggs": {"centroid": {"geo_centroid": {"field": "coordinates"}}},
                }
            },
        }
        res = es.search(index=index, body=search)
        for bucket in res['aggregations']['busStop']['buckets']:
            location = bucket['centroid'].get('location')
            if location is None:
                # geo_centroid reports no "location" when the bucket has no
                # geo-point values. Skip it instead of failing with KeyError.
                continue
            centroids.append((bucket['key'], [location['lon'], location['lat']]))

    schema = StructType([StructField('busStop', LongType(), True),
                         StructField('coordinates', ArrayType(DoubleType(), True))])
    return spark.createDataFrame(centroids, schema)

from json import dumps
def filter_by_dist(index, centroids_df, distance='500m', geo_field='coordinates'):
    '''
    Return the documents of ``index`` lying within ``distance`` of their bus stop's centroid.

    For each row of ``centroids_df``, a bool query is pushed down to
    Elasticsearch: a ``term`` on ``busStop`` plus a ``geo_distance`` filter
    around that stop's centroid. The per-stop DataFrames are unioned.

    Args:
        index: ES index to read.
        centroids_df: DataFrame with ``busStop`` and ``coordinates``
            (``[lon, lat]``) columns; it is iterated on the driver.
        distance: distance by meters, string (e.g. 500m).
        geo_field: geo_point field of ``index`` to filter on.

    Returns:
        The unioned DataFrame, or None when ``centroids_df`` has no rows.
    '''
    accum_df = None
    for row in centroids_df.toLocalIterator():
        q = {
            "query": {
                "bool": {
                    # A term query must name the field it matches on.
                    "must": {"term": {"busStop": row.busStop}},
                    "filter": {
                        "geo_distance": {
                            "distance": distance,
                            # The geo_point mapped by this notebook's index
                            # settings is "coordinates" (not "pin.location").
                            geo_field: row.coordinates,
                        }
                    },
                }
            }
        }
        temp_df = read_elastic(index, query=dumps(q))
        accum_df = temp_df if accum_df is None else accum_df.union(temp_df)

    return accum_df
    
  
def filter_by_line_route(df, drop_busStops=True):
    """Keep only rows whose ``busStop`` is on the route of their ``lineId``.

    The ``dublin-bus-routes`` index (read with ``busStops`` as an array) is
    broadcast and inner-joined on ``lineId``. Rows with no matching route are
    therefore dropped too. Remaining rows are kept when
    ``array_contains(busStops, busStop)`` holds.

    Args:
        df: DataFrame with ``lineId`` and ``busStop`` columns.
        drop_busStops: remove the joined ``busStops`` column from the result.
    """
    routes = read_elastic("dublin-bus-routes", array_field='busStops')
    on_route = (df
                .join(routes.hint("broadcast"), on='lineId', how='inner')
                .filter(F.expr("array_contains(busStops, busStop)")))
    return on_route.drop('busStops') if drop_busStops else on_route
  
@F.udf("int")
def calculate_direction(coordinates, prev_coordinates):
    """Classify the heading between two consecutive ``[lon, lat]`` fixes.

    The slope of the segment prev -> current is bucketed at
    tan(22.5 deg) = sqrt(2) - 1 and tan(67.5 deg) = sqrt(2) + 1:

    * ``8``  - (near-)vertical, including an exactly vertical segment;
    * ``1``  - diagonal with positive slope;
    * ``0``  - (near-)horizontal;
    * ``-1`` - diagonal with negative slope.

    Returns:
        The bucket as an int, or None when either fix is missing or the
        vehicle did not move.
    """
    # A missing current fix would otherwise fail on unpacking and abort the task.
    if coordinates is None or prev_coordinates is None or coordinates == prev_coordinates:
        return None
    longitude, latitude = coordinates
    prev_longitude, prev_latitude = prev_coordinates
    if longitude == prev_longitude:
        return 8  # vertical: slope is infinite
    slope = (latitude - prev_latitude) / (longitude - prev_longitude)
    steep = np.sqrt(2) + 1    # tan(67.5 deg)
    shallow = np.sqrt(2) - 1  # tan(22.5 deg)
    if slope > steep:
        return 8
    if slope > shallow:
        return 1
    if slope > -shallow:
        return 0
    if slope > -steep:
        return -1
    return 8

# Needs _id field!
# NOTE(review): this function does not read `_id` itself. The requirement
# presumably comes from downstream consumers (the k-means helpers partition by
# `_id`) — confirm.
def add_direction(df):
    """Add an int ``direction`` column computed from each vehicle's previous fix.

    Within each ``vehicleId``, rows are ordered by ``timestamp``. The previous
    row's ``coordinates`` (via ``lag``) and the current ones are classified by
    ``calculate_direction``. The temporary ``prevCoordinates`` column is
    dropped before returning.
    """
    per_vehicle_in_time = Window.partitionBy('vehicleId').orderBy('timestamp')
    with_prev = df.withColumn('prevCoordinates',
                              F.lag(df.coordinates).over(per_vehicle_in_time))
    with_direction = with_prev.withColumn(
        'direction',
        calculate_direction(with_prev.coordinates, with_prev.prevCoordinates))
    return with_direction.drop('prevCoordinates')


def extract_transform_load(df):
    """Run the batch ETL chain over a raw DataFrame shaped like ``SCHEMA``.

    Steps:
    1. flatten the raw records, keeping ``_id``;
    2. keep rows on their line's route, keeping ``busStops``;
    3. add the ``direction`` column;
    4. re-derive ``atStop`` against the
       ``dublin-bus-route-aligned-filtered-100m-centroids`` index with distance 500.
    """
    # NOTE(review): atStop_adjustment is not defined in the visible code; it
    # must be provided elsewhere in the notebook — confirm before running.
    flattened = transform_raw_data(df, drop_id=False)
    on_route = filter_by_line_route(flattened, drop_busStops=False)
    with_direction = add_direction(on_route)
    return atStop_adjustment(with_direction,
                             'dublin-bus-route-aligned-filtered-100m-centroids',
                             distance=500)

from math import radians, cos, sin, asin, sqrt

@F.udf(FloatType())
def euclidean_dist(x, y):
    """Euclidean distance between two numeric sequences (FloatType Spark UDF).

    Components are paired with ``zip``, so extra trailing elements of the
    longer sequence are ignored.
    """
    squared_total = 0
    for left, right in zip(x, y):
        squared_total += (left - right) ** 2
    return sqrt(squared_total)


def dist_approx(coord_a, coord_b):
    """Approximate distance in metres between two ``[lon, lat]`` points in degrees.

    Uses the equirectangular approximation: the longitude difference is scaled
    by the cosine of the mean latitude, on a sphere of radius 6371 km. This is
    adequate for the short distances between a bus and nearby stops.

    Returns:
        The distance rounded to 4 decimals, as a float. ``9999.0`` is returned
        as a "far away" sentinel when either point, or any of its components,
        is missing.
    """
    # The sentinel is a float: Spark UDFs built on this function are declared
    # FloatType/DoubleType, and PySpark turns a value of a mismatched Python
    # type (e.g. the int 9999) into null.
    if coord_a is None or coord_b is None:
        return 9999.0
    longit_a, latit_a = coord_a
    longit_b, latit_b = coord_b
    if None in (longit_a, latit_a, longit_b, latit_b):
        return 9999.0
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, (longit_a, latit_a, longit_b, latit_b))
    R = 6371 * 1000  # radius of the earth in meters
    x = (longit_a - longit_b) * cos(0.5 * (latit_b + latit_a))
    y = latit_b - latit_a
    # sqrt() is non-negative, so no abs() is needed.
    return round(R * sqrt(x ** 2 + y ** 2), 4)

# Spark UDF wrapper around dist_approx: distance in metres between two
# [lon, lat] array columns, returned as FloatType.
udf_dist_approx = F.udf(dist_approx,FloatType())

@F.udf(StructType([StructField("distance", DoubleType(), False), StructField("busStop", LongType(), False)]))
def closest_stop(array, coords=(-6.201828763161048, 53.39088490713303)):
    """Return ``(distance, busStop)`` for the candidate stop nearest to ``coords``.

    Args:
        array: sequence of ``(busStop, [lon, lat])`` pairs. In the stream app
            this is ``arrays_zip(collect_list(busStop), collect_list(c_coords))``.
        coords: the ``[lon, lat]`` point to match. The default, a fixed point,
            presumably only matters when called directly from Python.

    Returns:
        ``(distance_in_metres, busStop)`` with the smallest distance. Ties are
        broken by the smaller ``busStop``. Returns None when there are no
        candidates or ``coords`` is missing.
    """
    # min() over an empty sequence raises ValueError and would fail the task.
    if not array or coords is None:
        return None
    # float(): the struct declares distance as DoubleType, so an int value
    # (e.g. a 9999 sentinel) must not leak through.
    return min(
        (float(dist_approx(stop[1], coords)), stop[0])
        for stop in array
    )


@F.udf("float")
def haversine_dist(coord_a, coord_b):
    """Great-circle (haversine) distance in KILOMETRES between two ``[lon, lat]`` points.

    Note that the unit differs from ``dist_approx``, which returns metres.

    Returns:
        The distance rounded to 4 decimals, or ``9999.0`` when either point, or
        any of its components, is missing.
    """
    # Float sentinel: this UDF is declared "float", and PySpark maps a return
    # value of a mismatched Python type (such as the int 9999) to null.
    if coord_a is None or coord_b is None:
        return 9999.0
    longit_a, latit_a = coord_a
    longit_b, latit_b = coord_b
    if None in (longit_a, latit_a, longit_b, latit_b):
        return 9999.0
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, (longit_a, latit_a, longit_b, latit_b))
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    area = sin(dist_latit / 2) ** 2 + cos(latit_a) * cos(latit_b) * sin(dist_longit / 2) ** 2
    central_angle = 2 * asin(sqrt(area))
    radius = 6371  # km
    return abs(round(central_angle * radius, 4))

def add_distance_to_centroid(centroid_df, stop_df, drop_centroid_col=True):
    """Attach each row's distance (metres) to the centroid of its bus stop.

    ``centroid_df`` (``busStop``, ``coordinates``) is renamed to
    ``c_busStop``/``c_coordinates``, broadcast, and inner-joined to ``stop_df``
    on the stop id. Rows without a centroid are therefore dropped. ``distance``
    is ``udf_dist_approx(c_coordinates, coordinates)``.

    Args:
        centroid_df: centroids per bus stop.
        stop_df: rows with ``busStop`` and ``coordinates``.
        drop_centroid_col: when false, ``c_coordinates`` is kept in the result.
    """
    renamed = centroid_df.selectExpr("coordinates as c_coordinates", "busStop as c_busStop")
    matched = stop_df.join(renamed.hint("broadcast"),
                           stop_df['busStop'] == renamed['c_busStop'],
                           how='inner')
    with_distance = (matched
                     .withColumn('distance',
                                 udf_dist_approx(matched.c_coordinates, matched.coordinates))
                     .drop('c_busStop'))
    if not drop_centroid_col:
        return with_distance
    return with_distance.drop('c_coordinates')

@F.udf(BooleanType())
def is_approx_near(coords_a, coords_b, decimal=5):
    """True when two ``[lng, lat]`` points coincide after rounding to ``decimal`` places.

    Longitude is compared first; latitude is only rounded and compared when
    the longitudes match.
    """
    (lng_a, lat_a), (lng_b, lat_b) = coords_a, coords_b
    same_lng = round(lng_a, decimal) == round(lng_b, decimal)
    return same_lng and round(lat_a, decimal) == round(lat_b, decimal)
  
@F.udf(ArrayType(DoubleType()))
def rounded_coords(coords, decimal=5):
    """Return ``[lng, lat]`` with both components rounded to ``decimal`` places."""
    lng, lat = coords
    # Was `ound(...)`, a NameError raised on every row.
    return [round(lng, decimal), round(lat, decimal)]
  
# RTPi Helpers:

def get_stations_from_es(stations, index, size=None):
    """Return the raw ES hits of documents whose ``busStop`` is in ``stations``.

    Args:
        stations: iterable of bus stop ids.
        index: ES index to search.
        size: maximum number of hits. Without an explicit size, Elasticsearch
            returns only its default of 10 hits. The default here is therefore
            ``len(stations)`` (never fewer than 10, capped at ES's default
            ``max_result_window`` of 10000). That covers every station when the
            index holds about one document per station — verify for indices
            with several documents per stop.

    Returns:
        The list of ``hits.hits`` entries from the response.
    """
    stations = list(stations)  # terms needs a JSON array (e.g. not a set)
    if size is None:
        size = min(max(len(stations), 10), 10000)
    search = {"size": size,
              "query": {"bool": {"must": {"terms": {"busStop": stations}}}}}
    response = es.search(index=index, body=search)
    return response['hits']['hits']


def get_route_stations(lineId, timeout=30):
    """Fetch the ids of all stops on route ``lineId`` from the Dublin RTPI API.

    Calls the ``routeinformation`` endpoint with ``operator='bac'``. The stops
    of every entry in the response's ``results`` are merged and de-duplicated.

    Args:
        lineId: route id, passed as ``routeid``.
        timeout: seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block forever.

    Returns:
        List of distinct ``stopid`` values (unordered), or ``[]`` when the API
        reports an error or no results.

    Raises:
        requests.RequestException: on connection failure or timeout.
        ValueError: if the response body is not JSON.
    """
    params = {'operator': 'bac', 'routeid': lineId}
    response = requests.get("https://data.smartdublin.ie/cgi-bin/rtpi/routeinformation",
                            params=params, timeout=timeout).json()
    if response['errorcode'] != '0' or response['numberofresults'] == 0:
        return []  # No results
    bus_stops = {stop['stopid'] for result in response['results'] for stop in result['stops']}
    return list(bus_stops)

@F.udf(ArrayType(DoubleType()))
def get_station_coord(stopNumber):
    """Look up ``[longitude, latitude]`` of RTPI stop ``stopNumber``.

    Makes one HTTP request per row. Returns ``[0.0, 0.0]`` when the stop number
    is missing, or when the API reports an error or no results.
    """
    if stopNumber is None:
        return [0.0, 0.0]
    # params= lets requests URL-encode the id. The timeout keeps a stalled
    # request from hanging the Spark task indefinitely.
    res = requests.get("https://data.smartdublin.ie/cgi-bin/rtpi/busstopinformation",
                       params={'stopid': stopNumber}, timeout=30).json()
    if not res['errorcode'] == '0' or not res['results']:
        return [0.0, 0.0]
    first = res['results'][0]
    return [float(first['longitude']), float(first['latitude'])]
  
def calculate_journey_centroids(index):
    """Merge per-journey stop centroids into one centroid per ``busStop``.

    Each per-journey centroid (``centroidCoordinates`` as ``[lon, lat]``) is
    weighted by its ``stopCount``.

    Returns:
        DataFrame with ``busStop`` and ``coordinates`` =
        ``[weighted mean lon, weighted mean lat]``.
    """
    weight = F.col("stopCount")
    total_weight = F.sum(weight)

    def weighted_mean(axis):
        return F.sum(F.col("centroidCoordinates")[axis] * weight) / total_weight

    per_journey = read_elastic(index)
    return per_journey.groupby("busStop").agg(
        F.array(weighted_mean(0), weighted_mean(1)).alias('coordinates'))



# Explicit read schema for the raw JSON stream files (used with
# spark.readStream.schema(...).json(...)). The `_id.$oid`,
# `timestamp.$numberLong` and `calendar.$numberLong` wrappers look like
# MongoDB extended JSON — confirm. `loc` holds the point `coordinates` and a
# `type`. transform_raw_data() flattens `timestamp`/`loc` and drops most other
# bookkeeping fields.
SCHEMA = StructType([StructField('_id',StructType([StructField('$oid',StringType(),True)]),True),
                     StructField('actualDelay',LongType(),True),
                     StructField('angle',DoubleType(),True),
                     StructField('anomaly',BooleanType(),True),
                     StructField('areaId',LongType(),True),
                     StructField('areaId1',LongType(),True),
                     StructField('areaId2',LongType(),True),
                     StructField('areaId3',LongType(),True),
                     StructField('atStop',BooleanType(),True),
                     StructField('busStop',LongType(),True),
                     StructField('calendar',StructType([StructField('$numberLong',StringType(),True)]),True),
                     StructField('congestion',BooleanType(),True),
                     StructField('currentHour',LongType(),True),
                     StructField('dateType',LongType(),True),
                     StructField('dateTypeEnum',StringType(),True),
                     StructField('delay',LongType(),True),
                     StructField('direction',LongType(),True),
                     StructField('distanceCovered',DoubleType(),True),
                     StructField('ellapsedTime',LongType(),True),
                     StructField('filteredActualDelay',LongType(),True),
                     StructField('gridID',StringType(),True),
                     StructField('journeyPatternId',StringType(),True),
                     StructField('justLeftStop',BooleanType(),True),
                     StructField('justStopped',BooleanType(),True),
                     StructField('latitude',DoubleType(),True),
                     StructField('lineId',StringType(),True),
                     StructField('loc',StructType([StructField('coordinates',ArrayType(DoubleType(),True),True),StructField('type',StringType(),True)]),True),
                     StructField('longitude',DoubleType(),True),
                     StructField('poiId',LongType(),True),
                     StructField('poiId2',LongType(),True),
                     StructField('probability',DoubleType(),True),
                     StructField('systemTimestamp',DoubleType(),True),
                     StructField('timestamp',StructType([StructField('$numberLong',StringType(),True)]),True),
                     StructField('vehicleId',LongType(),True),
                     StructField('vehicleSpeed',LongType(),True)])

## Stream App:

In [4]:
# Streaming app: read raw JSON files one per trigger, sink the flattened raw
# stream to ES, then re-derive busStop/atStop against the stop centroids and
# sink that corrected stream to a second index.
streamingPath = "/FileStore/Afik/DublinApp/Stream/"
raw_stream_df = spark.readStream\
                        .schema(SCHEMA)\
                        .option("maxFilesPerTrigger", 1)\
                        .json(streamingPath)
    
raw_stream_df = transform_raw_data(raw_stream_df)

# Write raw data:
write_stream(raw_stream_df, 'dublin-bus-stream', wait=False)
# display(stream_df)


# Filter by route:
stream_df = filter_by_line_route(raw_stream_df)

# Add direction field:
# stream_df = add_direction(stream_df) # Not working since window on vehicleId. (streaming supports only time windowing)

# Fix atStop:
routes_df = read_elastic("dublin-bus-routes", array_field='busStops')

stop_estimators_index ='dublin-bus-route-aligned-filtered-100m-centroids'
# Max distance (metres, as returned by closest_stop/dist_approx) to count as atStop.
distance= 50

# Create line_df: for every lineId, the zipped list of (busStop, centroid coords)
# of the stops on that line.
stops_df = routes_df.withColumn('busStop', F.explode('busStops')).drop('busStops')
centroids_df = read_elastic(stop_estimators_index).selectExpr('busStop', 'coordinates as c_coords')
stops_df = stops_df.join(F.broadcast(centroids_df), on='busStop', how='inner')
line_df = stops_df.groupBy('lineId').agg(F.arrays_zip(F.collect_list('busStop'), F.collect_list('c_coords')).alias('centroids_coords'))

# Replace busStop by the nearest stop on the row's own line, and set atStop
# when that stop's centroid is within `distance`.
joined_df = stream_df.join(F.broadcast(line_df), on='lineId', how='inner')
joined_df = joined_df.withColumn("res", closest_stop('centroids_coords', 'coordinates'))
joined_df = joined_df.withColumn("busStop", joined_df.res.busStop).drop('centroids_coords')

joined_df = joined_df.withColumn('atStop', joined_df.res.distance <= distance).drop('res')

write_stream(joined_df, 'dublin-bus-stream-fixed', wait=False)
# display(joined_df)

## Re-calculate Centroids using Streamed Data:

In [5]:
# Centroids:
# Recompute per-bus-stop geo-centroids from the corrected stream and overwrite
# (append=False) the atstop-fix centroids index.

centroids_df = calculate_centroids('dublin-bus-stream-fixed')

# Index body for the centroid index: busStop id plus its geo_point centroid.
settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "busStop" : { "type": "long" },
            "coordinates" : { "type": "geo_point" },
        }
    }
}

write_to_elastic(centroids_df, "dublin-bus-route-aligned-atstop-fix-centroids", settings=settings, append=False)



In [6]:
# KMeans:

@F.udf("float")
def euclidean_distance_udf(coordinates, centroid):
    """Squared Euclidean distance between a ``[x, y]`` point and a centroid (Spark UDF).

    No square root is taken. The value is only used to rank centroids, and
    squaring preserves that order.
    """
    h, k = coordinates
    p, q = centroid
    # Built-in float(): `np.float` was removed in NumPy 1.24 (AttributeError).
    return float((h - p) ** 2 + (k - q) ** 2)

def euclidean_distance(coordinates, centroid):
    """Return the squared Euclidean distance between two 2-D points as a float.

    No square root is taken. In this notebook the value is only used to rank
    centroids, and squaring preserves that order.
    """
    h, k = coordinates
    p, q = centroid
    # Built-in float(): `np.float` was removed in NumPy 1.24 (AttributeError).
    return float((h - p) ** 2 + (k - q) ** 2)

@F.udf("float")
def uncertain_line_distance_udf(coordinates, centroid, direction, radius):
    """Expected squared distance from ``centroid`` to a point spread uniformly on a segment.

    The observed point ``coordinates`` = (h, k) is modelled as lying anywhere
    on a segment of length ``2 * radius`` centred on it, with slope
    ``direction``. ``8`` means vertical, as produced by ``calculate_direction``.
    For a segment A = (a, b) -> C = (c, d) and centroid Q = (p, q)::

        E|X - Q|^2 = |A - Q|^2 + (C - A).(A - Q) + |C - A|^2 / 3

    Without a direction, the plain squared distance |P - Q|^2 is returned.
    """
    h, k = coordinates
    p, q = centroid
    if direction is None:
        # Computed inline; `np.float` (removed in NumPy 1.24) is not needed.
        return float((h - p) ** 2 + (k - q) ** 2)
    if direction == 8:
        a, c = h, h
        b, d = k - radius, k + radius
    else:
        step = radius * 1.0 / np.sqrt(1 + direction ** 2)
        a, c = h - step, h + step
        b, d = k - direction * step, k + direction * step
    seg_len_sq = (c - a) ** 2 + (d - b) ** 2           # |C - A|^2 (= (2 * radius)^2)
    B = 2 * ((c - a) * (a - p) + (d - b) * (b - q))   # 2 (C - A).(A - Q)
    C = (p - a) ** 2 + (q - b) ** 2                   # |A - Q|^2
    # The uniform-segment expectation needs |C - A|^2 / 3. The previous
    # |C - A|^3 / 3 mixed in an unnormalised term. The segment length is the
    # same for every centroid, so this never changed which centroid was nearest.
    return float(seg_len_sq / 3 + B / 2 + C)

@F.udf("float")
def uncertain_circle_distance_udf(coordinates, centroid, radius):
    """Expected squared distance from ``centroid`` to a point spread uniformly in a disk.

    The observed point is modelled as uniform within ``radius`` of
    ``coordinates``. For a uniform disk, E|X - P|^2 = radius^2 / 2, so the
    expectation is |P - Q|^2 + radius^2 / 2.
    """
    h, k = coordinates
    p, q = centroid
    # Built-in float(): `np.float` was removed in NumPy 1.24 (AttributeError).
    return float((h - p) ** 2 + (k - q) ** 2 + (radius ** 2) / 2)

@F.udf("int")
def calculate_direction(coordinates, prev_coordinates):
    """Classify the heading between two consecutive ``[lon, lat]`` fixes.

    This rebinds the module-level ``calculate_direction`` name, so this
    definition is the one used by later cells.

    The slope of the segment prev -> current is bucketed at
    tan(22.5 deg) = sqrt(2) - 1 and tan(67.5 deg) = sqrt(2) + 1:

    * ``8``  - (near-)vertical, including an exactly vertical segment;
    * ``1``  - diagonal with positive slope;
    * ``0``  - (near-)horizontal;
    * ``-1`` - diagonal with negative slope.

    Returns:
        The bucket as an int, or None when either fix is missing or the
        vehicle did not move.
    """
    # A missing current fix would otherwise fail on unpacking and abort the task.
    if coordinates is None or prev_coordinates is None or coordinates == prev_coordinates:
        return None
    longitude, latitude = coordinates
    prev_longitude, prev_latitude = prev_coordinates
    if longitude == prev_longitude:
        return 8  # vertical: slope is infinite
    slope = (latitude - prev_latitude) / (longitude - prev_longitude)
    steep = np.sqrt(2) + 1    # tan(67.5 deg)
    shallow = np.sqrt(2) - 1  # tan(22.5 deg)
    if slope > steep:
        return 8
    if slope > shallow:
        return 1
    if slope > -shallow:
        return 0
    if slope > -steep:
        return -1
    return 8
  
def _lloyd_iterations(data, init_centroids, num_epochs, distance_expr):
    """Run ``num_epochs`` assignment/update rounds, ranking centroids by ``distance_expr``.

    Each round cross-joins every point (``_id``, ``coordinates``, ...) with all
    current centroids (``busStop``, ``centroidCoordinates``). It keeps the
    centroid ranked first by ``distance_expr`` for each ``_id``, then moves
    each centroid to the mean of its assigned points.

    Returns:
        DataFrame with ``busStop``, ``centroidCoordinates`` and ``stopCount``
        (points assigned in the last round). ``init_centroids`` is returned
        unchanged when ``num_epochs`` is 0.
    """
    nearest_first = Window().partitionBy('_id').orderBy(distance_expr.asc())
    centroids = init_centroids
    results = data
    # NOTE(review): neither DataFrame is cached between rounds, so each round's
    # plan embeds all previous ones. Consider cache()/localCheckpoint() if many
    # epochs become slow.
    for _ in range(num_epochs):
        # Join only the columns the assignment needs. Carrying `stopCount`
        # along (present from the second round on) added another duplicate
        # `stopCount` column to `results` every round.
        candidates = centroids.select('busStop', 'centroidCoordinates').hint("broadcast")
        results = (results
                   .drop('busStop')
                   .crossJoin(candidates)
                   .withColumn("rn", F.row_number().over(nearest_first))
                   .where(F.col("rn") == 1)
                   .drop("rn", "centroidCoordinates"))
        centroids = (results
                     .groupby('busStop')
                     .agg(F.array(F.avg(F.col("coordinates")[0]),
                                  F.avg(F.col("coordinates")[1])).alias("centroidCoordinates"),
                          F.count('_id').alias('stopCount')))
    return centroids


def KMeans(data, init_centroids, num_epochs=10):
    """Plain k-means using squared Euclidean distance to each centroid.

    Args:
        data: points with ``_id``, ``coordinates`` and ``busStop``.
        init_centroids: ``busStop`` + ``centroidCoordinates`` starting centroids.
        num_epochs: number of assignment/update rounds.
    """
    return _lloyd_iterations(
        data, init_centroids, num_epochs,
        euclidean_distance_udf('coordinates', 'centroidCoordinates'))


def UCKMeans(data, init_centroids, num_epochs=20, R=0.00075):
    """Uncertain-circle k-means: each point is a disk of radius ``R`` (coordinate units).

    Centroids are ranked by the expected squared distance to a point uniform in
    that disk.
    """
    return _lloyd_iterations(
        data, init_centroids, num_epochs,
        uncertain_circle_distance_udf('coordinates', 'centroidCoordinates', F.lit(R)))


def ULKMeans(data, init_centroids, num_epochs=20, R=0.00075):
    """Uncertain-line k-means: each point is a segment of half-length ``R`` along ``direction``.

    ``data`` must also carry the ``direction`` column. Rows with a null
    direction fall back to the plain squared distance.
    """
    return _lloyd_iterations(
        data, init_centroids, num_epochs,
        uncertain_line_distance_udf('coordinates', 'centroidCoordinates', 'direction', F.lit(R)))

def calculate_journey_centroids(index):
    """Collapse per-journey stop centroids into one ``stopCount``-weighted centroid per ``busStop``.

    This rebinds the module-level name to an equivalent implementation.

    Returns:
        DataFrame with ``busStop`` and ``coordinates`` = ``[lon, lat]``, where
        each component is sum(component * stopCount) / sum(stopCount).
    """
    journey_centroids = read_elastic(index)
    count = F.col("stopCount")
    lon_weighted = F.sum(F.col("centroidCoordinates")[0] * count) / F.sum(count)
    lat_weighted = F.sum(F.col("centroidCoordinates")[1] * count) / F.sum(count)
    grouped = journey_centroids.groupby("busStop")
    return grouped.agg(F.array(lon_weighted, lat_weighted).alias('coordinates'))

# Journey patterns that have at least one atStop document in the corrected
# stream index (terms aggregation, up to 10000 journeys).
stream_index = 'dublin-bus-stream-fixed'
num_epochs = 5

search = {
    "size" : 0,
    "query": {
      "bool": {
        "must": [
          { "term": { "atStop": True } }
        ]
        }
      },
    "aggs" : {
        "journeys" : {
            "terms" : {
                "field" : "journeyPatternId",
                "size" : 10000
            }
        }
    }
}
journeys_query = es.search(index=stream_index, body=search)
journeys = [journey['key'] for journey in journeys_query['aggregations']['journeys']['buckets']]
# NOTE(review): num_journeys is not used in the visible code.
num_journeys = len(journeys)

# Index body for the per-journey centroid indices written in the loop below.
settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "busStop" : { "type": "long" },
            "journeyPatternId" : { "type": "keyword" },
            "centroidCoordinates" : { "type": "geo_point" },
            "stopCount" : { "type" : "long" }
        }
    }
}


# Points to cluster: read with metadata so the ES document id can serve as the
# per-point `_id` the k-means windows partition by, then add `direction`.
# NOTE(review): journeys above come from 'dublin-bus-stream-fixed', but the
# points come from 'dublin-bus-stream-filtered-100m' — confirm this is intended.
full_df = read_elastic('dublin-bus-stream-filtered-100m', read_metadata=True)
full_df = full_df.withColumn('_id', full_df._metadata._id).drop('_metadata')
full_df = add_direction(full_df)


# For each journey pattern, refine that journey's stop centroids with plain,
# uncertain-line and uncertain-circle k-means. Each result, tagged with its
# journeyPatternId, is appended to its own per-journey index.
for journey in journeys:
    q_cen = dumps({
        "query": {
            "bool": {
                "must": {"term": {"journeyPatternId": journey}}
            }
        }
    })
    centroids = read_elastic("dublin-bus-route-aligned-filtered-100m-journey-centroids", query=q_cen)\
                    .select("coordinates", "busStop")\
                    .withColumnRenamed('coordinates', 'centroidCoordinates')

    # Column expressions instead of an interpolated SQL string. journeyPatternId
    # is a keyword (string) field. Unquoted in SQL, an id containing letters is
    # a parse error, and a numeric-looking id is compared as a number.
    journey_df = full_df.filter(F.col('atStop') & (F.col('journeyPatternId') == journey))

    # Kmean
    data_kmeans = journey_df.select('_id', 'coordinates', 'busStop')
    journey_kmeans_centroids = KMeans(data_kmeans, centroids, num_epochs=num_epochs)\
                     .withColumn('journeyPatternId', F.lit(journey))
    write_to_elastic(journey_kmeans_centroids, "dublin-bus-stream-kmeans-journey-centroids-5-epochs", settings=settings, append=True)

    # Uncertain Line Kmean (needs the direction column)
    data_ul_kmeans = journey_df.select('_id', 'coordinates', 'busStop', 'direction')
    journey_ul_kmeans_centroids = ULKMeans(data_ul_kmeans, centroids, num_epochs=num_epochs)\
                     .withColumn('journeyPatternId', F.lit(journey))
    write_to_elastic(journey_ul_kmeans_centroids, "dublin-bus-stream-ul-kmeans-journey-centroids-5-epochs", settings=settings, append=True)

    # Uncertain Circle Kmean
    data_uc_kmeans = journey_df.select('_id', 'coordinates', 'busStop')
    journey_uc_kmeans_centroids = UCKMeans(data_uc_kmeans, centroids, num_epochs=num_epochs)\
                     .withColumn('journeyPatternId', F.lit(journey))
    write_to_elastic(journey_uc_kmeans_centroids, "dublin-bus-stream-uc-kmeans-journey-centroids-5-epochs", settings=settings, append=True)

# Merge the per-journey centroids of each method into one stopCount-weighted
# centroid per busStop, and overwrite (append=False) one index per method.
settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval" : -1
    },
    "mappings": {
        "properties": {
            "busStop" : { "type": "long" },
            "coordinates" : { "type": "geo_point" }
        }
    }
}

kmeans_centroids = calculate_journey_centroids("dublin-bus-stream-kmeans-journey-centroids-5-epochs")
ul_kmeans_centroids = calculate_journey_centroids("dublin-bus-stream-ul-kmeans-journey-centroids-5-epochs")
uc_kmeans_centroids = calculate_journey_centroids("dublin-bus-stream-uc-kmeans-journey-centroids-5-epochs")

write_to_elastic(kmeans_centroids, "dublin-bus-stream-kmeans-centroids", settings=settings, append=False)
write_to_elastic(ul_kmeans_centroids, "dublin-bus-stream-ul-kmeans-centroids", settings=settings, append=False)
write_to_elastic(uc_kmeans_centroids, "dublin-bus-stream-uc-kmeans-centroids", settings=settings, append=False)