forked from ngrunwald/datasplash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datastore.clj
187 lines (166 loc) · 9.77 KB
/
datastore.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
(ns datasplash.datastore
(:require [datasplash.core :as ds])
(:import
[com.google.datastore.v1.client DatastoreHelper]
[org.apache.beam.sdk.io.gcp.datastore DatastoreIO]
[com.google.datastore.v1 Entity Value Key Entity$Builder Value$Builder Value$ValueTypeCase Key$PathElement PartitionId]
[com.google.protobuf ByteString NullValue]
[java.util Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry])
(:gen-class))
(defn write-datastore-raw
  "Writes every element of `pcoll` (already-built Datastore Entity objects) to
  Cloud Datastore in project `:project-id`. When `:disable-ramp-up-throttling?`
  is truthy, the write transform is configured with ramp-up throttling
  disabled. All options are forwarded to `ds/apply-transform`."
  [{:keys [project-id disable-ramp-up-throttling?] :as options} pcoll]
  (let [base-write (.withProjectId (.write (DatastoreIO/v1)) project-id)
        write-transform (if disable-ramp-up-throttling?
                          (.withRampupThrottlingDisabled base-write)
                          base-write)]
    (ds/apply-transform pcoll
                        write-transform
                        ds/named-schema
                        (assoc options :label :write-datastore-raw))))
(defn read-datastore-raw
  "Reads from Cloud Datastore and returns a pcoll of raw Datastore entities.

  Options:
  - `:project-id`      project to read from (passed to `.withProjectId`)
  - `:query`           optional query, passed to `.withQuery`
  - `:namespace`       optional namespace, passed to `.withNamespace`
  - `:num-query-split` optional number of splits, passed to `.withNumQuerySplits`
  All options are also forwarded to `ds/apply-transform`."
  [{:keys [project-id query namespace num-query-split] :as options} pcoll]
  ;; This step used to be labelled `:write-datastore`, which mislabelled a read
  ;; in the pipeline; name it after the function, like its siblings do.
  (let [opts (assoc options :label :read-datastore-raw)
        ptrans (-> (DatastoreIO/v1)
                   (.read)
                   (.withProjectId project-id)
                   (cond-> query (.withQuery query))
                   (cond-> namespace (.withNamespace namespace))
                   (cond-> num-query-split (.withNumQuerySplits num-query-split)))]
    (ds/apply-transform pcoll ptrans ds/named-schema opts)))
(defn delete-datastore-raw
  "Deletes from Cloud Datastore (project `:project-id`) every entity held in
  `pcoll`, which must contain already-built Datastore Entity objects. A truthy
  `:disable-ramp-up-throttling?` disables the transform's ramp-up throttling.
  All options are forwarded to `ds/apply-transform`."
  [{:keys [project-id disable-ramp-up-throttling?] :as options} pcoll]
  (let [throttle-fn (if disable-ramp-up-throttling?
                      #(.withRampupThrottlingDisabled %)
                      identity)
        delete-transform (-> (DatastoreIO/v1)
                             .deleteEntity
                             (.withProjectId project-id)
                             throttle-fn)]
    (ds/apply-transform pcoll
                        delete-transform
                        ds/named-schema
                        (assoc options :label :delete-datastore-raw))))
