In [30]:
from pprint import pprint
import re
import requests
import json
import hashlib

In [None]:
# STEP 1
# Fetch logs from Apache API with retry logic
api_url = "https://apache-api.onrender.com/logs"
logs = []

for attempt in range(3):
    response = requests.get(api_url)
    
    if response.status_code == 200:
        print("Successfully fetched logs from API")
        json_data = json.loads(response.text)
        logs = json_data["raw_logs"]
        break
    else:
        print(f"Failed to fetch logs from API. Status code: {response.status_code}")
        if attempt < 2:
            print("Retrying...")
        else:
            print("All attempts to fetch logs from API failed")
            
print(f"Step 1 complete: {len(logs)} logs fetched")


Successfully fetched logs from API
Step 1 complete: 100 logs fetched


##### Transformation
- Parse fields using a regex-based parser (parser.py).
- Deduplicate records based on a hash of IP + timestamp + request

In [32]:
# Step 2
# Parse Apache log lines into structured data(converts raw text logs into organized data that is easier to work with)
log_pattern = re.compile(
    r'(?P<ip>\S+) - - \[(?P<timestamp>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d{3}) (?P<size>\d+|-)'
)
parsed_logs = []
seen_hashes = set()  
duplicates_count = 0

for line in logs:
    match = log_pattern.match(line)
    if match:
        log_entry = match.groupdict()

       # Checks for size field in '-', before converting to integer
        if log_entry['size'] == '-':
            log_entry['size'] = 0
        else:
            log_entry['size'] = int(log_entry['size']) 
            
        # Convert status to integers as regex captures everything as strings
        log_entry['status'] = int(log_entry['status']) 

        # Deduplication: Create hash based on IP + timestamp + request
        hash_input = f"{log_entry['ip']}{log_entry['timestamp']}{log_entry['method']}{log_entry['url']}"
        log_hash = hashlib.md5(hash_input.encode()).hexdigest()
            
        # Check for duplicates
        if log_hash in seen_hashes:
            duplicates_count += 1
            continue  

        # Add hash to seen set and store in log entry
        seen_hashes.add(log_hash)
        log_entry['log_hash'] = log_hash
        
        parsed_logs.append(log_entry)

print(f"Parsed {len(parsed_logs)} logs")
print(f"Duplicates removed: {duplicates_count}")

Parsed 100 logs
Duplicates removed: 0


##### Classify
- Success vs. error (status codes)
- Classify logs with 200 status codes as Success 400, 500, etc as error.


In [34]:
 # Classifying the parsed logs as either sucess or error
classified_logs = []

for log in parsed_logs:
    status_code = log['status']

    if 200 <= status_code < 400:
        log['classification'] = 'Success'
    elif 400 <= status_code < 600:
        log['classification'] = 'Error'
    else:
        log['classification'] = 'unknown'
    
    classified_logs.append(log)


# Count success vs errors
success_count = sum(1 for log in classified_logs if log['classification'] == 'Success')
error_count = sum(1 for log in classified_logs if log['classification'] == 'Error')
unknown_count = sum(1 for log in classified_logs if log['classification'] == 'unknown')
print(f"Success: {success_count}, Error: {error_count}, Unknown: {unknown_count}")


Success: 99, Error: 1, Unknown: 0


##### step 4 - Loading

Insert clean records into a local Postgres database (logs.db) using
safe transactions (database.py).

In [37]:
import sqlite3

# creating a database to store logs
def create_database():

    # connecting to SQLite database 
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    # Creating logs table to store logs
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY,
            ip TEXT,
            timestamp TEXT,
            method TEXT,
            url TEXT,
            status INTEGER,
            size INTEGER,
            classification TEXT,
            log_hash TEXT UNIQUE
        )
    ''')

    conn.commit()
    conn.close()
    print("Database created")

    
    # Create index for faster queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_status ON logs(status)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_classification ON logs(classification)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_ip ON logs(ip)')
    
    conn.commit()
    conn.close()
    print("Database and table created successfully")

def save_classified_logs_to_database(classified_logs):
    
    if not classified_logs:
        print("No logs to save")
        return 0
    
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    saved_count = 0
    duplicate_count = 0
    error_count = 0
    
    for log in classified_logs:
        try:
            cursor.execute('''
                INSERT INTO logs (ip, timestamp, method, url, protocol, 
                                status, size, classification, log_hash)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                log['ip'],
                log['timestamp'], 
                log['method'],
                log['url'],
                log['protocol'],
                log['status'],
                log['size'],
                log['classification'],
                log['log_hash']
            ))
            saved_count += 1
            
        except sqlite3.IntegrityError:

            pass
            

    
    conn.commit()
    conn.close()
    
    print(f"Database save complete:")
    print(f"  Saved: {saved_count} logs")
    print(f"  Duplicates skipped: {duplicate_count}")
    print(f"  Errors: {error_count}")
    
    return saved_count



