Skip to content
Merged
9 changes: 9 additions & 0 deletions backend/api/v1/v1_profile/serializers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import random
import string
from typing import Any, Dict, cast
Expand All @@ -8,6 +9,10 @@
)
from utils.custom_serializer_fields import CustomPrimaryKeyRelatedField
from utils.custom_generator import update_sqlite
from utils.custom_generator import (
administration_csv_add,
administration_csv_update,
)


class RelatedAdministrationField(serializers.PrimaryKeyRelatedField):
Expand Down Expand Up @@ -180,6 +185,8 @@ def create(self, validated_data):
'path': instance.path
}
)
TESTING = os.environ.get("TESTING")
administration_csv_add(data=instance, test=TESTING)
for attribute in attributes:
instance.attributes.create(**attribute)
return instance
Expand Down Expand Up @@ -207,6 +214,8 @@ def update(self, instance, validated_data):
},
id=instance.id
)
TESTING = os.environ.get("TESTING")
administration_csv_update(data=instance, test=TESTING)
return instance

def _set_code(self, validated_data):
Expand Down
70 changes: 70 additions & 0 deletions backend/api/v1/v1_profile/tests/test_administration_csv_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pandas as pd

from django.test import TestCase
from django.test.utils import override_settings
from django.core.management import call_command
from rtmis.settings import STORAGE_PATH
from api.v1.v1_profile.models import Administration, Levels
from utils.custom_generator import (
administration_csv_add,
administration_csv_update,
administration_csv_delete
)


@override_settings(USE_TZ=False)
class AdministrationCSVTestCase(TestCase):
def setUp(self) -> None:
super().setUp()
call_command("administration_seeder", "--test")
call_command("download_all_administrations", "--test")

def test_add_data_in_csv(self):
last = Levels.objects.order_by('-id').first()
parent = Administration.objects.filter(level__id=last.id - 1).first()
new_adm = Administration(
id=111,
name='New village',
level=last,
parent=parent
)
new_adm.save()
filepath = administration_csv_add(data=new_adm, test=True)
self.assertEqual(
filepath,
f"{STORAGE_PATH}/master_data/kenya-administration_test.csv"
)
df = pd.read_csv(filepath)
last_record = df.iloc[-1]
self.assertEqual(last_record["village"], "New village")
self.assertEqual(last_record["village_id"], 111)

def test_update_data_in_csv(self):
adm_id = 5
adm = Administration.objects.get(pk=adm_id)
adm.name = 'Village name changed'
adm.save()

filepath = administration_csv_update(data=adm, test=True)
self.assertEqual(
filepath,
f"{STORAGE_PATH}/master_data/kenya-administration_test.csv"
)
df = pd.read_csv(filepath)
contains_value = (df == "Village name changed").any().any()
self.assertTrue(contains_value)

def test_delete_data_in_csv(self):
adm_id = 5
adm = Administration.objects.get(pk=adm_id)
adm_name = adm.name
adm.delete()

filepath = administration_csv_delete(id=adm_id, test=True)
self.assertEqual(
filepath,
f"{STORAGE_PATH}/master_data/kenya-administration_test.csv"
)
df = pd.read_csv(filepath)
contains_value = (df == adm_name).any().any()
self.assertFalse(contains_value)
3 changes: 3 additions & 0 deletions backend/api/v1/v1_profile/tests/test_administrations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import typing
from django.core.management import call_command
from django.http import HttpResponse
Expand All @@ -9,6 +10,8 @@
Administration, AdministrationAttribute, Levels)
from api.v1.v1_profile.tests.mixins import ProfileTestHelperMixin

os.environ['TESTING'] = 'True'


