In [1]:
require 'webhdfs'

# Keeps a persistent record of the files that have already been transported.
#
# The record is a Hash keyed by a file's absolute path; each value is the
# Hash form (+to_h+) of the descriptor that was registered. The record is
# read from, and written back to, a JSON file.
class Tracker
  # @param file_path [String] path of the JSON file that backs the record.
  #   A missing or unparsable file is reported on stdout and leaves the
  #   record empty.
  def initialize(file_path)
    @transported_list = {}
    @tracker_file = file_path
    load_file
  end

  # @param file_discr [#abs_path] descriptor of the file to look up
  # @return [Boolean] true when the descriptor's absolute path is recorded
  def fileAlreadyTransported?(file_discr)
    @transported_list.key?(file_discr.abs_path)
  end

  # Records (or overwrites) one descriptor under its absolute path.
  #
  # @param collection [#abs_path, #to_h] descriptor to record
  # @return [Hash] the value that was stored
  def update_list(collection)
    puts "    << adding  #{collection.abs_path} in tracked list"
    @transported_list.store(collection.abs_path, collection.to_h)
  end

  # Empties the in-memory record; the file is untouched until save_to_file.
  def clear_list
    @transported_list.clear
  end

  # @return [Hash] the live in-memory record (not a copy)
  def list
    @transported_list
  end

  # Serialises the record as JSON into the tracker file, replacing its
  # content. A missing parent directory is reported on stdout, not raised.
  #
  # @return [nil]
  def save_to_file
    File.open(@tracker_file, "w") { |f| f << @transported_list.to_json }
    puts "saved tracker list to #{@tracker_file}"
  rescue Errno::ENOENT => e
    print "invalid tracker file path ..", e, "\n"
  end

  # Replaces the in-memory record with the content of the tracker file.
  #
  # The record is only replaced when the file holds a JSON object. Any other
  # JSON value (array, number, ...) would break every later key lookup, so it
  # is reported and the current record is kept. Missing files and malformed
  # JSON are likewise reported on stdout and leave the record unchanged.
  #
  # @return [nil]
  def load_file
    parsed = JSON.parse(File.read(@tracker_file))
    unless parsed.is_a?(Hash)
      print "invalid tracker file  #{@tracker_file}. . ",
            "expected a JSON object, got #{parsed.class}", "\n"
      return
    end
    @transported_list = parsed
    puts "loaded tracker file  #{@tracker_file}"
  rescue Errno::ENOENT => e
    print "invalid tracker file path ..", e, "\n"
  rescue JSON::ParserError => e
    print "invalid tracker file  #{@tracker_file}. . ", e, "\n"
  end
end

# Hash-based descriptor of a single file.
#
# Of the attributes handed to the constructor only "fileId", "accessTime",
# "length" and "modificationTime" (String keys) are kept as Hash entries, in
# the order they appear in the input. The absolute path is held separately
# and is not part of the Hash content.
class FileDesc < Hash
  # @return [Object] the absolute path given at construction time
  attr_reader :abs_path

  # @param abs_path [String] absolute path of the file
  # @param discr [Hash] attributes of the file; unlisted keys are ignored
  def initialize(abs_path, discr)
    super()
    @abs_path = abs_path
    kept_fields = ["fileId", "accessTime", "length", "modificationTime"]
    discr.each do |field, value|
      next unless kept_fields.include?(field)

      self[field] = value
    end
  end
end

:abs_path

