In [1]:
import os, re,json, imp, unittest
import pandas as pd

print('First Directory at :', os.getcwd()[2:])

#Load Setting File
with open('setting.txt','r') as f:
    setting = json.load(f)
print('Load Setting OK')    

os.chdir(re.sub(os.getcwd().split('\\')[-1]+'$','',os.getcwd()))
print('Change Directory into :', os.getcwd()[2:])

First Directory at : \01_DEV_Function\Shared_Function\unittest_example
Load Setting OK
Change Directory into : \01_DEV_Function\Shared_Function


In [2]:
local_library = True

In [3]:
if local_library : 
    da_tran_SQL = imp.load_source('da_tran_SQL', 'Shared_Function/data_connection/database.py').da_tran_SQL
else :
    from Shared_Function.data_connection.database import da_tran_SQL

def help_test(df_in1, df_in2) :
        result = list()
        expect = list()
        for i in df_in1.columns :
            result.append(list(df_in1[i]))
            expect.append(list(df_in2[i]))
        return result, expect

test_mssql = setting['MSSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

  module = __import__("pymssql")


Connection OK
Start Filter Existing data from df at  2020-12-03 17:35:56.960085
Dump data to  unit_test_git  End  2020-12-03 17:35:57.153615


.

Connection OK
Start Filter Existing data from df at  2020-12-03 17:35:58.140087
Dump data to  unit_test_git  End  2020-12-03 17:35:58.329327


.

Connection OK
Start Filter Existing data from df at  2020-12-03 17:35:59.258694
Dump data to  unit_test_git

.

  End  2020-12-03 17:35:59.452361
Connection OK


.

Connection OK
Start delete old data at 2020-12-03 17:36:01.518434
Delete Last ['col1'] at 2020-12-03 17:36:01.608370


.

Dump data to  unit_test_git  End  2020-12-03 17:36:01.754600
Connection OK
Start delete old data at 2020-12-03 17:36:02.824003
Delete Last col1 at 2020-12-03 17:36:02.888649
Dump data to  unit_test_git  End  

.

2020-12-03 17:36:03.028564
Connection OK
Start delete old data at 2020-12-03 17:36:03.967220
Delete Last ['col1', 'col2'] at 2020-12-03 17:36:04.040897


.

Dump data to  unit_test_git  End  2020-12-03 17:36:04.174505
Connection OK
Start delete old data at 2020-12-03 17:36:05.104036
Delete Last ['col1', 'col2', 'col3'] at 2020-12-03 17:36:05.179664


.

Dump data to  unit_test_git  End  2020-12-03 17:36:05.332182
Connection OK
Start delete old data at 2020-12-03 17:36:06.258847
Delete Last ['col1'] at 2020-12-03 17:36:06.328676
Dump data to  unit_test_git  End  2020-12-03 17:36:06.468367


.

Connection OK
Start Filter Existing data from df at  2020-12-03 17:36:07.418642
Dump data to  unit_test_git  End  2020-12-03 17:36:07.909316
Start delete old data at 2020-12-03 17:36:07.910353
Delete Last ['date'] at 2020-12-03 17:36:07.978564


.

Dump data to  unit_test_git  End  2020-12-03 17:36:08.121438
Connection OK


.

Connection OK
Start Filter Existing data from df at  2020-12-03 17:36:10.138879


.

Dump data to  unit_test_git  End  2020-12-03 17:36:10.638629



----------------------------------------------------------------------
Ran 12 tests in 14.688s

OK


<unittest.main.TestProgram at 0x28757f7d908>