In [2]:
import os
# --- Project configuration: edit these three values for your own GCP setup ---
PROJECT = "wals-vi"  # REPLACE WITH YOUR PROJECT ID
BUCKET = "wals-vi-ml"  # REPLACE WITH YOUR BUCKET NAME
REGION = "us-central1"  # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# Mirror the settings into the process environment (presumably for shell /
# gcloud cells that read $PROJECT, $BUCKET, ...). Do not change these.
# NOTE(review): the tensorflow_transform install recorded in this notebook
# resolved to 0.24.1, which requires tensorflow !=2.1.*,<2.4 -- confirm that
# TFVERSION below still matches the runtime you actually use.
os.environ.update({
    "PROJECT": PROJECT,
    "BUCKET": BUCKET,
    "REGION": REGION,
    "TFVERSION": "2.1.0",
})

In [4]:
import tensorflow as tf
import apache_beam as beam
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl

def preprocess_tft(rowdict):
    """tf.Transform preprocessing_fn: map raw session rows to WALS features.

    Passed to ``AnalyzeAndTransformDataset`` by ``preprocess``.

    Args:
      rowdict: dict of batched input tensors keyed by column name. Reads
        ``visitorId`` and ``contentId`` (declared tf.string in the raw schema
        built by ``preprocess``) and ``session_duration`` (declared tf.float32).

    Returns:
      dict with
        * ``userId``: index of visitorId in the ``vocab_users`` vocabulary,
        * ``itemId``: index of contentId in the ``vocab_items`` vocabulary,
        * ``rating``: 0.3 * session_duration / median, capped at 1.0.
    """
    # Hard-coded median session duration, so a median-length session rates 0.3.
    # It could instead be computed in-pipeline with
    # tft.quantiles(rowdict["session_duration"], 11, epsilon=0.001)[5].
    median = 57937

    # tft.string_to_int was renamed to tft.compute_and_apply_vocabulary; the old
    # name was deprecated and is not available in TF2-era tensorflow_transform.
    user_id = tft.compute_and_apply_vocabulary(rowdict["visitorId"], vocab_filename="vocab_users")
    item_id = tft.compute_and_apply_vocabulary(rowdict["contentId"], vocab_filename="vocab_items")

    rating = 0.3 * rowdict["session_duration"] / median
    # Cap the rating at 1.0 (element-wise; same semantics as the previous
    # tf.where(tf.less(rating, ones), rating, ones) formulation).
    rating = tf.where(rating < 1.0, rating, tf.ones_like(rating))

    return {"userId": user_id, "itemId": item_id, "rating": rating}
  
