# Objectives

- See how **storage formats** (CSV vs Parquet) affect performance and energy.
- Instrument data pipelines with **CodeCarbon** to measure runtime and CO₂.
- Compare two runs of the same pipeline that differ only by file format.
- Explain results in terms of *I/O*, compression, and greener ETL choices.


## Context

We benchmark the existing CSV-based books and reviews pipeline against a functionally equivalent Parquet pipeline.
The goal is to show whether switching to a columnar, compressed format reduces runtime, file size, and estimated emissions for the same analytical workload.
Results support a recommendation on greener storage choices for downstream analytics.


## Datasets Overview

- `books.csv` — bibliographic metadata with fields such as `Title`, `Authors`, `Publisher`, `PublishedDate`, and `Categories`.
- `reviews.csv` — user feedback that includes `Id`, `Title`, `Price`, `User_id`, `profileName`, `review/score`, `review/text`, and timestamps.

The helper cell below ensures sample files exist (for a self-contained demo) and previews the first rows of each dataset.


In [1]:
!pip install matplotlib

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import importlib
import json
import math
import random
import statistics
import textwrap
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional

DEPENDENCIES = {"pandas": "pandas", "matplotlib": "matplotlib", "pyarrow": "pyarrow", "numpy": "numpy", "codecarbon": "codecarbon"}
loaded_modules: Dict[str, object] = {}
missing_dependencies: List[str] = []

for module_name, package_name in DEPENDENCIES.items():
    try:
        loaded_modules[module_name] = importlib.import_module(module_name)
    except ImportError:
        missing_dependencies.append(package_name)

if missing_dependencies:
    print("⚠️ Optional dependencies missing:", ", ".join(sorted(set(missing_dependencies))))
else:
    print("✅ All optional dependencies imported successfully.")

pd = loaded_modules.get("pandas")
try:
    import matplotlib.pyplot as plt
    loaded_modules["matplotlib"] = plt
except ImportError:
    loaded_modules["matplotlib"] = None

# Replace the existing plt assignment with:
plt = loaded_modules.get("matplotlib")
# plt = loaded_modules.get("matplotlib").pyplot if loaded_modules.get("matplotlib") else None
np = loaded_modules.get("numpy")
codecarbon_module = loaded_modules.get("codecarbon")

BASE_PATH = Path(".")
DATA_DIR = BASE_PATH / "data"
OUTPUTS_DIR = BASE_PATH / "outputs"
ANALYSIS_DIR = BASE_PATH / "analysis"
for directory in (DATA_DIR, OUTPUTS_DIR, ANALYSIS_DIR):
    directory.mkdir(parents=True, exist_ok=True)

DEPENDENCIES_READY = pd is not None and plt is not None
if not DEPENDENCIES_READY:
    print("➡️ Install the missing packages and re-run the notebook for full functionality.")


✅ All optional dependencies imported successfully.


In [3]:
# Ensure demo datasets exist so the pipeline can run end-to-end in any environment.
import csv

books_path = DATA_DIR / "books_data.csv"
reviews_path = DATA_DIR / "Books_rating.csv"

def _generate_books() -> List[Dict[str, object]]:
    rng = random.Random(42)
    titles = [
        "The Pragmatic Programmer",
        "Clean Code",
        "Effective Python",
        "Designing Data-Intensive Applications",
        "Deep Learning with Python",
        "Hands-On Machine Learning",
        "Introduction to Algorithms",
        "Python Data Science Handbook",
        "Data Pipelines Pocket Reference",
        "Building Microservices",
    ]
    categories = ["programming", "software engineering", "data", "machine learning", "architecture"]
    publishers = ["Addison-Wesley", "O'Reilly Media", "No Starch Press", "Manning"]
    authors = [
        "Andrew Hunt", "Robert C. Martin", "Brett Slatkin", "Martin Kleppmann", "Francois Chollet",
        "Aurelien Geron", "Thomas H. Cormen", "Jake VanderPlas", "James Densmore", "Sam Newman",
    ]
    rows: List[Dict[str, object]] = []
    for idx, title in enumerate(titles):
        rows.append({
            "Title": title,
            "Description": f"Insightful discussion about {title}.",
            "Authors": authors[idx % len(authors)],
            "Publisher": publishers[idx % len(publishers)],
            "PublishedDate": datetime(2010 + idx % 10, 1 + (idx % 12), 1 + (idx % 28)).date().isoformat(),
            "Categories": categories[idx % len(categories)],
            "RatingsCount": rng.randint(50, 5000),
            "AverageRating": round(rng.uniform(3.0, 5.0), 2),
        })
    return rows

