<a href="https://colab.research.google.com/github/JanithRankelum/streamlit_app/blob/master/app_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import streamlit as st
import boto3
import pandas as pd
import numpy as np
from io import StringIO, BytesIO
from datetime import datetime, timedelta
import time
import json
from sklearn.ensemble import IsolationForest
import plotly.express as px

# AWS Configuration
# S3 bucket that the "Upload New Data" button writes to and the dashboard reads from.
BUCKET_NAME = 'riceprice-s3-bucket'
# Local CSV sent by the "Upload New Data" button. This is a Google Drive path as
# mounted inside Colab. NOTE(review): it will not exist when the app runs outside
# Colab — confirm the intended deployment path.
FILE_PATH = '/content/drive/MyDrive/cleaned_data.csv'
KINESIS_STREAM_NAME = 'rice-price-stream'  # Only used if stream exists

# Initialize AWS clients.
# No region or credentials are passed, so boto3 resolves them from its default
# configuration chain. Streamlit re-executes this whole script on every
# interaction, so these clients are re-created on each run.
s3 = boto3.client('s3')
kinesis = boto3.client('kinesis')
# NOTE(review): lambda_client is not referenced anywhere else in this file — confirm it is needed.
lambda_client = boto3.client('lambda')

# Check if Kinesis stream exists
def kinesis_stream_exists(stream_name):
    """Tell whether the Kinesis stream ``stream_name`` can be described.

    Returns:
        True when ``describe_stream`` succeeds; False when the stream is not
        found. Any other error is shown in the app with ``st.error`` and is
        also reported as False.
    """
    found = False
    try:
        kinesis.describe_stream(StreamName=stream_name)
    except kinesis.exceptions.ResourceNotFoundException:
        # A missing stream is an expected state, not an error worth displaying.
        pass
    except Exception as err:
        st.error(f"Error checking Kinesis stream: {err}")
    else:
        found = True
    return found

# Function to upload data to S3 (and optionally Kinesis)
def upload_data(file_path, bucket_name):
    """Upload a local CSV to S3 and, when the Kinesis stream exists, stream its rows.

    The file is stored in ``bucket_name`` under the key
    ``rice_prices_<YYYY-MM-DD_HH-MM-SS>.csv``. If ``KINESIS_STREAM_NAME``
    exists, every row with both a ``province`` and a ``price`` is also sent to
    it as a JSON record ``{"timestamp", "province", "price"}`` partitioned by
    province.

    The outcome is reported in the app via ``st.success`` / ``st.warning`` /
    ``st.error``; exceptions are not propagated to the caller.

    Args:
        file_path: Path of the local CSV file. It must have ``province`` and
            ``price`` columns when the Kinesis stream exists.
        bucket_name: Destination S3 bucket.
    """
    try:
        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        s3_file_name = f'rice_prices_{timestamp}.csv'
        s3.upload_file(file_path, bucket_name, s3_file_name)
        # read_processed_data caches its S3 reads; drop that cache so the next
        # script run re-reads the bucket instead of waiting for the TTL.
        read_processed_data.clear()

        if not kinesis_stream_exists(KINESIS_STREAM_NAME):
            st.success(f"Uploaded {file_path} to S3 (Kinesis stream not found)")
            return

        accepted, rejected, skipped = _send_rows_to_kinesis(pd.read_csv(file_path))
        if rejected:
            st.warning(
                f"Uploaded {file_path} to S3; Kinesis rejected "
                f"{rejected} of {accepted + rejected} records"
            )
        else:
            st.success(f"Uploaded {file_path} to S3 and Kinesis")
        if skipped:
            st.warning(f"Skipped {skipped} rows with missing province or price")
    except Exception as e:
        st.error(f"Error uploading file: {e}")


