In [1]:
import sqlite3
import re
import calendar # for isleap
import datetime
import pandas as pd

In [130]:
class GncDate(object):
    """A calendar date that converts to and from GnuCash UTC timestamp strings.

    A GncDate is built from an ISO-like date string (``YYYY-MM-DD`` or
    ``YY-MM-DD``), optionally followed by a UTC offset written strictly as
    ``+HHMM`` / ``-HHMM``.  When no offset is given, the nested ``GMT1`` zone
    (UTC+1 with a one hour summer-time shift) is attached.

    The wrapped value is a timezone-aware ``datetime.datetime`` at local
    midnight, available as ``self.date`` or through ``datetime()``.
    """

    class GMT1(datetime.tzinfo):
        """UTC+1 zone with one hour of DST from the last Sunday of March to the
        last Sunday of October."""

        def utcoffset(self, dt):
            """Return the total offset from UTC (base hour plus DST) for ``dt``."""
            return datetime.timedelta(hours=1) + self.dst(dt)

        def dst(self, dt):
            """Return the DST adjustment for ``dt`` (one hour or zero).

            The switch is evaluated at midnight of the two boundary Sundays,
            not at the exact changeover hour.
            """
            if dt is None:
                # tzinfo methods may be called with None (e.g. by time objects).
                return datetime.timedelta(0)
            # DST starts on the last Sunday in March: step back from April 1st.
            d = datetime.datetime(dt.year, 4, 1)
            # dston/dstoff stay instance attributes as in the original code.
            self.dston = d - datetime.timedelta(days=d.weekday() + 1)
            # DST ends on the last Sunday in October: step back from November 1st.
            d = datetime.datetime(dt.year, 11, 1)
            self.dstoff = d - datetime.timedelta(days=d.weekday() + 1)
            if self.dston <= dt.replace(tzinfo=None) < self.dstoff:
                return datetime.timedelta(hours=1)
            return datetime.timedelta(0)

        def tzname(self, dt):
            """Return the fixed display name of the zone."""
            return "GMT +1"

    # Accepted layouts, tried in this order.
    _DATE_ONLY_FORMATS = ("%Y-%m-%d", "%y-%m-%d")
    _DATE_TZ_FORMATS = ("%y-%m-%d%z", "%Y-%m-%d%z")
    # Newer Python versions let %z also match "+01:00" and "Z"; only the
    # plain +HHMM / -HHMM suffix is accepted here so that parsing does not
    # depend on the interpreter version.
    _TZ_SUFFIX = re.compile(r"[+-][0-9]{4}$")

    def __init__(self, isodate):
        """Parse ``isodate`` into a timezone-aware datetime.

        Args:
            isodate: date string such as ``"2017-03-28"``, ``"17-03-28"`` or
                ``"2017-03-28+0100"``.

        Raises:
            ValueError: if the string matches none of the accepted layouts or
                names an impossible date.
            TypeError: if ``isodate`` is not a string.
        """
        self.date = None

        for fmt in self._DATE_ONLY_FORMATS:
            try:
                self.date = datetime.datetime.strptime(isodate, fmt)
                break
            except ValueError:
                continue

        if self.date is None and self._TZ_SUFFIX.search(isodate):
            for fmt in self._DATE_TZ_FORMATS:
                try:
                    self.date = datetime.datetime.strptime(isodate, fmt)
                    break
                except ValueError:
                    continue

        if self.date is None:
            raise ValueError("date string %r does not match YYYY-MM-DD or "
                             "YY-MM-DD with an optional +HHMM/-HHMM offset"
                             % (isodate,))

        if self.date.tzinfo is None:
            self.date = self.date.replace(tzinfo=self.GMT1())

    def to_gnc(self):
        """Return the date as a ``YYYYMMDDhhmmss`` string expressed in UTC."""
        d = self.date
        d = d - d.utcoffset()
        return "{:04}{:02}{:02}{:02}{:02}{:02}".format(
            d.year, d.month, d.day, d.hour, d.minute, d.second)

    def datetime(self):
        """Return the wrapped timezone-aware ``datetime.datetime``."""
        return self.date

    @staticmethod
    def from_gnc(gncdate_utc):
        """Build a GncDate from a ``YYYYMMDDhhmmss`` UTC timestamp string.

        The timestamp is assumed to encode local midnight.  The UTC offset is
        therefore recovered from the time of day: a time before noon means the
        zone is behind UTC by that amount, a time from noon onwards means the
        zone is ahead of UTC by the time remaining until midnight.

        Raises:
            ValueError: if ``gncdate_utc`` is not a valid timestamp.
        """
        date_utc = datetime.datetime.strptime(gncdate_utc, "%Y%m%d%H%M%S")
        time = date_utc.time()
        elapsed = datetime.timedelta(hours=time.hour, minutes=time.minute)
        if time < datetime.time(12, 0, 0):
            # Negative timezone (UTC-0 .. UTC-12): local midnight already passed.
            tzstr = "-{:02}{:02}".format(time.hour, time.minute)
            date_tz = date_utc - elapsed
        else:
            # Positive timezone: local midnight is the upcoming one.
            utc_offset = datetime.timedelta(hours=24) - elapsed
            tzstr = "+{:02}{:02}".format(utc_offset.seconds // 3600,
                                         (utc_offset.seconds % 3600) // 60)
            date_tz = date_utc + utc_offset
        return GncDate("{:04}-{:02}-{:02}{}".format(
            date_tz.year, date_tz.month, date_tz.day, tzstr))

In [334]:
from functools import wraps
import inspect

def get_class_that_defined_method(meth):
    """Return the class in which ``meth`` was defined, or None if unknown.

    Args:
        meth: a bound method or a plain function.

    Returns:
        For a bound method, the first class in the instance's MRO whose
        ``__dict__`` holds the underlying function.  Otherwise (and as a
        fallback) the class is looked up by name in the function's module,
        using the part of ``__qualname__`` in front of the function name.
        None is returned when no class can be determined.
    """
    if inspect.ismethod(meth):
        func = meth.__func__
        for cls in inspect.getmro(meth.__self__.__class__):
            # The class dict stores the plain function, never the bound
            # method object, so compare against __func__.
            if cls.__dict__.get(meth.__name__) is func:
                return cls
        meth = func  # fallback to __qualname__ parsing
    if inspect.isfunction(meth):
        owner_name = meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]
        # The default keeps nested/local qualnames and an unknown module
        # from raising AttributeError.
        cls = getattr(inspect.getmodule(meth), owner_name, None)
        if isinstance(cls, type):
            return cls
    return None
        