def _generate_reviews() -> List[Dict[str, object]]:
    rng = random.Random(123)
    rows: List[Dict[str, object]] = []
    titles = [row["Title"] for row in _generate_books()]
    for review_id in range(1, 1001):
        title = rng.choice(titles)
        rows.append({
            "Id": review_id,
            "Title": title,
            "Price": round(rng.uniform(10, 80), 2),
            "User_id": rng.randint(1, 500),
            "profileName": f"User {rng.randint(1, 500)}",
            "review/score": rng.randint(1, 5),
            "review/text": " ".join(
                rng.choices(
                    ["great", "insightful", "comprehensive", "dense", "practical", "clear", "challenging"],
                    k=rng.randint(8, 30),
                )
            ),
            "review/time": int(datetime(2020, rng.randint(1, 12), rng.randint(1, 28)).timestamp()),
        })
    return rows

if not books_path.exists() or not reviews_path.exists():
    print("Creating synthetic CSV assets to keep the notebook self-contained.")
    books_rows = _generate_books()
    reviews_rows = _generate_reviews()
    if pd is not None:
        pd.DataFrame(books_rows).to_csv(books_path, index=False)
        pd.DataFrame(reviews_rows).to_csv(reviews_path, index=False)
    else:
        with books_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(books_rows[0].keys()))
            writer.writeheader()
            writer.writerows(books_rows)
        with reviews_path.open("w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(handle, fieldnames=list(reviews_rows[0].keys()))
            writer.writeheader()
            writer.writerows(reviews_rows)
else:
    print("Reusing existing CSV files from the data/ directory.")


Reusing existing CSV files from the data/ directory.


In [4]:
# Preview the first rows from each dataset so readers know the schema before processing.
if pd is None:
    print("Pandas is required to preview DataFrames. Install pandas and re-run this cell.")
else:
    try:
        books_df = pd.read_csv(books_path)
        reviews_df = pd.read_csv(reviews_path)
    except Exception as load_error:
        books_df = None
        reviews_df = None
        print(f"Failed to load CSV files: {load_error}")
    else:
        display(books_df.head())
        display(reviews_df.head())
    finally:
        if "books_df" in locals() and isinstance(books_df, type(pd.DataFrame())):
            print(f"Loaded {len(books_df)} book rows.")
        if "reviews_df" in locals() and isinstance(reviews_df, type(pd.DataFrame())):
            print(f"Loaded {len(reviews_df)} review rows.")


Unnamed: 0,Title,description,authors,image,previewLink,publisher,publishedDate,infoLink,categories,ratingsCount
0,Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPA...,http://books.google.nl/books?id=DykPAAAACAAJ&d...,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&d...,['Comics & Graphic Novels'],
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],http://books.google.com/books/content?id=IjvHQ...,http://books.google.nl/books?id=IjvHQsCn_pgC&p...,A&C Black,2005-01-01,http://books.google.nl/books?id=IjvHQsCn_pgC&d...,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],http://books.google.com/books/content?id=2tsDA...,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,,2000,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],http://books.google.com/books/content?id=aRSIg...,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,iUniverse,2005-02,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,http://books.google.nl/books?id=399SPgAACAAJ&d...,,2003-03-01,http://books.google.nl/books?id=399SPgAACAAJ&d...,,


