### Google Cloud Functions

This notebook shows how to export, host, and automate a web-scraping script in Google Cloud Functions (GCF). 

---

Airflow is extremely useful for coordinating complex pipelines in distributed systems, and it can run in the cloud with [Google Cloud Composer](https://cloud.google.com/composer). The downside of this approach is that Airflow's additional overhead makes the project more expensive. By contrast, GCF is designed for simple, stateless functions and is much more affordable. Fortunately, our use case is simple enough for GCF to handle.


#### Example: `nws_update_gcf`

We have two Airflow DAGs that we'd like to port to Google Cloud Functions. We'll use the NWS script as the example.

Each function needs its own directory, which is exported as a package.

```bash
nws_update_gcf
├── main.py          # houses the actual script
├── README.md
├── requirements.txt # dependencies that GCF installs automatically at deploy time
└── utils     
    ├── __init__.py
    └── utils.py
```

```python
import pandas as pd
import numpy as np
import re
import datetime as dt
import logging
from io import BytesIO
# GCP imports:
from google.cloud import bigquery, storage, logging as cloud_logging
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
# Utils: bind the name `utils` to the utils/utils.py module so the helpers
# below can be called as utils.get_soup(), utils.flatten(), etc.
# In the deployed package, utils/ sits next to main.py.
from utils import utils
```

When a function is created in Google Cloud Functions, it is automatically associated with a single Google Cloud project and runs as a service account. As a result, GCP clients created inside the function (e.g. `storage.Client()`) pick up the project and credentials from the environment through Application Default Credentials, so we don't have to specify them. We define them here only so that we can test-run the code locally.
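One way to keep a single code path for both environments is sketched below. It assumes a hypothetical `LOCAL_GCP_KEY_PATH` environment variable that you set only on your own machine; when no key path is available, the helper returns no arguments, so the client libraries fall back to Application Default Credentials (the function's runtime service account inside GCF).

```python
import os
from typing import Any, Dict, Optional

# Hypothetical environment variable name; set it only for local runs.
KEY_PATH_ENV = "LOCAL_GCP_KEY_PATH"


def gcp_client_kwargs(key_path: Optional[str] = None) -> Dict[str, Any]:
    """Return keyword arguments for GCP clients such as storage.Client().

    Locally: build explicit service-account credentials from a key file.
    In GCF (no key path): return {} so the client uses Application Default
    Credentials and infers the project from the environment.
    """
    key_path = key_path or os.environ.get(KEY_PATH_ENV)
    if not key_path:
        return {}

    # Only the local branch needs explicit service-account credentials.
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_file(
        key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return {"credentials": credentials, "project": credentials.project_id}
```

With this in place, a call like `storage.Client(**gcp_client_kwargs())` would run unchanged in the notebook and in the deployed function.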

```python
# Creating the GCP connection (explicit credentials are only needed for local testing)
from yaml import full_load

with open("../airflow/dags/config/gcp-config.yaml", "r") as fp:
    gcp_config = full_load(fp)

PROJECT_ID = gcp_config["project-id"]
DATASET_ID = gcp_config["dataset-id"]
STAGING_TABLE_ID = "nws_staging"
MAIN_TABLE_ID = "nws"

SCHEMA = [
    bigquery.SchemaField("location", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("utc_datetime", "DATETIME", mode="REQUIRED"),
    bigquery.SchemaField("lst_datetime", "DATETIME", mode="REQUIRED"),
    bigquery.SchemaField("temperature_f", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("dewpoint_f", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("wind_chill_f", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("surface_wind_mph", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("wind_dir", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("gust", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("sky_cover_pct", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("precipitation_potential_pct", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("relative_humidity_pct", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("rain", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("thunder", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("snow", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("freezing_rain", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("sleet", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("fog", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("last_update_nws", "DATETIME", mode="NULLABLE"),
]

key_path = gcp_config["credentials"]
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

## ---------- LOGGING ---------- ##
# Cloud Logging client
logger_client = cloud_logging.Client(credentials=credentials, project=credentials.project_id)

# Cloud Logging handler
handler = logger_client.get_default_handler()

# Create a logger that uses the cloud handler
logger = logging.getLogger(__name__)
logger.addHandler(handler)

# Set logging levels
logger.setLevel(logging.INFO)
handler.setLevel(logging.INFO)

# Format log records
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# Confirm the logger is working
logger.info("Running daily scrape of NWS Weather Forecasts in Alaska")

## ---------- CLOUD STORAGE ---------- ##
storage_client = storage.Client(credentials=credentials, project=credentials.project_id)
bucket = storage_client.bucket(f"{PROJECT_ID}-bucket")

# Load the station locations to scrape
blob = bucket.blob("locations.csv")
content = blob.download_as_bytes()
locations_df = pd.read_csv(BytesIO(content))

## ---------- BIGQUERY ---------- ##
bq_client = bigquery.Client(credentials=credentials, project=credentials.project_id)
```


```python
def get_forecast_dict() -> dict:
  """Scrape forecast data for the next 6 days from each location in locations_df.

  Returns the column-oriented dictionary built by utils.transpose_as_dict(),
  which transform_df() turns into a DataFrame.
  """
  # Build one NWS URL per station
  nws_urls = locations_df.apply(utils.get_nws_url, axis=1)
  url_map = dict(zip(locations_df['station_location'], nws_urls))

  combined_table = []
  for location, url in url_map.items():
    # Offsets 0, 48 and 96 hours fetch consecutive pages of the 6-day forecast
    soup_list = [utils.get_soup(url + f"&AheadHour={hr}") for hr in (0, 48, 96)]
    table_list = utils.flatten([utils.extract_table_data(soup, location) for soup in soup_list])
    combined_table.extend(table_list)

  forecast_dict = utils.transpose_as_dict(combined_table)

  return forecast_dict

fore_cast_dict = get_forecast_dict()
```
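The paging logic in `get_forecast_dict()` can be isolated and tested without network access. The sketch below assumes, from the offsets used above, that each `AheadHour` page covers 48 hours, so offsets 0, 48 and 96 span the 6-day window. `forecast_page_urls` is a hypothetical helper, and `flatten` is only a guess at what `utils.flatten` does (concatenate one level of nesting); the real utility may differ.

```python
from itertools import chain
from typing import Iterable, List, Sequence, TypeVar

T = TypeVar("T")

# Assumption: each NWS tabular page shows 48 hours, so three offsets cover 6 days.
PAGE_HOURS = 48
DEFAULT_OFFSETS = tuple(range(0, 6 * 24, PAGE_HOURS))  # (0, 48, 96)


def forecast_page_urls(base_url: str, offsets: Sequence[int] = DEFAULT_OFFSETS) -> List[str]:
    """Hypothetical helper: one URL per 48-hour page, as built inline above."""
    return [f"{base_url}&AheadHour={hr}" for hr in offsets]


def flatten(nested: Iterable[Iterable[T]]) -> List[T]:
    """Guess at utils.flatten: merge an iterable of lists into a single list."""
    return list(chain.from_iterable(nested))
```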

```python
def transform_df(forecast_dict: dict) -> pd.DataFrame:
  """Cast the dictionary from transpose_as_dict() to a DataFrame and transform it"""
  ## Create dataframe
  df = pd.DataFrame(forecast_dict)

  ## Clean column headers: lowercase, drop °/(/), % -> pct, spaces -> underscores
  df.columns = [col.lower() for col in df.columns]
  df.rename(columns=lambda x: re.sub(r'°|\(|\)', '', x), inplace=True)
  df.rename(columns=lambda x: re.sub('%', 'pct', x), inplace=True)
  df.rename(columns=lambda x: re.sub(' ', '_', x.strip()), inplace=True)

  ## Replace missing values
  # Replace missing values in gust with zero; gust is never *actually* 0, so no real data is masked
  # Replace missing values in wind chill with an explicit np.nan
  df.replace({'gust': {'': 0}, 'wind_chill_f': {'': np.nan}}, inplace=True)

  ## Datetime transformations
  # Note: every date is stamped with the current year, so January dates in a
  # forecast scraped in late December would get the wrong year.
  cur_year = dt.datetime.now().year
  dt_strings = df['date'] + '/' + str(cur_year) + ' ' + df['hour_akst'] + ':00 AKST'
  # Local standard time (AKST)
  df['lst_datetime'] = pd.to_datetime(dt_strings, format='%m/%d/%Y %H:%M AKST')
  # UTC time: AKST is UTC-9, so add 9 hours
  akst_offset = dt.timedelta(hours=9)
  df['utc_datetime'] = df['lst_datetime'] + akst_offset

  ## Drop duplicates in the composite key columns
  duplicates = df.duplicated(subset=["location", "lst_datetime"], keep=False)
  duplicate_rows = df[duplicates]
  if not duplicate_rows.empty:
    logger.warning(f"{len(duplicate_rows)} rows have duplicate values in location and lst_datetime")
    logger.info("Dropping duplicates, keeping the first occurrence of each")
    df.drop_duplicates(subset=['location', 'lst_datetime'], inplace=True, ignore_index=True)

  ## Reorder columns to match SCHEMA
  # df.columns[4:-2] is positional: it excludes the first four scraped columns and the
  # two datetime columns appended above, so it depends on the scraper's column order.
  col_names = ['location', 'utc_datetime', 'lst_datetime'] + list(df.columns[4:-2]) + ["last_update_nws"]
  df = df[col_names]

  return df

transformed_df = transform_df(fore_cast_dict)
```
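The header cleanup in `transform_df()` is easiest to verify on plain strings. Below is a minimal pure-Python version of the same regex chain; the sample headers are assumptions reverse-engineered from the column names in `SCHEMA`, not copied from the NWS page.

```python
import re


def clean_header(col: str) -> str:
    """Same steps as transform_df(): lowercase, drop °/(/), replace % with
    'pct', then strip and turn single spaces into underscores."""
    col = col.lower()
    col = re.sub(r"°|\(|\)", "", col)
    col = re.sub("%", "pct", col)
    return re.sub(" ", "_", col.strip())


# Hypothetical raw headers, chosen to match names used in SCHEMA
for raw in ["Temperature (°F)", "Sky Cover (%)", "Surface Wind (mph)", "Hour (AKST)"]:
    print(f"{raw!r} -> {clean_header(raw)!r}")
```

Because each space is replaced individually, a header containing a double space would produce a double underscore; `re.sub(r"\s+", "_", ...)` would be a more forgiving variant.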

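`transform_df()` appends the current year to every `MM/DD` date, so a forecast scraped in late December would label its January rows with the wrong year. A minimal sketch of one fix, assuming forecast dates never fall more than a day before the scrape time: parse with the scrape year, then roll any date that lands in the past into the next year. `infer_forecast_datetime` is a hypothetical helper, not part of `utils`.

```python
import datetime as dt


def infer_forecast_datetime(month_day: str, hour: str, scraped_at: dt.datetime) -> dt.datetime:
    """Build a naive local datetime from 'MM/DD' and 'HH' strings.

    Assumes forecasts only look forward, so a date more than a day before
    scraped_at must belong to the next calendar year.
    """
    month, day = (int(part) for part in month_day.split("/"))
    candidate = dt.datetime(scraped_at.year, month, day, int(hour))
    if candidate < scraped_at - dt.timedelta(days=1):
        candidate = candidate.replace(year=scraped_at.year + 1)
    return candidate


# Scraped on Dec 30: "12/31" stays in 2023, "01/02" rolls into 2024
scraped = dt.datetime(2023, 12, 30, 12)
print(infer_forecast_datetime("12/31", "23", scraped))  # 2023-12-31 23:00:00
print(infer_forecast_datetime("01/02", "06", scraped))  # 2024-01-02 06:00:00
```

The one-day slack also absorbs any gap between the server clock used for `scraped_at` and Alaska local time.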
```python
def load_staging_table(df: pd.DataFrame) -> None:
  """Upload the dataframe from transform_df() to the BigQuery staging table"""

  jc = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    # No skip_leading_rows: load_table_from_dataframe() writes the CSV without a
    # header row, so skipping a leading row would silently drop a forecast row.
    autodetect=False,
    schema=SCHEMA,
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_TRUNCATE",  # overwrite the staging table on every run
  )

  # Set target table in BigQuery
  full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{STAGING_TABLE_ID}"

  # Upload to BigQuery
  ## If any required column is missing values, include the column's name in the error message
  try:
    job = bq_client.load_table_from_dataframe(df, full_table_id, job_config=jc)
    job.result()
  except Exception as e:
    error_message = str(e)
    if 'Required column value for column index' in error_message:
      start_index = error_message.index('Required column value for column index') + len('Required column value for column index: ')
      end_index = error_message.index(' is missing', start_index)
      missing_column_index = int(error_message[start_index:end_index])
      missing_column_name = list(df.columns)[missing_column_index]
      error_message = error_message[:start_index] + f'{missing_column_name} ({missing_column_index})' + error_message[end_index:]
    raise Exception(error_message) from e

  # Log result (count the schema fields rather than printing the whole schema)
  table = bq_client.get_table(full_table_id)
  logger.info(f"Loaded {table.num_rows} rows and {len(table.schema)} columns into {full_table_id}")

load_staging_table(transformed_df)
```
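An alternative to parsing BigQuery's error text in `load_staging_table()` is to check the `REQUIRED` columns before uploading, so the failure names the offending column directly. This is a swapped-in technique, not what the notebook does. The sketch works on a plain dict of column lists (for example the output of `df.to_dict("list")`), treats `None` and NaN/NaT as missing, and assumes every required column is present.

```python
from typing import Dict, Iterable, Sequence


def _is_missing(value: object) -> bool:
    """True for None and for NaN-like values (NaN and NaT are not equal to themselves)."""
    return value is None or value != value


def missing_required(columns: Dict[str, Sequence[object]], required: Iterable[str]) -> Dict[str, int]:
    """Count missing values per required column; columns with none are omitted.

    Raises KeyError if a required column is absent altogether.
    """
    counts: Dict[str, int] = {}
    for name in required:
        n_missing = sum(1 for value in columns[name] if _is_missing(value))
        if n_missing:
            counts[name] = n_missing
    return counts


sample = {"location": ["Anchorage", None], "utc_datetime": [1, float("nan")], "lst_datetime": [1, 2]}
print(missing_required(sample, ["location", "utc_datetime", "lst_datetime"]))
# {'location': 1, 'utc_datetime': 1}
```

In the notebook, the required names could come from `[f.name for f in SCHEMA if f.mode == "REQUIRED"]`, and a non-empty result could raise a `ValueError` before any load job is started.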

```python
def insert_table() -> None:
  """Insert the staging table into the main data table; creates the main table if it doesn't exist yet"""

  insert_query = f"""
    INSERT INTO {DATASET_ID}.{MAIN_TABLE_ID}
    SELECT *, CURRENT_TIMESTAMP() as date_added
    FROM {DATASET_ID}.{STAGING_TABLE_ID}
    """

  full_table_id = f"{PROJECT_ID}.{DATASET_ID}.{MAIN_TABLE_ID}"

  try:
    query_job = bq_client.query(insert_query)
    query_job.result()
  except NotFound:
    logger.info(f"Table {DATASET_ID}.{MAIN_TABLE_ID} does not exist. Creating.")

    # Adding date_added to SCHEMA
    schema = SCHEMA + [bigquery.SchemaField("date_added", "TIMESTAMP", mode="REQUIRED")]

    table = bigquery.Table(full_table_id, schema=schema)
    table = bq_client.create_table(table)

    query_job = bq_client.query(insert_query)
    query_job.result()

  table = bq_client.get_table(full_table_id)
  logger.info(f"Loaded {table.num_rows} rows and {len(table.schema)} columns into {full_table_id}\n")

insert_table()
```
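`INSERT INTO ... SELECT *` relies on the staging and main tables listing their columns in the same order. A sketch of an order-independent variant, assuming you pass in the column names from `SCHEMA`: it names every column on both sides of the insert. `build_insert_query` is a hypothetical helper.

```python
from typing import Sequence


def build_insert_query(dataset_id: str, main_table: str, staging_table: str,
                       columns: Sequence[str]) -> str:
    """Build an INSERT that names every column explicitly, plus a date_added timestamp."""
    col_list = ", ".join(columns)
    return (
        f"INSERT INTO `{dataset_id}.{main_table}` ({col_list}, date_added)\n"
        f"SELECT {col_list}, CURRENT_TIMESTAMP()\n"
        f"FROM `{dataset_id}.{staging_table}`"
    )


print(build_insert_query("my_dataset", "nws", "nws_staging", ["location", "utc_datetime"]))
```

In the notebook this could be called as `build_insert_query(DATASET_ID, MAIN_TABLE_ID, STAGING_TABLE_ID, [f.name for f in SCHEMA])`. String formatting is acceptable here because the names come from our own `SCHEMA`; BigQuery query parameters cannot stand in for table or column identifiers anyway.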

Next, we copy this script into `main.py`.
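GCF also needs an entry-point function in `main.py` to call. A minimal sketch, assuming an HTTP trigger (which Cloud Scheduler can call on a schedule) and an entry point named `main` (chosen with the `--entry-point` flag of `gcloud functions deploy`). The four notebook steps are passed in as arguments so the wrapper can be tested locally with stubs; `run_pipeline` and `make_http_entry_point` are hypothetical names.

```python
import logging
from typing import Any, Callable, Tuple

logger = logging.getLogger(__name__)


def run_pipeline(
    extract: Callable[[], dict],
    transform: Callable[[dict], Any],
    load_staging: Callable[[Any], None],
    insert_main: Callable[[], None],
) -> None:
    """Scrape -> transform -> load staging table -> insert into main table."""
    forecast_dict = extract()
    df = transform(forecast_dict)
    load_staging(df)
    insert_main()


def make_http_entry_point(*steps: Callable[..., Any]) -> Callable[[Any], Tuple[str, int]]:
    """Wrap the pipeline in an HTTP handler that returns (body, status code)."""
    def main(request: Any) -> Tuple[str, int]:
        # `request` is the incoming HTTP request; this pipeline does not read it.
        try:
            run_pipeline(*steps)
        except Exception:
            logger.exception("NWS update failed")
            return "NWS update failed", 500
        return "NWS update complete", 200
    return main


# Hypothetical wiring in main.py:
# main = make_http_entry_point(get_forecast_dict, transform_df, load_staging_table, insert_table)
```

Returning a non-200 status on failure lets the caller (for example a scheduler job) see that the run failed, while `logger.exception` records the traceback.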