In [0]:
# List the Hive warehouse directory (Python equivalent of the `%fs ls` magic).
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/"))

In [0]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import random
import pytz
import time



# Generates synthetic IoT sensor events and appends them, one single-row
# batch at a time, to a Delta location.
#
# NOTE: event_id is derived only from the loop counters, and every write uses
# mode("append"), so running this cell again appends rows that reuse the same
# event_id values.

# Start Spark session
spark = SparkSession.builder.appName("IoT_Data_Generator").getOrCreate()

# Delta path (or use table name)
delta_path = "dbfs:/user/hive/warehouse/iot_events"

# India timezone
india_tz = pytz.timezone('Asia/Kolkata')

# Number of days to generate (last 5 days including today)
num_days = 5

# Number of single-row batches written per day. This is also the stride used
# to build event_id, so the loop bound and the id formula share one constant
# and cannot drift apart.
batches_per_day = 10

# Pause between two batch writes, in seconds (0 = no pause).
batch_delay_seconds = 0

# Capture "now" once so that every day offset is computed from the same
# instant; re-reading the clock per iteration could skip or repeat a calendar
# day if the run crosses midnight.
run_started_at = datetime.now(pytz.utc).astimezone(india_tz)

# Loop over days (oldest to newest)
for day_offset in range(num_days-1, -1, -1):  # 4,3,2,1,0
    # Calculate the date for this batch
    current_day = run_started_at - timedelta(days=day_offset)

    # Generate multiple batches per day
    for i in range(batches_per_day):
        # Pick a random time of day on that date (the timezone and the
        # microsecond part of current_day are kept as they are).
        event_time = current_day.replace(
            hour=random.randint(0, 23),
            minute=random.randint(0, 59),
            second=random.randint(0, 59)
        )

        # Generate a single-row batch
        data = [
            (
                i + day_offset*batches_per_day,  # unique within one run
                f"device_{random.randint(1,5)}",
                round(random.uniform(20.0, 35.0), 2),
                round(random.uniform(30.0, 70.0), 2),
                event_time
            )
        ]

        # Create DataFrame
        df = spark.createDataFrame(
            data,
            ["event_id", "device_id", "temperature", "humidity", "event_time"]
        )

        # One Delta append per batch
        df.write.format("delta").mode("append").save(delta_path)

        print(f"Inserted batch {i+1} for {event_time.strftime('%Y-%m-%d')} at {event_time.strftime('%H:%M:%S')}")
        time.sleep(batch_delay_seconds)  # optional delay between batches



In [0]:
# List the Hive warehouse directory again (Python equivalent of `%fs ls`).
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/"))

In [0]:
%sql
-- Read the Delta files at the generator's output path directly, newest event first.
SELECT *
FROM delta.`dbfs:/user/hive/warehouse/iot_events`
ORDER BY event_time DESC;


In [0]:
START FROM HERE

### Creating the main table, which holds all the data

In [0]:
%sql
-- Make sure the target schema exists in the Hive metastore.
CREATE SCHEMA IF NOT EXISTS hive_metastore.iot;

-- Create the main table as a Delta copy of the events stored at the
-- iot_events path. Because of IF NOT EXISTS, the SELECT only populates the
-- table when it does not exist yet.
CREATE TABLE IF NOT EXISTS hive_metastore.iot.live_data
  USING DELTA
  LOCATION 'dbfs:/user/hive/warehouse/MY_iot_events/'
AS
SELECT *
FROM delta.`dbfs:/user/hive/warehouse/iot_events/`

In [0]:
# %sql
# INSERT INTO hive_metastore.iot.live_data VALUES
# (31, 'device_4', 33.87, 33.87, '2025-08-18T06:49:20.415+00:00'),
# (32, 'device_4', 33.87, 33.87, '2025-08-18T06:50:20.415+00:00'),
# (33, 'device_4', 33.87, 33.87, '2025-08-18T06:51:20.415+00:00'),
# (00, 'device_4', 33.87, 33.87, '2025-08-18T06:42:20.415+00:00');


In [0]:
%sql
-- Inspect the main table, newest event first.
SELECT *
FROM hive_metastore.iot.live_data
ORDER BY event_time DESC;

## Ad hoc date

In [0]:
from datetime import datetime, timedelta

# Resolve the date to process: the value of the "input_date" widget, falling
# back to yesterday when the widget is left blank.

