In [1]:
# bring in the dependencies
import functools
import os
import sys
from dataclasses import dataclass, field
from itertools import product, combinations
from pprint import pprint
from typing import Optional, List, Dict, Iterable

# project dependencies
proj_path = 'd:/code/Projects/property_mapping_sql_generator/'
sys.path.insert(0, proj_path)
from dsl.engine import *

In [2]:
# the logic branch type, i.e. one branch taken from a case statement
@dataclass(frozen=True)
class LogicBranch:
    """Immutable record of a single logic branch of a mapping."""
    # value produced when this branch applies
    result: str
    # search expression of the branch; None for a default / inline branch
    search_expr: Optional[str]
    # reference field the branch reads from; None when there is none
    ref: Optional[str]

In [3]:
# spike for case statement to support return of logic branches
class AltCase(Case):
    """`Case` variant that can enumerate its logic branches.

    Reads the attributes `search_result_pairs`, `default` and `ref_field`,
    which are not defined in this class (presumably provided by `Case`).
    """

    def get_branches(self) -> Iterable[LogicBranch]:
        """Lazily yield one `LogicBranch` per search/result pair, then the default.

        This is a generator, so the return type is an iterable rather than a
        list; wrap the call in `list(...)` when a concrete list is needed.
        """
        for src, res in self.search_result_pairs:
            yield LogicBranch(res.val, src.val, self.ref_field)
        # the default branch carries no search expression
        yield LogicBranch(self.default.val, None, self.ref_field)

    def __repr__(self) -> str:
        """Compact representation showing the ref field, the pairs and the default."""
        pairs_str = ", ".join(f"({s.val}, {r.val})" for s, r in self.search_result_pairs)
        return f"AltCase({self.ref_field}, [{pairs_str}], default={self.default.val})"

    def __str__(self) -> str:
        """Use the same text as `repr` when converted to a string."""
        return repr(self)

In [4]:
# example alt cases
# p_case: two search/result pairs on 'ref' plus a default; listing the branches
# gives one LogicBranch per pair followed by the default branch
p_case = AltCase('ref', [('x', 'p1'), ('y', 'p2')], default='pN')
list(p_case.get_branches())

[LogicBranch(result='p1', search_expr='x', ref='a.ref'),
 LogicBranch(result='p2', search_expr='y', ref='a.ref'),
 LogicBranch(result='pN', search_expr=None, ref='a.ref')]

In [5]:
# a_case: same reference field as p_case, with an additional 'z' search term
a_case = AltCase('ref', [('x', 'a1'), ('y', 'a2'), ('z', 'a3')], default='aN')
list(a_case.get_branches())

[LogicBranch(result='a1', search_expr='x', ref='a.ref'),
 LogicBranch(result='a2', search_expr='y', ref='a.ref'),
 LogicBranch(result='a3', search_expr='z', ref='a.ref'),
 LogicBranch(result='aN', search_expr=None, ref='a.ref')]

In [6]:
# u_case: same reference field again, with a '*' search expression alongside 'x'
u_case = AltCase('ref', [('x', 'u1'), ('*', 'u*')], default='n/a')
list(u_case.get_branches())

[LogicBranch(result='u1', search_expr='x', ref='a.ref'),
 LogicBranch(result='u*', search_expr='*', ref='a.ref'),
 LogicBranch(result='n/a', search_expr=None, ref='a.ref')]

In [7]:
# assume inline mapping for others e.g.
# wrap each example case in its mapping element; indicator and ratio use the
# NoIndicator / NoRatio variants instead of a case statement
p = Property(p_case)
a = Analysis(a_case)
i = NoIndicator()
u = Uom(u_case)
r = NoRatio()

In [8]:
from collections import defaultdict

# group the mapping elements by the reference column their mapping function reads
# NOTE(review): inline elements are grouped under the key None, so the key type
# is effectively Optional[str] rather than str
me_rc : Dict[str, List[Mapping]] = defaultdict(list)  # mapping_el_by_ref_col
for e in [p, a, i, u, r]:
    # inline elements have no reference field to group on
    key = None if e.is_inline() else e.mapping_func.ref_field
    me_rc[key].append(e)
    
pprint(me_rc)

