In [1]:
import pandas as pd
# Path to the raw HDFS sample log, relative to the notebook's working directory.
log_path = "../data/raw/HDFS_2k.log"

# Read with an explicit encoding so the result does not depend on the
# platform's default locale encoding.
with open(log_path, "r", encoding="utf-8") as f:
    lines = f.readlines()

# Number of raw log lines loaded.
len(lines)

2000

In [2]:
# Show one raw line (newline included) to inspect the field layout.
lines[10]

'081109 204722 567 INFO dfs.DataNode$PacketResponder: Received block blk_5402003568334525940 of size 67108864 from /10.251.214.112\n'

### Regex for HDFS Logs

In [3]:
import re

# One fragment per field, in the order the fields appear on a log line.
# NOTE(review): the third numeric field is captured as `ms`; confirm against
# the log format that it really is milliseconds and not a process/thread id.
_LOG_FIELD_PARTS = (
    r'(?P<date>\d{6})\s+',
    r'(?P<time>\d{6})\s+',
    r'(?P<ms>\d+)\s+',
    r'(?P<level>\w+)\s+',
    r'(?P<component>[^:]+):\s+',
    r'(?P<message>.*)',
)

# Joining the fragments yields one pattern with six named groups.
log_pattern = re.compile("".join(_LOG_FIELD_PARTS))

### Parse Again (Slowly)

In [4]:
# Parse every raw line with log_pattern. Lines that do not match are kept in
# unparsed_lines so parse failures are visible instead of silently dropped.
parsed_logs = []
unparsed_lines = []

for line in lines:
    match = log_pattern.match(line)
    if match:
        parsed_logs.append(match.groupdict())
    else:
        unparsed_lines.append(line)

# Only speak up when something was actually lost.
if unparsed_lines:
    print(f"{len(unparsed_lines)} of {len(lines)} lines did not match log_pattern")

len(parsed_logs)

2000

In [5]:
import pandas as pd

# One row per parsed log line; columns are the regex's named groups.
df = pd.DataFrame(data=parsed_logs)
df.head(n=5)

Unnamed: 0,date,time,ms,level,component,message
0,81109,203615,148,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_38865049064139...
1,81109,203807,222,INFO,dfs.DataNode$PacketResponder,PacketResponder 0 for block blk_-6952295868487...
2,81109,204005,35,INFO,dfs.FSNamesystem,BLOCK* NameSystem.addStoredBlock: blockMap upd...
3,81109,204015,308,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_82291938032499...
4,81109,204106,329,INFO,dfs.DataNode$PacketResponder,PacketResponder 2 for block blk_-6670958622368...


### Build a Proper Timestamp (One Last Fix)

In [6]:
# Glue the date and time strings together, then parse them in one pass.
date_time_text = df["date"].str.cat(df["time"])
df["timestamp"] = pd.to_datetime(date_time_text, format="%y%m%d%H%M%S")

df.dtypes

date                 object
time                 object
ms                   object
level                object
component            object
message              object
timestamp    datetime64[ns]
dtype: object

### Observations

Real-world event data rarely matches initial assumptions.
Inspecting raw data before parsing is critical.
Event schemas must be derived, not assumed.
Once parsed, system logs behave like event-driven analytics data.
Foundations from earlier days made this dataset manageable.

### Sort Events Properly

In [7]:
# Order events chronologically within each component.
sort_keys = ["component", "timestamp"]
df = df.sort_values(by=sort_keys)
df.head()

Unnamed: 0,date,time,ms,level,component,message,timestamp
28,81109,205931,13,INFO,dfs.DataBlockScanner,Verification succeeded for blk_-49809165198942...,2008-11-09 20:59:31
69,81109,213436,13,INFO,dfs.DataBlockScanner,Verification succeeded for blk_-28277162389727...,2008-11-09 21:34:36
175,81110,2337,13,INFO,dfs.DataBlockScanner,Verification succeeded for blk_-15479543530655...,2008-11-10 00:23:37
196,81110,11237,13,INFO,dfs.DataBlockScanner,Verification succeeded for blk_699619438987858...,2008-11-10 01:12:37
345,81110,83453,13,INFO,dfs.DataBlockScanner,Verification succeeded for blk_314136351752080...,2008-11-10 08:34:53


### Calculate Time Gaps Between Events

