# Sourcing the Data
This notebook imports the sourced data from storage into BigQuery, a data warehouse. This is the start of the machine learning lifecycle: importing the data and preparing it for machine learning. The data is assumed to be tabular so that it fits a supervised learning model. The model predicts a target value by learning patterns in the feature columns. For our case we will use classification (the approach best suited to our needs), which predicts a target variable that takes one of several discrete classes.

## Setting up 

In [2]:
from google.cloud import bigquery

In [1]:
# Settings shared by the cells of this notebook.
REGION = "us-central1"     # location used when the BigQuery dataset is created
PROJECT_ID = "mlopstqc"    # Google Cloud project the BigQuery client is bound to
DATANAME = "leaktest"      # names the dataset, its table, and the CSV path in the bucket
Identifier = "1"           # value of the 'identifier' label on the dataset and load job

In [3]:
# BigQuery client bound to the project; the queries and the load job in this notebook go through it.
bq = bigquery.Client(project=PROJECT_ID)

In [4]:
# Cloud Storage bucket that holds the source CSV.
BUCKET = "datalaketqc"

## Creating the Dataset

In [5]:
# List the datasets (schemas) that already exist in the project.
# INFORMATION_SCHEMA views are regional: without a region qualifier BigQuery
# reads metadata from the US multi-region, which would never show a dataset
# created in REGION. Qualify the view and pin the job to the same location.
query = f"""
SELECT schema_name
FROM `{PROJECT_ID}.region-{REGION}.INFORMATION_SCHEMA.SCHEMATA`
"""
bq.query(query = query, location = REGION).to_dataframe()

Unnamed: 0,schema_name


In [6]:
# Create the dataset in REGION unless it already exists, and tag it with the
# 'identifier' label.
query = f"""
CREATE SCHEMA IF NOT EXISTS `{PROJECT_ID}.{DATANAME}`
OPTIONS(
    location = '{REGION}',
    labels = [('identifier','{Identifier}')]
)
"""
job = bq.query(query)
job.result()  # block until the DDL statement has finished

<google.cloud.bigquery.table._EmptyRowIterator at 0x7f780234f7d0>

In [7]:
# Elapsed time of the most recent job, in seconds.
elapsed = job.ended - job.started
elapsed.total_seconds()

0.69

In [9]:
# Load the CSV from the bucket into the table <PROJECT_ID>.<DATANAME>.<DATANAME>.
# WRITE_TRUNCATE replaces whatever the table already holds, and autodetect lets
# BigQuery infer the schema from the file.
table_path = f"{PROJECT_ID}.{DATANAME}.{DATANAME}"
source_uri = f"gs://{BUCKET}/{DATANAME}/data/{DATANAME}.csv"

destination = bigquery.TableReference.from_string(table_path)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
    write_disposition='WRITE_TRUNCATE',
    labels={'identifier': f'{Identifier}'},
)
job = bq.load_table_from_uri(source_uri, destination, job_config=job_config)
job.result()  # wait for the load job to complete

LoadJob<project=mlopstqc, location=us-central1, id=7ab4533b-c954-437c-a6f6-ae1eafdef271>

In [10]:
# Elapsed time of the most recent job, in seconds.
elapsed = job.ended - job.started
elapsed.total_seconds()


1.625

In [11]:
# Peek at five rows of the loaded table.
query = f"""
SELECT *
FROM `{DATANAME}.{DATANAME}`
LIMIT 5
"""
bq.query(query).to_dataframe()

Unnamed: 0,TestVolumeLitre,AlarmDiffPressure,TestPressureMbar
0,3.0,260,15500
1,3.0,260,15500
2,3.0,260,15500
3,3.0,10,15500
4,3.0,10,15500


## Review Data

In [12]:
# Pull the whole table into a pandas DataFrame for local inspection.
query = f"""
SELECT *
FROM `{DATANAME}.{DATANAME}`
"""
df = bq.query(query).to_dataframe()

In [13]:
# Number of rows for each distinct TestPressureMbar value.
df["TestPressureMbar"].value_counts()

15500    27585
17800       51
12200       51
16800       51
10400       51
         ...  
