# Installing Snowflake Connector

In [None]:
pip install --upgrade snowflake-connector-python

# Importing Libraries

In [None]:
import snowflake.connector
import glob
import sqlite3
import os
import xml.etree.ElementTree as ET

# Connecting to Snowflake

In [None]:
# Credentials are read from environment variables so they are never stored in the notebook.
# Set SNOWFLAKE_USER, SNOWFLAKE_PASSWORD and SNOWFLAKE_ACCOUNT before running this cell.
conn = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER', ''),
    password=os.environ.get('SNOWFLAKE_PASSWORD', ''),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', ''),
    # These objects may not exist yet on a first run; the next cell creates them.
    warehouse='my_first_warehouse',
    database='testdb12',
    schema='testschema12',
)

# Creating new warehouse, database and schema inside Snowflake

In [None]:
cs = conn.cursor()
# Create each object if needed, then select it explicitly. If an object did not exist
# when `connect` ran, the session may have no current warehouse/database/schema, and
# later cells use unqualified table and stage names.
cs.execute("CREATE WAREHOUSE IF NOT EXISTS my_first_warehouse")
cs.execute("USE WAREHOUSE my_first_warehouse")
cs.execute("CREATE DATABASE IF NOT EXISTS testdb12")
cs.execute("USE DATABASE testdb12")
cs.execute("CREATE SCHEMA IF NOT EXISTS testdb12.testschema12")
cs.execute("USE SCHEMA testdb12.testschema12")

# 1. Creating a blank table for Purchase Order

In [None]:
# Empty table for the purchase-order data. COPY INTO (next section) maps CSV fields to
# these columns by position, so the column order must match the monthly CSV files.
# Comments / InternalComments are presumably free text: typed STRING (not FLOAT) so a
# non-NULL comment in the CSVs cannot make COPY INTO fail on a numeric conversion.
cs.execute(
    "CREATE OR REPLACE TABLE po_table("
    "PurchaseOrderID INT, SupplierID INT, "
    "OrderDate DATE, DeliveryMethodID INT, "
    "ContactPersonID INT, ExpectedDeliveryDate STRING, "
    "SupplierReference STRING, IsOrderFinalized INT, "
    "Comments STRING, InternalComments STRING, "
    "LastEditedBy INT, LastEditedWhen STRING, "
    "PurchaseOrderLineID INT, StockItemID INT, "
    "OrderedOuters INT, Description STRING, "
    "ReceivedOuters INT, PackageTypeID INT, "
    "ExpectedUnitPricePerOuter FLOAT, LastReceiptDate STRING, "
    "IsOrderLineFinalized INT, Right_LastEditedBy INT, "
    "Right_LastEditedWhen STRING)"
)

# 1. Using glob to find all purchase-order CSV files, uploading them to a stage, and loading them into the Purchase Order table with COPY INTO

In [None]:
# Stage that receives the monthly purchase-order CSV files before COPY INTO.
cs.execute("CREATE OR REPLACE STAGE local_stage")

# Directory containing the monthly purchase-order CSV files
directory_path = 'Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/Monthly_PO_Data/'

# Files named 2013-* through 2016-*; sorted so they are always processed in the same order.
csv_files = sorted(glob.glob(os.path.join(directory_path, '201[3-6]-*.csv')))
if not csv_files:
    raise FileNotFoundError(f"No purchase-order CSV files matched in {directory_path!r}")

# Upload each file to the stage. PUT needs the client-side path; it is made absolute with
# forward slashes, and the URI is quoted so paths containing spaces stay intact.
for csv_file in csv_files:
    local_uri = 'file://' + os.path.abspath(csv_file).replace(os.sep, '/')
    cs.execute(f"PUT '{local_uri}' @local_stage")

# Load each staged file into po_table. PUT gzips uploads by default, so the stage path
# below acts as a prefix that matches "<file_name>.gz".
# The Python text '\\\\N' reaches Snowflake as the SQL literal '\\N', i.e. the two
# characters \N (a single backslash would start a string escape sequence instead).
for csv_file in csv_files:
    file_name = os.path.basename(csv_file)
    cs.execute(f"""
        COPY INTO po_table
        FROM @local_stage/{file_name}
        FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('NULL', '\\\\N'))
    """)

# 2. Adding a new column POAmount in the table

In [None]:
# POAmount is declared with an AS (<expression>) clause: the received quantity of the
# order line multiplied by its expected unit price.
po_amount_ddl = """
    ALTER TABLE po_table ADD COLUMN POAmount FLOAT AS (ReceivedOuters * ExpectedUnitPricePerOuter)
"""
cs.execute(po_amount_ddl)


