Restructured recurse method with explicit lock
akharit committed Aug 1, 2018
1 parent f0c00ca commit cd4e938
Showing 1 changed file with 70 additions and 73 deletions.
143 changes: 70 additions & 73 deletions azure/datalake/store/core.py
@@ -1129,95 +1129,92 @@ def trim(self):
return self.relative_to(self.anchor)


import threading  # used by CountUpDownLatch below
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3
class CountUpDownLatch:
    """Counter guarded by a condition variable; callers can block until the
    count, incremented and decremented from worker threads, drops back to zero."""

    def __init__(self):
        self.lock = threading.Condition()
        self.val = 0

    def increment(self):
        with self.lock:
            self.val += 1

    def decrement(self):
        with self.lock:
            self.val -= 1
            if self.val <= 0:
                self.lock.notify_all()

    def value(self):
        with self.lock:
            return self.val

    def isZero(self):
        # Block until the count drops to zero (or below), then report success.
        with self.lock:
            while self.val > 0:
                self.lock.wait()
        return True
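
A minimal sketch of how a latch like this is typically used (illustrative only; the pool and worker below are not part of this commit): each task is counted before it is submitted and decremented when it finishes, so a coordinating thread can block on isZero() rather than polling.

# Hypothetical illustration; assumes the CountUpDownLatch class above is in scope.
from concurrent.futures import ThreadPoolExecutor

latch = CountUpDownLatch()

def worker(item):
    try:
        print("processing", item)  # stand-in for the real per-item work
    finally:
        latch.decrement()          # always release the count, even on failure

pool = ThreadPoolExecutor(max_workers=4)
for i in range(10):
    latch.increment()              # count the task before it is submitted
    pool.submit(worker, i)

latch.isZero()                     # blocks until every worker has decremented
pool.shutdown()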


from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import threading

def recurse_method(AzureDLFileSystemObject=None, path=None, file_method=None, dir_method=None,
                   file_method_kwargs={}, dir_method_kwargs={}):
    """General-purpose framework for traversing the directory tree and applying methods to files and directories."""
    dir_queue = Queue.Queue()
    file_queue = Queue.Queue()
    dir_pool = ThreadPoolExecutor(max_workers=1)   # multiprocessing.cpu_count() * 3
    file_pool = ThreadPoolExecutor(max_workers=1)  # multiprocessing.cpu_count() * 10
    main_pool = ThreadPoolExecutor(max_workers=2)


    def zero_threads_still_in_thread_pool(thread_pool):
        # TODO: is there a better way than inspecting the executor's private members?
        return len(thread_pool._threads) == 0 and thread_pool._work_queue.qsize() == 0

    def list_status(dir_path, ls_arguments={'detail': True}):
        # TODO: replace with a minimal API call or a more generic method call
        entries = AzureDLFileSystemObject.ls(dir_path, **ls_arguments)
        result = []
        for entry in entries:
            try:
                reduced_details = {key: entry[key] for key in ['type', 'pathSuffix']}
                result.append(reduced_details)
            except KeyError:
                # TODO: log entries that are missing the expected keys
                pass
        return result
    adl = AzureDLFileSystemObject
    dir_pool = ThreadPoolExecutor(max_workers=50)   # multiprocessing.cpu_count() * 3
    file_pool = ThreadPoolExecutor(max_workers=50)  # multiprocessing.cpu_count() * 10
    dir_processed_lock = CountUpDownLatch()   # counts outstanding directory tasks
    file_processed_lock = CountUpDownLatch()  # counts outstanding file tasks

    def walk_with_dirs(path, invalidate_cache=True):
        # Breadth-first listing: extending fi while iterating over it pulls in
        # the contents of each subdirectory as it is discovered.
        fi = list(adl._ls(path, invalidate_cache))
        adl._emptyDirs = []
        for apath in fi:
            if apath['type'] == 'DIRECTORY':
                sub_elements = adl._ls(apath['name'], invalidate_cache)
                fi.extend(sub_elements)

        return fi

    def dir_processor(dir_path):
        print("In dir processor at", dir_path)
        try:
            if dir_method is not None:
                dir_method(path=dir_path, **dir_method_kwargs)
        except Exception:
            # TODO: proper logging instead of swallowing the error
            print("Failure on", dir_path)

        for end_path in list_status(dir_path=dir_path):
            complete_path = AzureDLPath(dir_path / end_path['pathSuffix'])
            if end_path['type'] == 'DIRECTORY':
                dir_queue.put(complete_path)
            else:
                file_queue.put(complete_path)
        print("check", dir_path)

    def dir_thread():
        print("In dir thread")
        while not dir_queue.empty() or not zero_threads_still_in_thread_pool(dir_pool):
            if dir_queue.empty():
                continue
            else:
                dir_path = dir_queue.get()
                print("Submitting thread to dirpool", dir_path)
                # Pass the callable and its argument; dir_processor(dir_path) would
                # run synchronously here and submit its None return value instead.
                dir_pool.submit(dir_processor, dir_path)
                dir_queue.task_done()
                dir_processed_lock.decrement()

    def file_processor(file_path):
        try:
            if file_method is not None:
                file_method(path=file_path, **file_method_kwargs)
        except Exception:
            print("Failure on", file_path)
        file_processed_lock.decrement()

    for item in walk_with_dirs(path):
        real_path = AzureDLPath(item['name'])
        if item['type'] == 'FILE':
            file_processed_lock.increment()
            file_pool.submit(file_processor, real_path)
        elif item['type'] == 'DIRECTORY':
            dir_processed_lock.increment()
            dir_pool.submit(dir_processor, real_path)
        else:
            # TODO: log unexpected entry types
            print("Unexpected entry type for", real_path)

    # isZero() blocks until the corresponding latch has been fully decremented.
    while not file_processed_lock.isZero() and not dir_processed_lock.isZero():
        pass




    def file_thread():
        while not file_queue.empty() or not dir_queue.empty() or not zero_threads_still_in_thread_pool(dir_pool):
            if file_queue.empty():
                continue
            else:
                file_path = file_queue.get()
                file_queue.task_done()
                if file_method is not None:
                    file_pool.submit(file_processor, file_path)

    dir_processor(path)
    main_pool.submit(dir_thread)
    main_pool.submit(file_thread)

    while not file_queue.empty() or not dir_queue.empty() or not zero_threads_still_in_thread_pool(
            file_pool) or not zero_threads_still_in_thread_pool(dir_pool):
        file_pending = not file_queue.empty()
        dir_pending = not dir_queue.empty()
        file_pool_idle = zero_threads_still_in_thread_pool(file_pool)
        dir_pool_idle = zero_threads_still_in_thread_pool(dir_pool)
        # Restart a worker thread if its queue has items but its pool has gone idle.
        if file_pending and file_pool_idle:
            main_pool.submit(file_thread)
        if dir_pending and dir_pool_idle:
            main_pool.submit(dir_thread)
        time.sleep(1)
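
For reference, a rough sketch of how recurse_method might be invoked (hypothetical; the auth call, store name, starting path and callbacks are placeholders, not part of this commit):

# Hypothetical usage sketch of the framework added above.
from azure.datalake.store import core, lib

token = lib.auth()  # assumes Azure AD credentials are available to the default auth flow
adl = core.AzureDLFileSystem(token, store_name='mystore')  # 'mystore' is a placeholder

def print_file(path, **kwargs):
    # stand-in per-file action, e.g. setting an ACL or gathering stats
    print("visited file", path)

def print_dir(path, **kwargs):
    print("visited directory", path)

core.recurse_method(AzureDLFileSystemObject=adl, path='/data',
                    file_method=print_file, dir_method=print_dir)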

