# Preprocess the data for the news_recommend_dataset

In this notebook, we use standard SQL to extract features from a BigQuery dataset and build our own preprocessed dataset. "[cloud-training-demos.GA360_test.ga_sessions_sample](https://console.cloud.google.com/bigquery?p=cloud-training-demos&d=GA360_test&t=ga_sessions_sample&page=table)" is a public BigQuery dataset containing Google Analytics data from the Austrian news website [Kurier.at](https://kurier.at/). We query "cloud-training-demos.GA360_test.ga_sessions_sample" in BigQuery and extract customDimensions and other fields such as item_id, user_id, news title, news author, news date, and mobile device brand as content-based features. The features are then written to the preprocessed datasets "preprocess_train.csv" and "preprocess_test.csv", which are stored in a GCS bucket.

The preprocessed dataset will later be used to train a neural collaborative filtering model, which yields user and item embeddings as latent factors. The preprocessed dataset and the latent factors are then combined into news_recommend_dataset, which will later be used to train the hybrid recommendation model.

## 1. import libraries

In [1]:
# import libraries
import os
import pandas as pd
from google.cloud import bigquery

In [2]:
# set constants
PROJECT = "hybrid-recsys-gcp"
BUCKET = "hybrid-recsys-gcp-bucket"
DATASET = "news_recommend_dataset"
TABLE = "preprocess"

## 2. create BigQuery dataset and table

Select customDimensions and other fields such as user_id, item_id, title, author, and category as features. Use the time spent on each page as the implicit rating for the news article. Apply FARM_FINGERPRINT to hash user_id and visit_time into hash_id, and use hash_id modulo 10 to split the data: folds 1 to 9 form the train set and fold 0 forms the test set. Finally, save the extracted train and test tables in BigQuery, and export them as csv files to the GCS bucket.

The extracted features with descriptions:

| Feature | Description |
| :- | :- |
| user_id | ID of the visitor. |
| item_id | ID of the news article. |
| title | Title of the news article. |
| author | Author of the news article. |
| category | Category of the news article. |
| device_brand | Brand of the device used by the visitor to view the news. |
| article_year | Year of the news article. |
| article_month | Month of the news article. |
| rating | Implicit rating for the news. Calculated from the time spent on each news article. |
| next_item_id | Next news article viewed by the visitor. |
| fold | Modulo of the FARM_FINGERPRINT hash of user_id and visit_time. Used for splitting train and test sets. |
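The fold column makes the split deterministic: the same (user_id, visit_time) pair always hashes to the same fold, so rerunning the query reproduces the same train and test rows. The sketch below mirrors the idea in plain Python. It is only an illustration: `hashlib.sha256` stands in for BigQuery's `FARM_FINGERPRINT` (so the fold numbers will not match BigQuery's), and `assign_fold`, `is_train`, and the sample events are hypothetical.

```python
import hashlib


def assign_fold(user_id: str, visit_time_ms: int, num_folds: int = 10) -> int:
    """Map a (user_id, visit_time) pair to a stable fold in [0, num_folds)."""
    key = "{}{}".format(user_id, visit_time_ms).encode("utf-8")
    # sha256 is a stand-in for FARM_FINGERPRINT; any stable hash gives a repeatable split.
    hash_id = int.from_bytes(hashlib.sha256(key).digest()[:8], "big", signed=True)
    # Same result as ABS(MOD(hash_id, 10)) in SQL, where MOD keeps the sign of hash_id.
    return abs(hash_id) % num_folds


def is_train(fold: int) -> bool:
    # Folds 1-9 go to the train table, fold 0 to the test table (roughly 90/10).
    return fold > 0


# Made-up events: one visitor ID with 1000 different timestamps.
events = [("1000196974485173657", 1511000000000 + i) for i in range(1000)]
folds = [assign_fold(user, ts) for user, ts in events]
train_share = sum(is_train(f) for f in folds) / len(folds)
print("train share: {:.2f}".format(train_share))
```

Because the fold depends only on the hashed key, no random seed has to be stored to reproduce the split.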

In [3]:
# create dataset in bigquery
def create_bigquery_dataset(project_id, dataset_id):
    """ Create dataset in bigquery under the project.
    
    Args:
        project_id (str): The ID of your project.
        dataset_id (str): The name for the dataset.

    Returns:
        None
    """
    client = bigquery.Client(project=project_id)
    dataset = bigquery.Dataset("{}.{}".format(project_id, dataset_id))
    dataset = client.create_dataset(dataset)

In [4]:
create_bigquery_dataset(PROJECT, DATASET)

In [5]:
# specify query for preprocessing train dataset
preprocess_train_query = """
WITH user_count_table AS (
    SELECT 
        fullVisitorId as user_id,
        Count(*) - 1 AS intaractions

    FROM cloud-training-demos.GA360_test.ga_sessions_sample,
        UNNEST(hits) AS hits

    WHERE
        fullVisitorId IS NOT NULL
        AND
        (SELECT value FROM  hits.customDimensions WHERE index = 10) IS NOT NULL
        AND
        visitStartTime IS NOT NULL
        AND
        hits.time IS NOT NULL
        AND
        hits.type = "PAGE"
        
    GROUP BY fullVisitorId
),

raw_data_table AS(
    SELECT
        fullVisitorId as user_id,
        (SELECT value FROM  hits.customDimensions WHERE index = 10) AS item_id,
        (SELECT REPLACE(value, ",", " ") FROM  hits.customDimensions WHERE index = 6) AS title,
        (SELECT REGEXP_EXTRACT(value, r"^[^,]+") FROM  hits.customDimensions WHERE index = 2) AS author,
        (SELECT value FROM  hits.customDimensions WHERE index = 7) AS category,
        (SELECT value FROM  hits.customDimensions WHERE index = 4) AS article_date,
        device.mobileDeviceBranding AS device_brand,
        visitStartTime * 1000 + hits.time AS hits_timestamp,
        (LEAD(visitStartTime * 1000 + hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY (visitStartTime * 1000 + hits.time) ASC) - (visitStartTime * 1000 + hits.time)) AS time_span,
        LEAD((SELECT value FROM  hits.customDimensions WHERE index = 10), 1) OVER (PARTITION BY fullVisitorId ORDER BY (visitStartTime * 1000 + hits.time) ASC) AS next_item_id,
        intaractions,
        FARM_FINGERPRINT(CONCAT(fullVisitorId, visitStartTime * 1000 + hits.time)) AS hash_id,
        CONCAT(fullVisitorId, (SELECT value FROM  hits.customDimensions WHERE index = 10)) AS user_item_id
        
    FROM cloud-training-demos.GA360_test.ga_sessions_sample,   
        UNNEST(hits) AS hits
        
    INNER JOIN user_count_table
    ON fullVisitorId = user_count_table.user_id
        
    WHERE
        fullVisitorId IS NOT NULL
        AND
        (SELECT value FROM  hits.customDimensions WHERE index = 10) IS NOT NULL
        AND
        visitStartTime IS NOT NULL
        AND
        hits.time IS NOT NULL
        AND
        hits.type = "PAGE"
        AND
        intaractions > 5
),

max_time_table AS (
    SELECT
        user_item_id,
        max(time_span) AS max_time,
        PERCENTILE_CONT(max(time_span), 0.5) OVER() AS median
        
    FROM raw_data_table
    
    GROUP BY user_item_id
)

SELECT
    user_id,
    item_id,
    IFNULL(title, 'unknown') AS title,
    IFNULL(author, 'unknown') AS author,
    IFNULL(category, 'unknown') AS category,
    IFNULL(device_brand, 'unknown') AS device_brand,
    SUBSTR(article_date, 1, 4) AS article_year,
    SUBSTR(article_date, 6, 2) AS article_month,
    IF (0.5 * max_time / median > 1.0, 1.0, 0.5 * max_time / median) AS rating,
    next_item_id,
    ABS(MOD(hash_id, 10)) AS fold
    
FROM 
    raw_data_table

INNER JOIN max_time_table
ON raw_data_table.user_item_id= max_time_table.user_item_id

WHERE
    next_item_id IS NOT NULL
    AND
    time_span IS NOT NULL
    AND
    ABS(MOD(hash_id, 10)) > 0

ORDER BY user_id ASC, hits_timestamp ASC
"""
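The rating expression in the final SELECT of the query above scales the longest time a visitor spent on an article by the median of those maxima over all user-article pairs, so that a median dwell time maps to 0.5, and caps the result at 1.0. A minimal Python sketch of the same arithmetic follows. `implicit_rating` and the sample time spans (in milliseconds) are made up for illustration, and `statistics.median` is assumed to behave like `PERCENTILE_CONT(..., 0.5)`, which also interpolates between the two middle values.

```python
from statistics import median


def implicit_rating(max_time: float, median_time: float) -> float:
    """Scale dwell time so that the median maps to 0.5, capped at 1.0."""
    return min(1.0, 0.5 * max_time / median_time)


# Hypothetical longest dwell times (ms) for five user-article pairs.
max_times = [12_000, 30_000, 45_000, 60_000, 400_000]
median_time = median(max_times)
ratings = [implicit_rating(t, median_time) for t in max_times]
print(ratings)
```

The cap keeps a single very long page view (for example a tab left open) from dominating the rating scale.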

In [6]:
# specify query for preprocessing test dataset
preprocess_test_query = """
WITH user_count_table AS (
    SELECT 
        fullVisitorId as user_id,
        Count(*) - 1 AS intaractions

    FROM cloud-training-demos.GA360_test.ga_sessions_sample,
        UNNEST(hits) AS hits

    WHERE
        fullVisitorId IS NOT NULL
        AND
        (SELECT value FROM  hits.customDimensions WHERE index = 10) IS NOT NULL
        AND
        visitStartTime IS NOT NULL
        AND
        hits.time IS NOT NULL
        AND
        hits.type = "PAGE"
        
    GROUP BY fullVisitorId
),

raw_data_table AS(
    SELECT
        fullVisitorId as user_id,
        (SELECT value FROM  hits.customDimensions WHERE index = 10) AS item_id,
        (SELECT REPLACE(value, ",", " ") FROM  hits.customDimensions WHERE index = 6) AS title,
        (SELECT REGEXP_EXTRACT(value, r"^[^,]+") FROM  hits.customDimensions WHERE index = 2) AS author,
        (SELECT value FROM  hits.customDimensions WHERE index = 7) AS category,
        (SELECT value FROM  hits.customDimensions WHERE index = 4) AS article_date,
        device.mobileDeviceBranding AS device_brand,
        visitStartTime * 1000 + hits.time AS hits_timestamp,
        (LEAD(visitStartTime * 1000 + hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY (visitStartTime * 1000 + hits.time) ASC) - (visitStartTime * 1000 + hits.time)) AS time_span,
        LEAD((SELECT value FROM  hits.customDimensions WHERE index = 10), 1) OVER (PARTITION BY fullVisitorId ORDER BY (visitStartTime * 1000 + hits.time) ASC) AS next_item_id,
        intaractions,
        FARM_FINGERPRINT(CONCAT(fullVisitorId, visitStartTime * 1000 + hits.time)) AS hash_id,
        CONCAT(fullVisitorId, (SELECT value FROM  hits.customDimensions WHERE index = 10)) AS user_item_id
        
    FROM cloud-training-demos.GA360_test.ga_sessions_sample,   
        UNNEST(hits) AS hits
        
    INNER JOIN user_count_table
    ON fullVisitorId = user_count_table.user_id
        
    WHERE
        fullVisitorId IS NOT NULL
        AND
        (SELECT value FROM  hits.customDimensions WHERE index = 10) IS NOT NULL
        AND
        visitStartTime IS NOT NULL
        AND
        hits.time IS NOT NULL
        AND
        hits.type = "PAGE"
        AND
        intaractions > 5
),

max_time_table AS (
    SELECT
        user_item_id,
        max(time_span) AS max_time,
        PERCENTILE_CONT(max(time_span), 0.5) OVER() AS median
        
    FROM raw_data_table
    
    GROUP BY user_item_id
)

SELECT
    user_id,
    item_id,
    IFNULL(title, 'unknown') AS title,
    IFNULL(author, 'unknown') AS author,
    IFNULL(category, 'unknown') AS category,
    IFNULL(device_brand, 'unknown') AS device_brand,
    SUBSTR(article_date, 1, 4) AS article_year,
    SUBSTR(article_date, 6, 2) AS article_month,
    IF (0.5 * max_time / median > 1.0, 1.0, 0.5 * max_time / median) AS rating,
    next_item_id,
    ABS(MOD(hash_id, 10)) AS fold
    
FROM 
    raw_data_table

INNER JOIN max_time_table
ON raw_data_table.user_item_id= max_time_table.user_item_id

WHERE
    next_item_id IS NOT NULL
    AND
    time_span IS NOT NULL
    AND
    ABS(MOD(hash_id, 10)) = 0

ORDER BY user_id ASC, hits_timestamp ASC
"""
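Both queries above rely on `LEAD(..., 1) OVER (PARTITION BY fullVisitorId ORDER BY ...)` to pair each page view with the same visitor's next one: the difference between the two timestamps becomes time_span, and the next page's article becomes next_item_id. The last hit of each visitor has no successor, so both values are NULL and the row is filtered out by the final WHERE clause. The following sketch reproduces that logic in plain Python on made-up hits; `add_next_hit` and the sample data are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def add_next_hit(hits):
    """Attach time_span and next_item_id to each (user_id, timestamp_ms, item_id) hit."""
    rows = []
    # Sorting by (user, timestamp) plays the role of PARTITION BY ... ORDER BY ...
    ordered = sorted(hits, key=itemgetter(0, 1))
    for user_id, group in groupby(ordered, key=itemgetter(0)):
        user_hits = list(group)
        # Pair every hit with its successor; the last hit pairs with None (SQL NULL).
        for current, following in zip(user_hits, user_hits[1:] + [None]):
            time_span = following[1] - current[1] if following else None
            next_item_id = following[2] if following else None
            rows.append((user_id, current[2], time_span, next_item_id))
    return rows


hits = [
    ("u1", 1000, "a"),
    ("u1", 4000, "b"),
    ("u2", 2000, "a"),
    ("u1", 9000, "c"),
]
rows = add_next_hit(hits)
# Keep only rows that have a successor, as the WHERE clause of the queries does.
kept = [r for r in rows if r[3] is not None]
print(kept)
```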

In [7]:
# specify schema
schema = [
    bigquery.SchemaField("user_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("item_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("author", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("category", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("device_brand", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("article_year", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("article_month", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("rating", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("next_item_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("fold", "INTEGER", mode="REQUIRED")
]

In [8]:
def store_query_result_to_table(query, project_id, dataset_id, table_id, bucket_id, schema, mode):
    """ Execute query, store result in bigquery table, and save result to bucket in csv file.
    
    Args:
        query (str): The query to be executed in bigquery.
        project_id (str): The ID of your project.
        dataset_id (str): The name of the dataset.
        table_id (str): The name for the table.
        bucket_id (str): Bucket to store the csv file of query result.
        schema (list:bigquery.SchemaField): Schema of the query result.
        mode (str): Train or test mode.

    Returns:
        None
    """
    client = bigquery.Client(project=project_id)
    
    # create an empty destination table, e.g. "preprocess_train", with the given schema
    table_name = table_id + "_" + mode
    table = bigquery.Table("{}.{}.{}".format(project_id, dataset_id, table_name), schema=schema)
    table = client.create_table(table)
    table_ref = table.reference
    
    # write the query result into the table; WRITE_EMPTY fails if the table already holds data
    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
        
    query_job = client.query(query, job_config=job_config)
    query_job.result()  # wait for the query to finish
    
    # export the table to a csv file in the GCS bucket
    destination_uri = "gs://{}/{}/{}_{}.csv".format(bucket_id, dataset_id, table_id, mode)
    extract_job = client.extract_table(table_ref, destination_uri)
    extract_job.result()  # wait for the export to finish

In [9]:
# create preprocess train and test tables in bigquery
store_query_result_to_table(preprocess_train_query, PROJECT, DATASET, TABLE, BUCKET, schema, "train")
store_query_result_to_table(preprocess_test_query, PROJECT, DATASET, TABLE, BUCKET, schema, "test")
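Each call creates one BigQuery table and one csv export, both named from the table name and the mode. The snippet below only reproduces the string formatting used inside `store_query_result_to_table`, with the constants defined earlier passed in as literals, to show which names result. The helper `output_names` is a hypothetical name introduced here and does not contact BigQuery or GCS.

```python
def output_names(project_id, dataset_id, table_id, bucket_id, mode):
    """Return the fully qualified table name and the GCS URI for one mode."""
    table_name = "{}.{}.{}".format(project_id, dataset_id, table_id + "_" + mode)
    destination_uri = "gs://{}/{}/{}_{}.csv".format(bucket_id, dataset_id, table_id, mode)
    return table_name, destination_uri


for mode in ("train", "test"):
    print(output_names("hybrid-recsys-gcp", "news_recommend_dataset",
                       "preprocess", "hybrid-recsys-gcp-bucket", mode))
```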

## 3. view news_recommend_dataset

In [10]:
!gsutil cp gs://{BUCKET}/{DATASET}/{TABLE}_train.csv ./{DATASET}/{TABLE}_train.csv
!gsutil cp gs://{BUCKET}/{DATASET}/{TABLE}_test.csv ./{DATASET}/{TABLE}_test.csv

Copying gs://hybrid-recsys-gcp-bucket/news_recommend_dataset/preprocess_train.csv...
/ [1 files][ 22.2 MiB/ 22.2 MiB]                                                
Operation completed over 1 objects/22.2 MiB.                                     
Copying gs://hybrid-recsys-gcp-bucket/news_recommend_dataset/preprocess_test.csv...
/ [1 files][  2.5 MiB/  2.5 MiB]                                                
Operation completed over 1 objects/2.5 MiB.                                      


In [11]:
train_df = pd.read_csv("./{}/{}_train.csv".format(DATASET, TABLE))
test_df = pd.read_csv("./{}/{}_test.csv".format(DATASET, TABLE))

In [12]:
train_df.head()

| | user_id | item_id | title | author | category | device_brand | article_year | article_month | rating | next_item_id | fold |
| :- | :- | :- | :- | :- | :- | :- | :- | :- | :- | :- | :- |
| 0 | 1.000164e+18 | 299853016 | Schröcksnadel gegen Werdenigg: Keine Aussprache | unknown | News | Apple | 2017 | 11 | 0.147207 | 298888038 | 6 |
| 1 | 1.000164e+18 | 298888038 | Investment kann jetzt jeder! | unknown | News | Apple | 2017 | 11 | 0.484673 | 299814775 | 5 |
| 2 | 1.000164e+18 | 299814775 | Meghan Markle: Verlobungsring aus Dianas Brosche | Maria Zelenko | Lifestyle | Apple | 2017 | 11 | 0.500774 | 299772450 | 6 |
| 3 | 1.000164e+18 | 299772450 | Kritik an Prinz Harry: "Ein verzogener Rotzlöf... | Elisabeth Spitzer | Stars & Kultur | Apple | 2017 | 11 | 1.0 | 299824032 | 5 |
| 4 | 1.000164e+18 | 299824032 | YouTube: Schwere Probleme mit verstörenden Kin... | Georg Leyrer | Stars & Kultur | Apple | 2017 | 11 | 0.857168 | 299809748 | 3 |


In [13]:
test_df.head()

| | user_id | item_id | title | author | category | device_brand | article_year | article_month | rating | next_item_id | fold |
| :- | :- | :- | :- | :- | :- | :- | :- | :- | :- | :- | :- |
| 0 | 1000196974485173657 | 299910994 | Direktorensprecherin Isabella Zins: So könnte ... | Ute Brühl | News | unknown | 2017 | 11 | 1.0 | 299899819 | 0 |
| 1 | 1000196974485173657 | 299930679 | Wintereinbruch naht: Erster Schnee im Osten mö... | Daniela Wahl | News | unknown | 2017 | 11 | 1.0 | 299972194 | 0 |
| 2 | 1004209053768679755 | 18976804 | Heimskandal - Brigitte Wanker: Die Landesverrä... | Georg Hönigsberger | News | Huawei | 2013 | 7 | 1.0 | 299695400 | 0 |
| 3 | 1004555043399129313 | 299837992 | Das erste TV-Interview von Prinz Harry & Megha... | Christina Michlits | Stars & Kultur | unknown | 2017 | 11 | 0.979912 | 299824032 | 0 |
| 4 | 1004555043399129313 | 299836841 | ÖVP will Studiengebühren FPÖ in Verhandlungen... | Raffaela Lindorfer | News | unknown | 2017 | 11 | 1.0 | 299899819 | 0 |
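Note that the train_df output shows user_id as 1.000164e+18 while the test_df output shows full 19-digit IDs: pandas parsed the train column as floating point. A 64-bit float cannot represent every 19-digit integer, so distinct visitor IDs can collapse into the same value. The sketch below demonstrates the loss with the standard library only. The first ID is taken from the test_df output; the second is a made-up neighbouring ID. Reading the ID columns as strings, for example through the `dtype` argument of `pd.read_csv`, is one way to avoid the problem, assuming later steps accept string IDs.

```python
# One visitor ID from the test_df output and a hypothetical ID that differs by one.
user_a = "1000196974485173657"
user_b = "1000196974485173658"

# Parsed as 64-bit floats, the two IDs become indistinguishable.
same_as_float = float(user_a) == float(user_b)
print(same_as_float)

# Kept as strings, they stay distinct.
same_as_str = user_a == user_b
print(same_as_str)
```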


## 4. save unique column values to text

Save the unique values of each column as text files in the GCS bucket. They will later be used for creating feature columns in the hybrid recommendation system model.

In [14]:
def get_unique_col_values(dataset_id, table_id, col_name, path):
    """ Store unique values of the column of bigquery table into txt file. 
    
    Args:
        dataset_id (str): The name of the dataset.
        table_id (str): The name of the table.
        col_name (str): Name of the column.
        path (str): Path to store the txt result.

    Returns:
        None
    """
    query = '''
        SELECT DISTINCT 
            {} 
        FROM 
            {}.{}_train

        UNION DISTINCT

        SELECT 
            {}
        FROM 
            {}.{}_test

        ORDER BY {} ASC
        '''.format(col_name, dataset_id, table_id, col_name, dataset_id, table_id, col_name)
    
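    # run the query with the bq CLI, drop the csv header line, strip the trailing
    # newline, then move the result to the GCS path and remove the temporary file
    script = '''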
        bq query --format=csv --use_legacy_sql=false -n 100000 "{}" > query_result.txt &&
        tail -n +2 query_result.txt > truncate_result.txt &&
        truncate -s -1 truncate_result.txt
        gsutil mv truncate_result.txt {}
        rm query_result.txt
        '''.format(query, path)
    
    os.system(script)

In [15]:
column_names = ['user_id', 'item_id', 'author', 'category', 'device_brand', 'article_year', 'article_month']
file_paths = ["gs://{}/{}/{}_list.txt".format(BUCKET, DATASET, x) for x in column_names]

# for each column, save unique values to text
for col, path in zip(column_names, file_paths):
    get_unique_col_values(DATASET, TABLE, col, path)
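The shell pipeline in `get_unique_col_values` takes the csv output of `bq query`, drops the header line, and removes the final newline so that the file holds exactly one value per line. A rough Python equivalent of that post-processing is sketched below on a made-up `bq` output. `to_vocabulary_text` is a hypothetical helper, and the sample category values are only illustrative.

```python
def to_vocabulary_text(bq_csv_output: str) -> str:
    """Drop the csv header line and the trailing newline from single-column bq output."""
    lines = bq_csv_output.splitlines()
    values = lines[1:]          # same effect as `tail -n +2`
    return "\n".join(values)    # no trailing newline, like `truncate -s -1`


# Hypothetical output of the DISTINCT query for the category column.
sample_output = "category\nLifestyle\nNews\nStars & Kultur\n"
vocabulary_text = to_vocabulary_text(sample_output)

# Reading the file back line by line yields the vocabulary list.
vocabulary = vocabulary_text.split("\n")
print(vocabulary)
```

Without the trailing newline, splitting the file on line breaks does not produce an empty last entry in the vocabulary.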