-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathclone_access_modules.py
197 lines (158 loc) · 5.99 KB
/
clone_access_modules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
""" Script to clone access modules from git urls specified in config.json """
import logging
import os
import shutil
import sys
import time
from git import Repo, GitCommandError
from . import helpers
# Module-level logger for this script. The root logger is configured at import
# time to emit INFO and above through a single stream handler.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
def ensure_access_modules_config(config):
    """Validate the ``access_modules`` section of the loaded config.

    Args:
        config: Parsed configuration mapping (as read from config.json).

    Raises:
        Exception: If ``config`` has no ``access_modules`` key, or if that
            section has no ``git_urls`` key.
    """
    if "access_modules" not in config:
        # The leading space on the second fragment keeps the two sentences
        # from running together in the final message.
        raise Exception(
            "Not configured properly."
            " Config is expected to have access_modules key"
        )
    if "git_urls" not in config["access_modules"]:
        raise Exception(
            "Not configured properly."
            " Config is expected to have git_urls key"
            " as an array of git repos for access modules"
        )
def remove_stale_cloned_modules():
    """
    Clear out whatever is currently inside ``Access/access_modules``.

    Previously cloned modules are removed so that fresh clones can take
    their place, which ensures we end up on the correct commit. Every entry
    returned by the directory listing is handed to
    ``helpers.remove_directory_with_contents``.
    """
    modules_root = "Access/access_modules"
    for entry_name in os.listdir(modules_root):
        stale_path = modules_root + "/" + entry_name
        helpers.remove_directory_with_contents(stale_path)
def initialize_init_file():
    """
    Create ``__init__.py`` in the access modules root directory.

    The file is copied from the template shipped under
    ``Access/base_email_access`` so that the access modules are loaded
    properly.
    """
    template_path = 'Access/base_email_access/access_modules_init.py'
    init_file_path = "Access/access_modules/__init__.py"
    shutil.copyfile(template_path, init_file_path)
def get_repo_url_and_branch(formatted_git_arg):
    """Split a ``<url>#<branch>`` string into its url and branch parts.

    Returns a ``(url, branch)`` tuple. ``branch`` is ``None`` when the
    argument contains no ``#``. When several ``#`` are present, only the
    text between the first and second one is used as the branch.
    """
    if "#" not in formatted_git_arg:
        return formatted_git_arg, None
    pieces = formatted_git_arg.split("#")
    return pieces[0], pieces[1]
def clone_repo(formatted_git_arg, retry_limit):
    """Clone a single repo into ``./Access/access_modules/<repo name>``.

    Args:
        formatted_git_arg: Git url, optionally followed by ``#<branch>``.
        retry_limit: Maximum number of clone attempts. With a value below 1
            no attempt is made and the target path is returned as-is.

    Returns:
        The path of the folder the repo was cloned into.

    Raises:
        Exception: The error raised by the last clone attempt once all
            ``retry_limit`` attempts have failed.
    """
    url, target_branch = get_repo_url_and_branch(formatted_git_arg)

    # Derive the folder name from the last url segment. Only strip ".git"
    # when it is really there, so urls without the suffix are not truncated,
    # and ignore a trailing slash so the name is never empty because of it.
    folder_name = url.rstrip("/").split("/")[-1]
    if folder_name.endswith(".git"):
        folder_name = folder_name[:-len(".git")]
    target_folder_path = "./Access/access_modules/" + folder_name

    for clone_attempt in range(1, retry_limit + 1):
        try:
            logger.info("Cloning Repo")
            if target_branch:
                Repo.clone_from(url, target_folder_path, branch=target_branch)
            else:
                Repo.clone_from(url, target_folder_path)
        except (GitCommandError, Exception) as exception:
            logger.error(
                "Error while cloning repo. Error %s.",
                exception,
            )
            if clone_attempt == retry_limit:
                # Out of attempts: log with the live traceback and re-raise
                # immediately instead of sleeping through one more backoff.
                logger.exception("Max retry count reached while cloning repo")
                raise
            # Linear backoff: wait longer after each failed attempt.
            sleep_time = 10 * clone_attempt
            logger.info(
                "Retrying cloning: %s/%s. Backoff sleep %s",
                clone_attempt,
                retry_limit,
                sleep_time,
            )
            time.sleep(sleep_time)
        else:
            break
    return target_folder_path
