In [0]:
%run "./Environment Setup"

In [0]:

import pyspark.sql.functions as f
from pyspark.sql.types import *
 
from delta.tables import *
 
import dlt
import boto3
import socket
from botocore.exceptions import NoCredentialsError
import time

In [0]:
# Define AWS configuration details in the aws_config dictionary
# DBTITLE 1,Initialize Config Settings
if 'config' not in locals() or not isinstance(config, dict):
    config = {}

config['aws'] = {
    'access_key_id': 'AKIA3BYT2DKQQQ7QDXVW',
    'secret_access_key': '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',
    'region_name': 'us-west-2',
    'subnets': [
        'subnet-0682160c6cf0f7d33',  # SubnetID-1 
        'subnet-0ef26ca8237f37fae'   # SubnetID-2
    ],
    'security_group': 'sg-098d6eca4e4744f3f',  # Security group ID
    'cluster_name': 'real-time-pos-msk',  # Unique cluster name
    'kafka_version': '2.8.1',
    'number_of_broker_nodes': 4,
    'instance_type': 'kafka.m5.large',
    'cluster_arn': 'arn:aws:kafka:us-west-2:759713897121:cluster/real-time-pos-msk/d494f0b6-6c35-4d6e-ab46-60da511fcafd-11'
}

## Config Settings for DBFS Mount Point
config['dbfs_mount_name'] = f'/mnt/real-time-pos/' 

# Store the filenames for the data files into Config
config['inventory_change_store001_filename'] = config['dbfs_mount_name'] + '/data-generator/inventory_change_store001.txt'
config['inventory_change_online_filename'] = config['dbfs_mount_name'] + '/data-generator/inventory_change_online.txt'
 
# snapshot data files
config['inventory_snapshot_store001_filename'] = config['dbfs_mount_name'] + '/data-generator/inventory_snapshot_store001.txt'
config['inventory_snapshot_online_filename'] = config['dbfs_mount_name'] + '/data-generator/inventory_snapshot_online.txt'
 
# static data files
config['stores_filename'] = config['dbfs_mount_name'] + '/data-generator/store.txt'
config['items_filename'] = config['dbfs_mount_name'] + '/data-generator/item.txt'
config['change_types_filename'] = config['dbfs_mount_name'] + '/data-generator/inventory_change_type.txt'

# Config Settings for Checkpoint Files
config['inventory_snapshot_path'] = config['dbfs_mount_name'] + '/inventory_snapshots/'
# Config Settings for DLT Data
config['dlt_pipeline'] = config['dbfs_mount_name'] + '/dlt_pipeline_pos'

# Identify Database for Data Objects and initialize it
database_name = f'pos_dlt'
config['database'] = database_name




In [0]:
%sql
CREATE DATABASE IF NOT EXISTS pos_dlt

In [0]:
# DLT tables wont be run unless the DLT pipeline is initialized. It cant be run in interactive mode .
## to test the dataframe use store definition 
store_schema = StructType([ StructField("store_id", StringType(), True),
                            StructField("name", StringType(), True)
                            ])
df=(
    spark.read.csv(config['stores_filename'],header=True, schema=store_schema)
  )
display(df)

In [0]:
## Store Static Data 
store_schema = StructType([ StructField("store_id", StringType(), True),
                            StructField("name", StringType(), True)
                            ])

## Define the delta live table for the store data
@dlt.table(
  name='Store',
  comment='This table store Static data with individual store id & Name',
  table_properties={'quality':'Silver'},
  spark_conf={'pipelines.trigger.interval':'10 minutes'}
)
def store():
  df=(
    spark.read.csv(config['stores_filename'],header=True, schema=store_schema)
  )
  return df


In [0]:
item_schema=StructType([
    StructField("item_id", StringType()),
    StructField("name",StringType(),True),
    StructField("supplier_id",IntegerType(),True),
    StructField("saftey_stock_quantity",IntegerType(),True)    
])

## Defining the Item Table
@dlt.table(
    name='Item',
    comment='This table Include all the items with their details . Triggered Every 10 Minutes',
    table_properties={'quality':'Silver'},
    spark_conf={'pipelines.trigger.interval':'10 minutes'}
)
def Item():
    df=spark.read.csv(
        config['items_filename'],header=True,schema=item_schema
    )
    return df




In [0]:
change_type_schema = StructType([
  StructField('change_type_id', IntegerType()),
  StructField('change_type', StringType())
  ])
 
