In [18]:
import gzip
import logging
import os
import sys
from ast import literal_eval
from collections import Counter
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

### Load environment vars and directories

In [11]:
# Directory that holds the BigQuery service-account key file.
# NOTE(review): this is an absolute, machine-specific path; consider reading it
# from an environment variable so the notebook runs on other machines.
KEY_DIR = os.path.join("/Users/felisialoukou/Documents/","govuk-network-data", "key")
# os.listdir() returns entries in arbitrary order and includes hidden files
# (e.g. ".DS_Store"), so pick the first *visible* entry in sorted order to make
# the choice of key file deterministic.
KEY_PATH = os.path.join(KEY_DIR, sorted(name for name in os.listdir(KEY_DIR) if not name.startswith("."))[0])
# Google Cloud project the query is billed to.
PROJECT_ID = "govuk-bigquery-analytics"

#### Logging for `pandas_gbq`

In [12]:
# Surface pandas_gbq's progress/debug messages in the notebook output.
logger = logging.getLogger('pandas_gbq')
logger.setLevel(logging.DEBUG)
# Attach the stdout handler only once: loggers are process-wide singletons, so
# re-running this cell would otherwise stack another handler and every message
# would be printed multiple times.
if not any(isinstance(handler, logging.StreamHandler) and getattr(handler, "stream", None) is sys.stdout
           for handler in logger.handlers):
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))

# Extract page-hit only user journeys for 2-3 weeks back
8.8 GB
### Define date range
    > From 3 weeks before yesterday, up to and including yesterday

In [37]:
# Size of the extraction window. The range ends `yesterday` day(s) before today
# and starts N_weeks*7 days before that end date.
N_weeks = 3
yesterday = 1
N_days = (N_weeks*7) + yesterday

# Sample the clock once so that `start`, `end` and the printed timestamp are
# always derived from the same instant (separate datetime.now() calls could
# straddle midnight and yield an inconsistent range).
now = datetime.now()

# Dates are formatted as YYYYMMDD to match the ga_sessions_* table suffixes.
# Both bounds are used inclusively by the query, so the window spans
# N_weeks*7 + 1 daily tables.
start = (now - timedelta(days=N_days)).strftime("%Y%m%d")
end = (now - timedelta(days=yesterday)).strftime("%Y%m%d")

print(now)
print(start, end)

2019-04-18 12:07:19.967568
20190327 20190417


In [58]:
# Filename fragment identifying the extraction window, e.g. "_20190327_20190417_".
daterange = "_" + start + "_" + end + "_"
daterange

'_20190327_20190327_'

In [None]:
## Query

