# UK Non-Fiction Book Sales Analysis, 2014-2023

## Imports

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

## Initialising dataset

* Dataset consists of the top 5000 non-fiction titles (HB and PB) sold through UK TCM between 2014-2023.
* Additional data points for each title were added from Goodreads (GR) and Google Books (GB).

In [None]:
df = pd.read_csv("./data/final_books_dataset.csv")
df.head()

In [None]:
df.shape # 50,000 rows, 23 columns

Drop columns containing non-numeric or non-pertinent data

In [None]:
df = df[["Sales_Year", "Publisher Group", "RRP", "ASP", "Binding", "Publ Date", "Product Class", "GR_Pages", "GB_Pages"]]
df.head() # 50,000 rows, 8 columns

## Cleaning, completing and converting data

### Initial Overview

In [None]:
df.describe()

In [None]:
# Display nulls per column
df.isnull().sum()

### Binding

In [None]:
df["Binding"].value_counts() # 33,370 PBs, 16,630 HBs

1. Integer encode hardbacks as 0, paperbacks as 1

In [None]:
df.loc[(df["Binding"] == "Hardback"), "Binding"] = 0
df.loc[(df["Binding"] == "Paperback"), "Binding"] = 1
df["Binding"] = df["Binding"].astype(int)

### RRPs

In [None]:
df["RRP"].describe()

1. Fill 879 missing RRPs with values calculated from average sale price (ASP)

In [None]:
# Add column showing average discount on RRP for each title
df["ASP_Discount"] = 1 - ((df["RRP"].dropna() - df["ASP"]) / df["RRP"].dropna())

# Calculate average level of discount for hardbacks and paperbacks
result = df.groupby("Binding", as_index=False)["ASP_Discount"].mean()
mean_hb_disc = result["ASP_Discount"].iloc[0]
mean_pb_disc = result["ASP_Discount"].iloc[1]

print(mean_hb_disc) # ~0.7 of RRP
print(mean_pb_disc) # ~0.75 of RRP

# Fill missing RRPs based on title's ASP and format, using mean discount calculation
df.loc[(df["RRP"].isnull()) & (df["Binding"] == "Hardback"), "RRP"] = (
    1 / mean_hb_disc
) * df["ASP"]
df.loc[(df["RRP"].isnull()) & (df["Binding"] == "Paperback"), "RRP"] = (
    1 / mean_pb_disc
) * df["ASP"]

# Fill missing ASP Discounts with average values per format
df.loc[(df["ASP_Discount"].isnull()) & (df["Binding"] == "Hardback"), "ASP_Discount"] = mean_hb_disc
df.loc[(df["ASP_Discount"].isnull()) & (df["Binding"] == "Paperback"), "ASP_Discount"] = mean_pb_disc

# Drop ASP and ASP_Discount columns
df = df.drop(columns=["ASP", "ASP_Discount"])

2. Calculate top 25 most frequently occurring RRPs for both hardbacks and paperbacks

In [None]:
hbs = df[df["Binding"] == 0]
pbs = df[df["Binding"] == 1]
print(hbs["RRP"].nunique()) # 143 unique hardback RRPs
print(pbs["RRP"].nunique()) # 204 unique hardback RRPs

In [None]:
hb_top_25 = hbs["RRP"].value_counts().nlargest(25)
hb_rrps = hb_top_25.index.tolist()
hb_rrps.sort()
print(hb_top_25.sum()) # Top 25 hardback RRPs account for 15,304 out of 16,630 titles
print(hb_rrps)

In [None]:
pb_top_25 = pbs["RRP"].value_counts().nlargest(25)
pb_rrps = pb_top_25.index.tolist()
pb_rrps.sort()
print(pb_top_25.sum()) # Top 25 hardback RRPs account for 30,860 out of 33,370 titles
print(pb_rrps)

3. Using top 25 lists, round non-standard RRPs to the nearest standard value if within £0.49 difference - else drop as outlier

In [None]:
# Create RRP_Rounded column, rounding prices to the nearest value in top 25 lists. Extract any significant outliers
df["RRP_Rounded"] = 0.0
outlier_rrps = []

def find_closest(value, reference_list):
    closest = min(reference_list, key=lambda x: abs(x - value))
    return closest

