diff --git a/backend/api/v1/v1_profile/serializers.py b/backend/api/v1/v1_profile/serializers.py index 48bbe060c..08d91ccbd 100644 --- a/backend/api/v1/v1_profile/serializers.py +++ b/backend/api/v1/v1_profile/serializers.py @@ -1,3 +1,4 @@ +import os import random import string from typing import Any, Dict, cast @@ -8,6 +9,10 @@ ) from utils.custom_serializer_fields import CustomPrimaryKeyRelatedField from utils.custom_generator import update_sqlite +from utils.custom_generator import ( + administration_csv_add, + administration_csv_update, +) class RelatedAdministrationField(serializers.PrimaryKeyRelatedField): @@ -180,6 +185,8 @@ def create(self, validated_data): 'path': instance.path } ) + TESTING = os.environ.get("TESTING") + administration_csv_add(data=instance, test=TESTING) for attribute in attributes: instance.attributes.create(**attribute) return instance @@ -207,6 +214,8 @@ def update(self, instance, validated_data): }, id=instance.id ) + TESTING = os.environ.get("TESTING") + administration_csv_update(data=instance, test=TESTING) return instance def _set_code(self, validated_data): diff --git a/backend/api/v1/v1_profile/tests/test_administration_csv_action.py b/backend/api/v1/v1_profile/tests/test_administration_csv_action.py new file mode 100644 index 000000000..c34365c3e --- /dev/null +++ b/backend/api/v1/v1_profile/tests/test_administration_csv_action.py @@ -0,0 +1,70 @@ +import pandas as pd + +from django.test import TestCase +from django.test.utils import override_settings +from django.core.management import call_command +from rtmis.settings import STORAGE_PATH +from api.v1.v1_profile.models import Administration, Levels +from utils.custom_generator import ( + administration_csv_add, + administration_csv_update, + administration_csv_delete +) + + +@override_settings(USE_TZ=False) +class AdministrationCSVTestCase(TestCase): + def setUp(self) -> None: + super().setUp() + call_command("administration_seeder", "--test") + call_command("download_all_administrations", "--test") + + def test_add_data_in_csv(self): + last = Levels.objects.order_by('-id').first() + parent = Administration.objects.filter(level__id=last.id - 1).first() + new_adm = Administration( + id=111, + name='New village', + level=last, + parent=parent + ) + new_adm.save() + filepath = administration_csv_add(data=new_adm, test=True) + self.assertEqual( + filepath, + f"{STORAGE_PATH}/master_data/kenya-administration_test.csv" + ) + df = pd.read_csv(filepath) + last_record = df.iloc[-1] + self.assertEqual(last_record["village"], "New village") + self.assertEqual(last_record["village_id"], 111) + + def test_update_data_in_csv(self): + adm_id = 5 + adm = Administration.objects.get(pk=adm_id) + adm.name = 'Village name changed' + adm.save() + + filepath = administration_csv_update(data=adm, test=True) + self.assertEqual( + filepath, + f"{STORAGE_PATH}/master_data/kenya-administration_test.csv" + ) + df = pd.read_csv(filepath) + contains_value = (df == "Village name changed").any().any() + self.assertTrue(contains_value) + + def test_delete_data_in_csv(self): + adm_id = 5 + adm = Administration.objects.get(pk=adm_id) + adm_name = adm.name + adm.delete() + + filepath = administration_csv_delete(id=adm_id, test=True) + self.assertEqual( + filepath, + f"{STORAGE_PATH}/master_data/kenya-administration_test.csv" + ) + df = pd.read_csv(filepath) + contains_value = (df == adm_name).any().any() + self.assertFalse(contains_value) diff --git a/backend/api/v1/v1_profile/tests/test_administrations.py b/backend/api/v1/v1_profile/tests/test_administrations.py index 622d6f6c1..c361076eb 100644 --- a/backend/api/v1/v1_profile/tests/test_administrations.py +++ b/backend/api/v1/v1_profile/tests/test_administrations.py @@ -1,3 +1,4 @@ +import os import typing from django.core.management import call_command from django.http import HttpResponse @@ -9,6 +10,8 @@ Administration, AdministrationAttribute, Levels) from api.v1.v1_profile.tests.mixins import ProfileTestHelperMixin +os.environ['TESTING'] = 'True' + @override_settings(USE_TZ=False) class AdministrationTestCase(TestCase, ProfileTestHelperMixin): diff --git a/backend/api/v1/v1_profile/views.py b/backend/api/v1/v1_profile/views.py index 2fd4c43e4..e8c6bab51 100644 --- a/backend/api/v1/v1_profile/views.py +++ b/backend/api/v1/v1_profile/views.py @@ -1,4 +1,5 @@ # Create your views here. +import os from typing import cast from wsgiref.util import FileWrapper from django.contrib.admin.sites import site @@ -33,6 +34,7 @@ from rest_framework.permissions import IsAuthenticated from utils.email_helper import send_email, EmailTypes from utils.custom_serializer_fields import validate_serializers_message +from utils.custom_generator import administration_csv_delete @extend_schema( @@ -117,6 +119,8 @@ def list(self, request, *args, **kwargs): def destroy(self, request, *args, **kwargs): instance = self.get_object() try: + TESTING = os.environ.get("TESTING") + administration_csv_delete(id=instance.pk, test=TESTING) instance.delete() except ProtectedError: _, _, _, protected = get_deleted_objects( diff --git a/backend/utils/custom_generator.py b/backend/utils/custom_generator.py index b1eeaf512..202d1639a 100644 --- a/backend/utils/custom_generator.py +++ b/backend/utils/custom_generator.py @@ -2,7 +2,8 @@ import sqlite3 import pandas as pd import logging -from rtmis.settings import MASTER_DATA +from rtmis.settings import MASTER_DATA, STORAGE_PATH +from api.v1.v1_profile.models import Administration logger = logging.getLogger(__name__) @@ -69,3 +70,91 @@ def update_sqlite(model, data, id=None): conn.rollback() finally: conn.close() + + +def administration_csv_add(data: dict, test: bool = False): + filename = "kenya-administration{0}.csv".format("_test" if test else "") + filepath = f"{STORAGE_PATH}/master_data/{filename}" + if os.path.exists(filepath): + df = pd.read_csv(filepath) + new_data = {} + if data.path: + parent_ids = list(filter( + lambda path: path, data.path.split(".") + )) + parents = Administration.objects.filter( + pk__in=parent_ids, + level__id__gt=1 + ).all() + for p in parents: + new_data[p.level.name.lower()] = p.name + new_data[f"{p.level.name.lower()}_id"] = p.id + new_data[data.level.name.lower()] = data.name + new_data[f"{data.level.name.lower()}_id"] = data.id + new_df = pd.DataFrame([new_data]) + df = pd.concat([df, new_df], ignore_index=True) + df.to_csv(filepath, index=False) + return filepath + else: + logger.error({ + 'context': 'insert_administration_row_csv', + 'message': "kenya-administration_test.csv doesn't exists" + }) + return None + + +def find_index_by_id(df, id): + for idx, row in df.iterrows(): + last_non_null_col = row.last_valid_index() + last_non_null_value = row[last_non_null_col] + if last_non_null_value == id: + return idx + return None + + +def administration_csv_update(data: dict, test: bool = False): + filename = "kenya-administration{0}.csv".format("_test" if test else "") + filepath = f"{STORAGE_PATH}/master_data/{filename}" + if os.path.exists(filepath): + df = pd.read_csv(filepath) + index = find_index_by_id(df=df, id=data.pk) + if index is not None: + if data.path: + parent_ids = list(filter( + lambda path: path, data.path.split(".") + )) + parents = Administration.objects.filter( + pk__in=parent_ids, + level__id__gt=1 + ).all() + for p in parents: + df.loc[index, p.level.name.lower()] = p.name + df.loc[index, f"{p.level.name.lower()}_id"] = p.id + df.loc[index, data.level.name.lower()] = data.name + df.loc[index, f"{data.level.name.lower()}_id"] = data.id + df.to_csv(filepath, index=False) + return filepath + else: + logger.error({ + 'context': 'update_administration_row_csv', + 'message': "kenya-administration_test.csv doesn't exists" + }) + return None + + +def administration_csv_delete(id: int, test: bool = False): + filename = "kenya-administration{0}.csv".format("_test" if test else "") + filepath = f"{STORAGE_PATH}/master_data/{filename}" + if os.path.exists(filepath): + df = pd.read_csv(filepath) + ix = find_index_by_id(df=df, id=id) + if ix is not None: + df.drop(index=ix, inplace=True) + df.to_csv(filepath, index=False) + return filepath + else: + logger.error({ + 'context': 'delete_administration_row_csv', + 'message': "kenya-administration_test.csv doesn't exists" + }) + return None