# Library Data Integration

## Imports

In [1]:
from pyspark.sql.types import ArrayType, StringType, BooleanType, StructField, StructType, LongType

In [35]:
import great_expectations as gx

ModuleNotFoundError: No module named 'great_expectations.expectations.expectation_configuration'

## Streaming Process

### Define the source schema

In [3]:
    schema = StructType(
        [
            StructField("key", StringType(), True),
            StructField("name", StringType(), True),
            StructField("subject_type", StringType(), True),
            StructField("work_count", LongType(), True),
            StructField(
                "works",
                ArrayType(
                    StructType(
                        [
                            StructField(
                                "authors",
                                ArrayType(
                                    StructType(
                                        [
                                            StructField("key", StringType(), True),
                                            StructField("name", StringType(), True),
                                        ]
                                    ),
                                    True,
                                ),
                                True,
                            ),
                            StructField(
                                "availability",
                                StructType(
                                    [
                                        StructField("__src__", StringType(), True),
                                        StructField(
                                            "available_to_borrow", BooleanType(), True
                                        ),
                                        StructField(
                                            "available_to_browse", BooleanType(), True
                                        ),
                                        StructField(
                                            "available_to_waitlist", BooleanType(), True
                                        ),
                                        StructField("identifier", StringType(), True),
                                        StructField(
                                            "is_browseable", BooleanType(), True
                                        ),
                                        StructField("is_lendable", BooleanType(), True),
                                        StructField(
                                            "is_previewable", BooleanType(), True
                                        ),
                                        StructField(
                                            "is_printdisabled", BooleanType(), True
                                        ),
                                        StructField("is_readable", BooleanType(), True),
                                        StructField(
                                            "is_restricted", BooleanType(), True
                                        ),
                                        StructField("isbn", StringType(), True),
                                        StructField(
                                            "last_loan_date", StringType(), True
                                        ),
                                        StructField(
                                            "last_waitlist_date", StringType(), True
                                        ),
                                        StructField("num_waitlist", StringType(), True),
                                        StructField("oclc", StringType(), True),
                                        StructField(
                                            "openlibrary_edition", StringType(), True
                                        ),
                                        StructField(
                                            "openlibrary_work", StringType(), True
                                        ),
                                        StructField("status", StringType(), True),
                                    ]
                                ),
                                True,
                            ),
                            StructField("cover_edition_key", StringType(), True),
                            StructField("cover_id", LongType(), True),
                            StructField("edition_count", LongType(), True),
                            StructField("first_publish_year", LongType(), True),
                            StructField("has_fulltext", BooleanType(), True),
                            StructField("ia", StringType(), True),
                            StructField(
                                "ia_collection", ArrayType(StringType(), True), True
                            ),
                            StructField("key", StringType(), True),
                            StructField("lending_edition", StringType(), True),
                            StructField("lending_identifier", StringType(), True),
                            StructField("lendinglibrary", BooleanType(), True),
                            StructField("printdisabled", BooleanType(), True),
                            StructField("public_scan", BooleanType(), True),
                            StructField("subject", ArrayType(StringType(), True), True),
                            StructField("title", StringType(), True),
                        ]
                    ),
                    True,
                ),
                True,
            ),
        ]
    )

### Creating the streaming source

In [4]:
src_data = spark.readStream.format("json").schema(schema).load("./raw/*")

In [5]:
src_data.printSchema()

