In [2]:
import unittest
import pandas as pd
import numpy as np
from libs.variable_function import (  
    fetch_last_data,
    fetch_exist_data,
    process_occurrence,
    process_last_weighted,
    process_average,
    process_weighted_average,
    process_difference,
    process_std,
    process_regression,
    calculate_weighted_sum
)

In [3]:
class TestFunctions(unittest.TestCase):
    """Checks the aggregation helpers imported from ``libs.variable_function``.

    Every test builds a small frame keyed by ``id``, calls one helper on the
    ``'col_name'`` column and compares the returned frame against a
    hand-written expectation using ``pd.testing.assert_frame_equal``.
    """

    def test_fetch_last_data(self):
        """Expect one row per id carrying that id's final 'col_name' value."""
        source = pd.DataFrame({'id': [1, 1, 2, 2], 'col_name': [10, 20, 30, 40]})
        wanted = pd.DataFrame({'id': [1, 2], 'col_name': [20, 40]})

        actual = fetch_last_data(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_fetch_exist_data(self):
        """Expect a flag of 1 per id when the id has at least one non-null value."""
        # id 1 mixes a value with NaN, id 2 is fully populated; both should be flagged 1.
        source = pd.DataFrame({'id': [1, 1, 2, 2], 'col_name': [10, np.nan, 30, 40]})
        wanted = pd.DataFrame({'id': [1, 2], 'col_name': [1, 1]})

        actual = fetch_exist_data(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_occurrence(self):
        """Expect a 'col_name_occurrence' of 2 per id (each id has two rows)."""
        source = pd.DataFrame({'id': [1, 1, 2, 2], 'col_name': [10, 20, 30, 40]})
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_occurrence': [2, 2]})

        actual = process_occurrence(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_last_weighted(self):
        """Expect the last value of each id scaled by its weight."""
        # Expected numbers: 20 * 0.5 = 10.0 and 40 * 1.0 = 40.0.
        # The helper is not told about 'weight'; presumably it reads that
        # column by name -- confirm against libs.variable_function.
        source = pd.DataFrame({
            'id': [1, 1, 2, 2],
            'col_name': [10, 20, 30, 40],
            'weight': [0.5, 0.5, 1.0, 1.0],
        })
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_last_weighted': [10.0, 40.0]})

        actual = process_last_weighted(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_average(self):
        """Expect the per-id mean in 'col_name_mean'."""
        source = pd.DataFrame({'id': [1, 1, 2, 2], 'col_name': [10, 20, 30, 40]})
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_mean': [15.0, 35.0]})

        actual = process_average(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_weighted_average(self):
        """Expect 15.0 and 35.0 in 'col_name_weighted_avg'."""
        # NOTE(review): weights are uniform within each id, so the expected
        # values coincide with the plain per-id mean; this fixture alone does
        # not tell a weighted average apart from an unweighted one.
        source = pd.DataFrame({
            'id': [1, 1, 2, 2],
            'col_name': [10, 20, 30, 40],
            'weight': [0.5, 0.5, 1.0, 1.0],
        })
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_weighted_avg': [15.0, 35.0]})

        actual = process_weighted_average(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_difference(self):
        """Expect one output row per input row, with NaN on each id's last row."""
        # 'date_col' is converted to datetimes before the call; the expected
        # differences are 20 - 10 = 10.0 for id 1 and 40 - 20 = 20.0 for id 2.
        source = pd.DataFrame({
            'id': [1, 1, 2, 2],
            'col_name': [10, 20, 20, 40],
            'date_col': ['2020-01-01', '2021-01-01', '2020-01-01', '2021-01-01'],
        }).assign(date_col=lambda frame: pd.to_datetime(frame['date_col']))
        wanted = pd.DataFrame({
            'id': [1, 1, 2, 2],
            'col_name_difference': [10.0, np.nan, 20.0, np.nan],
        })

        actual = process_difference(source, 'col_name', 'date_col')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_std(self):
        """Expect a per-id standard deviation of about 7.071068."""
        # The literal is sqrt(50) rounded to six decimals, so this check relies
        # on assert_frame_equal comparing floats approximately by default.
        source = pd.DataFrame({'id': [1, 1, 2, 2], 'col_name': [10, 20, 30, 40]})
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_std': [7.071068, 7.071068]})

        actual = process_std(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_process_regression(self):
        """Expect a slope of 5.0 per id in 'col_name_regression'."""
        # 'col_name' rises by 5 for each unit step of 'diff' within an id;
        # presumably 'diff' is the regressor the helper uses -- confirm.
        source = pd.DataFrame({
            'id': [1, 1, 1, 2, 2, 2],
            'col_name': [10, 15, 20, 30, 35, 40],
            'diff': [1, 2, 3, 1, 2, 3],
        })
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_regression': [5.0, 5.0]})

        actual = process_regression(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)

    def test_calculate_weighted_sum(self):
        """Expect the per-id sum of value * weight in 'col_name_weighted'."""
        # 10 * 0.5 + 20 * 0.5 = 15.0 and 30 * 1.0 + 40 * 1.0 = 70.0.
        source = pd.DataFrame({
            'id': [1, 1, 2, 2],
            'col_name': [10, 20, 30, 40],
            'weight': [0.5, 0.5, 1.0, 1.0],
        })
        wanted = pd.DataFrame({'id': [1, 2], 'col_name_weighted': [15.0, 70.0]})

        actual = calculate_weighted_sum(source, 'col_name')

        pd.testing.assert_frame_equal(actual, wanted)


# Run the suite in-process: TextTestRunner.run() reports to the cell output
# and returns a result object instead of exiting the kernel.
test_runner = unittest.TextTestRunner()
test_suite = unittest.TestLoader().loadTestsFromTestCase(TestFunctions)
# Kept as the cell's last expression so the TextTestResult summary is displayed.
test_runner.run(test_suite)


..........
----------------------------------------------------------------------
Ran 10 tests in 0.046s

OK


<unittest.runner.TextTestResult run=10 errors=0 failures=0>