@dlt.table(
  name = 'inventory_change_type',
  comment = 'data mapping change type id values to descriptive strings',
  table_properties={'quality':'silver'},
  spark_conf={'pipelines.trigger.interval':'10 minutes'}
)
def inventory_change_type():
  return (
    spark
      .read
      .csv(
        config['change_types_filename'],
        header=True,
        schema=change_type_schema
        )
  )

In [0]:
## Initialize the boto3 session
session = boto3.Session(
    aws_access_key_id=config['aws']['access_key_id'],
    aws_secret_access_key=config['aws']['secret_access_key'],
    region_name=config['aws']['region_name']
)

# Create an MSK client
msk_client = session.client('kafka')

try:
    # Get bootstrap brokers
    response = msk_client.get_bootstrap_brokers(
        ClusterArn=config['aws']['cluster_arn'],
    )
    bootstrap_servers = response['BootstrapBrokerString']
except NoCredentialsError:
    print("No credentials available. Please check your AWS credentials.")
    exit(1)  # Exit the script if credentials are not available
except Exception as e:
    print(f"Error getting bootstrap brokers: {e}")
    exit(1)


topics = ['InventorySnapshot', 'ChangeInventoryData']

In [0]:
@dlt.table(
  name = 'raw_inventory_change',
  comment= 'data representing raw (untransformed) inventory-relevant events originating from the POS',
  table_properties={'quality':'bronze'}
  )
def raw_inventory_change():
  return (
    spark
      .readStream
      .format('kafka')
      .option('subscribe', 'ChangeInventoryData')
      .option('kafka.bootstrap.servers',bootstrap_servers )
      .option('kafka.sasl.mechanism', 'PLAIN')
    #   .option('kafka.security.protocol', 'SASL_SSL')
    #   .option('kafka.sasl.jaas.config', config['eh_sasl'])
      .option('kafka.request.timeout.ms', '120000')
      .option('kafka.session.timeout.ms', '120000')
      .option('failOnDataLoss', 'false')
      .option('startingOffsets', 'earliest')
      .option('maxOffsetsPerTrigger', '100') # read 100 messages at a time
      .load()
  )

In [0]:
# display(spark
#       .readStream
#       .format('kafka')
#       .option('subscribe', 'ChangeInventoryData')
#       .option('kafka.bootstrap.servers',bootstrap_servers )
#       .option('kafka.sasl.mechanism', 'PLAIN')
#     #   .option('kafka.security.protocol', 'SASL_SSL')
#     #   .option('kafka.sasl.jaas.config', config['eh_sasl'])
#       .option('kafka.request.timeout.ms', '120000')
#       .option('kafka.session.timeout.ms', '120000')
#      .option('failOnDataLoss', 'false')
#       .option('startingOffsets', 'earliest')
#       .option('maxOffsetsPerTrigger', '100') # read 100 messages at a time
#      .load())

In [0]:
## Use the raw_inventory_change DLT to create another DLT for conversion 
# schema of value field
value_schema = StructType([
  StructField('trans_id', StringType()),
  StructField('store_id', IntegerType()),
  StructField('date_time', TimestampType()),
  StructField('change_type_id', IntegerType()),
  StructField('items', ArrayType(
    StructType([
      StructField('item_id', IntegerType()), 
      StructField('quantity', IntegerType())
      ])
    ))
  ])
 
# define inventory change data
@dlt.table(
  name = 'inventory_change',
  comment = 'data representing item-level inventory changes originating from the POS',
  table_properties = {'quality':'silver'}
)
def inventory_change():
  df = (
    dlt
      .read_stream('raw_inventory_change')
      .withColumn('body', f.expr('cast(value as string)')) # convert payload to string
      .withColumn('event', f.from_json('body', value_schema)) # parse json string in payload
      .select( # extract data from payload json
        f.col('event').alias('event'),
        f.col('event.trans_id').alias('trans_id'),
        f.col('event.store_id').alias('store_id'), 
        f.col('event.date_time').alias('date_time'), 
        f.col('event.change_type_id').alias('change_type_id'), 
        f.explode_outer('event.items').alias('item')     # explode items so that there is now one item per record
        )
      .withColumn('item_id', f.col('item.item_id'))
      .withColumn('quantity', f.col('item.quantity'))
      .drop('item')
      .withWatermark('date_time', '1 hour') # ignore any data more than 1 hour old flowing into deduplication
      .dropDuplicates(['trans_id','item_id'])  # drop duplicates 
    )
  return df

# Inventory Management Process

### 1. Periodically Receiving Files of Inventory Count

### 2. Receiving the File from S3 Bucket

