<a href="https://colab.research.google.com/github/NadaMohammedAbdAlwahab/cloud_project/blob/main/Cloud_Based_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cloud-Based Distributed Data Processing Service
# Cloud and Distributed Systems - IUG SICT 4313
# Instructor: Dr. Rebhi S. Baraka

# Team Members:
# 1. [Nada Mohammed Abd Alwhab] [220212883]
# 2. [Marah Nabil Salim Salama] [220222441]
# 3. [Israa Ashraf Ismail Harara] [220222311]

!pip install pyspark pandas numpy matplotlib -q
print("Libraries installed successfully")

Libraries installed successfully


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Explicit names used by the analysis helpers. They are imported after the star
# imports so that nothing pulled in by `*` can rebind them.
from pyspark.sql import functions as F
from pyspark.sql.types import NumericType, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import LinearRegression
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
from google.colab import files

print("Libraries imported successfully")

Libraries imported successfully


In [None]:
# Stop a session left over from an earlier run of this cell so the builder
# below does not silently hand back the old one.
try:
    spark.stop()
except NameError:
    # First run on a fresh kernel: `spark` does not exist yet, nothing to stop.
    pass
except Exception as stop_error:
    # Best effort only: a failed stop must not prevent creating a new session.
    print(f"Warning: could not stop previous Spark session: {stop_error}")

# NOTE(review): with master "local[*]" everything presumably runs inside the
# driver process, so the executor memory setting may have no effect - confirm.
spark = (
    SparkSession.builder
    .appName("Cloud Data Processing Service")
    .master("local[*]")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .getOrCreate()
)

print("Spark Session Ready")
print(f"App Name: {spark.sparkContext.appName}")
print(f"Master: {spark.sparkContext.master}")

Spark Session Ready
App Name: Cloud Data Processing Service
Master: local[*]


In [None]:
# Session state shared by the menu handlers: the loaded DataFrame and the name
# of the file it was read from. Both stay None until a file has been loaded.
current_dataframe = current_filename = None

def show_main_menu():
    """Print the service banner followed by the seven numbered main-menu options."""
    banner_rule = "=" * 50
    menu_lines = [
        "\n" + banner_rule,
        "    Cloud Data Processing Service",
        banner_rule,
        "\n Main Options:",
        "1. Upload Data File (CSV/JSON/TXT)",
        "2. View Data",
        "3. Statistics Analysis",
        "4. Machine Learning Algorithms",
        "5. Performance Test (1,2,4,8 machines)",
        "6. Save Results",
        "7. Exit",
    ]
    # One write instead of eleven; the emitted text is unchanged.
    print("\n".join(menu_lines))


def get_user_choice():
    """Prompt until the user enters an integer from 1 to 7 and return it.

    Non-numeric input and numbers outside the range each print an error and
    ask again. The label of the accepted option is echoed before returning.
    """
    option_labels = {
        1: "Upload Data File",
        2: "View Data",
        3: "Statistics Analysis",
        4: "Machine Learning",
        5: "Performance Test",
        6: "Save Results",
        7: "Exit",
    }

    while True:
        raw_choice = input("\nEnter option number (1-7): ").strip()

        try:
            selected = int(raw_choice)
        except ValueError:
            print("Error: Please enter a valid number (1-7)")
            continue

        # The labels are keyed by exactly the integers 1..7.
        if selected not in option_labels:
            print("Error: Please enter a number between 1 and 7")
            continue

        print(f"Selected: {option_labels[selected]}")
        return selected

In [None]:
def _replace_dots_in_column_names(df):
    """Return (df, renamed_count) with every '.' in a column name replaced by '_'."""
    renamed_count = 0
    for column_name in df.columns:
        if '.' in column_name:
            df = df.withColumnRenamed(column_name, column_name.replace('.', '_'))
            renamed_count += 1
    return df, renamed_count


