# ETL/ELT (Snowflake and Python) Assignment



Step 1:
Extract and load the 41 comma-delimited purchases data files and combine them into a single table of purchases data.
The code below uploads the files from the local directory into a Snowflake staging area,
and then loads the data from the staging area into the base table.

In [None]:

import glob
import os

import snowflake.connector

# Establish the Snowflake connection before anything else: every statement in
# this cell goes through the cursor `cs`, so it has to exist before the
# CREATE TABLE below runs.
# The password is read from the environment instead of being hard-coded in
# the notebook (set SNOWFLAKE_PASSWORD before running this cell).
snowflake_password = os.environ.get('SNOWFLAKE_PASSWORD')
if not snowflake_password:
    raise RuntimeError("Set the SNOWFLAKE_PASSWORD environment variable before running this cell.")

conn = snowflake.connector.connect(
    user='Lynn',
    password=snowflake_password,
    account='mzniiiw-cjb77811',
    warehouse='MY_FIRST_WAREHOUSE',
    database='TESTDB',
    schema='TESTSCHEMA'
)

# Create a Cursor Object
cs = conn.cursor()

# Creating table structure for the combined purchase-order data
cs.execute(
    "CREATE OR REPLACE TABLE po_table2("
    "PurchaseOrderID INT, SupplierID INT,"
    "OrderDate date, DeliveryMethodID INT,"
    "ContactPersonID INT, ExpectedDeliveryDate date,"
    "SupplierReference STRING, IsOrderFinalized INT,"
    "Comments STRING, InternalComments STRING,"
    "LastEditedBy INT, LastEditedWhen STRING,"
    "PurchaseOrderLineID INT, StockItemID INT,"
    "OrderedOuters INT, Description STRING,"
    "ReceivedOuters INT, PackageTypeID INT,"
    "ExpectedUnitPricePerOuter FLOAT, LastReceiptDate date,"
    "IsOrderLineFinalized INT, Right_LastEditedBy INT,"
    "Right_LastEditedWhen STRING)"
)

# Make sure the internal stage the files are uploaded to exists
# (no-op when it was created earlier).
cs.execute("CREATE STAGE IF NOT EXISTS PO_STAGE1")

# Folder holding the monthly purchase-order CSV files
folder_path = "/Users/lynnli/Desktop/CaseData/Monthly_PO_Data"

# Upload the CSV files for 2013-2016 (file names start with "<year>-") to PO_STAGE1
for year in range(2013, 2017):
    for file_path in sorted(glob.glob(f"{folder_path}/{year}-*.csv")):
        try:
            cs.execute(f"PUT file://{file_path} @PO_STAGE1")
        except Exception as e:
            # Report and move on so one bad file does not stop the other uploads.
            print(f"Error while executing PUT command for file {file_path}: {e}")


# Copy the staged files into the table, then verify the number of rows it holds.
# SKIP_HEADER = 1 skips each file's header row; ON_ERROR = 'CONTINUE' skips
# rows that fail to load instead of aborting the whole COPY.
try:
    copy_into_command = """
        COPY INTO PO_TABLE2
        FROM @PO_STAGE1
        FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1)
        ON_ERROR = 'CONTINUE'
    """
    cs.execute(copy_into_command)
    cs.execute("SELECT COUNT(*) FROM PO_TABLE2")
    row = cs.fetchone()
    print(f"Number of rows in po_table2: {row[0]}")
    print("Data successfully loaded into po_table2.")
except Exception as e:
    print(f"Error while executing COPY INTO command: {e}")

# Drop the Comments and InternalComments columns (they hold only null values in this data)
cs.execute("alter table PO_TABLE2 Drop column comments,internalcomments")



STEP 2:
Create a calculated field that shows purchase order totals, i.e., for each order, sum the line item amounts
(defined as ReceivedOuters * ExpectedUnitPricePerOuter), and name this field POAmount.
The code below executes a SQL query that creates a new table containing POAmount.

In [None]:

