In [None]:
-- Storage integration granting access to the s3://big.data.ass3/ bucket
-- through the IAM role named below.
-- IF NOT EXISTS keeps this cell re-runnable: a bare CREATE fails with
-- "already exists" the second time the notebook is executed.
CREATE STORAGE INTEGRATION IF NOT EXISTS CO2_S3_INTEGRATION
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::183295451617:role/MySnowflakeRole'
    ENABLED = TRUE
    -- Only this bucket may be referenced by stages using the integration.
    STORAGE_ALLOWED_LOCATIONS = ('s3://big.data.ass3/');


In [None]:
-- List the integrations visible to the current role, then show the
-- properties of CO2_S3_INTEGRATION.
SHOW INTEGRATIONS;
DESC INTEGRATION CO2_S3_INTEGRATION;


In [None]:
-- External stage over the S3 bucket root, authenticated via CO2_S3_INTEGRATION.
CREATE OR REPLACE STAGE CO2_STAGE
    URL = 's3://big.data.ass3/'
    STORAGE_INTEGRATION = CO2_S3_INTEGRATION;


In [None]:
-- Show the properties of the stage created above.
DESC STAGE CO2_STAGE;


In [None]:
-- Show the integration properties again (same statement as in the earlier cell).
DESC INTEGRATION CO2_S3_INTEGRATION;



In [None]:
-- List the files reachable through the stage.
LIST @CO2_STAGE;


In [None]:
// Temporary/experimental load of raw_data/co2_data_v2.csv into CO2_DATA.
// NOTE(review): CO2_DATA is not created in any cell shown here, and
// CO2_CSV_FORMAT is only created in the following cell. Confirm both exist
// before running this cell, or drop the cell if it is obsolete.
COPY INTO CO2_DATA
FROM @CO2_STAGE/raw_data/co2_data_v2.csv
FILE_FORMAT = (FORMAT_NAME = 'CO2_CSV_FORMAT')
FORCE = TRUE;


In [None]:
-- Named CSV file format used by the COPY INTO statements in this notebook.
CREATE OR REPLACE FILE FORMAT CO2_CSV_FORMAT
    TYPE = CSV
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1                      -- first line of the file is a header row
    NULL_IF = ('NULL', 'null', '')       -- these literal strings load as SQL NULL
    EMPTY_FIELD_AS_NULL = TRUE;


In [None]:
-- Show the resulting file format properties.
DESC FILE FORMAT CO2_CSV_FORMAT;


In [None]:
-- Landing table for the raw CO2 CSV; one column per CSV field, in file order.
-- NOTE(review): the DOW30 name looks inherited from another project — the
-- columns hold CO2 readings; confirm before renaming anything.
CREATE OR REPLACE TABLE RAW_DOW30_STAGING (
    YEAR          INTEGER,
    MONTH         INTEGER,
    DAY           INTEGER,
    DECIMAL_DATE  FLOAT,
    CO2_EMISSION  FLOAT
);


In [None]:
-- Show the column definitions of the landing table.
DESC TABLE RAW_DOW30_STAGING;


In [None]:
-- Load the raw CSV from the stage into the landing table.
-- FORCE = TRUE loads the file even if it was loaded before, so running this
-- cell twice without re-creating the table appends the same rows again.
COPY INTO RAW_DOW30_STAGING
    FROM @CO2_STAGE/raw_data/co2_data_v2.csv
    FILE_FORMAT = (FORMAT_NAME = 'CO2_CSV_FORMAT')
    FORCE = TRUE;


In [None]:
-- Preview the loaded rows.
SELECT *
FROM RAW_DOW30_STAGING
LIMIT 10;


In [None]:
-- Change-tracking stream over the landing table; it records changes made to
-- RAW_DOW30_STAGING after this statement runs.
CREATE OR REPLACE STREAM CO2_STREAM ON TABLE RAW_DOW30_STAGING;


In [None]:
-- Show the change rows currently pending in the stream.
SELECT *
FROM CO2_STREAM;


In [None]:
-- Harmonized table: a column-for-column copy of the landing table.
CREATE OR REPLACE TABLE DOW30_HARMONIZED AS
SELECT
    STG.YEAR,
    STG.MONTH,
    STG.DAY,
    STG.DECIMAL_DATE,
    STG.CO2_EMISSION
