-
Notifications
You must be signed in to change notification settings - Fork 0
/
dir_overlay.py
291 lines (249 loc) · 11.9 KB
/
dir_overlay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# User configuration: edit these variables to point to the correct paths.
BASE_DIR = "../base"     # one of the two overlaid directories (the "base" side)
CUSTOM_DIR = "custom"    # the other directory, holding the customized files
DIRECTION = "tobase"     # which way APPLY copies: "tobase" or "tocustom"
STATE_DIR = "."          # directory in which the state files are kept
BACKUP_TYPE = "files"    # backup strategy used when copying to base: "files" or "tar"
# ----- end of user editable variables. Only real developers from this point on please :)
# Help text printed when the script is started without a valid operation.
# It lists every operation the command-line dispatcher accepts, including
# "reapply".
USAGE = ("Usage: \n"
         "./dir_overlay.py clean|apply|reapply \n"
         "\n"
         "Let us define two directories (BASE_DIR and CUSTOM_DIR) and two operations (APPLY and CLEAN). \n"
         "We also define DIRECTION as being either TOBASE or TOCUSTOM. \n"
         "The purpose of this script is to 'overlay' two directories, by reversibly copying the contents of one directory to the other. It allows you to put files on CUSTOM_DIR that are a 'customized version' of those on BASE_DIR\n"
         "If DIRECTION is TOCUSTOM: \n"
         " APPLY copies the contents of BASE_DIR to CUSTOM_DIR. It doesn't copy any files that already exist on the destination. \n"
         " CLEAN indiscriminately removes any files that were created by APPLY. IT DOES NOT CHECK FOR CHANGES IN THOSE FILES\n"
         "\n"
         "If DIRECTION is TOBASE: \n"
         " APPLY copies the contents of CUSTOM_DIR to BASE_DIR. Files that already exist on the destination are backed up and replaced. \n"
         " CLEAN removes any files that were created by APPLY, and restores the backups if they already existed. \n"
         "\n"
         "REAPPLY does a CLEAN first if the overlay is already applied, and then an APPLY. \n"
         "\n"
         "The script relies on relative paths, so you need a good CWD when you launch it (from a terminal, preferably...) \n"
         "Two state files are kept on the CWD - please don't delete them. \n"
         )
from os import remove, listdir, mkdir, getcwd, chdir
from os.path import isdir, isfile, join, exists, normpath, abspath, relpath
from shutil import copyfile, rmtree, move
from functools import partial
import logging
# Emit bare messages (no level or timestamp prefix); with the level at INFO,
# the logging.debug() traces used throughout this file are not shown.
logging.basicConfig(level=logging.INFO, format= '%(message)s')
#----------------This section has pure classes and functions-----------------------------------------------
def file_inside_directory(f, dir):
    '''Return True iff f is dir itself, or lies inside dir or any of its subdirectories.

    Both paths are made absolute (and normalized) first. The comparison is
    done on a path-separator boundary, so a sibling whose name merely starts
    with the directory name (e.g. "/a/bc" versus "/a/b") is NOT considered
    to be inside it.
    '''
    from os.path import sep  # local import: the file does not import it at top level

    target = abspath(f)
    root = abspath(dir)
    if target == root:
        return True
    # A filesystem root already ends with the separator; do not double it.
    prefix = root if root.endswith(sep) else root + sep
    return target.startswith(prefix)
def relativify_path(path, base):
    '''Like os.path.relpath(path, base), but first asserts that path lies inside base.'''
    is_contained = file_inside_directory(path, base)
    assert is_contained
    return relpath(path, base)
class StateFile(object):
    '''Tracks a state (CLEAN or APPLIED) that is persisted in a file.

    The file contains just the state name ("clean" or "applied"). If the
    file does not exist when the object is created, it is created holding
    the CLEAN state.
    '''
    CLEAN, APPLIED = "clean", "applied"
    STATES = (CLEAN, APPLIED)

    def __init__(self, filename):
        '''Remember filename and load the current state from it.'''
        self.filename = filename
        self.state = self._read_state()

    def _read_state(self):
        '''Return the state stored in the file, creating the file as CLEAN if missing.

        Raises Exception if the file content is not one of STATES.
        '''
        if not exists(self.filename):
            self.set_state(self.CLEAN)
            return self.CLEAN
        # Text mode, so the content compares equal to the str constants on
        # both Python 2 and Python 3; "with" guarantees the handle is closed.
        with open(self.filename, 'r') as f:
            # Tolerate surrounding whitespace (e.g. a newline added by an editor).
            state = f.read().strip()
        if state in self.STATES:
            return state
        raise Exception("Cannot read state from state file: "+state)

    def set_state(self, state):
        '''Write state (which must be one of STATES) to the file and remember it.'''
        assert state in self.STATES
        with open(self.filename, 'w') as f:
            f.write(state)
        self.state = state
