# Import Package

In [75]:
import re
import socket
from datetime import datetime
import mysql.connector
import paramiko
import os
import pymysql
import json
import smtplib
import schedule
import time
import pymongo
import random
import pymysql
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from bson import SON
from pymongo import MongoClient
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent
from pymysqlreplication.event import RotateEvent,QueryEvent


# Mysql Connection and Bin Logs Analysis

In [64]:
def analyze_bin_logs(host, port, username, password):
    try:
        # Connection configuration
        conn_config = {'host': host,
                       'port': port,
                       'user': username,
                       'password': password}
        
        # Create list to store the events
        analyzed_results = []
        
        # Create BinLogStreamReader object
        stream = BinLogStreamReader(connection_settings=conn_config, only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],server_id=2)

        # Iterate over events in the binary logs
        for event in stream:
            # Process the event
            if event.__class__.__name__ == 'WriteRowsEvent':
                event_data = {
                    "event_type" : "Insert event",
                    "Table" : event.table,
                    "Affected rows" : str(event.rows)
                }
                analyzed_results.append(event_data)
                
                print("Insert event")
                print("Table:", event.table)
                print("Affected rows:", event.rows)
                print()
            elif event.__class__.__name__ == 'UpdateRowsEvent':
                event_data = {
                    "event_type" : "Update event",
                    "Table" : event.table,
                    "Affected rows" : str(event.rows)
                }
                analyzed_results.append(event_data)                
                print("Update event")
                print("Table:", event.table)
                print("Affected rows:", event.rows)
                print()
            elif event.__class__.__name__ == 'DeleteRowsEvent':
                event_data = {
                    "event_type" : "Delete event",
                    "Table" : event.table,
                    "Affected rows" : str(event.rows)
                }
                analyzed_results.append(event_data)                
                print("Delete event")
                print("Table:", event.table)
                print("Affected rows:", event.rows)
                print()

        # Close the stream
        stream.close()
        
        #Save list as json in localhost
#         with open('bin_log_analysis.json','w') as json_file:
#             json.dump(analyzed_results, json_file,indent = 4)
#             print(analyzed_results)
    # if error    
    except Exception as e:
        print("Error:", e)
        
    return analyzed_results


        


# Migrate Data to MongoDB

In [65]:
def migrate_to_mongodb(data, mongodb_uri, database_name, collection_name):
    
    try:
        # Connect to MongoDB
        client = MongoClient(mongodb_uri)

        # Select database
        db = client[database_name]

        # Select collection
        collection = db[collection_name]

        # Insert data into collection
        collection.insert_many(data)

        print("Data successfully migrated to MongoDB collection.")

    except Exception as e:
        print("Error:", e)

# Aggregate analysis in MongoDB

In [61]:
def connect_to_mongodb(uri, collection_name):
    try:
        client = MongoClient(uri)
        db = client[collection_name]
        return db
    except Exception as e:
        print("Error connecting to MongoDB:", e)
        return None

def analyze_binlog_data(collection):
    try:
        result = collection.aggregate([
            {"$group": {"_id": "$event_type", "count": {"$sum": 1}}}
        ])
        for doc in result:
            print(doc)
    except Exception as e:
        print("Error analyzing binlog data:", e)


## Send Email and Seven day cycle

In [80]:
def send_email(analysis_result, recipient_email):
    try:
        # Email configuration
        sender_email = "joshwen1314@163.com"
        password = "BYASQYOVCUDGUJSP"

        # Create message
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = recipient_email
        msg['Subject'] = "Week Bin Logs Report"

        # Add body to email
        body = analysis_result
        msg.attach(MIMEText(body, 'plain'))

        # Send email
        with smtplib.SMTP('smtp.163.com', 25) as server:
            server.starttls()
            server.login(sender_email, password)
            server.send_message(msg)
        print("Email sent successfully.")
    except Exception as e:
        print("Error sending email:", e)

