In [2]:
pip install fastavro faker

Defaulting to user installation because normal site-packages is not writeable
Collecting fastavro
  Downloading fastavro-1.10.0-cp39-cp39-macosx_10_9_universal2.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 5.4 MB/s eta 0:00:01
[?25hCollecting faker
  Downloading Faker-36.1.1-py3-none-any.whl (1.9 MB)
[K     |████████████████████████████████| 1.9 MB 11.4 MB/s eta 0:00:01
Installing collected packages: fastavro, faker
Successfully installed faker-36.1.1 fastavro-1.10.0
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [1]:
import json
import random
from datetime import datetime, timedelta, timezone
from decimal import Decimal

import fastavro
from faker import Faker
from fastavro.schema import load_schema

In [14]:
# Seed every random source up front so that Restart & Run All regenerates the
# same IDs, coordinates, statuses and prices. Timestamps still come from the
# wall clock and will differ between runs.
SEED = 42
Faker.seed(SEED)   # seeds the random generator shared by all Faker instances
random.seed(SEED)  # seeds the stdlib `random` module
fake = Faker()

# Define a unified schema with "Location" only once: it is declared inline on
# RideRequest.pickup_location, and every later location field refers to it by
# its name "Location".
# NOTE: Avro "float" is 32-bit IEEE-754, so coordinates and 2-dp prices read
# back from the .avro file may not round-trip to exactly the values written.
full_schema = {
    "type": "record",
    "name": "RideHailingEvent",
    "namespace": "com.ridehailing",
    "fields": [
        {
            "name": "event_type",
            "type": {"type": "enum", "name": "EventType", "symbols": ["RideRequest", "RideStatus"]}
        },
        {
            # Exactly one of ride_request / ride_status is populated per event;
            # the other is null.
            "name": "ride_request",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "RideRequest",
                    "fields": [
                        {"name": "passenger_id", "type": "string"},
                        {"name": "pickup_location", "type": {
                            "type": "record",
                            "name": "Location",
                            "fields": [
                                {"name": "latitude", "type": "float"},
                                {"name": "longitude", "type": "float"}
                            ]
                        }},
                        {"name": "dropoff_location", "type": "Location"},
                        # ISO-8601 string produced by datetime.isoformat()
                        {"name": "timestamp", "type": "string"},
                        {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["Requested", "Canceled"]}},
                        {"name": "estimated_duration", "type": "int"},
                        {"name": "estimated_price", "type": "float"}
                    ]
                }
            ]
        },
        {
            "name": "ride_status",
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "RideStatus",
                    "fields": [
                        {"name": "ride_id", "type": "string"},
                        {"name": "driver_id", "type": "string"},
                        {"name": "passenger_id", "type": "string"},
                        {"name": "pickup_location", "type": "Location"},
                        {"name": "dropoff_location", "type": "Location"},
                        # ISO-8601 string produced by datetime.isoformat()
                        {"name": "timestamp", "type": "string"},
                        {"name": "status", "type": {"type": "enum", "name": "RideStatusEnum", "symbols": ["Accepted", "Ongoing", "Completed"]}},
                        {"name": "actual_duration", "type": "int"},
                        {"name": "final_price", "type": "float"}
                    ]
                }
            ]
        }
    ]
}

# Custom JSON encoder for Decimal
class DecimalEncoder(json.JSONEncoder):
    """``json.JSONEncoder`` that can also emit ``decimal.Decimal`` values.

    Decimals are written as JSON numbers via ``float`` (very long decimals may
    therefore lose precision). Any other type the stock encoder cannot handle
    is delegated to the base implementation, which raises ``TypeError``.
    """

    def default(self, obj):
        # Guard clause: anything that is not a Decimal goes to the base encoder.
        if not isinstance(obj, Decimal):
            return super().default(obj)
        return float(obj)

# Function to generate a random ride request
def generate_ride_request():
    """Build one random ``RideRequest`` event shaped like ``full_schema``.

    Returns:
        dict with ``event_type="RideRequest"``, a populated ``ride_request``
        record, and ``ride_status=None`` (the unused branch of that union).

    IDs and coordinates come from the module-level ``fake`` Faker instance;
    status, duration and price come from the stdlib ``random`` module.
    """
    return {
        "event_type": "RideRequest",
        "ride_request": {
            "passenger_id": fake.uuid4(),
            # Faker's coordinates are Decimal-typed; cast to float for the Avro
            # "float" fields.
            "pickup_location": {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())},
            "dropoff_location": {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())},
            # Timezone-aware UTC timestamp (ISO-8601 ending in "+00:00"). A naive
            # datetime.now() would record this machine's local wall-clock time
            # with no offset, which downstream consumers cannot interpret reliably.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": random.choice(["Requested", "Canceled"]),
            # Random int in [5, 60] inclusive; the schema does not state the unit.
            "estimated_duration": random.randint(5, 60),
            # round() on a float already returns a float.
            "estimated_price": round(random.uniform(5.0, 50.0), 2),
        },
        "ride_status": None,
    }