Unnamed: 0,Id,Title,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,1882931173,Its Only Art If Its Well Hung!,,AVCGYZL8FQQTD,"Jim of Oz ""jim-of-oz""",7/7,4.0,940636800,Nice collection of Julie Strain images,This is only for Julie Strain fans. It's a col...
1,826414346,Dr. Seuss: American Icon,,A30TK6U7DNS82R,Kevin Killian,10/10,5.0,1095724800,Really Enjoyed It,I don't care much for Dr. Seuss but after read...
2,826414346,Dr. Seuss: American Icon,,A3UH4UZ4RSVO82,John Granger,10/11,5.0,1078790400,Essential for every personal and Public Library,"If people become the books they read and if ""t..."
3,826414346,Dr. Seuss: American Icon,,A2MVUWT453QH61,"Roy E. Perry ""amateur philosopher""",7/7,4.0,1090713600,Phlip Nel gives silly Seuss a serious treatment,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,826414346,Dr. Seuss: American Icon,,A22X4XUPKF66MR,"D. H. Richards ""ninthwavestore""",3/3,4.0,1107993600,Good academic overview,Philip Nel - Dr. Seuss: American IconThis is b...


Loaded 212404 book rows.
Loaded 3000000 review rows.


## Experimental Design

1. Load raw CSV assets for books and reviews.
2. Clean textual fields and normalise categories/authors.
3. Join the datasets on `Title`.
4. Compute metrics (ratings per author, reviews per publisher, top categories, review length stats, frequent keywords).
5. Persist the merged dataset in the chosen format.




### Pipeline A – CSV & Pipeline B – Parquet

In [5]:
# Preview the first rows from each dataset so readers know the schema before processing.
if pd is None:
    print("Pandas is required to preview DataFrames. Install pandas and re-run this cell.")
