In [None]:
import os
import json
import random
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests
from azure.storage.blob import BlobServiceClient, BlobClient


# ============================================================
# 🔐 CONFIGURATION
# ============================================================

# Credentials and account settings are read from the environment so real
# secrets never have to be written into (and committed with) this file. The
# original placeholder values remain the defaults, so behaviour is unchanged
# when a variable is not set.
OPENWEATHER_KEY = os.environ.get("OPENWEATHER_KEY", "OPENWEATHER_KEY")

AZ_ACCOUNT_NAME = os.environ.get("AZ_ACCOUNT_NAME", "herdtrackstorage1")
AZ_ACCOUNT_KEY = os.environ.get("AZ_ACCOUNT_KEY", "AZ_ACCOUNT_KEY")
AZ_CONTAINER_NAME = os.environ.get("AZ_CONTAINER_NAME", "bronze")

# Kaggle datasets (local paths)
MILK_FILE = "./data/kaggle/global_cattle_milk_yield_prediction_dataset.csv"
DISEASE_FILE = "./data/kaggle/global_cattle_disease_detection_dataset.csv"

# Region name (as found in the dataset's Region column) -> city used for the
# weather lookup. Regions not listed here fall back to DEFAULT_CITY.
REGION_TO_CITY = {
    "India": "Pune",
    "USA": "Dallas",
    "Kenya": "Nairobi",
    "South Africa": "Johannesburg",
    "Brazil": "Sao Paulo",
    "UK": "London"
}
DEFAULT_CITY = "Dallas"

# Seconds to sleep between two stream batches.
STREAM_INTERVAL_SEC = 120  # every 2 minutes


# ============================================================
# ☁️ CONNECT TO AZURE
# ============================================================

def get_blob_container():
    """Return a container client for AZ_CONTAINER_NAME in the configured account.

    An account-key connection string is assembled from AZ_ACCOUNT_NAME and
    AZ_ACCOUNT_KEY (https, core.windows.net endpoint) and used to open the
    service client, from which the container client is obtained.
    """
    settings = [
        "DefaultEndpointsProtocol=https",
        f"AccountName={AZ_ACCOUNT_NAME}",
        f"AccountKey={AZ_ACCOUNT_KEY}",
        "EndpointSuffix=core.windows.net",
    ]
    service = BlobServiceClient.from_connection_string(";".join(settings))
    return service.get_container_client(AZ_CONTAINER_NAME)


# ============================================================
# 🔄 CHUNKED UPLOAD (fix for ConnectionResetError)
# ============================================================

import base64

def upload_big_file(container_client, blob_name, local_path, block_size=256 * 1024):
    """Upload a local file to a block blob by staging it in fixed-size blocks.

    Each ``block_size`` chunk of the file is sent with its own ``stage_block``
    call, then the blob is assembled with one ``commit_block_list`` call.
    Splitting the upload keeps every individual request small.

    Args:
        container_client: Object providing ``get_blob_client(name)``.
        blob_name: Destination blob path inside the container.
        local_path: Local file to upload (read in binary mode).
        block_size: Bytes per staged block; must be positive (default 256 KiB).

    Raises:
        ValueError: If ``block_size`` is not positive.
        Any error raised while reading the file or by the blob client, except
        an HTTP 404 from deleting a blob that does not exist yet.
    """
    if block_size <= 0:
        # read(0) returns b"" immediately, which would commit an empty blob.
        raise ValueError(f"block_size must be positive, got {block_size}")

    print(f"‚¨ÜÔ∏è Uploading (chunked) ‚Üí {blob_name}")

    blob_client = container_client.get_blob_client(blob_name)

    # Delete an existing blob first, ignoring only "not found" (HTTP 404).
    # A bare ``except:`` here would also hide auth/network failures and even
    # swallow KeyboardInterrupt.
    try:
        blob_client.delete_blob()
    except Exception as exc:
        if getattr(exc, "status_code", None) != 404:
            raise

    block_ids = []
    with open(local_path, "rb") as fh:
        for index, chunk in enumerate(iter(lambda: fh.read(block_size), b"")):
            # All block IDs of one blob must have the same length, hence the
            # zero-padded counter. The SDK types block_id as str, so the
            # base64 text is decoded from bytes.
            block_id = base64.b64encode(f"{index:06d}".encode()).decode("ascii")
            blob_client.stage_block(block_id=block_id, data=chunk)
            block_ids.append(block_id)

    # Assemble the staged blocks, in order, into the final blob.
    blob_client.commit_block_list(block_ids)

    print(f"‚úîÔ∏è Chunked upload completed ‚Üí {blob_name}")


# ============================================================
# 📦 GENERATE STATIC TABLES
# ============================================================