# 3. Creating a blank table for Supplier Invoice

In [None]:
# Column name -> Snowflake type for the supplier-transaction (invoice) table.
# Insertion order matters: the XML loading cell builds each row by iterating this dict.
columns = {
    "SupplierTransactionID": "INTEGER",
    "SupplierID": "INTEGER",
    "TransactionTypeID": "INTEGER",
    "PurchaseOrderID": "INTEGER",
    "PaymentMethodID": "INTEGER",
    "SupplierInvoiceNumber": "INTEGER",
    "TransactionDate": "TEXT",
    "AmountExcludingTax": "REAL",
    "TaxAmount": "REAL",
    "TransactionAmount": "REAL",
    "OutstandingBalance": "REAL",
    "FinalizationDate": "DATE",
    "IsFinalized": "INTEGER",
    # An ID (typed INT in po_table as well), not a date.
    "LastEditedBy": "INTEGER",
    "LastEditedWhen": "TEXT"
}

# CREATE OR REPLACE TABLE transactions (<col> <type>, ...);
create_table_stmt = (
    "CREATE OR REPLACE TABLE transactions ("
    + ", ".join(f"{col} {dtype}" for col, dtype in columns.items())
    + ");"
)
cs.execute(create_table_stmt)


# 3. Extracting rows from the XML file and inserting them into the Supplier Invoice table

In [None]:
# Parse the supplier-transactions XML export; every <row> element becomes one table row.
tree = ET.parse('Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/Supplier Transactions XML.xml')
root = tree.getroot()


def _xml_field(row_elem, tag):
    """Return the text of child `tag` of `row_elem`, or None if it is missing or empty.

    None is bound as SQL NULL. An empty string would be rejected when inserted into
    the INTEGER / REAL / DATE columns of `transactions`.
    """
    child = row_elem.find(tag)
    if child is None or child.text is None or child.text == '':
        return None
    return child.text


# One list per <row>, with values in the same order as the `columns` dict.
all_data = [[_xml_field(row, col) for col in columns] for row in root.findall('row')]

# Name the target columns explicitly so the insert does not depend on the table's column order.
column_list = ', '.join(columns)
placeholders = ', '.join(['%s'] * len(columns))
insert_stmt = f"INSERT INTO transactions ({column_list}) VALUES ({placeholders})"

# Only issue the bulk insert when the XML actually contained rows.
if all_data:
    cs.executemany(insert_stmt, all_data)


# 4. Joining Data

In [None]:
# View pairing purchase-order rows with the supplier transactions of the same order.
# Each column is prefixed pt_ (po_table) or tr_ (transactions), so columns both tables
# share (SupplierID, LastEditedBy, ...) stay distinct.
# NOTE(review): po_table carries line-level columns (PurchaseOrderLineID), so a
# transaction is repeated for every line of its order — confirm this is intended.
joined_view_sql = """
    CREATE OR REPLACE VIEW joined_data AS
    SELECT
        pt.PurchaseOrderID AS pt_PurchaseOrderID,
        pt.SupplierID AS pt_SupplierID,
        pt.OrderDate AS pt_OrderDate,
        pt.DeliveryMethodID AS pt_DeliveryMethodID,
        pt.ContactPersonID AS pt_ContactPersonID,
        pt.ExpectedDeliveryDate AS pt_ExpectedDeliveryDate,
        pt.SupplierReference AS pt_SupplierReference,
        pt.IsOrderFinalized AS pt_IsOrderFinalized,
        pt.Comments AS pt_Comments,
        pt.InternalComments AS pt_InternalComments,
        pt.LastEditedBy AS pt_LastEditedBy,
        pt.LastEditedWhen AS pt_LastEditedWhen,
        pt.PurchaseOrderLineID AS pt_PurchaseOrderLineID,
        pt.StockItemID AS pt_StockItemID,
        pt.OrderedOuters AS pt_OrderedOuters,
        pt.Description AS pt_Description,
        pt.ReceivedOuters AS pt_ReceivedOuters,
        pt.PackageTypeID AS pt_PackageTypeID,
        pt.ExpectedUnitPricePerOuter AS pt_ExpectedUnitPricePerOuter,
        pt.LastReceiptDate AS pt_LastReceiptDate,
        pt.IsOrderLineFinalized AS pt_IsOrderLineFinalized,
        pt.Right_LastEditedBy AS pt_Right_LastEditedBy,
        pt.Right_LastEditedWhen AS pt_Right_LastEditedWhen,
        pt.POAmount AS pt_POAmount,
        tr.SupplierTransactionID AS tr_SupplierTransactionID,
        tr.SupplierID AS tr_SupplierID,
        tr.TransactionTypeID AS tr_TransactionTypeID,
        tr.PurchaseOrderID AS tr_PurchaseOrderID,
        tr.PaymentMethodID AS tr_PaymentMethodID,
        tr.SupplierInvoiceNumber AS tr_SupplierInvoiceNumber,
        tr.TransactionDate AS tr_TransactionDate,
        tr.AmountExcludingTax AS tr_AmountExcludingTax,
        tr.TaxAmount AS tr_TaxAmount,
        tr.TransactionAmount AS tr_TransactionAmount,
        tr.OutstandingBalance AS tr_OutstandingBalance,
        tr.FinalizationDate AS tr_FinalizationDate,
        tr.IsFinalized AS tr_IsFinalized,
        tr.LastEditedBy AS tr_LastEditedBy,
        tr.LastEditedWhen AS tr_LastEditedWhen
    FROM po_table pt
    JOIN transactions tr USING (PurchaseOrderID)
"""
cs.execute(joined_view_sql)


