# **Cab Service Recommendation System Design with Azure - A Data Engineering Perspective**

### **Step 1: Data Ingestion (Producers)**
1.	A user books a cab through the App/UI.
2.	The UI captures details such as ride ID, user ID, pickup and drop locations, timestamp, ride type, fare, and distance traveled.
3.	These details are sent to Azure Event Hubs via an API.
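
As a rough illustration of the payload described above, the booking details could be modelled and serialized as follows. This is a minimal sketch: the field names mirror the source table schema used later in this document, while the `BookingEvent` class, the `to_json` helper, and the sample values are hypothetical.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class BookingEvent:
    """One cab booking as captured by the UI (hypothetical container class)."""
    rideId: str
    userId: str
    pickupLocation: str
    dropLocation: str
    timestamp: str      # ISO 8601 string, UTC
    rideType: str
    fareAmount: float
    distanceKm: float


def to_json(event: BookingEvent) -> str:
    """Serialize the booking to the JSON body the API would forward."""
    return json.dumps(asdict(event))


# Sample values are made up for illustration only.
sample = BookingEvent(
    rideId="R1001",
    userId="U1",
    pickupLocation="Home",
    dropLocation="Office",
    timestamp=datetime(2025, 2, 3, 8, 30, tzinfo=timezone.utc).isoformat(),
    rideType="Sedan",
    fareAmount=12.5,
    distanceKm=7.2,
)
payload = to_json(sample)
```

Keeping the timestamp as an ISO 8601 string in UTC makes it straightforward to cast downstream when the date partition is derived.
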
### **Step 2: Event Handling in Azure Event Hubs**
1.	Event Hubs receives the booking data and temporarily stores it.
2.	It acts as a message broker, capturing the events in partitions, ensuring that event messages are available for downstream processing.
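
To make the role of partitions more concrete, the sketch below shows the general idea of key-based partitioning: events that share a key are hashed to the same partition, so their relative order is preserved. The hash used here (MD5 modulo the partition count) is only an illustration and is not the algorithm Event Hubs uses internally; `assign_partition` and `PARTITION_COUNT` are hypothetical names.

```python
import hashlib


def assign_partition(partition_key: str, partition_count: int) -> int:
    """Map a key to a partition index (illustrative hash, not the Event Hubs one)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


PARTITION_COUNT = 4  # hypothetical number of partitions

# The same userId always maps to the same partition.
first = assign_partition("U1", PARTITION_COUNT)
second = assign_partition("U1", PARTITION_COUNT)
```
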
### **Step 3: Stream Processing & Storage**
1.	Azure Stream Analytics reads the data from Event Hubs in real time.
2.	It applies light transformations, such as formatting the timestamp column for partitioning, and inserts the structured data into Azure SQL Database.
3.	Each day’s data is stored as a separate partition of the SQL table.
4.	This partitioning enables efficient querying for the recommendation process.
5.	The stored data is then available for further analysis.
### **Step 4: Configure Azure Event Hubs**
1.	Create an Event Hubs namespace and an Event Hub instance.
2.	Set up authentication for secure API access.
3.	Develop an API (Flask, Node.js, etc.) to send booking data to Event Hubs.
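
The sending side of such an API might look roughly like the sketch below, which assumes the `azure-eventhub` Python SDK is installed. The helper names `encode_booking` and `send_booking` are hypothetical, the connection string and hub name must come from your own namespace, and using `userId` as the partition key is an assumption rather than something this design prescribes.

```python
import json


def encode_booking(booking: dict) -> bytes:
    """Encode a booking as compact UTF-8 JSON, the body of one event."""
    return json.dumps(booking, separators=(",", ":")).encode("utf-8")


def send_booking(booking: dict, connection_string: str, eventhub_name: str) -> None:
    """Send one booking to Event Hubs (requires the azure-eventhub package)."""
    # Imported lazily so encode_booking can be used without the SDK installed.
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_string, eventhub_name=eventhub_name
    )
    with producer:
        # Assumption: key by userId so one user's events share a partition.
        batch = producer.create_batch(partition_key=booking["userId"])
        batch.add(EventData(encode_booking(booking)))
        producer.send_batch(batch)
```

In a Flask or Node.js style API, the request handler would validate the incoming booking and then call something like `send_booking`, ideally reusing one producer across requests instead of creating it per call.
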
### **Step 5: Set Up Azure Stream Analytics**
1.	Create a Stream Analytics job.
2.	Configure Event Hubs as input.
3.	Configure Azure SQL Database as output.
4.	Write a Stream Analytics query that selects the booking fields from the Event Hubs input and writes them into the SQL Database output.
5.	Ensure Stream Analytics has permission to write its output to SQL Database.
### **Step 6: Verify Data Storage**
1.	Query the CabBookings table to check whether data is being stored: `SELECT * FROM CabBookings;`
2.	Monitor Event Hubs and Stream Analytics logs for troubleshooting.


## **Schema of the Source Table (in Azure SQL Database)**
Since the source data is partitioned by date, we will process a single day’s data at a time. This makes it easier to scale and manage large volumes of data, especially with 5 million rides per day.

In [None]:
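-- Source table schema (T-SQL; string lengths and boundary dates are illustrative)
-- One partition per day: RANGE RIGHT places each boundary date in the partition to its right
CREATE PARTITION FUNCTION pfRideDate (DATETIME2)
    AS RANGE RIGHT FOR VALUES ('2025-02-01', '2025-02-02');
CREATE PARTITION SCHEME psRideDate
    AS PARTITION pfRideDate ALL TO ([PRIMARY]);

CREATE TABLE source_rides (
    rideId NVARCHAR(64),
    userId NVARCHAR(64),
    pickupLocation NVARCHAR(256),
    dropLocation NVARCHAR(256),
    [timestamp] DATETIME2,
    rideType NVARCHAR(32),
    fareAmount FLOAT,
    distanceKm FLOAT
)
ON psRideDate ([timestamp]);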

## **Processing One Date at a Time:**
We will read the ride data for a specific date using partitioning. PySpark will handle this efficiently on Azure Databricks.
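
One way to express "one date at a time" is to build the pushdown subquery for a given day in plain Python before handing it to the JDBC reader. The sketch below is an assumption-laden illustration: `previous_day` and `build_daily_query` are hypothetical helpers, and the table name `source_rides` is taken from the schema above. It filters with a half-open range on the raw column, which generally gives the database a better chance to skip other partitions than wrapping the column in a function.

```python
from datetime import date, timedelta


def previous_day(today: date) -> date:
    """Return the date whose rides the daily run should process."""
    return today - timedelta(days=1)


def build_daily_query(processing_date: date, table: str = "source_rides") -> str:
    """Return a JDBC subquery selecting exactly one calendar day of rides."""
    next_date = processing_date + timedelta(days=1)
    return (
        f"(SELECT * FROM {table} "
        f"WHERE [timestamp] >= '{processing_date.isoformat()}' "
        f"AND [timestamp] < '{next_date.isoformat()}') AS sourceData"
    )


query = build_daily_query(date(2025, 2, 1))
```

The resulting string can be passed as the `table` argument of `spark.read.jdbc`, in the same way the script below passes its own subquery.
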

## **PySpark Script on Azure Databricks**
We will create a PySpark script on Azure Databricks to process one date’s data from the source, generate the recommendations, and store the results in the Azure SQL Database.

The PySpark script performs the following steps:
1. Read the source data for a specific date (partitioned by date).
2. Add the derived columns (rideDay, rideHour, ridePeriod).
3. Calculate the ride count for each unique combination of user, pickup location → drop location, and ride period.
4. Rank the recommendations for each user based on the ride count.
5. Store the results in Azure SQL Database.
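
Before moving the ride-period step to Spark, it can help to prototype the classification rules in plain Python, where they are easy to unit test. The sketch below is a hypothetical helper, not part of the script. It assumes that Monday to Friday count as weekdays, that the first matching rule wins, and that "Weekend Social" is checked before "Weekend Leisure" so that both labels are reachable. Note that Python's `isoweekday()` numbers Monday as 1 and Sunday as 7, whereas Spark's `dayofweek` numbers Sunday as 1 and Saturday as 7.

```python
from datetime import datetime


def classify_ride_period(ts: datetime) -> str:
    """Map a ride timestamp to a ride period; the first matching rule wins."""
    is_weekday = ts.isoweekday() <= 5   # Monday=1 ... Sunday=7
    hour = ts.hour
    if is_weekday:
        if 6 <= hour <= 10 or 17 <= hour <= 21:
            return "Weekday Commute"
        return "Weekday Routine"
    if hour >= 21 or hour <= 3:         # window wraps past midnight
        return "Nightlife"
    if 16 <= hour <= 23:                # effectively 16-20 after the Nightlife check
        return "Weekend Social"
    if 10 <= hour <= 20:                # effectively 10-15 after the Social check
        return "Weekend Leisure"
    return "Other"


monday_morning = classify_ride_period(datetime(2025, 2, 3, 8, 0))
saturday_night = classify_ride_period(datetime(2025, 2, 1, 22, 0))
```
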

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, dayofweek, hour, count, coalesce, lit, desc, rank
from pyspark.sql.window import Window
from datetime import datetime, timedelta

# Create Spark session
spark = SparkSession.builder.appName("RideRecommendationsProcessing").getOrCreate()

# Azure SQL Database Connection details
jdbcUrl = "jdbc:sqlserver://<server_name>.database.windows.net:1433;database=<database_name>"
connectionProperties = {
    "user": "<userName>",
    "password": "<password>",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Build the query for yesterday's rides from the source table
processingDate = datetime.now().date() - timedelta(days=1)
newRides = f"""select * from source_rides where CAST(timestamp AS DATE) = '{processingDate}'"""

# Read the source data partitioned by date (for a specific date)
sourceRidesData = spark.read.jdbc(url = jdbcUrl, table = f"({newRides}) as sourceData", properties = connectionProperties)

# Read the existing recommendations data
existingRecommendations = spark.read.jdbc(url = jdbcUrl, table = "recommendationTable", properties = connectionProperties)


# Create rideDay, rideHour, and ridePeriod columns.
# Spark's dayofweek() returns 1 = Sunday ... 7 = Saturday, so weekdays are 2 to 6.
isWeekday = col("rideDay").between(2, 6)
processedRidesData = sourceRidesData.withColumn("rideDay", dayofweek(col("timestamp")))\
                    .withColumn("rideHour", hour(col("timestamp")))\
                    .withColumn("ridePeriod",
                                        when(isWeekday & (col("rideHour").between(6, 10)), "Weekday Commute")
                                        .when(isWeekday & (col("rideHour").between(17, 21)), "Weekday Commute")
                                        .when(isWeekday, "Weekday Routine")
                                        # The nightlife window wraps past midnight, so between(21, 3) would never match
                                        .when((col("rideHour") >= 21) | (col("rideHour") <= 3), "Nightlife")
                                        # The first matching branch wins: Social is checked before Leisure so both are reachable
                                        .when(~isWeekday & (col("rideHour").between(16, 23)), "Weekend Social")
                                        .when(~isWeekday & (col("rideHour").between(10, 20)), "Weekend Leisure")
                                        .otherwise("Other"))

# Group by userId, pickupLocation, dropLocation, ridePeriod and count the new rides
newRidesCount = processedRidesData.groupBy("userId", "pickupLocation", "dropLocation", "ridePeriod").agg(count("*").alias("newRideCount"))

# Combine new rides count with existing recommendations; missing counts on either side become 0
combinedDf = existingRecommendations.join(newRidesCount, ["userId", "pickupLocation", "dropLocation", "ridePeriod"], "full_outer").na.fill(0)

# Update ride counts and drop the stale rank, which is recomputed below
updatedDf = combinedDf.withColumn("ridesCount", coalesce(col("ridesCount"), lit(0)) + col("newRideCount")).drop("newRideCount", "rank")

# Rank the recommendations based on ridesCount
# Note: rank() gives tied rows the same rank; use row_number() if (userId, rank) must be unique
window_spec = Window.partitionBy("userId").orderBy(desc("ridesCount"))
finalRecommendations = updatedDf.withColumn("rank", rank().over(window_spec)).filter(col("rank") <= 10)  # Keep the top 10 recommendations per user

# Materialize the result first, because it is derived from the table that is about to be overwritten
finalRecommendations = finalRecommendations.localCheckpoint()

# Save the result into the `recommendationTable` table in Azure SQL Database
finalRecommendations.write.jdbc(url=jdbcUrl, table="recommendationTable", mode="overwrite", properties=connectionProperties)


## **Key Elements in the Script:**
1. Reading Data for a Single Date:
> The data for a specific date (e.g., 2025-02-01) is read from Azure SQL Database over JDBC, using a subquery that filters the source table on the date of the timestamp column.
2. Creating the rideDay, rideHour, ridePeriod Columns:
> We use dayofweek and hour functions to extract the day and hour from the timestamp.
> The ridePeriod is created using the when clause to classify the ride into commute, leisure, nightlife, etc.
3. Calculating the ride count:
> We group the data by userId, pickupLocation, dropLocation, and ridePeriod, and then count the number of rides in each group.
4. Ranking Recommendations:
> rank() is used to rank the recommendations for each user, based on the rideCount (frequent rides get a higher rank).
> Only the top 10 recommendations are kept per user.
5. Saving to Azure SQL Database:
> The processed and ranked recommendations are saved back to Azure SQL Database in the recommendationTable table using JDBC.
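
The merge-and-rank logic in elements 3 and 4 can be checked on a handful of rows without a Spark cluster. The following is a plain-Python sketch of the same idea, not the script itself: `merge_counts` and `top_n_per_user` are hypothetical helpers, the sample data is made up, and positions are assigned one per row (closer to `row_number()` than to `rank()`, which gives tied rows the same rank).

```python
from collections import Counter, defaultdict


def merge_counts(existing: dict, new: dict) -> Counter:
    """Add today's ride counts to the stored counts, keyed by route and period."""
    merged = Counter(existing)
    merged.update(new)  # sums counts for keys present on both sides
    return merged


def top_n_per_user(counts: dict, n: int = 10) -> dict:
    """Return, per user, the n most frequent routes with a 1-based position."""
    by_user = defaultdict(list)
    for (user_id, pickup, drop, period), rides in counts.items():
        by_user[user_id].append((rides, pickup, drop, period))
    ranked = {}
    for user_id, routes in by_user.items():
        # Highest count first; remaining fields break ties deterministically.
        routes.sort(key=lambda r: (-r[0], r[1], r[2], r[3]))
        ranked[user_id] = [
            (position, pickup, drop, period, rides)
            for position, (rides, pickup, drop, period) in enumerate(routes[:n], start=1)
        ]
    return ranked


# Made-up sample data: keys are (userId, pickupLocation, dropLocation, ridePeriod).
existing = {
    ("U1", "Home", "Office", "Weekday Commute"): 5,
    ("U1", "Home", "Gym", "Weekday Routine"): 2,
}
new_rides = {
    ("U1", "Home", "Gym", "Weekday Routine"): 4,
    ("U1", "Home", "Mall", "Weekend Leisure"): 1,
}
recommendations = top_n_per_user(merge_counts(existing, new_rides), n=2)
```
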


In [None]:
-- Target table schema (T-SQL; string lengths are illustrative)
CREATE TABLE recommendationTable (
    userId NVARCHAR(64) NOT NULL,    -- Unique user identifier
    [rank] INT NOT NULL,             -- Rank of recommendation (1 to 10)
    pickupLocation NVARCHAR(256),    -- Derived pickup location
    dropLocation NVARCHAR(256),      -- Suggested drop location
    ridePeriod NVARCHAR(32),         -- Weekday Commute, Weekend Leisure, etc.
    ridesCount INT,                  -- Frequency of this particular ride
    PRIMARY KEY (userId, [rank])
);


## **Fetching Relevant Recommendations for UI**
Once the recommendations table is populated, the UI will fetch recommendations based on the current location of the user and the current day/time.
1. UI captures the current location and time.
2. The backend fetches the top 10 recommendations based on the current location and filters out irrelevant recommendations (e.g., no commute rides on weekends).

In [None]:
-- API query for fetching recommendations
SELECT TOP (10) pickupLocation, dropLocation
FROM recommendationTable
WHERE userId = 'U1'
AND pickupLocation = 'Home'          -- User's current location
AND ridePeriod != 'Weekday Commute'  -- On weekends (derived from the current day), exclude commute rides
ORDER BY ridesCount DESC;
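
On the backend, the same lookup would normally be issued as a parameterized query rather than with literal values. The sketch below uses Python's built-in `sqlite3` with an in-memory table purely as a stand-in for Azure SQL Database, so it can run anywhere. The `excluded_periods` and `fetch_recommendations` helpers and the sample rows are hypothetical, and the only filtering rule implemented is the one described above (no commute rides on weekends). SQLite accepts `LIMIT`, whereas T-SQL uses `TOP` or `OFFSET ... FETCH`.

```python
import sqlite3
from datetime import datetime


def excluded_periods(now: datetime) -> list:
    """Ride periods to hide for the current day (only the weekend rule is modelled)."""
    return ["Weekday Commute"] if now.isoweekday() >= 6 else []


def fetch_recommendations(conn, user_id: str, pickup: str, now: datetime, limit: int = 10):
    """Return (pickupLocation, dropLocation) pairs, most frequent first."""
    sql = ("SELECT pickupLocation, dropLocation FROM recommendationTable "
           "WHERE userId = ? AND pickupLocation = ?")
    params = [user_id, pickup]
    excluded = excluded_periods(now)
    if excluded:
        placeholders = ", ".join("?" for _ in excluded)
        sql += f" AND ridePeriod NOT IN ({placeholders})"
        params.extend(excluded)
    sql += " ORDER BY ridesCount DESC LIMIT ?"
    params.append(limit)
    return conn.execute(sql, params).fetchall()


# In-memory stand-in for the recommendations table, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE recommendationTable (userId TEXT, "rank" INTEGER, '
    "pickupLocation TEXT, dropLocation TEXT, ridePeriod TEXT, ridesCount INTEGER)"
)
conn.executemany(
    "INSERT INTO recommendationTable VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("U1", 1, "Home", "Office", "Weekday Commute", 40),
        ("U1", 2, "Office", "Home", "Weekday Commute", 38),
        ("U1", 3, "Home", "Mall", "Weekend Leisure", 12),
        ("U1", 4, "Home", "Gym", "Weekday Routine", 9),
    ],
)
saturday = fetch_recommendations(conn, "U1", "Home", datetime(2025, 2, 1, 11, 0))
monday = fetch_recommendations(conn, "U1", "Home", datetime(2025, 2, 3, 8, 0))
```

Binding the user ID and location as parameters keeps user input out of the SQL text.
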


## **Final Workflow Recap:**
### **Data Ingestion:**
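Booking events flow through Azure Event Hubs and Azure Stream Analytics into Azure SQL Database. Each run then reads a single date's data from Azure SQL Database into Azure Databricks.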

### **Data Processing:**
The PySpark script derives the ride day, ride hour, and ride period for each ride, then calculates the top 10 recommendations for each user.

### **Storing Data:**
The processed data is saved into the recommendationTable table in Azure SQL Database.

### **Fetching Recommendations for UI:**
The UI captures the current location and time, and fetches relevant recommendations from the recommendationTable table.

This setup ensures that Azure Databricks is efficiently processing partitioned data, generating relevant recommendations for users, and storing them back into Azure SQL Database for fast retrieval.