In [8]:
# Install necessary dependencies
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import avg, count, min, max  # Import these functions
import zipfile

# Remote location of the dataset archive and the local file name it is saved under
dataset_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00344/Activity%20recognition%20exp.zip"
dataset_zip = "activity_recognition_exp.zip"

# Download the archive unless a valid copy is already on disk. A file left
# behind by an interrupted download is not a valid zip, so it is fetched again
# instead of being trusted just because the path exists.
if os.path.exists(dataset_zip) and zipfile.is_zipfile(dataset_zip):
    print(f"{dataset_zip} already exists. Skipping download.")
else:
    print(f"Downloading {dataset_zip}...")
    # os.system returns the command's status; anything non-zero means wget failed.
    wget_status = os.system(f'wget -O "{dataset_zip}" "{dataset_url}"')
    if wget_status != 0:
        # `wget -O` creates the output file even when the transfer fails;
        # remove it so the next run does not mistake it for the dataset.
        if os.path.exists(dataset_zip):
            os.remove(dataset_zip)
        raise RuntimeError(f"Download failed (wget returned status {wget_status}): {dataset_url}")

# Extract the dataset into the "activity_data" folder
if os.path.exists(dataset_zip):
    print(f"Extracting {dataset_zip}...")
    with zipfile.ZipFile(dataset_zip, 'r') as zip_ref:
        zip_ref.extractall("activity_data")
    print("Extraction complete.")
else:
    print(f"{dataset_zip} not found.")

# Function to explore directory structure
def explore_directory(path, level=0):
    """Print an indented tree of the files and folders under ``path``.

    Args:
        path: File-system path to start from.
        level: Current nesting depth; each level indents by two spaces.
            Callers normally leave this at the default.

    Directories are printed with a trailing ``/`` and their entries are
    listed in sorted order so the output is the same on every run.
    A ``path`` that is not a directory is printed as a bare name.
    """
    indent = '  ' * level
    # normpath drops a trailing separator, which would otherwise make
    # basename return an empty string (e.g. for "activity_data/").
    label = os.path.basename(os.path.normpath(path)) or path
    if not os.path.isdir(path):
        print(f"{indent}{label}")
        return
    print(f"{indent}{label}/")
    # os.listdir returns entries in arbitrary order; sort for stable output.
    for item in sorted(os.listdir(path)):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            explore_directory(item_path, level + 1)
        else:
            print(f"{'  ' * (level + 1)}{item}")

# Show what the archive unpacked to, starting from the extraction folder
print("Exploring extracted directory structure:")
explore_directory("activity_data", level=0)

# Function to find CSV files recursively
def find_csv_files(directory):
    """Return the paths of all CSV files found under ``directory``.

    Args:
        directory: Root folder to search; sub-folders are walked recursively.

    Returns:
        A sorted list of paths (the walked folder joined with each file
        name) whose names end in ``.csv``, compared case-insensitively.
        The list is empty when nothing matches.
    """
    csv_files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(directory)
        for name in names
        # Lower-case the name so "DATA.CSV" is picked up as well.
        if name.lower().endswith('.csv')
    ]
    # os.walk yields entries in arbitrary file-system order; sorting makes the
    # result (and therefore any "first file" choice) reproducible.
    csv_files.sort()
    return csv_files

# Collect every CSV under the extraction folder; stop early if there is none
csv_files = find_csv_files("activity_data")
if len(csv_files) == 0:
    raise FileNotFoundError("No CSV files found in the extracted data.")

# List what was found, one path per line
print(f"Found {len(csv_files)} CSV file(s):")
print(*csv_files, sep="\n")

# The first entry of the list is the file analysed from here on
dataset_csv = csv_files[0]
print(f"\nUsing CSV file: {dataset_csv}")

