## Prerequisites

In [10]:
# Prerequisite: a service account and its corresponding JSON key.
# export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"

# pip install -r requirements.txt
# pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'

In [11]:
import os

from google.cloud import bigquery
from google.oauth2 import service_account

# Step 0: Set up the BigQuery client from a service-account key.
# Reference: https://cloud.google.com/bigquery/docs/authentication/service-account-file
# The key path is read from GOOGLE_APPLICATION_CREDENTIALS (see the prerequisites
# cell); when that variable is unset or empty, fall back to the repo-relative default.
DEFAULT_KEY_PATH = "../../../.google/credentials/google_credentials.json"
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") or DEFAULT_KEY_PATH
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# Bill queries to the project that owns the service account.
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

In [12]:
# Constant names shared by every query cell below.

PROJECT_NAME = "blissful-scout-339008"
DATABASE_NAME = "trips_data_all"
EXTERNAL_TABLE_NAME = "external_fhv_tripdata_2019"
# Alias kept so that any cell using the older name INITIAL_EXTERNAL_TABLE_NAME
# still runs on a fresh kernel (it was never defined on its own).
INITIAL_EXTERNAL_TABLE_NAME = EXTERNAL_TABLE_NAME

In [16]:
from google.cloud.bigquery.client import Client
import time

def get_query_estimates(client:Client, query:str):
    """Dry-run ``query`` and print the number of bytes BigQuery estimates it will process.

    The job is submitted with ``dry_run=True``, so the query is validated and
    estimated but not executed. The query cache is disabled so the estimate is
    not short-circuited by a cached result.

    Args:
        client: BigQuery client used to submit the dry-run job.
        query: Fully formatted SQL text.

    Returns:
        ``total_bytes_processed`` reported by the dry-run job (the same value
        that is printed). The recorded outputs show 0 for queries over
        external tables.
    """
    dry_run_config = bigquery.QueryJobConfig()
    dry_run_config.dry_run = True
    dry_run_config.use_query_cache = False

    dry_run_job = client.query(query, job_config=dry_run_config)

    estimated_bytes = dry_run_job.total_bytes_processed
    print("Estimated: This query will process {} bytes".format(estimated_bytes))
    return estimated_bytes


def run_query_bq(client:Client, query_template:str, select_table_name:str="", create_table_name:str=""):
    """Fill a SQL template with fully qualified table names, run it, and return the result.

    ``select_table_name`` and ``create_table_name`` are bare table names. Each is
    qualified as ``PROJECT_NAME.DATABASE_NAME.<name>`` and substituted into the
    ``{select_table_name}`` / ``{create_table_name}`` placeholders of
    ``query_template``. A dry run prints the estimated bytes first; after the real
    job finishes, the bytes it actually processed are printed.

    Args:
        client: BigQuery client used to run the jobs.
        query_template: SQL text with ``str.format`` placeholders.
        select_table_name: Bare name of the table read by the query (if any).
        create_table_name: Bare name of the table created by the query (if any).

    Returns:
        The query result as a pandas DataFrame (``QueryJob.to_dataframe()``).

    Raises:
        Whatever ``QueryJob.result()`` raises when the job fails, so failures
        surface here instead of later in ``to_dataframe()``.
    """
    select_table_name = ".".join([PROJECT_NAME, DATABASE_NAME, select_table_name])
    create_table_name = ".".join([PROJECT_NAME, DATABASE_NAME, create_table_name])

    query = query_template.format(select_table_name=select_table_name, create_table_name=create_table_name)

    get_query_estimates(client, query)

    query_job = client.query(query)
    # result() blocks until the job completes and raises if it failed; this replaces
    # a manual done()/sleep polling loop, which would not surface job errors.
    query_job.result()

    print("Actual: This query will process {} bytes".format(query_job.total_bytes_processed))

    return query_job.to_dataframe()



## Q1

**What is the count for FHV vehicle data for the year 2019?**  


