# Snowpark For Python -- Natural Language Processing using spaCy

### In this session, we will cover:

* Snowpark for Python Installation
* Creating Session object and connecting to Snowflake
* Reading and loading data from Snowflake table into Snowpark DataFrame
* Performing Exploratory Data Analysis (EDA) on Snowpark DataFrame
* Creating User-Defined Function (UDF)
* Using pre-trained scikit-learn model for inference in UDF

### Snowpark For Python Installation
- conda create --name snowpark -c https://repo.anaconda.com/pkgs/snowflake python=3.8
- conda activate snowpark
- pip install "snowflake-snowpark-python[pandas]"
- pip install ipykernel
- pip install cachetools
- pip install scikit-learn
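
The conda command above pins Python 3.8. As a minimal sketch, a notebook could check that its kernel matches the pinned interpreter before importing Snowpark. The helper name `matches_pinned_python` is hypothetical and not part of Snowpark.

```python
import sys

# Hypothetical helper (not part of Snowpark): compare an interpreter version
# against the major.minor version pinned in the conda command above.
def matches_pinned_python(version_info, pinned=(3, 8)):
    return tuple(version_info[:2]) == pinned

print("Kernel matches pinned Python:", matches_pinned_python(sys.version_info))
```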

### Import Libraries

In [60]:
# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, count, col, year, call_udf, array_construct
from snowflake.snowpark.types import Variant
from snowflake.snowpark.version import VERSION
# Misc
import pandas as pd
import json
from cachetools import cached

pd.set_option('max_colwidth', 400)

### Establish Secure Connection to Snowflake

##### *Options: Username/Password, MFA, OAuth, Okta, SSO*

In [61]:
# Read connection settings and close the file when done
with open('../connection.json') as f:
    connection_parameters = json.load(f)
session = Session.builder.configs(connection_parameters).create()

snowflake_environment = session.sql('select current_warehouse(), current_database(), current_schema(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('Warehouse                   : {}'.format(snowflake_environment[0][0]))
print('Database                    : {}'.format(snowflake_environment[0][1]))
print('Schema                      : {}'.format(snowflake_environment[0][2]))
print('Snowflake version           : {}'.format(snowflake_environment[0][3]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

Warehouse                   : DASH_XL
Database                    : DASH_DB
Schema                      : DASH_SCHEMA
Snowflake version           : 6.18.3
Snowpark for Python version : 0.7.0
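
The notebook reads its settings from `../connection.json` but does not show the file. The sketch below shows one plausible layout and a small loader that fails early when a key is missing. The key set assumes username/password authentication, the helper name `load_connection_parameters` is hypothetical, and all values are placeholders.

```python
import json
import os
import tempfile

# Assumption: a minimal set of keys for username/password authentication.
# Other authentication options need different keys.
REQUIRED_KEYS = ("account", "user", "password")

def load_connection_parameters(path):
    """Read a JSON connection file and fail early if a required key is missing."""
    with open(path) as f:
        params = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError("Missing connection parameters: " + ", ".join(missing))
    return params

# Placeholder values only; replace with real account details.
example = {
    "account": "<account_identifier>",
    "user": "<user>",
    "password": "<password>",
    "role": "<role>",
    "warehouse": "<warehouse>",
    "database": "<database>",
    "schema": "<schema>",
}

# Write the example to a temporary file and read it back.
with tempfile.TemporaryDirectory() as tmp_dir:
    example_path = os.path.join(tmp_dir, "connection.json")
    with open(example_path, "w") as f:
        json.dump(example, f)
    connection_parameters = load_connection_parameters(example_path)

print(sorted(connection_parameters))
```

The resulting dictionary is what `Session.builder.configs(...)` receives in the cell above.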


### Load Amazon Reviews data from Snowflake table into Snowpark DataFrame

In [62]:
snow_df = session.table("SUMMIT_AMAZON_REVIEWS_DB.DASH_SCHEMA.AMAZON_REVIEWS")
snow_df.show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"MARKETPLACE"  |"REVIEW_ID"     |"PRODUCT_ID"  |"PRODUCT_PARENT"  |"PRODUCT_TITLE"                                     |"PRODUCT_CATEGORY"  |"STAR_RATING"  |"HELPFUL_VOTES"  |"TOTAL_VOTES"  |"VINE"  |"VERIFIED_PURCHASE"  |"REVIEW_HEADLINE"                                   |"REVIEW_BODY"                                       |"REVIEW_DATE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Number of records per STAR_RATING 

In [63]:
snow_df.group_by('STAR_RATING').agg(count('STAR_RATING')).show()

----------------------------------------
|"STAR_RATING"  |"COUNT(STAR_RATING)"  |
----------------------------------------
|4              |3827                  |
|5              |16697                 |
|1              |1709                  |
|3              |1612                  |
|2              |1155                  |
----------------------------------------
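
`group_by(...).agg(count(...))` runs inside Snowflake and returns one row per distinct value. As a rough local analogy only, and not how Snowpark executes the query, the same counting can be sketched with `collections.Counter` on a few made-up ratings.

```python
from collections import Counter

# Made-up sample values for illustration; not taken from the AMAZON_REVIEWS table.
star_ratings = [5, 4, 5, 1, 5, 3, 4]

# One entry per distinct rating, with the number of rows that carry it.
counts = Counter(star_ratings)

for rating, n in sorted(counts.items()):
    print(rating, n)
```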



### Number of records per PRODUCT_CATEGORY 

In [64]:
snow_df.group_by('PRODUCT_CATEGORY').agg(count('PRODUCT_CATEGORY')).show()

--------------------------------------------------
|"PRODUCT_CATEGORY"  |"COUNT(PRODUCT_CATEGORY)"  |
--------------------------------------------------
|Video               |1751                       |
|Music               |5888                       |
|Watches             |5000                       |
|Video DVD           |721                        |
|Books               |6640                       |
|Camera              |5000                       |
--------------------------------------------------



### Number of records per year of REVIEW_DATE

In [65]:
snow_df.group_by(year('REVIEW_DATE')).agg(count('STAR_RATING').as_('TOTAL_RATINGS')).with_column_renamed('YEAR(REVIEW_DATE)','YEAR').sort('YEAR').show()

----------------------------
|"YEAR"  |"TOTAL_RATINGS"  |
----------------------------
|1995    |9                |
|1996    |192              |
|1997    |1753             |
|1998    |11274            |
|1999    |1772             |
|2015    |10000            |
----------------------------



### Missing data ... rows with no STAR_RATING or REVIEW_BODY

In [66]:
temp_df = snow_df.filter(col('STAR_RATING').is_null() | col('REVIEW_BODY').is_null()).select(['REVIEW_ID','STAR_RATING','REVIEW_BODY'])
temp_df.show()

--------------------------------------------------
|"REVIEW_ID"     |"STAR_RATING"  |"REVIEW_BODY"  |
--------------------------------------------------
|R3R885VN6USBYM  |5              |NULL           |
|R3GKZOOU9MQIB8  |5              |NULL           |
|R26LKY7Y8QG2Y2  |1              |NULL           |
|R3QXY2UIFIUEYI  |4              |NULL           |
|R2OJE1F0QFYC8E  |5              |NULL           |
--------------------------------------------------



#### >>>>>>>>>> *Examine Snowpark DataFrame Query* <<<<<<<<<< 

In [67]:
temp_df.queries

{'queries': ['SELECT "REVIEW_ID", "STAR_RATING", "REVIEW_BODY" FROM ( SELECT  *  FROM ( SELECT  *  FROM (SUMMIT_AMAZON_REVIEWS_DB.DASH_SCHEMA.AMAZON_REVIEWS)) WHERE ("STAR_RATING" IS NULL OR "REVIEW_BODY" IS NULL))'],
 'post_actions': []}
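
The `queries` property only reports SQL text; the query runs when an action such as `show()` or `collect()` is called. The toy class below sketches how chained transformations can nest into subqueries like the one shown above. `ToyFrame` is hypothetical and is not Snowpark's implementation.

```python
class ToyFrame:
    """Hypothetical stand-in that only builds SQL text; it never runs a query."""

    def __init__(self, sql):
        self.sql = sql

    @classmethod
    def table(cls, name):
        return cls(f"SELECT * FROM ({name})")

    def filter(self, condition):
        # Wrap the current query in a subquery and add a WHERE clause.
        return ToyFrame(f"SELECT * FROM ({self.sql}) WHERE ({condition})")

    def select(self, *columns):
        # Wrap again and project the requested columns.
        quoted = ", ".join(f'"{c}"' for c in columns)
        return ToyFrame(f"SELECT {quoted} FROM ({self.sql})")

toy = (
    ToyFrame.table("AMAZON_REVIEWS")
    .filter('"STAR_RATING" IS NULL OR "REVIEW_BODY" IS NULL')
    .select("REVIEW_ID", "STAR_RATING", "REVIEW_BODY")
)
print(toy.sql)
```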

#### >>>>>>>>>> *Examine Snowpark DataFrame Execution Plan* <<<<<<<<<< 

In [68]:
temp_df.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT "REVIEW_ID", "STAR_RATING", "REVIEW_BODY" FROM ( SELECT  *  FROM ( SELECT  *  FROM (SUMMIT_AMAZON_REVIEWS_DB.DASH_SCHEMA.AMAZON_REVIEWS)) WHERE ("STAR_RATING" IS NULL OR "REVIEW_BODY" IS NULL))
Logical Execution Plan:
GlobalStats:
    partitionsTotal=4
    partitionsAssigned=1
    bytesAssigned=2110464
Operations:
1:0     ->Result  AMAZON_REVIEWS.REVIEW_ID, AMAZON_REVIEWS.STAR_RATING, AMAZON_REVIEWS.REVIEW_BODY  
1:1          ->Filter  AMAZON_REVIEWS.REVIEW_BODY IS NULL  
1:2               ->TableScan  SUMMIT_AMAZON_REVIEWS_DB.DASH_SCHEMA.AMAZON_REVIEWS  REVIEW_ID, STAR_RATING, REVIEW_BODY  {partitionsTotal=4, partitionsAssigned=1, bytesAssigned=2110464}

--------------------------------------------


### Data cleanup -- Remove rows with null values, etc.
* Delete rows with missing values
* Filter out rows with no STAR_RATING or REVIEW_BODY


In [69]:
records_before = snow_df.count()
print('Records before cleanup    : ',records_before)

# Delete rows with missing values
snow_df = snow_df.dropna()

# Filter out rows with no STAR_RATING or REVIEW_BODY
snow_df = snow_df.filter(col('STAR_RATING').is_not_null() & col('REVIEW_BODY').is_not_null())

records_after = snow_df.count()
print('Records after cleanup     : ',records_after)
print('Number of records removed : ',(records_before - records_after))

Records before cleanup    :  25000
Records after cleanup     :  24994
Number of records removed :  6
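
Removing rows that lack STAR_RATING or REVIEW_BODY means keeping only rows where both values are present, so the two not-null checks must be combined with AND. A plain-Python sketch on made-up rows shows the difference between the two combinations.

```python
# Made-up rows for illustration; None stands in for SQL NULL.
rows = [
    {"REVIEW_ID": "A", "STAR_RATING": 5, "REVIEW_BODY": "Great"},
    {"REVIEW_ID": "B", "STAR_RATING": 4, "REVIEW_BODY": None},
    {"REVIEW_ID": "C", "STAR_RATING": None, "REVIEW_BODY": "Fine"},
    {"REVIEW_ID": "D", "STAR_RATING": None, "REVIEW_BODY": None},
]

def has_both(row):
    return row["STAR_RATING"] is not None and row["REVIEW_BODY"] is not None

def has_either(row):
    return row["STAR_RATING"] is not None or row["REVIEW_BODY"] is not None

kept_with_and = [r["REVIEW_ID"] for r in rows if has_both(r)]
kept_with_or = [r["REVIEW_ID"] for r in rows if has_either(r)]

print(kept_with_and)  # only the complete row survives
print(kept_with_or)   # rows with one missing value slip through
```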


### User-Defined Function (UDF) for Text Processing Using spaCy

* Upload external dependency to an internal stage
* Add spaCy's English optimized pipeline as a dependency for the UDF
* Create UDF with additional packages from Snowflake Anaconda Channel
* Call UDF on Amazon Reviews

In [70]:
# Upload dependencies to a stage
session.sql("create or replace stage dash_udf_imports").collect()
session.file.put("file:///Users/ddesai/en_core_web_sm.zip", "@dash_udf_imports/")

# Add dependency to the Session for the UDF
session.clear_imports()
session.add_import('@dash_udf_imports/en_core_web_sm.zip.gz')

# Function to extract and load the English pipeline in spaCy (cached so it runs once per process)
@cached(cache={})
def extract_en_core_web_sm(input_file: str, output_dir: str)-> object:
    import zipfile
    import spacy
            
    with zipfile.ZipFile(input_file, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
        
    # load and return the english language small model of spacy
    nlp = spacy.load(output_dir + "/en_core_web_sm/en_core_web_sm-2.3.0")
    return nlp 

# Create UDF with additional packages from Snowflake Anaconda Channel
# -- Remove HTML, tokenize text, lemmatize verbs and remove stop words
@udf(name='process_text',session=session,packages=['spacy==2.3.5','beautifulsoup4','cachetools==4.2.2'],replace=True,is_permanent=True,stage_location='dash_udfs')
def process_text(text: str) -> str:
    import os
    import sys
    from bs4 import BeautifulSoup 
    from spacy.tokenizer import Tokenizer
                       
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    input_file = import_dir + 'en_core_web_sm.zip'
    output_dir = '/tmp/en_core_web_sm' + str(os.getpid())
    
    nlp = extract_en_core_web_sm(input_file,output_dir)
    stop_words = nlp.Defaults.stop_words
    tokenizer = Tokenizer(nlp.vocab)
    
    # strip html
    text = BeautifulSoup(text, "html.parser").get_text()
    
    # tokenize
    tokens = tokenizer(text)
    
    # lemmatize tokens and remove stop words
    lemmas = [str(t.lemma_) for t in tokens if t.orth_ not in stop_words]

    # return the list as a string to match the declared return type
    return str(lemmas)
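
The `@cached(cache={})` decorator on `extract_en_core_web_sm` memoizes its result, so repeated calls with the same arguments in one Python process reuse the loaded pipeline instead of unzipping and loading it again. The sketch below shows the same pattern with the standard library's `functools.lru_cache` swapped in for `cachetools`. The function `load_model` and the dictionary it returns are hypothetical stand-ins.

```python
from functools import lru_cache

load_calls = 0  # counts how often the expensive load really happens

@lru_cache(maxsize=None)
def load_model(input_file, output_dir):
    """Hypothetical stand-in for an expensive unzip-and-load step."""
    global load_calls
    load_calls += 1
    return {"source": input_file, "extracted_to": output_dir}

def process(text):
    # Same arguments on every call, so only the first call does the work.
    model = load_model("en_core_web_sm.zip", "/tmp/en_core_web_sm")
    return f"{text} (model from {model['source']})"

for review in ["ok", "Perfect", "works well"]:
    process(review)

print(load_calls)
```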



### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Call UDF on Amazon Reviews -- optionally convert results into Pandas DataFrame

In [71]:
%%time
df_amazon_reviews = snow_df.limit(10).select('REVIEW_BODY', call_udf("process_text", col("REVIEW_BODY")).as_('PROCESSED_TEXT')).to_pandas()
df_amazon_reviews.head()

CPU times: user 7.8 ms, sys: 2.28 ms, total: 10.1 ms
Wall time: 2.75 s


| | REVIEW_BODY | PROCESSED_TEXT |
|---|---|---|
| 0 | ok | ['okay'] |
| 1 | Perfect, even sturdier than the original! | ['Perfect,', 'sturdy', 'original!'] |
| 2 | If the words, &#34;Cheap Chinese Junk&#34; come to your mind when you see this, then congratulate yourself. You're pretty close. One of the most important features of a 'security camera&#34; is the ability to detect motion and record, especially when running on battery and limited storage space. I tested the motion detect on this camera in a few different environments so far (i.e. low light... | ['If', 'words,', '"Cheap', 'Chinese', 'Junk"', 'come', 'mind', 'this,', 'congratulate', 'yourself.', ' ', "You're", 'pretty', 'close.', ' ', 'One', 'important', 'feature', "'security", 'camera"', 'ability', 'detect', 'motion', 'record,', 'especially', 'run', 'battery', 'limit', 'storage', 'space.', ' ', 'I', 'test', 'motion', 'detect', 'camera', 'different', 'environment', 'far', '(i.e.', 'low... |
| 3 | Exactly what I wanted and expected. Perfect for hiking or carrying when you are going someplace you MAY need a quick closeup. I bought it to leave in my glovebox so I always have it with me. I've used it a few times already and couldn't be happier with it. For the price, it's definitely worth picking up if you are looking for a good monocular. | ['Exactly', 'I', 'want', 'expected.', 'Perfect', 'hike', 'carry', 'go', 'someplace', 'MAY', 'need', 'quick', 'closeup.', 'I', 'buy', 'leave', 'glovebox', 'I', 'me.', "I've", 'time', "couldn't", 'happy', 'it.', 'For', 'price,', "it's", 'definitely', 'worth', 'pick', 'look', 'good', 'monocular.'] |
| 4 | I will look past the fact that they tricked me into believing this is a Canon product. It's not by Canon. It's some generic brand that i've never heard of. HOWEVER, it works surprisingly well! The sound quality is actually really good. The wire is actually super long and is perfect for indoor shooting. | ['I', 'look', 'past', 'fact', 'trick', 'believe', 'Canon', 'product.', "It's", 'Canon.', "It's", 'generic', 'brand', "i've", 'hear', 'of.', 'HOWEVER,', 'work', 'surprisingly', 'well!', 'The', 'sound', 'quality', 'actually', 'good.', 'The', 'wire', 'actually', 'super', 'long', 'perfect', 'indoor', 'shooting.'] |
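
To experiment with the cleaning steps without Snowflake or spaCy, the following sketch uses only the standard library. It swaps in `html.parser` for BeautifulSoup, whitespace splitting for spaCy's `Tokenizer`, and a tiny hand-picked stop-word set, and it does no lemmatization, so its output will differ from the UDF's. All names in it are hypothetical.

```python
from html.parser import HTMLParser

# Tiny illustrative stop-word set; spaCy's English list is far larger.
STOP_WORDS = {"the", "a", "is", "and", "it", "this", "than", "even"}

class TextExtractor(HTMLParser):
    """Collects text content and drops tags; character references are decoded."""

    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)

def strip_html(text):
    parser = TextExtractor()
    parser.feed(text)
    parser.close()  # flush any text still buffered by the parser
    return "".join(parser.parts)

def simple_process_text(text):
    # Whitespace tokenization keeps punctuation attached to words.
    tokens = strip_html(text).split()
    return [t for t in tokens if t.lower() not in STOP_WORDS]

print(simple_process_text("<b>Perfect</b>, even sturdier than the original!"))
```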


### Model Training in Snowflake

#### Load Advertising Data into Snowpark DataFrame

In [None]:
snow_df_budgets = session.table('ADVERTISING_BUDGETS')
snow_df_budgets.show()

#### Snowpark Python code to train model

In [None]:
def train_sales_prediction_model(session: Session, features_table: str) -> Variant:
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import PolynomialFeatures
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split, GridSearchCV

    import os
    from joblib import dump

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    # We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns. NOTE: High degrees can cause overfitting.
    numeric_features = ['TV','Radio','Newspaper']
    numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = 2)),('scaler', StandardScaler())])

    # Combine the preprocessing steps using ColumnTransformer
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features)])

    # The next step is to integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),('regressor', LinearRegression())])
    # Empty grid: GridSearchCV only cross-validates the single default configuration
    parameters = {}

    X = df.drop('Sales', axis = 1)
    y = df['Sales']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    model = GridSearchCV(pipeline, param_grid=parameters, cv=10)

    model.fit(X_train, y_train)

    # Upload trained model to a stage
    model_output_dir = '/tmp'
    model_file = os.path.join(model_output_dir, 'sales_model.joblib')
    dump(model, model_file)
    session.file.put(model_file, "@dash_models",overwrite=True)

    # Return model R2 score on train and test data
    return {"R2 score on Train": model.score(X_train, y_train),"R2 score on Test": model.score(X_test, y_test)}
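
`PolynomialFeatures(degree=2)` expands the three budget columns into a bias term, the original values, and all squares and pairwise products, which gives ten columns in total. The standard-library sketch below reproduces that expansion for a single made-up row. It follows the term ordering scikit-learn documents but is not scikit-learn's implementation, and the function name is hypothetical.

```python
from itertools import combinations_with_replacement
from math import prod

def polynomial_terms(row, degree=2):
    """Return all monomials of the inputs up to `degree`, starting with the bias term 1."""
    terms = []
    for d in range(degree + 1):
        for combo in combinations_with_replacement(row, d):
            terms.append(prod(combo))  # prod(()) is 1, which gives the bias term
    return terms

# One made-up row of TV, Radio and Newspaper budgets.
print(polynomial_terms([2, 3, 5]))
```

The number of generated columns grows quickly with the degree, which is one reason the training code keeps it at 2.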


#### Test Python function

In [None]:
print(train_sales_prediction_model(session,"ADVERTISING_BUDGETS"))
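
The function returns R2 scores for the train and test splits. For a regression pipeline like this one, `score` reports the coefficient of determination, R2 = 1 - SS_res / SS_tot. The sketch below computes it by hand on made-up numbers; the helper `r_squared` is a local illustration, not a scikit-learn function.

```python
def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot

# Made-up sales figures and predictions for illustration.
actual = [10.0, 12.0, 14.0, 16.0]
perfect = [10.0, 12.0, 14.0, 16.0]
mean_only = [13.0, 13.0, 13.0, 13.0]

print(r_squared(actual, perfect))    # predictions match exactly
print(r_squared(actual, mean_only))  # always predicting the mean
```

A score near 1 means the predictions track the data closely, while a score near 0 means the model does no better than predicting the mean.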

### Create Stored Procedure to deploy training code on Snowflake

In [None]:
session.sproc.register(func=train_sales_prediction_model,name="train_sales_prediction_model",packages=['snowflake-snowpark-python','scikit-learn','joblib'],is_permanent=True,stage_location="@dash_sprocs",replace=True)

### Execute Stored Procedure to train model and deploy it on Snowflake

In [None]:
print(session.call('train_sales_prediction_model','ADVERTISING_BUDGETS'))

### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Create User-Defined Function for Inference on Snowflake

In [None]:
session.clear_imports()
session.clear_packages()

# Add trained model as dependency
session.add_import('@dash_models/sales_model.joblib.gz')

@udf(name='predict_sales',session=session,packages=['pandas','joblib','scikit-learn'],replace=True,is_permanent=True,stage_location='@dash_udfs')
def predict_sales(budget_allocations: list) -> float:
    import sys
    import pandas as pd
    from joblib import load

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + 'sales_model.joblib.gz'

    model = load(model_file)
            
    features = ['TV','Radio','Newspaper']
    df = pd.DataFrame([budget_allocations], columns=features)
    sales = round(model.predict(df)[0],2)

    return sales

In [None]:
test_df = session.create_dataframe([[180.8,10.8,58.4],[120.2,19.6,11.6],[199.8,2.6,21.2]], schema=['TV','Radio','Newspaper'])
test_df.select('TV','Radio','Newspaper', 
    call_udf("predict_sales", array_construct(col("TV"), col("Radio"), col("Newspaper"))).as_("PREDICTED_SALES")).show()
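
`array_construct` packs the three budget columns into a single array argument, which the UDF receives as a Python list and maps back to named features. The sketch below imitates that mapping locally. `StandInModel` and its weights are made up for illustration and are not the trained sales model.

```python
FEATURES = ["TV", "Radio", "Newspaper"]

class StandInModel:
    """Hypothetical model with made-up weights; NOT the trained sales model."""
    weights = {"TV": 0.05, "Radio": 0.1, "Newspaper": 0.0}
    intercept = 3.0

    def predict(self, rows):
        return [self.intercept + sum(self.weights[f] * row[f] for f in FEATURES)
                for row in rows]

def predict_sales_locally(budget_allocations, model):
    # Pair each positional value with its feature name, as the UDF does
    # when it builds a one-row DataFrame.
    row = dict(zip(FEATURES, budget_allocations))
    return round(model.predict([row])[0], 2)

print(predict_sales_locally([100.0, 20.0, 50.0], StandInModel()))
```

Because the values are matched by position, the array must list the budgets in the same order as the feature names.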

### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

# Code on GitHub

### Python Notebook is available at https://github.com/iamontheinet/dash-at-summit-2020