In [None]:
import csv
import gzip
import json
import os
import random
import re
from datetime import datetime

import arrow
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk as bulk_index

In [None]:
# Notebook-wide switches.
# DEBUG: when True, extract_tables_from_select prints SELECT statements in
# which it could not find any table.
DEBUG = False
# MAX_LINES: passed to load_from as max_lines; None means no row limit.
MAX_LINES = None

In [None]:
# Ask for the path of the CSV log to process. The file is later opened with
# gzip.open, so it is expected to be gzip-compressed.
log_input_file = input("csv log input file: ")

In [None]:
# Distinct operation types (first word of each statement, upper-cased) seen
# while parsing; filled in by load_from as a side effect.
operation_types = set()

def clean(tables):
    """Turn a raw comma-separated table expression into a list of table names.

    Each comma-separated item is normalized on its own: surrounding
    whitespace, backticks and single quotes are stripped, the ``t0``/``t1``/
    ``t2`` aliases and the ``mysql.`` schema prefix are removed (via
    remove_suffixes / remove_prefixes), and empty items are dropped.

    Args:
        tables: String holding one or more table references separated by
            commas.

    Returns:
        List of unique table names, in order of first appearance.
    """
    quote_chars = " \t\r\n`'"
    names = []
    for raw_name in tables.split(','):
        # Strip per item, not only around the whole string, so that
        # "a, b" and "`a`,`b`" both yield clean names.
        name = raw_name.strip(quote_chars)
        name = remove_suffixes(name, [' t0', ' t1', ' t2'])
        name = remove_prefixes(name, ['mysql.'])
        # Removing an alias can expose a closing quote ("`users` t0").
        name = name.strip(quote_chars)
        if name:
            names.append(name)
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return list(dict.fromkeys(names))


def remove_suffixes(text, suffixes):
    """Strip each matching suffix from the end of ``text``.

    Suffixes are tried in the given order; after each removal the remaining
    text is whitespace-stripped, so a later suffix may match what an earlier
    removal exposed.

    Args:
        text: String to trim.
        suffixes: Iterable of suffix strings. Empty suffixes are ignored.

    Returns:
        ``text`` with every matching suffix removed.
    """
    for suffix in suffixes:
        # An empty suffix "matches" every string, and text[:-0] is '',
        # which would wipe the whole text -- skip it explicitly.
        if suffix and text.endswith(suffix):
            text = text[:-len(suffix)].strip()
    return text


def remove_prefixes(text, prefixes):
    """Drop each matching prefix from the start of ``text``.

    Prefixes are checked one after another in the given order. Whenever one
    matches, it is cut off and the remainder is whitespace-stripped before
    the next prefix is checked.

    Args:
        text: String to trim.
        prefixes: Iterable of prefix strings.

    Returns:
        The text left after all matching prefixes were removed.
    """
    remaining = text
    for candidate in prefixes:
        if not remaining.startswith(candidate):
            continue
        cut = len(candidate)
        remaining = remaining[cut:].strip()
    return remaining


def extract_tables_from_select(argument):
    """Return the raw table expression of a SELECT statement, or None.

    The statement is lower-cased first, so the returned text is lower-case.
    Three strategies are tried in order:

    1. text after the first `` from `` up to the first `` group by ``,
       `` order by `` or `` where ``;
    2. text after ``table_name = ``;
    3. a loose fallback: text after the first ``from`` up to ``where``, for
       statements where FROM is not surrounded by plain spaces (e.g. it is
       followed by a newline).

    Args:
        argument: The SQL statement text.

    Returns:
        The un-cleaned table substring, or None when no table reference is
        found. With DEBUG enabled, unrecognized statements are printed.
    """
    argument = argument.lower()
    # Test for the exact delimiter that is split on; testing the bare word
    # 'from' here would raise IndexError on "select *\nfrom t" and would make
    # the loose fallback below unreachable.
    if ' from ' in argument:
        tables = argument.split(' from ')[1]
        for keyword in (' group by ', ' order by ', ' where '):
            tables = tables.split(keyword)[0]
        return tables
    elif 'table_name = ' in argument:
        return argument.split('table_name = ')[1]
    elif 'from' in argument:
        return argument.split('from')[1].split('where')[0]
    else:
        if argument not in ['select 1', 'select @@session.tx_isolation',
                            'select database()', 'select user()',
                            'select @@global.sync_binlog']:
            if DEBUG:
                print("No tables found:", argument)
        return None

    