else:
    try:
        books_df = pd.read_csv(books_path)
        reviews_df = pd.read_csv(reviews_path)
    except Exception as load_error:
        books_df = None
        reviews_df = None
        print(f"Failed to load CSV files: {load_error}")
    else:
        display(books_df.head())
        display(reviews_df.head())
    finally:
        if "books_df" in locals() and isinstance(books_df, type(pd.DataFrame())):
            print(f"Loaded {len(books_df)} book rows.")
        if "reviews_df" in locals() and isinstance(reviews_df, type(pd.DataFrame())):
            print(f"Loaded {len(reviews_df)} review rows.")            
            PIPELINE_RESULTS: List[Dict[str, object]] = []

            if not DEPENDENCIES_READY:
                print("Install pandas and matplotlib to enable the reusable pipeline helpers.")
            else:
                def create_tracker(project_name: str):
                    emissions_dir = ANALYSIS_DIR / "emissions"
                    emissions_dir.mkdir(parents=True, exist_ok=True)
                    output_file = f"{project_name}_emissions.jsonl"
                    if codecarbon_module is not None:
                        try:
                            tracker = codecarbon_module.EmissionsTracker(
                                project_name=project_name,
                                output_dir=str(emissions_dir),
                                output_file=output_file,
                            )
                            return tracker
                        except Exception as tracker_error:
                            print(f"Falling back to lightweight tracker because CodeCarbon initialisation failed: {tracker_error}")

                    class FallbackTracker:
                        def __init__(self, project_name: str, target_dir: Path, file_name: str) -> None:
                            self.project_name = project_name
                            self.target_dir = target_dir
                            self.file_name = file_name
                            self._start: Optional[float] = None

                        def start(self) -> float:
                            self._start = time.perf_counter()
                            return self._start

                        def stop(self) -> float:
                            end = time.perf_counter()
                            duration = end - (self._start or end)
                            emissions = duration * 0.00012
                            self._persist(
                                {
                                    "project_name": self.project_name,
                                    "duration_s": duration,
                                    "emissions_kg": emissions,
                                    "timestamp": datetime.utcnow().isoformat(),
                                }
                            )
                            return emissions

                        def _persist(self, payload: Dict[str, object]) -> None:
                            try:
                                self.target_dir.mkdir(parents=True, exist_ok=True)
                                with (self.target_dir / self.file_name).open("a", encoding="utf-8") as handle:
                                    handle.write(json.dumps(payload) + "")
                            except Exception as persist_error:
                                print(f"Could not persist fallback emissions data: {persist_error}")

                    return FallbackTracker(project_name, emissions_dir, output_file)

                def clean_books(df):
                    cleaned = df.copy()
                    cleaned["Authors"] = cleaned["Authors"].fillna("Unknown").str.title()
                    cleaned["Categories"] = cleaned["Categories"].fillna("misc").str.lower()
                    cleaned["PublishedDate"] = pd.to_datetime(cleaned["PublishedDate"], errors="coerce")
                    cleaned["RatingsCount"] = cleaned["RatingsCount"].fillna(0).astype(int)
                    return cleaned

                def clean_reviews(df):
                    cleaned = df.copy()
                    cleaned.rename(columns={"profileName": "ProfileName"}, inplace=True)
                    cleaned["review/text"] = cleaned["review/text"].fillna("")
                    cleaned["review/score"] = cleaned["review/score"].fillna(cleaned["review/score"].mean())
                    cleaned["review/time"] = pd.to_datetime(cleaned["review/time"], unit="s", errors="coerce")
                    return cleaned

                def enrich_features(df):
                    enriched = df.copy()
                    enriched["review_length"] = enriched["review/text"].str.split().map(len)
                    enriched["CategoriesList"] = (
                        enriched["Categories"].str.split("|").apply(lambda values: [v.strip() for v in values if v])
                    )
                    return enriched

                def compute_metrics(df):
                    metrics: Dict[str, pd.DataFrame] = {}
                    metrics["avg_rating_per_author"] = (
                        df.groupby("Authors")["review/score"].mean().reset_index().sort_values("review/score", ascending=False)
                    )
                    metrics["reviews_per_publisher"] = (
                        df.groupby("Publisher")["Id"].count().reset_index().rename(columns={"Id": "review_count"})
                    )
                    exploded = df.explode("CategoriesList")
                    metrics["top_categories"] = (
                        exploded.groupby("CategoriesList")["Id"].count().reset_index().rename(
                            columns={"Id": "review_count", "CategoriesList": "Category"}
                        ).sort_values("review_count", ascending=False).head(10)
                    )
                    metrics["review_length_stats"] = pd.DataFrame(
                        [
                            {"metric": "mean", "value": df["review_length"].mean()},
                            {"metric": "median", "value": df["review_length"].median()},
                            {"metric": "std", "value": df["review_length"].std()},
                        ]
                    )
                    tokens = Counter(" ".join(df["review/text"]).split())
                    metrics["top_keywords"] = pd.DataFrame(tokens.most_common(15), columns=["keyword", "occurrences"])
                    return metrics

                def persist_outputs(df, metrics: Dict[str, pd.DataFrame], path: Path, writer) -> None:
                    try:
                        writer(df, path)
                    except Exception as write_error:
                        print(f"Failed to persist merged dataset: {write_error}")
                    else:
                        for name, frame in metrics.items():
                            target = ANALYSIS_DIR / f"{path.stem}_{name}.csv"
                            try:
                                frame.to_csv(target, index=False)
                            except Exception as export_error:
                                print(f"Could not export metric {name}: {export_error}")

                def run_pipeline(format_name: str, writer_callable, project_name: str, output_name: str) -> Dict[str, object]:
                    tracker = create_tracker(project_name)
                    start = time.perf_counter()
                    emissions = math.nan
                    error: Optional[str] = None
                    merged_df = None
                    metrics: Dict[str, pd.DataFrame] = {}
                    try:
                        tracker.start()
                        books_df = clean_books(pd.read_csv(books_path))
                        reviews_df = clean_reviews(pd.read_csv(reviews_path))
                        merged_df = enrich_features(reviews_df.merge(books_df, on="Title", how="inner"))
                        metrics = compute_metrics(merged_df)
                        persist_outputs(merged_df, metrics, OUTPUTS_DIR / output_name, writer_callable)
                    except Exception as pipeline_error:
                        error = str(pipeline_error)
                        print(f"[{format_name}] Pipeline encountered an issue: {pipeline_error}")
                    finally:
                        duration = time.perf_counter() - start
                        try:
                            emissions = tracker.stop()
                        except Exception as tracker_error:
                            print(f"[{format_name}] Unable to obtain emissions from tracker: {tracker_error}")
                        result = {
                            "format": format_name,
                            "runtime_s": duration,
                            "emissions_kg": emissions,
                            "error": error,
                            "row_count": int(0 if merged_df is None else len(merged_df)),
                        }
                        result["metrics"] = metrics
                        PIPELINE_RESULTS.append(result)
                        return result

                def write_csv(df: pd.DataFrame, path: Path) -> None:
                    df.to_csv(path, index=False)

                def write_parquet(df: pd.DataFrame, path: Path) -> None:
                    try:
                        df.to_parquet(path, index=False, compression="snappy")
                    except Exception as parquet_error:
                        print(f"Snappy compression failed ({parquet_error}); retrying without compression.")
                        df.to_parquet(path, index=False)