In [14]:
# Walks a directory tree through a WebHDFS client and collects every file
# the tracker does not know yet.
#
# Two optional callables are notified:
# * the discovery event, called as (file_descr, tracker) for each new file;
# * the finished event, called as (files_found, tracker) once the walk ends.
class DirWatcher

  # @param webhdfs_client [#list] client used to list directory entries
  # @param tracker [#fileAlreadyTransported?] record of known files
  # @param watchFinishedEvent [#call, nil] invoked after the whole walk
  # @param filediscoveryEvent [#call, nil] invoked for every new file
  def initialize(webhdfs_client,tracker,watchFinishedEvent=nil,filediscoveryEvent=nil)
    @wh_client = webhdfs_client
    @tracker = tracker
    @file_discovery_Event = filediscoveryEvent
    @watch_Finished_Event = watchFinishedEvent

    @files_found_list = []
  end

  # Walks +dir_path+ recursively, starting from an empty result list.
  #
  # Surrounding whitespace and one trailing '/' are dropped from the path
  # before the walk starts.
  #
  # @param dir_path [String] directory to scan
  # @return [Object, nil] the finished event's return value, or nil when no
  #   finished event was given
  def start_watch(dir_path)
    @files_found_list = []
    root = dir_path.strip.chomp('/')
    watch_directory(root)
    return if @watch_Finished_Event.nil?

    @watch_Finished_Event.call(@files_found_list, @tracker)
  end

  private

  # Lists +dir_path+; entries whose "type" is "FILE" are handed to
  # register_file, every other entry is descended into as a directory.
  # A WebHDFS::IOError raised anywhere below is printed and ends the walk of
  # this directory only.
  def watch_directory(dir_path)
    puts "entering  '#{dir_path}'"
    @wh_client.list(dir_path).each do |entry|
      entry_path = "#{dir_path}/#{entry['pathSuffix']}"
      unless entry["type"] == "FILE"
        watch_directory(entry_path)
        next
      end
      register_file(FileDesc.new(entry_path, entry))
    end
  rescue WebHDFS::IOError => e
    puts e
  end

  # Skips files the tracker already knows; otherwise fires the discovery
  # event (when present) and appends the descriptor to the result list.
  def register_file(file_descr)
    if @tracker.fileAlreadyTransported?(file_descr)
      puts "   skipping already transported #{file_descr.abs_path}"
    else
      @file_discovery_Event.call(file_descr, @tracker) unless @file_discovery_Event.nil?
      @files_found_list << file_descr
    end
  end

end

:watch_directory

In [16]:
# Discovery callback: prints the absolute path of a newly found file.
#
# @param file_descr [#abs_path] descriptor of the new file
# @param tracker [Object] unused here; the per-file tracker update below is
#   commented out
# @return [nil]
def onFileFound(file_descr,tracker)
  new_file_path = file_descr.abs_path
  puts format("   ..........new File  =   %s ..", new_file_path)
  #tracker.update_list(file_descr)
end

# Finished callback: registers every found file with the tracker, then asks
# the tracker to save itself.
#
# @param file_list [Enumerable] descriptors collected during the walk
# @param tracker [#update_list, #save_to_file] tracker to update and save
# @return [Object] whatever tracker.save_to_file returns
def onWatchFinished(file_list,tracker)
  file_list.each do |file_descr|
    tracker.update_list(file_descr)
  end
  tracker.save_to_file
end
require 'json'
require 'webhdfs'

# Wire everything together: a WebHDFS client for the local NameNode, the
# tracker backed by a JSON file, and a watcher that reports to the two
# callbacks defined above.
client  = WebHDFS::Client.new('127.0.0.1', 50070)
path    = '/hist'
tracker = Tracker.new('/home/flytxt/Desktop/tracker.json')

dw = DirWatcher.new(client, tracker, method(:onWatchFinished), method(:onFileFound))

loaded tracker file  /home/flytxt/Desktop/tracker.json


#<DirWatcher:0x00000001742f88 @wh_client=#<WebHDFS::Client:0x000000017455d0 @host="127.0.0.1", @port=50070, @username=nil, @doas=nil, @proxy_address=nil, @proxy_port=nil, @retry_known_errors=false, @retry_times=1, @retry_interval=1, @httpfs_mode=false, @ssl=false, @ssl_ca_file=nil, @ssl_verify_mode=nil, @ssl_cert=nil, @ssl_key=nil, @ssl_version=nil, @kerberos=false, @kerberos_keytab=nil, @http_headers={}>, @tracker=#<Tracker:0x000000017454b8 @transported_list={"/hist/a/sample1"=>{"accessTime"=>1475490281414, "fileId"=>16396, "length"=>22, "modificationTime"=>1475490281520}, "/hist/b/sample1"=>{"accessTime"=>1475490310258, "fileId"=>16397, "length"=>22, "modificationTime"=>1475490310371}, "/hist/b/sample2"=>{"accessTime"=>1475490374968, "fileId"=>16398, "length"=>24, "modificationTime"=>1475490375076}, "/src.rb"=>{"accessTime"=>1475477698327, "fileId"=>16392, "length"=>2340, "modificationTime"=>1475477698434}, "/test.jhist"=>{"accessTime"=>1475487636187, "fileId"=>16391, "length"=>36672

In [17]:
# Scan `path` once; `l` receives whatever start_watch returns.
l=dw.start_watch(path)

entering  '/hist'
entering  '/hist/a'
   skipping already transported /hist/a/sample1
entering  '/hist/b'
entering  '/hist/b/b1'
   skipping already transported /hist/b/b1/samplbb1
   skipping already transported /hist/b/sample1
   skipping already transported /hist/b/sample2
saved tracker list to /home/flytxt/Desktop/tracker.json


In [13]:
# Write the tracker's current in-memory list to its file on demand.
tracker.save_to_file

saved tracker list to /home/flytxt/Desktop/tracker.json