FROM RAW_DOW30_STAGING AS STG;


In [None]:
-- Preview the harmonized rows.
SELECT *
FROM DOW30_HARMONIZED
LIMIT 10;


In [None]:
-- Ratio of a CO2 reading to a reference level.
--   CO2_VALUE:       the reading to normalize
--   REFERENCE_VALUE: the baseline the reading is divided by
-- Returns CO2_VALUE / REFERENCE_VALUE. A REFERENCE_VALUE of 0 yields NULL:
-- NULLIF turns the divisor into NULL so the calling query does not fail with
-- a division-by-zero error.
CREATE OR REPLACE FUNCTION NORMALIZE_CO2_EMISSION(CO2_VALUE FLOAT, REFERENCE_VALUE FLOAT)
RETURNS FLOAT
AS 
$$
    CO2_VALUE / NULLIF(REFERENCE_VALUE, 0)
$$;


In [None]:
-- Smoke test of the function with literal inputs.
SELECT NORMALIZE_CO2_EMISSION(420.15, 410.0) AS NORMALIZED_CO2;


In [None]:
-- Re-creates the function with the same definition as the cell above; kept
-- identical so running either cell leaves the same division-safe version.
CREATE OR REPLACE FUNCTION NORMALIZE_CO2_EMISSION(CO2_VALUE FLOAT, REFERENCE_VALUE FLOAT)
RETURNS FLOAT
AS 
$$
    CO2_VALUE / NULLIF(REFERENCE_VALUE, 0)
$$;


In [None]:
-- Preview the landing table.
SELECT * FROM RAW_DOW30_STAGING LIMIT 10;


In [None]:
-- Show the harmonized table's columns before the new column is added.
DESC TABLE DOW30_HARMONIZED;


In [None]:
-- Add the column that holds the NORMALIZE_CO2_EMISSION result.
-- IF NOT EXISTS lets this cell be re-run without an "already exists" error.
ALTER TABLE DOW30_HARMONIZED ADD COLUMN IF NOT EXISTS NORMALIZED_CO2 FLOAT;


In [None]:
-- Populate NORMALIZED_CO2 in DOW30_HARMONIZED from the landing table.
-- DOW30_HARMONIZED was created as a copy of RAW_DOW30_STAGING, so a plain
-- INSERT ... SELECT here would add every row a second time (and with a NULL
-- DECIMAL_DATE). MERGE updates rows already present and inserts only days
-- that are missing, which also makes the cell safe to re-run.
-- NOTE(review): assumes RAW_DOW30_STAGING holds at most one row per
-- (YEAR, MONTH, DAY); duplicate source keys would make the MERGE ambiguous.
MERGE INTO DOW30_HARMONIZED AS TARGET
USING (
    SELECT
        YEAR,
        MONTH,
        DAY,
        DECIMAL_DATE,
        CO2_EMISSION,
        NORMALIZE_CO2_EMISSION(CO2_EMISSION, 400.0) AS NORMALIZED_CO2  -- Reference value
    FROM RAW_DOW30_STAGING
) AS SOURCE
ON TARGET.YEAR = SOURCE.YEAR
   AND TARGET.MONTH = SOURCE.MONTH
   AND TARGET.DAY = SOURCE.DAY
WHEN MATCHED THEN
    UPDATE SET
        TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION,
        TARGET.NORMALIZED_CO2 = SOURCE.NORMALIZED_CO2
WHEN NOT MATCHED THEN
    INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION, NORMALIZED_CO2)
    VALUES (SOURCE.YEAR, SOURCE.MONTH, SOURCE.DAY, SOURCE.DECIMAL_DATE, SOURCE.CO2_EMISSION, SOURCE.NORMALIZED_CO2);


In [None]:
-- Preview the harmonized table after the previous cell.
SELECT * FROM DOW30_HARMONIZED LIMIT 10;


In [None]:
-- List user-defined functions.
SHOW USER FUNCTIONS;


In [None]:
-- Spot-check the normalization function with literal inputs.
SELECT NORMALIZE_CO2_EMISSION(333.46, 400);


