New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test:add-test-model #761
test:add-test-model #761
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import sys | ||
import time | ||
import swagger_client | ||
|
||
class Server: | ||
def __init__(self, endpoint, verify_ssl): | ||
self.endpoint = endpoint | ||
self.verify_ssl = verify_ssl | ||
|
||
class Credential: | ||
def __init__(self, type, username, password): | ||
self.type = type | ||
self.username = username | ||
self.password = password | ||
|
||
def _create_client(server, credential, debug): | ||
cfg = swagger_client.Configuration() | ||
cfg.host = server.endpoint | ||
cfg.verify_ssl = server.verify_ssl | ||
# support basic auth only for now | ||
cfg.username = credential.username | ||
cfg.password = credential.password | ||
cfg.debug = debug | ||
return swagger_client.DefaultApi(swagger_client.ApiClient(cfg)) | ||
|
||
def _assert_status_code(expect_code, return_code): | ||
if str(return_code) != str(expect_code): | ||
raise Exception(r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}.".format(expect_code, return_code)) | ||
|
||
def _assert_status_body(expect_status_body, returned_status_body): | ||
if expect_status_body.strip() != returned_status_body.strip(): | ||
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body)) | ||
|
||
def _random_name(prefix): | ||
return "%s-%d" % (prefix, int(round(time.time() * 1000))) | ||
|
||
def _get_id_from_header(header): | ||
try: | ||
location = header["Location"] | ||
return location.split("/")[-1] | ||
except Exception: | ||
return None | ||
|
||
def _get_string_from_unicode(udata): | ||
result='' | ||
for u in udata: | ||
tmp = u.encode('utf8') | ||
result = result + tmp.strip('\n\r\t') | ||
return result | ||
|
||
class Base: | ||
def __init__(self, | ||
server = Server(endpoint="http://192.168.17.100:30022", verify_ssl=False), | ||
credential = Credential(type="basic_auth", username="admin", password="Harbor12345"), | ||
debug = True): | ||
if not isinstance(server.verify_ssl, bool): | ||
server.verify_ssl = server.verify_ssl == "True" | ||
self.server = server | ||
self.credential = credential | ||
self.debug = debug | ||
self.client = _create_client(server, credential, debug) | ||
|
||
def _get_client(self, **kwargs): | ||
if len(kwargs) == 0: | ||
return self.client | ||
server = self.server | ||
if "endpoint" in kwargs: | ||
server.endpoint = kwargs.get("endpoint") | ||
if "verify_ssl" in kwargs: | ||
if not isinstance(kwargs.get("verify_ssl"), bool): | ||
server.verify_ssl = kwargs.get("verify_ssl") == "True" | ||
else: | ||
server.verify_ssl = kwargs.get("verify_ssl") | ||
credential = self.credential | ||
if "type" in kwargs: | ||
credential.type = kwargs.get("type") | ||
if "username" in kwargs: | ||
credential.username = kwargs.get("username") | ||
if "password" in kwargs: | ||
credential.password = kwargs.get("password") | ||
return _create_client(server, credential, self.debug) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
from library import base | ||
import swagger_client | ||
from swagger_client.rest import ApiException | ||
|
||
def is_member_exist_in_project(members, member_user_name, expected_member_role_id = None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove unrelated codes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
result = False | ||
for member in members: | ||
if member.entity_name == member_user_name: | ||
if expected_member_role_id != None: | ||
if member.role_id == expected_member_role_id: | ||
return True | ||
else: | ||
return True | ||
return result | ||
|
||
class Project(base.Base): | ||
def create_project(self, name=None, metadata=None, expect_status_code = 201, expect_response_body = None, **kwargs): | ||
if name is None: | ||
name = base._random_name("project") | ||
if metadata is None: | ||
metadata = {} | ||
client = self._get_client(**kwargs) | ||
|
||
try: | ||
_, status_code, header = client.projects_post_with_http_info(swagger_client.ProjectReq(name, metadata)) | ||
except ApiException as e: | ||
base._assert_status_code(expect_status_code, e.status) | ||
if expect_response_body is not None: | ||
base._assert_status_body(expect_response_body, e.body) | ||
return | ||
base._assert_status_code(expect_status_code, status_code) | ||
base._assert_status_code(201, status_code) | ||
return base._get_id_from_header(header), name | ||
|
||
|
||
def get_projects(self, params, **kwargs): | ||
client = self._get_client(**kwargs) | ||
data = [] | ||
data, status_code, _ = client.apis_v1alpha1_projects_get_with_http_info(**params) | ||
base._assert_status_code(200, status_code) | ||
return data | ||
|
||
def projects_should_exist(self, params, expected_count = None, expected_project_id = None, **kwargs): | ||
project_data = self.get_projects(params, **kwargs) | ||
actual_count = len(project_data) | ||
if expected_count is not None and actual_count!= expected_count: | ||
raise Exception(r"Private project count should be {}.".format(expected_count)) | ||
if expected_project_id is not None and actual_count == 1 and str(project_data[0].project_id) != str(expected_project_id): | ||
raise Exception(r"Project-id check failed, expect {} but got {}, please check this test case.".format(str(expected_project_id), str(project_data[0].project_id))) | ||
|
||
def check_project_name_exist(self, name=None, **kwargs): | ||
client = self._get_client(**kwargs) | ||
_, status_code, _ = client.projects_head_with_http_info(name) | ||
return { | ||
200: True, | ||
404: False, | ||
}.get(status_code,'error') | ||
|
||
def get_project(self, project_id, expect_status_code = 200, expect_response_body = None, **kwargs): | ||
client = self._get_client(**kwargs) | ||
try: | ||
data, status_code, _ = client.projects_project_id_get_with_http_info(project_id) | ||
except ApiException as e: | ||
base._assert_status_code(expect_status_code, e.status) | ||
if expect_response_body is not None: | ||
base._assert_status_body(expect_response_body, e.body) | ||
return | ||
|
||
base._assert_status_code(expect_status_code, status_code) | ||
base._assert_status_code(200, status_code) | ||
return data | ||
|
||
def update_project(self, project_id, metadata, **kwargs): | ||
client = self._get_client(**kwargs) | ||
project = swagger_client.Project(project_id, None, None, None, None, None, None, None, None, None, None, metadata) | ||
_, status_code, _ = client.projects_project_id_put_with_http_info(project_id, project) | ||
base._assert_status_code(200, status_code) | ||
|
||
def delete_project(self, project_id, expect_status_code = 200, **kwargs): | ||
client = self._get_client(**kwargs) | ||
_, status_code, _ = client.projects_project_id_delete_with_http_info(project_id) | ||
base._assert_status_code(expect_status_code, status_code) | ||
|
||
def get_project_metadata_by_name(self, project_id, meta_name, expect_status_code = 200, **kwargs): | ||
client = self._get_client(**kwargs) | ||
ProjectMetadata = swagger_client.ProjectMetadata() | ||
ProjectMetadata, status_code, _ = client.projects_project_id_metadatas_meta_name_get_with_http_info(project_id, meta_name) | ||
base._assert_status_code(expect_status_code, status_code) | ||
return { | ||
'public': ProjectMetadata.public, | ||
'enable_content_trust': ProjectMetadata.enable_content_trust, | ||
'prevent_vul': ProjectMetadata.prevent_vul, | ||
'auto_scan': ProjectMetadata.auto_scan, | ||
'severity': ProjectMetadata.severity, | ||
}.get(meta_name,'error') | ||
|
||
def get_project_log(self, project_id, expect_status_code = 200, **kwargs): | ||
client = self._get_client(**kwargs) | ||
body, status_code, _ = client.projects_project_id_logs_get_with_http_info(project_id) | ||
base._assert_status_code(expect_status_code, status_code) | ||
return body | ||
|
||
def filter_project_logs(self, project_id, operator, repository, tag, operation_type, **kwargs): | ||
access_logs = self.get_project_log(project_id, **kwargs) | ||
count = 0 | ||
for each_access_log in list(access_logs): | ||
if each_access_log.username == operator and \ | ||
each_access_log.repo_name.strip(r'/') == repository and \ | ||
each_access_log.repo_tag == tag and \ | ||
each_access_log.operation == operation_type: | ||
count = count + 1 | ||
return count | ||
|
||
def get_project_members(self, project_id, **kwargs): | ||
client = self._get_client(**kwargs) | ||
return client.projects_project_id_members_get(project_id) | ||
|
||
def get_project_member(self, project_id, member_id, expect_status_code = 200, expect_response_body = None, **kwargs): | ||
client = self._get_client(**kwargs) | ||
data = [] | ||
try: | ||
data, status_code, _ = client.projects_project_id_members_mid_get_with_http_info(project_id, member_id,) | ||
except ApiException as e: | ||
base._assert_status_code(expect_status_code, e.status) | ||
if expect_response_body is not None: | ||
base._assert_status_body(expect_response_body, e.body) | ||
return | ||
|
||
base._assert_status_code(expect_status_code, status_code) | ||
base._assert_status_code(200, status_code) | ||
return data | ||
|
||
def check_project_member_not_exist(self, project_id, member_user_name, **kwargs): | ||
members = self.get_project_members(project_id, **kwargs) | ||
result = is_member_exist_in_project(list(members), member_user_name) | ||
if result == True: | ||
raise Exception(r"User {} should not be a member of project with ID {}.".format(member_user_name, project_id)) | ||
|
||
def check_project_members_exist(self, project_id, member_user_name, expected_member_role_id = None, **kwargs): | ||
members = self.get_project_members(project_id, **kwargs) | ||
result = is_member_exist_in_project(members, member_user_name, expected_member_role_id = expected_member_role_id) | ||
if result == False: | ||
raise Exception(r"User {} should be a member of project with ID {}.".format(member_user_name, project_id)) | ||
|
||
def update_project_member_role(self, project_id, member_id, member_role_id, expect_status_code = 200, **kwargs): | ||
client = self._get_client(**kwargs) | ||
role = swagger_client.Role(role_id = member_role_id) | ||
data = [] | ||
data, status_code, _ = client.projects_project_id_members_mid_put_with_http_info(project_id, member_id, role = role) | ||
base._assert_status_code(expect_status_code, status_code) | ||
return data | ||
|
||
def delete_project_member(self, project_id, member_id, expect_status_code = 200, **kwargs): | ||
client = self._get_client(**kwargs) | ||
_, status_code, _ = client.projects_project_id_members_mid_delete_with_http_info(project_id, member_id) | ||
base._assert_status_code(expect_status_code, status_code) | ||
|
||
def add_project_members(self, project_id, user_id, member_role_id = None, expect_status_code = 201, **kwargs): | ||
if member_role_id is None: | ||
member_role_id = 1 | ||
_member_user = {"user_id": int(user_id)} | ||
projectMember = swagger_client.ProjectMember(member_role_id, member_user = _member_user) | ||
client = self._get_client(**kwargs) | ||
data = [] | ||
data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember) | ||
base._assert_status_code(201, status_code) | ||
return base._get_id_from_header(header) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from __future__ import absolute_import | ||
|
||
import unittest | ||
|
||
|
||
from testutils import TEARDOWN | ||
from testutils import ADMIN_CLIENT | ||
from library.project import Project | ||
|
||
|
||
class TestProjects(unittest.TestCase): | ||
@classmethod | ||
def setUp(self): | ||
project = Project() | ||
self.project= project | ||
|
||
|
||
@classmethod | ||
def tearDown(self): | ||
print ("Case completed") | ||
|
||
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") | ||
def test_ClearData(self): | ||
#1. Delete repository(RA) by admin; | ||
kk = {"x_tenant":"devops"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use meaningful var name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
data = self.project.get_projects(kk) | ||
print(data) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import time | ||
import os | ||
import sys | ||
|
||
|
||
from swagger_client.rest import ApiException | ||
import swagger_client.models | ||
from pprint import pprint | ||
|
||
#CLIENT=dict(endpoint="https://"+harbor_server+"/api") | ||
ADMIN_CLIENT=dict(endpoint = "https://"+"192.168.17.100:30022"+"/apis") | ||
USER_ROLE=dict(admin=0,normal=1) | ||
TEARDOWN = True | ||
|
||
def GetProductApi(username, password, harbor_server= "192.168.17.100:30022"): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is 'product' ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forget to change the name. |
||
|
||
cfg = swagger_client.Configuration() | ||
cfg.host = "https://"+harbor_server+"/api" | ||
cfg.username = username | ||
cfg.password = password | ||
cfg.verify_ssl = False | ||
cfg.debug = True | ||
api_client = swagger_client.ApiClient(cfg) | ||
api_instance = swagger_client.DefaultApi(api_client) | ||
return api_instance | ||
class TestResult(object): | ||
def __init__(self): | ||
self.num_errors = 0 | ||
self.error_message = [] | ||
def add_test_result(self, error_message): | ||
self.num_errors = self.num_errors + 1 | ||
self.error_message.append(error_message) | ||
def get_final_result(self): | ||
if self.num_errors > 0: | ||
for each_err_msg in self.error_message: | ||
print ("Error message:", each_err_msg) | ||
raise Exception(r"Test case failed with {} errors.".format(self.num_errors)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you generate the client with docker container, so that we don't have to depend on
java
andpython3
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This client only has code, and does not need to be run. so it is just a package of python.
I do not think the docker container is the appropriate solution.