In [8]:
# Timestamp of the previous event from the same component (NaT for the first).
component_times = df.groupby("component")["timestamp"]
df["prev_timestamp"] = component_times.shift(1)

# Seconds elapsed since that previous event.
elapsed = df["timestamp"].sub(df["prev_timestamp"])
df["gap_seconds"] = elapsed.dt.total_seconds()

In [9]:
# Preview the gap calculation for the first ten rows.
gap_columns = ["component", "timestamp", "prev_timestamp", "gap_seconds"]
df[gap_columns].head(10)

Unnamed: 0,component,timestamp,prev_timestamp,gap_seconds
28,dfs.DataBlockScanner,2008-11-09 20:59:31,NaT,
69,dfs.DataBlockScanner,2008-11-09 21:34:36,2008-11-09 20:59:31,2105.0
175,dfs.DataBlockScanner,2008-11-10 00:23:37,2008-11-09 21:34:36,10141.0
196,dfs.DataBlockScanner,2008-11-10 01:12:37,2008-11-10 00:23:37,2940.0
345,dfs.DataBlockScanner,2008-11-10 08:34:53,2008-11-10 01:12:37,26536.0
346,dfs.DataBlockScanner,2008-11-10 08:50:42,2008-11-10 08:34:53,949.0
347,dfs.DataBlockScanner,2008-11-10 08:59:33,2008-11-10 08:50:42,531.0
357,dfs.DataBlockScanner,2008-11-10 09:36:43,2008-11-10 08:59:33,2230.0
568,dfs.DataBlockScanner,2008-11-10 11:21:55,2008-11-10 09:36:43,6312.0
645,dfs.DataBlockScanner,2008-11-10 12:17:58,2008-11-10 11:21:55,3363.0


### Define Session Breaks

In [10]:
# Gap (in seconds) above which a component's next event opens a new session.
SESSION_GAP = 300

# A session starts at a component's first event (gap is NaN) or after a long pause.
gaps = df["gap_seconds"]
df["new_session"] = gaps.isna() | gaps.gt(SESSION_GAP)

### Assign Session IDs

In [11]:
# Running count of session starts per component gives each event its session number.
session_starts = df.groupby("component")["new_session"]
df["session_id"] = session_starts.cumsum()

In [12]:
# Preview the session assignment for the first fifteen rows.
session_columns = ["component", "timestamp", "gap_seconds", "session_id"]
df[session_columns].head(15)

Unnamed: 0,component,timestamp,gap_seconds,session_id
28,dfs.DataBlockScanner,2008-11-09 20:59:31,,1
69,dfs.DataBlockScanner,2008-11-09 21:34:36,2105.0,2
175,dfs.DataBlockScanner,2008-11-10 00:23:37,10141.0,3
196,dfs.DataBlockScanner,2008-11-10 01:12:37,2940.0,4
345,dfs.DataBlockScanner,2008-11-10 08:34:53,26536.0,5
346,dfs.DataBlockScanner,2008-11-10 08:50:42,949.0,6
347,dfs.DataBlockScanner,2008-11-10 08:59:33,531.0,7
357,dfs.DataBlockScanner,2008-11-10 09:36:43,2230.0,8
568,dfs.DataBlockScanner,2008-11-10 11:21:55,6312.0,9
645,dfs.DataBlockScanner,2008-11-10 12:17:58,3363.0,10


### Session-Level Metrics

In [13]:
# Collapse events to one row per (component, session): time span and event count.
session_groups = df.groupby(["component", "session_id"])
session_summary = session_groups.agg(
    session_start=("timestamp", "min"),
    session_end=("timestamp", "max"),
    event_count=("message", "count"),
).reset_index()

# Duration runs from first to last event, so a one-event session lasts 0 seconds.
session_span = session_summary["session_end"] - session_summary["session_start"]
session_summary["session_duration_sec"] = session_span.dt.total_seconds()

session_summary.head()

Unnamed: 0,component,session_id,session_start,session_end,event_count,session_duration_sec
0,dfs.DataBlockScanner,1,2008-11-09 20:59:31,2008-11-09 20:59:31,1,0.0
1,dfs.DataBlockScanner,2,2008-11-09 21:34:36,2008-11-09 21:34:36,1,0.0
2,dfs.DataBlockScanner,3,2008-11-10 00:23:37,2008-11-10 00:23:37,1,0.0
3,dfs.DataBlockScanner,4,2008-11-10 01:12:37,2008-11-10 01:12:37,1,0.0
4,dfs.DataBlockScanner,5,2008-11-10 08:34:53,2008-11-10 08:34:53,1,0.0


