In [8]:
from tqdm import tqdm
import sys
import csv
from dateutil.parser import parse
import time
import psycopg2
from psycopg2.extensions import AsIs

# Raise the csv module's field size limit as far as this platform allows.
# Start from sys.maxsize and shrink the candidate tenfold every time the
# csv module rejects it with OverflowError.
maxInt = sys.maxsize
decrement = True

while decrement:
    try:
        csv.field_size_limit(maxInt)
    except OverflowError:
        # Candidate does not fit the platform's integer type: try a smaller one.
        maxInt = int(maxInt/10)
    else:
        # The limit was accepted, so the search is over.
        decrement = False

In [9]:
def convert_key(key):
    """Translate an internal field name into the e-mail header it is read from.

    'Address' and 'Department' are both taken from the ``From`` header,
    'AddressTo' from ``To`` and 'Date' from ``Date``.

    Raises:
        KeyError: if ``key`` is not one of the four known field names.
    """
    header_by_field = dict(
        Address='From',
        Date='Date',
        AddressTo='To',
        Department='From',
    )
    return header_by_field[key]

In [10]:
def get_info(cur_string, key):
    """Extract one field from the raw text of an e-mail.

    Args:
        cur_string: raw message text (header lines, an empty line, then the body).
        key: 'Message' for the body, or one of the field names understood by
            ``convert_key`` ('Address', 'Date', 'AddressTo', 'Department').

    Returns:
        The stripped field value. '' is returned when the requested header is
        not present, or (for 'Message') when no empty line separates the
        headers from the body. For 'Department' only the first character of
        the ``From`` value is returned ('' if that value is empty).

    Raises:
        KeyError: propagated from ``convert_key`` for an unknown ``key``.

    Notes:
        The header is located with a plain substring search for
        '\\n<Header>: ' over the whole string, so a header on the very first
        line is not matched and a header-looking line inside the body is.
    """
    if key == 'Message':
        # A leading empty line means there are no headers: everything is body.
        if cur_string.startswith('\n'):
            return cur_string.strip()
        separator = cur_string.find('\n\n')
        if separator == -1:
            # No header/body separator at all. The former line-by-line scan
            # could spin forever on such input; report "no body" instead.
            return ''
        return cur_string[separator + 2:].strip()

    header = convert_key(key) + ': '
    index = cur_string.find('\n' + header)
    if index == -1:
        return ''

    # Splitting into lines (instead of repeatedly slicing at find('\n')) keeps
    # the value intact when the header is the last line of the string and
    # cannot index into an empty remainder.
    lines = cur_string[index + 1 + len(header):].split('\n')
    pieces = [lines[0]]
    for line in lines[1:]:
        # Folded continuation lines start with a space or a tab; empty lines
        # are skipped over as well, exactly as the previous implementation did.
        if line and line[0] not in ' \t':
            break
        pieces.append(line)
    value = '\n'.join(pieces).strip()

    if key == 'Department':
        # The first letter of the e-mail address is the department number.
        return value[:1]

    return value

In [11]:
def get_string(line):
    """Return the 'message' column of a CSV row without quoted older mail.

    ``line`` is the (ordered) dictionary produced by ``csv.DictReader``.
    Everything from the first '-----Original Message-----' marker onwards is
    dropped, and then everything from the first
    '---------------------- Forwarded' marker onwards.
    """
    text = dict(line)['message']
    for marker in ('-----Original Message-----', '---------------------- Forwarded'):
        # partition() leaves the text untouched when the marker is absent.
        text = text.partition(marker)[0]
    return text

In [12]:
def first_stage(file_name, users, total=517401):
    """Register every sender found in the CSV file in ``users``.

    Rows without a ``To`` header, a ``From`` header or a message body are
    skipped. For each remaining row the sender address and its department
    (first character of the address, see ``get_info``) get a record holding
    the earliest and latest date seen for that pair.

    Args:
        file_name: path of the CSV file; it must have a 'message' column.
        users: dict updated in place. Layout after the call:
            ``users[address][department] = {'Id', 'DateBeginForTable',
            'DateEndForTable', 'DateBegin', 'DateEnd'}``.
        total: number of rows announced to the progress bar. Defaults to the
            row count that used to be hard-coded here.

    Raises:
        Whatever ``dateutil.parser.parse`` raises when a kept row has a
        missing or unparsable ``Date`` header; ``users`` is not modified for
        that row.
    """
    print('First stage started!')
    # The context managers release the file and the progress bar even when a
    # row fails to parse.
    with open(file_name, 'r') as file_obj, tqdm(total=total) as pbar:
        reader = csv.DictReader(file_obj, delimiter=',')

        for line in reader:
            cur_string = get_string(line)
            address = get_info(cur_string, 'Address')

            if (get_info(cur_string, 'AddressTo') == '' or address == ''
                    or get_info(cur_string, 'Message') == ''):
                pbar.update(1)
                continue

            department = get_info(cur_string, 'Department')
            date_field = get_info(cur_string, 'Date')

            # Drop the trailing "(PDT)"-style comment. partition() keeps the
            # whole string when there is no '(' (slicing at find() == -1 used
            # to cut off the last character).
            date_for_table = date_field.partition('(')[0]
            # Parse before touching `users` so a bad date cannot leave a
            # half-initialised entry behind.
            date = int(time.mktime(parse(date_field).timetuple()))

            departments = users.get(address)
            if departments is None:
                departments = users[address] = {}

            record = departments.get(department)
            if record is None:
                departments[department] = {
                    'Id': len(users)-1,
                    'DateBeginForTable': date_for_table,
                    'DateEndForTable': date_for_table,
                    'DateBegin': date,
                    'DateEnd': date,
                }
            elif date < record['DateBegin']:
                record['DateBegin'] = date
                record['DateBeginForTable'] = date_for_table
            elif date > record['DateEnd']:
                record['DateEnd'] = date
                record['DateEndForTable'] = date_for_table

            pbar.update(1)
    print('First stage finished!')