In [59]:
# BigQuery (standard SQL) template that counts how often each page-hit journey occurs.
#
# Reading the statement from the inside out:
#   1. Innermost SELECT: unnest `hits` from the ga_sessions_* tables whose
#      _TABLE_SUFFIX lies between the `_start_` and `_end_` placeholders, and pull
#      custom dimension index 4 as `content_id` and index 2 as `document_type`.
#   2. Filter out the "/" page path, the listed document types and the two
#      placeholder-looking content ids.
#   3. Window functions over (fullVisitorId, visitId, visitStartTime), ordered by
#      hitNumber, build the ">>"-joined PageSequence / CIDSequence from PAGE hits
#      and count those hits as PageSeq_Length.
#   4. Keep sequences with more than one page and collapse to one row per sessionId.
#   5. Outer SELECT: count sessions per (PageSequence, CIDSequence, PageSeq_Length)
#      as Occurrences.
#
# `_start_` and `_end_` are literal placeholders; they must be substituted with
# YYYYMMDD strings (via str.replace) before the query is submitted.
query = """SELECT
  COUNT(*) AS Occurrences,
  PageSeq_Length,
  PageSequence,
  CIDSequence
FROM (
  SELECT
    *
  FROM (
    SELECT
      CONCAT(fullVisitorId,"-",CAST(visitId AS STRING),"-",CAST(visitNumber AS STRING)) AS sessionId,
      STRING_AGG(IF(htype = 'PAGE',
          pagePath,
          NULL),">>") OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS PageSequence,
       STRING_AGG(IF(htype = 'PAGE',
          content_id,
          NULL),">>") OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS CIDSequence,
      SUM(IF(htype='PAGE',
          1,
          0)) OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS PageSeq_Length
    FROM (
      SELECT
        fullVisitorId,
        visitId,
        visitNumber,
        visitStartTime,
        hits.page.pagePath AS pagePath,
        hits.hitNumber AS hitNumber,
        hits.type AS htype,
        (
        SELECT
          value
        FROM
          hits.customDimensions
        WHERE
          index=4) AS content_id,
        (
        SELECT
          value
        FROM
          hits.customDimensions
        WHERE
          index=2) AS document_type
      FROM
        `govuk-bigquery-analytics.87773428.ga_sessions_*` AS sessions
      CROSS JOIN
        UNNEST(sessions.hits) AS hits
      WHERE
        _TABLE_SUFFIX BETWEEN '_start_'
        AND '_end_' )
    WHERE
      pagePath != '/'
      AND document_type NOT IN (
        'document_collection',
        'finder',
        'homepage',
        'license_finder',
        'mainstream_browse_page',
        'organisation',
        'search',
        'service_manual_homepage',
        'service_manual_topic',
        'services_and_information',
        'taxon',
        'topic',
        'topical_event',
        'fatality_notice',
        'contact',
        'service_sign_in',
        'html_publication',
        'calculator',
        'completed_transaction' )
      AND content_id NOT IN ( '00000000-0000-0000-0000-000000000000',
        '[object Object]'))
  WHERE
    PageSeq_Length >1
  GROUP BY
    sessionId,
    PageSequence,
    CIDSequence,
    PageSeq_Length)
GROUP BY
  PageSequence,
  CIDSequence,
  PageSeq_Length"""

In [60]:
# Fill the `_start_` / `_end_` placeholders of the query template with the
# YYYYMMDD bounds of the extraction window.
# NOTE: this overwrites `query`, so once it has run the placeholders are gone;
# after changing the dates, re-run the cell that defines the template first.
query = (
    query
    .replace("_start_", start)
    .replace("_end_", end)
)
query

'SELECT\n  COUNT(*) AS Occurrences,\n  PageSeq_Length,\n  PageSequence,\n  CIDSequence\nFROM (\n  SELECT\n    *\n  FROM (\n    SELECT\n      CONCAT(fullVisitorId,"-",CAST(visitId AS STRING),"-",CAST(visitNumber AS STRING)) AS sessionId,\n      STRING_AGG(IF(htype = \'PAGE\',\n          pagePath,\n          NULL),">>") OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS PageSequence,\n       STRING_AGG(IF(htype = \'PAGE\',\n          content_id,\n          NULL),">>") OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS CIDSequence,\n      SUM(IF(htype=\'PAGE\',\n          1,\n          0)) OVER (PARTITION BY fullVisitorId, visitId, visitStartTime ORDER BY hitNumber ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS PageSeq_Length\n    FROM (\n      SELECT\n        fullVisitorId,\n        visitId,\n       

# Extract data from BigQuery

In [56]:
# Run the journey query on BigQuery (standard SQL dialect), authenticating with
# the service-account key file, and load the full result into a dataframe.
df_in = pd.read_gbq(
    query,
    dialect="standard",
    project_id=PROJECT_ID,
    private_key=KEY_PATH,
    reauth=False,
)

# Quick sanity check on the size of the result.
df_in.shape