def unimplemented(func):
    """Decorator that flags ``func`` as a stub.

    Each call prints a warning naming the function, the class it was defined
    in (as found by ``get_class_that_defined_method``) and the name of the
    calling function, then forwards the call to ``func`` unchanged.
    """
    @wraps(func)
    def tmp(*args, **kwargs):
        # Index 1 is the caller's frame record; field 3 is its function name.
        caller_record = inspect.stack()[1]
        caller = caller_record[3]
        owner = get_class_that_defined_method(func)
        message = "Warning: Unimplemented function %s() called by %s:%s()" % (
            func.__name__, owner, caller)
        print(message)
        return func(*args, **kwargs)
    return tmp

class Account(object):
    """One account of a GnuCash book, linked to its parent and children.

    Attributes set by ``__init__``: ``book``, ``guid``, ``name``, ``type``
    (one of the class constants, or None for an unmapped account type),
    ``parent_guid``, ``parent``, ``description``, ``commodity``, ``hidden``,
    ``placeholder``, ``children`` and ``hier_name``.
    """
    ROOT = 1
    ASSET = 2
    CASH = 3
    BANK = 4
    LIABILITY = 5
    CREDIT = 6
    TRADING = 7

    def __init__(self, book, accDF, parent):
        """Create the account from the first row of ``accDF``.

        Args:
            book: owning book; its ``getCommodityByGuid`` resolves the
                account's commodity.
            accDF: table whose first row (``accDF.iloc[0]``) describes the
                account.
            parent: parent ``Account``, or None for the root account.  When
                given, the new account registers itself as its child.
        """
        acc = accDF.iloc[0]
        self.book = book
        self.guid = acc.guid
        self.name = acc['name']
        self.type = self._type(acc.account_type)
        self.parent_guid = acc.parent_guid
        self.parent = parent
        self.description = acc.description
        self.commodity = self.book.getCommodityByGuid(acc.commodity_guid)
        self.hidden = acc.hidden
        self.placeholder = (acc.placeholder == 1)
        self.children = []  # to be completed via addChild
        self.hier_name = self._calcHierName()
        if parent is not None:
            parent.addChild(self)

    def _type(self, accType):
        """Map an account type name to its class constant (None if unmapped)."""
        known_types = {
            'ROOT': self.ROOT,
            'ASSET': self.ASSET,
            'CASH': self.CASH,
            'BANK': self.BANK,
            'LIABILITY': self.LIABILITY,
            'CREDIT': self.CREDIT,
            'TRADING': self.TRADING,
        }
        return known_types.get(accType)

    def _calcHierName(self):
        """Return the colon-separated path of names from below the root down
        to this account (just the own name for the root and its children)."""
        parts = [self.name]
        ancestor = self.parent
        # Stop at the root (the ancestor without a parent): it is not part
        # of the hierarchical name.
        while ancestor is not None and ancestor.parent is not None:
            parts.append(ancestor.name)
            ancestor = ancestor.parent
        return ":".join(reversed(parts))

    def addChild(self, child):
        """Append ``child`` to ``children``.

        Raises:
            ValueError: if ``child`` is not an ``Account``.
        """
        if not isinstance(child, Account):
            raise ValueError("Non Account-type passed for child")
        self.children.append(child)
        
