# Data Preprocessing
1. Topic Extraction
2. User Buckets
3. Data Split

## Make user buckets

In [10]:
import os
import boto3
import gc
import sys
import numpy as np
import pandas as pd
import pickle
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (FloatType, DateType, StructType, StructField, StringType, LongType,
                               IntegerType, ArrayType, BooleanType, DoubleType)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, QuantileDiscretizer
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline, PipelineModel
gc.enable()

spark = SparkSession.builder.config("spark.sql.shuffle.partitions", 500).appName("twitter").getOrCreate()
print(spark.sparkContext.getConf().get('spark.driver.memory'))
print(spark.sparkContext.getConf().get("spark.sql.shuffle.partitions"))


None
500


In [3]:
def validator(df):
    columns_w_nan = {}
    for col in df.schema:
        null_count = df.filter(F.col(col.name).isNull()).count()
        if null_count>0:
            columns_w_nan[col.name]=null_count
    return columns_w_nan

In [5]:
dictionary_size={"final-complete": {"val_size": 500000, 
                                    "train_size": "all"}}

training = True
submission = False
test = False

bucket='bucket-name'
s3_resource = boto3.resource('s3')
top_k_languages = 30
top_k_domains = 3000
top_k_hashtags = 13000

# Embeddings
num_partitions=1000

# Buckets
partition_per_cluster = 100

suffix_sample = "final-complete" #"full", "small", "medium", "sub_medium"
data_path = "final-data"
object_paths = "final-artifacts"

val_size = dictionary_size[suffix_sample]["val_size"]
train_size = dictionary_size[suffix_sample]["train_size"]

bucket_s3 = s3_resource.Bucket(bucket)

In [18]:
#S3
twitter_bucket_s3 = "s3a://bucket-name"
twitter_bucket_s3 = "/Volumes/Seagate\ Backup\ Plus\ Drive/ACM_RecSys/"

# trainining_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "training.tsv")
# submission_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "submissivalon.tsv")
# test_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "test.tsv")
trainining_path = "/Volumes/Seagate\ Backup\ Plus\ Drive/ACM_RecSys/val.tsv"
submission_path = "/Volumes/Seagate\ Backup\ Plus\ Drive/ACM_RecSys/val.tsv"
test_path = "/Volumes/Seagate\ Backup\ Plus\ Drive/ACM_RecSys/test.tsv"

# Splitted paths
train_path = os.path.join(twitter_bucket_s3, data_path, "train-"+suffix_sample)
val_path = os.path.join(twitter_bucket_s3, data_path, "val-"+suffix_sample)

# Processed
processed_train_path = os.path.join(twitter_bucket_s3, data_path, "processed", "train-"+suffix_sample)
processed_val_path = os.path.join(twitter_bucket_s3, data_path, "processed", "val-"+suffix_sample)
processed_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed", "submission-"+suffix_sample)
processed_test_path = os.path.join(twitter_bucket_s3, data_path, "processed", "test-"+suffix_sample)
processed_emb_train_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                        "train-"+suffix_sample)
processed_emb_val_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                      "val-"+suffix_sample)
processed_emb_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                         "submission-"+suffix_sample)
processed_emb_test_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                         "test-"+suffix_sample)
processed_top_train_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                        "train-"+suffix_sample)
processed_top_val_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                      "val-"+suffix_sample)
processed_top_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                             "submission-"+suffix_sample)
processed_top_test_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                             "test-"+suffix_sample)
# Resources
engaging_users_training_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-training")
engaging_users_submission_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-submission")
engaging_users_test_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-test")
intentions_path = os.path.join(twitter_bucket_s3, data_path, "intentions-"+suffix_sample)
map_user_bucket_path = os.path.join(twitter_bucket_s3, data_path, "map_user_bucket")

topic_encodings_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "user_topics")
users_intime_path = os.path.join(twitter_bucket_s3, data_path, "users_intime-"+suffix_sample)

