Skip to content

Commit

Permalink
Working on addressing Codacy issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Meadows committed Sep 29, 2014
1 parent 62cfdda commit f3788cf
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 43 deletions.
14 changes: 13 additions & 1 deletion etltest/test/test_code_generator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
These are the tests for the Code Generator Module.
"""
__author__ = 'ameadows'

import unittest
Expand All @@ -8,7 +11,10 @@


class CodeGeneratorTest(unittest.TestCase):

"""
We start our tests by ensuring the environment is set up correctly using the first_run_rest. We then define
an output directory, and the location of our test files.
"""
def setUp(self):
SettingsManager().first_run_test()
self.out_dir = SettingsManager().find_setting('Locations', 'output')
Expand All @@ -18,6 +24,9 @@ def setUp(self):


def test_generate_single_test_file(self):
"""
Testing that we can generate a good PyUnit test file from a single yaml test file.
"""
CodeGenerator(in_file=self.test_file).generate_test()
# Testing the in_file option of code_generator
sample_file = os.path.join(self.main_path, 'etltest/samples/output/DataMart/UsersDim.py')
Expand All @@ -33,6 +42,9 @@ def test_generate_single_test_file(self):
self.assertEqual(given_result, expected_result)

def test_generate_multiple_test_file(self):
"""
Testing that we can generate good PyUnit test files from a directory of yaml test files.
"""
# Testing the in_dir option of code_generator.
CodeGenerator(in_dir=self.test_dir).generate_test()
sample_dir = os.path.join(self.main_path, 'etltest/samples/output/DataMart/')
Expand Down
34 changes: 23 additions & 11 deletions etltest/test/test_data_connector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Testing the DataConnector module.
"""

__author__ = 'ameadows'

import unittest
Expand All @@ -21,30 +25,38 @@ def tearDown(self):
DataConnector(self.source).truncate_data(self.table)

def test_bad_connection(self):
#Testing to see if a non existent connection will fail gracefully.
"""
Testing to see if a non existent connection will fail gracefully.
"""
records = [1, 2]

with self.assertRaises(KeyError) as raises:
with self.assertRaises(KeyError):
DataConnector("BadConnection").generate_data(self.table, records)

def test_generate_data_all(self):
#Testing to see if the full data set is generated.
"""
Testing to see if the full data set is generated.
"""
given_result = DataConnector(self.source).generate_data(self.table)
expected_result = [{'first_name': 'Bob', 'last_name': 'Richards', 'user_id': 1, 'zipcode': 55555,
'birthday': datetime.date(2000, 1, 4), 'is_active': 0}, {'first_name': 'Sarah',
'last_name': 'Jenkins', 'user_id': 2, 'zipcode': 12345,
'birthday': datetime.date(2000, 2, 2), 'is_active': 1},{'first_name': 'Frank',
'last_name': 'Williams', 'user_id': 3, 'is_active': 0, 'zipcode': 56789,
'birthday': datetime.date(1972, 3, 3)},{'first_name': 'Thomas', 'last_name': 'Stedding',
'birthday': datetime.date(2000, 1, 4), 'is_active': 0},
{'first_name': 'Sarah', 'last_name': 'Jenkins', 'user_id': 2, 'zipcode': 12345,
'birthday': datetime.date(2000, 2, 2), 'is_active': 1},
{'first_name': 'Frank', 'last_name': 'Williams', 'user_id': 3, 'is_active': 0,
'zipcode': 56789, 'birthday': datetime.date(1972, 3, 3)},
{'first_name': 'Thomas', 'last_name': 'Stedding',
'user_id': 4, 'is_active': 1, 'zipcode': 44444, 'birthday': datetime.date(1923, 1, 4)}]

self.assertCountEqual(given_result, expected_result)

def test_generate_data_subset(self):
#Testing to see if a subset of data is generated and not the full data set.
"""
Testing to see if a subset of data is generated and not the full data set.
"""
given_result = DataConnector(self.source).generate_data(self.table, self.records)
expected_result = [{'first_name': 'Bob', 'last_name': 'Richards', 'user_id': 1, 'zipcode': 55555,
'birthday': datetime.date(2000, 1, 4), 'is_active': 0}, {'first_name': 'Sarah',
'last_name': 'Jenkins', 'user_id': 2, 'zipcode': 12345,
'birthday': datetime.date(2000, 1, 4), 'is_active': 0},
{'first_name': 'Sarah', 'last_name': 'Jenkins', 'user_id': 2, 'zipcode': 12345,
'birthday': datetime.date(2000, 2, 2), 'is_active': 1}]

