In [8]:
state_dict = {
    "connection_parameters": {"user": "moukthika97",
                              "account": "qmb30657.us-east-1",
                              "role": "ACCOUNTADMIN"
                             },
    "compute_parameters" : {"default_warehouse": "XSMALL_WH",  
                            "task_warehouse": "XSMALL_WH",  
                            "load_warehouse": "LARGE_WH",  
          
                            "fe_warehouse": "XXLARGE_WH",
                            "train_warehouse": "XXLARGE_WH",
                            "train_warehouse_sow": "XXLARGE_SNOWPARKOPT_WH"  
                            }
}

In [9]:
# Snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import pandas_udf

# Pandas & json
import pandas as pd
import numpy as np
import json

# Plotting
# import matplotlib as mpl
# import matplotlib.pyplot as plt
# import seaborn as sns

# Models
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
#from sklearn.ensemble import RandomForestRegressor
from sklearn import metrics

# %matplotlib inline

In [10]:
# Read credentials
with open('./include/creds.json') as f:
    connection_parameters = json.load(f)    
session = Session.builder.configs(connection_parameters).create()

In [11]:
snowpark_version = VERSION
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Role                        : {}'.format(session.get_current_role()))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

Database                    : "CUSTOMERS"
Schema                      : "PUBLIC"
Warehouse                   : "XSMALL_WH"
Role                        : "ACCOUNTADMIN"
Snowpark for Python version : 0.11.0


# EXTRACT FROM S3

In [12]:
state_dict['connection_parameters']['database'] = 'CUSTOMERS'
state_dict['connection_parameters']['schema'] = 'PUBLIC'
state_dict['load_stage_name']='LOAD_STAGE' 
state_dict['download_base_url']='https://s3.amazonaws.com/damgbucket/data/'
state_dict['trips_table_name']='ecommerce'
state_dict['load_table_name'] = 'RAW_'

In [13]:
# import json
# with open('./creds.json', 'w') as sdf:
#     json.dump(state_dict, sdf)

In [14]:
def reset_database(session, state_dict:dict, prestaged=False):
    _ = session.sql('CREATE OR REPLACE DATABASE '+state_dict['connection_parameters']['database']).collect()
    _ = session.sql('CREATE SCHEMA IF NOT EXISTS '+state_dict['connection_parameters']['schema']).collect() 

    if prestaged:
        sql_cmd = 'CREATE OR REPLACE STAGE '+state_dict['load_stage_name']+\
                  ' url='+state_dict['connection_parameters']['download_base_url']
        _ = session.sql(sql_cmd).collect()
    else: 
        _ = session.sql('CREATE STAGE IF NOT EXISTS '+state_dict['load_stage_name']).collect()

In [15]:
reset_database(session, state_dict)

In [16]:
import pandas as pd

file_name1 = 'UserDetails.csv.zip'
file_name2 = 'UserActivity.csv.zip'
files_to_download = [file_name1]
files_to_download

['UserDetails.csv.zip']

In [17]:
session.use_warehouse(state_dict['compute_parameters']['fe_warehouse'])

In [18]:
schema1_download_files = list(files_to_download)
schema1_download_files


['UserDetails.csv.zip']

In [19]:
import requests
from zipfile import ZipFile
from io import BytesIO
import os

schema1_load_stage = state_dict['load_stage_name']+'/schema1/'
schema1_files_to_load = list()

for file_name in schema1_download_files:
    url1 = state_dict['download_base_url']+file_name
    print('Downloading and unzipping: '+url1)
    
    r = requests.get(url1)
    file = ZipFile(BytesIO(r.content))
    csv_file_name=file.namelist()[0]
    file.extract(csv_file_name)
    file.close()
    
#     print('Putting '+csv_file_name+' to stage: '+schema1_load_stage)
#     session.file.put(local_file_name=csv_file_name, 
#                      stage_location=schema1_load_stage, 
#                      source_compression='NONE', 
#                      overwrite=True)
#     schema1_files_to_load.append(csv_file_name)
#     os.remove(csv_file_name)
    

Downloading and unzipping: https://s3.amazonaws.com/damgbucket/data/UserDetails.csv.zip


In [20]:
files_to_download = [file_name2]
files_to_download

['UserActivity.csv.zip']

In [21]:
schema2_download_files = list(files_to_download)
schema2_download_files

['UserActivity.csv.zip']

In [22]:
schema2_load_stage = state_dict['load_stage_name']+'/schema2/'

