
init

Commit a623033eca754ec0d1e5c43033d6c133de1d2a8f (Robert, Mar 9, 2012)
53 README
@@ -0,0 +1,53 @@
+Increase system default settings
+--------------------------------
+These kernel parameters raise the limits on pending connections and open file
+descriptors. Edit /etc/sysctl.conf and add:
+ net.core.somaxconn=131072
+ fs.file-max=131072
+
+then on the command line:
+ sudo sysctl -p
+
+Edit /usr/include/linux/limits.h
+ NR_OPEN = 65536
+
+Edit /etc/security/limits.conf
+ your_username soft nofile 65535
+ your_username hard nofile 65535
+
+limits.conf is read at login, so log out and back in for the new limits to take
+effect.
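+
+A quick sanity check after logging back in (standard commands; the values should
+match what you set above):
+
+ ulimit -n
+ sysctl net.core.somaxconn
+ cat /proc/sys/fs/file-max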
+
+As an alternative, you can run set_kernel.bash to set the kernel parameters at
+runtime with even larger values:
+
+ echo "10152 65535" > /proc/sys/net/ipv4/ip_local_port_range
+ sysctl -w fs.file-max=128000
+ sysctl -w net.ipv4.tcp_keepalive_time=300
+ sysctl -w net.core.somaxconn=250000
+ sysctl -w net.ipv4.tcp_max_syn_backlog=2500
+ sysctl -w net.core.netdev_max_backlog=2500
+ ulimit -n 10240
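+
+Note that the writes above need root, and ulimit only changes the limit of the
+shell that runs it, so source the script from a root shell rather than running
+it as a child process:
+
+ sudo -s
+ . ./set_kernel.bash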
+
+
+Stop CPU frequency scaling
+--------------------------
+CPU frequency scaling lets the operating system scale the CPU speed up or down
+to save power. Disable it before running any time-based tests, for example with
+the script below, which sets every core's governor to "performance":
+
+#! /bin/bash
+for i in /sys/devices/system/cpu/cpu[0-9]*
+do
+ echo performance > $i/cpufreq/scaling_governor
+done
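+
+To confirm the change (assuming your kernel exposes cpufreq under sysfs):
+
+ cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor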
+
+Running Tests
+---------------
+Before running any tests, check settings.py and settings.bash and set the variables
+for your system.
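+
+The scripts reference the following names from settings.py. A minimal sketch,
+with placeholder values you must replace:
+
+ # settings.py (sketch; values are placeholders)
+ BIG_FILE = "/path/to/large/logfile"      # file to search
+ TARGET_USERNAME = "some_username"        # string/regex the scripts look for
+ PACKAGE_PATH = "/path/to/dry/package"    # prepended to sys.path for dry.logger
+ BIGFILE_MP_CHUNKS = 4                    # chunks for the multiprocessing version
+ BIGFILE_GEVENT_CHUNKS = 4                # chunks for the gevent versions
+ BIGFILE_FUTURES_CHUNKS = 4               # chunks/workers for the futures version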
+
+Bob Hancock
+hancock.robert@gmail.com
+
+March 2012
0 __init__.py
No changes.
0 bigfile/__init__.py
No changes.
99 bigfile/bigfile.py
@@ -0,0 +1,99 @@
+def chunk_end(fh, start):
+ """
+ From the end of the chunk, increment until you find a newline and
+ return that position.
+ """
+ fh.seek(start)
+ c = fh.read(1)
+ while c != '\n' and c != "": # empty string on EOF
+ c = fh.read(1)
+ return fh.tell()
+
+
+def size_chunks(fh, fsize, num_chunks=4):
+ """
+ Create num_chunks equal chunks of the file.
+
+ return
+ A list of tuples (chunk_start, chunk_size)
+ """
+ chunks = []
+ chunk_size = fsize // num_chunks # integer division so seek offsets stay ints on Python 3
+
+ start = 0
+ for i in range(0, num_chunks):
+ if (start + chunk_size < fsize):
+ end = chunk_end(fh, start + chunk_size)
+ size = (end - start)
+ else:
+ end = fsize
+ size = (fsize - start)
+
+ #print("i={i} chunk_end={e} chunk_start={s} chunk_size={z}".format(i=i,e=end,s=start,z=size))
+ chunks.append((start, size))
+ start = end # chunk_end() already returned the position just past the newline
+
+ return chunks
+
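+# Usage sketch (illustrative, not part of this module): the driver scripts call
+#   chunks = size_chunks(fh, os.path.getsize(path), num_chunks=4)
+# and then hand each (start, size) tuple, together with its own open file
+# handle, to find() or find_noq().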
+
+def find(fh, chunk, pattern, q):
+ """
+ Seek to the start of a chunk and read each line.
+
+ fh File handle to the file
+ chunk A tuple (start, size of chunk)
+ q Synchronized queue
+ """
+ f = fh
+ start, size = chunk
+ recsmatch = 0
+
+ bytes_read = 0
+ f.seek(start)
+ while bytes_read < size:
+ line = f.readline()
+ bytes_read += len(line)
+ if pattern.search(line):
+ recsmatch += 1
+
+ q.put(recsmatch)
+
+
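+# Consumer: drains per-chunk match counts from the queue until the None sentinel
+# arrives, then prints the total. task_done() pairs with a JoinableQueue.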
+def count_matches(q):
+ recsmatch = 0
+ while True:
+ matches = q.get()
+ if matches == None: # sentinel
+ q.task_done()
+ break
+
+ recsmatch += matches
+ q.task_done()
+
+ print(recsmatch)
+
+
+def find_noq(fh, chunk, pattern):
+ """
+ Seek to the start of a chunk and read each line.
+ This is used with programs that do not use a queue.
+
+ fh File handle to the file
+ chunk A tuple (start, size of chunk)
+ q Synchronized queue
+ """
+ f = fh
+ start, size = chunk
+ recsmatch = 0
+
+ bytes_read = 0
+ f.seek(start)
+ while bytes_read < size:
+ line = f.readline()
+ bytes_read += len(line)
+ #print(line)
+ if pattern.search(line):
+ recsmatch += 1
+
+ return recsmatch
+
15 bigfile_brute.py
@@ -0,0 +1,15 @@
+import os
+import time
+import settings
+
+#recsread = 0
+recsmatch = 0
+
+with open(settings.BIG_FILE, "r") as fh_in:
+ for line in fh_in:
+ #recsread += 1
+ if settings.TARGET_USERNAME in line: # What is the big O of this operation?
+ recsmatch += 1
+
+#print("{r} {m}".format(r=recsread, m=recsmatch))
+print("{m}".format(m=recsmatch))
18 bigfile_brute_regex.py
@@ -0,0 +1,18 @@
+import os
+import time
+import re
+import settings
+
+RECSMATCH=0
+#RECSREAD=0
+
+pattern = re.compile(settings.TARGET_USERNAME)
+
+with open(settings.BIG_FILE, "r") as fh_in:
+ for line in fh_in:
+ #RECSREAD += 1
+ if pattern.search(line):
+ RECSMATCH += 1
+
+#print(RECSREAD,RECSMATCH)
+print(RECSMATCH)
59 bigfile_chunks_futures_threadpool.py
@@ -0,0 +1,59 @@
+# Multiprocessing won't work because the executor cannot serialize the object returned by
+# open(fname, "r"). Specifically,
+# <_io.TextIOWrapper name='/home/rhancock/bigsmallfile.xferlog' mode='r' encoding='UTF-8'>
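+# A process-based variant would instead pass the file name and the (start, size)
+# tuple to each worker and re-open the file inside the worker; that is not done here.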
+from concurrent import futures
+import sys
+import os
+import re
+import time
+import settings
+from bigfile.bigfile import chunk_end, size_chunks, find_noq
+
+sys.path.insert(0, settings.PACKAGE_PATH)
+import dry.logger
+
+def main():
+ start = time.time()
+ logger = dry.logger.setup_log_size_rotating("log/bigfile_futures_threadpool.log",
+ logname='bigfilefuturesthreads')
+
+ logger.info("START")
+ elapsed_time = []
+
+ sfile = settings.BIG_FILE
+ fsize = os.path.getsize(sfile)
+ with open(sfile, "r") as fh:
+ #A list of tuples (chunk_start, chunk_size)
+ chunks = size_chunks(fh, fsize, num_chunks=settings.BIGFILE_FUTURES_CHUNKS)
+
+ pattern = re.compile(settings.TARGET_USERNAME)
+ file_handles = []
+ for j in range(len(chunks)):
+ file_handles.append(open(sfile, "r"))
+
+ with futures.ThreadPoolExecutor(max_workers=settings.BIGFILE_FUTURES_CHUNKS) as executor:
+ future_to_chunk = dict( (executor.submit(find_noq, file_handles[i], chunks[i], pattern), "") \
+ for i in range(len(chunks)) )
+
+ recsmatch = 0
+
+ try:
+ for future in futures.as_completed(future_to_chunk, timeout=60):
+ recsmatch += future.result()
+ except Exception as e:
+ #traceback.print_exc(file=sys.stdout)
+ logger.error("recsmatch={m} e={e}".format(m=recsmatch, e=e))
+ return
+
+ elapsed_time.append(time.time() - start)
+
+ elapsed_time_str = ""
+ for t in elapsed_time:
+ elapsed_time_str += str(t)+","
+ elapsed_time_str = elapsed_time_str.strip(",")
+
+ print("{r}".format(r=recsmatch))
+ logger.info("STOP|elapsedtime:{et}|recsmatch:{r}".format(et=elapsed_time_str, r=recsmatch))
+
+if __name__ == "__main__":
+ sys.exit(main())
63 bigfile_chunks_gevent.py
@@ -0,0 +1,63 @@
+# Split the file into four chunks and assign each to a thread.
+# We don't calculate the records read because this would require
+# synchronizing the value with locks or creating a separate queue.
+# This would add an overhead that would skew the results when compared
+# to the brute force approach.
+import os
+import sys
+import re
+import time
+import gevent
+import gevent.queue
+
+if sys.version[0] == "3":
+ from queue import Queue
+else:
+ from Queue import Queue
+
+import settings
+from bigfile.bigfile import chunk_end, size_chunks, find
+
+def count_matches(q):
+ recsmatch = 0
+ while True:
+ matches = q.get()
+ if matches == None: # sentinel
+ break
+
+ recsmatch += matches
+
+ print(recsmatch)
+
+# Start Execution
+if len(sys.argv) < 1:
+ print("usage: %prog")
+ sys.exit(1)
+
+sfile = settings.BIG_FILE
+
+fsize = os.path.getsize(sfile)
+with open(sfile, "r") as fh:
+ chunks = size_chunks(fh, fsize, num_chunks=settings.BIGFILE_GEVENT_CHUNKS)
+
+pattern = re.compile(settings.TARGET_USERNAME)
+
+# maxsize = 0 makes the queue act like a channel. The queue will block
+# until a get call retrieves the data. In effect, it works like a CSP.
+q = gevent.queue.Queue(maxsize=0)
+
+# consumer
+con = gevent.spawn(count_matches, q)
+
+# producer
+fhandles = [open(sfile, "r") for i in xrange(0, settings.BIGFILE_GEVENT_CHUNKS)]
+jobs = [gevent.spawn(find, fhandles[i], chunks[i], pattern, q) for i in xrange(0, settings.BIGFILE_GEVENT_CHUNKS)]
+gevent.joinall(jobs, timeout=10)
+
+q.put(None) # sentinel: tell the consumer all producers are done
+con.join() # wait for the consumer to print the total
+
+for f in fhandles:
+ f.close()
+
+#print("chunks={c}".format(c=settings.BIGFILE_GEVENT_CHUNKS))
65 bigfile_chunks_gevent_pipeline2.py
@@ -0,0 +1,65 @@
+# Split the file into four chunks and assign each to a thread.
+# We don't calculate the records read because this would require
+# synchronizing the value with locks or creating a separate queue.
+# This would add an overhead that would skew the results when compared
+# to the brute force approach.
+import os
+import sys
+import re
+import time
+import gevent
+import gevent.queue
+from bigfile_pipeline_2 import opener, cat, grep, writer
+
+
+if sys.version[0] == "3":
+ from queue import Queue
+else:
+ from Queue import Queue
+
+import settings
+from bigfile.bigfile import chunk_end, size_chunks, find
+
+def count_matches(q):
+ recsmatch = 0
+ while True:
+ matches = q.get()
+ if matches == None: # sentinel
+ break
+
+ recsmatch += matches
+
+ print(recsmatch)
+
+# Start Execution
+if len(sys.argv) < 1:
+ print("usage: %prog")
+ sys.exit(1)
+
+sfile = settings.BIG_FILE
+
+fsize = os.path.getsize(sfile)
+with open(sfile, "r") as fh:
+ chunks = size_chunks(fh, fsize, num_chunks=settings.BIGFILE_GEVENT_CHUNKS)
+
+pattern = re.compile(settings.TARGET_USERNAME)
+
+# maxsize = 0 makes the queue act like a channel. The queue will block
+# until a get call retrieves the data. In effect, it works like a CSP.
+q = gevent.queue.Queue(maxsize=0)
+
+# consumer
+con = gevent.spawn(count_matches, q)
+
+# producer
+fhandles = [open(sfile, "r") for i in xrange(0, settings.BIGFILE_GEVENT_CHUNKS)]
+jobs = [gevent.spawn(opener, cat(grep(pattern, writer()))) for i in xrange(0, settings.BIGFILE_GEVENT_CHUNKS)]
+gevent.joinall(jobs, timeout=10)
+
+#q.put(None)
+#con.join()
+
+for f in fhandles:
+ f.close()
+
+#print("chunks={c}".format(c=settings.BIGFILE_GEVENT_CHUNKS))
53 bigfile_chunks_mp.py
@@ -0,0 +1,53 @@
+# Split the file into four chunks, process each chunk in a separate process,
+# count the number of matching records, report via a queue, calculate total.
+import os
+import sys
+import re
+import time
+import multiprocessing
+from multiprocessing import JoinableQueue as Queue
+import settings
+from bigfile.bigfile import chunk_end, size_chunks, find, count_matches
+
+
+sfile = settings.BIG_FILE
+fsize = os.path.getsize(sfile)
+with open(sfile, "r") as fh:
+ chunks = size_chunks(fh, fsize, num_chunks=settings.BIGFILE_MP_CHUNKS)
+
+# Debug
+#for c in chunks:
+ #print(c)
+
+q = Queue()
+pattern = re.compile(settings.TARGET_USERNAME)
+
+# consumer
+con = multiprocessing.Process(target=count_matches, args=(q,))
+con.daemon = True
+con.start()
+
+# producer
+producers = []
+file_handles = []
+
+for chunk in chunks:
+ fh = open(sfile, "r")
+ file_handles.append(fh)
+ t = multiprocessing.Process(target=find, args=(fh, chunk, pattern, q))
+ t.daemon = True
+ producers.append(t)
+
+for p in producers:
+ p.start()
+
+for p in producers:
+ p.join()
+
+q.put(None) # sentinel
+con.join()
+
+for f in file_handles:
+ f.close()
+
+print("chunks={c}".format(c=settings.BIGFILE_MP_CHUNKS))
