In [1]:
import os
import json
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pymongo.operations import InsertOne

In [2]:
os.chdir('..') 

In [3]:
from src.config import cfg

Output File: slack_data.csv
Path: data\Anonymized_B6SlackExport_25Nov23\anonymized
Channel: data\Anonymized_B6SlackExport_25Nov23\anonymized\channels.json
Userfile: data\Anonymized_B6SlackExport_25Nov23\anonymized\users.json


In [4]:
class SlackDataDB:
    def __init__(self) -> None:
        self.client = MongoClient("mongodb://localhost:27017/")
        self.db = self.client["slack_data"]        
        channels_validator = {
            "$jsonSchema": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "required": ["id", "name", "created", "creator", "is_archived", "is_general", "members", "topic", "purpose"],
                    "properties": {
                        "id": {"bsonType": "string"},
                        "name": {"bsonType": "string"},
                        "created": {"bsonType": "number"},
                        "creator": {"bsonType": "string"},
                        "is_archived": {"bsonType": "bool"},
                        "is_general": {"bsonType": "bool"},
                        "members": {"bsonType": "array"},
                        "topic": {"bsonType": "object"},
                        "purpose": {"bsonType": "object"},
                    }
                }
            }
            }

        integration_logs_validator = {
            "$jsonSchema": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "required": ["user_id", "user_name", "date", "change_type", "app_type", "app_id", "scope"],
                    "properties": {
                        "user_id": {"bsonType": "string"},
                        "user_name": {"bsonType": "string"},
                        "date": {"bsonType": "number"},
                        "change_type": {"bsonType": "string"},
                        "app_type": {"bsonType": "string"},
                        "app_id": {"bsonType": "string"},
                        "scope": {"bsonType": "string"},
                    }
                }
            }
        }
        try:
            self.db.create_collection("messages")
            self.db.create_collection("canvases")
            self.db.create_collection("channels")
            self.db.create_collection("integration_logs")
            self.db.create_collection("users_old_new")
            self.db.create_collection("users")
        except Exception as e:
            print(e)

    def create_collection_with_validation(self, collection_name, validator):
        try:
            self.db.create_collection(collection_name, validator=validator)
        except Exception as e:
            print(f"Error creating {collection_name} collection: {e}")

    def insert_messages(self, base_path):
        messages_by_channels = self.organize_messages_by_channels(base_path)
        
        if messages_by_channels:
            for channel_id, messages in messages_by_channels.items():
                # Insert messages into the specific channel
                self.insert_to_collection(f"{channel_id}", messages)

    def organize_messages_by_channels(self, base_path):
        messages_by_channels = {}

        for folder_name in os.listdir(base_path):
            folder_path = os.path.join(base_path, folder_name)
            if os.path.isdir(folder_path):
                channel_id = folder_name
                messages = self.read_messages_from_folder(folder_path)
                messages_by_channels[channel_id] = messages

        return messages_by_channels

    def read_messages_from_folder(self, folder_path):
        messages = []
        for filename in os.listdir(folder_path):
            if filename.endswith(".json"):
                file_path = os.path.join(folder_path, filename)
                with open(file_path, "r") as file:
                    messages.extend(json.load(file))
        return messages


    def insert_canvases(self, canvases_path):
        canvases_data = self.read_json_without_dict(canvases_path)
        
        if canvases_data:
            # Ensure each canvas in canvases_data is a dictionary
            for canvas in canvases_data:
                if not isinstance(canvas, dict):
                    return
                
            self.insert_to_collection("canvases", canvases_data)

    def insert_channels(self, channels_path):
        channels_data = self.read_json_file(channels_path)
        if channels_data:
            self.insert_to_collection("channels", channels_data)

    def insert_integration_logs(self, integration_logs_path):
        integration_logs_data = self.read_json_file(integration_logs_path)
        if integration_logs_data:
            self.insert_to_collection("integration_logs", integration_logs_data)

    def insert_users_old_new(self, users_old_new_path):
        users_old_new_data = self.read_json_file(users_old_new_path)

        if users_old_new_data:
            # Ensure all values in the dictionary are dictionaries
            valid_data = {key: value for key, value in users_old_new_data.items() if isinstance(value, dict)}

            # Insert each user dictionary into the collection separately
            for user_id, user_data in valid_data.items():
                self.insert_to_collection("users_old_new", [user_data])

    def insert_users(self, users_path):
        users_data = self.read_json_file(users_path)
        if users_data:
            self.insert_to_collection("users", users_data)
            
    def read_json_file(self, file_path):
        try:
            with open(file_path, "r") as file:
                data = json.load(file)
            return data
        except Exception as e:
            print(f"Error reading JSON file at {file_path}: {e}")
            return None
    
    def read_json_without_dict(self, file_path):

        try:
            with open(file_path, 'r') as file:
                data = json.load(file)
                if isinstance(data, list) and all(isinstance(item, dict) for item in data):
                    return data
                else:
                    return None
        except Exception as e:
            return None

    def insert_to_collection(self, collection_name, data):
        collection = self.db[collection_name]
    
        # Use bulk write for inserts
        requests = [InsertOne(doc) for doc in data]
        try:
            result = collection.bulk_write(requests, ordered=False)
            print(f"Inserted {result.inserted_count} documents into {collection_name}.")
        except BulkWriteError as bwe:
            print(f"Bulk write error for {collection_name}: {bwe.details}")
        

In [5]:
slack_db = SlackDataDB()

collection messages already exists


In [6]:
slack_db.insert_messages(cfg.path)

Inserted 573 documents into ab_test-group.
Inserted 611 documents into adludios-challange.
Inserted 559 documents into all-broadcast.
Inserted 828 documents into all-career-exercises.
Inserted 8302 documents into all-community-building.
Inserted 25 documents into all-de-week12.
Inserted 285 documents into all-ideas.
Inserted 19 documents into all-ml-week12.
Inserted 1506 documents into all-resources.
Inserted 161 documents into all-technical-support.
Inserted 9 documents into all-web3-week12.
Inserted 596 documents into all-week1.
Inserted 224 documents into all-week10.
Inserted 291 documents into all-week11.
Inserted 125 documents into all-week12.
Inserted 350 documents into all-week2.
Inserted 482 documents into all-week3.
Inserted 371 documents into all-week4.
Inserted 535 documents into all-week5.
Inserted 385 documents into all-week6.
Inserted 555 documents into all-week7.
Inserted 137 documents into all-week8.
Inserted 178 documents into all-week9.
Inserted 6 documents into batch

In [7]:
slack_db.insert_channels(os.path.join(cfg.path, "channels.json"))

Inserted 39 documents into channels.


In [8]:
slack_db.insert_integration_logs(os.path.join(cfg.path, "integration_logs.json"))

Inserted 18 documents into integration_logs.


In [9]:
slack_db.insert_canvases(os.path.join(cfg.path, "canvases.json"))

Inserted 15 documents into canvases.


In [10]:
slack_db.insert_users_old_new(os.path.join(cfg.path, "users_old_new.json"))

Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 documents into users_old_new.
Inserted 1 docum

In [11]:
slack_db.insert_users(os.path.join(cfg.path, "users.json"))

Inserted 71 documents into users.
