# 2. Importing Packages That Aren't In Snowflake's Anaconda Channel

Now we're going to create and backtest a strategy within Snowpark using our initial data and the ML prediction.

In this lab you will learn how to:

1. Create a session for Snowpark with Snowflake
2. Create a SPROC in Snowflake, and include packages that aren't in Snowflake's Anaconda Channel yet
3. Migrate the SPROC to a vectorised UDTF functionality for better scalability

## Prerequisites:
Part 1. is assumed to have been fully executed

In [1]:
import json
import pandas as pd
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType, DateType, Variant
from snowflake.ml._internal.utils import identifier
import backtesting as bt
from backtesting.lib import crossover



# 2.1 Reading Snowflake Connection Details, create a Session 

TODO: Update your json path for your credentials

In [3]:
snowflake_connection_cfg = json.loads(open("/home/harry/Development/Hol2.0-FS-Predictions/creds.json").read()) # <--- Update here
session = Session.builder.configs(snowflake_connection_cfg).create()
session.sql("USE DATABASE HOL_DEMO").collect()
session.sql("CREATE OR REPLACE STAGE YOUR_STAGE").collect()
session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='MEDIUM' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'").collect()

[Row(status='Warehouse ASYNC_WH successfully created.')]

# 2.2.1 Let's run a SPROC

TODO: Just run the cell!

In [4]:
def hello_world(session: Session) -> Variant:
    return "hello world"

# Register sproc
hello_world_demo = session.sproc.register(
                              func=hello_world, 
                              name='hello_world', 
                              is_permanent=True, 
                              replace=True,
                              stage_location='@HOL_DEMO.PUBLIC.YOUR_STAGE', 
                              packages=['snowflake-snowpark-python'])
# Call sproc
hello_world_demo()

'"hello world"'

# 2.2.2 Trying to Create a SPROC Using the python BACKTEST library

TODO: Just run the cell!

In [5]:
def sproc_test_backtesting(session: Session) -> Variant:
    import backtesting as bt
    return "It works!"

# Register sproc
sproc_test_backtesting_demo = session.sproc.register(
                              func=sproc_test_backtesting, 
                              name='YOUR_SPROC_NAME', 
                              is_permanent=True, 
                              replace=True,
                              stage_location='@YOUR_STAGE', 
                              packages=['snowflake-snowpark-python', 'backtesting', 'bokeh'])
# Call sproc
sproc_test_backtesting_demo()

RuntimeError: Cannot add package backtesting because it is not available in Snowflake and Session.custom_package_usage_config['enabled'] is not set to True. To upload these packages, you can set it to True or find the directory of these packages and add it via Session.add_import. See details at https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udfs.html#using-third-party-packages-from-anaconda-in-a-udf.

Disaster!  Apparently Backtesting isn't available in Snowflake's Anaconda Channel (YET), but not to worry, we have a fix

See here for the full list of available packages https://repo.anaconda.com/pkgs/snowflake/

# 2.2.3 Trying to Create a SPROC Using the python BACKTEST library

TODO: 

1. Open up your browswer and log in to Snowflake, copy 'wheel_loader.py' and 'Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl' to Your_Stage (Data-> DataBases -> HOL_DEMO -> PUBLIC -> STAGES)
2. Run the cell

Note you can do this programmatically with SNOWCLI, but this is an easy way for this demo.  Further reading - https://www.askpython.com/python/examples/wheel-for-python-package if you want to learn how to create a whl file from scratch

In [None]:
# Define sproc to test that the import was successful
def sproc_test_backtesting(session: Session) -> Variant:
    import wheel_loader
    wheel_loader.load('Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl')
    import pandas as pd
    import backtesting as bt
    return "It works!"

# Register sproc
sproc_test_backtesting_demo = session.sproc.register(
                              func=sproc_test_backtesting, 
                              name='YOUR_SPROC_NAME', 
                              is_permanent=True, 
                              replace=True,
                              stage_location='@YOUR_STAGE', 
                              packages=['snowflake-snowpark-python', 'bokeh'], # Needed as dependency
                              imports=["@YOUR_STAGE/wheel_loader.py",
                                       "@YOUR_STAGE/Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl"])
# Call sproc
sproc_test_backtesting_demo()

# 2.2.4 Using the Backtest library locally

TODO: Optional - update the Strategy as you see fit

In [None]:
# Create Strategy
def MovingAverage(closes:pd.Series, n:int) -> pd.Series:
    return pd.Series(closes).rolling(n).mean()

class SmaCross(bt.Strategy):
    sma_fast = 12 
    sma_slow = 35
    
    def init(self):
        self.sma1 = self.I(MovingAverage, self.data.Close, self.sma_fast)
        self.sma2 = self.I(MovingAverage, self.data.Close, self.sma_slow)

    def next(self):
        if not self.position and crossover(self.sma1, self.sma2) and crossover(self.data.CLOSE_PREDICT,self.sma2):
            self.buy()
        elif self.position and crossover(self.sma2, self.sma1):
            self.position.close()