def round_rrps(format, top_20_rrps):
    for index, value in format["RRP"].items():
        if value in top_20_rrps:
            df.at[index, "RRP_Rounded"] = value
        else:
            closest = find_closest(value, top_20_rrps)
            if abs(closest - value) <= 0.49:
                df.at[index, "RRP_Rounded"] = closest
            else:
                outlier_rrps.append(value)
                df.at[index, "RRP_Rounded"] = 0.0

round_rrps(hbs, hb_rrps)
round_rrps(pbs, pb_rrps)

print(len(outlier_rrps)) # 2,442 RRPs fall outside the top 25 HB and PB RRPs - drop accordingly
print(pd.Series(outlier_rrps).nunique()) # 130 unique, non-standard RRPs to be dropped
print(df["RRP_Rounded"].nunique()) #35 unique, standard RRPs remain

In [None]:
# Show RRP boxplots inc outliers before rounding
sns.set_theme(rc={"figure.figsize": (6, 5)})
ax = sns.boxplot(x="Binding", y="RRP", data=df)
ax.set_title("Distribution of RRPs inc outliers (no rounding)")
plt.show()

In [None]:
# Drop rows with non-standard RRPs
df = df[df["RRP_Rounded"] > 0]

# Show boxplots w/o outliers
ax = sns.boxplot(x="Binding", y="RRP_Rounded", data=df, showfliers=False)
ax.set_title("Distribution of RRPs after rounding applied")
plt.show()

In [None]:
# Drop original RRP column
df = df.drop(columns=["RRP"])

### Page Counts

In [None]:
# Describe page count data from Goodreads
df["GR_Pages"].describe()

1. Find and drop outliers

In [None]:
ax = sns.boxplot(x=df["GR_Pages"]) 
ax.set_title("Distribution of page counts from Goodreads")
plt.show()

In [None]:
# Describe page count data from Google Books
df["GB_Pages"].describe() 

In [None]:
# 80,000pp outlier clearly an error - drop corresponding row
df = df[df["GB_Pages"] < 10000] 

In [None]:
ax = sns.boxplot(x=df["GB_Pages"])
ax.set_title("Distribution of page counts from Google Books")
plt.show()

2. Fill 3,155 nulls in GR page counts with GB data, or mean where data not available

In [None]:
# Fill GR_Pages nulls with value from GB_Pages
df["Pages"] = df.apply(
    lambda row: (
        row["GB_Pages"]
        if pd.isnull(row["GR_Pages"]) and pd.notnull(row["GB_Pages"])
        else row["GR_Pages"]
    ),
    axis=1,
)

# Fill remainder with mean page count per format, rounded to nearest multiple of 16
mean_hb_pages = 16 * round(df.loc[df["Binding"] == 0]["Pages"].mean() / 16)
mean_pb_pages = 16 * round(df.loc[df["Binding"] == 1]["Pages"].mean() / 16)
print(mean_hb_pages) # 272pp 
print(mean_pb_pages) # 288pp
df.loc[df["Binding"] == 0]["Pages"].fillna(mean_hb_pages)
df.loc[df["Binding"] == 1]["Pages"].fillna(mean_pb_pages)

# Convert Pages column from float to int
df["Pages"] = df["Pages"].astype(int)

# Drop now redundant GR_Pages and GB_Pages columns
df = df.drop(columns=["GR_Pages", "GB_Pages"])

### Publishers

1. Group titles by publishing house

In [None]:
# Define publishing houses and their respective divisions
publishers = {
    "PRH": [
        "Dorling Kindersley Grp",
        "Penguin Grp",
        "Random House Grp",
        "Transworld Grp",
    ],
    "Hachette": [
        "Hachette Books Ireland Grp",
        "Hachette Children's Grp",
        "Headline Grp",
        "Hodder & Stoughton Grp",
        "John Murray Press Group",
        "Little, Brown Book Grp",
        "Octopus Publishing Grp",
        "Orion Grp",
        "Perseus Books Group",
        "Quercus Grp",
    ],
    "Pan_Mac": [
        "Pan Macmillan Grp",
    ],
    "HC": [
        "HarperCollins Grp",
    ],
    "SS": [
        "Simon & Schuster Grp",
    ],
    "Bonnier": [
        "Bonnier Books UK Publishing Gr",
    ],
    "Bloomsbury": [
        "Bloomsbury Grp",
    ],
    "IA": [
        "Faber Grp",
        "Atlantic Books Grp",
        "Canongate Grp",
        "Duckworth Books Group",
        "Europa Editions Grp",
        "Fitzcarraldo Editions Grp",
        "Granta Grp",
        "Lonely Planet Grp",
        "Murdoch Books Grp",
        "Oneworld Publications Grp",
        "Profile Books Group",
        "Pushkin Grp",
        "Scribe Publications Group",
        "Swift Press",
    ],
}

