In [1]:
from google.cloud import bigquery
import jupyter_black
import black
import re
import pandas as pd
import gc
from collections import OrderedDict
from tqdm import tqdm

# Auto-format code cells with black on execution (classic Notebook UI, not JupyterLab):
# 100-character lines, Python 3.10 target.
# "INFO" instead of "DEBUG" so the formatter's config dump no longer lands in the cell output.
# (A gc.collect() call used to sit here; on a freshly started kernel it had nothing to free.)
jupyter_black.load(
    lab=False,
    line_length=100,
    verbosity="INFO",
    target_version=black.TargetVersion.PY310,
)

DEBUG:jupyter_black:config: {'line_length': 100, 'target_versions': {<TargetVersion.PY310: 10>}}


<IPython.core.display.Javascript object>

In [2]:
def read_bigquery_table(project_id, dataset_id, table_id):
    """Download an entire BigQuery table into a DataFrame with table-prefixed column names.

    Each column is renamed to ``table_id + "_" + NAME``. NAME is the original column name
    upper-cased with every run of non-alphanumeric characters removed. A column that normalizes
    to ``PRODUCTCODE`` keeps the bare name ``PRODUCTCODE`` so it can be used as a join key
    across tables.

    Args:
        project_id: GCP project that owns the dataset.
        dataset_id: BigQuery dataset containing the table.
        table_id: Table to read; also used as the column-name prefix.

    Returns:
        pandas.DataFrame holding the result of ``SELECT *`` on the table.
    """
    client = bigquery.Client()

    # The previous version ignored `project_id` and resolved the dataset against the client's
    # default project via the deprecated `Client.dataset()` helper. Address the table by its
    # fully qualified ID instead, so the requested project is the one that is actually read.
    table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")

    query = f"SELECT * FROM `{table.project}.{table.dataset_id}.{table.table_id}`"

    # Always read the table itself rather than a cached result set.
    job_config = bigquery.QueryJobConfig(use_query_cache=False)
    df = client.query(query, job_config=job_config).to_dataframe()

    prefix = table_id + "_"
    df.columns = [prefix + re.sub(r"[^A-Za-z0-9]+", "", col.upper()) for col in df.columns]
    df = df.rename(columns={prefix + "PRODUCTCODE": "PRODUCTCODE"})

    return df

In [3]:
def aggregate(df, agg_field):
    """Collapse ``df`` to one row per distinct value of ``agg_field``.

    Every other column becomes a list of that column's values within the group, in original
    row order. Grouping uses pandas' defaults (keys sorted). The grouping key is turned back
    into a regular column, and the result has a fresh RangeIndex.
    """
    list_per_column = dict.fromkeys((name for name in df.columns if name != agg_field), list)
    grouped = df.groupby(agg_field)
    return grouped.agg(list_per_column).reset_index()

In [4]:
def get_ordered_set_dict(df, field):
    """Drop duplicate elements from each list in ``df[field]``, keeping first-seen order.

    The column is overwritten on ``df`` itself, and the same DataFrame object is returned.
    List elements must be hashable.
    """
    # Built-in dicts keep insertion order (Python 3.7+), so dict.fromkeys is an
    # order-preserving "set".
    deduplicated = []
    for values in df[field]:
        deduplicated.append(list(dict.fromkeys(values)))
    df[field] = deduplicated
    return df

In [18]:
# Merge tags_cleaned and pdt_tourgrades

tags_cleaned = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="tags_cleaned"
)
pdt_tourgrades = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_tourgrades"
)

product_step_1 = pd.merge(tags_cleaned, pdt_tourgrades, on="PRODUCTCODE", how="outer")
# Stringify everything and blank out missing values. The null mask is taken *before* astype(str):
# afterwards nulls read "nan", "None", "<NA>" or "NaT" depending on the column dtype, and the old
# replace("nan", "") only caught the first of those (while also wiping genuine "nan" text).
product_step_1 = product_step_1.astype(str).mask(product_step_1.isna(), "")

In [19]:
# Free the source frames now that they live on in product_step_1.
del tags_cleaned, pdt_tourgrades

In [None]:
# Merge step result and pdt_tags