try:
    # CREATE OR REPLACE keeps this cell re-runnable, matching the other
    # CREATE statements in this notebook.
    # POAmount is the order total: the window SUM adds up
    # ReceivedOuters * ExpectedUnitPricePerOuter over every line sharing the
    # same PurchaseOrderID and repeats that total on each of those lines.
    cs.execute("""
        CREATE OR REPLACE TABLE new_po_table AS
        SELECT *,
               SUM(ReceivedOuters * ExpectedUnitPricePerOuter) OVER (PARTITION BY PurchaseOrderID) AS POAmount
        FROM po_table2;
    """)
    print("New table 'new_po_table' with POAmount created.")
except Exception as e:
    print(f"Error while executing SQL query: {e}")


STEP 3:
Extract and load the supplier invoice XML data:
a. Shred the data into a table (preferably in the COPY INTO process) where each row corresponds to a single invoice.


In [None]:

# Using INSERT INTO

import xml.etree.ElementTree as ET

# Extract and load the supplier invoice XML data.
# The XML is parsed here so this cell does not depend on the commented-out
# COPY INTO cell, which was the only other place `root` was defined.
tree = ET.parse('/Users/lynnli/Desktop/CaseData/SupplierTransactions.xml')
root = tree.getroot()

# Make sure the target table exists (no-op if it was already created).
cs.execute("""
    CREATE TABLE IF NOT EXISTS supplier_invoice_xml (
        SupplierTransactionID INTEGER,
        SupplierID INTEGER,
        TransactionTypeID INTEGER,
        PurchaseOrderID INTEGER,
        PaymentMethodID INTEGER,
        SupplierInvoiceNumber INTEGER,
        TransactionDate DATE,
        AmountExcludingTax FLOAT,
        TaxAmount FLOAT,
        TransactionAmount FLOAT,
        OutstandingBalance FLOAT,
        FinalizationDate DATE,
        IsFinalized INTEGER,
        LastEditedBy INTEGER,
        LastEditedWhen STRING
    )
""")


def _child_text(row, tag, required=False):
    """Return the text of ``row``'s ``tag`` child element.

    Returns None when the element has no text, or when it is absent and
    ``required`` is false. Raises ValueError when ``required`` is true and
    the element is absent.
    """
    element = row.find(tag)
    if element is None:
        if required:
            raise ValueError(f"missing <{tag}> element")
        return None
    return element.text


def _optional_int(row, tag):
    """Return ``tag``'s text as an int, or None when the element is absent or empty."""
    text = _child_text(row, tag)
    return int(text) if text is not None else None