def get_publisher(publisher_group):
    for publisher, divisions in publishers.items():
        if publisher_group in divisions:
            return publisher
    return "Other"

# Assign divisions to publishers (or "Other" if not a member of a major group)
df["Publisher"] = df["Publisher Group"].apply(get_publisher)

In [None]:
df["Publisher"].value_counts()

2. One-hot encode Publishers and drop original Publisher Group column

In [None]:
# One-hot encode publishers
df = pd.get_dummies(df, columns=["Publisher"], prefix="", prefix_sep="", dtype=int)
# Drop original Publisher Group columns
df = df.drop(columns=["Publisher Group"])

### Publication Dates

1. Extract year and month from Publ Date column

In [None]:
# Convert to datetime format before extracting dates
df["Publ Date"] = pd.to_datetime(df["Publ Date"], format="%d/%m/%Y")
df["Pub_Year"] = df["Publ Date"].dt.year
df["Pub_Month"] = df["Publ Date"].dt.month

# Drop original column
df = df.drop(columns=["Publ Date"])

### Genres and Subgenres

12. Export and integer encode genres / subgenres

In [None]:
# Define top-level genres
genre_dict = {
    1: "The Arts",
    2: "Encyclopedias and Reference Works",
    3: "Literature and Literary Studies",
    4: "Biography and Autobiography",
    5: "History and Archaeology",
    6: "Religion and Belief Systems",
    7: "Politics and Government",
    8: "Popular Science, Popular Culture and Natural History",
    9: "Health and Relationships",
    10: "Mind, Body, Spirit",
    11: "Lifestyle, Hobbies and Leisure",
    12: "Transport",
    13: "Humour, Trivia and Puzzles",
    14: "Travel and Holiday",
    15: "Sports and Active Pursuits",
    16: "Cookery, Food and Drink",
    17: "Personal Development and Practical Advice",
    18: "True Crime and True Stories",
}
qualifiers_dict = {"": "0", "A": "1", "T": "2"}

# Create new genre DF by exporting unique values from "Product Class" column
genre_df = pd.DataFrame(df["Product Class"].unique(), columns=["Full_String"])

# Split the "T0.0"-formatted codes into Genre/Subgenre codes, qualifiers and descriptions
pattern = (
    r"T(?P<Genre_Code>\d{1,2})\.(?P<Sub_Code>\d{1})(?P<Qualifier>\w?)\s(?P<Sub_Desc>.+)"
)
expanded_df = genre_df["Full_String"].str.extract(pattern)
genre_df = pd.concat([genre_df, expanded_df], axis=1)
genre_df[["Genre_Code", "Sub_Code"]] = genre_df[["Genre_Code", "Sub_Code"]].astype(int)

# Convert qualifiers to numeric values from qualifiers_dict
genre_df["Qualifier"] = genre_df["Qualifier"].replace(qualifiers_dict)
genre_df["Qualifier"] = genre_df["Qualifier"].astype(int)

# Add Genre_Desc column using values from genre_dict, tidy up order and export to CSV
genre_df["Genre_Desc"] = genre_df["Genre_Code"].astype(int).map(genre_dict)
genre_df = genre_df.iloc[:, [0, 1, 2, 3, 5, 4]]
genre_df = genre_df.sort_values(by=["Genre_Code", "Sub_Code"]).reset_index(drop=True)
genre_df.to_csv("./data/genres.csv", index=False)

# # Add newly defined genre codes and drop "Product Class" and "Full_String" columns
df = df.merge(
    genre_df[["Full_String", "Genre_Code", "Sub_Code", "Qualifier"]],
    left_on="Product Class",
    right_on="Full_String",
    how="left",
)
df = df.drop(columns=["Product Class", "Full_String"])

### Sales Year

1. Display YoY variation in mean RRP by binding

In [None]:
# Create temporary DF grouped by Sales_Year and Binding for ease of further analysis
temp_df = df.groupby(["Sales_Year", "Binding"])["RRP_Rounded"].mean().unstack(level=1)

# Reshape temporary DF for plotting
plot_df = temp_df.reset_index().melt(
    id_vars="Sales_Year", var_name="Binding", value_name="RRP_Rounded"
)
years = plot_df["Sales_Year"].unique()

