# =========================
# TP2 — Pandas Baseline
# =========================

In [1]:
# !curl -L -o /content/sample.zip https://www.kaggle.com/api/v1/datasets/download/mohamedbakhet/amazon-books-reviews

In [2]:
import pandas as pd
import numpy as np
import re
import psutil
import time
from codecarbon import EmissionsTracker
from collections import Counter


# ---------------------------------------------
# Helper function to measure each pipeline step
# ---------------------------------------------
def measure_step(step_name, framework, func, log):
    co2_kg = 0.0
    start = 0.0
    try:
        tracker = EmissionsTracker(measure_power_secs=1, save_to_file=False)
        tracker.start()
        start = time.time()
        func()
        co2_kg = tracker.stop()  # may be None or float
    except Exception as e:
        print(f"⚠️ CodeCarbon failed for {step_name}: {e}")
        co2_kg = 0.0
    finally:
        duration = time.time() - start
        memory_mb = psutil.virtual_memory().used / (1024 * 1024)

        # Handle None safely
        if co2_kg is None:
            co2_kg = 0.0

        # Estimate energy from CO₂ (1 kWh ≈ 0.233 kg CO₂)
        energy_kwh = co2_kg / 0.233 if co2_kg and co2_kg > 0 else 0.0

        log.append({
            "Step": step_name,
            "Framework": framework,
            "Duration (s)": round(duration, 3),
            "Energy (kWh)": round(energy_kwh, 6),
            "CO2 (kg)": round(co2_kg, 6),
            "Memory (MB)": round(memory_mb, 2)
        })


# =========================================================
# ----------- TASK 1 : PANDAS PIPELINE ---------------------
# =========================================================

In [3]:

log_pandas = []

def pandas_pipeline():
    print("Running Pandas pipeline...")
    global books, reviews, merged, author_ratings, reviews_per_publisher, category_reviews, most_common_words

    # ---- Load ----
    def step_load_data():
        global books, reviews
        books = pd.read_csv("books_data.csv")
        reviews = pd.read_csv("Books_rating.csv")
    # measure_step("Load data", "Pandas", step_load_data, log_pandas)

    # ---- Clean ----
    def step_clean_data():
        books.fillna({"description": "", "publisher": "Unknown", "categories": "[]", "authors": "[]"}, inplace=True)
        reviews.fillna({"Price": 0, "review/text": "", "review/summary": ""}, inplace=True)

        def clean_list_column(x):
            if pd.isna(x): return []
            x = re.sub(r"[\[\]']", "", str(x))
            return [i.strip() for i in x.split(",") if i.strip()]

        books["authors"] = books["authors"].apply(clean_list_column)
        books["categories"] = books["categories"].apply(clean_list_column)
    # measure_step("Data cleaning", "Pandas", step_clean_data, log_pandas)

    # ---- Join ----
    def step_join_data():
        global merged
        merged = pd.merge(reviews, books, on="Title", how="inner")
    # measure_step("Join datasets", "Pandas", step_join_data, log_pandas)

    # ---- Compute metrics ----
    def step_avg_rating_per_author():
        global author_ratings
        author_ratings = (
            merged.explode("authors")
            .groupby("authors")["review/score"]
            .mean()
            .reset_index()
            .rename(columns={"review/score": "avg_rating"})
        )
    measure_step("Average rating per author", "Pandas", step_avg_rating_per_author, log_pandas)

    def step_reviews_per_publisher():
        global reviews_per_publisher
        reviews_per_publisher = (
            merged.groupby("publisher")["Id"]
            .count()
            .reset_index()
            .rename(columns={"Id": "num_reviews"})
        )
    measure_step("Number of reviews per publisher", "Pandas", step_reviews_per_publisher, log_pandas)

    def step_top10_categories():
        global category_reviews
        category_reviews = (
            merged.explode("categories")
            .groupby("categories")["Id"]
            .count()
            .reset_index()
            .rename(columns={"Id": "num_reviews"})
            .sort_values(by="num_reviews", ascending=False)
            .head(10)
        )
    measure_step("Top 10 most-reviewed categories", "Pandas", step_top10_categories, log_pandas)

    # ---- Text processing ----
    def step_avg_review_length():
        merged["review_length"] = merged["review/text"].apply(lambda x: len(str(x).split()))
        merged["review_length"].mean()
    measure_step("Average review length", "Pandas", step_avg_review_length, log_pandas)

    def step_most_common_words():
        global most_common_words
        all_words = " ".join(merged["review/text"]).lower().split()
        word_counts = Counter(all_words)
        most_common_words = pd.DataFrame(word_counts.most_common(10), columns=["word", "count"])
    measure_step("Most frequent keywords", "Pandas", step_most_common_words, log_pandas)

    # ---- Save ----
    def step_save_results():
        author_ratings.to_csv("avg_rating_per_author.csv", index=False)
        reviews_per_publisher.to_csv("reviews_per_publisher.csv", index=False)
        category_reviews.to_csv("top10_categories.csv", index=False)
        most_common_words.to_csv("top10_keywords.csv", index=False)
    # measure_step("Save results", "Pandas", step_save_results, log_pandas)

    pd.DataFrame(log_pandas).to_csv("emissions_pandas.csv", index=False)
    print("✅ Pandas pipeline done → emissions_pandas.csv")

