Establishing connection to MongoDB server running on specified IP address and port

In [1]:
from pymongo import MongoClient  # Import MongoClient class
from datetime import datetime  # Import datetime 

# Defining get_db_connection function which receives ip, port and database name (db_name)
def get_db_connection(ip, port, db_name, timeout_ms=5000):
    """Connect to a MongoDB server and return a handle to one database.

    Args:
        ip: Host name or IP address of the MongoDB server.
        port: TCP port the server listens on.
        db_name: Name of the database to open.
        timeout_ms: How long (milliseconds) to wait for a reachable server
            before giving up. Defaults to 5000.

    Returns:
        The database object ``client[db_name]`` when the server answered a
        ping, otherwise ``None`` (the error is printed).
    """
    client = None
    try:
        client = MongoClient(ip, port, serverSelectionTimeoutMS=timeout_ms)

        # Constructing MongoClient does not contact the server, so without an
        # explicit round trip an unreachable server would go unnoticed here
        # and only fail on the first real query.
        client.admin.command("ping")

        return client[db_name]

    except Exception as e:
        print(f"Error connecting to database: {e}")

        # Do not leave a half-open client behind when the ping failed.
        if client is not None:
            client.close()

        return None

# Open the connection once; the `db` handle is what the later cells query.
db = get_db_connection('localhost', 27017, 'myDatabase')

# get_db_connection signals failure by returning None, so compare against
# None explicitly rather than testing the handle's truthiness.
print("Database connection failed!" if db is None else "Database connection successful!")


Database connection successful!


This set of functions filters data in a MongoDB collection according to different criteria.

In [2]:
from pymongo import MongoClient

# Function to filter documents based on fields "sequence.channel" and "sequence.service"
def filter_by_channel_and_service(db, collection_name, channel, service):
    """Find documents whose sequence channel and service both match.

    The query is printed before it runs; the value returned is whatever
    ``find`` on ``db[collection_name]`` returns.
    """
    criteria = {
        "sequence.channel": channel,
        "sequence.service": service,
    }
    print(f"Query: {criteria}")
    collection = db[collection_name]
    return collection.find(criteria)

# Function to get the earliest and latest "sequence.timestamp" values in a collection
def get_time_range(db, collection):
    """Return the first and last "sequence.timestamp" values in a collection.

    Args:
        db: Database handle, indexable by collection name.
        collection: Name of the collection to inspect.

    Returns:
        ``(start_time, end_time)`` taken from the documents that sort first
        and last on "sequence.timestamp", or ``(None, None)`` when the
        collection holds no documents.
    """
    coll = db[collection]

    # find_one returns None for an empty collection instead of raising the
    # way indexing an empty cursor does.
    oldest = coll.find_one(sort=[("sequence.timestamp", 1)])
    newest = coll.find_one(sort=[("sequence.timestamp", -1)])
    if oldest is None or newest is None:
        return None, None

    # NOTE(review): the recorded outputs show timestamps stored as strings
    # with mixed offset spellings ("+02:00" and "+0200"), so this ordering is
    # presumably lexicographic rather than chronological -- confirm.
    return oldest["sequence"]["timestamp"], newest["sequence"]["timestamp"]

# Function to filter documents based on a time range
def filter_by_date(db, collection, start_time, end_time):
    """Find documents whose "sequence.timestamp" lies in an inclusive range.

    Prints the number of matching documents, then returns the cursor from
    ``find``. Any exception raised along the way is printed and ``None`` is
    returned instead.
    """
    # Both bounds are inclusive ($gte / $lte).
    timestamp_bounds = {"$gte": start_time, "$lte": end_time}
    query = {"sequence.timestamp": timestamp_bounds}

    try:
        count = db[collection].count_documents(query)
        print(f"Count: {count}")
        return db[collection].find(query)
    except Exception as e:
        print(f"Erro: {e}")
        return None

# Function to filter documents by the "sequence.schema" field
def filter_by_schema(db, collection_name, schema):
    """Return the ``find`` result for documents whose "sequence.schema" equals ``schema``."""
    collection = db[collection_name]
    return collection.find({"sequence.schema": schema})

# Function to filter documents by the "payload.timeout" field
def filter_by_timeout(db, collection_name, timeout):
    """Return the ``find`` result for documents whose "payload.timeout" equals ``timeout``."""
    collection = db[collection_name]
    return collection.find({"payload.timeout": timeout})

