sam / do
watch download tarball
public
Description: DataObjects
Homepage: http://rubyforge.org/projects/dorb
Clone URL: git://github.com/sam/do.git
do / data_objects / lib / data_objects / support / pooling.rb
100644 149 lines (118 sloc) 3.648 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
require 'set'
 
class Object

  # Mixin that turns +SomeClass.new(*args)+ into a pooled constructor.
  #
  # Including Pooling extends the target class with ClassMethods, whose +new+
  # hands out instances from a per-argument-list Pool instead of always
  # allocating. An instance is handed back to its pool with #release.
  #
  # Including classes must define a +dispose+ instance method; it is called
  # when a pool is flushed and from an +at_exit+ hook.
  module Pooling

    # Raised by ClassMethods#new when the including class does not define a
    # +dispose+ instance method.
    class MustImplementDisposeError < StandardError
    end

    # Hook run on +include Pooling+: installs the pooled +new+ on +target+.
    def self.included(target)
      target.extend(ClassMethods)
    end

    # Hands this instance back to the pool that created it.
    # Returns nil (the result of Pool#release).
    def release
      @__pool.release(self)
    end

    # Registry of Pool objects for one class, keyed by the constructor
    # argument list.
    class Pools

      attr_reader :type
      attr_accessor :size

      # type:: the class whose instances are pooled.
      # size:: size passed to every Pool this registry creates (default 4).
      def initialize(type, size = 4)
        @type = type
        @size = size
        # Pools are created lazily; the key is the Array of constructor
        # arguments and doubles as the pool's initializer.
        @pools = Hash.new { |h, k| h[k] = Pool.new(@size, @type, k) }
      end

      # Flushes every pool and forgets them all. Returns self.
      def flush!
        @pools.each_value { |pool| pool.flush! }
        @pools.clear
        self
      end

      # Returns the Pool for the given constructor arguments, creating it on
      # first use.
      #
      # The whole +args+ Array is the key. Hash#[] accepts exactly one
      # argument, so splatting +args+ into it would fail for zero or several
      # arguments, and keying on a bare value would let the later
      # <tt>*@initializer</tt> splat mangle nil/Array/Hash arguments.
      def [](*args)
        @pools[args]
      end

      # A pool of instances of +type+ that were all built with the same
      # constructor arguments.
      class Pool

        attr_reader :type, :available, :reserved

        # size::        stored for the pool; see the TODO in #new.
        # type::        class to instantiate.
        # initializer:: arguments splatted into +initialize+ for each new
        #               instance.
        def initialize(size, type, initializer)
          @size = size
          @type = type
          @initializer = initializer
          @lock = Mutex.new
          @available = []
          @reserved = Set.new
        end

        # Calls +dispose+ on every instance the pool tracks (available ones
        # first, then reserved ones) and empties the pool.
        def flush!
          @lock.synchronize do
            # Iterate over a snapshot so nothing is mutated mid-iteration.
            (@available + @reserved.to_a).each { |instance| instance.dispose }

            @available = []
            @reserved = Set.new
          end
        end

        # Reserves and returns an instance: an available one if there is
        # one, otherwise a newly built one.
        #
        # The emptiness check and the pop happen under a single lock so that
        # a concurrent caller cannot drain +@available+ in between.
        def new
          @lock.synchronize do
            # TODO: @size is not enforced. Once @reserved.size reaches @size
            # this should wait for an instance to become available, which
            # cannot be done while holding the lock; for now a fresh instance
            # is always built.
            instance = @available.pop || build_instance
            @reserved << instance
            instance
          end
        end

        # Moves +instance+ from the reserved set back to the available list.
        # Instances this pool has not reserved are ignored. Returns nil.
        def release(instance)
          @lock.synchronize do
            @available << instance if @reserved.delete?(instance)
          end
          nil
        end

        private

        # Allocates and initializes one instance of @type and tags it with
        # this pool so that Pooling#release can find its way back.
        def build_instance
          instance = @type.allocate
          # +initialize+ is private, hence +send+.
          instance.send(:initialize, *@initializer)
          # NOTE: one hook per instance; an instance already disposed by
          # #flush! will have +dispose+ called again at exit.
          at_exit { instance.dispose }
          instance.instance_variable_set(:@__pool, self)
          instance
        end
      end
    end

    # Class-level API added to every class that includes Pooling.
    module ClassMethods

      # Returns a pooled instance for the given constructor arguments.
      #
      # Raises MustImplementDisposeError if the class has no +dispose+
      # instance method.
      def new(*args)
        # method_defined? works whether the method list is reported as
        # Strings or as Symbols; comparing against the String "dispose"
        # never matches when instance_methods returns Symbols.
        unless method_defined?(:dispose)
          raise MustImplementDisposeError.new("#{self.name} must implement a `dispose' instance-method.")
        end

        pools[*args].new
      end

      # The Pools registry for this class, created on first use.
      def pools
        @pools ||= Pools.new(self)
      end
    end

  end

end