def upload_and_read_file():
    """Ask the user for a data source, read it with Spark and return it.

    The user either uploads a file through the Colab upload widget or picks
    the pre-downloaded ``uci_bank_data.csv``. CSV and JSON column names have
    dots replaced with underscores; TXT files are read as plain text.

    Returns:
        (filename, DataFrame) on success;
        (filename, None) when the format is unsupported or reading fails;
        (None, None) when no file was chosen or the menu option is invalid.
    """
    print("\n Upload or Select Data File")
    print("-"*50)
    print(" Options:")
    print("1. Upload new file")
    print("2. Use uci_bank_data.csv")
    print("-"*50)

    choice = input("Select option (1/2): ").strip()

    if choice == "1":
        print("\n Upload new file")
        # Imported here so the Colab-only module is needed just for this branch.
        from google.colab import files

        uploaded = files.upload()
        if not uploaded:
            print(" No file selected")
            return None, None

        # Only the first uploaded file is used.
        filename = next(iter(uploaded))
        print(f" Uploaded: {filename}")

    elif choice == "2":
        filename = "uci_bank_data.csv"
        print(f" Using existing UCI dataset: {filename}")
        print(" Contains: 41,188 rows × 21 columns")

    else:
        print(" Invalid option")
        return None, None

    lowered_name = filename.lower()
    try:
        renamed_count = 0
        if lowered_name.endswith('.csv'):
            df = spark.read.csv(filename, header=True, inferSchema=True)
            df, renamed_count = _replace_dots_in_column_names(df)
        elif lowered_name.endswith('.json'):
            df = spark.read.json(filename)
            df, renamed_count = _replace_dots_in_column_names(df)
        elif lowered_name.endswith('.txt'):
            df = spark.read.text(filename)
        else:
            print(" Unsupported format")
            return filename, None

        # Report the number of columns that were actually renamed.
        if renamed_count:
            print(f" Fixed {renamed_count} column names (dots replaced with underscores)")

        print(f" Loaded: {df.count():,} rows, {len(df.columns)} columns")
        return filename, df

    except Exception as e:
        print(f" Error reading file: {e}")
        return filename, None

In [None]:
def view_data(df):
    """Interactive sub-menu for inspecting the loaded DataFrame.

    Offers the first 10 rows, the column names and the schema, and loops
    until the user enters ``0``. Prints a notice and returns immediately
    when ``df`` is None.
    """
    if df is None:
        print(" No data available. Please upload a file first.")
        return

    # Counting rows runs a Spark job, so do it once instead of on every
    # redraw of the menu.
    row_count = df.count()
    column_names = df.columns
    dataset_line = f"Dataset: {row_count:,} rows × {len(column_names)} columns"

    while True:
        print("\n" + "="*40)
        print(" View Data")
        print("="*40)
        print(dataset_line)
        print("-"*40)
        print("1. Show first 10 rows")
        print("2. Show column names")
        print("3. Show data types")
        print("0. Back to main menu")

        choice = input("Select option: ").strip()

        if choice == '0':
            break

        if choice == '1':
            print("\nFirst 10 rows:")
            print("-"*40)
            df.show(10)
        elif choice == '2':
            print("\nColumn names:")
            print("-"*40)
            for position, column_name in enumerate(column_names, 1):
                print(f"{position:2}. {column_name}")
        elif choice == '3':
            print("\nData types:")
            print("-"*40)
            df.printSchema()
        else:
            print(" Invalid option")

        input("\nPress Enter to continue")

In [None]:
def calculate_statistics(df):
    """Print a descriptive-statistics report for a Spark DataFrame.

    The report has four parts: row/column counts, the type of every column,
    null counts per column, and ``describe()`` output for up to five numeric
    columns (plus distinct-value counts for the first two when at least two
    numeric columns exist). Columns whose name contains a dot are reported
    but excluded from the null and numeric sections.

    Prints a notice and returns immediately when ``df`` is None.
    """
    if df is None:
        print(" No data available")
        return

    print("\n Descriptive Statistics")
    print("="*50)

    print("1️. Basic Information:")
    total_rows = df.count()
    print(f"Rows: {total_rows:,} | Columns: {len(df.columns)}")

    print("\n2️. Data Types:")
    numeric_cols = []
    safe_cols = []

    for field in df.schema.fields:
        print(f"    {field.name}: {str(field.dataType)}")

        # Check the type itself rather than searching its name: a substring
        # test treats interval types as numeric ("interval" contains "int")
        # and misses decimal/short/byte columns.
        if isinstance(field.dataType, NumericType):
            numeric_cols.append(field.name)

        if '.' not in field.name:
            safe_cols.append(field.name)

    safe_numeric_cols = [name for name in numeric_cols if '.' not in name]
    dot_cols = [name for name in df.columns if '.' in name]

    print(f"\n3️. Missing Values ({len(safe_cols)} safe columns):")
    has_missing = False

    for col_name in safe_cols:
        try:
            null_count = df.filter(F.col(col_name).isNull()).count()

            if null_count > 0:
                percent = (null_count / total_rows) * 100
                print(f"    {col_name}: {null_count} missing ({percent:.1f}%)")
                has_missing = True
        except Exception as e:
            # Keep going: one unreadable column should not hide the others.
            print(f"    {col_name}: Error - {str(e)[:50]}")

    if dot_cols:
        print(f"\n    Note: {len(dot_cols)} columns with dots skipped: {', '.join(dot_cols[:3])}...")

    if not has_missing:
        print("    No missing values found in safe columns ")

    print("\n4️. Numeric Statistics (Safe Columns Only):")

    if safe_numeric_cols:
        preview_cols = safe_numeric_cols[:5]
        print(f"    Found {len(safe_numeric_cols)} safe numeric columns")
        print(f"    Columns: {', '.join(preview_cols)}")

        try:
            df.select(preview_cols).describe().show()

            if len(safe_numeric_cols) >= 2:
                print("\n   Unique Value Counts:")
                for numeric_name in safe_numeric_cols[:2]:
                    unique_count = df.select(numeric_name).distinct().count()
                    print(f"      {numeric_name}: {unique_count} unique values")

        except Exception as e:
            print(f"    Error showing statistics: {str(e)[:100]}")

    elif numeric_cols:
        print(f"    Found {len(numeric_cols)} numeric columns, but all contain dots")
        print("    Cannot display statistics due to column naming issues")
        print(f"    Numeric columns with dots: {', '.join(numeric_cols[:3])}...")

    else:
        print("    No numeric columns found")

    print("="*50)

