In [52]:
import re
# !pip install mrjob
from collections import defaultdict
from tqdm import tqdm

## Data Cleaning - Parse the logs

#### Provide the log pattern as per the input data (logs)


In [49]:
log_pattern = r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<url>\S+) (?P<http_version>.*?)" (?P<status_code>\d+) (?P<response_size>\d+) "(?P<referrer>.*?)" "(?P<user_agent>.*?)"'

In [62]:
# Function to parse log entries
def parse_log_entry(log_entry):
    match = re.match(log_pattern, log_entry)
    if match:
        return match.groupdict()
    else:
        return None

# Function to read log file and parse log entries
def parse_log_file(file_path):
    parsed_logs = []
    with open(file_path, 'r') as file:
        for line in file:
            log_entry = line.strip()
            parsed_log_entry = parse_log_entry(log_entry)
            if parsed_log_entry:
                parsed_logs.append(parsed_log_entry)
    return parsed_logs

In [63]:
file_path = '../Downloads/log_file.log'
parsed_logs = parse_log_file(file_path)
#     for log in parsed_logs:
#         print(log)

if __name__ == "__main__":
    main()

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [64]:
parsed_logs[0]

{'ip': '87.116.74.253',
 'timestamp': '28/Feb/2022:14:04:44 +0200',
 'method': 'POST',
 'url': '/wp-admin/admin-ajax.php',
 'http_version': 'HTTP/2.0',
 'status_code': '200',
 'response_size': '47',
 'referrer': 'https://nargile.bg/wp-admin/admin.php?page=wc-settings',
 'user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36'}

In [36]:
type(parsed_logs)

list

## MapReduce Algorithm

### Hadoop MapReduce logic  identify patterns and trends. MapReduce jobs  count the occurrence of specific log messages, identify common error patterns, or detect unusual access patterns.

In [50]:
def map_function(log):
    # Extract relevant information from the log
    url = log['url']
    status_code = log['status_code']
    
    # Emit key-value pairs (pattern, 1)
    if status_code != '200':
        yield ('error', 1)
    
    if '/wp-content/' in url:
        yield ('wp_content', 1)

In [51]:
def reduce_function(pairs):
    # Aggregate counts for each pattern
    pattern_counts = defaultdict(int)
    for pattern, count in pairs:
        pattern_counts[pattern] += count
    return pattern_counts.items()

In [60]:
# parsed logs
logs = parsed_logs

# Map phase
mapped_pairs = [result for log in logs for result in map_function(log)]

# Reduce phase
reduced_output = reduce_function(mapped_pairs)

## Output Response from MapReduce

In [61]:
# Output the result
for pattern, count in tqdm(reduced_output):
    print(f'{pattern}: {count}')

100%|████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<?, ?it/s]

wp_content: 1767145
error: 63234





In [57]:
mapped_pairs[0:10]

[('wp_content', 1),
 ('wp_content', 1),
 ('wp_content', 1),
 ('wp_content', 1),
 ('wp_content', 1),
 ('wp_content', 1),
 ('error', 1),
 ('wp_content', 1),
 ('wp_content', 1),
 ('wp_content', 1)]

In [59]:
reduced_output

dict_items([('wp_content', 1767145), ('error', 63234)])

In [65]:
def map_function_anomaly(log):
    # Extract relevant information from the log
    timestamp = log['timestamp']
    status_code = log['status_code']
    response_size = int(log['response_size'])
    
    # Extract hour from timestamp
    hour = timestamp.split(':')[1]
    
    # Emit key-value pairs (hour, (1, response_size, status_code))
    yield (hour, (1, response_size, status_code))

In [66]:
def reduce_function_anomaly(pairs):
    # Aggregate counts, total response size, and error counts for each hour
    hour_metrics = defaultdict(lambda: (0, 0, 0))  # (count, total_response_size, error_count)
    for hour, metrics in pairs:
        count, response_size, status_code = metrics
        hour_metrics[hour] = (hour_metrics[hour][0] + count,
                              hour_metrics[hour][1] + response_size,
                              hour_metrics[hour][2] + (1 if status_code != '200' else 0))
    return hour_metrics.items()

In [73]:
# parsed logs
logs = parsed_logs