# Run Task 1
pandas_pipeline()

[codecarbon INFO @ 10:45:54] [setup] RAM Tracking...
[codecarbon INFO @ 10:45:54] [setup] CPU Tracking...


Running Pandas pipeline...


 Windows OS detected: Please install Intel Power Gadget to measure CPU

[codecarbon INFO @ 10:45:56] CPU Model on constant consumption mode: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:45:56] [setup] GPU Tracking...
[codecarbon INFO @ 10:45:56] No GPU found.
[codecarbon INFO @ 10:45:56] The below tracking methods have been set up:
                RAM Tracking Method: RAM power estimation model
                CPU Tracking Method: global constant
                GPU Tracking Method: Unspecified
            
[codecarbon INFO @ 10:45:56] >>> Tracker's metadata:
[codecarbon INFO @ 10:45:56]   Platform system: Windows-11-10.0.26200-SP0
[codecarbon INFO @ 10:45:56]   Python version: 3.12.6
[codecarbon INFO @ 10:45:56]   CodeCarbon version: 3.0.8
[codecarbon INFO @ 10:45:56]   Available RAM : 31.435 GB
[codecarbon INFO @ 10:45:56]   CPU count: 22 thread(s) in 22 physical CPU(s)
[codecarbon INFO @ 10:45:56]   CPU model: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:45:56]   GPU

⚠️ CodeCarbon failed for Average rating per author: name 'merged' is not defined


[codecarbon INFO @ 10:45:58] Energy consumed for RAM : 0.000006 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:45:58] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:45:58] Energy consumed for All CPU : 0.000012 kWh
[codecarbon INFO @ 10:45:58] 0.000018 kWh of electricity and 0.000000 L of water were used since the beginning.
 Windows OS detected: Please install Intel Power Gadget to measure CPU

[codecarbon INFO @ 10:45:59] CPU Model on constant consumption mode: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:45:59] [setup] GPU Tracking...
[codecarbon INFO @ 10:45:59] No GPU found.
[codecarbon INFO @ 10:45:59] The below tracking methods have been set up:
                RAM Tracking Method: RAM power estimation model
                CPU Tracking Method: global constant
                GPU Tracking Method: Unspecified
            
[codecarbon INFO @ 10:45:59] >>> Tracker's metadata:
[codecarbon INFO @ 10:45:59]   Platform system: Wi

⚠️ CodeCarbon failed for Number of reviews per publisher: name 'merged' is not defined


[codecarbon INFO @ 10:46:00] Energy consumed for RAM : 0.000017 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:00] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:00] Energy consumed for All CPU : 0.000036 kWh
[codecarbon INFO @ 10:46:00] 0.000053 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:00] Energy consumed for RAM : 0.000006 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:00] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:00] Energy consumed for All CPU : 0.000012 kWh
[codecarbon INFO @ 10:46:00] 0.000018 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:01] Energy consumed for RAM : 0.000022 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:01] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:01] Energy consumed for All CPU

