<a href="https://colab.research.google.com/github/aShYousef/Freecode2/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from google.colab import drive

MOUNT_PATH = '/content/drive'

# Try to mount Google Drive; a failure is reported but does not stop the notebook.
try:
    print(f"Mounting Google Drive to {MOUNT_PATH}...")
    drive.mount(MOUNT_PATH, force_remount=True)
except Exception as mount_error:
    print(f"Failed to mount Drive: {mount_error}")
else:
    # Reached only when drive.mount() returned without raising.
    print("Drive mounted successfully.")

Mounting Google Drive to /content/drive...
Failed to mount Drive: mount failed


In [3]:
"""
=============================================================================
Project Name: Cloud-Based Distributed Data Processing Service
Authors: Alaa Yousef & Misk Ashour
Supervised By: Dr. Rebhi S. Baraka
Description: A Streamlit app using Apache Spark to analyze data and simulate
             distributed computing scalability.
=============================================================================
"""

# 1. Install necessary libraries
!pip install pyspark streamlit pyngrok

# 2. Import libraries for the notebook environment
import os
import time
import shutil
import gc
import pandas as pd
import numpy as np
from datetime import datetime
from pyngrok import ngrok
from google.colab import drive, userdata
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.clustering import KMeans

# 3. Authenticate Ngrok (to make the app public)
# The token is read from the Colab secret named NGROK_TOKEN. Authentication is
# best-effort: any problem is reported and the notebook keeps running, but the
# reason is no longer hidden. The token itself is never printed.
try:
    ngrok_auth_token = userdata.get('NGROK_TOKEN')
    if ngrok_auth_token:
        ngrok.set_auth_token(ngrok_auth_token)
        print("Ngrok Authenticated.")
    else:
        print("NGROK_TOKEN secret is empty; continuing without Ngrok authentication.")
except Exception as auth_error:
    # `Exception` (not a bare except) so Ctrl-C / kernel interrupts still propagate.
    print(f"Ngrok authentication skipped: {auth_error}")

