# Book Review Data Pipeline
### Data Engineering Capstone Project

#### Project Summary
Imagine that you're running a book review site and want to use your stored data for analytics. To do that, you want to load the data into a Data Warehouse under a Star Schema (image in Step 3).

Please check out the ERD in the README.md file.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


## Imports and setups

In [4]:
import pandas as pd
import numpy as np
import os

## Step 1: Scope the Project and Gather Data

#### Scope 
This notebook goes through Steps 1 to 4, as Step 5 will be in the README.

#### Data
The files were downloaded from [this Kaggle dataset](https://www.kaggle.com/datasets/ruchi798/bookcrossing-dataset).

>Contains 278,858 users (anonymized but with demographic information) providing 1,149,780 ratings (explicit / implicit) about 271,379 books.

There are 4 files in total:
- BX-Book-Ratings.csv
- BX-Users.csv
- BX_Books.csv
- Preprocessed_data.csv

### Preprocessed Data

In [36]:

file_path = './data/Books Data with Category Language and Summary/'

df = pd.read_csv(file_path + 'Preprocessed_data.csv')
df.head()

Unnamed: 0.1,Unnamed: 0,user_id,location,age,isbn,rating,book_title,book_author,year_of_publication,publisher,img_s,img_m,img_l,Summary,Language,Category,city,state,country
0,0,2,"stockton, california, usa",18.0,195153448,0,Classical Mythology,Mark P. O. Morford,2002.0,Oxford University Press,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,Provides an introduction to classical myths pl...,en,['Social Science'],stockton,california,usa
1,1,8,"timmins, ontario, canada",34.7439,2005018,5,Clara Callan,Richard Bruce Wright,2001.0,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,"In a small town in Canada, Clara Callan reluct...",en,['Actresses'],timmins,ontario,canada
2,2,11400,"ottawa, ontario, canada",49.0,2005018,0,Clara Callan,Richard Bruce Wright,2001.0,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,"In a small town in Canada, Clara Callan reluct...",en,['Actresses'],ottawa,ontario,canada
3,3,11676,"n/a, n/a, n/a",34.7439,2005018,8,Clara Callan,Richard Bruce Wright,2001.0,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,"In a small town in Canada, Clara Callan reluct...",en,['Actresses'],,,
4,4,41385,"sudbury, ontario, canada",34.7439,2005018,0,Clara Callan,Richard Bruce Wright,2001.0,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,"In a small town in Canada, Clara Callan reluct...",en,['Actresses'],sudbury,ontario,canada


In [41]:
len(df)

1031175

In [5]:
print(os.listdir('data/Book reviews/Book reviews'))
file_path = './data/Book reviews/Book reviews/'

# error_bad_lines was removed in pandas 2.0; on_bad_lines='skip' (pandas >= 1.3) skips malformed rows the same way
df_ratings = pd.read_csv(file_path + 'BX-Book-Ratings.csv', sep=';', encoding="latin-1", on_bad_lines='skip')
df_books = pd.read_csv(file_path + 'BX_Books.csv', sep=';', encoding="latin-1", on_bad_lines='skip')
df_users = pd.read_csv(file_path + 'BX-Users.csv', sep=';', encoding="latin-1", on_bad_lines='skip')

['BX-Book-Ratings.csv', 'BX_Books.csv', 'BX-Users.csv']
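
The BX files are semicolon-delimited, and malformed rows are skipped at load time rather than raising an error. The sketch below shows that behaviour on a small in-memory sample. It assumes pandas 1.3 or later, where `on_bad_lines='skip'` is available; the sample rows and the name `df_sample` are made up for illustration.

```python
import io
import pandas as pd

# Hypothetical sample in the same ';'-separated layout as BX-Book-Ratings.csv.
# The last data row has one field too many, so the parser treats it as malformed.
sample = io.StringIO(
    '"User-ID";"ISBN";"Book-Rating"\n'
    '"276725";"034545104X";"0"\n'
    '"276726";"0155061224";"5"\n'
    '"276727";"0446520802";"0";"unexpected"\n'
)

# on_bad_lines="skip" drops rows with too many fields instead of raising an error.
df_sample = pd.read_csv(sample, sep=";", on_bad_lines="skip")
print(len(df_sample))
```

Comparing the loaded row count with the count quoted in the dataset description is a quick way to see how many rows were skipped.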


In [44]:
print(len(df_ratings))
print(df.info())
df_ratings.head()

1149780
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1031175 entries, 0 to 1031174
Data columns (total 19 columns):
 #   Column               Non-Null Count    Dtype  
---  ------               --------------    -----  
 0   Unnamed: 0           1031175 non-null  int64  
 1   user_id              1031175 non-null  int64  
 2   location             1031175 non-null  object 
 3   age                  1031175 non-null  float64
 4   isbn                 1031175 non-null  object 
 5   rating               1031175 non-null  int64  
 6   book_title           1031175 non-null  object 
 7   book_author          1031175 non-null  object 
 8   year_of_publication  1031175 non-null  float64
 9   publisher            1031175 non-null  object 
 10  img_s                1031175 non-null  object 
 11  img_m                1031175 non-null  object 
 12  img_l                1031175 non-null  object 
 13  Summary              1031175 non-null  object 
 14  Language             1031175 non-null  obj

Unnamed: 0,User-ID,ISBN,Book-Rating
0,276725,034545104X,0
1,276726,0155061224,5
2,276727,0446520802,0
3,276729,052165615X,3
4,276729,0521795028,6


In [39]:
print(len(df_books))
df_books.head()

271379


Unnamed: 0,ISBN,Book-Title,Book-Author,Year-Of-Publication,Publisher,Image-URL-S,Image-URL-M,Image-URL-L
0,195153448,Classical Mythology,Mark P. O. Morford,2002,Oxford University Press,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...
1,2005018,Clara Callan,Richard Bruce Wright,2001,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...
2,60973129,Decision in Normandy,Carlo D'Este,1991,HarperPerennial,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...
3,374157065,Flu: The Story of the Great Influenza Pandemic...,Gina Bari Kolata,1999,Farrar Straus Giroux,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...
4,393045218,The Mummies of Urumchi,E. J. W. Barber,1999,W. W. Norton & Company,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...,http://images.amazon.com/images/P/0393045218.0...


In [38]:
print(len(df_users))
df_users.head()

278858


Unnamed: 0,User-ID,Location,Age
0,1,"nyc, new york, usa",
1,2,"stockton, california, usa",18.0
2,3,"moscow, yukon territory, russia",
3,4,"porto, v.n.gaia, portugal",17.0
4,5,"farnborough, hants, united kingdom",


## Step 2: Explore and Assess the Data
#### Cleaning Steps
- Missing Values
- Incorrect Column types/names
- Invalid Values
- Remove unnecessary columns
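
As a rough illustration of the four cleaning steps listed above, here is a minimal sketch on a toy DataFrame. The rows, the `clean` helper, and the age cutoff of 120 are assumptions made for this example; only the `n/a` and `100` placeholders come from this notebook.

```python
import pandas as pd

# Toy rows standing in for the raw data (values are made up).
raw = pd.DataFrame({
    "Unnamed: 0": [0, 1, 2],
    "User-ID": [1, 2, 3],
    "Age": [None, 18.0, 244.0],
    "Book-Author": ["Mark P. O. Morford", None, "Carlo D'Este"],
})

def clean(df):
    out = df.drop(columns=["Unnamed: 0"])                   # remove unnecessary columns
    out = out.rename(columns={"User-ID": "user_id",         # fix column names
                              "Age": "age",
                              "Book-Author": "book_author"})
    out["book_author"] = out["book_author"].fillna("n/a")   # missing string values
    out.loc[out["age"] > 120, "age"] = float("nan")         # invalid values (assumed cutoff)
    out["age"] = out["age"].fillna(100)                     # placeholder age used in this notebook
    return out

cleaned = clean(raw)
print(cleaned)
```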

In [45]:
# define a helper function to inspect a DataFrame
def assess_df(df):
    print("<=== Columns ===>")
    print(df.info(), "\n")
    
    print("<=== Head ==>")
    print(df.head())
assess_df(df)

<=== Columns ===>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1031175 entries, 0 to 1031174
Data columns (total 19 columns):
 #   Column               Non-Null Count    Dtype  
---  ------               --------------    -----  
 0   Unnamed: 0           1031175 non-null  int64  
 1   user_id              1031175 non-null  int64  
 2   location             1031175 non-null  object 
 3   age                  1031175 non-null  float64
 4   isbn                 1031175 non-null  object 
 5   rating               1031175 non-null  int64  
 6   book_title           1031175 non-null  object 
 7   book_author          1031175 non-null  object 
 8   year_of_publication  1031175 non-null  float64
 9   publisher            1031175 non-null  object 
 10  img_s                1031175 non-null  object 
 11  img_m                1031175 non-null  object 
 12  img_l                1031175 non-null  object 
 13  Summary              1031175 non-null  object 
 14  Language             1031175 non

In [46]:
assess_df(df_books)

<=== Columns ===>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 271379 entries, 0 to 271378
Data columns (total 8 columns):
 #   Column               Non-Null Count   Dtype 
---  ------               --------------   ----- 
 0   ISBN                 271379 non-null  object
 1   Book-Title           271379 non-null  object
 2   Book-Author          271378 non-null  object
 3   Year-Of-Publication  271379 non-null  int64 
 4   Publisher            271377 non-null  object
 5   Image-URL-S          271379 non-null  object
 6   Image-URL-M          271379 non-null  object
 7   Image-URL-L          271379 non-null  object
dtypes: int64(1), object(7)
memory usage: 16.6+ MB
None 

<=== Head ==>
         ISBN                                         Book-Title  \
0  0195153448                                Classical Mythology   
1  0002005018                                       Clara Callan   
2  0060973129                               Decision in Normandy   
3  0374157065  Flu: The Stor

In [48]:
assess_df(df_ratings)

<=== Columns ===>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1149780 entries, 0 to 1149779
Data columns (total 3 columns):
 #   Column       Non-Null Count    Dtype 
---  ------       --------------    ----- 
 0   User-ID      1149780 non-null  int64 
 1   ISBN         1149780 non-null  object
 2   Book-Rating  1149780 non-null  int64 
dtypes: int64(2), object(1)
memory usage: 26.3+ MB
None 

<=== Head ==>
   User-ID        ISBN  Book-Rating
0   276725  034545104X            0
1   276726  0155061224            5
2   276727  0446520802            0
3   276729  052165615X            3
4   276729  0521795028            6


In [47]:
assess_df(df_users)

<=== Columns ===>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 278858 entries, 0 to 278857
Data columns (total 3 columns):
 #   Column    Non-Null Count   Dtype  
---  ------    --------------   -----  
 0   User-ID   278858 non-null  int64  
 1   Location  278858 non-null  object 
 2   Age       168096 non-null  float64
dtypes: float64(1), int64(1), object(1)
memory usage: 6.4+ MB
None 

<=== Head ==>
   User-ID                            Location   Age
0        1                  nyc, new york, usa   NaN
1        2           stockton, california, usa  18.0
2        3     moscow, yukon territory, russia   NaN
3        4           porto, v.n.gaia, portugal  17.0
4        5  farnborough, hants, united kingdom   NaN


## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The idea is to be able to create dashboards that look at the ratings and surface "facts" about authors, users, categories, and even languages in relation to ratings.

One assumption I made here is that the COUNT of ratings is equivalent to popularity, so I included it as a fact as well.

<img src="./images/data-model.png"/>
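
To make the popularity assumption concrete, the sketch below computes the four rating facts per book on a toy table. The ISBN values and ratings are invented; the fact names follow the ones used in this notebook.

```python
import pandas as pd

# Toy ratings (made up): book "A" has the most ratings, so it counts as the most popular.
ratings = pd.DataFrame({
    "isbn":   ["A", "A", "A", "B", "B", "C"],
    "rating": [8, 0, 10, 5, 7, 9],
})

facts = (
    ratings.groupby("isbn")["rating"]
    .agg(QTY_rating="count", AVG_rating="mean", MAX_rating="max", MIN_rating="min")
    .reset_index()
    .sort_values("QTY_rating", ascending=False)
)
print(facts)
```

Note that popularity (count) and quality (average) can rank books differently: here "C" has the highest average but the fewest ratings.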

#### 3.2 Mapping Out Data Pipelines
This will happen, for now, in this notebook.

The data flow will follow the standard Data Warehouse Architecture (taken from public website):
<img src="http://bidw.ca/wp-content/uploads/2014/07/datawarehouse.png" />

In more detail, the functions will be split into these "modules":
- **EXTRACT**
    - Load data from the sources into (for this notebook) DataFrames. Since the datasets are local, this part is considered done.
- **TRANSFORM**
    - CLEAN: clean data types and column names, and fill null values with `n/a` for strings and `100` for numeric columns like age
    - TRANSFORM: reshape the data into the dimension and fact tables of the star schema
- **LOAD**
    - (Re)Create data model on data store (Not necessary for local csv storage)
    - Load data into data store (save to csv)
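
The three modules above can be sketched end to end on in-memory data. This is only an illustration of the split: the function names `extract`, `transform`, and `load` are hypothetical, and `io.StringIO` stands in for the local CSV files.

```python
import io
import pandas as pd

def extract(source):
    # EXTRACT: read a ';'-separated source into a DataFrame.
    return pd.read_csv(source, sep=";")

def transform(df_users):
    # CLEAN + TRANSFORM: consistent column names, placeholder for missing ages.
    out = df_users.rename(columns={"User-ID": "user_id", "Age": "age"})
    out["age"] = out["age"].fillna(100)
    return out[["user_id", "age"]]

def load(table, target):
    # LOAD: write the modelled table to CSV.
    table.to_csv(target, index=False)

source = io.StringIO(
    "User-ID;Location;Age\n"
    "1;nyc, new york, usa;\n"
    "2;stockton, california, usa;18.0\n"
)
target = io.StringIO()
load(transform(extract(source)), target)
print(target.getvalue())
```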

In [237]:
def create_dim_users():
    """
    Create Dimension table - Users
    Columns: user_id, city, state, country, age
    """
    cols = ["user_id", "city", "state", "country", "age"]
    dim_users = df_users.copy()
    dim_users = dim_users.rename(columns={'User-ID': 'user_id','Age': 'age'})
    
    dim_users[["city", "state", "country"]] = dim_users['Location'].str.split(',', expand=True).iloc[:, :3]
    dim_users["age"] = dim_users.age.fillna(100) # temp value instead of NaN
    return dim_users[cols]

def create_dim_books():
    """
    Create Dimension table - Books
    Columns: isbn, title, year_of_publication, publisher, summary
    """
    dim_books = df_books.copy()
    cols = ['isbn', 'title', 'year_of_publication', 'publisher', 'summary']
    
    # get summary column
    nt = df.groupby(['isbn','Summary']).size().reset_index()[['isbn','Summary']]
    dim_books = dim_books.merge(nt, left_on='ISBN', right_on='isbn')
    
    dim_books = dim_books.rename(columns={
                    'Book-Title': 'title',
                    'Year-Of-Publication': 'year_of_publication',
                    'Publisher': 'publisher',
                    'Summary' : 'summary'
                })
    
    return dim_books[cols]

def create_dim_authors():
    """
    Create Dimension table - Authors
    Columns: author_id, author_name
    """
    dim_authors = pd.DataFrame(df_books['Book-Author'].unique()).sort_values(0).reset_index()
    dim_authors = dim_authors.rename(columns={'index': 'author_id', 0: 'author_name'})
    dim_authors['author_name'] = dim_authors['author_name'].fillna('n/a')
    
    return dim_authors

def create_dim_languages():
    """
    Create Dimension table - Languages
    Columns: lid, language
    """
    dim_lang = pd.DataFrame(df['Language'].unique()).sort_values(0).reset_index()
    dim_lang = dim_lang.rename(columns={'index': 'lid', 0: 'language'})
    return dim_lang

def create_dim_categories():
    """
    Create Dimension table - Categories
    Columns: cid, category
    """
    dim_cat = pd.DataFrame(df['Category'].unique()).sort_values(0).reset_index()
    dim_cat = dim_cat.rename(columns={'index': 'cid', 0: 'category'})
    return dim_cat
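
One detail of `create_dim_users` is worth illustrating: splitting `Location` on `','` keeps the space that follows each comma, so country values look like `' usa'`. The sketch below, on two sample locations, shows this and one possible way to strip the parts. Stripping is an optional change, not what the notebook does, and any later lookup that uses `' usa'` would need to be adjusted with it.

```python
import pandas as pd

locations = pd.Series(["stockton, california, usa", "timmins, ontario, canada"])

# Same split as in create_dim_users: the separator is ',' without the trailing space.
parts = locations.str.split(",", expand=True).iloc[:, :3]
parts.columns = ["city", "state", "country"]
print(parts["country"].tolist())      # [' usa', ' canada']

# Optional: strip whitespace from every part to get clean keys.
stripped = parts.apply(lambda col: col.str.strip())
print(stripped["country"].tolist())   # ['usa', 'canada']
```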

In [226]:
def create_fact(dim_users, dim_books, dim_authors, dim_languages, dim_categories):
    """
    Create Fact Table
    Dimensional Table Foreign keys: user_id, isbn, author_id, language_id, category_id
    Facts: QTY_rating, AVG_rating, MAX_rating, MIN_rating
    """
    df_compiled = df[['user_id', 'isbn', 'book_author', 'Language', 'Category','rating']].copy()
    
    # replace some fields with ids from dim tables
    # can exclude user_id and isbn because we didn't make surrogate keys for them
    df_compiled = df_compiled.merge(dim_authors, left_on='book_author', right_on='author_name')
    df_compiled = df_compiled.merge(dim_languages, left_on='Language', right_on='language')
    df_compiled = df_compiled.merge(dim_categories, left_on='Category', right_on='category')
    
    # create fact columns
    cols = ['user_id', 'isbn', 'author_id', 'lid', 'cid','rating']
    df_fact = df_compiled[cols].copy()  # copy so the column assignments below do not trigger SettingWithCopyWarning
    df_fact = df_fact.rename(columns={'lid':'language_id', 'cid': 'category_id'})
    
    df_fact['QTY_rating'] = np.ones(shape=len(df_fact)) # if group by all columns, then each row is unique
    df_fact['AVG_rating'] = df_fact.rating
    df_fact['MAX_rating'] = df_fact.rating
    df_fact['MIN_rating'] = df_fact.rating
    
    df_fact = df_fact.drop(columns=['rating'])
    return df_fact
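
`create_fact` swaps natural values for surrogate keys with `merge`, which defaults to an inner join, so any rating whose author, language, or category has no match in the dimension is silently dropped. Whether that happens with this dataset is not shown in the notebook. The sketch below uses made-up rows to show the effect and one way to detect it with the `how`, `validate`, and `indicator` parameters of `merge`.

```python
import pandas as pd

# Toy data (made up): one missing author and one author absent from the dimension.
ratings = pd.DataFrame({
    "isbn": ["A", "B", "C"],
    "book_author": ["Carlo D'Este", None, "Unknown Writer"],
    "rating": [8, 5, 0],
})
dim_authors_demo = pd.DataFrame({
    "author_id": [0, 1],
    "author_name": ["Carlo D'Este", "n/a"],
})

# Default inner join: only the fully matching row survives.
inner = ratings.merge(dim_authors_demo, left_on="book_author", right_on="author_name")
print(len(inner))

# Fill missing authors the same way the dimension does, keep every rating,
# and flag rows that found no dimension entry.
checked = (
    ratings.assign(book_author=ratings["book_author"].fillna("n/a"))
    .merge(dim_authors_demo, left_on="book_author", right_on="author_name",
           how="left", validate="many_to_one", indicator=True)
)
print(checked[["isbn", "author_id", "_merge"]])
```

Rows marked `left_only` are ratings that would have been lost in the inner join.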

## Step 4: Run ETL to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [260]:
# create tables
dim_users = create_dim_users()
dim_books = create_dim_books()
dim_authors = create_dim_authors()
dim_languages = create_dim_languages()
dim_categories = create_dim_categories()

fact = create_fact(dim_users, dim_books, dim_authors, dim_languages, dim_categories)
fact.head()

Unnamed: 0,user_id,isbn,author_id,language_id,category_id,QTY_rating,AVG_rating,MAX_rating,MIN_rating
0,2,195153448,0,0,0,1.0,0,0,0
1,269782,801319536,0,0,0,1.0,7,7,7
2,95991,757301401,14,0,0,1.0,0,0,0
3,143239,757301401,14,0,0,1.0,0,0,0
4,7346,345354648,28,0,0,1.0,0,0,0


In [228]:
# LOAD: write to storage
dim_users.to_csv('data/model/dim_users.csv', index=False)
dim_books.to_csv('data/model/dim_books.csv', index=False)
dim_authors.to_csv('data/model/dim_authors.csv', index=False)
dim_languages.to_csv('data/model/dim_languages.csv', index=False)
dim_categories.to_csv('data/model/dim_categories.csv', index=False)

fact.to_csv('data/model/fact_table.csv', index=False)

#### 4.2 Data Quality Checks
Data quality checks include:

- No empty table after running ETL data pipeline
- Data schema of every dimensional table matches data model

We could load the CSVs to run these quality checks, but I will just reuse the tables already in memory, since Pandas' `to_csv()` function is reliable.
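
If the checks were run on the saved files instead, one thing to watch for is type inference on read. A minimal sketch, assuming a temporary directory and a two-row toy table: writing is lossless, but `read_csv` parses ISBNs that consist only of digits as integers unless a string dtype is requested.

```python
import os
import tempfile
import pandas as pd

# Toy table with ISBNs that start with zeros, as in BX_Books.csv.
table = pd.DataFrame({"isbn": ["0195153448", "0002005018"], "QTY_rating": [1.0, 1.0]})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "fact_table.csv")
    table.to_csv(path, index=False)

    # Default parsing infers isbn as an integer and drops the leading zeros.
    naive = pd.read_csv(path)
    # Reading isbn as a string preserves the original keys.
    exact = pd.read_csv(path, dtype={"isbn": str})

print(naive["isbn"].tolist())   # [195153448, 2005018]
print(exact["isbn"].tolist())   # ['0195153448', '0002005018']
```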

Define Test functions:

In [199]:
# Check Empty table
def check_empty(df, table_name):
    # table_name is only used in the error message
    assert len(df) > 0, table_name + " is empty!"


In [206]:
# Compare Schema between data model and output
def check_schema(df, table_name, schema={'col_name1': 'type1'}, unique_keys=None):
    """
    input:
    - df: the DataFrame of the table
    - table_name: name of table for print statements
    - schema: a dictionary describing the complete schema, where each key 
    is the column name, and their corresponding value is the type
    E.g. {'user_id': 'int', 'age':'int'}
    - unique_keys: list of column names to check for uniqueness
    """
    if not table_name: raise Exception("table_name cannot be None or empty")
    
    # Check Column name and type
    for colname, dtype in df.dtypes.items():
        assert schema[colname] == dtype, "FAIL check_schema TABLE " + table_name + " SCHEMA COLUMN " + colname + " has different data type"
    
    # Check Unique keys
    if unique_keys:
        for k in unique_keys:
            assert df[k].nunique() == len(df), "FAIL check_schema TABLE " + table_name + " unique_key: COLUMN '" + k + "' is NOT UNIQUE"


In [240]:
table_name_map = {
    "dim_users": dim_users,
    "dim_books": dim_books,
    "dim_authors": dim_authors,
    "dim_languages": dim_languages,
    "dim_categories": dim_categories
}

def data_quality_check():
    """
    Data Quality Tests
    1. Check if Tables are empty after ETL
    2. Validate data schema matches data model
    3. Check Foreign Key constraints are met (TODO: not implemented below)
    """
    for k,v in table_name_map.items():
        check_empty(v, k)
        
    # for the sake of this project, I will leave the data types as is
    # only when using Spark would we guarantee data type and coherence.
    schema = {
        "fact": {
            "user_id" :        "int64",
            "isbn"    :        "object",
            "author_id":        "int64",
            "language_id":      "int64",
            "category_id":      "int64",
            "QTY_rating":     "float64",
            "AVG_rating":       "int64",
            "MAX_rating":       "int64",
            "MIN_rating":       "int64"
        },
        "dim_users": {
            "user_id"    :   "int64",
            "city"       :  "object",
            "state"      :  "object",
            "country"    :  "object",
            "age"        :  "float64"
        },
        "dim_books":{
            "isbn"       :            "object",
            "title"      :            "object",
            "year_of_publication":     "int64",
            "publisher"  :            "object",
            "summary"    :            "object"
        },
        "dim_authors":{
            "author_id"  :     "int64",
            "author_name":    "object"
        },
        "dim_languages":{
            "lid"        :  "int64",
            "language"   : "object"
        },
        "dim_categories":{
            "cid"        :  "int64",
            "category"   : "object"
        }
        
    }
    unique_keys = {
        "dim_users": ['user_id'],
        "dim_books": ['isbn'],
        "dim_authors": ['author_id', 'author_name'],
        "dim_languages": ['lid', 'language'],
        "dim_categories": ['cid'],
        "fact": []
    }
    for k,v in table_name_map.items():
        check_schema(v, k, schema[k], unique_keys[k])
    
    print("All Tests passed!")

In [241]:
data_quality_check()

All Tests passed!
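
The docstring of `data_quality_check` also lists a foreign key check, which the function body does not perform. A minimal sketch of what such a check could look like is below. `check_foreign_key` and the `*_demo` tables are hypothetical and only illustrate the idea; they are not part of the notebook's pipeline.

```python
import pandas as pd

def check_foreign_key(fact, fk_col, dim, pk_col):
    """Hypothetical helper: assert every fact[fk_col] value exists in dim[pk_col]."""
    missing = set(fact[fk_col].unique()) - set(dim[pk_col].unique())
    assert not missing, (
        "FAIL foreign key " + fk_col + ": "
        + str(len(missing)) + " value(s) missing from dimension"
    )

# Toy tables (made up) shaped like the fact and dimension tables.
fact_demo = pd.DataFrame({"user_id": [2, 8, 8], "language_id": [0, 0, 1]})
dim_users_demo = pd.DataFrame({"user_id": [2, 8, 11400]})
dim_languages_demo = pd.DataFrame({"lid": [0, 1], "language": ["en", "fr"]})

check_foreign_key(fact_demo, "user_id", dim_users_demo, "user_id")
check_foreign_key(fact_demo, "language_id", dim_languages_demo, "lid")
print("Foreign key checks passed")
```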


AWESOME! All our dimension tables passed the tests, as planned.
Let's perform some simple queries to answer some business questions.

- Q1. Who are the authors of the top 5 Books with the highest AVG rating?

- Q2. What are the least popular book LANGUAGE(S), i.e. those with the fewest ratings?

- Q3. Following Q2, for those languages, what are the MAX_rating and MIN_rating in the USA?

In [268]:
# Q1. Who are the authors of the top 5 Books with the highest AVG rating?
df_q1 = fact.merge(dim_books, on='isbn', how='left')\
            .merge(dim_authors, on='author_id', how='left')
df_q1 = df_q1[['isbn', 'author_name', 'title', 'AVG_rating']]\
            .groupby(['isbn', 'author_name', 'title'])\
            .mean().reset_index()
df_q1 = df_q1.sort_values('AVG_rating', ascending=False)\
            .head(5)['author_name'].tolist()
df_q1

['Janet Wellington',
 'Simon Adams',
 'Scott Foresman',
 'Various',
 'Carol Endler Sterbenz']

In [300]:
# Q2. What are the least popular book LANGUAGE(S), i.e. those with the fewest ratings? (No country filter is applied here.)
df_q2 = fact[['language_id']]\
        .merge(dim_languages, left_on='language_id', right_on='lid', how='left')
df_q2 = df_q2.groupby(['language_id', 'language'])\
        .count().reset_index()
df_q2 = df_q2.loc[df_q2.lid == df_q2.lid.min()]
df_q2['language'].tolist()

['pl', 'tl', 'cy', 'hi', 'ko', 'sv']
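
The query above counts ratings across all users. A country-specific version would need a join to `dim_users` before counting. The sketch below shows one way to do that on toy tables; the `*_demo` data is invented, and the country values carry the leading space produced by splitting `Location` on `','`.

```python
import pandas as pd

# Toy tables (made up) shaped like fact, dim_users and dim_languages.
fact_demo = pd.DataFrame({
    "user_id":     [1, 1, 2, 3, 3, 3],
    "language_id": [0, 1, 0, 0, 0, 2],
})
dim_users_demo = pd.DataFrame({
    "user_id": [1, 2, 3],
    "country": [" spain", " usa", " spain"],
})
dim_languages_demo = pd.DataFrame({"lid": [0, 1, 2], "language": ["en", "es", "ca"]})

# Keep only ratings from users located in Spain.
in_spain = fact_demo.merge(dim_users_demo, on="user_id", how="left")
in_spain = in_spain[in_spain["country"].str.strip() == "spain"]

# Count ratings per language and keep the language(s) with the minimum count.
counts = (
    in_spain.merge(dim_languages_demo, left_on="language_id", right_on="lid", how="left")
    .groupby("language")
    .size()
)
least_popular = counts[counts == counts.min()].index.tolist()
print(least_popular)
```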

In [336]:
# Q3. Following Q2, for that language/those languages, what are the MAX_rating and MIN_rating in USA?
langs = df_q2[['language_id','language']].copy().reset_index()
df_q3 = fact[fact.language_id.isin(langs.language_id.tolist())]\
        .merge(dim_users, on='user_id', how='left')
df_q3 = df_q3[['user_id', 'country', 'MAX_rating', 'MIN_rating', 'language_id']]
df_q3 = df_q3.groupby('country').agg({'MAX_rating': 'max', 'MIN_rating': 'min'})
df_q3.loc[' usa']

MAX_rating    7
MIN_rating    0
Name:  usa, dtype: int64

#### 4.3 Data dictionary 
The Data Dictionary is in a separate Excel file. Please check that out!

## Step 5: Complete Project Write Up
#### Justify the choice of tools and technologies for the project.

I used Pandas and NumPy since the datasets were not too large (~1.5 million rows). These tools are familiar and perform well on in-memory data. I did not use Spark: although it is very convenient for schema representation and efficient at data transformation, it was not necessary to blend Pandas and Spark for so few data points.

#### Write a description of how you would approach the problem differently under the following scenarios:
##### The data was increased by 100x.
I would use Spark instead of Pandas, then scale the Spark cluster horizontally by adding worker nodes.

##### The pipelines would be run on a daily basis by 7 am every day.
In that case, I would use a workflow scheduler, such as AWS Step Functions or Apache Airflow, to define the pipeline as a DAG workflow and trigger it on a daily schedule.
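
To show what the DAG structure would capture, the sketch below expresses this notebook's task dependencies with Python's standard `graphlib` module (Python 3.9+). This is not Airflow or Step Functions code; it only illustrates the ordering a scheduler would enforce. The task names mirror the notebook's functions, and `extract` and `load` are placeholder names for the read and `to_csv` cells.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dims = {
    "create_dim_users", "create_dim_books", "create_dim_authors",
    "create_dim_languages", "create_dim_categories",
}
dag = {"extract": set()}
for task in dims:
    dag[task] = {"extract"}
dag["create_fact"] = set(dims)
dag["load"] = dims | {"create_fact"}
dag["data_quality_check"] = {"load"}

# static_order() yields the tasks so that every dependency runs first.
order = list(TopologicalSorter(dag).static_order())
print(order)
```

In a real scheduler each node would become a task, and the dimension builds, which do not depend on each other, could run in parallel.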

##### The database needed to be accessed by 100+ people.
In this case, I would build OLAP cubes and store them in a separate NoSQL database. Another concern is security and authorization: 100+ people would be accessing this database, but probably only a few need write permission while most need read-only access. AWS authentication services or Azure AD could also be considered.