# Create the plot
ax = sns.lmplot(
    data=plot_df,
    x="Sales_Year",
    y="RRP_Rounded",
    hue="Binding",
    palette="bright",
    facet_kws=dict(legend_out=False),
)
ax.set(title="YoY variation in mean RRP by binding", xlabel="Sales_Year", ylabel="RRP_Rounded")
plt.tight_layout()
plt.show() # Clear upward trend in RRP from 2014 to 2023, so keep Sales_Year as pertinent data point for model

## Final checks

In [None]:
df.head()

In [None]:
df.dtypes

In [None]:
# Plot heatmap
plt.figure(figsize=(15, 6))
sns.heatmap(df.corr(), annot=True)
plt.show()

In [None]:
df.shape # 42080 rows remaining of the original 50000

# Model selection and optimising

### Aim of model: accurately predict HB and PB RRPS

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score, RandomizedSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, PredictionErrorDisplay
from sklearn.tree import DecisionTreeRegressor
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
import xgboost

### Set up train-test split and scaling

In [None]:
# Define X and y values
X = df.drop(columns=["RRP_Rounded"])
y = df["RRP_Rounded"]

# Set up train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(X_train.shape, X_test.shape)

# Standardise features with both StandardScaler and MinMaxScaler
ss = StandardScaler()
X_train_ss = ss.fit_transform(X_train)
X_test_ss = ss.transform(X_test)
mm = MinMaxScaler()
X_train_mm = mm.fit_transform(X_train)
X_test_mm = mm.transform(X_test)

### Model selection

In [None]:
# Simple linear regression
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(mse)

In [None]:
# Linear regression with scaled data - first StandardScaler
model = LinearRegression()
model.fit(X_train_ss, y_train)
y_pred_ss = model.predict(X_test_ss)
mse_ss = mean_squared_error(y_test, y_pred_ss)
print(mse_ss)

# Then MinMaxScaler
model.fit(X_train_mm, y_train)
y_pred_mm = model.predict(X_test_mm)
mse_mm = mean_squared_error(y_test, y_pred_mm)
print(mse_mm) # No tangible difference 

In [None]:
# Decision tree
model = DecisionTreeRegressor()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(mse)

In [None]:
# Test different values for decision tree max_depth
values = [i for i in range(1, 21)]
test_scores = []
for i in values:
    model = DecisionTreeRegressor(max_depth=i)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    test_scores.append(mse)
test_scores

In [None]:
# XGBoost
model = xgboost.XGBRegressor()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(mse)

In [None]:
# Random forest
model = RandomForestRegressor()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(mse) # Good starting point

### Tuning hyperparameters

In [None]:
# Test different values for n_estimators
values = [i for i in range(100, 201, 10)]
test_scores = []
for i in values:
    model = RandomForestRegressor(n_estimators=i)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    test_scores.append(mse)
test_scores

In [None]:
param_grid = {
    "n_estimators": [100, 300, 500, 1000],
    "max_depth": [10, 20, 50, 100, None],
    "min_samples_split": [2, 5, 10, 20],
    "min_samples_leaf": [1, 2, 5, 10],
    "max_features": ["auto", "sqrt", "log2", 0.2, 0.5, 0.8],
    "bootstrap": [True, False],
}

model = RandomForestRegressor()
random_search = RandomizedSearchCV(estimator=model, param_distributions=param_grid,
                                   n_iter=100, cv=3, verbose=2, random_state=42, n_jobs=-1)
random_search.fit(X_train, y_train)

print(f"Best parameters: {random_search.best_params_}")

best_model = random_search.best_estimator_
y_pred = best_model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(mse)

In [None]:
# model = RandomForestRegressor()
# scores = cross_val_score(model, X, y, cv=10)
# print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))

In [None]:
# fig, axs = plt.subplots(ncols=2, figsize=(8, 4))
# PredictionErrorDisplay.from_predictions(
#     y,
#     y_pred=y_pred,
#     kind="actual_vs_predicted",
#     subsample=100,
#     ax=axs[0],
#     random_state=0,
# )
# axs[0].set_title("Actual vs. Predicted values")
# PredictionErrorDisplay.from_predictions(
#     y_train,
#     y_pred=y_pred,
#     kind="residual_vs_predicted",
#     subsample=100,
#     ax=axs[1],
#     random_state=0,
# )
# axs[1].set_title("Residuals vs. Predicted Values")
# fig.suptitle("Plotting cross-validated predictions")
# plt.tight_layout()
# plt.show()