## Automated ETL and Data Analysis with Python and Snowflake



Please follow the instructions below before running the codes:

1. Change "Monthly PO Data" folder name to "MonthlyPOData". (Done to reduce unnecessary space cells)
2. The name of "Supplier Transactions XML" file needs to be changed to "SupplierTransactionsXML". (Done to reduce unnecessary space cells)
3. Please Unzip and save "2021_Gaz_zcta_national" txt file in the "Data" folder
4. Enter valid Snowflake account information.


In [1]:
# Step 0: Initial Setup
# Install or upgrade the Snowflake connector for Python.
pip install --upgrade snowflake-connector-python

The history saving thread hit an unexpected error (DatabaseError('database disk image is malformed')).History will not be written to the database.
Collecting snowflake-connector-python
  Using cached snowflake_connector_python-3.12.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (64 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Using cached asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting tomlkit (from snowflake-connector-python)
  Using cached tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Using cached snowflake_connector_python-3.12.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (2.5 MB)
Using cached asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
Using cached tomlkit-0.13.2-py3-none-any.whl (37 kB)
Installing collected packages: asn1crypto, tomlkit, snowflake-connector-python
Successfully installed asn1crypto-1.5.1 snowflake-connector-python-3.12.1 tomlkit-0.13.2
Note: you may need to restart th

# PART 1 - IMPORTING THE LIBRARIES AND DATASETS (CONNECTING TO SNOWFLAKE)

In [2]:
import snowflake.connector

In [3]:
conn = snowflake.connector.connect(
    user='aman05',
    password='XXXXX',
    account='cdflalr-yfb48642'
    )


In [4]:
cs = conn.cursor()

In [14]:
cs.execute("CREATE WAREHOUSE IF NOT EXISTS my_first_warehouse")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff7498b5d0>

In [15]:
cs.execute("CREATE DATABASE IF NOT EXISTS testdb")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff7498b5d0>

In [102]:
cs.execute("USE DATABASE testdb")
cs.execute("CREATE SCHEMA IF NOT EXISTS testschema")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff70859c90>

In [13]:
cs.execute(
"CREATE OR REPLACE TABLE "    
    "test_table(col1 integer, col2 string)")
cs.execute(
"INSERT INTO test_table(col1, col2) "
    "VALUES (123, 'test string1'), (456, 'test string2')")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff732209d0>

In [14]:
cs.execute('SELECT * FROM test_table')
print(cs.fetchmany(2))


[(123, 'test string1'), (456, 'test string2')]


In [15]:
import glob

In [16]:
cs.execute('USE DATABASE testdb')
cs.execute('CREATE OR REPLACE STAGE test_table_stage')


<snowflake.connector.cursor.SnowflakeCursor at 0xffff732209d0>

In [17]:
import os
import pandas as pd
    

In [18]:
folder_path = '/Users/aman/Desktop/Data/Monthly PO Data'
all_files = glob.glob(os.path.join(folder_path, "*.csv"))

In [19]:
if all_files:
    combined_df = pd.concat((pd.read_csv(f) for f in all_files))
else:
    combined_df = pd.DataFrame()


In [21]:
stage_name = 'test_table_stage'
cursor = conn.cursor()
cursor.execute(f"CREATE OR REPLACE STAGE {stage_name}")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff72b44b50>

In [22]:
for file_path in all_files:
    file_name = os.path.basename(file_path)
    cursor.execute(f"PUT file://{file_path} @{stage_name}/{file_name}")


In [23]:
table_name = 'TEST_TABLE'
stage_name = 'test_table_stage'

# Step 1: Create the table structure
cursor.execute(f"""
CREATE OR REPLACE TABLE {table_name} (
    PurchaseOrderID INTEGER,
    SupplierID INTEGER,
    OrderDate DATE,
    DeliveryMethodID INTEGER,
    ContactPersonID INTEGER,
    ExpectedDeliveryDate DATE,
    SupplierReference VARCHAR,
    IsOrderFinalized BOOLEAN,
    Comments VARCHAR,
    InternalComments VARCHAR,
    LastEditedBy INTEGER,
    LastEditedWhen TIMESTAMP,
    PurchaseOrderLineID INTEGER,
    StockItemID INTEGER,
    OrderedOuters INTEGER,
    Description VARCHAR,
    ReceivedOuters INTEGER,
    PackageTypeID INTEGER,
    ExpectedUnitPricePerOuter FLOAT,
    LastReceiptDate DATE,
    IsOrderLineFinalized BOOLEAN,
    Right_LastEditedBy INTEGER,
    Right_LastEditedWhen TIMESTAMP
);
""")

# Step 2: Load data from the stage into the table
cursor.execute(f"""
COPY INTO {table_name}
FROM @{stage_name}
FILE_FORMAT = (TYPE = 'CSV', SKIP_HEADER = 1, FIELD_OPTIONALLY_ENCLOSED_BY = '"');
""")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff72b44b50>

In [24]:
cursor.execute("DROP DATABASE IF EXISTS testdb;")
print("Database 'Test_db' deleted successfully.")

Database 'Test_db' deleted successfully.


In [27]:
cs = conn.cursor()

In [28]:
cs.execute("DROP WAREHOUSE IF EXISTS my_first_warehouse;")
print("Warehouse 'my_first_warehouse' deleted successfully.")


Warehouse 'my_first_warehouse' deleted successfully.


In [29]:
cs.execute("CREATE WAREHOUSE IF NOT EXISTS my_first_warehouse")
cs.execute("CREATE DATABASE IF NOT EXISTS testdb")
cs.execute("CREATE SCHEMA IF NOT EXISTS testschema")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff72b2eb90>

In [30]:
cs.execute(
"CREATE OR REPLACE TABLE "    
    "test_table(col1 integer, col2 string)")
cs.execute(
"INSERT INTO test_table(col1, col2) "
    "VALUES (123, 'test string1'), (456, 'test string2')")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff72b2eb90>

In [31]:
cs.execute('SELECT * FROM test_table')
print(cs.fetchmany(2))

[(123, 'test string1'), (456, 'test string2')]


In [32]:
import glob
import os
import pandas as pd

In [33]:
#CREATING A STAGE TO LOAD DATA INTO SNOWFLAKE

cs.execute('USE DATABASE testdb')
cs.execute('CREATE OR REPLACE STAGE test_table_stage')

<snowflake.connector.cursor.SnowflakeCursor at 0xffff72b2eb90>

# PART 2 - DATA PREPROCESSING

STEP 1

In [34]:
folder_path = '/Users/aman/Desktop/Data/Monthly PO Data/*.csv'

# Get all CSV files in the folder
csv_files = glob.glob(folder_path)


In [35]:
for file in csv_files:
    file_name = os.path.basename(file)
    cursor.execute(f"PUT file://{file} @test_table_stage/{file_name};")

print("All files have been uploaded to the Snowflake stage.")


All files have been uploaded to the Snowflake stage.


In [68]:
#re running the copy into command
cs.execute(f"""
COPY INTO combined_table
FROM @test_table_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
""")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff60f3e550>

In [39]:
#CHECKING IF THE CSV FILES ARE IN THE DIRECTORY (TO ENSURE THE PATH USED IS CORRECT)

import os

directory = r'Data/MonthlyPOData'
print("Directory path:", directory)
print("Exists:", os.path.exists(directory))
print("Is Directory:", os.path.isdir(directory))

if os.path.exists(directory):
    files = os.listdir(directory)
    print("Files found:", files)
else:
    print("Directory not accessible")

Directory path: Data/MonthlyPOData
Exists: True
Is Directory: True
Files found: ['2019-1.csv', '2020-5.csv', '2020-4.csv', '2022-4.csv', '2019-2.csv', '2020-6.csv', '2020-7.csv', '2019-3.csv', '2022-5.csv', '2020-10.csv', '2022-1.csv', '2019-7.csv', '2020-3.csv', '2020-2.csv', '2019-6.csv', '2020-11.csv', '2022-2.csv', '2019-4.csv', '2020-1.csv', '2019-5.csv', '2022-3.csv', '2020-12.csv', '2021-6.csv', '2021-7.csv', '2021-5.csv', '2021-4.csv', '2021-1.csv', '2021-3.csv', '2021-2.csv', '2019-10.csv', '2019-11.csv', '2019-12.csv', '2021-9.csv', '2021-12.csv', '2021-8.csv', '2021-11.csv', '2021-10.csv', '2019-8.csv', '2019-9.csv', '2020-9.csv', '2020-8.csv']


In [40]:
#IMPORTING FILES TO SNOWFLAKE STAGE

folder_path = "Data/MonthlyPOData/*.csv"

# Use glob to find all CSV files in the specified folder
csv_files = glob.glob(folder_path)

# Create the stage in Snowflake (if it doesn't exist)
cs.execute("CREATE OR REPLACE STAGE test_table_stage;")

# Upload each CSV file to the Snowflake stage
for file in csv_files:
    file_name = os.path.basename(file)
    cs.execute(f"PUT file://{file} @test_table_stage/{file_name};")

print("All files have been uploaded to the Snowflake stage.")


All files have been uploaded to the Snowflake stage.


In [41]:
# STEP 1: CREATING A TABLE NAMED PURCHASES_DATA (for csv files)
create_table_command = """
CREATE OR REPLACE TABLE purchases_data (
    PurchaseOrderID INT,
    SupplierID INT,
    OrderDate DATE,
    DeliveryMethodID INT,
    ContactPersonID INT,
    ExpectedDeliveryDate DATE,
    SupplierReference VARCHAR,
    IsOrderFinalized BOOLEAN,
    Comments VARCHAR,
    InternalComments VARCHAR,
    LastEditedBy INT,
    LastEditedWhen TIMESTAMP,
    PurchaseOrderLineID INT,
    StockItemID INT,
    OrderedOuters INT,
    Description VARCHAR,
    ReceivedOuters INT,
    PackageTypeID INT,
    ExpectedUnitPricePerOuter FLOAT,
    LastReceiptDate DATE,
    IsOrderLineFinalized BOOLEAN,
    Right_LastEditedBy INT,
    Right_LastEditedWhen TIMESTAMP
);
"""
cs.execute(create_table_command)
print("Table 'purchases_data' created successfully.")

Table 'purchases_data' created successfully.


In [43]:
copy_command = """
COPY INTO purchases_data
FROM @test_table_stage
FILE_FORMAT = (
    TYPE = 'CSV'
    FIELD_OPTIONALLY_ENCLOSED_BY='"'
    SKIP_HEADER=1
    TIMESTAMP_FORMAT = 'MM/DD/YYYY HH:MI'
    DATE_FORMAT = 'MM/DD/YYYY'
    NULL_IF = ('NULL', '00:00.0', '2/29/2022')
)
ON_ERROR = 'CONTINUE';
"""
cs.execute(copy_command)
print("All staged files have been loaded into 'purchases_data'.")

All staged files have been loaded into 'purchases_data'.


In [44]:
# Just to validate the data(optional)
cs.execute("SELECT COUNT(*) FROM purchases_data;")
row_count = cs.fetchone()[0]
print(f"Total rows in 'purchases_data': {row_count}")

Total rows in 'purchases_data': 8357


STEP 2

In [115]:
#STEP 2: CALCULATING PO AMOUNT

# Drop the POAmount column if it already exists
try:
    cs.execute('''
        ALTER TABLE purchases_data DROP COLUMN POAmount
    ''')
    print("Dropped existing POAmount column.")
except Exception as e:
    print(f"Column 'POAmount' does not exist or cannot be dropped. Skipping drop column step. Error: {e}")

# Add the new POAmount column
cs.execute('''
    ALTER TABLE purchases_data
    ADD COLUMN POAmount NUMBER(38,2)
''')

# Update the POAmount column with the calculated sum
cs.execute('''
    UPDATE purchases_data
    SET POAmount = (
        SELECT SUM(ReceivedOuters * ExpectedUnitPricePerOuter)
        FROM purchases_data AS sub
        WHERE sub.PURCHASEORDERID = purchases_data.PURCHASEORDERID
    )
''')

# Verify the update by selecting a few rows
cs.execute('SELECT PURCHASEORDERID, POAmount FROM purchases_data ORDER BY PURCHASEORDERID LIMIT 450')
results = cs.fetchall()

# Print the results to verify
for row in results:
    print(row)

Dropped existing POAmount column.
(1, Decimal('313.50'))
(1, Decimal('313.50'))
(1, Decimal('313.50'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(2, Decimal('21732.00'))
(3, Decimal('2740.50'))
(3, Decimal('2740.50'))

STEP 3

In [46]:
#CHECKING IF THE XML FILES ARE IN THE DIRECTORY (TO ENSURE THE PATH USED IS CORRECT)

import os

# Path to the XML file
xml_file_path = r'Data/SupplierTransactionsXML.xml'

# Debug information
print("File path:", xml_file_path)
print("File exists:", os.path.exists(xml_file_path))
print("Is a file:", os.path.isfile(xml_file_path))

if os.path.exists(xml_file_path):
    print(f"File '{xml_file_path}' found.")
else:
    print(f"File '{xml_file_path}' does not exist or is not accessible.")



File path: Data/SupplierTransactionsXML.xml
File exists: True
Is a file: True
File 'Data/SupplierTransactionsXML.xml' found.


In [48]:
#STEP 3: EXTRACTING AND LOADING XML DATA INTO SNOWFLAKE

cs.execute("""
CREATE OR REPLACE TABLE raw_xml_data (
    xml_content STRING
);
""")
print("Table 'raw_xml_data' created successfully.")

# Execute the copy command
cs.execute(f"""
COPY INTO raw_xml_data(xml_content)
FROM @{stage_name}/SupplierTransactionsXML.xml
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' FIELD_DELIMITER = ',' SKIP_HEADER = 0);
""")
print("XML data loaded into 'raw_xml_data' table successfully.")

Table 'raw_xml_data' created successfully.
XML data loaded into 'raw_xml_data' table successfully.


In [55]:
import os

xml_file_path = r'Data/SupplierTransactionsXML.xml'

if os.path.exists(xml_file_path):
    print(f"File '{xml_file_path}' found.")
    with open(xml_file_path, 'r') as file:
        content = file.read()
        print("First 500 characters of the XML file content:")
        print(content[:500])  # Preview the first 500 characters of the file
else:
    print(f"File '{xml_file_path}' does not exist.")

File 'Data/SupplierTransactionsXML.xml' found.
First 500 characters of the XML file content:
<?xml version="1.0" encoding="UTF-8"?>
<root>
  <row>
    <SupplierTransactionID>134</SupplierTransactionID>
    <SupplierID>2</SupplierID>
    <TransactionTypeID>5</TransactionTypeID>
    <PurchaseOrderID>1</PurchaseOrderID>
    <PaymentMethodID>4</PaymentMethodID>
    <SupplierInvoiceNumber>7290</SupplierInvoiceNumber>
    <TransactionDate>2019-01-02</TransactionDate>
    <AmountExcludingTax>313.50</AmountExcludingTax>
    <TaxAmount>47.03</TaxAmount>
    <TransactionAmount>360.53</Transaction


In [113]:
#checking for row count
cs.execute("SELECT COUNT(*) FROM raw_xml_data;")
row_count = cs.fetchone()[0]
print(f"Row count in raw_xml_data: {row_count}")


Row count in raw_xml_data: 41449


In [61]:
#making the supplier_invoices table again
cs.execute("""
    CREATE OR REPLACE TABLE supplier_invoices (
        SupplierTransactionID STRING,
        SupplierID STRING,
        TransactionTypeID STRING,
        PurchaseOrderID STRING,
        PaymentMethodID STRING,
        SupplierInvoiceNumber STRING,
        TransactionDate DATE,
        AmountExcludingTax FLOAT,
        TaxAmount FLOAT,
        TransactionAmount FLOAT,
        OutstandingBalance FLOAT,
        FinalizationDate DATE,
        IsFinalized STRING,
        LastEditedBy STRING,
        LastEditedWhen TIMESTAMP
    );
""")
print("Table 'supplier_invoices' created successfully.")



Table 'supplier_invoices' created successfully.


In [76]:
# Query to count the number of rows in the supplier_invoices table
cs.execute("SELECT COUNT(*) FROM raw_xml_data;")
raw_xml_row_count = cs.fetchone()[0]
print(f"Row count in raw_xml_data: {raw_xml_row_count}")

cs.execute("SELECT xml_content FROM raw_xml_data LIMIT 5;")
rows = cs.fetchall()
for row in rows:
    print(row[0])

Row count in raw_xml_data: 41449
<?xml version="1.0" encoding="UTF-8"?>
<root>
  <row>
    <SupplierTransactionID>134</SupplierTransactionID>
    <SupplierID>2</SupplierID>


In [78]:
#Check for Empty or Malformed XML Rows
cs.execute("""
    SELECT xml_content 
    FROM raw_xml_data 
    WHERE xml_content IS NULL 
    OR xml_content NOT LIKE '%</Invoice>%'
    LIMIT 5;
""")
rows = cs.fetchall()
for row in rows:
    print("Potential Issue:", row[0])

Potential Issue: <?xml version="1.0" encoding="UTF-8"?>
Potential Issue: <root>
Potential Issue:   <row>
Potential Issue:     <SupplierTransactionID>134</SupplierTransactionID>
Potential Issue:     <SupplierID>2</SupplierID>


In [103]:
#STEP 3a: SHRED THE DATA INTO A TABLE WHERE EACH ROW CORRESPONDS TO A SINGLE INVOICE

#XML data
 
import csv
import xml.etree.ElementTree as ET
from datetime import datetime

# Function to validate date
def is_valid_date(date_text):
    if date_text is None:
        return False
    try:
        datetime.strptime(date_text, '%Y-%m-%d')
        return True
    except ValueError:
        return False

# Parse the XML file
file_name = "Data/SupplierTransactionsXML.xml"
tree = ET.parse(file_name)
root = tree.getroot()

# Define CSV file path
csv_file_path = "supplier_transactions.csv"

# Open CSV file for writing
with open(csv_file_path, mode='w', newline='') as file:
    writer = csv.writer(file)

    # Write header
    writer.writerow([
        'SupplierTransactionID', 'SupplierID', 'TransactionTypeID', 'PurchaseOrderID',
        'PaymentMethodID', 'SupplierInvoiceNumber', 'TransactionDate',
        'AmountExcludingTax', 'TaxAmount', 'TransactionAmount', 'OutstandingBalance',
        'FinalizationDate', 'IsFinalized', 'LastEditedBy', 'LastEditedWhen'
    ])

    # Write rows from XML
    for row in root.findall('row'):
        transaction_date = row.find('TransactionDate').text if row.find('TransactionDate') is not None else None
        finalization_date = row.find('FinalizationDate').text if row.find('FinalizationDate') is not None else None
        last_edited_when = row.find('LastEditedWhen').text.split()[0] if row.find('LastEditedWhen') is not None else None

        # Validate dates
        if transaction_date and not is_valid_date(transaction_date):
            print(f"Invalid TransactionDate: {transaction_date}, skipping row.")
            continue
        if finalization_date and not is_valid_date(finalization_date):
            print(f"Invalid FinalizationDate: {finalization_date}, skipping row.")
            continue
        if last_edited_when and not is_valid_date(last_edited_when):
            print(f"Invalid LastEditedWhen: {last_edited_when}, skipping row.")
            continue

        writer.writerow([
            row.find('SupplierTransactionID').text,
            row.find('SupplierID').text,
            row.find('TransactionTypeID').text,
            row.find('PurchaseOrderID').text,
            row.find('PaymentMethodID').text,
            row.find('SupplierInvoiceNumber').text,
            transaction_date,
            row.find('AmountExcludingTax').text,
            row.find('TaxAmount').text,
            row.find('TransactionAmount').text,
            row.find('OutstandingBalance').text,
            finalization_date,
            row.find('IsFinalized').text,
            row.find('LastEditedBy').text,
            last_edited_when
        ])

# Delete the existing stage
cs.execute("DROP STAGE IF EXISTS DEMO_STAGE")

# Create a new stage
cs.execute("CREATE STAGE DEMO_STAGE")
# Assuming the file is uploaded to a Snowflake stage called 'DEMO_STAGE'
cs.execute("PUT file://supplier_transactions.csv @DEMO_STAGE")

cs.execute('''
create or replace TABLE TESTDB.PUBLIC.SUPPLIER_TRANSACTIONS1 (
    SUPPLIERTRANSACTIONID NUMBER(38,0),
    SUPPLIERID NUMBER(38,0),
    TRANSACTIONTYPEID NUMBER(38,0),
    PURCHASEORDERID NUMBER(38,0),
    PAYMENTMETHODID NUMBER(38,0),
    SUPPLIERINVOICENUMBER VARCHAR(16777216),
    TRANSACTIONDATE DATE,
    AMOUNTEXCLUDINGTAX NUMBER(38,0),
    TAXAMOUNT NUMBER(38,0),
    TRANSACTIONAMOUNT NUMBER(38,0),
    OUTSTANDINGBALANCE NUMBER(38,0),
    FINALIZATIONDATE DATE,
    ISFINALIZED BOOLEAN,
    LASTEDITEDBY NUMBER(38,0),
    LASTEDITEDWHEN TIMESTAMP_NTZ(9)
);
''')

cs.execute('''
    COPY INTO SUPPLIER_TRANSACTIONS1
    FROM @DEMO_STAGE/supplier_transactions.csv
    FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY='"', SKIP_HEADER = 1)
''')

Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid FinalizationDate: 2022-02-29, skipping row.
Invalid TransactionDate: 2022-02-29, skipping row.
Invalid TransactionDate: 2022-02-29, skipping row.
Invalid TransactionDate: 2022-02-29, skipping row.
Invalid TransactionDate: 2022-02-29, skipping row.
Invalid TransactionDate: 2022-02-29, skipping row.


<snowflake.connector.cursor.SnowflakeCursor at 0xffff70859c90>

STEP 4

In [111]:
#STEP 4: JOIN THE PURCHASES DATA FROM STEP 2 AND THE SUPPLIER INVOICES DATA FROM STEP 3 
cs.execute('''
    SELECT 
        po.PURCHASEORDERID,
        po.RECEIVEDOUTERS,
        po.EXPECTEDUNITPRICEPEROUTER,
        po.POAmount,
        st.SUPPLIERTRANSACTIONID,
        st.SUPPLIERID,
        st.TRANSACTIONAMOUNT,
        st.TRANSACTIONDATE
    FROM 
        purchases_data po
    INNER JOIN 
        Supplier_Transactions1 st
    ON 
        po.PURCHASEORDERID = st.PURCHASEORDERID
''')

# Fetch the results of the join
joined_data = cs.fetchall()

# Print the first few rows to verify the join
for row in joined_data[:]:
    print(row)

(1, 18, 5.5, Decimal('313.50'), 134, 2, 361, datetime.date(2019, 1, 2))
(1, 21, 5.5, Decimal('313.50'), 134, 2, 361, datetime.date(2019, 1, 2))
(1, 18, 5.5, Decimal('313.50'), 134, 2, 361, datetime.date(2019, 1, 2))
(10, 83, 12.5, Decimal('1037.50'), 590, 10, 1193, datetime.date(2019, 1, 3))
(11, 88, 9.5, Decimal('19869.50'), 594, 12, 22850, datetime.date(2019, 1, 3))
(11, 15, 112.5, Decimal('19869.50'), 594, 12, 22850, datetime.date(2019, 1, 3))
(11, 196, 88.5, Decimal('19869.50'), 594, 12, 22850, datetime.date(2019, 1, 3))
(12, 11, 84.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 5, 102.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 6, 19.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 6, 22.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 27, 8.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 36, 8.0, Decimal('6661.50'), 932, 4, 7661, datetime.date(2019, 1, 4))
(12, 27, 8.5, Dec

STEP 5

In [28]:
#STEP 5: CREATING A VIEW TO SHOW THE PURCHASE ORDERS AND INVOICES

cs.execute('''
    CREATE OR REPLACE TABLE purchase_orders_and_invoices AS
    SELECT DISTINCT
        po.supplierid,
        po.PURCHASEORDERID,
        st.TRANSACTIONDATE,
        po.LASTRECEIPTDATE,
        po.POAmount,
        st.AmountExcludingTax,
        st.AmountExcludingTax - po.POAmount AS invoiced_vs_quoted
    FROM 
        purchases_data po
    INNER JOIN 
        Supplier_Transactions1 st
    ON 
        po.PURCHASEORDERID = st.PURCHASEORDERID
    ORDER BY po.PURCHASEORDERID
''')

# Fetch the results from the table to verify
cs.execute('SELECT * FROM purchase_orders_and_invoices')
result_data = cs.fetchall()

# Print the first few rows to verify the table creation
for row in result_data[:100]:
    print(row)

(2, 1, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('313.50'), 314, Decimal('0.50'))
(4, 2, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('21732.00'), 21732, Decimal('0.00'))
(5, 3, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('2740.50'), 2741, Decimal('0.50'))
(7, 4, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('42481.20'), 42481, Decimal('-0.20'))
(10, 5, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('35067.50'), 35068, Decimal('0.50'))
(12, 6, datetime.date(2019, 1, 2), datetime.date(2019, 1, 2), Decimal('5528.50'), 5529, Decimal('0.50'))
(4, 7, datetime.date(2019, 1, 3), datetime.date(2019, 1, 3), Decimal('10000.50'), 10001, Decimal('0.50'))
(5, 8, datetime.date(2019, 1, 3), datetime.date(2019, 1, 3), Decimal('657.00'), 657, Decimal('0.00'))
(7, 9, datetime.date(2019, 1, 3), datetime.date(2019, 1, 3), Decimal('9281.50'), 9282, Decimal('0.50'))
(10, 10, datetime.date(2019, 1, 3), datetime.date(2019, 1

STEP 6

In [12]:
# Extract the supplier_case data from postgres.
import psycopg2
from pathlib import Path

# Defining the PostgreSQL connection parameters
def execute_pgsql_file(file_path):
    # Connect to PostgreSQL
    conn = psycopg2.connect(
        dbname="rsm-docker",
        user="jovyan",
        password="postgres",
        host="127.0.0.1",
        port="8765"
    )
    
    cursor = conn.cursor()

    # Open and execute the .pgsql file
    with open(file_path, 'r') as f:
        cursor.execute(f.read())

    conn.commit()
    cursor.close()
    conn.close()

# Example usage
pgsql_fp = Path('Data/supplier_case.pgsql')
execute_pgsql_file(pgsql_fp)

In [18]:
def export_data_to_csv():
    conn = psycopg2.connect(
        dbname="rsm-docker",
        user="jovyan",
        password="postgres",
        host="127.0.0.1",
        port="8765"
    )
    
    cursor = conn.cursor()
    export_query = "COPY (SELECT * FROM supplier_case) TO STDOUT WITH CSV HEADER"
    
    with open('Data/supplier_case.csv', 'w') as f:
        cursor.copy_expert(export_query, f)
    
    cursor.close()
    conn.close()

export_data_to_csv()

In [20]:
cs.execute('USE DATABASE testdb')
cs.execute('CREATE OR REPLACE STAGE Supplier_Case')

print("Stage 'Supplier_Case' created successfully.")

def upload_to_snowflake(csv_file_path):
    conn = snowflake.connector.connect(
        user='aman05',
        password='XXXXX',
        account='cdflalr-yfb48642',
        database='testdb',
        schema='public'
    )

    cursor = conn.cursor()
    put_command = f"PUT file://{csv_file_path} @Supplier_Case"
    cursor.execute(put_command)

    cursor.close()
    conn.close()

csv_fp = Path('Data/supplier_case.csv')
upload_to_snowflake(csv_fp)

Stage 'Supplier_Case' created successfully.


In [22]:
def create_and_load_table_in_snowflake(create_table_statement):
    conn = snowflake.connector.connect(
        user='aman05',
        password='XXXXX',
        account='cdflalr-yfb48642',
        database='testdb',
        schema='public'
    )
    
    cursor = conn.cursor()
    cursor.execute(create_table_statement)
    
    copy_command_typecsv = """COPY INTO supplier_case FROM @Supplier_case 
                        FILE_FORMAT=(TYPE=CSV,
                                FIELD_OPTIONALLY_ENCLOSED_BY = '"',
                                TIMESTAMP_FORMAT = 'MM/DD/YYYY HH24:MI'
                                PARSE_HEADER = TRUE
                                    )
                        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
                    ON_ERROR = 'CONTINUE'"""

    cursor.execute(copy_command_typecsv)

#creating table supplier_case in snowflake
create_table_command = ("""
                CREATE OR REPLACE TABLE Supplier_Case (
                supplierid INTEGER,
                suppliername STRING,
                suppliercategoryid INTEGER,
                primarycontactpersonid INTEGER,
                alternatecontactpersonid INTEGER,
                deliverymethodid INTEGER,
                postalcityid INTEGER,
                supplierreference STRING,
                bankaccountname STRING,
                bankaccountbranch STRING,
                bankaccountcode INTEGER,
                bankacccountnumber NUMERIC,
                bankinternationalcode INTEGER,
                paymentdays INTEGER,
                internalcomments STRING,
                phonenumber STRING,
                faxnumber STRING,
                websiteurl STRING,
                deliveryaddressline1 STRING,
                deliveryaddressline2 STRING,
                deliveralpostalcode INTEGER,
                deliverylocation STRING,
                postaladdressline1 STRING,
                postaladdressline2 STRING,
                postalpostalcode INTEGER,
                lasteditedby INTEGER,
                validform STRING,
                validto STRING
                )
                """)


create_and_load_table_in_snowflake(create_table_command)
print("Table 'Supplier_Case' created and loaded successfully.")


Table 'Supplier_Case' created and loaded successfully.


STEP 7

In [17]:
cs.execute("USE DATABASE testdb")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff7498b5d0>

In [18]:
# Create a staging area and load the .txt file into a table
cs.execute("CREATE OR REPLACE STAGE txtstage")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff7498b5d0>

In [25]:
# Upload the text file to the stage
txt_path = 'Data/2021_Gaz_zcta_national.txt'
cs.execute(f"PUT file://{txt_path} @txtstage")
print("File uploaded to Snowflake stage successfully.")

File uploaded to Snowflake stage successfully.


In [29]:
# Create a table for the zip code to geo-location mapping
cs.execute("""CREATE OR REPLACE TABLE zcta_geolocation (
    GEOID VARCHAR(5),
    ALAND NUMERIC,
    AWATER NUMERIC,
    ALAND_SQMI FLOAT,
    AWATER_SQMI FLOAT,
    INTPTLAT FLOAT,
    INTPTLONG FLOAT
);
""")
print("Table 'zcta_geolocation' created successfully.")

Table 'zcta_geolocation' created successfully.


In [46]:
# Load data from the staged file into the 'zcta_geolocation' table
cs.execute("""
COPY INTO zcta_geolocation
FROM @txtstage
FILE_FORMAT = (
    TYPE = 'CSV',
    FIELD_OPTIONALLY_ENCLOSED_BY = '"',
    FIELD_DELIMITER = '\t',
    SKIP_HEADER = 1
)
""")

print("Data loaded into 'zcta_geolocation' table successfully.")

Data loaded into 'zcta_geolocation' table successfully.


In [32]:
# Verify that data was loaded correctly
cs.execute("SELECT COUNT(*) FROM zcta_geolocation;")
row_count = cs.fetchone()[0]
print(f"Total rows loaded: {row_count}")

Total rows loaded: 33791


In [31]:
cs.execute("""
        CREATE OR REPLACE TABLE supplier_zip_geolocation AS (
        SELECT sc.postalpostalcode, z.INTPTLAT AS latitude, z.INTPTLONG AS longitude
        FROM supplier_case sc
        JOIN zcta_geolocation z ON sc.postalpostalcode = z.GEOID
        GROUP BY sc.postalpostalcode, z.INTPTLAT, z.INTPTLONG);
""")

print("Table 'supplier_zip_geolocation' created successfully.")

Table 'supplier_zip_geolocation' created successfully.


In [16]:
cs.execute("USE DATABASE testdb")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff7498b5d0>

In [46]:
cs.execute("""
CREATE OR REPLACE TABLE zip_weather_station_mapping AS
    SELECT 
        sz.postalpostalcode,
        ws.noaa_weather_station_id AS station_id,
        ws.noaa_weather_station_name AS station_name,
        SQRT(
            POW(69.1 * (sz.latitude - ws.latitude), 2) +
            POW(69.1 * (ws.longitude - sz.longitude) * COS(sz.latitude / 57.3), 2)
        ) AS distance
    FROM testdb.public.supplier_zip_geolocation sz
    CROSS JOIN WEATHER__ENVIRONMENT.cybersyn.noaa_weather_station_index ws
    QUALIFY ROW_NUMBER() OVER (PARTITION BY sz.postalpostalcode ORDER BY distance ASC) = 1;
""")
print("Table 'zip_weather_station_mapping' created successfully.")

Table 'zip_weather_station_mapping' created successfully.


In [47]:
cs.execute("""
CREATE OR REPLACE TABLE zip_weather_station_mapping_temp AS
    SELECT 
        sz.postalpostalcode,
        ws.noaa_weather_station_id AS station_id,
        ws.noaa_weather_station_name AS station_name,
        st_distance(
            st_makepoint(sz.longitude, sz.latitude),
            st_makepoint(ws.longitude, ws.latitude)
        ) AS distance
    FROM testdb.public.supplier_zip_geolocation sz
    JOIN WEATHER__ENVIRONMENT.cybersyn.noaa_weather_station_index ws
    ON st_distance(st_makepoint(sz.longitude, sz.latitude), st_makepoint(ws.longitude, ws.latitude)) < 50000
    QUALIFY ROW_NUMBER() OVER (PARTITION BY sz.postalpostalcode ORDER BY distance ASC) = 1;
""")
print("Table 'zip_weather_station_mapping_temp' created successfully.")

Table 'zip_weather_station_mapping_temp' created successfully.


In [48]:
cs.execute("""
CREATE OR REPLACE TABLE zip_code_weather_data AS
    SELECT 
            z.postalpostalcode,
            wm.date,
            wm.value as Max_Temp,
            wm.variable
            FROM testdb.public.zip_weather_station_mapping z
            JOIN WEATHER__ENVIRONMENT.cybersyn.noaa_weather_metrics_timeseries wm
            ON z.station_id = wm.noaa_weather_station_id
            WHERE wm.variable = 'maximum_temperature';
""")
print("Table 'zip_code_weather_data' created successfully.")

Table 'zip_code_weather_data' created successfully.


In [49]:
cs.execute("""
CREATE OR REPLACE TABLE zip_code_weather_data_temp AS
    SELECT 
            z.postalpostalcode,
            wm.date,
            wm.value as Max_Temp,
            wm.variable
            FROM testdb.public.zip_weather_station_mapping_temp z
            JOIN WEATHER__ENVIRONMENT.cybersyn.noaa_weather_metrics_timeseries wm
            ON z.station_id = wm.noaa_weather_station_id
            WHERE wm.variable = 'maximum_temperature';
""")
print("Table 'zip_code_weather_data_temp' created successfully.")

Table 'zip_code_weather_data_temp' created successfully.


In [51]:
cs.execute("""
CREATE OR REPLACE TABLE supplier_zip_code_weather_temp AS 
SELECT A.PostalPostalCode, B.date, B.Max_temp FROM supplier_case as A
LEFT JOIN zip_code_weather_data as B ON A.postalpostalcode = B.postalpostalcode
ORDER BY DATE;
""")
print("Table 'supplier_zip_code_weather_temp' created successfully.")

Table 'supplier_zip_code_weather_temp' created successfully.


In [52]:
cs.execute("""
CREATE OR REPLACE TABLE supplier_zip_code_weather_temp2 AS 
SELECT A.PostalPostalCode, B.date, B.Max_temp FROM supplier_case as A
JOIN zip_code_weather_data_temp as B ON A.postalpostalcode = B.postalpostalcode
ORDER BY DATE;
""")
print("Table 'supplier_zip_code_weather_temp2' created successfully.")

Table 'supplier_zip_code_weather_temp2' created successfully.


In [54]:
cs.execute("CREATE OR REPLACE MATERIALIZED VIEW supplier_zip_code_weather AS (SELECT * FROM supplier_zip_code_weather_temp);")
print("Materialized view 'supplier_zip_code_weather' created successfully.")

Materialized view 'supplier_zip_code_weather' created successfully.


STEP 8

In [52]:
# Step 8: Create a view to join weather data with purchase orders and supplier information

try:
    # Define the SQL query to create the view with joined data
    create_view_query = """
        CREATE OR REPLACE VIEW testdb.public.order_zip_temp AS
        SELECT 
            szw.*,  -- All columns from supplier_zip_code_weather (including PostalPostalCode, date, and Max_temp)
            poi.SUPPLIERID AS SupplierID1,  -- Supplier ID from purchase_orders_and_invoices
            sc.SUPPLIERID AS SupplierID2  -- Supplier ID from supplier_case
        FROM 
            testdb.public.supplier_zip_code_weather AS szw  -- Base table with supplier ZIP code weather data
        LEFT JOIN 
            testdb.public.purchase_orders_and_invoices AS poi  -- Joining with purchase orders and invoices
            ON szw.date = poi.TRANSACTIONDATE  -- Match on transaction date
        LEFT JOIN 
            testdb.public.supplier_case AS sc  -- Joining with supplier case table
            ON szw.PostalPostalCode = sc.PostalPostalCode  -- Match on postal code
        WHERE 
            szw.Max_temp IS NOT NULL;  -- Filter out rows with null maximum temperature
    """
    
    # Execute the query to create the view
    cs.execute(create_view_query)
    print("View 'order_zip_temp' created successfully.")

    # Fetch and display the first few rows of the view to verify
    cs.execute("SELECT * FROM testdb.public.order_zip_temp LIMIT 10;")
    rows = cs.fetchall()
    for row in rows:
        print(row)

except Exception as e:
    print(f"An error occurred: {e}")


View 'order_zip_temp' created successfully.
(42437, datetime.date(2007, 4, 26), Decimal('25.000000'), None, 6)
(42437, datetime.date(2008, 2, 29), Decimal('8.300000'), None, 6)
(42437, datetime.date(2005, 9, 14), Decimal('31.700000'), None, 6)
(42437, datetime.date(2007, 9, 16), Decimal('20.000000'), None, 6)
(42437, datetime.date(2005, 7, 10), Decimal('32.200000'), None, 6)
(42437, datetime.date(2007, 7, 3), Decimal('26.700000'), None, 6)
(42437, datetime.date(2007, 4, 17), Decimal('13.300000'), None, 6)
(42437, datetime.date(2009, 1, 20), Decimal('-1.700000'), None, 6)
(42437, datetime.date(2007, 11, 6), Decimal('23.300000'), None, 6)
(42437, datetime.date(2008, 8, 7), Decimal('31.100000'), None, 6)


In [54]:
# Verify that data was loaded into the view correctly
cs.execute("SELECT COUNT(*) FROM order_zip_temp;")
row_count = cs.fetchone()[0]
print(f"Total rows loaded: {row_count}")

Total rows loaded: 1258