In [17]:
# q1.1: Create an external table over the 2019 fhv_tripdata parquet files in GCS.
# NOTE: the bucket name embeds the project id (same value as PROJECT_NAME) and is
# hard-coded here, so update the URI too if PROJECT_NAME changes.
q1_1= """
CREATE OR REPLACE EXTERNAL TABLE
  `{create_table_name}` OPTIONS( format = 'PARQUET',
    uris = ['gs://dtc_data_lake_blissful-scout-339008/raw/fhv_tripdata_2019-*.parquet']);
"""
# q1.2: Count the total rows in the table.
q1_2= """
SELECT COUNT(*) FROM `{select_table_name}`;
"""

In [18]:
# Build the external table over the 2019 FHV files, then count its rows.
run_query_bq(client=client, query_template=q1_1, create_table_name=EXTERNAL_TABLE_NAME)
run_query_bq(client=client, query_template=q1_2, select_table_name=EXTERNAL_TABLE_NAME)  # Answer: 42084899

Estimated: This query will process 0 bytes
Actual: This query will process 0 bytes
Estimated: This query will process 0 bytes
Actual: This query will process 0 bytes


Unnamed: 0,f0_
0,42084899


## Q2

**How many distinct dispatching_base_num values do we have in FHV for 2019?**  

In [19]:
# Count distinct dispatching bases. When run against the external table, the
# dry-run estimate is reported as 0 bytes (see outputs); only the actual run
# shows the real scan size.
q2 = """
SELECT
  COUNT(DISTINCT dispatching_base_num)
FROM
  `{select_table_name}`;
"""

In [20]:
# Query the external table created in Q1 (named by EXTERNAL_TABLE_NAME).
run_query_bq(client, q2, select_table_name=EXTERNAL_TABLE_NAME) # Answer:792

Estimated: This query will process 0 bytes
Actual: This query will process 337526049 bytes


Unnamed: 0,f0_
0,792


## Q3 and Q4

**Q3: What is the best strategy to optimise if the query always filters by dropoff_datetime and orders by dispatching_base_num?**  

**Q4: What is the count, and the estimated and actual data processed, for a query which counts trips between 2019/01/01 and 2019/03/31 for dispatching_base_num B00987, B02060 and B02279?** 


In [23]:
# Create native tables from the external table so their scan estimates can be
# compared: a plain copy, a partitioned copy, and a partitioned + clustered copy.

table_name_0 = "fhv_tripdata_2019_non_partitioned"
table_name_1 = "fhv_tripdata_2019_partitioned_Q3Q4"
table_name_2 = "fhv_tripdata_2019_partitioned_clustered_Q3Q4"

# Plain copy: no partitioning, no clustering.
q3_0 = """
    CREATE OR REPLACE TABLE
      `{create_table_name}` AS
    SELECT
      *
    FROM
      `{select_table_name}`;
"""

# Create partitioned table by DATE(dropoff_datetime)
q3_1 = """
    CREATE OR REPLACE TABLE
      `{create_table_name}`
    PARTITION BY
      DATE(dropoff_datetime) AS
    SELECT
      *
    FROM
      `{select_table_name}`;
"""

# Create partitioned+clustered table by DATE(dropoff_datetime) and dispatching_base_num.
# Keep this variable name unique: if another cell reassigns q3_2, re-running the
# table-creation cell would execute that query instead of this CREATE.
q3_2 = """
    CREATE OR REPLACE TABLE
      `{create_table_name}`
    PARTITION BY
      DATE(dropoff_datetime)
    CLUSTER BY
      dispatching_base_num AS
    SELECT
      *
    FROM
      `{select_table_name}`;
"""


In [24]:
# Create the prerequisite native tables from the external table (EXTERNAL_TABLE_NAME).
run_query_bq(client, q3_0, create_table_name=table_name_0, select_table_name=EXTERNAL_TABLE_NAME)
run_query_bq(client, q3_1, create_table_name=table_name_1, select_table_name=EXTERNAL_TABLE_NAME)
run_query_bq(client, q3_2, create_table_name=table_name_2, select_table_name=EXTERNAL_TABLE_NAME)

Estimated: This query will process 0 bytes
Actual: This query will process 1706145457 bytes
Estimated: This query will process 0 bytes
Actual: This query will process 1706145457 bytes
Estimated: This query will process 0 bytes
Actual: This query will process 1706145457 bytes


