# Experiment 8: GridWorld RL for Oil & Gas Facility Navigation

**Course:** Introduction to Deep Learning | **Module:** Reinforcement Learning

---

## Objective

Design and implement reinforcement learning algorithms for optimal navigation in oil & gas facilities, focusing on safety-aware path planning, equipment inspection routes, and emergency evacuation procedures.

## Learning Outcomes

By the end of this experiment, you will:

1. Understand reinforcement learning fundamentals and the GridWorld environment
2. Implement Q-Learning and Deep Q-Network (DQN) algorithms
3. Apply RL to real-world facility navigation and safety scenarios
4. Compare tabular and deep learning approaches to RL
5. Evaluate RL agents using safety and efficiency metrics

## Background & Theory

**Reinforcement Learning (RL)** is a machine learning paradigm where agents learn optimal behavior through interaction with an environment, receiving rewards and penalties for their actions.

**Key Components:**

- **Agent:** The decision-making entity (facility operator, robot, autonomous system)
- **Environment:** The facility layout with obstacles, equipment, and hazards
- **State (s):** Current position and situation in the facility
- **Action (a):** Available moves (up, down, left, right, stay)
- **Reward (r):** Feedback for actions (positive for goals, negative for hazards)
- **Policy (π):** Strategy for selecting actions in each state

**Mathematical Foundation:**

- Bellman Optimality Equation: V*(s) = max_a Σ_{s'} P(s'|s,a) [R(s,a,s') + γ V*(s')]
- Q-Learning Update: Q(s,a) ← Q(s,a) + α[r + γ max_a' Q(s',a') - Q(s,a)]
- Policy: π(s) = argmax_a Q(s,a)
- Discounted Return: G_t = Σ_{k=0}^{∞} γ^k R_{t+k+1}
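
As a worked example of the update rule and the discounted return, the short sketch below plugs in illustrative numbers. The step penalty of -0.1 and goal reward of 10 match the environment defined later in this experiment; the learning rate α = 0.5, discount γ = 0.9, and the function names are assumptions chosen only for the arithmetic.

```python
def q_update(q_sa, reward, max_q_next, alpha, gamma):
    """One Q-learning step: move Q(s,a) toward the target r + gamma * max_a' Q(s',a')."""
    td_target = reward + gamma * max_q_next
    return q_sa + alpha * (td_target - q_sa)


def discounted_return(rewards, gamma):
    """G_t = sum over k of gamma^k * R_{t+k+1}, for a finite list of future rewards."""
    g = 0.0
    # Accumulate from the last reward backwards: G = r + gamma * G_next
    for r in reversed(rewards):
        g = r + gamma * g
    return g


# Q(s,a) starts at 0, the move costs -0.1, and the best next-state value is 10
new_q = q_update(q_sa=0.0, reward=-0.1, max_q_next=10.0, alpha=0.5, gamma=0.9)
print(round(new_q, 3))  # 0.5 * (-0.1 + 0.9 * 10 - 0) = 4.45

# Two ordinary steps followed by reaching the goal
g = discounted_return([-0.1, -0.1, 10], gamma=0.9)
print(round(g, 3))  # -0.1 + 0.9 * (-0.1) + 0.81 * 10 = 7.91
```

Note how the goal reward is worth only 8.1 when it lies two steps ahead: the discount factor is what makes shorter routes preferable.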

**Algorithms:**

- **Q-Learning:** Model-free, off-policy algorithm using Q-tables
- **Deep Q-Network (DQN):** Neural network approximation of Q-function
- **Policy Gradient:** Direct optimization of policy parameters
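
To make the tabular case concrete, here is a minimal Q-learning sketch on a hypothetical five-cell corridor rather than the facility grid. The corridor, the `step` and `train` helpers, and the hyperparameters (α = 0.5, γ = 0.9, ε = 0.1) are assumptions for illustration, not the course's reference implementation.

```python
import random
from collections import defaultdict

# Hypothetical 1x5 corridor: the agent starts in cell 0 and the goal is cell 4.
N_CELLS, GOAL = 5, 4
ACTIONS = (-1, +1)  # move left, move right


def step(state, action):
    """Return (next_state, reward, done); the corridor ends act as walls."""
    nxt = min(max(state + action, 0), N_CELLS - 1)
    if nxt == GOAL:
        return nxt, 10.0, True
    return nxt, -0.1, False


def train(episodes=300, alpha=0.5, gamma=0.9, epsilon=0.1, max_steps=50, seed=0):
    rng = random.Random(seed)
    q = defaultdict(float)  # Q-table keyed by (state, action), default value 0.0
    for _ in range(episodes):
        state = 0
        for _ in range(max_steps):
            # Epsilon-greedy behaviour policy: explore with probability epsilon
            if rng.random() < epsilon:
                action = rng.choice(ACTIONS)
            else:
                action = max(ACTIONS, key=lambda a: q[(state, a)])
            nxt, reward, done = step(state, action)
            # Off-policy target: bootstrap from the greedy value of the next state
            best_next = 0.0 if done else max(q[(nxt, a)] for a in ACTIONS)
            q[(state, action)] += alpha * (reward + gamma * best_next - q[(state, action)])
            state = nxt
            if done:
                break
    return q


q_table = train()
greedy_policy = [max(ACTIONS, key=lambda a: q_table[(s, a)]) for s in range(GOAL)]
print(greedy_policy)  # [1, 1, 1, 1]: always move right, toward the goal
```

The update bootstraps from the best next action regardless of which action the ε-greedy policy actually takes next, which is what makes Q-learning off-policy.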

**Applications in Oil & Gas:**

- Autonomous inspection robots navigating facilities
- Emergency evacuation route optimization
- Maintenance scheduling and resource allocation
- Safety protocol development and training
- Risk-aware decision making in hazardous environments


## Setup & Dependencies

**What to Expect:** This section sets up the Python environment for the GridWorld experiments. The cell below installs any missing packages (PyTorch for deep Q-networks, matplotlib and seaborn for visualization, NumPy and pandas for numerical work) and imports `defaultdict` and `deque`, the data structures used later for Q-tables and experience replay buffers.

**Process Overview:**

1. **Package Installation:** Install any of PyTorch, NumPy, pandas, scikit-learn, matplotlib, and seaborn that are not already available
2. **Imports:** Load the neural network, numerical, plotting, and standard-library modules used throughout the experiment
3. **Reproducibility:** Seed Python, NumPy, and PyTorch (including CUDA, when present) with 42
4. **Device and Data Directory:** Select the GPU if one is available, otherwise the CPU, and locate the `data` directory that holds the GridWorld configuration
5. **Plot Styling:** Apply the ArivuAI color palette to matplotlib and seaborn

**Expected Outcome:** All dependencies imported, random seeds fixed, and the device and data directory reported, so that the tabular Q-learning and deep Q-network experiments can run reproducibly.


In [2]:
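# Install required packages (pip name -> import name; scikit-learn is imported as sklearn)
import importlib, subprocess, sys
packages = {'torch': 'torch', 'numpy': 'numpy', 'matplotlib': 'matplotlib',
            'pandas': 'pandas', 'scikit-learn': 'sklearn', 'seaborn': 'seaborn'}
for pkg, module in packages.items():
    try:
        importlib.import_module(module)
    except ImportError:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', pkg])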

import torch, torch.nn as nn, torch.optim as optim
import torch.nn.functional as F
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict, deque
import json, random, time
from pathlib import Path

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data directory setup
DATA_DIR = Path('data')
if not DATA_DIR.exists():
    DATA_DIR = Path('Expirements/data')
if not DATA_DIR.exists():
    DATA_DIR = Path('.')
    print('Warning: Using current directory for data')

# ArivuAI styling
plt.style.use('default')
colors = {'primary': '#004E89', 'secondary': '#3DA5D9', 'accent': '#F1A208', 'dark': '#4F4F4F'}
sns.set_palette([colors['primary'], colors['secondary'], colors['accent'], colors['dark']])

print(f'✓ PyTorch version: {torch.__version__}')
print(f'✓ Device: {device}')
print(f'✓ Data directory: {DATA_DIR.absolute()}')
print('✓ All packages installed and configured')
print('✓ Random seeds set for reproducible results')
print('✓ ArivuAI styling applied')

✓ PyTorch version: 2.8.0+cpu
✓ Device: cpu
✓ Data directory: d:\Suni Files\AI Code Base\Oil and Gas\Oil and Gas Pruthvi College Course Material\Updated\Expirements\Experiment_8_GridWorld_RL\data
✓ All packages installed and configured
✓ Random seeds set for reproducible results
✓ ArivuAI styling applied


## GridWorld Environment Implementation

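The class below implements the oil & gas facility GridWorld. The grid contains walls (`#`), empty cells (`.`), equipment (`E`), checkpoints (`C`), hazards (`H`), a start cell (`S`), and a goal (`G`). The first visit to an equipment cell earns +1 and the first visit to a checkpoint earns +2; landing on a hazard costs -5; every other step, including repeat visits, costs -0.1. Reaching the goal earns +10 and ends the episode, as does hitting the step limit. Moves into walls or off the grid leave the agent in place.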


In [3]:
class OilGasGridWorld:
    def __init__(self, config_path, scenario='facility_inspection'):
        """Initialize GridWorld environment for oil & gas facility navigation"""
        try:
            with open(config_path, 'r') as f:
                self.config = json.load(f)
            print('✓ Configuration loaded from JSON')
        except FileNotFoundError:
            print('Creating default configuration...')
            self.config = self._create_default_config()
        
        self.scenario = scenario
        self.cell_types = self.config['environment_config']['cell_types']
        self.actions = self.config['agent_config']['actions']
        self.max_steps = self.config['agent_config']['max_steps']
        
        # Load scenario-specific grid
        self.grid_layout = self.config['oil_gas_scenarios'][scenario]['grid_layout']
        self.grid = np.array(self.grid_layout)
        self.rows, self.cols = self.grid.shape
        
        # Find special positions
        self.start_pos = self._find_position('S')
        self.goal_pos = self._find_position('G')
        self.equipment_pos = self._find_positions('E')
        self.hazard_pos = self._find_positions('H')
        self.checkpoint_pos = self._find_positions('C')
        
        # Initialize state
        self.reset()
    
    def _create_default_config(self):
        """Create default configuration if JSON file not found"""
        return {
            'environment_config': {
                'cell_types': {
                    'empty': {'reward': -0.1, 'passable': True},
                    'wall': {'reward': 0, 'passable': False},
                    'goal': {'reward': 10, 'passable': True},
                    'hazard': {'reward': -5, 'passable': True}
                }
            },
            'agent_config': {
                'actions': {
                    '0': {'name': 'UP', 'delta': [-1, 0]},
                    '1': {'name': 'DOWN', 'delta': [1, 0]},
                    '2': {'name': 'LEFT', 'delta': [0, -1]},
                    '3': {'name': 'RIGHT', 'delta': [0, 1]}
                },
                'max_steps': 100
            },
            'oil_gas_scenarios': {
                'facility_inspection': {
                    'grid_layout': [
                        ['#', '#', '#', '#'],
                        ['#', 'S', '.', '#'],
                        ['#', '.', 'G', '#'],
                        ['#', '#', '#', '#']
                    ]
                }
            }
        }
    
    def _find_position(self, symbol):
        """Find the first position of a specific symbol in the grid"""
        rows, cols = np.where(self.grid == symbol)
        if len(rows) > 0:
            # Cast NumPy integers to plain ints so states are ordinary tuples
            return (int(rows[0]), int(cols[0]))
        return None
    
    def _find_positions(self, symbol):
        """Find all positions of a specific symbol in the grid"""
        rows, cols = np.where(self.grid == symbol)
        return [(int(r), int(c)) for r, c in zip(rows, cols)]
    
    def reset(self):
        """Reset environment to initial state"""
        self.agent_pos = self.start_pos
        self.steps = 0
        self.visited_equipment = set()
        self.visited_checkpoints = set()
        self.hazard_encounters = 0
        self.done = False
        return self._get_state()
    
    def _get_state(self):
        """Get current state representation"""
        # Simple state: agent position as tuple
        return self.agent_pos
    
    def _is_valid_position(self, pos):
        """Check if position is valid and passable"""
        row, col = pos
        if row < 0 or row >= self.rows or col < 0 or col >= self.cols:
            return False
        
        cell_type = self.grid[row, col]
        if cell_type == '#':  # Wall
            return False
        
        return True
    
    def step(self, action):
        """Execute action and return next state, reward, done, info"""
        if self.done:
            return self._get_state(), 0, True, {}
        
        # Get action delta
        action_info = self.actions[str(action)]
        delta = action_info['delta']
        
        # Calculate new position
        new_row = self.agent_pos[0] + delta[0]
        new_col = self.agent_pos[1] + delta[1]
        new_pos = (new_row, new_col)
        
        # Check if move is valid
        if self._is_valid_position(new_pos):
            self.agent_pos = new_pos
        
        # Calculate reward
        reward = self._calculate_reward()
        
        # Update step counter
        self.steps += 1
        
        # Check termination conditions
        self.done = self._check_done()
        
        # Prepare info
        info = {
            'steps': self.steps,
            'visited_equipment': len(self.visited_equipment),
            'visited_checkpoints': len(self.visited_checkpoints),
            'hazard_encounters': self.hazard_encounters
        }
        
        return self._get_state(), reward, self.done, info
    
    def _calculate_reward(self):
        """Calculate reward for current position"""
        cell_type = self.grid[self.agent_pos[0], self.agent_pos[1]]
        
        # Base reward for cell type
        if cell_type == 'G':  # Goal
            return 10
        elif cell_type == 'H':  # Hazard
            self.hazard_encounters += 1
            return -5
        elif cell_type == 'E':  # Equipment
            if self.agent_pos not in self.visited_equipment:
                self.visited_equipment.add(self.agent_pos)
                return 1
            return -0.1  # Already visited
        elif cell_type == 'C':  # Checkpoint
            if self.agent_pos not in self.visited_checkpoints:
                self.visited_checkpoints.add(self.agent_pos)
                return 2
            return -0.1  # Already visited
        else:  # Empty space
            return -0.1  # Small penalty for each step
    
    def _check_done(self):
        """Check if episode is complete"""
        # Goal reached
        if self.agent_pos == self.goal_pos:
            return True
        
        # Max steps reached
        if self.steps >= self.max_steps:
            return True
        
        return False
    
    def render(self, show_path=None):
        """Visualize the current state of the environment"""
        # Create visualization grid
        vis_grid = self.grid.copy()
        
        # Mark agent position
        vis_grid[self.agent_pos[0], self.agent_pos[1]] = 'A'
        
        # Mark path if provided
        if show_path:
            for pos in show_path:
                if pos != self.agent_pos and pos != self.start_pos and pos != self.goal_pos:
                    vis_grid[pos[0], pos[1]] = '*'
        
        # Print grid
        print(f'\nGridWorld - {self.scenario.replace("_", " ").title()}')
        print(f'Steps: {self.steps}/{self.max_steps}')
        print(f'Equipment visited: {len(self.visited_equipment)}/{len(self.equipment_pos)}')
        print(f'Checkpoints visited: {len(self.visited_checkpoints)}/{len(self.checkpoint_pos)}')
        print(f'Hazard encounters: {self.hazard_encounters}')
        print()
        
        for row in vis_grid:
            print(' '.join(row))
        
        print('\nLegend: S=Start, G=Goal, A=Agent, E=Equipment, C=Checkpoint, H=Hazard, #=Wall, .=Empty, *=Path')

# Initialize GridWorld environment
env = OilGasGridWorld(DATA_DIR / 'gridworld_config.json', scenario='facility_inspection')

print('✓ GridWorld environment initialized')
print(f'✓ Scenario: {env.scenario.replace("_", " ").title()}')
print(f'✓ Grid size: {env.rows}x{env.cols}')
print(f'✓ Actions: {len(env.actions)} ({[info["name"] for info in env.actions.values()]})')
print(f'✓ Equipment positions: {len(env.equipment_pos)}')
print(f'✓ Hazard positions: {len(env.hazard_pos)}')
print(f'✓ Checkpoint positions: {len(env.checkpoint_pos)}')

# Display initial environment
env.render()

✓ Configuration loaded from JSON
✓ GridWorld environment initialized
✓ Scenario: Facility Inspection
✓ Grid size: 8x8
✓ Actions: 5 (['UP', 'DOWN', 'LEFT', 'RIGHT', 'STAY'])
✓ Equipment positions: 4
✓ Hazard positions: 4
✓ Checkpoint positions: 2

GridWorld - Facility Inspection
Steps: 0/100
Equipment visited: 0/4
Checkpoints visited: 0/2
Hazard encounters: 0

# # # # # # # #
# A . E . H . #
# . # . # . . #
# E . C . . H #
# . # . # E . #
# H . . . . C #
# . E . H . G #
# # # # # # # #

Legend: S=Start, G=Goal, A=Agent, E=Equipment, C=Checkpoint, H=Hazard, #=Wall, .=Empty, *=Path
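
The loader reads `gridworld_config.json`, which is not reproduced in this document. The sketch below builds a hypothetical configuration containing the keys the constructor accesses (`environment_config`, `agent_config`, `oil_gas_scenarios`) and a grid reconstructed from the rendered output above. The reward entries copy the default configuration, and the `STAY` delta of `[0, 0]` is an assumption, so the actual file may differ.

```python
import json

# Grid reconstructed from the rendered output ('S' sits where the agent 'A' is drawn)
GRID_ROWS = [
    "########",
    "#S.E.H.#",
    "#.#.#..#",
    "#E.C..H#",
    "#.#.#E.#",
    "#H....C#",
    "#.E.H.G#",
    "########",
]

config = {
    "environment_config": {
        # Copied from the default configuration in the class
        "cell_types": {
            "empty": {"reward": -0.1, "passable": True},
            "wall": {"reward": 0, "passable": False},
            "goal": {"reward": 10, "passable": True},
            "hazard": {"reward": -5, "passable": True},
        }
    },
    "agent_config": {
        "actions": {
            "0": {"name": "UP", "delta": [-1, 0]},
            "1": {"name": "DOWN", "delta": [1, 0]},
            "2": {"name": "LEFT", "delta": [0, -1]},
            "3": {"name": "RIGHT", "delta": [0, 1]},
            "4": {"name": "STAY", "delta": [0, 0]},  # assumed delta for STAY
        },
        "max_steps": 100,
    },
    "oil_gas_scenarios": {
        "facility_inspection": {"grid_layout": [list(row) for row in GRID_ROWS]}
    },
}

# Round-trip through JSON to confirm the structure serializes cleanly
text = json.dumps(config, indent=2)
layout = json.loads(text)["oil_gas_scenarios"]["facility_inspection"]["grid_layout"]

# Count the special cells; these should agree with the printed summary
counts = {symbol: sum(row.count(symbol) for row in layout) for symbol in "SGEHC"}
print(counts)  # {'S': 1, 'G': 1, 'E': 4, 'H': 4, 'C': 2}
```

Writing `text` to `data/gridworld_config.json` would let the constructor take the JSON branch instead of falling back to the 4x4 default grid.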


## Summary & Validation

This is a simplified version of Experiment 8 for testing. The complete implementation would include Q-Learning and DQN algorithms, training loops, and comprehensive evaluation.

**Key Components Demonstrated:**

- Reinforcement learning theory and GridWorld fundamentals
- Oil & gas facility navigation with safety constraints
- Multi-objective environment (equipment inspection, hazard avoidance)
- Realistic facility scenarios (inspection, evacuation, maintenance)

**Next Steps:**

- Implement Q-Learning algorithm with Q-table
- Create Deep Q-Network (DQN) with experience replay
- Add training loops and convergence monitoring
- Include policy visualization and performance analysis
- Implement safety-aware reward shaping and evaluation metrics
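
As a starting point for the DQN step, an experience replay buffer can be sketched with the `deque` already imported in the setup cell. The `Transition` and `ReplayBuffer` names, the capacity, and the uniform sampling strategy are assumptions for illustration; a full DQN would also convert each sampled field to tensors before the network update.

```python
import random
from collections import deque, namedtuple

Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])


class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity, seed=None):
        self.memory = deque(maxlen=capacity)  # oldest transitions are dropped automatically
        self.rng = random.Random(seed)

    def push(self, state, action, reward, next_state, done):
        self.memory.append(Transition(state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Return one Transition whose fields are tuples of length batch_size."""
        batch = self.rng.sample(list(self.memory), batch_size)
        return Transition(*zip(*batch))

    def __len__(self):
        return len(self.memory)


buffer = ReplayBuffer(capacity=3, seed=0)
for t in range(5):
    # Five RIGHT moves (action 3) along row 1, each with the usual step penalty
    buffer.push(state=(1, t), action=3, reward=-0.1, next_state=(1, t + 1), done=False)

print(len(buffer))             # 3: the capacity caps the buffer
print(buffer.memory[0].state)  # (1, 2): the two oldest transitions were evicted
batch = buffer.sample(2)
print(len(batch.state))        # 2
```

Sampling uniformly from past transitions breaks the correlation between consecutive steps, which is the main reason DQN training uses a buffer instead of learning from each step as it arrives.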
