# Cloud project: Data pipeline

# Azure Data Pipeline Assignment

## Objective

Design and implement a data pipeline using Azure services, demonstrating skills in data production, ingestion, storage, processing, and visualization.

## Requirements

### 1. Data Producer

- Develop a Python script on your laptop to generate sample data in JSON format.
- Use libraries: `json`, `random`, `datetime`

### 2. Message Queue

- Use Azure Storage account → queue service (or Event Hubs) for message queuing.
    - https://portal.azure.com/?azure-portal=true#browse/Microsoft.Storage%2FStorageAccounts
- Configure your Python producer to send data to the queue (e.g. Event Hubs).
- Library: `azure-eventhub` (for Event Hubs)
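The producer sends records in batches. `EventHubProducerClient.create_batch()` enforces the hub's maximum message size, and `EventDataBatch.add()` raises `ValueError` once a batch is full. The sketch below shows the same idea using only the standard library: it packs JSON-encoded records into batches that stay under a byte limit. The function name `chunk_records` is hypothetical. The 256 KB default assumes the Basic tier's event-size limit, so check the current Event Hubs quotas for your tier. The sketch counts only the JSON body bytes and ignores per-event protocol overhead, so treat the limit as approximate.

```python
import json
from typing import Iterable, List


def chunk_records(records: Iterable[dict], max_batch_bytes: int = 256 * 1024) -> List[List[str]]:
    """Group records into batches whose JSON payloads fit in max_batch_bytes.

    Assumption: only the UTF-8 size of each JSON body is counted; a real
    Event Hubs batch also carries per-event protocol overhead.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for record in records:
        body = json.dumps(record)
        size = len(body.encode("utf-8"))
        if size > max_batch_bytes:
            raise ValueError(f"single record of {size} bytes exceeds the batch limit")
        # Close the current batch when this record would overflow it
        if current and current_size + size > max_batch_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(body)
        current_size += size
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    sample = [{"temperature": 21.5, "humidity": 55.0} for _ in range(5)]
    # A tiny limit forces several batches so the splitting is visible
    for batch in chunk_records(sample, max_batch_bytes=100):
        print(len(batch), "events in batch")
```

With the real SDK, each inner list would map to one `create_batch()` / `send_batch()` call.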

### 3. Serverless SQL Solution

- Utilize Azure SQL Database (serverless tier) for data storage.
- Set up auto-pause to minimize costs when not in use.
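An auto-paused serverless database resumes when the next login arrives, and that first connection attempt can fail while it is resuming (Azure typically reports this as error 40613, "database is not currently available"). A common way to handle this is to retry with exponential backoff. The sketch below is a generic helper under that assumption. `connect_with_retry` is a hypothetical name. The `connect` callable is whatever opens your connection, e.g. `lambda: pyodbc.connect(connection_string)`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 5.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() until it succeeds, doubling the wait after each failure.

    Intended for a serverless database that may be auto-paused: the first
    attempts can fail while the database resumes.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as exc:
            if attempt == attempts:
                raise  # give up and surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Connection attempt {attempt} failed ({exc}); retrying in {delay:g}s")
            sleep(delay)
    raise AssertionError("unreachable")
```

In the consumer this could be called as `connect_with_retry(lambda: pyodbc.connect(connection_string), retry_on=(pyodbc.Error,))`. The `sleep` parameter exists so the backoff can be tested without actually waiting.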

### 4. Data Consumer

- Create an Azure VM with Ubuntu Server 22.04 (B1s size - 1 vCPU, 1 GB RAM).
- Develop a Python script to consume data from the queue and insert it into Azure SQL Database.
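- Libraries: `pyodbc` (to write to SQL) and `azure-eventhub` (to read from Event Hubs). The Microsoft ODBC Driver 18 for SQL Server must also be installed on the VM.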

### 5. Grafana Dashboard (optional, see below)

- Install Grafana on the Ubuntu VM.
- Configure Grafana to connect to Azure SQL Database.
- Create a simple dashboard to visualize the data.

### 6. Power BI Dashboard (optional, see below)

- Use Azure Fabric (which includes Power BI) to create a dashboard.
- Connect the dashboard to your Azure SQL Database.

### 7. Cost Analysis

- Compare the costs between: a) Azure SQL Database (serverless) b) Azure VM with PostgreSQL installed
- Assume a 10TB database for this comparison. (Do NOT create a 10TB DB!)
- Use the Azure Pricing Calculator for estimates.
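The comparison reduces to simple monthly arithmetic once the unit prices are read from the Pricing Calculator. Serverless compute is billed per vCore-second only while the database is active, plus storage per GB-month. The VM option is billed per running hour plus the managed disk(s) sized for the data. The sketch below encodes those two formulas. All prices in it are **made-up placeholders, not real Azure prices**, and the class names are hypothetical. It assumes "10 TB" means 10 × 1024 GB, and it uses 730 hours per month, the figure the calculator usually assumes. Also check which SQL Database service tier can hold 10 TB, because the General Purpose tier has a lower maximum data size than Hyperscale.

```python
from dataclasses import dataclass

GB_PER_TB = 1024        # assumption: "10 TB" is taken as 10 x 1024 GB
HOURS_PER_MONTH = 730   # monthly hours commonly used by the Azure Pricing Calculator


@dataclass
class ServerlessSqlEstimate:
    vcores: float                  # average billed vCores while the DB is active
    active_hours: float            # hours per month the DB is not auto-paused
    price_per_vcore_second: float  # placeholder: take from the Pricing Calculator
    storage_gb: float
    price_per_gb_month: float      # placeholder: take from the Pricing Calculator

    def monthly_cost(self) -> float:
        compute = self.vcores * self.active_hours * 3600 * self.price_per_vcore_second
        storage = self.storage_gb * self.price_per_gb_month
        return compute + storage


@dataclass
class VmPostgresEstimate:
    vm_price_per_hour: float     # placeholder: hourly price of the chosen VM size
    running_hours: float         # hours per month the VM is running
    disk_price_per_month: float  # placeholder: managed disk(s) large enough for 10 TB

    def monthly_cost(self) -> float:
        return self.vm_price_per_hour * self.running_hours + self.disk_price_per_month


if __name__ == "__main__":
    # Illustrative placeholder prices only; replace with calculator figures
    sql = ServerlessSqlEstimate(vcores=2, active_hours=200, price_per_vcore_second=0.0001,
                                storage_gb=10 * GB_PER_TB, price_per_gb_month=0.1)
    vm = VmPostgresEstimate(vm_price_per_hour=0.5, running_hours=HOURS_PER_MONTH,
                            disk_price_per_month=800.0)
    print(f"Serverless SQL: {sql.monthly_cost():.2f} per month")
    print(f"VM + PostgreSQL: {vm.monthly_cost():.2f} per month")
```

Keeping compute and storage as separate terms shows where each option's cost comes from. With 10 TB of data, storage is likely to dominate both estimates, while auto-pause only reduces the compute term.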

## Recommended Azure Services and Tools

1. Azure Queue Storage or Event Hubs (Basic tier)
2. Azure SQL Database (Serverless tier - select free tier and monitor costs)
3. Azure Virtual Machines (B1s size)
4. Data visualization (pick one or more):
    1. Variant 1: Azure Fabric (includes Power BI)
    2. Variant 2: Run MS PowerBI locally and connect to the cloud SQL DB
    3. Variant 3: Run a Grafana instance and connect to the cloud SQL DB
    4. Variant 4: Create your own web server hosting plots for the data

## Cost Considerations

- Utilize Azure's free tier and student offers where possible.
- Configure auto-shutdown for the VM when not in use.
- Use the serverless tier for Azure SQL Database to minimize costs during idle periods.
- Monitor your Azure student account usage regularly.

## Deliverables

1. GitHub repository (or similar) with all code (data producer, consumer, configurations, etc.).
    1. ***IMPORTANT: Never put your credentials, access tokens, keys, and other secrets in a GitHub repo! Instead, use environment variables!***
2. Screenshots of the dashboards you built (Grafana, Power BI, or another variant).
3. A report detailing the implementation process, challenges faced, and cost analysis.
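Following the environment-variable rule above, a small loader can read every secret at startup and fail with a clear message if one is missing, instead of failing later inside the Azure SDK. The sketch below assumes hypothetical variable names such as `EVENTHUB_CONNECTION_STR` and `SQL_PASSWORD`. Use whatever names your scripts expect.

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def load_settings(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Read required settings from environment variables, failing loudly if any are missing.

    Empty values are treated as missing. `env` defaults to os.environ and can
    be overridden for testing.
    """
    source = os.environ if env is None else env
    names = list(names)  # allow generators, which can only be iterated once
    missing = [name for name in names if not source.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {name: source[name] for name in names}


if __name__ == "__main__":
    # Hypothetical variable names; set them in the shell, never in the repo
    try:
        settings = load_settings(["EVENTHUB_CONNECTION_STR", "SQL_PASSWORD"])
        print("Loaded settings:", ", ".join(settings))  # print names only, never values
    except RuntimeError as err:
        print(err)
```

On the VM, the values can be set with `export SQL_PASSWORD=...` in the shell session, or kept in a local `.env` file that is listed in `.gitignore`.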

## Evaluation Criteria

- Functionality of the entire pipeline
- Code quality and documentation
- Dashboard design and usefulness
- Accuracy and depth of cost analysis

# Implementation (starting with the virtual machine)

### Code for the data producer

```python
import json
import os
import random
from datetime import datetime

from azure.eventhub import EventHubProducerClient, EventData


def generate_sample_data(num_records):
    """Generate a list of fake sensor readings."""
    data = []
    for _ in range(num_records):
        record = {
            "timestamp": str(datetime.now()),  # Current timestamp
            "temperature": round(random.uniform(20.0, 30.0), 2),  # Random temperature between 20.0 and 30.0
            "humidity": round(random.uniform(40.0, 70.0), 2),  # Random humidity between 40.0 and 70.0
        }
        data.append(record)
    return data


def send_data_to_eventhub(producer, data):
    """Send all records to Event Hubs in a single batch."""
    event_data_batch = producer.create_batch()
    for record in data:
        event_data_batch.add(EventData(json.dumps(record)))
    producer.send_batch(event_data_batch)


if __name__ == "__main__":
    num_records_to_generate = 10  # Adjust as needed
    sample_data = generate_sample_data(num_records_to_generate)

    # Save the data to a JSON file
    with open("sample_data.json", "w") as json_file:
        json.dump(sample_data, json_file, indent=2)
    print(f"{num_records_to_generate} records generated and saved to sample_data.json.")

    # Read the connection string from an environment variable (never hard-code the key)
    connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
    eventhub_name = os.environ.get("EVENTHUB_NAME", "eventhub_m")
    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

    # The context manager closes the producer once the batch has been sent
    with producer:
        send_data_to_eventhub(producer, sample_data)
    print(f"{num_records_to_generate} records sent to Event Hub.")
```

### Ubuntu VM login (SSH)

```bash
ssh azureuser@20.240.202.86
```

### Code for sending data to Event Hub

```python
import json
import os
import random
from datetime import datetime

from azure.eventhub import EventHubProducerClient, EventData


def generate_sample_data(num_records):
    """Generate a list of fake sensor readings."""
    data = []
    for _ in range(num_records):
        record = {
            "timestamp": str(datetime.now()),
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "humidity": round(random.uniform(40.0, 70.0), 2),
        }
        data.append(record)
    return data


if __name__ == "__main__":
    num_records_to_generate = 10  # Adjust as needed
    sample_data = generate_sample_data(num_records_to_generate)

    # Read secrets from the environment; never commit the connection string
    connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
    eventhub_name = os.environ.get("EVENTHUB_NAME", "eventhub_m")

    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

    try:
        with producer:  # closes the producer when done
            event_data_batch = producer.create_batch()
            for record in sample_data:
                event_data_batch.add(EventData(json.dumps(record)))
            producer.send_batch(event_data_batch)
            print(f"{num_records_to_generate} records sent to Event Hub.")
    except Exception as e:
        print(f"Error sending data: {e}")
```


### Code for the data consumer (Event Hub → Azure SQL Database)

```python
import json
import os
from datetime import datetime

import pyodbc
from azure.eventhub import EventHubConsumerClient

# SQL Server connection parameters; the password comes from an environment variable
sql_server = os.environ.get("SQL_SERVER", "misbahserver.database.windows.net")
sql_database = os.environ.get("SQL_DATABASE", "My_database")
sql_username = os.environ.get("SQL_USERNAME", "misbahbin.hossain@yh.nackademin.se")  # Microsoft Entra admin email
sql_password = os.environ["SQL_PASSWORD"]
sql_driver = "{ODBC Driver 18 for SQL Server}"


def create_sql_connection():
    """Open a connection using Microsoft Entra password authentication."""
    connection_string = (
        f"DRIVER={sql_driver};"
        f"SERVER=tcp:{sql_server},1433;"
        f"DATABASE={sql_database};"
        f"UID={sql_username};"
        f"PWD={sql_password};"
        "Encrypt=yes;"
        "TrustServerCertificate=no;"
        "Connection Timeout=30;"
        "Authentication=ActiveDirectoryPassword;"
    )
    print("Connecting to SQL Server...")
    return pyodbc.connect(connection_string)


def insert_data_to_sql(timestamp, temperature, humidity):
    """Insert one reading into the SensorData table.

    The connection is closed after each insert so no idle session is held open,
    which would stop the serverless database from auto-pausing.
    """
    try:
        print(f"Inserting data to SQL Server: {timestamp}, {temperature}, {humidity}")
        conn = create_sql_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO SensorData ([timestamp], temperature, humidity) VALUES (?, ?, ?)",
                (timestamp, temperature, humidity),
            )
            conn.commit()
            cursor.close()
        finally:
            conn.close()
        print(f"Successfully inserted data: {timestamp}, {temperature}, {humidity}")
    except Exception as e:
        print(f"Error inserting data into SQL: {e}")


def process_event(partition_context, event):
    """Parse one Event Hub event and store it in SQL."""
    try:
        json_data = json.loads(event.body_as_str())
        # fromisoformat also accepts timestamps without microseconds, which
        # str(datetime.now()) produces when microsecond == 0 (strptime with %f would fail)
        timestamp = datetime.fromisoformat(json_data["timestamp"])
        insert_data_to_sql(timestamp, json_data["temperature"], json_data["humidity"])
        print(f"Processed event data: {json_data}")
    except Exception as e:
        print(f"Error processing event: {e}")


# Event Hub connection parameters
eventhub_connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
eventhub_name = os.environ.get("EVENTHUB_NAME", "eventhub_m")
consumer_group = "$Default"

consumer_client = EventHubConsumerClient.from_connection_string(
    eventhub_connection_str,
    eventhub_name=eventhub_name,
    consumer_group=consumer_group,
)

try:
    print("Starting Event Hub listener...")
    # "-1" starts from the beginning of the stream. Without a checkpoint store,
    # every restart re-reads (and re-inserts) all events the hub still retains.
    with consumer_client:  # closes the client on exit
        consumer_client.receive(on_event=process_event, starting_position="-1")
except KeyboardInterrupt:
    print("Stopped by user.")
except Exception as e:
    print(f"Error receiving data: {e}")
finally:
    print("Event Hub consumer closed.")
```
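Inserting events one at a time costs one database round trip per message. A common alternative is to parse a group of events and insert them with a single `executemany` call in one transaction. The sketch below shows that logic. So it can run offline, it uses the standard-library `sqlite3` module as a **local stand-in** for Azure SQL (both use `?` placeholders). `parse_event_body`, `insert_rows`, and `make_local_test_db` are hypothetical names. The parser uses `datetime.fromisoformat` because `str(datetime.now())` drops the fractional seconds when the microsecond value is zero, so a fixed `'%Y-%m-%d %H:%M:%S.%f'` format can occasionally fail.

```python
import json
import sqlite3
from datetime import datetime
from typing import Iterable, Tuple

Row = Tuple[datetime, float, float]


def parse_event_body(body: str) -> Row:
    """Turn one JSON event body from the producer into an insert-ready tuple."""
    data = json.loads(body)
    # Accepts "2024-05-01 12:00:00.123456" as well as "2024-05-01 12:00:01"
    timestamp = datetime.fromisoformat(data["timestamp"])
    return timestamp, float(data["temperature"]), float(data["humidity"])


def insert_rows(conn, rows: Iterable[Row]) -> int:
    """Insert many rows in one transaction; works with any DB-API connection using '?' params."""
    rows = list(rows)
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT INTO SensorData (timestamp, temperature, humidity) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()
    cursor.close()
    return len(rows)


def make_local_test_db() -> sqlite3.Connection:
    """In-memory SQLite stand-in for Azure SQL, for testing the insert logic offline."""
    # sqlite3-specific: store datetimes as ISO text (pyodbc converts datetime itself)
    sqlite3.register_adapter(datetime, lambda d: d.isoformat(sep=" "))
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE SensorData ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "timestamp TEXT NOT NULL, temperature REAL NOT NULL, humidity REAL NOT NULL)"
    )
    return conn


if __name__ == "__main__":
    bodies = [
        '{"timestamp": "2024-05-01 12:00:00.123456", "temperature": 25.1, "humidity": 50.2}',
        '{"timestamp": "2024-05-01 12:00:01", "temperature": 24.9, "humidity": 51.0}',
    ]
    conn = make_local_test_db()
    print(insert_rows(conn, (parse_event_body(b) for b in bodies)), "rows inserted")
```

With pyodbc, the same `insert_rows` call could be used on a real connection, optionally after setting `cursor.fast_executemany = True` to send the rows in bulk.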


### Query used inside the database to create the `SensorData` table

```sql
CREATE TABLE dbo.SensorData (
    id INT IDENTITY(1,1) PRIMARY KEY,  -- generated automatically, so INSERTs can omit it
    [timestamp] DATETIME NOT NULL,
    temperature FLOAT NOT NULL,
    humidity FLOAT NOT NULL
);
```


### Commands to start Grafana on the Ubuntu VM

```bash
sudo systemctl enable grafana-server
sudo systemctl start grafana-server
sudo systemctl status grafana-server
```




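### Query used to create a database user for the application

```sql
-- Run this in the database (e.g. the Azure portal query editor), not in a Python cell
CREATE USER ApplicationUser
    WITH PASSWORD = '<strong-password>';  -- never commit the real password

ALTER ROLE db_datareader ADD MEMBER ApplicationUser;
ALTER ROLE db_datawriter ADD MEMBER ApplicationUser;
```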
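The contained user `ApplicationUser` lets the consumer log in with SQL authentication instead of a personal Microsoft Entra account. The sketch below builds the matching ODBC Driver 18 connection string. It omits the `Authentication=` keyword, which leaves the driver on SQL authentication. Passwords often contain characters such as `;`, so values are brace-quoted when needed, with any literal `}` doubled as ODBC requires. The function names and the keys of the `settings` dictionary are hypothetical. In practice the values would come from environment variables.

```python
from typing import Mapping


def odbc_value(value: str) -> str:
    """Brace-quote an ODBC connection-string value if it contains special characters."""
    if any(ch in value for ch in ";{}=") or value != value.strip():
        # Inside braces, a literal '}' must be written as '}}'
        return "{" + value.replace("}", "}}") + "}"
    return value


def build_sql_auth_connection_string(settings: Mapping[str, str]) -> str:
    """Assemble an ODBC Driver 18 connection string for SQL authentication."""
    parts = [
        ("DRIVER", "{ODBC Driver 18 for SQL Server}"),
        ("SERVER", f"tcp:{settings['server']},1433"),
        ("DATABASE", settings["database"]),
        ("UID", settings["username"]),
        ("PWD", settings["password"]),
        ("Encrypt", "yes"),
        ("TrustServerCertificate", "no"),
        ("Connection Timeout", "30"),
    ]
    # DRIVER is already brace-quoted; quote the other values only when needed
    return "".join(
        f"{key}={value if key == 'DRIVER' else odbc_value(value)};" for key, value in parts
    )


if __name__ == "__main__":
    example = build_sql_auth_connection_string({
        "server": "misbahserver.database.windows.net",
        "database": "My_database",
        "username": "ApplicationUser",
        "password": "<read-from-environment>",  # placeholder, not a real password
    })
    print(example.split("PWD=")[0] + "PWD=...")  # avoid printing the password
```

The resulting string could then be passed to `pyodbc.connect` in the consumer in place of the Entra password connection.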