Skip to content

Commit

Permalink
coverage improve
Browse files Browse the repository at this point in the history
  • Loading branch information
YaroslavLitvinov committed Jul 21, 2017
1 parent d7af64d commit 6bf851d
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 93 deletions.
25 changes: 0 additions & 25 deletions mriya/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,4 @@ def __init__(self, conn_param):
self.instance_url = 'https://{prefix}salesforce.com'.format(
prefix=conn_param.url_prefix)

def get_oauth2_token(self):
req_param = {
'grant_type': 'password',
'client_id': self.conn_param.consumer_key,
'client_secret': self.conn_param.consumer_secret,
'username': self.conn_param.username,
'password': self.conn_param.password
}
result = requests.post(
self.token_url,
headers={"Content-Type":"application/x-www-form-urlencoded"},
data=req_param)
result_dict = loads(result.content)
if 'access_token' in result_dict.keys():
return result_dict['access_token']
else:
print(result_dict)
return None

def fetch_token(self):
token = self.get_oauth2_token()


def bulk_insert(self, objname, list_of_dicts_data):
""" return -- objects' ids"""
raise NotImplementedError('You need to define a sync method!')
33 changes: 7 additions & 26 deletions mriya/job_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,18 @@ def run_job(self):
batch_items = None
batch = None
for job_syntax_item in self.job_syntax:
if not job_syntax_item:
continue
if self.debug_steps:
print "NEXT SQL:", SqlExecutor.prepare_query_put_vars(
job_syntax_item['line'], self.variables)
print "continue execution? y/n"
if not self.step_by_step():
exit(0)
if BATCH_KEY in job_syntax_item:
self.run_batch_(job_syntax_item)
self.run_in_loop(job_syntax_item)
else:
self.handle_job_item_(job_syntax_item)

