This demo presents an example of ML model training that leverages GCP lakehouse architecture features such as open data format, access level security, data change tracking and versioning.

In [1]:
import numpy as np
from google.cloud import bigquery

In [2]:
# Create the BigQuery client that the cells below use to submit SQL statements.
# It is constructed with no arguments, so project and credentials are not set explicitly here.
client = bigquery.Client()

In [None]:
# Placeholders: replace both values with your own before running the cells below.
# BUCKET is interpolated into "gs://{BUCKET}/..." URIs, so it is the bare bucket name without the "gs://" prefix.
BUCKET = "your GCS bucket"
# BQ_DATASET is interpolated in front of the table and model names in the SQL statements.
BQ_DATASET = "your BQ dataset"

## 1. Data Sources
This demo uses the Austin housing prices dataset from Kaggle (https://www.kaggle.com/datasets/ericpierce/austinhousingprices).

### External dataset in BQ from image data

In [3]:
# Create a BigQuery object table over the JPEG files under gs://{BUCKET}/homeImage/.
# CREATE OR REPLACE keeps the cell re-runnable, matching the convention of the model cells.
query = f"""
    CREATE OR REPLACE EXTERNAL TABLE {BQ_DATASET}.housing_image
    WITH CONNECTION us.my_connection
    OPTIONS(uris=["gs://{BUCKET}/homeImage/*.jpg"],
         object_metadata="SIMPLE");
"""
job = client.query(query)
# client.query() only submits the job. result() blocks until the DDL has finished
# and raises if it failed, so later cells can rely on the table existing.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=8f59784d-6ce6-4106-92ec-61662da80d24>

### Data from CSV metadata file that is converted to parquet in GCS Refined Zone

We reuse the BigLake table created in the previous part. This table exposes multiple versions of the data because it points to Delta tables in GCS.
We query it through the view ```delta_housing_view```, which is defined on top of the ```delta_housing_parquet``` table.

## 2. Create ML models in BQ from the pretrained model binaries

### BQML model from the image embedding model

In [4]:
# Import the TensorFlow model files stored under gs://{BUCKET}/model/image/ as a BigQuery ML model.
query = f"""
    CREATE OR REPLACE MODEL {BQ_DATASET}.imported_image_model
    OPTIONS (MODEL_TYPE='TENSORFLOW',
    MODEL_PATH='gs://{BUCKET}/model/image/*')
"""
job = client.query(query)
# client.query() only submits the job. result() blocks until the import has finished
# and raises if it failed, so the prediction cells never run against a missing model.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=49df0e47-db89-4fd3-86c6-896082f934bd>

### BQML model from the text embedding model

In [5]:
# Import the TensorFlow model files stored under gs://{BUCKET}/model/text-universal/ as a BigQuery ML model.
query = f"""
    CREATE OR REPLACE MODEL {BQ_DATASET}.imported_text_model
    OPTIONS (MODEL_TYPE='TENSORFLOW',
    MODEL_PATH='gs://{BUCKET}/model/text-universal/*')
"""
job = client.query(query)
# client.query() only submits the job. result() blocks until the import has finished
# and raises if it failed, so the prediction cells never run against a missing model.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=81575fb3-682a-4ee1-a14e-1454423c76eb>

## 3. Run model prediction

### Get embedding for image data

In [6]:
# Run the imported image model over the decoded images and store one embedding per object URI.
# CREATE OR REPLACE keeps the cell re-runnable, matching the convention of the model cells.
#
# NOTE(review): REGEXP_CONTAINS is a partial (unanchored) match and `11*` means
# "'1' followed by zero or more '1's", so the filter keeps every object whose name
# under homeImage/ starts with '1' - confirm this is the intended subset.
query = f"""
CREATE OR REPLACE TABLE {BQ_DATASET}.image_embedding
AS 
    SELECT uri, fc1 as image_embedding  FROM ML.PREDICT(
    MODEL `{BQ_DATASET}.imported_image_model`, 
    (SELECT uri, ML.DECODE_IMAGE(data) AS input_2 FROM {BQ_DATASET}.housing_image
    where REGEXP_CONTAINS(uri,'gs://{BUCKET}/homeImage/11*'))
);
"""
job = client.query(query)
# client.query() only submits the job. result() blocks until the table has been written
# and raises if the query failed, so the next cell can safely join against it.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=8d601077-e0d6-402f-b183-c7b32f59f807>

### Get embedding for text data

In [7]:
# Run the imported text model over the listing descriptions, restricted to the listings
# that already have an image embedding (join on the file name at the end of the image URI).
# The result keeps `uri` (the homeImage file name) and the model output as `text_embedding`.
query = f"""
    CREATE OR REPLACE TABLE {BQ_DATASET}.text_embedding
    AS 
    SELECT uri, outputs as text_embedding  FROM ML.PREDICT(
    MODEL `{BQ_DATASET}.imported_text_model`, 
    (SELECT homeImage as uri , description as inputs FROM `{BQ_DATASET}.delta_housing_view` 
    join `{BQ_DATASET}.image_embedding` as t
    on homeImage = SPLIT(t.uri, '/')[OFFSET(ARRAY_LENGTH(SPLIT(t.uri, "/")) - 1)])
    );
    """
# The closing parentheses of the input subquery and of ML.PREDICT( were missing, which
# makes the statement invalid SQL. Because the job was never awaited, that failure was silent.
job = client.query(query)
# result() blocks until the table has been written and raises if the query failed.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=2ebbbcb4-c04f-4a15-bb2b-a5bb25e8164d>

## 4. Join and fetch the image and text embedding with housing prices as ground truth label

In [11]:
# Assemble the training frame: year built and latest price from the housing view, plus
# the image and text embeddings, matched on the file name at the end of the image URI.
query = f"""
        SELECT md.yearBuilt, md.latestPrice, img.uri, img.image_embedding, txt.text_embedding
        FROM `{BQ_DATASET}.delta_housing_view` as md
        JOIN `{BQ_DATASET}.image_embedding` as img
        ON md.homeImage = SPLIT(img.uri, '/')[OFFSET(ARRAY_LENGTH(SPLIT(img.uri, "/")) - 1)]
        JOIN `{BQ_DATASET}.text_embedding` as txt
        ON SPLIT(img.uri, '/')[OFFSET(ARRAY_LENGTH(SPLIT(img.uri, "/")) - 1)] = txt.uri
        """
# Wait for the query to finish, then download the rows into a pandas DataFrame.
joined_rows = client.query(query).result()
df = joined_rows.to_dataframe()

In [12]:
# Show the joined frame; as the last expression of the cell it is rendered as the cell output.
df

Unnamed: 0,yearBuilt,latestPrice,uri,image_embedding,text_embedding
0,1985,269900.0,gs://techcon23-safshari/homeImage/185165346_eb...,"[0.0, 0.7384738922119141, 0.0, 0.0, 0.0, 2.010...","[-0.05178000032901764, -0.035562582314014435, ..."
1,1981,339900.0,gs://techcon23-safshari/homeImage/145655084_48...,"[0.0, 0.9409899711608887, 0.0, 0.0, 0.0, 1.667...","[-0.06605076789855957, -0.01758900284767151, -..."
2,2001,290000.0,gs://techcon23-safshari/homeImage/185218123_6a...,"[0.0, 1.2250003814697266, 0.0, 0.0, 0.0, 2.048...","[-0.020198747515678406, -0.04961514472961426, ..."
3,1986,199950.0,gs://techcon23-safshari/homeImage/144334371_0f...,"[0.0, 0.6215362548828125, 0.0, 0.0, 0.0, 1.940...","[-0.008488993160426617, -0.045883093029260635,..."
4,2002,359965.0,gs://techcon23-safshari/homeImage/185218453_3e...,"[0.0, 1.0795612335205078, 0.0, 0.0, 0.0, 1.752...","[0.018340934067964554, -0.04425225406885147, 0..."
...,...,...,...,...,...
1054,2020,675000.0,gs://techcon23-safshari/homeImage/125805827_c0...,"[0.0, 1.39948308467865, 0.0, 0.0, 0.0, 1.83178...","[-0.01679230108857155, -0.07212793081998825, 0..."
1055,2020,1499000.0,gs://techcon23-safshari/homeImage/125807167_7f...,"[0.0, 1.1083102226257324, 0.0, 0.0, 0.0, 1.680...","[-0.06935032457113266, 0.02305689826607704, 0...."
1056,2020,645000.0,gs://techcon23-safshari/homeImage/125916067_43...,"[0.0, 1.109668493270874, 0.0, 0.0, 0.0, 1.7355...","[-0.007569410838186741, -0.032274093478918076,..."
1057,2020,580000.0,gs://techcon23-safshari/homeImage/144237736_91...,"[0.0, 1.1190240383148193, 0.0, 0.0, 0.0, 2.160...","[-0.05300542712211609, -0.04676753282546997, -..."


In [None]:
# Show the rows whose yearBuilt sorts before "1980".
# NOTE(review): the comparison is against a string literal, so this presumably relies on
# yearBuilt being a string column - confirm against the table schema.
built_before_1980 = df['yearBuilt'] < "1980"
df.loc[built_before_1980]

## (Optional) Apply row-level access policies

In [10]:
# Row-level security: create the policy `house` on delta_housing_parquet, granted to the
# listed principal and filtered by the FILTER USING predicate (yearBuilt > "1980").
# CREATE OR REPLACE keeps the cell re-runnable once the policy already exists.
query = f"""
    CREATE OR REPLACE ROW ACCESS POLICY house
    ON {BQ_DATASET}.delta_housing_parquet
    GRANT TO ("user:test@test.com")
    FILTER USING (yearBuilt > "1980");
    """
job = client.query(query)
# client.query() only submits the job. result() blocks until the policy is in place
# and raises if the statement failed.
job.result()
job

QueryJob<project=safshari-sandbox, location=US, id=1b4bc595-13e0-4272-a72e-821eb4c1397a>

In [None]:
# Cleanup, left disabled: uncomment both lines to drop every row access policy
# on delta_housing_parquet (this undoes the optional policy cell).
#query = f""" DROP ALL ROW ACCESS POLICIES  ON {BQ_DATASET}.delta_housing_parquet"""
#client.query(query)

## 5. Create a price prediction deep neural network

In [13]:
import tensorflow as tf
import pandas as pd

# Report how many GPU devices TensorFlow can see.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

Num GPUs Available:  1


2023-04-19 06:07:50.599565: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-19 06:07:50.612836: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-19 06:07:50.614538: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


### b. Split data to train & test data 

In [14]:
# Empty containers for the split: the feature frames hold the two embedding columns,
# the label frames hold the price column. They are filled in by the next cell.
FEATURE_COLUMNS = ['text_embedding', 'image_embedding']
LABEL_COLUMNS = ['latestPrice']

train_df, test_df = (pd.DataFrame(columns=FEATURE_COLUMNS) for _ in range(2))
train_label, test_label = (pd.DataFrame(columns=LABEL_COLUMNS) for _ in range(2))

In [15]:
# Positional split: the first TRAIN_ROWS rows of df are the training set, the rest is held out.
# NOTE(review): rows are taken in query order with no shuffling - confirm that order
# is not correlated with price before trusting the held-out rows.
TRAIN_ROWS = 1000
feature_cols = ['text_embedding', 'image_embedding']

train_df[feature_cols] = df[feature_cols].iloc[:TRAIN_ROWS]
test_df[feature_cols] = df[feature_cols].iloc[TRAIN_ROWS:]
train_label['latestPrice'] = df['latestPrice'].iloc[:TRAIN_ROWS].astype(float)
test_label['latestPrice'] = df['latestPrice'].iloc[TRAIN_ROWS:].astype(float)

In [16]:
# Show the training labels (latestPrice cast to float) as the cell output.
train_label

Unnamed: 0,latestPrice
0,269900.0
1,339900.0
2,290000.0
3,199950.0
4,359965.0
...,...
995,1200000.0
996,565000.0
997,285000.0
998,525000.0


### c. Normalize the target label (price)

In [17]:
# Min-max scale the training prices into [0, 1]. tr_min and tr_max are kept because
# the prediction step uses them to map model outputs back to prices.
train_price = train_label['latestPrice']
tr_min, tr_max = train_price.min(), train_price.max()
train_label['latestPrice_normalized'] = (train_price - tr_min) / (tr_max - tr_min)

In [18]:
# Show the min-max normalized training prices as the cell output.
train_label['latestPrice_normalized']

0      0.015251
1      0.020461
2      0.016747
3      0.010045
4      0.021955
         ...   
995    0.084481
996    0.037216
997    0.016375
998    0.034239
999    0.037961
Name: latestPrice_normalized, Length: 1000, dtype: float64

### d. Create a simple fully connected Keras network

In [19]:
import keras
from keras.layers import Input, Dense, Embedding
from keras.layers import Dropout
from keras.models import Model
from keras import  optimizers, layers
from keras.preprocessing import text, sequence


In [20]:
def embedding_to_frame(column):
    """Expand a Series of embedding vectors into a frame with one column per dimension.

    The frame is built with a single DataFrame constructor call instead of
    `column.apply(pd.Series)`, which creates a separate Series object for every row.
    The row index of `column` is preserved.

    Assumes every vector in the column has the same length.
    """
    return pd.DataFrame(column.tolist(), index=column.index)


# *_set_1 holds the text embeddings, *_set_2 the image embeddings.
train_set_1 = embedding_to_frame(train_df.text_embedding)
train_set_2 = embedding_to_frame(train_df.image_embedding)
test_set_1 = embedding_to_frame(test_df.text_embedding)
test_set_2 = embedding_to_frame(test_df.image_embedding)

In [21]:
# Concatenate continuous and embeddings inputs
input_1 = Input(shape=(train_set_1.shape[1], ))
input_2 = Input(shape=(train_set_2.shape[1], ))
all_input = keras.layers.concatenate([input_1, input_2])

In [22]:
# The inputs are already dense float embeddings, so they feed fully connected layers directly.
# An Embedding layer is an integer-index lookup table: applied to these float vectors it would
# cast the values to indices (discarding the embedding content) and add an extra tensor axis,
# so the output would no longer be one value per listing.
x = Dense(500, activation='relu')(all_input)
x = Dropout(0.5)(x)
x = Dense(50, activation='relu')(x)
x = Dropout(0.1)(x)
x = Dense(25, activation='relu')(x)
# Regression head: one unit predicting the min-max normalized price. Linear rather than
# relu so the output unit cannot get stuck at 0 with a zero gradient.
x = Dense(1, activation='linear')(x)

model = Model(inputs=[input_1, input_2], outputs=x)

2023-04-19 06:09:00.371050: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-04-19 06:09:00.371561: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-19 06:09:00.375173: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-04-19 06:09:00.377700: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zer

### e. Train the model 

In [23]:
# Adam with a learning rate of 1e-4, set at construction time; mean squared error
# is the loss for the price regression.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer, loss='mean_squared_error')

In [24]:
# Train on the text and image feature frames against the normalized price.
# validation_split=0.3 holds out 30% of the training rows for validation.
train_inputs = [train_set_1, train_set_2]
train_targets = train_label['latestPrice_normalized'].to_numpy()

model.fit(
    train_inputs,
    train_targets,
    epochs=5,
    batch_size=100,
    validation_split=0.3,
    verbose=1,
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f19c4d43790>

### f. Predict the price for test data

In [25]:
# Predict on the held-out rows; the outputs are still on the normalized price scale.
test_inputs = [test_set_1, test_set_2]
y = model.predict(test_inputs)

In [26]:
# Undo the min-max scaling using the training-set minimum and maximum.
# NOTE: y is rebound to its own rescaled value, so running this cell twice rescales twice;
# re-run the prediction cell first if this cell has to be executed again.
price_span = tr_max - tr_min
y = tr_min + price_span * y
y