In [3]:
import os
import sys
import logging
from google.cloud import bigquery
from typing import List
from google.cloud import bigquery
from google.oauth2 import service_account
import pyarrow
import pandas as pd

In [4]:
# Reshape the historical export so its columns line up with the scraped data:
# align two column names, parse the date column, and drop columns that are not loaded.
historical_df = (
    pd.read_csv('./data/3 cities weather.csv')
    .rename(columns={'feelslikemin': 'windchill', 'dew': 'dewpoint'})
    # assigning to an existing column keeps its position in the frame
    .assign(datetime=lambda frame: pd.to_datetime(frame['datetime']))
    .drop(columns=['icon', 'sunrise', 'sunset', 'precipcover', 'solarenergy', 'solarradiation'])
)
display(historical_df.head(3))

Unnamed: 0,name,datetime,tempmax,tempmin,temp,feelslikemax,windchill,feelslike,dewpoint,humidity,...,winddir,sealevelpressure,cloudcover,visibility,uvindex,severerisk,moonphase,conditions,description,stations
0,New York City,2013-02-14,46.2,33.7,38.7,43.1,26.0,35.0,27.3,65.6,...,293.3,1012.3,34.0,9.0,6,,0.13,"Snow, Rain, Partially cloudy",Partly cloudy throughout the day with early mo...,"72505394728,KEWR,KLGA,72502014734,KNYC,7250301..."
1,New York City,2013-02-15,53.3,36.9,43.7,53.3,30.8,40.5,31.1,62.0,...,220.8,1014.7,36.4,8.3,6,,0.17,Partially cloudy,Partly cloudy throughout the day.,"72505394728,KEWR,KLGA,72502014734,KNYC,7250301..."
2,New York City,2013-02-16,41.6,32.0,35.9,33.9,22.3,29.0,25.2,66.2,...,2.3,1012.3,98.4,9.6,3,,0.2,"Rain, Overcast",Cloudy skies throughout the day with early mor...,"72505394728,KEWR,KLGA,72502014734,KNYC,7250301..."


In [5]:
#authorization
# Location of the service-account key file. The standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable takes precedence so the
# notebook can run on another machine without editing this cell; the original
# local path is kept as the fallback when the variable is not set.
key_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/home/reed/.creds/weather_team_week.json",
)
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])

# BigQuery client bound to the project id carried by the loaded credentials.
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