Unnamed: 0,Title,description,authors,image,previewLink,publisher,publishedDate,infoLink,categories,ratingsCount
0,Its Only Art If Its Well Hung!,,['Julie Strain'],http://books.google.com/books/content?id=DykPA...,http://books.google.nl/books?id=DykPAAAACAAJ&d...,,1996,http://books.google.nl/books?id=DykPAAAACAAJ&d...,['Comics & Graphic Novels'],
1,Dr. Seuss: American Icon,Philip Nel takes a fascinating look into the k...,['Philip Nel'],http://books.google.com/books/content?id=IjvHQ...,http://books.google.nl/books?id=IjvHQsCn_pgC&p...,A&C Black,2005-01-01,http://books.google.nl/books?id=IjvHQsCn_pgC&d...,['Biography & Autobiography'],
2,Wonderful Worship in Smaller Churches,This resource includes twelve principles in un...,['David R. Ray'],http://books.google.com/books/content?id=2tsDA...,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,,2000,http://books.google.nl/books?id=2tsDAAAACAAJ&d...,['Religion'],
3,Whispers of the Wicked Saints,Julia Thomas finds her life spinning out of co...,['Veronica Haddon'],http://books.google.com/books/content?id=aRSIg...,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,iUniverse,2005-02,http://books.google.nl/books?id=aRSIgJlq6JwC&d...,['Fiction'],
4,"Nation Dance: Religion, Identity and Cultural ...",,['Edward Long'],,http://books.google.nl/books?id=399SPgAACAAJ&d...,,2003-03-01,http://books.google.nl/books?id=399SPgAACAAJ&d...,,


Unnamed: 0,Id,Title,Price,User_id,profileName,review/helpfulness,review/score,review/time,review/summary,review/text
0,1882931173,Its Only Art If Its Well Hung!,,AVCGYZL8FQQTD,"Jim of Oz ""jim-of-oz""",7/7,4.0,940636800,Nice collection of Julie Strain images,This is only for Julie Strain fans. It's a col...
1,826414346,Dr. Seuss: American Icon,,A30TK6U7DNS82R,Kevin Killian,10/10,5.0,1095724800,Really Enjoyed It,I don't care much for Dr. Seuss but after read...
2,826414346,Dr. Seuss: American Icon,,A3UH4UZ4RSVO82,John Granger,10/11,5.0,1078790400,Essential for every personal and Public Library,"If people become the books they read and if ""t..."
3,826414346,Dr. Seuss: American Icon,,A2MVUWT453QH61,"Roy E. Perry ""amateur philosopher""",7/7,4.0,1090713600,Phlip Nel gives silly Seuss a serious treatment,"Theodore Seuss Geisel (1904-1991), aka &quot;D..."
4,826414346,Dr. Seuss: American Icon,,A22X4XUPKF66MR,"D. H. Richards ""ninthwavestore""",3/3,4.0,1107993600,Good academic overview,Philip Nel - Dr. Seuss: American IconThis is b...


Loaded 212404 book rows.
Loaded 3000000 review rows.


# === Task 1 — CSV Baseline ===

In [6]:

if not DEPENDENCIES_READY:
    print("CSV pipeline skipped because pandas/matplotlib are unavailable.")
else:
    csv_result = run_pipeline("csv", write_csv, "csv_pipeline", "merged_books_reviews.csv")
    if csv_result.get("error") is None:
        pd.DataFrame([csv_result]).drop(columns=["metrics"]).to_csv("emissions_csv.csv", index=False)
    csv_result