# 5. Calculating the Difference between Invoiced and Quoted Amounts


In [None]:
# Materialize joined_data as a table, adding the invoiced (ex-tax) minus quoted PO amount.
poi_ctas_sql = """
    CREATE OR REPLACE TABLE purchase_orders_and_invoices AS
    SELECT *, TR_AmountExcludingTax - PT_POAmount as invoiced_vs_quoted
    FROM joined_data
"""
cs.execute(poi_ctas_sql)


In [None]:
# Materialized view over purchase_orders_and_invoices. OR REPLACE keeps the cell
# re-runnable: a plain CREATE fails once the view already exists.
# NOTE(review): Snowflake materialized views require Enterprise Edition or higher.
cs.execute("""
    CREATE OR REPLACE MATERIALIZED VIEW purchase_orders_and_invoices1 AS
    SELECT *
    FROM purchase_orders_and_invoices
""")


# 6. Extracting Data from Postgres and Loading to Snowflake

In [None]:
# 6: Importing supplier_case from Postgres to Snowflake
import psycopg2
import csv

# Step 1: connect to the local Postgres database holding supplier_case.
# The password can be overridden through the PG_PASSWORD environment variable.
postgres_conn = psycopg2.connect(
    host="127.0.0.1",
    user="jovyan",
    port=8765,
    database="Northwind",
    password=os.environ.get("PG_PASSWORD", "postgres"),
)

# Fetch the whole supplier_case table (rows + column names) into memory for the CSV
# export in the next cell, then release the cursor and the connection.
try:
    with postgres_conn.cursor() as pg_cursor:
        pg_cursor.execute("SELECT * FROM supplier_case;")
        rows = pg_cursor.fetchall()
        column_names = [desc[0] for desc in pg_cursor.description]
finally:
    postgres_conn.close()

In [None]:
# Write supplier_case to a local CSV, header row first. The file is written as UTF-8,
# the encoding Snowflake's COPY INTO assumes for CSV by default, independent of the OS.
with open("Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/output1.csv", "w", newline="", encoding="utf-8") as f_out:
    csv_writer = csv.writer(f_out)
    csv_writer.writerow(column_names)
    csv_writer.writerows(rows)

In [None]:
# Stage that holds the exported supplier_case CSV.
local_csv_path = "Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/output1.csv"
stage_name = 'supplier_case_stage'
create_stage_sql = f"CREATE OR REPLACE STAGE {stage_name}"
cs.execute(create_stage_sql)

# Upload the CSV to the stage root. PUT gzips it to output1.csv.gz by default, so
# `@<stage>/output1.csv` still matches it as a path prefix. The client path is made
# absolute with forward slashes, and the URI is quoted so spaces in the path survive.
local_csv_uri = 'file://' + os.path.abspath(local_csv_path).replace(os.sep, '/')
put_sql = f"PUT '{local_csv_uri}' @{stage_name}"
cs.execute(put_sql)

# Helper + function that build a CREATE TABLE statement from a CSV file's contents.
def _is_unsigned_int(value):
    """Return True if `value` is a non-empty string made only of the ASCII digits 0-9."""
    return value != '' and all('0' <= ch <= '9' for ch in value)