root
 |-- key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- subject_type: string (nullable = true)
 |-- work_count: long (nullable = true)
 |-- works: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- authors: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- key: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |-- availability: struct (nullable = true)
 |    |    |    |-- __src__: string (nullable = true)
 |    |    |    |-- available_to_borrow: boolean (nullable = true)
 |    |    |    |-- available_to_browse: boolean (nullable = true)
 |    |    |    |-- available_to_waitlist: boolean (nullable = true)
 |    |    |    |-- identifier: string (nullable = true)
 |    |    |    |-- is_browseable: boolean (nullable = true)
 |    |    |    |-- is_lendable: boolean (nullable = true)
 |    |    |    |-- is_previewable: boolean (nullab

### Transformations

#### Explode the `works` field
This turns all elements in the array into separate rows

In [6]:
works_explode =src_data.selectExpr(
    "key",
    "name",
    "subject_type",
    "explode(works) as works",
    "input_file_name() as __source_file_name"
)
# first_explode.show(n=1,truncate=False)

In [7]:
works_explode.printSchema()

root
 |-- key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- subject_type: string (nullable = true)
 |-- works: struct (nullable = true)
 |    |-- authors: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- availability: struct (nullable = true)
 |    |    |-- __src__: string (nullable = true)
 |    |    |-- available_to_borrow: boolean (nullable = true)
 |    |    |-- available_to_browse: boolean (nullable = true)
 |    |    |-- available_to_waitlist: boolean (nullable = true)
 |    |    |-- identifier: string (nullable = true)
 |    |    |-- is_browseable: boolean (nullable = true)
 |    |    |-- is_lendable: boolean (nullable = true)
 |    |    |-- is_previewable: boolean (nullable = true)
 |    |    |-- is_printdisabled: boolean (nullable = true)
 |    |    |-- is_readable: boolean (nullable = true)
 |    |    |-- is_restricted

#### Explode the `authors` field
Using `works_explode` as the source, use another explode function to get all the authors into separate rows

In [8]:
authors_explode = works_explode.selectExpr(
    "key as subject_key",
    "name",
    "subject_type",
    "works.key as book_key",
    "works.first_publish_year",
    "works.title",
    "explode(works.authors) as authors",
    
)
# second_explode.show(truncate=False)

#### Flatten

Flatten books data

In [9]:
works_flattened = works_explode.selectExpr(
    "key as subject_key",
    "name as subject_name",
    "subject_type",
    "works.key as works_key",
    "works.title",
    "works.edition_count",
    "works.cover_id",
    "works.cover_edition_key",
    "works.lendinglibrary",
    "works.printdisabled",
    "works.lending_edition",
    "works.lending_identifier",
    "works.first_publish_year",
    "works.ia",
    "works.public_scan",
    "works.has_fulltext",
    "now() as __processed_datetime",
    "__source_file_name"
)

Flatten authors data

In [10]:
author_flattened = authors_explode.selectExpr(
    "subject_key",
    "name",
    "book_key",
    "subject_type",
    "authors.key as author_key",
    "authors.name as author_name"
)
# .limit(5).toPandas().head()

### Save results

In [11]:
works_flattened.writeStream \
    .format("delta") \
    .option("checkpointLocation", "./silver/books/checkpoint/delta/") \
    .trigger(availableNow=True) \
    .start("./silver/books/data/delta/")

24/03/08 03:00:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0xffff4f4e8100>

In [12]:
author_flattened.writeStream \
    .format("delta") \
    .option("checkpointLocation", "./silver/authors/checkpoint/delta") \
    .start("./silver/authors/data/delta/")

24/03/08 03:00:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0xffff4f546d60>

                                                                                

### Print results

In [13]:
spark.read.format("delta").load("silver/books/data/delta").toPandas()

24/03/08 03:01:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,subject_key,subject_name,subject_type,works_key,title,edition_count,cover_id,cover_edition_key,lendinglibrary,printdisabled,lending_edition,lending_identifier,first_publish_year,ia,public_scan,has_fulltext,__processed_datetime,__source_file_name
0,/subjects/love,Love,subject,/works/OL21177W,Wuthering Heights,2128,12818862,OL38586477M,False,True,OL50365053M,isbn_9798467132082,1847,isbn_9798467132082,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
1,/subjects/love,Love,subject,/works/OL468431W,The Great Gatsby,1176,10590366,OL22570129M,False,True,OL26441901M,in.ernet.dli.2015.184960,1920,in.ernet.dli.2015.184960,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
2,/subjects/love,Love,subject,/works/OL98501W,Ethan Frome,999,8303480,OL7215847M,False,True,OL32059444M,EdithWharton_SousLaNeige,1910,EdithWharton_SousLaNeige,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
3,/subjects/love,Love,subject,/works/OL362427W,Romeo and Juliet,972,8257991,OL26501367M,False,True,OL7228020M,2tragedyofromeoj00shakuoft,1597,2tragedyofromeoj00shakuoft,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
4,/subjects/love,Love,subject,/works/OL51831W,πολιτεία,792,14400145,OL24990264M,False,True,OL7004561M,republicofplat00plat,1554,republicofplat00plat,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
5,/subjects/love,Love,subject,/works/OL36287W,Le Comte de Monte Cristo,731,14566393,OL46867087M,False,True,OL43254660M,isbn_9781145233584,1830,isbn_9781145233584,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
6,/subjects/love,Love,subject,/works/OL362706W,Sonnets,579,8222090,OL6552577M,False,True,OL13719453M,sonnetsofwilliam0000shak_n0d0,1609,sonnetsofwilliam0000shak_n0d0,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
7,/subjects/love,Love,subject,/works/OL362698W,As You Like It,465,7338874,OL25667349M,False,True,OL32291448M,as_you_like_it_version_3_2012_librivox,1734,as_you_like_it_version_3_2012_librivox,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
8,/subjects/love,Love,subject,/works/OL27776452W,The Importance of Being Earnest,445,1260453,OL9694914M,False,True,OL24953263M,importanceofbein1895wild,1893,importanceofbein1895wild,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
9,/subjects/love,Love,subject,/works/OL267096W,Anna Karenina,391,2560652,OL10601812M,False,True,OL24197455M,annakarnin02tols,1876,annakarnin02tols,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...


In [14]:
spark.read.format("delta").load("silver/authors/data/delta").toPandas()

                                                                                

Unnamed: 0,subject_key,name,book_key,subject_type,author_key,author_name
0,/subjects/love,Love,/works/OL21177W,subject,/authors/OL24529A,Emily Brontë
1,/subjects/love,Love,/works/OL468431W,subject,/authors/OL27349A,F. Scott Fitzgerald
2,/subjects/love,Love,/works/OL98501W,subject,/authors/OL20188A,Edith Wharton
3,/subjects/love,Love,/works/OL362427W,subject,/authors/OL9388A,William Shakespeare
4,/subjects/love,Love,/works/OL51831W,subject,/authors/OL12823A,Πλάτων
5,/subjects/love,Love,/works/OL36287W,subject,/authors/OL18236A,Alexandre Dumas
6,/subjects/love,Love,/works/OL362706W,subject,/authors/OL9388A,William Shakespeare
7,/subjects/love,Love,/works/OL362698W,subject,/authors/OL9388A,William Shakespeare
8,/subjects/love,Love,/works/OL27776452W,subject,/authors/OL20646A,Oscar Wilde
9,/subjects/love,Love,/works/OL267096W,subject,/authors/OL26783A,Lev Nikolaevič Tolstoy


## Data Quality

In [37]:
context = gx.get_context()

In [16]:
def get_datasource(datasource_name, base_directory):
    datasource = context.sources.add_or_update_spark_filesystem(
        name=datasource_name,
        base_directory=base_directory
    )

### Test Source files

In [78]:
datasource_name = "library_api"
data_asset_name = "library_raw_data_asset"
expecation_suite_name = "library__expectation_suite"
datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory="raw/"
)

