In [10]:
import os
import sys
import pandas as pd
import logging
from google.cloud import bigquery
from hashlib import md5
from typing import List
from google.cloud import bigquery
from google.oauth2 import service_account

# Location of the service-account key file. GOOGLE_APPLICATION_CREDENTIALS
# takes precedence when it is set to a non-empty value, so the notebook can
# run on machines where the key lives elsewhere; otherwise the original
# path is used.
key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") or "/root/.creds/kphil-sa.json"

# Credentials built from the key file, requesting the cloud-platform scope
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

# BigQuery client authenticated with those credentials and bound to the
# project id they carry
client = bigquery.Client(credentials=credentials, project=credentials.project_id,)


# **** SETUP ****

# Project and dataset identifiers used when addressing BigQuery
PROJECT_NAME = "team-week2"
BOSTON_DATASET_NAME = "boston"
PORTLAND_DATASET_NAME = "portland"

# Source CSV files, each resolved against the shared (relative) data directory
DATA_DIR = "../data/"
DEFAULT_BOSTON_CRIME_FILE, DEFAULT_PORTLAND_CRIME_FILE = (
    os.path.join(DATA_DIR, csv_name)
    for csv_name in ("boston_crime_data.csv", "portland_crime_data.csv")
)


# **** TABLE SCHEMAS ****

# Per-table metadata keyed by a short table key: the BigQuery table name and
# the list of SchemaField definitions intended for that table. Field names
# use the lower-cased CSV column names.
TABLE_METADATA = {
    'boston_crime': {
        'table_name': 'boston_crime',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('incident_number', 'string', mode='REQUIRED'),
            bigquery.SchemaField('offense_code', 'int64', mode='REQUIRED'),
            bigquery.SchemaField('offense_code_group', 'string', mode='REQUIRED'),
            bigquery.SchemaField('offense_description', 'string', mode='NULLABLE'),
            bigquery.SchemaField('district', 'string', mode='REQUIRED'),
            # NOTE(review): pandas reads reporting_area as `object`, not int64,
            # so the column presumably holds non-numeric values - confirm (or
            # clean the column) before loading with this schema.
            bigquery.SchemaField('reporting_area', 'int64',mode='NULLABLE'),
            bigquery.SchemaField('shooting', 'string',mode='NULLABLE'),
            # Spelled 'occurred_on_date' to match the dataframe column; the
            # previous 'occured_on_date' matched no column in the data.
            bigquery.SchemaField('occurred_on_date','datetime', mode='REQUIRED'),
            bigquery.SchemaField('year', 'int64', mode='REQUIRED'),
            bigquery.SchemaField('month', 'int64', mode='REQUIRED'),
            bigquery.SchemaField('day_of_week', 'string', mode='REQUIRED'),
            bigquery.SchemaField('hour', 'int64', mode='REQUIRED'),
            bigquery.SchemaField('ucr_part','string', mode='NULLABLE'),
            bigquery.SchemaField('street', 'string', mode='REQUIRED'),
            bigquery.SchemaField('lat','float64', mode='REQUIRED'),
            bigquery.SchemaField('long', 'float64', mode='REQUIRED'),
            # 'object' is a pandas dtype, not a BigQuery field type; the
            # "(lat, long)" text in this column is declared as a string.
            bigquery.SchemaField('location', 'string', mode='REQUIRED')
        ]
    },
    'portland_crime': {
        'table_name': 'portland_crime',
        # No Portland fields are defined yet.
        # NOTE(review): the commented-out fields below (product_id, unit_price, ...)
        # do not look like crime-data columns - presumably template leftovers; confirm.
        'schema': [
            # indexes are written if only named in the schema
            # bigquery.SchemaField('product_id', 'int64', mode='REQUIRED'),
            # bigquery.SchemaField('product_name', 'string', mode='REQUIRED'),
            # bigquery.SchemaField('category', 'string', mode='NULLABLE'),
            # bigquery.SchemaField('unit', 'string', mode='NULLABLE'),
            # bigquery.SchemaField('unit_price', 'float', mode='REQUIRED')
        ]
    }
}