# Function to filter documents by the "payload.publishes" field
def filter_by_publishes(db, collection_name, publishes):
    """Return the ``find`` result for documents whose "payload.publishes" equals ``publishes``."""
    collection = db[collection_name]
    return collection.find({"payload.publishes": publishes})

# Function to filter documents by the "payload.subscribes" field
def filter_by_subscribes(db, collection_name, subscribes):
    """Return the ``find`` result for documents whose "payload.subscribes" equals ``subscribes``."""
    collection = db[collection_name]
    return collection.find({"payload.subscribes": subscribes})

# Function to filter documents by the "payload.next_alive_interval" field
def filter_by_next_alive_interval(db, collection_name, next_alive_interval):
    """Filter documents on "payload.next_alive_interval".

    Passing ``None`` disables the filter: the unfiltered ``find()`` result
    for the collection is returned instead.
    """
    collection = db[collection_name]

    # None means "no filter requested".
    if next_alive_interval is None:
        return collection.find()

    return collection.find({"payload.next_alive_interval": next_alive_interval})

This code uses the get_time_range function to get the start time (start_time) and end time (end_time) of the documents in the "myCollection" collection of the MongoDB database.

These times are based on the "sequence.timestamp" field of the documents in the collection. The get_time_range function searches for the oldest and the newest document and returns their respective times.

In [3]:
# Unpack the (start, end) pair that get_time_range returns for "myCollection".
start_time, end_time = get_time_range(db, "myCollection")
# Print both bounds of the collection's "sequence.timestamp" range.
print(f"Start time: {start_time}")
print(f"End time: {end_time}")

Start time: 2021-03-29T14:13:34.249+02:00
End time: 2021-07-12T08:38:44.344+0200


Filtering by date

In [4]:
# Inclusive bounds for the date filter; these overwrite the start_time and
# end_time values computed from the whole collection in the previous cell.
start_time = "2021-03-29T14:13:34.249+02:00"
end_time = "2021-03-29T14:14:01.285+02:00"
cursor = filter_by_date(db, "myCollection", start_time, end_time)
# filter_by_date returns None when the query raised, so test for None
# explicitly instead of relying on the cursor object's truthiness.
if cursor is not None:
    for document in cursor:
        print(document)

Count: 10
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9

In [5]:
# Defining test parameters
collection_name = "myCollection"
channel = "status"
service = "mediator"

cursor = filter_by_channel_and_service(db, collection_name, channel, service)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

Query: {'sequence.channel': 'status', 'sequence.service': 'mediator'}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, '

In [6]:
# Defining test parameters
collection_name = "myCollection"
schema = "schema_status"

cursor = filter_by_schema(db, collection_name, schema)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9'), 'seque

In [7]:
# Defining test parameters
collection_name = "myCollection"
timeout = 20000

cursor = filter_by_timeout(db, collection_name, timeout)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9'), 'seque

In [8]:
# Defining test parameters
# (subscribes and next_alive_interval are consumed by the two cells that
# follow this one, which do not redefine them)
collection_name = "myCollection"
publishes = []
subscribes = ["status"]
next_alive_interval = 3000

cursor = filter_by_publishes(db, collection_name, publishes)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9'), 'seque

In [9]:
# collection_name and subscribes come from the earlier "test parameters" cell.
cursor = filter_by_subscribes(db, collection_name, subscribes)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9'), 'seque

In [10]:
# collection_name and next_alive_interval come from the earlier
# "test parameters" cell.
cursor = filter_by_next_alive_interval(db, collection_name, next_alive_interval)

# The filter helper returns the result of find() directly, which is a cursor
# even when nothing matches, so a None check can never report "no documents".
# Emptiness is detected while iterating instead.
found_any = False
for document in cursor:
    found_any = True
    print(document)
if not found_any:
    print("No documents found.")

