### If you need to install the connectors do this:

import sys

!{sys.executable} -m pip install snowflake-connector-Python

!{sys.executable} -m pip install snowflake-sqlalchemy

!{sys.executable} -m pip install 'snowflake-connector-Python[pandas]'

## This section of code is just needed to ensure that you are able to properly connect to Snowflake

In [1]:
import snowflake.connector as sf

In [2]:
# Use your own Account and UserName here
Account = 'DSA57842'
UserName = 'tmxiang'
#Password = 9r*WWEL&9mmEzvW

In [3]:
## Very bad idea to store passwords in your code use getpass
sfPswd = ''
# Request user password if not provided already    
if sfPswd == '' :
  import getpass
  sfPswd = getpass.getpass('Password:')

Password: ···············


In [4]:
# Test the connection to Snowflake by retrieving the version number
try:
    sfConnection = sf.connect(
        user=UserName,
        password=sfPswd,
        account=Account
    )
    sfq = sfConnection.cursor()
    sfq.execute("SELECT current_version()")
    sfResults = sfq.fetchall()
    print('Snowflake Version: ' + sfResults[0][0])
    sfq.close()
    sfConnection.close()
except:
    print('Connection failed. Check credentials')

Snowflake Version: 6.3.3


### This part checks to see if you are able to Create and Drop Tables

In [5]:
# Open connection to Snowflake
sfConnection = sf.connect(
    user=UserName,
    password=sfPswd,
    account=Account
)
sfq = sfConnection.cursor()

In [6]:
sfq.execute('DROP DATABASE IF EXISTS DEMO_DB')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f239cfb9d30>

In [7]:
sfq.execute('CREATE DATABASE DEMO_DB')
sfq.execute('USE DATABASE DEMO_DB')
sfq.execute('CREATE SCHEMA DEMO_SCHEMA')
sfq.execute('USE DEMO_DB.DEMO_SCHEMA')
sfq.execute('CREATE STAGE DEMO_STAGE')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f239cfb9d30>

In [8]:
sfq.execute('DROP DATABASE IF EXISTS DEMO_DB')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f239cfb9d30>

### This code shows how to write a function that interacts with Snowflake and setting up our MSBA database

In [9]:
def CreateSnowflakeDBandSchema (
    sfPswd = sfPswd,
    sfAccount = Account,
    sfUser = UserName,
    sfDatabase = 'DEMO_DB',
    sfSchema = 'DEMO_SCHEMA'
):
    import snowflake.connector as sf
    # Request user password if not provided already
    if sfPswd == '' :
      import getpass
      sfPswd = getpass.getpass('Password:')
    # Test the connection to Snowflake
    try:
      sfConnection = sf.connect(
          user=sfUser,
          password=sfPswd,
          account=sfAccount
      )
      sfq = sfConnection.cursor()
      # sfq.execute("SELECT current_version()")
      # sfResults = sfq.fetchall()
      # print('Snowflake Version: ' + sfResults[0][0])
      sfq.close()
      sfConnection.close()
    except:
      print('Connection failed. Check credentials')
    # Open connection to Snowflake
    sfConnection = sf.connect(
      user=sfUser,
      password=sfPswd,
      account=sfAccount
    )
    sfq = sfConnection.cursor()
    sfq.execute('CREATE DATABASE IF NOT EXISTS {0}'.format(sfDatabase))
    sfq.execute('CREATE SCHEMA IF NOT EXISTS {0}.{1}'.format(sfDatabase, sfSchema))
    print('Steps complete')

In [10]:
CreateSnowflakeDBandSchema (
    sfAccount = Account,
    sfUser = UserName,
    sfDatabase = 'DEMO_DB',
    sfSchema = 'DEMO_SCHEMA'
)

Steps complete


In [11]:
sfq.execute('DROP DATABASE IF EXISTS DEMO_DB')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f239cfb9d30>

In [12]:
CreateSnowflakeDBandSchema (
    sfAccount = Account,
    sfUser = UserName,
    sfDatabase = 'MSBA_ETL',
    sfSchema = 'MSBA_ETL'
)

Steps complete


In [13]:
sfq.execute('CREATE STAGE IF NOT EXISTS "MSBA_ETL"."PUBLIC".CSV_STAGE')

<snowflake.connector.cursor.SnowflakeCursor at 0x7f239cfb9d30>

In [14]:
sfq.close() # Make sure it's closed

True

## Staging and Copying files into Snowflake Tables

Redefined the connection function to only return the connection, not the cursor object. The cursor object will be created separately so that:

1. the cursor (`cur`) for when we need to *execute* queries in Snowflakes

2. the connection/engine (`engine`) to *retrieve* queries into pandas

The default database and schema is also set below to use when connecting to snowflakes using the function

And we first need to check for the file name and location relative to the current working directory

In [7]:
import os
import pandas as pd
import re

import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

In [16]:
# Logging in and connecting...
Account = 'DSA57842'
UserName = 'tmxiang'

# Since we're mainly going to use just one warehouse/database/schema, Identify the default location of the table you want to stage and load data into
warehouse = 'COMPUTE_WH'
database = 'MSBA_ETL'
schema = 'PUBLIC'

In [17]:
def ConnectSnowflake (
    sfPswd = '',
    sfAccount = Account,
    sfUser = UserName,
    sfDatabase = 'MSBA_ETL',
    sfSchema = 'PUBLIC'
):
    import snowflake.connector as sf
    # Request user password if not provided already
    if sfPswd == '' :
      import getpass
      sfPswd = getpass.getpass('Password:')
    # Test the connection to Snowflake
    try:
      sfConnection = sf.connect(
          user=sfUser,
          password=sfPswd,
          account=sfAccount
      )
      sfq = sfConnection.cursor()
      # sfq.execute("SELECT current_version()")
      # sfResults = sfq.fetchall()
      # print('Snowflake Version: ' + sfResults[0][0])
      sfq.close()
      sfConnection.close()
    except:
      print('Connection failed. Check credentials')
    # Open connection to Snowflake
    sfConnection = sf.connect(
      user=sfUser,
      password=sfPswd,
      account=sfAccount,
      warehouse=warehouse,
      database=database,
      schema=schema
    )
    #sfq = sfConnection.cursor()
    print('Connection opened, here is your engine')
    return sfConnection

In [18]:
engine = ConnectSnowflake()

# getting cursor
cur = engine.cursor()

Password: ···············


Connection opened, here is your engine


In [19]:
cwd = os.listdir(os.getcwd())

In [20]:
# This is in the case we all have something slightly different for our sub folder names
def cwd_search(list_dir, pattern=r'.*[Mm]onthly.*'):
    
    # Takes of list of files in current working directory and search for desired file/folder name
    # Returns only the most recent match
    found = None
    for f in list_dir:
        match = re.search(pattern, f)
        
        if match:
            #print(f"found match: {f}")
            found = f

    if found:
        return found
    else:
        raise LookupError("Sorry, no match is found in current working directory. Please check your directory and pattern search!")