datasource.add_directory_json_asset(
    name=data_asset_name,
    data_directory="./"
)

my_asset = context.get_datasource(datasource_name).get_asset(data_asset_name)
my_batch_request = my_asset.build_batch_request()

validator = context.get_validator(
    batch_request=my_batch_request,
    expectation_suite_name=expecation_suite_name
)
validator.head()

24/03/08 03:36:35 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,authors,ebook_count,key,languages,name,people,places,publishers,publishing_history,subject_type,subjects,times,work_count,works
0,"[(70, /authors/OL32625A, Martin Waddell), (65,...",7439,/subjects/love,"[(10440, eng), (1120, fre), (818, spa), (744, ...",Love,"[(114, /subjects/person:jesus_christ, Jesus Ch...","[(266, /subjects/place:united_states, United S...","[(224, /publishers/HarperCollins_Publishers, H...","[[1471, 1], [1490, 1], [1492, 1], [1494, 1], [...",subject,"[(3053, /subjects/fiction, Fiction), (2096, /s...","[(172, /subjects/time:20th_century, 20th centu...",16503,"[([Row(key='/authors/OL24529A', name='Emily Br..."


In [80]:
checkpoint = context.add_or_update_checkpoint(
    name="library_raw_checkpoint",
    validations=[
        {
            "batch_request": my_batch_request,
            "expectation_suite_name": expecation_suite_name,
        },
    ],
)
results = checkpoint.run()
if results.success:
    print("all validation checks passed")