In [None]:
def _correlation_strength_label(correlation):
    """Return the parenthesised strength label printed under a correlation value."""
    # NaN compares unequal to itself; label it instead of letting it fall
    # through every threshold and be reported as "strong negative".
    if correlation != correlation:
        return "(undefined)"

    # The notebook star-imports pyspark.sql.functions, which rebinds abs()
    # to the Spark column function, so the magnitude is computed by hand.
    magnitude = -correlation if correlation < 0 else correlation
    if magnitude < 0.1:
        return "(very weak)"
    if magnitude < 0.3:
        return "(weak)"

    strength = "moderate" if magnitude < 0.7 else "strong"
    direction = "positive" if correlation > 0 else "negative"
    return f"({strength} {direction})"


def run_machine_learning(df):
    """Print a four-part analysis of the first numeric columns of ``df``.

    Sections: (1) Pearson correlation of the first two numeric columns,
    (2) count/mean/stddev/min/max and distinct count for up to three numeric
    columns, (3) KMeans with k=3 on the first two numeric columns when there
    are at least 10 rows, (4) a mean-ratio / variance comparison of the first
    two numeric columns. Each section reports its own errors and the
    remaining sections still run.

    Prints a notice and returns immediately when ``df`` is None.
    """
    if df is None:
        print(" No data available. Please upload a file first")
        return

    print("\n Machine learning algorithms")
    print("="*60)

    numeric_cols = []
    categorical_cols = []

    for field in df.schema.fields:
        if isinstance(field.dataType, NumericType):
            numeric_cols.append(field.name)
        elif isinstance(field.dataType, StringType):
            categorical_cols.append(field.name)

    print(f" Found: {len(numeric_cols)} numeric columns, {len(categorical_cols)} categorical columns")

    has_numeric_pair = len(numeric_cols) >= 2
    if has_numeric_pair:
        col1, col2 = numeric_cols[0], numeric_cols[1]
    else:
        print("  Need at least 2 numeric columns for full ML analysis")

    # Counting rows is a Spark job; do it once for the whole report.
    row_count = df.count()

    print("\n1️. CORRELATION ANALYSIS")

    if has_numeric_pair:
        print(f"    Analyzing: '{col1}' vs '{col2}'")

        try:
            correlation = df.stat.corr(col1, col2)
            print(f"    Correlation: {correlation:.4f}")
            print(f"    {_correlation_strength_label(correlation)}")
        except Exception as e:
            print(f"    Error in correlation analysis: {str(e)[:100]}")

    print("\n2️. ADVANCED DESCRIPTIVE STATISTICS")

    if numeric_cols:
        print("    Statistics for numeric columns:")
        for i, column_name in enumerate(numeric_cols[:3], 1):
            try:
                stats = df.select(
                    F.count(column_name).alias("count"),
                    F.mean(column_name).alias("mean"),
                    F.stddev(column_name).alias("std"),
                    F.min(column_name).alias("min"),
                    F.max(column_name).alias("max")
                ).collect()[0]

                print(f"     {i}. {column_name}:")
                print(f"         Count: {stats['count']}")
                print(f"         Mean: {stats['mean']:.2f}")
                print(f"         Std Dev: {stats['std']:.2f}")
                print(f"         Range: [{stats['min']} - {stats['max']}]")

                unique_count = df.select(column_name).distinct().count()
                print(f"         Unique values: {unique_count}")

            except Exception as e:
                print(f"     {i}. {column_name}: Error - {str(e)[:80]}")
    else:
        print("     No numeric columns for statistics")

    print("\n3️. KMEANS CLUSTERING")

    if has_numeric_pair and row_count >= 10:
        try:
            print(f"    Clustering based on '{col1}' and '{col2}'")

            assembler = VectorAssembler(inputCols=[col1, col2], outputCol="features")
            features_df = assembler.transform(df).select("features")

            # At least 10 rows are guaranteed here, so three clusters always fit.
            kmeans = KMeans(k=3, seed=42)
            model = kmeans.fit(features_df)

            centers = model.clusterCenters()
            print(f"    Created {len(centers)} clusters")
            print("    Cluster centers:")

            for i, center in enumerate(centers):
                print(f"        Cluster {i}: [{center[0]:.2f}, {center[1]:.2f}]")

            wssse = model.summary.trainingCost
            print(f"    WSSE: {wssse:.2f}")

        except Exception as e:
            print(f"    Error in KMeans: {str(e)[:100]}")
    else:
        print("     Need at least 2 numeric columns and 10+ rows for KMeans")

    print("\n4️. PATTERN ANALYSIS & SIMPLE PREDICTION")

    if has_numeric_pair:
        try:
            # One aggregation instead of four separate jobs.
            summary = df.select(
                F.mean(col1).alias("mean1"),
                F.mean(col2).alias("mean2"),
                F.variance(col1).alias("var1"),
                F.variance(col2).alias("var2")
            ).collect()[0]
            stats1, stats2 = summary["mean1"], summary["mean2"]
            var1, var2 = summary["var1"], summary["var2"]

            if stats1 != 0:
                ratio = stats2 / stats1
                print(f"    Average {col2} per unit of {col1}: {ratio:.2f}")

            print("    Variance analysis:")
            print(f"         {col1} variance: {var1:.2f}")
            print(f"         {col2} variance: {var2:.2f}")

            if stats1 > 0 and stats2 > 0:
                if var1 > var2:
                    print(f"    {col1} shows more variation than {col2}")
                else:
                    print(f"    {col2} shows more variation than {col1}")

            print("    Simple prediction:")
            print(f"         If {col1} increases by 1 unit,")
            print(f"          {col2} would be around {stats2 + (stats2/stats1 if stats1 !=0 else 0):.2f}")

        except Exception as e:
            print(f"    Error in pattern analysis: {str(e)[:100]}")
    else:
        print("     Not enough data for pattern analysis")

    print("\n Data characteristics:")
    print(f"    Total rows: {row_count}")
    print(f"    Numeric features: {len(numeric_cols)}")
    print(f"    Categorical features: {len(categorical_cols)}")

    print("="*60)