In [None]:
-- Recompute NORMALIZED_CO2 for every row against the 400.0 reference.
-- There is deliberately no WHERE clause: the whole table is rewritten.
UPDATE DOW30_HARMONIZED
    SET NORMALIZED_CO2 = NORMALIZE_CO2_EMISSION(CO2_EMISSION, 400.0);


In [None]:
-- Preview the updated rows.
SELECT *
FROM DOW30_HARMONIZED
LIMIT 10;


In [None]:
-- Monthly summary table: average, maximum and minimum CO2_EMISSION for each
-- (YEAR, MONTH) found in DOW30_HARMONIZED.
CREATE OR REPLACE TABLE ANALYTICS_DOW30 AS
SELECT
    H.YEAR,
    H.MONTH,
    AVG(H.CO2_EMISSION) AS AVG_CO2,
    MAX(H.CO2_EMISSION) AS MAX_CO2,
    MIN(H.CO2_EMISSION) AS MIN_CO2
FROM DOW30_HARMONIZED AS H
GROUP BY
    H.YEAR,
    H.MONTH;


In [None]:
-- Preview the monthly summary.
SELECT *
FROM ANALYTICS_DOW30
LIMIT 10;


In [None]:
-- Applies pending changes from CO2_STREAM to DOW30_HARMONIZED.
-- Returns the string 'Update Successful!' when the MERGE completes.
--
-- The MERGE matches on (YEAR, MONTH, DAY). Matching on YEAR alone paired every
-- stream row with every target row of that year, so one target row could be
-- matched by many source rows. This is the same key used by
-- UPDATE_CO2_PROCEDURE later in the notebook.
-- Only stream rows with METADATA$ACTION = 'INSERT' are read: an update to the
-- landing table appears in the stream as a DELETE plus an INSERT for the same
-- day, and feeding both to the MERGE would duplicate the source key.
-- NOTE(review): NORMALIZED_CO2 is not maintained here — confirm whether it
-- should be recomputed when CO2_EMISSION changes.
CREATE OR REPLACE PROCEDURE UPDATE_DOW30_SP()
RETURNS STRING
LANGUAGE SQL
AS 
$$
BEGIN
    MERGE INTO DOW30_HARMONIZED AS TARGET
    USING (
        SELECT YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION
        FROM CO2_STREAM
        WHERE METADATA$ACTION = 'INSERT'
    ) AS SOURCE
    ON TARGET.YEAR = SOURCE.YEAR
       AND TARGET.MONTH = SOURCE.MONTH
       AND TARGET.DAY = SOURCE.DAY
    WHEN MATCHED THEN 
        UPDATE SET TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION
    WHEN NOT MATCHED THEN 
        INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION) 
        VALUES (SOURCE.YEAR, SOURCE.MONTH, SOURCE.DAY, SOURCE.DECIMAL_DATE, SOURCE.CO2_EMISSION);
    
    RETURN 'Update Successful!';
END;
$$;


In [None]:
// Re-creates UPDATE_DOW30_SP with the same definition as the previous cell,
// so running either cell leaves the (YEAR, MONTH, DAY)-keyed version in place.
CREATE OR REPLACE PROCEDURE UPDATE_DOW30_SP()
RETURNS STRING
LANGUAGE SQL
AS 
$$
BEGIN
    MERGE INTO DOW30_HARMONIZED AS TARGET
    USING (
        SELECT YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION
        FROM CO2_STREAM
        WHERE METADATA$ACTION = 'INSERT'
    ) AS SOURCE
    ON TARGET.YEAR = SOURCE.YEAR
       AND TARGET.MONTH = SOURCE.MONTH
       AND TARGET.DAY = SOURCE.DAY
    WHEN MATCHED THEN 
        UPDATE SET TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION
    WHEN NOT MATCHED THEN 
        INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION) 
        VALUES (SOURCE.YEAR, SOURCE.MONTH, SOURCE.DAY, SOURCE.DECIMAL_DATE, SOURCE.CO2_EMISSION);
    
    RETURN 'Update Successful!';
END;
$$;