In [25]:
# Compare Non-Partitioned vs Only Partioned
# Estimated data to be process: 643 MB (non-partitioned)
# Estimated data to be process: 30.1 MB (only partitioned)

q3_2 ="""
    SELECT
      DISTINCT(dispatching_base_num)
    FROM
      `{select_table_name}`
    WHERE
      DATE(dropoff_datetime) BETWEEN '2019-06-01'
      AND '2019-06-30';
"""

In [26]:
run_query_bq(client, q3_2, select_table_name=table_name_0)
run_query_bq(client, q3_2, select_table_name=table_name_1)

Estimated: This query will process 674205241 bytes
Actual: This query will process 674205241 bytes
Estimated: This query will process 31527600 bytes
Actual: This query will process 31527600 bytes


Unnamed: 0,dispatching_base_num
0,B00254
1,B01674
2,B02563
3,B02715
4,B01066
...,...
555,B01482
556,B01522
557,B02315
558,B02956


In [27]:
# Compare partitioned-only vs. partitioned+clustered tables.
# Estimated data to be processed: 400.1 MB (partitioned only)

# Estimated data to be processed: 400.1 MB (partitioned+clustered)
# The run recorded in this notebook's output actually processed only
# 164308071 bytes (~156.7 MB) on the clustered table.

q4 = """
    SELECT
      COUNT(*) AS trips
    FROM
      `{select_table_name}`
    WHERE
      DATE(dropoff_datetime) BETWEEN '2019-01-01'
      AND '2019-03-31'
      AND dispatching_base_num IN ("B00987",
        "B02060",
        "B02279");
"""

In [28]:
# Same Q4 query, first on the partitioned-only table, then on the partitioned+clustered one.
run_query_bq(client=client, query_template=q4, select_table_name=table_name_1)
run_query_bq(client=client, query_template=q4, select_table_name=table_name_2)  # Answer: 26643

Estimated: This query will process 419508706 bytes
Actual: This query will process 419508706 bytes
Estimated: This query will process 419508706 bytes
Actual: This query will process 164308071 bytes


Unnamed: 0,trips
0,26643


### Q3: Exploring dropoff_datetime

In [29]:
# Check whether the column is suitable for partitioning: BigQuery caps the number
# of partitions per table (the original note says 4000; confirm against the
# current BigQuery quotas page).
q3_discover_dropoff_datetime_1 = """
SELECT
  COUNT(DISTINCT DATE(dropoff_datetime))
FROM
  `{select_table_name}`;
"""

# Show how rows are distributed across DATE(dropoff_datetime), since balanced
# partitions/clusters are preferable. COUNT(expr) skips NULLs, so a NULL-date
# group would report 0 here.
q3_discover_dropoff_datetime_2 = """
SELECT
  DATE(dropoff_datetime) AS dropoff_date,
  COUNT(DATE(dropoff_datetime)) AS counter
FROM
  `{select_table_name}`
GROUP BY
  dropoff_date
ORDER BY
  counter DESC;
"""

In [30]:
# Query the external table created in Q1 (named by EXTERNAL_TABLE_NAME).
run_query_bq(client, q3_discover_dropoff_datetime_1, select_table_name=EXTERNAL_TABLE_NAME) # Ans:549 therefore suitable for the partitioning as it is < 4000

Estimated: This query will process 0 bytes
Actual: This query will process 336679192 bytes


Unnamed: 0,f0_
0,549


In [31]:
# Query the external table created in Q1 (named by EXTERNAL_TABLE_NAME).
run_query_bq(client, q3_discover_dropoff_datetime_2, select_table_name=EXTERNAL_TABLE_NAME)

Estimated: This query will process 0 bytes
Actual: This query will process 336679192 bytes


Unnamed: 0,dropoff_date,counter
0,2019-01-26,925126
1,2019-01-12,888643
2,2019-01-31,888211
3,2019-01-25,850977
4,2019-01-19,845616
...,...,...
544,2027-07-01,1
545,2919-04-14,1
546,2109-05-12,1
547,2109-05-15,1


### Q3: Exploring dispatching_base_num