45400        6
39000        6
43600        6
37200        6
49400        6
Name: TestPressureMbar, Length: 201, dtype: int64

In [14]:
# Fraction of all rows taken by each distinct TestPressureMbar value.
df["TestPressureMbar"].value_counts(normalize=True)

15500    0.830173
17800    0.001535
12200    0.001535
16800    0.001535
10400    0.001535
           ...   
45400    0.000181
39000    0.000181
43600    0.000181
37200    0.000181
49400    0.000181
Name: TestPressureMbar, Length: 201, dtype: float64

# Preparing the Data
A new table with the <code>_prepped</code> suffix is created, adding two columns to the sourced data:
<ol>
<li><code>transaction_id</code>: a unique identifier for each row.</li>
<li><code>splits</code>: assigns each row to <code>TRAIN</code> (about 80%), <code>VALIDATE</code> (about 10%), or <code>TEST</code> (about 10%).</li>
</ol>


In [15]:
# Build <DATANAME>_prepped: the source rows plus
#   * transaction_id - a UUID generated per row, and
#   * splits         - TRAIN / VALIDATE / TEST, chosen from a hash bucket 0-9
#                      of transaction_id (buckets 0-7, 8, and 9 respectively).
# ABS is applied AFTER MOD: ABS(FARM_FINGERPRINT(...)) raises an overflow error
# when the fingerprint is the minimum INT64, whereas MOD(..., 10) is always in
# [-9, 9], so ABS(MOD(...)) cannot overflow and yields the same bucket for
# every other hash value.
# NOTE: GENERATE_UUID() is random, so re-running this cell reassigns the splits.
query = f"""
CREATE OR REPLACE TABLE `{DATANAME}.{DATANAME}_prepped` AS
WITH add_id AS(SELECT *, GENERATE_UUID() transaction_id FROM `{DATANAME}.{DATANAME}`)
SELECT *,
    CASE 
        WHEN ABS(MOD(FARM_FINGERPRINT(transaction_id),10)) < 8 THEN "TRAIN" 
        WHEN ABS(MOD(FARM_FINGERPRINT(transaction_id),10)) < 9 THEN "VALIDATE"
        ELSE "TEST"
    END AS splits
FROM add_id
"""
job = bq.query(query = query)
job.result()  # block until the table has been written

<google.cloud.bigquery.table._EmptyRowIterator at 0x7f7800a92510>

In [16]:
# Elapsed time of the most recent job, in seconds.
elapsed = job.ended - job.started
elapsed.total_seconds()

1.567

In [17]:
# Estimated bytes processed by the most recent job, expressed in megabytes (10^6 bytes).
bytes_per_mb = 1_000_000
job.estimated_bytes_processed / bytes_per_mb


0.797472

In [18]:
# Row count and percentage share of each split in the prepped table.
query = f"""
SELECT splits, count(*) as Count, 100*count(*) / (sum(count(*)) OVER()) as Percentage
FROM `{DATANAME}.{DATANAME}_prepped`
GROUP BY splits
"""
bq.query(query).to_dataframe()

Unnamed: 0,splits,Count,Percentage
0,VALIDATE,3378,10.166125
1,TEST,3344,10.063802
2,TRAIN,26506,79.770073


In [19]:
# Fetch five rows of the prepped table to check the two added columns.
query = f"""
SELECT * 
FROM `{DATANAME}.{DATANAME}_prepped`
LIMIT 5
"""
data = bq.query(query).to_dataframe()

In [20]:
# Display the sampled rows (head's default of five, spelled out).
data.head(n=5)

Unnamed: 0,TestVolumeLitre,AlarmDiffPressure,TestPressureMbar,transaction_id,splits
0,0.2,500,12800,68cfe17b-33f6-4240-9254-d77514feb083,TEST
1,10.0,320,12800,e6cc26be-a050-4c91-99cf-52c2dcea0924,TEST
2,0.1,320,12800,b5e82b71-f62a-400a-9cf1-f551b2b57bf9,TEST
3,0.1,320,12800,d2f9cc2b-4aed-4734-a723-17b44020021b,TEST
4,0.1,320,12800,2e18b196-6e95-4a0a-ae7b-1f1256bfc3b9,TRAIN
