In [98]:
import os
import sys
import pandas as pd
import logging
from typing import List
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account

# Resolve the service-account key file. The standard GOOGLE_APPLICATION_CREDENTIALS
# environment variable takes precedence so the notebook is not tied to one
# developer's machine; otherwise fall back to the key kept under the current
# user's home directory.
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") or os.path.expanduser(
    "~/.creds/team-week-3.json"
)

# Build explicit service-account credentials with the cloud-platform scope.
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# The client uses the project id exposed by the loaded credentials.
client = bigquery.Client(credentials=credentials, project=credentials.project_id)


# **** SETUP ****

# BigQuery destination: project id and the dataset that receives the tornado table
PROJECT_NAME = "team-week-3"
TORNADOES_DATASET_NAME = "tornadoes"

# Local input: data directory and the default tornado CSV inside it
DATA_DIR = "./data/"
DEFAULT_TORNADOES_FILE = os.path.join(DATA_DIR, "tornadoes.csv")



# **** TABLE SCHEMAS ****

# One (column name, BigQuery type, mode) entry per column of the tornadoes table,
# in table order.
# note: a dataframe index is only written if it is named in the schema
_TORNADOES_COLUMNS = (
    ('date', 'datetime', 'NULLABLE'),
    ('tornado_number', 'int64', 'REQUIRED'),
    ('year', 'int64', 'NULLABLE'),
    ('month', 'int64', 'NULLABLE'),
    ('day', 'int64', 'NULLABLE'),
    ('timezone', 'int64', 'NULLABLE'),
    ('state', 'string', 'NULLABLE'),
    ('magnitude', 'int64', 'NULLABLE'),
    ('injuries', 'int64', 'NULLABLE'),
    ('fatalities', 'int64', 'NULLABLE'),
    ('property_loss', 'float64', 'NULLABLE'),
    ('crop_loss', 'float64', 'NULLABLE'),
    ('starting_lat', 'float64', 'NULLABLE'),
    ('starting_lon', 'float64', 'NULLABLE'),
    ('end_lat', 'float64', 'NULLABLE'),
    ('end_lon', 'float64', 'NULLABLE'),
    ('length', 'float64', 'NULLABLE'),
    ('width', 'int64', 'NULLABLE'),
)

# Table name and schema for each table this notebook loads, keyed by a short table key.
TABLE_METADATA = {
    'tornadoes': dict(
        table_name='tornadoes',
        schema=[
            bigquery.SchemaField(column, field_type, mode=mode)
            for column, field_type, mode in _TORNADOES_COLUMNS
        ],
    ),
}


# **** SETUP LOGGING ****
# Record layout: [level][timestamp][module:line] : message
LOG_FORMAT = '[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s'

# Configure the root logger to write INFO and above to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT)

# Notebook-wide logger, fetched by the name 'root' and opened up to DEBUG.
logger: logging.Logger = logging.getLogger('root')
logger.setLevel(logging.DEBUG)

logger.debug("Creating bigquery client")

logger.info("Setup Completed")

[DEBUG][2023-02-15 15:25:29,221][1528603796:0069] : Creating bigquery client
[INFO ][2023-02-15 15:25:29,239][1528603796:0071] : Setup Completed


In [99]:
# Make sure the tornadoes dataset exists in the project.
dataset_id = ".".join([PROJECT_NAME, TORNADOES_DATASET_NAME])

# Describe the dataset locally first (id + location) ...
dataset_spec = bigquery.Dataset(dataset_id)
dataset_spec.location = "US"

# ... then ask BigQuery to create it; exists_ok=True keeps an already existing
# dataset from raising an error.
dataset = client.create_dataset(dataset_spec, exists_ok=True)

logger.info(f"Created Tornadoes dataset: {dataset.full_dataset_id}")

[INFO ][2023-02-15 15:25:32,270][36941708:0007] : Created Tornadoes dataset: team-week-3:tornadoes


In [100]:
# tornadoes data file name
filename = DEFAULT_TORNADOES_FILE

# Size in bytes of the CSV header row alone; a file no larger than this has no data rows.
HEADER_ROW_BYTES = 78

# check if the file exists
assert os.path.exists(filename), f"Data file does not exists: '{filename}'"
# check if the file contains any data beyond the header row
assert os.path.getsize(filename) > HEADER_ROW_BYTES, f"Data file size incorrect; does not seem to contain data: '{filename}'"

# load into dataframe; malformed lines produce a warning instead of aborting the load
df = pd.read_csv(
    filename,
    header=0,
    on_bad_lines='warn'
)

# Merge the separate 'date' and 'time' columns into a single leading 'date_time'
# column. This is done explicitly because the nested-list form of read_csv's
# parse_dates argument (parse_dates=[['date', 'time']]) is deprecated in recent
# pandas releases.
date_time = pd.to_datetime(df.pop('date') + ' ' + df.pop('time'))
df.insert(0, 'date_time', date_time)
logger.info(f"loaded {len(df.index)} rows from {filename}")