In [21]:
# Where does your data stored? 
csvPath = cwd_search(cwd)  # <-------Manual correction if needed when the path/folder to the csv files is completely dfferent
xmlPath = "Supplier" # You don't need to put the full file path name, it will wild card it late

In [22]:
# Whatever folder name that the csv files are stored in and get the list
files = os.listdir(csvPath)

In [23]:
files

['2013-8.csv',
 '2015-3.csv',
 '2015-2.csv',
 '2013-1.csv',
 '2015-11.csv',
 '2015-1.csv',
 '2015-4.csv',
 '2015-10.csv',
 '2013-4.csv',
 '2015-5.csv',
 '2016-1.csv',
 '2016-3.csv',
 '2016-4.csv',
 '2015-6.csv',
 '2015-7.csv',
 '2013-2.csv',
 '2013-5.csv',
 '2014-1.csv',
 '2014-2.csv',
 '2014-12.csv',
 '2013-6.csv',
 '2013-10.csv',
 '2014-7.csv',
 '2014-3.csv',
 '2014-11.csv',
 '.ipynb_checkpoints',
 '2016-2.csv',
 '2014-8.csv',
 '2014-9.csv',
 '2014-5.csv',
 '2013-7.csv',
 '2014-6.csv',
 '2015-8.csv',
 '2013-12.csv',
 '2014-4.csv',
 '2014-10.csv',
 '2013-9.csv',
 '2013-3.csv',
 '2015-9.csv',
 '2016-5.csv',
 '2013-11.csv',
 '2015-12.csv']

In [24]:
# Remove the uncessary folder in the list
if '.ipynb_checkpoints' in files:
    files.remove('.ipynb_checkpoints')

In [25]:
# Get the years for the files
years = set([y.split("-")[0] for y in files])
years

{'2013', '2014', '2015', '2016'}

### 1.	Extract and load the 41 comma delimited purchases data files and form a single table of purchases data; preferably follow these guidelines when staging the files (this staging approach does not make sense for our data as the files are small, but it is good practice if you have more data and if the data is loaded over time)

In [26]:
# user-defined functions for staging files
def staging(file_pattern_lst, stage="CSV_STAGE", wh=warehouse, schema=schema, sub_folder=None):
    cur.execute(f"USE WAREHOUSE {wh}")
    cur.execute(f"USE SCHEMA {schema}")
    
    # For single file in current working directory
    if (type(file_pattern_lst) == str) & (sub_folder==None):
        cur.execute(f"put file://{file_pattern_lst}* @{stage}")
        print(f"Staging {file_pattern_lst} XML file in {stage}...")
    
    # For list of csv files in monthly PO data in sub folder of current working directory
    else:
        for f in file_pattern_lst:
            print(f"Staging {f} files in {stage}...")
            cur.execute(f"put file://{sub_folder}/{f}* @{stage}")

# User-defined function for removing files from stage        
def remove_files(file_ext=None, stage="CSV_STAGE"):
    
    # If no file extension is provided, then it is assumed that the user wants to remove all files from defined stage name
    if file_ext==None:
        cur.execute(f"REMOVE @{stage}")
    
    # Finds the specific files to remove
    else:
        cur.execute(f"REMOVE @CSV_STAGE PATTERN='.*.{file_ext}.*'")
    print(f"...Removing files from {stage}...")

In [27]:
# Create File Format if it doesn't exist already
cur.execute("CREATE FILE FORMAT IF NOT EXISTS CSV_FORMAT "
                "TYPE = 'CSV' "
                "FIELD_DELIMITER = ',' "
                "SKIP_HEADER = 1 "
                "FIELD_OPTIONALLY_ENCLOSED_BY = '\042' "
                "ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE "
                "NULL_IF=('null')"
              )

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

#### Note, we had to change the "LastReceiptDate" to a varchar (should be date) because there are NULLs in file 2016-5.csv
#### This will have to be resolved later: it's the 21st column in the file

In [28]:
# For reference in creating table: listing the column numbers and names that we'll be choosing from to put into Snowflake database
data_sample = pd.read_csv(f"{csvPath}/{files[0]}")
pd.DataFrame(
    {
        "col_number": list(range(1, len(data_sample.columns) + 1)),
        "col_name":data_sample.columns})

Unnamed: 0,col_number,col_name
0,1,PurchaseOrderID
1,2,SupplierID
2,3,OrderDate
3,4,DeliveryMethodID
4,5,ContactPersonID
5,6,ExpectedDeliveryDate
6,7,SupplierReference
7,8,IsOrderFinalized
8,9,Comments
9,10,InternalComments


In [29]:
# ... manual input to create default table, choosing and droping columns as deemed necessary
cur.execute("CREATE OR REPLACE TABLE "
               "MONTHLY_PO(PurchaseOrderID number, SupplierID number, OrderDate date, DeliveryMethodID number, "
               "ExpectedDeliveryDate date, SupplierReference varchar, LastEditedBy number, PurchaseOrderLineID number, "
               "StockItemID number, OrderedOuters number, Description varchar, ReceivedOuters number, PackageTypeID number, "
               "ExpectedUnitPricePerOuter number, LastReceiptDate varchar, Right_LastEditedBy number)")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

In [30]:
# stagging all PO files into table
# Monthly PO Folder name changed to Monthly_PO_Data due to complication error for the first.
staging(years, stage="CSV_STAGE", sub_folder=csvPath)

Staging 2013 files in CSV_STAGE...
Staging 2015 files in CSV_STAGE...
Staging 2016 files in CSV_STAGE...
Staging 2014 files in CSV_STAGE...


In [31]:
# The column order from monthly_po table must match the order in the csv file, or it will return a casting error.
cur.execute(f"COPY INTO monthly_po FROM (SELECT $1, $2, $3, $4, $6, $7, $11, $13, $14, $15, $16, $17, $18, $19, $20, $22 FROM @CSV_STAGE) "
               "FILE_FORMAT = (FORMAT_NAME = CSV_FORMAT)")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

In [32]:
# squery statement to get list of the files that are currently in the staging area
sql = "LIST @CSV_STAGE"

# Fetching sql query into python
cur.execute(sql)
data = cur.fetchall()

In [33]:
# Checks if all 41 files are in stage
pd.DataFrame(data).shape

(41, 4)

In [34]:
# What the retrieved data will look like in a pandas DF.
pd.read_sql_query(sql, engine)