# **** SETUP LOGGING ****
# setup logging and logger
# Configure the root logger: messages go to stdout, prefixed with the level,
# timestamp, module name and zero-padded line number.
logging.basicConfig(
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
# getLogger() with no argument always returns the root logger. Asking for the
# name 'root' only resolves to the root logger on Python 3.9+; on older
# versions it creates a separate logger that merely happens to be called 'root'.
logger: logging.Logger = logging.getLogger()
logger.setLevel(logging.DEBUG)                          # lower the threshold from INFO so debug messages are emitted


# **** BIGQUERY CLIENT ****
logger.debug("Creating bigquery client")
# Default-credential variant, disabled; the client is created earlier in this
# cell from the service-account credentials.
# client = bigquery.Client()

logger.info("Setup Completed")

[DEBUG][2023-01-23 03:46:12,310][2130767228:0082] : Creating bigquery client
[INFO ][2023-01-23 03:46:12,311][2130767228:0085] : Setup Completed


In [7]:
# Boston crime data file name
filename = DEFAULT_BOSTON_CRIME_FILE
logger.debug(f"attempting to process: {filename}")

# The header row alone is 78 bytes, so a file no larger than that has no data rows.
HEADER_ONLY_BYTES = 78

# Fail fast when the file is absent or holds nothing beyond the header
assert os.path.exists(filename), f"Data file does not exist: '{filename}'"
assert os.path.getsize(filename) > HEADER_ONLY_BYTES, f"Data file size incorrect; does not seem to contain data: '{filename}'"

# Load boston crime file into dataframe
df = pd.read_csv(filename, header=0, encoding='windows-1252')
logger.info(f"loaded {len(df.index)} rows from: {filename}")

# Lower-case the headers so they match the names checked below
df.columns = df.columns.str.lower()

# Schema check: gather every expected column that is absent so a single
# failure reports all of them, not just the first one encountered.
expected_columns = ['incident_number', 'offense_code', 'offense_code_group', 'offense_description', 'district', 'reporting_area', 'shooting', 'occurred_on_date', 'year', 'month', 'day_of_week', 'hour', 'ucr_part', 'street', 'lat', 'long', 'location']
missing_columns = [col for col in expected_columns if col not in df.columns]
assert not missing_columns, f"Data file missing required column(s): {missing_columns}"

# assign & remember boston crime dataframe
# (this is a second name for the same frame, not a copy)
boston_crime_df = df
boston_crime_df.head()

[DEBUG][2023-01-23 03:42:58,952][860230781:0003] : attempting to process: ../data/boston_crime_data.csv
[INFO ][2023-01-23 03:43:00,230][860230781:0012] : loaded 319073 rows from: ../data/boston_crime_data.csv


Unnamed: 0,incident_number,offense_code,offense_code_group,offense_description,district,reporting_area,shooting,occurred_on_date,year,month,day_of_week,hour,ucr_part,street,lat,long,location
0,I182070945,619,Larceny,LARCENY ALL OTHERS,D14,808,,2018-09-02 13:00:00,2018,9,Sunday,13,Part One,LINCOLN ST,42.357791,-71.139371,"(42.35779134, -71.13937053)"
1,I182070943,1402,Vandalism,VANDALISM,C11,347,,2018-08-21 00:00:00,2018,8,Tuesday,0,Part Two,HECLA ST,42.306821,-71.0603,"(42.30682138, -71.06030035)"
2,I182070941,3410,Towed,TOWED MOTOR VEHICLE,D4,151,,2018-09-03 19:27:00,2018,9,Monday,19,Part Three,CAZENOVE ST,42.346589,-71.072429,"(42.34658879, -71.07242943)"
3,I182070940,3114,Investigate Property,INVESTIGATE PROPERTY,D4,272,,2018-09-03 21:16:00,2018,9,Monday,21,Part Three,NEWCOMB ST,42.334182,-71.078664,"(42.33418175, -71.07866441)"
4,I182070938,3114,Investigate Property,INVESTIGATE PROPERTY,B3,421,,2018-09-03 21:05:00,2018,9,Monday,21,Part Three,DELHI ST,42.275365,-71.090361,"(42.27536542, -71.09036101)"


In [8]:
# Describe the Boston dataset: project-qualified id, located in "US"
dataset_id = ".".join([PROJECT_NAME, BOSTON_DATASET_NAME])
boston_dataset_spec = bigquery.Dataset(dataset_id)
boston_dataset_spec.location = "US"

# Ask BigQuery to create it; with exists_ok=True an already-existing dataset
# is not treated as an error.
dataset = client.create_dataset(boston_dataset_spec, exists_ok=True)

logger.info(f"Created Boston dataset: {dataset.full_dataset_id}")

[INFO ][2023-01-23 03:44:27,923][3283432699:0007] : Created Boston dataset: team-week2:boston


In [9]:
# Display the dtype of every column currently in df
df.dtypes

incident_number         object
offense_code             int64
offense_code_group      object
offense_description     object
district                object
reporting_area          object
shooting                object
occurred_on_date        object
year                     int64
month                    int64
day_of_week             object
hour                     int64
ucr_part                object
street                  object
lat                    float64
long                   float64
location                object
dtype: object

In [31]:
# Text columns are switched to pandas' 'string' dtype. The assignment is made
# on the existing frame (df is not rebound), so boston_crime_df - another name
# for the same object - sees the converted columns too.
string_columns = ['incident_number', 'offense_code_group', 'offense_description', 'district', 'shooting', 'ucr_part', 'street', 'day_of_week']
df[string_columns] = df[string_columns].astype('string')

# The timestamp column is converted to datetime64 values
df['occurred_on_date'] = df['occurred_on_date'].astype('datetime64[ns]')

# Show the resulting dtypes
df.dtypes

incident_number                string
offense_code                    int64
offense_code_group             string
offense_description            string
district                       string
reporting_area                 object
shooting                       string
occurred_on_date       datetime64[ns]
year                            int64
month                           int64
day_of_week                    string
hour                            int64
ucr_part                       string
street                         string
lat                           float64
long                          float64
location                       object
dtype: object

In [5]:
# Work on the Boston frame and build the fully qualified destination table id
df = boston_crime_df
boston_table = TABLE_METADATA['boston_crime']['table_name']
table_name = f"{PROJECT_NAME}.{BOSTON_DATASET_NAME}.{boston_table}"

# Load-job settings: create the table if it is missing, truncate it on write.
# No schema is set on this config; the 'schema' entry of TABLE_METADATA is not
# used by this cell.
job_config = bigquery.LoadJobConfig()
job_config.create_disposition = 'CREATE_IF_NEEDED'
job_config.write_disposition = 'WRITE_TRUNCATE'

logger.info(f"loading table: '{table_name}'")
job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
job.result()        # block until the load job has finished

# Fetch the table back to report its row count and id
table = client.get_table(table_name)
logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

[INFO ][2023-01-19 09:34:02,112][1673120583:0009] : loading table: 'team-week2.san_francisco.sf_crime'
[INFO ][2023-01-19 09:35:56,693][1673120583:0014] : loaded 680655 rows into team-week2:san_francisco.sf_crime


In [6]:
# Display the dtype of every column in df (the frame passed to the load job above).
# NOTE(review): the saved output under this cell lists San Francisco incident
# columns, so it appears to predate the current Boston code - re-run to refresh.
df.dtypes

incident_datetime                                        object
incident_date                                            object
incident_time                                            object
incident_year                                             int64
incident_day_of_week                                     object
report_datetime                                          object
row_id                                                    int64
incident_id                                               int64
incident_number                                           int64
cad_number                                              float64
report_type_code                                         object
report_type_description                                  object
filed_online                                             object
incident_code                                             int64
incident_category                                        object
incident_subcategory                    