In [26]:
import pyspark
import petastorm
from os.path import expanduser, join, abspath
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
from pyspark.sql.functions import udf, struct
from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec
from pyspark.sql.types import StructType, ArrayType, IntegerType, LongType, StringType, DoubleType, MapType
from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField
import numpy as np 
from petastorm.pytorch import DataLoader

# PARAMS
num_epochs = 1
batch_size = 64
row_group_size_mb = 64
num_partitions = 10
# numpy dtype used for the encoded actions and the action masks
ONE_HOT_TYPE = np.int64

# where to put the petastorm parquet files
# Resolved against the current user's home directory instead of a hardcoded
# /Users/<name> prefix, so the notebook is not tied to a single machine.
output_url = "file://" + expanduser("~/Desktop/ReAgent/my_petastorm_output")

# TODO: The below two should be stored in a config.
# list of all the actions (for one-hot encoding)
actions = ['0', '1']

# list of all the features and their possible values
# (the list order fixes the position of each feature in the dense arrays)
feature_map = {
    'state_features': [0, 1, 2, 3],
    'next_state_features': [0, 1, 2, 3],
    'metrics': ['reward'],
}

# Local Spark session with Hive support, using ./spark-warehouse (relative to
# the working directory) as the SQL warehouse.
# NOTE(review): Hive support is presumably what lets spark.sql() resolve the
# cartpole_discrete_training table read further down -- confirm.
warehouse_location = abspath('spark-warehouse')
spark = (
    pyspark.sql.SparkSession
    .builder
    .master('local[1]')
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)



In [27]:
def pyspark_type_to_petastorm_type(t):
    """Scalar type conversion from a PySpark data type to a numpy dtype.

    Args:
        t: an instance of a PySpark data type.

    Returns:
        np.float64 for DoubleType, np.int32 for IntegerType and np.int64 for
        LongType.

    Raises:
        NotImplementedError: for any other type. The message names the
            offending type so the unsupported column can be identified.
    """
    if isinstance(t, DoubleType):
        return np.float64
    if isinstance(t, IntegerType):
        return np.int32
    if isinstance(t, LongType):
        return np.int64
    raise NotImplementedError(
        f"no petastorm type mapping for PySpark type {t!r}"
    )

def get_petastorm_schema(actions, feature_map, sparse_pyspark_schema):
    """
    Derive the petastorm Unischema (named "TimelineSchema") of the dense
    dataset from the schema of the sparse PySpark table.

    Columns are handled as follows, checked in this order:
      * "action" / "next_action": scalar ONE_HOT_TYPE field stored with
        ScalarCodec(LongType()).
      * "possible_actions" / "possible_next_actions": ONE_HOT_TYPE array of
        length len(actions).
      * Map columns (sparse2dense): replaced by two arrays of length
        len(feature_map[<column>]) -- "<column>_presence" (np.int64) followed
        by "<column>" holding the values. Mapped values must be scalars.
      * everything else: kept as a scalar field; Array columns are rejected.

    Every generated field is declared non-nullable.
    """
    dense_fields = []

    def non_nullable(name, numpy_dtype, shape, codec):
        # trailing False: nothing can be null
        return UnischemaField(name, numpy_dtype, shape, codec, False)

    for column in sparse_pyspark_schema:
        column_name = column.name

        # actions ...
        if column_name in ("action", "next_action"):
            dense_fields.append(
                non_nullable(column_name, ONE_HOT_TYPE, (), ScalarCodec(LongType()))
            )
            continue

        # ... and action masks
        if column_name in ("possible_actions", "possible_next_actions"):
            mask_shape = (len(actions), )
            dense_fields.append(
                non_nullable(column_name, ONE_HOT_TYPE, mask_shape, NdarrayCodec())
            )
            continue

        column_type = column.dataType

        # sparse2dense: one presence array plus one value array per Map column
        if isinstance(column_type, MapType):
            value_type = column_type.valueType
            assert not isinstance(value_type, MapType), f"{column_name} has Map type with value type of Map"
            dense_shape = (len(feature_map[column_name]), )
            dense_fields.append(
                non_nullable(f"{column_name}_presence", np.int64, dense_shape, NdarrayCodec())
            )
            # mapped values are assumed to be scalars
            value_dtype = pyspark_type_to_petastorm_type(value_type)
            dense_fields.append(
                non_nullable(column_name, value_dtype, dense_shape, NdarrayCodec())
            )
            continue

        # plain scalar column, carried over unchanged
        assert not isinstance(column_type, ArrayType), f"{column_name} has array type"
        scalar_dtype = pyspark_type_to_petastorm_type(column_type)
        dense_fields.append(
            non_nullable(column_name, scalar_dtype, (), ScalarCodec(column_type))
        )

    return Unischema("TimelineSchema", dense_fields)

