# Preprocessing MovieLens Data for Embedding Learning

The tutorial consists of the following steps:

1. Download the MovieLens data.
2. Preprocess the data and store it as TFRecord files.
3. Read the prepared data from the TFRecord files using the tf.data APIs.

<a href="https://colab.research.google.com/github/ksalama/data2cooc2emb2ann/blob/master/movie2ann/01-Preparing_Movielens_Data_for_embeddings_learning.ipynb" target="_parent">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/> </a>

### Setup

In [1]:
!pip3 install "apache_beam[gcp]==2.14.0"

Collecting pyarrow<0.15.0,>=0.11.1
  Using cached pyarrow-0.14.1-cp37-cp37m-manylinux2010_x86_64.whl (58.1 MB)
Collecting pyyaml<4.0.0,>=3.12
  Using cached PyYAML-3.13-cp37-cp37m-linux_x86_64.whl
Collecting google-cloud-bigtable<0.33.0,>=0.31.1
  Using cached google_cloud_bigtable-0.32.2-py2.py3-none-any.whl (156 kB)
Collecting google-cloud-datastore<1.8.0,>=1.7.1
  Using cached google_cloud_datastore-1.7.4-py2.py3-none-any.whl (82 kB)
Installing collected packages: pyyaml, pyarrow, google-cloud-datastore, google-cloud-bigtable
  Attempting uninstall: pyyaml
    Found existing installation: PyYAML 5.4.1
    Uninstalling PyYAML-5.4.1:
      Successfully uninstalled PyYAML-5.4.1
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 2.0.0
    Uninstalling pyarrow-2.0.0:
      Successfully uninstalled pyarrow-2.0.0
  Attempting uninstall: google-cloud-datastore
    Found existing installation: google-cloud-datastore 1.15.3
    Uninstalling google-cloud-datastore-1.15.3:

In [2]:
import os
import math
import apache_beam as beam
import tensorflow as tf
from datetime import datetime
import pandas as pd


In [3]:
WORKSPACE = './workspace'
DATA_DIR = '{}/data'.format(WORKSPACE)
COOC_DIR = '{}/cooc'.format(WORKSPACE)

## 1. Download Dataset

In [10]:
DATASET = 'ml-1m'
! wget http://files.grouplens.org/datasets/movielens/{DATASET}.zip -P {DATA_DIR}/
! unzip {DATA_DIR}/{DATASET}.zip -d {DATA_DIR}/
data_file = os.path.join(DATA_DIR, '{}/ratings.dat'.format(DATASET))

--2021-09-23 15:24:07--  http://files.grouplens.org/datasets/movielens/ml-1m.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5917549 (5.6M) [application/zip]
Saving to: ‘./workspace/data/ml-1m.zip’


2021-09-23 15:24:08 (32.3 MB/s) - ‘./workspace/data/ml-1m.zip’ saved [5917549/5917549]

Archive:  ./workspace/data/ml-1m.zip
   creating: ./workspace/data/ml-1m/
  inflating: ./workspace/data/ml-1m/movies.dat  
  inflating: ./workspace/data/ml-1m/ratings.dat  
  inflating: ./workspace/data/ml-1m/README  
  inflating: ./workspace/data/ml-1m/users.dat  


In [11]:
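header = ['user_id', 'movie_id', 'rating', 'timestamp']
# Multi-character separators such as "::" are only handled by pandas' Python
# parsing engine; passing engine='python' explicitly avoids a fallback warning.
ratings_data = pd.read_csv(data_file, sep="::", names=header, engine='python')
print("Size: {}".format(len(ratings_data)))
ratings_data.head()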


Size: 1000209


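|   | user_id | movie_id | rating | timestamp |
|---|---------|----------|--------|-----------|
| 0 | 1 | 1193 | 5 | 978300760 |
| 1 | 1 | 661 | 3 | 978302109 |
| 2 | 1 | 914 | 3 | 978301968 |
| 3 | 1 | 3408 | 4 | 978300275 |
| 4 | 1 | 2355 | 5 | 978824291 |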
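Each line of `ratings.dat` has the form `UserID::MovieID::Rating::Timestamp` (the timestamp is in seconds since the Unix epoch), which is exactly what `header` above names. As a quick cross-check of the format without pandas, here is a minimal standard-library sketch; `Rating` and `parse_rating_line` are hypothetical names introduced only for illustration.

```python
from typing import NamedTuple


class Rating(NamedTuple):
    """One row of ratings.dat: UserID::MovieID::Rating::Timestamp."""
    user_id: int
    movie_id: int
    rating: int
    timestamp: int


def parse_rating_line(line: str) -> Rating:
    # Hypothetical helper: strip the trailing newline, split on the
    # two-character '::' separator, and convert every field to int.
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), int(rating), int(timestamp))


# The first two ratings shown in the head() output.
sample = ["1::1193::5::978300760", "1::661::3::978302109"]
rows = [parse_rating_line(line) for line in sample]
print(rows[0])  # Rating(user_id=1, movie_id=1193, rating=5, timestamp=978300760)
```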


## 2. Preprocess the data

### Preprocessing steps

In [12]:
def read_data(pipeline, source_data_location):
    raw_data = ( 
        pipeline
        | 'Read from files' >> beam.io.ReadFromText(
            file_pattern=source_data_location)
    )
    return raw_data
    

def parse_data(raw_data, delimiter):
    
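    def _parse_csv(line, delimiter):
        # Keep the first three fields as (item1, item2, score); for ratings.dat
        # these are user_id, movie_id and rating (the timestamp is dropped).
        try:
            item1, item2, score = line.split(delimiter)[:3]
        except ValueError:
            # The unpacking fails when a line has fewer than three fields.
            raise ValueError(
                "Invalid file format: expected delimited data with at least three values per line.")
        return (item1, item2, score)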
            
    parsed_data = (
        raw_data
        | 'Parse to tuple' >> beam.Map(_parse_csv, delimiter)
    
    )
    return parsed_data

def vocabulary(parsed_data, item_index):
    
    def _extract_item(record, item_index):
        return record[item_index]
    
    vocab = (
        parsed_data
        | 'Extract item {}'.format(item_index) >> beam.Map(_extract_item, item_index)
        | 'Extract vocabulary of item {}'.format(item_index) >> beam.Distinct()
    
    )
    return vocab 


def process_data(parsed_data):
    
    def _extend_record(record):
        # Append a default weight of 1 and the record type 'P' to each record.
        item1, item2, score = record
        return (item1, item2, score, 1, 'P')
       
    processed_data = (
        parsed_data
        | 'Extend record' >> beam.Map(_extend_record)
    
    )
    return processed_data

def get_info(stats):
    
    def _make_type_as_key(record):
        _, _, _, _, record_type = record
        return (record_type, 1)
    
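    def _get_scores(record):
        _, _, score, _, _ = record
        # Scores are still strings here, so the min/max below compare them
        # lexicographically; this is only correct for single-digit scores.
        return score
    
    counts = (
        stats
        | "Key by record type" >> beam.Map(_make_type_as_key)
        | "Count records" >> beam.CombinePerKey(sum)
        | "Format counts" >> beam.Map(lambda entry: '{}: {}'.format(entry[0], entry[1]))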
    )
    
    scores = (
        stats
        | "Get scores" >> beam.Map(_get_scores)
    )
    
    mins = (
        scores
        | "Get min score" >> beam.CombineGlobally(min).without_defaults()
        | "Format min score" >> beam.Map(lambda value: 'min: {}'.format(value))
    )
    
    maxs = (
        scores
        | "Get max score" >> beam.CombineGlobally(max).without_defaults()
        | "Format max score" >> beam.Map(lambda value: 'max: {}'.format(value))
    )
    
    info = (
        (counts, mins, maxs)
        | "Combine info" >> beam.Flatten()
    )
    
    return info
    

def write_debug(data, sink_data_location):
    
    (
        data
        | 'Write debug' >> beam.io.WriteToText(
            file_path_prefix = sink_data_location+"/debug")
    )
    

def write_log(info, sink_data_location):
    
    (
        info
        | 'Write logs' >> beam.io.WriteToText(
            file_path_prefix = sink_data_location+"/info",
            file_name_suffix = ".log",
            shard_name_template ='',
            num_shards = 1)
    )

def write_vocab(vocab, sink_data_location, item_index):
    
    (
        vocab
        | 'Write vocabulary file {}'.format(item_index) >> beam.io.WriteToText(
            file_path_prefix = sink_data_location+"/vocab", 
            file_name_suffix = "-{}.txt".format(item_index),
            shard_name_template ='',
            num_shards = 1)
    )
    

def write_to_tfrecords(stats, sink_data_location):
    
    def _to_tf_example(record):
        item1, item2, score, weight, record_type = record
        feature = {
            'item1': tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(item1)])),
            'item2': tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(item2)])),
            'score': tf.train.Feature(
                float_list=tf.train.FloatList(value=[float(score)])),
            'weight': tf.train.Feature(
                float_list=tf.train.FloatList(value=[float(weight)])),
            'type': tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(record_type)])),
        }
        return tf.train.Example(features=tf.train.Features(feature=feature))
        
    (
        stats
        | 'Encode to tf.example' >> beam.Map(_to_tf_example)
        | 'Serialize to string' >> beam.Map(lambda example: example.SerializeToString(deterministic=True))
        | 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
                file_path_prefix = sink_data_location+"/cooc",
                file_name_suffix = '.tfrecords')
    ) 
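Apart from the I/O steps, the transforms above are element-wise `beam.Map` calls plus a `beam.Distinct`, so their per-record logic can be checked on a few lines without running a pipeline. The following is a plain-Python sketch of that logic, not the Beam code itself; `parse_line`, `extend_record` and `distinct_items` are hypothetical names, and the vocabulary is sorted only to make the result deterministic (Beam's `Distinct` guarantees no order).

```python
from typing import Iterable, List, Tuple

Parsed = Tuple[str, str, str]
Extended = Tuple[str, str, str, int, str]


def parse_line(line: str, delimiter: str) -> Parsed:
    # Same per-line logic as _parse_csv: keep the first three fields as strings.
    try:
        item1, item2, score = line.split(delimiter)[:3]
    except ValueError:
        raise ValueError(
            "Invalid file format: expected at least three delimited values.") from None
    return (item1, item2, score)


def extend_record(record: Parsed) -> Extended:
    # Same as _extend_record: append weight 1 and record type 'P'.
    item1, item2, score = record
    return (item1, item2, score, 1, "P")


def distinct_items(records: Iterable[Parsed], item_index: int) -> List[str]:
    # Equivalent of Map(extract item) | Distinct(); sorted only for a stable order.
    return sorted({record[item_index] for record in records})


# The first three lines of ratings.dat (ReadFromText strips the newlines).
lines = ["1::1193::5::978300760", "1::661::3::978302109", "1::914::3::978301968"]
parsed = [parse_line(line, "::") for line in lines]
processed = [extend_record(record) for record in parsed]

print(processed[0])               # ('1', '1193', '5', 1, 'P')
print(distinct_items(parsed, 0))  # ['1']
print(distinct_items(parsed, 1))  # ['1193', '661', '914']
```

Note that the IDs stay strings throughout, which matches how `_to_tf_example` later encodes `item1` and `item2` as bytes features.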

### Preprocessing pipeline

In [13]:
def run_preproc_pipeline(args):

    source_data_location = args['source_data_location']
    sink_data_location = args['sink_data_location']
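    delimiter = args['delimiter']
    # Read the runner from args instead of relying on a global defined in a later cell.
    runner = args['runner']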
    
    pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)
    
    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        
        # Read data from source files
        raw_data = read_data(pipeline, source_data_location)
        
        # Parse data to (item_1, item_2, score)
        parsed_data = parse_data(raw_data, delimiter)
        
        # Process data to (item_1, item_2, score, weight, type)
        processed_data = process_data(parsed_data)
        #write_debug(processed_data, sink_data_location)
        
        # Extract the vocabulary (distinct values) of item1, i.e. the user IDs
        vocab1 = vocabulary(parsed_data, 0)
        write_vocab(vocab1, sink_data_location, 0)

        # Extract the vocabulary (distinct values) of item2, i.e. the movie IDs
        vocab2 = vocabulary(parsed_data, 1)
        write_vocab(vocab2, sink_data_location, 1)
        
        # Write processed data to tfrecords
        write_to_tfrecords(processed_data, sink_data_location)
        
        # Log information about the created dataset
        info = get_info(processed_data)
        write_log(info, sink_data_location)


### Run pipeline

In [14]:
runner = 'DirectRunner'
job_name = 'test-cooc-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))

args = {
    'job_name': job_name,
    'runner': runner,
    'source_data_location': data_file,
    'sink_data_location': COOC_DIR,
    'delimiter': '::',
    'num_shards': 100,
}
print("Pipeline args are set.")

Pipeline args are set.


In [15]:
# if tf.io.gfile.exists(WORKSPACE):
#     print("Removing {} contents...".format(WORKSPACE))
#     tf.io.gfile.rmtree(WORKSPACE)

print("Creating workspace: {}".format(WORKSPACE))
tf.io.gfile.makedirs(WORKSPACE)

time_start = datetime.utcnow() 
print("Running preproc pipeline...")
run_preproc_pipeline(args)
print("Pipeline is done.")
time_end = datetime.utcnow() 
time_elapsed = time_end - time_start
print("Execution elapsed time: {} seconds".format(time_elapsed.total_seconds()))

Creating workspace: ./workspace
Running preproc pipeline...




Pipeline is done.
Execution elapsed time: 99.042967 seconds


In [16]:
!ls {COOC_DIR}

cooc-00000-of-00001.tfrecords  info.log  vocab-0.txt  vocab-1.txt
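These names follow Beam's sharded naming scheme: each output file is `file_path_prefix` + the expanded `shard_name_template` + `file_name_suffix`. The default template `-SSSSS-of-NNNNN` gives `cooc-00000-of-00001.tfrecords`, while `shard_name_template=''` with `num_shards=1` gives the unsharded `info.log`, `vocab-0.txt` and `vocab-1.txt`. Also note that the `'num_shards': 100` entry in `args` is only passed as a pipeline option and is never forwarded to `WriteToTFRecord`, which is consistent with the single TFRecord shard listed here. The helper below is a hypothetical re-implementation of the template expansion for illustration, not a Beam function.

```python
import re


def expand_shard_name(prefix: str, template: str, suffix: str,
                      shard_index: int, num_shards: int) -> str:
    """Hypothetical helper mimicking Beam's S.../N... shard templates."""
    # Each run of 'S' becomes the shard index and each run of 'N' the shard
    # count, both zero-padded to the length of the run.
    name = re.sub(r"S+", lambda m: str(shard_index).zfill(len(m.group())), template)
    name = re.sub(r"N+", lambda m: str(num_shards).zfill(len(m.group())), name)
    return prefix + name + suffix


print(expand_shard_name("cooc", "-SSSSS-of-NNNNN", ".tfrecords", 0, 1))
# cooc-00000-of-00001.tfrecords
print(expand_shard_name("vocab", "", "-0.txt", 0, 1))
# vocab-0.txt
```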


In [17]:
!head {COOC_DIR}/info.log

min: 1
P: 1000209
max: 5
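`info.log` holds one count line per record type plus the minimum and maximum score. The line order differs from the order in `get_info` because the three PCollections are merged with `beam.Flatten`, which does not preserve order. Since `_get_scores` returns scores as strings, `min` and `max` compare them lexicographically; that is harmless for MovieLens ratings of 1 to 5, but would give wrong results for multi-digit scores. The plain-Python sketch below (made-up records, not MovieLens data, and not the Beam code) shows the difference; casting in `_get_scores`, for example with `float(score)`, would fix it at the cost of the log printing `min: 1.0` instead of `min: 1`.

```python
from collections import Counter

# Made-up records whose scores have different digit counts; not MovieLens data.
records = [
    ("u1", "m1", "9", 1, "P"),
    ("u1", "m2", "10", 1, "P"),
    ("u2", "m1", "2", 1, "P"),
]

# Count records per type, like _make_type_as_key followed by CombinePerKey(sum).
counts = Counter(record[-1] for record in records)

# Scores as strings (what _get_scores returns) versus cast to float.
string_scores = [record[2] for record in records]
numeric_scores = [float(score) for score in string_scores]

print(dict(counts))                              # {'P': 3}
print(min(string_scores), max(string_scores))    # 10 9
print(min(numeric_scores), max(numeric_scores))  # 2.0 10.0
```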


## 3. Read TFRecords using tf.data APIs

In [21]:
def make_input_fn(file_pattern, batch_size):
    
    features = {
        'item1': tf.io.FixedLenFeature(dtype=tf.string, shape=()),
        'item2': tf.io.FixedLenFeature(dtype=tf.string, shape=()),
        'score': tf.io.FixedLenFeature(dtype=tf.float32, shape=()),
        'weight': tf.io.FixedLenFeature(dtype=tf.float32, shape=()),
        'type': tf.io.FixedLenFeature(dtype=tf.string, shape=())
    }

    def _input_fn():
        dataset = tf.data.experimental.make_batched_features_dataset(
            file_pattern,
            batch_size,
            features,
            reader=tf.data.TFRecordDataset,
            label_key=None,
            num_epochs=1,
            shuffle=True
        )
        return dataset
    
    return _input_fn

In [22]:
# In TF 1.x, eager execution had to be enabled first with tf.enable_eager_execution();
# TF 2.x enables it by default.

DATA_FILES = "{}/cooc-*".format(COOC_DIR)

dataset = make_input_fn(DATA_FILES, batch_size=5)()
# Each dataset element is one batch: a dict mapping feature names to tensors of 5 values.
for i, features in enumerate(dataset.take(5)):
    print()
    print("Record {}:".format(i+1))
    for key in features:
        print("-{}:{}".format(key, features[key]))




Record 1:
-item1:[b'53' b'45' b'26' b'53' b'65']
-item2:[b'2581' b'532' b'1101' b'2956' b'3409']
-score:[5. 2. 4. 4. 3.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 2:
-item1:[b'65' b'48' b'35' b'53' b'10']
-item2:[b'2431' b'923' b'2100' b'326' b'248']
-score:[5. 4. 4. 5. 5.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 3:
-item1:[b'6' b'62' b'17' b'48' b'5']
-item2:[b'34' b'1883' b'235' b'1408' b'515']
-score:[4. 2. 5. 4. 4.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 4:
-item1:[b'11' b'53' b'19' b'25' b'23']
-item2:[b'1732' b'2186' b'3596' b'3114' b'2021']
-score:[4. 5. 5. 4. 2.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 5:
-item1:[b'24' b'10' b'19' b'69' b'48']
-item2:[b'3000' b'1513' b'3623' b'1721' b'1777']
-score:[4. 3. 2. 4. 3.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]
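Each element yielded by `make_batched_features_dataset` is a whole batch, a dict mapping each feature name to `batch_size` values, which is why every "Record" above prints five values per key. Because `shuffle=True`, the batches do not follow the file order; the default shuffle buffer (`shuffle_buffer_size=10000`) only mixes records that are close together in the file, which is likely why all `item1` values above are small user IDs (`ratings.dat` is sorted by user). The plain-Python sketch below uses a hypothetical `batch_records` helper, with no TensorFlow, to mimic only the dict-of-columns layout. Like the default `drop_final_batch=False`, it keeps a final partial batch; it does not reproduce shuffling.

```python
from typing import Dict, Iterable, Iterator, List


def batch_records(records: Iterable[Dict[str, object]],
                  batch_size: int) -> Iterator[Dict[str, List[object]]]:
    """Hypothetical helper: group row dicts into dicts of per-feature columns."""
    batch: List[Dict[str, object]] = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield {key: [row[key] for row in batch] for key in batch[0]}
            batch = []
    if batch:  # keep a final partial batch, like drop_final_batch=False
        yield {key: [row[key] for row in batch] for key in batch[0]}


# The first three ratings, encoded like the parsed TFRecord features.
example_rows = [
    {"item1": b"1", "item2": b"1193", "score": 5.0, "weight": 1.0, "type": b"P"},
    {"item1": b"1", "item2": b"661", "score": 3.0, "weight": 1.0, "type": b"P"},
    {"item1": b"1", "item2": b"914", "score": 3.0, "weight": 1.0, "type": b"P"},
]
batches = list(batch_records(example_rows, batch_size=2))
print(batches[0]["item2"])  # [b'1193', b'661']
print(batches[1]["score"])  # [3.0]
```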