# check schema: contains all expected columns?
# ('sg' is included because the later drop-columns cell requires it to exist)
expected_columns = [
    'om',
    'yr',
    'mo',
    'dy',
    'date_time',
    'tz',
    'st',
    'stf',
    'stn',
    'mag',
    'inj',
    'fat',
    'loss',
    'closs',
    'slat',
    'slon',
    'elat',
    'elon',
    'len',
    'wid',
    'ns',
    'sn',
    'sg',
    'f1',
    'f2',
    'f3',
    'f4',
    'fc'
]
# report every missing column at once rather than stopping at the first one
missing_columns = [col for col in expected_columns if col not in df.columns]
assert not missing_columns, f"Data file missing required column(s): {missing_columns}"

# preview the loaded data
display(df.head(10))


[INFO ][2023-02-15 15:25:35,461][3726965278:0016] : loaded 68868 rows from ./data/tornadoes.csv


Unnamed: 0,date_time,om,yr,mo,dy,tz,st,stf,stn,mag,...,len,wid,ns,sn,sg,f1,f2,f3,f4,fc
0,1950-10-01 21:00:00,192,1950,10,1,3,OK,40,23,1,...,15.8,10,1,1,1,25,0,0,0,0
1,1950-10-09 02:15:00,193,1950,10,9,3,NC,37,9,3,...,2.0,880,1,1,1,47,0,0,0,0
2,1950-11-20 02:20:00,195,1950,11,20,3,KY,21,1,2,...,0.1,10,1,1,1,177,0,0,0,0
3,1950-11-20 04:00:00,196,1950,11,20,3,KY,21,2,1,...,0.1,10,1,1,1,209,0,0,0,0
4,1950-11-20 07:30:00,197,1950,11,20,3,MS,28,14,1,...,2.0,37,1,1,1,101,0,0,0,0
5,1950-11-04 17:00:00,194,1950,11,4,3,PA,42,5,3,...,15.9,100,1,1,1,71,11,0,0,0
6,1950-12-02 15:00:00,198,1950,12,2,3,IL,17,7,2,...,18.8,50,1,1,1,119,117,0,0,0
7,1950-12-02 16:00:00,199,1950,12,2,3,IL,17,8,3,...,18.0,200,1,1,1,119,5,0,0,0
8,1950-12-02 16:25:00,200,1950,12,2,3,AR,5,12,3,...,7.8,10,1,1,1,65,0,0,0,0
9,1950-12-02 17:30:00,201,1950,12,2,3,IL,17,9,1,...,9.6,50,1,1,1,157,0,0,0,0


In [101]:
# Remove the source columns that have no counterpart in the BigQuery table schema
drop_cols = ['stf', 'stn', 'ns', 'sn', 'sg', 'f1', 'f2', 'f3', 'f4', 'fc']

df = df.drop(drop_cols, axis='columns')
logger.info('Dropped columns.')

df.head()

[INFO ][2023-02-15 15:25:36,061][2866547761:0017] : Dropped columns.


Unnamed: 0,date_time,om,yr,mo,dy,tz,st,mag,inj,fat,loss,closs,slat,slon,elat,elon,len,wid
0,1950-10-01 21:00:00,192,1950,10,1,3,OK,1,0,0,4.0,0.0,36.73,-102.52,36.88,-102.3,15.8,10
1,1950-10-09 02:15:00,193,1950,10,9,3,NC,3,3,0,5.0,0.0,34.17,-78.6,0.0,0.0,2.0,880
2,1950-11-20 02:20:00,195,1950,11,20,3,KY,2,0,0,5.0,0.0,37.37,-87.2,0.0,0.0,0.1,10
3,1950-11-20 04:00:00,196,1950,11,20,3,KY,1,0,0,5.0,0.0,38.2,-84.5,0.0,0.0,0.1,10
4,1950-11-20 07:30:00,197,1950,11,20,3,MS,1,3,0,4.0,0.0,32.42,-89.13,0.0,0.0,2.0,37


In [102]:
# Map the abbreviated source column names to the descriptive names used in the table schema
renamed = dict(
    date_time='date',
    om='tornado_number',
    yr='year',
    mo='month',
    dy='day',
    tz='timezone',
    st='state',
    mag='magnitude',
    inj='injuries',
    fat='fatalities',
    loss='property_loss',
    closs='crop_loss',
    slat='starting_lat',
    slon='starting_lon',
    elat='end_lat',
    elon='end_lon',
    len='length',
    wid='width',
)

df = df.rename(renamed, axis='columns')
logger.info('Renamed columns.')

df.head()

[INFO ][2023-02-15 15:25:36,489][2164801676:0025] : Renamed columns.