pdt_tags = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_tags"
)
pdt_tags = pdt_tags[["PRODUCTCODE", "pdt_tags_TAGLEVEL", "pdt_tags_TAGNAME"]]

product_step_2 = pd.merge(product_step_1, pdt_tags, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_2 = product_step_2.astype(str).mask(product_step_2.isna(), "")

In [24]:
# Free the inputs merged into product_step_2.
del product_step_1, pdt_tags

In [25]:
# Merge step result and pdt_product_level

pdt_product_level = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_product_level"
)

product_step_3 = pd.merge(product_step_2, pdt_product_level, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_3 = product_step_3.astype(str).mask(product_step_3.isna(), "")

In [26]:
# Free the inputs merged into product_step_3.
del product_step_2, pdt_product_level

In [28]:
# Merge step result and pdt_product_detail

pdt_product_detail = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_product_detail"
)

# Give the tour-grade key the same name on both sides so it can be part of the join key.
# Reassign instead of renaming in place, so the previous cell's frame is not silently mutated.
product_step_3 = product_step_3.rename(columns={"pdt_tourgrades_TOURGRADECODE": "TOURGRADECODE"})
pdt_product_detail = pdt_product_detail.rename(
    columns={"pdt_product_detail_TOURGRADECODE": "TOURGRADECODE"}
)

# NOTE(review): product_step_3 is all-str here; if pdt_product_detail's key columns come back
# from BigQuery as non-string dtypes the merge will fail or not match — confirm the schema.
product_step_4 = pd.merge(
    product_step_3, pdt_product_detail, on=["PRODUCTCODE", "TOURGRADECODE"], how="outer"
)
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_4 = product_step_4.astype(str).mask(product_step_4.isna(), "")

# Outer joins yield one row per combination of matching rows. Collapse back to one row per
# product, with each column holding its distinct values in first-seen order.
product_step_4 = aggregate(product_step_4, "PRODUCTCODE")

for col in tqdm(product_step_4.columns):
    if col != "PRODUCTCODE":
        product_step_4 = get_ordered_set_dict(product_step_4, col)

product_step_4.to_pickle("tmp1.pkl")

100%|██████████| 56/56 [00:07<00:00,  7.90it/s]


In [29]:
# Free the inputs merged into product_step_4 (already checkpointed to tmp1.pkl).
del product_step_3, pdt_product_detail

In [30]:
# Reload the checkpoint written by the merge cell above (lets the notebook resume here after a
# kernel restart). The pickle is produced locally by this notebook, so it is trusted input.
product_step_4 = pd.read_pickle(filepath_or_buffer="tmp1.pkl")

In [32]:
# Merge step result and pdt_itinerary

pdt_itinerary = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_itinerary"
)
# One row per product, each column holding its distinct values in first-seen order.
pdt_itinerary = aggregate(pdt_itinerary, "PRODUCTCODE")

for col in tqdm(pdt_itinerary.columns):
    if col != "PRODUCTCODE":
        pdt_itinerary = get_ordered_set_dict(pdt_itinerary, col)

product_step_5 = pd.merge(product_step_4, pdt_itinerary, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
# NOTE(review): astype(str) turns the per-product lists into their repr text, e.g. "['a', 'b']";
# confirm downstream consumers expect that rather than real lists.
product_step_5 = product_step_5.astype(str).mask(product_step_5.isna(), "")

product_step_5.to_pickle("tmp2.pkl")

100%|██████████| 13/13 [00:00<00:00, 41.26it/s]


In [33]:
# Free the inputs merged into product_step_5 (already checkpointed to tmp2.pkl).
del product_step_4, pdt_itinerary

In [34]:
# Reload the checkpoint written by the merge cell above (lets the notebook resume here after a
# kernel restart). The pickle is produced locally by this notebook, so it is trusted input.
product_step_5 = pd.read_pickle(filepath_or_buffer="tmp2.pkl")

In [35]:
# Merge step result and pdt_inclexcl_ENG

pdt_inclexcl_ENG = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_inclexcl_ENG"
)
# One row per product, each column holding its distinct values in first-seen order.
pdt_inclexcl_ENG = aggregate(pdt_inclexcl_ENG, "PRODUCTCODE")

