<a href="https://colab.research.google.com/github/Cloud-Course-Group-Phoenix/Project-Pheonix/blob/Dev/Logic/BigDataPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Step 1 - Installation**

In [1]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the latest Apache Spark version
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz

# Install findspark to connect Python with Spark
!pip install -q findspark


# **Step 2 - Environment Setup**

In [2]:
# Import the os module to interact with the operating system
import os
# Import findspark to locate the Spark installation
import findspark

# Tell Spark where its prerequisites live: the Java 8 JDK directory and the
# directory of the downloaded Spark 3.4.1 distribution
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3",
})

# Let findspark make pyspark importable within Python
findspark.init()


# **Step 3 - Create SparkSession**

In [3]:
# Import SparkSession class from PySpark SQL module
from pyspark.sql import SparkSession

# SparkSession is the entry point to Spark functionality. The builder names the
# application "Big Data Example" and getOrCreate() returns an existing session
# or creates a new one.
spark = (
    SparkSession.builder
    .appName("Big Data Example")
    .getOrCreate()
)

# **Step 4 - Continue with DataFrame operations**

In [4]:
# Column names and the sample rows, each row being a (name, price) tuple
columns = ["name", "price"]
data = [
    ("Tal", 120),
    ("Uri", 90),
    ("Dina", 150),
]

# Build the DataFrame through the SparkSession
df = spark.createDataFrame(data, columns)

# Display only the rows whose price is greater than 100
df.filter(df["price"] > 100).show()


+----+-----+
|name|price|
+----+-----+
| Tal|  120|
|Dina|  150|
+----+-----+



In [6]:
import os, sys

try:
    #Clone the GitHub repository if not already present
    if not os.path.exists("/content/Project-Pheonix"):
        !git clone https://github.com/Cloud-Course-Group-Phoenix/Project-Pheonix.git /content/Project-Pheonix

    # Change directory to project root
    %cd /content/Project-Pheonix

    # Checkout the 'dev' branch
    !git fetch origin -q
    !git checkout Dev -q

    # Add project directory to Python path
    sys.path.append("/content/Project-Pheonix/Logic")
    %pip install -q importnb
    from importnb import Notebook
    with Notebook():
        import CloudDB as dbService
except Exception as e:
    print("❌ Setup failed:", str(e))