### Event Sequences (Mini Funnel)

In [14]:
# 1-based position of each event inside its (component, session) group.
session_keys = ["component", "session_id"]
df["event_order"] = df.groupby(session_keys).cumcount().add(1)

In [15]:
# Events of session 1 for whichever component sits in the first row of df.
first_component = df["component"].iloc[0]
in_first_component = df["component"] == first_component
in_first_session = df["session_id"] == 1

df[in_first_component & in_first_session][["event_order", "level", "message"]]

Unnamed: 0,event_order,level,message
28,1,INFO,Verification succeeded for blk_-49809165198942...


## Day 7 – Key Takeaways

Sessions convert raw events into meaningful behavior

Time gaps define behavioral boundaries

Components behave like users in product analytics

Sequence analysis reveals system workflows

Interpretation matters more than code

In SaaS:
Funnel drop-off = user abandonment

In logs:
Failure = ERROR level events

Let’s inspect log levels first:

In [16]:
# Count events per log level to see which severities occur in the data.
df["level"].value_counts()

INFO    1920
WARN      80
Name: level, dtype: int64

### Error Rate per Component

In [17]:
def _count_error_levels(levels):
    """Return how many entries of `levels` equal "ERROR"."""
    return (levels == "ERROR").sum()


# Per component: total events, ERROR events, and their ratio.
per_component = df.groupby("component")
error_summary = per_component.agg(
    total_events=("message", "count"),
    error_events=("level", _count_error_levels),
).reset_index()

error_counts = error_summary["error_events"]
error_summary["error_rate"] = error_counts / error_summary["total_events"]

error_summary.sort_values("error_rate", ascending=False)

Unnamed: 0,component,total_events,error_events,error_rate
0,dfs.DataBlockScanner,20,0,0.0
1,dfs.DataNode,1,0,0.0
2,dfs.DataNode$DataXceiver,454,0,0.0
3,dfs.DataNode$PacketResponder,603,0,0.0
4,dfs.FSDataset,263,0,0.0
5,dfs.FSNamesystem,659,0,0.0


### Error Funnel (Session-Level)

In [18]:
def _contains_error(levels):
    """Return True when at least one entry of `levels` equals "ERROR"."""
    return (levels == "ERROR").any()


# Per (component, session): event count and whether any ERROR occurred.
per_session = df.groupby(["component", "session_id"])
session_errors = per_session.agg(
    total_events=("message", "count"),
    has_error=("level", _contains_error),
).reset_index()

session_errors["has_error"].value_counts()

False    245
Name: has_error, dtype: int64

In [19]:
# Mean of the boolean flag = fraction of sessions with at least one ERROR event.
error_session_rate = session_errors["has_error"].mean()
error_session_rate

0.0

### Do Errors End Sessions?

In [20]:
# Attach the per-session error flag to the session summary.
# A has_error column left over from an earlier run of this cell is dropped
# first; otherwise a re-run would produce has_error_x / has_error_y and the
# groupby below would fail with a KeyError.
session_summary = session_summary.drop(columns=["has_error"], errors="ignore").merge(
    session_errors[["component", "session_id", "has_error"]],
    on=["component", "session_id"],
    how="left",
)

# Average session duration, split by whether the session contained an error.
session_summary.groupby("has_error")["session_duration_sec"].mean()

has_error
False    515.261224
Name: session_duration_sec, dtype: float64

### Sequence Before Error

In [21]:
# Rows whose level is exactly "ERROR".
is_error_level = df["level"] == "ERROR"
error_events = df[is_error_level]

error_columns = ["component", "timestamp", "message"]
error_events[error_columns].head()

Unnamed: 0,component,timestamp,message


#### Now look at the 2 events preceding each error (example approach):

In [22]:
df["is_error"] = df["level"] == "ERROR"

# Messages one and two events earlier within the same (component, session).
session_messages = df.groupby(["component", "session_id"])["message"]
df["prev_event"] = session_messages.shift(1)
df["prev2_event"] = session_messages.shift(2)

