# Entity Join

In this notebook, we connect Parts and Transactions to create the Parts_Purchased entity.

In [6]:
%%bigquery
-- Preview up to five rows of retails_stg.Parts.
SELECT *
FROM retails_stg.Parts
LIMIT 5

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,p_partkey,size_description,material,p_size,p_brand,p_name,p_container,p_mfgr,p_retailprice,p_comment,data_source,load_time
0,84151,ECONOMY ANODIZED,NICKEL,3,Brand#11,lavender papaya chiffon magenta turquoise,LG BAG,Manufacturer#1,1135.15,slyly unusual instru,bird,2024-02-06 18:09:51.654365+00:00
1,71794,SMALL BURNISHED,TIN,6,Brand#11,cyan medium forest sienna rose,LG BAG,Manufacturer#1,1765.79,regular,bird,2024-02-06 18:09:51.654365+00:00
2,77224,PROMO ANODIZED,COPPER,9,Brand#11,purple magenta misty sienna white,LG BAG,Manufacturer#1,1201.22,doggedly final packag,bird,2024-02-06 18:09:51.654365+00:00
3,82043,PROMO PLATED,STEEL,26,Brand#11,salmon misty turquoise sandy rosy,LG BAG,Manufacturer#1,1025.04,blithely ir,bird,2024-02-06 18:09:51.654365+00:00
4,129293,SMALL BRUSHED,BRASS,39,Brand#11,coral antique lime misty cornflower,LG BAG,Manufacturer#1,1322.29,excuses hag,bird,2024-02-06 18:09:51.654365+00:00


In [7]:
%%bigquery
-- Preview up to five rows of retails_stg.Transactions.
SELECT *
FROM retails_stg.Transactions
LIMIT 5

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,Transaction_ID,date,Day,Month,Year,store_nbr,family,sales_amount,on_promotion,data_source,load_time
0,1065,2013-01-01,1,1,2013,39,DELI,0.0,False,kaggle,2024-02-10 18:23:57.714487+00:00
1,1692,2013-01-01,1,1,2013,7,DELI,0.0,False,kaggle,2024-02-10 18:23:57.714487+00:00
2,108,2013-01-01,1,1,2013,12,DELI,0.0,False,kaggle,2024-02-10 18:23:57.714487+00:00
3,999,2013-01-01,1,1,2013,37,DELI,0.0,False,kaggle,2024-02-10 18:23:57.714487+00:00
4,2583,2013-01-02,2,1,2013,31,DELI,178.85,False,kaggle,2024-02-10 18:23:57.714487+00:00


## Generic functions

These helper functions are used to create and load the Parts_Purchased table:



In [11]:
import json, datetime
from google.cloud import bigquery
# BigQuery client at notebook scope; create_table reaches it through this global name
bq_client = bigquery.Client()

def serialize_datetime(obj):
    """Return the ISO-8601 string for a ``datetime.datetime`` value.

    Args:
        obj: the value to convert.

    Returns:
        ``obj.isoformat()`` when ``obj`` is a ``datetime.datetime``.

    Raises:
        TypeError: for any other kind of value.
    """
    # guard clause: reject everything that is not a datetime up front
    if not isinstance(obj, datetime.datetime):
        raise TypeError("Type not serializable")
    return obj.isoformat()

# removes the entries from the dictionary whose values are None
# this filter is needed for loading JSON into BQ
def remove_none_values(record):
    """Return a copy of ``record`` without the entries whose value is None.

    The input dictionary is not modified. Falsy values that are not None
    (0, "", False, empty containers) are kept.

    Args:
        record: dictionary mapping field names to values.

    Returns:
        A new dictionary holding only the fields whose value is not None.
    """
    # identity test rather than `!= None`: `!=` dispatches to the value's own
    # __eq__/__ne__, which can misclassify objects with custom comparisons
    return {field: value for field, value in record.items() if value is not None}


def create_table(table_id, schema):
    """Create a BigQuery table with the given id and schema, then print its name.

    The request is sent through the notebook-level ``bq_client`` with
    ``exists_ok=True``.

    Args:
        table_id: id string passed to ``bigquery.Table``.
        schema: list of ``bigquery.SchemaField`` objects describing the columns.
    """
    table_definition = bigquery.Table(table_id, schema=schema)
    created = bq_client.create_table(table_definition, exists_ok=True)
    print("Created table {}".format(created.table_id))