else:
    print("something went wrong")
    print(results)
    raise Exception

24/03/08 03:37:48 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/28 [00:00<?, ?it/s]

all validation checks passed


### Test books

In [89]:
datasource_name = "library_api__silver"
data_asset_name = "books_silver_asset"
expecation_suite_name = "library__books__expectation_suite"
datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory="silver/"
)

datasource.add_directory_delta_asset(
    name=data_asset_name,
    data_directory="books/data/delta"
)

my_asset = context.get_datasource(datasource_name).get_asset(data_asset_name)
my_batch_request = my_asset.build_batch_request()

validator = context.get_validator(
    batch_request=my_batch_request,
    expectation_suite_name=expecation_suite_name,
)
validator.head()

24/03/08 03:43:36 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

                                                                                

Unnamed: 0,subject_key,subject_name,subject_type,works_key,title,edition_count,cover_id,cover_edition_key,lendinglibrary,printdisabled,lending_edition,lending_identifier,first_publish_year,ia,public_scan,has_fulltext,__processed_datetime,__source_file_name
0,/subjects/love,Love,subject,/works/OL21177W,Wuthering Heights,2128,12818862,OL38586477M,False,True,OL50365053M,isbn_9798467132082,1847,isbn_9798467132082,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
1,/subjects/love,Love,subject,/works/OL468431W,The Great Gatsby,1176,10590366,OL22570129M,False,True,OL26441901M,in.ernet.dli.2015.184960,1920,in.ernet.dli.2015.184960,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
2,/subjects/love,Love,subject,/works/OL98501W,Ethan Frome,999,8303480,OL7215847M,False,True,OL32059444M,EdithWharton_SousLaNeige,1910,EdithWharton_SousLaNeige,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
3,/subjects/love,Love,subject,/works/OL362427W,Romeo and Juliet,972,8257991,OL26501367M,False,True,OL7228020M,2tragedyofromeoj00shakuoft,1597,2tragedyofromeoj00shakuoft,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...
4,/subjects/love,Love,subject,/works/OL51831W,πολιτεία,792,14400145,OL24990264M,False,True,OL7004561M,republicofplat00plat,1554,republicofplat00plat,True,True,2024-03-08 03:00:54.446646,file:///workspaces/data-engineering-exercise/r...


In [82]:
checkpoint = context.add_or_update_checkpoint(
    name="library__silver__books__checkpoint",
    validations=[
        {
            "batch_request": my_batch_request,
            "expectation_suite_name": expecation_suite_name,
        },
    ],
)
results = checkpoint.run()
if results.success:
    print("all validation checks passed")
else:
    print("something went wrong")
    print(results)
    raise Exception

24/03/08 03:38:13 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/55 [00:00<?, ?it/s]

                                                                                

all validation checks passed


### Test authors

In [90]:
datasource_name = "library_api__silver"
data_asset_name = "authors_silver_asset"
expecation_suite_name = "library__authors__expectation_suite"
datasource = context.sources.add_or_update_spark_filesystem(
    name=datasource_name, base_directory="silver/"
)

datasource.add_directory_delta_asset(
    name=data_asset_name,
    data_directory="authors/data/delta"
)