class Commodity(object):
    """A commodity (e.g. a currency) of a GnuCash book."""
    CURRENCY = 1

    def __init__(self, book, commDF):
        """Populate the commodity from the first row of ``commDF``.

        Reads the ``guid``, ``mnemonic``, ``fullname`` and ``namespace``
        fields of that row.
        """
        row = commDF.iloc[0]
        self.book = book
        self.guid = row.guid
        self.name = row.mnemonic
        self.fullname = row.fullname
        self.type = self._type(row.namespace)
        self.iscurrency = row.namespace == "CURRENCY"

    def _type(self, commType):
        """Translate a namespace string into a type constant (None if unmapped)."""
        return self.CURRENCY if commType == "CURRENCY" else None
    
    
class Book(object):
    """In-memory view of a GnuCash book stored in an SQLite file.

    On construction the ``commodities``, ``books``, ``accounts``, ``prices``,
    ``transactions`` and ``splits`` tables are read (only SELECT statements
    are issued).  The raw tables are kept as pandas DataFrames (``commDF``,
    ``accDF``, ``pricesDF``, ``txDF``, ``splitsDF``); commodities and accounts
    are additionally wrapped in ``Commodity`` / ``Account`` objects
    (``commodities``, ``accounts``, ``root``).

    The SQLite connection stays available as ``self.db``; call ``close()``
    once it is no longer needed.
    """

    # Layout of the timestamp strings returned by GncDate.to_gnc(); the date
    # columns of the book are assumed to use the same layout.
    _GNC_TS_FORMAT = "%Y%m%d%H%M%S"

    def __init__(self, gnc_db):
        """Open the SQLite file ``gnc_db`` and load all tables eagerly."""
        self.db = sqlite3.connect(gnc_db)
        self._getCommodities()
        self._createCommodityObjects()
        self._getBookRootAccount()
        self._getBookAccounts()
        self._createAccountObjects()
        self._getCommodityPrices()
        self._getTransactions()
        self._getSplits()

    def close(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    def _getCommodities(self):
        """Load the ``commodities`` table into ``self.commDF``."""
        self.commDF = pd.read_sql_query("SELECT * FROM commodities;", self.db)

    def _createCommodityObjects(self):
        """Wrap every row of ``self.commDF`` in a ``Commodity``."""
        self.commodities = [Commodity(self, self.commDF.iloc[[i]])
                            for i in range(self.commDF.shape[0])]

    def _getCommodityPrices(self):
        """Load the ``prices`` table into ``self.pricesDF``."""
        self.pricesDF = pd.read_sql_query("SELECT * FROM prices;", self.db)

    def _getBookRootAccount(self):
        """Obtain the root account guid of the gnucash book.

        Assumptions:
        - There is only one root account in the book

        Raises:
            ValueError: if the ``books`` table has no row.
        """
        r = self.db.execute("SELECT root_account_guid FROM books;").fetchone()
        if r is None:
            raise ValueError("No valid root_account found in the gnucash book")
        self.rootAccountGuid = r[0]

    def _getBookAccounts(self):
        """Load the ``accounts`` table into ``self.accDF``."""
        self.accDF = pd.read_sql_query("SELECT * FROM accounts;", self.db)

    def _createAccountObjects(self):
        """Build the ``Account`` tree, starting at the root account.

        Parents are always created before their children, so each child can
        look up its parent object in ``self.accounts``.
        """
        self.root = Account(self, self.accDF.loc[self.accDF.guid == self.rootAccountGuid], parent=None)
        self.accounts = [self.root]
        for child, parentGuid in self._iterAccountTree(self.root.guid):
            acc = Account(self, child, self.getAccountByGuid(parentGuid))
            self.accounts.append(acc)

    def _iterAccountTree(self, parentGuid):
        """Yield ``(one-row DataFrame, parent guid)`` for every descendant of
        ``parentGuid``, depth first."""
        children = self._findChildrenOf(parentGuid)
        for i in range(children.shape[0]):
            child = children.iloc[[i]]
            yield (child, parentGuid)
            yield from self._iterAccountTree(child.iloc[0].guid)

    def _findChildrenOf(self, parentGuid):
        """Return the rows of ``self.accDF`` whose parent is ``parentGuid``."""
        return self.accDF.loc[self.accDF.parent_guid == parentGuid]

    def _getTransactions(self):
        """Load the ``transactions`` table into ``self.txDF``."""
        self.txDF = pd.read_sql_query("SELECT * FROM transactions;", self.db)

    def _getSplits(self):
        """Load the ``splits`` table into ``self.splitsDF``."""
        self.splitsDF = pd.read_sql_query("SELECT * FROM splits;", self.db)

    def getAccountByGuid(self, accGuid):
        """Return the account with the given guid, or None."""
        for a in self.accounts:
            if a.guid == accGuid:
                return a
        return None

    def getAccountByName(self, accName, strict=True):
        """Return the first account whose (non-hierarchical) name equals
        ``accName``, or None.

        ``strict`` is accepted for interface compatibility but currently has
        no effect.
        """
        for a in self.accounts:
            if a.name == accName:
                return a
        return None

    def getCommodityByGuid(self, commGuid):
        """Return the commodity with the given guid, or None."""
        for c in self.commodities:
            if c.guid == commGuid:
                return c
        return None

    def getCommodityByName(self, commName):
        """Return the commodity whose mnemonic equals ``commName``, or None."""
        for c in self.commodities:
            if c.name == commName:
                return c
        return None

    def _resolveCommodityGuid(self, comm):
        """Return ``comm`` if it already is a guid, else the guid of the
        commodity named ``comm``.

        Raises:
            ValueError: if no commodity with that name exists.
        """
        if self._isGuid(comm):
            return comm
        found = self.getCommodityByName(comm)
        if found is None:
            raise ValueError("Unknown commodity: %r" % (comm,))
        return found.guid

    def _gncToDatetime(self, stamp):
        """Parse a ``YYYYMMDDhhmmss`` timestamp into a naive datetime."""
        return datetime.datetime.strptime(str(stamp), self._GNC_TS_FORMAT)

    def getCommodityPrice(self, comm, curr, refDate):
        """Return the price of ``comm`` in ``curr`` closest in time to ``refDate``.

        Args:
            comm: guid or mnemonic of the priced commodity.
            curr: guid or mnemonic of the currency the price is quoted in.
            refDate: object with a ``to_gnc()`` method (e.g. ``GncDate``).

        Returns:
            ``(value_num, value_denom)`` of the chosen price entry.  If an
            earlier and a later entry are equally distant, the earlier one
            is used.

        Raises:
            ValueError: if a mnemonic is unknown or no price entry exists for
                the commodity/currency pair.
        """
        comm = self._resolveCommodityGuid(comm)
        curr = self._resolveCommodityGuid(curr)

        # Find and sort prices of the required commodity
        prices = self.pricesDF
        prices = prices[(prices["commodity_guid"] == comm) &
                        (prices["currency_guid"] == curr)]
        if prices.empty:
            raise ValueError("No price found for commodity %s in currency %s"
                             % (comm, curr))
        prices = prices.sort_values("date", axis=0)

        # Split into entries up to and after the reference date.  The
        # fixed-width timestamp strings sort chronologically.
        ref_gnc = refDate.to_gnc()
        earlier = prices[prices["date"] <= ref_gnc]
        later = prices[prices["date"] > ref_gnc]

        if earlier.empty:
            best = later.iloc[0]
        elif later.empty:
            best = earlier.iloc[-1]
        else:
            pred = earlier.iloc[-1]
            succ = later.iloc[0]
            # Compare real time spans; subtracting the timestamps as plain
            # integers would distort distances across day/month boundaries.
            ref_dt = self._gncToDatetime(ref_gnc)
            gap_before = ref_dt - self._gncToDatetime(pred["date"])
            gap_after = self._gncToDatetime(succ["date"]) - ref_dt
            best = succ if gap_before > gap_after else pred

        return (best.value_num, best.value_denom)

    def _getTxByQuery(self, queryString):
        """Return the rows of ``self.txDF`` selected by a ``DataFrame.query`` string."""
        return self.txDF.query(queryString)

    def _getSplitsByTx(self, txGuid):
        """Return the splits belonging to transaction ``txGuid``."""
        return self.splitsDF[self.splitsDF.tx_guid == txGuid]

    def _isGuid(self, guid):
        """Return True if ``guid`` is a string of exactly 32 hex digits."""
        if not isinstance(guid, str):
            return False
        return re.fullmatch(r"[0-9a-fA-F]{32}", guid) is not None

    def _printAccounts(self, parent, level=0):
        """Print the account tree below ``parent``, indenting 3 spaces per level."""
        print(level*" " + parent.name)
        for c in parent.children:
            self._printAccounts(c, level+3)

    def reportCf(self, accounts, per_beg, per_end, currency="CHF"):
        """Get total cashflow for a set of accounts.

        Algorithm:
            tx_of_period = get_tx_for_period(fiscal_period)
            splits = []
            for tx in tx_period:
                # retrieve all splits of the given transaction
                tx_splits = get_tx_splits(tx)

                # delete all splits pertaining to the accounts in questions
                tx_splits = del_splits_for_acc(accounts)

                if no splits have been deleted:
                    continue with the next tx

                if tx_splits is not empty:
                    if the given transaction is not in the desired currency:
                        get the exchange rate for the tx currency
                        convert all tx values to the desired currency
                    splits.append(tx_splits)

        Args:
            accounts: iterable of account names (matched via ``getAccountByName``).
            per_beg: period start, ``GncDate`` or date string.
            per_end: period end, ``GncDate`` or date string; ``""`` / None
                means "2100-01-01".
            currency: mnemonic of the report currency.

        Returns:
            None if the period contains no transaction or none of them
            touches the given accounts; otherwise a dict with the keys
            ``gnc_period_begin``, ``gnc_period_end``, ``raw_tx``,
            ``raw_splits``, ``fx_rates``, ``cf_sources``, ``cf_sinks``,
            ``cf_total_inflow``, ``cf_total_outlow`` (historic spelling, kept
            for existing callers), ``cf_total_outflow`` and ``period_saldo``.

        Raises:
            ValueError: if the period end precedes its start, or an account
                or currency name is unknown.

        Note:
            One exchange rate per foreign currency is used for the whole
            report: the price closest to ``per_end``.
        """
        if isinstance(per_beg, str):
            per_beg = GncDate(per_beg)
        if per_end == "" or per_end is None:
            per_end = "2100-01-01"
        if isinstance(per_end, str):
            per_end = GncDate(per_end)
        if per_end.datetime() < per_beg.datetime():
            raise ValueError("Period start is > period end")

        # Convert the account names to guid's
        accGuids = []
        for accName in accounts:
            acc = self.getAccountByName(accName)
            if acc is None:
                raise ValueError("Unknown account: %r" % (accName,))
            accGuids.append(acc.guid)

        # Get the guids of all trading accounts of *this* book
        tradingGuids = [a.guid for a in self.accounts if a.type == Account.TRADING]

        # Get report's currency guid
        reportCurrency = self.getCommodityByName(currency)
        if reportCurrency is None:
            raise ValueError("Unknown currency: %r" % (currency,))
        currGuid = reportCurrency.guid

        # Get all tx in the given period
        tx_for_period = self._getTxByQuery('post_date >= "%s" and post_date <= "%s"' %
                                           (per_beg.to_gnc(), per_end.to_gnc()))
        if tx_for_period.empty:
            return None

        # Account-trimmed splits of every relevant transaction; joined once
        # after the loop.
        collected = []

        # Dictionary for storing exchange rates
        fx_rates = {}
        fx_rates['base'] = currency

        for _, tx in tx_for_period.iterrows():
            tx_splits = self._getSplitsByTx(tx.guid)

            # Delete splits belonging to the accounts in question
            nb_tx_splits = len(tx_splits)
            tx_splits = tx_splits[~tx_splits.account_guid.isin(accGuids)]

            # If no splits have been deleted, the tx is to be dumped
            # as it does not concern the required accounts
            if len(tx_splits) == nb_tx_splits:
                continue

            # Delete all splits belonging to TRADING accounts
            tx_splits = tx_splits[~tx_splits.account_guid.isin(tradingGuids)]

            if tx_splits.empty:
                continue

            # Apply exchange rate if needed
            if tx.currency_guid != currGuid:
                tx_curr = self.getCommodityByGuid(tx.currency_guid)
                if tx_curr.name in fx_rates:
                    fx_num, fx_denom = fx_rates[tx_curr.name]
                else:
                    fx_num, fx_denom = self.getCommodityPrice(tx.currency_guid,
                                                              currGuid,
                                                              per_end)
                    fx_rates[tx_curr.name] = (fx_num, fx_denom)
                # Work on a private copy so the filtered view of splitsDF is
                # never written through.
                tx_splits = tx_splits.copy()
                tx_splits["value_num"] = tx_splits["value_num"] * fx_num
                tx_splits["value_denom"] = tx_splits["value_denom"] * fx_denom

            collected.append(tx_splits)

        if not collected:
            # Transactions exist in the period, but none touches the accounts.
            return None

        splits = pd.concat(collected)
        splits['value'] = splits['value_num'] / splits['value_denom']
        splits['quantity'] = splits['quantity_num'] / splits['quantity_denom']
        splits['account'] = splits['account_guid'].map(
            lambda guid: self.getAccountByGuid(guid).hier_name)

        cleaned_splits = splits[['account', 'value', 'quantity', 'account_guid', 'tx_guid']]
        cf_sources = cleaned_splits.query("value < 0").copy()
        cf_sinks = cleaned_splits.query("value > 0").copy()
        # Report sources as positive amounts.
        cf_sources[['value', 'quantity']] *= -1

        res = {}
        res['gnc_period_begin'] = per_beg.to_gnc()
        res['gnc_period_end'] = per_end.to_gnc()
        res['raw_tx'] = tx_for_period
        res['raw_splits'] = splits
        res['fx_rates'] = fx_rates
        res['cf_sources'] = cf_sources
        res['cf_sinks'] = cf_sinks
        res['cf_total_inflow'] = cf_sources.value.sum()
        res['cf_total_outlow'] = cf_sinks.value.sum()
        # Correctly spelled alias of the key above.
        res['cf_total_outflow'] = res['cf_total_outlow']
        res['period_saldo'] = res['cf_total_inflow'] - res['cf_total_outlow']

        return res


In [335]:
# No offset is given, so GncDate attaches its default zone (UTC+1 plus one
# hour of DST in June); to_gnc() renders local midnight in UTC, which falls
# on the previous evening.
d = GncDate("2017-06-01")
d.to_gnc()

'20170531220000'

In [336]:
# Round trip: rebuild a GncDate from a UTC timestamp string and convert it
# back; the output should equal the input.
d =GncDate.from_gnc("20170531220000")
d.to_gnc()
#d.datetime()

'20170531220000'

In [327]:
# Book() loads the commodities, accounts, prices, transactions and splits
# tables eagerly. The path is relative; point it at your own GnuCash SQLite file.
book = Book("money.gnucash.sql.gnucash")

In [328]:
# Cash-flow report for three accounts (looked up by plain account name) over
# 2017-02-28 .. 2017-03-27; the report currency defaults to CHF.
accs_cf = ['Private', 'Private-Bills', "Epay Microaccount"]
res = book.reportCf(accs_cf, GncDate("2017-02-28"), GncDate("2017-03-27"))

In [329]:
# Look up the BGN price quoted in CHF around 2015-05-01; the result is the
# (value_num, value_denom) pair of the selected price entry.
book.getCommodityPrice('BGN', 'CHF', GncDate("2015-05-01"))

(5000, 9261)

In [330]:
# Counter-account splits with a positive value (summed into 'cf_total_outlow').
res['cf_sinks']

Unnamed: 0,account,value,quantity,account_guid,tx_guid
13411,Expenses:Groceries,25.300000,25.30,a965e2c6bde903c170ac1f1f47ea6440,e09ec9e5c6ab2b72d106aecd9a39017c
13412,Expenses:Alcohol,13.900000,13.90,c0cd223dbf0a570d9cfd90b9749cde8a,e09ec9e5c6ab2b72d106aecd9a39017c
13414,Expenses:Car:Parking,3.000000,3.00,3977c26d2e4ae9492b36ff26f9af55fe,0544fed87dadad4e672fc365ca057b4a
13420,Expenses:State:Taxes,1730.000000,1730.00,065cf464ceffdd68bcbb36b983fee364,642cac02c87fd8fd1d894e5caa393f41
13422,Expenses:Insurance:Health Insurance,462.350000,462.35,f84ad2440286f60aaa6a0fad48a131dd,5991d75ad8509ae2959cb66c1a65d770
13424,Expenses:Insurance:Health Insurance,439.500000,439.50,f84ad2440286f60aaa6a0fad48a131dd,de5f353f4ec76626bde5edd7936a16f6
13426,Expenses:Utilities:Billag,112.750000,112.75,12885edd8630cc4819c5c7e38fa7cff4,915747608c4ca5bea3cb3150a13ce002
13428,Expenses:Medical,25.000000,25.00,1706edb22c7673f37843951230899f81,aecc98a64e3a685aeb0f1c30dc6b027f
13430,Expenses:Medical,151.250000,151.25,1706edb22c7673f37843951230899f81,f3737ccc6fa6180f80c1fb8930c13f74
13432,Expenses:Baby:Babysitting,505.900000,505.90,c4d10a111f9b29cefcb88ce81d0788b4,c18317fb0e843353cb2ceddccae2686d


In [331]:
# Counter-account splits with a negative value, shown with the sign flipped
# (summed into 'cf_total_inflow').
res['cf_sources']

Unnamed: 0,account,value,quantity,account_guid,tx_guid
13417,Income:Salary,8134.1,8134.1,b58b3d0c66017aebf3648c091265f7d7,8df9aacbe22e6d9a356aaa1fd1239e34
13419,Income:Salary,123.75,123.75,b58b3d0c66017aebf3648c091265f7d7,be3fc7483c99253089ab5787396ebbb6
13536,Expenses:Medical,113.45,113.45,1706edb22c7673f37843951230899f81,9cf082d9463cc5544a8ee4e2ed4a1816


In [332]:
# Exchange rates used by the report: (numerator, denominator) per foreign
# currency mnemonic, plus the report currency under 'base'.
res['fx_rates']

{'BGN': (546700000, 1000000000), 'base': 'CHF'}

In [333]:
# Full result dictionary. NOTE(review): this dumps whole DataFrames into the
# cell output; consider showing selected keys or .head() instead.
res

{'cf_sinks':                                         account        value  quantity  \
 13411                        Expenses:Groceries    25.300000     25.30   
 13412                          Expenses:Alcohol    13.900000     13.90   
 13414                      Expenses:Car:Parking     3.000000      3.00   
 13420                      Expenses:State:Taxes  1730.000000   1730.00   
 13422       Expenses:Insurance:Health Insurance   462.350000    462.35   
 13424       Expenses:Insurance:Health Insurance   439.500000    439.50   
 13426                 Expenses:Utilities:Billag   112.750000    112.75   
 13428                          Expenses:Medical    25.000000     25.00   
 13430                          Expenses:Medical   151.250000    151.25   
 13432                 Expenses:Baby:Babysitting   505.900000    505.90   
 13434                          Expenses:Medical   146.500000    146.50   
 13437                         Expenses:Services    46.600000     46.60   
 13439       

In [415]:
from unittest import *

class TestGncDate(TestCase):
    def test_GncDateFromIsoString_noTz(self):
        tests = [{'teststr': "2017-03-28", 'y':2017, 'm':3,  'd':28},
                 {'teststr': "2015-02-01", 'y':2015, 'm':2,  'd':1},
                 {'teststr': "2016-02-29", 'y':2016, 'm':2,  'd':29}, # leap
                 {'teststr': "1999-01-15", 'y':1999, 'm':1,  'd':15},
                 {'teststr': "2010-11-13", 'y':2010, 'm':11, 'd':13},
                 {'teststr': "2100-12-31", 'y':2100, 'm':12, 'd':31},
                 {'teststr': "15-12-28",   'y':2015, 'm':12, 'd':28},
                 {'teststr': "99-02-12",   'y':1999, 'm':2,  'd':12},
                 {'teststr': "01-04-10",   'y':2001, 'm':4,  'd':10},
                 {'teststr': "14-03-30",   'y':2014, 'm':3,  'd':30},
                 {'teststr': "00-06-01",   'y':2000, 'm':6,  'd':1}]
        
        for t in tests:
            with self.subTest(ts = t['teststr']):
                d = GncDate(t['teststr'])
                self.assertIsInstance(d.date, datetime.datetime)
                self.assertEqual(d.date.year, t['y'])
                self.assertEqual(d.date.month, t['m'])
                self.assertEqual(d.date.day, t['d'])
                self.assertIsNotNone(d.date.tzinfo)
                self.assertEqual(d.date.tzinfo.tzname(d), "GMT +1")

        invalid_input = ["03-28-2017",
                         "13-2017-01"
                         "2017-03-32",
                         "2017-13-01",
                         "13-01-2017",
                         "2017-03-32",
                         "2017.01.03",
                         "17.01.03",
                         "17-00-28",
                         "17-13-28",
                         "17-03-32",
                         "17-02-29"] # 2017 is non-leap
        
        for inv in invalid_input:
            with self.subTest(inv = inv):
                with self.assertRaises(ValueError):
                    d = GncDate(inv)

        with self.assertRaises(TypeError):
            d = GncDate(2017)
            

    def test_GncDateFromIsoString_withTz(self):
        tests = [{'teststr': "2017-03-28+0100", 'y':2017, 'm':3,  'd':28, 'utc_h':1,   'utc_m':0},
                 {'teststr': "2015-02-01-2315", 'y':2015, 'm':2,  'd':1,  'utc_h':-23, 'utc_m':-15},
                 {'teststr': "2012-02-29+2315", 'y':2012, 'm':2,  'd':29, 'utc_h':23,  'utc_m':15}, # leap year
                 {'teststr': "1999-01-15-1200", 'y':1999, 'm':1,  'd':15, 'utc_h':-12, 'utc_m':0},
                 {'teststr': "2010-11-13+0000", 'y':2010, 'm':11, 'd':13, 'utc_h':0,   'utc_m':0},
                 {'teststr': "2100-12-31-0000", 'y':2100, 'm':12, 'd':31, 'utc_h':0,   'utc_m':0},
                 {'teststr': "15-12-28+0100",   'y':2015, 'm':12, 'd':28, 'utc_h':1,   'utc_m':0},
                 {'teststr': "99-02-12-0335",   'y':1999, 'm':2,  'd':12, 'utc_h':-3,  'utc_m':-35},
                 {'teststr': "01-04-10+1200",   'y':2001, 'm':4,  'd':10, 'utc_h':12,  'utc_m':0},
                 {'teststr': "14-03-30+0001",   'y':2014, 'm':3,  'd':30, 'utc_h':0,   'utc_m':1},
                 {'teststr': "00-06-01-0100",   'y':2000, 'm':6,  'd':1,  'utc_h':-1,  'utc_m':0}]
        
        for t in tests:
            with self.subTest(ts = t['teststr']):
                d = GncDate(t['teststr'])
                self.assertIsInstance(d.date, datetime.datetime)
                self.assertEqual(d.date.year, t['y'])
                self.assertEqual(d.date.month, t['m'])
                self.assertEqual(d.date.day, t['d'])
                self.assertIsNotNone(d.date.tzinfo)
                self.assertEqual(d.date.tzinfo.utcoffset(d.date), 
                                 datetime.timedelta(hours=t['utc_h'], minutes=t['utc_m']))
                
        invalid_input = ["03-28-2017+0100",
                         "2017-03-32+0100",
                         "2017-02-29+0100", # 2017 is non-leap
                         "2017-03-32+0100",
                         "2017-03-28Z0100",
                         "2017-03-28z0100",
                         "2017-03-28Z+100",
                         "2017-03-28+01:00",
                         "2017-03-28+01:00",
                         "2017-03-28+01",
                         "17-03-28Z+0100",
                         "17-13-28+0100",
                         "17-03-32+0100"]
        
        for inv in invalid_input:
            with self.subTest(inv = inv):
                with self.assertRaises(ValueError):
                    d = GncDate(inv)
        
               
# Load the tests directly from the TestCase class. Passing a TestCase instance
# to loadTestsFromModule() only worked because dir(instance) exposes __class__.
suite = TestLoader().loadTestsFromTestCase(TestGncDate)
TextTestRunner().run(suite)

..
----------------------------------------------------------------------
Ran 2 tests in 0.016s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>