In [0]:
%sql

-- Create the catalog, schemas and volume this notebook works with.
-- Every statement uses IF NOT EXISTS, so objects that already exist are left as they are.
create catalog if not exists slack_etl;
-- Four schemas inside the slack_etl catalog: landing, bronze, silver and gold.
create schema if not exists slack_etl.landing;
create schema if not exists slack_etl.bronze;
create schema if not exists slack_etl.silver;
create schema if not exists slack_etl.gold;
-- Volume "raw" in the landing schema; the generator cell further down writes
-- its files under /Volumes/slack_etl/landing/raw.
create volume if not exists slack_etl.landing.raw

In [0]:
%rm -r "/Volumes/slack_etl/landing/raw"
# The line above recursively removes the landing volume path, so any files a
# previous run generated there are lost (presumably to start from a clean
# slate -- confirm before running against data you want to keep).

## Generate Data

In [0]:
%pip install tqdm

In [0]:
%restart_python

In [0]:
import json
import uuid
import random
import os
from datetime import datetime, timedelta
from tqdm import tqdm

# Directory under which every generated file is written.
BASE_PATH = "/Volumes/slack_etl/landing/raw"
# Calendar date used for the first day's partition folders.
START_DATE = datetime(2025, 1, 1)
DAYS = 7                       # keep small for Databricks Free
# Number of JSON lines written to each generated file.
RECORDS_PER_FILE = 10_000

# Seed the global `random` generator so the random picks (ids, departments,
# counts, ...) are repeatable from run to run. Values that come from
# uuid.uuid4() and datetime.now() still differ on every run.
SEED = 42
random.seed(SEED)

# Pools of synthetic identifiers and labels the record generators draw from.
# NOTE(review): range(1, 300) / range(1, 3000) yield 299 channels and 2999
# users -- confirm those upper bounds are intended.
CHANNELS = [f"C{i:08d}" for i in range(1, 300)]
USERS = [f"U{i:08d}" for i in range(1, 3000)]
DEPARTMENTS = ["Engineering", "Data", "HR", "Finance"]
REACTIONS = ["thumbsup", "heart", "eyes", "rocket"]

def ensure_dir(path):
    """Create directory `path` along with any missing parent directories.

    Nothing happens (and no error is raised) when the directory is already
    there.
    """
    os.makedirs(name=path, exist_ok=True)

def write_json_with_tqdm(path, generator_fn, entity):
    """Write RECORDS_PER_FILE generated records to `path`, one JSON object per line.

    Args:
        path: Destination file. It is opened in write mode, so existing
            content is replaced.
        generator_fn: Callable invoked with no arguments once per record; its
            return value is serialised with json.dumps.
        entity: Label shown in the progress-bar description.
    """
    with open(path, "w") as out:
        progress = tqdm(
            range(RECORDS_PER_FILE),
            desc=f"    Writing {entity}",
            unit="rec",
            leave=False,  # remove the bar once the file is finished
        )
        for _ in progress:
            line = json.dumps(generator_fn())
            out.write(line + "\n")

# ---------------- USERS ----------------
def generate_user():
    """Build one synthetic user record.

    The user id is picked at random from USERS, and the last four characters
    of that id are reused in the name, display name and e-mail address.

    Returns:
        dict: A nested record with the keys id, team_id, profile, created
        and is_bot.
    """
    uid = random.choice(USERS)
    suffix = uid[-4:]

    # Department is drawn before location, as two separate random picks.
    custom_fields = {
        "department": {"value": random.choice(DEPARTMENTS)},
        "location": {"value": random.choice(["India", "US", "UK"])},
    }
    profile = {
        "real_name": f"User {suffix}",
        "display_name": f"user_{suffix}",
        "email": f"user{suffix}@company.com",
        "fields": custom_fields,
    }
    # "created" is the wall-clock time of generation, as whole epoch seconds.
    created_at = int(datetime.now().timestamp())

    return {
        "id": uid,
        "team_id": "T123456",
        "profile": profile,
        "created": created_at,
        "is_bot": False,
    }

# ---------------- CONVERSATIONS ----------------
def generate_conversation():
    """Build one synthetic conversation (channel) record.

    Returns:
        dict: A record with the keys id, name, purpose, topic, members,
        created and is_private. `members` holds between 5 and 50 distinct
        user ids sampled from USERS.
    """
    channel_id = random.choice(CHANNELS)
    creator = random.choice(USERS)
    member_count = random.randint(5, 50)
    members = random.sample(USERS, member_count)
    # Wall-clock time of generation, as whole epoch seconds.
    created_at = int(datetime.now().timestamp())

    purpose = {
        "value": "Slack data engineering project",
        "creator": creator,
    }
    return {
        "id": channel_id,
        "name": f"channel_{channel_id[-4:]}",
        "purpose": purpose,
        "topic": {"value": "Complex JSON practice"},
        "members": members,
        "created": created_at,
        "is_private": False,
    }

