In [None]:
import requests

url = 'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=IBM&apikey=demo'
# Bounded timeout (seconds) so an unresponsive endpoint cannot hang the cell forever.
r = requests.get(url, timeout=10)
# Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
r.raise_for_status()
data = r.json()

# NOTE(review): Alpha Vantage may report API-level problems (invalid symbol,
# rate limit) inside an HTTP-200 JSON body instead of returning the time series.
if 'Time Series (Daily)' not in data:
    print(f"[WARN] Response has no 'Time Series (Daily)' section: {data}")

print(data)

{'Meta Data': {'1. Information': 'Daily Prices (open, high, low, close) and Volumes', '2. Symbol': 'IBM', '3. Last Refreshed': '2025-10-29', '4. Output Size': 'Compact', '5. Time Zone': 'US/Eastern'}, 'Time Series (Daily)': {'2025-10-29': {'1. open': '312.7900', '2. high': '314.3300', '3. low': '307.5200', '4. close': '308.2100', '5. volume': '4135948'}, '2025-10-28': {'1. open': '312.6000', '2. high': '319.3500', '3. low': '311.4100', '4. close': '312.5700', '5. volume': '6044770'}, '2025-10-27': {'1. open': '307.8000', '2. high': '313.5000', '3. low': '302.8800', '4. close': '313.0900', '5. volume': '9868151'}, '2025-10-24': {'1. open': '283.7700', '2. high': '310.7500', '3. low': '282.2100', '4. close': '307.4600', '5. volume': '16914243'}, '2025-10-23': {'1. open': '264.9500', '2. high': '285.5791', '3. low': '263.5623', '4. close': '285.0000', '5. volume': '16676394'}, '2025-10-22': {'1. open': '281.9900', '2. high': '289.1700', '3. low': '281.3500', '4. close': '287.5100', '5. vo

In [None]:
import json
import os
from datetime import datetime
from functools import lru_cache
from io import StringIO

import boto3
import pandas as pd
from kafka import KafkaConsumer

# ========== CONFIG ==========
# Connection settings and secrets come from environment variables so no
# credentials are stored in the notebook; the literal defaults are non-secret.
KAFKA_BROKER = os.environ.get('KAFKA_BROKER', 'localhost:9092')  # "host:port" of the Kafka broker
TOPIC_NAME = 'stockdata_demo'                                    # Kafka topic to consume
# When these are None, boto3 falls back to its default credential chain
# (environment, shared credentials file, instance role, ...).
AWS_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
S3_BUCKET = 'reddit-scrapped-data0922-5123-0539'  # Target S3 bucket name
S3_FOLDER = 'data/'                                # Key prefix ("sub-folder") inside the bucket
REGION = 'ap-southeast-2'                          # AWS region of the bucket


@lru_cache(maxsize=1)
def _get_s3_client():
    """Create the boto3 S3 client once and reuse it for every upload."""
    return boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=REGION
    )


def _merge_record_into_csv(existing_csv, data):
    """
    Return CSV text consisting of `existing_csv` plus `data` as one new row.

    existing_csv: current file contents, or None / blank text when there is no file yet.
    data: a single JSON record (dict); its keys become CSV columns.
    """
    new_df = pd.DataFrame([data])
    if existing_csv is None or not existing_csv.strip():
        return new_df.to_csv(index=False)

    # Read existing cells as literal strings so earlier rows are written back
    # unchanged: no "NA" -> empty cell, no int -> float promotion when a
    # column has gaps, no re-formatting of numbers.
    existing_df = pd.read_csv(StringIO(existing_csv), dtype=str, keep_default_na=False)
    combined_df = pd.concat([existing_df, new_df], ignore_index=True)
    return combined_df.to_csv(index=False)


def upload_csv_to_s3(data, filename):
    """
    Append one JSON record as a CSV row to ``S3_FOLDER + filename`` in S3_BUCKET.

    If the object does not exist yet (or is empty), it is created with a header row.
    Otherwise the existing CSV is downloaded, the record is appended, and the
    whole file is uploaded again.

    NOTE(review): this is a read-modify-write of the entire object per record,
    so each upload gets slower as the file grows, and concurrent writers can
    overwrite each other's rows. For more than a demo, batch records or write
    one object per message.

    data: a single JSON record (dict).
    filename: object name under S3_FOLDER.
    """
    s3 = _get_s3_client()
    key = f"{S3_FOLDER}{filename}"

    # --- Fetch the current file, if any ---
    try:
        existing_obj = s3.get_object(Bucket=S3_BUCKET, Key=key)
        existing_csv = existing_obj['Body'].read().decode('utf-8')
    except s3.exceptions.NoSuchKey:
        existing_csv = None

    is_new_file = existing_csv is None or not existing_csv.strip()
    csv_text = _merge_record_into_csv(existing_csv, data)
    if is_new_file:
        print(f"[CREATE] Creating new file {key}")
    else:
        print(f"[APPEND] Added new row to {key}")

    # --- Upload the full file back to S3 ---
    s3.put_object(
        Bucket=S3_BUCKET,
        Key=key,
        Body=csv_text.encode('utf-8'),
        ContentType='text/csv',
    )
    print(f"[UPLOAD] Uploaded CSV to {key}")


def _decode_json_value(raw):
    """
    Kafka value deserializer: UTF-8 encoded JSON bytes -> Python object.

    Returns None for None (tombstone) values and for payloads that are not
    valid UTF-8 JSON, so a single bad message cannot crash the consumer loop.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def consume_and_upload(filename="all_stocks.csv"):
    """
    Kafka consumer loop: read JSON records from TOPIC_NAME and append each one
    as a CSV row to `filename` in S3 via upload_csv_to_s3.

    Runs until the topic iterator ends or the loop is interrupted. The consumer
    is always closed on exit, including on KeyboardInterrupt.

    filename: object name (under S3_FOLDER) that every record is appended to.
    """
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=[KAFKA_BROKER],
        value_deserializer=_decode_json_value,
    )

    print(f"[INFO] Connected to Kafka broker: {KAFKA_BROKER}")
    print(f"[INFO] Listening to topic: {TOPIC_NAME}")

    try:
        for message in consumer:
            data = message.value

            # Only JSON objects map onto a CSV row; skip anything else
            # (invalid JSON and tombstones decode to None).
            if not isinstance(data, dict):
                print(f"[SKIP] Ignoring message that is not a JSON object: {data!r}")
                continue

            # The producer may label the instrument as 'ticker' or 'symbol'.
            symbol = data.get('ticker') or data.get('symbol') or 'UNKNOWN'
            print(f"[RECEIVED] {symbol} - Price: {data.get('price')}")

            upload_csv_to_s3(data, filename)
    finally:
        # Release sockets/threads even when the loop is interrupted.
        consumer.close()


if __name__ == "__main__":
    # In Jupyter, __name__ is "__main__", so running this cell starts the
    # blocking consumer loop. Interrupt the kernel to stop it; the interrupt
    # is caught so the cell ends cleanly instead of with a traceback.
    try:
        consume_and_upload()
    except KeyboardInterrupt:
        print("[INFO] Consumer stopped by user")



[INFO] Connected to Kafka broker: 3.27.142.86:9092
[INFO] Listening to topic: stockdata_demo
[RECEIVED] NVDA - Price: 202.39
[CREATE] Creating new file data/all_stocks.csv
[UPLOAD] Uploaded CSV to data/all_stocks.csv
[RECEIVED] AMD - Price: 256.06
[APPEND] Added new row to data/all_stocks.csv
[UPLOAD] Uploaded CSV to data/all_stocks.csv
[RECEIVED] INTC - Price: 39.98
[APPEND] Added new row to data/all_stocks.csv
[UPLOAD] Uploaded CSV to data/all_stocks.csv


KeyboardInterrupt: 