# Composer Pipeline running ML in BigQuery

In [8]:
import os

In [13]:
# Name of the DAG file produced by this notebook and the bucket folder it is copied to.
FILE_NAME = 'bqml-logistic-regression.py'
DAG_FOLDER = 'gs://us-east1-compose-crazy-a3d52ae3-bucket/dags/'

# Export both values so shell cells can read them as $FILE_NAME / $DAG_FOLDER.
os.environ.update(FILE_NAME=FILE_NAME, DAG_FOLDER=DAG_FOLDER)

#### Save the Composer DAG Python file to local storage

In [18]:
%%writefile {FILE_NAME}

import datetime
import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery
from airflow.contrib.operators import bigquery_operator

# Weekly DAG that loads the census data, cleans it, trains a BigQuery ML
# logistic-regression model and logs its evaluation metrics.
#
# start_date must be a fixed point in time: with datetime.now() the value moves
# forward every time the scheduler parses this file, so a complete '@weekly'
# interval never elapses after it and no scheduled run is ever triggered.
# catchup=False keeps Airflow from back-filling one run per week since that date.
dag = DAG(
        'Earnings_ml_model_log_regression',
        schedule_interval='@weekly',
        start_date=datetime.datetime(2020, 1, 1),
        catchup=False
)

def read_raw_storage():
    """Load the census training CSV from Cloud Storage into BigQuery.

    Starts a load job that writes gs://crazy-hippo-01/dataset/census_train.csv
    into the table ``machine_learning_02.income_model`` with an auto-detected
    schema, replacing any existing contents (WRITE_TRUNCATE). Blocks until the
    job completes, then logs the number of rows in the destination table.

    Takes no arguments and returns nothing. Exceptions raised by the BigQuery
    client are not caught here and propagate to the caller.
    """
    client = bigquery.Client()

    # Build the destination reference once and reuse it for both the load and
    # the follow-up row-count lookup, so the two cannot drift apart.
    table_ref = client.dataset('machine_learning_02').table('income_model')
    data_uri = "gs://crazy-hippo-01/dataset/census_train.csv"

    # Job Configuration Parameters
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.source_format = bigquery.SourceFormat.CSV

    load_job = client.load_table_from_uri(data_uri, table_ref, job_config=job_config)
    logging.info("Starting job %s", load_job.job_id)

    load_job.result()  # Waits for table load to complete.
    logging.info("Job finished.")

    destination_table = client.get_table(table_ref)
    logging.info("Loaded %s rows.", destination_table.num_rows)

# Airflow task that runs the loader function. The name read_raw_storage is
# rebound here from the function to the operator; the operator receives the
# function through python_callable before the rebinding takes effect.
read_raw_storage = PythonOperator(
    dag=dag,
    python_callable=read_raw_storage,
    task_id="read_raw_storage",
)

# Query task: keep only the model's feature columns and the label, dropping
# rows whose workclass is NULL or "Never-worked". The result overwrites
# machine_learning_02.bq_clean_table on every run (WRITE_TRUNCATE).
# dag=dag attaches the task to the DAG explicitly, matching the PythonOperator
# above, instead of relying on the dependency wiring to do it.
bq_clean_table = bigquery_operator.BigQueryOperator(
    task_id='bq_clean_table',
    bql="""
    SELECT age, workclass, gender, occupation, education_num, marital_status, relationship, capital_gain, income_bracket
    FROM `crazy-hippo-01.machine_learning_02.income_model` 
    WHERE workclass IS NOT NULL AND workclass != "Never-worked"
    """,
    use_legacy_sql=False,
    destination_dataset_table="machine_learning_02.bq_clean_table",
    write_disposition="WRITE_TRUNCATE",
    location="US",
    dag=dag
)