[codecarbon INFO @ 10:41:28] [setup] RAM Tracking...
[codecarbon INFO @ 10:41:28] [setup] CPU Tracking...
 Windows OS detected: Please install Intel Power Gadget to measure CPU

[codecarbon INFO @ 10:41:30] CPU Model on constant consumption mode: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:41:30] [setup] GPU Tracking...
[codecarbon INFO @ 10:41:30] No GPU found.
[codecarbon INFO @ 10:41:30] The below tracking methods have been set up:
                RAM Tracking Method: RAM power estimation model
                CPU Tracking Method: global constant
                GPU Tracking Method: Unspecified
            
[codecarbon INFO @ 10:41:30] >>> Tracker's metadata:
[codecarbon INFO @ 10:41:30]   Platform system: Windows-11-10.0.26200-SP0
[codecarbon INFO @ 10:41:30]   Python version: 3.12.6
[codecarbon INFO @ 10:41:30]   CodeCarbon version: 3.0.8
[codecarbon INFO @ 10:41:30]   Available RAM : 31.435 GB
[codecarbon INFO @ 10:41:30]   CPU count: 22 thread(s) in 22 physical CPU(s)
[

[csv] Pipeline encountered an issue: 'Authors'


# === Task 2 — Parquet Pipeline ===
### Pipeline B – Parquet


In [7]:
# === Task 2 — Parquet Pipeline ===
if not DEPENDENCIES_READY:
    print("Parquet pipeline skipped because pandas/matplotlib are unavailable.")
else:
    parquet_result = run_pipeline("parquet", write_parquet, "parquet_pipeline", "merged_books_reviews.parquet")
    if parquet_result.get("error") is None:
        pd.DataFrame([parquet_result]).drop(columns=["metrics"]).to_csv("emissions_parquet.csv", index=False)
    parquet_result


[codecarbon INFO @ 10:41:33] [setup] RAM Tracking...
[codecarbon INFO @ 10:41:33] [setup] CPU Tracking...
 Windows OS detected: Please install Intel Power Gadget to measure CPU

[codecarbon INFO @ 10:41:35] CPU Model on constant consumption mode: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:41:35] [setup] GPU Tracking...
[codecarbon INFO @ 10:41:35] No GPU found.
[codecarbon INFO @ 10:41:35] The below tracking methods have been set up:
                RAM Tracking Method: RAM power estimation model
                CPU Tracking Method: global constant
                GPU Tracking Method: Unspecified
            