class MergeHistory(object):
    '''Keeps a history of which files/directories have been added or changed by a merge.

    The history is a single ordered list of paths; the order in which entries
    are recorded is preserved when the history is serialized and read back.
    '''

    def __init__(self):
        # Files that were changed or did not exist, OR directories that did not exist.
        self.changed = []

    def add_file(self, f):
        '''Record a file that was newly created.'''
        self.changed.append(f)

    def change_file(self, f):
        '''Record a file that already existed and was changed.'''
        self.changed.append(f)

    def add_dir(self, d):
        '''Record a directory that was newly created.'''
        self.changed.append(d)

    def serialize_to_file(self, filename):
        '''Write the recorded paths to filename, one per line.

        NOTE: a path that itself contains a newline would not survive the
        round trip through this format.
        '''
        with open(filename, 'w') as out:
            out.write("\n".join(self.changed))

    @staticmethod
    def read_serialized_file(filename):
        '''Return a MergeHistory loaded from a file written by serialize_to_file.'''
        c = MergeHistory()
        with open(filename, 'r') as src:
            c.changed = src.read().splitlines()
        return c
class DirectoryMerger(object):
    '''Exposes operations to merge two directories, and to revert the changes, through the use of a MergeHistory.

    merge() copies the tree under from_dir into to_dir and returns a
    MergeHistory of everything it created or changed; remove_changes()
    takes such a history and undoes it.

    backup selects what happens to destination files that get replaced:
      BACKUP_NONE  - nothing is kept
      BACKUP_FILES - each replaced file is moved aside as <name>.dir_overlay_bak
      BACKUP_TAR   - as BACKUP_FILES, then all backups are packed into one
                     tar file in backup_dir and the individual backups removed
    '''
    BACKUP_EXT = '.dir_overlay_bak'
    TAR_BACKUP_FILENAME = '.dir_overlay_bak.tar'
    BACKUP_NONE, BACKUP_FILES, BACKUP_TAR = None, "files", "tar"
    BACKUP_TYPES = (BACKUP_NONE, BACKUP_FILES, BACKUP_TAR)

    def __init__(self, from_dir, to_dir, backup, replace):
        '''from_dir/to_dir: source and destination directories (made absolute).
        backup: one of BACKUP_TYPES.
        replace: whether files already present on the destination are overwritten.
        '''
        assert backup in self.BACKUP_TYPES
        assert replace in (True, False)
        assert not (backup and not replace)  # it doesn't make sense to backup and not replace...
        self.from_dir, self.to_dir = map(abspath, (from_dir, to_dir))
        self.backup_dir = self.to_dir  # the implementation needs some changes for this to be something else
        self.backup = backup
        self.replace = replace
        # Both are (re)initialized by every merge(); declared here so the
        # attributes always exist.
        self.changes = None
        self.backed_up = []

    def _backup_filename(self, f):
        '''Return the name under which file f is backed up.'''
        # endswith() also copes with an empty string, unlike indexing f[-1].
        assert not f.endswith(("/", "\\"))
        return f + self.BACKUP_EXT

    def _tar_path(self):
        '''Return the absolute path of the tar backup file.'''
        return join(self.backup_dir, self.TAR_BACKUP_FILENAME)

    def _backup_file(self, f):
        '''Move destination file f to its backup name (no-op when backups are disabled).'''
        if not self.backup:
            return
        relative_f = relativify_path(f, self.to_dir)
        dest = join(self.backup_dir, self._backup_filename(relative_f))
        move(f, dest)
        self.backed_up.append(dest)

    def _merge_file(self, from_file, to_file):
        '''Copy one file to the destination, backing up / recording it as needed.

        Returns True if the file was copied, False if it was ignored because
        it already exists on the destination and replacing is disabled.
        Raises Exception if to_file exists but is not a regular file.
        '''
        ex = exists(to_file)
        if ex and not isfile(to_file):
            raise Exception("File on origin has the same name as a non-file on destination: "+to_file)
        if ex:
            self._backup_file(to_file)
        if not (self.replace or not ex):
            logging.debug("ignoring file:".ljust(27)+from_file)
            return False
        logging.debug("copying file:".ljust(27)+from_file)
        copyfile(from_file, to_file)
        if ex:
            self.changes.change_file(to_file)
        else:
            self.changes.add_file(to_file)
        return True

    def _merge_dir(self, from_dir, to_dir):
        '''Recursively merge the contents of from_dir into to_dir.

        Subdirectories are processed before the files of the directory
        itself. A failure on one entry is logged and does not stop the
        processing of the remaining entries (best effort).
        '''
        logging.debug("processing directory:".ljust(25)+from_dir+", "+to_dir)
        # (from, to) pairs for every entry of from_dir.
        pairs = [(join(from_dir, name), join(to_dir, name)) for name in listdir(from_dir)]
        dir_pairs = [p for p in pairs if isdir(p[0])]
        file_pairs = [p for p in pairs if isfile(p[0])]
        for fd, td in dir_pairs:
            try:
                dir_created = not exists(td)
                if dir_created:
                    mkdir(td)
                elif not isdir(td):
                    raise Exception("Directory on origin has same name as a non-directory on destination: "+td)
                try:
                    self._merge_dir(fd, td)
                finally:
                    # Only directories we created are recorded, so that a clean
                    # never deletes a pre-existing directory. The directory is
                    # recorded AFTER its contents (so a clean meets the contents
                    # first), and even if the recursion failed, so that it is
                    # not left behind.
                    if dir_created:
                        self.changes.add_dir(td)
            except Exception as e:
                logging.error(str(e))
        for ff, tf in file_pairs:
            try:
                self._merge_file(ff, tf)
            except Exception as e:
                logging.error(str(e))

    def _pack_backups(self):
        '''Pack every individual backup file into the tar backup, then delete them.'''
        logging.debug("creating tar backup")
        import tarfile
        # Absolute paths plus arcname give the same archive member names as
        # adding relative names from inside backup_dir, without having to
        # change the process-wide working directory.
        tar = tarfile.open(self._tar_path(), "w")
        try:
            for f in self.backed_up:
                tar.add(f, arcname=relativify_path(f, self.backup_dir))
        finally:
            tar.close()
        # Backups are removed only once the archive has been written completely.
        for f in self.backed_up:
            remove(f)

    def _unpack_backups(self):
        '''Extract the tar backup (if there is one) into backup_dir and delete it.'''
        tar_path = self._tar_path()
        if not exists(tar_path):
            return
        logging.debug("extracting tar backup")
        import tarfile
        tar = tarfile.open(tar_path)
        try:
            # NOTE(review): extractall() trusts the member names in the archive.
            # The archive is one this class wrote itself; do not point this at
            # tar files from other sources.
            tar.extractall(self.backup_dir)
        finally:
            tar.close()
        logging.debug("deleting tar backup")
        remove(tar_path)

    def _restore_backup(self, x):
        '''Move the backup of x (if any) back to x. Returns True if a backup was restored.'''
        backup = self._backup_filename(x)
        if not exists(backup):
            return False
        logging.debug("restoring file: ".ljust(20)+x)
        move(backup, x)
        return True

    def merge(self):
        '''Merge from_dir into to_dir and return the MergeHistory of what was done.'''
        self.changes = MergeHistory()
        self.backed_up = []
        self._merge_dir(self.from_dir, self.to_dir)
        if self.backup == self.BACKUP_TAR:
            self._pack_backups()
        return self.changes

    def remove_changes(self, changes):
        '''delete all files and dirs on the list, and restores backups if existent'''
        assert isinstance(changes, MergeHistory)
        if self.backup == self.BACKUP_TAR:
            self._unpack_backups()
        for x in changes.changed:
            if isdir(x):
                logging.debug("removing directory: ".ljust(20)+x)
                rmtree(x)
            elif isfile(x):
                logging.debug("removing file: ".ljust(20)+x)
                remove(x)
                self._restore_backup(x)
            elif not self._restore_backup(x):
                # x is gone and there is no backup to bring back either.
                # (If x was deleted after the merge but its backup still
                # exists, the backup has just been restored instead of being
                # left stranded.)
                logging.warning("Cannot clean "+x)
