# S3 Log File Processing and Elasticsearch Ingestion

This notebook demonstrates how to:
1. Read an incremental log file from S3
2. Process and transform the data
3. Push the processed data to Elasticsearch

In [None]:
import os
import boto3
import json
from elasticsearch import Elasticsearch
from datetime import datetime
from urllib3.exceptions import InsecureRequestWarning
import urllib3

# Suppress urllib3's InsecureRequestWarning for the rest of this kernel session.
# NOTE(review): presumably needed because the Elasticsearch client in the
# configuration cell is created with verify_certs=False — confirm, and remove
# this once certificate verification is enabled.
urllib3.disable_warnings(InsecureRequestWarning)

## Configuration

Set your configuration parameters here. The Elasticsearch connection is read from environment variables: `ES_USER` and `ES_PASSWORD` are required, and `ES_HOST` is optional (it defaults to `https://localhost:9200`).

In [None]:
# --- S3 source: bucket and object key of the log file to read ---
bucket_name = 'your-bucket-name'
file_key = 'path/to/your/logfile.log'

# --- Elasticsearch target: connection details come from the environment ---
es_host = os.environ.get('ES_HOST', 'https://localhost:9200')
es_user = os.environ.get('ES_USER')
es_password = os.environ.get('ES_PASSWORD')
index_name = 'your-log-index'

# Local file holding the byte offset reached by the previous run
state_file = 'last_processed_position.txt'

# Fail fast when any connection setting is missing or empty.
# (es_host has a default, so in practice this guards the user and password.)
if not (es_host and es_user and es_password):
    raise ValueError("Elasticsearch credentials not properly set in environment variables.")

# Clients shared by the cells below
s3 = boto3.client('s3')
es = Elasticsearch(
    hosts=[es_host],
    http_auth=(es_user, es_password),
    scheme="https",
    port=9200,
    # Certificate verification is disabled here; set to True in production
    # and provide proper certificates.
    verify_certs=False,
)

## Helper Functions

In [None]:
def get_last_position(path=None):
    """Return the byte offset stored by a previous run.

    Args:
        path: Optional checkpoint file to read. When omitted, the
            module-level ``state_file`` is used.

    Returns:
        int: The stored offset, or 0 when the checkpoint file does not
        exist or is empty.

    Raises:
        ValueError: If the file contains anything other than a
            non-negative integer.
    """
    target = state_file if path is None else path
    try:
        with open(target, 'r') as f:
            stored = f.read().strip()
    except FileNotFoundError:
        return 0

    # An empty file is what an interrupted truncating write leaves behind;
    # treat it like a missing checkpoint instead of failing on int('').
    if not stored:
        return 0

    try:
        position = int(stored)
    except ValueError:
        raise ValueError(
            f"State file {target!r} does not contain an integer offset: {stored!r}"
        ) from None

    # A negative offset cannot be turned into a valid byte range.
    if position < 0:
        raise ValueError(
            f"State file {target!r} contains a negative offset: {position}"
        )
    return position

def save_last_position(position, path=None):
    """Persist the byte offset reached by the current run.

    The value is written to a temporary sibling file and then moved over
    the checkpoint with ``os.replace``, so an interruption during the
    write leaves the previous checkpoint in place rather than a truncated
    file.

    Args:
        position: Offset to store; written as ``str(position)``.
        path: Optional checkpoint file to write. When omitted, the
            module-level ``state_file`` is used.
    """
    target = state_file if path is None else path
    tmp_path = f"{target}.tmp"
    with open(tmp_path, 'w') as f:
        f.write(str(position))
        # Push the bytes to disk before the rename makes them the checkpoint.
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, target)

## Process Logs

