<h2 style='text-align: center;'>AI Lab 2 - Capstone Project</h2>

<h3 style='text-align: center;'>Author - Aloy Banerjee</h3>
<h3 style='text-align: center;'>Roll No. CH22M503</h3>

**Part 1: Kafka**
> For this part, complete the following tasks:

    1. Install and set up Kafka in the Google Colab notebook.
    2. Stream data from the given dataset in batches of 10 records written to Kafka, with a 10-second sleep between batches, until 100 records have been written.
    3. Use the Kafka consumer to read from the topic every 5 seconds. (Note: the dataset for this part is YELP-review.json.)
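
The batching rule in task 2 can be sketched without a broker. The helper names below (`iter_batches`, `send_in_batches`) are hypothetical and not part of the notebook; the `send` and `sleep` callables are injected as an assumption so the pacing logic can be exercised on its own.

```python
import time
from typing import Callable, Iterator, List, Sequence


def iter_batches(records: Sequence[dict], batch_size: int = 10, limit: int = 100) -> Iterator[List[dict]]:
    """Yield consecutive batches of at most `batch_size` records, stopping after `limit` records."""
    capped = records[:limit]
    for start in range(0, len(capped), batch_size):
        yield list(capped[start:start + batch_size])


def send_in_batches(records: Sequence[dict], send: Callable[[dict], None],
                    pause_seconds: float = 10.0,
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Pass every record of every batch to `send`, pausing after each batch; return the batch count."""
    batches = 0
    for batch in iter_batches(records):
        for record in batch:
            send(record)
        batches += 1
        sleep(pause_seconds)  # hypothetical injection point; defaults to time.sleep
    return batches
```

With a real producer, `send` could be something like `lambda r: producer.send(topic, value=json.dumps(r).encode("utf-8"))`, which is roughly what the notebook's own producer code does further down.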

In [None]:
import os
from google.colab import drive

# Check if Google Drive is already mounted
if not os.path.exists('/content/drive'):
    # Mount Google Drive if it's not already mounted
    drive.mount('/content/drive')
else:
    print("Google Drive is already mounted.")



Google Drive is already mounted.


**Install Kafka and Zookeeper**

In [None]:
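# Older Kafka releases are served from archive.apache.org; -f makes curl fail on an HTTP error instead of saving an error page
!curl -fsSOL https://archive.apache.org/dist/kafka/3.5.0/kafka_2.12-3.5.0.tgz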
!tar -xzf kafka_2.12-3.5.0.tgz

In [None]:
!echo "Starting ZooKeeper service..."
!./kafka_2.12-3.5.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.5.0/config/zookeeper.properties

!echo "Starting Kafka service..."
!./kafka_2.12-3.5.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.5.0/config/server.properties

!echo "Waiting for 10 secs until Kafka and ZooKeeper services are up and running..."

!sleep 10

!ps -ef | grep kafka

Starting ZooKeeper service...
Starting Kafka service...
Waiting for 10 secs until Kafka and ZooKeeper services are up and running...
root        5476       1  0 18:38 ?        00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.12-3.5.0/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.12-3.5.0/bin/../logs -Dlog4j.configuration=file:./kafka_2.12-3.5.0/bin/../config/log4j.properties -cp /content/kafka_2.12-3.5.0/bin/../libs/activation-1.1.1.jar:/content/kafka_2.12-3.5.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/content/kafka_2.12-3.5.0/bin/../libs/argparse4j-0.7.0.jar:/content/kafka_2.12-3.5.0/bin/../libs/audience-annotations-0.13.0.jar:/cont
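
A fixed 10-second sleep is a guess about startup time. As an alternative, the sketch below polls the listener port until it accepts a TCP connection. `wait_for_port` is a hypothetical helper, and an open port only shows that the process is listening, not that the broker has finished initialising.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_seconds: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, or False after the timeout."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError (e.g. connection refused) while nothing is listening
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

In this notebook the call would be `wait_for_port("127.0.0.1", 9092)` for the Kafka broker, since 9092 is the port used by the topic commands.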

**Create the `yelp_reviews` topic on the broker running on port 9092**

In [None]:
!./kafka_2.12-3.5.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic yelp_reviews

Error while executing topic command : Topic 'yelp_reviews' already exists.
[2023-08-08 19:00:28,791] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'yelp_reviews' already exists.
 (kafka.admin.TopicCommand$)


**Describe the topic to confirm it exists**

In [None]:
!./kafka_2.12-3.5.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic yelp_reviews

Topic: yelp_reviews	TopicId: GdXwg4MzSnm0gamhgwGisA	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: yelp_reviews	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


**Install OpenJDK**

In [None]:
!echo "Installing OpenJDK 8 JDK..."
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Installing OpenJDK 8 JDK...


**Install the Kafka Python client**

In [None]:
!pip install kafka-python



**Importing Libraries**

In [None]:
import sys
import json
import time
import numpy as np
import pandas as pd
import random
from multiprocessing import Process
from pandas import Timestamp
from datetime import datetime
import threading
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError

**Common Variables**

In [None]:
# Define the Kafka topic
topic = "yelp_reviews"
# Define the google drive path
drive_link = '/content/drive/MyDrive/CapstoneProjectData/yelp_academic_dataset_review.json'


**Common Methods**

In [None]:
def read_json_data(file_path, num_lines=None):
    """
    Reads data from a specified JSON file and converts it to a list of dictionaries.
    Each dictionary corresponds to a line in the JSON file. If num_lines is not specified,
    the function will attempt to read all lines in the file.

    Args:
        file_path (str): The path of the JSON file to read.
        num_lines (int, optional): The maximum number of lines to read from the file.
            Defaults to None, which means all lines will be read.

    Returns:
        list: A list of dictionaries representing the JSON data read from the file.

    Raises:
        IOError: If the file cannot be read.
        json.JSONDecodeError: If there is an error decoding the JSON data.
    """
    json_data = []
    try:
        with open(file_path, 'r') as f:
            for i, line in enumerate(f):
                if num_lines is not None and i >= num_lines:
                    break
                record = json.loads(line)
                json_data.append(record)
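    except IOError as e:
        raise IOError(f"Could not read file at path: {file_path}") from e
    except json.JSONDecodeError as e:
        # JSONDecodeError requires (msg, doc, pos), so pass the original document and position through
        raise json.JSONDecodeError(f"Error decoding JSON data in file {file_path}: {e.msg}", e.doc, e.pos) from e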

    return json_data

def send_to_kafka(producer, topic, data, iteration):
    """
    Sends data to Kafka topic using the specified producer.

    Parameters:
        producer (KafkaProducer): The Kafka producer instance.
        topic (str): The name of the Kafka topic to send data to.
        data (list): A list of dictionaries representing the data to send.
        iteration (int): The batch number, used only in the log message.

    Returns:
        None
    """
    try:
        print(f'yelp data length : {len(data)}')
        for record in data:
            if 'timestamp' in record and isinstance(record['timestamp'], str):
                record['timestamp'] = datetime.strptime(record['timestamp'], "%Y-%m-%d %H:%M:%S")
            # default=str keeps json.dumps from failing on datetime values, which are not JSON serializable
            message = json.dumps(record, default=str).encode("utf-8")
            print(message)
            producer.send(topic, value=message)
        # send() is asynchronous; flush() blocks until the buffered messages have been delivered
        producer.flush()
        print(f"Iteration:{iteration} data sent successfully to Kafka.")
    except KafkaError as e:
        print(f"Error sending data to Kafka: {e}")

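def consume_messages(consumer):
    """
    Consumes messages from Kafka in a separate thread, polling every 5 seconds.

    Parameters:
        consumer (KafkaConsumer): The Kafka consumer instance.

    Returns:
        None
    """
    try:
        while True:  # Runs until the consumer is closed or an error occurs
            # Iterating over the consumer blocks indefinitely, so a sleep after the loop would never run.
            # poll() returns whatever has arrived as {TopicPartition: [messages]} and then hands control back.
            batches = consumer.poll(timeout_ms=1000)
            for messages in batches.values():
                for message in messages:
                    record = message.value
                    print(record)
                    # Perform further processing of the record here if needed
            time.sleep(5)  # Sleep for 5 seconds before reading more messages
    except Exception as e:
        print(f"Error occurred while consuming messages: {e}")
    finally:
        consumer.close()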

def kafka_producer_call(drive_link, topic):
    """
    Establishes a Kafka producer, loads Yelp data, and sends it to Kafka in batches.

    Args:
        drive_link (str): Path to the Yelp data JSON file.
        topic (str): Kafka topic to which data will be sent.
    """
    # Create a Kafka producer
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    print('Producer sending data:')
    try:
        # Load the first 1000 Yelp records and randomly select 100 of them for sending to Kafka
        yelp_reviews_data = read_json_data(drive_link, num_lines=1000)
        num_lines_to_select = 100
        yelp_reviews_data_ops = random.sample(yelp_reviews_data, num_lines_to_select)
        # Send data to Kafka in batches of 10 records with a 10-second sleep time between batches
        batch_size = 10
        for i in range(0, len(yelp_reviews_data_ops), batch_size):
            batch = yelp_reviews_data_ops[i:i + batch_size]
            iteration = (i // batch_size) + 1
            send_to_kafka(producer, topic, batch, iteration)
            print('Producer sleeps for 10 seconds:')
            time.sleep(10)

    except Exception as e:
        print(f"Error: {e}")
    finally:
        producer.close()

def kafka_consumer_call(topic):
    """
    Establishes a Kafka consumer, starts consuming messages in a separate thread, and handles interruption gracefully.

    Args:
        topic (str): Kafka topic from which messages will be consumed.
    """
    print('Consumer receiving data:')

    # Create a Kafka consumer
    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9092', auto_offset_reset='earliest')

    # Start consuming messages in a separate thread
    consumer_thread = threading.Thread(target=consume_messages, args=(consumer,))
    consumer_thread.start()

    try:
        # Main thread sleeps for 30 seconds to allow message consumption
        time.sleep(30)
    except KeyboardInterrupt:
        print("Received keyboard interrupt. Stopping consumer gracefully.")
    finally:
        # Close the consumer so the consumer thread stops, then wait for that thread to finish
        consumer.close()
        consumer_thread.join()

    print("Exiting the main thread.")
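
The Yelp file holds one JSON object per line, so it can also be parsed lazily. The sketch below is a hypothetical variant (`iter_json_lines` is not a notebook function) that reports which line failed to parse. It relies on the fact that `json.JSONDecodeError` must be constructed with a message, the document, and a position.

```python
import json
from typing import Iterable, Iterator, Optional


def iter_json_lines(lines: Iterable[str], num_lines: Optional[int] = None) -> Iterator[dict]:
    """Yield one parsed record per non-empty line, reading at most `num_lines` lines if given."""
    for line_number, line in enumerate(lines, start=1):
        if num_lines is not None and line_number > num_lines:
            break
        if not line.strip():
            continue  # tolerate blank lines
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            # Re-raise with the line number added; msg, doc and pos are all required arguments
            raise json.JSONDecodeError(f"line {line_number}: {exc.msg}", exc.doc, exc.pos) from exc
```

An open file object is already an iterable of lines, so usage could look like `with open(drive_link, encoding="utf-8") as f: records = list(iter_json_lines(f, num_lines=1000))`.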



### **Kafka Producer** & **Kafka Consumer**

**Calling Kafka Producer**

In [None]:
kafka_producer_call(drive_link,topic)

Producer sending data:
yelp data length : 10
b'{"review_id": "27CYrDqJIUjbOLNeKTfjDg", "user_id": "k3Q0ZcVuMdvA_9iDcSDWIA", "business_id": "pym7c6ZFEtmoH16xN2ApBg", "stars": 4.0, "useful": 0, "funny": 0, "cool": 0, "text": "It seems to me that whenever we have out of towner guests, our go to place is consistently Katie\'s. The variety of food wonderfully prepared is 2nd to none. Reasonably priced and delicious, Katie\'s never disappoints. Whether you choose the grilled ersters, the shrimp and grits, the Cuban sandwich, pizza, or soup & salad you will be very pleased and have a great meal. Daily specials hi-lite local seasonal offerings that keep things fresh. The service staff are warm and friendly. On Sunday\'s, you will want to time your arrival before church let\'s out or you will have a bit of a wait. That said, you can always manage while enjoying bottomless Mimosas!", "date": "2016-06-22 20:10:18"}'
b'{"review_id": "8N3IPLBJ5MH74NKfU-yayw", "user_id": "PEAKaHUmII5hoOBNcRgdCw", "b

**Calling Kafka Consumer**

In [None]:
kafka_consumer_call(topic)



Consumer receving data:
Consumer sleeps for 10 seconds:
b'{"review_id": "eBrVkzHhHKZxyVS1SPKbJw", "user_id": "1PJ-RjMqXHeymSkLRzuIdA", "business_id": "nOyphAl0JQ8JrvNi93pclQ", "stars": 2.0, "useful": 3, "funny": 1, "cool": 1, "text": "So I no longer go here which is a shame because The eyebrow waxing here is fantastic. I am giving this 2 stars because when I have contacted for an appointment via the phone, it usually has taken days for me to get a response. The online booking is useless because I rarely have found appointments within 2 weeks of when I need it (and I\'m very flexible with time of day). \\n\\nI give up on this place. Unless you book out weeks in advance, you will not get an appointment. I\'m now paying $10 more elsewhere which sucks but I\'m able to get seen when I need to be seen.", "date": "2014-10-28 13:58:00"}'
b'{"review_id": "aAcQibR3zWOvk4atbCM3SA", "user_id": "7P9w2PrP4ZcJyDFwch51Ig", "business_id": "Zi-F-YvyVOK0k5QD7lrLOg", "stars": 5.0, "useful": 0, "funny": 0,

ERROR:kafka.consumer.fetcher:Fetch to node 0 failed: Cancelled: <BrokerConnection node_id=0 host=82280299e0af:9092 <connected> [IPv4 ('172.28.0.12', 9092)]>


KeyboardInterrupt: ignored
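
One possible reading of the `Fetch to node 0 failed: Cancelled` line is that the consumer was closed from the main thread while the consumer thread still had a fetch in flight. A minimal sketch of a cooperative shutdown follows, assuming only an object with `poll(timeout_ms=...)` returning a dict of record lists and a `close()` method, which is the shape kafka-python's `KafkaConsumer` provides. `poll_until_stopped` is a hypothetical name.

```python
import threading
from typing import Callable


def poll_until_stopped(consumer, stop_event: threading.Event, handle: Callable[[object], None],
                       interval_seconds: float = 5.0, poll_timeout_ms: int = 1000) -> None:
    """Poll `consumer` every `interval_seconds` until `stop_event` is set, then close it."""
    try:
        while not stop_event.is_set():
            # poll() is expected to return {partition: [records, ...]}, empty if nothing arrived
            for records in consumer.poll(timeout_ms=poll_timeout_ms).values():
                for record in records:
                    handle(record.value)
            # wait() pauses like time.sleep() but returns immediately once the event is set
            stop_event.wait(interval_seconds)
    finally:
        consumer.close()  # closed by the same thread that has been polling
```

The main thread would then start `threading.Thread(target=poll_until_stopped, args=(consumer, stop_event, print))`, sleep for as long as it wants to consume, call `stop_event.set()`, and `join()` the thread, so the consumer is never touched from two threads at once.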



**Part 2: Pyspark Streaming**

> Consider a scenario where you are a data engineer working for a ride-sharing company called "PyRides." The company has a fleet of drivers who continuously send GPS location data and trip information during their shifts. As a data engineer, you process this real-time streaming data and gain insights into driver performance.

> **The data stream contains the following fields:**

        1.   **driver_id:** The unique identifier of the driver.
        2.   **timestamp:** The timestamp of the GPS location or trip event.
        3.   **latitude:** The latitude of the driver's location.
        4.   **longitude:** The longitude of the driver's location.
        5.   **trip_distance:** The distance covered by the driver during the trip (if it's a trip event).

> **Task 1: Count of Unique Drivers**
Your task is to implement PySpark Streaming code to calculate the count of unique drivers within a sliding window of 10 minutes, updated every 5 minutes. Display the results for each window update.

> **Task 2: Average Trip Duration**
Your task is to implement PySpark Streaming code to calculate the average trip duration for each driver within a tumbling window of 15 minutes. Display the results for each window update.

> **Task 3: Idle Time Detection**
Your task is to implement PySpark Streaming code to detect idle time for each driver using session windows. Treat a session as idle if the driver's location remains unchanged for more than 30 minutes. Display the start and end timestamps of each idle session.
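
The idle rule in Task 3 can be stated in plain Python for a single driver. This is a sketch of the rule as worded above, not of Spark's session windows. `idle_sessions` is a hypothetical helper, and comparing coordinates for exact equality is an assumption (real GPS readings would likely need a tolerance).

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Event = Tuple[datetime, float, float]  # (timestamp, latitude, longitude)


def idle_sessions(events: List[Event],
                  threshold: timedelta = timedelta(minutes=30)) -> List[Tuple[datetime, datetime]]:
    """Return (start, end) for each run of consecutive same-location events longer than `threshold`."""
    sessions = []
    ordered = sorted(events, key=lambda event: event[0])
    start = 0  # index of the first event in the current same-location run
    for i in range(1, len(ordered) + 1):
        # A run ends at the end of the data or when the location changes
        if i == len(ordered) or ordered[i][1:] != ordered[start][1:]:
            first_seen, last_seen = ordered[start][0], ordered[i - 1][0]
            if last_seen - first_seen > threshold:
                sessions.append((first_seen, last_seen))
            start = i
    return sessions
```

For several drivers, the same function would be applied to each driver's events separately, which corresponds to partitioning by `driver_id`.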

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=3fdf10353a7bd3bf02f04bee3dbd8136e076181281709c8e392eb3954c2ef711
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


**Importing Libraries**

In [2]:
import os
import warnings
import threading
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.functions import * # window, col, countDistinct, avg, lag, when, approx_count_distinct, sum, min, max,from_unixtime, unix_timestamp,expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
from pyspark.sql.window import Window
from pyspark.streaming import StreamingContext
from pyspark import SparkConf, SparkContext

**Mounting Google Drive**

In [3]:
# Check if Google Drive is already mounted
if not os.path.exists('/content/drive'):
    # Mount Google Drive if it's not already mounted
    drive.mount('/content/drive')
else:
    print("Google Drive is already mounted.")

Mounted at /content/drive


**Common Variable Declaration**

In [4]:
# Create a SparkSession
spark_session = SparkSession.builder.master("local").appName("PyRidesDriverPerformance").config('spark.ui.port', '4050').getOrCreate()
# Directory that contains the JSON data files
file_path = '/content/drive/MyDrive/CapstoneProjectData/'
# Define the sliding window duration and slide duration
window_duration_part1, slide_duration_part1 = "10 minutes", "5 minutes"
# Define the tumbling window of 15 minutes
window_duration_part2 = "15 minutes"
# Define the session window of 30 minutes
session_gap_duration = "30 minutes"
# Define a threshold for idle session detection (30 minutes in seconds)
idle_threshold_seconds = 1800
# Define the schema for the streaming data
schema = StructType([
    StructField("driver_id", StringType(), nullable=False),
    StructField("timestamp", TimestampType(), nullable=False),
    StructField("latitude", DoubleType(), nullable=False),
    StructField("longitude", DoubleType(), nullable=False),
    StructField("trip_distance", DoubleType(), nullable=True),  # Nullable because GPS events carry no trip distance
    StructField("event_type", StringType(), nullable=True)
])

**Loading the streaming data**

In [5]:
# Read the JSON files in the directory as a streaming DataFrame using the specified schema.
ride_information_stream = spark_session.readStream.format("json").option('multiline', True).schema(schema).json(file_path)
# Display the schema of the DataFrame.
ride_information_stream.printSchema()
# Select all columns from the streaming DataFrame.
ride_information_dataframe = ride_information_stream.select("*")
# Start a streaming query that writes the DataFrame to an in-memory sink.
# The query is named "ride_information_using_stream", runs in append mode,
# and processes data every 5 seconds, as set by the 'trigger' parameter.
ride_information_query = ride_information_dataframe.writeStream.format("memory").outputMode("append").queryName("ride_information_using_stream").trigger(processingTime='5 seconds').start()

root
 |-- driver_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- event_type: string (nullable = true)



In [6]:
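# Wait until the streaming query has processed all input currently available, so the in-memory table is populated
ride_information_query.processAllAvailable()
# Retrieve data from the "ride_information_using_stream" in-memory table using a SQL query
ride_information = spark_session.sql("select * from ride_information_using_stream")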

In [8]:
# Display the data
ride_information.show()

+---------+-------------------+------------------+-------------------+------------------+----------+
|driver_id|          timestamp|          latitude|          longitude|     trip_distance|event_type|
+---------+-------------------+------------------+-------------------+------------------+----------+
|     null|               null|              null|               null|              null|      null|
|     D001|2023-07-30 05:55:48|37.538849579360246|-121.22748988885533| 3.423076573047031|      Trip|
|     D001|2023-07-30 07:10:08| 37.93476356947273| -121.0215678805398| 5.072880200441588|      Trip|
|     D001|2023-07-30 07:11:01|37.795197133875064|  -121.920222434928|              null|       GPS|
|     D001|2023-07-30 05:20:21| 37.46871545044007|-121.05888928225791|1.5685949947600415|      Trip|
|     D001|2023-07-30 05:27:29| 37.67930257604861| -121.7174686646489|              null|       GPS|
|     D001|2023-07-30 06:20:49| 37.57858135955766|-121.45726820764473|              null|  

**Task 1: Count of Unique Drivers**

In [9]:
# Sorting the ride information DataFrame by 'driver_id' and 'timestamp'
ride_information = ride_information.orderBy('driver_id', 'timestamp')
# Grouping the ride information DataFrame by time windows and calculating the count of unique drivers within each window
unique_drivers_count = ride_information.groupBy(
    window(col("timestamp"), window_duration_part1, slide_duration_part1)
).agg(countDistinct("driver_id").alias("unique_drivers_count"))
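
With a 10-minute window that slides every 5 minutes, each event falls into two overlapping windows, so one driver can contribute to two adjacent rows of the result. The plain-Python sketch below works through that arithmetic. `sliding_windows` is a hypothetical helper, and aligning window starts to multiples of the slide counted from the Unix epoch is an assumption about how the windows are laid out.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def sliding_windows(ts: datetime, size: timedelta = timedelta(minutes=10),
                    slide: timedelta = timedelta(minutes=5)) -> List[Tuple[datetime, datetime]]:
    """Return every half-open [start, end) window of length `size` that contains `ts`."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before ts, aligned to a multiple of the slide interval
    start = ts - ((ts - epoch) % slide)
    windows = []
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)
```

For an event at 05:12:56 this yields the windows 05:05 to 05:15 and 05:10 to 05:20.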

In [10]:
# Print a descriptive message indicating the task being performed
print('Task 1: Count of Unique Drivers : ')
# Perform ordering of the DataFrame containing the count of unique drivers using the 'window' column.
ordered_unique_drivers_data = unique_drivers_count.orderBy('window')
# Display the ordered DataFrame containing the count of unique drivers
ordered_unique_drivers_data.show(ordered_unique_drivers_data.count(), False)

Task 1: Count of Unique Drivers : 
+------------------------------------------+--------------------+
|window                                    |unique_drivers_count|
+------------------------------------------+--------------------+
|{2023-07-30 05:05:00, 2023-07-30 05:15:00}|47                  |
|{2023-07-30 05:10:00, 2023-07-30 05:20:00}|50                  |
|{2023-07-30 05:15:00, 2023-07-30 05:25:00}|50                  |
|{2023-07-30 05:20:00, 2023-07-30 05:30:00}|50                  |
|{2023-07-30 05:25:00, 2023-07-30 05:35:00}|50                  |
|{2023-07-30 05:30:00, 2023-07-30 05:40:00}|50                  |
|{2023-07-30 05:35:00, 2023-07-30 05:45:00}|50                  |
|{2023-07-30 05:40:00, 2023-07-30 05:50:00}|50                  |
|{2023-07-30 05:45:00, 2023-07-30 05:55:00}|49                  |
|{2023-07-30 05:50:00, 2023-07-30 06:00:00}|50                  |
|{2023-07-30 05:55:00, 2023-07-30 06:05:00}|50                  |
|{2023-07-30 06:00:00, 2023-07-30 06:10:0

**Task 2: Average Trip Duration**

In [11]:
# Selecting specific columns "driver_id", "timestamp", and "event_type" from the ride_information DataFrame
eventwise_ride = ride_information.select("driver_id", "timestamp", "event_type")
# Dropping rows where all columns have null values
eventwise_ride = eventwise_ride.dropna(how="all")

In [12]:
# Define a Window specification that partitions the data by 'driver_id' and orders it by 'timestamp'
windows_specs = Window.partitionBy('driver_id').orderBy('timestamp')
# Add a new column 'prev_timestamp' that holds the timestamp of the previous event for the same driver
eventwise_data_within_window = eventwise_ride.withColumn("prev_timestamp", lag('timestamp').over(windows_specs))
# Calculate the duration between the current event's timestamp and the previous event's timestamp
eventwise_data_within_window = eventwise_data_within_window.withColumn(
    "event_duration", (col("timestamp").cast('long') - col('prev_timestamp').cast('long'))
)
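
The `lag` step above yields, for each event, the seconds elapsed since the same driver's previous event. A plain-Python sketch of that calculation for one driver, together with the mirror-image "seconds until the next event", is shown below. `gaps_to_neighbours` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def gaps_to_neighbours(timestamps: List[datetime]) -> List[Tuple[Optional[int], Optional[int]]]:
    """For each event of one driver, return (seconds since previous event, seconds until next event)."""
    ordered = sorted(timestamps)
    result = []
    for i, ts in enumerate(ordered):
        # The first event has no predecessor and the last has no successor, hence None
        since_prev = int((ts - ordered[i - 1]).total_seconds()) if i > 0 else None
        until_next = int((ordered[i + 1] - ts).total_seconds()) if i + 1 < len(ordered) else None
        result.append((since_prev, until_next))
    return result
```

The first element of each pair corresponds to a `lag`-based difference and the second to a `lead`-based one, provided both are taken within one driver's events ordered by timestamp.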

In [13]:
# Display the DataFrame containing trip data within the specified window
eventwise_data_within_window.show()

+---------+-------------------+----------+-------------------+--------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration|
+---------+-------------------+----------+-------------------+--------------+
|     D001|2023-07-30 05:12:56|      Trip|               null|          null|
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|           181|
|     D001|2023-07-30 05:16:08|      Trip|2023-07-30 05:15:57|            11|
|     D001|2023-07-30 05:17:46|      Trip|2023-07-30 05:16:08|            98|
|     D001|2023-07-30 05:17:51|      Trip|2023-07-30 05:17:46|             5|
|     D001|2023-07-30 05:20:21|      Trip|2023-07-30 05:17:51|           150|
|     D001|2023-07-30 05:20:57|      Trip|2023-07-30 05:20:21|            36|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|            10|
|     D001|2023-07-30 05:23:03|      Trip|2023-07-30 05:21:07|           116|
|     D001|2023-07-30 05:23:27|       GPS|2023-07-30 05:23:03|  

In [16]:
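# Define a window specification that partitions by driver_id and orders by timestamp,
# so the shift below is deterministic and never crosses from one driver to another
window_spec_idle_duration = Window.partitionBy("driver_id").orderBy("timestamp")
# event_duration holds the gap since the previous event; lead() takes it from the next row,
# which turns it into the time from the current event until the driver's next event
eventwise_data_within_window = eventwise_data_within_window.withColumn(
    "event_duration_actual", lead(col("event_duration"), 1).over(window_spec_idle_duration)
)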
# Drop the unused column
eventwise_data_within_window = eventwise_data_within_window.drop('event_duration')
# Show the DataFrame with the calculated actual event duration
eventwise_data_within_window.show()

+---------+-------------------+----------+-------------------+---------------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration_actual|
+---------+-------------------+----------+-------------------+---------------------+
|     D001|2023-07-30 05:12:56|      Trip|               null|                  181|
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|                   11|
|     D001|2023-07-30 05:16:08|      Trip|2023-07-30 05:15:57|                   98|
|     D001|2023-07-30 05:17:46|      Trip|2023-07-30 05:16:08|                    5|
|     D001|2023-07-30 05:17:51|      Trip|2023-07-30 05:17:46|                  150|
|     D001|2023-07-30 05:20:21|      Trip|2023-07-30 05:17:51|                   36|
|     D001|2023-07-30 05:20:57|      Trip|2023-07-30 05:20:21|                   10|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|                  116|
|     D001|2023-07-30 05:23:03|      Trip|2023-07-30 05:21:07|   

In [17]:
# Filter eventwise_data_within_window to keep only rows where the "event_type" column is "Trip"
only_trip_data = eventwise_data_within_window.filter(col("event_type") == "Trip")
# Displaying the resulting DataFrame with only "Trip" event type data
only_trip_data.show()

+---------+-------------------+----------+-------------------+---------------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration_actual|
+---------+-------------------+----------+-------------------+---------------------+
|     D001|2023-07-30 05:12:56|      Trip|               null|                  181|
|     D001|2023-07-30 05:16:08|      Trip|2023-07-30 05:15:57|                   98|
|     D001|2023-07-30 05:17:46|      Trip|2023-07-30 05:16:08|                    5|
|     D001|2023-07-30 05:17:51|      Trip|2023-07-30 05:17:46|                  150|
|     D001|2023-07-30 05:20:21|      Trip|2023-07-30 05:17:51|                   36|
|     D001|2023-07-30 05:20:57|      Trip|2023-07-30 05:20:21|                   10|
|     D001|2023-07-30 05:23:03|      Trip|2023-07-30 05:21:07|                   24|
|     D001|2023-07-30 05:29:41|      Trip|2023-07-30 05:27:29|                   15|
|     D001|2023-07-30 05:29:56|      Trip|2023-07-30 05:29:41|   

In [18]:
# Calculate the average trip duration, in minutes, for each driver within each 15-minute tumbling window
average_trip_duration = only_trip_data.groupBy('driver_id', window('timestamp', window_duration_part2)).agg(round(avg('event_duration_actual')/60,2).alias('average trip duration'))
# Keep only rows where the calculated average trip duration is not null
average_trip_duration = average_trip_duration.filter(col('average trip duration').isNotNull())
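
A tumbling window assigns each event to exactly one bucket. The plain-Python sketch below works through the same average for (driver, 15-minute window) pairs. `average_minutes_per_window` is a hypothetical helper, and aligning buckets to multiples of the window size counted from the Unix epoch is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def average_minutes_per_window(trips: Iterable[Tuple[str, datetime, int]],
                               size: timedelta = timedelta(minutes=15)
                               ) -> Dict[Tuple[str, datetime], float]:
    """Average duration in minutes per (driver_id, window start) over non-overlapping windows."""
    epoch = datetime(1970, 1, 1)
    totals = defaultdict(lambda: [0, 0])  # (driver, window start) -> [sum of seconds, count]
    for driver_id, ts, seconds in trips:
        window_start = ts - ((ts - epoch) % size)  # floor the timestamp to its window
        bucket = totals[(driver_id, window_start)]
        bucket[0] += seconds
        bucket[1] += 1
    return {key: round(total / count / 60, 2) for key, (total, count) in totals.items()}
```

A single 181-second trip starting at 05:12:56 lands in the 05:00 to 05:15 bucket and averages to 3.02 minutes.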

In [19]:
# Display every row of the calculated average trip duration; count() sets how many rows show() prints.
average_trip_duration.show(average_trip_duration.count(), truncate=False)

+---------+------------------------------------------+---------------------+
|driver_id|window                                    |average trip duration|
+---------+------------------------------------------+---------------------+
|D001     |{2023-07-30 05:00:00, 2023-07-30 05:15:00}|3.02                 |
|D001     |{2023-07-30 05:15:00, 2023-07-30 05:30:00}|0.94                 |
|D001     |{2023-07-30 05:30:00, 2023-07-30 05:45:00}|0.9                  |
|D001     |{2023-07-30 05:45:00, 2023-07-30 06:00:00}|0.65                 |
|D001     |{2023-07-30 06:00:00, 2023-07-30 06:15:00}|1.09                 |
|D001     |{2023-07-30 06:15:00, 2023-07-30 06:30:00}|1.53                 |
|D001     |{2023-07-30 06:30:00, 2023-07-30 06:45:00}|1.18                 |
|D001     |{2023-07-30 06:45:00, 2023-07-30 07:00:00}|1.04                 |
|D001     |{2023-07-30 07:00:00, 2023-07-30 07:15:00}|1.24                 |
|D002     |{2023-07-30 05:00:00, 2023-07-30 05:15:00}|0.43                 |

**Task 3: Idle Time Detection**

In [20]:
# Select specific columns "driver_id", "timestamp", and "event_type" from the ride_information DataFrame.
eventwise_ride = ride_information.select("driver_id", "timestamp", "event_type")
# Drop rows where all columns have missing (null) values.
eventwise_ride = eventwise_ride.dropna(how="all")
# Display the resulting DataFrame "eventwise_ride" to inspect the data.
eventwise_ride.show()

+---------+-------------------+----------+
|driver_id|          timestamp|event_type|
+---------+-------------------+----------+
|     D001|2023-07-30 05:12:56|      Trip|
|     D001|2023-07-30 05:15:57|       GPS|
|     D001|2023-07-30 05:16:08|      Trip|
|     D001|2023-07-30 05:17:46|      Trip|
|     D001|2023-07-30 05:17:51|      Trip|
|     D001|2023-07-30 05:20:21|      Trip|
|     D001|2023-07-30 05:20:57|      Trip|
|     D001|2023-07-30 05:21:07|       GPS|
|     D001|2023-07-30 05:23:03|      Trip|
|     D001|2023-07-30 05:23:27|       GPS|
|     D001|2023-07-30 05:27:29|       GPS|
|     D001|2023-07-30 05:29:41|      Trip|
|     D001|2023-07-30 05:29:56|      Trip|
|     D001|2023-07-30 05:31:50|       GPS|
|     D001|2023-07-30 05:32:08|       GPS|
|     D001|2023-07-30 05:32:20|      Trip|
|     D001|2023-07-30 05:32:34|      Trip|
|     D001|2023-07-30 05:34:54|      Trip|
|     D001|2023-07-30 05:36:21|       GPS|
|     D001|2023-07-30 05:38:09|       GPS|
+---------+-------------------+----------+

In [21]:
# Define a window specification for partitioning the data by driver_id and ordering by timestamp.
windows_specs = Window.partitionBy('driver_id').orderBy('timestamp')
# Create a new DataFrame 'eventwise_ride_within_window' by adding a new column 'prev_timestamp' which contains the timestamp of the previous row within the same partition.
eventwise_ride_within_window = eventwise_ride.withColumn("prev_timestamp", lag('timestamp').over(windows_specs))

In [22]:
# Display the contents of the DataFrame "eventwise_ride_within_window"
eventwise_ride_within_window.show()

+---------+-------------------+----------+-------------------+
|driver_id|          timestamp|event_type|     prev_timestamp|
+---------+-------------------+----------+-------------------+
|     D001|2023-07-30 05:12:56|      Trip|               null|
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|
|     D001|2023-07-30 05:16:08|      Trip|2023-07-30 05:15:57|
|     D001|2023-07-30 05:17:46|      Trip|2023-07-30 05:16:08|
|     D001|2023-07-30 05:17:51|      Trip|2023-07-30 05:17:46|
|     D001|2023-07-30 05:20:21|      Trip|2023-07-30 05:17:51|
|     D001|2023-07-30 05:20:57|      Trip|2023-07-30 05:20:21|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|
|     D001|2023-07-30 05:23:03|      Trip|2023-07-30 05:21:07|
|     D001|2023-07-30 05:23:27|       GPS|2023-07-30 05:23:03|
|     D001|2023-07-30 05:27:29|       GPS|2023-07-30 05:23:27|
|     D001|2023-07-30 05:29:41|      Trip|2023-07-30 05:27:29|
|     D001|2023-07-30 05:29:56|      Trip|2023-07-30 05

In [23]:
# Calculate the number of seconds between each event and the same driver's previous event
eventwise_ride_details_with_idle_duration = eventwise_ride_within_window.withColumn(
    "event_duration",
    (col("timestamp").cast('long') - col('prev_timestamp').cast('long'))
)

In [24]:
# Display the DataFrame containing ride details along with calculated idle durations
eventwise_ride_details_with_idle_duration.show()

+---------+-------------------+----------+-------------------+--------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration|
+---------+-------------------+----------+-------------------+--------------+
|     D001|2023-07-30 05:12:56|      Trip|               null|          null|
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|           181|
|     D001|2023-07-30 05:16:08|      Trip|2023-07-30 05:15:57|            11|
|     D001|2023-07-30 05:17:46|      Trip|2023-07-30 05:16:08|            98|
|     D001|2023-07-30 05:17:51|      Trip|2023-07-30 05:17:46|             5|
|     D001|2023-07-30 05:20:21|      Trip|2023-07-30 05:17:51|           150|
|     D001|2023-07-30 05:20:57|      Trip|2023-07-30 05:20:21|            36|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|            10|
|     D001|2023-07-30 05:23:03|      Trip|2023-07-30 05:21:07|           116|
|     D001|2023-07-30 05:23:27|       GPS|2023-07-30 05:23:03|  

In [26]:
# Create a window specification partitioned by "driver_id" and ordered by "timestamp"
window_spec_idle_duration = Window.partitionBy("driver_id").orderBy("timestamp")
# Add a new column "event_duration_actual" to the DataFrame.
# lead() reads "event_duration" from the driver's next row, which effectively moves each value
# one row up within that driver's events and never pulls a value from a different driver.
eventwise_ride_details_with_idle_duration_new = eventwise_ride_details_with_idle_duration.withColumn(
    "event_duration_actual", lead(col("event_duration"), 1).over(window_spec_idle_duration)
)
# Drop the unused column
eventwise_ride_details_with_idle_duration_new = eventwise_ride_details_with_idle_duration_new.drop('event_duration')
# Display the resulting DataFrame with the added column.
# count() is passed so that show() prints every row, and truncate=False keeps column values from being cut off.
eventwise_ride_details_with_idle_duration_new.show(eventwise_ride_details_with_idle_duration_new.count(), truncate=False)

+---------+-------------------+----------+-------------------+---------------------+
|driver_id|timestamp          |event_type|prev_timestamp     |event_duration_actual|
+---------+-------------------+----------+-------------------+---------------------+
|D001     |2023-07-30 05:12:56|Trip      |null               |181                  |
|D001     |2023-07-30 05:15:57|GPS       |2023-07-30 05:12:56|11                   |
|D001     |2023-07-30 05:16:08|Trip      |2023-07-30 05:15:57|98                   |
|D001     |2023-07-30 05:17:46|Trip      |2023-07-30 05:16:08|5                    |
|D001     |2023-07-30 05:17:51|Trip      |2023-07-30 05:17:46|150                  |
|D001     |2023-07-30 05:20:21|Trip      |2023-07-30 05:17:51|36                   |
|D001     |2023-07-30 05:20:57|Trip      |2023-07-30 05:20:21|10                   |
|D001     |2023-07-30 05:21:07|GPS       |2023-07-30 05:20:57|116                  |
|D001     |2023-07-30 05:23:03|Trip      |2023-07-30 05:21:07|24 
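
To make the one-row shift concrete, here is a minimal plain-Python sketch (not the notebook's Spark code) of what `lag(..., -1)` is meant to compute, assuming the shift should stay within each driver and follow timestamp order. The function name `shift_durations_up` and the two `D002` rows are hypothetical; the `D001` values are the first three rows of the earlier `event_duration` table.

```python
from collections import defaultdict


def shift_durations_up(rows):
    """Give each event the duration stored on the next event of the same driver.

    rows: list of (driver_id, timestamp, event_duration) tuples.
    The last event of every driver has no successor, so it receives None.
    """
    by_driver = defaultdict(list)
    for driver_id, timestamp, duration in rows:
        by_driver[driver_id].append((timestamp, duration))

    shifted = []
    for driver_id in sorted(by_driver):
        # ISO-formatted timestamp strings sort chronologically
        events = sorted(by_driver[driver_id], key=lambda event: event[0])
        for i, (timestamp, _) in enumerate(events):
            next_duration = events[i + 1][1] if i + 1 < len(events) else None
            shifted.append((driver_id, timestamp, next_duration))
    return shifted


sample_rows = [
    ("D001", "2023-07-30 05:12:56", None),
    ("D001", "2023-07-30 05:15:57", 181),
    ("D001", "2023-07-30 05:16:08", 11),
    ("D002", "2023-07-30 05:13:00", None),  # hypothetical row
    ("D002", "2023-07-30 05:14:00", 60),    # hypothetical row
]

result = shift_durations_up(sample_rows)
for row in result:
    print(row)
```

In this sketch the first `D001` row picks up 181 and the second picks up 11, matching the table above, while the last row of each driver gets `None` rather than a value borrowed from the next driver.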

In [27]:
# Filter the DataFrame to retain only rows with the "GPS" event type
only_GPS_data = eventwise_ride_details_with_idle_duration_new.filter(eventwise_ride_details_with_idle_duration_new.event_type == "GPS")
# Display every row of the filtered DataFrame (count() is passed to show() as the row limit)
only_GPS_data.show(only_GPS_data.count())

+---------+-------------------+----------+-------------------+---------------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration_actual|
+---------+-------------------+----------+-------------------+---------------------+
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|                   11|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|                  116|
|     D001|2023-07-30 05:23:27|       GPS|2023-07-30 05:23:03|                  242|
|     D001|2023-07-30 05:27:29|       GPS|2023-07-30 05:23:27|                  132|
|     D001|2023-07-30 05:31:50|       GPS|2023-07-30 05:29:56|                   18|
|     D001|2023-07-30 05:32:08|       GPS|2023-07-30 05:31:50|                   12|
|     D001|2023-07-30 05:36:21|       GPS|2023-07-30 05:34:54|                  108|
|     D001|2023-07-30 05:38:09|       GPS|2023-07-30 05:36:21|                   44|
|     D001|2023-07-30 05:38:53|       GPS|2023-07-30 05:38:09|   

In [28]:
# Flag idle sessions: 1 when the gap following a GPS event exceeds idle_threshold_seconds, otherwise 0
idle_sessions = only_GPS_data.withColumn("Idle Duration Flag", when(col("event_duration_actual") > idle_threshold_seconds, 1).otherwise(0))
idle_sessions.show()

+---------+-------------------+----------+-------------------+---------------------+------------------+
|driver_id|          timestamp|event_type|     prev_timestamp|event_duration_actual|Idle Duration Flag|
+---------+-------------------+----------+-------------------+---------------------+------------------+
|     D001|2023-07-30 05:15:57|       GPS|2023-07-30 05:12:56|                   11|                 0|
|     D001|2023-07-30 05:21:07|       GPS|2023-07-30 05:20:57|                  116|                 0|
|     D001|2023-07-30 05:23:27|       GPS|2023-07-30 05:23:03|                  242|                 0|
|     D001|2023-07-30 05:27:29|       GPS|2023-07-30 05:23:27|                  132|                 0|
|     D001|2023-07-30 05:31:50|       GPS|2023-07-30 05:29:56|                   18|                 0|
|     D001|2023-07-30 05:32:08|       GPS|2023-07-30 05:31:50|                   12|                 0|
|     D001|2023-07-30 05:36:21|       GPS|2023-07-30 05:34:54|  

In [29]:
# Label the output that follows
print('Driver idle for 30 minutes')
# Keep only the rows whose "Idle Duration Flag" is greater than 0.
# This removes periods of driver activity and leaves only the idle sessions
idle_sessions = idle_sessions.filter(col("Idle Duration Flag") > 0)
# Display every remaining row without truncating cell values
idle_sessions.show(idle_sessions.count(), truncate=False)

Driver idle for 30 minutes
+---------+---------+----------+--------------+---------------------+------------------+
|driver_id|timestamp|event_type|prev_timestamp|event_duration_actual|Idle Duration Flag|
+---------+---------+----------+--------------+---------------------+------------------+
+---------+---------+----------+--------------+---------------------+------------------+
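
The empty result can be sanity-checked with a few lines of plain Python. This is only a sketch: it assumes `idle_threshold_seconds` was set to 30 minutes (1800 seconds), as the printed message suggests, since its definition lies outside this section. The helper name `idle_flag` is hypothetical, and the durations are the `D001` GPS values visible in the filtered table above.

```python
# Assumed threshold: 30 minutes, expressed in seconds
IDLE_THRESHOLD_SECONDS = 30 * 60


def idle_flag(duration_seconds, threshold_seconds=IDLE_THRESHOLD_SECONDS):
    """Return 1 when the gap exceeds the threshold, otherwise 0.

    A missing duration yields 0, mirroring how when(...).otherwise(0)
    falls through to the default when the comparison is null.
    """
    if duration_seconds is None:
        return 0
    return 1 if duration_seconds > threshold_seconds else 0


# event_duration_actual values (seconds) of the visible D001 GPS rows
gps_durations = [11, 116, 242, 132, 18, 12, 108, 44]

flags = [idle_flag(duration) for duration in gps_durations]
idle_rows = [d for d, flag in zip(gps_durations, flags) if flag > 0]

print(flags)      # [0, 0, 0, 0, 0, 0, 0, 0]
print(idle_rows)  # []
```

Under that assumption, none of these gaps comes close to 1800 seconds, so every flag is 0 and the filter leaves no rows.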



In [30]:
# Group 'only_GPS_data' by 'driver_id' and take the maximum 'event_duration_actual' for each driver.
# Dividing by 60 converts the maximum from seconds to minutes, and round(..., 2) keeps 2 decimal places.
# Note: round and max here are the pyspark.sql.functions versions, not Python's built-ins.
# The result is named 'Max Idle Duration' (in minutes) using the alias function
driver_wise_max_idle_time = only_GPS_data.groupBy(col('driver_id')).agg(round(max(col('event_duration_actual'))/60,2).alias('Max Idle Duration'))
# Display every row of the 'driver_wise_max_idle_time' DataFrame
driver_wise_max_idle_time.show(driver_wise_max_idle_time.count())

+---------+-----------------+
|driver_id|Max Idle Duration|
+---------+-----------------+
|     D001|              6.7|
|     D002|             4.97|
|     D003|             6.37|
|     D004|             6.17|
|     D005|             4.02|
|     D006|             3.17|
|     D007|             4.65|
|     D008|             5.73|
|     D009|             5.88|
|     D010|             7.17|
|     D011|             4.32|
|     D012|             4.13|
|     D013|              5.4|
|     D014|             3.48|
|     D015|             4.82|
|     D016|              6.8|
|     D017|             4.43|
|     D018|             6.57|
|     D019|             5.53|
|     D020|             4.78|
|     D021|              4.8|
|     D022|              5.5|
|     D023|             6.27|
|     D024|             5.27|
|     D025|             4.72|
|     D026|             4.12|
|     D027|              4.3|
|     D028|              4.7|
|     D029|             4.98|
|     D030|             4.58|
|     D031