def generate_create_table_sql(csv_filepath, table_name="supplier_case"):
    """Build a CREATE TABLE statement with column types inferred from a CSV file.

    The first CSV row supplies the column names. A column is typed INT when it has
    at least one non-empty value and every non-empty value consists only of ASCII
    digits; every other column is VARCHAR(255). All data rows are scanned, so a
    column that is numeric only in its first row is not typed INT (which would make
    COPY INTO fail on the later non-numeric value).

    Args:
        csv_filepath: Path to a CSV file whose first row is a header.
        table_name: Table name used in the statement (default "supplier_case").

    Returns:
        "CREATE TABLE <table_name> (\\n    <col> <type>,\\n    ...\\n);"

    Raises:
        ValueError: If the file has no header row.
    """
    # errors="replace": only digit detection matters here, so an undecodable byte
    # must not abort type inference.
    with open(csv_filepath, 'r', newline='', encoding='utf-8', errors='replace') as f:
        reader = csv.reader(f)
        headers = next(reader, [])
        if not headers:
            raise ValueError(f"{csv_filepath!r} has no header row")
        has_value = [False] * len(headers)
        all_digits = [True] * len(headers)
        for row in reader:
            # Fields beyond the header width are ignored.
            for i, value in enumerate(row[:len(headers)]):
                if value == '':
                    continue
                has_value[i] = True
                if not _is_unsigned_int(value):
                    all_digits[i] = False

    column_defs = [
        f"    {header} {'INT' if has_value[i] and all_digits[i] else 'VARCHAR(255)'}"
        for i, header in enumerate(headers)
    ]
    return f"CREATE TABLE {table_name} (\n" + ",\n".join(column_defs) + "\n);"
# Create supplier_case with column types inferred from the exported CSV. Any previous
# copy is dropped first because the generated statement is a plain CREATE TABLE,
# which fails when the table already exists (e.g. when the notebook is re-run).
cs.execute("DROP TABLE IF EXISTS supplier_case")
create_table_sql = generate_create_table_sql(local_csv_path)
cs.execute(create_table_sql)

# Load the staged CSV into supplier_case. The Python text '\\\\N' reaches Snowflake as
# the SQL literal '\\N', i.e. the two characters \N (a single backslash would start a
# string escape sequence instead).
copy_into_sql = f"""
    COPY INTO supplier_case
    FROM @{stage_name}/output1.csv
    FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"' NULL_IF=('NULL', '\\\\N'));
"""
cs.execute(copy_into_sql)

# 7. Connecting to the Snowflake Marketplace and Extracting Weather Data

In [None]:
# 7: Installing Geopandas to turn shape file into csv
!pip install geopandas

In [None]:
# Convert the ZCTA shapefile (tl_2020_us_zcta520) to a CSV that can be loaded into Snowflake.
import geopandas as gpd

zcta_dir = 'Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/tl_2020_us_zcta520/'

# Read the shapefile used for ZIP-code geodata mapping
gdf = gpd.read_file(zcta_dir + 'tl_2020_us_zcta520.shp')

# Serialize each geometry as WKT text so it fits a TEXT column; a missing geometry
# becomes an empty value instead of raising AttributeError on None.wkt.
gdf['geometry'] = gdf['geometry'].apply(lambda geom: geom.wkt if geom is not None else None)

# Saving to CSV
gdf.to_csv(zcta_dir + 'tl_2020_us_zcta520.csv', index=False)



In [None]:
# Creating the ZC_GeoLocation table in Snowflake.
# INTPTLAT20 / INTPTLON20 hold fractional coordinates. Snowflake's bare NUMERIC is
# NUMBER(38,0) (scale 0), which would round them to whole degrees and distort the
# ST_MAKEPOINT/ST_DISTANCE nearest-station computation later, so they are FLOAT.
cs.execute(
    """
    CREATE OR REPLACE TABLE ZC_GeoLocation (
        ZCTA5CE20 INTEGER,
        GEOID20 INTEGER,
        CLASSFP20 TEXT,
        MTFCC20 TEXT,
        FUNCSTAT20 TEXT,
        ALAND20 TEXT,
        AWATER20 TEXT,
        INTPTLAT20 FLOAT,
        INTPTLON20 FLOAT,
        geometry TEXT
    )
    """
)

csv_path = 'Documents/UCSD/495_SQL/Assignment/Group_project/Case_Data/tl_2020_us_zcta520/tl_2020_us_zcta520.csv'

# Upload the CSV to the table's own stage (@%ZC_GeoLocation). The client path is made
# absolute with forward slashes, and the URI is quoted so spaces in the path survive.
csv_uri = 'file://' + os.path.abspath(csv_path).replace(os.sep, '/')
put_command = f"PUT '{csv_uri}' @%ZC_GeoLocation"
cs.execute(put_command)

# Copy the staged ZCTA geodata into ZC_GeoLocation; CSV fields map by position to the listed columns.
copy_command = """
COPY INTO ZC_GeoLocation (ZCTA5CE20, GEOID20, CLASSFP20, MTFCC20, FUNCSTAT20, ALAND20, AWATER20, INTPTLAT20, INTPTLON20, geometry)
FROM @%ZC_GeoLocation
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
"""
cs.execute(copy_command)