# Show each error next to the two messages that preceded it.
context_columns = ["component", "prev2_event", "prev_event", "message"]
df[df["is_error"]][context_columns].head()

Unnamed: 0,component,prev2_event,prev_event,message


### Which Components Produce WARN?

In [23]:
def _count_warn_levels(levels):
    """Return how many entries of `levels` equal "WARN"."""
    return (levels == "WARN").sum()


# Per component: total events, WARN events, and their ratio.
per_component = df.groupby("component")
warn_summary = per_component.agg(
    total_events=("message", "count"),
    warn_events=("level", _count_warn_levels),
).reset_index()

warn_counts = warn_summary["warn_events"]
warn_summary["warn_rate"] = warn_counts / warn_summary["total_events"]

warn_summary.sort_values("warn_rate", ascending=False)

Unnamed: 0,component,total_events,warn_events,warn_rate
2,dfs.DataNode$DataXceiver,454,80,0.176211
0,dfs.DataBlockScanner,20,0,0.0
1,dfs.DataNode,1,0,0.0
3,dfs.DataNode$PacketResponder,603,0,0.0
4,dfs.FSDataset,263,0,0.0
5,dfs.FSNamesystem,659,0,0.0


### WARN Rate (Session-Level)

In [24]:
def _contains_warn(levels):
    """Return True when at least one entry of `levels` equals "WARN"."""
    return (levels == "WARN").any()


# Per (component, session): did any WARN event occur?
per_session = df.groupby(["component", "session_id"])
session_warns = per_session.agg(has_warn=("level", _contains_warn)).reset_index()

# Mean of the boolean flag = fraction of sessions containing a WARN.
session_warn_rate = session_warns["has_warn"].mean()
session_warn_rate

0.1346938775510204

### Do WARN Sessions Last Longer?

In [25]:
# Attach the per-session WARN flag to the session summary.
# A has_warn column left over from an earlier run of this cell is dropped
# first; otherwise a re-run would produce has_warn_x / has_warn_y and the
# groupby below would fail with a KeyError.
session_summary = session_summary.drop(columns=["has_warn"], errors="ignore").merge(
    session_warns,
    on=["component", "session_id"],
    how="left",
)

# Average session duration, split by whether the session contained a WARN.
session_summary.groupby("has_warn")["session_duration_sec"].mean()

has_warn
False    548.962264
True     298.757576
Name: session_duration_sec, dtype: float64

### WARN Events Over Time

In [26]:
# Independent, chronologically ordered copy of the WARN-level events.
warn_mask = df["level"] == "WARN"
warn_df = df[warn_mask].copy().sort_values("timestamp")

In [27]:
# Earliest and latest WARN timestamps.
warn_times = warn_df["timestamp"]
(warn_times.min(), warn_times.max())

(Timestamp('2008-11-09 21:40:43'), Timestamp('2008-11-11 01:44:31'))

### Are WARNs Clustered?

#### Calculate time gaps between WARNs:

In [28]:
# Seconds between consecutive WARN events; the first row has no predecessor.
warn_gaps = warn_df["timestamp"].diff()
warn_df["time_diff_sec"] = warn_gaps.dt.total_seconds()
warn_df["time_diff_sec"].describe()

count       79.000000
mean      1278.835443
std       2914.977923
min          1.000000
25%        109.000000
50%        264.000000
75%        722.000000
max      13320.000000
Name: time_diff_sec, dtype: float64

In [29]:
# Number of WARNs that arrived less than 10 seconds after the previous WARN.
warn_df["time_diff_sec"].lt(10).sum()

4

### Bucket Events by Minute

In [30]:
# Truncate each timestamp to the start of its minute. "min" is the supported
# spelling of the minute frequency; the single-letter "T" alias is deprecated
# in recent pandas releases.
df["minute_bucket"] = df["timestamp"].dt.floor("min")

# Number of WARN events falling into each minute bucket.
warn_per_minute = (
    df[df["level"] == "WARN"]
      .groupby("minute_bucket")
      .size()
)

# The ten busiest minutes by WARN count.
warn_per_minute.sort_values(ascending=False).head(10)

minute_bucket
2008-11-10 08:27:00    3
2008-11-09 22:20:00    2
2008-11-10 23:19:00    2
2008-11-10 12:59:00    2
2008-11-09 22:11:00    2
2008-11-10 08:10:00    2
2008-11-10 13:10:00    1
2008-11-10 13:36:00    1
2008-11-10 13:25:00    1
2008-11-10 13:19:00    1
dtype: int64

