# Change Data Capture




#### This notebook implements the CDC process for the Air_Carrier table. Here are the steps in the process:  

1.   Make a copy of the CSV file so we can revert back if needed `air_carriers_copy.csv`

2.   Simulate changes (inserts, updates, and deletes) by manually altering `air_carriers.csv`:

**Inserts**: Added 3 records to the file:<br>
`{30001,"Centrum Air: CAX"}` <br>
`{30002,"City Airlines: CAY"}` <br>
`{30003,"Greater Bay Airlines: GBY"}` <br>

**Updates**: removed the substrings `(1)` and `(2)` from all the records in the file by doing a find/replace.<br>

**Deletes**: removed the four records starting with the substring `Desert`:<br>
`{19174,"Desert Pacific Airways: DPA"}` <br>
`{19175,"Desert Pacific Airlines Inc.: DPI"}` <br>
`{19176,"Desert Airlines: DST"}` <br>
`{19618,"Desert Sun Airlines: DSA"}` <br><br>

4.   Rename `air_carriers.csv` to `air_carriers_032324.csv`, to indicate the date in which the changes arrived.

5.   Make a new folder in the GCS bucket called `incrementals`.

6.   Copy `air_carriers_032324.csv` into our `incrementals` folder in GCS.

7.   Make a copy of the raw table in case we need to revert back to it (`air_carriers_copy`).

8.   Make a new dataset in BigQuery to hold temporary tables (`airline_tmp`).

9.   Load `air_carriers_032324.csv` into a temp table (`airline_tmp.air_carriers`).

10.   Detect the deltas between `airline_tmp.air_carriers` and `airline_raw.air_carriers`:
- If the new record is the same as the old one, ignore it because it means that it's unchanged.
- If the new record is different from the old record, update the old record in the raw table such that it matches the new record in the temp table.
- If the old record in the raw table does not have a corresponding new record in the temp table, delete it from the raw table.  

11.   Make a copy of staging table (`airline_stg.Air_Carrier_copy`) so we can use it for comparison.

12.   Re-create the staging table `airline_stg.Air_Carrier` by applying the same cleansing, modeling, and validation logic as before. The logic for the staging layer remains the same.

13.   Make a copy of target table (`airline_csp.Air_Carrier_copy`) so we can use it for comparison.

14.  Once the staging table is ready, merge it into the target table (`airline_csp.Air_Carrier`) as follows:

- If the record in staging is the same as the record in the target table, ignore it.
- If the record in staging is different from the record in the target table, set the `discontinue_time` of the existing record in the target table and insert the new record into the target table. The `effective_time` of the new record should be equal to the current timestamp and the `discontinue_time` of the old record should be equal to the current timestamp - 1 second.
-If a record in the target table does not have a record in staging, set its `discontinue_time` to current timestamp - 1 second.  

15.  Drop the temp table (`airline_tmp.air_carriers`). The next time we process an incremental, a new temp table will be created.

The rest of this notebook implements the steps 7-13. Note: steps 1-6 were done by hand.  

### Create backup of the raw table (step 7)

### **Don't re-run this cell after mutating the raw table.**

In [None]:
%%bigquery
create or replace table airline_raw.air_carriers_copy as
  select * from airline_raw.air_carriers

Query is running:   0%|          |

### Create the temp dataset (step 8)

In [None]:
%%bigquery
create schema if not exists airline_tmp
  options(location="us")

Query is running:   0%|          |

### Create and populate temp table (step 9)

In [None]:
from google.cloud import storage
from google.cloud import bigquery

project_id = "cs329e-sp2024"
bucket_name = "cs329e-open-access"
folder_name = "incrementals"
dataset_name = "airline_tmp"
region = "us"
file_name = 'air_carriers_032324.csv'
table_name = 'air_carriers'

storage_client = storage.Client()
bq_client = bigquery.Client()

schema = [
  bigquery.SchemaField("code", "INTEGER", mode="REQUIRED"),
  bigquery.SchemaField("description", "STRING", mode="REQUIRED"),
  bigquery.SchemaField("load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

uri = "gs://{}/{}/{}".format(bucket_name, folder_name, file_name)
table_id = "{}.{}.{}".format(project_id, dataset_name, table_name)

table = bigquery.Table(table_id, schema=schema)
table = bq_client.create_table(table, exists_ok=True)
print("Created table {}".format(table.table_id))

# remove the load_time field from the schema before loading the data,
# the load_time value will be auto-generated
del schema[-1]

job_config = bigquery.LoadJobConfig(
      schema=schema,
      skip_leading_rows=1,
      source_format=bigquery.SourceFormat.CSV,
      write_disposition="WRITE_TRUNCATE",
      field_delimiter=","
    )

load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()

destination_table = bq_client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))