self.assertEqual(given_result, expected_result)
Expand Down
4 changes: 2 additions & 2 deletions etltest/test/test_etlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
# SettingsManager().first_run_test()
#
# self.in_file = SettingsManager().system_variable_replace("${"
# "ETL_TEST_ROOT}/etltest/samples/test/dataMart/users_dim.yml")
# "ETL_TEST_ROOT}/etltest/samples/test/dataMart/users_dim.yml")
# self.in_dir = SettingsManager().system_variable_replace("${ETL_TEST_ROOT}/etltest/samples/test/dataMart/")
#
# self.expected_output = SettingsManager().system_variable_replace("${"
# "ETL_TEST_ROOT}/etltest/samples/output/DataMart/UsersDim.py")
# "ETL_TEST_ROOT}/etltest/samples/output/DataMart/UsersDim.py")
# self.process = os.path.join(SettingsManager().get_file_location(), "etltest/etlTest.py")
#
# def test_in_file_generation_output(self):
Expand Down
42 changes: 30 additions & 12 deletions etltest/test/test_process_executor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
These are the tests for the Process Executor.
"""
__author__ = 'ameadows'

import unittest
Expand All @@ -9,20 +12,19 @@


class ProcessExecutorTests(unittest.TestCase):

"""
We test the Process Executor by setting up our environment with the first run test and set up specific locations to
ensure that they exist. We then can run our tests.
"""
def setUp(self):
SettingsManager().first_run_test()
self.executor = ProcessExecutor('PDI')
self.process_job = SettingsManager().system_variable_replace(
'${ETL_TEST_ROOT}/etltest/samples/etl/data_mart/user_dim_jb.kjb')
self.process_trans = SettingsManager().system_variable_replace(
'${ETL_TEST_ROOT}/etltest/samples/etl/data_mart/user_dim_load_tr.ktr')

#self.process_job = "/opt/pentaho/pdi/src/com/redhat/bi/sandbox/test_failure_job_job.kjb"
#self.process_trans = "/opt/pentaho/pdi/src/com/redhat/bi/sandbox/test_memory.ktr"
self.process_job = SettingsManager().system_variable_replace('${ETL_TEST_ROOT}/etltest/'
'samples/etl/data_mart/user_dim_jb.kjb')
self.process_trans = SettingsManager().system_variable_replace('${ETL_TEST_ROOT}/etltest/'
'samples/etl/data_mart/user_dim_load_tr.ktr')

shared_file = SettingsManager().system_variable_replace(
'${ETL_TEST_ROOT}/etltest/samples/etl/shared.xml')
shared_file = SettingsManager().system_variable_replace('${ETL_TEST_ROOT}/etltest/samples/etl/shared.xml')

kettle_settings = SettingsManager().system_variable_replace('${ETL_TEST_ROOT}/.kettle')

Expand All @@ -33,34 +35,50 @@ def setUp(self):
if os.path.isfile(shared_file_target) is False:
copyfile(shared_file, shared_file_target)

self.mysql_driver = SettingsManager().system_variable_replace('${'
'TOOL_PATH}/lib/mysql-connector-java-5.1.31-bin.jar')
self.mysql_driver = SettingsManager().system_variable_replace('${TOOL_PATH}/lib/'
'mysql-connector-java-5.1.31-bin.jar')

def test_sample_job_exists(self):
"""
We need to verify that the sample job exists that is to be run. Mainly for troubleshooting.
"""
given_result = os.path.isfile(self.process_job)
expected_result = True

self.assertEqual(given_result, expected_result)

def test_sample_trans_exists(self):
"""
Testing that the sample transformation also exists. Mainly for troubleshooting.
"""
given_result = os.path.isfile(self.process_trans)
expected_result = True

self.assertEqual(given_result, expected_result)

def test_mysql_driver_exists(self):
"""
Testing that the MySQL JDBC driver exists. Mainly for troubleshooting.
"""
given_result = os.path.isfile(self.mysql_driver)
expected_result = True

self.assertEqual(given_result, expected_result)

def test_process_executor_job(self):
"""
Testing that the process executor is able to pick up the job settings and run the sample job.
"""
given_result = self.executor.execute_process('job', self.process_job)
expected_result = 0

self.assertEqual(given_result, expected_result)

def test_process_executor_trans(self):
"""
Testing that the process executor is able to pick up the transformation settings and run the sample
transformation.
"""
given_result = self.executor.execute_process('trans', self.process_trans)
expected_result = 0

Expand Down
Loading

0 comments on commit f3788cf

Please sign in to comment.