# Case-insensitive patterns for the table list of UPDATE / INSERT statements.
# Word boundaries keep identifiers such as "assets" from matching SET.
_UPDATE_TABLES_RE = re.compile(r'\bUPDATE\b(.*?)\bSET\b', re.IGNORECASE | re.DOTALL)
_INSERT_TABLES_RE = re.compile(r'\bINSERT\b(?:\s+\w+)*?\s+INTO\b([^(]*)', re.IGNORECASE)


def load_from(file, max_lines):
    """Parse a CSV log and yield one Elasticsearch bulk action per kept row.

    Rows are read with csv.DictReader and must provide ``argument`` and
    ``event_time`` columns. Only SELECT, UPDATE, INSERT, ``SHOW ... TABLES
    FROM`` and ``commit`` rows are yielded; every other row is skipped. Each
    row's operation type is added to the module-level ``operation_types`` set
    (also for skipped rows).

    Args:
        file: Open text file (or any iterable of lines) with a CSV header.
        max_lines: Maximum number of data rows to read, or None for no limit.

    Yields:
        dict: the CSV columns plus ``operation_type``, ``tables`` (list of
        table names or None), ``select`` (SELECT rows only), ``event_time``
        converted to a datetime via arrow, and the ``_id`` / ``_index`` /
        ``_type`` keys used by the bulk helper. ``_id`` is the 1-based row
        number.
    """
    reader = csv.DictReader(file)

    for i, entry in enumerate(reader, 1):
        # i is 1-based, so stop only once the limit has been exceeded;
        # `i == max_lines` dropped the last permitted row.
        if max_lines is not None and i > max_lines:
            break

        argument = entry['argument']
        entry['operation_type'] = argument.split(' ')[0].upper()
        operation_types.add(entry['operation_type'])
        entry['tables'] = None

        if entry['operation_type'] == 'SELECT':
            entry['tables'] = extract_tables_from_select(argument)
            entry['select'] = argument.split('WHERE')[0]
        elif entry['operation_type'] == 'UPDATE':
            # operation_type was upper-cased, so the statement itself may be
            # in any case; match keywords case-insensitively instead of
            # splitting on upper-case literals (which raised IndexError).
            match = _UPDATE_TABLES_RE.search(argument)
            if match:
                entry['tables'] = match.group(1)
            else:
                print(argument)
        elif entry['operation_type'] == 'INSERT':
            match = _INSERT_TABLES_RE.search(argument)
            if match:
                entry['tables'] = match.group(1)
        elif entry['operation_type'] == 'SHOW' and 'TABLES FROM ' in argument:
            entry['tables'] = argument.split('TABLES FROM ')[1].split(' ')[0]
        elif argument != 'commit':
            # Anything else except an explicit commit is not indexed.
            continue

        if entry['tables'] is not None:
            entry['tables'] = clean(entry['tables'])

        entry['event_time'] = arrow.get(entry['event_time']).datetime

        action = {'_id': i, '_index': 'general_log', '_type': 'event'}
        action.update(entry)
        yield action
                

# Open the gzip-compressed log in text mode, parse it with load_from and
# bulk-index the resulting actions into Elasticsearch.
with gzip.open(log_input_file, "rt") as f:
    # Elasticsearch() is created without arguments, i.e. with the client's
    # default connection settings.
    es = Elasticsearch()
    # load_from is a generator: rows are parsed lazily while bulk_index
    # consumes them, and the generator is exhausted afterwards.
    actions = load_from(f, max_lines = MAX_LINES)
    bulk_index(es, actions, chunk_size=1000)

In [None]:
# Build a DataFrame of the parsed log entries for inspection. The generator
# handed to bulk_index is consumed by indexing, so the file is parsed again
# here; this keeps the cell runnable on a fresh kernel instead of relying on
# a name that no cell defines.
with gzip.open(log_input_file, "rt") as f:
    df = pd.DataFrame(list(load_from(f, max_lines=MAX_LINES)))
df