/
recipe-577404.py
138 lines (110 loc) · 3.57 KB
/
recipe-577404.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/python
# Unix/Linux only (as the 'fcntl' module is). Python >=2.6/3.x compatible.
# Copyright (c) 2010 Jan Kaliszewski (zuo). Licensed under the MIT License.
"""
flocktests.py: fcntl.flock(LOCK_EX|LOCK_NB) behaviour sampling -- with one
file object or separate file objects (pointing to the same filesystem path),
with/without threading or forking.
"""
from __future__ import print_function
import os
import sys
import threading
from fcntl import flock, LOCK_EX, LOCK_UN, LOCK_NB
from os.path import basename
class lockpath(object):
    """Open a file, try to flock it, report the outcome, release it if asked.

    Plain construction (``lockpath(path)``) closes the file again as soon as
    the lock attempt has been made; ``lockpath.keeping(path)`` keeps the file
    open (and flocked, when the attempt succeeded) for use in a with-block.

    Attributes set by the constructor:
      file   -- the file object opened on `path` in append mode
      locked -- True only when the lock was obtained and is being kept
    """

    def __init__(self, path, _keep=False):
        """Open `path` for appending and attempt to lock it via _lockonly().

        By default the file is closed immediately after the attempt; with
        `_keep` true it stays open and `locked` records the outcome.
        """
        # Named 'handle' rather than 'file' so the Python 2 builtin of that
        # name is not hidden inside the method.
        self.file = handle = open(path, 'a')
        locked = _lockonly(handle)
        if _keep:
            self.locked = locked
        else:
            handle.close()
            self.locked = False

    @classmethod
    def keeping(cls, path):
        """Constructor for with-blocks: try to keep the file open and flocked."""
        return cls(path, _keep=True)

    def __enter__(self):
        """Enter a with-block, binding the file to the as-clause target.

        Raises RuntimeError when the lock is not held.  The file is closed
        before raising: __exit__ never runs when __enter__ fails, so the
        handle would otherwise stay open.
        """
        if self.locked:
            return self.file
        # Closing an already closed file is a no-op, so this is also safe
        # for instances created without `_keep`.
        self.file.close()
        raise RuntimeError('lockpath().file is not locked '
                           '-- cannot enter the with-block')

    def __exit__(self, *args, **kwargs):
        """Leave the with-block, closing the file (which also unflocks it)."""
        self.file.close()
def _help(*args):
    """Write the given strings to stderr, one per line.

    Every line after the first is indented by the separator used to join
    the pieces.
    """
    text = '\n '.join(args)
    print(text, file=sys.stderr)
def _msg(*args):
    """Print `args` to stdout, prefixed with the pid and the thread name."""
    # The prefix identifies which process/thread produced the line.
    prefix = ('pid:{0}'.format(os.getpid()),
              threading.current_thread().name)
    print(*(prefix + args))
def _lockonly(file):
    """Try a non-blocking exclusive flock on `file` and report the result.

    Returns True when the lock was obtained, False when flock() raised
    IOError.  The file descriptor number and the outcome are reported
    through _msg().
    """
    _msg('got file #', file.fileno())
    try:
        flock(file, LOCK_EX | LOCK_NB)
    except IOError:
        acquired = False
    else:
        acquired = True
    _msg('locked successfully' if acquired else 'failed to lock')
    return acquired
def lockfile(file):
    """Try to flock the given file; on success unflock it straight away."""
    if not _lockonly(file):
        # Nothing was acquired, so there is nothing to release.
        return
    flock(file, LOCK_UN)
# Options
def n(path):
    """one file object + no concurrency"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    with lockpath.keeping(path) as held:
        # Second attempt goes through the very same file object.
        lockfile(held)
def N(path):
    """separate file objects + no concurrency"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    keeper = lockpath.keeping(path)
    with keeper:
        # Second attempt opens the same path again as a new file object.
        lockpath(path)
def t(path):
    """one file object + threading"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    with lockpath.keeping(path) as held:
        # Second attempt runs in another thread, on the same file object.
        worker = threading.Thread(target=lockfile, args=(held,))
        worker.start()
        worker.join()
def T(path):
    """separate file objects + threading"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    keeper = lockpath.keeping(path)
    with keeper:
        # Second attempt runs in another thread and reopens the path.
        worker = threading.Thread(target=lockpath, args=(path,))
        worker.start()
        worker.join()
def f(path):
    """one file object + forking"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    with lockpath.keeping(path) as held:
        child_pid = os.fork()
        if child_pid == 0:
            # Child: second attempt on the file object inherited via fork.
            lockfile(held)
        else:
            # Parent: wait for the child before leaving the with-block.
            os.wait()
def F(path):
    """separate file objects + forking"""
    # NOTE: the docstring doubles as this option's help text -- keep it short.
    keeper = lockpath.keeping(path)
    with keeper:
        child_pid = os.fork()
        if child_pid == 0:
            # Child: second attempt reopens the path as a new file object.
            lockpath(path)
        else:
            # Parent: wait for the child before leaving the with-block.
            os.wait()
# One letter per test function defined in this module: main() looks the
# chosen letter up in globals() and walks this string to build the help text.
OPTIONS = 'nNtTfF'
def main(program, option='', path='test.flock'):
    """Do one of the tests or print a short help.

    program -- script name (argv[0]); only its basename appears in the usage
    option  -- one letter from OPTIONS, leading dashes allowed (e.g. '-n')
    path    -- file to be flocked by the selected test

    Any missing or unrecognised option prints the help to stderr instead.
    """
    flocktests = globals()
    option = option.lstrip('-')
    # OPTIONS is a string, so a bare `in` is a *substring* test: it would
    # accept e.g. 'nN' or 'tTf' and then fail the globals() lookup with a
    # KeyError.  Require exactly one character so such input gets the help.
    if len(option) == 1 and option in OPTIONS:
        flocktests[option](path)
    else:
        # The module docstring is None when running under -OO.
        _help((__doc__ or '').lstrip())
        _help('Usage: {0} OPTION [PATH]'.format(basename(program)),
              'Default PATH: test.flock', 'OPTIONS:',
              *('-{0} {1}'.format(letter, flocktests[letter].__doc__)
                for letter in OPTIONS))
# Script entry point: argv[0] becomes `program`, argv[1] `option` and
# argv[2] `path`; the latter two fall back to main()'s defaults when absent.
if __name__ == '__main__':
    main(*sys.argv)