# Run backtest
data = session.sql("""SELECT * FROM ML_PREDICT WHERE SYMBOL = 'IBM'""").to_pandas()
data.columns = ['DATE', 'Open', 'High', 'Low', 'Close', 'SYMBOL', 'CLOSE_M1', 'CLOSE_M2', 'CLOSE_M3', 'CLOSE_M4', 'CLOSE_M5', 'CLOSE_PREDICT']
btest = bt.Backtest(data, SmaCross, cash=10_000, commission=0,exclusive_orders=True)
stats = btest.run()[:-3]
df = pd.DataFrame(stats).T
df['Strategy_Name'] = "SMA_CROSS"
df

# 2.2.5 Putting it all together

TODO: Update the SPROC with a Strategy

hints: 
1. This should be a copy and paste exercise, don't over think it
2. writing to a table from pandas looks like this session.write_pandas(df, table_name='BACKTEST_RESULTS', auto_create_table=True)

In [None]:
def sproc_test_backtesting(session: Session, symbol:str, target_table:str) -> Variant:
    import wheel_loader
    wheel_loader.load('Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl')


    ### Do Part 1 here

    ### Do Part 2 here

# Register sproc
sproc_test_backtesting_demo = session.sproc.register(
                              func=sproc_test_backtesting, 
                              name='YOUR_SPROC_NAME', 
                              is_permanent=True, 
                              replace=True,
                              stage_location='@YOUR_STAGE', 
                              packages=['snowflake-snowpark-python', 'bokeh'], # Needed as dependency
                              imports=["@YOUR_STAGE/wheel_loader.py",
                                       "@YOUR_STAGE/Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl"])
# Call sproc
sproc_test_backtesting_demo("IBM", "SPROC_BT_TEST")
session.table('SPROC_BT_TEST').limit(5).to_pandas()

# 2.3 Parallelise with a UDTF

TODO: 

1. Update the UDTF with code that mirrors the SPROC above


In [8]:
class Strat_Backtest:
    def end_partition(self, data):
        data.columns = ['DATE', 'Open', 'High', 'Low', 'Close', 'SYMBOL', 'CLOSE_PREDICT']

        ### Do Part 1 here

        yield df_stats # note instead of writing to a table as you do in a stored procedure, in a UDTF you return a pandas dataframe (see the next line) which gets combined into a snowpark dataframe

# Register UDTF
Strat_Backtest.end_partition._sf_vectorized_input = pd.DataFrame
strat_bt_udtf = session.udtf.register(
    Strat_Backtest, # the class
    input_types=[PandasDataFrameType([DateType(), FloatType(),FloatType(),FloatType(),FloatType(),StringType(), FloatType()])],
    output_schema=PandasDataFrameType([FloatType()]*28+[StringType()],
                                      ["ST", "End_", 'Duration', 'Exposure_Time_pct', 'Equity_Final_USD',
                                       'Equity_Peak_USD', 'Return_pct', 'Buy_and_Hold_Return_pct',
                                       'Return_Ann_pct', 'Volatility_Ann_pct', 'Sharpe_Ratio',
                                       'Sortino_Ratio', 'Calmar_Ratio', 'Max_Drawdown_pct',
                                       'Avg_Drawdown_pct', 'Max_Drawdown_Duration', 'Avg_Drawdown_Duration',
                                       'Num_Trades', 'Win_Rate_pct', 'Best_Trade_pct', 'Worst_Trade_pct',
                                       'Avg_Trade_pct', 'Max_Trade_Duration', 'Avg_Trade_Duration',
                                       'Profit_Factor', 'Expectancy_pct', 'SQN', "Kelly_Criterion","STRAT"]),
                              stage_location='@YOUR_STAGE', 
                              packages=['snowflake-snowpark-python', 'bokeh'], # Needed as dependency
                              imports=["@YOUR_STAGE/wheel_loader.py",
                                       "@YOUR_STAGE/Backtesting-0.3.4.dev30+g0ce24d8-py3-none-any.whl"]) 


The version of package 'bokeh' in the local environment is 3.3.4, which does not fit the criteria for the requirement 'bokeh'. Your UDF might not work when the package version is different between the server and your local environment.


In [None]:
sdf = session.table("ML_PREDICT")
sdf_prepped = sdf.select(strat_bt_udtf(*['DATE', 'Open', 'High', 'Low', 'Close', 'SYMBOL', 'CLOSE_PREDICT']).over(partition_by=['SYMBOL']))
sdf_prepped.limit(5).to_pandas()

In [10]:
sdf_prepped.write.save_as_table("UDTF_BT_TEST", mode="overwrite")