def _send_rows_to_kinesis(df, batch_size=500):
    """Publish the ``province``/``price`` rows of *df* to ``KINESIS_STREAM_NAME``.

    Rows missing either value are skipped: a missing province cannot be used as
    a partition key, and a NaN price would serialize to invalid JSON. Records
    are sent with PutRecords in batches of at most *batch_size* (PutRecords
    accepts at most 500 records per request).

    Returns:
        ``(accepted, rejected, skipped)`` record counts. PutRecords can reject
        individual records (e.g. when throttled) without raising, so rejections
        are counted from each response's ``FailedRecordCount``.
    """
    rows = df.dropna(subset=['province', 'price'])
    entries = [
        {
            # str()/float() turn numpy scalars into JSON-serializable Python values.
            'Data': json.dumps({
                'timestamp': str(datetime.now()),
                'province': str(province),
                'price': float(price),
            }).encode('utf-8'),
            'PartitionKey': str(province),
        }
        for province, price in zip(rows['province'], rows['price'])
    ]

    rejected = 0
    for start in range(0, len(entries), batch_size):
        response = kinesis.put_records(
            StreamName=KINESIS_STREAM_NAME,
            Records=entries[start:start + batch_size],
        )
        rejected += response.get('FailedRecordCount', 0)
    return len(entries) - rejected, rejected, len(df) - len(rows)

def _read_latest_csv(bucket_name):
    """Return the most recently modified ``.csv`` object in *bucket_name* as a DataFrame.

    Every page of the bucket listing is scanned, not only the first 1000 keys.
    Non-CSV objects (such as a parquet snapshot) are ignored. Returns an empty
    DataFrame when the bucket contains no ``.csv`` objects.
    """
    paginator = s3.get_paginator('list_objects_v2')
    csv_objects = (
        obj
        for page in paginator.paginate(Bucket=bucket_name)
        for obj in page.get('Contents', [])
        if obj['Key'].lower().endswith('.csv')
    )
    latest = max(csv_objects, key=lambda obj: obj['LastModified'], default=None)
    if latest is None:
        return pd.DataFrame()
    body = s3.get_object(Bucket=bucket_name, Key=latest['Key'])['Body'].read()
    return pd.read_csv(StringIO(body.decode('utf-8')))


@st.cache_data(ttl=60)
def read_processed_data(bucket_name):
    """Load the most recent rice-price data from S3 (cached by Streamlit for 60 s).

    The processed snapshot ``processed_data/latest.parquet`` is preferred. If it
    cannot be read for any reason, the newest ``.csv`` object in the bucket is
    used instead. In both cases a ``date`` column, if present, is parsed to
    datetimes.

    Args:
        bucket_name: S3 bucket to read from.

    Returns:
        The loaded DataFrame, or an empty DataFrame if nothing could be read.
        Read errors are shown in the app via ``st.error``.
    """
    try:
        obj = s3.get_object(Bucket=bucket_name, Key='processed_data/latest.parquet')
        df = pd.read_parquet(BytesIO(obj['Body'].read()))
    except Exception:
        # Best-effort fallback (missing snapshot, permissions, no parquet engine):
        # use the newest raw CSV upload instead.
        try:
            df = _read_latest_csv(bucket_name)
        except Exception as e:
            st.error(f"Error reading data: {e}")
            return pd.DataFrame()

    try:
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])
    except (ValueError, TypeError) as e:
        st.error(f"Error reading data: {e}")
        return pd.DataFrame()
    return df