def preprocessing(actions, feature_map, schema):
    """
    Build the per-row transform that turns one sparse timeline row into a
    dense petastorm row.

    The returned function does two things:
        1) handles action/action_mask
        2) performs sparse2dense

    Args:
        actions: ordered list of action names; an action is encoded as its
            index in this list.
        feature_map: dict mapping each Map column name to the ordered list of
            feature keys that defines the layout of its dense array.
        schema: schema object handed to dict_to_spark_row; every output field
            must be reachable as an attribute exposing ``numpy_dtype``.

    Returns:
        A function that takes a row (anything with ``asDict()``) and returns
        ``dict_to_spark_row(schema, <dense dict>)``.
    """
    action_keys = ["action", "next_action"]
    possible_action_keys = ["possible_actions", "possible_next_actions"]
    # built once here instead of once per row
    special_keys = set(action_keys + possible_action_keys)

    def get_schema_type(name):
        return getattr(schema, name).numpy_dtype

    def find_action(desired_action):
        """Index of desired_action in actions, or len(actions) if it is absent."""
        for i, a in enumerate(actions):
            if a == desired_action:
                return i
        return len(actions)

    def sparse_to_dense(key, sparse):
        """Return (presence, values) lists laid out in feature_map[key] order."""
        features = feature_map[key]
        presence_arr = [1 if feature in sparse else 0 for feature in features]
        # absent features are filled with 0.0
        # TODO: assuming value type is a number
        dense_arr = [sparse.get(feature, 0.0) for feature in features]
        return presence_arr, dense_arr

    def row_map(row):
        row_dict = row.asDict()

        # first handle the action/masks
        for k in action_keys:
            # an action missing from `actions` is encoded as len(actions)
            row_dict[k] = find_action(row_dict[k])

        for k in possible_action_keys:
            mask = np.zeros(len(actions), dtype=ONE_HOT_TYPE)
            for a in row_dict[k]:
                i = find_action(a)
                assert i < len(actions), f"unknown action {a!r} in {k}"
                mask[i] = 1
            row_dict[k] = mask

        # now handle rest of the keys (including sparse2dense)
        # rest_keys is a snapshot set, so adding the *_presence entries to
        # row_dict inside the loop is safe.
        rest_keys = row_dict.keys() - special_keys
        for k in rest_keys:
            val_type = get_schema_type(k)
            val = row_dict[k]
            if isinstance(val, dict):
                # convert sparse to dense
                presence_arr, dense_arr = sparse_to_dense(k, val)
                presence_key = f"{k}_presence"
                row_dict[presence_key] = np.array(
                    presence_arr, dtype=get_schema_type(presence_key)
                )
                row_dict[k] = np.array(dense_arr, dtype=val_type)
            else:
                # scalars pass through untouched; list values are not supported
                assert not isinstance(val, list)
        return dict_to_spark_row(schema, row_dict)

    return row_map


In [28]:
# Source table without the "ds" and "mdp_id" columns.
df = (
    spark.sql("SELECT * FROM cartpole_discrete_training")
    .drop("ds", "mdp_id")
)
# Dense petastorm schema derived from the sparse table schema.
schema = get_petastorm_schema(
    actions=actions,
    feature_map=feature_map,
    sparse_pyspark_schema=df.schema,
)

# NOTE(review): the parquet write is kept inside materialize_dataset, as in
# petastorm's documented usage -- confirm what the context does on exit.
with materialize_dataset(spark, output_url, schema, row_group_size_mb):
    # densify every row, then rebuild a DataFrame with the petastorm-derived
    # Spark schema
    rdd = df.rdd.map(preprocessing(actions, feature_map, schema))
    out_df = spark.createDataFrame(rdd, schema.as_spark_schema()).coalesce(num_partitions)
    # replaces whatever was previously written at output_url
    out_df.write.parquet(output_url, mode='overwrite')


In [29]:
from petastorm import make_batch_reader, make_reader

# Read the dataset at output_url back for num_epochs epochs and batch it
# through the petastorm PyTorch DataLoader.
reader = make_reader(dataset_url=output_url, num_epochs=num_epochs)
with DataLoader(reader, batch_size=batch_size) as train_loader:
    # Smoke test only: print the running batch index; the batch contents
    # are not inspected.
    for idx, row in enumerate(train_loader):
        print(idx)


0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66


In [8]:
# read as rdd
from petastorm.spark_utils import dataset_as_rdd

# Load the materialized dataset back as an RDD, requesting every field of the
# schema.
rdd = dataset_as_rdd(output_url, spark, list(schema.fields.values()))
# Display a single decoded row. first() only fetches what it needs, whereas
# collect()[0] would ship the entire dataset to the driver first.
rdd.first()

TimelineSchema_view_view(action=0, action_probability=0.975, metrics=array([1.]), metrics_presence=array([1]), next_action=0, next_state_features=array([ 0.04426776,  0.04691078, -0.02820231,  0.0002014 ]), next_state_features_presence=array([1, 1, 1, 1]), possible_actions=array([1, 1]), possible_next_actions=array([1, 1]), reward=1.0, sequence_number=5, sequence_number_ordinal=6, state_features=array([ 0.03943367,  0.24170479, -0.02249626, -0.28530234]), state_features_presence=array([1, 1, 1, 1]), time_diff=1, time_since_first=5)