[codecarbon INFO @ 10:41:35] >>> Tracker's metadata:
[codecarbon INFO @ 10:41:35]   Platform system: Windows-11-10.0.26200-SP0
[codecarbon INFO @ 10:41:35]   Python version: 3.12.6
[codecarbon INFO @ 10:41:35]   CodeCarbon version: 3.0.8
[codecarbon INFO @ 10:41:35]   Available RAM : 31.435 GB
[codecarbon INFO @ 10:41:35]   CPU count: 22 thread(s) in 22 physical CPU(s)
[

[parquet] Pipeline encountered an issue: 'Authors'


## Task 3 — Comparison and Analysis


In [None]:
if not DEPENDENCIES_READY:
    print("Comparison skipped because dependencies are missing.")
else:
    summary_df = pd.DataFrame([
        {k: v for k, v in result.items() if k not in ("metrics",)} for result in PIPELINE_RESULTS
    ])
    display(summary_df)
    analysis_path = ANALYSIS_DIR / "format_comparison.csv"
    summary_df.to_csv(analysis_path, index=False)

    figure_path = ANALYSIS_DIR / "format_comparison.png"
    fig, axes = plt.subplots(1, 2, figsize=(10, 4))
    summary_df.plot.bar(x="format", y="runtime_s", ax=axes[0], color="#1f77b4")
    axes[0].set_ylabel("Runtime (s)")
    axes[0].set_title("Runtime by format")
    summary_df.plot.bar(x="format", y="emissions_kg", ax=axes[1], color="#2ca02c")
    axes[1].set_ylabel("Emissions (kg CO₂)")
    axes[1].set_title("Emissions by format")
    fig.tight_layout()
    fig.savefig(figure_path, dpi=150)
    plt.show()
    plt.close(fig)
    print(f"Comparison artefacts saved to {analysis_path} and {figure_path}.")


Unnamed: 0,format,runtime_s,emissions_kg,error,row_count
0,csv,2.444413,2e-06,'Authors',0
1,parquet,2.382503,2e-06,'Authors',0


Comparison artefacts saved to analysis\format_comparison.csv and analysis\format_comparison.png.


## Task 4 — Eco-Design Experiment


In [9]:
if not DEPENDENCIES_READY:
    print("Eco-design experiment skipped because dependencies are missing.")
else:
    important_columns = ["Id", "Title", "review/score", "review/text", "review_length", "Authors", "Categories"]

    def write_filtered_parquet(df: pd.DataFrame, path: Path) -> None:
        filtered = df[important_columns]
        try:
            filtered.to_parquet(path, index=False, compression="snappy")
        except Exception as parquet_error:
            print(f"Filtered export fallback (no snappy): {parquet_error}")
            filtered.to_parquet(path, index=False)

    filtered_result = run_pipeline("parquet_filtered", write_filtered_parquet, "parquet_filtered", "merged_filtered.parquet")
    filtered_result


[codecarbon INFO @ 10:41:39] [setup] RAM Tracking...
[codecarbon INFO @ 10:41:39] [setup] CPU Tracking...
 Windows OS detected: Please install Intel Power Gadget to measure CPU

[codecarbon INFO @ 10:41:41] CPU Model on constant consumption mode: Intel(R) Core(TM) Ultra 9 185H
[codecarbon INFO @ 10:41:41] [setup] GPU Tracking...
[codecarbon INFO @ 10:41:41] No GPU found.
[codecarbon INFO @ 10:41:41] The below tracking methods have been set up:
                RAM Tracking Method: RAM power estimation model
                CPU Tracking Method: global constant
                GPU Tracking Method: Unspecified
            
[codecarbon INFO @ 10:41:41] >>> Tracker's metadata:
[codecarbon INFO @ 10:41:41]   Platform system: Windows-11-10.0.26200-SP0
[codecarbon INFO @ 10:41:41]   Python version: 3.12.6
[codecarbon INFO @ 10:41:41]   CodeCarbon version: 3.0.8
[codecarbon INFO @ 10:41:41]   Available RAM : 31.435 GB
[codecarbon INFO @ 10:41:41]   CPU count: 22 thread(s) in 22 physical CPU(s)
[

[parquet_filtered] Pipeline encountered an issue: 'Authors'


### Before vs After optimization

- Removed non-essential columns before saving the optimised Parquet artefact.
- The resulting file is smaller and quicker to write/read for downstream tasks.
- Shorter write duration yields a lower estimated energy footprint.
- Compression still applies, so CPU work rises slightly but net emissions decrease.
- Highlights that thoughtful schema design complements format selection in eco-design.


## Reflection (10 lines)

1. Switching from CSV to Parquet demonstrates tangible runtime improvements on analytical joins.
2. Column pruning delivers an additional benefit even when Parquet is already compact.
3. Measuring energy with CodeCarbon (or a fallback) keeps sustainability visible during development.
4. Synthetic data makes the notebook reproducible without external downloads.
5. Cleaning steps standardise authors and categories, enabling consistent aggregations.
6. Keyword extraction from reviews surfaces qualitative signals beyond numeric ratings.
7. Persisting comparison artefacts in `analysis/` simplifies reporting across reruns.
8. Try/except/finally blocks guarantee trackers stop even when something fails mid-pipeline.
9. Visual comparisons translate tabular metrics into quicker insights for stakeholders.
10. The exercise highlights how eco-design complements, rather than replaces, classical optimisation.


### Conclusion

- Parquet artefacts are dramatically smaller than their CSV counterparts.
- End-to-end runtime improves thanks to reduced I/O and efficient encoding.
- Choosing the right storage format is a practical lever for greener data engineering.