my_asset = context.get_datasource(datasource_name).get_asset(data_asset_name)
my_batch_request = my_asset.build_batch_request()

validator = context.get_validator(
    batch_request=my_batch_request,
    expectation_suite_name=expecation_suite_name,
)
validator.head()

24/03/08 03:44:13 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,subject_key,name,book_key,subject_type,author_key,author_name
0,/subjects/love,Love,/works/OL21177W,subject,/authors/OL24529A,Emily Brontë
1,/subjects/love,Love,/works/OL468431W,subject,/authors/OL27349A,F. Scott Fitzgerald
2,/subjects/love,Love,/works/OL98501W,subject,/authors/OL20188A,Edith Wharton
3,/subjects/love,Love,/works/OL362427W,subject,/authors/OL9388A,William Shakespeare
4,/subjects/love,Love,/works/OL51831W,subject,/authors/OL12823A,Πλάτων


In [91]:
checkpoint = context.add_or_update_checkpoint(
    name="library__silver_authors__checkpoint",
    validations=[
        {
            "batch_request": my_batch_request,
            "expectation_suite_name": expecation_suite_name,
        },
    ],
)
results = checkpoint.run()
if results.success:
    print("all validation checks passed")
else:
    print("something went wrong")
    print(results)
    raise Exception

24/03/08 03:45:42 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/32 [00:00<?, ?it/s]

all validation checks passed
{
  "run_id": {
    "run_name": null,
    "run_time": "2024-03-08T03:45:42.571898+00:00"
  },
  "run_results": {
    "ValidationResultIdentifier::library__authors__expectation_suite/__none__/20240308T034542.571898Z/library_api__silver-authors_silver_asset": {
      "validation_result": {
        "success": true,
        "results": [
          {
            "success": true,
            "expectation_config": {
              "expectation_type": "expect_column_values_to_not_be_null",
              "kwargs": {
                "column": "book_key",
                "batch_id": "library_api__silver-authors_silver_asset"
              },
              "meta": {}
            },
            "result": {
              "element_count": 12,
              "unexpected_count": 0,
              "unexpected_percent": 0.0,
              "partial_unexpected_list": [],
              "partial_unexpected_counts": []
            },
            "meta": {},
            "exception_info

## SQL Queries

In [25]:
a = spark.read \
    .format("delta") \
    .load("silver/authors/data/delta")

a.createOrReplaceTempView("authors")
b = spark.read \
    .format("delta") \
    .load("silver/books/data/delta")

b.createOrReplaceTempView("books")

### The number of books written each year by an author.

In [26]:
books_by_author_per_year = spark.sql(
    """
    WITH joined AS (
        SELECT
            a.author_key,
            a.author_name,
            b.first_publish_year,
            count(*) as num_books_published
        FROM authors as a
        INNER JOIN books as b
            ON a.book_key = b.works_key
        GROUP BY ALL
            
    )
    SELECT *
    from joined
    """
)
books_by_author_per_year.createOrReplaceTempView("books_per_year")
books_by_author_per_year.toPandas().head()

Unnamed: 0,author_key,author_name,first_publish_year,num_books_published
0,/authors/OL9388A,William Shakespeare,1609,1
1,/authors/OL24529A,Emily Brontë,1847,1
2,/authors/OL20646A,Oscar Wilde,1893,1
3,/authors/OL9388A,William Shakespeare,1734,1
4,/authors/OL26783A,Lev Nikolaevič Tolstoy,1876,1


### The average number of books written by an author per year.

In [27]:
spark.sql(
    """
    SELECT AVG(num_books_published) as avg_books_published
    FROM books_per_year
    """
).toPandas().head()

Unnamed: 0,avg_books_published
0,1.0


## Save csv

In [28]:
books =spark.read.format("delta").load("silver/books/data/delta")
books_pd = books.toPandas()
authors =spark.read.format("delta").load("silver/authors/data/delta")
authors_pd = authors.toPandas()


In [29]:
books_pd.to_csv("books.csv")
authors_pd.to_csv("authors.csv")