# Default date: yesterday
# NOTE(review): datetime.now() is naive, so "yesterday" follows the driver's
# local clock, while the generated events use Asia/Kolkata - confirm both
# agree on the day boundary.
default_date = (datetime.now() - timedelta(days=1)).date()

# Create widget
dbutils.widgets.text("input_date", str(default_date), "Enter date")

# Get widget value; surrounding whitespace is dropped so that a value made up
# only of spaces counts as "empty" instead of being used as a date.
selected_date = dbutils.widgets.get("input_date").strip()

# If widget is empty, use default
if not selected_date:
    selected_date = str(default_date)

print("Using date:", selected_date)

### Creating a table that holds only the new data (by default, yesterday's data)

In [0]:

# Rebuild the incremental table with the rows of live_data whose event date
# equals the selected date.
#
# selected_date comes from a free-text widget and is spliced into the SQL
# text, so it is parsed as a YYYY-MM-DD calendar date first. Anything else
# raises ValueError before the statement runs, instead of reaching the SQL
# engine (where a malformed value could replace the table with an empty
# result, or alter the statement).
from datetime import datetime

try:
    filter_date = datetime.strptime(selected_date.strip(), "%Y-%m-%d").date()
except ValueError as exc:
    raise ValueError(
        f"input_date must be a calendar date in YYYY-MM-DD format, got {selected_date!r}"
    ) from exc

spark.sql(f"""
CREATE OR REPLACE TABLE hive_metastore.iot.incremtal_d AS
SELECT *
FROM hive_metastore.iot.live_data
WHERE to_date(event_time) = DATE('{filter_date.isoformat()}')
""")

In [0]:
%sql
-- Inspect the rows selected for the chosen date.
SELECT *
FROM hive_metastore.iot.incremtal_d;

CREATING A VIEW OVER ONLY THE INCREMENTAL DATA, FOR TRANSFORMATION OR QUERYING

In [0]:
%sql
-- Create the silver schema in the same catalog the view is created in.
-- The schema name is catalog-qualified so this cell does not depend on which
-- catalog is current when it runs.
CREATE SCHEMA IF NOT EXISTS hive_metastore.silver_layer;

-- View over the incremental table that drops rows without an event_time.
CREATE OR REPLACE VIEW hive_metastore.silver_layer.incremental_view
AS SELECT * FROM  hive_metastore.iot.incremtal_d 
WHERE event_time IS NOT NULL;

In [0]:
%sql
-- Inspect the incremental view.
SELECT *
FROM hive_metastore.silver_layer.incremental_view;

### CREATING THE FINAL TABLE THAT HANDLES INCREMENTAL LOADS

In [0]:
%sql
-- Create the final Delta table with the columns of the incremental view but
-- no rows: the WHERE 1=0 predicate never matches, so only the column layout
-- is taken from the view.
CREATE TABLE IF NOT EXISTS hive_metastore.silver_layer.my_data
  USING DELTA
  LOCATION 'dbfs:/user/hive/warehouse/final_iot_events1/'
AS
SELECT *
FROM hive_metastore.silver_layer.incremental_view
WHERE 1=0;

In [0]:
%sql
-- Inspect the final table.
SELECT *
FROM hive_metastore.silver_layer.my_data;

HANDLING INCREMENTAL LOAD

In [0]:
%sql
-- Append only the rows of the incremental view whose event_id is not already
-- present in the final table.
--
-- An anti join is used instead of `NOT IN (subquery)`: with NOT IN, a single
-- NULL event_id in the target makes the predicate UNKNOWN for every source
-- row, so nothing would ever be inserted again. The null-safe comparison
-- (<=>) also keeps a source row with a NULL event_id from being inserted
-- again once the target already holds one.
INSERT INTO hive_metastore.silver_layer.my_data
SELECT src.*
FROM hive_metastore.silver_layer.incremental_view AS src
LEFT ANTI JOIN hive_metastore.silver_layer.my_data AS trg
  ON trg.event_id <=> src.event_id;


In [0]:
# %sql
# MERGE INTO hive_metastore.silver_layer.my_data AS trg
# USING hive_metastore.silver_layer.incremental_view AS src
# ON trg.event_id = src.event_id
# WHEN MATCHED THEN UPDATE SET*
# WHEN NOT MATCHED THEN INSERT *;


In [0]:
%sql
-- Inspect the final table after the load, newest event first.
SELECT *
FROM hive_metastore.silver_layer.my_data
ORDER BY event_time DESC;