# creating a summary report of the database
def get_database_summary():
    
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    # Count total logs
    cursor.execute("SELECT COUNT(*) FROM logs")
    total = cursor.fetchone()[0]
    
    # Count success logs
    cursor.execute("SELECT COUNT(*) FROM logs WHERE classification = 'Success'")
    success = cursor.fetchone()[0]
    
    # Count error logs
    cursor.execute("SELECT COUNT(*) FROM logs WHERE classification = 'Error'")
    error = cursor.fetchone()[0]
    
    conn.close()


    print(f"Database Summary:")
    print(f"  Total logs: {total}")
    print(f"  Success: {success}")
    print(f"  Error: {error}")
    
    return {'total': total, 'success': success, 'error': error}
        
    

# For testing this file directly
if __name__ == "__main__":
    print("Testing database operations...")
    
    

Testing database operations...


##### Step 5 - Summarization
- Top IP addresses by request count
- Error code summaries (4xx, 5xx)
- Hourly traffic pattern

In [41]:

def generate_daily_summary():
    
    # Connect to database
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    # Get overall statistics
    cursor.execute("SELECT COUNT(*) FROM logs")
    total_logs = cursor.fetchone()[0]
    
    cursor.execute("SELECT COUNT(*) FROM logs WHERE classification = 'Success'")
    success_count = cursor.fetchone()[0]
    
    cursor.execute("SELECT COUNT(*) FROM logs WHERE classification = 'Error'")
    error_count = cursor.fetchone()[0]
    
    # Get top IP addresses by request count
    cursor.execute("""
        SELECT ip, COUNT(*) as count 
        FROM logs 
        GROUP BY ip 
        ORDER BY count DESC 
        LIMIT 10
    """)
    top_ips = cursor.fetchall()

In [43]:
# Get error code summaries (4xx, 5xx)
def generate_daily_summary():
    
    # Connect to database
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT status, COUNT(*) as count 
        FROM logs 
        WHERE classification = 'Error'
        GROUP BY status 
        ORDER BY count DESC
    """)
    error_codes = cursor.fetchall()

In [44]:
# Get most requested URLs

def generate_daily_summary():
    
    # Connect to database
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()

    cursor.execute("""
        SELECT url, COUNT(*) as count 
        FROM logs 
        GROUP BY url 
        ORDER BY count DESC 
        LIMIT 10
    """)
    top_urls = cursor.fetchall()

In [45]:
# Get traffic by hour 
def generate_daily_summary():
    
    # Connect to database
    conn = sqlite3.connect('apache_logs.db')
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT substr(timestamp, 13, 2) as hour, COUNT(*) as count
        FROM logs 
        GROUP BY hour 
        ORDER BY hour
    """)
    hourly_traffic = cursor.fetchall()
    
    conn.close()

In [50]:
 
    # Create summary report
def generate_daily_summary():

    summary = {
        'report_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'total_logs': total_logs,
        'success_count': success_count,
        'error_count': error_count,
        'success_percentage': round((success_count / total_logs * 100), 2) if total_logs > 0 else 0,
        'top_ips': top_ips,
        'error_codes': error_codes,
        'top_urls': top_urls,
        'hourly_traffic': hourly_traffic
    }
    
    return summary

In [51]:
def save_json_report(summary):
    """
    Save summary as JSON report
    """
    filename = f"daily_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    
    with open(filename, 'w') as f:
        json.dump(summary, f, indent=2)
    
    print(f"JSON report saved: {filename}")
    return filename


In [52]:
def save_csv_report(summary):
    """
    Save summary as CSV report
    """
    filename = f"daily_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)