Unnamed: 0,date,tornado_number,year,month,day,timezone,state,magnitude,injuries,fatalities,property_loss,crop_loss,starting_lat,starting_lon,end_lat,end_lon,length,width
0,1950-10-01 21:00:00,192,1950,10,1,3,OK,1,0,0,4.0,0.0,36.73,-102.52,36.88,-102.3,15.8,10
1,1950-10-09 02:15:00,193,1950,10,9,3,NC,3,3,0,5.0,0.0,34.17,-78.6,0.0,0.0,2.0,880
2,1950-11-20 02:20:00,195,1950,11,20,3,KY,2,0,0,5.0,0.0,37.37,-87.2,0.0,0.0,0.1,10
3,1950-11-20 04:00:00,196,1950,11,20,3,KY,1,0,0,5.0,0.0,38.2,-84.5,0.0,0.0,0.1,10
4,1950-11-20 07:30:00,197,1950,11,20,3,MS,1,3,0,4.0,0.0,32.42,-89.13,0.0,0.0,2.0,37


In [103]:
# Show the dtype of every column so it can be compared with the types declared in TABLE_METADATA
df.dtypes

date              datetime64[ns]
tornado_number             int64
year                       int64
month                      int64
day                        int64
timezone                   int64
state                     object
magnitude                  int64
injuries                   int64
fatalities                 int64
property_loss            float64
crop_loss                float64
starting_lat             float64
starting_lon             float64
end_lat                  float64
end_lon                  float64
length                   float64
width                      int64
dtype: object

In [104]:
# Change data type: store 'state' with pandas' dedicated string dtype instead of generic object
state_as_string = df['state'].astype(pd.StringDtype())
df['state'] = state_as_string

df.dtypes

date              datetime64[ns]
tornado_number             int64
year                       int64
month                      int64
day                        int64
timezone                   int64
state                     string
magnitude                  int64
injuries                   int64
fatalities                 int64
property_loss            float64
crop_loss                float64
starting_lat             float64
starting_lon             float64
end_lat                  float64
end_lon                  float64
length                   float64
width                      int64
dtype: object

In [105]:
# Preview the first rows of the dataframe after the dtype change
df.head()

Unnamed: 0,date,tornado_number,year,month,day,timezone,state,magnitude,injuries,fatalities,property_loss,crop_loss,starting_lat,starting_lon,end_lat,end_lon,length,width
0,1950-10-01 21:00:00,192,1950,10,1,3,OK,1,0,0,4.0,0.0,36.73,-102.52,36.88,-102.3,15.8,10
1,1950-10-09 02:15:00,193,1950,10,9,3,NC,3,3,0,5.0,0.0,34.17,-78.6,0.0,0.0,2.0,880
2,1950-11-20 02:20:00,195,1950,11,20,3,KY,2,0,0,5.0,0.0,37.37,-87.2,0.0,0.0,0.1,10
3,1950-11-20 04:00:00,196,1950,11,20,3,KY,1,0,0,5.0,0.0,38.2,-84.5,0.0,0.0,0.1,10
4,1950-11-20 07:30:00,197,1950,11,20,3,MS,1,3,0,4.0,0.0,32.42,-89.13,0.0,0.0,2.0,37


In [106]:
def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """Load a dataframe into a BigQuery table and wait for the load job to finish.

    With the default dispositions the table is created if needed and its rows
    are replaced. After the job completes, the resulting row count is logged.

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id,
            i.e. three non-empty parts separated by dots
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.

    Raises:
        AssertionError: if table_name is not made of exactly three non-empty,
            dot-separated parts.
    """

    # The table name must be a full name (project.dataset.table): exactly two dots,
    # and no empty segment such as in "project..table".
    name_parts = table_name.split('.')
    assert len(name_parts) == 3 and all(part.strip() for part in name_parts), f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    # create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # fetch the resulting table to report how many rows it now holds
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

In [107]:
# Look up the tornadoes entry of the TABLE_METADATA config once
tornadoes_meta = TABLE_METADATA['tornadoes']

# Full destination name: <project>.<dataset>.<table>
table_name = f"{PROJECT_NAME}.{TORNADOES_DATASET_NAME}.{tornadoes_meta['table_name']}"
schema = tornadoes_meta['schema']

# Push the prepared dataframe to BigQuery
load_table(df, client, table_name, schema)

logger.info("loaded tornadoes dataset")

[INFO ][2023-02-15 15:25:41,069][1724165452:0029] : loading table: 'team-week-3.tornadoes.tornadoes'
[INFO ][2023-02-15 15:25:50,159][1724165452:0034] : loaded 68868 rows into team-week-3:tornadoes.tornadoes
[INFO ][2023-02-15 15:25:50,179][663563515:0007] : loaded tornadoes dataset
