# <span style="color:red"> Outline</span>
<span style="color:red">
15-20 minutes total

- 5 mins: outline
- 5 mins: freshness
- 5 mins: distribution
- optional 5 mins: wrapping up & questions
</span>

We don't want the demo to be just a tour of each data observability (D.O.) pillar in isolation. There has to be a _point_ to doing a technical demo in the first place.

The takeaway to deliver: data observability isn't magic. You can access D.O. concepts through a standard _interface_ to your data warehouse or lake, such as SQL queries, and the specific technology doesn't matter. The point is to turn D.O. concepts into measurable quantities. Once you've done that, a whole world opens up: anomaly detection, SLAs/SLOs/SLIs, runbooks, incident management, and more.

This goes beyond testing: these measurements are the building blocks for proactive "observability".

# Data Observability—How to Build More Reliable Data Warehouses & Lakes
## Technical Demo
Ryan Othniel Kearns @ Monte Carlo Data | August 10, 2021
___
## 0. Setup

In [1]:
```python
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
```

In [2]:
```python
import pandas as pd
import plotly.express as px
import re
from datetime import datetime, date, timedelta
```

In [3]:
```python
import sys
# Make the parent directory importable so `from data.utils import ...` resolves.
sys.path.append("..")
```

In [4]:
```python
from data.utils import get_days_index

all_days = get_days_index(250)
```
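`get_days_index` lives in the demo repository's `data.utils` module, which isn't shown here. If you're following along without it, the hypothetical stand-in below is one plausible reading of what it needs to do, assuming it returns `n_days` consecutive ISO-8601 date strings starting on 2020-01-01 (the first `DATE_ADDED` in the data). Strings matter because `DATE_ADDED` is stored as `TEXT`, so the later `reindex(all_days)` calls only line up if the index values are strings in the same format.

```python
from datetime import date, timedelta
from typing import List


def get_days_index(n_days: int, start: date = date(2020, 1, 1)) -> List[str]:
    """Hypothetical stand-in for data.utils.get_days_index.

    Assumes the real helper returns `n_days` consecutive calendar days as
    ISO-8601 strings ("YYYY-MM-DD"), matching the TEXT values in DATE_ADDED.
    """
    return [(start + timedelta(days=offset)).isoformat() for offset in range(n_days)]


all_days = get_days_index(250)
print(all_days[0], all_days[-1])  # 2020-01-01 2020-09-06
```

Because the list covers every calendar day, `DataFrame.reindex(all_days)` inserts `NaN` rows for days with no updates, which is what makes gaps visible in the bar charts.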

In [5]:
```python
import sqlite3

conn = sqlite3.connect("../data/dbs/strata.db")
c = conn.cursor()
```

## 1. Introduction

Welcome to the Data Observability technical demo! In this demo, we'll learn Data Observability through practice on some sample datasets. In each example, we'll craft some `SQL` queries that help us learn about the state of our tables and identify Data Downtime issues.

For these exercises, we'll be using mock astronomical data to identify habitable planets.

![Animated planets](../data/assets/planets.gif)

Here's a **lineage diagram** of the database system we'll be using for our demo:

![Lineage diagram of the demo database](../data/assets/warehouse_diagram.png)

In [6]:
```python
c.execute("PRAGMA table_info(EXOPLANETS);")
c.fetchall()
```

```
[(0, '_id', 'TEXT', 0, None, 0),
 (1, 'distance', 'REAL', 0, None, 0),
 (2, 'g', 'REAL', 0, None, 0),
 (3, 'orbital_period', 'REAL', 0, None, 0),
 (4, 'avg_temp', 'REAL', 0, None, 0),
 (5, 'date_added', 'TEXT', 0, None, 0),
 (6, 'eccentricity', 'REAL', 0, None, 0),
 (7, 'atmosphere', 'TEXT', 0, None, 0)]
```

A database entry in `EXOPLANETS` contains the following info:

0. `_id`: A UUID corresponding to the planet.
1. `distance`: Distance from Earth, in light-years.
2. `g`: Surface gravity as a multiple of $g$, the standard acceleration due to gravity at Earth's surface (approximately $9.81\,\mathrm{m/s^2}$).
3. `orbital_period`: Length of a single orbital cycle, in days.
4. `avg_temp`: Average surface temperature, in kelvins.
5. `date_added`: The date our system discovered the planet and automatically added it to our database.
6. `eccentricity`: The [orbital eccentricity](https://en.wikipedia.org/wiki/Orbital_eccentricity) of the planet about its host star.
7. `atmosphere`: The dominant chemical makeup of the planet's atmosphere.

Note that one or more of `distance`, `g`, `orbital_period`, `avg_temp`, `eccentricity`, and `atmosphere` may be `NULL` for a given planet as a result of missing or erroneous data.
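Since any of these columns can be `NULL`, a natural first measurement is the per-column null rate. Below is a minimal sketch, assuming a hypothetical helper `null_rates` that builds one `AVG(CASE WHEN ... IS NULL ...)` per column. It runs against a tiny in-memory table so it can be checked in isolation; in the notebook you could pass `conn` and the real `EXOPLANETS` column names instead.

```python
import sqlite3
from typing import Dict, List


def null_rates(conn: sqlite3.Connection, table: str, columns: List[str]) -> Dict[str, float]:
    """Hypothetical helper: fraction of rows in `table` where each column IS NULL.

    Identifiers can't be bound as SQL parameters, so `table` and `columns`
    are interpolated directly and must come from a trusted source.
    """
    # AVG over a 1.0/0.0 indicator is the share of rows that match.
    select_list = ", ".join(
        f"AVG(CASE WHEN {col} IS NULL THEN 1.0 ELSE 0.0 END)" for col in columns
    )
    row = conn.execute(f"SELECT {select_list} FROM {table}").fetchone()
    return dict(zip(columns, row))


# Toy rows standing in for EXOPLANETS (not the demo data).
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE EXOPLANETS (_id TEXT, g REAL, avg_temp REAL)")
demo_conn.executemany(
    "INSERT INTO EXOPLANETS VALUES (?, ?, ?)",
    [("a", 1.0, None), ("b", None, None), ("c", 2.0, 300.0), ("d", 3.0, None)],
)
print(null_rates(demo_conn, "EXOPLANETS", ["g", "avg_temp"]))
# {'g': 0.25, 'avg_temp': 0.75}
```

Tracked per day (select `DATE_ADDED` as well and add a `GROUP BY`), a null rate becomes exactly the kind of measurable quantity you can alert on.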

In [7]:
```python
pd.read_sql_query("SELECT * FROM EXOPLANETS LIMIT 10", conn)
```

| | _id | distance | g | orbital_period | avg_temp | date_added | eccentricity | atmosphere |
|---|---|---|---|---|---|---|---|---|
| 0 | c168b188-ef0c-4d6a-8cb2-f473d4154bdb | 34.627304 | | 476.480044 | | 2020-01-01 | | |
| 1 | e7b56e84-41f4-4e62-b078-01b076cea369 | 110.19692 | 2.525074 | 839.837817 | | 2020-01-01 | | |
| 2 | a27030a0-e4b4-4bd7-8d24-5435ed86b395 | 26.695795 | 10.276497 | 301.018816 | | 2020-01-01 | | |
| 3 | 54f9cf85-eae9-4f29-b665-855357a14375 | 54.888352 | | 173.788968 | 328.644125 | 2020-01-01 | | |
| 4 | 4d06ec88-f5c8-4d03-91ef-7493a12cd89e | 153.264217 | 0.922875 | 200.712662 | | 2020-01-01 | | |
| 5 | e16250b8-2d9d-49f3-aaef-58eed9a8864c | 7.454811 | 5.503701 | 763.56171 | 245.129285 | 2020-01-01 | | |
| 6 | a0a6bf97-90d5-4686-8ccb-10753f8d335e | 4.925946 | 0.953473 | 486.053323 | 267.786557 | 2020-01-01 | | |
| 7 | b28b4e19-8517-4ab5-97f0-c445f1aae6c4 | 94.540173 | 7.118254 | 629.287426 | 368.859206 | 2020-01-01 | | |
| 8 | 7e34e44e-663f-491c-96c5-bb5acb8d5f1e | 19.786255 | 3.999081 | 744.106326 | 180.445029 | 2020-01-01 | | |
| 9 | 305e8ea0-663b-4311-b6b3-4198c051c335 | 95.65403 | 0.677212 | 472.344447 | | 2020-01-01 | | |


## 2. Exercise: Visualizing Freshness

Grouping by the `DATE_ADDED` column can give us insight into how `EXOPLANETS` updates daily. For example, we can query for the number of new IDs added per day:

In [39]:
```python
SQL = """
SELECT
    DATE_ADDED,
    COUNT(*) AS ROWS_ADDED
FROM
    EXOPLANETS
GROUP BY
    DATE_ADDED
"""
```

In [40]:
```python
rows_added = pd.read_sql_query(SQL, conn)
# Lowercase the column names, index by date, and add empty (NaN) rows for days with no updates.
rows_added = rows_added.rename(columns={clmn: clmn.lower() for clmn in rows_added.columns})
rows_added = rows_added.set_index("date_added")
rows_added = rows_added.reindex(all_days)
```

It looks like `EXOPLANETS` typically updates with around 100 new entries each day. Something looks off in a few places, though. We have what we'd call a **freshness** incident: on several occasions, the table goes multiple days without any update at all, leaving it with "stale" (3+ day old) data.

In [41]:
```python
fig = px.bar(x=all_days, y=rows_added["rows_added"])
fig.update_xaxes(title="Date")
fig.update_yaxes(title="Rows Added")
fig.show()
```

In this exercise, we'll write some `SQL` code that returns the dates on which freshness incidents occur. Feel free to use the query above as a starting point.

To start, let's just copy the `SQL` statement from above, which gives the count of entries added per day.

In [12]:
```python
# YOUR CODE HERE
SQL = """
SELECT
    DATE_ADDED,
    COUNT(*) AS ROWS_ADDED
FROM
    EXOPLANETS
GROUP BY
    DATE_ADDED
"""
# END YOUR CODE
```

In [13]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | date_added | ROWS_ADDED |
|---|---|---|
| 0 | 2020-01-01 | 84 |
| 1 | 2020-01-02 | 92 |
| 2 | 2020-01-03 | 101 |
| 3 | 2020-01-04 | 102 |
| 4 | 2020-01-05 | 100 |


Since we've grouped by the `DATE_ADDED` field, we now have one row for each day on which data came in. As a next step, let's devise a way to compare adjacent dates in our grouped output. For example, in row 1 above, we'd like to know that the previous date (on row 0) was `2020-01-01`.

A great way to compare adjacent rows in SQL is the [`LAG` window function](https://www.sqltutorial.org/sql-window-functions/sql-lag/). You can also reuse the query from above as a named subquery with [SQL's `WITH` clause](https://modern-sql.com/feature/with).
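Window functions such as `LAG` were added in SQLite 3.25.0, so it's worth checking the version bundled with your Python before starting. The sketch below does that, then runs `LAG` over three toy dates (not the demo data) inserted out of order. It shows that "previous" is defined by the `ORDER BY` inside `OVER (...)`, not by insertion order, and that the first row gets `NULL`.

```python
import sqlite3

# Window functions (LAG, LEAD, ROW_NUMBER, ...) require SQLite 3.25.0 or newer.
print("SQLite version:", sqlite3.sqlite_version)
assert sqlite3.sqlite_version_info >= (3, 25, 0), "LAG() needs SQLite 3.25.0+"

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE DAYS (DATE_ADDED TEXT)")
demo_conn.executemany(
    "INSERT INTO DAYS VALUES (?)", [("2020-01-03",), ("2020-01-01",), ("2020-01-02",)]
)
rows = demo_conn.execute(
    """
    SELECT
        DATE_ADDED,
        LAG(DATE_ADDED) OVER (ORDER BY DATE_ADDED) AS LAST_DATE_ADDED
    FROM DAYS
    ORDER BY DATE_ADDED
    """
).fetchall()
print(rows)
# [('2020-01-01', None), ('2020-01-02', '2020-01-01'), ('2020-01-03', '2020-01-02')]
```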

In [42]:
```python
# YOUR CODE HERE
SQL = """
WITH ROW_COUNTS AS(
    SELECT
        DATE_ADDED,
        COUNT(*) AS ROWS_ADDED
    FROM
        EXOPLANETS
    GROUP BY
        DATE_ADDED
)

SELECT
    DATE_ADDED,
    LAG(DATE_ADDED) OVER(ORDER BY DATE_ADDED) AS LAST_DATE_ADDED,
    ROWS_ADDED
FROM
    ROW_COUNTS;
"""
# END YOUR CODE
```

In [43]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | DATE_ADDED | LAST_DATE_ADDED | ROWS_ADDED |
|---|---|---|---|
| 0 | 2020-01-01 | | 84 |
| 1 | 2020-01-02 | 2020-01-01 | 92 |
| 2 | 2020-01-03 | 2020-01-02 | 101 |
| 3 | 2020-01-04 | 2020-01-03 | 102 |
| 4 | 2020-01-05 | 2020-01-04 | 100 |


The ability to compare adjacent dates is crucial for detecting stale data. Our next step is this: given two adjacent dates, calculate the *difference in days* between those dates. We're answering the question, "How many days old is the previous batch?"

Since we're in SQLite, we can convert our date strings into Julian day numbers with `JULIANDAY()` and [subtract them to get the difference in days](https://www.w3resource.com/sqlite/sqlite-julianday.php).
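To see what `JULIANDAY()` returns, here is a quick check on a throwaway in-memory connection. It uses the gap between 2020-01-31 and 2020-02-08 (which shows up in the detections later) plus a range spanning 2020's leap day, to show that the calendar arithmetic is handled for us.

```python
import sqlite3

scratch = sqlite3.connect(":memory:")

# JULIANDAY() maps an ISO-8601 date string to a fractional Julian day number,
# so the difference of two calls is the gap in days, returned as a REAL.
gap, leap_gap = scratch.execute(
    """
    SELECT
        JULIANDAY('2020-02-08') - JULIANDAY('2020-01-31'),
        JULIANDAY('2020-03-01') - JULIANDAY('2020-02-28')
    """
).fetchone()
print(gap, leap_gap)  # 8.0 2.0
```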

In [16]:
```python
# YOUR CODE HERE
SQL = """
WITH ROW_COUNTS AS(
    SELECT
        DATE_ADDED,
        COUNT(*) AS ROWS_ADDED
    FROM
        EXOPLANETS
    GROUP BY
        DATE_ADDED
)

SELECT
    DATE_ADDED,
    LAG(DATE_ADDED) OVER(ORDER BY DATE_ADDED) AS LAST_DATE_ADDED,
    JULIANDAY(DATE_ADDED) - JULIANDAY(LAG(DATE_ADDED) OVER(ORDER BY DATE_ADDED)) AS DAYS_SINCE_LAST_UPDATE,
    ROWS_ADDED
FROM
    ROW_COUNTS;
"""
# END YOUR CODE
```

In [17]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | DATE_ADDED | LAST_DATE_ADDED | DAYS_SINCE_LAST_UPDATE | ROWS_ADDED |
|---|---|---|---|---|
| 0 | 2020-01-01 | | | 84 |
| 1 | 2020-01-02 | 2020-01-01 | 1.0 | 92 |
| 2 | 2020-01-03 | 2020-01-02 | 1.0 | 101 |
| 3 | 2020-01-04 | 2020-01-03 | 1.0 | 102 |
| 4 | 2020-01-05 | 2020-01-04 | 1.0 | 100 |


Now we're nearly there. Recall that our original task was to identify **freshness incidents**: dates where the previous batch of data is more than 1 day old. Wrapping the query in another `WITH` clause and adding a `WHERE` clause should do exactly that.
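A note on style: SQLite also accepts several comma-separated CTEs under a single `WITH`, with later CTEs referring to earlier ones, which avoids nesting one `WITH` inside another. The sketch below rewrites the detector that way and binds the day threshold as a `?` parameter instead of hard-coding `1`. The function name `find_freshness_incidents` and the toy rows are illustrative assumptions, not part of the demo.

```python
import sqlite3
from typing import List, Tuple

# Two chained CTEs instead of a WITH nested inside another WITH.
FRESHNESS_SQL = """
WITH ROW_COUNTS AS (
    SELECT DATE_ADDED, COUNT(*) AS ROWS_ADDED
    FROM EXOPLANETS
    GROUP BY DATE_ADDED
),
RC_UPDATES AS (
    SELECT
        DATE_ADDED,
        LAG(DATE_ADDED) OVER (ORDER BY DATE_ADDED) AS LAST_DATE_ADDED,
        JULIANDAY(DATE_ADDED)
            - JULIANDAY(LAG(DATE_ADDED) OVER (ORDER BY DATE_ADDED)) AS DAYS_SINCE_LAST_UPDATE,
        ROWS_ADDED
    FROM ROW_COUNTS
)
SELECT * FROM RC_UPDATES
WHERE DAYS_SINCE_LAST_UPDATE > ?
ORDER BY DATE_ADDED
"""


def find_freshness_incidents(conn: sqlite3.Connection, max_gap_days: float = 1.0) -> List[Tuple]:
    """Hypothetical helper: updates whose gap to the previous update exceeds max_gap_days."""
    return conn.execute(FRESHNESS_SQL, (max_gap_days,)).fetchall()


# Toy table: updates on Jan 1, 2, 5 and 6, so the Jan 5 batch follows a 3-day gap.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE EXOPLANETS (_id TEXT, date_added TEXT)")
demo_conn.executemany(
    "INSERT INTO EXOPLANETS VALUES (?, ?)",
    [("a", "2020-01-01"), ("b", "2020-01-01"), ("c", "2020-01-02"),
     ("d", "2020-01-05"), ("e", "2020-01-05"), ("f", "2020-01-05"),
     ("g", "2020-01-06")],
)
print(find_freshness_incidents(demo_conn))
# [('2020-01-05', '2020-01-02', 3.0, 3)]
```

With the threshold as a parameter, the same query could back different freshness expectations for tables that update on different schedules.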

In [58]:
```python
# YOUR CODE HERE
SQL = """
WITH RC_UPDATES AS(
    WITH ROW_COUNTS AS(
        SELECT
            DATE_ADDED,
            COUNT(*) AS ROWS_ADDED
        FROM
            EXOPLANETS
        GROUP BY
            DATE_ADDED
    )

    SELECT
        DATE_ADDED,
        LAG(DATE_ADDED) OVER(ORDER BY DATE_ADDED) AS LAST_DATE_ADDED,
        JULIANDAY(DATE_ADDED) - JULIANDAY(LAG(DATE_ADDED) OVER(ORDER BY DATE_ADDED)) AS DAYS_SINCE_LAST_UPDATE,
        ROWS_ADDED
    FROM
        ROW_COUNTS
)

SELECT
    *
FROM
    RC_UPDATES
WHERE
    DAYS_SINCE_LAST_UPDATE > 1;
"""
# END YOUR CODE
```

In [59]:
```python
detections = pd.read_sql_query(SQL, conn)
detections
```

| | DATE_ADDED | LAST_DATE_ADDED | DAYS_SINCE_LAST_UPDATE | ROWS_ADDED |
|---|---|---|---|---|
| 0 | 2020-02-08 | 2020-01-31 | 8.0 | 83 |
| 1 | 2020-03-30 | 2020-03-26 | 4.0 | 117 |
| 2 | 2020-05-14 | 2020-05-06 | 8.0 | 103 |
| 3 | 2020-06-07 | 2020-06-04 | 3.0 | 82 |
| 4 | 2020-06-17 | 2020-06-12 | 5.0 | 87 |
| 5 | 2020-06-30 | 2020-06-27 | 3.0 | 86 |
| 6 | 2020-08-10 | 2020-08-05 | 5.0 | 97 |
| 7 | 2020-08-29 | 2020-08-24 | 5.0 | 100 |


We've built a detector for **freshness incidents**, a key part of any data observability solution. With the following code, you can visualize your detections along with the update data.

In [60]:
```python
fig = px.bar(x=all_days, y=rows_added["rows_added"])
fig.update_xaxes(title="Date")
fig.update_yaxes(title="Rows Added")

# Mark each detected freshness incident with a red vertical line.
for _, row in detections.iterrows():
    fig.add_vline(x=row['DATE_ADDED'], line_color='red')
fig.show()
```

## 3. Exercise: Understanding Schema Change

Thankfully, we have been recording historical `table_info` on the `EXOPLANETS` table and collecting the results in a table called `EXOPLANETS_SCHEMA`, updated daily.
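The notebook doesn't show how `EXOPLANETS_SCHEMA` is populated. One way such a history table could be built, sketched here purely as an assumption, is a daily job that stores the `PRAGMA table_info` output as text next to the run date. The helper name `snapshot_schema` and the storage format (Python's `repr` of the row list) are hypothetical; the demo table's strings are pretty-printed differently.

```python
import sqlite3
from datetime import date


def snapshot_schema(conn: sqlite3.Connection, table: str, history_table: str, as_of: date) -> None:
    """Hypothetical daily job: append the current PRAGMA table_info output as text.

    `table` and `history_table` are interpolated into SQL, so they must be
    trusted identifiers (PRAGMA arguments cannot be bound as parameters).
    """
    columns = conn.execute(f"PRAGMA table_info({table})").fetchall()
    conn.execute(f"CREATE TABLE IF NOT EXISTS {history_table} (date TEXT, schema TEXT)")
    conn.execute(
        f"INSERT INTO {history_table} VALUES (?, ?)", (as_of.isoformat(), repr(columns))
    )
    conn.commit()


# Toy run: snapshot, add a column, snapshot again.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE EXOPLANETS (_id TEXT, distance REAL)")
snapshot_schema(demo_conn, "EXOPLANETS", "EXOPLANETS_SCHEMA", date(2020, 1, 1))
demo_conn.execute("ALTER TABLE EXOPLANETS ADD COLUMN eccentricity REAL")
snapshot_schema(demo_conn, "EXOPLANETS", "EXOPLANETS_SCHEMA", date(2020, 1, 2))
for row in demo_conn.execute("SELECT date, schema FROM EXOPLANETS_SCHEMA ORDER BY date"):
    print(row)
```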

In [61]:
```python
c.execute("PRAGMA table_info(EXOPLANETS_SCHEMA);")
c.fetchall()
```

```
[(0, 'date', 'TEXT', 0, None, 0), (1, 'schema', 'TEXT', 0, None, 0)]
```

Comparing the first and last rows of `EXOPLANETS_SCHEMA` shows that the schema of `EXOPLANETS` has changed since January 2020:

In [62]:
```python
# ORDER BY makes iloc[0] / iloc[-1] the earliest / latest snapshots.
exoplanets_schema_df = pd.read_sql_query("SELECT * FROM EXOPLANETS_SCHEMA ORDER BY DATE", conn)
print("Was:    " + exoplanets_schema_df.iloc[0]["schema"])
print("Is now: " + exoplanets_schema_df.iloc[-1]["schema"])
```

```
Was:    [
    (0, '_id', 'TEXT', 0, None, 0),
    (1, 'distance', 'REAL', 0, None, 0),
    (2, 'g', 'REAL', 0, None, 0),
    (3, 'orbital_period', 'REAL', 0, None, 0),
    (4, 'avg_temp', 'REAL', 0, None, 0),
    (5, 'date_added', 'TEXT', 0, None, 0)
  ]
Is now: [
    (0, '_id', 'TEXT', 0, None, 0),
    (1, 'distance', 'REAL', 0, None, 0),
    (2, 'g', 'REAL', 0, None, 0),
    (3, 'orbital_period', 'REAL', 0, None, 0),
    (4, 'avg_temp', 'REAL', 0, None, 0),
    (5, 'date_added', 'TEXT', 0, None, 0),
    (6, 'eccentricity', 'REAL', 0, None, 0),
    (7, 'atmosphere', 'TEXT', 0, None, 0)
  ]
```
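Comparing the two printouts by eye works for eight columns, but it's easy to automate. Because the stored strings look like Python literals (lists of tuples holding only ints, strings, and `None`), a minimal sketch can parse them with `ast.literal_eval` and diff the column names and types. This assumes every `EXOPLANETS_SCHEMA` value has that literal format; the helper `schema_diff` is hypothetical, and the two strings below are copied from the output above.

```python
import ast
from typing import Dict, List


def schema_diff(old_schema: str, new_schema: str) -> Dict[str, List[str]]:
    """Hypothetical helper: columns added, removed, or retyped between two table_info dumps."""
    # Each row is (cid, name, type, notnull, dflt_value, pk); keep name -> type.
    old_cols = {row[1]: row[2] for row in ast.literal_eval(old_schema)}
    new_cols = {row[1]: row[2] for row in ast.literal_eval(new_schema)}
    return {
        "added": [name for name in new_cols if name not in old_cols],
        "removed": [name for name in old_cols if name not in new_cols],
        "retyped": [name for name in new_cols
                    if name in old_cols and new_cols[name] != old_cols[name]],
    }


was = """[
    (0, '_id', 'TEXT', 0, None, 0),
    (1, 'distance', 'REAL', 0, None, 0),
    (2, 'g', 'REAL', 0, None, 0),
    (3, 'orbital_period', 'REAL', 0, None, 0),
    (4, 'avg_temp', 'REAL', 0, None, 0),
    (5, 'date_added', 'TEXT', 0, None, 0)
  ]"""
is_now = """[
    (0, '_id', 'TEXT', 0, None, 0),
    (1, 'distance', 'REAL', 0, None, 0),
    (2, 'g', 'REAL', 0, None, 0),
    (3, 'orbital_period', 'REAL', 0, None, 0),
    (4, 'avg_temp', 'REAL', 0, None, 0),
    (5, 'date_added', 'TEXT', 0, None, 0),
    (6, 'eccentricity', 'REAL', 0, None, 0),
    (7, 'atmosphere', 'TEXT', 0, None, 0)
  ]"""
print(schema_diff(was, is_now))
# {'added': ['eccentricity', 'atmosphere'], 'removed': [], 'retyped': []}
```

In the notebook, the same call could take `exoplanets_schema_df.iloc[0]["schema"]` and `exoplanets_schema_df.iloc[-1]["schema"]` directly.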


When exactly did `EXOPLANETS` gain these new columns? The metadata in `EXOPLANETS_SCHEMA` should tell us. See if you can write a SQL query that returns the date(s) on which the schema changed.

To start, here's a query that returns each date and schema in the table:

In [63]:
```python
SQL = """
SELECT
    DATE,
    SCHEMA
FROM
    EXOPLANETS_SCHEMA
"""
```

In [64]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | date | schema |
|---|---|---|
| 0 | 2020-01-01 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 1 | 2020-01-02 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 2 | 2020-01-03 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 3 | 2020-01-04 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 4 | 2020-01-05 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |


Using this query and the `LAG()` trick from the freshness exercise, try to build a table that compares each date's schema to the schema from the day before.

In [65]:
```python
# YOUR CODE HERE
SQL = """
SELECT
    DATE,
    SCHEMA,
    LAG(SCHEMA) OVER(ORDER BY DATE) AS PAST_SCHEMA
FROM
    EXOPLANETS_SCHEMA;
"""
# END YOUR CODE
```

In [66]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | date | schema | PAST_SCHEMA |
|---|---|---|---|
| 0 | 2020-01-01 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | |
| 1 | 2020-01-02 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 2 | 2020-01-03 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 3 | 2020-01-04 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |
| 4 | 2020-01-05 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |


Next, wrap that query in a `WITH` clause as we did before, and return all of the rows where `SCHEMA` and `PAST_SCHEMA` differ!

In [67]:
```python
# YOUR CODE HERE
SQL = """
WITH CHANGES AS(
    SELECT
        DATE,
        SCHEMA,
        LAG(SCHEMA) OVER(ORDER BY DATE) AS PAST_SCHEMA
    FROM
        EXOPLANETS_SCHEMA
)

SELECT
    *
FROM
    CHANGES
WHERE
    SCHEMA != PAST_SCHEMA;
"""
# END YOUR CODE
```

In [68]:
```python
pd.read_sql_query(SQL, conn).head(5)
```

| | DATE | SCHEMA | PAST_SCHEMA |
|---|---|---|---|
| 0 | 2020-07-19 | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` | `[\n (0, '_id', 'TEXT', 0, None, 0),\n (1...` |


A correct implementation should show a single date, **2020-07-19**. If you got that, nice work! This date will come in handy later, so keep it in mind.
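Why doesn't the first row (2020-01-01), whose `PAST_SCHEMA` is `NULL`, show up? In SQL, `SCHEMA != NULL` evaluates to `NULL` rather than true, so `WHERE` drops the row. That's what we want here, but the same rule can hide real changes, for example when a daily snapshot is missing. SQLite's `IS NOT` operator is a null-safe alternative. The toy history below (not the demo data) has a missing snapshot between a `v1` and a `v2` schema; with `!=` the change disappears entirely, while `IS NOT` surfaces it (and also flags the very first row, which you'd filter out separately).

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE HISTORY (DATE TEXT, SCHEMA TEXT)")
demo_conn.executemany(
    "INSERT INTO HISTORY VALUES (?, ?)",
    [("2020-01-01", "v1"), ("2020-01-02", "v1"), ("2020-01-03", None), ("2020-01-04", "v2")],
)

QUERY = """
WITH CHANGES AS (
    SELECT DATE, SCHEMA, LAG(SCHEMA) OVER (ORDER BY DATE) AS PAST_SCHEMA
    FROM HISTORY
)
SELECT DATE FROM CHANGES WHERE {condition} ORDER BY DATE
"""

# != yields NULL whenever either side is NULL, so those rows are dropped.
strict = demo_conn.execute(QUERY.format(condition="SCHEMA != PAST_SCHEMA")).fetchall()
# IS NOT compares NULL like an ordinary value (NULL IS NOT NULL is false).
null_safe = demo_conn.execute(QUERY.format(condition="SCHEMA IS NOT PAST_SCHEMA")).fetchall()
print(strict)     # []
print(null_safe)  # [('2020-01-01',), ('2020-01-03',), ('2020-01-04',)]
```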
___

## `HABITABLES`

Now let's bring another table in our database into the picture. `HABITABLES` records information about the habitability of the exoplanets we've discovered. It takes data from `EXOPLANETS` and other upstream tables and transforms it to produce a `habitability` index: a real number between 0 and 1 indicating how likely the planet is to harbor life.

In [29]:
```python
c.execute("PRAGMA TABLE_INFO(HABITABLES);")
c.fetchall()
```

```
[(0, '_id', 'TEXT', 0, None, 0),
 (1, 'perihelion', 'REAL', 0, None, 0),
 (2, 'aphelion', 'REAL', 0, None, 0),
 (3, 'atmosphere', 'TEXT', 0, None, 0),
 (4, 'habitability', 'REAL', 0, None, 0),
 (5, 'min_temp', 'REAL', 0, None, 0),
 (6, 'max_temp', 'REAL', 0, None, 0),
 (7, 'date_added', 'TEXT', 0, None, 0)]
```

In [30]:
```python
pd.read_sql_query("SELECT * FROM HABITABLES LIMIT 10", conn)
```

| | _id | perihelion | aphelion | atmosphere | habitability | min_temp | max_temp | date_added |
|---|---|---|---|---|---|---|---|---|
| 0 | c168b188-ef0c-4d6a-8cb2-f473d4154bdb | | | | 0.29144 | | | 2020-01-01 |
| 1 | e7b56e84-41f4-4e62-b078-01b076cea369 | | | | 0.835647 | | | 2020-01-01 |
| 2 | a27030a0-e4b4-4bd7-8d24-5435ed86b395 | | | | 0.894001 | | | 2020-01-01 |
| 3 | 54f9cf85-eae9-4f29-b665-855357a14375 | | | | 0.415902 | 103.713749 | 560.180947 | 2020-01-01 |
| 4 | 4d06ec88-f5c8-4d03-91ef-7493a12cd89e | | | | 0.593524 | | | 2020-01-01 |
| 5 | e16250b8-2d9d-49f3-aaef-58eed9a8864c | | | | 0.392487 | 119.300295 | 287.365651 | 2020-01-01 |
| 6 | a0a6bf97-90d5-4686-8ccb-10753f8d335e | | | | 0.354851 | 219.467658 | 463.261262 | 2020-01-01 |
| 7 | b28b4e19-8517-4ab5-97f0-c445f1aae6c4 | | | | 0.518242 | 294.097443 | 464.827688 | 2020-01-01 |
| 8 | 7e34e44e-663f-491c-96c5-bb5acb8d5f1e | | | | 0.202482 | 107.737755 | 326.815526 | 2020-01-01 |
| 9 | 305e8ea0-663b-4311-b6b3-4198c051c335 | | | | 0.426799 | | | 2020-01-01 |


## 4. Exercise: Visualizing Distribution Errors
As in the freshness exercise, I'll write a quick query assessing a **distributional** feature of the `HABITABLES` table: how habitable is the average planet we detect, as a function of the day it was detected?

In [31]:
```python
SQL = """
SELECT
    DATE_ADDED,
    AVG(HABITABILITY) AS AVG_HABITABILITY
FROM
    HABITABLES
GROUP BY
    DATE_ADDED
"""
```

In [32]:
```python
avg_habitability = pd.read_sql_query(SQL, conn)
# Lowercase the column names, index by date, and add empty (NaN) rows for days with no updates.
avg_habitability = avg_habitability \
    .rename(columns={clmn: clmn.lower() for clmn in avg_habitability.columns})
avg_habitability = avg_habitability.set_index("date_added")
avg_habitability = avg_habitability.reindex(all_days)
```

In [33]:
```python
fig = px.bar(x=all_days, y=avg_habitability["avg_habitability"])
fig.update_xaxes(title="Date")
fig.update_yaxes(title="AVG(habitability)")
fig.add_vline(x="2020-07-19", line_color='red')
fig.show()
```

I plotted the date of the schema change, 2020-07-19, in red as a visual aid. Unless our instruments are malfunctioning, something is clearly wrong! The planets added to the table *after* the schema change seem much less habitable on average. Using a `SQL` query below, see if you can figure out what exactly went wrong.

*Hint*: When averages change, it's natural to look for occurrences of unusual values. When is `habitability` NULL, 0, or outside of the range $[0, 1]$? What about other fields in the table that might be related?

Let's try looking at the zero rate to see if anything is unusual.

Special conditions, like a metric being equal to `0`, are easy to flag with a SQL [`CASE` expression](https://www.w3schools.com/sql/sql_case.asp). You'll probably want to aggregate these `CASE`s with `SUM()`, and be sure to `CAST(... AS FLOAT)` before dividing: SQLite performs integer division when both operands are integers, so a rate would otherwise truncate to 0. Try it out!

In [34]:
```python
# YOUR CODE HERE
SQL = """
SELECT
    DATE_ADDED,
    CAST(SUM(CASE WHEN HABITABILITY = 0 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS ZERO_RATE
FROM
    HABITABLES
GROUP BY
    DATE_ADDED
"""
# END YOUR CODE
```

In [35]:
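```python
zero_rate = pd.read_sql_query(SQL, conn)
# Lowercase the column names, index by date, and add empty (NaN) rows for days with no updates.
zero_rate = zero_rate \
    .rename(columns={clmn: clmn.lower() for clmn in zero_rate.columns})
zero_rate = zero_rate.set_index("date_added")
zero_rate = zero_rate.reindex(all_days)
```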

In [36]:
```python
fig = px.bar(x=all_days, y=zero_rate["zero_rate"])
fig.add_vline(x="2020-07-19", line_color='red')
fig.update_xaxes(title="Date")
fig.update_yaxes(title="Habitability Zero Rate")
fig.show()
```
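The hint above also asks about `NULL` values and values outside $[0, 1]$. Those conditions fit the same `CASE`/`SUM`/`CAST` pattern, so one query can compute several daily rates at once. Here is a minimal sketch, run against a few made-up `HABITABLES` rows rather than the real table; the column aliases and toy values are our own. Note that `NULL = 0` and `NULL < 0` are not true, so `NULL` rows only count toward `NULL_RATE`.

```python
import sqlite3

RATES_SQL = """
SELECT
    DATE_ADDED,
    CAST(SUM(CASE WHEN HABITABILITY IS NULL THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS NULL_RATE,
    CAST(SUM(CASE WHEN HABITABILITY = 0 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS ZERO_RATE,
    CAST(SUM(CASE WHEN HABITABILITY < 0 OR HABITABILITY > 1 THEN 1 ELSE 0 END) AS FLOAT)
        / COUNT(*) AS OUT_OF_RANGE_RATE
FROM
    HABITABLES
GROUP BY
    DATE_ADDED
ORDER BY
    DATE_ADDED
"""

# Made-up rows: a clean day, then a day with a NULL, two zeros and one value above 1.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE HABITABLES (_id TEXT, habitability REAL, date_added TEXT)")
demo_conn.executemany(
    "INSERT INTO HABITABLES VALUES (?, ?, ?)",
    [("a", 0.4, "2020-01-01"), ("b", 0.9, "2020-01-01"),
     ("c", 0.0, "2020-01-02"), ("d", None, "2020-01-02"),
     ("e", 1.5, "2020-01-02"), ("f", 0.0, "2020-01-02")],
)
for row in demo_conn.execute(RATES_SQL):
    print(row)
# ('2020-01-01', 0.0, 0.0, 0.0)
# ('2020-01-02', 0.25, 0.5, 0.25)
```

Plotted per day like the zero rate, each of these columns is a separate measurable signal you can monitor.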