Unnamed: 0,name,size,md5,last_modified
0,csv_stage/2013-1.csv.gz,7024,280e9355bc641d992e8ba15504a53ba5,"Sun, 13 Feb 2022 01:05:09 GMT"
1,csv_stage/2013-10.csv.gz,3392,9c24a5774dfaed36119e5f48e3d7c1f6,"Sun, 13 Feb 2022 01:05:10 GMT"
2,csv_stage/2013-11.csv.gz,3120,95d8654293b8ba49882f4a828e3f5dc9,"Sun, 13 Feb 2022 01:05:10 GMT"
3,csv_stage/2013-12.csv.gz,3040,58b0703af9062f7af5a161c3d672ab83,"Sun, 13 Feb 2022 01:05:10 GMT"
4,csv_stage/2013-2.csv.gz,2432,0683f2f9e5c5547a061c4bf78a87ae1e,"Sun, 13 Feb 2022 01:05:10 GMT"
5,csv_stage/2013-3.csv.gz,2928,80d11036c71d227512842c17a9aa86b3,"Sun, 13 Feb 2022 01:05:10 GMT"
6,csv_stage/2013-4.csv.gz,3168,99d1b703682e930b315a9c47c9e4d879,"Sun, 13 Feb 2022 01:05:09 GMT"
7,csv_stage/2013-5.csv.gz,3264,477ed944eba5f66b15b47a0c96bd0ce6,"Sun, 13 Feb 2022 01:05:10 GMT"
8,csv_stage/2013-6.csv.gz,3088,8ba8923cf4c76c778e68a5365f065880,"Sun, 13 Feb 2022 01:05:10 GMT"
9,csv_stage/2013-7.csv.gz,3312,5e06dee34cc228e2d58abc28cbb78005,"Sun, 13 Feb 2022 01:05:10 GMT"


In [35]:
# Remove files from staging for best practice
remove_files(file_ext = 'csv')

# Checks if there is anything in the stage
sql = "LIST @CSV_STAGE"
pd.read_sql_query(sql, engine).shape

...Removing files from CSV_STAGE...


(0, 4)

In [36]:
# New query for 100 rows from monthly_po data
sql = "select * from monthly_po limit 100;"

pd.read_sql_query(sql, engine)

Unnamed: 0,PURCHASEORDERID,SUPPLIERID,ORDERDATE,DELIVERYMETHODID,EXPECTEDDELIVERYDATE,SUPPLIERREFERENCE,LASTEDITEDBY,PURCHASEORDERLINEID,STOCKITEMID,ORDEREDOUTERS,DESCRIPTION,RECEIVEDOUTERS,PACKAGETYPEID,EXPECTEDUNITPRICEPEROUTER,LASTRECEIPTDATE,RIGHT_LASTEDITEDBY
0,106,4,2013-03-01,7,2013-03-21,293092,4,469,77,92,"""The Gu"" red shirt XML tag t-shirt (White) XXS",92,6,84,2013-03-04,4
1,106,4,2013-03-01,7,2013-03-21,293092,4,470,78,127,"""The Gu"" red shirt XML tag t-shirt (White) XS",127,6,84,2013-03-04,4
2,106,4,2013-03-01,7,2013-03-21,293092,4,471,80,20,"""The Gu"" red shirt XML tag t-shirt (White) M",20,6,84,2013-03-04,4
3,106,4,2013-03-01,7,2013-03-21,293092,4,472,86,74,"""The Gu"" red shirt XML tag t-shirt (White) 5XL",74,6,96,2013-03-04,4
4,106,4,2013-03-01,7,2013-03-21,293092,4,473,95,22,"""The Gu"" red shirt XML tag t-shirt (Black) XL",22,6,90,2013-03-04,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,132,7,2013-03-18,2,2013-04-07,BC0280982,3,564,204,30,Tape dispenser (Red),30,7,170,2013-03-19,3
96,133,4,2013-03-19,7,2013-04-08,293092,13,565,77,156,"""The Gu"" red shirt XML tag t-shirt (White) XXS",156,6,84,2013-03-20,13
97,133,4,2013-03-19,7,2013-04-08,293092,13,566,78,193,"""The Gu"" red shirt XML tag t-shirt (White) XS",193,6,84,2013-03-20,13
98,133,4,2013-03-19,7,2013-04-08,293092,13,567,80,42,"""The Gu"" red shirt XML tag t-shirt (White) M",42,6,84,2013-03-20,13


In [37]:
#ALTERNATIVELY...

# New query for 100 rows from monthly_po data
#sql = "select * from monthly_po limit 100;"
#cur.execute(sql)
#data = cur.fetchall()

#df = pd.DataFrame(data)

In [38]:
#df

So, the cells above just checked if all 41 csv files has been uploaded into the staging table and that they have all been copied into the monthly_po table in Snowflake. Both of which are true, so moving on...

### 2.	Create a calculated field that shows purchase order totals, i.e., for each order, sum the line item amounts (defined as ReceivedOuters * ExpectedUnitPricePerOuter), and name this field POAmount

Note: Final query results will be stored in a variable name so that we can view it at convenience in Python. (Applies in all upcoming parts)

In [39]:
sql = """ALTER TABLE MSBA_ETL.PUBLIC.MONTHLY_PO ADD COLUMN POAmount INTEGER DEFAULT 0;"""
pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,Statement executed successfully.


In [40]:
sql = """UPDATE MSBA_ETL.PUBLIC.MONTHLY_PO SET POAmount = ReceivedOuters * ExpectedUnitPricePerOuter;"""
pd.read_sql_query(sql, engine)

Unnamed: 0,number of rows updated,number of multi-joined rows updated
0,8367,0


In [41]:
sql = "select PurchaseOrderID, OrderDate, PurchaseOrderLineID, ReceivedOuters, ExpectedUnitPricePerOuter, POAmount from monthly_po;"
pd.read_sql_query(sql, engine)

Unnamed: 0,PURCHASEORDERID,ORDERDATE,PURCHASEORDERLINEID,RECEIVEDOUTERS,EXPECTEDUNITPRICEPEROUTER,POAMOUNT
0,106,2013-03-01,469,92,84,7728
1,106,2013-03-01,470,127,84,10668
2,106,2013-03-01,471,20,84,1680
3,106,2013-03-01,472,74,96,7104
4,106,2013-03-01,473,22,90,1980
...,...,...,...,...,...,...
8362,2073,2016-05-31,8363,0,90,0
8363,2073,2016-05-31,8364,0,96,0
8364,2074,2016-05-31,8365,0,48,0
8365,2074,2016-05-31,8366,0,38,0


In [42]:
sql = """select purchaseorderid, orderdate, sum(POAmount) AS POAmount 
        from monthly_po
        group by purchaseorderid, orderdate"""
po_amount = pd.read_sql_query(sql, engine)
po_amount

Unnamed: 0,PURCHASEORDERID,ORDERDATE,POAMOUNT
0,106,2013-03-01,38472
1,107,2013-03-01,3910
2,108,2013-03-02,2664
3,109,2013-03-04,43866
4,110,2013-03-04,4760
...,...,...,...
2069,517,2013-11-05,71278
2070,303,2013-06-29,2514
2071,1487,2015-06-16,475990
2072,1501,2015-06-24,484234