@override_settings(USE_TZ=False)
class AdministrationTestCase(TestCase, ProfileTestHelperMixin):
Expand Down
4 changes: 4 additions & 0 deletions backend/api/v1/v1_profile/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Create your views here.
import os
from typing import cast
from wsgiref.util import FileWrapper
from django.contrib.admin.sites import site
Expand Down Expand Up @@ -33,6 +34,7 @@
from rest_framework.permissions import IsAuthenticated
from utils.email_helper import send_email, EmailTypes
from utils.custom_serializer_fields import validate_serializers_message
from utils.custom_generator import administration_csv_delete


@extend_schema(
Expand Down Expand Up @@ -117,6 +119,8 @@ def list(self, request, *args, **kwargs):
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
try:
TESTING = os.environ.get("TESTING")
administration_csv_delete(id=instance.pk, test=TESTING)
instance.delete()
except ProtectedError:
_, _, _, protected = get_deleted_objects(
Expand Down
91 changes: 90 additions & 1 deletion backend/utils/custom_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import sqlite3
import pandas as pd
import logging
from rtmis.settings import MASTER_DATA
from rtmis.settings import MASTER_DATA, STORAGE_PATH
from api.v1.v1_profile.models import Administration

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,3 +70,91 @@ def update_sqlite(model, data, id=None):
conn.rollback()
finally:
conn.close()


def administration_csv_add(data: dict, test: bool = False):
filename = "kenya-administration{0}.csv".format("_test" if test else "")
filepath = f"{STORAGE_PATH}/master_data/{filename}"
if os.path.exists(filepath):
df = pd.read_csv(filepath)
new_data = {}
if data.path:
parent_ids = list(filter(
lambda path: path, data.path.split(".")
))
parents = Administration.objects.filter(
pk__in=parent_ids,
level__id__gt=1
).all()
for p in parents:
new_data[p.level.name.lower()] = p.name
new_data[f"{p.level.name.lower()}_id"] = p.id
new_data[data.level.name.lower()] = data.name
new_data[f"{data.level.name.lower()}_id"] = data.id
new_df = pd.DataFrame([new_data])
df = pd.concat([df, new_df], ignore_index=True)
df.to_csv(filepath, index=False)
return filepath
else:
logger.error({
'context': 'insert_administration_row_csv',
'message': "kenya-administration_test.csv doesn't exists"
})
return None


def find_index_by_id(df, id):
for idx, row in df.iterrows():
last_non_null_col = row.last_valid_index()
last_non_null_value = row[last_non_null_col]
if last_non_null_value == id:
return idx
return None


def administration_csv_update(data: dict, test: bool = False):
filename = "kenya-administration{0}.csv".format("_test" if test else "")
filepath = f"{STORAGE_PATH}/master_data/{filename}"
if os.path.exists(filepath):
df = pd.read_csv(filepath)
index = find_index_by_id(df=df, id=data.pk)
if index is not None:
if data.path:
parent_ids = list(filter(
lambda path: path, data.path.split(".")
))
parents = Administration.objects.filter(
pk__in=parent_ids,
level__id__gt=1
).all()
for p in parents:
df.loc[index, p.level.name.lower()] = p.name
df.loc[index, f"{p.level.name.lower()}_id"] = p.id
df.loc[index, data.level.name.lower()] = data.name
df.loc[index, f"{data.level.name.lower()}_id"] = data.id
df.to_csv(filepath, index=False)
return filepath
else:
logger.error({
'context': 'update_administration_row_csv',
'message': "kenya-administration_test.csv doesn't exists"
})
return None


def administration_csv_delete(id: int, test: bool = False):
filename = "kenya-administration{0}.csv".format("_test" if test else "")
filepath = f"{STORAGE_PATH}/master_data/{filename}"
if os.path.exists(filepath):
df = pd.read_csv(filepath)
ix = find_index_by_id(df=df, id=id)
if ix is not None:
df.drop(index=ix, inplace=True)
df.to_csv(filepath, index=False)
return filepath
else:
logger.error({
'context': 'delete_administration_row_csv',
'message': "kenya-administration_test.csv doesn't exists"
})
return None