def job(mongodb_uri, database_name, collection_name, recipient_email):
    db = connect_to_mongodb(mongodb_uri, database_name)
    if db is not None:
        collection = db[collection_name]
        analysis_result = analyze_binlog_data(collection)
        if analysis_result is not None:
            send_email(analysis_result, recipient_email)
    else:
        print("Failed to connect to MongoDB.")

#    Test

# REQ-001

In [45]:
if __name__ == "__main__":
    host = input("Enter MySQL server host: ")
    port = int(input("Enter MySQL server port: "))
    username = input("Enter MySQL username: ")
    password = input("Enter MySQL password: ")
    analyzed_results = analyze_bin_logs(host, port, username, password)

Enter MySQL server host:  192.168.206.134
Enter MySQL server port:  3306
Enter MySQL username:  root
Enter MySQL password:  1234


                       Setting The Variable Value BINLOG_ROW_METADATA = FULL, BINLOG_ROW_IMAGE = FULL.
                       By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
                        


Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'none_sources': {}}]

Update event
Table: users
Affected rows: [{'before_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'before_none_sources': {}, 'after_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 35}, 'after_none_sources': {}}]

Delete event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: products
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'Laptop', 'UNKNOWN_COL2

# REQ-002

In [48]:
if __name__ == "__main__":
    myclient = pymongo.MongoClient("mongodb://192.168.206.135:27017/")    
    mydb = myclient["final_year_project"] 
    mycol = mydb["Audit_logs"]   
    x = mycol.find_one()    
    print(x)

{'_id': ObjectId('662d72226cae2b2e202a1a01'), 'event_type': 'Insert event', 'Table': 'users', 'Affected rows': "[{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]"}


# REQ-003

In [55]:
if __name__ == "__main__":
    connect_to_mongodb("mongodb://192.168.206.139:27017/",'final_year_project')

# REQ-004

In [62]:
if __name__ == "__main__":
    # MongoDB connection URI
    mongodb_uri = input("Enter MongoDB connection URI: ")

    # MongoDB database name
    database_name = input("Enter MongoDB database name: ")

    # Connect to MongoDB
    db = connect_to_mongodb(mongodb_uri, database_name)
    if db is not None:
        # Select collection containing binlog data
        collection = db[input("Enter MongoDB collection name: ")]

        # Analyze binlog data
        analyze_binlog_data(collection)
    else:
        print("Failed to connect to MongoDB.")



Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Audit_logs


{'_id': 'Update event', 'count': 8}
{'_id': 'Delete event', 'count': 8}
{'_id': 'Insert event', 'count': 40}


# REQ-005

In [63]:
if __name__ == "__main__":
    # MongoDB connection URI
    mongodb_uri = input("Enter MongoDB connection URI: ")

    # MongoDB database name
    database_name = input("Enter MongoDB database name: ")

    # Connect to MongoDB
    db = connect_to_mongodb(mongodb_uri, database_name)
    if db is not None:
        # Select collection containing binlog data
        collection = db[input("Enter MongoDB collection name: ")]

        # Analyze binlog data
        analyze_binlog_data(collection)
    else:
        print("Failed to connect to MongoDB.")

Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Cases


# REQ-006

In [66]:
if __name__ == "__main__":
    # Prompt the user to enter MySQL server connection details
    host = input("Enter MySQL server host: ")
    port = int(input("Enter MySQL server port: "))
    username = input("Enter MySQL username: ")
    password = input("Enter MySQL password: ")
    
    # Analyze MySQL binary log events
    analyzed_results = analyze_bin_logs(host, port, username, password)

    # Sample Python list to migrate to MongoDB
    sample_data = analyzed_results

    # Prompt the user to enter MongoDB connection details
    mongodb_uri = input("Enter MongoDB connection URI: ")
    database_name = input("Enter MongoDB database name: ")
    collection_name = input("Enter MongoDB collection name: ")

    # Migrate data to MongoDB
    migrate_to_mongodb(sample_data, mongodb_uri, database_name, collection_name)

Enter MySQL server host:  192.168.206.134
Enter MySQL server port:  3306
Enter MySQL username:  root
Enter MySQL password:  1234


                       Setting The Variable Value BINLOG_ROW_METADATA = FULL, BINLOG_ROW_IMAGE = FULL.
                       By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
                        


Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'none_sources': {}}]