# =============================================================================
# STREAMLIT APPLICATION CODE
# This string contains the entire web app logic that will be saved to app.py
# =============================================================================
streamlit_application_code = r'''
import streamlit as st
import time
import pandas as pd
import os
import gc
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.clustering import KMeans

# --- Page Setup ---
# Sets the page title and switches the app to the wide layout.
st.set_page_config(page_title="Distributed Data Service", layout="wide")

# --- Constants ---
# Folder under which the "Save Results to Drive" button creates timestamped report folders.
RESULTS_STORAGE_PATH = "/content/drive/MyDrive/University_Project_Results"
# CSV file read by the "Load Demo File" button.
DEMO_DATASET_PATH = "/content/large_dataset_150MB.csv"

# --- Helper Function: Start Spark ---
def initialize_spark_session(app_name="MasterNode"):
    """Build (or fetch) a local Spark session.

    Args:
        app_name: Application name passed to the session builder.

    Returns:
        The session returned by ``getOrCreate()``, configured for the
        ``local[*]`` master with ``spark.driver.memory`` set to ``4g``.
    """
    session_builder = (
        SparkSession.builder
        .master("local[*]")
        .appName(app_name)
        .config("spark.driver.memory", "4g")
    )
    return session_builder.getOrCreate()

# Main Spark session used by the app
spark_session = initialize_spark_session()

# --- Sidebar: System Control & Info ---
with st.sidebar:
    st.header("System Control")
    st.info("System Status: Active")

    st.markdown("### Developed By:\n**Alaa Yousef & Misk Ashour**")
    st.write("---")

    # Checkbox to switch to "Report Mode" (clean view)
    is_report_mode = st.checkbox("Generate Report Mode", value=False)

    # Button: Save all results to Google Drive
    if st.button("Save Results to Drive"):
        dataset_to_save = st.session_state.get("dataset_dataframe")
        ml_results_to_save = st.session_state.get("model_performance_results")
        scalability_to_save = st.session_state.get("scalability_results")

        # Parent of the results folder (the Drive root). It must already exist:
        # creating it ourselves would silently write to the local disk when
        # Drive is not mounted.
        drive_root = os.path.dirname(RESULTS_STORAGE_PATH)

        if dataset_to_save is None and ml_results_to_save is None and scalability_to_save is None:
            # Nothing to export, so do not create an empty report folder.
            st.warning("No data loaded to save yet!")
        elif not os.path.isdir(drive_root):
            st.error("Could not create Drive folder. Make sure Drive is mounted.")
        else:
            # Create a timestamped folder for this run
            timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            output_directory = os.path.join(RESULTS_STORAGE_PATH, f"Report_{timestamp_str}")

            # (label shown to the user, file name, frame, whether to write the index)
            export_plan = []
            if dataset_to_save is not None:
                export_plan.append(("Original Dataset", "Original_Dataset.csv", dataset_to_save, False))
                # describe() keeps the statistic names in the index, so the index is written.
                export_plan.append(("Statistics", "Statistics.csv", dataset_to_save.describe(), True))
            if ml_results_to_save is not None:
                export_plan.append(("ML Results", "ML_Results.csv", ml_results_to_save, False))
            if scalability_to_save is not None:
                export_plan.append(("Scalability Data", "Scalability_Results.csv", scalability_to_save, False))

            saved_files_list = []
            try:
                # Also creates RESULTS_STORAGE_PATH itself when it is missing.
                os.makedirs(output_directory, exist_ok=True)
                for export_label, file_name, frame, write_index in export_plan:
                    frame.to_csv(os.path.join(output_directory, file_name), index=write_index)
                    saved_files_list.append(export_label)
            except OSError as save_error:
                st.error(f"Drive path not accessible: {save_error}")

            # Feedback to user (also lists files written before a partial failure)
            if saved_files_list:
                st.success(f"Saved successfully to Drive folder: Report_{timestamp_str}")
                st.write(f"Saved Files: {', '.join(saved_files_list)}")

# --- Main Page Title ---
# The heading depends on whether the clean report view is active.
page_title = "Final Project Report" if is_report_mode else "Cloud-Based Distributed Data Processing Service"
st.title(page_title)

# Make sure the dataset slot exists in session state before anything reads it.
if "dataset_dataframe" not in st.session_state:
    st.session_state["dataset_dataframe"] = None

# --- Section 1: Data Ingestion (Upload or Demo) ---
if not is_report_mode:
    st.subheader("1. Data Ingestion")
    ingestion_source = st.radio("Input Source:", ["Upload File", "Use Demo Data (Fast)"], horizontal=True)

    loaded_dataframe = None
    if ingestion_source == "Upload File":
        # Handle file upload
        uploaded_file = st.file_uploader("Upload CSV/JSON", type=['csv','json'])
        if uploaded_file:
            # Compare the extension case-insensitively so "DATA.CSV" is not parsed as JSON.
            is_csv_upload = uploaded_file.name.lower().endswith('.csv')
            try:
                if is_csv_upload:
                    loaded_dataframe = pd.read_csv(uploaded_file)
                else:
                    loaded_dataframe = pd.read_json(uploaded_file)
            except (ValueError, OSError) as parse_error:
                # pandas parser errors and decode errors are ValueError subclasses.
                # Show the problem instead of crashing the script run.
                st.error(f"Could not read '{uploaded_file.name}': {parse_error}")
    else:
        # Load Demo Data from server
        if st.button("Load Demo File"):
            if os.path.exists(DEMO_DATASET_PATH):
                try:
                    loaded_dataframe = pd.read_csv(DEMO_DATASET_PATH)
                    st.success("Demo data loaded from local server.")
                except (ValueError, OSError) as parse_error:
                    st.error(f"Could not read demo file: {parse_error}")
            else:
                st.error("Demo file missing. Please run generation script.")

    # Only overwrite the stored dataset when something was actually loaded.
    if loaded_dataframe is not None:
        st.session_state.dataset_dataframe = loaded_dataframe
        st.success(f"Ingested {len(loaded_dataframe)} records.")

# --- Main Logic: If data is loaded, show tabs ---
if st.session_state.dataset_dataframe is not None:
    active_dataframe = st.session_state.dataset_dataframe

    # Function to show stats
    def display_statistics():
        """Render the statistics section: a header followed by the describe() table."""
        st.header("Statistical Analysis")
        # Summary statistics computed with pandas on the loaded dataset
        summary_table = active_dataframe.describe()
        st.write(summary_table)

    # Function for Machine Learning
    def display_machine_learning_module():
        """Render the ML section: choose target/features, fit the models, show timings.

        The selected feature and target columns are stored in ``st.session_state``
        (``feature_columns`` / ``target_column``) so the scalability section can
        reuse them. After training, ``model_performance_results`` holds a
        DataFrame with one row per model: name, fit time in seconds, and a
        Success/Failed status.
        """
        st.header("Machine Learning Models")
        # Get numeric columns only
        numeric_columns = [col for col in active_dataframe.columns if pd.api.types.is_numeric_dtype(active_dataframe[col])]

        # One column is the target and at least one more is needed as a feature.
        if len(numeric_columns) < 2:
            st.warning("At least two numeric columns are required to train models.")
            return

        col1, col2 = st.columns([1, 2])
        target_column = col1.selectbox("Target (Y)", numeric_columns)
        feature_columns = col2.multiselect("Features (X)", [col for col in numeric_columns if col != target_column])

        st.session_state.feature_columns = feature_columns
        st.session_state.target_column = target_column

        if st.button("Train Models"):
            if not feature_columns:
                # Tell the user why nothing happened instead of ignoring the click.
                st.warning("Select at least one feature column before training.")
            else:
                # Convert Pandas DF to Spark DF for processing
                temp_spark_session = initialize_spark_session()
                spark_dataframe = temp_spark_session.createDataFrame(active_dataframe)

                # Create feature vector
                vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
                processed_data = vector_assembler.transform(spark_dataframe).select("features", target_column).withColumnRenamed(target_column, "label")

                # Define the models to train
                ml_models = {
                    "Linear Regression": LinearRegression(featuresCol="features", labelCol="label"),
                    "Decision Tree": DecisionTreeRegressor(featuresCol="features", labelCol="label"),
                    "Random Forest": RandomForestRegressor(featuresCol="features", labelCol="label"),
                    "K-Means": KMeans(featuresCol="features", k=3)
                }
                # Progress is reported relative to the real number of models.
                total_models = len(ml_models)

                performance_metrics = []
                progress_bar = st.progress(0)

                # Loop through models and train them
                for index, (model_name, model_instance) in enumerate(ml_models.items()):
                    time.sleep(0.5)  # short pause between models (kept from the original to avoid UI flicker)
                    start_time = time.time()
                    try:
                        model_instance.fit(processed_data)
                        duration_seconds = time.time() - start_time
                        execution_status = "Success"
                    except Exception as e:
                        # Best-effort: one failing model must not abort the others.
                        print(f"Error training {model_name}: {e}")
                        duration_seconds = 0
                        execution_status = "Failed"

                    performance_metrics.append({
                        "Model": model_name,
                        "Time (s)": round(duration_seconds, 4),
                        "Status": execution_status
                    })
                    progress_bar.progress((index + 1) / total_models)

                st.session_state.model_performance_results = pd.DataFrame(performance_metrics)

        # Show the most recent results, including ones from an earlier rerun.
        if "model_performance_results" in st.session_state:
            st.table(st.session_state.model_performance_results)

    # Function for Scalability Test
    def display_scalability_simulation():
        """Render the scalability section and, on request, run the timing test.

        For each entry in ``node_counts`` a Spark session is created with
        ``local[N]`` as master, a linear regression is fitted on the columns
        chosen in the ML section, and the fit time is recorded. Speedup is the
        baseline time divided by the run's time; efficiency is speedup divided
        by the node count. Results are stored in
        ``st.session_state.scalability_results`` and plotted.
        """
        # Declared up front because the main session is stopped and recreated below.
        global spark_session

        st.header("Scalability Simulation")
        feature_columns = st.session_state.get('feature_columns', [])
        target_column = st.session_state.get('target_column', None)

        if st.button("Run Cluster Test"):
            if not feature_columns:
                st.error("Configure ML tab first.")
            else:
                node_counts = [1, 2, 4, 8]
                scalability_metrics = []

                # Stop current session to free resources
                spark_session.stop()

                progress_bar = st.progress(0)
                # Loop through different node counts (Simulating Cluster Size)
                for index, node_count in enumerate(node_counts):
                    gc.collect()
                    time.sleep(0.5)  # short pause between runs (kept from the original to avoid UI flicker)
                    isolated_spark_session = None
                    try:
                        # Create a new isolated Spark session with N threads (nodes)
                        isolated_spark_session = SparkSession.builder.master(f"local[{node_count}]").appName(f"Node_{node_count}").getOrCreate()
                        node_dataframe = isolated_spark_session.createDataFrame(active_dataframe)

                        # Prepare data
                        vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
                        training_data = vector_assembler.transform(node_dataframe).select("features", target_column).withColumnRenamed(target_column, "label")

                        # Measure training time
                        start_time = time.time()
                        LinearRegression(featuresCol="features", labelCol="label").fit(training_data)
                        latency = time.time() - start_time

                        # Baseline is the first successful run. Checking the list
                        # (not the loop index) avoids an IndexError on every later
                        # run when the first one failed.
                        baseline_latency = scalability_metrics[0]["Time"] if scalability_metrics else latency
                        speedup_factor = baseline_latency / latency if latency > 0 else 0

                        scalability_metrics.append({
                            "Nodes": node_count,
                            "Time": latency,
                            "Speedup": speedup_factor,
                            "Efficiency": speedup_factor / node_count
                        })
                    except Exception as e:
                        # Best-effort: a failing configuration is skipped, not fatal.
                        print(f"Error on node {node_count}: {e}")
                    finally:
                        # Always stop the session, even after a failure; otherwise
                        # the next getOrCreate() would hand back this session with
                        # the previous master setting.
                        if isolated_spark_session is not None:
                            isolated_spark_session.stop()
                    progress_bar.progress((index + 1) / len(node_counts))

                # Restart main session
                spark_session = initialize_spark_session()
                st.session_state.scalability_results = pd.DataFrame(scalability_metrics)

        results_df = st.session_state.get("scalability_results")
        if results_df is not None and not results_df.empty:
            st.dataframe(results_df)
            # Plot charts
            indexed_results = results_df.set_index("Nodes")
            chart_col1, chart_col2 = st.columns(2)
            chart_col1.line_chart(indexed_results["Speedup"])
            chart_col2.line_chart(indexed_results["Efficiency"])
            st.bar_chart(indexed_results["Time"])
        elif results_df is not None:
            # Every run failed: the frame has no "Nodes" column, so plotting would raise.
            st.warning("No scalability measurements were recorded. Check the server log for errors.")

    # --- Render Tabs ---
    # The three sections, in display order.
    section_renderers = [
        display_statistics,
        display_machine_learning_module,
        display_scalability_simulation,
    ]

    if is_report_mode:
        # Report view: all sections on one page, separated by horizontal rules.
        for position, render_section in enumerate(section_renderers):
            if position > 0:
                st.markdown("---")
            render_section()
    else:
        # Interactive view: one tab per section.
        tab_containers = st.tabs(["Statistics", "ML Models", "Scalability"])
        for tab_container, render_section in zip(tab_containers, section_renderers):
            with tab_container:
                render_section()
'''