# keys objects
key_hashtag_mapping = os.path.join(object_paths, f'hashtag_mapping_{suffix_sample}.pkl')
key_domain_mapping = os.path.join(object_paths, f'domain_mapping_{suffix_sample}.pkl')
key_language_mapping = os.path.join(object_paths, f'language_mapping_{suffix_sample}.pkl')
key_hashtag_count = os.path.join(object_paths, f'hashtag_count_{suffix_sample}.pkl')
key_domain_count = os.path.join(object_paths, f'domain_count_{suffix_sample}.pkl')
key_scaling_features = os.path.join(object_paths, f'scaling_dictionary_{suffix_sample}.pkl')
key_diff_min = os.path.join(object_paths, f'diff_min_{suffix_sample}.pkl')
key_impute_perc = os.path.join(object_paths, f'dict_mean_perc_{suffix_sample}.pkl')

# s3+keys
columns = ["engaged_with_user_follower_count", "engaged_with_user_following_count",
           "engaged_with_user_account_creation", "engaging_user_follower_count",
           "engaging_user_following_count", "engaging_user_account_creation"]
qds_paths = {}
for col in columns:
    qds_paths[col] = os.path.join(twitter_bucket_s3, object_paths, f"qs_{suffix_sample}_" + col)
    
# Bucket pipeline
users_buckets = os.path.join(twitter_bucket_s3, data_path, "users_buckets") #
users_buckets_part_2 = os.path.join(twitter_bucket_s3, data_path, "users_buckets_part_2") #

pipeline_kmeans_path = os.path.join(twitter_bucket_s3, object_paths, "pipeline_id_encoding")
cluster_map_path = os.path.join(twitter_bucket_s3, data_path, "cluster_map")

# Embeddings
bert_embeddings_train = os.path.join(twitter_bucket_s3, "data", "textEncodings", "tweets_extended")
submission_rawTweetEncodings_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "submissionEmbs.txt")
test_rawTweetEncodings_path = None

# Topics pipeline
reduced_topics_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "reducedTopics")

In [13]:
trainining_path

'/Volumes/Seagate\\ Backup\\ Plus\\ Drive/ACM_RecSys/training.tsv'