(declare value->clj entity->clj)
(def ^:dynamic type-mapping
  "Dispatch table from a Datastore `Value$ValueTypeCase` to a function that
  converts a `Value` of that case into its Clojure representation. It is
  looked up by `value->clj`; being dynamic, it can be rebound to change or
  extend the supported conversions."
  {Value$ValueTypeCase/NULL_VALUE      (constantly nil)
   Value$ValueTypeCase/BOOLEAN_VALUE   (fn [^Value v] (.getBooleanValue v))
   Value$ValueTypeCase/INTEGER_VALUE   (fn [^Value v] (.getIntegerValue v))
   Value$ValueTypeCase/DOUBLE_VALUE    (fn [^Value v] (.getDoubleValue v))
   Value$ValueTypeCase/STRING_VALUE    (fn [^Value v] (.getStringValue v))
   Value$ValueTypeCase/TIMESTAMP_VALUE (fn [^Value v] (DatastoreHelper/toDate v))
   Value$ValueTypeCase/GEO_POINT_VALUE (fn [^Value v] (.getGeoPointValue v))
   ;; blobs are exposed as plain byte arrays rather than ByteStrings
   Value$ValueTypeCase/BLOB_VALUE      (fn [^Value v]
                                         (-> v .getBlobValue .toByteArray))
   ;; repeated values become vectors, converted element by element
   Value$ValueTypeCase/ARRAY_VALUE     (fn [^Value v]
                                         (mapv value->clj (-> v .getArrayValue .getValuesList)))
   ;; embedded entities become (possibly meta-annotated) maps
   Value$ValueTypeCase/ENTITY_VALUE    (fn [^Value v]
                                         (entity->clj (.getEntityValue v)))})
(defn value->clj
  "Converts a Datastore Value to its Clojure equivalent by looking up the
  value's type case in `type-mapping` and applying the function found there.
  Throws an ex-info carrying `:value` and `:type` when no converter exists
  for that type case."
  [^Value v]
  (let [type-case (.getValueTypeCase v)]
    (if-let [convert (type-mapping type-case)]
      (convert v)
      (throw (ex-info (format "Datastore type not supported: %s" type-case)
                      {:value v :type type-case})))))
(defn get-name-or-id
  "Returns the identifier of a Datastore key path element: its name when that
  name is a non-empty string, otherwise the value of its `.getId`."
  [^Key$PathElement key-path-element]
  ;; The type hint lets the compiler resolve .getName/.getId statically instead
  ;; of reflecting on every path element of every converted entity.
  (let [element-name (.getName key-path-element)]
    (if (empty? element-name)
      (.getId key-path-element)
      element-name)))
(defn entity->clj
  "Converts a Datastore Entity to a Clojure map with the same properties.
  Property names become keywords and each value is converted with
  `value->clj`, so repeated fields become vectors and nested Entities become
  maps. When the entity has a Key, the returned map carries metadata with:
  - `:key`       name or id of the key's last path element
  - `:kind`      kind of the key's last path element
  - `:namespace` namespace id of the key's partition, when it has one
  - `:path`      the preceding path elements as `{:kind .. :key ..}` maps
                 (nil when the key has a single element)"
  [^Entity e]
  (let [props (persistent!
               (reduce (fn [acc ^java.util.Map$Entry kv]
                         ;; Hint on the public Map$Entry interface instead of a
                         ;; private JDK implementation class, so this does not
                         ;; depend on how the properties map happens to be wrapped.
                         (assoc! acc
                                 (keyword (.getKey kv))
                                 (value->clj (.getValue kv))))
                       (transient {})
                       (.getProperties e)))
        [^Key k key-name kind path]
        (when (.hasKey e)
          (let [k (.getKey e)
                elements (map (fn [^Key$PathElement p]
                                {:kind (.getKind p) :key (get-name-or-id p)})
                              (.getPathList k))
                {:keys [kind key]} (last elements)]
            [k key kind (butlast elements)]))
        namespace (when (and k (.hasPartitionId k))
                    (some-> k (.getPartitionId) (.getNamespaceId)))]
    (cond-> props
      k (with-meta {:key key-name :kind kind :namespace namespace :path path}))))