Created table air_carriers
Loaded 1655 rows.


### Detect the deltas and refresh the raw table (step 10)

#### Identify the deltas

In [None]:
%%bigquery
select t.code, t.description as new_description, r.description as old_description
from airline_tmp.air_carriers t full join airline_raw.air_carriers r on t.code = r.code
where t.description != r.description
or t.description is null or r.description is null
order by t.code

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,code,new_description,old_description
0,,,Desert Pacific Airlines Inc.: DPI
1,,,Desert Sun Airlines: DSA
2,,,Desert Airlines: DST
3,,,Desert Pacific Airways: DPA
4,19071,Antilles Air Boats Inc.: AAB,Antilles Air Boats Inc.: AAB (1)
...,...,...,...
111,20440,Lineas Aereas Del Caribe: LC,Lineas Aereas Del Caribe: LC (1)
112,20441,Legend Airlines: LC,Legend Airlines: LC (2)
113,30001,Centrum Air: CAX,
114,30002,City Airlines: CAY,


#### Process new records (inserts)

In [None]:
%%bigquery
select (select count(*) from airline_tmp.air_carriers) as tmp_count,
(select count(*) from airline_raw.air_carriers) as raw_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,tmp_count,raw_count
0,1655,1656


In [None]:
%%bigquery
insert into airline_raw.air_carriers(code, description, load_time)
  (select *
  from airline_tmp.air_carriers
  where code not in (select code from airline_raw.air_carriers))

Query is running:   0%|          |

In [None]:
%%bigquery
select (select count(*) from airline_tmp.air_carriers) as tmp_count,
(select count(*) from airline_raw.air_carriers) as raw_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,tmp_count,raw_count
0,1655,1659


#### Process the changed records (updates)

In [None]:
%%bigquery
select t.code, t.description as new_description, r.description as old_description
from airline_tmp.air_carriers t join airline_raw.air_carriers r on t.code = r.code
where t.description != r.description
order by t.code

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,code,new_description,old_description
0,19071,Antilles Air Boats Inc.: AAB,Antilles Air Boats Inc.: AAB (1)
1,19083,Air Central Inc. : ACT,Air Central Inc. (1): ACT
2,19385,Reeve Aleutian Airways Inc.: RV,Reeve Aleutian Airways Inc.: RV (1)
3,19391,Pacific Southwest Airlines: PS,Pacific Southwest Airlines: PS (1)
4,19395,Mid-South Aviation Inc. : RCA,Mid-South Aviation Inc. (1): RCA
...,...,...,...
104,20425,Viscount Air Service Inc.: VIQ,Viscount Air Service Inc.: VIQ (1)
105,20435,Frontier Airlines Inc. : FL,Frontier Airlines Inc. (1): FL (1)
106,20438,Air Club International: HB,Air Club International: HB (1)
107,20440,Lineas Aereas Del Caribe: LC,Lineas Aereas Del Caribe: LC (1)


In [None]:
%%bigquery
update airline_raw.air_carriers r
  set r.description = (select t.description from airline_tmp.air_carriers t where t.code = r.code),
  r.load_time = (select t.load_time from airline_tmp.air_carriers t where t.code = r.code)
  where r.description != (select description from airline_tmp.air_carriers t where t.code = r.code)
  and r.code = (select code from airline_tmp.air_carriers t where t.code = r.code)

Query is running:   0%|          |

In [None]:
%%bigquery
select t.code, t.description as new_description, r.description as old_description
from airline_tmp.air_carriers t join airline_raw.air_carriers r on t.code = r.code
where t.description != r.description
order by t.code

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,code,new_description,old_description


#### Process the deleted records (deletes)

In [None]:
%%bigquery
select r.code as old_code, r.description as old_description, t.code as new_code, t.description as new_description
from airline_tmp.air_carriers t right join airline_raw.air_carriers r on t.code = r.code
where t.code is null
order by r.code

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,old_code,old_description,new_code,new_description
0,19174,Desert Pacific Airways: DPA,,
1,19175,Desert Pacific Airlines Inc.: DPI,,
2,19176,Desert Airlines: DST,,
3,19618,Desert Sun Airlines: DSA,,


