<a href="https://colab.research.google.com/github/Aadarsh-1210/Aadarsh-1210/blob/main/Untitled6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# First cell - Install the necessary dependencies
!pip install numpy==1.23.0
!pip install --upgrade google-cloud-pubsub==2.13.0
!pip install --upgrade google-cloud-firestore==2.7.2
!pip install --upgrade google-cloud-storage==2.7.0
!pip install --upgrade google-cloud-bigquery==3.4.1
!pip install --upgrade pandas==1.5.3
!pip install --upgrade google-auth==2.22.0  # Make sure Google Auth is up to date
!pip install google-cloud-storage google-cloud-firestore google-cloud-bigquery google-cloud-pubsub

# Second cell - Restart runtime to ensure all dependencies are loaded properly
import IPython
IPython.get_ipython().kernel.do_shutdown(True)

# Third cell - Handle authentication and imports
from google.colab import auth

# Obtain user credentials for this Colab runtime.
auth.authenticate_user()

import os

# Optional service-account key. google.auth.default() consults
# GOOGLE_APPLICATION_CREDENTIALS before any other credential source and raises
# DefaultCredentialsError when that file does not exist, so pointing it at a
# path that only exists on a local machine breaks every client created later.
# Only export the variable when the key file is actually present; otherwise
# keep the credentials obtained from auth.authenticate_user() above.
_SERVICE_ACCOUNT_KEY_PATH = "/Users/adarshshukla/Downloads/kinetic-dogfish-439013-k4-08e855629f84.json"
if os.path.isfile(_SERVICE_ACCOUNT_KEY_PATH):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = _SERVICE_ACCOUNT_KEY_PATH
import json
import datetime
import numpy as np
import pandas as pd
from google.cloud import pubsub_v1
from google.cloud import firestore
from google.cloud import storage
from google.cloud import bigquery
from google.auth import exceptions

# Set your Google Cloud Project ID
os.environ["GOOGLE_CLOUD_PROJECT"] = "kinetic-dogfish-439013-k4"  # Replace with your project ID
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT", "").strip()

# Fail fast only when no project ID is configured at all. Comparing against
# the real ID assigned above would make this guard fire on every run.
if not PROJECT_ID:
    raise EnvironmentError("❗️The Google Cloud Project ID is not set. Please set it in the environment variable or replace it in the code.")

