In [21]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, FloatType
from snowflake.snowpark.functions import avg, sum, col, udf, call_udf, call_builtin, year, month
import streamlit as st
import pandas as pd
from datetime import date

# scikit-learn (install: pip install -U scikit-learn)
from sklearn.linear_model import LinearRegression

In [62]:
# Session
connection_parameters = {
   "account": "zburrql-mb49734",
   "user": "CK3000",
   "password": "-P3n5ac()la#1",
   "warehouse": "compute_wh",
   "role": "accountadmin",
   "database": "summit_hol",
   "schema": "CYBERSYN"
}
session = Session.builder.configs(connection_parameters).create()
# test if we have a connection
session.sql("select current_account() acct, current_warehouse() wh, current_database() db, current_schema() schema, current_version() v").show()

---------------------------------------------------
|"ACCT"   |"WH"        |"DB"  |"SCHEMA"  |"V"     |
---------------------------------------------------
|VW65405  |COMPUTE_WH  |NULL  |NULL      |8.11.4  |
---------------------------------------------------



In [63]:
# SQL queries to explore the data

# What financial data is available as a time-series from FRED?
session.sql("SELECT DISTINCT variable_name FROM FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.FINANCIAL_FRED_TIMESERIES").show()

# What is the size of all the time-series data?
session.sql("SELECT COUNT(*) FROM FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.FINANCIAL_FRED_TIMESERIES").show()

------------------------------------------------------
|"VARIABLE_NAME"                                     |
------------------------------------------------------
|30-Day AA Nonfinancial Commercial Paper Interes...  |
|6-Month Treasury Bill Secondary Market Rate, Di...  |
|New Privately-Owned Housing Units Completed: Un...  |
|Other Loans and Leases: All Other Loans and Lea...  |
|Nominal Emerging Market Economies U.S. Dollar I...  |
|Other Securities: Non-MBS, Foreign-Related Inst...  |
|Federal Funds Target Range - Upper Limit, Not s...  |
|Liabilities and Capital: Liabilities: Deposits ...  |
|1-Year Treasury Bill Secondary Market Rate, Dis...  |
|Loans to Commercial Banks, Large Domestically C...  |
------------------------------------------------------

--------------
|"COUNT(*)"  |
--------------
|2307474     |
--------------



In [64]:
# Now use Snowpark dataframe
snow_df_pce = (session.table("FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.FINANCIAL_FRED_TIMESERIES")
               .filter(col('VARIABLE_NAME') == 'Personal Consumption Expenditures: Chain-type Price Index, Seasonally adjusted, Monthly, Index 2017=100')
               .filter(col('DATE') >= '1972-01-01')
               .filter(month(col('DATE')) == 1)
               .orderBy(col('DATE'))) 
snow_df_pce.show()

-----------------------------------------------------------------------------------------------------------------------------------------------------------------
|"GEO_ID"     |"VARIABLE"                                          |"VARIABLE_NAME"                                     |"DATE"      |"VALUE"  |"UNIT"          |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
|country/USA  |Personal_Consumption_Expenditures:_Chain-type_P...  |Personal Consumption Expenditures: Chain-type P...  |1972-01-31  |21.015   |Index 2017=100  |
|country/USA  |Personal_Consumption_Expenditures:_Chain-type_P...  |Personal Consumption Expenditures: Chain-type P...  |1973-01-31  |21.695   |Index 2017=100  |
|country/USA  |Personal_Consumption_Expenditures:_Chain-type_P...  |Personal Consumption Expenditures: Chain-type P...  |1974-01-31  |23.523   |Index 2017=100  |
|country/USA  |Personal_Cons

In [65]:
# Let Snowflake perform filtering using the Snowpark pushdown and display results in a Pandas dataframe
snow_df_pce = (session.table("FINANCIAL__ECONOMIC_ESSENTIALS.CYBERSYN.FINANCIAL_FRED_TIMESERIES")
               .filter(col('VARIABLE_NAME') == 'Personal Consumption Expenditures: Chain-type Price Index, Seasonally adjusted, Monthly, Index 2017=100')
               .filter(col('DATE') >= '1972-01-01')
               .filter(month(col('DATE')) == 1))
pd_df_pce_year = snow_df_pce.select(year(col('DATE')).alias('"Year"'), col('VALUE').alias('PCE')).orderBy(col('DATE')).to_pandas()
pd_df_pce_year

Unnamed: 0,Year,PCE
0,1972,21.015
1,1973,21.695
2,1974,23.523
3,1975,26.132
4,1976,27.795
5,1977,29.263
6,1978,31.194
7,1979,33.597
8,1980,37.124
9,1981,41.011


In [66]:
# train model with PCE index

x = pd_df_pce_year["Year"].to_numpy().reshape(-1,1)
y = pd_df_pce_year["PCE"].to_numpy()

model = LinearRegression().fit(x, y)

# test model for 2022
predictYear = 2022
pce_pred = model.predict([[predictYear]])
# print the last 5 years
print (pd_df_pce_year.tail() )
# run the prediction for 2022
print ('Prediction for '+str(predictYear)+': '+ str(round(pce_pred[0],2)))

    Year      PCE
48  2020  104.458
49  2021  106.145
50  2022  112.829
51  2023  119.011
52  2024  121.870
Prediction for 2022: 111.71


In [67]:
def predict_pce(predictYear: int) -> float:
   return model.predict([[predictYear]])[0].round(2).astype(float)

_ = session.udf.register(predict_pce,
                       return_type=FloatType(),
                       input_type=IntegerType(),
                       packages= ["pandas","scikit-learn"],
                       is_permanent=True,
                       name="predict_pce_udf",
                       replace=True,
                       stage_location="@udf_stage")

SnowparkSQLException: (1304): 01b338b9-0103-909a-0002-1b360001c01a: 090105 (22000): Cannot perform SELECT. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.

In [48]:
session.sql("select predict_pce_udf(2022)").show()

SnowparkSQLException: (1304): 01b33895-0103-9028-0002-1b360001109a: 002140 (42601): SQL compilation error:
Unknown function PREDICT_PCE_UDF