In [43]:
# Not needed unless you plan on running the above codes multiple times, this is the drop the POAmount column sql
#sql = """ALTER TABLE MSBA_ETL.PUBLIC.MONTHLY_PO DROP COLUMN POAmount;"""
#pd.read_sql_query(sql, engine)

### 3.	Extract and load the supplier invoice XML data, flatten the data into a table where each row  corresponds to a single invoice

In [44]:
# Creating new file format for XML if needed
creat_xml_format = """
CREATE FILE FORMAT IF NOT EXISTS XML_FORMAT
TYPE = 'XML'
STRIP_OUTER_ELEMENT = TRUE;
"""
# Create table to load raw form of transactions xml
create_xml_table = """
CREATE OR REPLACE TABLE raw_transactions(xml_data VARIANT);"""

In [45]:
cur.execute(create_xml_table)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

In [46]:
cur.execute(creat_xml_format)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

In [47]:
#cur.execute(f"put file://Supplier* @%raw_transactions")

# NEED TO put in your own supplier xml path
staging(xmlPath, stage="CSV_STAGE")
#staging("Supplier", stage='CSV_STAGE')

#pd.read_sql_query("LIST @%raw_transactions", engine) # <----- checks if it's staged

Staging Supplier XML file in CSV_STAGE...


In [48]:
# Checks if file is in stage
sql = "LIST @CSV_STAGE;"
pd.read_sql_query(sql, engine).shape

(1, 4)