class DirectoryOverlay(object):
    '''Ties a StateFile and a DirectoryMerger together into apply/clean operations.

    The StateFile remembers whether the overlay is currently applied, and the
    list of changes made by an apply is saved next to it so that a later
    clean can undo them.
    '''
    TOBASE, TOCUSTOM = "tobase", "tocustom"
    DIRECTIONS = (TOBASE, TOCUSTOM)
    STATE_FILE = ".dir_overlay.state"   # clean or applied state
    CHANGES_FILE = ".dir_overlay.list"  # list of files that were copied

    class AlreadyApplied(Exception):
        '''Raised when an apply is attempted while the overlay is already applied.'''
        pass

    def __init__(self, base_dir, custom_dir, state_dir, direction):
        '''direction (one of DIRECTIONS) decides which directory is copied onto which.

        state_dir must lie outside both base_dir and custom_dir.
        '''
        assert direction in self.DIRECTIONS
        assert not file_inside_directory(state_dir, base_dir)    # state_dir is not inside (or the same as) base_dir
        assert not file_inside_directory(state_dir, custom_dir)  # state_dir is not inside (or the same as) custom_dir

        towards_base = direction == self.TOBASE

        # Source of the copy: base_dir only when copying towards custom.
        source = custom_dir
        if direction == self.TOCUSTOM:
            source = base_dir

        # Destination of the copy: base_dir only when copying towards base.
        target = custom_dir
        if towards_base:
            target = base_dir

        # Existing files are replaced (and backed up) only when copying towards base.
        backup_mode = DirectoryMerger.BACKUP_NONE
        if towards_base:
            backup_mode = BACKUP_TYPE

        self.statefile = StateFile(join(state_dir, self.STATE_FILE))
        self.merger = DirectoryMerger(source, target, backup_mode, towards_base)
        self.state_dir = state_dir

    def _changes_file(self):
        '''Path of the file in which the list of changes is stored.'''
        return join(self.state_dir, self.CHANGES_FILE)

    def clean(self):
        '''Undo the changes of the last apply; does nothing if already clean.'''
        if self.statefile.state == StateFile.CLEAN:
            logging.info("Already clean")
            return
        history = MergeHistory.read_serialized_file(self._changes_file())
        self.merger.remove_changes(history)
        self.statefile.set_state(StateFile.CLEAN)
        logging.info("Cleaned successfully")

    def _apply(self):
        '''Run the merge, save its change list and mark the state as applied.

        Raises AlreadyApplied if the overlay is currently applied.
        '''
        if self.statefile.state == StateFile.APPLIED:
            logging.warning("Already applied. If you want to apply again, you need to clean first")
            raise self.AlreadyApplied
        history = self.merger.merge()
        history.serialize_to_file(self._changes_file())
        self.statefile.set_state(StateFile.APPLIED)
        logging.info("Applied successfully")

    def apply(self, allow_repeated=False):
        '''Apply the overlay; with allow_repeated, an already applied overlay is cleaned first.'''
        currently_applied = self.statefile.state == StateFile.APPLIED
        if currently_applied and allow_repeated:
            self.clean()
        self._apply()
