## from_dicts() bs
How should we handle inconsistent schemas in input from NoSQL databases such as Elasticsearch?

I'm probably not going to use this code.
The concern is: what if you write a detection and the fields change from query to query?
In practice, though, a detection is written for a specific set of logs whose schema probably won't change.
I think I'm going to move to a multi-dataframe format instead.

In [104]:
import json
import polars as pl
import Types as t
import importlib
importlib.reload(t)

# Simple examples: two samples of the same `log` object with inconsistent
# shapes. `id.uid` is a scalar string in `a` but a list in `b`, and each sample
# carries a key (`resp_fuids` / `fuid`) that the other lacks.
a = [
    {
        "log": {
            "file": {
                "path": "/nsm/import/acb5b3eacd804c4c17d25fb312581610/suricata/eve-2022-11-10-07:20.json"
            },
            "offset": 9491215,
            "id": {
                "uid": "1017858934534263",
                "resp_fuids": None
            }
        }
    }
]

b = [
    {
        "log": {
            "file": {
                "path": "/nsm/import/acb5b3eacd804c4c17d25fb312581610/zeek/logs/files.log"
            },
            "offset": 7559607,
            "id": {
                "uid": [
                    "CAnltOiClThlO6ZFk"
                ],
                "fuid": "FdZyiY2Kf5yf3L4239"
            }
        }
    }
]

# Index mapping and exported documents. Each document is later read through its
# `_source` key. JSON is UTF-8 by spec, so decode explicitly rather than relying
# on the platform default encoding.
with open('./so-network-index.json', encoding='utf-8') as f:
    index = json.load(f)

with open('./so-network-data.json', encoding='utf-8') as f:
    json_data = json.load(f)

In [105]:
# Type-promotion table used by Schema.resolve_conflict.
#   priority: higher means wider; a wider type is never narrowed back down.
#   super:    names of the types this type may be widened to. These MUST be keys
#             of this dict -- any other spelling is never matched, so the
#             promotion would silently not happen.
priorities = {
    'multivalue': {
        'name': 'multivalue',
        'priority': 5,
        'super': set(),
    },
    'string': {
        'name': 'string',
        'priority': 4,
        'super': {'multivalue'},
    },
    'float': {
        'name': 'float',
        'priority': 3,
        'super': {'string', 'multivalue'},
    },
    'int': {
        'name': 'int',
        'priority': 2,
        'super': {'float', 'string', 'multivalue'},
    },
    'bool': {
        'name': 'bool',
        'priority': 1,
        'super': {'string', 'multivalue'},
    },
    'null': {
        'name': 'null',
        'priority': 0,
        'super': {'bool', 'int', 'float', 'string', 'multivalue'},
    },
}

# Resolved type name (as used in `priorities`) -> Hql type constructor from Types.
types = {
    type_name: getattr(t, type_name)
    for type_name in ('multivalue', 'string', 'float', 'int', 'bool', 'null')
}

