In [1]:
import thx.hadoop.hdfs_cache as hdfs
from thx.hadoop.spark_config_builder import create_remote_spark_session, SparkSession
import pyspark
from pyspark.sql import functions as F
from thx.datasources.parquet import create_df_from_parquet
from datetime import datetime, timedelta
import os



In [2]:
ss = create_remote_spark_session('Prepare TB Dataset', 100, 4, '4g', '2g', '16g', hadoop_file_systems=['viewfs://root', 'viewfs://prod-pa4', 'viewfs://preprod-pa4', 'viewfs://prod-am6'])
ss

In [3]:
start_date = datetime(2015, 2, 15)
end_date = datetime(2015, 3, 11)

# Load terabyte dataset

In [4]:
tb_df = create_df_from_parquet(
            ss,
            'viewfs://prod-pa4//user/testfwk/datasets/criteo_1tb_parquet/date={year:04}-{month:02}-{day:02}',
            None,
            start_date,
            end_date
        )

In [5]:
tb_df.printSchema()

root
 |-- label: integer (nullable = true)
 |-- integer_feature_1: integer (nullable = true)
 |-- integer_feature_2: integer (nullable = true)
 |-- integer_feature_3: integer (nullable = true)
 |-- integer_feature_4: integer (nullable = true)
 |-- integer_feature_5: integer (nullable = true)
 |-- integer_feature_6: integer (nullable = true)
 |-- integer_feature_7: integer (nullable = true)
 |-- integer_feature_8: integer (nullable = true)
 |-- integer_feature_9: integer (nullable = true)
 |-- integer_feature_10: integer (nullable = true)
 |-- integer_feature_11: integer (nullable = true)
 |-- integer_feature_12: integer (nullable = true)
 |-- integer_feature_13: integer (nullable = true)
 |-- categorical_feature_1: string (nullable = true)
 |-- categorical_feature_2: string (nullable = true)
 |-- categorical_feature_3: string (nullable = true)
 |-- categorical_feature_4: string (nullable = true)
 |-- categorical_feature_5: string (nullable = true)
 |-- categorical_feature_6: string (nu

In [6]:
non_features = [c for c in tb_df.columns if "feature" not in c]

In [7]:
features = [c for c in tb_df.columns if "feature" in c]

In [None]:
features = ['integer_feature_1',
 'integer_feature_2',
 'integer_feature_3',
 'integer_feature_4',
 'integer_feature_5',
 'integer_feature_6',
 'integer_feature_7',
 'integer_feature_8',
 'integer_feature_9',
 'integer_feature_10',
 'integer_feature_11',
 'integer_feature_12',
 'integer_feature_13',
 'categorical_feature_1',
 'categorical_feature_2',
 'categorical_feature_3',
 'categorical_feature_4',
 'categorical_feature_5',
 'categorical_feature_6',
 'categorical_feature_7',
 'categorical_feature_8',
 'categorical_feature_9',
 'categorical_feature_10',
 'categorical_feature_11',
 'categorical_feature_12',
 'categorical_feature_13',
 'categorical_feature_14',
 'categorical_feature_15',
 'categorical_feature_16',
 'categorical_feature_17',
 'categorical_feature_18',
 'categorical_feature_19',
 'categorical_feature_20',
 'categorical_feature_21',
 'categorical_feature_22',
 'categorical_feature_23',
 'categorical_feature_24',
 'categorical_feature_25',
 'categorical_feature_26']

In [6]:
tb_df.count()

4373472329

In [None]:
# tb_df.select(*[F.approx_count_distinct(c) for c in features]).show()

In [None]:
tb_df.show()

## Approx Count Distinct
|integer_feature_1|integer_feature_2|integer_feature_3|integer_feature_4|integer_feature_5|integer_feature_6|integer_feature_7|integer_feature_8|integer_feature_9|integer_feature_10|integer_feature_11|integer_feature_12|integer_feature_13|categorical_feature_1|categorical_feature_2|categorical_feature_3|categorical_feature_4|categorical_feature_5|categorical_feature_6|categorical_feature_7|categorical_feature_8|categorical_feature_9|categorical_feature_10|categorical_feature_11|categorical_feature_12|categorical_feature_13|categorical_feature_14|categorical_feature_15|categorical_feature_16|categorical_feature_17|categorical_feature_18|categorical_feature_19|categorical_feature_20|categorical_feature_21|categorical_feature_22|categorical_feature_23|categorical_feature_24|categorical_feature_25|categorical_feature_26|
|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|
|61121|7872|2849|69956|7248|3114|1456|20152|7807|21|277|1180208|9435|223980303|37291|17174|7214|19241|3|7336|1455|67|139374520|2842739|359635|10|2196|12046|147|3|950|14|270046725|41642537|189800060|597453|12153|101|33|

In [None]:
counts = [61121,7872,2849,69956,7248,3114,1456,20152,7807,21,277,1180208,9435,223980303,37291,17174,7214,19241,3,7336,1455,67,139374520,2842739,359635,10,2196,12046,147,3,950,14,270046725,41642537,189800060,597453,12153,101,33]

In [None]:
max(counts)

In [None]:
4373472329/10000

# Explore what filtering modalities will do

In [None]:
for i,c in enumerate(features):
    if counts[i] > 1000000:
        hash_column = F.hash(c).alias(f"hash_{c}")
    else:
        hash_column = F.col(c)
    hash_df = tb_df.select(hash_column).groupBy(hash_column).agg(F.count("*").alias("count"))
    filter_df = hash_df.filter("count>10000")
    filter_df.show()
    print(filter_df.count())
    break

# Create sampled dataset

In [None]:
tb_sampled_path = "viewfs://prod-am6/user/j.rioufougeras/criteo_tb_sample_1_1000_seed_42"
if not hdfs.exists(tb_sampled_path):
    tb_sample = tb_df.sample(fraction=0.001, seed=42)
    tb_sample.write.parquet(tb_sampled_path)
else:
    tb_sample = create_df_from_parquet(
            ss,
            tb_sampled_path,
            None,
            start_date,
            end_date
        )

In [None]:
# tb_sample.select(*[F.approx_count_distinct(c) for c in features]).show()

### Approx count of modalities in sampled dataset
|integer_feature_1|integer_feature_2|integer_feature_3|integer_feature_4|integer_feature_5|integer_feature_6|integer_feature_7|integer_feature_8|integer_feature_9|integer_feature_10|integer_feature_11|integer_feature_12|integer_feature_13|categorical_feature_1|categorical_feature_2|categorical_feature_3|categorical_feature_4|categorical_feature_5|categorical_feature_6|categorical_feature_7|categorical_feature_8|categorical_feature_9|categorical_feature_10|categorical_feature_11|categorical_feature_12|categorical_feature_13|categorical_feature_14|categorical_feature_15|categorical_feature_16|categorical_feature_17|categorical_feature_18|categorical_feature_19|categorical_feature_20|categorical_feature_21|categorical_feature_22|categorical_feature_23|categorical_feature_24|categorical_feature_25|categorical_feature_26|
|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|
|4428|7782|377|7947|2283|369|205|5372|315|11|164|195909|690|963879|21638|16511|6638|17968|3|7200|1277|52|690406|114607|88721|10|2141|8722|71|3|907|14|1104321|360122|904703|82045|10232|67|31|

In [None]:
tb_sample.count()

In [None]:
sample_counts= [4428,7782,377,7947,2283,369,205,5372,315,11,164,195909,690,963879,21638,16511,6638,17968,3,7200,1277,52,690406,114607,88721,10,2141,8722,71,3,907,14,1104321,360122,904703,82045,10232,67,31]

In [None]:
[(n,a,b,a/b) for n,a,b in zip(features,sample_counts,counts)]

# Filtering: Replace low count modalities by a default value:

In [8]:
base_filter_path = "viewfs://prod-am6/user/j.rioufougeras/criteo_tb_sample_1_1000_seed_42_filtered"

In [None]:
!hdfs dfs -ls criteo_tb_sample_1_1000_seed_42_filtered_by_day

In [None]:
if not hdfs.exists(base_filter_path):
    tb_sample_filtered = tb_sample
    for i,col in enumerate(features):
        filter_df_name = f"{base_filter_path}_{col}"
        if hdfs.exists(filter_df_name):
            tb_sample_filtered = create_df_from_parquet(
                ss,
                filter_df_name,
                None,
                start_date,
                end_date
            )
            continue
        hash_col_name = f"hash_{col}"
        if sample_counts[i] > 1000:        
            hash_column = (F.hash(col)%100000)
        else:
            hash_column = F.col(col)
        hash_df = tb_sample.select(hash_column.alias(hash_col_name)).groupBy(hash_col_name).agg(F.count("*").alias("count"))
        filter_df = hash_df.filter("count>10").orderBy(F.desc("count")).select(hash_col_name)
        decoalesce_col = F.when(F.col(hash_col_name).isNotNull(), F.col(col)).otherwise(None).alias(col)
        tb_sample_filtered = tb_sample_filtered.join(filter_df, how="left_outer", on=hash_column==F.col(hash_col_name)).select(*non_features, *[c if c != col else decoalesce_col for c in features])
        tb_sample_filtered.write.parquet(filter_df_name)
    tb_sample_filtered.write.parquet("viewfs://prod-am6/user/j.rioufougeras/criteo_tb_sample_1_1000_seed_42_filtered")
else:
    tb_sample_filtered = create_df_from_parquet(
                ss,
                base_filter_path,
                None,
                start_date,
                end_date
            )

In [None]:
# tb_sample_filtered.write.parquet("viewfs://prod-am6/user/j.rioufougeras/criteo_tb_sample_1_1000_seed_42_filtered_by_day", partitionBy="day")

In [None]:
#tb_sample_filtered.groupBy("day","date","RequestTimestampUTC").count().select("day","date","RequestTimestampUTC").show()

In [None]:
# tb_sample_filtered.select(*[F.approx_count_distinct(c) for c in features]).show()

|integer_feature_1|integer_feature_2|integer_feature_3|integer_feature_4|integer_feature_5|integer_feature_6|integer_feature_7|integer_feature_8|integer_feature_9|integer_feature_10|integer_feature_11|integer_feature_12|integer_feature_13|categorical_feature_1|categorical_feature_2|categorical_feature_3|categorical_feature_4|categorical_feature_5|categorical_feature_6|categorical_feature_7|categorical_feature_8|categorical_feature_9|categorical_feature_10|categorical_feature_11|categorical_feature_12|categorical_feature_13|categorical_feature_14|categorical_feature_15|categorical_feature_16|categorical_feature_17|categorical_feature_18|categorical_feature_19|categorical_feature_20|categorical_feature_21|categorical_feature_22|categorical_feature_23|categorical_feature_24|categorical_feature_25|categorical_feature_26|
|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|
|1019|4684|147|2389|847|175|78|3632|70|8|125|55234|260|202847|11553|14026|5931|16312|3|6569|1152|33|136391|23807|32879|10|1751|5255|54|3|847|14|280993|70976|193007|18806|8356|44|31|

In [None]:
tb_sample_filtered.count()

In [None]:
filtered_counts= [1019,4684,147,2389,847,175,78,3632,70,8,125,55234,260,202847,11553,14026,5931,16312,3,6569,1152,33,136391,23807,32879,10,1751,5255,54,3,847,14,280993,70976,193007,18806,8356,44,31]

In [None]:
[(n,a,b,a/b) for n,a,b in zip(features,filtered_counts,sample_counts)]

In [None]:
tb_sample_filtered.coalesce(1).write.mode("overwrite").parquet("viewfs://prod-am6/user/j.rioufougeras/criteo_tb_sample_1_1000_seed_42_small")

In [None]:
tb_sample_filtered.printSchema()

In [None]:
tb_sample_filtered.count()

### Build filtered dataset

In [8]:
base_filter_path = "viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter"

In [9]:
join_dfs = dict()
for i,col in enumerate(features):
    filter_df_name = f"{base_filter_path}_{col}"
    if hdfs.exists(filter_df_name):
        filter_df = create_df_from_parquet(
            ss,
            filter_df_name,
            None,
            start_date,
            end_date
        )
    else:
        filter_df = tb_df.groupBy(col).agg(F.count("*").alias("count"))
        filter_df = filter_df.filter("count>10000").orderBy(F.desc("count")).select(col)
        filter_df.write.parquet(filter_df_name)
    join_dfs[col] = filter_df

In [10]:
if not hdfs.exists(base_filter_path):
    tb_filtered = tb_df
    s = 0
    old_base_path = None
    for k,col in enumerate(features):
        if hdfs.exists(base_filter_path + f"_joined_{col}"):
            s = k
            old_base_path = base_filter_path + f"_joined_{col}"
            tb_filtered = create_df_from_parquet(
                ss,
                old_base_path,
                None,
                start_date,
                end_date
            )
            print(old_base_path)
    for col in features[s:]:
        new_base_path = base_filter_path + f"_joined_{col}"
        if not hdfs.exists(new_base_path):
            filter_df = join_dfs[col].withColumnRenamed(col,f"join_{col}")
            new_tb_filtered = tb_filtered.join(F.broadcast(filter_df), how="left_outer", on=F.col(col)==F.col(f"join_{col}"))
            new_tb_filtered = new_tb_filtered.select(
                *non_features, *[c if c != col else F.col(f"join_{col}").alias(col) for c in features]
            )
            new_tb_filtered.write.parquet(new_base_path, partitionBy="day")
            tb_filtered = new_tb_filtered
            if old_base_path and old_base_path != new_base_path:
                print(f"Replace {old_base_path} with {new_base_path}")
                # hdfs.rm(old_base_path)
        else:
            old_base_path = base_filter_path + f"_joined_{col}"
            tb_filtered = create_df_from_parquet(
                ss,
                   old_base_path,
                None,
                start_date,
                end_date
            )

viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_22
Replace viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_22 with viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_23
Replace viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_22 with viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_24
Replace viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_22 with viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_25
Replace viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_22 with viewfs://prod-am6/user/j.rioufougeras/criteo_tb_filter_joined_categorical_feature_26


In [12]:
tb_filtered.write.parquet(base_filter_path, partitionBy="day")