In [4]:
#!pip install snowflake-snowpark-python
#!pip install snowflake-ml-python

In [10]:
import pandas as pd
import json
import os
import snowflake.snowpark.functions as F
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

In [11]:
connection_parameters = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
}
session = Session.builder.configs(connection_parameters).create()

In [12]:
session.sql("CREATE TRANSIENT  DATABASE IF NOT EXISTS THINGSBOARD;").collect()
session.sql("CREATE  TRANSIENT  SCHEMA IF NOT EXISTS THINGSBOARD.SENSOR;").collect()
session.sql("USE DATABASE THINGSBOARD").collect()
session.sql("USE SCHEMA THINGSBOARD.SENSOR").collect()

[Row(status='Statement executed successfully.')]

In [7]:
session.sql("""
    CREATE OR REPLACE TRANSIENT TABLE THINGSBOARD.SENSOR.Environmental 
    (ts TIMESTAMP,
     co FLOAT,
     humidity FLOAT, 
     light BOOLEAN, 
     lpg FLOAT,
     motion BOOLEAN, 
     smoke FLOAT, 
     temp FLOAT)
""").collect()

[Row(status='Table ENVIRONMENTAL successfully created.')]

In [13]:
session.sql("""
    CREATE OR REPLACE TRANSIENT TABLE THINGSBOARD.SENSOR.co_table_predication 
    (ts TIMESTAMP,
     co FLOAT)
""").collect()

[Row(status='Table CO_TABLE_PREDICATION successfully created.')]

In [14]:

import os

aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Create stage with env variables
session.sql(f"""
    CREATE OR REPLACE STAGE THINGSBOARD.SENSOR.co_table_predication
    url = 's3://fastapi-snowflake/co_table_predication.json'
    credentials=(AWS_KEY_ID='{aws_access_key}' AWS_SECRET_KEY='{aws_secret_key}')
    file_format = (type = 'JSON');
""").collect()



[Row(status='Stage area CO_TABLE_PREDICATION successfully created.')]

In [9]:
session.sql(""" INSERT INTO ENVIRONMENTAL (
    TS,
    CO,
    HUMIDITY,
    LIGHT,
    LPG,
    MOTION,
    SMOKE,
    TEMP
)
SELECT
    TO_TIMESTAMP(co.value:ts::bigint / 1000) AS TS,
    co.value:value::float AS CO,
    humidity.value:value::float AS HUMIDITY,
    light.value:value::boolean AS LIGHT,
    lpg.value:value::float AS LPG,
    motion.value:value::boolean AS MOTION,
    smoke.value:value::float AS SMOKE,
    temp.value:value::float AS TEMP
FROM
    @THINGSBOARD.SENSOR.s3_stage (FILE_FORMAT => 'JSON_FORMAT') AS t,
    LATERAL FLATTEN(input => t.$1:co) AS co,
    LATERAL FLATTEN(input => t.$1:humidity) AS humidity,
    LATERAL FLATTEN(input => t.$1:light) AS light,
    LATERAL FLATTEN(input => t.$1:lpg) AS lpg,
    LATERAL FLATTEN(input => t.$1:motion) AS motion,
    LATERAL FLATTEN(input => t.$1:smoke) AS smoke,
    LATERAL FLATTEN(input => t.$1:temp) AS temp
WHERE
    co.value:ts = humidity.value:ts AND
    co.value:ts = light.value:ts AND
    co.value:ts = lpg.value:ts AND
    co.value:ts = motion.value:ts AND
    co.value:ts = smoke.value:ts AND
    co.value:ts = temp.value:ts;

    """).collect()

[Row(number of rows inserted=220)]

In [16]:
session.sql(""" INSERT INTO co_table_predication (
    TS,
    CO
)
SELECT
    TO_TIMESTAMP(co.value:ts::bigint / 1000) AS TS,
    co.value:value::float AS CO

FROM
    @THINGSBOARD.SENSOR.co_table_predication (FILE_FORMAT => 'JSON_FORMAT') AS t,
    LATERAL FLATTEN(input => t.$1:co) AS co

    """).collect()

[Row(number of rows inserted=91)]