In [None]:
-- Run the procedure once by hand.
CALL UPDATE_DOW30_SP();


In [None]:
-- Scheduled task that calls UPDATE_DOW30_SP every day at 12:00 UTC.
-- No WAREHOUSE is specified for this task.
CREATE OR REPLACE TASK UPDATE_DOW30_METRICS_TASK
    SCHEDULE = 'USING CRON 0 12 * * * UTC'
AS
    CALL UPDATE_DOW30_SP();


In [None]:
-- Resume the task so its schedule takes effect.
ALTER TASK UPDATE_DOW30_METRICS_TASK
    RESUME;


In [None]:
-- Row count of the harmonized table (resolved in the current schema).
SELECT COUNT(*) FROM DOW30_HARMONIZED;


In [None]:
 -- Row count of the landing table, fully qualified.
 SELECT COUNT(*) FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING;


In [None]:
-- Row count of the harmonized table, fully qualified.
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;


In [None]:
-- Preview harmonized rows.
SELECT * FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED LIMIT 10;


In [None]:
-- Verify that data is present in the staging table
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING;

-- Verify that data is present in the harmonized table
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;

-- Preview harmonized rows (note: this reads DOW30_HARMONIZED, not an analytics table)
SELECT * FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED LIMIT 10;

-- Inspect the ten most recent task-history entries
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) ORDER BY COMPLETED_TIME DESC LIMIT 10;


In [None]:
-- Compare row counts of the landing and harmonized tables.
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING;
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;


In [None]:
-- Preview harmonized rows.
SELECT * FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED LIMIT 10;


In [None]:
-- Schema that holds the percent-change functions and tables created below.
-- IF NOT EXISTS keeps the cell re-runnable; a bare CREATE SCHEMA fails once
-- the schema exists.
CREATE SCHEMA IF NOT EXISTS CO2_DB.ANALYTICS_DOW30;


In [None]:
-- Grant usage and object-creation privileges on the analytics schema.
-- NOTE(review): the grantee is ACCOUNTADMIN, which normally already holds
-- these privileges — confirm whether a less-privileged role was intended.
GRANT USAGE, CREATE FUNCTION, CREATE TABLE, CREATE VIEW ON SCHEMA CO2_DB.ANALYTICS_DOW30 TO ROLE ACCOUNTADMIN;


In [None]:
-- List the schemas in CO2_DB to confirm ANALYTICS_DOW30 is present.
SHOW SCHEMAS IN DATABASE CO2_DB;


In [None]:
-- Percent change from previous_value to current_value:
--   ((current_value - previous_value) / previous_value) * 100
-- Returns NULL when either input is NULL or when previous_value is 0.
-- NOTE(review): RUNTIME_VERSION '3.8' is an old Python runtime — confirm it is
-- still accepted by the target Snowflake account.
CREATE OR REPLACE FUNCTION CO2_DB.ANALYTICS_DOW30.CALCULATE_PERCENT_CHANGE(
    current_value FLOAT, previous_value FLOAT
) RETURNS FLOAT 
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'calculate_change'
AS $$
def calculate_change(current_value, previous_value):
    # SQL NULL arrives as None. Guard current_value as well as previous_value:
    # otherwise a NULL reading hits `None - float` and the TypeError fails the
    # whole query. The None checks come first so the == 0 comparison only
    # sees numbers.
    if current_value is None or previous_value is None or previous_value == 0:
        return None
    return ((current_value - previous_value) / previous_value) * 100
$$;


In [None]:
-- Preview of the day-over-day percent change.
-- The window is ordered by YEAR, MONTH, DAY: ordering by YEAR, MONTH alone
-- leaves the row order inside a month undefined, so LAG could return any
-- other day of that month as the "previous" reading.
-- The outer ORDER BY makes the LIMIT 10 sample deterministic.
SELECT 
    YEAR, MONTH, CO2_EMISSION,
    CO2_DB.ANALYTICS_DOW30.CALCULATE_PERCENT_CHANGE(CO2_EMISSION, LAG(CO2_EMISSION) OVER (ORDER BY YEAR, MONTH, DAY)) 
    AS DAILY_PERCENT_CHANGE
FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED
ORDER BY YEAR, MONTH, DAY
LIMIT 10;