def move_modules_from_cloned_repo(cloned_path):
    """
    Relocate access modules out of a cloned repo folder.

    Each directory directly under ``cloned_path`` is renamed into
    ``./Access/access_modules/``, except for the names on the skip list.
    Plain files are left where they are. A failed move is logged and
    re-raised as an ``Exception`` chained to the original error.
    """
    # Directory names that are skipped rather than moved.
    skipped_names = [".git", ".github", "secrets", "docs"]

    for entry_name in os.listdir(cloned_path):
        source_path = cloned_path + "/" + entry_name
        if entry_name in skipped_names or not os.path.isdir(source_path):
            continue

        destination_path = "./Access/access_modules/" + entry_name
        try:
            os.rename(source_path, destination_path)
        except Exception as exception:
            logger.exception(
                "Failed to move access module to proper structure %s",
                exception
            )
            raise Exception(
                f"Failed to move access module to proper structure {exception}") from exception
def ensure_access_modules_requirements(
        cloned_path, core_requirements_file_path):
    """Merge a cloned repo's requirements into the core requirements file.

    Nothing happens unless ``cloned_path`` directly contains an entry named
    ``requirements.txt``. When it does, its content is combined with the
    content of ``core_requirements_file_path``, duplicates are dropped, and
    the sorted result is written back to ``core_requirements_file_path``.
    """
    if "requirements.txt" not in os.listdir(cloned_path):
        return

    module_requirements_file = cloned_path + "/" + "requirements.txt"
    combined = helpers.read_content_from_file(module_requirements_file)
    combined.extend(
        helpers.read_content_from_file(core_requirements_file_path))

    # A set drops duplicate entries; sorting gives a stable file layout.
    unique_sorted = sorted(set(combined))
    helpers.write_content_to_file(core_requirements_file_path, unique_sorted)
def clone_access_modules():
    """Clone every configured access modules repo and lay out its modules.

    Reads ``./config.json``, validates it, resets the
    ``Access/access_modules`` directory (stale content removed, ``__init__.py``
    recreated, requirements file emptied) and then, for each configured git
    url: clones the repo, merges its requirements, moves its modules into
    place and removes the cloned folder.
    """
    config = helpers.read_json_from_file("./config.json")
    ensure_access_modules_config(config)

    modules_config = config["access_modules"]
    retry_limit = modules_config.get("RETRY_LIMIT", 5)
    git_urls = modules_config["git_urls"]

    modules_root = 'Access/access_modules'
    requirements_file_path = 'Access/access_modules/requirements.txt'

    helpers.ensure_folder_exists(modules_root)
    remove_stale_cloned_modules()
    initialize_init_file()

    # Start from an empty requirements file before any repo is cloned.
    helpers.ensure_file_exists(requirements_file_path)
    helpers.write_content_to_file(requirements_file_path, [])

    for formatted_git_arg in git_urls:
        cloned_path = clone_repo(formatted_git_arg, retry_limit)
        ensure_access_modules_requirements(cloned_path, requirements_file_path)
        move_modules_from_cloned_repo(cloned_path)
        logger.info("Cloning successful!")
        # The cloned folder is only a staging area; drop what is left of it.
        helpers.remove_directory_with_contents(cloned_path)
def __main__():
    """Run the cloning workflow; on any failure log it and exit with status 1."""
    logger.info("Starting cloning setup")
    try:
        clone_access_modules()
    except Exception as error:
        logger.exception("Access module cloning failed!")
        logger.exception(error)
        sys.exit(1)


# Runs as soon as the module is loaded (there is no __name__ guard).
__main__()