In [None]:
%%bigquery
delete from airline_raw.air_carriers r
where r.code not in (select t.code from airline_tmp.air_carriers t)

Query is running:   0%|          |

In [None]:
%%bigquery
select r.code as old_code, r.description as old_description, t.code as new_code, t.description as new_description
from airline_tmp.air_carriers t right join airline_raw.air_carriers r on t.code = r.code
where t.code is null
order by r.code

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,old_code,old_description,new_code,new_description


In [None]:
%%bigquery
select * from airline_raw.air_carriers
order by load_time desc

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,code,description,load_time
0,30001,Centrum Air: CAX,2024-03-23 18:15:04.101035+00:00
1,30003,Greater Bay Airlines: GBY,2024-03-23 18:15:04.101035+00:00
2,30002,City Airlines: CAY,2024-03-23 18:15:04.101035+00:00
3,20220,Air 21: A7,2024-03-23 18:15:04.101035+00:00
4,20393,Business Express: HQ,2024-03-23 18:15:04.101035+00:00
...,...,...,...
1650,19849,Tri Air Freight: TAF,2024-01-26 22:23:22.051288+00:00
1651,21205,"Inter Island Airways, d/b/a Inter Island Air: 0GQ",2024-01-26 22:23:22.051288+00:00
1652,19312,Air Cortez International: SAS,2024-01-26 22:23:22.051288+00:00
1653,20379,Zantop International: ZKQ,2024-01-26 22:23:22.051288+00:00


## Create copy of staging table (step 11)

**Don't re-run this cell after mutating the staging table.**

In [None]:
%%bigquery
create or replace table airline_stg.Air_Carrier_copy as
  select * from airline_stg.Air_Carrier

Query is running:   0%|          |

## Re-create the staging table (step 12)

#### **Note: This logic is the same as before (and was copied from previous notebooks).**

---



In [None]:
%%bigquery
create or replace table airline_stg.Air_Carrier as
  select airline_id, description_array[0] as airline_name, description_array[1] as airline_code, 'bird' as data_source, load_time
  from
  (select code as airline_id, split(description, ':') as description_array, load_time
  from airline_raw.air_carriers)

Query is running:   0%|          |

In [None]:
%%bigquery
select (select count(*) from airline_raw.air_carriers) as raw_count,
(select count(*) from airline_stg.Air_Carrier) as staging_count

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,raw_count,staging_count
0,1655,1655


In [None]:
%%bigquery
alter table airline_stg.Air_Carrier
  add primary key (airline_id) not enforced;

Query is running:   0%|          |

In [None]:
%%bigquery
select airline_id, count(*) duplicate_records
from airline_stg.Air_Carrier
group by airline_id
having count(*) > 1
order by count(*) desc

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,airline_id,duplicate_records


## Create copy of target table (step 13)

**Don't re-run this cell after mutating the target table.**

In [None]:
%%bigquery
create or replace table airline_csp.Air_Carrier_copy as
  select * from airline_csp.Air_Carrier

Query is running:   0%|          |

## Merge staging into target table (step 14)

In [None]:
%%bigquery
select count(*) as num_records from airline_csp.Air_Carrier

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,num_records
0,1656


In [None]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from airline_csp.Air_Carrier

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-02-17 16:27:34.248933+00:00,NaT,True


In [None]:
%%bigquery
merge airline_csp.Air_Carrier t
using airline_stg.Air_Carrier s
on t.airline_id = s.airline_id
when matched and s.airline_name != t.airline_name or s.airline_code != t.airline_code then
  update set discontinue_time = timestamp_sub(current_timestamp(), interval 1 second), status_flag = false
  insert (airline_id, airline_name, airline_code, data_source, load_time, effective_time, status_flag)
    values (s.airline_id, s.airline_name, s.airline_code, s.data_source, s.load_time, current_timestamp(), true)
when not matched by source then
  update set discontinue_time = current_timestamp(), status_flag = false
when not matched by target then
  insert (airline_id, airline_name, airline_code, data_source, load_time, effective_time, status_flag)
    values (s.airline_id, s.airline_name, s.airline_code, s.data_source, s.load_time, current_timestamp(), true)

Executing query with job ID: e2786a28-e71e-4306-b31d-8ba55449520d
Query executing: 0.24s


ERROR:
 400 Syntax error: Expected end of input but got keyword INSERT at [6:3]

Location: US
Job ID: e2786a28-e71e-4306-b31d-8ba55449520d