### 3. Maintaining Two Tables:
- **Full History of Inventory Snapshot**
- **Latest Inventory Snapshot**

### 4. Full History Inventory Snapshot:
- **Mode:** Append

### 5. Latest Inventory Snapshot:
- **Mode:** Merge

## Inventory Snapshot Files Details:
- **Frequency:** Irregular
- **Action:** Process it as soon as it lands
- **Solution:** Use Autoloader
  - **Function:** Listens for incoming files from the storage path
  - **Process:** Handles data for any new arriving files as a stream

In [0]:
# inventory snapshot data files (one from each store) -> SCHEMA_IS
Schema_IS = StructType([
  StructField('item_id', IntegerType()),
  StructField('employee_id', IntegerType()),
  StructField('store_id', IntegerType()),
  StructField('date_time', TimestampType()),
  StructField('quantity', IntegerType())
  ])

@dlt.table(
      name='InventorySnapshot',
      comment='data representing inventory snapshots originating from the POS in S3',
      table_properties={'quality':'silver'}
  )
def InventorySnapshot():
    return(
         spark.readStream
            .format('cloudFiles')
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.includeExistingFiles",'true')
            .option("header","true")
            .schema(Schema_IS)
            .load(config['inventory_snapshot_path'])
            .drop('id')
      )

In [0]:
# display(
#          spark.readStream
#             .format('cloudFiles')
#             .option("cloudFiles.format", "csv")
#             .option("cloudFiles.includeExistingFiles",'true')
#             .option("header","true")
#             .schema(Schema_IS)
#             .load(config['inventory_snapshot_path'])
#             .drop('id')
#       )

In [0]:
## Inventory Snapshot data - All historical Load 
## Calculate a table for latest snapshot data
## Use apply changes method
# create dlt table to hold latest inventory snapshot (if it doesn't exist)
dlt.create_streaming_table('latestinventorysnapshot')
 
# merge incoming snapshot data with latest
dlt.apply_changes( # merge
  target = 'latestinventorysnapshot',
  source = 'InventorySnapshot',
  keys = ['store_id','item_id'], # match source to target records on these keys
  sequence_by = 'date_time' # determine latest value by comparing date_time field
  )

In [0]:
# display(
#         spark
#       .readStream
#       .format('kafka')
#       .option('subscribe', 'ChangeInventoryData')
#       .option('kafka.bootstrap.servers', bootstrap_servers)
#       .option('kafka.sasl.mechanism', 'PLAIN')
#     #   .option('kafka.security.protocol', 'SASL_SSL')
#     #   .option('kafka.sasl.jaas.config', config['eh_sasl'])
#       .option('kafka.request.timeout.ms', '120000')
#       .option('kafka.session.timeout.ms', '120000')
#       .option('failOnDataLoss', 'false')
#       .option('startingOffsets', 'latest')
#       .option('maxOffsetsPerTrigger', '1000') # read 1000 messages at a time
#       .load()
# )

In [0]:
from pyspark.sql.functions import *

@dlt.table(
  name='current_inventory',
  comment='current inventory count for a product in a store location',
  table_properties={'quality':'gold'},
  spark_conf={'pipelines.trigger.interval': '5 minutes'}
)
def current_inventory():

    # calculate inventory change with bopis corrections
    inventory_change_df = (
        dlt
        .readStream('inventory_change').alias('x')
        .join(
            dlt.readStream('Store').alias('y'), 
            on='store_id'
        )
        .join(
            dlt.readStream('inventory_change_type').alias('z'), 
            on='change_type_id'
        )
        .filter(expr("NOT(y.name='online' AND z.change_type='bopis')"))
        .select('store_id','item_id','date_time','quantity')
    )

    # calculate current inventory
    inventory_current_df = (
        dlt
            .readStream('latestinventorysnapshot').alias('a')
            .join(
            inventory_change_df.alias('b'), 
            on=expr('''
                a.store_id=b.store_id AND 
                a.item_id=b.item_id AND 
                a.date_time<=b.date_time
                '''), 
            how='leftouter'
            )
            .groupBy('a.store_id','a.item_id')
            .agg(
                first('a.quantity').alias('snapshot_quantity'),
                sum('b.quantity').alias('change_quantity'),
                first('a.date_time').alias('snapshot_datetime'),
                max('b.date_time').alias('change_datetime')
                )
            .withColumn('change_quantity', coalesce('change_quantity', lit(0)))
            .withColumn('current_quantity', expr('snapshot_quantity + change_quantity'))
            .withColumn('date_time',expr('GREATEST(snapshot_datetime, change_datetime)'))
            .drop('snapshot_datetime','change_datetime')
            .orderBy('current_quantity')
    )

    return inventory_current_df