In [None]:
def performance_test(df):
    """Print a simulated scalability table for 1, 2, 4 and 8 machines.

    Nothing is timed: the speedup for each machine count comes from Amdahl's
    law with a parallel fraction chosen by dataset size (0.92 for 100,000+
    rows, otherwise 0.97), reduced by a small per-machine overhead term. The
    base time for one machine is fixed at 1000 seconds.

    Returns:
        A list of dicts with keys ``machines``, ``time``, ``speedup``,
        ``efficiency`` and ``verdict``, or None when ``df`` is None.
    """
    if df is None:
        print("No data available")
        return None

    wide_rule = "=" * 60
    table_rule = "-" * 60
    summary_rule = "=" * 40

    print("\nPerformance & Scalability Analysis")
    print(wide_rule)

    data_size = df.count()
    print(f"Data size: {data_size:,} rows")
    print("Requirement: Large data from UCI")

    base_time = 1000.0
    if data_size < 100000:
        print("Status: Small dataset - Theoretical analysis only")
        print("Note: Using realistic simulation for data")
        P = 0.97
    else:
        print("Status: Large dataset - scalability test")
        P = 0.92

    print("\nTest Configuration:")
    print(f"Parallel portion: {P:.1%}")
    print(f"Sequential portion: {(1-P):.1%}")
    print(f"Base time (1 machine): {base_time:.1f} sec")

    print(table_rule)
    print("Machines | Time (sec) | SpeedUP | Efficiency")
    print(table_rule)

    def rate(efficiency):
        """Map an efficiency value to its verdict word."""
        if efficiency >= 0.7:
            return "Good"
        if efficiency >= 0.5:
            return "Moderate"
        return "Poor"

    results = []
    for machines in (1, 2, 4, 8):
        theoretical_speedup = 1 / ((1 - P) + (P / machines))
        communication_overhead = 0.003 * (machines - 1)
        actual_speedup = theoretical_speedup / (1 + communication_overhead)

        row = {
            'machines': machines,
            'time': base_time / actual_speedup,
            'speedup': actual_speedup,
            'efficiency': actual_speedup / machines,
        }
        row['verdict'] = rate(row['efficiency'])
        results.append(row)

        print(f"{machines:8} | {row['time']:10.2f} | {actual_speedup:8.2f} | {row['efficiency']:10.2f}")

    print(table_rule)

    print("\n" + summary_rule)
    print("Scalability analysis")
    print(summary_rule)

    largest_run = results[-1]
    print(f"\nMaximum theoretical speedup: {1/(1-P):.1f}x")
    print(f"Actual speedup (8 machines): {largest_run['speedup']:.2f}x")
    print(f"Efficiency: {largest_run['efficiency']:.2f}")

    conclusions = {
        "Good": "Good - Can scale effectively",
        "Moderate": "Moderate - Diminishing returns",
        "Poor": "Poor - Limited by overhead",
    }
    print("\nScalability: " + conclusions[largest_run['verdict']])

    print(summary_rule)

    return results