def generate_realistic_static_tables(base_df):
    """Build synthetic pregnancy, sensor-metadata and feed tables for the herd.

    Each table gets one row per entry of ``base_df["Cattle_ID"]``, in order.
    All values come from the ``random`` module (drawn in a fixed order), so
    seeding it makes the output reproducible. Dates are relative to a single
    "now" captured at the start of the call.

    Args:
        base_df: DataFrame with a ``Cattle_ID`` column.

    Returns:
        Tuple ``(preg_df, sensor_df, feed_df)`` of DataFrames.
    """
    vet_names = ["Dr. Smith", "Dr. Patel", "Dr. John", "Dr. Anita"]
    cattle_ids = base_df["Cattle_ID"].tolist()

    # One reference instant for every date, so all rows (and both date
    # columns of a pregnancy row) are computed against the same "today".
    now = datetime.now()

    def days_ago(low, high):
        # Random past date, between ``low`` and ``high`` days before ``now``.
        return (now - timedelta(days=random.randint(low, high))).strftime("%Y-%m-%d")

    # Pregnancy table: ~30% pregnant; due date assumes a 280-day gestation.
    preg_rows = []
    for cid in cattle_ids:
        if random.random() < 0.30:
            days = random.randint(1, 280)
            due_date = (now + timedelta(days=(280 - days))).strftime("%Y-%m-%d")
            preg_rows.append([
                cid, "Pregnant", days, due_date,
                days_ago(10, 60),
                random.choice(vet_names)
            ])
        else:
            preg_rows.append([
                cid, "Not Pregnant", 0, "",
                days_ago(20, 120),
                random.choice(vet_names)
            ])

    preg_df = pd.DataFrame(
        preg_rows,
        columns=["Cattle_ID", "PregStatus", "DaysPregnant", "DueDate", "LastCheckupDate", "VetName"]
    )

    # Sensor metadata: one sensor per animal.
    sensor_rows = [
        [
            f"SENSOR_{cid}", cid,
            random.choice(["BioSensor", "EnvSensor", "FeedSensor"]),
            days_ago(30, 300),
            random.randint(30, 100),
            random.choice(["v1.2.3", "v2.0.1", "v1.5.9"])
        ]
        for cid in cattle_ids
    ]

    sensor_df = pd.DataFrame(
        sensor_rows,
        columns=["SensorID", "Cattle_ID", "SensorType", "InstallDate", "BatteryLevel", "FirmwareVersion"]
    )

    # Feed table: feed type, quantity, feeding time and cost per animal.
    feed_rows = [
        [
            cid,
            random.choice(["Grass", "Silage", "Corn", "Hay", "Concentrate"]),
            round(random.uniform(5, 12), 2),
            random.choice(["06:00", "18:00"]),
            round(random.uniform(2, 10), 2)
        ]
        for cid in cattle_ids
    ]

    feed_df = pd.DataFrame(
        feed_rows,
        columns=["Cattle_ID", "FeedType", "FeedQuantityKg", "FeedingTime", "FeedCostUSD"]
    )

    return preg_df, sensor_df, feed_df


# ============================================================
# 📤 UPLOAD STATIC + KAGGLE FILES
# ============================================================

def upload_static_datasets(container_client, base_df):
    """Write the synthetic static tables to CSV, then push them and the Kaggle files.

    The three generated tables are first saved as CSVs in the current working
    directory. Then the two Kaggle source files and the three CSVs are
    uploaded, in that order, via ``upload_big_file``.
    """
    print("üìÅ Preparing static datasets...")

    pregnancy, sensors, feeds = generate_realistic_static_tables(base_df)
    local_tables = [
        ("pregnancy_record.csv", pregnancy),
        ("sensor_metadata.csv", sensors),
        ("feed_type.csv", feeds),
    ]
    for csv_name, table in local_tables:
        table.to_csv(csv_name, index=False)

    uploads = [
        ("kaggle/milk_yield.csv", MILK_FILE),
        ("kaggle/disease.csv", DISEASE_FILE),
        ("static/pregnancy_record.csv", "pregnancy_record.csv"),
        ("static/sensor_metadata.csv", "sensor_metadata.csv"),
        ("static/feed_type.csv", "feed_type.csv"),
    ]
    for target, source in uploads:
        upload_big_file(container_client, target, source)

    print("‚úîÔ∏è ALL static + Kaggle datasets uploaded to Bronze!")


# ============================================================
# 🌦️ WEATHER API
# ============================================================