def detect_anomalies(df, z_threshold=2.5, contamination=0.05, random_state=0):
    """Flag unusual prices by combining a z-score rule with an Isolation Forest.

    Columns added to a copy of *df*:
      - ``z_score``: (price - mean) / std over the whole frame. It is 0 when the
        std is 0 or undefined (one row, constant prices), and NaN where the
        price itself is missing.
      - ``abs_z_score``: absolute value of ``z_score``.
      - ``ml_anomaly``: True where IsolationForest predicts an outlier (-1).
      - ``is_anomaly``: ``abs_z_score > z_threshold`` or ``ml_anomaly``.

    Args:
        df: DataFrame with a numeric ``price`` column.
        z_threshold: Absolute z-score above which a row is flagged.
        contamination: Expected outlier fraction passed to IsolationForest.
        random_state: Seed for IsolationForest, so that repeated Streamlit
            reruns flag the same rows.

    Returns:
        The annotated copy, or an empty DataFrame when *df* is empty.
    """
    if len(df) == 0:
        return pd.DataFrame()

    df = df.copy()
    prices = df['price']
    spread = prices.std()
    if pd.isna(spread) or spread == 0:
        # No spread, so no price is a z-score outlier; this also avoids a 0/0 division.
        df['z_score'] = np.where(prices.notna(), 0.0, np.nan)
    else:
        df['z_score'] = (prices - prices.mean()) / spread
    df['abs_z_score'] = np.abs(df['z_score'])

    # IsolationForest rejects NaN input, so fit only on rows that have a price;
    # rows without one are never ML-flagged.
    has_price = prices.notna()
    df['ml_anomaly'] = False
    if has_price.any():
        model = IsolationForest(contamination=contamination, random_state=random_state)
        predictions = model.fit_predict(prices[has_price].to_numpy().reshape(-1, 1))
        df.loc[has_price, 'ml_anomaly'] = predictions == -1
    df['is_anomaly'] = (df['abs_z_score'] > z_threshold) | df['ml_anomaly']

    return df

def calculate_trends(df):
    """Add short/long moving averages of ``price`` and a trend label.

    Rows are sorted by ``date`` (stable sort). When a ``province`` column is
    present, the moving averages are computed separately for each province,
    so prices from different provinces are never averaged together. Otherwise
    they are computed over the whole frame. The windows count rows
    (observations), not calendar days.

    Columns added:
      - ``3_day_ma`` / ``7_day_ma``: rolling mean of ``price`` over 3 / 7 rows.
      - ``trend``: 'Upward' if 3_day_ma > 7_day_ma, 'Downward' if lower,
        'Flat' if equal, and 'Insufficient data' while either average is
        still undefined.

    Returns:
        A new date-sorted DataFrame (the input is not modified), or an empty
        DataFrame when *df* is empty.
    """
    if len(df) == 0:
        return pd.DataFrame()

    df = df.sort_values('date', kind='mergesort')
    if 'province' in df.columns:
        price_groups = df.groupby('province', sort=False)['price']
        df['3_day_ma'] = price_groups.transform(lambda s: s.rolling(window=3).mean())
        df['7_day_ma'] = price_groups.transform(lambda s: s.rolling(window=7).mean())
    else:
        df['3_day_ma'] = df['price'].rolling(window=3).mean()
        df['7_day_ma'] = df['price'].rolling(window=7).mean()

    short_ma, long_ma = df['3_day_ma'], df['7_day_ma']
    # The NaN check comes first: comparisons with NaN are False and would
    # otherwise fall through to a misleading label.
    df['trend'] = np.select(
        [short_ma.isna() | long_ma.isna(), short_ma > long_ma, short_ma < long_ma],
        ['Insufficient data', 'Upward', 'Downward'],
        default='Flat',
    )
    return df

# Use the full browser width for the dashboard layout.
st.set_page_config(layout="wide")
st.title("Rice Price Monitoring Dashboard")

# Every element created inside this context is rendered in the sidebar.
with st.sidebar:
    st.header("Controls")
    # Per its label, the auto-refresh interval in seconds (range 10-300, default 60).
    # NOTE(review): refresh_rate is not read anywhere else in this file, so the page
    # never auto-refreshes — confirm whether a timed rerun was intended.
    refresh_rate = st.slider("Refresh rate (seconds)", 10, 300, 60)
    # Per its label, the anomaly-detection sensitivity (range 1.0-5.0, step 0.1, default 2.5).
    anomaly_threshold = st.slider("Anomaly sensitivity", 1.0, 5.0, 2.5, 0.1)

    # upload_data runs inside the sidebar context, so its success/error
    # messages are shown here in the sidebar.
    if st.button("Upload New Data"):
        upload_data(FILE_PATH, BUCKET_NAME)

    st.markdown("---")
    st.markdown("**System Status:**")
    st.markdown(f"- S3 Bucket: `{BUCKET_NAME}`")
    # kinesis_stream_exists issues a DescribeStream call on every script run.
    st.markdown(f"- Kinesis Stream: {'Found' if kinesis_stream_exists(KINESIS_STREAM_NAME) else 'Not found'}")

    # st.markdown("**Next Steps:**")
    # st.markdown("- Spark integration for large datasets")
    # st.markdown("- Weather data correlation")
    # st.markdown("- Predictive analytics")