defaultdict(<class 'list'>,
            {None: [NoIndicator(mapping_func='na', has_value=False),
                    NoRatio(mapping_func='NoRatio', has_value=False)],
             'a.ref': [Property(mapping_func=AltCase(a.ref, [(x, p1), (y, p2)], default=pN), has_value=True),
                       Analysis(mapping_func=AltCase(a.ref, [(x, a1), (y, a2), (z, a3)], default=aN), has_value=True),
                       Uom(mapping_func=AltCase(a.ref, [(x, u1), (*, u*)], default=n/a), has_value=True)]})


In [9]:
# in real version this should be exposed via the `Mapping` type
def get_branches(e: Mapping) -> Iterable[LogicBranch]:
    """Yield the logic branches of a mapping element.

    Dispatches on the element's `mapping_func`:
      * a plain `str` gives a single branch whose result is that string;
      * an inline element gives a single branch whose result is `mapping_func.val`;
      * an `AltCase` gives every branch reported by `AltCase.get_branches()`.

    This is a generator, so nothing runs (and nothing is raised) until the
    result is iterated.

    Raises:
        NotImplementedError: when the mapping function is none of the above.
    """
    func = e.mapping_func
    if isinstance(func, str):
        yield LogicBranch(func, None, None)
    elif e.is_inline():
        yield LogicBranch(func.val, None, None)
    elif isinstance(func, AltCase):
        yield from func.get_branches()
    else:
        # report the type of the mapping function (what is dispatched on), not of the element
        raise NotImplementedError(f'Only str, inline or AltCase supported, got type: {type(func)}')

In [10]:
# types needed in next section
from enum import Enum

class HeaderElementType(Enum):
    """The five kinds of element a header is assembled from."""
    # member names match the field names on HeadingElementsBuilder
    prop = 1   # Property mappings
    anal = 2   # Analysis mappings
    ind = 3    # Indicator mappings
    uom = 4    # Uom mappings
    ratio = 5  # Ratio mappings
    
@dataclass
class HeadingElementsBuilder:
    """
    Construct a header, allowing partial representations and merging.

    e.g. if we have prop and anal from one ref col, and the other elements
    from another, we can construct an instance for each and then merge them
    to allow us to build a complete header.
    """
    # every element starts unset (None) so a builder can represent a partial header
    prop: Optional[str] = None
    anal: Optional[str] = None
    ind: Optional[str] = None
    uom: Optional[str] = None
    ratio: Optional[str] = None

    def add_element(self, element_type: HeaderElementType, result: str) -> "HeadingElementsBuilder":
        """Set the element identified by `element_type` to `result`.

        Returns:
            This builder, so that calls can be chained.

        Raises:
            NotImplementedError: if `element_type` is not a known HeaderElementType member.
        """
        if element_type == HeaderElementType.prop:
            self.prop = result
        elif element_type == HeaderElementType.anal:
            self.anal = result
        elif element_type == HeaderElementType.ind:
            self.ind = result
        elif element_type == HeaderElementType.uom:
            self.uom = result
        elif element_type == HeaderElementType.ratio:
            self.ratio = result
        else:
            raise NotImplementedError(f"HeaderElementType {element_type} not supported.")
        return self

    def build_header_elements(self) -> HeaderElements:
        """Validate the builder and return the corresponding `HeaderElements`.

        Raises:
            Exception: propagated from `validate()` when any element is missing.
        """
        self.validate()
        return HeaderElements(prop=self.prop, anal=self.anal, ind=self.ind, uom=self.uom, ratio=self.ratio)

    def validate(self) -> None:
        """Check that every element has a value.

        The check is on truthiness, so an empty string counts as missing just
        like None.

        Raises:
            Exception: if at least one element is missing.
        """
        elements = [self.prop, self.anal, self.ind, self.uom, self.ratio]
        if not all(elements):  # each element is truthy if it has a value
            raise Exception("Each element type must have a value to proceed.")
            

def merge_hebs(*hebs: HeadingElementsBuilder) -> HeadingElementsBuilder:
    """Merge any number of partial builders into a single builder.

    Builders are combined pairwise from left to right. For each element the
    value that is set (not None) is kept; if both sides set the element they
    must agree.

    Returns:
        The merged builder. With no arguments an empty builder is returned;
        with exactly one argument that same builder is returned unchanged.

    Raises:
        Exception: if two builders hold different values for the same element.
    """

    def _merge_element_or_raise(v1: Optional[str], v2: Optional[str],
                                t: HeaderElementType) -> Optional[str]:
        # both have a value and they disagree -> cannot merge
        if v1 is not None and v2 is not None and v1 != v2:
            raise Exception(f"Can't merge on {t} as there is a conflict: {v1} | {v2}")
        # test against None (not truthiness) so that an empty-string value is
        # not discarded in favour of None, matching the conflict check above
        return v1 if v1 is not None else v2

    def _merge_two(heb1: HeadingElementsBuilder, heb2: HeadingElementsBuilder) -> HeadingElementsBuilder:
        return HeadingElementsBuilder(
            prop=_merge_element_or_raise(heb1.prop, heb2.prop, HeaderElementType.prop),
            anal=_merge_element_or_raise(heb1.anal, heb2.anal, HeaderElementType.anal),
            ind=_merge_element_or_raise(heb1.ind, heb2.ind, HeaderElementType.ind),
            uom=_merge_element_or_raise(heb1.uom, heb2.uom, HeaderElementType.uom),
            ratio=_merge_element_or_raise(heb1.ratio, heb2.ratio, HeaderElementType.ratio)
        )

    # reduce() raises a TypeError on an empty sequence without an initial value,
    # so handle "nothing to merge" explicitly
    if not hebs:
        return HeadingElementsBuilder()

    # using functools reduce to merge 1st with 2nd, result of this with 3rd and so on
    return functools.reduce(_merge_two, hebs)


# in real version, may be able to just add a property to Mapping, which is set to the
# HeaderElementType in the subclass e.g. Property would have mapping_type = HeaderElementType.prop
def get_element_type(e: Mapping) -> HeaderElementType:
    """Return the HeaderElementType that corresponds to a mapping element.

    Raises:
        NotImplementedError: if `e` is not an instance of any of the supported
            mapping classes (instead of silently returning None).
    """
    # can't do simple dict lookup as need to recognise sub types e.g. that NoUom is a Uom
    if isinstance(e, Property): return HeaderElementType.prop
    if isinstance(e, Analysis): return HeaderElementType.anal
    if isinstance(e, Indicator): return HeaderElementType.ind
    if isinstance(e, Uom): return HeaderElementType.uom
    if isinstance(e, Ratio): return HeaderElementType.ratio
    raise NotImplementedError(f"No HeaderElementType for mapping element of type: {type(e)}")

In [11]:
# heading elements builder by ref_col - keep in mind that these will usually be part
# formed when building within the context of a single ref_col - combined later
# NOTE(review): me_rc groups inline elements under the key None, so the keys here can be None too
hebs_rc: Dict[str, List[HeadingElementsBuilder]] = defaultdict(list)

for ref, elements in me_rc.items():
    
    print(f"\n{'='*80}")
    print(f"\nprocessing reference col: {ref}")
        
    # get all unique search terms (for current ref field); a term of None comes
    # from branches that have no search expression (defaults / inline values)
    search_terms = set([lb.search_expr for e in elements for lb in get_branches(e)])
    print(f"\nunique search terms: {search_terms}")
    
    # get result by search term by element type (rebuilt for every ref col)
    r_s_rc: Dict[HeaderElementType, Dict[str, str]] = {}
          
    # record the result of every logic branch of every element, keyed by its search term
    for e in elements:
        element_type = get_element_type(e)
        r_s_rc[element_type] = {}
        print(f"\nel func: {e.mapping_func}:")
        for b in get_branches(e):
            pprint(b)
            r_s_rc[element_type][b.search_expr] = b.result
    
    print('\nlookup of srcg -> result grouped by element type')
    pprint(r_s_rc)
    
    # for each search term, combine every element type's result for that term into one builder
    # NOTE(review): terms are matched by exact dict key, so '*' is handled like any
    # other term - confirm it is not meant to act as a wildcard
    for term in search_terms:
        heb = HeadingElementsBuilder()
        for element_type, result_by_search_term in r_s_rc.items():
            result: str
            if term in result_by_search_term:
                result = result_by_search_term[term]
            else:
                result = result_by_search_term[None]  # use the default if this search term is not mapped in this case statement
            heb.add_element(element_type, result)
        # keep only distinct builders for this ref col
        if heb not in hebs_rc[ref]:
            hebs_rc[ref].append(heb)
    
print(f"\n{'*' * 80}")
print('\nlookup of header elements builder by ref')
pprint(hebs_rc)  



processing reference col: a.ref

unique search terms: {'*', 'y', None, 'z', 'x'}