for col in tqdm(pdt_inclexcl_ENG.columns):
    if col != "PRODUCTCODE":
        pdt_inclexcl_ENG = get_ordered_set_dict(pdt_inclexcl_ENG, col)

product_step_6 = pd.merge(product_step_5, pdt_inclexcl_ENG, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_6 = product_step_6.astype(str).mask(product_step_6.isna(), "")

product_step_6.to_pickle("tmp3.pkl")

100%|██████████| 5/5 [00:00<00:00, 71.65it/s]


In [36]:
# Free the inputs merged into product_step_6 (already checkpointed to tmp3.pkl).
del product_step_5, pdt_inclexcl_ENG

In [37]:
# Reload the checkpoint written by the merge cell above (lets the notebook resume here after a
# kernel restart). The pickle is produced locally by this notebook, so it is trusted input.
product_step_6 = pd.read_pickle(filepath_or_buffer="tmp3.pkl")

In [39]:
# Merge step result and pdt_inclexcl

pdt_inclexcl = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_inclexcl"
)
# One row per product, each column holding its distinct values in first-seen order.
pdt_inclexcl = aggregate(pdt_inclexcl, "PRODUCTCODE")

for col in tqdm(pdt_inclexcl.columns):
    if col != "PRODUCTCODE":
        pdt_inclexcl = get_ordered_set_dict(pdt_inclexcl, col)

product_step_7 = pd.merge(product_step_6, pdt_inclexcl, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_7 = product_step_7.astype(str).mask(product_step_7.isna(), "")

product_step_7.to_pickle("tmp4.pkl")

100%|██████████| 3/3 [00:00<00:00,  7.41it/s]


In [40]:
# Free the inputs merged into product_step_7 (already checkpointed to tmp4.pkl).
del product_step_6, pdt_inclexcl

In [41]:
# Reload the checkpoint written by the merge cell above (lets the notebook resume here after a
# kernel restart). The pickle is produced locally by this notebook, so it is trusted input.
product_step_7 = pd.read_pickle(filepath_or_buffer="tmp4.pkl")

In [43]:
# Merge step result and pdt_incexcl

pdt_incexcl = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_incexcl"
)
# One row per product, each column holding its distinct values in first-seen order.
pdt_incexcl = aggregate(pdt_incexcl, "PRODUCTCODE")

for col in tqdm(pdt_incexcl.columns):
    if col != "PRODUCTCODE":
        pdt_incexcl = get_ordered_set_dict(pdt_incexcl, col)

product_step_8 = pd.merge(product_step_7, pdt_incexcl, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_8 = product_step_8.astype(str).mask(product_step_8.isna(), "")

product_step_8.to_pickle("tmp5.pkl")

100%|██████████| 6/6 [00:00<00:00, 12.72it/s]


In [44]:
# Free the inputs merged into product_step_8 (already checkpointed to tmp5.pkl).
del product_step_7, pdt_incexcl

In [5]:
# Reload the checkpoint written by the merge cell above (lets the notebook resume here after a
# kernel restart). The pickle is produced locally by this notebook, so it is trusted input.
product_step_8 = pd.read_pickle(filepath_or_buffer="tmp5.pkl")

In [None]:
# Merge step result and pdt_capacity

pdt_capacity = read_bigquery_table(
    project_id="ww-da-ingestion", dataset_id="v_extract1", table_id="pdt_capacity"
)
# One row per product, each column holding its distinct values in first-seen order.
pdt_capacity = aggregate(pdt_capacity, "PRODUCTCODE")

for col in tqdm(pdt_capacity.columns):
    if col != "PRODUCTCODE":
        pdt_capacity = get_ordered_set_dict(pdt_capacity, col)

product_step_9 = pd.merge(product_step_8, pdt_capacity, on="PRODUCTCODE", how="outer")
# Stringify and blank out missing values. The null mask is taken before astype(str) so every
# null flavour ("nan", "None", "<NA>", "NaT" once stringified) becomes "", not only "nan".
product_step_9 = product_step_9.astype(str).mask(product_step_9.isna(), "")

product_step_9.to_pickle("tmp6.pkl")