Update event
Table: users
Affected rows: [{'before_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'before_none_sources': {}, 'after_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 35}, 'after_none_sources': {}}]

Delete event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: products
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'Laptop', 'UNKNOWN_COL2

Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Audit_logs


Data successfully migrated to MongoDB collection.


![jupyter](./REQ-006.png)

# REQ-007

In [71]:
def main():
    # MongoDB connection URI
    mongodb_uri = input("Enter MongoDB connection URI: ")

    # MongoDB database name
    database_name = input("Enter MongoDB database name: ")

    # Connect to MongoDB
    db = connect_to_mongodb(mongodb_uri, database_name)
    if db is not None:
        # Select collection containing binlog data
        collection = db[input("Enter MongoDB collection name: ")]

        # Analyze binlog data
        analyze_binlog_data(collection)
    else:
        print("Failed to connect to MongoDB.")

if __name__ == "__main__":
    main()

Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Cases


# REQ-008

In [73]:
# MySQL connection parameters
host = '192.168.206.133'  
port = 3306  
username = 'root'  
password = '1234'  
database = 'mysql'  

# Define possible table and field names
tables = ['table1', 'table2', 'table3']
fields = ['field1', 'field2', 'field3']

# Create MySQL connection
connection = pymysql.connect(host=host, port=port, user=username, password=password, db=database)

# Create cursor object
cursor = connection.cursor()

# Create Tables
for i in range(1, 4):
    table_name = f'table{i}'
    cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INT AUTO_INCREMENT PRIMARY KEY, field1 INT, field2 INT, field3 INT)")

# Commit the transaction created by the table
connection.commit()

# Generate random addition, deletion and modification statements and execute them
for _ in range(100000):
    # Generate random addition, deletion and modification statements and execute them
    operation = random.choice(['INSERT', 'DELETE', 'UPDATE'])
    table = random.choice([f'table{i}' for i in range(1, 4)])
    field = random.choice(['field1', 'field2', 'field3'])
    
    # Generate SQL statements for corresponding operations
    if operation == 'INSERT':
        value = random.randint(1, 100)
        statement = f"INSERT INTO {table} ({field}) VALUES ({value})"
    elif operation == 'DELETE':
        statement = f"DELETE FROM {table} WHERE {field} = {random.randint(1, 100)}"
    else:
        value = random.randint(1, 100)
        statement = f"UPDATE {table} SET {field} = {value} WHERE {field} = {random.randint(1, 100)}"
    
    # Execution
    cursor.execute(statement)

# commit
connection.commit()

# close cursor
cursor.close()
connection.close()

print("Tables created and SQL queries executed and inserted into MySQL database.")

Tables created and SQL queries executed and inserted into MySQL database.


In [76]:
if __name__ == "__main__":
    # Prompt the user to enter MySQL server connection details
    host = input("Enter MySQL server host: ")
    port = int(input("Enter MySQL server port: "))
    username = input("Enter MySQL username: ")
    password = input("Enter MySQL password: ")

    # Start time
    time_start=time.time()

    # Analyze MySQL binary log events
    analyzed_results = analyze_bin_logs(host, port, username, password)

    # Sample Python list to migrate to MongoDB
    sample_data = analyzed_results

    # Prompt the user to enter MongoDB connection details
    mongodb_uri = input("Enter MongoDB connection URI: ")
    database_name = input("Enter MongoDB database name: ")
    collection_name = input("Enter MongoDB collection name: ")

    # Migrate data to MongoDB
    migrate_to_mongodb(sample_data, mongodb_uri, database_name, collection_name)

    # End time
    time_end=time.time()
    print('time cost',time_end-time_start,'s')

Enter MySQL server host:  192.168.206.134
Enter MySQL server port:  3306
Enter MySQL username:  root
Enter MySQL password:  1234


                       Setting The Variable Value BINLOG_ROW_METADATA = FULL, BINLOG_ROW_IMAGE = FULL.
                       By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
                        


Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'none_sources': {}}]

