From 4bf331d668c279f2f6e462c1bfcaebffa06082f1 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal Date: Fri, 19 Feb 2016 00:57:40 +0530 Subject: [PATCH] STORM-1246: port backtype.storm.local-state to java --- .../clj/org/apache/storm/daemon/nimbus.clj | 31 ++-- .../org/apache/storm/daemon/supervisor.clj | 65 +++++++-- .../clj/org/apache/storm/daemon/worker.clj | 10 +- .../src/clj/org/apache/storm/local_state.clj | 134 ------------------ .../apache/storm/local_state_converter.clj | 24 ++++ .../src/clj/org/apache/storm/testing.clj | 10 +- .../org/apache/storm/utils/LocalState.java | 112 +++++++++++++-- 7 files changed, 209 insertions(+), 177 deletions(-) delete mode 100644 storm-core/src/clj/org/apache/storm/local_state.clj create mode 100644 storm-core/src/clj/org/apache/storm/local_state_converter.clj diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index a3497d61368..28a6fb81472 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -46,11 +46,11 @@ KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction - ProfileRequest ProfileAction NodeInfo]) + ProfileRequest ProfileAction NodeInfo LSTopoHistory]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.validation ConfigValidation]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:use [org.apache.storm util config log zookeeper local-state]) + (:use [org.apache.storm util config log zookeeper]) (:require [org.apache.storm [cluster :as cluster] [converter :as converter] [stats :as stats]]) @@ -60,7 +60,7 @@ (:use [org.apache.storm.daemon common]) (:use [org.apache.storm config]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) - (:import [org.apache.storm.utils VersionInfo] + (:import [org.apache.storm.utils VersionInfo LocalState] [org.json.simple JSONValue]) (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) @@ -1181,11 +1181,8 @@ [mins nimbus] (locking (:topology-history-lock nimbus) (let [cutoff-age (- (Time/currentTimeSecs) (* mins 60)) - topo-history-state (:topo-history-state nimbus) - curr-history (vec (ls-topo-hist topo-history-state)) - new-history (vec (filter (fn [line] - (> (line :timestamp) cutoff-age)) curr-history))] - (ls-topo-hist! topo-history-state new-history)))) + topo-history-state (:topo-history-state nimbus)] + (.filterOldTopologies ^LocalState topo-history-state cutoff-age)))) (defn cleanup-corrupt-topologies! [nimbus] (let [storm-cluster-state (:storm-cluster-state nimbus) @@ -1275,11 +1272,9 @@ (locking (:topology-history-lock nimbus) (let [topo-history-state (:topo-history-state nimbus) users (ConfigUtils/getTopoLogsUsers topology-conf) - groups (ConfigUtils/getTopoLogsGroups topology-conf) - curr-history (vec (ls-topo-hist topo-history-state)) - new-history (conj curr-history {:topoid storm-id :timestamp (Time/currentTimeSecs) - :users users :groups groups})] - (ls-topo-hist! topo-history-state new-history)))) + groups (ConfigUtils/getTopoLogsGroups topology-conf)] + (.addTopologyHistory ^LocalState topo-history-state + (LSTopoHistory. storm-id (Time/currentTimeSecs) users groups))))) (defn igroup-mapper [storm-conf] @@ -1295,10 +1290,18 @@ (let [groups (user-groups user storm-conf)] (> (.size (set/intersection (set groups) (set groups-to-check))) 0))) +(defn ->topo-history + [thrift-topo-hist] + { + :topoid (.get_topology_id thrift-topo-hist) + :timestamp (.get_time_stamp thrift-topo-hist) + :users (.get_users thrift-topo-hist) + :groups (.get_groups thrift-topo-hist)}) + (defn read-topology-history [nimbus user admin-users] (let [topo-history-state (:topo-history-state nimbus) - curr-history (vec (ls-topo-hist topo-history-state)) + curr-history (vec (map ->topo-history (.getTopoHistoryList ^LocalState topo-history-state))) topo-user-can-access (fn [line user storm-conf] (if (nil? user) (line :topoid) diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index ad9db760143..5685a09f792 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -24,12 +24,12 @@ [java.net JarURLConnection] [java.net URI URLDecoder] [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log local-state]) + (:use [org.apache.storm config util log local-state-converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) (:import [org.apache.storm Config]) - (:import [org.apache.storm.generated WorkerResources ProfileAction]) + (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment]) (:import [org.apache.storm.localizer LocalResource]) (:use [org.apache.storm.daemon common]) (:require [org.apache.storm.command [healthcheck :as healthcheck]]) @@ -85,6 +85,10 @@ :profiler-actions new-profiler-actions :versions new-assignments}))) +(defn mk-local-assignment + [storm-id executors resources] + {:storm-id storm-id :executors executors :resources resources}) + (defn- read-my-executors [assignments-snapshot storm-id assignment-id] (let [assignment (get assignments-snapshot storm-id) my-slots-resources (into {} @@ -125,6 +129,20 @@ (defn- read-downloaded-storm-ids [conf] (map #(URLDecoder/decode %) (Utils/readDirContents (ConfigUtils/supervisorStormDistRoot conf)))) +(defn ->executor-list + [executors] + (into [] + (for [exec-info executors] + [(.get_task_start exec-info) (.get_task_end exec-info)]))) + +(defn ls-worker-heartbeat + [^LocalState local-state] + (if-let [worker-hb (.getWorkerHeartBeat ^LocalState local-state)] + {:time-secs (.get_time_secs worker-hb) + :storm-id (.get_topology_id worker-hb) + :executors (->executor-list (.get_executors worker-hb)) + :port (.get_port worker-hb)})) + (defn read-worker-heartbeat [conf id] (let [local-state (ConfigUtils/workerState conf id)] (try @@ -172,7 +190,7 @@ (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) id->heartbeat (read-worker-heartbeats conf) - approved-ids (set (keys (ls-approved-workers local-state)))] + approved-ids (set (keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))))] (into {} (dofor [[id hb] id->heartbeat] @@ -198,7 +216,7 @@ (defn- wait-for-worker-launch [conf id start-time] (let [state (ConfigUtils/workerState conf id)] (loop [] - (let [hb (ls-worker-heartbeat state)] + (let [hb (.getWorkerHeartBeat state)] (when (and (not hb) (< @@ -209,7 +227,7 @@ (Time/sleep 500) (recur) ))) - (when-not (ls-worker-heartbeat state) + (when-not (.getWorkerHeartBeat state) (log-message "Worker " id " failed to start") ))) @@ -414,6 +432,19 @@ [pred amap] (into {} (filter (fn [[k v]] (pred k)) amap))) +(defn ->local-assignment + [^LocalAssignment thrift-local-assignment] + (mk-local-assignment + (.get_topology_id thrift-local-assignment) + (->executor-list (.get_executors thrift-local-assignment)) + (.get_resources thrift-local-assignment))) + +;TODO: when translating this function, you should replace the map-val with a proper for loop HERE +(defn ls-local-assignments + [^LocalState local-state] + (if-let [thrift-local-assignments (.getLocalAssignmentsMap local-state)] + (map-val ->local-assignment thrift-local-assignments))) + ;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE (defn sync-processes [supervisor] (let [conf (:conf supervisor) @@ -453,9 +484,9 @@ ", Heartbeat: " (pr-str heartbeat)) (shutdown-worker supervisor id))) (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)] - (ls-approved-workers! local-state + (.setApprovedWorkers ^LocalState local-state (merge - (select-keys (ls-approved-workers local-state) + (select-keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state)) (keys keepers)) valid-new-worker-ids)) (wait-for-workers-launch conf (keys valid-new-worker-ids))))) @@ -553,6 +584,22 @@ (rm-topo-files conf storm-id localizer false) storm-id))))) +(defn ->LocalAssignment + [{storm-id :storm-id executors :executors resources :resources}] + (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))] + (if resources (.set_resources assignment + (doto (WorkerResources. ) + (.set_mem_on_heap (first resources)) + (.set_mem_off_heap (second resources)) + (.set_cpu (last resources))))) + assignment)) + +;TODO: when translating this function, you should replace the map-val with a proper for loop HERE +(defn ls-local-assignments! + [^LocalState local-state assignments] + (let [local-assignment-map (map-val ->LocalAssignment assignments)] + (.setLocalAssignmentsMap local-state local-assignment-map))) + (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager] (fn this [] (let [conf (:conf supervisor) @@ -1265,10 +1312,10 @@ (prepare [this conf local-dir] (reset! conf-atom conf) (let [state (LocalState. local-dir) - curr-id (if-let [id (ls-supervisor-id state)] + curr-id (if-let [id (.getSupervisorId state)] id (generate-supervisor-id))] - (ls-supervisor-id! state curr-id) + (.setSupervisorId state curr-id) (reset! id-atom curr-id)) ) (confirmAssigned [this port] diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index c2a767a5109..60bc0709f0a 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -15,7 +15,7 @@ ;; limitations under the License. (ns org.apache.storm.daemon.worker (:use [org.apache.storm.daemon common]) - (:use [org.apache.storm config log util local-state]) + (:use [org.apache.storm config log util local-state-converter]) (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) @@ -33,7 +33,7 @@ (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.serialization KryoTupleSerializer]) - (:import [org.apache.storm.generated StormTopology]) + (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat]) (:import [org.apache.storm.tuple AddressedTuple Fields]) (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) @@ -84,7 +84,11 @@ (let [conf (:conf worker) state (ConfigUtils/workerState conf (:worker-id worker))] ;; do the local-file-system heartbeat. - (ls-worker-heartbeat! state (Time/currentTimeSecs) (:storm-id worker) (:executors worker) (:port worker)) + (.setWorkerHeartBeat state (LSWorkerHeartbeat. + (Time/currentTimeSecs) + (:storm-id worker) + (->ExecutorInfo-list (:executors worker)) + (:port worker))) (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up. ; it shouldn't take supervisor 120 seconds between listing dir and reading it diff --git a/storm-core/src/clj/org/apache/storm/local_state.clj b/storm-core/src/clj/org/apache/storm/local_state.clj deleted file mode 100644 index df67c5eb368..00000000000 --- a/storm-core/src/clj/org/apache/storm/local_state.clj +++ /dev/null @@ -1,134 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.local-state - (:use [org.apache.storm log util]) - (:import [org.apache.storm.generated StormTopology - InvalidTopologyException GlobalStreamId - LSSupervisorId LSApprovedWorkers - LSSupervisorAssignments LocalAssignment - ExecutorInfo LSWorkerHeartbeat - LSTopoHistory LSTopoHistoryList - WorkerResources] - [org.apache.storm.utils Utils]) - (:import [org.apache.storm.utils LocalState])) - -(def LS-WORKER-HEARTBEAT "worker-heartbeat") -(def LS-ID "supervisor-id") -(def LS-LOCAL-ASSIGNMENTS "local-assignments") -(def LS-APPROVED-WORKERS "approved-workers") -(def LS-TOPO-HISTORY "topo-hist") - -(defn ->LSTopoHistory - [{topoid :topoid timestamp :timestamp users :users groups :groups}] - (LSTopoHistory. topoid timestamp users groups)) - -(defn ->topo-history - [thrift-topo-hist] - { - :topoid (.get_topology_id thrift-topo-hist) - :timestamp (.get_time_stamp thrift-topo-hist) - :users (.get_users thrift-topo-hist) - :groups (.get_groups thrift-topo-hist)}) - -(defn ls-topo-hist! - [^LocalState local-state hist-list] - (.put local-state LS-TOPO-HISTORY - (LSTopoHistoryList. (map ->LSTopoHistory hist-list)))) - -(defn ls-topo-hist - [^LocalState local-state] - (if-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)] - (map ->topo-history (.get_topo_history thrift-hist-list)))) - -(defn ls-supervisor-id! - [^LocalState local-state ^String id] - (.put local-state LS-ID (LSSupervisorId. id))) - -(defn ls-supervisor-id - [^LocalState local-state] - (if-let [super-id (.get local-state LS-ID)] - (.get_supervisor_id super-id))) - -(defn ls-approved-workers! - [^LocalState local-state workers] - (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers))) - -(defn ls-approved-workers - [^LocalState local-state] - (if-let [tmp (.get local-state LS-APPROVED-WORKERS)] - (into {} (.get_approved_workers tmp)))) - -(defn ->ExecutorInfo - [[low high]] (ExecutorInfo. low high)) - -(defn ->ExecutorInfo-list - [executors] - (map ->ExecutorInfo executors)) - -(defn ->executor-list - [executors] - (into [] - (for [exec-info executors] - [(.get_task_start exec-info) (.get_task_end exec-info)]))) - -(defn ->LocalAssignment - [{storm-id :storm-id executors :executors resources :resources}] - (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))] - (if resources (.set_resources assignment - (doto (WorkerResources. ) - (.set_mem_on_heap (first resources)) - (.set_mem_off_heap (second resources)) - (.set_cpu (last resources))))) - assignment)) - -(defn mk-local-assignment - [storm-id executors resources] - {:storm-id storm-id :executors executors :resources resources}) - -(defn ->local-assignment - [^LocalAssignment thrift-local-assignment] - (mk-local-assignment - (.get_topology_id thrift-local-assignment) - (->executor-list (.get_executors thrift-local-assignment)) - (.get_resources thrift-local-assignment))) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn ls-local-assignments! - [^LocalState local-state assignments] - (let [local-assignment-map (map-val ->LocalAssignment assignments)] - (.put local-state LS-LOCAL-ASSIGNMENTS - (LSSupervisorAssignments. local-assignment-map)))) - -;TODO: when translating this function, you should replace the map-val with a proper for loop HERE -(defn ls-local-assignments - [^LocalState local-state] - (if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)] - (map-val - ->local-assignment - (.get_assignments thrift-local-assignments)))) - -(defn ls-worker-heartbeat! - [^LocalState local-state time-secs storm-id executors port] - (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false)) - -(defn ls-worker-heartbeat - [^LocalState local-state] - (if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)] - {:time-secs (.get_time_secs worker-hb) - :storm-id (.get_topology_id worker-hb) - :executors (->executor-list (.get_executors worker-hb)) - :port (.get_port worker-hb)})) - diff --git a/storm-core/src/clj/org/apache/storm/local_state_converter.clj b/storm-core/src/clj/org/apache/storm/local_state_converter.clj new file mode 100644 index 00000000000..e8eeaca5351 --- /dev/null +++ b/storm-core/src/clj/org/apache/storm/local_state_converter.clj @@ -0,0 +1,24 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.local-state-converter + (:import [org.apache.storm.generated ExecutorInfo])) + +(defn ->ExecutorInfo + [[low high]] (ExecutorInfo. low high)) + +(defn ->ExecutorInfo-list + [executors] + (map ->ExecutorInfo executors)) diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 4ad5ff80d8b..781792973d0 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -29,7 +29,7 @@ (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) (:import [java.util.concurrent ConcurrentHashMap]) - (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils]) + (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState]) (:import [org.apache.storm.tuple Fields Tuple TupleImpl]) (:import [org.apache.storm.task TopologyContext]) (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions]) @@ -51,7 +51,7 @@ [org.json.simple JSONValue]) (:require [org.apache.storm [zookeeper :as zk]]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm cluster util config log local-state]) + (:use [org.apache.storm cluster util config log local-state-converter]) (:use [org.apache.storm.internal thrift])) (defn feeder-spout @@ -395,14 +395,14 @@ (defn find-worker-id [supervisor-conf port] (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf) - worker->port (ls-approved-workers supervisor-state)] + worker->port (.getApprovedWorkers ^LocalState supervisor-state)] (first ((clojurify-structure (Utils/reverseMap worker->port)) port)))) (defn find-worker-port [supervisor-conf worker-id] (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf) - worker->port (ls-approved-workers supervisor-state)] - (worker->port worker-id))) + worker->port (.getApprovedWorkers ^LocalState supervisor-state)] + (if worker->port (.get worker->port worker-id)))) (defn mk-capture-shutdown-fn [capture-atom] diff --git a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java index aef1c1c3f9c..2f0bb60bfaf 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java +++ b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java @@ -18,24 +18,28 @@ package org.apache.storm.utils; import org.apache.commons.io.FileUtils; +import org.apache.storm.generated.LSApprovedWorkers; +import org.apache.storm.generated.LSSupervisorAssignments; +import org.apache.storm.generated.LSSupervisorId; +import org.apache.storm.generated.LSTopoHistory; +import org.apache.storm.generated.LSTopoHistoryList; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.LocalStateData; +import org.apache.storm.generated.ThriftSerializedObject; +import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Map; +import java.util.ArrayList; import java.util.HashMap; -import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; - -import org.apache.storm.generated.LocalStateData; -import org.apache.storm.generated.ThriftSerializedObject; +import java.util.List; +import java.util.Map; /** * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. @@ -43,6 +47,11 @@ */ public class LocalState { public static final Logger LOG = LoggerFactory.getLogger(LocalState.class); + public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat"; + public static final String LS_ID = "supervisor-id"; + public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments"; + public static final String LS_APPROVED_WORKERS = "approved-workers"; + public static final String LS_TOPO_HISTORY = "topo-hist"; private VersionedStore _vs; public LocalState(String backingDir) throws IOException { @@ -157,6 +166,85 @@ public synchronized void cleanup(int keepVersions) throws IOException { _vs.cleanup(keepVersions); } + public List getTopoHistoryList() { + LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY); + if (null != lsTopoHistoryListWrapper) { + return lsTopoHistoryListWrapper.get_topo_history(); + } + return null; + } + + /** + * Remove topologies from local state which are older than cutOffAge. + * @param cutOffAge + */ + public void filterOldTopologies(long cutOffAge) { + LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY); + List filteredTopoHistoryList = new ArrayList<>(); + if (null != lsTopoHistoryListWrapper) { + for (LSTopoHistory topoHistory : lsTopoHistoryListWrapper.get_topo_history()) { + if (topoHistory.get_time_stamp() > cutOffAge) { + filteredTopoHistoryList.add(topoHistory); + } + } + } + put(LS_TOPO_HISTORY, new LSTopoHistoryList(filteredTopoHistoryList)); + } + + public void addTopologyHistory(LSTopoHistory lsTopoHistory) { + LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY); + List currentTopoHistoryList = new ArrayList<>(); + if (null != lsTopoHistoryListWrapper) { + currentTopoHistoryList.addAll(lsTopoHistoryListWrapper.get_topo_history()); + } + currentTopoHistoryList.add(lsTopoHistory); + put(LS_TOPO_HISTORY, new LSTopoHistoryList(currentTopoHistoryList)); + } + + public String getSupervisorId() { + LSSupervisorId lsSupervisorId = (LSSupervisorId) get(LS_ID); + if (null != lsSupervisorId) { + return lsSupervisorId.get_supervisor_id(); + } + return null; + } + + public void setSupervisorId(String supervisorId) { + put(LS_ID, new LSSupervisorId(supervisorId)); + } + + public Map getApprovedWorkers() { + LSApprovedWorkers lsApprovedWorkers = (LSApprovedWorkers) get(LS_APPROVED_WORKERS); + if (null != lsApprovedWorkers) { + return lsApprovedWorkers.get_approved_workers(); + } + return null; + } + + public void setApprovedWorkers(Map approvedWorkers) { + put(LS_APPROVED_WORKERS, new LSApprovedWorkers(approvedWorkers)); + } + + public LSWorkerHeartbeat getWorkerHeartBeat() { + return (LSWorkerHeartbeat) get(LS_WORKER_HEARTBEAT); + } + + public void setWorkerHeartBeat(LSWorkerHeartbeat workerHeartBeat) { + put(LS_WORKER_HEARTBEAT, workerHeartBeat, false); + } + + public Map getLocalAssignmentsMap() { + LSSupervisorAssignments assignments = (LSSupervisorAssignments) get(LS_LOCAL_ASSIGNMENTS); + if (null != assignments) { + return assignments.get_assignments(); + } + return null; + } + + public void setLocalAssignmentsMap(Map localAssignmentMap) { + put(LS_LOCAL_ASSIGNMENTS, new LSSupervisorAssignments(localAssignmentMap)); + } + private void persistInternal(Map serialized, TSerializer ser, boolean cleanup) { try { if (ser == null) {