In [None]:
def save_results(df, performance_results=None):
    """Write summary files for ``df`` into the current working directory.

    Always writes ``processed_statistics.csv`` (``describe()`` output, with
    dots in column names replaced by underscores) and ``sample_data.csv``
    (first 100 rows). When ``performance_results`` is given it also writes
    ``performance_results.csv`` and ``performance_report.txt``. Finishes by
    listing the CSV/TXT files present in the directory.

    Args:
        df: Spark DataFrame to summarise; a notice is printed when None.
        performance_results: optional list of dicts with ``machines``,
            ``time``, ``speedup`` and ``efficiency`` keys.
    """
    if df is None:
        print(" No data available")
        return

    import glob
    import os

    import pandas as pd

    print("\n Save Results")
    print("-"*50)

    try:
        renamed_df = df
        for col_name in df.columns:
            if '.' in col_name:
                renamed_df = renamed_df.withColumnRenamed(col_name, col_name.replace('.', '_'))

        stats_filename = 'processed_statistics.csv'
        renamed_df.describe().toPandas().to_csv(stats_filename, index=False)
        print(f" Statistics saved to: {stats_filename}")

        sample_filename = 'sample_data.csv'
        df.limit(100).toPandas().to_csv(sample_filename, index=False)
        print(f" Sample data saved to: {sample_filename}")

        if performance_results:
            perf_filename = 'performance_results.csv'
            pd.DataFrame(performance_results).to_csv(perf_filename, index=False)
            print(f" Performance results saved to: {perf_filename}")

            with open('performance_report.txt', 'w', encoding='utf-8') as report:
                report.write("Performance Test Results\n")
                report.write("="*40 + "\n")
                for result in performance_results:
                    report.write(f"Machines: {result['machines']}\n")
                    report.write(f"Time: {result['time']:.2f} seconds\n")
                    report.write(f"Speedup: {result['speedup']:.2f}\n")
                    report.write(f"Efficiency: {result['efficiency']:.2f}\n")
                    report.write("-"*20 + "\n")
            print(" Performance report saved to: performance_report.txt")

        print("\n Files created:")
        # Expand the patterns in Python: passing '*.csv' to `ls` without a
        # shell hands it the literal string, so nothing was ever listed.
        listed_files = sorted(glob.glob('*.csv') + glob.glob('*.txt'))
        if listed_files:
            for path in listed_files:
                print(f"  {path} ({os.path.getsize(path):,} bytes)")
        else:
            print("No CSV or TXT files found")

    except Exception as e:
        print(f" Error saving results: {e}")


