# ML By Example :: Time Series Data 

In this tutorial we will perform following tasks:  
- [ ] Download Stock Data to [GCS](https://cloud.google.com/storage/docs)
- [ ] Import the stock data into [BigQuery](https://cloud.google.com/bigquery/docs)
- [ ] Load the stock data from files in GCS into a data frame

# References
1. https://github.com/GoogleCloudPlatform/training-data-analyst

In [21]:
import tensorflow as tf
import google.cloud.bigquery as bq
import google.cloud.storage as gcs
import yfinance as yf
from datetime import date
import os
import numpy as np
from googleapiclient.errors import HttpError
from google.cloud import exceptions
import glob
import shutil
import pandas as pd

In [22]:
BUCKET = 'inspired-campus-278503'
PROJECT = 'inspired-campus-278503'
REGION = 'us-central1'
DATASET_NAME = "kl_ml_by_example"
DATASET_LOCATION = REGION
DATASET_TABLE = "stockdata"

LOCAL_TEMP_DIR = "./tmp"

In [23]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
os.environ['DATASET_LOCATION'] = DATASET_LOCATION
os.environ['DATASET_NAME'] = DATASET_NAME
os.environ['DATASET_TABLE'] = DATASET_TABLE

os.environ['LOCAL_TEMP_DIR'] = LOCAL_TEMP_DIR

### Create the storage bucket if it doesn't exist

In [24]:
def get_gcs_client():
    return gcs.Client()

def get_bq_client():
    return bq.Client()

In [25]:
def create_storage_bucket(bucket_name=None):
    if bucket_name is None:
        bucket_name = BUCKET
    gcs_client = get_gcs_client()
    gcs_bucket = None
    try:
        # Get the bucket
        gcs_bucket = gcs_client.get_bucket(bucket_name)
    except exceptions.NotFound as not_found_err:
        gcs_client.create_bucket(bucket_name, location=DATASET_LOCATION)
        print("Created bucket: {bucket}!".format(bucket=bucket_name))

In [26]:
def create_bq_dataset(ds_name=None):
    if ds_name is None:
        ds_name = DATASET_NAME
    bq_client = get_bq_client()
    bq_client.create_dataset(ds_name, exists_ok=True)

In [27]:
def upload_files_to_storage(fully_qualified_file_pattern, bucket=None):
    file_names = glob.glob(fully_qualified_file_pattern)
    if bucket is None:
        bucket = BUCKET
    gcs_client = get_gcs_client()
    gcs_bucket = gcs_client.bucket(bucket)
    for file_name in file_names:
        fname = file_name.split("/")[-1]
        blob = gcs_bucket.blob(fname)
        blob.upload_from_filename(file_name)
        print("Copying '{fname}' to '{bucket}'".format(fname=fname, bucket=bucket))

In [28]:
def get_stock_historical_data(stocks=["MSFT"], start_date="2000-01-01", end_date=None):
    if end_date is None:
        end_date = date.today().strftime("%Y-%m-%d")
    tickers = ""
    for ticker in stocks:
        tickers = tickers + " " + ticker
    data = yf.download(
        tickers=tickers,
        start=start_date,
        end=end_date,
        group_by="ticker"
    )
    return data

In [29]:
stock_list = ["MSFT", "GOOG", "FB"]
data = get_stock_historical_data(stock_list)
#data = get_stock_historical_data()

[*********************100%***********************]  3 of 3 completed


In [30]:
def offset_and_copy(column, start_from_row=0):
    new_col = np.array(column[start_from_row:-1])
    for i in range(start_from_row+1):
        new_col = np.append(new_col,[np.NaN])
    return new_col

In [31]:
# Pre-process DataFrame
# 1. Creates a deep copy of the input data frame
# 2. Cleanses the data frame
# 3. Returns the new data frame
def pre_process(df):
    data_frame = df.copy(deep=True)
    
    #print(data_frame)
    #print("Finite? ", np.isfinite(data_frame["Volume"]))
    data_frame["Volume"] = data_frame["Volume"].astype(int)
    for count in range(1, 5):
        data_frame["Next_High_" + str(count)] = offset_and_copy(np.array(data_frame["High"]), count)
        data_frame["Next_Low_" + str(count)] = offset_and_copy(np.array(data_frame["Low"]), count)
    data_frame = data_frame.dropna()
    return data_frame

In [32]:
%%bash
# Create a tmp directory if one doesn't exist
mkdir -p ${LOCAL_TEMP_DIR}

In [33]:
for stock in stock_list:
    df = pre_process(data[stock].dropna())
    #print(df)
    dates = np.array(np.array(np.array(df.index.to_numpy(), dtype="str"), dtype="S10"), dtype="str")
    fname = "{local_temp_dir}/{stock}-{start_date}-TO-{end_date}.csv".format(local_temp_dir=LOCAL_TEMP_DIR,stock=stock, start_date=dates[0], end_date=dates[-1])
    df["Stock_Ticker"]=stock
    df.to_csv(fname, index=True)
    print("Stock {stock} has {records} records.".format(stock=stock, records=len(df)))

Stock MSFT has 5136 records.
Stock GOOG has 3974 records.
Stock FB has 2022 records.


In [34]:
create_storage_bucket()
create_bq_dataset()
upload_files_to_storage(LOCAL_TEMP_DIR + "/*.csv")

Created bucket: inspired-campus-278503!
Copying 'FB-2012-05-18-TO-2020-06-02.csv' to 'inspired-campus-278503'
Copying 'GOOG-2004-08-19-TO-2020-06-02.csv' to 'inspired-campus-278503'
Copying 'MSFT-2000-01-03-TO-2020-06-02.csv' to 'inspired-campus-278503'


In [35]:
def load_data_into_bq(gcs_uris=None):
    files = list()
    if gcs_uris is None:
        blobs = get_gcs_client().list_blobs(BUCKET)
        for blob in blobs:
            print("blob: ", blob.name)
            files.append("gs://{bucket}/{blob}".format(bucket=BUCKET, blob=blob.name))
    else:
        files = gcs_uris
    table_name = "{project}.{dataset}.{table}".format(
        project = PROJECT, 
        dataset = DATASET_NAME, 
        table = DATASET_TABLE
        )
    bq_client = get_bq_client()
    job_config = bq.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bq.SourceFormat.CSV
    print(files)
    load_job = bq_client.load_table_from_uri(files, bq_client.dataset(DATASET_NAME).table(DATASET_TABLE), job_config=job_config)
    print("Starting job {}".format(load_job.job_id))
    try:
        load_job.result()  # Waits for table load to complete.
        print("Job finished.")
    except exceptions.BadRequest as err:
        print(load_job.errors)

In [36]:
load_data_into_bq()

blob:  FB-2012-05-18-TO-2020-06-02.csv
blob:  GOOG-2004-08-19-TO-2020-06-02.csv
blob:  MSFT-2000-01-03-TO-2020-06-02.csv
['gs://inspired-campus-278503/FB-2012-05-18-TO-2020-06-02.csv', 'gs://inspired-campus-278503/GOOG-2004-08-19-TO-2020-06-02.csv', 'gs://inspired-campus-278503/MSFT-2000-01-03-TO-2020-06-02.csv']
Starting job dc46c2d2-9a81-4c91-b59c-be3d04baaab0
Job finished.


In [37]:
def cleanup():
    # Delete the local temp directory and its contents
    try:
        shutil.rmtree(LOCAL_TEMP_DIR)
    except OSError as e:
        print("[{path}] doesn't exist. Ignoring FileNotFoundError!".format(path=LOCAL_TEMP_DIR))
    # Delete the contents on GCS bucket
    try:
        get_gcs_client().bucket(BUCKET).delete(force=True)
        print("Bucket {bucket} deleted".format(bucket=BUCKET))
    except exceptions.NotFound as e:
        # pass
        print("Bucket {bucket} doesn't exist".format(bucket=BUCKET))
    # Remove the BQ Table
    table_id = '{project}.{dataset}.{table}'.format(project=PROJECT, dataset=DATASET_NAME, table=DATASET_TABLE)
    bq.Client().delete_table(table_id, not_found_ok=True)

In [38]:
def cleanup_ml_models(ml_models=None):
    bq = get_bq_client()
    if ml_models is None:
        ml_models = bq.list_models("{project}.{dataset}".format(project=PROJECT, dataset=DATASET_NAME))
    for ml_model in ml_models:
        bq.delete_model(ml_model)

In [20]:
#cleanup()
#cleanup_ml_models()

In [39]:
def create_sql_to_split_data(stock, dataset_split_percentage=75, random_sample_percentage=30):
    COUNT_SQL = """
    SELECT 
        COUNT(*) AS REC_COUNT
    FROM 
        `{project}.{dataset}.{table}`
    WHERE
        Stock_Ticker='{stock}'
    """.format(project=PROJECT, dataset=DATASET_NAME, table=DATASET_TABLE, stock=stock)
    bq = get_bq_client();
    df_record_count = bq.query(COUNT_SQL).to_dataframe()
    record_count = int(df_record_count.iat[0,0])
    split_count = int(record_count*dataset_split_percentage/100)
    random_sample_count = float(random_sample_percentage/100)
    print("Total number of records for {stock} is: {record_count}".format(stock=stock, record_count=record_count))
    print("Number of training records is: {split_count}".format(split_count=split_count))
    SQL = """
    #standardSQL
    SELECT 
        Open,
        Close,
        Low,
        HASH_FUNC,
        High,
        Next_High_1,
        Next_Low_1,
        Next_High_2,
        Next_Low_2,
        Next_High_3,
        Next_Low_3,
        Next_High_4,
        Next_Low_4
    FROM (
        SELECT
            #COUNT(*) AS Num,
            Open,
            Close,
            High,
            Low,
            Next_High_1,
            Next_Low_1,
            Next_High_2,
            Next_Low_2,
            Next_High_3,
            Next_Low_3,
            Next_High_4,
            Next_Low_4,
            FARM_FINGERPRINT(CONCAT(
                CAST(Date As STRING),
                CAST(Stock_Ticker As STRING)
            )) AS HASH_FUNC
        FROM
            `{project}.{dataset}.{table}`
        WHERE
            Stock_Ticker='{stock}'
    )
    WHERE 
        (ABS(MOD(HASH_FUNC, 10)) {{less_or_greater_than_operator}} ({dataset_split_percentage}/10)) #split the dataset to the requested percentage
        AND 
        (RAND() < ({random_sample_count}))
    """.format(project=PROJECT, dataset=DATASET_NAME, table=DATASET_TABLE, stock=stock,dataset_split_percentage=dataset_split_percentage,random_sample_count=random_sample_count)
    
    train_df = bq.query(SQL.format(less_or_greater_than_operator="<")).to_dataframe()
    eval_df = bq.query(SQL.format(less_or_greater_than_operator=">")).to_dataframe()
    return SQL, train_df, eval_df

In [40]:
def create_ml_model(sql, stock, less_or_greater_than_operator="<"):
    sql = sql.format(less_or_greater_than_operator=less_or_greater_than_operator)
    SQL="""
    #standardSQL
    CREATE MODEL `{project}.{dataset}.{model}`
    OPTIONS
    (model_type='linear_reg',
     input_label_cols=[
       'Next_High_1']
       # Multiple lables not supported in BQ
       #'Next_High_1',
       # 'Next_Low_1',
       # 'Next_High_2',
       # 'Next_Low_2',
       # 'Next_High_3',
       # 'Next_Low_3',
       # 'Next_High_4',
       # 'Next_Low_4']
    ) AS
    {sql}
    """.format(project=PROJECT, dataset=DATASET_NAME, model=stock+"_ml_model",sql=sql)
    print(SQL)
    create_model_job = get_bq_client().query(SQL)
    print("Starting create ml model job: {}".format(create_model_job.job_id))
    try:
        create_model_job.result()  # Waits for table load to complete.
        print("Finished creating ML model.")
    except exceptions.BadRequest as err:
        print("Error", create_model_job.errors)

In [41]:
stock = "MSFT"
sql, train_df, eval_df = create_sql_to_split_data(stock)
print("Training record#: ", len(train_df))
print("Evaluation record#: ", len(eval_df))
#print("SQL: ", sql)
create_ml_model(sql, stock)
#print (sql)

Total number of records for MSFT is: 5136
Number of training records is: 3852
Training record#:  1252
Evaluation record#:  331

    #standardSQL
    CREATE MODEL `inspired-campus-278503.kl_ml_by_example.MSFT_ml_model`
    OPTIONS
    (model_type='linear_reg',
     input_label_cols=[
       'Next_High_1',
        'Next_Low_1',
        'Next_High_2',
        'Next_Low_2',
        'Next_High_3',
        'Next_Low_3',
        'Next_High_4',
        'Next_Low_4']
    ) AS
    
    #standardSQL
    SELECT 
        Open,
        Close,
        Low,
        HASH_FUNC,
        High,
        Next_High_1,
        Next_Low_1,
        Next_High_2,
        Next_Low_2,
        Next_High_3,
        Next_Low_3,
        Next_High_4,
        Next_Low_4
    FROM (
        SELECT
            #COUNT(*) AS Num,
            Open,
            Close,
            High,
            Low,
            Next_High_1,
            Next_Low_1,
            Next_High_2,
            Next_Low_2,
            Next_High_3,
     