In [154]:
import re
import sys
from collections import deque, Counter
from itertools import islice
from pathlib import Path

In [155]:
# Matches the text `openat(AT_FDCWD, "<path>",` anywhere in a line and captures
# <path> as group 1. The captured path must start with "/" and extends up to
# (not including) the next double quote.
regex = re.compile(r'openat\(AT_FDCWD, "(/[^"]+)",')

# Stream the matching paths out of a log file, one line at a time
def read_path_from_file(path):
    """Lazily yield the paths found in a UTF-8 text file.

    Each line is searched with the module-level ``regex``; lines without a
    match are skipped.

    Args:
        path: Location of the file to read, passed straight to ``open``.

    Yields:
        str: Capture group 1 of each match, normalised with
        ``Path(...).as_posix()``.
    """
    with open(path, "r", encoding="utf-8") as log_file:
        # The file is consumed lazily, one line per search.
        for found in map(regex.search, log_file):
            if found is not None:
                yield Path(found.group(1)).as_posix()


def read_path_from_stdin():
    """Lazily yield the paths found on standard input.

    Each line read from ``sys.stdin`` is searched with the module-level
    ``regex``; lines without a match are skipped.

    Yields:
        str: Capture group 1 of each match, normalised with
        ``Path(...).as_posix()``.
    """
    for line in sys.stdin:
        match = regex.search(line)
        if match:
            yield Path(match.group(1)).as_posix()

In [156]:
class MarkovChain:
    """Markov chain of a fixed order over hashable tokens, trained incrementally.

    Attributes (documented here rather than inline):
        order: Number of preceding tokens a prediction is conditioned on.
        transitions: Dict mapping a context (tuple of ``order`` consecutive
            tokens) to a ``Counter`` of the tokens observed right after it.
        context: Deque of the most recent tokens fed through ``update``.
    """

    def __init__(self, order=1):
        """Create an empty model conditioned on ``order`` previous tokens."""
        self.order = order
        self.transitions = {}
        # One extra slot: the newest token sits next to the ``order`` tokens
        # that precede it while ``update`` records the transition.
        self.context = deque(maxlen=order+1)

    def update(self, token):
        """Feed one token into the model.

        Once more than ``order`` tokens are buffered, the first ``order`` of
        them form the context and ``token`` is counted as its successor; the
        oldest token is then dropped so the buffer is back to ``order`` items.
        """
        self.context.append(token)
        if len(self.context) > self.order:
            _context = tuple(islice(self.context, 0, self.order))

            if _context not in self.transitions:
                self.transitions[_context] = Counter()

            self.transitions[_context][token] += 1
            self.context.popleft()

    def predict_all(self, context):
        """Return every successor seen after ``context`` with its relative frequency.

        Args:
            context: Tuple of ``order`` tokens used as the lookup key.

        Returns:
            ``(tokens, probabilities)`` where ``tokens`` is a tuple and
            ``probabilities`` a list in the same order, or ``([], [])`` when
            the context has never been observed.
        """
        if context in self.transitions:
            tokens, counts = zip(*self.transitions[context].items())
            total_count = sum(counts)
            probabilities = [count / total_count for count in counts]
            return tokens, probabilities

        return [], []

    def get_next(self, context):
        """Return the most frequent successor of ``context`` and its probability.

        Args:
            context: Tuple of ``order`` tokens used as the lookup key.

        Returns:
            ``(token, probability)``, or ``(None, None)`` when the context has
            no recorded successors. Always a 2-tuple, so callers can unpack it.
        """
        successors = self.transitions.get(context)
        if not successors:  # unknown context (or one without any counts)
            return None, None

        token, count = successors.most_common(1)[0]
        return token, count / sum(successors.values())

    def predict(self, data, n=1, update=False, verbose=False, full_context=False):
        """Greedily predict up to ``n`` tokens, feeding each prediction back in.

        Args:
            data: With ``full_context=True``, a tuple of exactly ``order``
                tokens to start from. Otherwise a single new token that is
                appended to the model's current context.
            n: Maximum number of tokens to predict; must be positive.
            update: When ``full_context`` is false, also train the model on
                ``data`` (via ``update``) before predicting. Ignored, with a
                printed notice, when ``full_context`` is true.
            verbose: Print the context, chosen token and probability of each step.
            full_context: Selects how ``data`` is interpreted (see above).

        Returns:
            A list of predicted tokens; it is shorter than ``n`` when a
            context without recorded successors is reached. ``None`` when
            ``full_context`` is false and the model has not yet buffered
            ``order`` tokens.

        Raises:
            AssertionError: If ``n`` is not positive, or ``full_context`` is
                true and ``data`` is not a tuple of length ``order``.
        """
        assert n > 0

        # ``maxlen=order`` makes every append slide the window by itself, which
        # also keeps an empty window (fresh model, or order == 0) from raising.
        if full_context:
            assert isinstance(data, tuple)
            assert len(data) == self.order
            window = deque(data, maxlen=self.order)
            if update:
                # ignore warning
                print('update is ignored when full_context is True')
        else:
            if update:
                # Train first, then snapshot: the snapshot must already end
                # with ``data`` or the prediction would start one token late.
                self.update(data)
                window = deque(self.context, maxlen=self.order)
            else:
                window = deque(self.context, maxlen=self.order)
                window.append(data)

            if len(self.context) != self.order:
                return None

        if verbose:
            print(f"Predict: {tuple(window)}")
            print(f"Predicting next {n} tokens...")
            print()

        predictions = []
        for _ in range(n):
            current = tuple(window)
            next_token, prob = self.get_next(current)

            if verbose:
                print(f"  Context: {current}")
                print(f"  Next token: {next_token}")
                print(f"  Probability: {prob}")
                print()

            if next_token is None:
                # Dead end: the window cannot advance, so further iterations
                # would only repeat the same failed lookup.
                break

            predictions.append(next_token)
            window.append(next_token)

        return predictions