# Function to generate a random ride status
def generate_ride_status():
    """Build one random ``RideStatus`` event shaped like ``full_schema``.

    Returns:
        dict with ``event_type="RideStatus"``, ``ride_request=None`` (the
        unused branch of that union), and a populated ``ride_status`` record.

    NOTE(review): ride_id, driver_id and passenger_id are fresh random UUIDs,
    so these status events are not linked to any generated RideRequest.
    """
    return {
        "event_type": "RideStatus",
        "ride_request": None,
        "ride_status": {
            "ride_id": fake.uuid4(),
            "driver_id": fake.uuid4(),
            "passenger_id": fake.uuid4(),
            # Faker's coordinates are Decimal-typed; cast to float for the Avro
            # "float" fields.
            "pickup_location": {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())},
            "dropoff_location": {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())},
            # Timezone-aware UTC timestamp (ISO-8601 ending in "+00:00"), matching
            # the ride-request events; a naive datetime.now() carries no offset.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": random.choice(["Accepted", "Ongoing", "Completed"]),
            # Random int in [5, 60] inclusive; the schema does not state the unit.
            "actual_duration": random.randint(5, 60),
            # round() on a float already returns a float.
            "final_price": round(random.uniform(5.0, 50.0), 2),
        },
    }

# Generate sample data: five events of each type, all requests before all statuses
ride_requests = [generate_ride_request() for _ in range(5)]
ride_statuses = [generate_ride_status() for _ in range(5)]
all_events = [*ride_requests, *ride_statuses]

# Save JSON output, pretty-printed; DecimalEncoder covers any stray Decimal values
with open("ride_events.json", mode="w") as f:
    json.dump(all_events, f, cls=DecimalEncoder, indent=4)

# Save AVRO output
def save_avro(data, schema, filename):
    """Serialize ``data`` into an Avro object-container file.

    Args:
        data: iterable of records (dicts) to write, expected to match ``schema``.
        schema: Avro schema, passed as-is to ``fastavro.writer``.
        filename: path of the file to create or overwrite.
    """
    with open(filename, mode="wb") as avro_file:
        fastavro.writer(avro_file, schema, data)

save_avro(all_events, full_schema, "ride_events.avro")

# Read both files back before declaring success, so a truncated or unreadable
# output file is caught here rather than by a downstream consumer.
with open("ride_events.json") as json_in:
    assert len(json.load(json_in)) == len(all_events), "ride_events.json: event count mismatch"
with open("ride_events.avro", "rb") as avro_in:
    avro_events = list(fastavro.reader(avro_in))
assert len(avro_events) == len(all_events), "ride_events.avro: event count mismatch"
assert [e["event_type"] for e in avro_events] == [e["event_type"] for e in all_events], \
    "ride_events.avro: event type/order mismatch"

print("✅ Successfully generated ride request and ride status data in JSON and AVRO formats.")

✅ Successfully generated ride request and ride status data in JSON and AVRO formats.


In [11]:
# Check that git is installed and on the PATH
!git --version

256.91s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


git version 2.39.3 (Apple Git-146)


In [12]:
# NOTE(review): `--global` rewrites the git identity used by every repository on
# this machine, and the hard-coded e-mail is personal data saved in the notebook.
# Consider setting these once in a terminal (or per-repo, without --global) instead.
!git config --global user.name "VCAM101"
!git config --global user.email "varino.ieu2021@student.ie.edu"

295.91s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
301.21s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


In [13]:
# Sanity check: confirm the working directory and that the output files exist
!pwd  # Shows your current directory
!ls   # Lists all files in the folder

328.02s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


/Users/valeriaarinomontero/Desktop/2nd Semester (4th Year)/STREAM ANALYTICS/GROUP PROJECTS/MILESTONE 1


333.31s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


MILESTONE1.ipynb
Stream Analytics Group Presentation Milestone 1.pdf
ride_events.avro
ride_events.json
