In [None]:
import json
from pathlib import Path

import duckdb
import boto3

# import plotly.express as px
# import polars as pl
# import s3fs
from contextlib import contextmanager
import logging

In [None]:
@contextmanager
def init_duckdb_connection(aws_credentials: dict, ram_limit: str):
    """Yield a DuckDB connection configured to read from S3 through httpfs.

    Args:
        aws_credentials: Mapping that must contain ``aws_secret_access_key``,
            ``aws_access_key_id`` and ``aws_region``.
        ram_limit: Value passed to DuckDB's ``memory_limit`` setting, e.g. ``"4GB"``.

    Yields:
        The open DuckDB connection. It is closed when the ``with`` block exits,
        whether or not the body raised.

    Raises:
        ValueError: If any required credential key is missing.
    """
    logger = logging.getLogger(__name__)

    required_keys = ("aws_secret_access_key", "aws_access_key_id", "aws_region")
    missing_keys = [key for key in required_keys if key not in aws_credentials]
    if missing_keys:
        # A bare `raise` is only valid inside an `except` block, so raise an
        # explicit, descriptive error instead.
        message = f"AWS credentials are missing required keys: {missing_keys}"
        logger.error(message)
        raise ValueError(message)

    def _sql_literal(value) -> str:
        # Double embedded single quotes so a value cannot terminate the SQL literal.
        return str(value).replace("'", "''")

    logger.info("Initializing duckdb connection to S3")
    # Connect outside the try block: if connect() itself fails there is nothing
    # to close, and `conn` would otherwise be unbound in `finally`.
    conn = duckdb.connect()
    try:
        conn.execute("INSTALL httpfs;")
        conn.execute("LOAD httpfs;")
        conn.execute(f"SET memory_limit = '{_sql_literal(ram_limit)}'")
        for option, credential_key in (
            ("s3_region", "aws_region"),
            ("s3_access_key_id", "aws_access_key_id"),
            ("s3_secret_access_key", "aws_secret_access_key"),
        ):
            value = _sql_literal(aws_credentials[credential_key])
            conn.execute(f"SET {option} = '{value}';")

        yield conn
    finally:
        conn.close()


In [None]:
# Read the AWS settings from the JSON credentials file.
with open(Path("creds/creds.json"), "r") as target:
    creds = json.load(target)
    storage_options = creds["AWS"]
# Show only the key names: echoing the whole dict would save the secret access
# key in the notebook's cell output.
sorted(storage_options)

In [None]:
# Row count per `hiveperiod` value across every parquet file under the dataset prefix.
period_counts_sql = """
    SELECT hiveperiod,COUNT(1) as jml
    FROM read_parquet('s3://smartdbucket/datalog/cis_smartd_tbl_iot_scania/**/*.parquet')
    GROUP BY hiveperiod
    """
with init_duckdb_connection(storage_options, "4GB") as conn:
    conn.sql(period_counts_sql).show()

In [None]:
# Count rows for district code BRCB whose hiveperiod falls in December 2025,
# reading the dataset with hive partitioning enabled.
december_count_sql = """
    SELECT COUNT(1) as jml
    FROM read_parquet('s3://smartdbucket/datalog/cis_smartd_tbl_iot_scania/**/*.parquet',hive_partitioning=true)
    WHERE hiveperiod BETWEEN '2025-12-01' AND '2025-12-31' AND dstrct_code = 'BRCB'
    """
with init_duckdb_connection(storage_options, "4GB") as conn:
    conn.sql(december_count_sql).show()

In [None]:
# Preview a single gzipped JSON log file straight from S3.
raw_preview_sql = """
    SELECT *
    FROM read_json_auto('s3://smartdbucket/datalog/BRCB/SLS30I172/2025120705/2025120705.txt.gz')
    """
with init_duckdb_connection(storage_options, "4GB") as conn:
    conn.sql(raw_preview_sql).show()

In [None]:
# Scratch check of the set-subset test used for credential validation.
# NOTE(review): the first key in `b` is misspelled ("...keya"), presumably on
# purpose to exercise the failing case — confirm.
a = set(storage_options)
b = {"aws_secret_access_keya", "aws_access_key_id", "aws_region"}

b.issubset(a)

In [None]:
# pull gzip data from s3
s3 = s3fs.S3FileSystem(
    key=storage_options["aws_access_key_id"],
    secret=storage_options["aws_secret_access_key"],
)


# NOTE(review): distrik, deviceid and hivehour must already be defined when this
# cell runs; in this notebook they are assigned in later cells.
filepath = f"s3://{storage_options['aws_bucket']}/datalog/{distrik}/{deviceid}/{hivehour}/{hivehour}"
# Display the path that was just built (the cell previously echoed `bucketpath`,
# which this cell never assigns).
filepath