In [None]:
-- Materialize the day-over-day percent change.
-- Output columns are unchanged (YEAR, MONTH, CO2_EMISSION,
-- DAILY_PERCENT_CHANGE); only the window ordering now includes DAY so each
-- row is compared with the immediately preceding day.
CREATE OR REPLACE TABLE CO2_DB.ANALYTICS_DOW30.CO2_DAILY_PERCENT_CHANGE AS
SELECT 
    YEAR, 
    MONTH, 
    CO2_EMISSION,
    CO2_DB.ANALYTICS_DOW30.CALCULATE_PERCENT_CHANGE(CO2_EMISSION, LAG(CO2_EMISSION) OVER (ORDER BY YEAR, MONTH, DAY)) 
    AS DAILY_PERCENT_CHANGE
FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;


In [None]:
-- Preview the materialized table.
SELECT * FROM CO2_DB.ANALYTICS_DOW30.CO2_DAILY_PERCENT_CHANGE LIMIT 10;


In [None]:
-- List the user-defined functions in the analytics schema.
SHOW USER FUNCTIONS IN SCHEMA CO2_DB.ANALYTICS_DOW30;


In [None]:
-- Percent change from week_ago_value to current_value:
--   ((current_value - week_ago_value) / week_ago_value) * 100
-- Returns NULL when either input is NULL or when week_ago_value is 0.
-- NOTE(review): RUNTIME_VERSION '3.8' is an old Python runtime — confirm it is
-- still accepted by the target Snowflake account.
CREATE OR REPLACE FUNCTION CO2_DB.ANALYTICS_DOW30.CALCULATE_WEEKLY_PERCENT_CHANGE(
    current_value FLOAT, week_ago_value FLOAT
) RETURNS FLOAT 
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'calculate_change'
AS $$
def calculate_change(current_value, week_ago_value):
    # SQL NULL arrives as None. Guard current_value too, otherwise a NULL
    # reading raises TypeError on the subtraction and fails the query.
    if current_value is None or week_ago_value is None or week_ago_value == 0:
        return None
    return ((current_value - week_ago_value) / week_ago_value) * 100
$$;


In [None]:
-- Confirm the function is registered in the ANALYTICS_DOW30 schema.
SELECT * FROM INFORMATION_SCHEMA.FUNCTIONS 
WHERE FUNCTION_NAME = 'CALCULATE_WEEKLY_PERCENT_CHANGE' 
AND FUNCTION_SCHEMA = 'ANALYTICS_DOW30';


In [None]:
-- Materialize the percent change against the reading seven rows earlier in
-- (YEAR, MONTH, DAY) order.
-- NOTE(review): "seven rows back" equals "one week ago" only if there is
-- exactly one row per calendar day with no gaps — confirm against the data.
CREATE OR REPLACE TABLE CO2_DB.ANALYTICS_DOW30.CO2_WEEKLY_PERCENT_CHANGE AS
SELECT
    H.YEAR,
    H.MONTH,
    H.CO2_EMISSION,
    CO2_DB.ANALYTICS_DOW30.CALCULATE_WEEKLY_PERCENT_CHANGE(
        H.CO2_EMISSION,
        LAG(H.CO2_EMISSION, 7) OVER (ORDER BY H.YEAR, H.MONTH, H.DAY)
    ) AS WEEKLY_PERCENT_CHANGE
FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED AS H;


In [None]:
-- Preview the weekly table.
SELECT *
FROM CO2_DB.ANALYTICS_DOW30.CO2_WEEKLY_PERCENT_CHANGE
LIMIT 10;


In [None]:
-- Rows where no weekly change could be computed.
SELECT *
FROM CO2_DB.ANALYTICS_DOW30.CO2_WEEKLY_PERCENT_CHANGE
WHERE WEEKLY_PERCENT_CHANGE IS NULL
LIMIT 10;


In [None]:
-- Scheduled task that calls UPDATE_DOW30_SP on the COMPUTE_WH warehouse.
CREATE OR REPLACE TASK UPDATE_CO2_METRICS_TASK
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 5 * * * UTC'  -- every day at 05:00 UTC
AS
    CALL UPDATE_DOW30_SP();