def load_records(table_id, schema, records):
    """Load a list of JSON-serializable records into a BigQuery table.

    The load job uses newline-delimited JSON with
    ``write_disposition='WRITE_TRUNCATE'``. On success the destination table's
    row count is printed. Any exception raised while loading is caught and
    printed rather than propagated.

    Args:
        table_id: destination table id string, parsed with
            ``TableReference.from_string``.
        schema: list of ``bigquery.SchemaField`` objects for the load job.
        records: list of dictionaries, one per row.
    """
    bq_client = bigquery.Client()

    # load records into staging table
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition='WRITE_TRUNCATE',
    )
    # use the table_id argument; the previous code read the notebook global
    # target_table_id here, so the parameter was silently ignored
    table_ref = bigquery.table.TableReference.from_string(table_id)

    try:
        load_job = bq_client.load_table_from_json(records, table_ref, job_config=job_config)
        load_job.result()  # block until the load job finishes

        destination_table = bq_client.get_table(table_id)
        print("Loaded {} rows.".format(destination_table.num_rows))

        if load_job.errors:
            print('job errors:', load_job.errors)

    except Exception as e:
        # best-effort: report the failure and let the notebook continue
        print("Error inserting into BQ: {}".format(e))

## Parts_Purchased

Create the Parts_Purchased table and populate it with some part records:

Note: the part records don't contain a broad category, so for now the `part_category` column will be empty.

In [13]:
# Build the Parts_Purchased table: create it, pair one part with each store's
# "big" transactions (sales_amount >= 100.0), and load the resulting records.
from google.cloud import bigquery
bq_client = bigquery.Client()

pp_records = []  # records to load into Parts_Purchased
part_ids = []    # p_partkey values read from the Parts table

project_id = "automated-style-411721"
stg_dataset_name = "retails_stg"
stg_table_name = "Parts_Purchased"
# fully-qualified id in the form project.dataset.table
target_table_id = "{}.{}.{}".format(project_id, stg_dataset_name, stg_table_name)