Cloning into '/content/Project-Pheonix'...
remote: Enumerating objects: 716, done.[K
remote: Counting objects: 100% (154/154), done.[K
remote: Compressing objects: 100% (126/126), done.[K
remote: Total 716 (delta 95), reused 22 (delta 22), pack-reused 562 (from 2)[K
Receiving objects: 100% (716/716), 7.32 MiB | 15.84 MiB/s, done.
Resolving deltas: 100% (399/399), done.
/content/Project-Pheonix


In [7]:
# Pull the indoor and outdoor sensor readings from the database
print("Fetching sensor data from database...")

# A falsy result from the database service is replaced by an empty dict
indoor_data = dbService.get_from_db('/FakeData/indoor') or {}
outdoor_data = dbService.get_from_db('/FakeData/outdoor') or {}

print(f"Indoor data entries: {len(indoor_data)}")
print(f"Outdoor data entries: {len(outdoor_data)}")

# Preview the first entry of every non-empty data set to show its structure
for sample_header, readings in (
    ("\nSample indoor data structure:", indoor_data),
    ("\nSample outdoor data structure:", outdoor_data),
):
    if not readings:
        continue
    sample_key = next(iter(readings.keys()))
    print(sample_header)
    print(f"Timestamp: {sample_key}")
    print(f"Data: {readings[sample_key]}")

Fetching sensor data from database...
Indoor data entries: 20160
Outdoor data entries: 20160

Sample indoor data structure:
Timestamp: 1747688122
Data: {'Distance': 1542.0, 'Humidity': 40.1, 'Pressure': 97458.1, 'Temperature': 20.77}

Sample outdoor data structure:
Timestamp: 1747688106
Data: {'DLIGHT': 8957.03, 'Humidity': 54.77, 'Pressure': 97656.07, 'Temperature': 27.55}


In [8]:
# Convert sensor data to Spark DataFrame format
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import from_unixtime, col, avg, max as spark_max, min as spark_min, count

def prepare_sensor_data_for_spark(data_dict, environment_type):
    """Convert a sensor data dictionary into a list of flat records for a Spark DataFrame.

    Args:
        data_dict: Mapping of Unix timestamp (any value ``int()`` accepts, such
            as a numeric string) to a dict of ``{sensor_name: value}``
            readings. ``None`` or an empty mapping yields an empty list.
        environment_type: Value stored under the ``'environment'`` key of every
            record (this notebook passes ``'indoor'`` and ``'outdoor'``).

    Returns:
        list[dict]: One record per entry containing ``'timestamp'`` (int),
        ``'environment'``, ``'datetime'`` (a ``'%Y-%m-%d %H:%M:%S'`` string in
        the local time zone of the machine running the code) and one
        lower-cased key per sensor whose value is an ``int`` or ``float``,
        converted to ``float``. Sensor values of any other type are skipped.

    Raises:
        ValueError, TypeError: If a timestamp key cannot be converted by ``int()``.
    """
    # Imported locally so the function works on a fresh kernel instead of
    # depending on a `datetime` name defined by some other cell.
    from datetime import datetime

    records = []
    for timestamp_str, sensor_values in (data_dict or {}).items():
        timestamp = int(timestamp_str)
        record = {
            'timestamp': timestamp,
            'environment': environment_type,
            # fromtimestamp() without tz uses the local time zone of the host
            'datetime': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        }
        # Add numeric sensor values under lower-cased names, always as float
        # so that int and float readings of one sensor share a single type
        for sensor_name, value in sensor_values.items():
            if isinstance(value, (int, float)):
                record[sensor_name.lower()] = float(value)
        records.append(record)
    return records

# Flatten both data sets into records and merge them into a single list
indoor_records = prepare_sensor_data_for_spark(indoor_data, 'indoor')
outdoor_records = prepare_sensor_data_for_spark(outdoor_data, 'outdoor')
all_records = [*indoor_records, *outdoor_records]

print(f"Total records prepared for Spark: {len(all_records)}")

if not all_records:
    print("❌ No data available to create DataFrame")
else:
    print(f"Sample record: {all_records[0]}")

    # Build the Spark DataFrame from the record dicts and report its shape
    df_sensors = spark.createDataFrame(all_records)
    print("\n✅ Spark DataFrame created successfully!")
    df_sensors.printSchema()
    print(f"Total rows in DataFrame: {df_sensors.count()}")

Total records prepared for Spark: 40320
Sample record: {'timestamp': 1747688122, 'environment': 'indoor', 'datetime': '2025-05-19 20:55:22', 'distance': 1542.0, 'humidity': 40.1, 'pressure': 97458.1, 'temperature': 20.77}

✅ Spark DataFrame created successfully!
root
 |-- datetime: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- environment: string (nullable = true)
 |-- humidity: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- dlight: double (nullable = true)

Total rows in DataFrame: 40320


In [9]:
# MapReduce Analysis 1: per-environment temperature statistics
print("🔍 MapReduce Analysis 1: Temperature Statistics by Environment")
print("=" * 60)

if 'df_sensors' not in locals() or df_sensors.count() <= 0:
    print("❌ No data available for temperature analysis")
else:
    # Keep only the rows that carry a temperature reading
    temp_data = df_sensors.filter(col("temperature").isNotNull())

    # Group by environment, reduce every group to avg / min / max / count,
    # and collect the aggregated rows to the driver
    temp_stats = (
        temp_data
        .groupBy("environment")
        .agg(
            avg("temperature").alias("avg_temperature"),
            spark_min("temperature").alias("min_temperature"),
            spark_max("temperature").alias("max_temperature"),
            count("temperature").alias("reading_count"),
        )
        .collect()
    )

    print("Temperature Statistics by Environment:")
    for row in temp_stats:
        print(f"Environment: {row['environment']}")
        print(f"  Average Temperature: {row['avg_temperature']:.2f}°C")
        print(f"  Min Temperature: {row['min_temperature']:.2f}°C")
        print(f"  Max Temperature: {row['max_temperature']:.2f}°C")
        print(f"  Total Readings: {row['reading_count']}")
        print()

    # Keep the collected rows under a dedicated name for the visualization step
    temp_analysis_results = temp_stats

🔍 MapReduce Analysis 1: Temperature Statistics by Environment
Temperature Statistics by Environment:
Environment: indoor
  Average Temperature: 22.17°C
  Min Temperature: 18.00°C
  Max Temperature: 28.00°C
  Total Readings: 20160

Environment: outdoor
  Average Temperature: 26.96°C
  Min Temperature: 16.24°C
  Max Temperature: 35.00°C
  Total Readings: 20160



In [10]:
# Graph 1: Temperature Comparison Between Indoor and Outdoor Environments
# Renders a 2x2 figure (two bar charts, a pie chart and a table) from the rows
# stored in temp_analysis_results, then prints two summary lines.
# NOTE(review): `make_subplots` and `go` are not imported in any cell visible
# here - presumably plotly.subplots.make_subplots and plotly.graph_objects;
# confirm they are imported before this cell runs on a fresh kernel.
print("📊 Creating Temperature Comparison Visualization")

# Only plot when the statistics variable exists in the namespace
if 'temp_analysis_results' in locals():
    # Create subplot with multiple visualizations
    # specs declares the type of each grid cell: bars on the top row,
    # a pie and a table on the bottom row
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Average Temperature by Environment',
                       'Temperature Range Analysis',
                       'Reading Count Distribution',
                       'Temperature Statistics Summary'),
        specs=[[{"type": "bar"}, {"type": "bar"}],
               [{"type": "pie"}, {"type": "table"}]]
    )

    # Extract data for visualization
    # One parallel list per statistic, in the order of the result rows
    environments = [row['environment'] for row in temp_analysis_results]
    avg_temps = [row['avg_temperature'] for row in temp_analysis_results]
    min_temps = [row['min_temperature'] for row in temp_analysis_results]
    max_temps = [row['max_temperature'] for row in temp_analysis_results]
    reading_counts = [row['reading_count'] for row in temp_analysis_results]

    # Bar chart 1: Average Temperature
    # Two colours are listed - presumably one per environment bar; confirm
    # the result always holds exactly two environments
    fig.add_trace(
        go.Bar(x=environments, y=avg_temps, name='Average Temperature',
               marker_color=['lightblue', 'lightcoral']),
        row=1, col=1
    )

    # Bar chart 2: Temperature Range (Min-Max)
    # Min and max are added as two separate traces to the same subplot
    fig.add_trace(
        go.Bar(x=environments, y=min_temps, name='Min Temperature',
               marker_color='lightblue', opacity=0.7),
        row=1, col=2
    )
    fig.add_trace(
        go.Bar(x=environments, y=max_temps, name='Max Temperature',
               marker_color='lightcoral', opacity=0.7),
        row=1, col=2
    )

    # Pie chart: Reading distribution
    fig.add_trace(
        go.Pie(labels=environments, values=reading_counts, name="Reading Distribution"),
        row=2, col=1
    )

    # Table: Summary statistics
    # Temperatures are formatted to two decimals; counts are shown as-is
    fig.add_trace(
        go.Table(
            header=dict(values=['Environment', 'Avg Temp (°C)', 'Min Temp (°C)', 'Max Temp (°C)', 'Readings']),
            cells=dict(values=[environments,
                              [f"{temp:.2f}" for temp in avg_temps],
                              [f"{temp:.2f}" for temp in min_temps],
                              [f"{temp:.2f}" for temp in max_temps],
                              reading_counts])
        ),
        row=2, col=2
    )

    # Update layout
    fig.update_layout(
        title_text="🌡️ Comprehensive Temperature Analysis - MapReduce Results",
        height=800,
        showlegend=True
    )

    # Update axes labels
    # Only the two bar subplots on the top row get axis titles
    fig.update_xaxes(title_text="Environment", row=1, col=1)
    fig.update_yaxes(title_text="Temperature (°C)", row=1, col=1)
    fig.update_xaxes(title_text="Environment", row=1, col=2)
    fig.update_yaxes(title_text="Temperature (°C)", row=1, col=2)

    fig.show()

    # Summary insights
    # The difference is only computed when exactly two environments are
    # present; in every other case it is reported as 0
    temp_diff = abs(avg_temps[0] - avg_temps[1]) if len(avg_temps) == 2 else 0
    print(f"\n🔍 Key Insights:")
    print(f"• Temperature difference between environments: {temp_diff:.2f}°C")
    print(f"• Total temperature readings analyzed: {sum(reading_counts)}")

else:
    print("❌ No temperature analysis results available for visualization")

📊 Creating Temperature Comparison Visualization



🔍 Key Insights:
• Temperature difference between environments: 4.79°C
• Total temperature readings analyzed: 40320