el func: AltCase(a.ref, [(x, p1), (y, p2)], default=pN):
LogicBranch(result='p1', search_expr='x', ref='a.ref')
LogicBranch(result='p2', search_expr='y', ref='a.ref')
LogicBranch(result='pN', search_expr=None, ref='a.ref')

el func: AltCase(a.ref, [(x, a1), (y, a2), (z, a3)], default=aN):
LogicBranch(result='a1', search_expr='x', ref='a.ref')
LogicBranch(result='a2', search_expr='y', ref='a.ref')
LogicBranch(result='a3', search_expr='z', ref='a.ref')
LogicBranch(result='aN', search_expr=None, ref='a.ref')

el func: AltCase(a.ref, [(x, u1), (*, u*)], default=n/a):
LogicBranch(result='u1', search_expr='x', ref='a.ref')
LogicBranch(result='u*', search_expr='*', ref='a.ref')
LogicBranch(result='n/a', search_expr=None, ref='a.ref')

lookup of srcg -> result grouped by element type
{<HeaderElementType.prop: 1>: {None: 'pN', 'x': 'p1', 'y': 'p2'},
 <HeaderElementType.anal: 2>: {None: 'aN', 'x': 'a1', 'y': 'a2'

In [12]:
# get combinations from groups (* unpacks the collection)
# each combination takes exactly one builder from every ref col group
unmerged_hebs = list(product(*hebs_rc.values()))
# NOTE(review): indexing [0] assumes at least one combination exists
print(f"unmerged_hebs is of type: {type(unmerged_hebs)} of {type(unmerged_hebs[0])}")
pprint(unmerged_hebs)

import functools
# merge HeadingElementsBuilder from each collection - there may be more than 2, so using functools
merged = []
for heb_parts in unmerged_hebs:
    # could refactor to put a function to merge a collection of HeadingElementsBuilders with the class def
    #print(f"\nAttempting to merge group:")
    #pprint(heb_parts)
    # merge_hebs raises if two parts disagree on the same element
    merged_heb = merge_hebs(*heb_parts)
    merged.append(merged_heb)
    
print("\nThink we have the final grouping now...")
pprint(merged)

unmerged_hebs is of type: <class 'list'> of <class 'tuple'>
[(HeadingElementsBuilder(prop='pN', anal='aN', ind=None, uom='u*', ratio=None),
  HeadingElementsBuilder(prop=None, anal=None, ind='na', uom=None, ratio='NoRatio')),
 (HeadingElementsBuilder(prop='p2', anal='a2', ind=None, uom='n/a', ratio=None),
  HeadingElementsBuilder(prop=None, anal=None, ind='na', uom=None, ratio='NoRatio')),
 (HeadingElementsBuilder(prop='pN', anal='aN', ind=None, uom='n/a', ratio=None),
  HeadingElementsBuilder(prop=None, anal=None, ind='na', uom=None, ratio='NoRatio')),
 (HeadingElementsBuilder(prop='pN', anal='a3', ind=None, uom='n/a', ratio=None),
  HeadingElementsBuilder(prop=None, anal=None, ind='na', uom=None, ratio='NoRatio')),
 (HeadingElementsBuilder(prop='p1', anal='a1', ind=None, uom='u1', ratio=None),
  HeadingElementsBuilder(prop=None, anal=None, ind='na', uom=None, ratio='NoRatio'))]

Think we have the final grouping now...
[HeadingElementsBuilder(prop='pN', anal='aN', ind='na', uom='u*', 

In [13]:
# turn every merged builder into HeaderElements; build_header_elements() validates
# first and raises if any element is still missing
valid_combinations = [heb.build_header_elements() for heb in merged]
print("\nHere is the output of valid combinations...")
# sorted() requires HeaderElements instances to be orderable
pprint(sorted(valid_combinations))


Here is the output of valid combinations...
[HeaderElements(prop='p1', anal='a1', uom='u1', ind='na', ratio='NoRatio'),
 HeaderElements(prop='p2', anal='a2', uom='n/a', ind='na', ratio='NoRatio'),
 HeaderElements(prop='pN', anal='a3', uom='n/a', ind='na', ratio='NoRatio'),
 HeaderElements(prop='pN', anal='aN', uom='n/a', ind='na', ratio='NoRatio'),
 HeaderElements(prop='pN', anal='aN', uom='u*', ind='na', ratio='NoRatio')]