def fetch_weather(city: str):
    """Return current weather for ``city`` from OpenWeatherMap, or simulated values.

    Queries the OpenWeatherMap ``/data/2.5/weather`` endpoint in metric units.
    If the request fails, returns a non-2xx status, or the response lacks the
    expected fields, random values are returned instead. In that case the
    description is ``"partly cloudy (simulated)"``.

    Args:
        city: City name sent as the ``q`` query parameter.

    Returns:
        Dict with keys ``city``, ``temperature``, ``humidity``, ``wind_speed``
        and ``description``.
    """
    try:
        # ``params`` lets requests URL-encode the values (e.g. "Sao Paulo").
        r = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": city, "appid": OPENWEATHER_KEY, "units": "metric"},
            timeout=10
        )
        r.raise_for_status()
        data = r.json()
        return {
            "city": city,
            "temperature": data["main"]["temp"],
            "humidity": data["main"]["humidity"],
            "wind_speed": data["wind"]["speed"],
            "description": data["weather"][0]["description"]
        }
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as exc:
        # Narrowed from a bare ``except:`` (which also caught KeyboardInterrupt)
        # and reported instead of silently degrading. Only the exception type
        # is printed: HTTP error messages include the request URL, which
        # carries the API key.
        print(f"Weather API unavailable for {city} ({type(exc).__name__}); using simulated values")
        return {
            "city": city,
            "temperature": round(random.uniform(20, 30), 2),
            "humidity": random.randint(45, 85),
            "wind_speed": round(random.uniform(1, 7), 2),
            "description": "partly cloudy (simulated)"
        }


# ============================================================
# 🧠 HEALTH SCORE LOGIC
# ============================================================

def compute_health_score(temp, milk, feed):
    """Score live readings on a 0-100 scale, where higher means healthier.

    Starts at 100 and subtracts at most one penalty per metric, taking the
    first band that matches:

    * temperature above 40 / 39.5 / 39 -> 40 / 30 / 20 points
    * milk yield below 8 / 10          -> 25 / 15 points
    * feed intake below 5 / 6          -> 25 / 15 points

    The result is returned as an int clamped to [0, 100].
    """
    temp_bands = ((40, 40), (39.5, 30), (39, 20))
    milk_bands = ((8, 25), (10, 15))
    feed_bands = ((5, 25), (6, 15))

    penalty = next((cost for limit, cost in temp_bands if temp > limit), 0)
    penalty += next((cost for limit, cost in milk_bands if milk < limit), 0)
    penalty += next((cost for limit, cost in feed_bands if feed < limit), 0)

    return max(0, min(100, int(round(100 - penalty))))


def classify_severity(score):
    """Map a health score to a severity label.

    Scores below 70 are "Critical", scores from 70 up to (not including) 90
    are "Medium", and anything else is "Low".
    """
    for upper_bound, label in ((70, "Critical"), (90, "Medium")):
        if score < upper_bound:
            return label
    return "Low"


# ============================================================
# 📄 LOAD BASE DATA
# ============================================================

def load_base_data(milk_file=None, disease_file=None):
    """Load the milk-yield and disease CSVs and join them into one row per animal.

    Only ``Cattle_ID`` values present in both files are kept. When an ID is
    repeated within a file, its first row in that file is used. The result
    has the columns ``Cattle_ID, Region, Milk_Yield_L, Feed_Quantity_kg,
    Disease_Status``.

    Args:
        milk_file: Path to the milk-yield CSV; defaults to ``MILK_FILE``.
        disease_file: Path to the disease CSV; defaults to ``DISEASE_FILE``.

    Returns:
        The joined DataFrame.
    """
    milk_path = MILK_FILE if milk_file is None else milk_file
    disease_path = DISEASE_FILE if disease_file is None else disease_file

    milk_df = pd.read_csv(milk_path)
    disease_df = pd.read_csv(disease_path)

    # De-duplicate each side *before* joining. Otherwise IDs repeated on both
    # sides multiply into a many-to-many product that would only be thrown
    # away afterwards. Selecting the milk columns up front also avoids
    # _x/_y suffixes should the milk file carry its own Disease_Status column.
    milk_part = milk_df[["Cattle_ID", "Region", "Milk_Yield_L", "Feed_Quantity_kg"]].drop_duplicates(
        subset=["Cattle_ID"]
    )
    disease_part = disease_df[["Cattle_ID", "Disease_Status"]].drop_duplicates(subset=["Cattle_ID"])

    base = pd.merge(milk_part, disease_part, on="Cattle_ID", how="inner")

    print(f"‚úÖ Loaded {len(base)} cattle baseline records.")
    return base


# ============================================================
# 🧮 GENERATE ONE STREAM BATCH
# ============================================================

