Data can be sourced from [Brisbane City Council Library Checkouts](https://www.data.brisbane.qld.gov.au/data/dataset/library-checkouts-branch-date)

#### Set up the SparkSession and SparkContext

1. Set up the session
2. Set up the context
3. Raise the log level

In [None]:
from pyspark.sql import SparkSession, functions as F, types as T, Window

# Create the session under the app name "Demo", or reuse one that already exists
spark = SparkSession.builder.appName("Demo").getOrCreate()

# The context comes from the session; set its log level to ERROR
sc = spark.sparkContext
sc.setLogLevel("ERROR")

#### Import the data

1. Read in the csv
2. Print the record count
3. Print the schema
4. Print 3 records

In [None]:
# Load the checkouts CSV: the first row is the header, and a double quote serves
# both as the quote character and as the escape character.
# The "status" and "language" columns are discarded straight away.
df_in = (spark.read
              .option("header", True)
              .option("quote", "\"")
              .option("escape", "\"")
              .csv("library_checkouts_202206.csv")
              .drop("status", "language"))

# Record count, followed by a blank line
print(f"Number of records: {df_in.count()}\n")

# Column names and types as read
df_in.printSchema()

# First three rows
df_in.show(3)

#### Data pre-processing

1. Read in a mapping for library_code -> library_name
2. Cast the date column to timestamp
3. Apply the library_code mapping
4. Check the date column was correctly cast
5. Print 3 records

In [None]:
# Lookup table used to attach a library_name to each library_code
library_map = spark.read.csv("library_mapping.csv", header=True)

# Parse the raw date string into a timestamp, rename the branch column so it
# matches the lookup's key, then left-join so unmatched checkouts are kept
df = (df_in
      .withColumn("date", F.to_timestamp("date", "yyyyMMddHHmmss"))
      .withColumnRenamed("checkout_library", "library_code")
      .join(library_map, on="library_code", how="left"))

# Print the schema entry of the date column to confirm its new type
print(df.schema["date"], "\n")

df.show(3)

#### Explore the libraries

1. Print the number of libraries
2. Print the top 3 libraries by number of checkouts

In [None]:
# Number of distinct values in the library_name column
library_count = df.select("library_name").distinct().count()

print(f"Distinct libraries: {library_count}\n")

print("Top 3 libraries by number of checkouts")

# Rows per library, largest count first, limited to three rows
df.groupBy("library_name").count().orderBy(F.desc("count")).show(3)

#### Which titles are the most popular?

1. Print the top 10 titles by number of checkouts

In [None]:
# Count rows per (title, item type) pair and print the ten largest counts
# without truncating long titles
(
    df.groupBy("title", "item_type_code")
    .count()
    .orderBy(F.col("count").desc())
    .show(10, truncate=False)
)

#### What is the most popular title in each item type?

1. Get the checkout count by title and category
2. Rank the titles within each category type
3. Select the top ranked title (in each category)
4. Print the top 10 top ranked titles

In [None]:
# Window that orders the titles of one item type from most to least checked out
by_item_type = Window.partitionBy("item_type_code").orderBy(F.col("count").desc())

# rank() gives tied titles the same rank, so an item type can contribute more
# than one row when several titles share its highest count
(
    df.groupBy("title", "item_type_code")
    .count()
    .withColumn("rank", F.rank().over(by_item_type))
    .where(F.col("rank") == 1)
    .drop("rank")
    .orderBy(F.col("count").desc())
    .show(10, truncate=False)
)

#### What does the borrowing pattern look like for a specific library?

1. Print 20 consecutive checkouts

In [None]:
# Columns carried forward into the checkout-pattern analysis
df_exp = df.select(
    "library_code",
    "library_name",
    "title",
    "author",
    "item_type_code",
    "age",
    "date",
)

# Print 20 rows (the show() default) ordered by library and time; the title
# column is left out of the printout
df_exp.drop("title").orderBy("library_code", "date").show(20, truncate=False)

#### Can we group sets of checkouts together?

In [None]:
def group_records_into_checkouts(df, allowed_gap_in_seconds, show_intermediate=True):
    """Label each record with a checkout_id shared by records borrowed close together.

    Within each library_code the records are walked in date order. A record
    joins the checkout of the record before it when the gap between the two is
    at most ``allowed_gap_in_seconds``; otherwise it starts a new checkout.
    The id is a running total per library_code, so it identifies a checkout
    only in combination with library_code.

    Args:
        df: DataFrame with a "library_code" column and a "date" column.
        allowed_gap_in_seconds: Largest gap (inclusive) between two consecutive
            records that still counts as the same checkout.
        show_intermediate: When true, print 5 rows of the intermediate result,
            helper columns included.

    Returns:
        The input DataFrame with "time_between_checkouts" and "checkout_id"
        added; the helper columns are removed.
    """
    per_library = Window.partitionBy("library_code")
    by_date = per_library.orderBy("date")
    # row_number acts as the tie break between records with identical dates
    by_date_then_row = per_library.orderBy("date", "row_number")
    up_to_current = by_date_then_row.rangeBetween(Window.unboundedPreceding, Window.currentRow)

    seconds_since_previous = F.unix_timestamp("date") - F.unix_timestamp("previous_checkout")

    labelled = (
        df.withColumn("row_number", F.row_number().over(by_date))
        # Time of the record immediately before this one (null for the first record)
        .withColumn("previous_checkout", F.lag("date").over(by_date_then_row))
        .withColumn("time_between_checkouts", seconds_since_previous)
        .withColumn("is_same_checkout", F.col("time_between_checkouts") <= allowed_gap_in_seconds)
        # A null comparison (first record of a library) falls through to otherwise(1),
        # so every library's first record opens a new checkout
        .withColumn("new_checkout_increment", F.when(F.col("is_same_checkout"), 0).otherwise(1))
        .drop("is_same_checkout")
        # The running sum of the increments is the checkout id
        .withColumn("checkout_id", F.sum("new_checkout_increment").over(up_to_current))
    )

    if show_intermediate:
        (labelled.drop("title", "author", "item_type_code", "age")
                 .orderBy("library_code", "date", "row_number")
                 .show(5))

    return labelled.drop("row_number", "previous_checkout", "new_checkout_increment")

1. Group the records into clusters with unique checkout_ids
2. Print statistics about the checkout_ids

In [None]:
# Records at the same library that are at most 5 seconds apart form one checkout
df_checkout = group_records_into_checkouts(df_exp, 5)

# Summary statistics of the number of records per checkout
(
    df_checkout.groupBy("library_code", "checkout_id")
    .count()
    .describe("count")
    .show()
)

1. Calculate some checkout cluster statistics (count, duration)
2. Get the checkout cluster with the largest number of checkouts
3. Print the cluster stats
4. Print all of the records in that cluster

In [None]:
# Per-checkout statistics: record count, first and last timestamp, and the
# duration in seconds between them. Only the largest checkout is kept.
#
# top_record is lazy and is evaluated twice: once by show() and once by the
# join below. Sorting on "count" alone leaves the winner undefined when two
# checkouts tie, so the printed stats and the printed records could belong to
# different checkouts. The extra sort keys make the choice deterministic.
top_record = (df_checkout.groupBy("library_code", "checkout_id")
                         .agg(F.count("library_code").alias("count"),
                              F.max("date").alias("max_date"),
                              F.min("date").alias("min_date"))
                         .withColumn("duration", F.unix_timestamp(F.col("max_date")) - F.unix_timestamp(F.col("min_date")))
                         .orderBy(F.desc("count"), "library_code", "checkout_id")
                         .limit(1))

top_record.show()

# Inner join on the checkout key keeps only the records of that one checkout
biggest_checkout = df_checkout.join(top_record, ["library_code", "checkout_id"])

# Drop the columns that are constant for the checkout and list its records in time order
(biggest_checkout.drop("library_code", "library_name", "count", "min_date", "max_date", "duration")
                 .orderBy("date")
                 .show(100, truncate=False))


In [None]:
# Print the extended explanation of how df is computed
df.explain(extended=True)

In [None]:
# Run the same count four times in a row; no cache() call on df is visible
# before this cell
for _attempt in range(4):
    print(df.count())

In [None]:
# Mark df for caching (cache() returns the same DataFrame), then count it
print(df.cache().count())

In [None]:
# NOTE(review): no visible step adds a "test" column to df, so this select is
# expected to fail unless the source CSV provides one — confirm whether this
# cell is an intentional error demo
df.select(F.col("test")).show()

In [None]:
# NOTE(review): F.max is an aggregate used here without a groupBy or a window;
# this looks like it will be rejected at analysis time — confirm whether this
# cell is an intentional error demo
df.withColumn("test", F.max("library_name")).show()