In [106]:
class Schema:
    """Infer an Hql schema from a list of (possibly inconsistent) JSON-like rows.

    Nested dicts become nested schema dicts. Lists/tuples become
    ``t.multivalue`` of their merged element type. Other scalars become Hql
    types from the ``Types`` module (imported as ``t``).

    Attributes:
        schema: ``{field: type}``. Nested dicts describe sub-objects; multivalue
            fields hold ``t.multivalue`` instances; other fields hold the
            constructors from ``types`` (``to_pl_schema`` calls them).
        mv_fields: key paths whose resolved type is multivalue, e.g.
            ``[['log', 'id', 'uid']]``; consumed by ``adjust_mv``.
    """

    def __init__(self, data: list[dict]):
        # Key paths of multivalue fields, e.g. [['foo', 'bar'], ['cat']].
        self.mv_fields = []
        self.schema = self.gen_schema(data)

    def to_hql_type(self, proto):
        """Return the Hql type for a single value.

        Dicts return the ``dict`` class itself as a marker; callers recurse into
        sub-objects separately. Lists/tuples return ``t.multivalue`` wrapping
        the merged type of their elements.

        Raises:
            TypeError: ``proto`` has no Hql mapping.
        """
        # bool is checked before int because bool is a subclass of int.
        if isinstance(proto, dict):
            return dict
        if isinstance(proto, (list, tuple)):
            inner = self.resolve_conflict([self.to_hql_type(x) for x in proto])
            return t.multivalue(inner)
        if isinstance(proto, str):
            return t.string()
        if isinstance(proto, bool):
            return t.bool()
        if isinstance(proto, int):
            return t.int()
        if isinstance(proto, float):
            return t.float()
        if proto is None:
            return t.null()
        raise TypeError(f'Unhandled conversion type {type(proto)}')

    def to_pl_schema(self, src: dict = None):
        """Convert a schema dict (default: ``self.schema``) to a polars schema.

        Nested dicts become ``pl.Struct``; every other entry supplies its own
        polars type via ``pl_schema()``.
        """
        # `is None`, not falsiness: an empty nested struct ({}) must not fall
        # back to the root schema, which would recurse forever.
        if src is None:
            src = self.schema

        schema = {}
        for field, hql in src.items():
            if isinstance(hql, dict):
                schema[field] = pl.Struct(self.to_pl_schema(hql))
            elif isinstance(hql, t.multivalue):
                schema[field] = hql.pl_schema()
            else:
                schema[field] = hql().pl_schema()
        return schema

    @staticmethod
    def _promote(current: dict, candidate: dict) -> dict:
        """Return a ``priorities`` entry wide enough to hold both types.

        The ``super`` sets are checked in both directions so the result does
        not depend on visiting order; unrelated pairs (e.g. bool and int) fall
        back to ``string``.
        """
        if current['name'] == candidate['name']:
            return current
        if candidate['name'] in current['super']:
            return candidate
        if current['name'] in candidate['super']:
            return current
        return priorities['string']

    def resolve_conflict(self, ts: list):
        """Merge candidate types into a single type.

        If any candidate is multivalue, the result is ``t.multivalue`` of the
        merged inner/scalar types. Otherwise it is the ``types`` constructor for
        the promoted type; an empty list resolves to ``null``.

        Raises:
            TypeError: a candidate has no entry in ``priorities`` (for example
                the ``dict`` marker produced by a list of objects).
        """
        if any(isinstance(x, t.multivalue) for x in ts):
            inner_set = {x.inner if isinstance(x, t.multivalue) else x for x in ts}
            return t.multivalue(self.resolve_conflict(list(inner_set)))

        best = priorities['null']
        for name in (str(x) for x in ts):
            if name not in priorities:
                raise TypeError(f'Cannot resolve type {name}')
            best = self._promote(best, priorities[name])

        return types[best['name']]

    def gen_schema(self, data: list[dict]):
        """Infer ``{field: type}`` for ``data``, recursing into sub-objects.

        A row that lacks a field -- or is not a dict at all, e.g. ``None`` --
        contributes ``t.null()`` for that field. Multivalue field paths are
        appended to ``self.mv_fields``.

        Raises:
            Exception: a field holds an object in some rows and a non-null
                scalar/list in others.
        """
        # Union of keys in first-seen order, so the schema (and hence polars
        # column order) is deterministic rather than set-iteration order.
        keys = dict.fromkeys(
            key for row in data if isinstance(row, dict) for key in row
        )

        new = {}
        for key in keys:
            typeset = set()
            for row in data:
                if not isinstance(row, dict) or key not in row:
                    typeset.add(t.null())
                elif isinstance(row[key], dict):
                    typeset.add(dict)
                else:
                    typeset.add(self.to_hql_type(row[key]))

            if dict in typeset:
                # Only nulls (explicit or missing) may be mixed with objects.
                conflicting = [x for x in typeset if x is not dict and str(x) != 'null']
                if conflicting:
                    raise Exception(f"Cannot merge types {list(typeset)}")

                sub_schema = Schema([
                    row[key] for row in data
                    if isinstance(row, dict) and isinstance(row.get(key), dict)
                ])
                new[key] = sub_schema.schema
                self.mv_fields += [[key, *path] for path in sub_schema.mv_fields]
                continue

            new[key] = self.resolve_conflict(list(typeset))
            if isinstance(new[key], t.multivalue):
                self.mv_fields.append([key])

        return new

    def adjust_mv(self, data: list[dict], mv_fields: list[list[str]] = None):
        """Wrap scalar values of multivalue fields in lists, mutating rows in place.

        Args:
            data: rows to normalise; non-dict rows and null sub-objects are skipped.
            mv_fields: key paths to normalise; defaults to ``self.mv_fields``.

        Returns:
            ``data`` itself (mutated).
        """
        if mv_fields is None:
            mv_fields = self.mv_fields

        for path in mv_fields:
            head, rest = path[0], path[1:]
            if rest:
                children = [
                    row[head] for row in data
                    if isinstance(row, dict) and isinstance(row.get(head), dict)
                ]
                self.adjust_mv(children, [rest])
                continue

            for row in data:
                if not isinstance(row, dict):
                    continue
                value = row.get(head)
                # Missing keys and nulls stay null rather than becoming [None].
                if value is None or isinstance(value, list):
                    continue
                row[head] = list(value) if isinstance(value, tuple) else [value]

        return data
            

# Loading each sample on its own works; the interesting case is loading both
# together with one merged schema.
df1 = pl.from_dicts(a)
df2 = pl.from_dicts(b)
ab_schema = Schema(a + b)
print(json.dumps(ab_schema.schema, indent=2, default=repr))

# adjust_mv mutates the row dicts in place; `a + b` is a new list that shares
# those dicts, so `a` and `b` are modified too. Reuse the schema inferred above
# instead of re-inferring it from the already-mutated rows.
data = ab_schema.adjust_mv(a + b)
df3 = pl.from_dicts(data, schema=ab_schema.to_pl_schema())
print(json.dumps(df3.to_dicts(), indent=2))


