Permalink
Browse files

Leak cleanup (native threads and connections)

Many methods now properly close their HTables (unclosed HTables were the
source of the native thread leaks).
HBaseAdmin connections are now closed on versions of HBase newer than
0.90.1 (unclosed connections were the source of the HConnection leaks).
Added a 'version' def that exposes the underlying HBase version.
  • Loading branch information...
1 parent 4aa2f35 commit 1b47c28c950f4e408a95363e5ef6e751ad89635a @mtm mtm committed Mar 20, 2012
Showing with 73 additions and 40 deletions.
  1. +73 −40 src/org/rathore/amit/capjure.clj
@@ -1,10 +1,12 @@
(ns org.rathore.amit.capjure
(:refer-clojure :exclude [flatten])
(:use org.rathore.amit.capjure-utils)
+ (:require
+ [clojure.string :as string])
(:import (java.util Set Map)
(org.apache.hadoop.hbase HBaseConfiguration HColumnDescriptor HTableDescriptor KeyValue HColumnDescriptor)
- (org.apache.hadoop.hbase.client Delete Get HBaseAdmin HTable Put Scan ResultScanner Result HTable$ClientScanner)
- (org.apache.hadoop.hbase.util Bytes)
+ (org.apache.hadoop.hbase.client Delete Get HBaseAdmin HTable Put Scan ResultScanner Result HTable$ClientScanner HConnectionManager)
+ (org.apache.hadoop.hbase.util Bytes VersionInfo)
(org.apache.hadoop.hbase.filter Filter InclusiveStopFilter)
(org.apache.hadoop.hbase.io.hfile Compression$Algorithm)))
@@ -15,11 +17,17 @@
(def HAS-MANY-STRINGS "1c8fd7d")
+;; Map with integer :major, :minor and :micro keys, parsed from the
+;; dot-separated version string returned by VersionInfo/getVersion.
+;; Only the leading digits of each component are used, so a component that
+;; carries a suffix (e.g. "4-SNAPSHOT") still parses instead of throwing
+;; NumberFormatException while this namespace loads. A missing or
+;; non-numeric component is reported as 0 so callers can always compare
+;; the values numerically.
+(def version
+  (let [parse-int (fn [s]
+                    (if-let [digits (and s (re-find #"^\d+" s))]
+                      (Integer/parseInt digits)
+                      0))
+        [major minor micro] (string/split (. VersionInfo getVersion) #"\.")]
+    {:major (parse-int major)
+     :minor (parse-int minor)
+     :micro (parse-int micro)}))
+
;; Hack to support hbase 0.20 and hbase 0.90
-(def is-hbase-02 (try
- (import '[org.apache.hadoop.hbase.client Scanner])
- true
- (catch Exception e false)))
+(def is-hbase-02 (and (= (:major version) 0)
+ (>= (:minor version) 20)
+ (< (:minor version) 90)))
(defmacro with-hbase-table [[table hbase-table-name] & exprs]
@@ -28,6 +36,20 @@
(.close ~table)
ret#))
+;; Binds `admin` to a fresh HBaseAdmin (from `hbase-admin`), evaluates
+;; `exprs` and returns the value of the last one.
+;;
+;; If we are on 0.90.2 or newer we must clean up the connection used by
+;; HBaseAdmin. Versions of HBaseAdmin prior to 0.90.2 reused existing
+;; connections and therefore didn't leak.
+;; Hopefully this will be addressed in later versions (0.92+).
+;;
+;; The version is compared as a whole [major minor micro] triple, so that
+;; e.g. 0.92.0 counts as newer than 0.90.2, and the cleanup runs in a
+;; `finally` so the connection is released even when `exprs` throws.
+;; NOTE(review): confirm that a forced deleteConnection is still the right
+;; cleanup on 0.92+ releases.
+(defmacro with-hbase-admin [admin & exprs]
+  `(let [~admin ^HBaseAdmin (hbase-admin)]
+     (try
+       ~@exprs
+       (finally
+         (when (>= (compare [(:major version) (:minor version) (:micro version)]
+                            [0 90 2])
+                   0)
+           (let [config# (.getConfiguration ~admin)]
+             (HConnectionManager/deleteConnection config# true)))))))
+
;; (defmacro with-scanner [[scanner] & exprs]
;; `(let [ret# (do ~@exprs)]
;; (.close ~scanner)
@@ -105,7 +127,7 @@
(.put table put)))
(defn insert-with-put [object-to-save hbase-table-name ^Put put]
- (let [^HTable table (hbase-table hbase-table-name)]
+ (with-hbase-table [table hbase-table-name]
(insert-with-table-and-put object-to-save table put)))
(defn capjure-insert-with-table
@@ -300,7 +322,7 @@
(hydrate (read-as-hash hbase-table-name row-id)))
(defn row-exists? [hbase-table-name row-id-string]
- (let [^HTable table (hbase-table hbase-table-name)]
+ (with-hbase-table [table hbase-table-name]
(.exists table ^bytes (.getBytes ^String row-id-string))))
(defn cell-value-as-string [^Result row ^String column-name]
@@ -323,9 +345,9 @@
the-get)))
(defn get-result-for [hbase-table-name #^String row-id]
- (let [^HTable table (hbase-table hbase-table-name)
- hbase-get-row-id (create-get row-id)]
- (.get table hbase-get-row-id)))
+ (with-hbase-table [table hbase-table-name]
+ (let [hbase-get-row-id (create-get row-id)]
+ (.get table hbase-get-row-id))))
;; Reads the row identified by `row-id` from `hbase-table-name` by
;; delegating to `get-result-for`, and returns whatever that call returns.
(defn read-row [hbase-table-name row-id]
  (-> hbase-table-name
      (get-result-for row-id)))
@@ -377,10 +399,10 @@
(defn read-all-versions
([hbase-table-name row-id-string number-of-versions]
- (let [#^HTable table (hbase-table hbase-table-name)]
+ (with-hbase-table [table hbase-table-name]
(stringify-nav-map (.getMap (.get table (create-get row-id-string number-of-versions))))))
([hbase-table-name row-id-string column-family-as-string number-of-versions]
- (let [#^HTable table (hbase-table hbase-table-name)]
+ (with-hbase-table [table hbase-table-name]
(stringify-nav-map (.getMap (.get table (create-get row-id-string [column-family-as-string] number-of-versions)))))))
(defn read-all-multi-col-versions
@@ -498,12 +520,12 @@
(count (table-iterator hbase-table-name columns)))
(defn delete-row-col-at [hbase-table-name ^String row-id ^String family ^String qualifier timestamp]
- (let [table ^HTable (hbase-table hbase-table-name)
- delete (Delete. (.getBytes row-id))]
- (if timestamp
- (.deleteColumn delete (.getBytes family) (.getBytes qualifier) timestamp)
- (.deleteColumn delete (.getBytes family) (.getBytes qualifier)))
- (.delete table delete)))
+ (with-hbase-table [table hbase-table-name]
+ (let [delete (Delete. (.getBytes row-id))]
+ (if timestamp
+ (.deleteColumn delete (.getBytes family) (.getBytes qualifier) timestamp)
+ (.deleteColumn delete (.getBytes family) (.getBytes qualifier)))
+ (.delete table delete))))
;; Deletes the `family`:`qualifier` cell of `row-id` without naming a
;; timestamp: calls `delete-row-col-at` with nil in the timestamp position.
(defn delete-row-col-latest [hbase-table-name row-id family qualifier]
  (let [no-timestamp nil]
    (delete-row-col-at hbase-table-name row-id family qualifier no-timestamp)))
@@ -547,9 +569,9 @@
(map #(String. (.getNameAsString ^HColumnDescriptor %)) (.getFamilies table-descriptor)))))
(defn simple-delete-row [hbase-table-name ^String row-id]
- (let [table ^HTable (hbase-table hbase-table-name)
- delete (Delete. (.getBytes row-id))]
- (.delete table delete)))
+ (with-hbase-table [table hbase-table-name]
+ (let [delete (Delete. (.getBytes row-id))]
+ (.delete table delete))))
(defn simple-delete-rows-with-sleep [hbase-table-name rows]
(doseq [row rows]
@@ -590,7 +612,8 @@
(.addFamily desc hcdesc)))]
(doseq [family-entry column-families-and-versions]
(col-desc family-entry))
- (.createTable ^HBaseAdmin (hbase-admin) desc)))
+ (with-hbase-admin admin
+ (.createTable admin desc))))
(defn create-hbase-table [#^String table-name max-versions & column-families]
(let [desc (HTableDescriptor. table-name)
@@ -599,37 +622,44 @@
(.setMaxVersions hcdesc max-versions)
(.addFamily desc hcdesc)))]
(doall (map col-desc column-families))
- (.createTable ^HBaseAdmin (hbase-admin) desc)))
+ (with-hbase-admin admin
+ (.createTable admin desc))))
(defn add-hbase-columns [^String table-name column-family-names versions]
(if-not (empty? column-family-names)
- (let [admin ^HBaseAdmin (hbase-admin)
- col-desc (fn [^String col-name]
- (let [desc (hbase-column-descriptor col-name)]
- (.setMaxVersions desc versions)
- desc))]
- (.disableTable admin (.getBytes table-name))
- (doall (map #(.addColumn admin table-name ^HColumnDescriptor (col-desc %)) column-family-names))
- (.enableTable admin (.getBytes table-name)))))
+ (with-hbase-admin admin
+ (let [col-desc (fn [^String col-name]
+ (let [desc (hbase-column-descriptor col-name)]
+ (.setMaxVersions desc versions)
+ desc))]
+ (.disableTable admin (.getBytes table-name))
+ (doall (map #(.addColumn admin table-name ^HColumnDescriptor (col-desc %)) column-family-names))
+ (.enableTable admin (.getBytes table-name))))))
;; Creates `new-hbase-table-name` via `create-hbase-table`, passing
;; `max-versions` and the column families that `column-families-for`
;; reports for `from-hbase-table-name`.
(defn clone-table [#^String new-hbase-table-name #^String from-hbase-table-name max-versions]
  (let [source-families (column-families-for from-hbase-table-name)]
    (apply create-hbase-table new-hbase-table-name max-versions source-families)))
(defn disable-table [^String table-name]
- (.disableTable ^HBaseAdmin (hbase-admin) (.getBytes table-name)))
+ (with-hbase-admin admin
+ (.disableTable admin (.getBytes table-name))))
(defn enable-table [#^String table-name]
- (.enableTable ^HBaseAdmin (hbase-admin) (.getBytes table-name)))
+ (with-hbase-admin admin
+ (.enableTable admin (.getBytes table-name))))
(defn drop-hbase-table [#^String hbase-table-name]
- (.deleteTable ^HBaseAdmin (hbase-admin) hbase-table-name))
+ (with-hbase-admin admin
+ (.deleteTable admin hbase-table-name)))
(defn truncate-hbase-table [#^String hbase-table-name]
(with-hbase-table [table hbase-table-name]
(let [table-descriptor (.getTableDescriptor table)]
- (disable-table hbase-table-name)
- (drop-hbase-table hbase-table-name)
- (.createTable ^HBaseAdmin (hbase-admin) table-descriptor))))
+ (try
+ (disable-table hbase-table-name)
+ (drop-hbase-table hbase-table-name)
+ (catch Exception e))
+ (with-hbase-admin admin
+ (.createTable admin table-descriptor)))))
(defn hbase-table [^String hbase-table-name]
(let [table (HTable. ^HBaseConfiguration (hbase-config) hbase-table-name)]
@@ -638,12 +668,15 @@
(defn table-exists?
([table-name]
- (table-exists? table-name (hbase-admin)))
+ (with-hbase-admin admin
+ (table-exists? table-name admin)))
([^String table-name ^HBaseAdmin hadmin]
(.tableExists hadmin table-name)))
;; Builds a string from the value returned by `.getName` on `table-desc`,
;; converting each element to a char and concatenating the chars.
(defn get-table-name-from-desc [^HTableDescriptor table-desc]
  (->> (.getName table-desc)
       (map char)
       (apply str)))
(defn list-tables []
- (map get-table-name-from-desc (.listTables ^HBaseAdmin (hbase-admin))))
+ (with-hbase-admin admin
+ (let [tables (.listTables admin)]
+ (map get-table-name-from-desc tables))))

0 comments on commit 1b47c28

Please sign in to comment.