Requesting query... 
Query running...
Job ID: f875a606-d968-4667-99a2-8115efae7808
  Elapsed 6.44 s. Waiting...
  Elapsed 7.87 s. Waiting...
  Elapsed 9.31 s. Waiting...
  Elapsed 10.74 s. Waiting...
  Elapsed 12.01 s. Waiting...
  Elapsed 13.29 s. Waiting...
  Elapsed 14.73 s. Waiting...
  Elapsed 16.17 s. Waiting...
  Elapsed 17.6 s. Waiting...
  Elapsed 19.03 s. Waiting...
  Elapsed 20.47 s. Waiting...
  Elapsed 21.81 s. Waiting...
  Elapsed 23.23 s. Waiting...
  Elapsed 24.67 s. Waiting...
Query done.
Processed: 1.5 GB Billed: 1.5 GB
Standard price: $0.01 USD

Got 1069433 rows.

Total time taken 138.99 s.
Finished at 2019-04-18 12:25:30.


(1069433, 3)

### Add `Page_List` and `Cid_List` columns

In [None]:
# Turn the ">>"-delimited sequence strings into Python lists (one list per
# journey) and store them next to the original string columns.
pagelist = list(map(lambda sequence: sequence.split(">>"), df_in['PageSequence'].values))
cidlist = list(map(lambda sequence: sequence.split(">>"), df_in['CIDSequence'].values))

df_in['Page_List'] = pagelist
df_in['Cid_List'] = cidlist

### Save out 

In [None]:
# Output directory for the raw BigQuery journey extract, rooted at $DATA_DIR.
# os.getenv() returns None when the variable is unset, which would otherwise
# surface as an opaque TypeError inside os.path.join; fail early and clearly.
data_dir = os.getenv("DATA_DIR")
if data_dir is None:
    raise EnvironmentError("DATA_DIR environment variable is not set")
bq_dir = os.path.join(data_dir, "raw", "bq_journey_extract")

In [None]:
### Drop infrequent journeys
# Keep only journeys observed more than once and write them as a gzip-compressed,
# tab-separated file whose name embeds the extraction window.
# (Uses `daterange`, the name defined earlier in the notebook; the previous
# `date_range` was never defined and raised NameError.)
bq_file_doo = os.path.join(bq_dir, "page_cid_seq_user_journey"+daterange+"doo.csv.gz")
df_in[df_in.Occurrences>1].to_csv(bq_file_doo, compression="gzip", sep='\t', index=False)

# Make network data

In [None]:
def read_file(filename, columns_to_read):
    """
    Read a gzip-compressed, tab-separated journey file into a dataframe and make sure the
    'Cid_List' column holds Python lists rather than their string representation.
    :param filename: path of the gzip-compressed TSV file to load
    :param columns_to_read: columns to load (forwarded to pandas as `usecols`); must include 'Cid_List'
    :return: dataframe in which 'Cid_List' has been parsed with literal_eval when it was stored as strings
    """
    logger.debug("Reading file {}...".format(filename))
    df = pd.read_csv(filename, sep='\t', compression="gzip", skipinitialspace=True, usecols=columns_to_read)
    logger.debug("Read in {} columns...".format(df.columns))

    column_to_eval = 'Cid_List'
    cid_values = df[column_to_eval]

    # Only parse when the column really contains stringified lists:
    #  - an empty frame has nothing to parse (and iloc[0] would raise IndexError);
    #  - non-string entries (e.g. NaN) are skipped by the comma check instead of
    #    raising TypeError on `"," in val`.
    has_rows = len(df) > 0
    if has_rows and isinstance(cid_values.iloc[0], str) and any(
            isinstance(val, str) and "," in val for val in cid_values.values):
        logger.debug("Working on literal_eval for \"{}\"".format(column_to_eval))
        # na_action='ignore' leaves missing values untouched rather than
        # handing them to literal_eval.
        df[column_to_eval] = cid_values.map(literal_eval, na_action='ignore')

    return df