{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde6'), 'sequence': {'timestamp': '2021-03-29T14:13:34.249+02:00', 'number': 2, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde7'), 'sequence': {'timestamp': '2021-03-29T14:13:37.253+02:00', 'number': 3, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde8'), 'sequence': {'timestamp': '2021-03-29T14:13:40.257+02:00', 'number': 4, 'channel': 'status', 'service': 'mediator', 'schema': 'schema_status'}, 'payload': {'service': 'mediator', 'publishes': [], 'subscribes': ['status'], 'next_alive_interval': 3000, 'timeout': 20000}}
{'_id': ObjectId('64bfbe9a6aca1b1e26d7fde9'), 'seque

get_available_channels(db, collection_name):

This function returns all distinct channels present in the documents of the specified collection. It uses MongoDB's aggregation operation to group the documents by the "sequence.channel" key and return each distinct channel value.


get_available_services(db, collection_name):

This function is similar to the get_available_channels function, but groups documents by service instead of channel.


get_oldest_and_newest_date_by_channel(db, collection_name, channel):

This function returns the earliest and latest dates for a specific channel.


get_oldest_and_newest_date_by_schema(db, collection_name, schema):

This function is similar to get_oldest_and_newest_date_by_channel, but filters documents by schema instead of channel.


get_messages_by_service(db, collection_name, service):

This function returns information about all channels for a specific service. It groups documents by channel, counts the number of records, and finds the oldest and newest dates for each channel.


get_message_fields(db, collection_name, channel, service):

This function returns information about the fields in the document payload for a specific channel and service. It returns the name of each field, plus a placeholder for the field type, size, and data, which you'll need to fill in based on the schema and documents.

In [11]:
def get_available_channels(db, collection_name):
    """List the distinct "sequence.channel" values found in a collection.

    Runs a single-stage ``$group`` aggregation keyed on the channel and
    collects each group's ``_id`` in the order the aggregation yields them.
    """
    group_by_channel = {"$group": {"_id": "$sequence.channel"}}

    channels = []
    for group in db[collection_name].aggregate([group_by_channel]):
        channels.append(group["_id"])
    return channels


def get_available_services(db, collection_name):
    """List the distinct "sequence.service" values found in a collection.

    Runs a single-stage ``$group`` aggregation keyed on the service and
    collects each group's ``_id`` in the order the aggregation yields them.
    """
    group_by_service = {"$group": {"_id": "$sequence.service"}}

    services = []
    for group in db[collection_name].aggregate([group_by_service]):
        services.append(group["_id"])
    return services

def get_oldest_and_newest_date_by_channel(db, collection_name, channel):
    """Return the min and max "sequence.timestamp" for one channel.

    Returns:
        ``(oldest, newest)`` from the aggregation's first result document,
        or ``(None, None)`` when the aggregation yields nothing.
    """
    match_stage = {"$match": {"sequence.channel": channel}}
    # _id None collapses every matched document into a single group.
    group_stage = {
        "$group": {
            "_id": None,
            "oldest": {"$min": "$sequence.timestamp"},
            "newest": {"$max": "$sequence.timestamp"},
        }
    }

    # next() falls back to None when the aggregation produced no document.
    summary = next(db[collection_name].aggregate([match_stage, group_stage]), None)
    if not summary:
        return None, None
    return summary["oldest"], summary["newest"]
    
def get_oldest_and_newest_date_by_schema(db, collection_name, schema):
    """Return the min and max "sequence.timestamp" for one schema.

    Returns:
        ``(oldest, newest)`` from the aggregation's first result document,
        or ``(None, None)`` when the aggregation yields nothing.
    """
    match_stage = {"$match": {"sequence.schema": schema}}
    # _id None collapses every matched document into a single group.
    group_stage = {
        "$group": {
            "_id": None,
            "oldest": {"$min": "$sequence.timestamp"},
            "newest": {"$max": "$sequence.timestamp"},
        }
    }

    # next() falls back to None when the aggregation produced no document.
    summary = next(db[collection_name].aggregate([match_stage, group_stage]), None)
    if not summary:
        return None, None
    return summary["oldest"], summary["newest"]
    
def get_messages_by_service(db, collection_name, service):
    """Summarise, per channel, the documents sent by one service.

    Returns:
        A list with one dict per channel, holding the keys "channel",
        "schema", "records", "since" and "until" copied from the
        aggregation result.
    """
    only_this_service = {"$match": {"sequence.service": service}}
    per_channel = {
        "$group": {
            "_id": "$sequence.channel",
            "schema": {"$first": "$sequence.schema"},
            "records": {"$sum": 1},
            "since": {"$min": "$sequence.timestamp"},
            "until": {"$max": "$sequence.timestamp"},
        }
    }
    # Rename the group key to "channel" and drop the raw _id.
    reshape = {
        "$project": {
            "_id": 0,
            "channel": "$_id",
            "schema": 1,
            "records": 1,
            "since": 1,
            "until": 1,
        }
    }

    rows = db[collection_name].aggregate([only_this_service, per_channel, reshape])
    wanted = ("channel", "schema", "records", "since", "until")
    return [{key: row[key] for key in wanted} for row in rows]

def get_message_fields(db, collection_name, channel, service):
    """Describe the payload field names of one channel/service pair.

    The aggregation looks at a single matching document (``$limit: 1``) and
    collects the keys of its payload.

    Returns:
        ``None`` when the aggregation yields nothing. Otherwise a dict with
        "status", "service", "channel", the field count under both "fields"
        and "size", and "data": a list of ``{index: info}`` entries. The
        "schema", "since" and "to" values and the per-field type, size and
        data are left as empty placeholders.
    """
    stages = [
        {"$match": {"sequence.channel": channel, "sequence.service": service}},
        {"$limit": 1},
        {"$project": {"fields": {"$objectToArray": "$payload"}}},
        {"$unwind": "$fields"},
        {"$group": {"_id": None, "fields": {"$push": "$fields.k"}}},
        {"$project": {"_id": 0, "fields": 1}},
    ]

    # next() falls back to None when the aggregation produced no document.
    first = next(db[collection_name].aggregate(stages), None)
    if not first:
        return None

    names = first["fields"]
    total = len(names)

    # One single-key dict per field, keyed by the field's position as a string.
    entries = [
        {
            str(position): {
                "field": name,
                "type_of_field": "",  # needs to be filled in based on the schema
                "size": 0,  # needs to be filled in based on the schema
                "data": [],  # needs to be filled in based on the documents
            }
        }
        for position, name in enumerate(names)
    ]

    return {
        "status": "OK",
        "service": service,
        "channel": channel,
        "schema": "",  # Schema needs to be filled
        "since": "",  # The oldest date needs to be filled in based on the documents
        "to": "",  # The newest date needs to be filled in based on the documents
        "fields": total,
        "size": total,
        "data": entries,
    }


In [12]:
# Testing the function get_available_channels
# (returns a list built from the "_id" of each "sequence.channel" group)
channels = get_available_channels(db, "myCollection")
print("Available Channels:")
print(channels)

# Testing the function get_available_services
# (same aggregation, grouped on "sequence.service" instead)
services = get_available_services(db, "myCollection")
print("Available Services:")
print(services)


Available Channels:
['ET_Tomograms', 'log', 'DateChannel', 'NI_vortex_control', 'NI_vortex_measurement', 'status', 'ET-OfflineMode', 'TimeChannel', 'CzwarteczekChannel', 'PiateczekChannel']
Available Services:
['mediator', 'TomoKISStudio#GameBoard', 'TomoKISStudio#15201', 'TomoKISStudio#2853', 'TomoKISStudio#17797', 'TomoKISStudio#10629', 'TomoKISStudio#PawelGameBoard', 'TomoKISStudio#11622', 'TomoKISStudio#9346', 'TomoKISStudio#21315', 'TomoKISStudio#16216', 'TomoKISStudio#17500', 'TomoKISStudio#17383', 'SIMULATOR', 'TomoKISStudio#PLATOMK4', 'TomoKISStudio#14819', 'Zegarynka', 'TomoKISStudio#18192', 'TomoKISStudio#8406', 'TomoKISStudio#775', 'NI_CYKLON', 'TomoKISStudio#20143', 'PiateczekDetector', 'TomoKISStudio#RadekLaptop', 'Kangur', 'TomoKISStudio#14163', 'TomoKISStudio#PLATOM4', 'TomoKISStudio#9930', 'TomoKISStudio#15995', 'TomoKISStudio#18460', 'CzwarteczekDetector', 'TomoKISStudio#13503', 'RandomizerKanalowService']


In [13]:
# Testing the function get_oldest_and_newest_date_by_channel
channel = "NI_vortex_measurement"
oldest_date, newest_date = get_oldest_and_newest_date_by_channel(db, "myCollection", channel)
print(f"Channel: {channel}")
print(f"Oldest Date: {oldest_date}")
print(f"Newest Date: {newest_date}")

# Testing the function get_oldest_and_newest_date_by_schema
# The schema name used here is the same string as the channel name above.
# oldest_date / newest_date are overwritten by this second call.
schema = "NI_vortex_measurement"
oldest_date, newest_date = get_oldest_and_newest_date_by_schema(db, "myCollection", schema)
print(f"Schema: {schema}")
print(f"Oldest Date: {oldest_date}")
print(f"Newest Date: {newest_date}")


Channel: NI_vortex_measurement
Oldest Date: 2021-06-21T11:25:13.803+0200
Newest Date: 2021-07-11T22:09:54.007+0200
Schema: NI_vortex_measurement
Oldest Date: 2021-06-21T11:25:13.803+0200
Newest Date: 2021-07-11T22:09:54.007+0200


In [14]:
# Testing the function get_messages_by_service
# The result is a list with one summary dict per channel
# (keys: channel, schema, records, since, until).
service = "NI_CYKLON"
messages = get_messages_by_service(db, "myCollection", service)
print(f"Messages sent by service '{service}':")
print(messages)

Messages sent by service 'NI_CYKLON':
[{'channel': 'status', 'schema': 'schema_status', 'records': 314, 'since': '2021-06-21T11:21:53.470+0200', 'until': '2021-07-11T22:09:53.929+0200'}, {'channel': 'NI_vortex_measurement', 'schema': 'NI_vortex_measurement', 'records': 270, 'since': '2021-06-21T11:25:13.803+0200', 'until': '2021-07-11T22:09:54.007+0200'}]


In [15]:
# Testing the function get_message_fields
# get_message_fields returns None when its aggregation yields no document
# for the channel/service pair, in which case "None" is what gets printed.
channel = "NI_vortex_control"
service = "TomoKISStudio#15995"
fields_info = get_message_fields(db, "myCollection", channel, service)
print("Message Fields:")
print(fields_info)

Message Fields:
None


This class is used to represent a user's configuration, including an IP address, a port, some filters, and the location of a logger. Configuration information can be saved to a JSON file and loaded from a JSON file.

In [16]:
import json

# Define the class UserConfig
class UserConfig:
    """A user's settings: server address, port, filters and log location.

    Instances can be written to and rebuilt from a JSON file whose keys are
    the instance attribute names.
    """

    def __init__(self, ip_address, port, filters, logger_location):
        self.ip_address, self.port = ip_address, port
        self.filters, self.logger_location = filters, logger_location

    def save(self, filename):
        """Write every instance attribute to ``filename`` as one JSON object."""
        # vars(self) is the instance's attribute dictionary.
        settings = vars(self)
        with open(filename, 'w') as out:
            json.dump(settings, out)

    @classmethod
    def load(cls, filename):
        """Build an instance from the JSON object stored in ``filename``.

        The stored keys are passed to the constructor as keyword arguments.
        """
        with open(filename, 'r') as src:
            stored = json.load(src)
        return cls(**stored)


In [17]:
# Create a new configuration
# (positional arguments: ip_address, port, filters, logger_location)
config = UserConfig('127.0.0.1', 8000, {"channel": "ET_Tomograms", "service": "TomoKISStudio#2853"}, 'app.log')

# Save the configuration to a file
# (a relative path, so the file is written to the current working directory)
config.save('user_config.json')

# Load configuration from a file
# Reading back the file just written and printing one attribute shows that
# the value survived the save/load round trip.
loaded_config = UserConfig.load('user_config.json')
print(loaded_config.ip_address)  # Outputs: 127.0.0.1


127.0.0.1


Python script that creates a GUI (Graphical User Interface) for filtering and visualizing data from a MongoDB collection. Here are the details of the script's main components:

Auxiliary functions: Functions that assist in reading and writing the JSON configuration file, connecting to the MongoDB database, initializing the log and manipulating data.

GUI-specific functions: Functions that perform specific actions when buttons are pressed in the GUI. This includes filtering data based on input values, obtaining channels, services, oldest and newest dates, messages by service, message fields, and plotting graphs.

create_gui() function: The main function that creates the GUI using the tkinter package. It creates labels, input fields, buttons, and a text area to display the results.

Connection to MongoDB database: The script connects to MongoDB using the IP address and port provided in the JSON configuration file.

Exception Handling: The script also has exception handling to deal with possible errors during execution.

Settings: The script reads a configuration file at the beginning to determine the database connection parameters, filters to apply, and the log file. If the configuration file does not exist, it will create one with default values.

Graph Plotting: Using the matplotlib library, this script is capable of plotting graphs with the filtered data from MongoDB.

In [18]:
import tkinter as tk
from pymongo import MongoClient
import logging
import json
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from tkinter import messagebox, Scrollbar, Canvas

# Function to retrieve MongoDB payload data
def get_payload_data(cursor):
    """Collect the first image data entry of every document that has one.

    Args:
        cursor: Iterable of documents (dicts).

    Returns:
        A list of ``{"timestamp": ..., "data": ...}`` dicts, one per document
        whose ``payload.image.data`` is non-empty. "timestamp" is the
        document's ``sequence.timestamp`` and "data" is the first element of
        ``payload.image.data``. Documents without that structure are skipped.
    """
    data = []
    for document in cursor:
        payload = document.get("payload")
        if not isinstance(payload, dict):
            continue

        # A non-dict "image" (for example a plain string) cannot carry a
        # "data" entry, so skip it rather than fail on the lookup.
        image = payload.get("image")
        if not isinstance(image, dict):
            continue

        # An empty or missing data sequence has no first element to take.
        samples = image.get("data")
        if not samples:
            continue

        data.append({
            "timestamp": document["sequence"]["timestamp"],
            "data": samples[0],
        })
    return data

# Function to start logging
def init_logging(log_file):
    """Configure logging through ``logging.basicConfig``.

    Uses ``log_file`` as the output file, ERROR as the level, and a
    "time - level - message" record layout.
    """
    record_layout = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(
        format=record_layout,
        level=logging.ERROR,
        filename=log_file,
    )
    
# Function to load the configuration file
def load_config(config_file):
    """Read ``config_file`` and return its parsed JSON content."""
    with open(config_file, 'r') as handle:
        return json.load(handle)

# Function to save the configuration file
def save_config(config, config_file):
    """Write ``config`` to ``config_file`` as JSON indented by four spaces.

    The file is opened in write mode, so existing content is replaced.
    """
    with open(config_file, 'w') as handle:
        json.dump(config, fp=handle, indent=4)

# Main function to create the GUI
def create_gui(db, collection_name):
    """Build and run the tkinter "Data Filtering Tool" window.

    The window offers input fields for the filter criteria, one button per
    query, a text area for textual results and an embedded matplotlib plot.
    The call blocks inside ``root.mainloop()`` until the window is closed.

    Args:
        db: Database handle; collections are accessed as ``db[collection_name]``.
        collection_name: Name of the collection every action operates on.

    Notes:
        The callbacks use names that are not defined in this function:
        the query helpers (``get_available_channels``,
        ``get_available_services``, ``get_oldest_and_newest_date_by_channel``,
        ``get_oldest_and_newest_date_by_schema``, ``get_messages_by_service``,
        ``get_message_fields``, ``filter_by_channel_and_service``), the
        module-level ``config`` / ``config_file`` and ``start_time`` /
        ``end_time``. They must exist before a button is pressed.
    """

    # Frame of the plot currently on screen, so that a new plot replaces it
    # instead of stacking another widget onto the same grid cell.
    plot_state = {"frame": None}

    # Auxiliary functions
    def report_error(context, exc):
        """Show ``exc`` in an error dialog and write it to the log."""
        error_message = str(exc)
        messagebox.showerror("Error", error_message)
        logging.error(f"{context}: {error_message}")

    def parse_list(raw):
        """Split a comma-separated entry into trimmed, non-empty items."""
        return [item.strip() for item in raw.split(",") if item.strip()]

    def execute_filters():
        """Query the collection with the filled-in fields and list the matches."""
        try:
            channel = channel_entry.get()
            service = service_entry.get()
            schema = schema_entry.get()
            publishes = parse_list(publishes_entry.get())
            subscribes = parse_list(subscribes_entry.get())
            interval_text = next_alive_interval_entry.get().strip()
            next_alive_interval = int(interval_text) if interval_text else None

            # create filter: only criteria the user filled in are applied
            filters = {}
            if channel:
                filters["sequence.channel"] = channel
            if service:
                filters["sequence.service"] = service
            if schema:
                filters["sequence.schema"] = schema
            if publishes:
                filters["payload.publishes"] = {"$in": publishes}
            if subscribes:
                filters["payload.subscribes"] = {"$in": subscribes}
            if next_alive_interval is not None:
                filters["payload.next_alive_interval"] = next_alive_interval

            cursor = db[collection_name].find(filters)

            # Output the results
            results_text.delete('1.0', tk.END)
            results_text.insert(tk.END, "Filtered Data:\n")
            for document in cursor:
                results_text.insert(tk.END, str(document) + "\n")
            results_text.insert(tk.END, "End of results.")

            # Save user's filter values (config / config_file are module-level)
            config['filters'] = {
                'channel': channel,
                'service': service,
                'schema': schema,
                'publishes': publishes,
                'subscribes': subscribes,
                'next_alive_interval': next_alive_interval
            }
            save_config(config, config_file)

        except Exception as e:
            report_error("Error executing filters", e)

    def execute_get_channels():
        """Show the channels returned by ``get_available_channels``."""
        try:
            results_text.delete('1.0', tk.END)
            channels = get_available_channels(db, collection_name)
            results_text.insert(tk.END, f"Channels: {channels}\n")
        except Exception as e:
            report_error("Error getting channels", e)

    def execute_get_services():
        """Show the services returned by ``get_available_services``."""
        try:
            results_text.delete('1.0', tk.END)
            services = get_available_services(db, collection_name)
            results_text.insert(tk.END, f"Services: {services}\n")
        except Exception as e:
            report_error("Error getting services", e)

    def execute_get_oldest_and_newest_date_by_channel():
        """Show the oldest and newest date for the entered channel."""
        try:
            results_text.delete('1.0', tk.END)
            channel = channel_entry.get()
            oldest_date, newest_date = get_oldest_and_newest_date_by_channel(db, collection_name, channel)
            results_text.insert(tk.END, f"For channel '{channel}', oldest date: {oldest_date}, newest date: {newest_date}\n")
        except Exception as e:
            report_error("Error getting oldest and newest date by channel", e)

    def execute_get_oldest_and_newest_date_by_schema():
        """Show the oldest and newest date for the entered schema."""
        try:
            results_text.delete('1.0', tk.END)
            schema = schema_entry.get()
            oldest_date, newest_date = get_oldest_and_newest_date_by_schema(db, collection_name, schema)
            results_text.insert(tk.END, f"For schema '{schema}', oldest date: {oldest_date}, newest date: {newest_date}\n")
        except Exception as e:
            report_error("Error getting oldest and newest date by schema", e)

    def execute_get_messages_by_service():
        """Show the messages returned for the entered service."""
        try:
            results_text.delete('1.0', tk.END)
            service = service_entry.get()
            messages = get_messages_by_service(db, collection_name, service)
            results_text.insert(tk.END, f"Messages by service '{service}': {messages}\n")
        except Exception as e:
            report_error("Error getting messages by service", e)

    def execute_get_message_fields():
        """Show the message fields for the entered channel and service."""
        try:
            results_text.delete('1.0', tk.END)
            channel = channel_entry.get()
            service = service_entry.get()
            message_fields = get_message_fields(db, collection_name, channel, service)
            results_text.insert(tk.END, f"Message fields for channel '{channel}' and service '{service}': {message_fields}\n")
        except Exception as e:
            report_error("Error getting messages by fields", e)

    def plot_graph():
        """Plot the first two values of each payload row against its timestamp."""
        try:
            # Gets values from input widgets
            channel = channel_entry.get()
            service = service_entry.get()
            # NOTE(review): start_time / end_time are not defined in this
            # function; presumably module-level values set by an earlier
            # cell - confirm they exist and what type they have.
            time0 = start_time
            time1 = end_time

            # Query the collection this GUI was created for, not a fixed name.
            cursor = filter_by_channel_and_service(db, collection_name, channel, service)

            data = get_payload_data(cursor)
            if not data:
                # An empty frame has no 'data' column; fail with a clear message
                # instead of a bare KeyError.
                raise ValueError("No plottable data found for the selected channel and service.")

            df = pd.DataFrame(data)
            df['data_0'] = df['data'].apply(lambda x: x[0] if len(x) > 0 else None)
            df['data_1'] = df['data'].apply(lambda x: x[1] if len(x) > 1 else None)

            df = df.dropna(subset=['data_0', 'data_1'])

            fig = Figure(figsize=(10, 5))
            a = fig.add_subplot(111)

            a.plot(df['timestamp'], df['data_0'], label='first position of data row by time')
            a.plot(df['timestamp'], df['data_1'], label='second position of data row by time')

            a.set_title('Data over time')
            a.set_ylabel('Value')
            a.set_xlabel('Timestamp')
            a.legend(loc='best')
            # f-strings keep this working when the values are not plain strings.
            a.text(0, 0.07, f"start: {time0}")
            a.text(0, 0.06, f"end: {time1}")

            # Drop the previous plot (if any) before showing the new one.
            if plot_state["frame"] is not None:
                plot_state["frame"].destroy()

            # Create a new Frame for matplotlib figure
            plot_frame = tk.Frame(frame)
            plot_frame.grid(row=14, column=0, columnspan=2)
            plot_state["frame"] = plot_frame

            # Create tkinter widget for matplotlib plot
            figure_canvas = FigureCanvasTkAgg(fig, master=plot_frame)
            figure_canvas.draw()
            figure_canvas.get_tk_widget().pack()

        except Exception as e:
            report_error("Error plotting graph", e)

    # GUI creation
    root = tk.Tk()
    root.title("Data Filtering Tool")

    # Creates the main Frame that will host the Canvas and the Scrollbar
    main_frame = tk.Frame(root)
    main_frame.pack(fill=tk.BOTH, expand=1)

    # Creates the Canvas
    canvas = tk.Canvas(main_frame)
    canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=1)

    # Creates the Scrollbar
    scrollbar = tk.Scrollbar(main_frame, orient=tk.VERTICAL, command=canvas.yview)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # Add the frame inside the canvas
    frame = tk.Frame(canvas)

    # Add scrollbar to canvas
    canvas.configure(yscrollcommand=scrollbar.set)

    # Add the frame to the canvas window
    canvas.create_window((0, 0), window=frame, anchor='nw')

    # Update canvas scrolling region when frame size changes
    frame.bind('<Configure>', lambda e: canvas.configure(scrollregion=canvas.bbox('all')))

    def add_labeled_entry(row, label_text):
        """Place a label and an entry on ``row`` and return the entry."""
        tk.Label(frame, text=label_text).grid(row=row, column=0)
        entry = tk.Entry(frame)
        entry.grid(row=row, column=1)
        return entry

    def add_button(row, text, command):
        """Place a button spanning both columns on ``row`` and return it."""
        button = tk.Button(frame, text=text, command=command)
        button.grid(row=row, column=0, columnspan=2)
        return button

    channel_entry = add_labeled_entry(0, "Channel:")
    service_entry = add_labeled_entry(1, "Service:")
    schema_entry = add_labeled_entry(2, "Schema:")
    publishes_entry = add_labeled_entry(3, "Publishes (comma-separated):")
    subscribes_entry = add_labeled_entry(4, "Subscribes (comma-separated):")
    next_alive_interval_entry = add_labeled_entry(5, "Next alive interval:")

    add_button(6, "Filter", execute_filters)
    # The plot itself is placed on row 14, directly above this button.
    add_button(15, "Plot Graph", plot_graph)

    results_text = tk.Text(frame)
    results_text.grid(row=7, column=0, columnspan=2)

    # Additional buttons for functions
    add_button(8, "Get Channels", execute_get_channels)
    add_button(9, "Get Services", execute_get_services)
    add_button(10, "Get Oldest and Newest Date by Channel", execute_get_oldest_and_newest_date_by_channel)
    add_button(11, "Get Oldest and Newest Date by Schema", execute_get_oldest_and_newest_date_by_schema)
    add_button(12, "Get Messages by Service", execute_get_messages_by_service)
    add_button(13, "Get Message Fields", execute_get_message_fields)

    frame.update_idletasks()

    # Configure the canvas scroll area to match the frame size
    canvas.configure(scrollregion=canvas.bbox('all'))

    root.mainloop()

config_file = 'config.json'

# Defaults used when no configuration file exists yet, and for any key that
# an existing file leaves out.
default_config = {
    'db_ip': 'localhost',
    'db_port': 27017,
    'filters': {},
    'log_file': 'app.log'
}

# Load user config; on first run create the file with the default values
try:
    config = {**default_config, **load_config(config_file)}
except FileNotFoundError:
    config = dict(default_config)
    save_config(config, config_file)

# A single call is enough: logging.basicConfig ignores repeated configuration
# once the root logger has handlers.
init_logging(config['log_file'])

# Connecting to MongoDB Database using the configured address
db = get_db_connection(config['db_ip'], config['db_port'], 'myDatabase')

# get_db_connection returns None on failure; the GUI cannot work without a
# database handle, so it is only started after a successful connection.
if db is None:
    logging.error("Database connection failed; GUI not started.")
    print("Database connection failed!")
else:
    # Calling the function to create the UI
    create_gui(db, "myCollection")


Query: {'sequence.channel': 'ET_Tomograms', 'sequence.service': 'TomoKISStudio#2853'}