In [74]:
# Map phase for anomaly detection
mapped_pairs_anomaly = [result for log in logs for result in map_function_anomaly(log)]

In [76]:
mapped_pairs_anomaly[0:5]

[('14', (1, 47, '200')),
 ('14', (1, 47, '200')),
 ('14', (1, 262929, '200')),
 ('14', (1, 992, '200')),
 ('14', (1, 308, '200'))]

In [77]:
# Reduce phase for anomaly detection
reduced_output_anomaly = reduce_function_anomaly(mapped_pairs_anomaly)

In [79]:
reduced_output_anomaly

dict_items([('14', (137344, 6213820968, 4090)), ('15', (175472, 8095543416, 5157)), ('16', (171536, 7450838319, 4368)), ('17', (171171, 7501961016, 3972)), ('18', (137404, 5907974861, 3483)), ('19', (143527, 5705280593, 3803)), ('20', (135327, 5372765135, 3842)), ('21', (133572, 5263420549, 3298)), ('22', (130329, 5251491647, 3173)), ('23', (107173, 4299619857, 2485)), ('00', (65452, 2963020097, 2069)), ('01', (52358, 2570766753, 1679)), ('02', (34662, 1844177913, 991)), ('03', (20482, 1401168315, 923)), ('04', (10338, 629913836, 488)), ('05', (14208, 1292916672, 2928)), ('06', (10292, 570377406, 354)), ('07', (18467, 985466662, 728)), ('08', (42711, 1730509776, 1112)), ('09', (62675, 2619602323, 1808)), ('10', (66239, 2780139074, 1957)), ('11', (89203, 4449049527, 3285)), ('12', (129358, 6011936639, 4111)), ('13', (103919, 4933080283, 3130))])

#### The output indicates that for the hour , the average response size and the error rate giving insights into the behavior of your system over time. This information can be valuable for identifying anomalies or trends in the log data.

#### Hour: The hour of the day for which the metrics are calculated.
#### Average Response Size: The average size of responses received during that hour.
#### Error Rate: The percentage of requests that resulted in an error during that hour.

In [80]:
# Output the result
for hour, metrics in reduced_output_anomaly:
    count, total_response_size, error_count = metrics
    average_response_size = total_response_size / count if count > 0 else 0
    error_rate = (error_count / count) * 100 if count > 0 else 0
    print(f'Hour: {hour}, Average Response Size: {average_response_size}, Error Rate: {error_rate}%')

Hour: 14, Average Response Size: 45242.755184063375, Error Rate: 2.9779240447343898%
Hour: 15, Average Response Size: 46135.813212364366, Error Rate: 2.938930427646576%
Hour: 16, Average Response Size: 43436.00363189068, Error Rate: 2.5464042533345768%
Hour: 17, Average Response Size: 43827.28976286871, Error Rate: 2.320486531012847%
Hour: 18, Average Response Size: 42997.109698407614, Error Rate: 2.5348607027451893%
Hour: 19, Average Response Size: 39750.57371086973, Error Rate: 2.649675670779714%
Hour: 20, Average Response Size: 39702.09296740488, Error Rate: 2.8390491180621753%
Hour: 21, Average Response Size: 39405.11895457132, Error Rate: 2.4690803461803372%
Hour: 22, Average Response Size: 40294.114487182436, Error Rate: 2.4346078002593434%
Hour: 23, Average Response Size: 40118.49866104336, Error Rate: 2.318681011075551%
Hour: 00, Average Response Size: 45270.1230978427, Error Rate: 3.1610951537004217%
Hour: 01, Average Response Size: 49099.78901027541, Error Rate: 3.20676878413

### Define thresholds for error rate and response size


In [87]:
# Define thresholds
error_rate_threshold = 5  # 5% error rate threshold
response_size_lower_threshold = 20000  # Lower response size threshold
response_size_upper_threshold = 60000  # Upper response size threshold

