In [2]:
import duckdb

In [3]:
duckdb.sql("PRAGMA threads=4;") 

In [4]:
query_create_table = """
CREATE TABLE review_summary (
        business_id VARCHAR PRIMARY KEY,
        stars DOUBLE,
        useful DOUBLE,
        funny DOUBLE,
        cool DOUBLE,
        date date,
        count_review INT
    );
"""
duckdb.sql(query_create_table)

### Initial load and Incremental

In [None]:
query_create_table_temp = "CREATE TEMP TABLE review_temp AS SELECT * FROM read_json_auto('yelp_academic_dataset_review.json');"
duckdb.sql(query_create_table_temp)

In [7]:
# Check metrics after Scenario 1
duckdb.sql("SELECT count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_temp").show(max_width=300, max_rows=300)

┌───────────┬────────────┬────────────┬───────────┬──────────┬─────────────────────┐
│ row_count │ sum_stars  │ sum_useful │ sum_funny │ sum_cool │     latest_date     │
│   int64   │   double   │   int128   │  int128   │  int128  │      timestamp      │
├───────────┼────────────┼────────────┼───────────┼──────────┼─────────────────────┤
│   6990280 │ 26203650.0 │    8280748 │   2282743 │  3485476 │ 2022-01-19 19:48:45 │
└───────────┴────────────┴────────────┴───────────┴──────────┴─────────────────────┘



In [8]:
duckdb.sql("SELECT date::date AS date, count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_temp GROUP BY date::date").show(max_width=300, max_rows=300)


┌────────────┬───────────┬───────────┬────────────┬───────────┬──────────┬─────────────────────┐
│    date    │ row_count │ sum_stars │ sum_useful │ sum_funny │ sum_cool │     latest_date     │
│    date    │   int64   │  double   │   int128   │  int128   │  int128  │      timestamp      │
├────────────┼───────────┼───────────┼────────────┼───────────┼──────────┼─────────────────────┤
│ 2009-11-08 │       200 │     720.0 │        391 │       151 │      191 │ 2009-11-08 23:51:33 │
│ 2012-09-17 │       939 │    3453.0 │       1716 │       462 │      562 │ 2012-09-17 23:59:32 │
│ 2017-08-01 │      2197 │    8321.0 │       2347 │       530 │      888 │ 2017-08-01 23:59:51 │
│ 2015-12-27 │      2148 │    7824.0 │       2370 │       667 │      866 │ 2015-12-27 23:59:55 │
│ 2012-03-11 │       747 │    2775.0 │       1176 │       278 │      367 │ 2012-03-11 23:55:59 │
│ 2013-02-03 │       893 │    3138.0 │       1451 │       390 │      442 │ 2013-02-03 23:58:42 │
│ 2017-06-22 │      2393 │    

### Insert for initial load

In [9]:
duckdb.sql("""
           INSERT INTO review_summary  (
           SELECT business_id, AVG(stars) AS avg_stars, AVG(useful) AS avg_useful, AVG(funny) AS avg_funny, AVG(cool) AS avg_cool, MAX(DATE) AS latest_date, COUNT(business_id) AS count_review
           FROM review_temp 
           WHERE date BETWEEN '2018-01-01' AND '2019-01-01'
           GROUP BY business_id
           );
           """)


In [11]:
# Check metrics after Scenario 1
duckdb.sql("SELECT count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary").show(max_width=300, max_rows=300)

┌───────────┬────────────────────┬───────────────────┬────────────────────┬───────────────────┬─────────────┐
│ row_count │     sum_stars      │    sum_useful     │     sum_funny      │     sum_cool      │ latest_date │
│   int64   │       double       │      double       │       double       │      double       │    date     │
├───────────┼────────────────────┼───────────────────┼────────────────────┼───────────────────┼─────────────┤
│    100944 │ 360537.29313858156 │ 111926.7919532337 │ 24863.909522832793 │ 41688.41791418097 │ 2018-12-31  │
└───────────┴────────────────────┴───────────────────┴────────────────────┴───────────────────┴─────────────┘



In [12]:
duckdb.sql("SELECT date::date AS date, count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary GROUP BY date::date").show(max_width=300, max_rows=300)


┌────────────┬───────────┬────────────────────┬────────────────────┬────────────────────┬────────────────────┬─────────────┐
│    date    │ row_count │     sum_stars      │     sum_useful     │     sum_funny      │      sum_cool      │ latest_date │
│    date    │   int64   │       double       │       double       │       double       │       double       │    date     │
├────────────┼───────────┼────────────────────┼────────────────────┼────────────────────┼────────────────────┼─────────────┤
│ 2018-01-01 │        48 │              140.0 │               51.0 │               19.0 │               30.0 │ 2018-01-01  │
│ 2018-01-02 │        77 │              305.0 │               77.0 │               17.0 │               28.0 │ 2018-01-02  │
│ 2018-01-03 │        57 │              174.0 │              112.0 │               31.0 │               53.0 │ 2018-01-03  │
│ 2018-01-04 │        87 │              290.5 │             116.75 │               14.5 │               26.5 │ 2018-01-04  │


### Incremental mode if 02 Januray in example

In [13]:
duckdb.sql("""
           CREATE TEMP TABLE review_temp_01_02 AS (
           SELECT *
           FROM review_temp 
           WHERE date::date = '2018-01-02'::date - 1);
           -- SHOULD BE THIS IF RUNNING DAILY (current_date - 1)
           """)

### Running Insert Into or Replace