⚠️ CodeCarbon failed for Top 10 most-reviewed categories: name 'merged' is not defined


[codecarbon INFO @ 10:46:02] Energy consumed for RAM : 0.000017 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:02] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:02] Energy consumed for All CPU : 0.000036 kWh
[codecarbon INFO @ 10:46:02] 0.000052 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:03] Energy consumed for RAM : 0.000006 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:03] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:03] Energy consumed for All CPU : 0.000012 kWh
[codecarbon INFO @ 10:46:03] 0.000018 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:03] Energy consumed for RAM : 0.000033 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:03] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:03] Energy consumed for All CPU

⚠️ CodeCarbon failed for Average review length: name 'merged' is not defined


[codecarbon INFO @ 10:46:05] Energy consumed for RAM : 0.000017 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:05] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:05] Energy consumed for All CPU : 0.000036 kWh
[codecarbon INFO @ 10:46:05] 0.000052 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:05] Energy consumed for RAM : 0.000044 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:05] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:05] Energy consumed for All CPU : 0.000095 kWh
[codecarbon INFO @ 10:46:05] 0.000139 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:05] Energy consumed for RAM : 0.000006 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:05] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:05] Energy consumed for All CPU

⚠️ CodeCarbon failed for Most frequent keywords: name 'merged' is not defined
✅ Pandas pipeline done → emissions_pandas.csv


# =========================================================
# ----------- TASK 2 : PYSPARK PIPELINE --------------------
# =========================================================

In [4]:
pip install pyspark

[codecarbon INFO @ 10:46:07] Energy consumed for RAM : 0.000028 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:07] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:07] Energy consumed for All CPU : 0.000060 kWh
[codecarbon INFO @ 10:46:07] 0.000087 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:07] Energy consumed for RAM : 0.000055 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:07] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:07] Energy consumed for All CPU : 0.000119 kWh
[codecarbon INFO @ 10:46:07] 0.000174 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:07] Energy consumed for RAM : 0.000017 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:07] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:07] Energy consumed for All CPU

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, size, lower, regexp_replace, avg, count

log_spark = []