In [49]:
cur.execute("""COPY INTO raw_transactions FROM @CSV_STAGE FILE_FORMAT = (FORMAT_NAME = XML_FORMAT)""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f23bd0710d0>

In [50]:
# Remove from stage after copying into Snowflake table
#cur.execute("REMOVE @CSV_STAGE PATTERN='.*.xml.gz'")
remove_files()
# pd.read_sql_query("LIST @CSV_STAGE", engine) # <----- checks if it's deleted

...Removing files from CSV_STAGE...


In [51]:
sql = "select * from raw_transactions"
pd.read_sql_query(sql, engine)

# so... this calls for the magic of "flattening" or in layman's terms, pretty much just create a better looking.

Unnamed: 0,XML_DATA
0,<row>\n <SupplierTransactionID>134</SupplierT...
1,<row>\n <SupplierTransactionID>169</SupplierT...
2,<row>\n <SupplierTransactionID>186</SupplierT...
3,<row>\n <SupplierTransactionID>215</SupplierT...
4,<row>\n <SupplierTransactionID>224</SupplierT...
...,...
2433,<row>\n <SupplierTransactionID>335510</Suppli...
2434,<row>\n <SupplierTransactionID>335511</Suppli...
2435,<row>\n <SupplierTransactionID>335512</Suppli...
2436,<row>\n <SupplierTransactionID>335843</Suppli...


In [52]:
sql = """
SELECT 
  GET(Elements.value, '@')::string nodeType, count(*) 
FROM 
  raw_transactions,
  LATERAL FLATTEN(GET(xml_data, '$')) Elements
group by nodeType;
"""
# Actual columns that we can look at (not all of them needed)
pd.read_sql_query(sql, engine)

Unnamed: 0,NODETYPE,COUNT(*)
0,SupplierTransactionID,2438
1,SupplierID,2438
2,TransactionTypeID,2438
3,PurchaseOrderID,2438
4,SupplierInvoiceNumber,2438
5,AmountExcludingTax,2438
6,TaxAmount,2438
7,TransactionAmount,2438
8,OutstandingBalance,2438
9,FinalizationDate,2438


In [53]:
# I tried casting to different data types, but there's empty values in some of the columns: PurchaseOrderID, SupplierInvoiceNumber, FinalizationDate
# 3 super long case when statements to relace empty values with NULL
sql = """
CREATE OR REPLACE TABLE supplier_transactions AS(
SELECT GET(XMLGET(xml_data, 'SupplierTransactionID'), '$')::number AS SupplierTransactionID,
GET(XMLGET(xml_data, 'SupplierID'), '$')::number AS SupplierID,
GET(XMLGET(xml_data, 'TransactionTypeID'), '$')::number AS TransactionTypeID,
case when GET(XMLGET(xml_data, 'PurchaseOrderID'), '$'):: varchar = '' then null else GET(XMLGET(xml_data, 'PurchaseOrderID'), '$')::number end AS PurchaseOrderID,
GET(XMLGET(xml_data, 'PaymentMethodID'), '$')::number AS PaymentMethodID,
case when GET(XMLGET(xml_data, 'SupplierInvoiceNumber'), '$')::varchar = '' then null else GET(XMLGET(xml_data, 'SupplierInvoiceNumber'), '$')::number end AS SupplierInvoiceNumber,
GET(XMLGET(xml_data, 'TransactionDate'), '$')::date AS TransactionDate,
GET(XMLGET(xml_data, 'AmountExcludingTax'), '$')::number AS AmountExcludingTax,
GET(XMLGET(xml_data, 'TaxAmount'), '$')::number AS TaxAmount,
GET(XMLGET(xml_data, 'TransactionAmount'), '$')::number AS TransactionAmount,
GET(XMLGET(xml_data, 'OutstandingBalance'), '$')::number AS OutstandingBalance,
case when GET(XMLGET(xml_data, 'FinalizationDate'), '$')::varchar = '' then null else GET(XMLGET(xml_data, 'FinalizationDate'), '$')::date end AS FinalizationDate,
GET(XMLGET(xml_data, 'IsFinalized'), '$')::number AS IsFinalized,
GET(XMLGET(xml_data, 'LastEditedBy'), '$')::number AS LastEditedBy
FROM raw_transactions);
"""

pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,Table SUPPLIER_TRANSACTIONS successfully created.


In [54]:
sql = """select * from supplier_transactions limit 200"""
pd.read_sql_query(sql, engine)

Unnamed: 0,SUPPLIERTRANSACTIONID,SUPPLIERID,TRANSACTIONTYPEID,PURCHASEORDERID,PAYMENTMETHODID,SUPPLIERINVOICENUMBER,TRANSACTIONDATE,AMOUNTEXCLUDINGTAX,TAXAMOUNT,TRANSACTIONAMOUNT,OUTSTANDINGBALANCE,FINALIZATIONDATE,ISFINALIZED,LASTEDITEDBY
0,134,2,5,1.0,4,7290.0,2013-01-02,314,47,361,0,2013-01-07,1,4
1,169,4,5,2.0,4,3898.0,2013-01-02,21732,3260,24992,0,2013-01-07,1,4
2,186,5,5,3.0,4,616.0,2013-01-02,2741,411,3152,0,2013-01-07,1,4
3,215,7,5,4.0,4,3869.0,2013-01-02,42481,6372,48853,0,2013-01-07,1,4
4,224,10,5,5.0,4,4697.0,2013-01-02,35068,5260,40328,0,2013-01-07,1,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
195,22934,7,5,162.0,4,1691.0,2013-04-08,12109,1816,13926,0,2013-04-08,1,2
196,22937,4,5,163.0,4,743.0,2013-04-08,912,137,1049,0,2013-04-08,1,2
197,22939,7,5,164.0,4,4720.0,2013-04-08,1020,153,1173,0,2013-04-08,1,2
198,22940,4,7,,4,,2013-04-08,0,0,-433499,0,2013-04-08,1,2


### 4.	Join the purchases data from step 2 and the supplier invoices data from step 3 (only include matching rows); assuming that step 2 was completely correctly, you can assume the following relationships among the four tables

In [55]:
#Query with CTE, final resulting query has duplicate PurchaseOrderID
sql = """
    with 
    purch_data as (
        select purchaseorderid, sum(POAmount) AS POAmount 
        from monthly_po
        group by purchaseorderid)
    select * 
        from purch_data A
        join supplier_transactions B on A.purchaseorderid = B.purchaseorderid;
"""

purch_supply = pd.read_sql_query(sql, engine)

In [56]:
purch_supply

Unnamed: 0,PURCHASEORDERID,POAMOUNT,SUPPLIERTRANSACTIONID,SUPPLIERID,TRANSACTIONTYPEID,PURCHASEORDERID.1,PAYMENTMETHODID,SUPPLIERINVOICENUMBER,TRANSACTIONDATE,AMOUNTEXCLUDINGTAX,TAXAMOUNT,TRANSACTIONAMOUNT,OUTSTANDINGBALANCE,FINALIZATIONDATE,ISFINALIZED,LASTEDITEDBY
0,106,38472,13834,4,5,106,4,845,2013-03-04,38472,5771,44243,0,2013-03-04,1,14
1,107,3910,13836,7,5,107,4,9908,2013-03-04,3910,587,4497,0,2013-03-04,1,14
2,108,2664,13842,4,5,108,4,6244,2013-03-04,2664,400,3064,0,2013-03-04,1,14
3,109,43866,14211,4,5,109,4,2164,2013-03-05,43866,6580,50446,0,2013-03-11,1,17
4,110,4760,14213,7,5,110,4,3906,2013-03-05,4760,714,5474,0,2013-03-11,1,17
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2067,2062,1618332,334071,4,5,2062,4,6820,2016-05-25,1618332,242750,1861082,0,2016-05-30,1,7
2068,2064,1618188,334383,4,5,2064,4,470,2016-05-26,1618188,242728,1860916,0,2016-05-30,1,7
2069,2070,1164,335510,4,5,2070,4,6305,2016-05-30,1164,175,1339,0,2016-05-30,1,7
2070,2004,1561956,323669,4,5,2004,4,3230,2016-04-20,1561956,234293,1796249,0,2016-04-25,1,2


### 5.	Using the joined data from step 4, create a calculated field that shows the difference between AmountExcludingTax and POAmount, name this field invoiced_vs_quoted, and save the result as a materialized view named purchase_orders_and_invoices

In [57]:
sql = """
create or replace view purchase_orders_and_invoices as(
    with 
    purch_data as (
        select purchaseorderid, sum(POAmount) AS POAmount 
        from monthly_po
        group by purchaseorderid),
    purch_supply_invoice as (
        select A.poamount, B.*
        from purch_data A
        join supplier_transactions B on A.purchaseorderid = B.purchaseorderid)
    
    select purchaseorderid, transactiondate, supplierinvoicenumber, amountexcludingtax, poamount, amountexcludingtax - poamount as invoiced_vs_quoted
    from purch_supply_invoice
);
"""

pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,View PURCHASE_ORDERS_AND_INVOICES successfully...


In [58]:
sql = "select * from purchase_orders_and_invoices"
purchase_orders_and_invoices = pd.read_sql_query(sql, engine)

purchase_orders_and_invoices

Unnamed: 0,PURCHASEORDERID,TRANSACTIONDATE,SUPPLIERINVOICENUMBER,AMOUNTEXCLUDINGTAX,POAMOUNT,INVOICED_VS_QUOTED
0,106,2013-03-04,845,38472,38472,0
1,107,2013-03-04,9908,3910,3910,0
2,108,2013-03-04,6244,2664,2664,0
3,109,2013-03-05,2164,43866,43866,0
4,110,2013-03-05,3906,4760,4760,0
...,...,...,...,...,...,...
2067,2062,2016-05-25,6820,1618332,1618332,0
2068,2064,2016-05-26,470,1618188,1618188,0
2069,2070,2016-05-30,6305,1164,1164,0
2070,2004,2016-04-20,3230,1561956,1561956,0


### 6.	Extract the supplier_case data from postgres

In [5]:
import psycopg2
from snowflake.connector.pandas_tools import write_pandas

# Connecting to postgresql
conn = psycopg2.connect( host="127.0.0.1", port="8765",database="WestCoastImporters", user="jovyan", password="postgres")

In [8]:
# Under the assumption that the supplier_case table was already created in postgresql ("supplier_case.pgsql" file)
# Retrieving table into pandas dataframe

sql = ("""SELECT * FROM supplier_case""")
supplier_case = pd.read_sql_query(sql, conn)

# Close connection to postgresql
conn.close()

**We will be cleaning and transforming the supplier_case data slightly before loading into Snowflake to prepare for questions 7 and 8:**

(1) we'll change the `postalpostalcode` in supplier_case table to a string and correct one of the values, under the assumption that all suppliers are located within USA and not of foreign country.

(2) Using an open source geocoding service (namely, the Nominatim tool from `geopy` module) to identify the latitude and longitude of the supplier's postal codes, this will lead to adding two new columns to data extracted from postgresql

    (a) These coordinates will be used later on to find the closest station to each unique postal code.
   
*NOTE: Due to the request limit of Nominatim, the geocoding part will be commented out to prevent accidentally running it multiple times and getting blocked as a result. The lat/lon coordinates are hard coded after successful requests.*

In [61]:
supplier_case.postalpostalcode

0     22202
1     80125
2     60523
3     95642
4     80125
5     42437
6     95642
7     60523
8     29625
9     22202
10    34269
11    80125
12     6331
Name: postalpostalcode, dtype: int64

In [9]:
# Converting to postal code to string
supplier_case["postalpostalcode"] = supplier_case["postalpostalcode"].astype(str)

# Correcting the last postal code. Googled "postal code 6331", it sent me to Hünenberg, Switzerland...
# 06331 is more accurate because google took me to Canterbury, Windham County, Connecticut. This matches the address city and postol code in the original data.
supplier_case.loc[supplier_case["postalpostalcode"]=="6331", "postalpostalcode"] = "06331"

Below is the commented out geocoding portion. 

If you want to run it yourself, please review the Nominatim policy *first*: https://operations.osmfoundation.org/policies/nominatim/ 

In [63]:
# Installing geopy module
#import sys

#!{sys.executable} -m pip install geopy

In [1]:
# Getting geospatial data using geopy module and Nominatim geocoding service

#import geopy
#from geopy.geocoders import Nominatim
#from geopy.extra.rate_limiter import RateLimiter

In [2]:
# Used geocoding service

#locator = Nominatim(user_agent = "https://www.openstreetmap.org/#map=5/38.661/-102.437")

# To prevent the possibility of denying access or running into timeout in case of we need to look through a huge list of addresses
# Not the case here, since we're only searching through 13, but it's a good thing to know

#geolocator = RateLimiter(locator.geocode, min_delay_seconds = 1)

In [66]:
# This cell is a test

#location = locator.geocode("2934 Night Road, Highlands Ranch, United States") <------ Doesn't give the accurate results, if any at all.

#location= locator.geocode("Canterbury, 06331, United States")
#print("Lat = {}, Lon = {}".format(location.latitude, location.longitude))

In [67]:
#location.address

In [3]:
# User-defined function to use on pandas dataframe and return address, longitude, latitude as an array

#def get_lonlat(df):
#    location = geolocator(df)
#    print("Searching...")
#    return {"address":location.address, "longitude":location.longitude, "latitude":location.latitude}

In [11]:
# Set temporary column for geolocating

#supplier_case["search_address"] = supplier_case.postalpostalcode + ", " + "United States"

In [12]:
#geo_results = supplier_case["search_address"].apply(get_lonlat)

#supplier_case.drop(columns=["search_address"], inplace=True)

Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...
Searching...


In [13]:
#pd.DataFrame.from_dict(list(geo_results), orient='columns')

Unnamed: 0,address,longitude,latitude
0,"Arlington County, Virginia, 22202, United States",-77.056181,38.855545
1,"Douglas County, Colorado, 80125, United States",-105.044672,39.501488
2,"Oak Brook, DuPage County, Illinois, 60523, Uni...",-87.954996,41.838441
3,"Jackson, Amador County, California, 95642, Uni...",-120.788896,38.359977
4,"Douglas County, Colorado, 80125, United States",-105.044672,39.501488
5,"Union County, Kentucky, 42437, United States",-87.926939,37.659818
6,"Jackson, Amador County, California, 95642, Uni...",-120.788896,38.359977
7,"Oak Brook, DuPage County, Illinois, 60523, Uni...",-87.954996,41.838441
8,"Anderson County, South Carolina, 29625, United...",-82.711822,34.532045
9,"Arlington County, Virginia, 22202, United States",-77.056181,38.855545


In [72]:
# Below are the hard coded data added into the supplier_case dataframe after geolocating with city and postal addresses.
# Previously, there was an semi-complete address (street not included) column added as validating that we're searching the right places
supplier_case[["longitude", "latitude"]] = pd.DataFrame({
#"address": [
#"Arlington County, Virginia, 22202, United States", 
#"Douglas County, Colorado, 80125, United States", 
#"Oak Brook, DuPage County, Illinois, 60523, United States",
#"Jackson, Amador County, California, 95642, United States",
#"Douglas County, Colorado, 80125, United States",
#"Union County, Kentucky, 42437, United States",
#"Jackson, Amador County, California, 95642, United States",
#"Oak Brook, DuPage County, Illinois, 60523, United States",
#"Anderson County, South Carolina, 29625, United States",
#"Arlington County, Virginia, 22202, United States",
#"DeSoto County, Florida, 34269, United States",
#"Douglas County, Colorado, 80125, United States",
#"Canterbury, Windham County, Connecticut, United States"
#],

"longitude": [-77.056181, -105.04467, -87.954996, -120.788896, -105.044672, -87.926939, -120.788896, -87.954996, -82.711822, -77.056181, -81.985788, -105.044672, -71.9710811],

"latitude": [38.855545, 39.501488, 41.838441, 38.359977, 39.501488, 37.659818, 38.359977, 41.838441, 34.532045, 38.855545, 27.067568, 39.501488, 41.662033]})

In [73]:
supplier_case[["supplierid", "suppliername", "postalpostalcode", "longitude", "latitude"]]

Unnamed: 0,supplierid,suppliername,postalpostalcode,longitude,latitude
0,1,A Datum Corporation,22202,-77.056181,38.855545
1,2,"Contoso, Ltd.",80125,-105.04467,39.501488
2,3,Consolidated Messenger,60523,-87.954996,41.838441
3,4,"Fabrikam, Inc.",95642,-120.788896,38.359977
4,5,Graphic Design Institute,80125,-105.044672,39.501488
5,6,Humongous Insurance,42437,-87.926939,37.659818
6,7,"Litware, Inc.",95642,-120.788896,38.359977
7,8,Lucerne Publishing,60523,-87.954996,41.838441
8,9,Nod Publishers,29625,-82.711822,34.532045
9,10,Northwind Electric Cars,22202,-77.056181,38.855545


In [74]:
sql = """
CREATE OR REPLACE TABLE supplier_case(
   SupplierID               INTEGER
  ,SupplierName             VARCHAR 
  ,SupplierCategoryID       INTEGER  
  ,PrimaryContactPersonID   INTEGER  
  ,AlternateContactPersonID INTEGER  
  ,DeliveryMethodID         INTEGER  
  ,PostalCityID             INTEGER  
  ,SupplierReference        VARCHAR 
  ,BankAccountName          VARCHAR 
  ,BankAccountBranch        VARCHAR 
  ,BankAccountCode          INTEGER  
  ,BankAccountNumber        NUMERIC  
  ,BankInternationalCode    INTEGER  
  ,PaymentDays              INTEGER  
  ,InternalComments         VARCHAR 
  ,PhoneNumber              VARCHAR 
  ,FaxNumber                VARCHAR 
  ,WebsiteURL               VARCHAR 
  ,DeliveryAddressLine1     VARCHAR 
  ,DeliveryAddressLine2     VARCHAR 
  ,DeliveryPostalCode       INTEGER  
  ,DeliveryLocation         VARCHAR 
  ,PostalAddressLine1       VARCHAR  
  ,PostalAddressLine2       VARCHAR 
  ,PostalPostalCode         VARCHAR  
  ,LastEditedBy             INTEGER  
  ,ValidFrom                VARCHAR 
  ,ValidTo                  VARCHAR
  ,Longitude                FLOAT
  ,Latitude                 FLOAT
);
"""

pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,Table SUPPLIER_CASE successfully created.


In [75]:
# load supplier_case dataframe into Snowflake
success, nchunks, nrows, _ = write_pandas(engine, supplier_case, 'supplier_case', quote_identifiers=False)

In [76]:
sql = "select * from supplier_case;"
supplier_case = pd.read_sql_query(sql, engine)

In [77]:
supplier_case

Unnamed: 0,SUPPLIERID,SUPPLIERNAME,SUPPLIERCATEGORYID,PRIMARYCONTACTPERSONID,ALTERNATECONTACTPERSONID,DELIVERYMETHODID,POSTALCITYID,SUPPLIERREFERENCE,BANKACCOUNTNAME,BANKACCOUNTBRANCH,...,DELIVERYPOSTALCODE,DELIVERYLOCATION,POSTALADDRESSLINE1,POSTALADDRESSLINE2,POSTALPOSTALCODE,LASTEDITEDBY,VALIDFROM,VALIDTO,LONGITUDE,LATITUDE
0,1,A Datum Corporation,2,21,22,7.0,22202,AA20384,A Datum Corporation,Woodgrove Bank Zionsville,...,22202,0xE6100000010CDE115F37B6F9434031276893C39055C0,PO Box 1039,Arlington,22202,1,05:00.0,##################,-77.056181,38.855545
1,2,"Contoso, Ltd.",2,23,24,9.0,80125,B2084020,Contoso Ltd,Woodgrove Bank Greenbank,...,80125,0xE6100000010CDA4B6430900C4840C04EFBF7AAA45EC0,PO Box 1012,Highlands Ranch,80125,1,05:00.0,##################,-105.04467,39.501488
2,3,Consolidated Messenger,6,25,26,,60523,209340283,Consolidated Messenger,Woodgrove Bank San Francisco,...,60523,0xE6100000010C529ACDE330E34240DFFB1BB4D79A5EC0,PO Box 1014,Westmont,60523,1,05:00.0,##################,-87.954996,41.838441
3,4,"Fabrikam, Inc.",4,27,28,7.0,95642,293092,Fabrikam Inc,Woodgrove Bank Lakeview Heights,...,95642,0xE6100000010C86E7A5626313434023C9625147E054C0,PO Box 301,Jackson,95642,1,05:00.0,##################,-120.788896,38.359977
4,5,Graphic Design Institute,2,29,30,10.0,80125,8803922,Graphic Design Institute,Woodgrove Bank Lanagan,...,80125,0xE6100000010C15E46723D74D424081F8AF62A79C57C0,PO Box 393,Highlands Ranch,80125,1,05:00.0,##################,-105.044672,39.501488
5,6,Humongous Insurance,9,31,32,,42437,82420938,Humongous Insurance,Woodgrove Bank Lancing,...,42437,0xE6100000010CCCF2D0D2700F424085C7235DD82955C0,PO Box 94829,Boxville,42437,1,05:00.0,##################,-87.926939,37.659818
6,7,"Litware, Inc.",5,33,34,2.0,95642,BC0280982,Litware Inc,Woodgrove Bank Mokelumne Hill,...,95642,0xE6100000010C297398D475264340B63CC560342D5EC0,PO Box 20290,Jackson,95642,1,05:00.0,##################,-120.788896,38.359977
7,8,Lucerne Publishing,2,35,36,10.0,60523,JQ082304802,Lucerne Publishing,Woodgrove Bank Jonesborough,...,60523,0xE6100000010C9D8F21B6AA25424091F69A794D9E54C0,PO Box 8747,Westmont,60523,1,05:00.0,##################,-87.954996,41.838441
8,9,Nod Publishers,2,37,38,10.0,29625,GL08029802,Nod Publishers,Woodgrove Bank Elizabeth City,...,29625,0xE6100000010C0EB0A07AB52542407452A923111053C0,PO Box 3390,Anderson,29625,1,05:00.0,##################,-82.711822,34.532045
9,10,Northwind Electric Cars,3,39,40,8.0,22202,ML0300202,Northwind Electric Cars,Woodgrove Bank Crandon Lakes,...,22202,0xE6100000010C6C4E14D7E78F4440C74ED3C2C0B552C0,PO Box 30920,Arlington,22202,1,05:00.0,##################,-77.056181,38.855545


### 7.	Connect to the Environment Data Atlas Marketplace data (do this manually inside Snowflake) and then extract weather data for each unique zip code in the supplier_case table (suppliers can have the same zip code but you only need to extract weather data for each zip code once)

Will be pulling weather data from the Snowflake market place.

Note: If the database and schema name is different when you pulled the weather from the marketplace, please refined the aforementioned names below in the `db` and `sch` variables respectively.

In [78]:
db = "ENVIRONMENT_DATA_ATLAS"
sch = "ENVIRONMENT"

In [79]:
# Checking the years and number of records we have in weather data from the marketplace
sql = f"""
select date_part(year, "Date") AS Year, count("Date") AS Records 
from {db}.{sch}.NOAACD2019R 
where "Country"='US'
group by 1
order by 1 desc;
"""
pd.read_sql_query(sql, engine)

Unnamed: 0,YEAR,RECORDS
0,2022,1656656
1,2021,13275921
2,2020,14093687
3,2019,13991130
4,2018,12833124


In [80]:
# We only want US data on daily high temperature readings.
sql = f"""
create or replace table us_weather
cluster by ("Stations", "Date") as(
    select * 
    from {db}.{sch}.NOAACD2019R 
    where "Country"='US' and "Indicator Name" = 'Maximum temperature (Fahrenheit)' and "Measure" = 'M1'
);
"""
pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,Table US_WEATHER successfully created.


In [81]:
sql = "select * from us_weather limit 100;"
us_weather = pd.read_sql_query(sql, engine)

In [82]:
us_weather

Unnamed: 0,Country,Country Name,Country RegionId,Country ISO Code,Stations,Stations Name,Stations Latitude,Stations Longitude,Stations USAF,Stations WBAN,Stations Country Code,Indicator,Indicator Name,Measure,Measure Name,Units,Scale,Frequency,Date,Value
0,US,United States,US,USA,997288-99999,QUONSET POINT,41.58,-071.400,997288,99999,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,,1,D,2020-03-20,56.8
1,US,United States,US,USA,997283-99999,NEW LONDON,41.35,-072.080,997283,99999,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,,1,D,2020-03-20,56.3
2,US,United States,US,USA,997282-99999,997282 - MONTAUK,41.05,-071.970,997282,99999,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2020-03-20,53.1
3,US,United States,US,USA,997281-99999,LEWES,38.78,-075.120,997281,99999,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2020-03-20,82.9
4,US,United States,US,USA,997287-99999,997287 - EASTPORT,44.9,-066.980,997287,99999,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,,1,D,2020-03-20,46.6
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3786403,US,United States,US,USA,722115-12871,SARASOTA/BRADENTON INTL AP,27.401,-082.559,722115,12871,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2021-01-24,79.0
3786404,US,United States,US,USA,722114-54901,BUFFALO MUNICIPAL AIRPORT,45.159,-093.843,722114,54901,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2021-01-24,25.7
3786405,US,United States,US,USA,722116-12873,ST PETE-CLWTR INTL AIRPORT,27.911,-082.688,722116,12873,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2021-01-24,79.0
3786406,US,United States,US,USA,722118-00482,VENICE MUNICIPAL AIRPORT,27.072,-082.440,722118,00482,US,KN.A2,Maximum temperature (Fahrenheit),M1,Value,Fahrenheit,1,D,2021-01-24,77.0


In [83]:
# Checking if Station USAF-WBAN have a unique Station Name
(len(us_weather["Stations"].unique()), len(us_weather["Stations Name"].unique()))

(3136, 3109)

### 7. connect to the Environment Data Atlas Marketplace data (do this manually inside Snowflake) and then extract weather data for each unique zip code in the supplier_case table (suppliers can have the same zip code but you only need to extract weather data for each zip code once)

                a.	the weather data does not contain zip codes but you can use the approach in https://towardsdatascience.com/noaa-weather-data-in-snowflake-free-20e90ee916ed to find weather stations closest to each zip code (only use one weather station per zip code); 

                b.	create a materialized view named supplier_zip_code_weather that contains the unique zip codes (PostalPostalCode) from the supplier data, date, and daily high temperatures, i.e., the view should have three columns (zip code, date, and high temperature) and one row per day and unique supplier zip code


This is where we cross joined the us_weather table with the supplier_case table. We will be using the st_distance and st_makepoint functions to calculate the minimum distance between the station and supplier postal code coordinates (Longitude/Latitude), then limit to search to 50km.  We used the code from: https://towardsdatascience.com/noaa-weather-data-in-snowflake-free-20e90ee916ed as reference.

And in preparation for part 8, we roll will roll back by 5 because the weather data doesn't have records older than 2018 while our transaction dates range from 2013 to 2016. 

In [84]:
sql = """
create or replace view supplier_zip_code_weather
as (
    with 
    
-- 7a: closest station to each unique zipcode

    closest_stations as (
        select 
            b.postalpostalcode as Zip_Code,
            a."Stations Name" as Station_Name,
            a."Stations" as Stations,
            st_distance(st_makepoint(a."Stations Longitude", a."Stations Latitude"), st_makepoint(b.Longitude, b.Latitude)) as Distance
            from us_weather a
            cross join supplier_case b
            where st_distance(st_makepoint(a."Stations Longitude", a."Stations Latitude"), st_makepoint(b.Longitude, b.Latitude)) < 30000
            qualify row_number() over(partition by Zip_Code order by Distance) = 1
    ),
    daily_temps_closest_stations as (
        select B.Zip_code, A."Stations Name" as Station_Name, dateadd(year, -5, A."Date") as Date, A."Value" as High_Temperature
        from us_weather A
        join closest_stations B
        on A."Stations" = B.Stations
    )
    
-- 7b: Complete materialized view

    select Zip_Code, Date, High_Temperature
    from daily_temps_closest_stations
)
;
"""

pd.read_sql_query(sql, engine)

Unnamed: 0,status
0,View SUPPLIER_ZIP_CODE_WEATHER successfully cr...


In [85]:
sql = "select * from supplier_zip_code_weather;"
supplier_zip_code_weather = pd.read_sql_query(sql, engine)

In [86]:
supplier_zip_code_weather

Unnamed: 0,ZIP_CODE,DATE,HIGH_TEMPERATURE
0,22202,2015-11-02,59.0
1,22202,2015-11-03,66.0
2,22202,2015-11-04,66.9
3,22202,2015-11-05,69.1
4,22202,2015-11-06,70.0
...,...,...,...
11706,80125,2014-05-22,50.0
11707,80125,2014-05-23,50.0
11708,80125,2014-05-24,70.0
11709,80125,2014-05-25,73.9


### 8.	Join purchase_orders_and_invoices, supplier_case, and supplier_weather based on zip codes and the transaction date. Only include transactions that have matching temperature readings  

In [87]:
sql = """
WITH 
zip_weather_join_supplier AS (select *
    FROM supplier_zip_code_weather AS A
    JOIN supplier_case AS B
    ON A.ZIP_CODE=B.postalpostalcode
    ),
matching_temps AS (SELECT A.*
    FROM zip_weather_join_supplier as A
    RIGHT JOIN (
        SELECT CASE WHEN count(HIGH_TEMPERATURE) >1 THEN HIGH_TEMPERATURE ELSE NULL END AS HT 
        FROM zip_weather_join_supplier GROUP BY HIGH_TEMPERATURE) AS B
    ON A.HIGH_TEMPERATURE = B.HT
    WHERE A.HIGH_TEMPERATURE IS not NULL
    )

SELECT B.PurchaseOrderID, B.TRANSACTIONDATE, B.SupplierInvoiceNumber, B.POAmount, B. Invoiced_vs_Quoted, A.SupplierID, A.SupplierName, A.Zip_code, A.High_Temperature, A.BankAccountName
    FROM matching_temps as A
    JOIN purchase_orders_and_invoices as B
    ON A.Date = B.TRANSACTIONDATE
    ORDER BY HIGH_TEMPERATURE;
"""

purchase_supplier_temps = pd.read_sql_query(sql, engine)

purchase_supplier_temps

Unnamed: 0,PURCHASEORDERID,TRANSACTIONDATE,SUPPLIERINVOICENUMBER,POAMOUNT,INVOICED_VS_QUOTED,SUPPLIERID,SUPPLIERNAME,ZIP_CODE,HIGH_TEMPERATURE,BANKACCOUNTNAME
0,653,2014-01-31,4877,450336,0,8,Lucerne Publishing,60523,-4.0,Lucerne Publishing
1,654,2014-01-31,9731,131256,199,3,Consolidated Messenger,60523,-4.0,Consolidated Messenger
2,653,2014-01-31,4877,450336,0,3,Consolidated Messenger,60523,-4.0,Consolidated Messenger
3,654,2014-01-31,9731,131256,199,8,Lucerne Publishing,60523,-4.0,Lucerne Publishing
4,6,2013-01-02,1375,5570,-41,3,Consolidated Messenger,60523,7.0,Consolidated Messenger
...,...,...,...,...,...,...,...,...,...,...
26102,1621,2015-09-07,9839,532712,311,7,"Litware, Inc.",95642,107.4,Litware Inc
26103,1623,2015-09-07,2439,152,2,7,"Litware, Inc.",95642,107.4,Litware Inc
26104,1620,2015-09-07,3647,1228056,0,7,"Litware, Inc.",95642,107.4,Litware Inc
26105,1622,2015-09-07,8991,3558,0,7,"Litware, Inc.",95642,107.4,Litware Inc


In [88]:
cur.close()
engine.close()