In [None]:
def compute_occurrences(user_journey_df, page_sequence, occurrences):
    """
    Add a column holding, for every row, the total 'Occurrences' of all rows that share the
    same value in `page_sequence`.
    :param user_journey_df: user journey dataframe; must contain 'Occurrences' and `page_sequence`
    :param page_sequence: name of the column to group by
    :param occurrences: name of the column to create (or overwrite) with the per-group sums
    :return: None, the new column is assigned in place
    """
    # Use the notebook's `logger` like the sibling functions do. The module-level
    # logging.debug() implicitly calls basicConfig() when the root logger has no
    # handlers, which installs a root handler and duplicates later log output.
    logger.debug("Computing specialized occurrences \"{}\" based on  \"{}\"...".format(occurrences, page_sequence))
    user_journey_df[occurrences] = user_journey_df.groupby(page_sequence)['Occurrences'].transform('sum')

In [None]:
def generate_subpaths(user_journey_df, page_list, subpaths):
    """
    Derive a column of subpaths from a column of page lists by applying
    `prep.subpaths_from_list` to every entry.
    NOTE(review): `prep` is not imported in the visible part of this notebook; presumably it is
    the project's preprocessing module and must be in scope before calling this.
    :param user_journey_df: user journey dataframe
    :param page_list: name of the column holding the per-journey lists of pages
    :param subpaths: name of the column that receives the computed subpaths
    :return: None, the new column is assigned in place
    """
    logger.debug("Setting up \"{}\" based on  \"{}\"...".format(subpaths, page_list))
    journeys = user_journey_df[page_list]
    user_journey_df[subpaths] = journeys.map(prep.subpaths_from_list)

In [None]:
def edgelist_from_subpaths(user_journey_df, use_delooped_journeys=False):
    """
    Build the weighted edge list of the journey network.
    Keys of the returned counter are edges (tuples of nodes, where a node is a content id) and
    values are the summed 'CIDSeq_Occurrences' of every journey whose subpaths contain that edge.
    Side effects on `user_journey_df` (all in place): 'CIDSeq_Occurrences' is added when missing,
    rows with a duplicated 'CIDSequence' are dropped (first kept) and 'Cid_Subpaths' is added.
    :param user_journey_df: user journey dataframe with 'CIDSequence', 'Cid_List' and 'Occurrences'
    :param use_delooped_journeys: currently unused; kept for interface compatibility
    :return: collections.Counter mapping edge tuple -> weight
    """
    subpath_default = 'Cid_Subpaths'
    occurrences_default = 'CIDSeq_Occurrences'
    page_list_default = 'Cid_List'
    page_sequence_default = 'CIDSequence'

    if occurrences_default not in user_journey_df.columns:
        compute_occurrences(user_journey_df, page_sequence_default, occurrences_default)

    # The per-sequence totals are identical on every duplicate row, so a single
    # representative row per sequence is enough (and avoids double counting).
    logger.debug("Dropping duplicates {}...".format(page_sequence_default))
    user_journey_df.drop_duplicates(page_sequence_default, keep="first", inplace=True)

    generate_subpaths(user_journey_df, page_list_default, subpath_default)
    edgelist_counter = Counter()

    # Walk the two relevant columns side by side instead of indexing row tuples
    # by looked-up column positions.
    for journey_edges, weight in zip(user_journey_df[subpath_default], user_journey_df[occurrences_default]):
        for edge in journey_edges:
            # Edges may arrive as lists; tuples are hashable and usable as keys.
            edgelist_counter[tuple(edge)] += weight

    return edgelist_counter

In [None]:
def nodes_from_edgelist(edgelist):
    """
    Assign a sequential integer id to every node that appears in the edge list.
    Ids start at 0 and follow the order in which nodes are first met while iterating the edges.
    :param edgelist: mapping whose keys are edges (iterables of nodes, e.g. node pairs)
    :return: dict mapping node -> node id
    """
    logger.debug("Creating node list...")
    node_ids = {}

    for edge in edgelist.keys():
        for node in edge:
            # The next free id always equals the number of nodes registered so far.
            if node not in node_ids:
                node_ids[node] = len(node_ids)
    return node_ids