In [0]:
class InventoryTestSuite:
    def setup(self):
        # Create the necessary tables and insert test data
        spark.sql("DROP TABLE IF EXISTS inventory_change")
        spark.sql("DROP TABLE IF EXISTS Store")
        spark.sql("DROP TABLE IF EXISTS inventory_change_type")
        spark.sql("DROP TABLE IF EXISTS latestinventorysnapshot")

        spark.sql("""
        CREATE TABLE inventory_change (
            store_id INT,
            item_id INT,
            date_time TIMESTAMP,
            quantity INT
        ) USING DELTA
        """)

        spark.sql("""
        INSERT INTO inventory_change VALUES
        (1, 101, '2023-10-01 10:00:00', 10),
        (1, 102, '2023-10-01 11:00:00', 5),
        (2, 101, '2023-10-01 12:00:00', -3)
        """)

        spark.sql("""
        CREATE TABLE Store (
            store_id INT,
            name STRING
        ) USING DELTA
        """)

        spark.sql("""
        INSERT INTO Store VALUES
        (1, 'physical'),
        (2, 'online')
        """)

        spark.sql("""
        CREATE TABLE inventory_change_type (
            change_type_id INT,
            change_type STRING
        ) USING DELTA
        """)

        spark.sql("""
        INSERT INTO inventory_change_type VALUES
        (1, 'sale'),
        (2, 'bopis')
        """)

        spark.sql("""
        CREATE TABLE latestinventorysnapshot (
            store_id INT,
            item_id INT,
            date_time TIMESTAMP,
            quantity INT
        ) USING DELTA
        """)

        spark.sql("""
        INSERT INTO latestinventorysnapshot VALUES
        (1, 101, '2023-09-30 10:00:00', 100),
        (1, 102, '2023-09-30 11:00:00', 50),
        (2, 101, '2023-09-30 12:00:00', 30)
        """)

    def test_inventory_logic(self):
        from pyspark.sql.functions import expr, col, first, sum, max, coalesce, lit

        # Read the tables
        inventory_change_df = spark.read.table('inventory_change')
        store_df = spark.read.table('Store')
        inventory_change_type_df = spark.read.table('inventory_change_type')
        latest_inventory_snapshot_df = spark.read.table('latestinventorysnapshot')

        # Perform the joins and transformations as defined in the DLT pipeline
        inventory_change_df = (
            inventory_change_df.alias('x')
            .join(store_df.alias('y'), on='store_id')
            .join(inventory_change_type_df.alias('z'), on=col('change_type_id') == col('z.change_type_id'))
            .filter(expr("NOT(y.name='online' AND z.change_type='bopis')"))
            .select('store_id', 'item_id', 'date_time', 'quantity', 'change_type_id')
        )

        inventory_current_df = (
            latest_inventory_snapshot_df.alias('a')
            .join(
                inventory_change_df.alias('b'),
                on=expr('''
                    a.store_id=b.store_id AND 
                    a.item_id=b.item_id AND 
                    a.date_time<=b.date_time
                '''),
                how='leftouter'
            )
            .groupBy('a.store_id', 'a.item_id')
            .agg(
                first('a.quantity').alias('snapshot_quantity'),
                sum('b.quantity').alias('change_quantity'),
                first('a.date_time').alias('snapshot_datetime'),
                max('b.date_time').alias('change_datetime')
            )
            .withColumn('change_quantity', coalesce('change_quantity', lit(0)))
            .withColumn('current_quantity', expr('snapshot_quantity + change_quantity'))
            .withColumn('date_time', expr('GREATEST(snapshot_datetime, change_datetime)'))
            .drop('snapshot_datetime', 'change_datetime')
            .orderBy('current_quantity')
        )

        # Display the final DataFrame
        display(inventory_current_df)
        print("Test inventory logic successfully done")

    def teardown(self):
        # Drop the tables
        spark.sql("DROP TABLE IF EXISTS inventory_change")
        spark.sql("DROP TABLE IF EXISTS Store")
        spark.sql("DROP TABLE IF EXISTS inventory_change_type")
        spark.sql("DROP TABLE IF EXISTS latestinventorysnapshot")
        print("Teardown successfully done")



In [0]:
# Run the test suite
test_suite = InventoryTestSuite()
test_suite.setup()
test_suite.test_inventory_logic()
test_suite.teardown()