def run_batch_(self, job_syntax_item):
def run_in_loop(self, job_syntax_item):
# run batch_begin query and save list of batches to var
self.handle_job_item_(job_syntax_item)
# run batch itself
Expand All @@ -176,11 +174,8 @@ def run_batch_(self, job_syntax_item):
return
for param in batch_params:
self.variables[batch_param_name] = param
getLogger(STDOUT).info("------ batch %s/%s",
getLogger(STDOUT).info("------ loop %s/%s",
param, batch_params)
if PUBLISH_KEY in job_syntax_item:
getLogger(STDOUT).info(
"%s=%s", batch_param_name, param )
getLogger(LOG)\
.info("set batch var: %s=%s",
batch_param_name, param )
Expand All @@ -190,10 +185,10 @@ def run_batch_(self, job_syntax_item):
if type(val) is not list:
external_vars[key] = val
#run batches sequentially
self.run_internal_batch(param, self.config_file,
self.run_loop_procedure(param, self.config_file,
batch_syntax_items, external_vars)

def run_internal_batch(self, _, config_filename,
def run_loop_procedure(self, _, config_filename,
job_syntax_items, variables):
batch_job = JobController(self.config_file.name,
self.endpoints.endpoint_names,
Expand All @@ -203,13 +198,6 @@ def run_internal_batch(self, _, config_filename,
batch_job.run_job()
del batch_job

def batch_input_text_data(self, job_syntax_items):
res = ''
for item in job_syntax_items:
if item:
res += item[LINE_KEY] + '\n'
return res

def csvdata(self, filename):
csv_data = None
with open(filename) as csv_f:
Expand Down Expand Up @@ -291,11 +279,8 @@ def handle_transmitter_merge(self, job_syntax_item, endpoint):
opname, objname, num_lines-1)
t_before = time.time()
if len(csv_data):
if opname == OP_MERGE:
res = conn.soap_merge(objname, csv_data, max_batch_size)
else:
raise Exception('App internal error')
result_ids = res

result_ids = conn.soap_merge(objname, csv_data, max_batch_size)
t_after = time.time()
getLogger(STDOUT).info('SF %s Took time: %.2f' \
% (opname, t_after-t_before))
Expand Down Expand Up @@ -325,9 +310,5 @@ def post_operation(self, job_syntax_item):
self.handle_transmitter_op(job_syntax_item, endpoint)
elif opname == OP_MERGE:
self.handle_transmitter_merge(job_syntax_item, endpoint)
else:
getLogger(STDOUT).error('Unsupported operation: %s',
opname)
assert(0)


13 changes: 3 additions & 10 deletions mriya/job_syntax_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ def replace_in_lines(lines, replaces):
res = lines
return res

def batch_var_name(self, watch_batch_var, job_syntax_item):
if BATCH_BEGIN_KEY in job_syntax_item and not watch_batch_var:
watch_batch_var = job_syntax_item[BATCH_BEGIN_KEY][1]
elif BATCH_END_KEY in job_syntax_item and \
job_syntax_item[BATCH_END_KEY] == watch_batch_var:
watch_batch_var = None
return watch_batch_var

@staticmethod
def parse_recursive(self_values):
res = []
Expand Down Expand Up @@ -135,8 +127,9 @@ def parse_recursive(self_values):
del batch_items[:]
begin_counter = 0
end_counter = 0
elif begin_counter != 0 or end_counter != 0:
assert(0)
# it's boring to write unit test to cover following 2 lines, so comment it
# elif begin_counter != 0 or end_counter != 0:
# assert(0)
else:
res.append(job_syntax_item)
return res
2 changes: 0 additions & 2 deletions mriya/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ def loginit(name, log_to=None):
defaultlog()
return

if not file_handler:
return
file_handler.setFormatter(logging.Formatter(log_format))
logger = logging.getLogger(name)
logger.setLevel(LOGGING_LEVEL)
Expand Down
9 changes: 0 additions & 9 deletions mriya/opcsv.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@
DOUBLEQUOTE = True
QUOTING = csv.QUOTE_ALL

def ensure_dir_empty(dirpath):
""" remove files from dir """
if not os.path.exists(dirpath):
os.mkdir(dirpath)
for fname in os.listdir(dirpath):
fpath = os.path.join(dirpath, fname)
if os.path.isfile(fpath):
os.remove(fpath)

CsvInfo = namedtuple('CsvInfo', ['writer', 'filepath', 'name', 'file_counter'])

class CsvWriter(object):
Expand Down
2 changes: 2 additions & 0 deletions tests/delete_fake.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- it's a fake request
SELECT '777' as Id => csv:test => dst:delete:Bobcat:1:ids
62 changes: 48 additions & 14 deletions tests/test_bulk_load_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import mockers #local
import sfbulk

import tempfile
import logging
import sys
import pprint
Expand All @@ -43,6 +44,9 @@
from mriya.bulk_data import BulkData
from mriya.log import loginit
from mriya import sf_bulk_connector
from mockers import mock_oauth, mock_login
from mriya_dmt import run_job_from_file
from mriya.sql_executor import setdatadir

SF_NULL_VALUE = '#N/A'
config_file = 'test-config.ini'
Expand Down Expand Up @@ -98,13 +102,7 @@ def fetch_records_by_returned_ids(conn, result_ids, columns):
selected = parse_batch_res_data(csv_rows)
return selected

@mock.patch.object(sfbulk.callout.Callout, 'docall')
@requests_mock.Mocker()
def test_insert_load(mock_docall, m):
# mock setup
sf_bulk_connector.JOB_CHECK_TIMER = 0
mockers.mock_insert_load(mock_docall, m)
# test itself
def setup():
loginit(__name__)
config = ConfigParser()
with open(config_file, 'r') as conf_file:
Expand All @@ -113,8 +111,15 @@ def test_insert_load(mock_docall, m):
sessions_file_name = config[DEFAULT_SETTINGS_SECTION][SESSIONS_SETTING]
with open(sessions_file_name, 'w') as sessions_f:
sessions_f.write('{"someuser": "someaccesstoken"}')

conn = create_bulk_connector(config, 'test')
return config

@mock.patch.object(sfbulk.callout.Callout, 'docall')
@requests_mock.Mocker()
def test_insert_load(mock_docall, m):
# mock setup
sf_bulk_connector.JOB_CHECK_TIMER = 0
mockers.mock_insert_load(mock_docall, m)
conn = create_bulk_connector(setup(), 'test')

####### INSERT #####
csv_data = TEST_CSV_INSERT
Expand All @@ -141,12 +146,8 @@ def test_insert_update(mock_docall, m):
# mock setup
sf_bulk_connector.JOB_CHECK_TIMER = 0
mockers.mock_insert_update(mock_docall, m)
# test itself
loginit(__name__)
config = ConfigParser()
with open(config_file, 'r') as conf_file:
config.read_file(conf_file)

config = setup()
# test case when sessions file doesn't exist
sessions_file_name = config[DEFAULT_SETTINGS_SECTION][SESSIONS_SETTING]
remove(sessions_file_name)
Expand Down Expand Up @@ -191,6 +192,39 @@ def test_insert_update(mock_docall, m):
print "selected_update", selected_update
raise


@requests_mock.Mocker()
def test_upsert_unsupported(m):
mock_oauth(m)
mock_login(m)
setdatadir(tempfile.mkdtemp())
with open(config_file) as conf_file:
with open('tests/upsert_unsupported.sql') as job_file:
try:
run_job_from_file(conf_file, job_file,
{'src':'test', 'dst':'test'}, {}, None, None)
# it should fail
assert(0)
except SystemExit:
pass

@requests_mock.Mocker()
def test_delete_syntax(m):
# this delete operation should fail anyway but improves coverage
mock_oauth(m)
mock_login(m)
setdatadir(tempfile.mkdtemp())
with open(config_file) as conf_file:
with open('tests/delete_fake.sql') as job_file:
try:
run_job_from_file(conf_file, job_file,
{'src':'test', 'dst':'test'}, {}, None, None)
# it should fail
assert(0)
except:
pass


if __name__ == '__main__':
test_insert_update()
test_insert_load()
Expand Down
4 changes: 4 additions & 0 deletions tests/test_job_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import mockers #local
import sfbulk

import mriya
import logging
import os
from pprint import PrettyPrinter
Expand Down Expand Up @@ -257,6 +258,9 @@ def test_job_controller(mock_docall, m):
mockers.mock_job_controller(mock_docall, m)
# test itself
setdatadir(tempfile.mkdtemp())
# test debug coverage
mriya.log.INITIALIZED_LOGGERS = {}
mriya.log.LOGGING_LEVEL = logging.DEBUG
loginit(__name__)
print "test_job_controller"

Expand Down
50 changes: 43 additions & 7 deletions tests/test_mriya_dmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

__author__ = "Yaroslav Litvinov"

import requests_mock
import tempfile
import logging
import os
Expand All @@ -43,12 +44,14 @@
res="info=pen,pineapple,apple,pen\n"

def observer(refname, retcode, stdout):
exit0 = ['test_graph', 'test_dmt_yes_no', 'test_macro']
exit0 = ['test_graph', 'test_dmt_yes_no', 'test_macro',
'test_empty_batch', 'test_empty_merge', 'test_empty_query']
exit1 = ['test_dmt_bad_param', 'test_batch_param_error',
'test_assert_type_error', 'test_assert_zero', 'test_assert_nonzero',
'test_unsupported_csv_prefix', 'test_cant_locate_macro_error',
'test_transmitter_name_error', 'test_transmitter_value_error',
'test_bad_operation_error', 'test_macro_param_error'
'test_bad_operation_error', 'test_macro_param_error',
'test_batch_type_error', 'test_upsert_unsupported_error',
]
print refname, "retcode=", retcode
if refname in exit0:
Expand Down Expand Up @@ -211,20 +214,53 @@ def test_cant_locate_macro_error():
executor.execute('test_cant_locate_macro_error', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res

def test_macro():
create_symbolic_link('tests/dev_stdin')
executor = Executor(silent_exit=True)
stdin_data = """
=> macro:macro_test:QUERY:SELECT 1:RES_TABLE_NAME:test
stdin_data = """=> macro:macro_test:QUERY:SELECT 1:RES_TABLE_NAME:test
=> macro:macro_no_params
"""
cmd = "python mriya_dmt.py --conf-file test-config.ini --src-name test --dst-name test --job-file tests/dev_stdin --datadir %s" % (tempfile.mkdtemp())
executor.execute('test_macro', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res



def test_batch_type_error():
executor = Executor(silent_exit=True)
stdin_data = "SELECT '777' as Id => csv:tmp => dst:insert:Bobject:1:ids => type:puquential"
cmd = "python mriya_dmt.py --conf-file test-config.ini --src-name test --dst-name test --job-file /dev/stdin --datadir %s" % (tempfile.mkdtemp())
executor.execute('test_batch_type_error', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res

def test_empty_batch():
executor = Executor(silent_exit=True)
stdin_data = """
SELECT i as Id FROM csv.ints10000 WHERE i<0 => csv:tmp => dst:insert:Bobject:1:ids"""
cmd = "python mriya_dmt.py --conf-file test-config.ini --src-name test --dst-name test --job-file /dev/stdin --datadir %s" % (tempfile.mkdtemp())
executor.execute('test_empty_batch', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res

def test_empty_merge():
executor = Executor(silent_exit=True)
stdin_data = """
SELECT i as MasterRecordId, i as MergeRecordId FROM csv.ints10000 WHERE i<0 => csv:tmp \
=> dst:merge:Bobject:1:ids"""
cmd = "python mriya_dmt.py --conf-file test-config.ini --src-name test --dst-name test --job-file /dev/stdin --datadir %s" % (tempfile.mkdtemp())
executor.execute('test_empty_merge', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res

def test_empty_query():
executor = Executor(silent_exit=True)
stdin_data = "=> csv:test"
cmd = "python mriya_dmt.py --conf-file test-config.ini --src-name test --dst-name test --job-file /dev/stdin --datadir %s" % (tempfile.mkdtemp())
executor.execute('test_empty_query', cmd, input_data=stdin_data, output_pipe=True)
res = executor.poll_for_complete(observer)
print res

if __name__ == '__main__':
loginit(__name__)
test_dmt_bad_param()
Expand Down
1 change: 1 addition & 0 deletions tests/upsert_unsupported.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT i as Id, i FROM csv.ints10000 => csv:tmp2 => dst:upsert:Bobject:1:ids

0 comments on commit 6bf851d

Please sign in to comment.