# Analytics Pipeline DevOps - Firebase to Snowflake

## Set Up the Firebase Project

1. To set up a Firebase project, open the [Firebase console](https://console.firebase.google.com/), click **Create a project** and follow the on-screen steps. Make sure to **Enable Google Analytics** for the project.
2. Navigate to **Project Settings** > **Integrations** > **BigQuery** > **Manage**. Make sure that **Google Analytics** under **Exported Integrations** is **On**, and under **Export Settings** tick at least **Daily** (*Note: you will need to switch to the Blaze Plan to enable Streaming*).
3. After a few days of app runtime, event data should have been collected in BigQuery. Navigate to the [Google Cloud Console](https://console.cloud.google.com/) and enable **Billing**.

## Authentication

### Google Cloud Console

There are various ways to authenticate, but in this template we will create and use a **Service Account**:
1. Navigate to [IAM & Admin > Service Accounts](https://console.cloud.google.com/iam-admin/serviceaccounts) and select your project
2. Click **+ CREATE SERVICE ACCOUNT** and add a service account name (e.g. bigquery-python), add a **description** (optional) and click **CREATE AND CONTINUE**
3. Add **Roles** from the dropdown and **+ ADD ANOTHER ROLE** as many times as needed then **CONTINUE**:
    * Add **Basic** > **Owner** role
    * Add **BigQuery Data Viewer** role
4. Click **Done**
5. Once the **Service Account** has been created, click on the three dots under **Action** and click **Manage Keys**
6. From the next page, click on **ADD KEY** > **Create New Key** > **JSON** > **Create**
7. This will download a one-time file to your computer. **DO NOT LOSE THIS FILE**, as it cannot be downloaded again. If you do lose it, you can delete the current key and create a new one (from Step 5)
8. Rename the file to **"gccreds.json"** and place it in the same folder as this notebook. If you want to keep it in a more secure location, save it there and update the path used in the **Auths** cell (for example, by reading the path from an environment variable).

### Snowflake

1. Create a user in Snowflake (e.g. data_loader) with a role that has **ACCOUNTADMIN** rights (e.g. create a data_loader role and grant the ACCOUNTADMIN role to it)
2. (For Windows) In the Search Bar search for **Environment Variables** and open **Edit the system environment variables** 
3. Click on **Environment Variables** and in the **User variables** section click **New**
4. In the **Variable name** type in **ETL_SN_CREDS**
5. In **Variable value**, enter a **JSON** string in the following format:
```JSON
{"username":"data_loader", "password":"<PASSWORD>","account":"<ACCOUNT>","warehouse":"<WAREHOUSE>","role":"data_loader"}
```

## Enable Google Cloud APIs

To access services programmatically, Google requires you to manually enable some APIs:
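1. Enable the [Identity and Access Management (IAM) API](https://console.cloud.google.com/flows/enableapi?apiid=iam.googleapis.com)
2. This notebook also calls the Cloud Resource Manager, Pub/Sub and BigQuery Data Transfer APIs. Enable them too if they are not already enabled in your project.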

## Install Dependencies

Open a terminal (I use **Anaconda Prompt**) and run
```bash
pip install -r requirements.txt
```

## Import Packages

In [None]:
from google.oauth2 import service_account

from google.cloud import storage
from google.cloud import bigquery
from google.cloud import pubsub_v1
from google.cloud import bigquery_datatransfer
from googleapiclient import discovery

from google.api_core import exceptions
from googleapiclient import errors

import snowflake.connector as sf

import json, os, datetime, re

## Auths

In [None]:
# Google Cloud service-account key created in the "Authentication" steps.
# Change the path if you saved the key somewhere else.
with open('gccreds.json', 'rt') as creds_file:
    gccreds = json.load(creds_file)

# Snowflake credentials: a JSON object stored in the ETL_SN_CREDS environment
# variable (keys used below: username, password, account, warehouse, role).
try:
    sncreds = json.loads(os.environ['ETL_SN_CREDS'])
except KeyError:
    raise RuntimeError(
        "Environment variable ETL_SN_CREDS is not set; it must hold the Snowflake "
        "credentials JSON described in the Authentication > Snowflake section"
    ) from None

credentials = service_account.Credentials.from_service_account_info(gccreds)

# Pass the key's project explicitly so the Storage and BigQuery clients target
# the same project as `project_id` (derived from the same key later on) rather
# than a project the client libraries might pick up from the environment.
storage_client = storage.Client(project=gccreds['project_id'], credentials=credentials)
bq_client = bigquery.Client(project=gccreds['project_id'], credentials=credentials)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
transfer_client = bigquery_datatransfer.DataTransferServiceClient(credentials=credentials)

# Discovery-based REST clients for IAM (custom roles) and Cloud Resource
# Manager (project-level IAM policy).
iam_service = discovery.build(
    serviceName='iam',
    version='v1',
    credentials=credentials)

crm_service = discovery.build(
    serviceName='cloudresourcemanager',
    version='v1',
    credentials=credentials)

conn = sf.connect(
    user=sncreds['username'],
    password=sncreds['password'],
    account=sncreds['account'],
    warehouse=sncreds['warehouse'],
    role=sncreds['role']
)

def run(sql):
    """Execute a single Snowflake statement on the shared ``conn`` connection.

    Any result rows are discarded; open a cursor directly when rows are needed.
    The cursor is closed even if the statement raises.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
    finally:
        cursor.close()

## User Variables

In [None]:
env = 'prod'  # Use 'prod' or 'dev' only
if env.lower() not in ('prod', 'dev'):
    raise ValueError(f"env must be 'prod' or 'dev', got {env!r}")

# Strip surrounding whitespace: these values become GCS/Snowflake object names.
app_name = input('Please insert the app name: ').strip()
if not app_name:
    raise ValueError('The app name must not be empty')

database = input('Please insert the Snowflake database name where you want data to stream to: ').strip()
if not database:
    raise ValueError('The Snowflake database name must not be empty')

## Global Variables

In [None]:
project_id = gccreds['project_id']

# Pick the Firebase Analytics export dataset (expected to be named
# 'analytics_<...>') and fail with a clear message if the export has not run yet.
dataset_id = next(
    (ds.dataset_id for ds in bq_client.list_datasets() if ds.dataset_id.startswith('analytics_')),
    None,
)
if dataset_id is None:
    raise RuntimeError(
        f"No 'analytics_*' dataset found in project '{project_id}'; "
        "check that the Firebase BigQuery export is enabled and has produced data"
    )
location = bq_client.get_dataset(dataset_id).location

datasource = "bigquery"
# Collapse any run of non-alphanumerics into one separator, then trim separators
# at the ends: GCS bucket names must start and end with a letter or digit.
gc_format = re.sub(r'[^0-9a-zA-Z]+', '-', app_name).strip('-').lower()
sn_format = re.sub(r'[^0-9a-zA-Z]+', '_', app_name).strip('_').lower()
if not gc_format:
    raise ValueError(f"App name {app_name!r} must contain at least one letter or digit")
dssn_format = sn_format + "_" + datasource

# Google Cloud resource names
bucket_name = f"{gc_format}-event-data"
topic_name = f"{bucket_name}-file-created"
subscription_name = topic_name
storage_service_agent = storage_client.get_service_account_email()

# Snowflake integration names (account-level objects, suffixed by environment)
storage_integration = f"{dssn_format}_storage_integration_{env.lower()}"
notification_integration = f"{dssn_format}_notification_integration_{env.lower()}"

# Snowflake objects created inside the target database
schema = sn_format
stage = f"{dssn_format}_stage"
pipe = f"{dssn_format}_pipe"
table = "raw"

## Create an IAM Role that Snowflake can use to Access Google Cloud Resources

In [None]:
# Custom role with only the read permissions Snowflake needs on the bucket.
try:
    role = iam_service.projects().roles().create(
        parent='projects/' + project_id,
        body={
            'roleId': 'snowflake',
            'role': {
                'title': 'Snowflake Integration',
                'description': f'Created on: {datetime.datetime.now(datetime.timezone.utc).date():%Y-%m-%d}\nThis role is used by Snowflake to communicate with Google Cloud Storage',
                'includedPermissions': ['storage.buckets.get', 'storage.objects.get', 'storage.objects.list'],
                'stage': 'ALPHA'
            }
        }).execute()
    print(f"Created custom IAM Role '{role['name']}'")
except errors.HttpError as e:
    # Only "role already exists" is expected here. Anything else (missing
    # permissions, disabled IAM API, ...) must surface instead of silently
    # assuming the role exists.
    if 'already exists' not in str(e.reason):
        raise
    print("Custom IAM Role 'snowflake' already exists")
    role = {
        'name': f'projects/{project_id}/roles/snowflake'
    }

## Create a Google Cloud Storage Bucket

In [None]:
# `bucket` is reused by later cells (IAM policy, notifications).
bucket = storage_client.bucket(bucket_name)
bucket.storage_class = 'STANDARD'
try:
    new_bucket = storage_client.create_bucket(
        bucket_or_name=bucket,
        location=location)

    print(f"Created bucket {new_bucket.name} in {new_bucket.location} with storage class {new_bucket.storage_class}")
except exceptions.Conflict as e:
    # Bucket names are global, so a Conflict can also mean the name belongs to
    # someone else. Only continue when we already own the existing bucket.
    if 'You already own this bucket' not in e.message:
        raise
    print(f"Bucket '{bucket_name}' already exists")

## Create Snowflake Storage Integration with Google Cloud Storage Bucket and Get the Service Account Created by Snowflake

In [None]:
# Storage integration restricted to the event-data bucket.
sql = f'''
create storage integration if not exists {storage_integration}
  type = external_stage
  storage_provider = gcs
  enabled = true
  storage_allowed_locations = ('gcs://{bucket_name}/')
'''
run(sql)
print(f"Storage Integration {storage_integration} has been created if it didn't already exist")

# DESC returns one row per property; read the service account that Snowflake
# uses for GCS access so it can be granted bucket permissions next.
sql = f'''
desc storage integration {storage_integration}
'''
cursor = conn.cursor(sf.DictCursor)
try:
    cursor.execute(sql)
    storage_gcp_service_account = [
        row['property_value'] for row in cursor if row['property'] == 'STORAGE_GCP_SERVICE_ACCOUNT'
    ][0]
finally:
    # Close the cursor even if the DESC or the lookup fails.
    cursor.close()

## Add Snowflake Permissions to the Bucket

In [None]:
# Grant Snowflake's GCS service account the custom 'snowflake' role on the
# bucket. Check the exact role AND the member: if the storage integration is
# recreated, Snowflake issues a new service account that must be added even
# though a binding for the role already exists.
snowflake_member = f"serviceAccount:{storage_gcp_service_account}"
bucket_policy = bucket.get_iam_policy()
role_binding = next((b for b in bucket_policy.bindings if b['role'] == role['name']), None)

if role_binding is not None and snowflake_member in role_binding['members']:
    print(f"'{storage_gcp_service_account}' with role '{role['name']}' to bucket '{bucket.name}' already exists.")
else:
    if role_binding is None:
        bucket_policy.bindings.append({
            "role": role["name"],
            "members": {snowflake_member}
        })
    else:
        role_binding['members'] = set(role_binding['members']) | {snowflake_member}
    bucket.set_iam_policy(bucket_policy)
    print(f"Added '{storage_gcp_service_account}' with role '{role['name']}' to bucket '{bucket.name}'")

## Create a Pub/Sub Topic and Subscription

In [None]:
topic_path = publisher.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)

# AlreadyExists from a create call can only mean the resource is already there,
# so report it unconditionally instead of relying on the error message wording.
try:
    topic = publisher.create_topic(
        request={
            "name": topic_path
        }
    )
    print(f"Created Topic '{topic.name}'")
except exceptions.AlreadyExists:
    print(f"Topic {topic_name} already exists")

try:
    subscription = subscriber.create_subscription(request={"name": subscription_path, "topic": topic_path})
    print(f"Created Subscription '{subscription.name}'")
except exceptions.AlreadyExists:
    print(f"Subscription {subscription_name} already exists")

## Add Bucket Permissions to the Topic

In [None]:
# Let the project's Cloud Storage service agent publish bucket notifications to
# the topic. Look for the publisher binding specifically: membership in some
# other role's binding does not grant publish rights.
publisher_role = "roles/pubsub.publisher"
storage_member = f"serviceAccount:{storage_service_agent}"
topic_policy = publisher.get_iam_policy(request={
    "resource": topic_path
})
publisher_binding = next((b for b in topic_policy.bindings if b.role == publisher_role), None)

if publisher_binding is not None and storage_member in publisher_binding.members:
    print(f"'{storage_service_agent}' with role 'roles/pubsub.publisher' to topic '{topic_name}' already exists.")
else:
    if publisher_binding is None:
        topic_policy.bindings.add(role=publisher_role, members=[storage_member])
    else:
        publisher_binding.members.append(storage_member)
    publisher.set_iam_policy(request={
        "resource": topic_path,
        "policy": topic_policy
    })
    print(f"Added '{storage_service_agent}' with role 'roles/pubsub.publisher' to topic '{topic_name}'")

## Create Notification from Bucket to Topic

In [None]:
# Publish a JSON message to the topic whenever an object is finalized (created
# or overwritten) in the bucket. Skip if a notification to this topic exists.
notification_exists = any(n.topic_name == topic_name for n in bucket.list_notifications())
if not notification_exists:
    notification = bucket.notification(
        topic_name=topic_name,
        payload_format='JSON_API_V1',
        # The API field is a list of event types, not a single string.
        event_types=['OBJECT_FINALIZE'],
    )
    # create() returns None, so keep the BucketNotification object itself.
    notification.create()
    print(f"Created Notification to topic '{topic_name}' from bucket '{bucket_name}'")
else:
    print(f"Notification to topic '{topic_name}' from bucket '{bucket_name}' already exists")

## Create Snowflake Notification Integration with Google Cloud Subscription and Get the Service Account Created by Snowflake

In [None]:
# Notification integration that reads file-created events from the subscription.
sql = f'''
create notification integration if not exists {notification_integration}
  type = queue
  notification_provider = gcp_pubsub
  enabled = true
  gcp_pubsub_subscription_name = 'projects/{project_id}/subscriptions/{subscription_name}'
'''
run(sql)
print(f"Notification Integration {notification_integration} has been created if it didn't already exist")

# DESC returns one row per property; read the service account Snowflake uses to
# pull from Pub/Sub so it can be granted subscriber/monitoring roles next.
sql = f'''
desc notification integration {notification_integration}
'''
cursor = conn.cursor(sf.DictCursor)
try:
    cursor.execute(sql)
    gcp_pubsub_service_account = [
        row['property_value'] for row in cursor if row['property'] == 'GCP_PUBSUB_SERVICE_ACCOUNT'
    ][0]
finally:
    # Close the cursor even if the DESC or the lookup fails.
    cursor.close()

## Add Snowflake Permissions to the Subscription

In [None]:
# Let Snowflake's Pub/Sub service account consume the subscription. Look for the
# subscriber binding specifically: membership in another role's binding does
# not grant subscribe rights.
subscriber_role = "roles/pubsub.subscriber"
pubsub_member = f"serviceAccount:{gcp_pubsub_service_account}"
subscription_resource = f"projects/{project_id}/subscriptions/{subscription_name}"
try:
    subscription_policy = subscriber.get_iam_policy(request={
        "resource": subscription_resource
    })
    subscriber_binding = next((b for b in subscription_policy.bindings if b.role == subscriber_role), None)

    if subscriber_binding is not None and pubsub_member in subscriber_binding.members:
        print(f"'{gcp_pubsub_service_account}' with role 'roles/pubsub.subscriber' to subscription '{subscription_name}' already exists.")
    else:
        if subscriber_binding is None:
            subscription_policy.bindings.add(role=subscriber_role, members=[pubsub_member])
        else:
            subscriber_binding.members.append(pubsub_member)
        subscriber.set_iam_policy(request={
            "resource": subscription_resource,
            "policy": subscription_policy
        })
        print(f"Added '{gcp_pubsub_service_account}' with role 'roles/pubsub.subscriber' to subscription '{subscription_name}'")
finally:
    # The subscriber client is not needed after this cell; release it even on error.
    subscriber.close()

## Add *Monitoring Viewer* role to the Snowflake Pub/Sub Service Account

In [None]:
# Grant Snowflake's Pub/Sub service account the project-level Monitoring Viewer
# role. Look for an unconditional monitoring.viewer binding specifically:
# membership in any other role's binding does not grant this role. Reuse an
# existing binding rather than appending a duplicate one.
monitoring_role = "roles/monitoring.viewer"
pubsub_member = f"serviceAccount:{gcp_pubsub_service_account}"
monitoring_policy = (
    crm_service.projects()
    .getIamPolicy(
        resource=project_id
    )
    .execute()
)
project_bindings = monitoring_policy.setdefault("bindings", [])
monitoring_binding = next(
    (b for b in project_bindings if b.get("role") == monitoring_role and "condition" not in b),
    None,
)

if monitoring_binding is not None and pubsub_member in monitoring_binding.get("members", []):
    print(f"'{gcp_pubsub_service_account}' with role 'roles/monitoring.viewer' to project already exists.")
else:
    if monitoring_binding is None:
        project_bindings.append({
            "role": monitoring_role,
            "members": [pubsub_member]
        })
    else:
        monitoring_binding.setdefault("members", []).append(pubsub_member)
    # The fetched policy carries its etag, so a concurrent change makes this
    # write fail instead of overwriting it.
    crm_service.projects().setIamPolicy(
        resource=project_id,
        body={"policy": monitoring_policy}
    ).execute()
    print(f"Added '{gcp_pubsub_service_account}' with role 'roles/monitoring.viewer' to project")

## Create Snowflake Database, Schema, Variant Table and File Format

In [None]:
# app_name comes from input() and is embedded in single-quoted Snowflake string
# literals (the COMMENT clauses below). Escape backslashes first, then single
# quotes, so a name like "Bob's App" cannot break, or inject into, the SQL.
sql_app_name = app_name.replace('\\', '\\\\').replace("'", "''")

sql = f'''create database if not exists {database}'''
run(sql)
print(f"Database {database} has been created if it didn't already exist")

sql = f'''use database {database}'''
run(sql)

sql = f'''create schema if not exists {schema}
comment = 'This schema has been created to store the raw data generated by the {sql_app_name} app and subsequent aggregations and views' '''
run(sql)
print(f"Schema {schema} has been created if it didn't already exist")

sql = f'''use schema {schema}'''
run(sql)

# Landing table for Snowpipe: source file name, row number within the file and
# the raw JSON record.
# NOTE(review): Snowflake does not enforce primary keys on standard tables, so
# this constraint does not deduplicate rows — confirm that is acceptable.
sql = f'''create table if not exists {table} (
    filename varchar(1024),
    file_row_number int,
    json_data variant,
    Primary Key(json_data)
)
comment = 'This JSON Table contains the data sent by the {sql_app_name} app' '''
run(sql)
print(f"Table {table} has been created if it didn't already exist")

# Matches the export written by the BigQuery scheduled query (gzip-compressed JSON).
sql = f'''create file format if not exists json
type = 'json'
compression = 'gzip'
strip_outer_array = true
comment = 'This file format has been created to be used by snowpipe when copying the BigQuery generated data into the {table} table' '''
run(sql)
print("File Format JSON has been created if it didn't already exist")

## Create the Snowflake Stage

In [None]:
# External stage over the event-data bucket, authenticated through the storage
# integration created earlier. `sql` is kept as the notebook-level variable.
sql = (
    "\n"
    f"create stage if not exists {stage}\n"
    f"  url='gcs://{bucket_name}/'\n"
    f"  storage_integration = {storage_integration};\n"
)
run(sql)
print("Stage {} has been created if it didn't already exist".format(stage))

## Create the Snowflake Pipe

In [None]:
# Auto-ingest pipe: when the notification integration receives a file-created
# message, copy each JSON record from the stage into the landing table together
# with its source file name and row number.
sql = f'''
create pipe if not exists {pipe}
  auto_ingest = true
  integration = {notification_integration}
  comment = 'This pipe is automatically ingesting data from the {stage} stage into the {table} table once a notification of file creation has been transmitted from Google Pub/Sub'
  as
copy into {table} from
(select
    metadata$filename,
    metadata$file_row_number,
    t.$1
from
    @{stage}
    (file_format => json) t)
'''
run(sql)
print(f"Pipe {pipe} has been created if it didn't already exist")

## Create the BigQuery Scheduled Script

In [None]:
sql = f"""
DECLARE tables ARRAY <STRING>;

/************************************************************************************************************
*                                             DAILY EVENTS EXPORT                                           *
************************************************************************************************************/

CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_id}.daily_export_log`
(
    table_name STRING,
    insert_date TIMESTAMP
)
OPTIONS
(
  expiration_timestamp=TIMESTAMP "3000-01-01"
);

SET tables = (SELECT
    ARRAY_AGG(TABLE_NAME) TABLES
FROM
    `{project_id}.{dataset_id}.INFORMATION_SCHEMA.TABLES`
WHERE
    REGEXP_CONTAINS(TABLE_NAME, 'events_\\\\d{{8}}') AND
    TABLE_NAME NOT IN (SELECT TABLE_NAME FROM `{project_id}.{dataset_id}.daily_export_log`)
);


FOR tab IN 
    (SELECT * FROM UNNEST(tables))
DO
    EXECUTE IMMEDIATE '''
    EXPORT DATA
    OPTIONS ( 
        uri = CONCAT('gs://{bucket_name}/live/', format_timestamp('%Y/%m/%d/', current_timestamp()), ''' || "'" || tab.f0_ || "'" || ''', '/*_', format_timestamp('%Y%m%d%H%M%S', current_timestamp()), '.json.gz'),
        format='JSON',
        compression='GZIP',
        overwrite=FALSE 
        ) AS
    SELECT * FROM `{project_id}.{dataset_id}.''' || tab.f0_ || '''` 
    ''';

    EXECUTE IMMEDIATE '''
    INSERT INTO `{project_id}.{dataset_id}.daily_export_log` SELECT ''' || "'" || tab.f0_ || "'" || ''' table_name, current_timestamp() insert_date
    ''';

    EXECUTE IMMEDIATE '''
    ALTER TABLE `{project_id}.{dataset_id}.''' || tab.f0_ || '''`
    ADD COLUMN IF NOT EXISTS gcs_export_timestamp TIMESTAMP
    ''';

    EXECUTE IMMEDIATE '''
    UPDATE `{project_id}.{dataset_id}.''' || tab.f0_ || '''` SET
    gcs_export_timestamp = current_timestamp()
    WHERE gcs_export_timestamp IS NULL
    ''';
END FOR;


/************************************************************************************************************
*                                             DELAYED EVENTS EXPORT                                         *
************************************************************************************************************/

SET tables = (SELECT
  ARRAY_AGG(DISTINCT destination_table.table_id) TABLES
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  job_type = 'LOAD' and
  REGEXP_CONTAINS(destination_table.table_id, 'events_\\d{8}')
  AND creation_time >= DATE_ADD(CURRENT_TIMESTAMP(), INTERVAL -1 HOUR)
);

FOR tab IN 
    (SELECT * FROM UNNEST(tables))
DO
/************  CHECK IF DAILY TABLE WAS OVERWRITTEN AND RE-ADD THE gcs_export_timestamp COLUMN  ************/
    BEGIN
        IF (SELECT 1 FROM `region-us`.INFORMATION_SCHEMA.COLUMNS WHERE table_name = tab.f0_ AND column_name = 'gcs_export_timestamp') IS NULL THEN
            IF (SELECT 1 FROM `{project_id}.{dataset_id}.daily_export_log` WHERE table_name = tab.f0_) IS NOT NULL THEN
                EXECUTE IMMEDIATE '''
                ALTER TABLE `{project_id}.{dataset_id}.''' || tab.f0_ || '''`
                ADD COLUMN IF NOT EXISTS gcs_export_timestamp TIMESTAMP
                ''';

                EXECUTE IMMEDIATE '''
                UPDATE `{project_id}.{dataset_id}.''' || tab.f0_ || '''` SET
                gcs_export_timestamp = (SELECT insert_date FROM `{project_id}.{dataset_id}.daily_export_log` WHERE table_name = \'''' || tab.f0_ || '''\')
                WHERE gcs_export_timestamp IS NULL
                ''';
            END IF;
        END IF;
    END;
/*****************************  EXPORT DATA WHERE gcs_export_timestamp IS NULL  ****************************/
    BEGIN
        EXECUTE IMMEDIATE '''
        CREATE OR REPLACE TEMP TABLE _SESSION.tmp AS
        SELECT * FROM `{project_id}.{dataset_id}.''' || tab.f0_ || '''` WHERE gcs_export_timestamp IS NULL
        ''';

        IF (SELECT COUNT(*) cnt FROM _SESSION.tmp) > 0 THEN
            EXECUTE IMMEDIATE '''
            EXPORT DATA
            OPTIONS ( 
                uri = CONCAT('gs://{bucket_name}/delayed/', format_timestamp('%Y/%m/%d/', current_timestamp()), ''' || "'" || tab.f0_ || "'" || ''', '/*_', format_timestamp('%Y%m%d%H%M%S', current_timestamp()), '.json.gz'),
                format='JSON',
                compression='GZIP',
                overwrite=FALSE 
                ) AS
            SELECT * FROM _SESSION.tmp 
            ''';

            EXECUTE IMMEDIATE '''
            UPDATE `{project_id}.{dataset_id}.''' || tab.f0_ || '''` SET
            gcs_export_timestamp = current_timestamp()
            WHERE gcs_export_timestamp IS NULL
            ''';
        END IF;
    EXCEPTION WHEN ERROR THEN END;
END FOR;
"""

transfer_config = bigquery_datatransfer.TransferConfig(
    display_name="BigQuery to GCS Daily Backup",
    data_source_id="scheduled_query",
    params={
        "query": sql
    },
    schedule="every 1 hours",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=f"projects/{project_id}/locations/{location.lower()}",
        transfer_config=transfer_config,
        service_account_name=gccreds['client_email'],
    )
)

print("Created scheduled query '{}'".format(transfer_config.name))