In [32]:
# Row count per dispatching_base_num, most frequent first, to judge whether the
# column gives balanced clusters. COUNT(dispatching_base_num) skips NULLs, so a
# NULL group would report 0.
q3_discover_dispatching_base_num = """
    SELECT
      `dispatching_base_num`,
      COUNT(dispatching_base_num) AS counter
    FROM
      `{select_table_name}`
    GROUP BY
      dispatching_base_num
    ORDER BY
      counter DESC;
"""

In [33]:
# Query the external table created in Q1 (named by EXTERNAL_TABLE_NAME).
run_query_bq(client, q3_discover_dispatching_base_num, select_table_name=EXTERNAL_TABLE_NAME)

Estimated: This query will process 0 bytes
Actual: This query will process 337526049 bytes


Unnamed: 0,dispatching_base_num,counter
0,B02510,4623412
1,B02764,1662983
2,B02765,1059883
3,B02875,1009567
4,B02800,1004974
...,...,...
787,B03237,1
788,B03241,1
789,B03242,1
790,B03245,1


## Q5

In [37]:
# Create a table clustered by dispatching_base_num and SR_Flag.
# NOTE: despite the "partitioned" in its name, this DDL has no PARTITION BY clause;
# the table is clustered only.

table_name_q5_1="fhv_tripdata_2019_partitioned_clustered_Q5"

q5_1 = """
    CREATE OR REPLACE TABLE
      `{create_table_name}`
    CLUSTER BY
      dispatching_base_num,
      SR_Flag AS
    SELECT
      *
    FROM
      `{select_table_name}`;
"""

In [38]:
# Build the clustered Q5 table from the external table (EXTERNAL_TABLE_NAME).
run_query_bq(client, q5_1, create_table_name=table_name_q5_1, select_table_name=EXTERNAL_TABLE_NAME)

Estimated: This query will process 0 bytes
Actual: This query will process 1706145457 bytes


In [41]:
# Compare the non-partitioned table vs. the clustered table.
# Estimated data to be processed: 363 MB (non-partitioned)

# Estimated data to be processed: 363 MB (clustered)
# The run recorded in this notebook's output actually processed only
# 7605181 bytes (~7.3 MB) on the clustered table.

# These names hold the same values as table_name_0 and table_name_q5_1.
table_name_q5_2_1="fhv_tripdata_2019_non_partitioned"
table_name_q5_2_2="fhv_tripdata_2019_partitioned_clustered_Q5"

# NOTE(review): the recorded output for this filter is 0 trips; check the SR_Flag
# distribution query below to confirm SR_Flag=5 is the intended value.
q5_2 = """
    SELECT
      COUNT(*) AS trips
    FROM
      `{select_table_name}`
    WHERE
      SR_Flag=5
      AND dispatching_base_num IN ("B00987",
        "B02060",
        "B02279");
"""

In [42]:
# Same Q5 query, first on the non-partitioned table, then on the clustered one.
run_query_bq(client=client, query_template=q5_2, select_table_name=table_name_q5_2_1)
run_query_bq(client=client, query_template=q5_2, select_table_name=table_name_q5_2_2)

Estimated: This query will process 380595129 bytes
Actual: This query will process 380595129 bytes
Estimated: This query will process 380595129 bytes
Actual: This query will process 7605181 bytes


Unnamed: 0,trips
0,0


### Q5: Exploring SR_Flag

In [43]:
# Row count per SR_Flag value, most frequent first. COUNT(SR_Flag) skips NULLs,
# so a NULL SR_Flag group reports 0 here; use COUNT(*) to see its real size.
q5_discover_sr_flag = """
    SELECT
      `SR_Flag`,
      COUNT(SR_Flag) AS counter
    FROM
      `{select_table_name}`
    GROUP BY
      SR_Flag
    ORDER BY
      counter DESC;
"""

In [None]:
# Query the external table created in Q1 (named by EXTERNAL_TABLE_NAME).
run_query_bq(client, q5_discover_sr_flag, select_table_name=EXTERNAL_TABLE_NAME)

Estimated: This query will process 0 bytes


## Q6

**What improvements can be seen by partitioning and clustering for data size less than 1 GB?**


For data smaller than 1 GB, partitioning and clustering bring no noticeable improvement. They can instead add cost, because the partition and cluster metadata has to be processed as well.

## Q7

**In which format does BigQuery save data**

In a columnar format.