tab1, tab2, tab3 = st.tabs(["Overview", "Trend Analysis", "Anomaly Detection"])

# Load the latest data from S3 (cached), then add moving-average trends and
# anomaly flags before rendering the three tabs.
data = read_processed_data(BUCKET_NAME)
if not data.empty:
    data = calculate_trends(data)
    data = detect_anomalies(data)
    # Apply the sidebar "Anomaly sensitivity" slider as the z-score cut-off,
    # keeping the Isolation Forest flag exactly as detect_anomalies computed it.
    data['is_anomaly'] = (data['abs_z_score'] > anomaly_threshold) | data['ml_anomaly']

    with tab1:
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("Current Market Status")
            latest_date = data['date'].max()
            latest_data = data[data['date'] == latest_date]

            if not latest_data.empty:
                avg_price = latest_data['price'].mean()
                # Baseline is the average of all earlier observations. With no
                # earlier rows there is nothing to compare against, so no NaN
                # delta is shown.
                prior_data = data[data['date'] < latest_date]
                if prior_data.empty:
                    delta_text = "Insufficient data"
                else:
                    delta = avg_price - prior_data['price'].mean()
                    delta_text = f"{delta:+.2f} vs prior period"
                st.metric(
                    f"National Average Price ({pd.Timestamp(latest_date).strftime('%Y-%m-%d')})",
                    f"LKR {avg_price:.2f}",
                    delta_text
                )
            else:
                st.warning("No recent price data available.")

            st.plotly_chart(
                px.line(
                    data,
                    x='date',
                    y='price',
                    title="Price Movement"
                ),
                use_container_width=True
            )

        with col2:
            st.subheader("Provincial Overview")
            st.plotly_chart(
                px.bar(
                    data.groupby('province')['price'].mean().reset_index(),
                    x='province',
                    y='price',
                    title="Average Prices by Province"
                ),
                use_container_width=True
            )

    with tab2:
        st.subheader("Detailed Trend Analysis")
        trend_col1, trend_col2 = st.columns(2)

        with trend_col1:
            st.plotly_chart(
                px.line(
                    data[['date', 'price', '3_day_ma', '7_day_ma']],
                    x='date',
                    y=['price', '3_day_ma', '7_day_ma'],
                    title="Price with Moving Averages"
                ),
                use_container_width=True
            )

        with trend_col2:
            st.plotly_chart(
                px.histogram(
                    data,
                    x='trend',
                    title="Trend Direction Distribution"
                ),
                use_container_width=True
            )

    with tab3:
        st.subheader("Anomaly Detection Center")
        anomalies = data[data['is_anomaly']]

        if not anomalies.empty:
            alert_col1, alert_col2 = st.columns(2)

            with alert_col1:
                fig = px.scatter(
                    anomalies,
                    x='date',
                    y='price',
                    color='province',
                    size='abs_z_score',
                    title="Detected Anomalies",
                    hover_data=['z_score']
                )
                st.plotly_chart(fig, use_container_width=True)

            with alert_col2:
                # Most extreme deviations first, whether above or below the mean.
                st.dataframe(
                    anomalies.sort_values('abs_z_score', ascending=False)[['date', 'province', 'price', 'z_score']],
                    height=400
                )

            st.warning(f"🚨 {len(anomalies)} anomalies detected in current data window")
        else:
            st.success("No anomalies detected in current data window")

else:
    st.warning("No data available in S3 bucket")

# Footer: show the local wall-clock time (naive datetime.now()) at which this
# script run reached the end of the page.
# NOTE(review): the sidebar refresh_rate value is not used here, so nothing
# triggers a timed rerun — confirm whether auto-refresh was intended.
last_refresh = st.empty()
last_refresh.text(f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