In [14]:
def parse_data(path='training.tsv', has_labels=True, schema='auto'):
    """
    Parses the training data for the Twitter RecSys Challenge.
    """
    spark = SparkSession.builder.appName("twitter").getOrCreate()
    if schema == 'auto':
        schema = build_schema(has_labels)
    df = spark.read.csv(path, schema=schema, sep='\x01', encoding='utf-8',
                        ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
    df = df.withColumn('text_tokens', F.split('text_tokens', '\t'))
    df = df.withColumn('hashtags', F.split('hashtags', '\t'))
    df = df.withColumn('present_media', F.split('present_media', '\t'))
    df = df.withColumn('present_links', F.split('present_links', '\t'))
    df = df.withColumn('present_domains', F.split('present_domains', '\t'))
    return df

def build_schema(has_labels=True):
    if has_labels:
        schema = StructType([StructField('text_tokens', StringType()),
                             StructField('hashtags', StringType()),
                             StructField('tweet_id', StringType()),
                             StructField('present_media', StringType()),
                             StructField('present_links', StringType()),
                             StructField('present_domains', StringType()),
                             StructField('tweet_type', StringType()),
                             StructField('language', StringType()),
                             StructField('tweet_timestamp', LongType()),
                             StructField('engaged_with_user_id', StringType()),
                             StructField('engaged_with_user_follower_count', IntegerType()),
                             StructField('engaged_with_user_following_count', IntegerType()),
                             StructField('engaged_with_user_is_verified', BooleanType()),
                             StructField('engaged_with_user_account_creation', LongType()),
                             StructField('engaging_user_id', StringType()),
                             StructField('engaging_user_follower_count', IntegerType()),
                             StructField('engaging_user_following_count', IntegerType()),
                             StructField('engaging_user_is_verified', BooleanType()),
                             StructField('engaging_user_account_creation', LongType()),
                             StructField('engagee_follows_engager', BooleanType()),
                             StructField('reply_timestamp', LongType()),
                             StructField('retweet_timestamp', LongType()),
                             StructField('retweet_with_comment_timestamp', LongType()),
                             StructField('like_timestamp', LongType())
                            ])
    else:
         schema = StructType([StructField('text_tokens', StringType()),
                             StructField('hashtags', StringType()),
                             StructField('tweet_id', StringType()),
                             StructField('present_media', StringType()),
                             StructField('present_links', StringType()),
                             StructField('present_domains', StringType()),
                             StructField('tweet_type', StringType()),
                             StructField('language', StringType()),
                             StructField('tweet_timestamp', LongType()),
                             StructField('engaged_with_user_id', StringType()),
                             StructField('engaged_with_user_follower_count', IntegerType()),
                             StructField('engaged_with_user_following_count', IntegerType()),
                             StructField('engaged_with_user_is_verified', BooleanType()),
                             StructField('engaged_with_user_account_creation', LongType()),
                             StructField('engaging_user_id', StringType()),
                             StructField('engaging_user_follower_count', IntegerType()),
                             StructField('engaging_user_following_count', IntegerType()),
                             StructField('engaging_user_is_verified', BooleanType()),
                             StructField('engaging_user_account_creation', LongType()),
                             StructField('engagee_follows_engager', BooleanType())
                            ])
    return schema

In [15]:
training_df = parse_data(trainining_path, has_labels=True).repartition(600)


In [16]:
training_df

DataFrame[text_tokens: array<string>, hashtags: array<string>, tweet_id: string, present_media: array<string>, present_links: array<string>, present_domains: array<string>, tweet_type: string, language: string, tweet_timestamp: bigint, engaged_with_user_id: string, engaged_with_user_follower_count: int, engaged_with_user_following_count: int, engaged_with_user_is_verified: boolean, engaged_with_user_account_creation: bigint, engaging_user_id: string, engaging_user_follower_count: int, engaging_user_following_count: int, engaging_user_is_verified: boolean, engaging_user_account_creation: bigint, engagee_follows_engager: boolean, reply_timestamp: bigint, retweet_timestamp: bigint, retweet_with_comment_timestamp: bigint, like_timestamp: bigint]

In [17]:
training_df.count()


KeyboardInterrupt: 

In [None]:
if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/users_buckets", Delimiter='./')))==0:
    engaging_user_id_id = training_df.select(F.col("engaging_user_id").alias("user_id"))
    engaged_with_user_id_id = training_df.select(F.col("engaged_with_user_id").alias("user_id"))
    total_users = engaging_user_id_id.union(engaged_with_user_id_id)
    col="user_id"
    col_dist = total_users.select(col)\
                          .groupBy(col).count()
    
    bucketizer_users = col_dist.filter(F.col("count")>=71)
    w = Window.orderBy(F.col("count").desc())
    bucketizer_users = bucketizer_users.withColumn("bucket", F.row_number().over(w))
    bucketizer_users = bucketizer_users.drop("count", "in_submission")
    
    col_dist = col_dist.join(bucketizer_users, on="user_id", how="left")
    col_dist.write.csv(users_buckets)
else:
    print("Already exists")

col_dist = spark.read.csv(users_buckets, 
                          schema=StructType([StructField('user_id', StringType()),
                                             StructField('count', IntegerType()),
                                             StructField('bucket', IntegerType())]))
col_dist = col_dist.orderBy(F.col("count").desc())

In [None]:
main_buckets = col_dist.select(F.max("bucket")).first()[0]

In [None]:
if training:
    if len(list(bucket_s3.objects.filter(Prefix=object_paths+"/pipeline_id_encoding", Delimiter='./')))==0:
        print("Creating Kmeans pipeline")
        engaged_with_df = training_df.select(F.col("engaged_with_user_id").alias("user_id"), 
                                             F.col("engaged_with_user_follower_count").alias("follower_count"), 
                                             F.col("engaged_with_user_following_count").alias("following_count"),
                                             F.col("engaged_with_user_is_verified").alias("is_verified"), 
                                             F.col("engaged_with_user_account_creation").alias("account_creation"))
        engaging_df = training_df.select(F.col("engaging_user_id").alias("user_id"), 
                                         F.col("engaging_user_follower_count").alias("follower_count"), 
                                         F.col("engaging_user_following_count").alias("following_count"),
                                         F.col("engaging_user_is_verified").alias("is_verified"), 
                                         F.col("engaging_user_account_creation").alias("account_creation"))
        user_df = engaging_df.union(engaged_with_df)

        user_ids_missing = col_dist.select("user_id", "bucket").filter(F.col("bucket").isNull()).drop("bucket")
        user_kmeans = user_ids_missing.join(user_df, on="user_id", how="left")
        grouped = user_kmeans.groupBy("user_id").agg(F.max(F.col("follower_count")).alias("follower_count"), 
                                                     F.max(F.col("following_count")).alias("following_count"),
                                                     F.max(F.col("is_verified")).alias("is_verified"),
                                                     F.max(F.col("account_creation")).alias("account_creation"))
        cols_for_training = ["follower_count", "following_count", "is_verified", "account_creation"]
        vectorAssembler = VectorAssembler(inputCols=cols_for_training,
                                          outputCol="features")
        ss = StandardScaler(inputCol="features", 
                            outputCol="features_ss", 
                            withStd=True, 
                            withMean=True)
        clustering = KMeans(k=60, 
                            featuresCol="features_ss", 
                            predictionCol="cluster")
        pipeline_like = Pipeline(stages=[vectorAssembler, ss, clustering])
        model_like = pipeline_like.fit(grouped)
        model_like.save(pipeline_kmeans_path)
    else:
        model_like = PipelineModel.load(pipeline_kmeans_path)
        print("Kmeans already create")
else:
    model_like = PipelineModel.load(pipeline_kmeans_path)

In [None]:
if training:
    if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/cluster_map", Delimiter='./')))==0:
        preds_cluster = model_like.transform(grouped)
        cluster_map = preds_cluster.groupBy("cluster").count()
        output = cluster_map.filter(F.col("count")<200).select(F.collect_set("cluster")).collect()
        good_clusters = cluster_map.filter(~F.col("cluster").isin(output[0][0]))
        good_clusters = good_clusters.withColumn("id_cluster", F.row_number().over(Window.orderBy(F.col("count").desc())))
        good_clusters = good_clusters.drop("count")
        cluster_map = cluster_map.join(good_clusters, on="cluster", how="left")
        cluster_map = cluster_map.withColumn("id_cluster", F.when(F.col("id_cluster").isNotNull(), F.col("id_cluster")).otherwise(0))
        cluster_map.write.csv(cluster_map_path)
    else:
        print("Cluster map  already created")

cluster_map = spark.read.csv(cluster_map_path, 
                             schema=StructType([StructField('cluster', IntegerType()),
                                                StructField('count', LongType()),
                                                StructField('id_cluster', IntegerType())]))
cluster_map = cluster_map.drop("count")

In [None]:
training_df = parse_data(trainining_path, has_labels=True).repartition(600)
submission_df = parse_data(submission_path, has_labels=False).repartition(300)
test_df = parse_data(test_path, has_labels=False).repartition(300)

In [None]:
if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/users_buckets_part_2", Delimiter='./')))==0:
    engaged_with_df = training_df.select(F.col("engaged_with_user_id").alias("user_id"), 
                                         F.col("engaged_with_user_follower_count").alias("follower_count"), 
                                         F.col("engaged_with_user_following_count").alias("following_count"),
                                         F.col("engaged_with_user_is_verified").alias("is_verified"), 
                                         F.col("engaged_with_user_account_creation").alias("account_creation"))
    engaging_df = training_df.select(F.col("engaging_user_id").alias("user_id"), 
                                     F.col("engaging_user_follower_count").alias("follower_count"), 
                                     F.col("engaging_user_following_count").alias("following_count"),
                                     F.col("engaging_user_is_verified").alias("is_verified"), 
                                     F.col("engaging_user_account_creation").alias("account_creation"))
    user_df = engaging_df.union(engaged_with_df)

    engaged_with_df_sub = submission_df.select(F.col("engaged_with_user_id").alias("user_id"), 
                                         F.col("engaged_with_user_follower_count").alias("follower_count"), 
                                         F.col("engaged_with_user_following_count").alias("following_count"),
                                         F.col("engaged_with_user_is_verified").alias("is_verified"), 
                                         F.col("engaged_with_user_account_creation").alias("account_creation"))
    engaging_df_sub = submission_df.select(F.col("engaging_user_id").alias("user_id"), 
                                     F.col("engaging_user_follower_count").alias("follower_count"), 
                                     F.col("engaging_user_following_count").alias("following_count"),
                                     F.col("engaging_user_is_verified").alias("is_verified"), 
                                     F.col("engaging_user_account_creation").alias("account_creation"))
    user_df_sub = engaging_df_sub.union(engaged_with_df_sub)
    user_df = user_df.union(user_df_sub) # Union
    
    engaged_with_df_test = test_df.select(F.col("engaged_with_user_id").alias("user_id"), 
                                         F.col("engaged_with_user_follower_count").alias("follower_count"), 
                                         F.col("engaged_with_user_following_count").alias("following_count"),
                                         F.col("engaged_with_user_is_verified").alias("is_verified"), 
                                         F.col("engaged_with_user_account_creation").alias("account_creation"))
    engaging_df_test = test_df.select(F.col("engaging_user_id").alias("user_id"), 
                                     F.col("engaging_user_follower_count").alias("follower_count"), 
                                     F.col("engaging_user_following_count").alias("following_count"),
                                     F.col("engaging_user_is_verified").alias("is_verified"), 
                                     F.col("engaging_user_account_creation").alias("account_creation"))
    user_df_test = engaging_df_test.union(engaged_with_df_test)
    user_df = user_df.union(user_df_test) # Union

    user_ids_already_bucketized = col_dist.select(F.col("user_id").alias("user_bucketized"), 
                                                  F.col("bucket")).filter(F.col("bucket").isNotNull()).drop("bucket")
    user_kmeans = user_df.join(user_ids_already_bucketized, 
                               (user_df.user_id==user_ids_already_bucketized.user_bucketized), 
                               how="left")
    user_kmeans = user_kmeans.filter(F.col("user_bucketized").isNull())
    grouped = user_kmeans.groupBy("user_id").agg(F.max(F.col("follower_count")).alias("follower_count"),
                                                 F.max(F.col("following_count")).alias("following_count"),
                                                 F.max(F.col("is_verified")).alias("is_verified"),
                                                 F.max(F.col("account_creation")).alias("account_creation"))

    preds_cluster= model_like.transform(grouped)
    preds_cluster = preds_cluster.join(cluster_map, on="cluster", how="left")
    preds_cluster = preds_cluster.withColumn("bucket_new", 
                                             main_buckets+1+(F.col("id_cluster")*partition_per_cluster)+F.abs(F.hash(F.col("user_id"))%partition_per_cluster))
    bucket_new_ids = preds_cluster.select("user_id", "id_cluster", "bucket_new")
    bucket_new_ids = bucket_new_ids.withColumn("bucket_new", F.col("bucket_new").cast(IntegerType()))
    bucket_new_ids.write.csv(users_buckets_part_2)
# Reading
bucket_new_ids = spark.read.csv(users_buckets_part_2, 
                                schema= StructType([StructField('user_id', StringType()),
                                                    StructField('id_cluster', IntegerType()),
                                                    StructField('bucket_new', IntegerType())]))
bucket_new_ids = bucket_new_ids.drop("id_cluster")

In [None]:

if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/map_user_bucket", Delimiter='./')))==0:
    map_user_bucket = col_dist.join(bucket_new_ids, on="user_id", how="outer")
    map_user_bucket = map_user_bucket.withColumn("final_bucket", F.when(F.col("bucket").isNotNull(), 
                                                                        F.col("bucket")).otherwise(F.col("bucket_new")))
    map_user_bucket = map_user_bucket.select("user_id", "final_bucket")
    map_user_bucket.write.csv(map_user_bucket_path)
    print("Map User Bucket created")
    
map_user_bucket = spark.read.csv(map_user_bucket_path, 
                                 schema= StructType([StructField('user_id', StringType()),
                                                StructField('final_bucket', IntegerType())]))