In [None]:
# JOINING GEOLOCATION DATA TO SUPPLIER_CASE TABLE (DISTINCT ZIPS)
# One row per distinct supplier postal code that matches a GEOID20, with that row's
# INTPTLAT20 / INTPTLON20 coordinates.
cs.execute(
    """
    CREATE OR REPLACE TABLE SUPPLIERZIP_TO_GEOLOCATION AS
        SELECT Distinct S.postalpostalcode, z.INTPTLAT20, z.INTPTLON20
        FROM supplier_case s
        JOIN ZC_GeoLocation z on z.GEOID20 = s.postalpostalcode
    """
)

# CREATE TABLE ... AS returns a single status row. Keep it under its own name so the
# supplier_case `rows` fetched from Postgres earlier are not overwritten.
ctas_status = cs.fetchone()
print(ctas_status)

In [None]:
# CROSS JOIN and rank to create a table with each ZIP and its closest weather station.
# ROW_NUMBER with a station-name tie-break keeps exactly one station per ZIP. RANK
# would keep every equidistant station, duplicating rows in the daily-weather join.
# NOTE(review): DISTINCT_NOAACD2019R is not created in this notebook — presumably a
# per-station table built from the Marketplace NOAA data; confirm it exists.
cs.execute(
"""
CREATE OR REPLACE TABLE CLOSEST_STATIONS AS
SELECT POSTALPOSTALCODE, STATIONSNAME, rank, min_distance
FROM (
    SELECT
        d.*,
        ROW_NUMBER() OVER (
            PARTITION BY d.POSTALPOSTALCODE
            ORDER BY d.min_distance ASC, d.STATIONSNAME
        ) AS rank
    FROM (
        SELECT DISTINCT
            SZ.POSTALPOSTALCODE,
            EN.STATIONSNAME,
            st_distance(
                st_makepoint(SZ.INTPTLON20, SZ.INTPTLAT20),
                st_makepoint(EN.STATIONSLONGITUDE, EN.STATIONSLATITUDE)
            ) AS min_distance
        FROM SUPPLIERZIP_TO_GEOLOCATION SZ
        CROSS JOIN DISTINCT_NOAACD2019R EN
    ) d
)
WHERE rank = 1
"""
)

# CREATE TABLE ... AS returns a single status row. Keep it under its own name so the
# supplier_case `rows` fetched from Postgres earlier are not overwritten.
ctas_status = cs.fetchone()
print(ctas_status)



In [None]:
# Join the closest-station table to NOAACD2019R to get each ZIP code's daily
# 'Maximum temperature (Fahrenheit)' values.
cs.execute(
    """
    CREATE OR REPLACE TABLE supplier_zip_code_weather as
        select
            cs.postalpostalcode as zip_code,
            n."Date" as date,
            n."Value" as daily_high_temp
        from CLOSEST_STATIONS cs
        join NOAACD2019R n on n.stationsname = cs.stationsname
        where "Indicator Name" = 'Maximum temperature (Fahrenheit)'
        order by postalpostalcode, date
    """
)

# CREATE TABLE ... AS returns a single status row. Keep it under its own name so the
# supplier_case `rows` fetched from Postgres earlier are not overwritten.
ctas_status = cs.fetchone()
print(ctas_status)


# 8. Final Join

In [None]:
# Join purchase_orders_and_invoices, supplier_case and supplier_zip_code_weather on the
# supplier's ZIP code and the transaction date.
# purchase_orders_and_invoices inherits joined_data's pt_/tr_-prefixed column names, so
# it has no plain SupplierID or TransactionDate column; the prefixed names are used.
# NOTE(review): tr_TransactionDate is TEXT in `transactions`; the date comparison relies
# on an implicit cast to sw.date's type — confirm the string format casts cleanly.
cs.execute(
    """
CREATE OR REPLACE TABLE FINAL_PO_WEATHER AS
    SELECT pi.*, sw.daily_high_temp as daily_high_temp
    FROM purchase_orders_and_invoices pi
    JOIN supplier_case sc on sc.supplierid = pi.pt_SupplierID
    JOIN supplier_zip_code_weather sw
        on sw.zip_code = sc.postalpostalcode
       and sw.date = pi.tr_TransactionDate
    """
)

# CREATE TABLE ... AS returns a single status row. Keep it under its own name so the
# supplier_case `rows` fetched from Postgres earlier are not overwritten.
ctas_status = cs.fetchone()
print(ctas_status)