{
  "log": {
    "offset": "int",
    "id": {
      "resp_fuids": "null",
      "fuid": "string",
      "uid": "multivalue(string)"
    },
    "file": {
      "path": "string"
    }
  }
}
[
  {
    "log": {
      "offset": 9491215,
      "id": {
        "resp_fuids": null,
        "fuid": null,
        "uid": [
          "1017858934534263"
        ]
      },
      "file": {
        "path": "/nsm/import/acb5b3eacd804c4c17d25fb312581610/suricata/eve-2022-11-10-07:20.json"
      }
    }
  },
  {
    "log": {
      "offset": 7559607,
      "id": {
        "resp_fuids": null,
        "fuid": "FdZyiY2Kf5yf3L4239",
        "uid": [
          "CAnltOiClThlO6ZFk"
        ]
      },
      "file": {
        "path": "/nsm/import/acb5b3eacd804c4c17d25fb312581610/zeek/logs/files.log"
      }
    }
  }
]


In [114]:
# Load each exported document's `_source` body with an inferred schema, then
# print every truthy client.ip_bytes value.
unnested = [hit['_source'] for hit in json_data]
jschema = Schema(unnested)
unnested = jschema.adjust_mv(unnested)
df4 = pl.from_dicts(unnested, schema=jschema.to_pl_schema())

for row in df4.to_dicts():
    client = row.get('client')
    if not client or not client['ip_bytes']:
        continue
    print(client['ip_bytes'])

800
1331
71
1331
361
478
1331
384
387
812
83
229
86
86
91
361
353
355
712
1371
610
1331
1331
1331
1331
1331
1371
2010
1331
5480
812
14634
812
91
1331
3445
100
5480
825
86
800
812
800
2748
385
800
71
80
71
9588
1371
1371
1371
87
380
800
1331
387
800
800
86
86
1331
84
1331
355
1331
345
384
1331
100
375
1331
1331
2604
1331
353
71
73
392
83
1624
1932
388
2010
84
1331
71
800
1331
1331
1371
1331
812
458
392
152
1331
800
384
812
1331
1371
4400
71
71
1331
1556
345
312
1331
152
3445
2366
83
800
385
800
812
381
83
1331
73
1331
370
812
1371
812
1208
183
4400
71
380
1331
1331
83
11739
1624
1331
86
388
856
4925
385
2700
384
380
825
812
1331
1371
1331
380
286
91
812
370
1371
812
304
800
1371
181
1371
181
183
2045
80
1208
1331
381
812
73
83
1331
1371
812
56
1331
800
800
800
1331
91
1331
1331
9588
375
388
1331
86
1371
1331
91
1331
1331
73
1331
1331
152
86
1331
385
812


In [108]:
# Elasticsearch mapping `type` -> name of the Hql type constructor in Types.
# Resolved with getattr at call time, so `t` is only touched for matching fields.
ELASTIC_SCALAR_TYPES = {
    'scaled_float': 'decimal',
    'half_float': 'float',
    'float': 'float',
    'double': 'double',
    'byte': 'byte',
    'short': 'short',
    'integer': 'int',
    'long': 'long',
    'unsigned_long': 'ulong',
    'ip': 'ip',
    'date': 'datetime',
    'date_nanos': 'datetime',
    'keyword': 'string',
    'constant_keyword': 'string',
    'wildcard': 'string',
    'binary': 'string',
    'text': 'string',
    'match_only_text': 'string',
    'boolean': 'bool',
    # Object-like fields without a `properties` block are kept as opaque strings.
    'object': 'string',
    'flattened': 'string',
    'nested': 'string',
}

# Range types become t.range(bound, bound), bound being the base type's Hql type.
ELASTIC_RANGE_TYPES = (
    'date_range', 'integer_range', 'float_range',
    'long_range', 'double_range', 'ip_range',
)

# Point types are stored as a multivalue of floats (presumably the coordinates).
ELASTIC_POINT_TYPES = ('point', 'geo_point')


def gen_elastic_schema(props: dict):
    """Translate an Elasticsearch mapping ``properties`` dict into an Hql schema.

    Fields with their own ``properties`` become nested schema dicts. Other
    fields are mapped by their ``type``. Unsupported types are printed as
    ``"<field> <type>"`` and left out of the result.

    Args:
        props: the ``properties`` object of an index mapping.

    Returns:
        ``{field: Hql type instance or nested dict}``.
    """
    schema = {}
    for field, spec in props.items():
        if 'properties' in spec:
            schema[field] = gen_elastic_schema(spec['properties'])
            continue

        # `object` is Elasticsearch's default when no explicit type is given.
        ptype = spec.get('type', 'object')
        # Exact membership tests: a bare string like ('scaled_float') is a
        # substring check, which would send 'float' to decimal.
        if ptype in ELASTIC_RANGE_TYPES:
            bound = getattr(t, ELASTIC_SCALAR_TYPES[ptype.removesuffix('_range')])()
            schema[field] = t.range(bound, bound)
        elif ptype in ELASTIC_POINT_TYPES:
            schema[field] = t.multivalue(t.float())
        elif ptype in ELASTIC_SCALAR_TYPES:
            schema[field] = getattr(t, ELASTIC_SCALAR_TYPES[ptype])()
        else:
            print(f'{field} {ptype}')

    return schema