#--This section has global-using and exit()ing functions, and keeps external state------------------------------
# The one overlay instance, built from the user-editable settings at the top
# of the file. Constructing it reads (or creates) the state file in STATE_DIR.
overlay = DirectoryOverlay(
    BASE_DIR,
    CUSTOM_DIR,
    STATE_DIR,
    DIRECTION,
)
def clean():
    '''"clean" operation: undo a previous apply on the module-level overlay.'''
    overlay.clean()
def apply():
    '''"apply" operation: apply the module-level overlay, without allowing a repeated apply.'''
    overlay.apply(False)
def reapply():
    '''"reapply" operation: apply the module-level overlay, allowing a repeated apply.'''
    overlay.apply(True)
if __name__ == "__main__":
    # Command-line entry point. The operation is taken from the last argument.
    # Exit codes: 1 = bad usage, 2 = BASE_DIR/CUSTOM_DIR missing,
    # 50 = apply requested while already applied.
    import sys
    operations = {"clean": clean, "apply": apply, "reapply": reapply}
    op = sys.argv[-1]
    if len(sys.argv) < 2 or op not in operations:
        # Parenthesized form works both as the Python 2 print statement and
        # as the Python 3 print function.
        print(USAGE)
        sys.exit(1)
    if not isdir(BASE_DIR) or not isdir(CUSTOM_DIR):
        logging.critical("There is a problem with your directories (did you define the BASE_DIR and CUSTOM_DIR variables?)")
        sys.exit(2)
    try:
        operations[op]()
    except DirectoryOverlay.AlreadyApplied:
        sys.exit(50)