##### Unfortunately, BigQuery's merge function does not allow us to place an insert statement inside the matched clause. As a result, we will split up the merge operation into two parts.

In [None]:
%%bigquery
merge airline_csp.Air_Carrier t
using airline_stg.Air_Carrier s
on t.airline_id = s.airline_id
-- handle deleted records
when not matched by source then
  update set discontinue_time = current_timestamp(), status_flag = false
-- handle new records
when not matched by target then
  insert (airline_id, airline_name, airline_code, data_source, load_time, effective_time, status_flag)
  values (s.airline_id, s.airline_name, s.airline_code, s.data_source, s.load_time, current_timestamp(), true)

Query is running:   0%|          |

In [None]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from airline_csp.Air_Carrier

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-03-24 00:56:51.003500+00:00,NaT,True
1,2024-02-17 16:27:34.248933+00:00,NaT,True
2,2024-02-17 16:27:34.248933+00:00,2024-03-24 00:56:51.003500+00:00,False


##### Now handle the updated records

In [None]:
%%bigquery
select s.*
from airline_csp.Air_Carrier t join airline_stg.Air_Carrier s
on t.airline_id = s.airline_id
where s.airline_name != t.airline_name
or s.airline_code != t.airline_code

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,airline_id,airline_name,airline_code,data_source,load_time
0,20214,Avia Leasing Company,AD,bird,2024-03-23 18:15:04.101035+00:00
1,19395,Mid-South Aviation Inc.,RCA,bird,2024-03-23 18:15:04.101035+00:00
2,19864,Transamerica Airlines Inc.,TV,bird,2024-03-23 18:15:04.101035+00:00
3,20187,The Hawaii Express Inc.,LP,bird,2024-03-23 18:15:04.101035+00:00
4,20393,Business Express,HQ,bird,2024-03-23 18:15:04.101035+00:00
...,...,...,...,...,...
104,20329,Skystar International Inc.,7S,bird,2024-03-23 18:15:04.101035+00:00
105,20235,Conner Air Lines Inc.,4S,bird,2024-03-23 18:15:04.101035+00:00
106,19901,Blue Bell Inc.,BUQ,bird,2024-03-23 18:15:04.101035+00:00
107,19974,Aero Uruguay,ROQ,bird,2024-03-23 18:15:04.101035+00:00


In [None]:
%%bigquery
declare current_ts TIMESTAMP;
set current_ts = current_timestamp();

create temp table updates as
  select s.*
  from airline_csp.Air_Carrier t join airline_stg.Air_Carrier s
  on t.airline_id = s.airline_id
  where s.airline_name != t.airline_name
  or s.airline_code != t.airline_code;

update airline_csp.Air_Carrier
set discontinue_time = timestamp_sub(current_ts, interval 1 second), status_flag = false
where airline_id in (select airline_id from updates);

insert into airline_csp.Air_Carrier
  (airline_id, airline_name, airline_code, data_source, load_time, effective_time, status_flag)
    (select airline_id, airline_name, airline_code, data_source, load_time, current_ts, true
    from updates);

Query is running:   0%|          |

In [None]:
%%bigquery
select distinct effective_time, discontinue_time, status_flag
from airline_csp.Air_Carrier

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,effective_time,discontinue_time,status_flag
0,2024-03-24 00:56:51.003500+00:00,NaT,True
1,2024-02-17 16:27:34.248933+00:00,NaT,True
2,2024-02-17 16:27:34.248933+00:00,2024-03-24 00:56:51.003500+00:00,False
3,2024-02-17 16:27:34.248933+00:00,2024-03-24 00:57:45.185715+00:00,False
4,2024-03-24 00:57:46.185715+00:00,NaT,True


In [None]:
%%bigquery
select * from airline_csp.Air_Carrier
where airline_id = 20054

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,airline_id,airline_name,airline_code,data_source,load_time,effective_time,discontinue_time,status_flag
0,20054,Gulf Air Transport Inc.,GF (1),bird,2024-01-26 22:23:22.051288+00:00,2024-02-17 16:27:34.248933+00:00,2024-03-24 00:57:45.185715+00:00,False
1,20054,Gulf Air Transport Inc.,GF,bird,2024-03-23 18:15:04.101035+00:00,2024-03-24 00:57:46.185715+00:00,NaT,True


## Drop the temp table

In [None]:
%%bigquery
drop table airline_tmp.air_carriers

Query is running:   0%|          |