In [157]:
# Demo: train an order-1 chain on a short string, one character per token.
mc = MarkovChain(order=1)

# Training string
for token in "aababcabcdabcde":
    mc.update(token)

# Print the model
from pprint import pprint
print("MarkovChain Model")
pprint(mc.transitions)
print()

# Check the prediction probabilities of the order-1 model
print(f"1. predict ('a',) -> {mc.predict_all(('a',))}") # (('a', 'b'), [0.2, 0.8])
print(f"2. predict ('b',) -> {mc.predict_all(('b',))}") # (('a', 'c'), [0.25, 0.75])
print(f"3. predict ('c',) -> {mc.predict_all(('c',))}") # (('a', 'd'), [0.3333333333333333, 0.6666666666666666])

# N consecutive predictions with the order-1 model
print('4. ', end='')
print(f"predict ('a',) -> {mc.predict(tuple('a'), n=3, verbose=True, full_context=True)}") # ['b', 'c', 'd']


MarkovChain Model
{('a',): Counter({'b': 4, 'a': 1}),
 ('b',): Counter({'c': 3, 'a': 1}),
 ('c',): Counter({'d': 2, 'a': 1}),
 ('d',): Counter({'a': 1, 'e': 1})}

1. predict ('a',) -> (('a', 'b'), [0.2, 0.8])
2. predict ('b',) -> (('a', 'c'), [0.25, 0.75])
3. predict ('c',) -> (('a', 'd'), [0.3333333333333333, 0.6666666666666666])
4. Predict: ('a',)
Predicting next 3 tokens...

  Context: ('a',)
  Next token: b
  Probability: 0.8

  Context: ('b',)
  Next token: c
  Probability: 0.75

  Context: ('c',)
  Next token: d
  Probability: 0.6666666666666666

predict ('a',) -> ['b', 'c', 'd']


In [158]:
# Demo: train an order-2 chain on the same short string.
mc = MarkovChain(order=2)

