In [1]:
from pyspark.sql import SparkSession, functions as f, Window

CHECKOUTS_CSV = "data/library_checkouts_all_branches_jul_2023.csv"

# One Spark session, reused by every later cell.
spark = SparkSession.builder.appName("Demo").getOrCreate()

# The first line of the file holds column names. escape='"' makes a doubled
# quote inside a quoted field read as a literal quote (presumably the file is
# RFC 4180 style; confirm against the raw CSV if parsing looks off).
df = (
    spark.read
    .options(header=True, escape='"')
    .csv(CHECKOUTS_CSV)
)

# Peek at a handful of descriptive columns.
preview_columns = ["checkout_library", "author", "item_type", "language", "age"]
df.select(*preview_columns).show(5)

23/08/14 21:26:30 WARN Utils: Your hostname, spyro-IdeaPad-5-15ITL05 resolves to a loopback address: 127.0.1.1; using 192.168.1.114 instead (on interface wlp0s20f3)
23/08/14 21:26:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/14 21:26:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----------------+-----------------+---------+--------+--------+
|checkout_library|           author|item_type|language|     age|
+----------------+-----------------+---------+--------+--------+
|             BSQ|    Cano, Carles,|LOTE-BOOK| SPANISH|JUVENILE|
|             BSQ|    Cano, Carles,|LOTE-BOOK| SPANISH|JUVENILE|
|             BSQ|  Dupuis, Sylvia,|LOTE-BOOK| SPANISH|JUVENILE|
|             BSQ|Maudet, Matthieu.|LOTE-BOOK| SPANISH|JUVENILE|
|             BSQ|Foreman, Michael,|LOTE-BOOK| SPANISH|JUVENILE|
+----------------+-----------------+---------+--------+--------+
only showing top 5 rows



In [2]:
print("\nQuick stats\n")

# Every row of the raw file is one checkout record.
print("Number of checkouts:", df.count())

# Distinct languages, ignoring rows where the language is missing.
df_language = df.select("language").where(f.col("language").isNotNull()).distinct()
print("Number of languages:", df_language.count())

# Rows per age category, most frequent category first.
df_age = df.groupBy("age").count().orderBy(f.desc("count"))

print("\nPopularity by age:")
df_age.show()


Quick stats

Number of checkouts: 50487
Number of languages: 31

Popularity by age:
+----------------+-----+
|             age|count|
+----------------+-----+
|        JUVENILE|24740|
|           ADULT|24477|
|              YA| 1260|
|POLICY NOT FOUND|   10|
+----------------+-----+



In [3]:
def group_records_into_checkouts(df, allowed_gap_in_seconds):
    """Group individual item loans into checkout sessions.

    Within each library, loans are ordered by ("date", "item_id"). A loan joins
    the same checkout as the loan immediately before it when it happened no
    more than ``allowed_gap_in_seconds`` seconds later (inclusive). The test is
    applied pairwise against the previous loan only, so one checkout can span
    more than ``allowed_gap_in_seconds`` in total.

    Args:
        df: Spark DataFrame with at least "checkout_library", "item_id" and
            "date" columns. "date" is passed to ``unix_timestamp``, so it is
            expected to already be a timestamp. A raw string in a non-default
            format would yield nulls.
        allowed_gap_in_seconds: largest gap, in seconds, between consecutive
            loans for them to count as one checkout.

    Returns:
        ``df`` with two extra columns:
        - "time_between_checkouts": seconds since the previous loan at the same
          library (null for the first loan of each library).
        - "group_id": running checkout number starting at 1. It is unique only
          within a library, so group on ("checkout_library", "group_id").
    """
    library_window = Window.partitionBy("checkout_library").orderBy("date", "item_id")

    # Seconds elapsed since the previous loan at the same library.
    df_checkout = (df.withColumn("previous_checkout", f.lag("date").over(library_window))
                     .withColumn("time_between_checkouts",
                                 f.unix_timestamp("date") - f.unix_timestamp("previous_checkout")))

    # 0 = continues the previous checkout, 1 = starts a new one. A null gap
    # (first loan in a library, or a null date) makes the comparison null, so
    # `when` falls through to `otherwise(1)` and a new checkout begins.
    df_checkout = df_checkout.withColumn(
        "new_checkout_increment",
        f.when(f.col("time_between_checkouts") <= allowed_gap_in_seconds, 0).otherwise(1))

    # A running total of the increments numbers the checkouts. An explicit ROWS
    # frame gives a plain per-row cumulative sum instead of relying on the
    # peer-row semantics of the default RANGE frame.
    df_checkout = df_checkout.withColumn(
        "group_id",
        f.sum("new_checkout_increment").over(
            library_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

    return df_checkout.drop("previous_checkout", "new_checkout_increment")

In [6]:
# The raw "date" column is a compact string (e.g. 20230709121301); parse it into
# a real timestamp so the gap arithmetic in group_records_into_checkouts works.
df_explore = df.withColumn("date", f.to_timestamp(f.col("date"), "yyyyMMddHHmmss"))

df_check = group_records_into_checkouts(df_explore, allowed_gap_in_seconds=5)

# Pick the single largest checkout. The secondary sort keys make the choice
# deterministic when several checkouts tie on size.
largest_checkout = (df_check.groupBy("checkout_library", "group_id")
                            .count()
                            .orderBy(f.desc("count"), "checkout_library", "group_id")
                            .limit(1))

# A join does not preserve row order, so sort explicitly to list the items in
# the order they were checked out. Sort before `select`, because "item_id" is
# not among the displayed columns.
(largest_checkout.join(df_check, ["checkout_library", "group_id"])
                 .orderBy("date", "item_id")
                 .select("checkout_library", "title", "author", "item_type", "age", "date")
                 .show(50, truncate=False))

+----------------+----------------------------------------------------------+---------------------+----------+--------+-------------------+
|checkout_library|title                                                     |author               |item_type |age     |date               |
+----------------+----------------------------------------------------------+---------------------+----------+--------+-------------------+
|CDE             |FASTBACK - Really weird!                                  |Do, Anh,             |FBKIDS    |JUVENILE|2023-07-09 12:13:01|
|CDE             |The spectacular revenge of Suzi Sims /                    |French, Vivian,      |JU-PBK    |JUVENILE|2023-07-09 12:13:03|
|CDE             |FASTBACK - Camping time!                                  |Do, Anh,             |FBKIDS    |JUVENILE|2023-07-09 12:13:05|
|CDE             |FASTBACK - Hotdog!                                        |Do, Anh,             |FBKIDS    |JUVENILE|2023-07-09 12:13:07|
|CDE             |FA

In [5]:
# Items per checkout: summary statistics of the group sizes.
checkout_sizes = df_check.groupBy("checkout_library", "group_id").count()
checkout_sizes.describe("count").show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|             17133|
|   mean|2.9467693924006304|
| stddev|2.8774430654290577|
|    min|                 1|
|    max|                38|
+-------+------------------+