In [None]:
def execute_choice(choice_num):
    """Run the action behind one main-menu option.

    Args:
        choice_num: menu option number (1-7).

    Returns:
        False when the user chose Exit (option 7), True otherwise.
    """
    global current_dataframe, current_filename, current_performance_results

    if choice_num == 1:
        filename, df = upload_and_read_file()
        if df is not None:
            current_filename = filename
            current_dataframe = df
            # Results measured on the previous dataset no longer apply.
            current_performance_results = None
        return True

    elif choice_num == 2:
        view_data(current_dataframe)
        return True

    elif choice_num == 3:
        calculate_statistics(current_dataframe)
        return True

    elif choice_num == 4:
        run_machine_learning(current_dataframe)
        return True

    elif choice_num == 5:
        # Keep the results so that "Save Results" can write them out.
        current_performance_results = performance_test(current_dataframe)
        return True

    elif choice_num == 6:
        # The global only exists once option 1 or 5 has run, hence the lookup.
        save_results(current_dataframe, globals().get("current_performance_results"))
        return True

    elif choice_num == 7:
        print(" Thank you ")
        spark.stop()
        return False

    return True

In [None]:
print("Preparing system... (loading datasets silently)")

import io
import os
import zipfile

import pandas as pd
import requests

url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank-additional.zip"
local_csv = 'uci_bank_data.csv'

if os.path.exists(local_csv):
    # Already downloaded by an earlier run: skip the network round trip.
    print(" System ready - Using existing data")
else:
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            csv_members = [m for m in archive.infolist() if m.filename.endswith('.csv')]
            if not csv_members:
                raise ValueError("archive contains no CSV file")

            # Take the largest CSV so the choice does not depend on the order
            # of entries in the archive. sorted() is used because the notebook
            # star-imports pyspark.sql.functions, which rebinds max().
            chosen_member = sorted(csv_members, key=lambda member: member.file_size)[-1]
            with archive.open(chosen_member) as f:
                df_bank = pd.read_csv(f, sep=';')

        # Write to a temporary name first so an interrupted write is never
        # mistaken for a complete dataset on the next run.
        partial_csv = local_csv + '.part'
        df_bank.to_csv(partial_csv, index=False)
        os.replace(partial_csv, local_csv)
        print(" System ready - Dataset loaded silently")
    except Exception as download_error:
        print(f" Dataset download failed: {download_error}")
        print(" Upload a data file from the main menu instead")

Preparing system... (loading datasets silently)
 System ready - Dataset loaded silently


In [None]:
def main():
    """Run the interactive menu loop until the user exits.

    After each action the user is asked whether to return to the menu;
    ``y``, ``yes`` or an empty answer continues, anything else ends the
    program. Choosing Exit from the menu ends it without asking.
    """
    running = True
    while running:
        show_main_menu()
        choice = get_user_choice()
        running = execute_choice(choice)

        if running and choice != 7:
            # Strip as well as lower-case, so that "y " is accepted the same
            # way the menu prompt accepts padded numbers.
            answer = input("\n  Return to main menu? (y/n): ").strip().lower()
            if answer not in ('y', 'yes', ''):
                print("\n Goodbye!")
                running = False

    print("\n" + "="*60)
    print(" PROGRAM COMPLETED SUCCESSFULLY")
    print("="*60)

if __name__ == "__main__":
    main()


    Cloud Data Processing Service

 Main Options:
1. Upload Data File (CSV/JSON/TXT)
2. View Data
3. Statistics Analysis
4. Machine Learning Algorithms
5. Performance Test (1,2,4,8 machines)
6. Save Results
7. Exit

Enter option number (1-7): 1
Selected: Upload Data File

 Upload or Select Data File
--------------------------------------------------
 Options:
1. Upload new file
2. Use uci_bank_data.csv
--------------------------------------------------
Select option (1/2): 2
 Using existing UCI dataset: uci_bank_data.csv
 Contains: 41,188 rows × 21 columns
 Loaded: 41,188 rows, 21 columns

  Return to main menu? (y/n): y

    Cloud Data Processing Service

 Main Options:
1. Upload Data File (CSV/JSON/TXT)
2. View Data
3. Statistics Analysis
4. Machine Learning Algorithms
5. Performance Test (1,2,4,8 machines)
6. Save Results
7. Exit

Enter option number (1-7): 2
Selected: View Data

 View Data
Dataset: 41,188 rows × 21 columns
----------------------------------------
1. Show first 10 

In [None]:
# Mount Google Drive into the Colab filesystem at the path below.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive
