setup

In [1]:
import json, os, re
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, when, array, concat, size
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.functions import vector_to_array

In [None]:
# Repository root: two directory levels above the notebook's working directory.
_notebook_dir = os.getcwd()
ROOT_DIR = os.path.dirname(os.path.dirname(_notebook_dir))

In [2]:
def natural_sort(l):
    """Return the strings in `l` sorted in natural (human) order.

    Runs of ASCII digits compare by numeric value and everything else
    compares case-insensitively, so "c2" sorts before "c10".

    Args:
        l: iterable of strings.

    Returns:
        A new sorted list; the input is not modified.
    """
    def alphanum_key(key):
        # re.split with a capturing group alternates text / digit-run chunks,
        # so the digit runs always sit at the odd indices. Converting by
        # position (instead of str.isdigit()) avoids calling int() on
        # characters such as "²" that isdigit() accepts but int() rejects.
        chunks = re.split('([0-9]+)', key)
        return [int(chunk) if index % 2 else chunk.lower()
                for index, chunk in enumerate(chunks)]

    return sorted(l, key=alphanum_key)

In [3]:

# Input, connector and cache locations, all derived from ROOT_DIR.
DATA_DIR = ROOT_DIR + "/data/criteo"
ECOSYSTEM_DIR = ROOT_DIR + "/resources/ecosystem"
# Directory expected to hold the spark-tensorflow-connector jar.
CONNECTOR_DIR = ECOSYSTEM_DIR + "/spark/spark-tensorflow-connector/target"
# Alternative input (full training file); swap with the line below to use it.
# TRAIN_PATH = DATA_DIR + "/full/train.txt"
TRAIN_PATH = DATA_DIR + "/part/train/sample.txt"
# JSON description of the column schema, applied when reading TRAIN_PATH.
SCHEMA_PATH = DATA_DIR + "/full/schema.json"
# Output directory for metadata.json and the TFRecord folds.
CACHE_DIR = DATA_DIR + "/cache/xdeepfm"

In [4]:
# Start (or reuse) a Spark session with the TensorFlow connector jar on the
# classpath so the "tfrecords" data source is available.
connector_jar = CONNECTOR_DIR + "/spark-tensorflow-connector_2.12-1.11.0.jar"
spark = (
    SparkSession.builder
    .appName("pCTR")
    .config('spark.jars', connector_jar)
    .getOrCreate()
)
# Keep Spark's console logging down to errors only.
spark.sparkContext.setLogLevel("ERROR")

21/10/01 18:03:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Load the column schema from JSON; the context manager guarantees the file
# handle is closed (the bare open() call leaked it).
with open(SCHEMA_PATH) as schema_file:
    schema = StructType.fromJson(json.load(schema_file))
# The training file is tab-separated and has no header row.
df = (
    spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(schema)
    .csv(TRAIN_PATH)
)

In [6]:
# Null-percentage thresholds that decide how each feature is treated.
DROP_NULL_PCT = 70       # above this: drop the column
TRANSFORM_NULL_PCT = 40  # above this (up to DROP_NULL_PCT): turn into a presence flag

total = df.count()
if total == 0:
    # Fail with a clear message instead of a ZeroDivisionError below.
    raise ValueError("no rows were read from {}".format(TRAIN_PATH))

# Every column except the first one (the label) is treated as a feature.
feature_names = df.columns[1:]

# Count the nulls of all features in ONE pass over the data (instead of one
# filter+count job per column): turn each column into a 0/1 null flag and sum.
null_flags = df.select(
    [col(name).isNull().cast("int").alias(name) for name in feature_names])
null_counts = null_flags.groupBy().sum(*feature_names).first()

# Null percentage per feature, truncated to two decimals.
nulls = {
    name: int(n_null / total * 10000) / 100
    for name, n_null in zip(feature_names, null_counts)
}

to_drop, to_transform, to_fill = [], [], []
for feature, null_pct in nulls.items():
    if null_pct > DROP_NULL_PCT:
        to_drop.append(feature)
    elif null_pct > TRANSFORM_NULL_PCT:
        to_transform.append(feature)
    else:
        to_fill.append(feature)

# Split the fillable features by naming convention.
# NOTE(review): assumes integer columns contain 'i' and categorical columns
# contain 'c' in their names (and none contains both) - confirm against the schema.
to_fill_int = [name for name in to_fill if 'i' in name]
to_fill_cat = [name for name in to_fill if 'c' in name]

In [7]:
# Per-feature summary statistics for the integer features that will be filled.
stats = {}

# describe() returns one row per statistic; key the rows by their "summary"
# name rather than relying on their position in the result.
described = {
    row["summary"]: row
    for row in df.select(to_fill_int).describe().collect()
}

# Exact quartiles (relativeError=0) for all features in a single call instead
# of one Spark job per feature. The result is one [q1, q2, q3] list per column,
# in the same order as to_fill_int.
quartiles = (
    df.approxQuantile(to_fill_int, [0.25, 0.50, 0.75], 0) if to_fill_int else []
)

for feature, (q1, q2, q3) in zip(to_fill_int, quartiles):
    iqr = q3 - q1
    stats[feature] = {
        "count": int(described["count"][feature]),
        "mean": round(float(described["mean"][feature]), 2),
        "stddev": round(float(described["stddev"][feature]), 2),
        "min": int(described["min"][feature]),
        "q1": q1,
        "q2": q2,
        "q3": q3,
        "max": int(described["max"][feature]),
        # Tukey fences: values outside [lower, upper] are treated as outliers.
        "lower": q1 - (iqr * 1.5),
        "upper": q3 + (iqr * 1.5),
    }


In [8]:
print("balancing...")
# Downsample the unclicked rows so both labels end up with roughly the same
# number of rows.
BALANCE_SEED = 42  # fixed seed so the same rows are sampled on every run

# One Spark job: row count per label value, keyed by the integer label.
label_counts = {
    int(row["label"]): row["count"]
    for row in df.groupBy("label").count().collect()
    if row["label"] is not None
}
n_clicked = label_counts.get(1, 0)
n_unclicked = label_counts.get(0, 0)
if n_clicked == 0 or n_unclicked == 0:
    raise ValueError(
        "cannot balance: expected rows for both labels 0 and 1, got {}".format(label_counts))

# Fraction of unclicked rows to keep. Sampling without replacement requires a
# fraction in [0, 1], so cap it when clicked rows outnumber unclicked ones.
ratio = min(1.0, n_clicked / n_unclicked)
clicked = df.filter(col("label") == 1)
unclicked = df.filter(col("label") == 0).sample(False, ratio, BALANCE_SEED)
df = clicked.union(unclicked)

balancing...


In [9]:
print("dropping...")
# Remove every column listed in to_drop.
df = df.drop(*to_drop) # the list is unpacked into one positional argument per column name

dropping...


In [10]:
print("transforming...")
# Replace each column in to_transform with a presence indicator stored as a
# string: "0" where the value was null, "1" otherwise.
for name in to_transform:
    is_present = when(col(name).isNull(), 0).otherwise(1)
    df = df.withColumn(name, is_present.cast('string'))

transforming...


In [11]:
print("filling numerical features...")
for feature in to_fill_int:
    st = stats[feature]
    median = st['q2']
    raw = col(feature)
    # Nulls become the median; negative values are clamped up to 0.
    filled = when(raw.isNull(), median).when(raw < 0, 0).otherwise(raw)
    # Values outside the [lower, upper] fences are replaced with the median.
    cleaned = when((filled > st['upper']) | (filled < st['lower']), median).\
        otherwise(filled)
    # Standardize (z-score). A zero or NaN standard deviation would turn the
    # whole column into nulls, so fall back to a scale of 1 (centering only).
    scale = st['stddev'] if st['stddev'] > 0 else 1.0
    # One withColumn per feature keeps the query plan smaller than three
    # chained rewrites of the same column.
    df = df.withColumn(feature, (cleaned - st['mean']) / scale)

filling numerical features...


In [12]:
# Fill nulls in categorical features with the per-label mode.
print("filling categorical features...")


def _most_frequent(frame, feature):
    """Return the most frequent non-null value of `feature` in `frame`, or None if there is none."""
    top = frame.filter(col(feature).isNotNull()).groupBy(feature).count().\
        orderBy(col("count").desc(), col(feature).asc()).first()  # value breaks ties deterministically
    return None if top is None else top[0]


clicked = df.filter(col("label") == 1)
unclicked = df.filter(col("label") == 0)
for feature in to_fill_cat:
    modes = {1: _most_frequent(clicked, feature), 0: _most_frequent(unclicked, feature)}
    filled = col(feature)
    for label_value, mode in modes.items():
        # Skip a label that has no observed value instead of crashing on first()[0].
        if mode is not None:
            # col("label") is resolved against the current df, not a column
            # object captured from an earlier version of the DataFrame.
            filled = when(col(feature).isNull() & (col("label") == label_value), mode).\
                otherwise(filled)
    df = df.withColumn(feature, filled)

filling categorical features...


In [13]:
# Partition the feature columns by Spark data type (the label is skipped).
numeric_columns, categorical_columns = [], []
for field in df.schema.fields:
    if field.name == 'label':
        continue
    # typeName() gives a stable lowercase name ('integer', 'double', 'string').
    # str(dataType) differs between PySpark versions ('IntegerType' vs
    # 'IntegerType()'), and `in ('StringType')` was a substring test on a plain
    # string rather than tuple membership.
    type_name = field.dataType.typeName()
    if type_name in ('integer', 'double'):
        numeric_columns.append(field.name)
    elif type_name == 'string':
        categorical_columns.append(field.name)

In [14]:
# Put both name lists in natural order (digit runs compare numerically).
numeric_columns = natural_sort(numeric_columns)
categorical_columns = natural_sort(categorical_columns)

In [15]:
print("encoding categorical feature columns...")
original_cols = categorical_columns

# Step 1: map every categorical string to a numeric index.
indexed_cols = [name + "_indexed" for name in categorical_columns]
indexer_model = StringIndexer(inputCols=original_cols, outputCols=indexed_cols).fit(df)
df = indexer_model.transform(df)

# Step 2: one-hot encode the indices, keeping the last category (dropLast=False).
encoded_cols = [name + "_vec" for name in categorical_columns]
encoder_model = OneHotEncoder(
    inputCols=indexed_cols, outputCols=encoded_cols, dropLast=False).fit(df)
df = encoder_model.transform(df)

# Only the encoded vectors are needed from here on.
df = df.drop(*original_cols).drop(*indexed_cols)

encoding categorical feature columns...


In [16]:
# Each numeric feature contributes a single value.
cardinalities = dict.fromkeys(numeric_columns, 1)
for feature, encoded in zip(original_cols, encoded_cols):
    # Give the encoded vector its original column name and convert it to an array.
    df = df.withColumnRenamed(encoded, feature).withColumn(feature, vector_to_array(feature))
    # The array length is read from one row only; first() fetches a single row
    # instead of collecting the whole column to the driver.
    first_row = df.select(size(feature).alias("n")).first()
    if first_row is None:
        raise ValueError("cannot determine cardinality of {}: dataset is empty".format(feature))
    cardinalities[feature] = first_row['n']

In [17]:
# Pack the numeric columns into one array, concatenate the categorical arrays,
# then join both into a single "features" array next to the label.
numeric_array = array(numeric_columns).alias('numeric')
categorical_array = concat(*categorical_columns).alias('categorical')
df = (
    df.select('label', numeric_array, categorical_array)
      .select('label', concat('numeric', 'categorical').alias("features"))
)

In [18]:
# Preview one assembled row (label plus the combined features array).
df.show(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[-0.2409524453764...|
+-----+--------------------+
only showing top 1 row



In [19]:
# Description of the feature layout, saved next to the folds.
metadata = {
    "numeric_fields": numeric_columns,
    "categorical_fields": categorical_columns,
    # Number of input fields (one per original column).
    "num_fields": len(categorical_columns) + len(numeric_columns),
    # Total length of the assembled features array.
    "num_features": sum(cardinalities.values()),
    "cardinalities": cardinalities,
}

In [20]:
# Create the cache directory if needed; exist_ok avoids the check-then-create race.
os.makedirs(CACHE_DIR, exist_ok=True)
# The context manager closes the file even if serialization fails.
with open(CACHE_DIR + "/metadata.json", "w") as metadata_file:
    json.dump(metadata, metadata_file)

In [21]:
# Number of folds and the equal weight each fold gets in the split.
k = 10
weights = [1 / k for _ in range(k)]
print("splitting data to {} folds...".format(k))

splitting data to 10 folds...


In [22]:
SPLIT_SEED = 42  # fixed seed so fold membership is reproducible across runs
# Write each fold as TFRecord "Example" records under CACHE_DIR.
for i, fold in enumerate(df.randomSplit(weights, seed=SPLIT_SEED)):
    path = CACHE_DIR + "/fold{}.tfrecord".format(i + 1)
    # Overwrite folds from a previous run; the default save mode raises when
    # the path already exists, which breaks re-running the notebook.
    fold.write.format("tfrecords").option("recordType", "Example").\
        mode("overwrite").save(path)



In [23]:
# Read the first fold back to check that the TFRecord files can be loaded.
test_path = CACHE_DIR + "/fold{}.tfrecord".format(1)
tfrecord_reader = spark.read.format("tfrecords").option("recordType", "Example")
test = tfrecord_reader.load(test_path)

In [24]:
# Preview one row of the fold that was read back.
test.show(1)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.24997182, -0....|    1|
+--------------------+-----+
only showing top 1 row