In [None]:
# Expand the glob on S3, then drop the fixed bucket/district/device prefix from each match.
device_prefix = "smartdbucket/datalog/BRCG/SLS30I009/"
s3paths = [matched.replace(device_prefix, "") for matched in s3.glob(filepath)]

In [None]:
# Notebook-level DuckDB connection with the httpfs extension and S3 credentials applied.
conn = duckdb.connect()

for extension_statement in ("INSTALL httpfs;", "LOAD httpfs;"):
    conn.execute(extension_statement)

# (DuckDB setting name, key in storage_options), applied in this order.
for setting_name, option_key in (
    ("s3_region", "aws_region"),
    ("s3_access_key_id", "aws_access_key_id"),
    ("s3_secret_access_key", "aws_secret_access_key"),
):
    conn.execute(f"SET {setting_name} = '{storage_options[option_key]}';")


In [None]:
# Pairs that the batch loop unpacks as (sls, unitno); the first element is
# passed to count_canbus_rows as the device id.
list_unit = [
    ("SLS30I009", "LD780"),
    ("SLS30I015", "LD781"),
    ("SLS30I016", "LD782"),
    ("SLS30I039", "LD776"),
    ("SLS30I146", "LD772"),
    ("SLS30I477", "LD778"),
    ("SDLIR038", "LD921"),
    ("SLS30I623", "LD783"),
]

In [None]:
# Records per calendar date (derived from the heartbeat epoch) for the files at `bucketpath`.
daily_counts_sql = f"""
SELECT CAST(to_timestamp(heartbeat) as DATE)as tgl_utc,COUNT(1) as jml
FROM read_json_auto('{bucketpath}')
GROUP BY CAST(to_timestamp(heartbeat) as DATE)
ORDER BY CAST(to_timestamp(heartbeat) as DATE)
           """
df = conn.sql(daily_counts_sql).pl()

df

In [None]:
# Display the device/unit pairs.
list_unit

In [None]:
# Notebook-level selectors interpolated into the S3 paths built in other cells
# (".../datalog/{distrik}/{deviceid}/...").
distrik = "BRCG"
deviceid = "SLS30I009"


def count_canbus_rows(conn, devid, hivehour, district=None):
    """Load one device's moving-vehicle records from S3 and print how many there are.

    Args:
        conn: DuckDB connection already configured for S3 access.
        devid: Device id, used as a folder name in the S3 path.
        hivehour: Hour key (or glob pattern) used for both the folder and file name.
        district: District folder name. Defaults to the notebook-level ``distrik``
            so existing three-argument calls behave as before.

    Returns:
        The query result exported with ``.pl()``: rows where both ``gpsspeed``
        and ``vehiclespeed`` are greater than zero.
    """
    if district is None:
        district = distrik
    bucketpath = f"s3://{storage_options['aws_bucket']}/datalog/{district}/{devid}/{hivehour}/{hivehour}.txt.gz"
    # Casting to DATE before adding the offset discarded the time of day, so every
    # row ended up at 08:00. epoch_ms() yields a zone-less timestamp, so adding
    # 8 hours gives UTC+8 wall-clock time independent of the session time zone.
    # assumes heartbeat is epoch seconds (it was passed to to_timestamp before) — TODO confirm
    df = conn.sql(f"""
SELECT epoch_ms(CAST(heartbeat * 1000 AS BIGINT)) + INTERVAL 8 HOUR as datetime_wita,heartbeat,unitno,deviceid,gpsspeed,gpsnumsat,vehiclespeed,speedsource
FROM read_json_auto('{bucketpath}')
WHERE gpsspeed > 0 AND VehicleSpeed > 0
           """).pl()
    print(devid, "unit has :", len(df), "rows")
    return df

In [None]:
# Second element of the second pair in list_unit.
list_unit[1][1]

In [None]:
# Forget any previously accumulated frame. Unlike `del main_df`, this does not
# raise NameError when the name is not defined yet (e.g. on a fresh kernel).
globals().pop("main_df", None)

In [None]:
# Glob pattern used for both the hour folder and the file name in the S3 path.
hivehour = "2025121*"

# One connection for the whole batch; the context manager closes it on exit.
# (Reconnecting inside the block left a second connection that was never closed.)
with duckdb.connect() as conn:
    conn.execute("INSTALL httpfs;")
    conn.execute("LOAD httpfs;")

    conn.execute(f"SET s3_region = '{storage_options['aws_region']}';")
    conn.execute(f"SET s3_access_key_id = '{storage_options['aws_access_key_id']}';")
    conn.execute(
        f"SET s3_secret_access_key = '{storage_options['aws_secret_access_key']}';"
    )

    # Collect one frame per device and concatenate once. Re-running the cell now
    # rebuilds main_df from scratch instead of appending to a previous run, and
    # no longer relies on catching NameError to detect the first iteration.
    unit_frames = []
    for sls, unitno in list_unit:
        df = count_canbus_rows(conn, sls, hivehour)
        unit_frames.append(df)