# Load the dataset into a pandas DataFrame and time the read
print("Loading data with Pandas...")
# perf_counter is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump if it is adjusted.
start_time = time.perf_counter()
df_pandas = pd.read_csv(dataset_csv)
pandas_load_time = time.perf_counter() - start_time
print(f"Loaded data with {len(df_pandas)} records and {len(df_pandas.columns)} columns.")
print(f"Loaded data with Pandas in {pandas_load_time:.4f} seconds")

# Aggregate with pandas and time it: mean of x, y, z and row count per 'gt' group
print("Processing data with Pandas...")
# perf_counter is a monotonic clock intended for measuring elapsed time.
start_time = time.perf_counter()
# dropna=False keeps rows whose 'gt' value is missing as their own group, which
# is how Spark's groupBy treats nulls; pandas drops them by default, so the two
# engines would otherwise aggregate different sets of rows.
# A single named aggregation also avoids running the groupby a second time
# just to obtain the group sizes ('size' counts every row in the group).
result_pandas = df_pandas.groupby('gt', dropna=False).agg(
    x=('x', 'mean'),
    y=('y', 'mean'),
    z=('z', 'mean'),
    count=('x', 'size'),
).reset_index()
pandas_time = time.perf_counter() - start_time
print(f"Pandas processed data with {len(result_pandas)} groups.")
print(f"Processed data with Pandas in {pandas_time:.4f} seconds")

# Create (or reuse) the Spark session used for the comparison, running on
# the "local[2]" master with the memory and core settings listed below
spark = (
    SparkSession.builder
    .appName("Activity Recognition Dataset Analysis")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "2")
    .master("local[2]")
    .getOrCreate()
)

# Load the dataset in Spark and time the read
print("Loading data with Spark...")
# perf_counter is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() follows the wall clock and can jump if it is adjusted.
start_time = time.perf_counter()
# header=True takes the column names from the first line; inferSchema=True asks
# Spark to derive the column types from the data.
df_spark = spark.read.csv(dataset_csv, header=True, inferSchema=True).cache()
# cache() is lazy: running an action here materialises the cached data so the
# cost of reading the file is included in the load timing.
df_spark.count()
spark_load_time = time.perf_counter() - start_time
print(f"Loaded data with Spark in {spark_load_time:.4f} seconds")

# Run the same aggregation with Spark and time it: mean of x, y, z and row
# count per 'gt' group
print("Processing data with Spark...")
# perf_counter is a monotonic clock intended for measuring elapsed time.
start_time = time.perf_counter()
result_spark = df_spark.groupBy('gt').agg(
    avg('x').alias('x'),
    avg('y').alias('y'),
    avg('z').alias('z'),
    count('*').alias('count')
)
# Spark is lazy, so an action is needed to run the aggregation. collect() is
# used rather than count() because counting the grouped rows does not need the
# avg/count columns, and the optimizer is free to skip computing them; the
# result has one row per 'gt' value, so bringing it to the driver is cheap.
result_spark_rows = result_spark.collect()
result_spark_count = len(result_spark_rows)
spark_time = time.perf_counter() - start_time
print(f"Spark processed data with {result_spark_count} groups.")
print(f"Processed data with Spark in {spark_time:.4f} seconds")

# Print the load, processing and total timings of both engines, one per line
print(
    f"\nPerformance Comparison:",
    f"Pandas Load Time: {pandas_load_time:.4f} seconds",
    f"Spark Load Time: {spark_load_time:.4f} seconds",
    f"Pandas Processing Time: {pandas_time:.4f} seconds",
    f"Spark Processing Time: {spark_time:.4f} seconds",
    f"Pandas Total Time: {pandas_load_time + pandas_time:.4f} seconds",
    f"Spark Total Time: {spark_load_time + spark_time:.4f} seconds",
    sep="\n",
)

# Shut the Spark session down now that the comparison is finished
spark.stop()






[33m0% [Working][0m            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [Waiting for headers] [C[0m                                                                               Hit:2 http://security.ubuntu.com/ubuntu jammy-security InRelease
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [Waiting for headers] [W[0m                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:6 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://