### Rolling Instability Rate

In [31]:
# Timestamps have one-second resolution, so many events tie. Restore the
# original row order via the index first, then sort stably by timestamp, so
# tied events keep a deterministic order for the row-count rolling window.
# (The default sort algorithm is not stable and leaves ties arbitrary.)
df = df.sort_index().sort_values("timestamp", kind="mergesort")

In [32]:
df["is_warn"] = df["level"] == "WARN"

# Share of WARNs among the current row and the 49 rows before it. The window
# counts rows, not time, and stays NaN until 50 rows are available.
warn_window = df["is_warn"].rolling(window=50)
df["rolling_warn_rate"] = warn_window.mean()

In [33]:
# Ten rows whose trailing 50-row window has the highest WARN share.
df.sort_values("rolling_warn_rate", ascending=False).head(10)

Unnamed: 0,date,time,ms,level,component,message,timestamp,prev_timestamp,gap_seconds,new_session,session_id,event_order,is_error,prev_event,prev2_event,minute_bucket,is_warn,rolling_warn_rate
125,81109,234516,4136,INFO,dfs.DataNode$DataXceiver,Receiving block blk_7654726977769174867 src: /...,2008-11-09 23:45:16,2008-11-09 23:41:12,244.0,False,11,8,False,Receiving block blk_-8562334328670278932 src: ...,Receiving block blk_-1259190292740306590 src: ...,2008-11-09 23:45:00,False,0.42
126,81109,234555,4345,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_83200881097010...,2008-11-09 23:45:55,2008-11-09 23:45:12,43.0,False,3,5,False,PacketResponder 0 for block blk_-7640601843607...,PacketResponder 1 for block blk_-8108913101671...,2008-11-09 23:45:00,False,0.42
124,81109,234512,4223,INFO,dfs.DataNode$PacketResponder,PacketResponder 0 for block blk_-7640601843607...,2008-11-09 23:45:12,2008-11-09 23:43:25,107.0,False,3,4,False,PacketResponder 1 for block blk_-8108913101671...,PacketResponder 1 for block blk_20288038501793...,2008-11-09 23:45:00,False,0.42
123,81109,234325,4246,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-8108913101671...,2008-11-09 23:43:25,2008-11-09 23:41:11,134.0,False,3,3,False,PacketResponder 1 for block blk_20288038501793...,Received block blk_-3455521536483250153 of siz...,2008-11-09 23:43:00,False,0.42
122,81109,234112,4136,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-8562334328670278932 src: ...,2008-11-09 23:41:12,2008-11-09 23:41:07,5.0,False,11,7,False,Receiving block blk_-1259190292740306590 src: ...,Receiving block blk_-8788032532930439012 src: ...,2008-11-09 23:41:00,False,0.42
121,81109,234111,4288,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_20288038501793...,2008-11-09 23:41:11,2008-11-09 23:38:49,142.0,False,3,2,False,Received block blk_-3455521536483250153 of siz...,,2008-11-09 23:41:00,False,0.42
120,81109,234110,26,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /user/root/so...,2008-11-09 23:41:10,2008-11-09 23:40:27,43.0,False,6,2,False,BLOCK* NameSystem.addStoredBlock: blockMap upd...,,2008-11-09 23:41:00,False,0.42
119,81109,234107,4261,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1259190292740306590 src: ...,2008-11-09 23:41:07,2008-11-09 23:38:36,151.0,False,11,6,False,Receiving block blk_-8788032532930439012 src: ...,Receiving block blk_699651392443620263 src: /1...,2008-11-09 23:41:00,False,0.42
118,81109,234027,26,INFO,dfs.FSNamesystem,BLOCK* NameSystem.addStoredBlock: blockMap upd...,2008-11-09 23:40:27,2008-11-09 23:28:33,714.0,True,6,1,False,,,2008-11-09 23:40:00,False,0.42
117,81109,233849,4087,INFO,dfs.DataNode$PacketResponder,Received block blk_-3455521536483250153 of siz...,2008-11-09 23:38:49,2008-11-09 23:31:08,461.0,True,3,1,False,,,2008-11-09 23:38:00,False,0.42
