In [4]:
pip install s3fs

Note: you may need to restart the kernel to use updated packages.


In [3]:
import json
import boto3
import logging
from datetime import datetime, timedelta
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import os
import gzip
from io import BytesIO
import time

In [1]:


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class EarthquakeConsumer:
    def _init_(self, bootstrap_servers=['13.204.84.130:9092'], topic_name='earthquake-events', 
                 s3_bucket='your-earthquake-bucket', consumer_group='earthquake-processors'):
        
        self.topic_name = topic_name
        self.s3_bucket = s3_bucket
        self.consumer_group = consumer_group
        
        # Initialize Kafka Consumer
        self.consumer = KafkaConsumer(
            self.topic_name,
            bootstrap_servers=bootstrap_servers,
            group_id=self.consumer_group,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset='latest',
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            max_poll_records=100,
            consumer_timeout_ms=10000
        )
        
        # Initialize S3 client (uses AWS CLI credentials)
        self.s3_client = boto3.client('s3')
        
        # Buffer for batch uploads
        self.buffer = []
        self.buffer_size = 50  # Upload every 50 messages
        self.last_upload_time = time.time()
        self.upload_interval = 300  # Upload every 5 minutes regardless
        
    def validate_earthquake_data(self, data):
        """Validate earthquake data structure"""
        required_fields = ['event_id', 'event_time', 'magnitude', 'coordinates']
        
        for field in required_fields:
            if field not in data:
                logger.warning(f"Missing required field: {field}")
                return False
                
        # Validate coordinate structure
        if not isinstance(data['coordinates'], dict):
            logger.warning("Invalid coordinates structure")
            return False
            
        required_coords = ['latitude', 'longitude', 'depth_km']
        for coord in required_coords:
            if coord not in data['coordinates']:
                logger.warning(f"Missing coordinate field: {coord}")
                return False
                
        return True
    
    def enrich_for_storage(self, earthquake_data):
        """Add storage-specific metadata"""
        enriched = earthquake_data.copy()
        
        # Add partition information for S3 organization
        event_time = datetime.fromisoformat(earthquake_data['event_time'].replace('Z', '+00:00'))
        
        enriched.update({
            'storage_metadata': {
                'ingestion_time': datetime.utcnow().isoformat(),
                'partition_year': event_time.year,
                'partition_month': event_time.month,
                'partition_day': event_time.day,
                'partition_hour': event_time.hour
            },
            'data_quality': {
                'has_magnitude': earthquake_data.get('magnitude') is not None,
                'has_location': all(earthquake_data['coordinates'].get(k) is not None 
                                 for k in ['latitude', 'longitude']),
                'has_depth': earthquake_data['coordinates'].get('depth_km') is not None,
                'significance_score': earthquake_data.get('significance', 0)
            }
        })
        
        return enriched
    
    def generate_s3_key(self, earthquake_data):
        """Generate S3 key with partitioning structure"""
        metadata = earthquake_data['storage_metadata']
        
        # Partitioned by date for efficient querying
        s3_key = (f"earthquake-data/"
                 f"year={metadata['partition_year']}/"
                 f"month={metadata['partition_month']:02d}/"
                 f"day={metadata['partition_day']:02d}/"
                 f"hour={metadata['partition_hour']:02d}/"
                 f"{earthquake_data['event_id']}.json")
        
        return s3_key
    
    def upload_to_s3(self, earthquake_data):
        """Upload single earthquake record to S3"""
        try:
            s3_key = self.generate_s3_key(earthquake_data)
            
            # Convert to JSON and compress
            json_data = json.dumps(earthquake_data, indent=2)
            
            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_key,
                Body=json_data,
                ContentType='application/json',
                Metadata={
                    'event-id': earthquake_data['event_id'],
                    'magnitude': str(earthquake_data.get('magnitude', 'unknown')),
                    'source': 'kafka-consumer',
                    'ingestion-time': earthquake_data['storage_metadata']['ingestion_time']
                }
            )
            
            logger.info(f"Uploaded earthquake {earthquake_data['event_id']} to S3: {s3_key}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to upload to S3: {e}")
            return False
    
    def batch_upload_to_s3(self, earthquake_batch):
        """Upload batch of earthquakes as compressed JSONL file"""
        try:
            if not earthquake_batch:
                return True
                
            # Create batch metadata
            batch_time = datetime.utcnow()
            first_event_time = earthquake_batch[0]['event_time']
            
            # Generate batch S3 key
            s3_key = (f"earthquake-data/batches/"
                     f"year={batch_time.year}/"
                     f"month={batch_time.month:02d}/"
                     f"day={batch_time.day:02d}/"
                     f"batch_{batch_time.strftime('%Y%m%d_%H%M%S')}_{len(earthquake_batch)}_records.jsonl.gz")
            
            # Create compressed JSONL
            buffer = BytesIO()
            with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file:
                for earthquake in earthquake_batch:
                    line = json.dumps(earthquake) + '\n'
                    gz_file.write(line.encode('utf-8'))
            
            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_key,
                Body=buffer.getvalue(),
                ContentType='application/gzip',
                Metadata={
                    'batch-size': str(len(earthquake_batch)),
                    'first-event-time': first_event_time,
                    'batch-upload-time': batch_time.isoformat(),
                    'compression': 'gzip'
                }
            )
            
            logger.info(f"Uploaded batch of {len(earthquake_batch)} earthquakes to S3: {s3_key}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to upload batch to S3: {e}")
            return False
    
    def should_upload_buffer(self):
        """Check if buffer should be uploaded"""
        buffer_full = len(self.buffer) >= self.buffer_size
        time_elapsed = time.time() - self.last_upload_time > self.upload_interval
        return buffer_full or time_elapsed
    
    def flush_buffer(self):
        """Upload buffered records to S3"""
        if not self.buffer:
            return
            
        logger.info(f"Flushing buffer with {len(self.buffer)} records")
        
        if self.batch_upload_to_s3(self.buffer):
            self.buffer.clear()
            self.last_upload_time = time.time()
        else:
            # Fallback: try individual uploads
            logger.info("Batch upload failed, trying individual uploads...")
            successful_uploads = 0
            failed_records = []
            
            for earthquake in self.buffer:
                if self.upload_to_s3(earthquake):
                    successful_uploads += 1
                else:
                    failed_records.append(earthquake)
            
            logger.info(f"Individual uploads: {successful_uploads} successful, {len(failed_records)} failed")
            
            # Keep failed records for retry
            self.buffer = failed_records
            if successful_uploads > 0:
                self.last_upload_time = time.time()
    
    def process_message(self, message):
        """Process individual Kafka message"""
        try:
            earthquake_data = message.value
            
            # Validate data
            if not self.validate_earthquake_data(earthquake_data):
                logger.warning(f"Invalid earthquake data received: {message.key}")
                return False
            
            # Enrich data for storage
            enriched_data = self.enrich_for_storage(earthquake_data)
            
            # Add to buffer
            self.buffer.append(enriched_data)
            
            # Log high-significance earthquakes immediately
            if enriched_data.get('magnitude', 0) >= 5.0:
                logger.warning(f"SIGNIFICANT EARTHQUAKE: Magnitude {enriched_data['magnitude']} "
                             f"at {enriched_data['place']} - {enriched_data['event_id']}")
            
            logger.info(f"Processed earthquake: {enriched_data['event_id']} "
                       f"(Mag: {enriched_data.get('magnitude', 'N/A')}) - Buffer: {len(self.buffer)}")
            
            return True
            
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            return False
    
    def run_consumer(self):
        """Main consumer loop"""
        logger.info(f"Starting earthquake consumer for topic: {self.topic_name}")
        logger.info(f"Storing data to S3 bucket: {self.s3_bucket}")
        
        try:
            while True:
                # Poll for messages
                message_batch = self.consumer.poll(timeout_ms=5000)
                
                if message_batch:
                    for topic_partition, messages in message_batch.items():
                        for message in messages:
                            self.process_message(message)
                
                # Check if buffer should be uploaded
                if self.should_upload_buffer():
                    self.flush_buffer()
                
                # Periodic status log
                if len(self.buffer) > 0:
                    logger.debug(f"Buffer status: {len(self.buffer)} records pending upload")
        
        except KeyboardInterrupt:
            logger.info("Received shutdown signal...")
        except Exception as e:
            logger.error(f"Unexpected error in consumer loop: {e}")
        finally:
            # Flush any remaining buffer
            if self.buffer:
                logger.info("Flushing final buffer before shutdown...")
                self.flush_buffer()
            
            self.consumer.close()
            logger.info("Consumer closed")