In [None]:
def compute_nodes_edges(source_filename, dest_filename, cols, collapse_search, use_delooped_journeys,
                        drop_incorrect_occ,
                        with_attribute):
    """
    Read a journey dataframe file, compute the node and edge lists and write both to
    gzip-compressed files named `dest_filename` + "_nodes.csv.gz" / "_edges.csv.gz".
    :param source_filename: gzip-compressed TSV journey file to load
    :param dest_filename: filename prefix for the node and edge files
    :param cols: columns to read from `source_filename`
    :param collapse_search: currently unused; kept for interface compatibility
    :param use_delooped_journeys: forwarded to edgelist_from_subpaths
    :param drop_incorrect_occ: currently unused; kept for interface compatibility
    :param with_attribute: currently unused; no node attributes are written
    """
    # read_file only accepts (filename, columns_to_read); passing the extra flags,
    # as this call used to, raised TypeError.
    df = read_file(source_filename, cols)
    edges = edgelist_from_subpaths(df, use_delooped_journeys)
    node_list = nodes_from_edgelist(edges)

    # Show a small sample of the node -> id mapping as a sanity check.
    print(list(node_list.items())[0:10])

    default_edge_header = "Source_node\tSource_id\tDestination_node\tDestination_id\tWeight\n"
    default_node_header = "Node\tNode_id\n"
    # No per-node attributes are available, so the writers emit no attribute columns.
    node_attr = None

    logger.info("Number of nodes: {} Number of edges: {}".format(len(node_list), len(edges)))
    logger.info("Writing edge list to file...")

    edge_writer(dest_filename + "_edges.csv.gz", default_edge_header, edges, node_list, node_attr)
    node_writer(dest_filename + "_nodes.csv.gz", default_node_header, node_list, node_attr)

In [None]:
def node_writer(filename, header, node_id, node_attr):
    """
    Write the node list to a gzip-compressed, tab-separated file.
    Each row is "<node>\\t<id>", followed by "\\t<attribute>" when `node_attr` is given.
    :param filename: path of the gzip file to create (overwritten if it exists)
    :param header: header line, including its trailing newline; written as-is
    :param node_id: dict mapping node -> node id
    :param node_attr: optional dict mapping node -> attribute, or None for no attribute column
    """
    with gzip.open(filename, "w") as file:
        print(filename)
        file.write(header.encode())
        for node, nid in node_id.items():
            fields = [node, nid]
            if node_attr is not None:
                fields.append(node_attr[node])
            # Assemble the whole row first so each node results in a single write.
            row = "\t".join("{}".format(field) for field in fields) + "\n"
            file.write(row.encode())

In [None]:
def edge_writer(filename, header, edges, node_id, node_attr):
    """
    Write the weighted edge list to a gzip-compressed, tab-separated file.
    Each row is "<source>\\t<source id>\\t<destination>\\t<destination id>\\t<weight>", followed by
    the source and destination attributes when `node_attr` is given.
    :param filename: path of the gzip file to create (overwritten if it exists)
    :param header: header line, including its trailing newline; written as-is
    :param edges: mapping of (source node, destination node) -> weight
    :param node_id: dict mapping node -> node id; must contain every node used in `edges`
    :param node_attr: optional dict mapping node -> attribute, or None for no attribute columns
    """
    with gzip.open(filename, "w") as file:
        print(filename)
        file.write(header.encode())
        for key, value in edges.items():
            source, destination = key[0], key[1]
            fields = [source, node_id[source], destination, node_id[destination], value]
            if node_attr is not None:
                fields.extend([node_attr[source], node_attr[destination]])
            # Assemble the whole row first so each edge results in a single write.
            row = "\t".join("{}".format(field) for field in fields) + "\n"
            file.write(row.encode())