def generate_batch(base_df):
    """Simulate one streaming batch for every animal in ``base_df``.

    Weather is fetched once per distinct ``Region``, mapped to a city through
    ``REGION_TO_CITY`` with ``DEFAULT_CITY`` as fallback. For each animal,
    random live temperature, milk-yield and feed readings are drawn around
    its baseline. Each animal is then scored with ``compute_health_score``
    and ``classify_severity``.

    Returns:
        ``(batch, now)``. ``batch`` is a dict with ``timestamp``,
        ``env_records``, ``health_stream`` and ``alerts``. ``now`` is the UTC
        datetime whose ISO form stamps every record in the batch.
    """
    now = datetime.now(timezone.utc)
    stamp = now.isoformat()

    def env_record(region):
        # One weather snapshot for the city mapped to ``region``.
        city = REGION_TO_CITY.get(region, DEFAULT_CITY)
        weather = fetch_weather(city)
        return {
            "Region": region,
            "City": city,
            "RecordDateTime": stamp,
            "Temperature": weather["temperature"],
            "Humidity": weather["humidity"],
            "WindSpeed": weather["wind_speed"],
            "Description": weather["description"]
        }

    env_records = [env_record(region) for region in base_df["Region"].unique()]

    health_stream, alerts = [], []
    for _, animal in base_df.iterrows():
        cattle_id = animal["Cattle_ID"]
        region = animal["Region"]

        live_temp = round(random.uniform(37.5, 40.5), 2)
        live_milk = round(animal["Milk_Yield_L"] + random.uniform(-2, 2), 2)
        live_feed = round(animal["Feed_Quantity_kg"] + random.uniform(-1, 1), 2)

        score = compute_health_score(live_temp, live_milk, live_feed)
        severity = classify_severity(score)

        health_stream.append({
            "Cattle_ID": cattle_id,
            "Region": region,
            "LiveTemperature": live_temp,
            "LiveMilkYield": live_milk,
            "LiveFeedIntake": live_feed,
            "HealthScore": score,
            "Severity": severity
        })

        # NOTE(review): an alert is emitted for every animal, including
        # severity "Low"; confirm that downstream filtering is intended.
        alerts.append({
            "Cattle_ID": cattle_id,
            "Region": region,
            "Severity": severity,
            "Message": f"Cattle {cattle_id} ‚Äì Temp:{live_temp}¬∞C, Milk:{live_milk}L, Feed:{live_feed}kg, Score:{score}",
            "TriggerTime": stamp
        })

    batch = {
        "timestamp": stamp,
        "env_records": env_records,
        "health_stream": health_stream,
        "alerts": alerts
    }
    return batch, now


# ============================================================
# 🧵 STREAM UPLOAD
# ============================================================

def upload_json_stream(container_client, batch, timestamp):
    """Serialize ``batch`` as UTF-8 JSON and store it as a timestamped blob.

    The blob path is ``api_streams/herdtrack_stream_<timestamp>.json``. It is
    uploaded with ``overwrite=True``, so an existing blob at that path is
    replaced.
    """
    target = f"api_streams/herdtrack_stream_{timestamp}.json"
    client = container_client.get_blob_client(target)

    body = json.dumps(batch).encode("utf-8")
    client.upload_blob(body, overwrite=True)

    print(f"‚úîÔ∏è Stream uploaded ‚Üí {target}")


# ============================================================
# 🚀 MAIN LOOP
# ============================================================

def main(sample_size=1000):
    """Upload the static datasets once, then stream a batch every interval.

    Args:
        sample_size: Number of animals to stream (default 1000). It is capped
            at the number of loaded animals, so a smaller dataset does not
            make ``DataFrame.sample`` raise.

    Runs until interrupted. If uploading a batch fails, the error is printed
    and streaming continues with the next interval. A single transient
    storage/network error therefore does not end the stream.
    """
    base_df = load_base_data()

    # DataFrame.sample(n=...) raises ValueError when n exceeds the row count.
    base_df = base_df.sample(n=min(sample_size, len(base_df)), random_state=42)

    container_client = get_blob_container()

    # upload static + kaggle files
    upload_static_datasets(container_client, base_df)

    # streaming forever
    while True:
        batch, now = generate_batch(base_df)
        ts = now.strftime("%Y%m%d_%H%M%S")

        try:
            upload_json_stream(container_client, batch, ts)
        except Exception as exc:
            # Report and keep streaming. KeyboardInterrupt is not an Exception
            # subclass, so Ctrl-C still stops the loop.
            print(f"Upload failed for batch {ts}: {type(exc).__name__}: {exc}")
        else:
            print(f"‚úÖ Batch {ts} ‚Üí {len(batch['alerts'])} alerts")
        time.sleep(STREAM_INTERVAL_SEC)


if __name__ == "__main__":
    main()


‚úÖ Loaded 250000 cattle baseline records.
üìÅ Preparing static datasets...
‚¨ÜÔ∏è Uploading (chunked) ‚Üí kaggle/milk_yield.csv
‚úîÔ∏è Chunked upload completed ‚Üí kaggle/milk_yield.csv
‚¨ÜÔ∏è Uploading (chunked) ‚Üí kaggle/disease.csv


KeyboardInterrupt: 