# ---------------- MESSAGES ----------------
def generate_message():
    """Build one synthetic message record.

    Every record carries the sender, channel, text, one block, one attachment,
    one reaction and a `ts` string. A `thread` object is added when a random
    draw exceeds 0.6, and an `edited` object when a second draw exceeds 0.8.

    Returns:
        dict: The nested message record.
    """
    sender_id = random.choice(USERS)
    target_channel = random.choice(CHANNELS)
    sender_tag = sender_id[-4:]

    # Two independent UUIDs: one for the message id, one embedded in the text.
    message_id = str(uuid.uuid4())
    sender = {
        "id": sender_id,
        "profile": {
            "real_name": f"User {sender_tag}",
            "email": f"user{sender_tag}@company.com",
        },
    }
    channel = {
        "id": target_channel,
        "name": f"channel_{target_channel[-4:]}",
    }
    body_text = f"Message {uuid.uuid4()}"
    section_block = {
        "type": "section",
        "elements": [{"type": "text", "text": "Hello from Slack"}],
    }
    attachment = {
        "id": random.randint(1, 10),
        "mimetype": "text/plain",
        "size": random.randint(100, 5000),
    }
    reaction = {
        "name": random.choice(REACTIONS),
        "count": random.randint(1, 5),
    }
    # Wall-clock generation time, kept as the string form of a float epoch.
    sent_ts = str(datetime.now().timestamp())

    msg = {
        "client_msg_id": message_id,
        "type": "message",
        "user": sender,
        "channel": channel,
        "text": body_text,
        "blocks": [section_block],
        "attachments": [attachment],
        "reactions": [reaction],
        "ts": sent_ts,
    }

    # Optional thread metadata; the thread timestamp mirrors the message's own ts.
    if random.random() > 0.6:
        reply_count = random.randint(1, 10)
        participant_count = random.randint(1, 5)
        msg["thread"] = {
            "thread_ts": sent_ts,
            "reply_count": reply_count,
            "participants": random.sample(USERS, participant_count),
        }

    # Optional edit marker, stamped with a fresh wall-clock time.
    if random.random() > 0.8:
        editor = random.choice(USERS)
        msg["edited"] = {
            "user": editor,
            "ts": str(datetime.now().timestamp()),
        }

    return msg

# ---------------- MESSAGE HISTORY ----------------
def generate_message_history(window=timedelta(hours=1)):
    """Build one synthetic per-channel message-history summary record.

    The original implementation read datetime.now() twice in a row for
    `start_ts` and `end_ts`, which made the reported window effectively
    zero-length. The window now ends at the time of generation and starts
    `window` earlier, so `start_ts` is strictly before `end_ts` for any
    positive `window`.

    Args:
        window: Length of the reported window as a timedelta. Defaults to one
            hour; existing no-argument calls keep working.

    Returns:
        dict: A record with `channel_id`, `stats` (message_count and between
        5 and 50 distinct active user ids) and `window` (start_ts / end_ts as
        string epoch seconds).
    """
    cid = random.choice(CHANNELS)
    message_count = random.randint(100, 5000)
    active_users = random.sample(USERS, random.randint(5, 50))

    # Read the clock once so both ends of the window derive from one instant.
    window_end = datetime.now()
    window_start = window_end - window

    return {
        "channel_id": cid,
        "stats": {
            "message_count": message_count,
            "active_users": active_users
        },
        "window": {
            "start_ts": str(window_start.timestamp()),
            "end_ts": str(window_end.timestamp())
        }
    }

# ---------------- MAIN LOOP ----------------
# (entity folder name, record generator) pairs written for every hourly
# partition. Built once here rather than on every hour iteration.
ENTITY_GENERATORS = [
    ("users", generate_user),
    ("conversations", generate_conversation),
    ("messages", generate_message),
    ("message_history", generate_message_history),
]

# Date whose partition folders are currently being written; advanced by one
# day at the end of each outer iteration.
current = START_DATE

# Writes DAYS * 24 * len(ENTITY_GENERATORS) files, each holding
# RECORDS_PER_FILE JSON lines.
# NOTE(review): the partition folders are derived from `current`/`hour`, but
# the generators stamp records with datetime.now(), so timestamps inside the
# files do not fall within the partition's date and hour.
for day in tqdm(range(1, DAYS + 1), desc="Days", unit="day"):
    for hour in tqdm(range(24), desc=f"  Hours (Day {day})", unit="hr", leave=False):

        # Hive-style key=value folders with zero-padded month, day and hour.
        partition = f"year={current.year}/month={current.month:02d}/day={current.day:02d}/hour={hour:02d}"

        for entity, generator in ENTITY_GENERATORS:
            path = f"{BASE_PATH}/{entity}/{partition}"
            ensure_dir(path)

            # One file per entity per hour, named after the entity.
            write_json_with_tqdm(
                f"{path}/{entity}.json",
                generator,
                entity
            )

    current += timedelta(days=1)

# The check-mark was previously mis-encoded (UTF-8 bytes shown as cp1252).
print("\n✅ Complex Slack-style JSON data generation completed.")