# Training string
for token in "aababcabcdabcde":
    mc.update(token)

# Print the model
from pprint import pprint
print("MarkovChain Model")
pprint(mc.transitions)
print()

# Predictions of the order-2 model
print(f"1. predict ('a', 'b') -> {mc.predict_all(('a', 'b'))}")  # predict ('a', 'b') -> (('a', 'c'), [0.25, 0.75])
print(f"2. predict ('b', 'a') -> {mc.predict_all(('b', 'a'))}")  # predict ('b', 'a') -> (('b',), [1.0])
print(f"3. predict ('a', 'c') -> {mc.predict_all(('a', 'c'))}")  # predict ('a', 'c') -> ([], [])

# N consecutive predictions with the order-2 model
print('4. ', end='')
print(f"predict ('a', 'b') -> {mc.predict(('a', 'b'), n=3, verbose=True, full_context=True)}")  # ['c', 'd', 'a'] (('c', 'd') is followed once each by 'a' and 'e'; 'a' is the one returned)

MarkovChain Model
{('a', 'a'): Counter({'b': 1}),
 ('a', 'b'): Counter({'c': 3, 'a': 1}),
 ('b', 'a'): Counter({'b': 1}),
 ('b', 'c'): Counter({'d': 2, 'a': 1}),
 ('c', 'a'): Counter({'b': 1}),
 ('c', 'd'): Counter({'a': 1, 'e': 1}),
 ('d', 'a'): Counter({'b': 1})}

1. predict ('a', 'b') -> (('a', 'c'), [0.25, 0.75])
2. predict ('b', 'a') -> (('b',), [1.0])
3. predict ('a', 'c') -> ([], [])
4. Predict: ('a', 'b')
Predicting next 3 tokens...

  Context: ('a', 'b')
  Next token: c
  Probability: 0.75

  Context: ('b', 'c')
  Next token: d
  Probability: 0.6666666666666666

  Context: ('c', 'd')
  Next token: a
  Probability: 0.5

predict ('a', 'b') -> ['c', 'd', 'a']


In [159]:
# Demo: train an order-1 chain on the paths extracted from a log file.
mc = MarkovChain(order=1)

# Read the log
for path in read_path_from_file(path=Path("gimp-filtered.log")):
    mc.update(path)

# Print the model
# from pprint import pprint
# pprint(mc.transitions)

# N consecutive predictions with the order-1 model
# NOTE: `path` is rebound here from the loop variable above to the query path.
path = Path("/usr/share/fontconfig/conf.avail/45-generic.conf").as_posix()
print(f"predict ('{path}',)")
# NOTE: `pprint` is not imported in this cell; it relies on the import
# executed in an earlier cell.
pprint(mc.predict(path, n=3, verbose=True))

predict ('/usr/share/fontconfig/conf.avail/45-generic.conf',)
Predict: ('/usr/share/fontconfig/conf.avail/45-generic.conf',)
Predicting next 3 tokens...

  Context: ('/usr/share/fontconfig/conf.avail/45-generic.conf',)
  Next token: /usr/share/fontconfig/conf.avail/45-latin.conf
  Probability: 1.0

  Context: ('/usr/share/fontconfig/conf.avail/45-latin.conf',)
  Next token: /usr/share/fontconfig/conf.avail/48-spacing.conf
  Probability: 1.0

  Context: ('/usr/share/fontconfig/conf.avail/48-spacing.conf',)
  Next token: /usr/share/fontconfig/conf.avail/49-sansserif.conf
  Probability: 1.0

['/usr/share/fontconfig/conf.avail/45-latin.conf',
 '/usr/share/fontconfig/conf.avail/48-spacing.conf',
 '/usr/share/fontconfig/conf.avail/49-sansserif.conf']


In [160]:
# mc = MarkovChain(order=1)

# # Read from standard input
# for path in read_path_from_stdin():
#     mc.update(path)