def main():
    # Configuration - Update these values
    KAFKA_BOOTSTRAP_SERVERS = ['13.204.84.130:9092']
    TOPIC_NAME = 'earthquake-events'
    S3_BUCKET = 'your-earthquake-data-bucket'  # Update this!
    CONSUMER_GROUP = 'earthquake-s3-processors'
    
    # Verify S3 bucket exists
    s3_client = boto3.client('s3')
    try:
        s3_client.head_bucket(Bucket=S3_BUCKET)
        logger.info(f"S3 bucket '{S3_BUCKET}' is accessible")
    except Exception as e:
        logger.error(f"Cannot access S3 bucket '{S3_BUCKET}': {e}")
        logger.error("Please create the bucket or update the bucket name in the script")
        return
    
    # Create and run consumer
    consumer = EarthquakeConsumer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        topic_name=TOPIC_NAME,
        s3_bucket=S3_BUCKET,
        consumer_group=CONSUMER_GROUP
    )
    
    consumer.run_consumer()

if _name_ == "_main_":
    main()

NameError: name 'logging' is not defined

In [1]:
import json
import boto3
from kafka import KafkaConsumer
from datetime import datetime

# ======= CONFIGURATION - UPDATE THESE =======
KAFKA_SERVERS = ['YOUR_EC2_IP:9092']  # Change this!
TOPIC_NAME = 'earthquake-events'      # Change this!  
S3_BUCKET = 'your-earthquake-bucket'  # Change this!

