# Document Versioning
**Summary:**  A testing ground for storing multiple versions of an ingested document   
**Pros:**  Data supports a forensic trail  
**Cons:**  Increased Read/Write Burden and data duplication  
**Setup:**  Set up a change stream on a data collection.  Enable a watch to write the previous document to a collection when the current revision is updated or a new document is inserted.     

In [None]:
import pymongo, random, datetime, time, json, yaml
from pymongo import MongoClient
import numpy as np
from bson.son import SON
from bson import ObjectId
import pandas as pd

In [None]:
client = MongoClient(port=27018)
db = client.patterns
current_collection = db.document_versioning_current
historic_collection = db.document_versioning_history 

# Change Streams Test

In [None]:
test_collection = db.test
with test_collection.watch([], 'updateLookup' ) as stream:
    for change in stream:
        print(yaml.dump(change, default_flow_style=False))

# Split on Insert

In [None]:
pipeline = [
    {"$match": {"operationType": "insert" }},
    {"$addFields": {"document_version_1": 
                    { "event":"$$ROOT.fullDocument.event", 'document_version':"schema_v1"}}},
    {"$addFields": {"document_version_2": 
                    { "event":"$$ROOT.fullDocument.event",'document_version':"schema_v2"}}}
]

In [None]:
with current_collection.watch(pipeline, 'updateLookup' ) as stream:
    for change in stream:
        if change['operationType'] == 'insert':
            print(f'INSERT Change detected @ {datetime.datetime.utcnow()} - {change["fullDocument"]["_id"]}')
            historic_collection.insert_many([change['document_version_1'],change['document_version_2']])
            print('Documents created for schema versions 1 and 2')
            #current_collection.delete_one({'_id':change['fullDocument']['_id']})

# Split on Insert or Change with Update

In [25]:
pipeline = [
    {"$match": {'$or':[
        {"operationType":"insert"},
        {"operationType":"update"}
    ]}},
    {
        "$addFields": {
            "document_version_1": { 
                "event":"$$ROOT.fullDocument.event", 
                'document_version':"schema_v1", 
                'source_id':'$$ROOT.fullDocument._id',
                'deployment_type':'Big Iron',
                'created': datetime.datetime.now()
            },
            "document_version_2": { 
                "event":"$$ROOT.fullDocument.event", 
                'document_version':"schema_v2", 
                'source_id':'$$ROOT.fullDocument._id',
                'deployment_type':'Cloud VM',
                'created': datetime.datetime.now()
            }
        }
    }
]

In [None]:
common_update_fields = ['event']
with current_collection.watch(pipeline, 'updateLookup' ) as stream:
    for change in stream:
        if change['operationType'] == 'insert':
            print(f'INSERT Change detected @ {datetime.datetime.utcnow()} - {change["fullDocument"]["_id"]}')
            historic_collection.insert_many([change['document_version_1'],change['document_version_2']])
            print('Documents created for schema versions 1 and 2')
            #current_collection.delete_one({'_id':change['fullDocument']['_id']})
        elif change['operationType'] == 'update':
            print(f'UPDATE Change detected @ {datetime.datetime.utcnow()} - {change["fullDocument"]["_id"]}')
            updated_fields = change['updateDescription']['updatedFields'].keys()
            # case all fields present
            if all([x in common_update_fields for x in updated_fields]):
                historic_update_status = historic_collection.update_many({'source_id':change['fullDocument']['_id']}, 
                                                {'$set':change['updateDescription']['updatedFields']})
                print(f'Matched Documents: {historic_update_status.matched_count}')
                print(f'Updated Documents: {historic_update_status.modified_count}')
            # case some fields present
            elif any([x in common_update_fields for x in updated_fields]):
                documents = historic_collection.find({'source_id':change['fullDocument']['_id']})
                for document in documents:
                    print(document.keys())
                    for key in updated_fields:
                        if key in document.keys():
                            document[key] = change['fullDocument'][key]
                    document_update = historic_collection.replace_one({'_id':document['_id']}, document)
                    print(f'PARTIAL UPDATE Change {datetime.datetime.utcnow()} - {document["_id"]}')

INSERT Change detected @ 2022-03-22 19:20:04.430798 - 623a2164822e42eff6d3943b
Documents created for schema versions 1 and 2
UPDATE Change detected @ 2022-03-22 19:20:26.365668 - 623a2164822e42eff6d3943b
Matched Documents: 2
Updated Documents: 2
UPDATE Change detected @ 2022-03-22 19:30:29.489844 - 623a2164822e42eff6d3943b
UPDATE Change detected @ 2022-03-22 19:40:24.766046 - 623a2164822e42eff6d3943b
dict_keys(['_id', 'event', 'document_version', 'source_id', 'deployment_type', 'created'])
PARTIAL UPDATE Change 2022-03-22 19:40:24.769776 - 623a21640339d6672fe18b43
dict_keys(['_id', 'event', 'document_version', 'source_id', 'deployment_type', 'created'])
PARTIAL UPDATE Change 2022-03-22 19:40:24.771855 - 623a21640339d6672fe18b44