In [None]:
-- Resume the task so its schedule takes effect.
ALTER TASK UPDATE_CO2_METRICS_TASK
    RESUME;


In [None]:
-- Ten most recent task-history entries.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY COMPLETED_TIME DESC
LIMIT 10;


In [None]:
-- List procedures before (re)creating UPDATE_CO2_PROCEDURE.
SHOW PROCEDURES IN SCHEMA CO2_DB.RAW_DATA;


In [None]:
-- Upserts every row of RAW_DOW30_STAGING into DOW30_HARMONIZED.
-- Returns the string 'Update Successful!' when the MERGE completes.
--
-- The match key is (YEAR, MONTH, DAY). With only YEAR and MONTH, every
-- staging row of a month matched every harmonized row of that month, so one
-- target row was matched by many source rows. The later redefinitions of this
-- procedure in the notebook use the same three-column key.
-- NOTE(review): assumes RAW_DOW30_STAGING holds at most one row per day.
CREATE OR REPLACE PROCEDURE CO2_DB.RAW_DATA.UPDATE_CO2_PROCEDURE()
RETURNS STRING
LANGUAGE SQL
AS 
$$
BEGIN
    MERGE INTO CO2_DB.RAW_DATA.DOW30_HARMONIZED AS TARGET
    USING (SELECT * FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING) AS SOURCE
    ON TARGET.YEAR = SOURCE.YEAR
       AND TARGET.MONTH = SOURCE.MONTH
       AND TARGET.DAY = SOURCE.DAY
    WHEN MATCHED THEN 
        UPDATE SET TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION
    WHEN NOT MATCHED THEN 
        INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION) 
        VALUES (SOURCE.YEAR, SOURCE.MONTH, SOURCE.DAY, SOURCE.DECIMAL_DATE, SOURCE.CO2_EMISSION);
    
    RETURN 'Update Successful!';
END;
$$;


In [None]:
-- List procedures again to confirm the procedure now exists.
SHOW PROCEDURES IN SCHEMA CO2_DB.RAW_DATA;


In [None]:
-- Preview the weekly percent-change table.
SELECT * FROM CO2_DB.ANALYTICS_DOW30.CO2_WEEKLY_PERCENT_CHANGE LIMIT 10;


In [None]:
-- Ten most recent task-history entries.
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) 
ORDER BY COMPLETED_TIME DESC 
LIMIT 10;


In [None]:
-- Upserts the landing table into the harmonized table, one row per
-- (YEAR, MONTH, DAY): existing days get their CO2_EMISSION overwritten,
-- missing days are inserted. Returns 'Update Successful!'.
-- The procedure name is unqualified, so it is created in the current schema.
CREATE OR REPLACE PROCEDURE UPDATE_CO2_PROCEDURE()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    MERGE INTO CO2_DB.RAW_DATA.DOW30_HARMONIZED AS TARGET
    USING (
        SELECT YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION
        FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING
    ) AS SOURCE
        ON  TARGET.YEAR  = SOURCE.YEAR
        AND TARGET.MONTH = SOURCE.MONTH
        AND TARGET.DAY   = SOURCE.DAY
    WHEN MATCHED THEN
        UPDATE SET TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION
    WHEN NOT MATCHED THEN
        INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION)
        VALUES (
            SOURCE.YEAR,
            SOURCE.MONTH,
            SOURCE.DAY,
            SOURCE.DECIMAL_DATE,
            SOURCE.CO2_EMISSION
        );

    RETURN 'Update Successful!';
END;
$$;


In [None]:
-- Remove any earlier version of the task.
DROP TASK IF EXISTS UPDATE_CO2_DATA;


In [None]:
-- Scheduled task that calls UPDATE_CO2_PROCEDURE every day at 08:00 UTC.
-- No WAREHOUSE is specified for this task.
CREATE OR REPLACE TASK UPDATE_CO2_DATA
    SCHEDULE = 'USING CRON 0 8 * * * UTC'
AS
    CALL UPDATE_CO2_PROCEDURE();


In [None]:
-- Resume the task, then trigger one run immediately.
ALTER TASK UPDATE_CO2_DATA
    RESUME;