(defprotocol IValDS
  "Protocol governing the conversion of Clojure values to Datastore Value types"
  (make-ds-value-builder-p [v options]
    "Returns a Datastore Value builder for this particular value. `options` is
    the options map handed down by `make-ds-value-builder`."))
(declare make-ds-value)
(declare make-ds-entity)
(defn- coll->ds-value-builder
  "Returns an ARRAY Value builder holding the Datastore Value of each element
  of `coll`, converted with `make-ds-value` and `options`."
  [coll options]
  (DatastoreHelper/makeValue ^Iterable (mapv #(make-ds-value % options) coll)))
(defn- map->ds-value-builder
  "Returns an ENTITY Value builder for the map `m`, built with
  `make-ds-entity` and `options`."
  [m options]
  (DatastoreHelper/makeValue ^Entity (make-ds-entity m options)))
(extend-protocol IValDS
  (Class/forName "[B")
  (make-ds-value-builder-p [v _] (DatastoreHelper/makeValue (ByteString/copyFrom ^bytes v)))
  String
  (make-ds-value-builder-p [^String v _] (DatastoreHelper/makeValue v))
  clojure.lang.Keyword
  (make-ds-value-builder-p [v _] (DatastoreHelper/makeValue (name v)))
  java.util.Date
  (make-ds-value-builder-p [^java.util.Date v _] (DatastoreHelper/makeValue v))
  ;; Already-built Values are passed through as a fresh builder, so collections
  ;; of prebuilt Values convert correctly (DatastoreHelper has no single-Value
  ;; makeValue overload for the Object case to fall back on).
  Value
  (make-ds-value-builder-p [^Value v _] (.toBuilder v))
  ;; The concrete collection classes stay explicitly extended so that
  ;; `extends?` keeps reporting them as before.
  clojure.lang.PersistentList
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.PersistentHashSet
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.PersistentTreeSet
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.PersistentVector
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.PersistentHashMap
  (make-ds-value-builder-p [v options] (map->ds-value-builder v options))
  clojure.lang.PersistentArrayMap
  (make-ds-value-builder-p [v options] (map->ds-value-builder v options))
  clojure.lang.PersistentTreeMap
  (make-ds-value-builder-p [v options] (map->ds-value-builder v options))
  ;; Interface extensions cover the remaining Clojure collections (the empty
  ;; list '(), lazy seqs, ranges, subvecs, queues, records, struct maps...),
  ;; which previously fell through to the Object case below, where
  ;; DatastoreHelper/makeValue received the raw, unconverted elements.
  clojure.lang.Sequential
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.IPersistentSet
  (make-ds-value-builder-p [v options] (coll->ds-value-builder v options))
  clojure.lang.IPersistentMap
  (make-ds-value-builder-p [v options] (map->ds-value-builder v options))
  nil
  (make-ds-value-builder-p [_ _] (-> (Value/newBuilder) (.setNullValue (NullValue/valueOf "NULL_VALUE"))))
  ;; Fallback: numbers, booleans, Entities, ByteStrings... are resolved
  ;; reflectively against the DatastoreHelper/makeValue overloads.
  Object
  (make-ds-value-builder-p [v _] (DatastoreHelper/makeValue v)))
(defn make-ds-value-builder
  "Returns a Datastore Value builder for `v` by delegating to the `IValDS`
  protocol. `options` defaults to an empty map."
  ([v] (make-ds-value-builder v {}))
  ([v options] (make-ds-value-builder-p v options)))
(defn make-ds-key
  "Builds a Datastore Key from an options map:
  - `:kind` / `:key` : kind and name-or-id of the final path element
  - `:path`          : optional seq of `{:kind .. :key ..}` maps, placed before
                       the final element in the path given to
                       `DatastoreHelper/makeKey`
  - `:namespace`     : optional namespace id set on the key's partition
  - `:dataset`       : optional project id set on the key's partition
  A partition is only set when `:namespace` or `:dataset` is given."
  [{:keys [dataset namespace kind key path]}]
  ;; Read :kind and :key explicitly instead of using `vals`, whose order follows
  ;; the map's entry order: {:key .. :kind ..} (or any hash map) would otherwise
  ;; produce a scrambled kind/id sequence.
  (let [ancestors (mapcat (juxt :kind :key) path)
        partition-id (when (or namespace dataset)
                       (-> (PartitionId/newBuilder)
                           (cond-> dataset (.setProjectId dataset))
                           (cond-> namespace (.setNamespaceId namespace))
                           (.build)))]
    (-> (DatastoreHelper/makeKey (into-array Object (concat ancestors [kind key])))
        (cond-> partition-id (.setPartitionId ^PartitionId partition-id))
        (.build))))
(defn- add-ds-key-namespace-kind
  "Sets on `builder` the Key produced by `make-ds-key` from `options`, then
  returns that same (mutated) builder."
  [^Entity$Builder builder options]
  (doto builder
    (.setKey (make-ds-key options))))
(defn- make-ds-entity-builder
  "Returns an `Entity$Builder` whose properties are the entries of
  `raw-values` (a map or seq of [k v] pairs). Keyword property names are
  converted with `name`, and each value is converted with
  `make-ds-value-builder` using `options`.

  A property is excluded from indexes when its name is listed in
  `:exclude-from-index` (keywords or strings), or when
  `:no-index-large-strings` is truthy and the value is a string whose UTF-8
  encoding is 1500 bytes or more."
  [raw-values {:keys [exclude-from-index no-index-large-strings] :as options}]
  (let [excluded-set (into #{} (map name exclude-from-index))
        ^Entity$Builder entity-builder (Entity/newBuilder)
        ;; The Datastore limit on indexed strings is expressed in UTF-8 bytes,
        ;; so measure with an explicit charset rather than the JVM default one
        ;; (the hint also removes a reflective .getBytes call).
        large-string? (fn [v]
                        (and no-index-large-strings
                             (string? v)
                             (>= (alength (.getBytes ^String v "UTF-8")) 1500)))]
    (doseq [[v-key v-val] raw-values]
      (let [^Value$Builder val-builder (make-ds-value-builder v-val options)]
        (when (or (excluded-set (name v-key))
                  (large-string? v-val))
          (.setExcludeFromIndexes val-builder true))
        (.put (.getMutableProperties entity-builder)
              (if (keyword? v-key) (name v-key) v-key)
              (.build val-builder))))
    entity-builder))
(defn make-ds-value
  "Converts the Clojure value `v` into a built Datastore Value. `options`
  (default `{}`) is forwarded to `make-ds-value-builder`."
  ([v] (make-ds-value v {}))
  ([v options]
   (let [^Value$Builder builder (make-ds-value-builder v options)]
     (.build builder))))
(defn make-ds-entity
  "Builds a Datastore Entity from `raw-values`, a map (or seq of [k v] pairs)
  describing the entity's properties. Repeated fields (Clojure collections)
  and nested entities (nested maps) are supported.

  `options` may contain:
  - `:key`, `:kind`, `:path`, `:namespace`, `:dataset` (project id): when
    `:key` is truthy, the entity's Key is built from these
  - `:exclude-from-index`: collection of property names that should not be
    indexed
  - `:no-index-large-strings`: when truthy, string properties of 1500 bytes
    or more are excluded from indexes"
  ([raw-values {:keys [key] :as options}]
   (let [^Entity$Builder builder (cond-> (make-ds-entity-builder raw-values options)
                                   key (add-ds-key-namespace-kind options))]
     (.build builder)))
  ([raw-values] (make-ds-entity raw-values {})))
(comment
  ;; Entity with a numeric key, namespace and project
  (make-ds-entity {:foo {:bar "sdfsd"}}
                  {:namespace "namespace" :kind "kind" :path nil
                   :key 5643444932837376 :dataset "project-foo"})
  ;; The metadata attached by entity->clj can be fed back as key options
  (let [m (entity->clj (make-ds-entity {:foo {:bar "sdfsd"}}
                                       {:key 5643444932837376 :kind "event" :namespace "Events"}))]
    (make-ds-entity m (meta m)))
  ;; Large strings are excluded from indexes when requested
  (make-ds-entity {:long-string (str (range 10000))} {:no-index-large-strings true})
  ;; Mixed property types, nested entity and an ancestor path
  (make-ds-entity {:string "string" :integer 42 :double 65.78 :nil nil
                   :array [1 "two" 3] :entity {:deeply "nested"}}
                  {:key "key" :kind "kind" :namespace "ns"
                   :path [{:kind "kind" :key "first"}
                          {:kind "kind" :key "last"}]}))