# =============================================================================
# SAVE AND RUN
# =============================================================================

# 4. Save the string above to a file named 'app.py'
with open("app.py", "w") as app_file:
    app_file.write(streamlit_application_code)

print("Restarting Server...")

ipython_shell = get_ipython()

# 5. Kill any Streamlit process left from an earlier run, then give it a moment to exit
ipython_shell.system_raw('pkill -9 streamlit')
time.sleep(2)

# 6. Start Streamlit in the background on port 8501
ipython_shell.system_raw('nohup streamlit run app.py --server.port 8501 --server.maxUploadSize 2000 &')

# 7. Replace any existing Ngrok tunnel with a fresh one pointing at the app
try:
    ngrok.kill()
    tunnel = ngrok.connect(8501)
    public_url = tunnel.public_url
    print(f"App Updated! Open here: {public_url}")
except Exception as tunnel_error:
    print(f"Error: {tunnel_error}")

Ngrok Authenticated.
Restarting Server...
App Updated! Open here: https://miss-centered-rustlingly.ngrok-free.dev


In [None]:
import pandas as pd
import numpy as np
import os

# Written relative to the notebook's working directory.
# NOTE(review): the app reads "/content/large_dataset_150MB.csv", so this
# presumably relies on the working directory being /content; confirm.
# The recorded run of this cell reported a 67.40 MB file, not 150 MB as the name suggests.
OUTPUT_FILENAME = "large_dataset_150MB.csv"