EXECUTE TASK UPDATE_CO2_DATA;


In [None]:
-- Row count of the harmonized table after triggering the task.
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;


In [None]:
-- Ten most recent task-history entries.
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) 
ORDER BY COMPLETED_TIME DESC 
LIMIT 10;


In [None]:
-- Row count of the harmonized table (repeat check).
SELECT COUNT(*) FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED;


In [None]:
-- Preview harmonized rows.
SELECT * FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED LIMIT 10;


In [None]:
-- Ad-hoc day-over-day percent change: each reading is compared with the
-- previous row in (YEAR, MONTH, DAY) order.
SELECT
    H.YEAR,
    H.MONTH,
    H.DAY,
    H.CO2_EMISSION,
    CO2_DB.ANALYTICS_DOW30.CALCULATE_PERCENT_CHANGE(
        H.CO2_EMISSION,
        LAG(H.CO2_EMISSION) OVER (ORDER BY H.YEAR, H.MONTH, H.DAY)
    ) AS DAILY_PERCENT_CHANGE
FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED AS H
LIMIT 10;


In [None]:
-- Ad-hoc weekly percent change: each reading is compared with the one seven
-- rows earlier in (YEAR, MONTH, DAY) order.
-- DAY is part of the window ordering because ordering by YEAR, MONTH alone
-- leaves the order inside a month undefined, which makes "seven rows back"
-- arbitrary. This matches the ordering used when CO2_WEEKLY_PERCENT_CHANGE
-- is materialized. The outer ORDER BY makes the LIMIT 10 sample deterministic.
SELECT 
    YEAR, 
    MONTH, 
    CO2_EMISSION,
    CO2_DB.ANALYTICS_DOW30.CALCULATE_WEEKLY_PERCENT_CHANGE(
        CO2_EMISSION, 
        LAG(CO2_EMISSION, 7) OVER (ORDER BY YEAR, MONTH, DAY)
    ) AS WEEKLY_PERCENT_CHANGE
FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED
ORDER BY YEAR, MONTH, DAY
LIMIT 10;


In [None]:
-- Ten most recent task-history entries.
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY()) 
ORDER BY COMPLETED_TIME DESC 
LIMIT 10;


In [None]:
-- Final definition of UPDATE_CO2_PROCEDURE (replaces the earlier cells).
-- Merges RAW_DOW30_STAGING into DOW30_HARMONIZED keyed on (YEAR, MONTH, DAY):
-- matched days have CO2_EMISSION overwritten, unmatched days are inserted.
-- Returns 'CO2 Data Update Successful!'.
CREATE OR REPLACE PROCEDURE UPDATE_CO2_PROCEDURE()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    MERGE INTO CO2_DB.RAW_DATA.DOW30_HARMONIZED AS TARGET
    USING (
        SELECT YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION
        FROM CO2_DB.RAW_DATA.RAW_DOW30_STAGING
    ) AS SOURCE
        ON  TARGET.YEAR  = SOURCE.YEAR
        AND TARGET.MONTH = SOURCE.MONTH
        AND TARGET.DAY   = SOURCE.DAY
    WHEN MATCHED THEN
        UPDATE SET TARGET.CO2_EMISSION = SOURCE.CO2_EMISSION
    WHEN NOT MATCHED THEN
        INSERT (YEAR, MONTH, DAY, DECIMAL_DATE, CO2_EMISSION)
        VALUES (
            SOURCE.YEAR,
            SOURCE.MONTH,
            SOURCE.DAY,
            SOURCE.DECIMAL_DATE,
            SOURCE.CO2_EMISSION
        );

    RETURN 'CO2 Data Update Successful!';
END;
$$;


In [None]:
-- Resume the task after the procedure was redefined.
ALTER TASK UPDATE_CO2_DATA RESUME;


In [None]:
-- Trigger one run of the task immediately.
EXECUTE TASK UPDATE_CO2_DATA;


In [None]:
-- Disabled check: latest harmonized rows by year.
--SELECT * FROM CO2_DB.RAW_DATA.DOW30_HARMONIZED ORDER BY YEAR DESC LIMIT 10;