def spark_pipeline():
    print("\nRunning PySpark pipeline...")

    spark = SparkSession.builder \
        .appName("Books Reviews Spark CodeCarbon") \
        .master("local[*]") \
        .getOrCreate()

    # ---- Load ----
    def step_load_data():
        global books_df, reviews_df
        books_df = spark.read.option("header", True).csv("books.csv")
        reviews_df = spark.read.option("header", True).csv("reviews.csv")

    # ---- Clean ----
    def step_clean_data():
        global books_df_clean, reviews_df_clean
        books_df_clean = books_df.fillna({
            "description": "",
            "publisher": "Unknown",
            "categories": "[]",
            "authors": "[]"
        })
        reviews_df_clean = reviews_df.fillna({
            "Price": "0",
            "review/text": "",
            "review/summary": ""
        })
        books_df_clean = books_df_clean \
            .withColumn("authors", regexp_replace(col("authors"), r"[\[\]']", "")) \
            .withColumn("categories", regexp_replace(col("categories"), r"[\[\]']", "")) \
            .withColumn("authors", split(col("authors"), ",")) \
            .withColumn("categories", split(col("categories"), ","))

    # ---- Join ----
    def step_join_data():
        global merged_df
        merged_df = reviews_df_clean.join(books_df_clean, on="Title", how="inner")

    # Call the data loading and preparation steps first
    step_load_data()
    step_clean_data() 
    step_join_data()

    # Then proceed with the metrics computation
    def step_avg_rating_per_author():
        global author_ratings_df
        author_ratings_df = merged_df.withColumn("author", explode(col("authors"))) \
            .groupby("author").agg(avg(col("review/score").cast("float")).alias("avg_rating"))
    measure_step("Average rating per author", "PySpark", step_avg_rating_per_author, log_spark)

    def step_reviews_per_publisher():
        global reviews_per_publisher_df
        reviews_per_publisher_df = merged_df.groupby("publisher").agg(count("Id").alias("num_reviews"))
    measure_step("Number of reviews per publisher", "PySpark", step_reviews_per_publisher, log_spark)

    def step_top10_categories():
        global category_reviews_df
        category_reviews_df = merged_df.withColumn("category", explode(col("categories"))) \
            .groupby("category").agg(count("Id").alias("num_reviews")) \
            .orderBy(col("num_reviews").desc()).limit(10)
    measure_step("Top 10 most-reviewed categories", "PySpark", step_top10_categories, log_spark)

    # ---- Text processing ----
    def step_avg_review_length():
        tmp = merged_df.withColumn("review_length", size(split(col("review/text"), " ")))
        tmp.selectExpr("avg(review_length)").collect()
    measure_step("Average review length", "PySpark", step_avg_review_length, log_spark)

    def step_most_common_words():
        global top_words_df
        words_df = merged_df.select(explode(split(lower(col("review/text")), " ")).alias("word"))
        top_words_df = words_df.groupBy("word").agg(count("*").alias("count")).orderBy(col("count").desc()).limit(10)
    measure_step("Most frequent keywords", "PySpark", step_most_common_words, log_spark)

    # ---- Save ----
    def step_save_results():
        author_ratings_df.toPandas().to_csv("avg_rating_per_author_spark.csv", index=False)
        reviews_per_publisher_df.toPandas().to_csv("reviews_per_publisher_spark.csv", index=False)
        category_reviews_df.toPandas().to_csv("top10_categories_spark.csv", index=False)
        top_words_df.toPandas().to_csv("top10_keywords_spark.csv", index=False)

    spark.stop()

    pd.DataFrame(log_spark).to_csv("emissions_spark.csv", index=False)
    print("✅ PySpark pipeline done → emissions_spark.csv")

# Run Task 2
spark_pipeline()

[codecarbon INFO @ 10:46:10] Energy consumed for RAM : 0.000033 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:10] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:10] Energy consumed for All CPU : 0.000071 kWh
[codecarbon INFO @ 10:46:10] 0.000104 kWh of electricity and 0.000000 L of water were used since the beginning.



Running PySpark pipeline...

[codecarbon INFO @ 10:46:10] Energy consumed for RAM : 0.000061 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:10] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:10] Energy consumed for All CPU : 0.000130 kWh
[codecarbon INFO @ 10:46:10] 0.000191 kWh of electricity and 0.000000 L of water were used since the beginning.





[codecarbon INFO @ 10:46:10] Energy consumed for RAM : 0.000022 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:10] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:10] Energy consumed for All CPU : 0.000048 kWh
[codecarbon INFO @ 10:46:10] 0.000070 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:11] Energy consumed for RAM : 0.000050 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:11] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:11] Energy consumed for All CPU : 0.000107 kWh
[codecarbon INFO @ 10:46:11] 0.000157 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:11] Energy consumed for RAM : 0.000078 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:11] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:11] Energy consumed for All CPU

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/c:/Users/Gaurav Chugh/source/GreenComputing_TP2/books.csv.

[codecarbon INFO @ 10:46:22] Energy consumed for RAM : 0.000099 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:22] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:22] Energy consumed for All CPU : 0.000213 kWh
[codecarbon INFO @ 10:46:22] 0.000313 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:22] Energy consumed for RAM : 0.000127 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:22] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:22] Energy consumed for All CPU : 0.000272 kWh
[codecarbon INFO @ 10:46:22] 0.000400 kWh of electricity and 0.000000 L of water were used since the beginning.
[codecarbon INFO @ 10:46:23] Energy consumed for RAM : 0.000088 kWh. RAM Power : 20.0 W
[codecarbon INFO @ 10:46:23] Delta energy consumed for CPU with constant : 0.000012 kWh, power : 42.5 W
[codecarbon INFO @ 10:46:23] Energy consumed for All CPU