In [None]:
# Run this notebook in Google Colab.


In [None]:
# Step 1: Install necessary dependencies
# Python packages for the web service and AdaBoost model.
# NOTE(review): pyngrok is installed but never imported in the visible code, and
# pyspark is imported below but not installed here — confirm the runtime provides pyspark.
!pip install flask flask-cors pyngrok scikit-learn
# Download and install the cloudflared .deb that Step 4 uses to expose the Flask app publicly.
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb
!dpkg -i cloudflared-linux-amd64.deb

# Step 2: Import libraries, initialize Flask and Spark, and define the data/model helpers (the AdaBoost model is trained per request)
from flask import Flask, jsonify
from flask_cors import CORS
import requests
import base64
from io import BytesIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, dayofyear
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from sklearn.ensemble import AdaBoostRegressor
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Flask application; CORS is enabled so a front-end served from another origin can call it.
app = Flask(__name__)
CORS(app)

# Spark session reused by every request handler below.
spark = (
    SparkSession.builder
    .appName("ElectricityData")
    .getOrCreate()
)

# API configuration
import os

API_URL = "https://api.eia.gov/v2/electricity/rto/daily-region-data/data/"
# SECURITY: this key was committed in plain text. Supply it through the
# EIA_API_KEY environment variable instead; the literal remains only as a
# fallback so existing notebooks keep working — rotate it and drop the fallback.
API_KEY = os.environ.get("EIA_API_KEY", "8pT7GUYm3TYXQqAbGvm65saYJqwzlYHeIEwcdC53")

def get_api_data(timeout=30):
    """Fetch recent daily regional electricity records from the EIA API.

    Requests up to 5000 rows, sorted by ``period`` descending, and loads the
    ``response.data`` records into a Spark DataFrame.

    Args:
        timeout: seconds to wait for the HTTP response before giving up.
            Without a timeout, a stalled API would block the calling Flask
            worker indefinitely.

    Returns:
        Spark DataFrame with the API's columns, with ``value`` cast to float.

    Raises:
        requests.HTTPError: if the API responds with an error status.
        ValueError: if the payload contains no ``response.data`` records.
    """
    params = {
        "frequency": "daily",
        "data[0]": "value",
        "sort[0][column]": "period",
        "sort[0][direction]": "desc",
        "offset": "0",
        "length": "5000",
        "api_key": API_KEY,
    }
    response = requests.get(API_URL, params=params, timeout=timeout)
    # Surface HTTP errors (bad key, rate limit, outage) directly instead of
    # failing later with a KeyError on the error payload.
    response.raise_for_status()
    records = response.json().get("response", {}).get("data")
    if not records:
        # Spark cannot infer a schema from an empty list; fail with a clear message.
        raise ValueError("EIA API returned no data records")

    raw_df = spark.createDataFrame(records)
    # The API delivers 'value' as text; cast it to float for modelling.
    return raw_df.withColumn("value", col("value").cast("float"))

def perform_kmeans(df, n_clusters=3):
    """Cluster the rows of *df* on their ``value`` column with Spark ML k-means.

    Args:
        df: Spark DataFrame containing a numeric ``value`` column.
        n_clusters: number of clusters (k).

    Returns:
        *df* with two added columns: ``features`` (the assembled vector) and
        ``cluster`` (the assigned cluster id). The seed is fixed at 42.
    """
    features_df = VectorAssembler(inputCols=["value"], outputCol="features").transform(df)
    kmeans = KMeans(
        k=n_clusters,
        seed=42,
        featuresCol="features",
        predictionCol="cluster",
    )
    return kmeans.fit(features_df).transform(features_df)

def generate_plot(clustered_df, adaboost_predictions):
    """Render k-means clusters and the AdaBoost forecast as a base64 PNG.

    Uses a standalone ``matplotlib.figure.Figure`` rather than pyplot. pyplot
    keeps every figure in global state until it is explicitly closed, so the
    previous version leaked one figure per request. That global state is also
    not meant to be driven from a web-server thread.

    Args:
        clustered_df: Spark DataFrame with ``cluster`` and ``value`` columns.
        adaboost_predictions: ``pd.Series`` of forecast values; its index is
            used as the x coordinate.

    Returns:
        The PNG image, base64-encoded as a UTF-8 string.
    """
    from matplotlib.figure import Figure

    pandas_df = clustered_df.select("cluster", "value").toPandas()

    fig = Figure(figsize=(10, 8))
    ax = fig.subplots()

    # Plot K-means clusters
    for cluster in pandas_df['cluster'].unique():
        cluster_data = pandas_df[pandas_df['cluster'] == cluster]
        ax.scatter(cluster_data.index, cluster_data['value'], label=f"K-means Cluster {cluster}", alpha=0.6)

    # Plot AdaBoost predictions
    ax.plot(adaboost_predictions.index, adaboost_predictions.values, label="AdaBoost Regression", color="purple", linewidth=2)

    ax.set_xlabel('Index')
    ax.set_ylabel('Value (MWh)')
    ax.set_title('Clustering and Regression of Electricity Data')
    ax.legend()

    with BytesIO() as buf:
        fig.savefig(buf, format='png')
        return base64.b64encode(buf.getvalue()).decode('utf-8')