Update event
Table: users
Affected rows: [{'before_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 30}, 'before_none_sources': {}, 'after_values': {'UNKNOWN_COL0': 2, 'UNKNOWN_COL1': 'john_doe', 'UNKNOWN_COL2': 'john@example.com', 'UNKNOWN_COL3': 35}, 'after_none_sources': {}}]

Delete event
Table: users
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'jane_smith', 'UNKNOWN_COL2': 'jane@example.com', 'UNKNOWN_COL3': 25}, 'none_sources': {}}]

Insert event
Table: products
Affected rows: [{'values': {'UNKNOWN_COL0': 1, 'UNKNOWN_COL1': 'Laptop', 'UNKNOWN_COL2

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



Insert event
Table: table1
Affected rows: [{'values': {'UNKNOWN_COL0': 4331, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 70, 'UNKNOWN_COL3': None}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}}]

Insert event
Table: table2
Affected rows: [{'values': {'UNKNOWN_COL0': 4242, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': None, 'UNKNOWN_COL3': 15}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL2': 'null'}}]

Delete event
Table: table2
Affected rows: [{'values': {'UNKNOWN_COL0': 3771, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': None, 'UNKNOWN_COL3': 7}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL2': 'null'}}]

Update event
Table: table2
Affected rows: [{'before_values': {'UNKNOWN_COL0': 4142, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 72, 'UNKNOWN_COL3': None}, 'before_none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}, 'after_values': {'UNKNOWN_COL0': 4142, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 58, 'UNKNOWN_COL3': None}, 'after_none_sources': {'UNKNOWN_COL1': 'null'

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



Delete event
Table: table3
Affected rows: [{'values': {'UNKNOWN_COL0': 8183, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 6, 'UNKNOWN_COL3': None}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}}]

Insert event
Table: table2
Affected rows: [{'values': {'UNKNOWN_COL0': 8083, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 20, 'UNKNOWN_COL3': None}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}}]

Insert event
Table: table3
Affected rows: [{'values': {'UNKNOWN_COL0': 8380, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 42, 'UNKNOWN_COL3': None}, 'none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}}]

Update event
Table: table3
Affected rows: [{'before_values': {'UNKNOWN_COL0': 8370, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 91, 'UNKNOWN_COL3': None}, 'before_none_sources': {'UNKNOWN_COL1': 'null', 'UNKNOWN_COL3': 'null'}, 'after_values': {'UNKNOWN_COL0': 8370, 'UNKNOWN_COL1': None, 'UNKNOWN_COL2': 75, 'UNKNOWN_COL3': None}, 'after_none_sources': {'UNKNOWN_COL1': 'null'

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Audit_logs


Data successfully migrated to MongoDB collection.
time cost 55.84963774681091 s


# REQ-009

In [77]:
if __name__ == "__main__":
    send_email('test','joshmen9718@gmail.com')

Email sent successfully.


![jupyter](./REQ-010.png)

# REQ-010

In [82]:
import schedule
import time

def main():
    # MongoDB connection URI
    mongodb_uri = input("Enter MongoDB connection URI: ")

    # MongoDB database name
    database_name = input("Enter MongoDB database name: ")

    # MongoDB collection name
    collection_name = input("Enter MongoDB collection name: ")

    # Recipient email address
    recipient_email = input("Enter recipient email address: ")

    # Schedule job to run every 10 seconds
    schedule.every(360).seconds.do(job, mongodb_uri, database_name, collection_name, recipient_email)

    while True:
        schedule.run_pending()
        time.sleep(1)

if __name__ == "__main__":
    main()

Enter MongoDB connection URI:  mongodb://192.168.206.135:27017
Enter MongoDB database name:  final_year_project
Enter MongoDB collection name:  Audit_logs
Enter recipient email address:  joshmen9718@gmail.com


{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}
{'_id': 'Delete event', 'count': 13736}
{'_id': 'Insert event', 'count': 33352}
{'_id': 'Update event', 'count': 13329}


KeyboardInterrupt: 