### Notebook Documentation

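- Generated synthetic streaming data (users, posts, comments, and likes) with the `faker` library, using one background thread per entity.
- Saved each generated batch as a JSON file in a per-entity folder of a Databricks volume.
- Read and previewed the generated files, and the `gold_user_engagement_summary` table, using Spark.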


In [0]:
%pip install faker
from faker import Faker
import json, time, random, os, threading

fake = Faker()

# Root folder (inside a Databricks volume) that receives every generated JSON batch
base_dir = "/Volumes/kusha_solutions/jeevan_streaming/my_volume/Raw_JSON_Files/"

# One sub-folder per entity; exist_ok=True makes re-running this cell harmless
dirs = ["users", "posts", "comments", "likes"]
for subdir in dirs:
    subdir_path = os.path.join(base_dir, subdir)
    os.makedirs(subdir_path, exist_ok=True)

# =============================
# Generator Functions
# =============================

def write_json_batch(output_dir, prefix, records):
    """
    Writes one batch of records to a new JSON file and returns the file's path.

    The file is named ``<prefix>_<nanosecond timestamp>.json``. Nanoseconds are
    used rather than whole seconds so that two batches written within the same
    second (for example by a duplicate generator thread) get distinct names
    instead of one silently overwriting the other.

    Args:
        output_dir: Existing directory to write into.
        prefix: File-name prefix, normally the entity name (e.g. "users").
        records: JSON-serialisable list, written as a single indented JSON
            array (so readers must parse the files in multi-line mode).

    Returns:
        The full path of the written file.
    """
    filename = os.path.join(output_dir, f"{prefix}_{time.time_ns()}.json")
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=4)
    return filename


def run_generator(entity, batch_size, interval_s, make_record):
    """
    Continuously generates batches of ``entity`` records and writes each batch
    to ``<base_dir>/<entity>/`` as one JSON file. Never returns.

    IDs start at 1 and increase by one per record across batches.

    Args:
        entity: Sub-directory name and file-name prefix (e.g. "users").
        batch_size: Number of records per file.
        interval_s: Seconds to sleep after each batch.
        make_record: Callable that takes the record's integer ID and returns
            the record as a dict.
    """
    output_dir = os.path.join(base_dir, entity)
    next_id = 1
    while True:
        data = [make_record(record_id) for record_id in range(next_id, next_id + batch_size)]
        next_id += batch_size

        try:
            filename = write_json_batch(output_dir, entity, data)
        except OSError as exc:
            # An uncaught exception would terminate this thread and stop the
            # entity's stream for the rest of the session; report and retry.
            print(f"⚠️ Failed to write {entity} batch to {output_dir}: {exc}")
        else:
            print(f"✅ Generated: {filename}")

        time.sleep(interval_s)


def generate_users():
    """
    Continuously generates batches of fake user data and writes them to JSON files.
    Each batch contains 20 users with unique IDs and random attributes and is
    written to base_dir/users/ every 10 seconds.
    """
    def make_user(user_id):
        return {
            "user_id": user_id,
            "name": fake.name(),
            "username": fake.user_name(),
            "email": fake.email(),
            "location": fake.city(),
            "created_at": fake.iso8601(),
        }

    run_generator("users", 20, 10, make_user)


def generate_posts():
    """
    Continuously generates batches of fake post data and writes them to JSON files.
    Each batch contains 20 posts with random user IDs, content, and like counts
    and is written to base_dir/posts/ every 5 seconds.
    """
    def make_post(post_id):
        return {
            "post_id": post_id,
            # NOTE(review): drawn from 1..1000 independently of the user IDs
            # generate_users has actually emitted, so it may reference a user
            # that does not exist yet.
            "user_id": random.randint(1, 1000),
            "content": fake.sentence(nb_words=12),
            "likes": random.randint(0, 1000000),
            "created_at": fake.iso8601(),
        }

    run_generator("posts", 20, 5, make_post)


def generate_comments():
    """
    Continuously generates batches of fake comment data and writes them to JSON files.
    Each batch contains 50 comments linked to random posts and users and is
    written to base_dir/comments/ every 5 seconds.
    """
    def make_comment(comment_id):
        return {
            "comment_id": comment_id,
            "post_id": random.randint(1, 1000),
            "user_id": random.randint(1, 1000),
            "comment": fake.sentence(nb_words=10),
            "created_at": fake.iso8601(),
        }

    run_generator("comments", 50, 5, make_comment)


def generate_likes():
    """
    Continuously generates batches of fake like/reaction data and writes them to JSON files.
    Each batch contains 50 likes with random reactions for posts by users and is
    written to base_dir/likes/ every 5 seconds.
    """
    def make_like(like_id):
        return {
            "like_id": like_id,
            "user_id": random.randint(1, 1000),
            "post_id": random.randint(1, 1000),
            "reaction": random.choice(["like", "love", "haha", "angry"]),
            "created_at": fake.iso8601(),
        }

    run_generator("likes", 50, 5, make_like)

# =============================
# Run all generators in parallel
# =============================

# Start each generator function in its own daemon thread for parallel data
# generation. Each thread gets a stable name so that re-running this cell does
# not start a second copy of a generator that is still alive: daemon threads are
# only stopped when the Python process exits, so they survive a cancelled cell
# in a long-lived notebook session, and a duplicate would restart its IDs at 1
# and emit duplicate records.
already_running = {t.name: t for t in threading.enumerate()}
threads = []
for func in [generate_users, generate_posts, generate_comments, generate_likes]:
    thread_name = f"datagen-{func.__name__}"
    t = already_running.get(thread_name)
    if t is None:
        t = threading.Thread(target=func, name=thread_name, daemon=True)
        t.start()
    else:
        print(f"ℹ️ {thread_name} is already running; not starting a duplicate.")
    threads.append(t)

print("🚀 Data generation started for users, posts, comments, and likes — running continuously!")

# Keep the main thread alive so that generator threads keep running
while True:
    time.sleep(60)

In [0]:
# Preview the raw user batches (each file is one indented JSON array, hence
# multiLine). No .cache(): the folder keeps receiving new files, so a cached
# snapshot goes stale, and `df` is rebound by the next cell without ever being
# unpersisted, which would leave the cached data pinned in memory.
df = spark.read.option("multiLine", True).format("json").load(
    "/Volumes/kusha_solutions/jeevan_streaming/my_volume/Raw_JSON_Files/users/"
)
display(df)

In [0]:
# Preview the raw post batches; each file holds one indented JSON array,
# so it must be parsed in multi-line mode.
df = spark.read.json(
    "/Volumes/kusha_solutions/jeevan_streaming/my_volume/Raw_JSON_Files/posts/",
    multiLine=True,
)
display(df)

In [0]:
# Preview the raw comment batches; each file holds one indented JSON array,
# so it must be parsed in multi-line mode.
df = spark.read.json(
    "/Volumes/kusha_solutions/jeevan_streaming/my_volume/Raw_JSON_Files/comments/",
    multiLine=True,
)
display(df)

In [0]:
# Preview the raw like/reaction batches; each file holds one indented JSON
# array, so it must be parsed in multi-line mode.
df = spark.read.json(
    "/Volumes/kusha_solutions/jeevan_streaming/my_volume/Raw_JSON_Files/likes/",
    multiLine=True,
)
display(df)

In [0]:
# Inspect the gold user-engagement summary table. It is not built in this
# notebook; presumably a downstream pipeline derives it from the raw files above (confirm).
df = spark.table("kusha_solutions.jeevan_streaming.gold_user_engagement_summary")
display(df)