In [None]:
from kafka import KafkaConsumer
from json import loads
import json
import boto3
import logging
import os

# ----------------------------------------
# Step 1: Make sure the log directory is present
# ----------------------------------------
log_file = 'logs/consumer.log'
os.makedirs(os.path.dirname(log_file), exist_ok=True)

# ----------------------------------------
# Step 2: Route log records to the log file
# ----------------------------------------
# Note: logging.basicConfig() is a no-op when the root logger already has
# handlers, so re-running this cell does not add a second file handler.
log_settings = {
    'filename': log_file,
    'filemode': 'a',  # append across runs; switch to 'w' to start a fresh file each time
    'level': logging.INFO,
    'format': '%(asctime)s - %(levelname)s - %(message)s',
}
logging.basicConfig(**log_settings)

# ----------------------------------------
# Step 3: S3 client used for the uploads
# ----------------------------------------
s3 = boto3.client('s3')


# ----------------------------------------
# Step 4: Kafka consumer for the 'demo_testing' topic
# ----------------------------------------
def _decode_json(raw_bytes):
    """Decode a UTF-8 encoded message payload and parse it as JSON."""
    return loads(raw_bytes.decode('utf-8'))


consumer = KafkaConsumer(
    'demo_testing',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=_decode_json,
)

# ----------------------------------------
# Step 5: Consume messages and upload to S3
# ----------------------------------------
# Every Kafka record is re-serialised to JSON and stored as one S3 object.
#
# The object key is derived from the record's partition and offset instead of
# a loop counter: a counter starts again at 0 each time this cell is re-run,
# which would silently overwrite the objects written by an earlier run.
# (partition, offset) is unique within the topic, so keys never collide.
BUCKET_NAME = 'dnyan-kafka-stock-project'

try:
    # The consumer above is created without consumer_timeout_ms, so this loop
    # keeps waiting for new records until the kernel is interrupted.
    for message in consumer:
        file_name = f'stock_market_{message.partition}_{message.offset}.json'
        json_data = json.dumps(message.value)

        try:
            s3.put_object(
                Bucket=BUCKET_NAME,
                Key=file_name,
                Body=json_data,
                ContentType='application/json'
            )
            print(f"✅ Uploaded {file_name}")
            logging.info("Uploaded %s to S3 bucket.", file_name)
        except Exception as e:
            # Best effort: report the failed upload and carry on with the
            # next record rather than stopping the whole consumer.
            print(f"❌ Failed to upload {file_name}")
            print(e)
            # logging.exception records the traceback along with the message.
            logging.exception("Failed to upload %s: %s", file_name, e)
finally:
    # Runs when the loop is interrupted or fails, so the broker connection is
    # released instead of lingering in the kernel.
    consumer.close()


In [None]:
# Print the kernel's current working directory. `os` is already imported in
# the first cell; relative paths such as the 'logs' folder created there are
# resolved against the working directory.
print(os.getcwd())


In [None]:
# Print the current working directory again, this time prefixed with a label.
# Relies on `os` having been imported by the first cell.
print("Current directory:", os.getcwd())


In [None]:
import os  # imported again here, so this cell does not depend on the first cell having run
# As the last expression of the cell, the working directory is displayed as
# the cell's output without needing print().
os.getcwd()