# Output the result with anomaly detection
for hour, metrics in reduced_output_anomaly:
    count, total_response_size, error_count = metrics
    average_response_size = total_response_size / count if count > 0 else 0
    error_rate = (error_count / count) * 100 if count > 0 else 0
    
    # Check for anomalies
    anomaly_detected = False
    if error_rate > error_rate_threshold:
        anomaly_detected = True
        anomaly_type = 'High Error Rate'
    elif average_response_size < response_size_lower_threshold:
        anomaly_detected = True
        anomaly_type = 'Low Average Response Size'
    elif average_response_size > response_size_upper_threshold:
        anomaly_detected = True
        anomaly_type = 'High Average Response Size'
    
    # Print the result
    if anomaly_detected:
        print(f'Hour: {hour}, Average Response Size: {average_response_size}, Error Rate: {error_rate}%, Anomaly Detected: {anomaly_type}')

Hour: 03, Average Response Size: 68409.74099209062, Error Rate: 4.506395859779318%, Anomaly Detected: High Average Response Size
Hour: 04, Average Response Size: 60931.885857999616, Error Rate: 4.720448829560843%, Anomaly Detected: High Average Response Size
Hour: 05, Average Response Size: 90999.2027027027, Error Rate: 20.60810810810811%, Anomaly Detected: High Error Rate


#### Hour 3 and Hour 4 are flagged as anomalies due to their high average response sizes.
Hour 5 is flagged as an anomaly due to its high error rate.
This indicates that during these hours, there were significant deviations from normal behavior, either in terms of response size or error rate, which may require further investigation or action. This approach helps in proactively identifying potential issues or unusual patterns in the log data.

## To implement real-time monitoring in conjunction with MapReduce for near-real-time analysis of log data, you can integrate technologies like Apache Kafka for streaming log data and Apache Storm for real-time processing. Here's how you can extend the existing implementation to include real-time monitoring:


In [None]:
# !pip install kafka-python
from kafka import KafkaConsumer
from kafka import KafkaProducer
import json

In [None]:
# Set up Kafka producer to send parsed log messages
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Produce parsed log messages to the 'log-analytics' Kafka topic
for log_data in parsed_logs:
    # Convert log data to JSON format
    message = json.dumps(log_data).encode('utf-8')
    
    # Send log message to Kafka
    producer.send('log-analytics', value=message)

# Flush the producer to ensure all messages are sent
producer.flush()

In [91]:
from kafka import KafkaConsumer
from collections import defaultdict
from storm import BasicBolt, Storm

# Set up Kafka consumer to ingest streaming log data
consumer = KafkaConsumer('log-analytics', group_id='log-group', bootstrap_servers=['localhost:9092'])

# Define thresholds
error_rate_threshold = 5  # 5% error rate threshold
response_size_lower_threshold = 20000  # Lower response size threshold
response_size_upper_threshold = 60000  # Upper response size threshold

In [92]:
# Define a function to determine if log data represents an anomaly
def is_anomaly(log_data):
    error_rate = (log_data['error_count'] / log_data['request_count']) * 100 if log_data['request_count'] > 0 else 0
    average_response_size = log_data['total_response_size'] / log_data['request_count'] if log_data['request_count'] > 0 else 0
    
    if error_rate > error_rate_threshold or average_response_size < response_size_lower_threshold or average_response_size > response_size_upper_threshold:
        return True
    return False

In [93]:
# Define a function to emit an alert for the detected anomaly
def emit_alert(log_data):
    print("Anomaly detected!")
    print("Log data:", log_data)

In [None]:
# Define a Storm bolt to perform real-time processing
class LogProcessingBolt(BasicBolt):
    def process(self, tup):
        log_data = tup.values[0]
        
        # Perform real-time processing and anomaly detection logic here
        # Update metrics
         hour_metrics[log_data['hour']] = (hour_metrics[log_data['hour']][0] + 1,
                                      hour_metrics[log_data['hour']][1] + int(log_data['response_size']),
                                      hour_metrics[log_data['hour']][2] + (1 if log_data['status_code'] != '200' else 0))
        
        # If an anomaly is detected, emit an alert
        if is_anomaly(log_data):
            emit_alert(log_data)

In [None]:
# Set up Apache Storm topology
storm = Storm()
storm.add_bolt(LogProcessingBolt)

# Continuously consume streaming log data
for message in consumer:
    log_data = message.value  # Assuming log data is in JSON format
    
    # Emit log data to Storm for processing
    storm.emit({'log_data': log_data})

In [None]:
# **************************************************************************************************************************