schema2_files_to_load = list()
for file_name in schema2_download_files:
    
    url2 = state_dict['download_base_url']+file_name
    
    print('Downloading and unzipping: '+url2)
    r = requests.get(url2)
    file = ZipFile(BytesIO(r.content))
    csv_file_name=file.namelist()[0]
    file.extract(csv_file_name)
    file.close()
    
    # print('Putting '+csv_file_name+' to stage: '+schema2_load_stage)
    # session.file.put(local_file_name=csv_file_name, 
    #                  stage_location=schema2_load_stage, 
    #                  source_compression='NONE', 
    #                  overwrite=True)
    # schema2_files_to_load.append(csv_file_name)
    # os.remove(csv_file_name)


Downloading and unzipping: https://s3.amazonaws.com/damgbucket/data/UserActivity.csv.zip


# DATA LOADING

In [23]:
user_details = pd.read_csv("UserDetails.csv")
user_details.head()

Unnamed: 0,Email,Address,Avatar
0,mstephenson@fernandez.com,"835 Frank Tunnel\nWrightmouth, MI 82180-9605",Violet
1,hduke@hotmail.com,"4547 Archer Common\nDiazchester, CA 06566-8576",DarkGreen
2,pallen@yahoo.com,"24645 Valerie Unions Suite 582\nCobbborough, D...",Bisque
3,riverarebecca@gmail.com,"1414 David Throughway\nPort Jason, OH 22070-1220",SaddleBrown
4,mstephens@davidson-herman.com,"14023 Rodriguez Passage\nPort Jacobville, PR 3...",MediumAquaMarine


In [24]:
# Create a Snowpark DF from the pandas DF
snowdf_details = session.createDataFrame(user_details)
snowdf_details.show()

create_temp_table is deprecated. We still respect this parameter when it is True but please consider using `table_type="temporary"` instead.


-------------------------------------------------------------------------------------
|"Email"                        |"Address"                       |"Avatar"          |
-------------------------------------------------------------------------------------
|mstephenson@fernandez.com      |835 Frank Tunnel                |Violet            |
|                               |Wrightmouth, MI 82180-9605      |                  |
|hduke@hotmail.com              |4547 Archer Common              |DarkGreen         |
|                               |Diazchester, CA 06566-8576      |                  |
|pallen@yahoo.com               |24645 Valerie Unions Suite 582  |Bisque            |
|                               |Cobbborough, DC 99414-7564      |                  |
|riverarebecca@gmail.com        |1414 David Throughway           |SaddleBrown       |
|                               |Port Jason, OH 22070-1220       |                  |
|mstephens@davidson-herman.com  |14023 Rodriguez Passa

In [25]:
# Loading user details data from Snowpark DF to a Snowflake internal table

snowdf_details.write.mode("overwrite").saveAsTable("user_details_new") 

session.table("user_details_new").show(5)

-------------------------------------------------------------------------------------
|"Email"                        |"Address"                       |"Avatar"          |
-------------------------------------------------------------------------------------
|mstephenson@fernandez.com      |835 Frank Tunnel                |Violet            |
|                               |Wrightmouth, MI 82180-9605      |                  |
|hduke@hotmail.com              |4547 Archer Common              |DarkGreen         |
|                               |Diazchester, CA 06566-8576      |                  |
|pallen@yahoo.com               |24645 Valerie Unions Suite 582  |Bisque            |
|                               |Cobbborough, DC 99414-7564      |                  |
|riverarebecca@gmail.com        |1414 David Throughway           |SaddleBrown       |
|                               |Port Jason, OH 22070-1220       |                  |
|mstephens@davidson-herman.com  |14023 Rodriguez Passa

In [26]:
user_activity = pd.read_csv("UserActivity.csv")
user_activity.head()

Unnamed: 0,Email,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
0,mstephenson@fernandez.com,34.497268,12.655651,39.577668,4.082621,587.951054
1,hduke@hotmail.com,31.926272,11.109461,37.268959,2.664034,392.204933
2,pallen@yahoo.com,33.000915,11.330278,37.110597,4.104543,487.547505
3,riverarebecca@gmail.com,34.305557,13.717514,36.721283,3.120179,581.852344
4,mstephens@davidson-herman.com,33.330673,12.795189,37.536653,4.446308,599.406092


In [27]:
# Create a Snowpark DF from the pandas DF
snowdf_activity = session.createDataFrame(user_activity)
snowdf_activity.show()