In [107]:
duckdb.sql("""
           WITH aggregate_temp AS (
           SELECT business_id, AVG(stars) AS avg_stars, AVG(useful) AS avg_useful, AVG(funny) AS avg_funny, AVG(cool) AS avg_cool, MAX(DATE) AS latest_date, COUNT(business_id) AS count_review
           FROM review_temp_01_02 
           GROUP BY business_id)

           
           INSERT OR REPLACE INTO review_summary (
           SELECT  
            U.business_id,
            COALESCE(((T.count_review * T.avg_stars) + (U.count_review * U.avg_stars)) / (U.count_review + T.count_review), U.avg_stars) AS avg_stars,
            COALESCE(((T.count_review * T.avg_useful) + (U.count_review * U.avg_useful)) / (U.count_review + T.count_review), U.avg_useful) AS avg_useful,
            COALESCE(((T.count_review * T.avg_funny) + (U.count_review * U.avg_funny)) / (U.count_review + T.count_review), U.avg_funny) AS avg_funny,
            COALESCE(((T.count_review * T.avg_cool) + (U.count_review * U.avg_cool)) / (U.count_review + T.count_review), U.avg_cool) AS avg_cool,
            GREATEST(U.latest_date, T.latest_date) as latest_date,
            COALESCE(T.count_review + U.count_review, U.count_review) AS count_review
           FROM aggregate_temp U
           LEFT JOIN review_summary T ON U.business_id = T.business_id);
           """)

In [15]:
# Scenario 2: First Update

# Check metrics after Scenario 2
duckdb.sql("SELECT count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary").show(max_width=300, max_rows=300)

duckdb.sql("SELECT date::date AS date, count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary GROUP BY date::date").show(max_width=300, max_rows=300)

┌───────────┬────────────────────┬───────────────────┬────────────────────┬───────────────────┬─────────────┐
│ row_count │     sum_stars      │    sum_useful     │     sum_funny      │     sum_cool      │ latest_date │
│   int64   │       double       │      double       │       double       │      double       │    date     │
├───────────┼────────────────────┼───────────────────┼────────────────────┼───────────────────┼─────────────┤
│    100944 │ 360537.29313858156 │ 111926.7919532337 │ 24863.909522832793 │ 41688.41791418097 │ 2018-12-31  │
└───────────┴────────────────────┴───────────────────┴────────────────────┴───────────────────┴─────────────┘

┌────────────┬───────────┬────────────────────┬────────────────────┬────────────────────┬────────────────────┬─────────────┐
│    date    │ row_count │     sum_stars      │     sum_useful     │     sum_funny      │      sum_cool      │ latest_date │
│    date    │   int64   │       double       │       double       │       double       │

-------------------------------------------

### Backfill Example using business_id not in

In [24]:
duckdb.sql("""
           CREATE TEMP TABLE review_temp_backfill AS (
           SELECT *
           FROM review_temp 
           WHERE business_id not in (SELECT business_id FROM review_summary)
           );
           """)

In [27]:
duckdb.sql("""
           WITH aggregate_temp AS (
           SELECT business_id, AVG(stars) AS stars, AVG(useful) AS useful, AVG(funny) AS funny, AVG(cool) AS cool, MAX(DATE) AS date, COUNT(business_id) AS count_review
           FROM review_temp_backfill 
           GROUP BY business_id)

           
           INSERT OR REPLACE INTO review_summary (
           SELECT  
            U.business_id,
            COALESCE(((T.count_review * T.stars) + (U.count_review * U.stars)) / (U.count_review + T.count_review), U.stars) AS avg_stars,
            COALESCE(((T.count_review * T.useful) + (U.count_review * U.useful)) / (U.count_review + T.count_review), U.useful) AS avg_useful,
            COALESCE(((T.count_review * T.funny) + (U.count_review * U.funny)) / (U.count_review + T.count_review), U.funny) AS avg_funny,
            COALESCE(((T.count_review * T.cool) + (U.count_review * U.cool)) / (U.count_review + T.count_review), U.cool) AS avg_cool,
            GREATEST(U.date, T.date) as date,
            COALESCE(T.count_review + U.count_review, U.count_review) AS count_review
           FROM aggregate_temp U
           LEFT JOIN review_summary T ON U.business_id = T.business_id);
           """)

In [28]:
# Scenario 3: Delayed Update

# Check metrics after Scenario 3
duckdb.sql("SELECT count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary").show(max_width=300, max_rows=300)

duckdb.sql("SELECT date::date AS date, count(*) AS row_count, SUM(stars) AS sum_stars, SUM(useful) AS sum_useful, SUM(funny) AS sum_funny, SUM(cool) AS sum_cool, MAX(DATE) AS latest_date FROM review_summary GROUP BY date::date").show(max_width=300, max_rows=300)

┌───────────┬──────────────────┬───────────────────┬───────────────────┬───────────────────┬─────────────┐
│ row_count │    sum_stars     │    sum_useful     │     sum_funny     │     sum_cool      │ latest_date │
│   int64   │      double      │      double       │      double       │      double       │    date     │
├───────────┼──────────────────┼───────────────────┼───────────────────┼───────────────────┼─────────────┤
│    150346 │ 539105.779781777 │ 181766.4680472459 │ 43126.58978895043 │ 67348.41933995378 │ 2022-01-19  │
└───────────┴──────────────────┴───────────────────┴───────────────────┴───────────────────┴─────────────┘

┌────────────┬───────────┬────────────────────┬─────────────────────┬─────────────────────┬─────────────────────┬─────────────┐
│    date    │ row_count │     sum_stars      │     sum_useful      │      sum_funny      │      sum_cool       │ latest_date │
│    date    │   int64   │       double       │       double        │       double        │       dou

### Successfully to run code