def predict_weekly_bill(df, rate_per_unit=0.12):
    """Forecast the next 7 days of ``value`` and price each forecast.

    Two models are fitted on (day index -> value): a Spark
    ``LinearRegression`` and a scikit-learn ``AdaBoostRegressor``. Each
    predicts the 7 days after the latest ``period`` in *df*. Each 7-day
    forecast is summed and multiplied by *rate_per_unit*.

    Args:
        df: Spark DataFrame with a date-parsable ``period`` column and a
            numeric ``value`` column.
        rate_per_unit: price applied to one unit of ``value``.

    Returns:
        ``(weekly_bill_lr, weekly_bill_adaboost, adaboost_predictions)``.
        The two bills are rounded to 2 decimals. ``adaboost_predictions`` is a
        ``pd.Series`` of the 7 daily AdaBoost forecasts (index 0..6).

    Raises:
        ValueError: if no row has both a parsable ``period`` and a ``value``.
    """
    from pyspark.sql.functions import datediff, lit

    # Day index = days since 1970-01-01. The previous dayofyear feature resets
    # to 1 on January 1, which scrambles the fit and makes ``max_day + i``
    # meaningless when the data spans a year boundary. Both models are
    # invariant to shifting x, so results are otherwise unchanged.
    df = (
        df.withColumn("day", datediff(to_date(col("period")), to_date(lit("1970-01-01"))))
        .select("day", col("value").cast("float").alias("value"))
        # Nulls would crash VectorAssembler (handleInvalid="error") and sklearn's fit.
        .dropna(subset=["day", "value"])
    )

    max_day = df.agg({"day": "max"}).collect()[0][0]
    if max_day is None:
        raise ValueError("no rows with a valid period and value to fit a forecast on")
    future_day_values = [max_day + i for i in range(1, 8)]

    # Linear Regression (Spark ML)
    assembler = VectorAssembler(inputCols=["day"], outputCol="features")
    train_data = assembler.transform(df.withColumnRenamed("value", "label"))
    lr_model = LinearRegression(featuresCol="features", labelCol="label").fit(train_data)
    future_data = assembler.transform(
        spark.createDataFrame([(day,) for day in future_day_values], ["day"])
    )
    lr_predictions = [row.prediction for row in lr_model.transform(future_data).select("prediction").collect()]

    # AdaBoost (scikit-learn). The fixed seed matches perform_kmeans so
    # repeated requests return the same forecast.
    pandas_df = df.toPandas()
    adaboost_model = AdaBoostRegressor(random_state=42)
    adaboost_model.fit(pandas_df[["day"]], pandas_df["value"])
    # Predict on a frame with the same column name used in fit; a bare list
    # triggers sklearn's "X does not have valid feature names" warning.
    adaboost_predictions = adaboost_model.predict(pd.DataFrame({"day": future_day_values}))

    # NOTE(review): the plot labels `value` as MWh, but this rate was described
    # as $/kWh. Confirm the unit before trusting the dollar amounts.
    weekly_bill_lr = round(sum(lr_predictions) * rate_per_unit, 2)
    weekly_bill_adaboost = round(float(adaboost_predictions.sum()) * rate_per_unit, 2)

    return weekly_bill_lr, weekly_bill_adaboost, pd.Series(adaboost_predictions)

@app.route('/cluster', methods=['GET'])
def cluster_data():
    """GET /cluster: fetch EIA data, cluster it, forecast the weekly bill, and plot.

    Returns:
        On success, a JSON object with these keys:

        - ``data``: records with ``period``, ``value`` and ``cluster``.
        - ``plot``: a PNG data URI.
        - ``weekly_bill_lr`` and ``weekly_bill_adaboost``.

        On any failure, ``{"error": <message>}`` with HTTP status 500.
    """
    try:
        # Fetch the data and perform clustering
        df = get_api_data()
        clustered_df = perform_kmeans(df)

        # Predict the weekly bill and generate comparative plot
        weekly_bill_lr, weekly_bill_adaboost, adaboost_predictions = predict_weekly_bill(df)
        plot_data = generate_plot(clustered_df, adaboost_predictions)

        response = {
            "data": clustered_df.select("period", "value", "cluster").toPandas().to_dict(orient='records'),
            "plot": f"data:image/png;base64,{plot_data}",
            "weekly_bill_lr": weekly_bill_lr,
            "weekly_bill_adaboost": weekly_bill_adaboost
        }
        return jsonify(response)
    except Exception as e:
        # Request boundary: the client gets only the message, so record the
        # full traceback server-side instead of discarding it.
        app.logger.exception("Failed to build /cluster response")
        return jsonify({"error": str(e)}), 500

# Step 3: Serve the Flask app from a background thread so the notebook can continue.
import threading


def run_app():
    """Run the Flask development server on 0.0.0.0:8000; blocks until the server stops."""
    server_options = {"host": "0.0.0.0", "port": 8000}
    app.run(**server_options)


thread = threading.Thread(target=run_app)
thread.start()

# Step 4: Start cloudflared to expose the Flask app publicly
# Tunnels http://localhost:8000 (the port run_app serves on) to a public URL.
# NOTE(review): cloudflared is expected to print the public URL in this cell's output
# and keep running (occupying the cell) until stopped — confirm for your cloudflared version.
!cloudflared tunnel --url http://localhost:8000
