In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# Connect only once per kernel session: when the cluster manager already exists,
# just report the cluster it is connected to instead of connecting again.
if 'finspace_clusters' in globals():
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')
else:
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()

# Collect Timebars and Summarize
Time bars are obtained by sampling information at fixed time intervals, e.g., once every minute. 

**Series:** Time Series Data Engineering and Analysis

As part of the big-data time series processing workflow that Habanero supports, this notebook shows how to take raw TAQ events that are unevenly spaced in time and collect them into a performant derived dataset of time bars.


### Timeseries Workflow
Raw Events → **\[Collect bars → Summarize bars\]** → Fill Missing → Prepare → Analytics

This notebook covers the collect-bars and summarize-bars stages of the time series workflow: the raw, irregularly timed event data is collected into evenly spaced bars, and each bar is then summarized.

![Workflow](workflow.png)

In [None]:
# REPLACE WITH CORRECT IDS!
# Identifiers of the source dataset and view that the notebook reads.
# Both are blank placeholders and must be filled in before the cells that use them run.
# US Equity TAQ Sample - AMZN 6 Months - Sample
source_dataset_id = ''
source_view_id    = ''

# Permission group that is given access when the destination dataset is created
# (blank placeholder, must be filled in as well).
# Group: Analyst
basicPermissionGroupId = ''

In [None]:
# notebook imports
import time
import datetime as dt
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pprint 

# Habanero imports
from aws.finspace.timeseries.spark.util import string_to_timestamp_micros
from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars
from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec
from aws.finspace.timeseries.spark.summarizer import *

# destination if adding to an existing dataset
# (left as None, the processing loop creates a new dataset on its first pass)
dest_dataset_id   = None

# date range of source data to process, as YYYY-MM-DD strings (both ends included)
start_date = "2019-10-01"
end_date   = "2019-12-31"

# width of each time bar, expressed as a count and a unit (here: 3 minute)
barNum  = 3
barUnit = "minute"

# timestamp string of when this cell ran
# NOTE(review): `d` is not referenced by `name` or `description` below, so the dataset
# name is NOT unique per run despite the original intent; '%-I' is also a
# platform-specific strftime directive - confirm whether `d` should be part of `name`
d = time.strftime('%Y-%m-%d %-I:%M %p %Z')  # intended to make the name unique to date and time created

# display name and description of the destination dataset
name = f"TAQ Timebar Summaries - DEMO ({barNum} {barUnit})"
description = f"TAQ data summarized into time bars of {barNum} {barUnit} containing STD, VWAP, OHLC and Total Volume. start: {start_date} end: {end_date}"

# Dataset Ownership

In [None]:
# Permissions that will be given on the dataset
basicPermissions = [
    "ViewDatasetDetails",
    "ReadDatasetData",
    "AddDatasetData",
    "CreateSnapshot",
    "EditDatasetMetadata",
    "ManageDatasetPermissions",
    "DeleteDataset",
]

# Owner contact details supplied when the dataset is created
basicOwnerInfo = {
    "phoneNumber": "12125551000",
    "email": "jdoe@amazon.com",
    "name": "John Doe",
}

## Python Helper Functions

In [None]:
# Generates every calendar date from a start date through an end date, inclusive
def daterange(startD, endD):
    """Yield each date from startD to endD (both included), one day apart.

    Nothing is yielded when endD is earlier than startD.
    """
    span_days = int((endD - startD).days)
    offset = 0
    while offset <= span_days:
        yield startD + dt.timedelta(days=offset)
        offset += 1

# Business dates (not a weekend day, not a holiday) between two dates
def businessDatesBetween(startD, endD, holidays=None):
    """Return the business dates from startD through endD, inclusive.

    A date is kept when it is neither a Saturday/Sunday nor a holiday.

    Args:
        startD: first date to consider (datetime.date).
        endD: last date to consider (datetime.date), included in the range.
        holidays: optional iterable of datetime.date values to exclude.
            When omitted, a built-in list of market holidays covering
            late 2019 and 2020 is used.

    Returns:
        List of datetime.date in ascending order; empty when endD is
        earlier than startD.
    """
    # isoweekday() numbers Monday as 1 ... Saturday as 6 and Sunday as 7
    weekend_days = (6, 7)

    if holidays is None:
        holidays = (
            dt.date(2019, 11, 28),
            dt.date(2019, 12, 25),
            dt.date(2020, 1, 1),
            dt.date(2020, 1, 20),
            dt.date(2020, 2, 17),
            dt.date(2020, 4, 10),
            dt.date(2020, 5, 25),
            dt.date(2020, 7, 3),
            dt.date(2020, 9, 7),
            dt.date(2020, 11, 26),
            dt.date(2020, 12, 25),  # was missing from the 2020 entries
        )

    # set membership keeps the per-date holiday check constant time
    holiday_set = set(holidays)

    return [
        aDate
        for aDate in daterange(startD, endD)
        if aDate.isoweekday() not in weekend_days and aDate not in holiday_set
    ]

## Python Helper Classes

In [None]:
%load ../Utilities/finspace.py

In [None]:
%load ../Utilities/finspace_spark.py

In [None]:
# Initialize and Connect
# Builds the helper object used by the following cells, handing it the `spark` variable.
# NOTE(review): SparkFinSpace and `spark` are not defined in the cells shown here -
# presumably they come from the loaded utility files and the notebook kernel; confirm.
finspace = SparkFinSpace( spark = spark )

# Get the Data from FinSpace
Using the given dataset and view ids, get the view as a Spark DataFrame

In [None]:
# Call the helper's list_classifications(); its return value is the cell's result
finspace.list_classifications()

In [None]:
# Look up the details of the source dataset configured in source_dataset_id
finspace.describe_dataset_details(dataset_id = source_dataset_id)

In [None]:
# List the views of the source dataset (source_view_id should be one of them)
finspace.list_views(dataset_id = source_dataset_id)

In [None]:
# Read the chosen view of the source dataset into the DataFrame tDF, then print its schema
tDF = finspace.read_view_as_spark(dataset_id = source_dataset_id, view_id = source_view_id)
tDF.printSchema()

## Interact with the DataFrame
As a Spark DataFrame, you can interact with the data using Spark.

In [None]:
# Preview the first 5 rows of the source data
tDF.show(5)

# FUNCTIONS: Collect and Summarize
The functions below process the time series data by first collecting the data into time bars and then summarizing the data captured in each bar. The `collectTimeBars` function collects the events for each window of time into a column named 'activity'. The `summarizeBars` function summarizes the data collected in a bar; the bar can be of any type, not just a time bar.

Customizations
- vary the width and steps of the time-bar, collect different data from the source DataFrame
- Summarize the bar with other calculations  

Bring Your Own  
- Customers can add their own custom Spark user defined functions (UDF) into the summarizer phase

![Workflow](workflow.png)


In [None]:
#-------------------------------------------------------------------
# Groups raw event data into fixed-width Time-Bars
#
# barWidth: bar duration as a number and a unit of time, e.g. '1 minute'
#-------------------------------------------------------------------
def collectTimeBars( taqDF, barWidth ):
    """Collect the events of taqDF into time bars of width barWidth.

    Args:
        taqDF: source DataFrame; the specs below reference its columns
            'datetime', 'timestamp', 'price', 'quantity', 'exchange',
            'conditions', 'date', 'ticker' and 'eventtype'.
        barWidth: duration string used for both the window and the slide
            of the time bar.

    Returns:
        The DataFrame returned by create_time_bars, with an extra
        'activity_count' column holding the size of the 'activity' column.
    """
    # bars are built separately for each combination of these columns
    group_columns = ['date', 'ticker', 'eventtype']

    # what to gather from the source DataFrame into the 'activity' column of each bar
    collected_fields = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions')

    # the bar itself: which column carries time, and how much time each bar spans
    # (the slide is set equal to the window)
    bar_window = TimeBarSpec(
        timestamp_column='datetime',
        window_duration=barWidth,
        slide_duration=barWidth,
    )

    bars = create_time_bars(
        data=taqDF,
        timebar_column='window',
        grouping_col_list=group_columns,
        input_spec=collected_fields,
        timebar_spec=bar_window,
    )

    # record how many entries each bar collected
    return bars.withColumn('activity_count', F.size(F.col('activity')))

#-------------------------------------------------------------------
# Reduces the data collected in each bar to summary columns
#-------------------------------------------------------------------
def summarizeBars( barDF ):
    """Add summary columns computed from the 'activity' column, then drop 'activity'.

    The bar data lives in 'activity', a list of structs with the fields
    datetime, timestamp, price, quantity, exchange and conditions.

    Columns added, in order: 'std', 'vwap', 'ohlc', 'volume'.
    """
    prices     = 'activity.price'
    quantities = 'activity.quantity'
    times      = 'activity.datetime'

    # (new column name, summarizer expression) pairs, applied in this order
    summaries = [
        ('std',    std( prices )),
        ('vwap',   vwap( prices, quantities )),
        ('ohlc',   ohlc_func( times, prices )),
        ('volume', total_volume( quantities )),
        # ('MY_RESULT', MY_SPECIAL_FUNCTION( times, prices, quantities )),
    ]

    sumDF = barDF
    for column_name, expression in summaries:
        sumDF = sumDF.withColumn( column_name, expression )

    # the collected events are no longer needed once summarized
    return sumDF.drop( barDF.activity )


# Create the Spark DataFrame
Create a Spark DataFrame that is the result of the data pipeline that collects TAQ data into time bars and then summarizes each bar.

## Outline of Processing
- for each set of dates in the overall range....
- collect data into time bars
- summarize the data for each bar
- save as a changeset to the dataset
  - creates a new dataset if one does not exist yet
  - uses the Habanero APIs to simplify dataset creation from a Spark DataFrame
- continue until all dates have been processed

In [None]:
# parse the configured YYYY-MM-DD strings into date objects
start_dt, end_dt = (
    dt.datetime.strptime(text, '%Y-%m-%d').date()
    for text in (start_date, end_date)
)

# business dates within the configured range; these are the dates that get processed
processDates = businessDatesBetween( start_dt, end_dt )

# Splits a sequence into consecutive slices of at most `size` items,
# so the dates can be processed a few at a time
def chunker(seq, size):
    """Return a generator over successive slices seq[i:i + size].

    The final slice is shorter when len(seq) is not a multiple of size.
    """
    starts = range(0, len(seq), size)
    return (seq[begin:begin + size] for begin in starts)

# Processing parameters.
# chunk_size is the number of business dates handled per pass of the loop below.
# NOTE(review): barPartitions, sumPartitions and partitionCol are assigned here but are
# not referenced by any code shown in this notebook - confirm whether they are still needed.
chunk_size    = 3
barPartitions = None
sumPartitions = 4
partitionCol  = "date"

# necessary for time bar API: the bar width as "<number> <unit>", e.g. "3 minute"
barWidth = f"{barNum} {barUnit}"

# True until the first changeset has been ingested
isFirst = True

# Each pass handles one chunk of dates: filter -> collect bars -> summarize -> flatten -> ingest
for dates in chunker(processDates, chunk_size):
    print(f"Processing {len(dates)}: {dates}")

    # filter the data down to the dates of this chunk
    dayDF = tDF.filter( tDF.date.isin(dates))

    # collect the data into time bars of the desired width
    dayDF = collectTimeBars( dayDF, barWidth )

    # summarize the bars, drop activity since it's no longer needed
    # (summarizeBars already drops 'activity'; the extra drop here is a safeguard)
    dayDF = summarizeBars( dayDF ).drop('activity')

    # add indicators using summaries
    #dayDF = addIndicators( dayDF, numSteps = 10, shortStep = 12, longStep = 26)

    ## flatten the complex schema into a simple one, drop columns no longer needed:
    ## the fields of 'window' and 'ohlc' become top-level columns
    finalDF = ( dayDF
        .withColumn("start", dayDF.window.start)
        .withColumn("end",   dayDF.window.end)

        .withColumn("open",  dayDF.ohlc.open)
        .withColumn("high",  dayDF.ohlc.high)
        .withColumn("low",   dayDF.ohlc.low)
        .withColumn("close", dayDF.ohlc.close)

        .drop("window")
        .drop("ohlc")
    )

    # create the changeset; chunks are appended unless the dataset is created below
    change_type = "APPEND"

    # if this is the first pass and no dest_dataset_id was given, create the dataset
    if (isFirst and dest_dataset_id is None):

        # Get schema from the DataFrame
        schema =  {
            "columns": finspace.get_schema_from_spark(finalDF),
            "primaryKeyColumns": [ ]
        }

        print("creating dataset")
        pprint.pprint(schema)

        # Create the dataset if it does not exist yet; the returned id is kept in
        # dest_dataset_id, so later passes (and later cells) use the new dataset
        dest_dataset_id = finspace.create_dataset(
            name = name,
            description = description,
            permission_group_id = basicPermissionGroupId,
            dataset_permissions = basicPermissions,
            kind = "TABULAR",
            owner_info = basicOwnerInfo,
            schema = schema
        )

        # first changeset will be a replace
        change_type = "REPLACE"

        print( f"Created dest_dataset_id= {dest_dataset_id}")

    # ingest this chunk; wait_for_completion=True is passed so the call waits for the ingestion
    print(f"Creating Changeset: {change_type}")
    changeset_id = finspace.ingest_dataframe(data_frame=finalDF, dataset_id = dest_dataset_id, change_type=change_type, wait_for_completion=True)

    # every pass after this one appends to the same dataset
    isFirst = False

    print(f"changeset_id = {changeset_id}")

# Create Views of the Dataset
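Use the Habanero APIs to create views of the data. The code below checks the dataset's existing views and creates an auto-updating view if one does not already exist for the dataset. (An 'as-of' view, capturing the state up to this moment, is not created by the code shown here.)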

In [None]:
# Show the id of the dataset that the views will be created for
print( f"dest_dataset_id= {dest_dataset_id}")

In [None]:
# Look through the dataset's existing views for one that auto-updates
existing_views = finspace.list_views(dataset_id = dest_dataset_id, max_results=100)

autoupdate_view_id = None

for ss in existing_views:
    # plain truthiness test rather than comparing against True
    if ss['autoUpdate']:
        autoupdate_view_id = ss['id']   # if several views match, the last one listed is kept

# create an auto-update view for this dataset if one does not already exist
if autoupdate_view_id is None:
    print("creating auto-update view")

    autoupdate_view_id = finspace.create_auto_update_view(
        dataset_id = dest_dataset_id,
        destination_type = "GLUE_TABLE",
        partition_columns = ["date"],
        sort_columns = ["end"],
        wait_for_completion = True)
else:
    print(f"Exists: autoupdate_view_id = {autoupdate_view_id}")
        

In [None]:
# Print the destination dataset id, quoted so it can be copied into another notebook
print(f"dataset_id = '{dest_dataset_id}'")

In [None]:
# Record when the notebook was last run
# NOTE(review): the datetime module is already imported as `dt` in the notebook's
# imports cell; this second import could be replaced by dt.datetime.now() - confirm
import datetime
print( f"Last Run: {datetime.datetime.now()}" )