In [6]:
# **** SETUP LOGGING ****
# Configure the root logger: level-tagged, timestamped records written to stdout
# so they appear in the notebook cell output.
logging.basicConfig(
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
# Alias the root logger as `logger`. Calling getLogger() with no name is the
# documented way to obtain it; getLogger('root') only resolves to the root logger
# on Python 3.9+ and yields a separate logger named 'root' on older versions.
logger: logging.Logger = logging.getLogger()
# Lower the threshold programmatically so DEBUG records are emitted as well.
logger.setLevel(logging.DEBUG)

In [7]:
# change to match your GCP project and BigQuery dataset
PROJECT_NAME = credentials.project_id
DATASET_NAME = "reed_weather_data"

# **** BIGQUERY DATASET CREATION ****

dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
# exists_ok=True keeps this cell re-runnable: an "already exists" error is ignored
# and the existing dataset is returned instead.
dataset = client.create_dataset(dataset, exists_ok=True)

# The dataset may have existed already, so report it as ready rather than created.
logger.info(f"Weather dataset ready: {dataset.full_dataset_id}")

# Table configuration keyed by logical table name: the BigQuery table name and
# the explicit schema used when loading the dataframe.
FACTS_TABLE_METADATA = {
    'weather_data': {
        'table_name': 'weather_data',
        'schema': [
            # note: a dataframe index is only written if it is named in the schema
            # the location name is the only required column
            bigquery.SchemaField('name', 'STRING', mode='REQUIRED'),
            # every remaining column is nullable; order matches the original schema
            *(
                bigquery.SchemaField(column, bq_type, mode='NULLABLE')
                for column, bq_type in (
                    ('datetime', 'DATETIME'),
                    ('tempmax', 'FLOAT64'),
                    ('tempmin', 'FLOAT64'),
                    ('temp', 'FLOAT64'),
                    ('feelslikemax', 'FLOAT64'),
                    ('windchill', 'FLOAT64'),
                    ('feelslike', 'FLOAT64'),
                    ('dewpoint', 'FLOAT64'),
                    ('humidity', 'FLOAT64'),
                    ('precip', 'FLOAT64'),
                    ('precipprob', 'FLOAT64'),
                    ('preciptype', 'STRING'),
                    ('snow', 'FLOAT64'),
                    ('snowdepth', 'FLOAT64'),
                    ('windgust', 'FLOAT64'),
                    ('windspeed', 'FLOAT64'),
                    ('winddir', 'FLOAT64'),
                    ('sealevelpressure', 'FLOAT64'),
                    ('cloudcover', 'FLOAT64'),
                    ('visibility', 'FLOAT64'),
                    ('uvindex', 'INT64'),
                    ('severerisk', 'FLOAT64'),
                    ('moonphase', 'FLOAT64'),
                    ('conditions', 'STRING'),
                    ('description', 'STRING'),
                    ('stations', 'STRING'),
                )
            ),
        ],
    },
}

[INFO ][2023-02-15 11:14:18,436][1575352606:0012] : Created emissions dataset: team-week3:reed_weather_data


In [8]:
def load_table(
    df: pd.DataFrame,
    client: bigquery.Client,
    table_name: str,
    schema: List[bigquery.SchemaField],
    create_disposition: str = 'CREATE_IF_NEEDED',
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """Upload a dataframe to a BigQuery table and log the resulting row count.

    Args:
        df (pd.DataFrame): dataframe whose rows are uploaded
        client (bigquery.Client): client used to run the load job and fetch the table
        table_name (str): fully-qualified table name in the form project.dataset.table
        schema (List[bigquery.SchemaField]): column definitions passed to the load job
        create_disposition (str, optional): whether the table may be created. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): how existing rows are treated. Defaults to 'WRITE_TRUNCATE'.

    Raises:
        AssertionError: if `table_name` does not consist of exactly three dot-separated parts.
    """
    # *** sanity check ***
    # a fully-qualified name has the shape project.dataset.table, i.e. exactly two dots
    assert table_name.count('.') == 2, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"

    # describe the load job: table creation policy, row replacement policy, and schema
    load_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )

    logger.info(f"loading table: '{table_name}'")
    load_job = client.load_table_from_dataframe(
        df,
        destination=table_name,
        job_config=load_config,
    )
    # block until the load job has finished
    load_job.result()

    # fetch the table afterwards to report how many rows it now holds
    loaded_table = client.get_table(table_name)
    logger.info(f"loaded {loaded_table.num_rows} rows into {loaded_table.full_table_id}")

In [9]:
# Load the historical weather dataframe into the weather_data table

# resolve the fully-qualified table name and the schema from the FACTS_TABLE_METADATA config
table_name = "{}.{}.{}".format(
    PROJECT_NAME,
    DATASET_NAME,
    FACTS_TABLE_METADATA['weather_data']['table_name'],
)
schema = FACTS_TABLE_METADATA['weather_data']['schema']
# upload the dataframe with the default create/overwrite dispositions
load_table(df=historical_df, client=client, table_name=table_name, schema=schema)

logger.info("loaded weather_data")

[INFO ][2023-02-15 11:14:18,549][1947479415:0029] : loading table: 'team-week3.reed_weather_data.weather_data'
[INFO ][2023-02-15 11:14:23,355][1947479415:0034] : loaded 11001 rows into team-week3:reed_weather_data.weather_data
[INFO ][2023-02-15 11:14:23,356][4025926169:0009] : loaded weather_data