In [None]:
def _load_json_object(line):
    """Decode one raw line as a JSON object; return the dict, or None if it is not one."""
    try:
        value = json.loads(line.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    return value if isinstance(value, dict) else None


def _parse_log_chunk(raw):
    """Extract JSON-object log entries from a chunk of newline-delimited bytes.

    Args:
        raw: Bytes read from the log file, starting at a line boundary.

    Returns:
        tuple: ``(entries, consumed)`` where ``entries`` is the list of
        parsed dicts in file order and ``consumed`` is the number of
        leading bytes of ``raw`` that have been fully handled.
    """
    # Split on b'\n' only: str.splitlines() would also break on characters
    # such as U+2028 that may legally appear inside a JSON string.
    *complete_lines, tail = raw.split(b'\n')
    consumed = len(raw) - len(tail)
    entries = []

    for line in complete_lines:
        if not line.strip():
            continue  # blank lines carry no entry
        entry = _load_json_object(line)
        if entry is None:
            shown = line.decode('utf-8', errors='replace').rstrip('\r')
            print(f"Skipping invalid JSON: {shown}")
        else:
            entries.append(entry)

    # Bytes after the final newline may be a line that is still being
    # written. Consume them only when they are blank or already form a
    # complete JSON object; otherwise leave them for the next run.
    if not tail.strip():
        consumed = len(raw)
    else:
        entry = _load_json_object(tail)
        if entry is not None:
            entries.append(entry)
            consumed = len(raw)

    return entries, consumed


def process_logs():
    """Index log lines appended to the S3 object since the previous run.

    Reads the stored offset via ``get_last_position()``, fetches the bytes
    from that offset to the current end of ``file_key`` in ``bucket_name``,
    indexes every JSON-object line into ``index_name`` (adding a
    ``timestamp`` field when one is absent), and stores the new offset via
    ``save_last_position()``.

    Lines that are not valid JSON objects are reported and skipped. An
    unterminated trailing line that does not yet parse is not consumed, so
    it is retried on the next run once it has been completed.

    Returns:
        None. Progress is reported with ``print``.
    """
    last_position = get_last_position()

    # Get the file metadata
    response = s3.head_object(Bucket=bucket_name, Key=file_key)
    file_size = response['ContentLength']

    # NOTE(review): an object that shrank (e.g. was rotated or replaced) also
    # lands here and is reported as having no new data — confirm this is intended.
    if last_position >= file_size:
        print("No new data to process")
        return

    # HTTP byte ranges are inclusive at both ends, so the last byte of the
    # object is file_size - 1.
    byte_range = f'bytes={last_position}-{file_size - 1}'
    response = s3.get_object(Bucket=bucket_name, Key=file_key, Range=byte_range)
    raw = response['Body'].read()

    entries, consumed = _parse_log_chunk(raw)

    for log_entry in entries:
        # Add a timestamp if not present
        if 'timestamp' not in log_entry:
            log_entry['timestamp'] = datetime.now().isoformat()
        es.index(index=index_name, body=log_entry)

    if consumed == 0:
        print("No complete log lines available yet")
        return

    # Advance only past the bytes that were actually handled
    save_last_position(last_position + consumed)

    print(f"Processed {consumed} bytes of new log data")

# Run the log processing once. Each run resumes from the offset stored in
# state_file, so re-running this cell only picks up data added since then.
process_logs()

## Verify Data in Elasticsearch

You can use this cell to verify that data has been successfully pushed to Elasticsearch.

In [None]:
# Search for documents in the index.
# "track_total_hits": True asks for an exact count; without it the reported
# total is capped at 10,000. "size": 1 because only one sample is displayed.
# NOTE(review): documents indexed moments ago may not appear until the index
# has refreshed — confirm if the count looks low right after ingestion.
search_result = es.search(
    index=index_name,
    body={"query": {"match_all": {}}, "size": 1, "track_total_hits": True},
)
sample_hits = search_result['hits']['hits']

print(f"Total documents in index: {search_result['hits']['total']['value']}")
print("Sample document:")
print(json.dumps(sample_hits[0]['_source'], indent=2) if sample_hits else "No documents found")