# The statement text is the same for every row, so build it once.
insert_command = """
    INSERT INTO supplier_invoice_xml
    (SupplierTransactionID, SupplierID, TransactionTypeID, PurchaseOrderID, PaymentMethodID, SupplierInvoiceNumber, TransactionDate, AmountExcludingTax, TaxAmount, TransactionAmount, OutstandingBalance, FinalizationDate, IsFinalized, LastEditedBy, LastEditedWhen)
    VALUES
    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

for row in root.findall('row'):
    # Read the ID up front (this never raises) so the error message below names
    # the current row even when parsing fails partway through it.
    raw_transaction_id = _child_text(row, 'SupplierTransactionID')
    try:
        # Required numeric fields: int()/float() raise on a missing value,
        # which sends the row to the error report below.
        values = (
            int(_child_text(row, 'SupplierTransactionID', required=True)),
            int(_child_text(row, 'SupplierID', required=True)),
            _optional_int(row, 'TransactionTypeID'),
            _optional_int(row, 'PurchaseOrderID'),
            _optional_int(row, 'PaymentMethodID'),
            _optional_int(row, 'SupplierInvoiceNumber'),
            _child_text(row, 'TransactionDate', required=True),
            float(_child_text(row, 'AmountExcludingTax', required=True)),
            float(_child_text(row, 'TaxAmount', required=True)),
            float(_child_text(row, 'TransactionAmount', required=True)),
            float(_child_text(row, 'OutstandingBalance', required=True)),
            # A missing or empty FinalizationDate is stored as NULL.
            _child_text(row, 'FinalizationDate') or None,
            int(_child_text(row, 'IsFinalized', required=True)),
            int(_child_text(row, 'LastEditedBy', required=True)),
            _child_text(row, 'LastEditedWhen', required=True),
        )

        # Execute the insert command using a parameterized query
        cs.execute(insert_command, values)

    except Exception as e:
        print(f"Error inserting row with SupplierTransactionID: {raw_transaction_id}. Error: {e}")





We also implemented this step using COPY INTO. For reference, we have retained the code for both methods:
the first method (above) uses INSERT INTO, and the second method (commented out below) uses COPY INTO.
INSERT INTO took more time than COPY INTO.


In [None]:

# cs = conn.cursor()

# import xml.etree.ElementTree as ET
# import pandas as pd

# tree = ET.parse('/Users/lynnli/Desktop/CaseData/SupplierTransactions.xml')
# root = tree.getroot()

# # Create a list of dictionaries for each invoice
# invoices = []

# for row in root.findall('row'):
#     invoice = {}
#     for child in row:
#         invoice[child.tag] = child.text
#     invoices.append(invoice)
    
# # Create a dataframe from the list of dictionaries
# data_invoices = pd.DataFrame(invoices)

#converting the dataframe into a csv file
# data_invoices.to_csv("supplier_invoices.csv", index=False)

# # create supplier_invoice table with columns 
# cs.execute("""CREATE OR REPLACE TABLE supplier_invoice_xml1(
#     SupplierTransactionID INTEGER,
#     SupplierID INTEGER,
#     TransactionTypeID INTEGER,
#     PurchaseOrderID INTEGER,
#     PaymentMethodID INTEGER,
#     SupplierInvoiceNumber INTEGER,
#     TransactionDate DATE,
#     AmountExcludingTax FLOAT,
#     TaxAmount FLOAT,
#     TransactionAmount FLOAT,
#     OutstandingBalance FLOAT,
#     FinalizationDate DATE,
#     IsFinalized STRING,
#     LastEditedBy INTEGER,
#     LastEditedWhen STRING
#     );
#     """)

# # put the csv file into the Stage on Snowflake

# put_query1 = f"PUT 'file://{'supplier_invoices.csv'}' @mystage3;"
# cs.execute(put_query1)

# # Use copy into to copy file into the table 

# copy_into_query1 = f"""
# COPY INTO supplier_invoice_xml1
#          FROM (
#              SELECT 
#                  $1::INTEGER AS SupplierTransactionID,
#                  $2::INTEGER AS SupplierID,
#                  $3::INTEGER AS TransactionTypeID,
#                  $4::INTEGER AS PurchaseOrderID,
#                  $5::INTEGER AS PaymentMethodID,
#                  $6::INTEGER AS SupplierInvoiceNumber,
#                  $7::DATE AS TransactionDate,
#                  $8::FLOAT AS AmountExcludingTax,
#                  $9::FLOAT AS TaxAmount,
#                  $10::FLOAT AS TransactionAmount,
#                  $11::FLOAT AS OutstandingBalance,
#                  $12::DATE AS FinalizationDate,
#                  $13::STRING AS IsFinalized,
#                  $14::INTEGER AS LastEditedBy,
#                  $15::STRING AS LastEditedWhen
#              FROM @mystage3/{'supplier_invoices.csv'.split("/")[-1]}
#          )
#          FILE_FORMAT = (TYPE = 'CSV' skip_header = 1)
#          ON_ERROR = 'CONTINUE';  -- Or specify another error handling strategy
#      """
# cs.execute(copy_into_query1)

 STEP 4 & 5:


 Using the joined data across the purchases data from step 2 and the supplier invoices data from step 3 (including matching rows only),
 create a calculated field that shows the difference between AmountExcludingTax and POAmount, name this field invoiced_vs_quoted,
 and save the result as a materialized view named purchase_orders_and_invoices. In the code below, a table is created instead of a materialized view.



In [None]:
# We have created a table with a join instead of a materialized view.
# CREATE OR REPLACE keeps the cell re-runnable. The explicit INNER JOIN keeps
# only purchase-order rows that have a matching supplier invoice, matched on
# both supplier and purchase order.
cs.execute("""
CREATE OR REPLACE TABLE purchase_orders_and_invoices AS
SELECT po.*,
       invoice.AmountExcludingTax AS invoiceamount,
       (invoice.AmountExcludingTax - po.poamount) AS invoiced_vs_quoted
FROM NEW_PO_TABLE po
INNER JOIN SUPPLIER_INVOICE_XML invoice
    ON po.supplierid = invoice.supplierid
   AND po.purchaseorderid = invoice.purchaseorderid
""")

STEP 6:

Extract the supplier_case data from postgres. Do not import the data into Python; instead, use Python to move the data
from postgres to your local drive and then directly into a Snowflake stage.

In [None]:
# Install the required libraries used for connecting to PostgreSQL databases from Python.

pip install psycopg2-binary snowflake-connector-python

import psycopg2

# PostgreSQL database connection parameters
pg_params = {
    "host": "127.0.0.1",
    "database": "WestCoastImporters",
    "user": "jovyan",
    "password": "postgres",
    "port":"8765",

}
# Connect to PostgreSQL
conn = psycopg2.connect(**pg_params)
cur = conn.cursor()

# Run the the sql command in  PostgreSQL using the supplier_case file
with open('/Users/lynnli/Desktop/CaseData/supplier_case.pgsql', 'r') as f:
    cur.execute(f.read())

conn.commit()

# Export data to a CSV on your local drive
csv_file_path = "/Users/lynnli/Desktop/CaseData/postgresoutput.csv" # Path to your CSV file

query = "COPY supplier_case TO STDOUT WITH CSV HEADER"
with open(csv_file_path, 'w') as f:
cur.copy_expert(query, f)

# 6 a.Consider creating a Python function that can take a csv file path as input and then generate field definitions 
# (field names and datatypes based on the header and data types in the file) that can then be used in CREATE TABLE statement.

# creating a function to generate the field definitions for the CREATE TABLE statement
import csv

import itertools
import re

# A whole number: optional sign followed by ASCII digits only, so values such as
# "1_000", " 5" or non-ASCII digits are not mistaken for integers.
_INTEGER_RE = re.compile(r"[+-]?[0-9]+")
# A decimal number: optional sign, digits with an optional fractional part (or a
# bare fraction such as ".5"), and an optional exponent. "1.2.3" does not match.
_FLOAT_RE = re.compile(r"[+-]?(?:[0-9]+\.?[0-9]*|\.[0-9]+)(?:[eE][+-]?[0-9]+)?")


def _infer_sql_type(values):
    """Return the narrowest SQL type (INTEGER, FLOAT or TEXT) that fits every value.

    ``values`` holds the non-blank sample strings of one column; an empty list
    yields TEXT because there is nothing to infer from.
    """
    if not values:
        return "TEXT"
    if all(_INTEGER_RE.fullmatch(value) for value in values):
        return "INTEGER"
    if all(_FLOAT_RE.fullmatch(value) for value in values):
        return "FLOAT"
    return "TEXT"


def generate_field_definitions(file_path, *, sample_rows=1):
    """Build column definitions for a CREATE TABLE statement from a CSV file.

    Column names come from the header row. Each column's type is inferred from
    the first ``sample_rows`` data rows, ignoring blank values: INTEGER if every
    sampled value is a whole number, FLOAT if every value is numeric, and TEXT
    otherwise (including columns with no non-blank sample at all).

    Args:
        file_path: Path to a CSV file whose first row is a header.
        sample_rows: Number of data rows to inspect (at least one is used).

    Returns:
        A comma-separated string such as ``"id INTEGER, name TEXT"``.

    Raises:
        ValueError: If the file is empty (has no header row).
    """
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(file_path, 'r', newline='') as f:
        reader = csv.reader(f)
        headers = next(reader, None)
        if headers is None:
            raise ValueError(f"{file_path} is empty; cannot infer field definitions")
        samples = list(itertools.islice(reader, max(sample_rows, 1)))

    definitions = []
    for index, header in enumerate(headers):
        # Rows shorter than the header contribute nothing for the missing
        # columns, instead of those columns being dropped from the result.
        column_values = [
            row[index] for row in samples if index < len(row) and row[index] != ""
        ]
        definitions.append(f"{header} {_infer_sql_type(column_values)}")

    # Return the column definitions as a comma-separated string
    return ", ".join(definitions)

 6 b. You need to use psycopg2 or a similar Python library to connect to the postgres database within Python, issue a command to postgres
 to have postgres save the supplier_case data to a file, and then use cs.execute to move the file to an internal Snowflake stage and
 eventually into a table.

In [None]:
# Path to the CSV file written by the PostgreSQL export
csv_file_path = "/Users/lynnli/Desktop/CaseData/postgresoutput.csv"

# Make sure the internal stage exists (no-op if it already does).
cs.execute("CREATE STAGE IF NOT EXISTS mystage3")

# Upload the CSV. The export is regenerated on every run, so OVERWRITE = TRUE
# replaces a copy staged by an earlier run; by default PUT does not overwrite
# a staged file with the same name, and stale data would then be loaded.
cs.execute(f"PUT file://{csv_file_path} @mystage3 OVERWRITE = TRUE")

# Generate table schema from CSV and create the table
table_schema = generate_field_definitions(csv_file_path)
create_table_query = f"CREATE OR REPLACE TABLE supplier_case ({table_schema})"
cs.execute(create_table_query)

# Copy data from stage to the table.
# PostgreSQL's CSV output wraps fields containing commas or quotes in double
# quotes, so '"' must be declared as the optional enclosing character.
copy_query = """
COPY INTO supplier_case
FROM @mystage3/postgresoutput.csv
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1);
"""
cs.execute(copy_query)

# STEP 7: Snowflake Marketplace 

     
 A. The weather data does not contain zip codes but you can use the approach in 
 https://towardsdatascience.com/noaa-weather-data-in-snowflake-free-20e90ee916ed to find weather stations closest to each 
 zip code (only use one weather station per zip code). For this to work you need to find a data file with zip code – 
 geo location mappings, e.g., from the US census (ZCTAs are fine to use);

In [None]:

# Step to copy csv file to snowflake

import os

import snowflake.connector

# Snowflake connection parameters. The password is read from the environment
# instead of being hard-coded in the notebook (set SNOWFLAKE_PASSWORD first).
snowflake_password = os.environ.get('SNOWFLAKE_PASSWORD')
if not snowflake_password:
    raise RuntimeError("Set the SNOWFLAKE_PASSWORD environment variable before running this cell.")

conn = snowflake.connector.connect(
    user='Lynn',
    password=snowflake_password,
    account='mzniiiw-cjb77811',
    warehouse='MY_FIRST_WAREHOUSE',
    database='TESTDB',
    schema='TESTSCHEMA'
)

# Create the cursor used for the statements below
cs = conn.cursor()


# Copy the CSV file with US zip codes to the internal stage
csv_file_path = "/Users/lynnli/Desktop/CaseData/uszip.csv"  # file with us zip code
cs.execute("CREATE STAGE IF NOT EXISTS mystage3")  # no-op if the stage exists
cs.execute(f"PUT file://{csv_file_path} @mystage3")


# Create table in snowflake for storing us zip code
cs.execute(
    "CREATE OR REPLACE TABLE "
    "US_ZIP_TABLE(zip INT PRIMARY KEY, lat FLOAT, lng FLOAT, city VARCHAR(255), state_id VARCHAR(10), state_name VARCHAR(255), zcta BOOLEAN, parent_zcta STRING, population INT, density FLOAT, county_fips INT, county_name VARCHAR(255), county_weights TEXT, county_names_all TEXT, county_fips_all TEXT, imprecise BOOLEAN, military BOOLEAN, timezone VARCHAR(50))"
)

# Copy data from stage to table (quoted fields, header row skipped,
# rows that fail to load are skipped instead of aborting the COPY)
cs.execute("COPY INTO US_ZIP_TABLE FROM @mystage3/uszip.csv FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '\"' SKIP_HEADER = 1) ON_ERROR = 'CONTINUE'")

# Drop the parent_zcta column (it holds only null values)
cs.execute("ALTER TABLE US_ZIP_TABLE DROP COLUMN parent_zcta;")


B. Create a materialized view named supplier_zip_code_weather that contains the unique zip codes (PostalPostalCode) from
 the supplier data, the date, and daily high temperatures, i.e., the view should have three columns (zip code, date, and high temperature)
 and one row per day and unique supplier zip code.


In [None]:

# Create a view that joins the supplier case table with US_ZIP_TABLE on zip
# code to get each supplier zip code's latitude and longitude.
# CREATE OR REPLACE keeps the cell re-runnable.
cs.execute("""
CREATE OR REPLACE VIEW supplier_zip_main AS
SELECT DISTINCT PostalPostalCode AS Zip_Code, LAT AS Supplier_Latitude, LNG AS Supplier_Longitude
FROM supplier_case A
JOIN US_ZIP_TABLE B
  ON A.POSTALPOSTALCODE = B.ZIP;
""")


# Create a view over the Snowflake Marketplace table
# ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R with the maximum US daily
# temperature (Fahrenheit) per date and station coordinates.
# NOTE: the column alias keeps its original spelling ("Temparature") because
# supplier_zip_code_weather_view refers to it by that exact name.
cs.execute("""
CREATE OR REPLACE VIEW weather_main AS
SELECT MAX("Value") AS "MaxValue Of Daily Temparature",
       CAST("Stations Latitude" AS FLOAT) AS Weather_latitude,
       CAST("Stations Longitude" AS FLOAT) AS Weather_longitude,
       "Date"
FROM ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R
WHERE "Country ISO Code" = 'USA'
  AND "Indicator Name" = 'Maximum temperature (Fahrenheit)'
  AND "Units" = 'Fahrenheit'
GROUP BY 4, 2, 3""")

In [None]:


# For each supplier zip code, pick the single nearest weather station, then
# return that station's daily maximum temperature for every date it reports:
# one row per (zip code, date).
#
# The ranking is done over distinct stations rather than over every daily
# reading, and it orders by the actual ST_DISTANCE value. (Ordering by a
# "distance < 50000" boolean would put FALSE first, i.e. pick a far station,
# and partitioning daily rows by zip alone would keep only one date per zip.)
#
# A regular view is created here instead of the requested materialized view.
# CREATE OR REPLACE keeps the cell re-runnable.

cs.execute(""" CREATE OR REPLACE VIEW supplier_zip_code_weather_view AS

WITH stations AS (
  -- Each distinct coordinate pair in weather_main is treated as one station.
  SELECT DISTINCT Weather_latitude, Weather_longitude
  FROM weather_main
  WHERE Weather_latitude IS NOT NULL
    AND Weather_longitude IS NOT NULL
),
ranked_stations AS (
  -- Rank every station by its distance from each supplier zip code.
  SELECT
      z.Zip_Code AS zipcode,
      s.Weather_latitude AS Weather_latitude,
      s.Weather_longitude AS Weather_longitude,
      ROW_NUMBER() OVER (
          PARTITION BY z.Zip_Code
          ORDER BY ST_DISTANCE(
              ST_POINT(s.Weather_longitude, s.Weather_latitude),
              ST_POINT(z.Supplier_Longitude, z.Supplier_Latitude)
          ) ASC
      ) AS station_rank
  FROM supplier_zip_main AS z
  CROSS JOIN stations AS s
)

SELECT R.zipcode AS zipcode,
       E."Date" AS Date,
       E."MaxValue Of Daily Temparature" AS Value
FROM ranked_stations AS R
JOIN weather_main AS E
  ON E.Weather_latitude = R.Weather_latitude
 AND E.Weather_longitude = R.Weather_longitude
WHERE R.station_rank = 1
ORDER BY 1, 2;
 """)





 STEP 8:
 Join purchase_orders_and_invoices, supplier_case, and supplier_zip_code_weather based on zip codes and the transaction date. Only
 include transactions that have matching temperature readings.
 The code below selects data from the supplier case table, the purchase order table, and supplier_zip_code_weather,
 and joins them based on zip code and date.

In [None]:
# Select data from the supplier case table, the purchase order table and
# supplier_zip_code_weather, joining purchase orders to their supplier and the
# supplier's zip code to the weather view on zip code and date. The inner joins
# keep only transactions that have a matching temperature reading.
# NOTE(review): order dates are shifted forward 5 years before being matched
# against the weather dates - presumably so the order years line up with the
# years covered by the weather data; confirm this is intended.

cs.execute("""
SELECT s.supplierid,
       s.suppliername,
       sz.zipcode,
       p.orderdate,
       sz.Value
FROM purchase_orders_and_invoices p
JOIN supplier_case s
    ON p.supplierid = s.supplierid
JOIN supplier_zip_code_weather_view sz
    ON s.postalpostalcode = sz.zipcode
   AND DATEADD(year, 5, p.orderdate) = sz.date
""")

print(cs.fetchall())


# Close the connection
cs.close()
conn.close()