INSTALLING AND IMPORTING THE REQUIRED LIBRARIES

In [1]:
pip install fastavro faker

Collecting fastavro
  Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting faker
  Downloading faker-36.2.2-py3-none-any.whl.metadata (15 kB)
Downloading fastavro-1.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading faker-36.2.2-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m49.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fastavro, faker
Successfully installed faker-36.2.2 fastavro-1.10.0


In [2]:
import json
import random
import fastavro
from fastavro.schema import load_schema
from faker import Faker
from datetime import datetime, timedelta
from decimal import Decimal

CODE FOR DATA GENERATION

In [4]:
SEED = 42  # fixed seed so Restart & Run All regenerates the same synthetic events
random.seed(SEED)  # seeds the `random` calls used for statuses, durations, prices and request selection
Faker.seed(SEED)  # class-level seed shared by Faker instances (IDs, locations, timestamps)
fake = Faker()  # Faker instance used by the event generators below

full_schema = {  # unified Avro schema; the Location record is defined once and reused by name
    "type": "record",
    "name": "RideHailingEvent",
    "namespace": "com.ridehailing",
    "fields": [
        {
            # Discriminator: the generators below populate exactly one of the two payloads.
            "name": "event_type",
            "type": {"type": "enum", "name": "EventType", "symbols": ["RideRequest", "RideStatus"]}
        },
        {
            "name": "ride_request",  # passenger ride-request payload (null for RideStatus events)
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "RideRequest",
                    "fields": [
                        {"name": "passenger_id", "type": "string"},  # unique ID for each passenger
                        {"name": "pickup_location", "type": {  # first (and only) definition of Location
                            "type": "record",
                            "name": "Location",
                            "fields": [
                                # NOTE(review): Avro "float" is 32-bit; consider "double" if
                                # full coordinate/price precision matters downstream.
                                {"name": "latitude", "type": "float"},
                                {"name": "longitude", "type": "float"}
                            ]
                        }},
                        {"name": "dropoff_location", "type": "Location"},  # reuses the named Location record
                        {"name": "timestamp", "type": "string"},  # ISO-8601 time the request was made
                        {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["Requested", "Canceled"]}},  # requested or canceled
                        {"name": "estimated_duration", "type": "int"},  # predicted ride time in minutes
                        {"name": "estimated_price", "type": "float"}  # estimated price of the service in dollars
                    ]
                }
            ],
            # Nullable-union default so readers using this schema can resolve records
            # written without the field; Avro requires it to match the first branch ("null").
            "default": None
        },
        {
            "name": "ride_status",  # ride-status payload (null for RideRequest events)
            "type": [
                "null",
                {
                    "type": "record",
                    "name": "RideStatus",
                    "fields": [
                        {"name": "ride_id", "type": "string"},  # unique ride identifier
                        {"name": "driver_id", "type": "string"},  # unique driver identifier
                        {"name": "passenger_id", "type": "string"},  # unique passenger identifier
                        {"name": "pickup_location", "type": "Location"},  # pickup location, reusing the Location record
                        {"name": "dropoff_location", "type": "Location"},  # drop-off location, reusing the Location record
                        {"name": "timestamp", "type": "string"},  # ISO-8601 time of the status update
                        {"name": "status", "type": {"type": "enum", "name": "RideStatusEnum", "symbols": ["Accepted", "Ongoing", "Completed"]}},  # accepted, ongoing or completed
                        {"name": "actual_duration", "type": "int"},  # actual ride time in minutes
                        {"name": "final_price", "type": "float"}  # final price charged in dollars
                    ]
                }
            ],
            "default": None  # same nullable-union default as ride_request
        }
    ]
}

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that writes ``decimal.Decimal`` values as plain floats.

    Any other type the stock encoder cannot handle is passed on to
    ``json.JSONEncoder.default``, which raises ``TypeError``.
    """

    def default(self, obj):
        if not isinstance(obj, Decimal):
            return super().default(obj)
        return float(obj)

def generateRideRequest():
    """Build one synthetic ``RideRequest`` event in the shape of ``full_schema``.

    The passenger ID, both locations and the request time come from ``fake``.
    Status, estimated duration (5-60 minutes) and estimated price (5.0-50.0,
    rounded to two decimals) come from ``random``. ``ride_status`` is always ``None``.
    """
    # Draw values in the same order as the record is laid out.
    requested_at = fake.date_time_this_month()
    passenger_id = fake.uuid4()
    pickup = {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())}
    dropoff = {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())}
    request_status = random.choice(["Requested", "Canceled"])
    duration_minutes = random.randint(5, 60)
    price = float(round(random.uniform(5.0, 50.0), 2))

    request_record = {
        "passenger_id": passenger_id,
        "pickup_location": pickup,
        "dropoff_location": dropoff,
        "timestamp": requested_at.isoformat(),
        "status": request_status,
        "estimated_duration": duration_minutes,
        "estimated_price": price,
    }
    return {"event_type": "RideRequest", "ride_request": request_record, "ride_status": None}

def generateRideStatus(related_request=None):
    """Build one synthetic ``RideStatus`` event in the shape of ``full_schema``.

    Args:
        related_request: optional event dict returned by ``generateRideRequest``.
            When given, the status reuses that request's passenger and pickup /
            drop-off locations, and is timestamped 5-30 minutes after the request.
            When omitted, those values are generated independently.

    Returns:
        dict with ``event_type`` ``"RideStatus"``, ``ride_request`` ``None`` and a
        populated ``ride_status`` record. Ride ID, driver ID, status, actual
        duration (5-60 minutes) and final price (5.0-50.0) are always freshly randomised.
    """
    request = related_request["ride_request"] if related_request else None
    base_time = request["timestamp"] if related_request else fake.date_time_this_month().isoformat()
    delay = timedelta(minutes=random.randint(5, 30))
    ride_id = fake.uuid4()
    driver_id = fake.uuid4()
    if related_request:
        passenger_id = request["passenger_id"]
        # Copy the location dicts: sharing the request's objects would let a later
        # mutation of one event silently change the other.
        pickup_location = dict(request["pickup_location"])
        dropoff_location = dict(request["dropoff_location"])
    else:
        passenger_id = fake.uuid4()
        pickup_location = {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())}
        dropoff_location = {"latitude": float(fake.latitude()), "longitude": float(fake.longitude())}
    return {
        "event_type": "RideStatus",
        "ride_request": None,
        "ride_status": {
            "ride_id": ride_id,
            "driver_id": driver_id,
            "passenger_id": passenger_id,
            "pickup_location": pickup_location,
            "dropoff_location": dropoff_location,
            "timestamp": (datetime.fromisoformat(base_time) + delay).isoformat(),
            "status": random.choice(["Accepted", "Ongoing", "Completed"]),
            "actual_duration": random.randint(5, 60),
            "final_price": float(round(random.uniform(5.0, 50.0), 2)),
        },
    }

def generate_events(num_requests=5, num_statuses=5, high_demand=False, surge_multiplier=1.5):
    """Generate a mixed batch of ride-request and ride-status events.

    Args:
        num_requests: number of ``RideRequest`` events to create.
        num_statuses: number of ``RideStatus`` events to create. Each is linked to a
            randomly chosen non-canceled request when one exists; otherwise it gets
            independently generated passenger and location data.
        high_demand: when True, every estimated/final price is multiplied by
            ``surge_multiplier``.
        surge_multiplier: price factor applied under high demand (default 1.5).

    Returns:
        list of events: all requests first, then all statuses. Prices are
        rounded to two decimals after scaling.
    """
    ride_requests = [generateRideRequest() for _ in range(num_requests)]
    # A canceled request can never become accepted/ongoing/completed, so only
    # link status events to requests that were not canceled.
    active_requests = [
        event for event in ride_requests
        if event["ride_request"]["status"] != "Canceled"
    ]
    ride_statuses = [
        generateRideStatus(random.choice(active_requests) if active_requests else None)
        for _ in range(num_statuses)
    ]
    pricing_factor = surge_multiplier if high_demand else 1.0
    events = ride_requests + ride_statuses
    for event in events:
        if event["ride_request"]:
            payload, price_field = event["ride_request"], "estimated_price"
        elif event["ride_status"]:
            payload, price_field = event["ride_status"], "final_price"
        else:
            continue
        # Re-round after scaling so prices stay at whole cents instead of carrying
        # floating-point noise from the multiplication.
        payload[price_field] = round(payload[price_field] * pricing_factor, 2)
    return events

def save_avro(data, schema, filename):
    """Write ``data`` (an iterable of record dicts) to ``filename`` as an Avro container file using ``schema``."""
    with open(filename, "wb") as out:
        fastavro.writer(out, schema, data)


# Generate one batch under high demand (prices scaled by generate_events' high-demand
# factor) and persist that same batch in both formats. Unused throwaway batches were
# removed: they only consumed random state and were never written anywhere.
all_events = generate_events(num_requests=10, num_statuses=10, high_demand=True)

with open("ride_events.json", "w", encoding="utf-8") as f:
    json.dump(all_events, f, indent=4, cls=DecimalEncoder)

save_avro(all_events, full_schema, "ride_events.avro")

print("We have successfully generated ride request and ride status data in JSON and AVRO formats.")

We have successfully generated ride request and ride status data in JSON and AVRO formats.


In [5]:
# Set the commit author identity. --global writes it to the user-level Git config,
# so it applies to every repository for this user, not just the one created below.
# NOTE(review): this publishes a personal e-mail in the notebook and in every commit;
# consider GitHub's noreply address instead.
!git config --global user.name "VCAM101"
!git config --global user.email "varino.ieu2021@student.ie.edu"

In [6]:
# Confirm the working directory: the next cells delete and re-create .git here.
!pwd  # To show current directory

/content


In [7]:
!rm -rf .git  # Remove all previous Git history
!git init  # Initializes a new Git repository

[33mhint: Using 'master' as the name for the initial branch. This default branch name[m
[33mhint: is subject to change. To configure the initial branch name to use in all[m
[33mhint: [m
[33mhint: 	git config --global init.defaultBranch <name>[m
[33mhint: [m
[33mhint: Names commonly chosen instead of 'master' are 'main', 'trunk' and[m
[33mhint: 'development'. The just-created branch can be renamed via this command:[m
[33mhint: [m
[33mhint: 	git branch -m <name>[m
Initialized empty Git repository in /content/.git/


In [None]:
# Stage everything in the working directory and create the first commit.
# NOTE(review): `git add .` also stages MILESTONE1.ipynb, which a later cell purges from
# history because it contained a secret. Review `git status` (or add a .gitignore)
# before committing so secrets are never committed in the first place.
!git add .  # Stages all new and modified files
!git commit -m "Initial commit - Uploading project to STREAM-ANALYTICS-GROUP-PROJECT"

[main (root-commit) a65d415] Initial commit - Uploading project to STREAM-ANALYTICS-GROUP-PROJECT
 5 files changed, 596 insertions(+)
 create mode 100644 MILESTONE 1 STREAM ANALYTICS.pptx
 create mode 100644 MILESTONE1.ipynb
 create mode 100644 Stream Analytics Group Presentation Milestone 1.pdf
 create mode 100644 ride_events.avro
 create mode 100644 ride_events.json


In [None]:
!git filter-branch --force --index-filter \
'git rm --cached --ignore-unmatch MILESTONE1.ipynb' \
--prune-empty --tag-name-filter cat -- --all

	 rewrites.  Hit Ctrl-C before proceeding to abort, then use an
	 alternative filtering tool such as 'git filter-repo'
	 (https://github.com/newren/git-filter-repo/) instead.  See the
Proceeding with filter-branch...

Rewrite de90fd235c015380482b52f89373da81511a277b (1/1) (0 seconds passed, remaining 0 predicted)    


In [None]:
!git add MILESTONE1.ipynb
!git commit -m "Removed secret from history"

fatal: pathspec 'MILESTONE1.ipynb' did not match any files
On branch main
nothing to commit, working tree clean


In [None]:
!git push origin main --force

fatal: 'origin' does not appear to be a git repository
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.


In [None]:
# Register the GitHub repository as the `origin` remote (plain HTTPS URL, no credentials).
# Fails with "remote origin already exists" if re-run without re-initialising .git.
!git remote add origin https://github.com/VCAM101/STREAM-ANALYTICS-GROUP-PROJECT.git

In [None]:
# List the configured remotes. This prints the full URLs, so any credential embedded
# in a remote URL would be exposed in the notebook output.
!git remote -v

origin	https://github.com/VCAM101/STREAM-ANALYTICS-GROUP-PROJECT.git (fetch)
origin	https://github.com/VCAM101/STREAM-ANALYTICS-GROUP-PROJECT.git (push)


In [None]:
# SECURITY: this cell previously hardcoded a GitHub personal access token in the remote
# URL. That exposed the token in the notebook and stored it in .git/config. Treat that
# token as compromised: revoke it on GitHub (Settings -> Developer settings -> Personal
# access tokens). Never paste tokens into code.
import base64
import getpass
import subprocess

REPO_URL = "https://github.com/VCAM101/STREAM-ANALYTICS-GROUP-PROJECT.git"

# Reset the remote to a credential-free URL (scrubs a token stored by an earlier run).
subprocess.run(["git", "remote", "set-url", "origin", REPO_URL], check=True)

# Prompt for the token at run time. getpass does not echo it, so it never lands in the
# notebook. It is sent only for this one push, as an HTTP Basic auth header via `git -c`,
# so it is not written to .git/config either.
_token = getpass.getpass("GitHub personal access token: ")
_auth = base64.b64encode(f"x-access-token:{_token}".encode()).decode()
try:
    # --force overwrites the remote `main`, as in the original workflow, which publishes
    # the locally rewritten history.
    push = subprocess.run(
        [
            "git",
            "-c", f"http.https://github.com/.extraheader=AUTHORIZATION: basic {_auth}",
            "push", "origin", "main", "--force",
        ],
        capture_output=True,
        text=True,
    )
finally:
    del _token, _auth

# git reports push progress on stderr; show both streams in the cell output.
print(push.stdout, push.stderr, sep="")
push.check_returncode()