# Training task: (re)creates the BigQuery ML logistic-regression model
# income_model_log_classifier from every column of bq_clean_table, using
# income_bracket as the label.
# dag=dag attaches the task to the DAG explicitly, matching the PythonOperator
# above, instead of relying on the dependency wiring to do it.
model_training = bigquery_operator.BigQueryOperator(
    task_id='model_training',
    bql="""
    CREATE OR REPLACE MODEL machine_learning_02.income_model_log_classifier
    OPTIONS(input_label_cols=['income_bracket'], model_type='logistic_reg')
    AS 
    SELECT *
    FROM `crazy-hippo-01.machine_learning_02.bq_clean_table`
    """,
    use_legacy_sql=False,
    write_disposition="WRITE_TRUNCATE",
    location="US",
    dag=dag
)

# Evaluation task: appends one row of rounded ML.EVALUATE metrics for the
# trained model to machine_learning_02.evaluation_log_regression.
# INSERT maps the SELECT list to the target columns by position, so each
# metric must be selected in the same order as the column list; the aliases
# are kept only to make that mapping readable.
# dag=dag attaches the task to the DAG explicitly, matching the PythonOperator
# above, instead of relying on the dependency wiring to do it.
model_evaluation = bigquery_operator.BigQueryOperator(
    task_id='model_evaluation',
    bql="""
    INSERT machine_learning_02.evaluation_log_regression (Accuracy, f1_Score, Precision, Recall, ROC_AUC) 
    SELECT ROUND(accuracy,2) as Accuracy, ROUND(f1_score,2) as f1_Score, ROUND(precision,2) as Precision, ROUND(recall,2) as Recall, ROUND(roc_auc,2) as ROC_AUC
    FROM
    ML.EVALUATE(MODEL `machine_learning_02.income_model_log_classifier`)
    """,
    use_legacy_sql=False,
    write_disposition="WRITE_TRUNCATE",
    location="US",
    dag=dag
)


# Task order: load raw data -> clean table -> train model -> evaluate model
read_raw_storage >> bq_clean_table >> model_training >> model_evaluation

Overwriting bqml-logistic-regression.py


#### Copy new file to DAG folder

In [17]:
%%bash

# Copy the DAG file only when it is not already listed in the DAG folder.
# grep -F matches the file name literally (its dots are not regex wildcards).
if [[ -n "$(gsutil ls "$DAG_FOLDER" | grep -F -- "$FILE_NAME")" ]]; then
    echo "$FILE_NAME already exists"
else
    # upload the file
    echo "$FILE_NAME does not exist. Copying file to DAG folder...."
    gsutil cp "$FILE_NAME" "$DAG_FOLDER"
    echo "Done!"
fi

bqml-logistic-regression.py does not exist. Copying file to DAG folder....
Done!


Copying file://bqml-logistic-regression.py [Content-Type=text/x-python]...
/ [1 files][  2.9 KiB/  2.9 KiB]                                                
Operation completed over 1 objects/2.9 KiB.                                      


#### Update file already in DAG folder

In [19]:
%%bash

# Replace the DAG file only when it is already listed in the DAG folder.
# grep -F matches the file name literally (its dots are not regex wildcards).
if [[ -n "$(gsutil ls "$DAG_FOLDER" | grep -F -- "$FILE_NAME")" ]]; then
    echo "$FILE_NAME already exists"
    # delete file
    gsutil del "$DAG_FOLDER$FILE_NAME"
    # copy file
    echo "$FILE_NAME  Copying file to DAG folder...."
    gsutil cp "$FILE_NAME" "$DAG_FOLDER"
    echo "Done!"
else
    # nothing to update
    echo "$FILE_NAME does not exist..."
fi

 already exists
bqml-logistic-regression.py  Copying file to DAG folder....
Done!


Removing gs://us-east1-compose-crazy-a3d52ae3-bucket/dags/bqml-logistic-regression.py...
/ [1 objects]                                                                   
Operation completed over 1 objects.                                              
Copying file://bqml-logistic-regression.py [Content-Type=text/x-python]...
/ [1 files][  3.0 KiB/  3.0 KiB]                                                
Operation completed over 1 objects/3.0 KiB.                                      


#### Delete file in DAG folder

In [None]:
# Delete the object named by DAG_FOLDER + FILE_NAME from the bucket.
!gsutil del $DAG_FOLDER$FILE_NAME