In [13]:
def second_stage(file_name, users, total=517401):
    """Register every recipient that is not yet present in ``users``.

    Rows without a ``To`` header, a ``From`` header or a message body are
    skipped. The ``To`` value is split on commas; each address that is not
    already a key of ``users`` is added with a single placeholder department
    'NULL' whose dates are the Unix epoch.

    Args:
        file_name: path of the CSV file; it must have a 'message' column.
        users: dict updated in place, same layout as produced by
            ``first_stage``.
        total: number of rows announced to the progress bar. Defaults to the
            row count that used to be hard-coded here.
    """
    print('Second stage started!')
    # Loop-invariant placeholder date, computed once instead of per new address.
    date = int(time.mktime(parse('Thu, 1 Jan 1970 00:00:00 +0000 (UTC)').timetuple()))

    # The context managers release the file and the progress bar even when a
    # row fails.
    with open(file_name, 'r') as file_obj, tqdm(total=total) as pbar:
        reader = csv.DictReader(file_obj, delimiter=',')
        for line in reader:
            cur_string = get_string(line)
            addresses = get_info(cur_string, 'AddressTo')

            if (addresses == '' or get_info(cur_string, 'Address') == ''
                    or get_info(cur_string, 'Message') == ''):
                pbar.update(1)
                continue

            # Folded header lines are joined with '\n' / '\t'; flatten them
            # before splitting the recipient list.
            addresses = addresses.replace('\n', ' ').replace('\t', ' ').split(',')

            for address in addresses:
                address = address.strip()
                # A trailing or doubled comma yields an empty token; it is not
                # a recipient and must not become a user.
                if not address or address in users:
                    continue
                users[address] = {
                    'NULL': {
                        # Ids follow insertion order: the next id equals the
                        # number of users registered so far.
                        'Id': len(users),
                        'DateBeginForTable': '1970-01-01 00:00:00',
                        'DateEndForTable': '1970-01-01 00:00:00',
                        'DateBegin': date,
                        'DateEnd': date,
                    }
                }

            pbar.update(1)
    print('Second stage finished!')

In [14]:
def create_users_table(file_name):
    """(Re)create the ``users`` table and fill it from the e-mail CSV file.

    The table is dropped and created, both stages of the preprocessing are
    run over ``file_name``, and one row per (address, department) pair is
    inserted. Everything happens in a single transaction that is committed at
    the end; if anything fails, the connection is closed without committing.

    Args:
        file_name: path of the CSV file handed to ``first_stage`` and
            ``second_stage``.
    """
    # NOTE(review): the connection credentials are hard-coded here; consider
    # loading them from the environment or a config file kept out of the repo.
    conn = psycopg2.connect("dbname='maildata' user='maildiscovery' host='localhost' port='5432' password='mailpass'")
    try:
        print('Connected!')
        print('Preprocessing started!')
        # The cursor context manager closes the cursor on exit, also on error.
        with conn.cursor() as cur:
            drop_statement = 'drop table if exists users'
            create_statement = 'create table users (Id integer, Address text, Department text, DateBegin timestamp, DateEnd timestamp)'
            cur.execute(drop_statement)
            print('Table dropped!')
            cur.execute(create_statement)
            print('Table created!')

            users = dict()
            first_stage(file_name, users)
            second_stage(file_name, users)

            print('Filling started!')
            # Fixed column list with one placeholder per value: no need to
            # splice identifiers into the statement.
            insert_statement = ('insert into users (Id, Address, Department, DateBegin, DateEnd) '
                                'values (%s, %s, %s, %s, %s)')
            with tqdm(total=len(users)) as pbar:
                for address, departments in users.items():
                    for department, record in departments.items():
                        cur.execute(insert_statement, (
                            record['Id'], address, department,
                            record['DateBeginForTable'], record['DateEndForTable'],
                        ))
                    # The progress bar counts addresses, not inserted rows.
                    pbar.update(1)
            print('Filling finished!')

        conn.commit()
    finally:
        # Closing without a prior commit discards the open transaction.
        conn.close()

In [15]:
# Rebuild the `users` table from 'emails.csv' (path relative to the working
# directory). This drops the existing table and reads the CSV file twice.
create_users_table('emails.csv')

Connected!
Preprocessing started!
Table dropped!


  0%|          | 288/517401 [00:00<02:59, 2878.16it/s]

Table created!
First stage started!


100%|██████████| 517401/517401 [02:44<00:00, 3150.68it/s]
  0%|          | 1168/517401 [00:00<00:44, 11676.95it/s]

First stage finished!
Second stage started!


100%|██████████| 517401/517401 [01:03<00:00, 8137.31it/s] 
  0%|          | 192/79346 [00:00<00:41, 1919.29it/s]

Second stage finished!
Filling started!


100%|██████████| 79346/79346 [00:20<00:00, 3914.45it/s]


Filling finished!