ROW_COUNT = 600_000
COLUMN_COUNT = 6
RANDOM_SEED = 42  # fixed seed so every run regenerates the same demo data

print(f"Generating dataset with {ROW_COUNT} rows...")

# Standard-normal values from a seeded generator (reproducible across runs).
random_generator = np.random.default_rng(RANDOM_SEED)
synthetic_data = random_generator.standard_normal((ROW_COUNT, COLUMN_COUNT))

# Feature_1 .. Feature_{COLUMN_COUNT - 1}, with the last column as the target.
column_names = [f'Feature_{i}' for i in range(1, COLUMN_COUNT)] + ['Target']

dataset_dataframe = pd.DataFrame(
    synthetic_data,
    columns=column_names
)

dataset_dataframe.to_csv(OUTPUT_FILENAME, index=False)

# Report the size actually written to disk.
file_size_bytes = os.path.getsize(OUTPUT_FILENAME)
file_size_mb = file_size_bytes / (1024 * 1024)

print(f"Success! New file generated. Size: {file_size_mb:.2f} MB")
print("Action Required: Return to the application, click 'Load Demo File', and run the simulation.")

Generating dataset with 600000 rows...
Success! New file generated. Size: 67.40 MB
Action Required: Return to the application, click 'Load Demo File', and run the simulation.
