# Simulating Live Data Streaming for Hotel Bookings

*Note: This notebook can be run either in Databricks or in a local Spark environment.*

### Process:

- In this notebook, we simulate live data streaming for hotel bookings using Spark and Confluent Cloud (Kafka). The stream is set up to generate one new hotel booking every 15 minutes and write it to a topic in Confluent Cloud.

### Use Case:

- The stream will be read by a Databricks notebook every 7 days, and the endpoint created for predictions will be used to calculate the probability of cancellation for every active booking.

- Email campaigns will be triggered by a pipeline for bookings whose probability of cancellation is higher than 70% (configurable).
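Here is a minimal sketch of the selection rule in plain Python. It assumes the endpoint's predictions have already been collected into a mapping from booking id to cancellation probability. `CANCELLATION_THRESHOLD` and `bookings_to_target` are hypothetical names, and the sketch leaves out both the endpoint call and the email pipeline.

```python
from typing import List, Mapping

# Hypothetical default; the use case states the 70% threshold is configurable
CANCELLATION_THRESHOLD = 0.70


def bookings_to_target(
    predictions: Mapping[str, float], threshold: float = CANCELLATION_THRESHOLD
) -> List[str]:
    """Return the ids of bookings whose cancellation probability is above threshold."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be a probability between 0 and 1")
    # Strictly greater than, matching "higher than 70%"
    return sorted(bid for bid, p in predictions.items() if p > threshold)
```

For example, `bookings_to_target({"b1": 0.91, "b2": 0.70, "b3": 0.15})` returns `["b1"]`. A booking at exactly 70% is not higher than the threshold, so it is excluded.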

## 1. Import Required Libraries

### 1.1 Verify Installation of Confluent's Python client for Apache Kafka

In [None]:
! pip show confluent-kafka

### 1.2 Import Other Required Libraries

In [None]:
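# 1.2.1 Import PySpark modules for stream processing
# (triggers are set on the DataStreamWriter with .trigger(processingTime=...), so no Trigger class is imported)
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Reuse the active session on Databricks, or create one in a local Spark environment
# (locally, the spark-sql-kafka-0-10 connector package must be on the classpath)
spark = SparkSession.builder.appName("hotel-booking-stream").getOrCreate()

# 1.2.2 Import os to read the Confluent Cloud API key and secret from environment variables instead of hardcoding them
import os

# 1.2.3 Import time to let the streaming query run for a fixed duration before it is stopped
import time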

## 2. Stream Processing

This section contains the step-by-step process of simulating, processing, and publishing streaming data to a Kafka topic. This workflow is designed to represent live hotel bookings made by customers.

### 2.1 Define a Schema for Stream Processing

First, we define the columns corresponding to the Avro schema specified for the Kafka topic. These columns organize the streaming data and include the following fields:

- `hotel`
- `lead_time`
- `arrival_date_month`
- `arrival_date_week_number`
- `arrival_date_day_of_month`
- `stays_in_weekend_nights`
- `stays_in_week_nights`
- `adults`
- `children`
- `babies`
- `meal`
- `country`
- `market_segment`
- `distribution_channel`
- `is_repeated_guest`
- `previous_cancellations`
- `previous_bookings_not_canceled`
- `reserved_room_type`
- `booking_changes`
- `deposit_type`
- `days_in_waiting_list`
- `customer_type`
- `adr`
- `required_car_parking_spaces`
- `total_of_special_requests`

These fields are tailored to capture the dynamics of hotel bookings, from guest details to reservation specifics.
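One lightweight way to check that a published message carries these fields is a type map plus a validator. The sketch below is hypothetical and written in pure Python. Its types are inferred from the casts in the transformation code (strings, integers, and a float `adr`) rather than read from the topic's Avro schema. `BOOKING_FIELD_TYPES` and `validate_booking` are illustrative names.

```python
import json
from typing import Dict, List

# Expected JSON type per field, inferred from the casts in the transformation step
# (an assumption for illustration, not the Avro schema itself)
BOOKING_FIELD_TYPES: Dict[str, type] = {
    "hotel": str, "lead_time": int, "arrival_date_month": str,
    "arrival_date_week_number": int, "arrival_date_day_of_month": int,
    "stays_in_weekend_nights": int, "stays_in_week_nights": int,
    "adults": int, "children": int, "babies": int, "meal": str,
    "country": str, "market_segment": str, "distribution_channel": str,
    "is_repeated_guest": int, "previous_cancellations": int,
    "previous_bookings_not_canceled": int, "reserved_room_type": str,
    "booking_changes": int, "deposit_type": str, "days_in_waiting_list": int,
    "customer_type": str, "adr": float, "required_car_parking_spaces": int,
    "total_of_special_requests": int,
}


def validate_booking(payload: str) -> List[str]:
    """Return a list of problems in one JSON-encoded booking (empty means valid)."""
    record = json.loads(payload)
    problems = []
    for field, expected in BOOKING_FIELD_TYPES.items():
        if field not in record:
            problems.append(f"missing field: {field}")
            continue
        value = record[field]
        # Accept whole-number ints for float fields; reject booleans, which Python treats as ints
        allowed = (int, float) if expected is float else (expected,)
        if isinstance(value, bool) or not isinstance(value, allowed):
            problems.append(f"{field}: expected {expected.__name__}, got {type(value).__name__}")
    for field in sorted(set(record) - set(BOOKING_FIELD_TYPES)):
        problems.append(f"unexpected field: {field}")
    return problems
```

A consumer could run `validate_booking` on each message value read from `hotel_booking_live` and log any non-empty result.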


### 2.2 Simulate Streaming Data

To mimic the flow of real-time data, we create a streaming DataFrame from Spark's built-in rate source that emits one row every 15 minutes. This cadence assumes the hotel receives a new booking every 15 minutes.
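Spark's rate sources emit two columns: a `timestamp` and an incrementing `value` counter. The hypothetical pure-Python generator below makes the 15-minute cadence concrete without a cluster. `simulated_rate_rows` is an illustrative name, not a Spark API. The generator only mimics the shape of the rows and does not reproduce Spark's scheduling.

```python
from datetime import datetime, timedelta
from itertools import islice
from typing import Iterator, Tuple


def simulated_rate_rows(
    start: datetime, interval: timedelta = timedelta(minutes=15)
) -> Iterator[Tuple[datetime, int]]:
    """Yield (timestamp, value) pairs, one per interval, shaped like rate source rows."""
    value = 0
    while True:
        # value is a 0-based counter; the timestamp advances by one interval per row
        yield start + value * interval, value
        value += 1


# The first three simulated bookings over a 30-minute window
first_rows = list(islice(simulated_rate_rows(datetime(2024, 1, 1, 9, 0)), 3))
```

Here `first_rows` holds the rows for 09:00, 09:15, and 09:30, with counter values 0, 1, and 2.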

### 2.3 Transform Streaming Data

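The rate source produces only a `timestamp` column and an incrementing `value` counter. Each booking field is therefore derived from `value` (mostly with modulo arithmetic), set to a constant, or, in the case of `adr`, drawn at random, and then cast to the expected type. The fields are packed into a struct and serialized as a JSON string in the Kafka `value` column, with the counter as the message key.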
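To show what a single row looks like, here is a hypothetical pure-Python mirror of the SQL expression in step 2.3. `make_booking` and `to_kafka_record` are illustrative names and are not part of the notebook. The sketch applies the same modulo arithmetic to the counter and returns the `(key, value)` pair that would be written to Kafka. One difference: Spark casts `adr` to a 32-bit float, while Python keeps a 64-bit float.

```python
import json
import random
from typing import Optional, Tuple


def make_booking(value: int, rng: Optional[random.Random] = None) -> dict:
    """Build one synthetic booking from the rate source counter `value`."""
    if rng is None:
        rng = random.Random()
    return {
        "hotel": "City Hotel",
        "lead_time": value % 300,
        "arrival_date_month": "January",
        "arrival_date_week_number": value % 53 + 1,
        "arrival_date_day_of_month": value % 31 + 1,
        "stays_in_weekend_nights": value % 5,
        "stays_in_week_nights": value % 10,
        "adults": value % 4 + 1,
        "children": value % 3,
        "babies": value % 2,
        "meal": "BB",
        "country": "USA",
        "market_segment": "Online TA",
        "distribution_channel": "TA/TO",
        "is_repeated_guest": value % 2,
        "previous_cancellations": 0,
        "previous_bookings_not_canceled": 0,
        "reserved_room_type": "A",
        "booking_changes": value % 5,
        "deposit_type": "No Deposit",
        "days_in_waiting_list": 0,
        "customer_type": "Transient",
        # Same range as Spark's 100 + rand() * 120, i.e. [100, 220)
        "adr": 100 + rng.random() * 120,
        "required_car_parking_spaces": 0,
        "total_of_special_requests": value % 3,
    }


def to_kafka_record(value: int, rng: Optional[random.Random] = None) -> Tuple[str, str]:
    """Return the (key, value) strings published to Kafka for counter `value`."""
    return str(value), json.dumps(make_booking(value, rng))
```

For counter 7, for instance, the key is `"7"`, `lead_time` is 7, `arrival_date_week_number` is 8, and `adults` is 4.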

### 2.4 Publish to Kafka

With the streaming data properly formatted, the next step is to publish it to a Kafka topic. This requires configuring the Kafka connection parameters: the bootstrap server, SASL_SSL security with the PLAIN mechanism, and the Confluent Cloud API key and secret. The transformed data is then written to the designated topic, where it becomes available for real-time consumption.
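The connection settings can be collected in one place. Below is a hypothetical helper, `kafka_sink_options` (an illustrative name), that builds the same writer options the notebook uses, assuming SASL_SSL with the PLAIN mechanism as in the code cell. The `on_databricks` flag switches between the shaded login-module class name that Databricks uses and the unshaded name that open-source Spark uses.

```python
from typing import Dict


def kafka_sink_options(
    bootstrap_servers: str,
    topic: str,
    api_key: str,
    api_secret: str,
    on_databricks: bool = True,
) -> Dict[str, str]:
    """Build Kafka sink options for a Confluent Cloud topic secured with SASL_SSL/PLAIN."""
    # Databricks shades the Kafka client, so its classes carry a "kafkashaded." prefix
    prefix = "kafkashaded." if on_databricks else ""
    login_module = f"{prefix}org.apache.kafka.common.security.plain.PlainLoginModule"
    jaas = f'{login_module} required username="{api_key}" password="{api_secret}";'
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
        # Sink-side option; "subscribe" applies only when reading from Kafka
        "topic": topic,
    }
```

The dictionary could then be passed to `DataStreamWriter.options`, for example `df_stream_transformed.writeStream.format("kafka").options(**kafka_sink_options(bootstrap_server, topic_name, kafka_api_key, kafka_api_secret))`, followed by the checkpoint location, trigger, and `start()`.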

### 2.5 Stream Execution and Termination

The streaming process is executed for a predetermined duration (e.g., 30 minutes) to simulate a live data feed. After this period, the streaming query is programmatically terminated, stopping the data flow and concluding the simulation.
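The notebook's code cell calls `time.sleep(1800)` and then `ds.stop()`. The sketch below is an alternative based on PySpark's `StreamingQuery.awaitTermination(timeout)`. That method takes a timeout in seconds, returns whether the query terminated within it, and re-raises the query's exception if the query failed. A broken stream therefore surfaces as soon as it fails, instead of after the full sleep. `run_for` is a hypothetical helper name, and it relies only on the query object's `awaitTermination` and `stop` methods.

```python
def run_for(query, seconds: float) -> bool:
    """Wait up to `seconds` for a streaming query, then stop it if it is still running.

    Returns True if the query terminated on its own before the timeout.
    If the query failed, awaitTermination re-raises its exception instead.
    """
    finished_early = query.awaitTermination(seconds)
    if not finished_early:
        query.stop()
    return finished_early
```

In the notebook, `run_for(ds, 1800)` would take the place of `time.sleep(1800)` followed by `ds.stop()`.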


In [None]:
# 2.1 Schema columns based on the provided hotel booking schema
schema_columns = ["hotel", "lead_time", "arrival_date_month", "arrival_date_week_number",
                  "arrival_date_day_of_month", "stays_in_weekend_nights", "stays_in_week_nights",
                  "adults", "children", "babies", "meal", "country", "market_segment",
                  "distribution_channel", "is_repeated_guest", "previous_cancellations",
                  "previous_bookings_not_canceled", "reserved_room_type", "booking_changes",
                  "deposit_type", "days_in_waiting_list", "customer_type", "adr",
                  "required_car_parking_spaces", "total_of_special_requests"]

# 2.2 Create a streaming DataFrame that emits one row per micro-batch.
# The "rate-micro-batch" source (Spark 3.3+) produces `timestamp` and `value` columns;
# with the 15-minute trigger set on the writer below, this yields one booking every 15 minutes.
df_stream = spark.readStream.format("rate-micro-batch").option("rowsPerBatch", 1).load()

# 2.3 Derive each booking field from the row counter `value` and serialize the row
# as a JSON string in the Kafka `value` column, with the counter as the message key
df_stream_transformed = df_stream.selectExpr("CAST(value AS STRING) AS key",
    """to_json(struct(
        'City Hotel' as hotel,
        cast(value as int) % 300 as lead_time,
        'January' as arrival_date_month,
        cast((value % 53) + 1 as int) as arrival_date_week_number,
        cast((value % 31) + 1 as int) as arrival_date_day_of_month,
        cast((value % 5) as int) as stays_in_weekend_nights,
        cast((value % 10) as int) as stays_in_week_nights,
        cast((value % 4) + 1 as int) as adults,
        cast((value % 3) as int) as children,
        cast((value % 2) as int) as babies,
        'BB' as meal,
        'USA' as country,
        'Online TA' as market_segment,
        'TA/TO' as distribution_channel,
        cast(value % 2 as int) as is_repeated_guest,
        0 as previous_cancellations,
        0 as previous_bookings_not_canceled,
        'A' as reserved_room_type,
        cast((value % 5) as int) as booking_changes,
        'No Deposit' as deposit_type,
        0 as days_in_waiting_list,
        'Transient' as customer_type,
        cast(100 + (rand() * 120) as float) as adr,
        0 as required_car_parking_spaces,
        cast((value % 3) as int) as total_of_special_requests
    )) AS value""")

# 2.4 Write the stream data to the Kafka topic
# os.environ[...] raises a KeyError right away if a credential is not set
kafka_api_key = os.environ["KAFKA_API_KEY"]
kafka_api_secret = os.environ["KAFKA_API_SECRET"]

topic_name = "hotel_booking_live"
bootstrap_server = "pkc-56d1g.eastus.azure.confluent.cloud:9092"

# Databricks shades the Kafka client ("kafkashaded." prefix); on open-source Spark,
# use org.apache.kafka.common.security.plain.PlainLoginModule instead
jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

ds = (
    df_stream_transformed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas_config)
    .option("topic", topic_name)  # sink option; "subscribe" applies only when reading
    .option("checkpointLocation", "/dbfs/dir")
    .trigger(processingTime="15 minutes")  # one micro-batch, i.e. one booking, every 15 minutes
    .start()
)


# 2.5 Let the stream run for 30 minutes and terminate
time.sleep(1800)
ds.stop()