In [None]:
-- Storage integration linking Snowflake to Google Cloud Storage.
-- Only the bucket listed in STORAGE_ALLOWED_LOCATIONS may be referenced by
-- stages that use this integration.
-- NOTE(review): presumably the service account reported by
-- DESC STORAGE INTEGRATION gcs_integration must be granted access to the
-- bucket on the GCP side before the stage can read files — confirm.
CREATE OR REPLACE STORAGE INTEGRATION gcs_integration
    TYPE                      = EXTERNAL_STAGE
    STORAGE_PROVIDER          = GCS
    ENABLED                   = TRUE
    STORAGE_ALLOWED_LOCATIONS = ('gcs://solarpowerplantdata/');


-- Named CSV file format shared by the stage and the bulk load:
--   * field values may optionally be wrapped in double quotes
--   * the first line of each file is treated as a header and skipped
CREATE OR REPLACE FILE FORMAT my_csv_format
    TYPE                         = CSV
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER                  = 1;
-- Disabled option, kept for reference. To enable it, move it above the
-- terminating semicolon of the statement:
-- TIMESTAMP_FORMAT = 'MM/DD/YYYY HH24:MI';

-- External stage over the GCS bucket. Access goes through gcs_integration,
-- and my_csv_format is the stage's default file format.
CREATE OR REPLACE STAGE gcs_stage
    URL                 = 'gcs://solarpowerplantdata/'
    STORAGE_INTEGRATION = gcs_integration
    FILE_FORMAT         = my_csv_format;


-- Landing table for rows loaded from the staged CSV files.
-- "Hour" and "Power_Efficiency" are quoted mixed-case identifiers, so they are
-- case-sensitive and must be double-quoted wherever they are referenced.
-- NOTE(review): the loads into this table do not map columns explicitly, so
-- the column order here presumably has to match the CSV layout — confirm.
CREATE OR REPLACE TABLE raw_power_data (
    "DATE_TIME"           TIMESTAMP_LTZ,
    "DC_POWER"            FLOAT,
    "AC_POWER"            FLOAT,
    "DAILY_YIELD"         FLOAT,
    "TOTAL_YIELD"         FLOAT,
    "AMBIENT_TEMPERATURE" FLOAT,
    "MODULE_TEMPERATURE"  FLOAT,
    "IRRADIATION"         FLOAT,
    "Hour"                INT,
    "Power_Efficiency"    FLOAT
);

-- One-off bulk load of Cleaned_Plant_Data.csv from the stage into the raw
-- table, parsed with the named CSV format.
COPY INTO raw_power_data
    FROM @gcs_stage/Cleaned_Plant_Data.csv
    FILE_FORMAT = my_csv_format;

-- Destination table filled by process_power_data_task.
-- Its column layout is the same as raw_power_data; "Hour" and
-- "Power_Efficiency" are case-sensitive and must always be double-quoted.
CREATE OR REPLACE TABLE cleaned_power_data (
    "DATE_TIME"           TIMESTAMP_LTZ,
    "DC_POWER"            FLOAT,
    "AC_POWER"            FLOAT,
    "DAILY_YIELD"         FLOAT,
    "TOTAL_YIELD"         FLOAT,
    "AMBIENT_TEMPERATURE" FLOAT,
    "MODULE_TEMPERATURE"  FLOAT,
    "IRRADIATION"         FLOAT,
    "Hour"                INT,
    "Power_Efficiency"    FLOAT
);

-- Snowpipe that copies staged files from @gcs_stage into raw_power_data.
-- The pipe references the named format my_csv_format (CSV, optional
-- double-quote enclosure, one skipped header line) instead of repeating those
-- options inline, so it cannot drift from the stage and the manual COPY INTO
-- when the format is changed.
-- NOTE(review): AUTO_INGEST is not set, so this pipe only loads files when it
-- is triggered explicitly (for example ALTER PIPE ... REFRESH or the Snowpipe
-- REST API). Event-driven loading from GCS would need AUTO_INGEST = TRUE and a
-- notification integration, neither of which is defined in this script —
-- confirm the intended trigger.
CREATE OR REPLACE PIPE power_data_pipe
AS
COPY INTO raw_power_data
FROM @gcs_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

-- Append-only stream on raw_power_data; it is read by process_power_data_task.
-- SHOW_INITIAL_ROWS = TRUE makes the first read of the stream return the rows
-- that already existed in the table when the stream was created. The initial
-- COPY INTO runs before this statement, and a stream otherwise reports only
-- changes made after its creation, so without this option the bulk-loaded rows
-- would never be forwarded to cleaned_power_data.
CREATE OR REPLACE STREAM power_data_stream
    ON TABLE raw_power_data
    APPEND_ONLY = TRUE
    SHOW_INITIAL_ROWS = TRUE;

-- Scheduled task: on a 30-second schedule, and only when power_data_stream
-- reports pending rows, append those rows to cleaned_power_data.
-- Runs on the COMPUTE_WH warehouse.
CREATE OR REPLACE TASK process_power_data_task
    WAREHOUSE = COMPUTE_WH
    SCHEDULE  = '30 SECOND'
    WHEN SYSTEM$STREAM_HAS_DATA('power_data_stream')
AS
INSERT INTO cleaned_power_data (
    "DATE_TIME",
    "DC_POWER",
    "AC_POWER",
    "DAILY_YIELD",
    "TOTAL_YIELD",
    "AMBIENT_TEMPERATURE",
    "MODULE_TEMPERATURE",
    "IRRADIATION",
    "Hour",
    "Power_Efficiency"
)
SELECT
    src."DATE_TIME",
    src."DC_POWER",
    src."AC_POWER",
    src."DAILY_YIELD",
    src."TOTAL_YIELD",
    src."AMBIENT_TEMPERATURE",
    src."MODULE_TEMPERATURE",
    src."IRRADIATION",
    src."Hour",
    src."Power_Efficiency"
FROM power_data_stream AS src;

-- Resume the task so it starts running on its schedule.
ALTER TASK process_power_data_task RESUME;

-- Ten most recent rows of the cleaned table.
SELECT
    "DATE_TIME",
    "DC_POWER",
    "AC_POWER",
    "DAILY_YIELD",
    "TOTAL_YIELD",
    "AMBIENT_TEMPERATURE",
    "MODULE_TEMPERATURE",
    "IRRADIATION",
    "Hour",
    "Power_Efficiency"
FROM cleaned_power_data
ORDER BY "DATE_TIME" DESC
LIMIT 10;

-- Ask the pipe to pick up files that are already sitting on the stage.
ALTER PIPE power_data_pipe REFRESH;

-- Row count of the raw table.
SELECT COUNT(*)
FROM raw_power_data;

-- Ten most recent rows of the raw table.
SELECT
    "DATE_TIME",
    "DC_POWER",
    "AC_POWER",
    "DAILY_YIELD",
    "TOTAL_YIELD",
    "AMBIENT_TEMPERATURE",
    "MODULE_TEMPERATURE",
    "IRRADIATION",
    "Hour",
    "Power_Efficiency"
FROM raw_power_data
ORDER BY "DATE_TIME" DESC
LIMIT 10;

-- Optional manual check: does the stream currently hold unconsumed rows?
-- SELECT SYSTEM$STREAM_HAS_DATA('power_data_stream');