# TrafficSystem class definition
class TrafficSystem:
    """Pipeline that archives traffic readings and stores processed copies.

    A reading is validated, written as raw JSON to a Cloud Storage bucket,
    then recorded in a Firestore collection and streamed into a BigQuery
    table. Failures are printed and reported through a ``False`` return.
    """

    # Destinations used by the pipeline; override on a subclass to redirect.
    RAW_BUCKET_NAME = 'traffic-raw-data'
    FIRESTORE_COLLECTION = 'processed_traffic_data'
    # Dataset-qualified table name; the project ID is prepended at insert time.
    BIGQUERY_TABLE = 'traffic_dataset.processed_data'

    def __init__(self, project_id=None):
        """Create the Cloud Storage, Firestore, BigQuery and Pub/Sub clients.

        Args:
            project_id: Google Cloud project for the clients. Defaults to the
                module-level ``PROJECT_ID`` when omitted.

        Raises:
            google.auth.exceptions.GoogleAuthError: if credentials are unavailable.
            Exception: any other client-construction failure, re-raised after
                being printed.
        """
        # Kept on the instance so later calls don't re-read the module global.
        self.project_id = PROJECT_ID if project_id is None else project_id
        try:
            self.storage_client = storage.Client(project=self.project_id)
            self.firestore_client = firestore.Client(project=self.project_id)
            self.bigquery_client = bigquery.Client(project=self.project_id)
            # Not used by this class's own methods.
            self.publisher = pubsub_v1.PublisherClient()
            print("✓ Successfully initialized all GCP clients")
        except exceptions.GoogleAuthError as auth_error:
            print(f"❗️Authentication failed: {str(auth_error)}. Make sure you're authenticated and have the correct permissions.")
            raise
        except Exception as e:
            print(f"✗ Error initializing clients: {str(e)}")
            raise

    def validate_data(self, data):
        """Check that *data* is a dict containing every required field.

        Returns:
            True when validation passes.

        Raises:
            ValueError: if *data* is not a dict or lacks a required field.
        """
        required_fields = ['location', 'density', 'speed']
        # A non-dict (e.g. a string) could otherwise pass the membership test
        # below via substring matching.
        if not isinstance(data, dict):
            raise ValueError(f"❗️Data validation failed. Expected a dict, got {type(data).__name__}.")
        if not all(field in data for field in required_fields):
            raise ValueError(f"❗️Data validation failed. Missing one or more required fields: {required_fields}")
        return True

    def ingest_traffic_data(self, data):
        """Validate a reading, archive it in Cloud Storage, then process it.

        Args:
            data: dict containing at least 'location', 'density' and 'speed'.

        Returns:
            The result of process_traffic_data() once the raw upload succeeds,
            or False if validation or the upload fails (the error is printed).
        """
        try:
            self.validate_data(data)
            print("✓ Data is valid and ready for ingestion.")

            # Cloud Storage - Store raw traffic data
            bucket_name = self.RAW_BUCKET_NAME
            bucket = self._get_or_create_bucket(bucket_name)

            blob_name = self._raw_blob_name(datetime.datetime.now(datetime.timezone.utc))
            bucket.blob(blob_name).upload_from_string(
                json.dumps(data),
                content_type='application/json',
            )
            print(f"✓ Successfully stored raw traffic data in bucket: {bucket_name}")

            # Process the data after ingestion
            return self.process_traffic_data(data)

        except Exception as e:
            print(f"❗️Error during data ingestion: {str(e)}")
            return False

    def _get_or_create_bucket(self, bucket_name):
        """Return the named bucket, creating it only when it does not exist."""
        # lookup_bucket() returns None for a missing bucket instead of raising,
        # so permission or network errors are not mistaken for "not found";
        # they propagate to the caller's error handling instead.
        bucket = self.storage_client.lookup_bucket(bucket_name)
        if bucket is not None:
            print(f"✓ Found bucket: {bucket_name}")
            return bucket
        print(f"❗️Bucket '{bucket_name}' not found. Attempting to create it.")
        bucket = self.storage_client.create_bucket(bucket_name)
        print(f"✓ Bucket '{bucket_name}' created successfully.")
        return bucket

    @staticmethod
    def _raw_blob_name(now):
        """Return the object path for a raw reading captured at *now*.

        The YYYY/MM/DD/HH/MM prefix groups readings by minute; seconds and
        microseconds in the file name stop readings taken within the same
        minute from overwriting one another.
        """
        return f"{now:%Y/%m/%d/%H/%M}/data-{now:%S%f}.json"

    @staticmethod
    def _build_processed_record(data, now):
        """Return the processed record for *data*, stamped with *now*."""
        return {
            'timestamp': now,
            'location': data.get('location'),
            'density': data.get('density'),
            'speed': data.get('speed'),
        }

    @staticmethod
    def _to_bigquery_row(record):
        """Return a JSON-serializable copy of *record* for insert_rows_json().

        insert_rows_json() sends rows as JSON, which cannot encode datetime
        objects, so the timestamp becomes an ISO-8601 string. *record* itself
        is left unchanged.
        """
        row = dict(record)
        row['timestamp'] = record['timestamp'].isoformat()
        return row

    def process_traffic_data(self, data):
        """Store a processed copy of a reading in Firestore and BigQuery.

        Args:
            data: reading dict (normally already validated).

        Returns:
            True if both the Firestore write and the BigQuery insert succeed;
            False otherwise (the error is printed).
        """
        try:
            processed_data = self._build_processed_record(
                data, datetime.datetime.now(datetime.timezone.utc)
            )

            # Store processed data in Firestore (datetime is passed as-is).
            collection_name = self.FIRESTORE_COLLECTION
            try:
                self.firestore_client.collection(collection_name).add(processed_data)
                print(f"✓ Successfully stored processed traffic data in Firestore for location: {processed_data['location']}")
            except Exception as firestore_error:
                print(f"❗️Error storing data in Firestore: {str(firestore_error)}")
                raise

            # Send processed data to BigQuery for analysis
            table_id = f"{self.project_id}.{self.BIGQUERY_TABLE}"
            try:
                errors = self.bigquery_client.insert_rows_json(
                    table_id, [self._to_bigquery_row(processed_data)]
                )
            except Exception as bigquery_error:
                print(f"❗️Error inserting data into BigQuery: {str(bigquery_error)}")
                raise

            # Row-level failures come back in the return value rather than as
            # an exception, so a non-empty result means the insert failed.
            if errors:
                print(f"❗️Failed to insert rows into BigQuery: {errors}")
                return False
            print("✓ Successfully inserted rows into BigQuery")
            return True

        except Exception as e:
            print(f"❗️Error in processing traffic data: {str(e)}")
            return False

# Example usage: build the pipeline, then push one sample reading through it.
traffic_system = TrafficSystem()

# One reading carrying the 'location', 'density' and 'speed' fields that
# validate_data() checks for.
sample_data = dict(
    location="Main Street",
    density=25,
    speed=35,
)

traffic_system.ingest_traffic_data(sample_data)




OSError: ❗️The Google Cloud Project ID is not set. Please set it in the environment variable or replace it in the code.