def preprocess(query, in_test_mode):
    """Build and run the Beam/tf.Transform pipeline that produces WALS inputs.

    Reads rows from BigQuery, converts them with ``preprocess_tft`` (visitorId /
    contentId -> integer ids, session_duration -> rating), and writes under
    OUTPUT_DIR:
      * ``transform_fn/``        -- the tf.Transform graph (incl. vocabularies),
      * ``users_for_item*``      -- TFRecords, one per item: key=itemId,
                                    indices=userIds, values=ratings,
      * ``items_for_user*``      -- TFRecords, one per user: key=userId,
                                    indices=itemIds, values=ratings,
      * ``nitems*`` / ``nusers*`` -- single-shard text files holding the number
                                    of grouped items / users.

    Args:
      query: standard-SQL BigQuery query returning visitorId, contentId and
        session_duration (the columns declared in the raw schema below).
      in_test_mode: if True, run locally with the DirectRunner on
        ``query + " LIMIT 100"`` and write to ./preproc_tft; otherwise launch a
        Dataflow job writing to gs://BUCKET/wals/preproc_tft/.

    Side effects: deletes any existing OUTPUT_DIR first; reads the module-level
    PROJECT and BUCKET globals.
    """
    import datetime  # was used for job_name but never imported (NameError)
    import os
    import shutil
    import subprocess

    import tensorflow as tf
    from apache_beam.io import tfrecordio
    from tensorflow_transform.beam.tft_beam_io import transform_fn_io
    from tensorflow_transform.coders import example_proto_coder
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import schema_utils

    def write_count(pcoll, outdir, basename):
        """Write the number of elements of ``pcoll`` to one text shard."""
        # Previously Map(lambda x: (1, 1)) | Count.PerKey() | Map(lambda k, v: v);
        # the last step raised TypeError because beam.Map passes a single
        # (key, value) element, not two arguments.
        (pcoll
         | "{}_count".format(basename) >> beam.combiners.Count.Globally()
         | "{}_write".format(basename) >> beam.io.WriteToText(
             file_path_prefix=os.path.join(outdir, basename), num_shards=1))

    def to_tfrecord(key_vlist, index_col):
        """Turn a grouped (key, rows) pair into one sparse WALS row."""
        key, vlist = key_vlist
        vlist = list(vlist)  # iterated twice below; don't rely on re-iterability
        return {
            "key": [key],
            "indices": [value[index_col] for value in vlist],
            "values": [value["rating"] for value in vlist],
        }

    job_name = "preprocess-wals-features-" + datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    if in_test_mode:
        print("Launching local job ... hang on")
        OUTPUT_DIR = "./preproc_tft"
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        RUNNER = "DirectRunner"
    else:
        print("Launching Dataflow job {} ... hang on".format(job_name))
        OUTPUT_DIR = "gs://{0}/wals/preproc_tft/".format(BUCKET)
        # Clear previous outputs. The exit code is deliberately ignored
        # (presumably so a first run, with nothing to delete, still proceeds).
        subprocess.call(["gsutil", "rm", "-r", OUTPUT_DIR])
        RUNNER = "DataflowRunner"

    options = {
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "job_name": job_name,
        "project": PROJECT,
        "max_num_workers": 16,
        "teardown_policy": "TEARDOWN_ALWAYS",
        "save_main_session": False,
        "requirements_file": "requirements.txt"
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    # Set up metadata. dataset_schema.ColumnSchema & co. are not available in
    # TF2-era tensorflow_transform; build schemas from feature specs instead.
    raw_data_metadata = dataset_metadata.DatasetMetadata(
        schema_utils.schema_from_feature_spec({
            "visitorId": tf.io.FixedLenFeature([], tf.string),
            "contentId": tf.io.FixedLenFeature([], tf.string),
            "session_duration": tf.io.FixedLenFeature([], tf.float32),
        }))
    # Schema of the sparse rows produced by to_tfrecord.
    output_schema = schema_utils.schema_from_feature_spec({
        "key": tf.io.FixedLenFeature([1], tf.int64),
        "indices": tf.io.VarLenFeature(tf.int64),
        "values": tf.io.VarLenFeature(tf.float32),
    })

    def write_tfrecords(pcoll, name):
        """Write ``pcoll`` as tf.Example TFRecords to OUTPUT_DIR/<name>*."""
        return pcoll | name >> tfrecordio.WriteToTFRecord(
            os.path.join(OUTPUT_DIR, name),
            coder=example_proto_coder.ExampleProtoCoder(output_schema))

    # Run Beam
    with beam.Pipeline(RUNNER, options=opts) as p:
        with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, "tmp")):
            # Read raw data (only a small sample when testing locally).
            selquery = query + " LIMIT 100" if in_test_mode else query
            raw_data = (p
                        | "read" >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True)))

            # Analyze and transform.
            transformed_dataset, transform_fn = (
                (raw_data, raw_data_metadata)
                | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
            transformed_data, _ = transformed_dataset
            _ = (transform_fn
                 | "WriteTransformFn" >>
                 transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, "transform_fn")))

            # Group by item and by user to build users_for_item / items_for_user.
            users_for_item = (transformed_data
                              | "map_items" >> beam.Map(lambda x: (x["itemId"], x))
                              | "group_items" >> beam.GroupByKey()
                              | "totfr_items" >> beam.Map(lambda kv: to_tfrecord(kv, "userId")))
            items_for_user = (transformed_data
                              | "map_users" >> beam.Map(lambda x: (x["userId"], x))
                              | "group_users" >> beam.GroupByKey()
                              | "totfr_users" >> beam.Map(lambda kv: to_tfrecord(kv, "itemId")))

            write_tfrecords(users_for_item, "users_for_item")
            write_tfrecords(items_for_user, "items_for_user")

            write_count(users_for_item, OUTPUT_DIR, "nitems")
            write_count(items_for_user, OUTPUT_DIR, "nusers")
     
# `query` must be defined by an earlier cell: a standard-SQL BigQuery query
# returning visitorId, contentId and session_duration (the raw columns that
# preprocess declares). It is not defined in the cells shown here, which is why
# the recorded run failed with NameError -- fail early with an actionable message.
# NOTE(review): consider a quick in_test_mode=True run (DirectRunner, LIMIT 100)
# before launching the full Dataflow job.
if "query" not in globals():
    raise NameError(
        "name 'query' is not defined: define the BigQuery SQL string `query` "
        "(selecting visitorId, contentId, session_duration) before running preprocess")
preprocess(query, in_test_mode=False)

NameError: name 'query' is not defined

In [None]:
pip install tensorflow_transform

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://mirrors.aliyun.com/pypi/simple, https://pypi.tuna.tsinghua.edu.cn/simple
Collecting tensorflow_transform
  Downloading https://mirrors.aliyun.com/pypi/packages/5d/9f/acad7dab38ba19f4c574de2b50ec13343fce6ac51c291b5fc81e59bc4466/tensorflow_transform-0.24.1-py3-none-any.whl (373 kB)
     |████████████████████████████████| 373 kB 3.2 MB/s eta 0:00:01
Collecting tensorflow!=2.0.*,!=2.1.*,!=2.2.*,<2.4,>=1.15.2
  Downloading https://mirrors.aliyun.com/pypi/packages/ad/ad/769c195c72ac72040635c66cd9ba7b0f4b4fc1ac67e59b99fa6988446c22/tensorflow-2.3.1-cp36-cp36m-manylinux2010_x86_64.whl (320.4 MB)
     |██████▋                         | 66.3 MB 34.3 MB/s eta 0:00:08

In [None]:
pip install 