# 1. **Infrastructure Monitoring (internal): In the Russia project, we run processes on a daily basis and need to know whether they have run successfully or not. Please suggest one or several configurations that would allow us (internally) to check the success of each process. We are looking for setups that are both easy to maintain and require as little friction and effort as possible for us to check.**

I suggest using the ELK Stack for log and process monitoring. The ELK Stack combines three tools, Elasticsearch, Logstash and Kibana, into an end-to-end centralized log management system that lets us monitor the success of each process in the project's pipeline. Elasticsearch is the core component: it indexes log data and makes it quickly searchable for analysis and monitoring. Logstash is the log collection and processing component: it collects logs from various sources, parses and enriches them, and sends them to Elasticsearch for indexing. The final component, Kibana, lets us explore the indexed log data visually and build dashboards.


Configuring and setting up the ELK Stack

Install Elasticsearch: Download and install Elasticsearch on the monitoring server (or a local machine for testing), following the official Elasticsearch documentation.

Configure Elasticsearch: Edit the Elasticsearch configuration file (`elasticsearch.yml`) to set the cluster name, node roles, network settings and storage paths. Adjust these settings to the project's requirements and available resources.

Install Logstash: Download and install Logstash on the same server or machine where Elasticsearch is installed.

Configure Logstash: Create a Logstash configuration file where you define input sources, filters for log processing, and output settings to direct the processed logs to Elasticsearch. Customize the configuration based on your log sources, formats, and desired transformations.

Install Kibana: Download and install Kibana on the same server or machine where Elasticsearch is installed.

Configure Kibana: Edit the Kibana configuration file (`kibana.yml`) to point Kibana to the Elasticsearch cluster (host and port, set via `elasticsearch.hosts`). Optionally, configure authentication, SSL/TLS and other advanced settings according to the security requirements.

Start Services: Start Elasticsearch, Logstash, and Kibana services using the appropriate commands or service managers specific to the operating system.
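Once the services are running, a quick way to confirm that Elasticsearch is up is its cluster health API (`GET /_cluster/health`), which reports a `status` of `green`, `yellow` or `red`. The sketch below assumes Elasticsearch listens on `http://localhost:9200` without authentication; recent versions enable TLS and authentication by default, in which case the URL and credentials must be adjusted. The function names are illustrative, not part of any Elastic library.

```python
import json
import urllib.request

# Assumed local, unauthenticated endpoint; adjust for TLS/credentials in production
ES_URL = "http://localhost:9200"


def fetch_cluster_health(base_url: str = ES_URL, timeout: float = 5.0) -> dict:
    """Call Elasticsearch's cluster health API and return the parsed JSON body."""
    with urllib.request.urlopen(f"{base_url}/_cluster/health", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def is_healthy(health: dict, allow_yellow: bool = True) -> bool:
    """'green' is healthy; 'yellow' (some replica shards unassigned) is common on a single node."""
    status = health.get("status")
    if status == "green":
        return True
    return allow_yellow and status == "yellow"
```

For example, `is_healthy(fetch_cluster_health())` can be run right after starting the services, or from a scheduled job, to catch a stopped or degraded cluster before the daily checks depend on it.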

Identify Key Metrics: Determine the important metrics and indicators that define the success of each process in your pipeline. These could include metrics such as data ingestion rates, data quality checks, analysis completion times, error rates, or any other relevant performance indicators.

Instrument Logging: Instrument your pipeline processes with appropriate logging statements to capture relevant information and events at different stages. This could involve logging the start and end times of each process, capturing error messages, logging data quality checks, or any other relevant information that helps assess the success of a particular process.
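As a minimal sketch of such instrumentation, each pipeline process can be wrapped in a context manager that writes one JSON line when it starts and one when it ends, including its status, duration and any error message. Logstash can then read these lines (for example with its `json` codec) without custom parsing. The logger name, the field names (`process`, `run_date`, `event`, `status`) and the `process_run` helper are assumptions for illustration, not an existing library API.

```python
import json
import logging
import time
from contextlib import contextmanager
from datetime import datetime, timezone

logger = logging.getLogger("pipeline")  # hypothetical logger name


@contextmanager
def process_run(process, run_date, sink=None):
    """Log a start event and an end event (success/failed) for one process run.

    `sink` receives each event as a JSON string; it defaults to logger.info so the
    lines end up in whatever log file Logstash is watching.
    """
    sink = sink or logger.info
    base = {"process": process, "run_date": run_date}
    start = time.monotonic()

    def emit(**fields):
        event = {**base, **fields, "@timestamp": datetime.now(timezone.utc).isoformat()}
        sink(json.dumps(event))

    emit(event="start")
    try:
        yield
    except Exception as exc:
        # Record the failure, then re-raise so the scheduler also sees it
        emit(event="end", status="failed", error=repr(exc),
             duration_s=round(time.monotonic() - start, 3))
        raise
    emit(event="end", status="success", duration_s=round(time.monotonic() - start, 3))
```

In a pipeline script, `logging.basicConfig(filename="pipeline.log", level=logging.INFO, format="%(message)s")` keeps each log line pure JSON, and a step is then wrapped as `with process_run("load_shipments", run_date): ...`.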

Monitor and Analyze: Start sending log data to Logstash, which will process and send it to Elasticsearch for indexing. Use Kibana to monitor the indexed logs, analyze metrics, search for specific events or patterns, and track the success of each process in your project.
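To keep the daily check low-effort, the question a Kibana dashboard answers visually can also be asked programmatically: which processes logged a failed end event, and which expected processes logged no end event at all, in the last 24 hours. The sketch below builds an Elasticsearch query DSL body (a `bool` query with `term` and `range` filters), sends it to the `_search` API and summarizes the hits. The index pattern `pipeline-logs-*`, the field names `process`, `event` and `status`, the `.keyword` sub-field (added by Elasticsearch's default dynamic mapping for strings) and the list of expected processes are all assumptions.

```python
import json
import urllib.request

# Hypothetical index pattern and process names; replace with the project's own
INDEX = "pipeline-logs-*"
EXPECTED_PROCESSES = {"load_shipments", "clean_shipments", "update_dashboards"}


def build_end_events_query(hours: int = 24, size: int = 1000) -> dict:
    """Query DSL: all 'end' events from the last `hours` hours."""
    return {
        "size": size,
        "query": {
            "bool": {
                "filter": [
                    {"term": {"event.keyword": "end"}},  # assumes default dynamic mapping
                    {"range": {"@timestamp": {"gte": f"now-{hours}h"}}},
                ]
            }
        },
    }


def summarize(hits: list, expected: set = EXPECTED_PROCESSES) -> dict:
    """Split processes into succeeded / failed / missing based on search hits.

    A process that failed and was then retried successfully appears in both lists.
    """
    succeeded, failed = set(), set()
    for hit in hits:
        src = hit["_source"]
        (succeeded if src.get("status") == "success" else failed).add(src["process"])
    return {
        "succeeded": sorted(succeeded),
        "failed": sorted(failed),
        "missing": sorted(expected - succeeded - failed),
    }


def run_daily_check(base_url: str = "http://localhost:9200") -> dict:
    """POST the query to Elasticsearch's _search API and return the summary."""
    body = json.dumps(build_end_events_query()).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/{INDEX}/_search", data=body,
        headers={"Content-Type": "application/json"}, method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        result = json.loads(resp.read().decode("utf-8"))
    return summarize(result["hits"]["hits"])
```

Run once a day (e.g. from cron), `run_daily_check()` reduces the morning check to reading three short lists; the summary could also be forwarded by email or chat so nobody has to open Kibana unless something failed.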


Justification for choosing the ELK Stack for process monitoring


1. Scalability and Performance: Elasticsearch scales horizontally by distributing indices across nodes as shards, so the stack can keep up with the growing volume of logs produced by the pipelines that process fossil fuel trade data.

2. Flexibility and Extensibility: Logstash's input and filter plugins (for example the file, Beats and JDBC inputs, and the grok and JSON filters) let us ingest logs in different formats from the various trading platforms, data feeds and internal systems that the pipeline touches.

3. Powerful Search and Analysis: Elasticsearch's full-text search and aggregations make it quick to filter process runs by name, status or time window. If trade-related metrics such as prices and volumes are logged alongside process events, they can be analysed in the same way.

4. Near-real-time Monitoring and Alerting: Indexed logs become searchable within seconds, and Kibana's alerting features can notify us of critical events such as a failed process run (some notification connectors may depend on the Elastic subscription tier). Errors are therefore detected promptly, which protects the integrity and reliability of the trade data pipeline.

5. Ecosystem and Community Support: The ELK stack has a large user community, extensive official documentation and many plugins, so guidance for common monitoring setups is easy to find and the setup is easier to maintain.

6. Integration with Other Tools: Elasticsearch exposes a REST API and has official client libraries (including one for Python), so pipeline code, data science notebooks and existing fossil fuel trade analysis workflows can read and write log data directly.

Given the project's objective of maintaining databases and pipelines for tracking and analysing fossil fuel trades, the ELK stack provides the capabilities needed to manage logs efficiently, monitor each process, and detect errors quickly, which helps optimize pipeline performance.

# 2. **Alerts (external): External users of our platform should be able to set alerts for when potentially fraudulent shipments are detected. Assuming the shipments are created in bulk and stored in a Postgres database query, design an infrastructure that would allow users to create / edit alert criteria and receive them when our system detects them. Please describe the setup in detail.**
Step 1 - Create a Database Schema: Design a schema for your Postgres database that matches the structure of the shipment data. Define tables and columns that correspond to the relevant attributes extracted from the CSV file, such as shipment details, timestamps and indicators of potential fraud.
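A minimal sketch of such a schema follows, with one table for shipments and one for user-defined alert rules. Storing each rule as structured columns (metric, comparison, threshold) rather than free text makes the rules easy to validate and evaluate automatically. All column names are hypothetical, since the CSV's actual columns are not listed here. The DDL is written for Postgres; to keep the example self-contained it is applied to an in-memory SQLite database from Python's standard library, which accepts the same statements.

```python
import sqlite3

# DDL intended for Postgres; column names are hypothetical placeholders for the CSV's fields
SCHEMA = """
CREATE TABLE IF NOT EXISTS shipments (
    shipment_id  TEXT PRIMARY KEY,          -- identifier taken from the CSV
    shipped_at   TIMESTAMP NOT NULL,
    origin       TEXT,
    destination  TEXT,
    commodity    TEXT,
    tonnes       NUMERIC(12, 2),
    fraud_score  REAL,                      -- indicator of potential fraud
    loaded_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_shipments_shipped_at ON shipments (shipped_at);

CREATE TABLE IF NOT EXISTS alert_rules (
    -- In Postgres, prefer: id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY
    id           INTEGER PRIMARY KEY,
    user_id      INTEGER NOT NULL,
    metric       TEXT NOT NULL,             -- shipment column to test, e.g. 'tonnes'
    comparison   TEXT NOT NULL CHECK (comparison IN ('>', '>=', '<', '<=', '=')),
    threshold    NUMERIC NOT NULL,
    channel      TEXT NOT NULL,             -- e.g. 'sms' or 'email'
    contact      TEXT NOT NULL
);
"""


def create_schema(conn) -> None:
    """Create the tables using sqlite3's executescript.

    With psycopg2, cursor.execute(SCHEMA) runs the same multi-statement string.
    """
    conn.executescript(SCHEMA)
```

The `CHECK` constraint rejects unsupported comparison operators at the database level, and the index on `shipped_at` supports the time-window queries a batch alert check would run.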


Step 2 - Import Shipment Data into the Database: Use a database interaction library (such as psycopg2 for Python) or a database management tool (like pgAdmin) to connect to your Postgres database and import the cleaned and transformed shipment data from the CSV file into the corresponding tables. 
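A minimal sketch of the bulk import, assuming the cleaned CSV has a header row whose names match the hypothetical `shipments` columns listed in `COLUMNS`. All rows are inserted in one transaction, and shipments that already exist are skipped, so re-running the same file does not fail. The example uses `sqlite3` so it is self-contained; with psycopg2 the placeholder style is `%s` instead of `?`, and for very large files psycopg2's `cursor.copy_expert` with a `COPY ... FROM STDIN WITH CSV HEADER` statement is usually faster than `executemany`.

```python
import csv
import sqlite3

# Hypothetical subset of shipment columns; must match the CSV header names
COLUMNS = ["shipment_id", "shipped_at", "origin", "destination", "tonnes", "fraud_score"]


def import_shipments(conn, csv_file) -> int:
    """Bulk-insert rows from an open CSV file (or any iterable of lines); return rows read.

    Uses sqlite3's '?' placeholders; with psycopg2 replace them by '%s'.
    ON CONFLICT ... DO NOTHING works in Postgres >= 9.5 and SQLite >= 3.24.
    """
    placeholders = ", ".join("?" for _ in COLUMNS)
    sql = (
        f"INSERT INTO shipments ({', '.join(COLUMNS)}) VALUES ({placeholders}) "
        "ON CONFLICT (shipment_id) DO NOTHING"
    )
    rows = [tuple(record[col] for col in COLUMNS) for record in csv.DictReader(csv_file)]
    with conn:  # one transaction: commit on success, roll back on error
        conn.cursor().executemany(sql, rows)
    return len(rows)
```

Typical usage is `with open("shipments_clean.csv", newline="") as f: import_shipments(conn, f)`, where the file name is a placeholder.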

Step 3 - Create API Layer: Using Flask, we can now create an API layer that interacts with the database and provides endpoints for managing alerts. This layer handles requests from external users to create, edit and delete alert criteria.

Step 4 - Design UI for user alert creation: Users could create alerts for potentially fraudulent shipments in several ways. For this project, a simple user interface where they can interact with the system and provide inputs is ideal for non-technical users. This can be a web-based UI built with HTML, CSS and JavaScript, or a desktop application. The UI would include input fields or forms where users enter the alert criteria. When the form is submitted, the UI sends a POST request with the input values to the alerts endpoint.
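Whatever the UI looks like, the alerts endpoint should validate the submitted criteria before storing them, so that only supported metrics, comparison operators and notification channels reach the database. A minimal sketch, assuming a JSON body such as `{"metric": "tonnes", "comparison": ">", "threshold": 10000, "channel": "sms", "contact": "+1234567890"}`; the field names, allowed values and the `validate_alert_payload` function are hypothetical.

```python
# Hypothetical whitelists of what users may alert on and how they are notified
ALLOWED_METRICS = {"tonnes", "fraud_score"}
ALLOWED_COMPARISONS = {">", ">=", "<", "<=", "="}
ALLOWED_CHANNELS = {"sms", "email"}
REQUIRED_FIELDS = ("metric", "comparison", "threshold", "channel", "contact")


def validate_alert_payload(payload) -> dict:
    """Return a cleaned alert definition or raise ValueError with a readable message."""
    if not isinstance(payload, dict):
        raise ValueError("Request body must be a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in payload]
    if missing:
        raise ValueError(f"Missing fields: {', '.join(missing)}")
    if payload["metric"] not in ALLOWED_METRICS:
        raise ValueError(f"Unsupported metric: {payload['metric']!r}")
    if payload["comparison"] not in ALLOWED_COMPARISONS:
        raise ValueError(f"Unsupported comparison: {payload['comparison']!r}")
    if payload["channel"] not in ALLOWED_CHANNELS:
        raise ValueError(f"Unsupported channel: {payload['channel']!r}")
    try:
        threshold = float(payload["threshold"])
    except (TypeError, ValueError):
        raise ValueError("threshold must be a number") from None
    return {
        "metric": payload["metric"],
        "comparison": payload["comparison"],
        "threshold": threshold,
        "channel": payload["channel"],
        "contact": str(payload["contact"]).strip(),
    }
```

In a Flask view this would be called on `request.get_json(silent=True)`, returning a 400 response with the error message when `ValueError` is raised, and storing the cleaned dictionary otherwise.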

 
Step 5 - Define the Alert Logic: On receiving the POST request, the API stores the alert criterion, and the system checks it whenever new shipments are added. For example, if a user creates an alert for when a shipment contains more than 10000 tonnes, the system evaluates this condition for each new shipment and notifies the user whenever it is true. The code below shows a simplified version in which this single criterion is hardcoded in the shipment-creation endpoint, and the user is notified by SMS (to a placeholder number) when a shipment of more than 10000 tonnes is created.

pip install flask
pip install flask_sqlalchemy


In [34]:
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# Set the database connection URI
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://username:password@localhost/database'

db = SQLAlchemy(app)

# Define the Shipment model
class Shipment(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    tonnes = db.Column(db.Float)
    # Other columns and relationships

@app.route('/shipments', methods=['POST'])
def create_shipment():
    # Get the shipment data from the request
    data = request.get_json()
    tonnes = data['tonnes']

    # Create a new shipment
    shipment = Shipment(tonnes=tonnes)
    db.session.add(shipment)
    db.session.commit()

    # Check if the number of tonnes is above 10000
    if tonnes > 10000:
        # Alert the user
        alert_message = f"Shipment with {tonnes} tonnes is above 10000 tonnes!"
        send_alert(alert_message)

    # Return a success response
    return jsonify({'message': 'Shipment created successfully'})

def send_alert(message):
    # Dummy implementation for sending an SMS notification
    # Replace with your actual SMS notification logic or service integration
    dummy_phone_number = "+1234567890"
    sms_message = f"ALERT: {message}"
    send_sms(dummy_phone_number, sms_message)

def send_sms(phone_number, message):
    # Logic to send an SMS to the specified phone number with the given message
    # Implement your actual SMS notification logic or service integration here
    # Example:
    print(f"Sending SMS to {phone_number}: {message}")
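
The code above hardcodes a single criterion in the shipment endpoint. Since shipments arrive in bulk, an alternative that supports user-defined criteria is to store each criterion as data (metric, comparison, threshold, contact) and evaluate all stored criteria once per imported batch, for example at the end of the import job. A minimal sketch of that evaluation step in plain Python follows; `AlertRule` and `evaluate_batch` are hypothetical names, and the returned notifications would be handed to a sender such as `send_alert`.

```python
import operator
from dataclasses import dataclass

# Map the stored comparison symbol to a Python comparison function
COMPARISONS = {">": operator.gt, ">=": operator.ge, "<": operator.lt,
               "<=": operator.le, "=": operator.eq}


@dataclass
class AlertRule:
    id: int
    user_id: int
    metric: str        # shipment field to test, e.g. "tonnes"
    comparison: str    # one of the keys of COMPARISONS
    threshold: float
    contact: str       # phone number or e-mail address


def evaluate_batch(rules, shipments):
    """Return one (contact, message) notification per rule per matching shipment.

    Shipments missing the rule's metric are skipped rather than treated as matches.
    """
    notifications = []
    for rule in rules:
        compare = COMPARISONS[rule.comparison]
        for shipment in shipments:
            value = shipment.get(rule.metric)
            if value is not None and compare(float(value), rule.threshold):
                message = (f"Alert {rule.id}: shipment {shipment.get('shipment_id')} matched "
                           f"{rule.metric} {rule.comparison} {rule.threshold} (value: {value})")
                notifications.append((rule.contact, message))
    return notifications
```

Evaluating once per batch keeps the shipment import fast and lets notifications be grouped per contact, so a user receives one summary instead of one SMS per matching shipment.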




Deleting Alerts: Users can similarly delete alerts from the UI. In the code below, the endpoint /alerts/<alert_id> handles the DELETE request for deleting an alert. The <alert_id> is a route parameter that represents the unique identifier of the alert to be deleted. Upon receiving the DELETE request, the endpoint retrieves the corresponding alert from the database based on the provided alert_id. If the alert exists, it is deleted from the database. If the alert is not found, a not found response with a 404 status code is returned.





# Endpoint for deleting an alert
@app.route('/alerts/<int:alert_id>', methods=['DELETE'])
def delete_alert(alert_id):
    # Retrieve the alert by primary key (Flask converts <int:alert_id> to an int)
    alert = db.session.get(Alert, alert_id)

    if alert:
        # Delete the alert from the database
        db.session.delete(alert)
        db.session.commit()

        # Return a success response
        return jsonify({'message': 'Alert deleted successfully'})
    else:
        # Return a not found response if the alert doesn't exist
        return jsonify({'message': 'Alert not found'}), 404


Updating alert criteria: Users can also update an existing alert via the UI. In the code below, the endpoint /alerts/<alert_id> handles the PUT or PATCH request for updating an alert. The <alert_id> is a route parameter that represents the unique identifier of the alert to be updated. Upon receiving the request, the endpoint retrieves the corresponding alert from the database based on the provided alert_id. If the alert exists, it checks the request data for updated criteria. If new criteria are provided, it updates the alert's criteria and saves the changes to the database. If the alert is not found, a not found response with a 404 status code is returned.





# Endpoint for updating/editing an alert
@app.route('/alerts/<int:alert_id>', methods=['PUT', 'PATCH'])
def update_alert(alert_id):
    # Retrieve the alert by primary key (Flask converts <int:alert_id> to an int)
    alert = db.session.get(Alert, alert_id)

    if alert:
        # Get the updated criteria from the JSON body (empty dict if the body is not valid JSON)
        data = request.get_json(silent=True) or {}
        updated_criteria = data.get('criteria')

        if not updated_criteria:
            # Nothing to update: report a bad request instead of a false success
            return jsonify({'message': "Missing field 'criteria'"}), 400

        # Update the alert criteria and save the change
        alert.criteria = updated_criteria
        db.session.commit()

        # Return a success response
        return jsonify({'message': 'Alert updated successfully'})
    else:
        # Return a not found response if the alert doesn't exist
        return jsonify({'message': 'Alert not found'}), 404





Step 6 - Implement Endpoint Logic: Implement the logic for each endpoint. This includes retrieving data from the database, performing necessary operations, and returning appropriate responses. 

Step 7 - Establish Database Interaction: Integrate the database interaction into the endpoint logic. Use an ORM library such as SQLAlchemy to query and manipulate the database.

# Example model for Alert (reuses the `db` object created above)
class Alert(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    criteria = db.Column(db.String(255))
    # Other columns and relationships

# Use the Alert model in endpoint logic


Step 8 - Run the Application: Run the Flask application by adding the following lines at the end of the Python file.

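if __name__ == '__main__':
    # Create the tables for all models defined above if they do not exist yet
    with app.app_context():
        db.create_all()
    app.run()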



Step 9 - Test the Endpoints: Start the Flask development server by running `python api.py` in a terminal (assuming the code is saved as `api.py`). Then test the defined endpoints with API testing tools such as cURL or Postman, or by making HTTP requests from other applications.
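Besides cURL or Postman, the endpoints can be exercised from Python's standard library. A minimal sketch, assuming the server runs at `http://127.0.0.1:5000` (Flask's development server default) and exposes the `/shipments` and `/alerts/<alert_id>` routes defined above; `build_request` and `call` are hypothetical helper names and the payload values are illustrative.

```python
import json
import urllib.error
import urllib.request

BASE_URL = "http://127.0.0.1:5000"  # Flask development server default


def build_request(method, path, payload=None):
    """Build a request for one of the API endpoints, with an optional JSON body."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def call(method, path, payload=None):
    """Send the request and return (status_code, decoded JSON body or raw text)."""
    try:
        with urllib.request.urlopen(build_request(method, path, payload), timeout=10) as resp:
            return resp.status, json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:  # 4xx/5xx responses, e.g. 404 'Alert not found'
        body = err.read().decode("utf-8")
        try:
            return err.code, json.loads(body)
        except ValueError:  # e.g. Flask's default HTML error page for unknown routes
            return err.code, body
```

With the server running, `call("POST", "/shipments", {"tonnes": 12000})` should return status 201 and trigger the SMS stub, `call("PUT", "/alerts/1", {"criteria": "tonnes > 12000"})` updates an alert, and `call("DELETE", "/alerts/1")` deletes it or returns 404 if it does not exist.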