Initial commit of prototype hdfs-data-local execution method.

commit 67aa43413acdf8977ec200b940130471a3daaed1 (1 parent: 5072bd2)
Davin Potts authored February 14, 2013
16  webhdfs/hdfs_filemapper.py
@@ -0,0 +1,16 @@
+import requests
+
+
+def run(hdfs_filepath):
+    """Ask local WebHDFS for optimal uri to pull file from HDFS."""
+
+    # TODO: Generalize beyond always looking to localhost:50070
+    uri = "http://localhost:50070/webhdfs/v1{}?op=OPEN".format(hdfs_filepath)
+    try:
+        response = requests.request("GET", uri, allow_redirects=False)
+        file_uri = str(response.headers['Location'])
+    except (requests.RequestException, KeyError):
+        file_uri = "ERROR"
+
+    return file_uri
+
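
A quick hand test of this mapper (a sketch, not part of the commit: it assumes a WebHDFS-enabled Namenode on localhost:50070 and an existing HDFS file /tmp/example.txt):

    import hdfs_filemapper

    # WebHDFS answers the OPEN request with a 307 redirect whose Location
    # header names a Datanode that actually holds the file's blocks.
    file_uri = hdfs_filemapper.run("/tmp/example.txt")
    if file_uri != "ERROR":
        print "Pull the file's blocks directly from:", file_uri
    else:
        print "No redirect from WebHDFS; is the Namenode up?"
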
37  webhdfs/overtrusting_service.py
@@ -0,0 +1,37 @@
+import sys, os.path, SimpleXMLRPCServer
+
+
+def obedient_executor(module_name, module_source, args=()):
+    """
+    >>> obedient_executor("blah", "def run(): return 4*5")
+    20
+    >>> obedient_executor("blah", "")
+    20
+    >>> obedient_executor("blah2", "def run(x, y): return x*y", (7,8))
+    56
+    """
+    if module_source != "":
+        with open(os.path.join(".", str(module_name) + ".py"), 'w') as fp:
+            fp.write(module_source)
+    supplied_module = __import__(module_name)
+    result = supplied_module.run(*args)
+    return result
+
+
+def ping():
+    return "pong"
+
+
+
+if __name__ == '__main__':
+    listen_port = 9002
+    if len(sys.argv) > 1:
+        listen_port = int(sys.argv[1])
+
+    server = SimpleXMLRPCServer.SimpleXMLRPCServer(("0.0.0.0", listen_port))
+    server.register_function(obedient_executor)
+    server.register_function(ping)
+
+    print "Listening on port {}".format(listen_port)
+    server.serve_forever()
+
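
To exercise the service from a client, a minimal sketch (it assumes overtrusting_service.py is already listening on its default port 9002; localhost is a placeholder for a worker node's address):

    import xmlrpclib

    proxy = xmlrpclib.ServerProxy("http://localhost:9002")
    print proxy.ping()    # -> "pong"
    # The first call ships the module source over the wire; the later
    # empty-source call reuses the copy cached on the node, mirroring the
    # doctests above.
    print proxy.obedient_executor("blah2", "def run(x, y): return x*y", (7, 8))    # -> 56
    print proxy.obedient_executor("blah2", "", (3, 4))    # -> 12

As the file name warns, the service writes and imports whatever source arrives, so it belongs only on a closed, trusted network.
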
154  webhdfs/pseudoblaze_hdfs.py
@@ -0,0 +1,154 @@
+import xmlrpclib, multiprocessing
+
+
+def _doacross_action(module_name, module_source, node_uri, file_uris, \
+        results, fixed_args=()):
+    """Intended to be invoked inside a single pool worker process with
+    'results' as a multiprocessing.Manager().Queue() instance."""
+    proxy = xmlrpclib.ServerProxy(node_uri)
+    for file_uri in file_uris:
+        out = proxy.obedient_executor(module_name, module_source, \
+                (file_uri,) + fixed_args)
+        results.put((node_uri, file_uri, out))
+
+
+
+class HDFSClusterSupervisor:
+    """Prototype supervisor for performing HDFS-data-local code execution."""
+
+    def __init__(self, desired_worker_nodes=[], verify_nodes_now=True):
+        self.worker_nodes = list(desired_worker_nodes)
+        if verify_nodes_now:
+            self.verify_active_nodes()
+
+
+    def add_worker_node(self, node_uri, verify_node_now=True):
+        was_added = False
+        if verify_node_now:
+            if self._ping_node(node_uri):
+                self.worker_nodes += [node_uri]
+                was_added = True
+        else:
+            self.worker_nodes += [node_uri]
+            was_added = True
+        return was_added
+
+
+    def doacross(self, module_name, module_source, nodes_files_uris, \
+            fixed_args=()):
+        """Invokes the supplied module (with given name and source) run()
+        method on each node specified in the nodes_files_uris list of tuples,
+        where items in the nodes_files_uris list are of the form
+        (node_uri, [file_uri0, file_uri1, ...]).  All invocations of run() on
+        a particular node are performed from the same pool worker process and
+        each distinct node gets its own worker for launching runs there."""
+        manager = multiprocessing.Manager()
+        results = manager.Queue()
+        pool = multiprocessing.Pool(len(nodes_files_uris))
+        for (node_uri, file_uris) in nodes_files_uris:
+            pool.apply_async(_doacross_action, \
+                    (module_name, module_source, node_uri, file_uris, \
+                    results, fixed_args))
+        pool.close()
+        pool.join()
+
+        result_items = []
+        while not results.empty():
+            result_items.append(results.get(block=False))
+        return result_items
+
+
+    def build_map_files_to_local_uris(self, filepaths):
+        """Build a map of uris to HDFS Datanodes where a given file is
+        actually stored (redundantly, in some cases) by querying WebHDFS
+        on each worker node."""
+        mapped_files = {}
+        if isinstance(filepaths, str):
+            # Interpret string as directory name to walk through files within.
+            # TODO: Permit a directory name and discover files within
+            filepaths = []  # TODO: build this by walking
+            raise ValueError("Unimplemented: Walking through a directory.")
+
+        # The hdfs_filemapper module abuses the 'file_uris' by passing
+        # HDFS file paths in their stead; 'out' (as seen inside doacross())
+        # contains the optimal Datanode uri for reading that file,
+        # according to the worker node (WebHDFS there) that was queried.
+        mapped_optimal_uris = self.doacross("hdfs_filemapper", "", \
+                zip(self.worker_nodes, len(self.worker_nodes)*[filepaths]))
+        for (node_uri, filename, file_uri) in mapped_optimal_uris:
+            if filename not in mapped_files:
+                mapped_files[filename] = []
+            if file_uri not in ('None', 'ERROR'):
+                mapped_files[filename].append((node_uri, file_uri))
+
+        return mapped_files
+
+
+    def verify_active_nodes(self):
+        """Walk through and keep nodes in the 'worker_nodes' list that
+        respond to a ping in a timely manner."""
+        self.worker_nodes = [ node_uri for node_uri in self.worker_nodes \
+                              if self._ping_node(node_uri) ]
+        return len(self.worker_nodes)
+
+
+    def _ping_node(self, node_uri):
+        """Returns True if specified node responds to a ping, else False."""
+        got_response = True
+        try:
+            proxy = xmlrpclib.ServerProxy(node_uri)
+            proxy.ping()
+        except Exception:
+            got_response = False
+        return got_response
+
+
+
+class HDFSClusterJob:
+    def __init__(self, sup, filepaths=[], module_name=None, module_source=None):
+        self._cluster_supervisor = sup
+        self._module_name, self._module_source = module_name, module_source
+        self._previously_run_with_same_module = False
+        self.filepaths = filepaths
+        self.execution_plan = []
+        if len(filepaths) != 0:
+            self.prepare_execution_plan()
+
+
+    def prepare_execution_plan(self):
+        mapped_files = self._cluster_supervisor.build_map_files_to_local_uris(self.filepaths)
+        node_tasks = {}
+        # Employs sub-optimal single pass with no global optimization.
+        for filename in mapped_files:
+            for (node_uri, file_uri) in mapped_files[filename]:
+                # Initialize node_tasks.
+                if node_uri not in node_tasks:
+                    node_tasks[node_uri] = []
+            chosen_node_uri, chosen_file_uri = sorted([ \
+                    (len(node_tasks[node_uri]), (node_uri, file_uri)) \
+                    for (node_uri, file_uri) in mapped_files[filename] ])[0][1]
+            node_tasks[chosen_node_uri].append(chosen_file_uri)
+        self.execution_plan = node_tasks.items()
+
+
+    def set_module_from_string(self, module_name, module_source):
+        self._module_name, self._module_source = module_name, module_source
+        self._previously_run_with_same_module = False
+
+
+    def set_module_from_file(self, filename):
+        with open(filename, 'r') as fp:
+            self._module_source = fp.read()
+        self._module_name = filename.split('.')[0]
+        self._previously_run_with_same_module = False
+
+
+    def run(self, fixed_args=()):
+        module_source = self._module_source
+        if self._previously_run_with_same_module:
+            # Reuse the cached copy on the nodes, skip resending over wire.
+            module_source = ""
+        self._previously_run_with_same_module = True
+        return self._cluster_supervisor.doacross(self._module_name, \
+                module_source, self.execution_plan, fixed_args)
+
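
Tying the three files together, a hypothetical driver (the node URIs and wordcount.py are placeholders; each worker node must be running overtrusting_service.py and have webhdfs/hdfs_filemapper.py importable so the supervisor can build its file map):

    from pseudoblaze_hdfs import HDFSClusterSupervisor, HDFSClusterJob

    # The supervisor pings each node's XML-RPC service, keeping responders.
    sup = HDFSClusterSupervisor(["http://node1:9002", "http://node2:9002"])

    # The job asks WebHDFS on every node for Datanode-local uris, then plans
    # one task list per node, favoring the least-loaded node holding each file.
    job = HDFSClusterJob(sup, filepaths=["/data/part-00000", "/data/part-00001"])

    # wordcount.py is assumed to define run(file_uri) returning one result.
    job.set_module_from_file("wordcount.py")
    for (node_uri, file_uri, out) in job.run():
        print node_uri, file_uri, out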