# pl.concat rejects an empty list, so keep main_df as None when there are no units.
main_df = pl.concat(unit_frames) if unit_frames else None

In [None]:
# Display whatever frame is currently bound to `df`.
df

In [49]:
# Five copies of the same object key, used as sample input for the string-building cell.
test_str = ["datalog/BRCG/SLS30I614/2025121212/2025121212.txt.gz"] * 5

In [51]:
# Render the sample keys as the text of a quoted list of s3:// URIs.
bucket_name = "smartdbucket"
uri_prefix = f"s3://{bucket_name}/"
item_separator = f"', '{uri_prefix}"
s3key_list_string = "".join(["['", uri_prefix, item_separator.join(test_str), "']"])
s3key_list_string

In [None]:
# Zero-padded two-digit strings "00" through "23".
r = [f"{hour:02d}" for hour in range(24)]
r

In [None]:
# Distinct values of the speedsource column in the accumulated frame.
main_df.select("speedsource").unique()

's3://smartdbucket/datalog/BRCB/SLS30I172/2025120705/2025120705.txt.gz'

In [None]:
# Full s3:// URI of one gzipped log file; the download cell passes this value
# to boto3's download_file.
S3_OBJECT_KEY = "s3://smartdbucket/datalog/BRCB/SLS30I172/2025120705/2025120705.txt.gz"

In [None]:
# List up to ten objects under the parquet prefix and print what came back.
client_settings = {
    "aws_access_key_id": storage_options["aws_access_key_id"],
    "aws_secret_access_key": storage_options["aws_secret_access_key"],
    "region_name": storage_options["aws_region"],
}
s3 = boto3.client("s3", **client_settings)
response = s3.list_objects_v2(
    Bucket="smartdbucket",
    Prefix="datalogparquet/datalog/",
    MaxKeys=10,
)

key_count = response.get("KeyCount", 0)
contents = response.get("Contents", [])
print(f"KeyCount: {key_count}")
print(f"Contents: {contents}")

In [None]:
s3 = boto3.client(
    "s3",
    aws_access_key_id=storage_options["aws_access_key_id"],
    aws_secret_access_key=storage_options["aws_secret_access_key"],
    region_name=storage_options["aws_region"],
)

# download_file takes the bucket and the key *within* that bucket as separate
# arguments, so strip the "s3://<bucket>/" scheme prefix from the URI first.
bucket_name = "smartdbucket"
uri_prefix = f"s3://{bucket_name}/"
if S3_OBJECT_KEY.startswith(uri_prefix):
    object_key = S3_OBJECT_KEY[len(uri_prefix):]
else:
    object_key = S3_OBJECT_KEY

try:
    # Download the object to the local file named "data"
    s3.download_file(bucket_name, object_key, "data")
    print(f"Successfully downloaded {S3_OBJECT_KEY} to data")

except Exception as e:
    print(f"An error occurred: {e}")


In [None]:
# Group the accumulated frame by speed source. The original line had unbalanced
# parentheses and wrapped pl.col in a second pl.col.
# TODO(review): chain the intended aggregation onto this group-by.
main_df.group_by(pl.col("speedsource"))

In [None]:
# pull gzip data from s3
s3 = s3fs.S3FileSystem(
    key=storage_options["aws_access_key_id"],
    secret=storage_options["aws_secret_access_key"],
)


filepath = f"s3://{storage_options['aws_bucket']}/datalog/{distrik}/{deviceid}/{hivehour}/{hivehour}"
# Display the path that was just built (the cell previously echoed `bucketpath`,
# which this cell does not assign).
filepath


In [None]:
# Moving-vehicle records whose speed source is CANBus for the selected district/device/hour.
bucketpath = f"s3://{storage_options['aws_bucket']}/datalog/{distrik}/{deviceid}/{hivehour}/{hivehour}.txt.gz"
# Fixes: the 'CANBus' literal was missing its closing quote, and casting to DATE
# before adding the 8-hour offset discarded the time of day. epoch_ms() yields a
# zone-less timestamp, so the offset gives UTC+8 wall-clock time.
# assumes heartbeat is epoch seconds (it was passed to to_timestamp before) — TODO confirm
df = conn.sql(f"""
SELECT epoch_ms(CAST(heartbeat * 1000 AS BIGINT)) + INTERVAL 8 HOUR as datetime_wita,gpsspeed,gpsnumsat,vehiclespeed,speedsource
FROM read_json_auto('{bucketpath}')
WHERE gpsspeed > 0 AND VehicleSpeed > 0 AND speedsource = 'CANBus'
           """).pl()

In [None]:
# Keep only the rows whose speedsource equals "CANBus".
df.filter(pl.col("speedsource").eq("CANBus"))

In [None]:
# TODO(review): unfinished plotly express call (`px.`). It is a SyntaxError as
# written, and the `plotly.express` import is commented out in the imports cell.