def upload_to_s3(s3_client, earthquake_data):
    """Upload earthquake to S3"""
    try:
        # Create S3 key (file path)
        event_time = datetime.fromtimestamp(earthquake_data['time'] / 1000)
        s3_key = f"earthquakes/{event_time.year}/{event_time.month:02d}/{event_time.day:02d}/{earthquake_data['id']}.json"
        
        # Upload to S3
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=s3_key,
            Body=json.dumps(earthquake_data, indent=2),
            ContentType='application/json'
        )
        
        print(f"Uploaded: {earthquake_data['id']} to S3")
        return True
        
    except Exception as e:
        print(f"S3 upload failed: {e}")
        return False

def main():
    # Create Kafka consumer
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_SERVERS,
        group_id='earthquake-consumer',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest'
    )
    
    # Create S3 client
    s3_client = boto3.client('s3')
    
    print(f"Starting earthquake consumer...")
    print(f"Kafka: {KAFKA_SERVERS}")
    print(f"Topic: {TOPIC_NAME}")
    print(f"S3 Bucket: {S3_BUCKET}")
    
    try:
        for message in consumer:
            earthquake_data = message.value
            
            print(f"Received: {earthquake_data['id']} - Mag {earthquake_data['magnitude']}")
            
            # Upload to S3
            upload_to_s3(s3_client, earthquake_data)
            
    except KeyboardInterrupt:
        print("Stopping consumer...")
    finally:
        consumer.close()

if __name__ == "__main__":
    main()

NameError: name '_name_' is not defined

In [6]:
from kafka import KafkaConsumer
import json
from s3fs import S3FileSystem

# Configuration
KAFKA_SERVERS = ['YOUR_EC2_IP:9092']
TOPIC_NAME = 'earthquake-events'
S3_BUCKET = 'your-earthquake-bucket'

# Create consumer
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_SERVERS,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Create S3 connection
s3 = S3FileSystem()

print("Starting simple consumer...")

# Simple loop - just like YouTube tutorial
for message in consumer:
    earthquake_data = message.value
    
    # Create filename with earthquake ID
    filename = f"s3://{S3_BUCKET}/earthquake_{earthquake_data['id']}.json"
    
    # Write directly to S3
    with s3.open(filename, 'w') as file:
        json.dump(earthquake_data, file)
    
    print(f"Saved earthquake {earthquake_data['id']} to S3")

NoBrokersAvailable: NoBrokersAvailable

In [5]:
pip install s3fs

Note: you may need to restart the kernel to use updated packages.


In [None]:
import s3fs
fs = s3fs.S3FileSystem(anon=False)
with fs.open('earthquake-bucket-muskan/test.json', 'w') as f:
    f.write('test')

In [None]:
from kafka import KafkaConsumer
import json
from s3fs import S3FileSystem
from json import dumps

# Simple configuration
consumer = KafkaConsumer(
    'demo_test234',
    bootstrap_servers=['13.204.64.146:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

s3 = S3FileSystem(anon=False)

print("Starting earthquake consumer...")

for message in consumer:
    earthquake_data = message.value
    
    # Create S3 filename
    filename = f"s3://earthquake-bucket-muskan/earthquake_{earthquake_data['id']}.json"
    
    # Save to S3
    with s3.open(filename, 'w') as file:
        json.dump(earthquake_data, file)
    
    print(f"Saved earthquake {earthquake_data['id']} to S3")

In [7]:
from kafka import KafkaConsumer
import json
from s3fs import S3FileSystem
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("earthquake-consumer")

def safe_deserializer(x):
    try:
        return json.loads(x.decode('utf-8')) if x else None
    except Exception as e:
        logger.error(f"Deserialization failed: {e} | Data: {x}")
        return None

# Kafka Consumer
consumer = KafkaConsumer(
    'demo_test27',
    bootstrap_servers=['13.126.75.47:9092'],
    value_deserializer=safe_deserializer,
    auto_offset_reset='earliest'
)

# S3 Setup
s3 = S3FileSystem(anon=False)
bucket = 'earthquake-bucket-muskan'

for message in consumer:
    if message.value:
        try:
            filename = f"{bucket}/quake_{message.offset}.json"
            with s3.open(filename, 'w') as f:
                f.write(json.dumps(message.value))
            logger.info(f"Saved to S3: {filename}")
        except Exception as e:
            logger.error(f"S3 upload failed: {e}")

INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=13.126.75.47:9092 <connecting> [IPv4 ('13.126.75.47', 9092)]>: connecting to 13.126.75.47:9092 [('13.126.75.47', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=13.126.75.47:9092 <checking_api_versions_recv> [IPv4 ('13.126.75.47', 9092)]>: Broker version identified as 2.6
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=bootstrap-0 host=13.126.75.47:9092 <connected> [IPv4 ('13.126.75.47', 9092)]>: Connection complete.
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('demo_test27',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='demo_test27', partition=0)]
INFO:kafka.conn:<BrokerConnection client_id=kafka-python-2.2.15, node_id=0 host=13.126.75.47:9092 <connecting> [IPv4 ('13.126.75.47', 9092)]>: connecting to 13.126.75.47:9092 [('13.126.75.47', 9092) IPv4

KeyboardInterrupt: 