In [7]:
from datetime import datetime
import sys
sys.path.append('..')

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import floc
from joblib import Parallel, delayed

sessions_fpath = '../data/comscore/2017/sessions.csv'


Find all bad domains, save to `output/bad_domains.txt`

In [None]:
%%timeit
import floc
df = pd.read_csv(sessions_fpath, usecols=['domain_name'], encoding="ISO-8859-1")
domains = df.domain_name.unique()

def _is_bad_domain(domain): 
    try: 
        floc.hashes.sim_hash_string([domain])
        return None
    except Exception as e:
        return domain
    
def get_bad_domains(unique_domains, n_cores=2):
    bad_domains = Parallel(n_jobs=n_cores)(delayed(_is_bad_domain)(d) for d in domains)
    bad_domains = [d for d in bad_domains if d is not None]
    return bad_domains

bad_domains = get_bad_domains(domains, n_cores=24)

with open('../output/bad_domains.txt', 'w') as f:
    for domain in bad_domains:
        f.write(str(domain) + "\n")
        


In [3]:
bad_domains = []
with open('../output/bad_domains.txt', 'r') as f:
    for l in f.readlines():
        bad_domains.append(l.strip())

---

Read in actual data, strip bad domains, format date correctly, and save to file

In [4]:
# make a map: {week: {machine_id: {set of domains from the week}}}
#weeks_machines_
machine_ids = pd.read_csv(sessions_fpath, usecols=['machine_id'], encoding="ISO-8859-1").machine_id.unique()
weeks_machines_domains = {w: {machine_id: set() for machine_id in machine_ids} for w in range(1, 52+1)}

In [5]:
jan1 = datetime.strptime(str('20170101'), '%Y%m%d')

In [None]:
def process_chunk(sessions_chunk):
    # drop rows with nan or malformed domains
    sessions_chunk.dropna(inplace=True)
    # drop malformed domains
    sessions_chunk = sessions_chunk[sessions_chunk.domain_name.map(lambda d: d not in bad_domains)]
    # drop rows where event date is Jan 1 2017.
    sessions_chunk = sessions_chunk[sessions_chunk.event_date > jan1]
    # assign weeks
    sessions_chunk['week'] = sessions_chunk.event_date.apply(lambda d: d.isocalendar()[1])
    # group by week, machine_id so that each row corresponds to list of unique
    # domains visited for given week for given machine
    week_machine_domains = sessions_chunk.groupby(
        ['week', 'machine_id']
    )['domain_name'].unique()
    return week_machine_domains

n_cores = 10
CHUNKSIZE=1000000
chunks = pd.read_csv(sessions_fpath, 
                     encoding="ISO-8859-1", 
                     chunksize=CHUNKSIZE, 
                     usecols=['machine_id', 'event_date', 'domain_name'])
wmds = Parallel(n_jobs=n_cores)(delayed(process_chunk)(chunk) for chunk in chunks)

Need to reduce to set of week, machine_id -> [ unique domains ] after 

In [None]:
for w in range(1, 52+1):
    # maps for week w, {machine_id -> [array of unique domains]}
    w_machine_domains_dict = week_machine_domains.xs(w).to_dict()
    for m, d_array in w_machine_domains_dict.items():
        weeks_machines_domains[w][m].update(set(d_array))  

In [None]:
weeks_machines_domains_df = pd.DataFrame({})
for w in range(1, 52+1):
    if w % 10 == 0:
        print('week', w, datetime.now().time())
    weeks_machines_domains_df = weeks_machines_domains_df.append(pd.DataFrame.from_dict(
        # 12/3 note: we added the if len(d) > 0 to try and eliminate '' domains in the next step.
        # check this once the chunk processing is working.
        {m: [m, w, d, len(d)] for m,d in weeks_machines_domains[w].items() if len(d) > 0}, 
        orient='index', 
        columns=['machine_id', 'week','domains', 'n_domains']
    ))
weeks_machines_domains_df.drop('machine_id', axis=1).head()

old code below

In [29]:
jan1 = datetime.strptime(str('20170101'), '%Y%m%d')

sessions_chunk = None
total_rows = 0
i = 0
for sessions_chunk in pd.read_csv(sessions_fpath, 
                                  chunksize=CHUNKSIZE,
                                  usecols=['machine_id', 'event_date', 'domain_name'],
                                  parse_dates=['event_date'],
                                  encoding="ISO-8859-1"
                                 ):
    start_d = datetime.now()
    print('%s : chunk %s' % (start_d.time(), i))
    # drop rows with nan or malformed domains
    sessions_chunk.dropna(inplace=True)
    # drop malformed domains
    sessions_chunk = sessions_chunk[sessions_chunk.domain_name.map(lambda d: d not in bad_domains)]
    # drop rows where event date is Jan 1 2017.
    sessions_chunk = sessions_chunk[sessions_chunk.event_date > jan1]
    rows = len(sessions_chunk)
    total_rows += rows
    print('%s rows --> %s total rows' % (rows, total_rows))
    #print('2 %s : chunk %s' % (datetime.now().time(), i))
    # assign weeks
    sessions_chunk['week'] = sessions_chunk.event_date.apply(lambda d: d.isocalendar()[1])
    #print('3 %s : chunk %s' % (datetime.now().time(), i))
    # group by week, machine_id so that each row corresponds to list of unique
    # domains visited for given week for given machine
    week_machine_domains = sessions_chunk.groupby(
        ['week', 'machine_id']
    )['domain_name'].unique()
    #print('4 %s : chunk %s' % (datetime.now().time(), i))
    # iterate over weeks to update the week_machines_domains dict for each week
    for w in range(1, 52+1):
        # maps for week w, {machine_id -> [array of unique domains]}
        w_machine_domains_dict = week_machine_domains.xs(w).to_dict()
        for m, d_array in w_machine_domains_dict.items():
            weeks_machines_domains[w][m].update(set(d_array))    
    i += 1
#     if i > 1:
#         break 
    print('time to handle chunk: %s' % (datetime.now() - start_d)) 

14:00:36.097913 : chunk 0
0 rows --> 0 total rows


KeyError: 1

Write df to disk

In [None]:
!ls ../data/comscore/2017/

In [None]:
write_weeks_machines_domains(df)

In [None]:
!ls ../data/comscore/2017

---

## Resample

---

In [None]:
weeks_machines_domains_df = pd.DataFrame({})
for w in range(1, 52+1):
    if w % 10 == 0:
        print('week', w, datetime.now().time())
    weeks_machines_domains_df = weeks_machines_domains_df.append(pd.DataFrame.from_dict(
        {m: [m, w, d, len(d)] for m,d in weeks_machines_domains[w].items()}, 
        orient='index', 
        columns=['machine_id', 'week','domains', 'n_domains']
    ))
weeks_machines_domains_df.drop('machine_id', axis=1).head()