### Exploring Data Insights & Aggregated Metrics

**Presenter:** Zachary Amadi
**Date:** February 25th, 2025

This notebook presents an interactive analysis of the DriveWealth Open Library Data Pipeline. The pipeline extracts, transforms, and aggregates book data from the Open Library API (default subject: ‘science_fiction’). Through this interactive exploration, you will be able to view key performance metrics, inspect data quality, and drill down into trends by filtering by author and publication year.

## Summary

- Pipeline Objective: Extract, transform, and aggregate book data for the subject ‘science_fiction’ (modifiable via command-line).

- Future Enhancements: Migration to PostgreSQL, distributed processing via Dask, enhanced real-time monitoring.


In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from ipywidgets import interact, Dropdown, SelectMultiple, IntRangeSlider
%matplotlib inline

### Data Loading Overview
The data for this analysis comes from two CSV files generated by the pipeline: authors.csv (containing author details) and books.csv (containing book details). These files represent the clean, structured outputs of our transformation process.

In [3]:
# Load the CSV files into Pandas DataFrames
df_authors = pd.read_csv("authors.csv")
df_books = pd.read_csv("books.csv")

# Display the first few rows to verify correct loading
print("Authors DataFrame Preview:")
display(df_authors.head())
print("Books DataFrame Preview:")
display(df_books.head())

# Display DataFrame information for a quick structural overview
print("Authors DataFrame Info:")
df_authors.info()
print("Books DataFrame Info:")
df_books.info()

Authors DataFrame Preview:


|   | author_id | author_name |
|---|---|---|
| 0 | /authors/OL22098A | Lewis Carroll |
| 1 | /authors/OL25342A | Mary Shelley |
| 2 | /authors/OL23431A | L. Frank Baum |
| 3 | /authors/OL13066A | H. G. Wells |
| 4 | /authors/OL161167A | Arthur Conan Doyle |


Books DataFrame Preview:


|   | book_id | title | publication_year | author_id |
|---|---|---|---|---|
| 0 | /works/OL138052W | Alice's Adventures in Wonderland | 1865 | /authors/OL22098A |
| 1 | /works/OL450063W | Frankenstein or The Modern Prometheus | 1818 | /authors/OL25342A |
| 2 | /works/OL18417W | The Wonderful Wizard of Oz | 1899 | /authors/OL23431A |
| 3 | /works/OL52267W | The Time Machine | 1895 | /authors/OL13066A |
| 4 | /works/OL262460W | The Lost World | 1900 | /authors/OL161167A |


Authors DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11 entries, 0 to 10
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   author_id    11 non-null     object
 1   author_name  11 non-null     object
dtypes: object(2)
memory usage: 308.0+ bytes
Books DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13 entries, 0 to 12
Data columns (total 4 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   book_id           13 non-null     object
 1   title             13 non-null     object
 2   publication_year  13 non-null     int64 
 3   author_id         13 non-null     object
dtypes: int64(1), object(3)
memory usage: 548.0+ bytes
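
With 11 authors and 13 books, one data-quality check worth running before aggregating is whether every `author_id` in the books table resolves to a row in the authors table. The following is a minimal sketch of such a check. The helper name `find_orphan_books` is hypothetical (it is not part of the pipeline), and the stand-in frames reuse a few rows from the previews above, with one author deliberately left out so the check has something to find.

```python
import pandas as pd

def find_orphan_books(books: pd.DataFrame, authors: pd.DataFrame) -> pd.DataFrame:
    """Return the books whose author_id has no match in the authors table."""
    merged = books.merge(
        authors[["author_id"]].drop_duplicates(),
        on="author_id",
        how="left",
        indicator=True,  # adds a "_merge" column: "both" or "left_only"
    )
    return merged.loc[merged["_merge"] == "left_only"].drop(columns="_merge")

# Stand-in data for illustration; in the notebook, pass df_books and df_authors.
sample_authors = pd.DataFrame({
    "author_id": ["/authors/OL22098A", "/authors/OL25342A"],
    "author_name": ["Lewis Carroll", "Mary Shelley"],
})
sample_books = pd.DataFrame({
    "book_id": ["/works/OL138052W", "/works/OL450063W", "/works/OL52267W"],
    "title": [
        "Alice's Adventures in Wonderland",
        "Frankenstein or The Modern Prometheus",
        "The Time Machine",
    ],
    "publication_year": [1865, 1818, 1895],
    "author_id": ["/authors/OL22098A", "/authors/OL25342A", "/authors/OL13066A"],
})

# The Time Machine's author is intentionally missing from sample_authors.
orphans = find_orphan_books(sample_books, sample_authors)
print(orphans[["book_id", "title", "author_id"]])
```

An empty result on the real frames would suggest the two CSV files are consistent with each other; any rows returned point to books whose author record was dropped or never extracted.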


### Data Exploration and Summary Statistics

In [4]:
# Calculate and print the number of unique authors
unique_authors = df_authors["author_id"].nunique()
print(f"Total Unique Authors: {unique_authors}")

Total Unique Authors: 11


In [5]:
# Compute the number of books per author per publication year
aggregation = df_books.groupby(["author_id", "publication_year"]).size().reset_index(name="num_books")

# Compute the average number of books per author per year
avg_aggregation = aggregation.groupby("author_id")["num_books"].mean().reset_index(name="avg_books_per_year")

# Display the aggregated metrics
print("Aggregated Metrics - Books per Author per Publication Year:")
display(aggregation)
print("Aggregated Metrics - Average Books per Author:")
display(avg_aggregation)

Aggregated Metrics - Books per Author per Publication Year:


|   | author_id | publication_year | num_books |
|---|---|---|---|
| 0 | /authors/OL118077A | 1949 | 1 |
| 1 | /authors/OL13066A | 1895 | 1 |
| 2 | /authors/OL13066A | 1897 | 1 |
| 3 | /authors/OL13066A | 1898 | 1 |
| 4 | /authors/OL161167A | 1900 | 1 |
| 5 | /authors/OL19767A | 1800 | 1 |
| 6 | /authors/OL20585A | 1884 | 1 |
| 7 | /authors/OL22098A | 1865 | 1 |
| 8 | /authors/OL23431A | 1899 | 1 |
| 9 | /authors/OL25342A | 1818 | 1 |


Aggregated Metrics - Average Books per Author:


|   | author_id | avg_books_per_year |
|---|---|---|
| 0 | /authors/OL118077A | 1.0 |
| 1 | /authors/OL13066A | 1.0 |
| 2 | /authors/OL161167A | 1.0 |
| 3 | /authors/OL19767A | 1.0 |
| 4 | /authors/OL20585A | 1.0 |
| 5 | /authors/OL22098A | 1.0 |
| 6 | /authors/OL23431A | 1.0 |
| 7 | /authors/OL25342A | 1.0 |
| 8 | /authors/OL3127898A | 1.0 |
| 9 | /authors/OL31727A | 1.0 |
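
The introduction mentions drilling down by author and publication year. One way to support that is a small, widget-independent filter function, sketched below. The name `filter_books` and its parameters are assumptions made for illustration, and the stand-in frames reuse rows from the previews above.

```python
import pandas as pd

def filter_books(books, authors, author_names=None, year_range=None):
    """Join author names onto books, then filter by name and inclusive year range."""
    joined = books.merge(authors, on="author_id", how="left")
    if author_names:  # an empty selection means "no author filter"
        joined = joined[joined["author_name"].isin(author_names)]
    if year_range is not None:
        start, end = year_range
        joined = joined[joined["publication_year"].between(start, end)]
    return joined.sort_values("publication_year").reset_index(drop=True)

# Stand-in data for illustration; in the notebook, pass df_books and df_authors.
sample_authors = pd.DataFrame({
    "author_id": ["/authors/OL22098A", "/authors/OL25342A", "/authors/OL13066A"],
    "author_name": ["Lewis Carroll", "Mary Shelley", "H. G. Wells"],
})
sample_books = pd.DataFrame({
    "book_id": ["/works/OL138052W", "/works/OL450063W", "/works/OL52267W"],
    "title": [
        "Alice's Adventures in Wonderland",
        "Frankenstein or The Modern Prometheus",
        "The Time Machine",
    ],
    "publication_year": [1865, 1818, 1895],
    "author_id": ["/authors/OL22098A", "/authors/OL25342A", "/authors/OL13066A"],
})

late_century = filter_books(sample_books, sample_authors, year_range=(1850, 1900))
wells_only = filter_books(sample_books, sample_authors, author_names=["H. G. Wells"])
print(late_century[["title", "author_name", "publication_year"]])
```

Because the function accepts a sequence of names and a `(start, end)` tuple, it could be passed to `interact` with a `SelectMultiple` for authors and an `IntRangeSlider` for years, both of which are imported at the top of the notebook.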


## SQL equivalent
### Number of Books Written Each Year by an Author

```sql
SELECT
    author_id,
    publication_year,
    COUNT(*) AS num_books
FROM books
GROUP BY author_id, publication_year
ORDER BY author_id, publication_year;
```

### Average Number of Books Written by an Author per Year
```sql
WITH yearly_counts AS (
    SELECT
        author_id,
        publication_year,
        COUNT(*) AS num_books
    FROM books
    GROUP BY author_id, publication_year
)
SELECT
    author_id,
    AVG(num_books) AS avg_books_per_year
FROM yearly_counts
GROUP BY author_id
ORDER BY author_id;
```
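
To check that SQL of this shape really matches the pandas aggregation, both can be run on the same data without a database server. The sketch below is an assumption-laden illustration: it uses an in-memory SQLite database as a stand-in for a real SQL engine, the column names from `books.csv` (`author_id`, `publication_year`), and clearly synthetic toy rows chosen so that one author has more than one book in a year.

```python
import sqlite3
from contextlib import closing

import pandas as pd

# Synthetic toy rows (not pipeline output), chosen so counts differ from 1.
books = pd.DataFrame({
    "book_id": ["book_1", "book_2", "book_3", "book_4"],
    "author_id": ["author_a", "author_a", "author_a", "author_b"],
    "publication_year": [2000, 2000, 2001, 1999],
})

# pandas versions of the two aggregations used in the notebook.
pandas_counts = (
    books.groupby(["author_id", "publication_year"])
    .size()
    .reset_index(name="num_books")
)
pandas_avg = (
    pandas_counts.groupby("author_id")["num_books"]
    .mean()
    .reset_index(name="avg_books_per_year")
)

COUNTS_SQL = """
SELECT author_id, publication_year, COUNT(*) AS num_books
FROM books
GROUP BY author_id, publication_year
ORDER BY author_id, publication_year;
"""

AVG_SQL = """
WITH yearly_counts AS (
    SELECT author_id, publication_year, COUNT(*) AS num_books
    FROM books
    GROUP BY author_id, publication_year
)
SELECT author_id, AVG(num_books) AS avg_books_per_year
FROM yearly_counts
GROUP BY author_id
ORDER BY author_id;
"""

# Load the frame into an in-memory SQLite table and run both queries.
with closing(sqlite3.connect(":memory:")) as conn:
    books.to_sql("books", conn, index=False)
    sql_counts = pd.read_sql_query(COUNTS_SQL, conn)
    sql_avg = pd.read_sql_query(AVG_SQL, conn)

# Compare row by row; both sides are sorted by the grouping keys.
counts_match = pandas_counts.to_dict("records") == sql_counts.to_dict("records")
averages_match = pandas_avg.to_dict("records") == sql_avg.to_dict("records")
print(f"counts match: {counts_match}, averages match: {averages_match}")
```

The comparison relies on pandas sorting group keys by default and on the `ORDER BY` clauses producing the same order, which holds here for plain ASCII identifiers.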

### Optimising for a larger dataset

1. Indexing / Partitioning
   - In SQL, create an index on (author_id, publication_year), or partition the table by author_id or publication_year. This can speed up group-by operations and reduce scan times.
   - In Pandas, consider the `category` dtype for author_id, whose values repeat across books. This can reduce memory usage.
2. Data Warehousing
   - For extremely large data, use a data warehouse or a distributed SQL engine such as Amazon Redshift, Google BigQuery, or Apache Hive.
   - These systems are designed to handle massive amounts of data with parallel processing.
3. Distributed Computing (Spark or Dask)
   - If you have to process huge CSV files in Python, libraries like Dask or PySpark can handle datasets that do not fit in memory by distributing the workload across multiple cores or machines.
4. Incremental Aggregation
   - If your data grows over time, you can update your aggregates incrementally instead of recomputing them from scratch. For instance, maintain a table that stores (author_id, publication_year, num_books) and update only the rows affected by new or changed records.
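
The incremental aggregation idea can be sketched in pandas as follows. This is an illustration under stated assumptions rather than the pipeline's implementation: `update_counts` is a hypothetical helper, the data is synthetic, and each batch is assumed to contain only newly added books (changed or deleted records would need separate handling).

```python
import pandas as pd

KEYS = ["author_id", "publication_year"]

def update_counts(existing: pd.DataFrame, new_books: pd.DataFrame) -> pd.DataFrame:
    """Fold the counts from a batch of new books into an existing aggregate table."""
    # Aggregate only the new batch, which is small compared with the full history.
    batch_counts = new_books.groupby(KEYS).size().reset_index(name="num_books")
    # Stack old and new counts, then sum the rows that share the same keys.
    combined = pd.concat([existing, batch_counts], ignore_index=True)
    return combined.groupby(KEYS, as_index=False)["num_books"].sum()

# Synthetic aggregate table as it might look after earlier runs.
existing_counts = pd.DataFrame({
    "author_id": ["author_a", "author_b"],
    "publication_year": [2000, 1999],
    "num_books": [2, 1],
})

# Synthetic batch of newly extracted books.
new_batch = pd.DataFrame({
    "book_id": ["book_5", "book_6"],
    "author_id": ["author_a", "author_c"],
    "publication_year": [2000, 2001],
})

updated_counts = update_counts(existing_counts, new_batch)
print(updated_counts)
```

The cost of each update then depends on the size of the batch and of the aggregate table, not on the total number of books processed so far.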

## Conclusion 

The solution was built as a modular data pipeline with distinct components for extraction, transformation, loading, and aggregation. This modular approach allowed us to separate concerns and write targeted unit tests for each component. We used a YAML configuration file to centralize settings (e.g., API endpoints, AWS parameters) so that the system is flexible and easily adjustable. An interactive Jupyter Notebook was created to present the findings, complete with interactive filters, summary dashboards, and visualizations.

While the current solution is robust for a take-home assessment, a production environment would demand further enhancements:
1. Database Integration:
   - Current Approach: We use CSV files as a mock database.
   - Production Enhancement: I would replace CSV storage with a production-grade relational database (e.g., PostgreSQL on AWS RDS) to support complex queries and ensure ACID properties, or with a distributed data warehouse (like Amazon Redshift) to handle large-scale data. This involves designing a proper schema, indexing key fields (e.g., author_id, publication_year), and implementing ETL pipelines that update the database incrementally.

2. Scalability and Distributed Processing:
   - Current Approach: Data processing is performed using Pandas, which works well on moderate datasets.
   - Production Enhancement: For larger datasets (millions of records), I’d adopt a distributed processing framework like Apache Spark or Dask. This would allow parallel processing, ease memory constraints, and improve performance across a cluster of machines.

3. Advanced Error Handling and Monitoring:
   - Current Approach: We log errors and persist error records to a CSV file.
   - Production Enhancement: Implement comprehensive monitoring and alerting using AWS CloudWatch or Prometheus. This includes real-time metrics for the extraction, transformation, and loading phases, as well as alerts for data quality issues. Additionally, integrating automated error review processes would help identify and resolve issues quickly.

4. Containerization and Orchestration:
   - Current Approach: The solution is implemented as standalone Python scripts and a Jupyter Notebook.
   - Production Enhancement: I would containerize the pipeline using Docker and orchestrate it with Kubernetes. This ensures consistent deployment, scalability, and easier management across different environments (development, staging, production).

5. CI/CD and Automated Testing:
   - Current Approach: The solution includes a GitHub Actions pipeline to run tests and code coverage.
   - Production Enhancement: Extend the CI/CD pipeline to include integration tests in a staging environment, automated deployment to production, and rigorous performance testing. Incorporate static code analysis and security scanning as part of the build process.

6. Data Quality and Governance:
   - Current Approach: Error records are logged, and basic quality checks are performed.
   - Production Enhancement: Establish a robust data quality framework with automated validations, data profiling, and data lineage tracking. Use tools like Apache Atlas or a data catalog to maintain metadata and ensure data governance.

7. User Experience in Reporting:
   - Current Approach: The interactive notebook provides rich visualizations and interactive filtering.
   - Production Enhancement: Develop a full-fledged dashboard using a BI tool like Tableau or Power BI, or create a web-based dashboard using frameworks such as Dash or Streamlit. This dashboard would be integrated with real-time data from the production database and provide drill-down capabilities for various business units.
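
As one concrete illustration of the automated validations mentioned under Data Quality and Governance, the sketch below runs a few rule-based checks on a books frame. Everything in it is an assumption made for the example: the helper name `validate_books`, the specific rules, the year bounds, and the synthetic rows are not taken from the pipeline.

```python
import pandas as pd

REQUIRED_COLUMNS = ["book_id", "title", "publication_year", "author_id"]

def validate_books(books: pd.DataFrame, min_year: int = 1400, max_year: int = 2025) -> list[str]:
    """Return a list of human-readable data quality issues (empty if none found)."""
    missing = [col for col in REQUIRED_COLUMNS if col not in books.columns]
    if missing:
        return [f"missing columns: {missing}"]

    issues = []
    # Rule 1: required fields should not be null.
    null_counts = books[REQUIRED_COLUMNS].isna().sum()
    for col, count in null_counts[null_counts > 0].items():
        issues.append(f"{col}: {count} null value(s)")
    # Rule 2: book_id should be unique.
    duplicates = int(books["book_id"].duplicated().sum())
    if duplicates:
        issues.append(f"book_id: {duplicates} duplicate value(s)")
    # Rule 3: non-null years should fall inside an (arbitrary) plausible range.
    years = books["publication_year"].dropna()
    out_of_range = int((~years.between(min_year, max_year)).sum())
    if out_of_range:
        issues.append(f"publication_year: {out_of_range} value(s) outside {min_year}-{max_year}")
    return issues

# Synthetic rows with one null title, one duplicate id, and one implausible year.
dirty_books = pd.DataFrame({
    "book_id": ["book_1", "book_1", "book_3"],
    "title": ["Title One", None, "Title Three"],
    "publication_year": [1895, 1865, 3020],
    "author_id": ["author_a", "author_b", "author_c"],
})

issues = validate_books(dirty_books)
for issue in issues:
    print(issue)
```

In a production setting, checks like these would more likely live in a dedicated validation framework and feed the monitoring and alerting described above; the point here is only the shape of the rules.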