schema = [bigquery.SchemaField("Transaction_ID", "INTEGER", mode="REQUIRED"),
          bigquery.SchemaField("p_partkey", "INTEGER", mode="REQUIRED"),
          bigquery.SchemaField("store_nbr", "INTEGER", mode="NULLABLE"),
          bigquery.SchemaField("part_category", "STRING", mode="NULLABLE"),
          bigquery.SchemaField("data_source", "STRING", mode="REQUIRED"),
          bigquery.SchemaField("load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
          ]

create_table(target_table_id, schema)
# drop the load_time field from the schema list before it is handed to
# load_records: the records built below carry no load_time key
# (presumably so the column's CURRENT_TIMESTAMP default fills it in)
del schema[-1]

part_sql = 'select p_partkey from {}.Parts'.format(stg_dataset_name)
query_job = bq_client.query(part_sql)

# add the part ids to a list
# NOTE(review): part_sql has no ORDER BY, so the order of part_ids is whatever
# order the query happens to return
for row in query_job:
    part_ids.append(row["p_partkey"])

# get all the big transactions grouped by store number
# ("big" means sales_amount >= 100.0; the result has one row per store with an
# array of that store's transaction ids)
transaction_sql = '''
select store_nbr, array_agg(Transaction_ID) as Transaction_IDs
from
  (select store_nbr, Transaction_ID
    from
    (select t.Transaction_ID, t.store_nbr
      from retails_stg.Transactions t
      where t.sales_amount >= 100.0
      order by t.date desc)
    group by store_nbr, Transaction_ID
    order by store_nbr)
group by store_nbr
'''

query_job = bq_client.query(transaction_sql)

# position in part_ids of the part assigned to the current store row
index = 0

# for each store
for row in query_job:
    store_nbr = row["store_nbr"]
    Transaction_IDs = row["Transaction_IDs"]

    # there are no more parts to allocate
    if index >= len(part_ids):
        break

    p_partkey = part_ids[index]

    # assign a part to all of the store's "big" transactions
    for transaction_id in Transaction_IDs:
        # part_category is never set here, so that column stays empty
        record = {"Transaction_ID": transaction_id, "p_partkey": p_partkey, "store_nbr": store_nbr, "data_source": "bird_partsdb"}

        # drop any field whose value is None before loading
        filtered_record = remove_none_values(record)

        pp_records.append(filtered_record)

    # the next store row gets the next part id
    index += 1

# insert records into the Parts_Purchased table
load_records(target_table_id, schema, pp_records)

Created table Parts_Purchased
Loaded 971628 rows.


In [14]:
%%bigquery
-- Preview up to five rows of retails_stg.Parts_Purchased.
SELECT *
FROM retails_stg.Parts_Purchased
LIMIT 5

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,Transaction_ID,p_partkey,store_nbr,part_category,data_source,load_time
0,3573,84151,1,,bird_partsdb,2024-02-13 23:22:13.230610+00:00
1,28521,84151,1,,bird_partsdb,2024-02-13 23:22:13.230610+00:00
2,30303,84151,1,,bird_partsdb,2024-02-13 23:22:13.230610+00:00
3,55251,84151,1,,bird_partsdb,2024-02-13 23:22:13.230610+00:00
4,78417,84151,1,,bird_partsdb,2024-02-13 23:22:13.230610+00:00


In [20]:
%%bigquery
-- Total number of rows in retails_stg.Parts_Purchased.
SELECT COUNT(*) AS num_records
FROM retails_stg.Parts_Purchased

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,num_records
0,971628


## Primary Key

In [21]:
%%bigquery
-- Declare (Transaction_ID, p_partkey) as the table's primary key, NOT ENFORCED.
ALTER TABLE retails_stg.Parts_Purchased
  ADD PRIMARY KEY (Transaction_ID, p_partkey) NOT ENFORCED

Query is running:   0%|          |

Check for primary key violations:

In [22]:
%%bigquery
-- List every (Transaction_ID, p_partkey) pair that occurs more than once.
SELECT
  Transaction_ID,
  p_partkey,
  COUNT(*) AS duplicate_pk
FROM retails_stg.Parts_Purchased
GROUP BY Transaction_ID, p_partkey
HAVING COUNT(*) > 1

Query is running:   0%|          |

Downloading: |          |

Unnamed: 0,Transaction_ID,p_partkey,duplicate_pk


## Foreign Keys

In [26]:
%%bigquery
-- Declare p_partkey as a foreign key to retails_stg.Parts, NOT ENFORCED.
ALTER TABLE retails_stg.Parts_Purchased
  ADD FOREIGN KEY (p_partkey)
  REFERENCES retails_stg.Parts (p_partkey) NOT ENFORCED

Query is running:   0%|          |

In [30]:
%%bigquery
-- Declare Transaction_ID as a foreign key to retails_stg.Transactions, NOT ENFORCED.
ALTER TABLE retails_stg.Parts_Purchased
  ADD FOREIGN KEY (Transaction_ID)
  REFERENCES retails_stg.Transactions (Transaction_ID) NOT ENFORCED

Query is running:   0%|          |

Check for foreign key violations:



In [32]:
%%bigquery
-- Count Parts_Purchased rows whose p_partkey has no matching row in Parts.
-- NOT EXISTS is used instead of NOT IN: if the subquery ever returned a NULL
-- p_partkey, NOT IN would match no rows and the check would report 0 orphans
-- even when orphans exist.
select count(*) as orphan_records
from retails_stg.Parts_Purchased pp
where not exists (
  select 1
  from retails_stg.Parts p
  where p.p_partkey = pp.p_partkey
)

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,orphan_records
0,0


In [33]:
%%bigquery
-- Count Parts_Purchased rows whose Transaction_ID has no matching row in
-- Transactions. NOT EXISTS is used instead of NOT IN: if the subquery ever
-- returned a NULL Transaction_ID, NOT IN would match no rows and the check
-- would report 0 orphans even when orphans exist.
select count(*) as orphan_records
from retails_stg.Parts_Purchased pp
where not exists (
  select 1
  from retails_stg.Transactions t
  where t.Transaction_ID = pp.Transaction_ID
)

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,orphan_records
0,0