----------------------------------------------------------------------------------------------------------------------------------------------
|"Email"                        |"Avg. Session Length"  |"Time on App"  |"Time on Website"  |"Length of Membership"  |"Yearly Amount Spent"  |
----------------------------------------------------------------------------------------------------------------------------------------------
|mstephenson@fernandez.com      |34.49726773            |12.65565115    |39.57766802        |4.082620633             |587.951054             |
|hduke@hotmail.com              |31.92627203            |11.10946073    |37.26895887        |2.664034182             |392.2049334            |
|pallen@yahoo.com               |33.00091476            |11.33027806    |37.11059744        |4.104543202             |487.5475049            |
|riverarebecca@gmail.com        |34.30555663            |13.71751367    |36.72128268        |3.120178783             |581.852344             |

In [28]:
# Loading user activity data from Snowpark DF to a Snowflake internal table

snowdf_activity.write.mode("overwrite").saveAsTable("user_activity_new") 

session.table("user_activity_new").limit(5).show(5)

----------------------------------------------------------------------------------------------------------------------------------------------
|"Email"                        |"Avg. Session Length"  |"Time on App"  |"Time on Website"  |"Length of Membership"  |"Yearly Amount Spent"  |
----------------------------------------------------------------------------------------------------------------------------------------------
|mstephenson@fernandez.com      |34.49726773            |12.65565115    |39.57766802        |4.082620633             |587.951054             |
|hduke@hotmail.com              |31.92627203            |11.10946073    |37.26895887        |2.664034182             |392.2049334            |
|pallen@yahoo.com               |33.00091476            |11.33027806    |37.11059744        |4.104543202             |487.5475049            |
|riverarebecca@gmail.com        |34.30555663            |13.71751367    |36.72128268        |3.120178783             |581.852344             |

In [29]:
# Create a pandas data frame from the Snowflake table
UD_df = session.table('user_details_new').toPandas() 

print(f"'UD_df' local dataframe created. Number of records: {len(UD_df)} ")

'UD_df' local dataframe created. Number of records: 500 


In [30]:
# Create a pandas data frame from the Snowflake table
UA_df = session.table('user_activity_new').toPandas() 

print(f"'UA_df' local dataframe created. Number of records: {len(UA_df)} ")

'UA_df' local dataframe created. Number of records: 500 


In [31]:
transformed_df = pd.concat([UD_df, UA_df], axis=1, join='inner')
display(transformed_df)

customers = pd.DataFrame(transformed_df)

customers.to_csv('customers.csv')

Unnamed: 0,Email,Address,Avatar,Email.1,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
0,mstephenson@fernandez.com,"835 Frank Tunnel\nWrightmouth, MI 82180-9605",Violet,mstephenson@fernandez.com,34.497268,12.655651,39.577668,4.082621,587.951054
1,hduke@hotmail.com,"4547 Archer Common\nDiazchester, CA 06566-8576",DarkGreen,hduke@hotmail.com,31.926272,11.109461,37.268959,2.664034,392.204933
2,pallen@yahoo.com,"24645 Valerie Unions Suite 582\nCobbborough, D...",Bisque,pallen@yahoo.com,33.000915,11.330278,37.110597,4.104543,487.547505
3,riverarebecca@gmail.com,"1414 David Throughway\nPort Jason, OH 22070-1220",SaddleBrown,riverarebecca@gmail.com,34.305557,13.717514,36.721283,3.120179,581.852344
4,mstephens@davidson-herman.com,"14023 Rodriguez Passage\nPort Jacobville, PR 3...",MediumAquaMarine,mstephens@davidson-herman.com,33.330673,12.795189,37.536653,4.446308,599.406092
...,...,...,...,...,...,...,...,...,...
495,lewisjessica@craig-evans.com,"4483 Jones Motorway Suite 872\nLake Jamiefurt,...",Tan,lewisjessica@craig-evans.com,33.237660,13.566160,36.417985,3.746573,573.847438
496,katrina56@gmail.com,"172 Owen Divide Suite 497\nWest Richard, CA 19320",PaleVioletRed,katrina56@gmail.com,34.702529,11.695736,37.190268,3.576526,529.049004
497,dale88@hotmail.com,"0787 Andrews Ranch Apt. 633\nSouth Chadburgh, ...",Cornsilk,dale88@hotmail.com,32.646777,11.499409,38.332576,4.958264,551.620146
498,cwilson@hotmail.com,"680 Jennifer Lodge Apt. 808\nBrendachester, TX...",Teal,cwilson@hotmail.com,33.322501,12.391423,36.840086,2.336485,456.469510


In [32]:
customers = pd.read_csv('customers.csv')

snowdf_cust = session.createDataFrame(customers)

snowdf_cust.drop("Unnamed: 0","Email.1").write.mode("overwrite").saveAsTable("customers")


In [33]:
snowdf_cust.write.mode("overwrite").saveAsTable("customers") 