-
-
Notifications
You must be signed in to change notification settings - Fork 213
/
test_stock_report.py
143 lines (116 loc) · 5.15 KB
/
test_stock_report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from datetime import datetime
import random
import string
from django.test import TestCase
from casexml.apps.case.tests.util import delete_all_xforms
from casexml.apps.stock.utils import get_current_ledger_transactions, get_current_ledger_state
from corehq.apps.commtrack.models import NewStockReport, SQLProduct, StockTransaction as STrans
from casexml.apps.stock.const import REPORT_TYPE_BALANCE
from casexml.apps.stock.models import StockReport, StockTransaction
from corehq.apps.domain.shortcuts import create_domain
from couchforms.models import XFormInstance
# Number of characters in the random domain name generated for each test
# (consumed by StockReportDomainTest._get_name_for_domain).
DOMAIN_MAX_LENGTH = 25
class StockReportDomainTest(TestCase):
    """Exercise stock report creation and the current-ledger lookup helpers.

    ``setUp`` creates a randomly named domain and stores one ``'soh'``
    transaction for every (case, section, product) combination of the
    fixture ids.  The expected balances are kept in ``self.transactions``
    as ``{case_id: {section_id: {product_id: balance}}}``.
    """

    def _get_name_for_domain(self):
        """Return a random lowercase ASCII name of DOMAIN_MAX_LENGTH characters."""
        return ''.join(
            random.choice(string.ascii_lowercase)
            for _ in range(DOMAIN_MAX_LENGTH)
        )

    def create_report(self, transactions=None, tag=None, date=None):
        """Save a new form in the test domain and build a stock report for it.

        :param transactions: transactions for the report; defaults to ``[]``.
        :param tag: report type; defaults to ``REPORT_TYPE_BALANCE``.
        :param date: report date; defaults to ``datetime.utcnow()``.
        :return: ``(report, form)``.  ``create_models`` is NOT called on the
            report here; callers that need the models must call it.
        """
        form = XFormInstance(domain=self.domain)
        form.save()
        report = NewStockReport(
            form,
            date or datetime.utcnow(),
            tag or REPORT_TYPE_BALANCE,
            transactions or [],
        )
        return report, form

    def setUp(self):
        # Each fixture id maps to a number; a transaction's balance is the
        # sum of its case, section and product numbers, which is distinct
        # for every combination of the values below.
        self.case_ids = {'c1': 10, 'c2': 30, 'c3': 50}
        self.section_ids = {'s1': 2, 's2': 9}
        self.product_ids = {'p1': 1, 'p2': 3, 'p3': 5}

        self.domain = self._get_name_for_domain()
        create_domain(self.domain)

        SQLProduct.objects.bulk_create([
            SQLProduct(product_id=product_id)
            for product_id in self.product_ids
        ])

        transactions_flat = []
        self.transactions = {}
        for case, c_bal in self.case_ids.items():
            for section, s_bal in self.section_ids.items():
                for product, p_bal in self.product_ids.items():
                    bal = c_bal + s_bal + p_bal
                    transactions_flat.append(
                        STrans(
                            case_id=case,
                            section_id=section,
                            product_id=product,
                            action='soh',
                            quantity=bal,
                        )
                    )
                    by_section = self.transactions.setdefault(case, {})
                    by_section.setdefault(section, {})[product] = bal

        self.new_stock_report, self.form = self.create_report(transactions_flat)
        self.new_stock_report.create_models(self.domain)

    def tearDown(self):
        delete_all_xforms()
        StockReport.objects.all().delete()
        StockTransaction.objects.all().delete()

    def test_stock_report(self):
        # This report is built but never passed through create_models(); the
        # assertions below still expect exactly one StockReport in the domain
        # (the one created in setUp).
        self.create_report()
        filtered_stock_report = StockReport.objects.filter(domain=self.domain)
        self.assertEqual(filtered_stock_report.count(), 1)
        stock_report = filtered_stock_report.get()
        self.assertEqual(stock_report.form_id, self.form._id)
        self.assertEqual(stock_report.domain, self.domain)

    def _validate_case_data(self, data, expected):
        """Assert each transaction in ``data`` has the expected stock on hand.

        :param data: ``{section_id: {product_id: transaction}}`` for one case.
        :param expected: ``{section_id: {product_id: balance}}`` for that case.

        Only entries present in ``data`` are checked; entries that exist in
        ``expected`` but are missing from ``data`` are not detected here.
        """
        for section, products in data.items():
            for product, trans in products.items():
                self.assertEqual(trans.stock_on_hand, expected[section][product])

    def _test_get_current_ledger_transactions(self, tester_fn):
        """Run ``tester_fn`` before and after overriding one ledger value.

        ``tester_fn`` is called with the expected balances, first for the
        state created in ``setUp`` and then again after two further reports
        for c1/s1/p1 sharing the same date have been stored.
        """
        tester_fn(self.transactions)

        date = datetime.utcnow()
        report, _ = self.create_report([
            STrans(
                case_id='c1',
                section_id='s1',
                product_id='p1',
                action='soh',
                quantity=864,
            )
        ], date=date)
        report.create_models(self.domain)

        # create second report with the same date
        # results should have this transaction and not the previous one
        report, _ = self.create_report([
            STrans(
                case_id='c1',
                section_id='s1',
                product_id='p1',
                action='soh',
                quantity=1,
            )
        ], date=date)
        report.create_models(self.domain)

        # Copy every nesting level: a shallow dict.copy() would share the
        # inner dicts, so the assignment below would also rewrite the
        # self.transactions fixture.
        new_trans = {
            case: {
                section: dict(products)
                for section, products in sections.items()
            }
            for case, sections in self.transactions.items()
        }
        new_trans['c1']['s1']['p1'] = 1
        tester_fn(new_trans)

    def test_get_current_ledger_transactions(self):
        def check_transactions(expected):
            for case in self.case_ids:
                transactions = get_current_ledger_transactions(case)
                self._validate_case_data(transactions, expected[case])

        self._test_get_current_ledger_transactions(check_transactions)

        self.assertEqual({}, get_current_ledger_transactions('non-existent'))

    def test_get_current_ledger_state(self):
        def check_transactions(expected):
            state = get_current_ledger_state(list(self.case_ids))
            for case, sections in state.items():
                self._validate_case_data(sections, expected[case])

        self._test_get_current_ledger_transactions(check_transactions)

        self.assertEqual({}, get_current_ledger_state([]))