diff --git a/src/com/compass/hbase/schema.clj b/src/com/compass/hbase/schema.clj index d798cbe..61aebd5 100644 --- a/src/com/compass/hbase/schema.clj +++ b/src/com/compass/hbase/schema.clj @@ -104,10 +104,29 @@ (defn get-schema [name] (if-let [recs @*schemas*] (recs (keyword name)))) -(defn canonical-family-spec [fams [fam fspec]] +(defn- matching-type? [rtype vtype] + (case rtype + :string (string? vtype) + :keyword (keyword? vtype) + :symbol (symbol? vtype) + :bool (or (= vtype true) (= vtype false)) + (:int :long) (integer? vtype) + (:float :double) (float? vtype) + (:json :json-key :ser :raw) true + true)) + +(defn- valid-exception-map? [type emap] + (cond (nil? emap) true + (nil? type) + (throw (java.lang.Error. "Families with exceptions must specify key-type")) + (not (every? (partial matching-type? type) (keys emap))) + (throw (java.lang.Error. "Exception keys must match family key-type")))) + +(defn- canonical-family-spec [fams [fam fspec]] (assert (or (keyword? fam) (string? fam))) (cond (map? fspec) (do (assert (every? #{:key-type :value-type :exceptions} (keys fspec))) + (valid-exception-map? (:key-type fspec) (:exceptions fspec)) (assoc fams fam fspec)) (and (sequential? fspec) (= (count fspec) 2)) (assoc fams @@ -226,20 +245,22 @@ (decode-row schema result)) (loop [kvs (.raw result) kv-map {}] - (with-robust-decode [:value result] - (if-let [kv (first kvs)] - (let [family (decode-value (.getFamily kv) :keyword) - qualifier (decode-value (.getQualifier kv) - (qualifier-type schema family)) - timestamp (.getTimestamp kv) - value (let [value (.getValue kv)] - (if (> (count value) 0) + (if-let [kv (first kvs)] + (let [family (with-robust-decode [:keyword result] + (decode-value (.getFamily kv) :keyword)) + qualifier (with-robust-decode [:qualifier result] + (decode-value (.getQualifier kv) + (qualifier-type schema family))) + timestamp (.getTimestamp kv) + value (let [value (.getValue kv)] + (if (> (count value) 0) + (with-robust-decode [:keyword result] (decode-value (.getValue kv) - (value-type schema family qualifier)) - nil))] - (recur (next kvs) - (assoc-in kv-map [family qualifier timestamp] value))) - kv-map)))])) + (value-type schema family qualifier))) + nil))] + (recur (next kvs) + (assoc-in kv-map [family qualifier timestamp] value))) + kv-map))])) (defn decode-latest "Given an HBase Result object, decode the latest versions of all the @@ -252,23 +273,24 @@ (decode-row schema result)) (loop [remaining-kvs (seq (.raw result)) keys #{}] - (with-robust-decode [:value result] - (if-let [kv (first remaining-kvs)] - (let [family (.getFamily kv) - qualifier (.getQualifier kv)] - (recur (next remaining-kvs) - (conj keys [family qualifier]))) - ;; At this point, we have a duplicate-less list of [f q] keys in keys. - ;; Go back through, pulling the latest values for these keys. - (loop [remaining-keys keys - kv-map {}] - (if-let [[family qualifier] (first remaining-keys)] - (let [keyfam (decode-value family :keyword) - qual (decode-value qualifier (qualifier-type schema keyfam))] - (recur (next remaining-keys) - (assoc-in kv-map [keyfam qual] - (let [value (.getValue result family qualifier)] - (if (> (count value) 0) - (decode-value value (value-type schema keyfam qual)) - nil))))) - kv-map)))))])) + (if-let [kv (first remaining-kvs)] + (let [family (.getFamily kv) + qualifier (.getQualifier kv)] + (recur (next remaining-kvs) + (conj keys [family qualifier]))) + ;; At this point, we have a duplicate-less list of [f q] keys in keys. + ;; Go back through, pulling the latest values for these keys. + (loop [remaining-keys keys + kv-map {}] + (if-let [[family qualifier] (first remaining-keys)] + (let [keyfam (decode-value family :keyword) + qual (with-robust-decode [:qualifier result] + (decode-value qualifier (qualifier-type schema keyfam)))] + (recur (next remaining-keys) + (assoc-in kv-map [keyfam qual] + (let [value (.getValue result family qualifier)] + (if (> (count value) 0) + (with-robust-decode [:cell result] + (decode-value value (value-type schema keyfam qual))) + nil))))) + kv-map))))]))