# Data Pipeline

# Goal
Create a data pipeline for twitter challenge.

# Methodology
Create different features from the original data

## Sections
1. [**Requirements**](#Requirements)
2. [**Functions**](#Functions)
3. [**Inputs**](#Inputs)
4. [**Pipeline**](#Pipeline)
    - [**Indicators**](#Indicators)
    - [**Intention_features**](#Intention_features)
    - [**TopicEncodings**](#TopicEncodings)
    - [**EngagingFollowsEngaged**](#EngagingFollowsEngaged)
    - [**Hashtags**](#Hashtags)
    - [**Domain**](#Domain)
    - [**Language**](#Language)
    - [**Media**](#Media)
    - [**Links**](#Links)
    - [**Tweet_type**](#Tweet_type)
    - [**Timestamp_features**](#Timestamp_features)
    - [**Followers_and_Followings_features**](#Followers_and_Followings_features)
    - [**Quantile_Discretizer**](#Quantile_Discretizer)
    - [**Intentions_join**](#Intentions_join)
5. [**FeatureSelection**](#FeatureSelection)
6. [**Imputation**](#Imputation)
7. [**Validation**](#Validation)
8. [**Saving_df**](#Saving_df)

# Requirements

In [None]:
#installing packages
sc.install_pypi_package("pandas")
sc.install_pypi_package("boto3")

In [2]:
#reconfiguring SparkContext
sc.setCheckpointDir('hdfs:///twitter/checkpoints')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
import time
import os
import boto3
import gc
import sys
import numpy as np
import pandas as pd
import pickle
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (FloatType, DateType, StructType, StructField, StringType, LongType, 
    IntegerType, ArrayType, BooleanType, DoubleType)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, QuantileDiscretizer
gc.enable()

spark = SparkSession.builder.config("spark.sql.shuffle.partitions", 1000).appName("twitter").getOrCreate()
print(spark.sparkContext.getConf().get('spark.driver.memory'))
print(spark.sparkContext.getConf().get("spark.sql.shuffle.partitions"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2048M
1000

# Functions

## Loading data

In [4]:
def parse_data(path='training.tsv', has_labels=True, schema='auto'):
    """
    Parses the training data for the Twitter RecSys Challenge.
    """
    spark = SparkSession.builder.appName("twitter").getOrCreate()
    if schema == 'auto':
        schema = build_schema(has_labels)
    df = spark.read.csv(path, schema=schema, sep='\x01', encoding='utf-8',
                        ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
    df = df.withColumn('text_tokens', F.split('text_tokens', '\t'))
    df = df.withColumn('hashtags', F.split('hashtags', '\t'))
    df = df.withColumn('present_media', F.split('present_media', '\t'))
    df = df.withColumn('present_links', F.split('present_links', '\t'))
    df = df.withColumn('present_domains', F.split('present_domains', '\t'))
    return df

def build_schema(has_labels=True):
    if has_labels:
        schema = StructType([StructField('text_tokens', StringType()),
                             StructField('hashtags', StringType()),
                             StructField('tweet_id', StringType()),
                             StructField('present_media', StringType()),
                             StructField('present_links', StringType()),
                             StructField('present_domains', StringType()),
                             StructField('tweet_type', StringType()),
                             StructField('language', StringType()),
                             StructField('tweet_timestamp', LongType()),
                             StructField('engaged_with_user_id', StringType()),
                             StructField('engaged_with_user_follower_count', IntegerType()),
                             StructField('engaged_with_user_following_count', IntegerType()),
                             StructField('engaged_with_user_is_verified', BooleanType()),
                             StructField('engaged_with_user_account_creation', LongType()),
                             StructField('engaging_user_id', StringType()),
                             StructField('engaging_user_follower_count', IntegerType()),
                             StructField('engaging_user_following_count', IntegerType()),
                             StructField('engaging_user_is_verified', BooleanType()),
                             StructField('engaging_user_account_creation', LongType()),
                             StructField('engagee_follows_engager', BooleanType()),
                             StructField('reply_timestamp', LongType()),
                             StructField('retweet_timestamp', LongType()),
                             StructField('retweet_with_comment_timestamp', LongType()),
                             StructField('like_timestamp', LongType())
                            ])
    else:
         schema = StructType([StructField('text_tokens', StringType()),
                             StructField('hashtags', StringType()),
                             StructField('tweet_id', StringType()),
                             StructField('present_media', StringType()),
                             StructField('present_links', StringType()),
                             StructField('present_domains', StringType()),
                             StructField('tweet_type', StringType()),
                             StructField('language', StringType()),
                             StructField('tweet_timestamp', LongType()),
                             StructField('engaged_with_user_id', StringType()),
                             StructField('engaged_with_user_follower_count', IntegerType()),
                             StructField('engaged_with_user_following_count', IntegerType()),
                             StructField('engaged_with_user_is_verified', BooleanType()),
                             StructField('engaged_with_user_account_creation', LongType()),
                             StructField('engaging_user_id', StringType()),
                             StructField('engaging_user_follower_count', IntegerType()),
                             StructField('engaging_user_following_count', IntegerType()),
                             StructField('engaging_user_is_verified', BooleanType()),
                             StructField('engaging_user_account_creation', LongType()),
                             StructField('engagee_follows_engager', BooleanType())
                            ])
    return schema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Preprocessing

In [5]:
def mediaCounter(row, media='Photo'):
    counter=0
    if type(row)==list:
        for elem in row:
            if elem==media:
                counter+=1
    else:
        pass
    return counter

def listCounter(row):
    counter=0
    if type(row)==list:
        counter+=len(row)
    else:
        pass
    return counter

def labelEncoder(row, mapping_encode):
    """
    Label Encoding or Array<String> types
    
    
    Parameters:
    -----------
    row : list(string)
        List of string or labels
    mapping_encode : dict(label, integer)
        Encoding of some top K labels
    Return:
    -------
    out : list(integers)
        List of Label Encoders.
        if not in mapping Encoded to len(map)
        if not a list Encoded to len(map)+1
    """
    out=[]
    if type(row)==list:
        for elem in row:
            if elem in mapping_encode:
                out.append(mapping_encode.get(elem))
            else:
                out.append(len(mapping_encode))
    else:
        out.append(len(mapping_encode)+1)
    return out

def labelEncoderSingle(row, mapping_encode):
    out=[]
    if row:
        if row in mapping_encode:
            out.append(mapping_encode.get(row))
        else:
            out.append(len(mapping_encode))
    else:
        out.append(len(mapping_encode)+1)
    return out

def hashtagSumCounter(row, mapping_hashtag_count):
    counter=0
    if type(row)==list:
        for elem in row:
            if elem in mapping_hashtag_count:
                counter+=mapping_hashtag_count.get(elem, 0)
    else:
        pass
    return counter

def get_distribution_array_col(df, col):
    distribution_df = df.select(col).filter(F.col(col).isNotNull())\
                              .withColumn(col, 
                                          F.explode(F.col(col)))\
                              .groupBy(col).count()\
                              .orderBy(F.col("count").desc())
    return distribution_df

def save_pkl_to_s3(obj, key_filename, bucket_name):
    serialized_obj = pickle.dumps(obj)
    s3 = boto3.client('s3')
    s3.put_object(Bucket=bucket_name, Key=key_filename, 
                  Body=serialized_obj)
    
def columns2cast(df):
    columns = []
    for col in df.schema:
        if col.dataType.typeName()=="array":
            columns.append(col)
    return columns
    
def cast_array2string(df, columns):
    for col in columns:
        df = df.withColumn(col.name, F.col(col.name).cast(StringType()))
    return df

def cast_string2array(df, columns):
    for col in columns:
        df= df.withColumn(col, 
                          F.split(F.regexp_replace(F.col(col), r"(^\[)|(\]$)|(')", ""),
                                  ", "))
    return df
    
def mappings(df, col, top_k):
    col_dist = get_distribution_array_col(df, col)
    df_col_dist = col_dist.limit(top_k)
    df_col = df_col_dist.toPandas().rename(columns={'_1': col, 
                                                    '_2': 'count'})\
                                    .reset_index().set_index(col)
    mapping_encode = df_col['index'].to_dict()
    mapping_count = df_col['count'].to_dict()
    return mapping_encode, mapping_count

def mapping_label_encoder(df, col, top_k):
    col_dist = df.select(col).filter(F.col(col).isNotNull())\
                      .groupBy(col).count()\
                      .orderBy(F.col("count").desc())
    df_col_dist = col_dist.limit(top_k)
    df_col = df_col_dist.toPandas().rename(columns={'_1': col, 
                                                    '_2': 'count'})\
                                    .reset_index().set_index(col)
    mapping_encoder = df_col['index'].to_dict()
    return mapping_encoder

def validator(df):
    columns_w_nan = {}
    for col in df.schema:
        null_count = df.filter(F.col(col.name).isNull()).count()
        if null_count>0:
            columns_w_nan[col.name]=null_count
    return columns_w_nan

# Mappings
tweet_type_mapping = {'TopLevel':0, 'Quote':1, 'Retweet':2, 'Reply':3}

# UDF SQL
PhotoCounter_udf = F.udf(lambda row: mediaCounter(row, 'Photo'), 
                         IntegerType())
VideoCounter_udf = F.udf(lambda row: mediaCounter(row, 'Video'), 
                         IntegerType())
GifCounter_udf = F.udf(lambda row: mediaCounter(row, 'GIF'), 
                         IntegerType())
listCounter_udf = F.udf(listCounter, 
                         IntegerType())
tweet_encoded_udf = F.udf(lambda x: tweet_type_mapping[x], 
                             IntegerType())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Inputs

In [223]:
dictionary_size={"final-complete": {"val_size": 500000, 
                                    "train_size": "all"}}

training = False
submission = False
test = True

bucket='bucket-name'
s3_resource = boto3.resource('s3')
top_k_languages = 30
top_k_domains = 3000
top_k_hashtags = 13000

# Embeddings
num_partitions=1000

# Buckets
partition_per_cluster = 100

suffix_sample = "final-complete" #"full", "small", "medium", "sub_medium"
data_path = "final-data"
object_paths = "final-artifacts"

val_size = dictionary_size[suffix_sample]["val_size"]
train_size = dictionary_size[suffix_sample]["train_size"]

bucket_s3 = s3_resource.Bucket(bucket)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Paths**

In [224]:
#S3
twitter_bucket_s3 = "s3a://bucket-name"
trainining_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "training.tsv")
submission_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "submission.tsv")
test_path = os.path.join(twitter_bucket_s3, "data", "raw", "final", "test.tsv")

# Splitted paths
train_path = os.path.join(twitter_bucket_s3, data_path, "train-"+suffix_sample)
val_path = os.path.join(twitter_bucket_s3, data_path, "val-"+suffix_sample)

# Processed
processed_train_path = os.path.join(twitter_bucket_s3, data_path, "processed", "train-"+suffix_sample)
processed_val_path = os.path.join(twitter_bucket_s3, data_path, "processed", "val-"+suffix_sample)
processed_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed", "submission-"+suffix_sample)
processed_test_path = os.path.join(twitter_bucket_s3, data_path, "processed", "test-"+suffix_sample)
processed_emb_train_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                        "train-"+suffix_sample)
processed_emb_val_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                      "val-"+suffix_sample)
processed_emb_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                         "submission-"+suffix_sample)
processed_emb_test_path = os.path.join(twitter_bucket_s3, data_path, "processed-embeddings-final", 
                                         "test-"+suffix_sample)
processed_top_train_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                        "train-"+suffix_sample)
processed_top_val_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                      "val-"+suffix_sample)
processed_top_submission_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                             "submission-"+suffix_sample)
processed_top_test_path = os.path.join(twitter_bucket_s3, data_path, "processed-topics", 
                                             "test-"+suffix_sample)
# Resources
engaging_users_training_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-training")
engaging_users_submission_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-submission")
engaging_users_test_path = os.path.join(twitter_bucket_s3, data_path, "engaging-users-test")
intentions_path = os.path.join(twitter_bucket_s3, data_path, "intentions-"+suffix_sample)
map_user_bucket_path = os.path.join(twitter_bucket_s3, data_path, "map_user_bucket")

topic_encodings_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "user_topics")
users_intime_path = os.path.join(twitter_bucket_s3, data_path, "users_intime-"+suffix_sample)

# keys objects
key_hashtag_mapping = os.path.join(object_paths, f'hashtag_mapping_{suffix_sample}.pkl')
key_domain_mapping = os.path.join(object_paths, f'domain_mapping_{suffix_sample}.pkl')
key_language_mapping = os.path.join(object_paths, f'language_mapping_{suffix_sample}.pkl')
key_hashtag_count = os.path.join(object_paths, f'hashtag_count_{suffix_sample}.pkl')
key_domain_count = os.path.join(object_paths, f'domain_count_{suffix_sample}.pkl')
key_scaling_features = os.path.join(object_paths, f'scaling_dictionary_{suffix_sample}.pkl')
key_diff_min = os.path.join(object_paths, f'diff_min_{suffix_sample}.pkl')
key_impute_perc = os.path.join(object_paths, f'dict_mean_perc_{suffix_sample}.pkl')
key_topiccount = os.path.join(object_paths, f'topiccount_{suffix_sample}.pkl')

# s3+keys
columns = ["engaged_with_user_follower_count", "engaged_with_user_following_count",
           "engaged_with_user_account_creation", "engaging_user_follower_count",
           "engaging_user_following_count", "engaging_user_account_creation"]
qds_paths = {}
for col in columns:
    qds_paths[col] = os.path.join(twitter_bucket_s3, object_paths, f"qs_{suffix_sample}_" + col)
    
# Bucket pipeline
users_buckets = os.path.join(twitter_bucket_s3, data_path, "users_buckets") #
users_buckets_part_2 = os.path.join(twitter_bucket_s3, data_path, "users_buckets_part_2") #

pipeline_kmeans_path = os.path.join(twitter_bucket_s3, object_paths, "pipeline_id_encoding")
cluster_map_path = os.path.join(twitter_bucket_s3, data_path, "cluster_map")

# Embeddings
bert_embeddings_train = os.path.join(twitter_bucket_s3, "data", "textEncodings", "tweets_extended")
submission_rawTweetEncodings_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "submission-tweets-extended")
test_rawTweetEncodings_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "test-tweets-extended")

# Topics pipeline
reduced_topics_path = os.path.join(twitter_bucket_s3, "data", "textEncodings", "reducedTopics")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Engaging-users-id

In [225]:
# Training
if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/engaging-users-training", Delimiter='./')))==0:
    df = parse_data(trainining_path, has_labels=True).repartition(500)
    engaging_user_id_train = df.select("engaging_user_id").distinct()
    engaging_user_id_train.write.csv(engaging_users_training_path)

engaging_users_train = spark.read.csv(engaging_users_training_path, 
                                      schema=StructType([StructField('engaging_user_id', StringType())]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [226]:
# Submission
if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/engaging-users-submission", Delimiter='./')))==0:
    df = parse_data(submission_path, has_labels=False).repartition(200)
    engaging_users_submission = df.select("engaging_user_id").distinct()
    engaging_users_submission.write.csv(engaging_users_submission_path)

engaging_users_submission = spark.read.csv(engaging_users_submission_path, 
                                    schema=StructType([StructField('engaging_user_id', StringType())]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [227]:
# Test
if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/engaging-users-test", Delimiter='./')))==0:
    df = parse_data(test_path, has_labels=False).repartition(200)
    engaging_users_test = df.select("engaging_user_id").distinct()
    engaging_users_test.write.csv(engaging_users_test_path)

engaging_users_test = spark.read.csv(engaging_users_test_path, 
                                    schema=StructType([StructField('engaging_user_id', StringType())]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [228]:
if submission:
    print("Submission")
    df = parse_data(submission_path, has_labels=False).repartition(300)
elif test:
    print("Test")
    df = parse_data(test_path, has_labels=False).repartition(300)
else:
    print("Train")
    df = parse_data(trainining_path, has_labels=True).repartition(1000)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test

In [229]:
def split_final_df(df, time_holdout_fraction=0.1, space_column='engaging_user_id', 
                   time_column='tweet_timestamp', avg_rows_per_user=5.4, 
                   val_size = 500000, perc_val_not_shared = 0.27, seed=0, 
                   engaging_users_submission=None, engaging_users_test=None):
    """
    Split using all the train data.
    Exclude all engaging_users from submission and test to create the validation
    Select the validatioon to be up to 500k samples, where 23% of enagging users are not known 
    (out of space completely). The validation is going to be picked out of time, but the rest 
    of out of time is gooing ot be used in train
    """
    info_dict = {}
    min_date, max_date = df.select(F.min(time_column), F.max(time_column)).first()
    time_range = max_date - min_date
    time_holdout_timestamp = min_date + int(time_range*(1-time_holdout_fraction))

    #Get engaging_users on submission and test
    engaging_users_test = engaging_users_submission.union(engaging_users_test)
    engaging_users_test = engaging_users_test.select(F.col("engaging_user_id").alias("user_id_1")).distinct()

    join_users_df = engaging_users_train.join(engaging_users_test, 
                                              engaging_users_test.user_id_1==engaging_users_train.engaging_user_id, 
                                              how="left")
    join_users_df = join_users_df.withColumn("indicator_test", 
                                             F.when(F.col("user_id_1").isNotNull(), 1).otherwise(0))
    join_users_df = join_users_df.drop("user_id_1") # Training user w indicator also in test
    join_users_not_test = join_users_df.filter(F.col("indicator_test")==0)
    
    # Sample that are not in test
    df_not_test = df.join(join_users_not_test,
                          on=space_column, 
                          how="inner")
    df_not_test = df_not_test.drop("indicator_test")
    info_dict["df_not_test_count"] = df_not_test.count()

    df_not_test_intime = df_not_test.filter(F.col(time_column) <= time_holdout_timestamp)
    df_not_test_outtime = df_not_test.filter(F.col(time_column) > time_holdout_timestamp) # Set for validation
    info_dict["df_not_test_intime_count"] = df_not_test_intime.count() #
    info_dict["df_not_test_outtime_count"] = df_not_test_outtime.count() #
    engaging_user_id_intime_not_test = df_not_test_intime.select(F.col(space_column)).distinct()
    engaging_user_id_outtime_not_test = df_not_test_outtime.select(F.col(space_column)).distinct()
    info_dict["engaging_user_id_intime_not_test_count"] = engaging_user_id_intime_not_test.count() #
    info_dict["engaging_user_id_outtime_not_test_count"] = engaging_user_id_outtime_not_test.count() #
    inner_engaging_not_test = engaging_user_id_intime_not_test.join(engaging_user_id_outtime_not_test, 
                                                                    on=space_column, 
                                                                    how="inner")
    info_dict["inner_engaging_not_test_count"] = inner_engaging_not_test.count() #
    rows_per_user_outtime_not_test = info_dict["df_not_test_outtime_count"]/\
                                      info_dict["engaging_user_id_outtime_not_test_count"]
    
    engaging_user_id_only_outtime_not_test =engaging_user_id_outtime_not_test.join(inner_engaging_not_test,
                                                                                   on=space_column,
                                                                                   how="left_anti")
    info_dict["engaging_user_id_only_outtime_not_test_count"] = engaging_user_id_only_outtime_not_test.count()#
    frac_user_only_outtime = val_size/rows_per_user_outtime_not_test*(perc_val_not_shared)\
                        /info_dict["engaging_user_id_only_outtime_not_test_count"] # Validation shared 
    valid_users_not_test_outtime = engaging_user_id_only_outtime_not_test.sample(withReplacement=False,
                                                                           fraction=frac_user_only_outtime,
                                                                           seed=seed)
    valid_users_not_test_outtime = valid_users_not_test_outtime.select(F.col("engaging_user_id").alias("user_id"))

    # Select the sample from that correspnd to 27% of rows
    df_valid_unknown = df_not_test_outtime.join(valid_users_not_test_outtime, 
                                    df_not_test_outtime.engaging_user_id==valid_users_not_test_outtime.user_id,
                                    how="inner").drop("user_id")
    
    #Select the rest
    frac_user_inner = val_size/rows_per_user_outtime_not_test*(1-perc_val_not_shared)\
                        /info_dict["inner_engaging_not_test_count"] # Validation shared 
    inner_engaging_not_test = inner_engaging_not_test.select(F.col("engaging_user_id").alias("user_id"))
    valid_users_not_test_shared = inner_engaging_not_test.sample(withReplacement=False,
                                                                 fraction=frac_user_inner,
                                                                 seed=seed)
    df_valid_known = df_not_test_outtime.join(valid_users_not_test_shared,
                                    df_not_test_outtime.engaging_user_id==inner_engaging_not_test.user_id,
                                    how="inner").drop("user_id")
    df_valid = df_valid_known.union(df_valid_unknown)

    valid_samples = df_valid.select(F.col("tweet_id").alias("tweet_id_1"), 
                                    F.col("engaging_user_id").alias("engaging_user_id_1"))
    df_train = df.join(valid_samples, 
                       (df.tweet_id==valid_samples.tweet_id_1)&\
                       (df.engaging_user_id == valid_samples.engaging_user_id_1), 
                       how="left_anti")
    return df_train, df_valid, join_users_not_test, info_dict

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [230]:
if not(submission):
    if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/train-"+suffix_sample, Delimiter='./')))==0:
        df_train, df_valid, join_users_not_test, info_dict = split_final_df(df, time_holdout_fraction=0.1, 
                                                               space_column='engaging_user_id', 
                                                               time_column='tweet_timestamp',
                                                               avg_rows_per_user=5.4, val_size=val_size, 
                                                               perc_val_not_shared=0.27, seed=0, 
                                                               engaging_users_submission=engaging_users_submission, 
                                                               engaging_users_test=engaging_users_test)
        print("info_dict: ", info_dict)
        train = df_train
        val = df_valid
        columns = columns2cast(train)
        train = cast_array2string(train, columns)
        val = cast_array2string(val, columns)
        join_users_not_test.write.csv(users_intime_path)
        train.write.csv(train_path)
        val.repartition(1000).write.csv(val_path)
        print("Casted columns: ",columns)
    else:
        print("Already exists")
schema = StructType([StructField('text_tokens', StringType()),
                     StructField('hashtags', StringType()),
                     StructField('tweet_id', StringType()),
                     StructField('present_media', StringType()),
                     StructField('present_links', StringType()),
                     StructField('present_domains', StringType()),
                     StructField('tweet_type', StringType()),
                     StructField('language', StringType()),
                     StructField('tweet_timestamp', LongType()),
                     StructField('engaged_with_user_id', StringType()),
                     StructField('engaged_with_user_follower_count', IntegerType()),
                     StructField('engaged_with_user_following_count', IntegerType()),
                     StructField('engaged_with_user_is_verified', BooleanType()),
                     StructField('engaged_with_user_account_creation', LongType()),
                     StructField('engaging_user_id', StringType()),
                     StructField('engaging_user_follower_count', IntegerType()),
                     StructField('engaging_user_following_count', IntegerType()),
                     StructField('engaging_user_is_verified', BooleanType()),
                     StructField('engaging_user_account_creation', LongType()),
                     StructField('engagee_follows_engager', BooleanType()),
                     StructField('reply_timestamp', LongType()),
                     StructField('retweet_timestamp', LongType()),
                     StructField('retweet_with_comment_timestamp', LongType()),
                     StructField('like_timestamp', LongType())])
train = spark.read.csv(train_path, schema=schema)
schema = StructType([StructField('engaging_user_id', StringType()),
                     StructField('text_tokens', StringType()),
                     StructField('hashtags', StringType()),
                     StructField('tweet_id', StringType()),
                     StructField('present_media', StringType()),
                     StructField('present_links', StringType()),
                     StructField('present_domains', StringType()),
                     StructField('tweet_type', StringType()),
                     StructField('language', StringType()),
                     StructField('tweet_timestamp', LongType()),
                     StructField('engaged_with_user_id', StringType()),
                     StructField('engaged_with_user_follower_count', IntegerType()),
                     StructField('engaged_with_user_following_count', IntegerType()),
                     StructField('engaged_with_user_is_verified', BooleanType()),
                     StructField('engaged_with_user_account_creation', LongType()),
                     StructField('engaging_user_follower_count', IntegerType()),
                     StructField('engaging_user_following_count', IntegerType()),
                     StructField('engaging_user_is_verified', BooleanType()),
                     StructField('engaging_user_account_creation', LongType()),
                     StructField('engagee_follows_engager', BooleanType()),
                     StructField('reply_timestamp', LongType()),
                     StructField('retweet_timestamp', LongType()),
                     StructField('retweet_with_comment_timestamp', LongType()),
                     StructField('like_timestamp', LongType())])
val = spark.read.csv(val_path, schema=schema).repartition(1000)
join_users_not_test = spark.read.csv(users_intime_path, 
                              schema=StructType([StructField('engaging_user_id', StringType())]))
columns = ["text_tokens", "hashtags", "present_media", "present_links", "present_domains"]
train = cast_string2array(train, columns)
val = cast_string2array(val, columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Already exists

# Pipeline

In [231]:
if submission:
    print("Submission")
elif test:
    print("Test")
else:
    if training:
        print("Train")
        df = train
    else:
        print("Validation")
        df = val    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test

In [232]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12434838

In [233]:
columns_w_nan = validator(df)
print(columns_w_nan)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'hashtags': 9802258, 'present_media': 7855555, 'present_links': 10560235, 'present_domains': 10560235}

### Indicators

In [234]:
if not(submission) and not(test):
    df = df.withColumn('indicator_reply',F.when(F.col('reply_timestamp').isNotNull(), 1).otherwise(0))
    df = df.withColumn('indicator_retweet',F.when(F.col('retweet_timestamp').isNotNull(), 1).otherwise(0))
    df = df.withColumn('indicator_retweet_with_comment',
                       F.when(F.col('retweet_with_comment_timestamp').isNotNull(),1).otherwise(0))
    df = df.withColumn('indicator_like', F.when(F.col('like_timestamp').isNotNull(),1).otherwise(0))
    df = df.withColumn('indicator_interaction', 
                       F.when(F.col('indicator_reply')+\
                              F.col('indicator_retweet')+\
                              F.col('indicator_retweet_with_comment')+\
                              F.col('indicator_like')>0, 1)\
                       .otherwise(0))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Intention_features

In [235]:
if training:
    if len(list(bucket_s3.objects.filter(Prefix=f"{data_path}/intentions-"+suffix_sample, Delimiter='./')))==0:
        print("Creating Intention")
        intention_df = df.select("engaging_user_id", "indicator_reply", "indicator_retweet", 
                                    "indicator_retweet_with_comment", "indicator_like", "indicator_interaction")\
                        .groupBy("engaging_user_id").agg(F.sum(F.col("indicator_interaction")).alias("n_interactions"), 
                                                         F.sum(F.col("indicator_retweet_with_comment"))\
                                                         .alias("n_commented"),
                                                         F.sum(F.col("indicator_like")).alias("n_liked"),
                                                         F.sum(F.col("indicator_reply")).alias("n_replied"),
                                                         F.sum(F.col("indicator_retweet")).alias("n_retweeted"),
                                                         F.count(F.col("indicator_interaction"))\
                                                         .alias("total_appearance"))
        columns = ['n_interactions', 'n_commented', 'n_liked', 'n_replied', 'n_retweeted']
        for col_i in columns:
            intention_df = intention_df.withColumn("perc_" + col_i, F.col(col_i)/(F.col("total_appearance")))
        intention_df = intention_df.drop(*columns)
        join_users_not_test = join_users_not_test.select(F.col("engaging_user_id").alias("drop_users"))
        join_users_not_test = join_users_not_test.sample(withReplacement=False,
                                                         fraction=0.15,
                                                         seed=42)
        intention_df = intention_df.join(join_users_not_test, 
                                         intention_df.engaging_user_id==join_users_not_test.drop_users, 
                                         how="left_anti").drop("drop_users")
        intention_df.repartition(1000).write.csv(intentions_path)
    else:
        print("Intention already created")
schema = StructType([StructField('engaging_user_id', StringType()),
             StructField('total_appearance', LongType()),
             StructField('perc_n_interactions', DoubleType()),
             StructField('perc_n_commented', DoubleType()),
             StructField('perc_n_liked', DoubleType()),
             StructField('perc_n_replied', DoubleType()),
             StructField('perc_n_retweeted', DoubleType())])
intention_df = spark.read.csv(intentions_path, schema=schema)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Hashtags

In [236]:
if training:
    print("Creating hashtags features")
    mapping_hashtag_encode, mapping_hashtag_count = mappings(df, "hashtags", top_k_hashtags)
    # Saving pkl
    save_pkl_to_s3(mapping_hashtag_encode, key_hashtag_mapping, bucket)
    save_pkl_to_s3(mapping_hashtag_count, key_hashtag_count, bucket)

# Load dict mapping language
mapping_hashtag_encode = pickle.loads(s3_resource.Bucket(bucket).Object(key_hashtag_mapping).get()['Body'].read())
mapping_hashtag_count = pickle.loads(s3_resource.Bucket(bucket).Object(key_hashtag_count).get()['Body'].read())
        
hashtagsEncoder_udf = F.udf(lambda x: labelEncoder(x, mapping_hashtag_encode), 
                         StringType())
df = df.withColumn('hashtagEncoded', hashtagsEncoder_udf(df.hashtags))
hashtagSumCounter_udf = F.udf(lambda x: hashtagSumCounter(x, mapping_hashtag_count), 
                             IntegerType())
df = df.withColumn('hashtagSumCount', hashtagSumCounter_udf(df['hashtags']))
df = df.withColumn('hashtagCount', listCounter_udf(df.hashtags))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Domain

In [237]:
if training:
    print("Creating domains features")
    mapping_domain_encode, mapping_domain_count = mappings(df, "present_domains", top_k_domains)
    # Saving pkl
    save_pkl_to_s3(mapping_domain_encode, key_domain_mapping, bucket)
    save_pkl_to_s3(mapping_domain_count, key_domain_count, bucket)

# Load dict mapping language
mapping_domain_encode = pickle.loads(s3_resource.Bucket(bucket).Object(key_domain_mapping).get()['Body'].read())

domainEncoder_udf = F.udf(lambda x: labelEncoder(x, mapping_domain_encode), 
                         StringType())
df = df.withColumn('domainEncoded', domainEncoder_udf(df.present_domains))
df = df.withColumn('domainCount', listCounter_udf(df.present_domains))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Language

In [238]:
if training:
    print("Creating language features")
    mapping_encoder = mapping_label_encoder(df, "language", top_k_languages)
    save_pkl_to_s3(mapping_encoder, key_language_mapping, bucket)

# Load dict mapping language
mapping_language_encode = pickle.loads(s3_resource.Bucket(bucket).Object(key_language_mapping).get()['Body'].read())
        
languageEncoder_udf = F.udf(lambda x: labelEncoderSingle(x, mapping_language_encode)[0], 
                         StringType())
df = df.withColumn('languageEncoded', languageEncoder_udf(df.language))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Media

In [239]:
dict_media_counter={'PhotoCount': PhotoCounter_udf,
                    'VideoCount': VideoCounter_udf, 
                    'GIFCount': GifCounter_udf}
for media, media_fun in dict_media_counter.items():
    df = df.withColumn(media, media_fun(df.present_media))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Links

In [240]:
df = df.withColumn('linkCount', listCounter_udf(df.present_links))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Tweet_type

In [241]:
df = df.withColumn('tweetEncoded', tweet_encoded_udf(df.tweet_type))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Timestamp_features

#### Date features

In [242]:
timestamp_feats = [i for i in df.columns if (('timestamp' in i) or ('creation' in i))]

# Timestamp to dates
for col_ts in timestamp_feats:
    # Taking only the preffix of each column
    preffix = col_ts.split("_timestamp")[0]
    df = df.withColumn(preffix + "_date", F.from_unixtime(col_ts, 'yyyy-MM-dd HH:mm:ss'))

# From tweet_date extracting day of week, week of month and hour of tweet
df = df.withColumn( 'tweet_timestamp_day_of_week', F.date_format(F.col('tweet_date'), 'u') )
df = df.withColumn( 'tweet_timestamp_week_of_month',  F.date_format(F.col('tweet_date'), "W" ) )
df = df.withColumn( 'tweet_timestamp_hour',  F.date_format(F.col('tweet_date'), "H" ) )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Scaling_features
As standard scaling will be done manually, we have to create some dictionaries to save the information of the scaling.

In [243]:
if training: 
    scaling_dict = dict()
else:
    scaling_dict = pickle.loads(s3_resource.Bucket(bucket).Object(key_scaling_features).get()['Body'].read())
    assert type(scaling_dict) == dict

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [244]:
#Seconds from engagee account creation to tweet posting
# Differences of timestamps return a result inv Seconds
df = df.withColumn( 'tweet_timestamp_to_engagee_account_creation',  
                   F.col('tweet_timestamp') - F.col('engaged_with_user_account_creation'))
if training: 
    mean_col, sttdev_col = df.select(F.mean('tweet_timestamp_to_engagee_account_creation'),
                                     F.stddev('tweet_timestamp_to_engagee_account_creation')).first()
    # Saving scaling features
    scaling_dict['tweet_timestamp_to_engagee_account_creation'] = { 'mean': mean_col, 'std': sttdev_col}    
mean_col = scaling_dict['tweet_timestamp_to_engagee_account_creation']['mean']    
sttdev_col = scaling_dict['tweet_timestamp_to_engagee_account_creation']['std']
    
df = df.withColumn('tweet_timestamp_to_engagee_account_creation'+'_ss', 
                   (F.col('tweet_timestamp_to_engagee_account_creation') - mean_col) / (2*sttdev_col))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [245]:
#Seconds from engaging account creation to tweet posting
# Differences of timestamps return a result inv Seconds
df = df.withColumn( 'tweet_timestamp_to_engaging_account_creation',  
                   F.col('tweet_timestamp') - F.col('engaging_user_account_creation'))
#Scaling
if training: 
    mean_col, sttdev_col = df.select(F.mean('tweet_timestamp_to_engaging_account_creation'), 
                                     F.stddev('tweet_timestamp_to_engaging_account_creation')).first()
    # Saving scaling features
    scaling_dict['tweet_timestamp_to_engaging_account_creation'] = { 'mean': mean_col, 'std': sttdev_col}    
mean_col = scaling_dict['tweet_timestamp_to_engaging_account_creation']['mean']    
sttdev_col = scaling_dict['tweet_timestamp_to_engaging_account_creation']['std']

df = df.withColumn('tweet_timestamp_to_engaging_account_creation'+'_ss', 
                   (F.col('tweet_timestamp_to_engaging_account_creation') - mean_col) / (2*sttdev_col))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Followers_and_Followings_features

In [246]:
df = df.withColumn('engaged_with_vs_engaging_follower_diff',
                   F.col('engaged_with_user_follower_count') - F.col('engaging_user_follower_count'))
df = df.withColumn('engaged_with_vs_engaging_following_diff', 
                   F.col('engaged_with_user_following_count') - F.col('engaging_user_following_count'))
df = df.withColumn('engaged_follow_diff',
                   F.col('engaged_with_user_follower_count') - F.col('engaged_with_user_following_count'))
df = df.withColumn('engaging_follow_diff', 
                   F.col('engaging_user_follower_count') - F.col('engaging_user_following_count'))
df = df.withColumn('engaged_follower_diff_engaging_following', 
                   F.col('engaged_with_user_follower_count') - F.col('engaging_user_following_count'))
df = df.withColumn('engaged_following_diff_engaging_follower', 
                   F.col('engaged_with_user_following_count') - F.col('engaging_user_follower_count'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [247]:
columns = ["engaged_with_vs_engaging_follower_diff", "engaged_with_vs_engaging_following_diff", 
           "engaged_follow_diff", "engaging_follow_diff", "engaged_follower_diff_engaging_following", 
           "engaged_following_diff_engaging_follower"]
if training:
    diff_min_dict = {}
    for col in columns:
        diff_min_dict[col] = df.select(F.min(col)).first()[0]
    save_pkl_to_s3(diff_min_dict, key_diff_min, bucket)
        
diff_min_dict = pickle.loads(s3_resource.Bucket(bucket).Object(key_diff_min).get()['Body'].read())
for col in columns:
    df = df.withColumn(col, F.col(col)-diff_min_dict[col])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Log and Standard Scaling of followers and difference features

In [248]:
# Followers/following counts and differences will transformed to logarithm. We are choosing the column names.
columns_to_log = ['engaged_with_vs_engaging_follower_diff', 'engaged_with_vs_engaging_following_diff', 
                    'engaged_follow_diff', 'engaging_follow_diff', 
                    'engaged_follower_diff_engaging_following', 'engaged_following_diff_engaging_follower', 
                    'engaged_with_user_follower_count', 'engaging_user_follower_count', 
                    'engaged_with_user_following_count', 'engaging_user_following_count']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [249]:
for column in columns_to_log:
    df = df.withColumn(column + '_log', F.log(F.col(column)+1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [250]:
cols_to_scale = [column for column in df.columns if ('_log' in column)]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [251]:
if training:
    for col in cols_to_scale:
        mean_col, sttdev_col = df.select(F.mean(col), F.stddev(col)).first()
        scaling_dict[col] = { 'mean': mean_col, 'std': sttdev_col}

for col in cols_to_scale:
    mean_col = scaling_dict[col]['mean']
    sttdev_col = scaling_dict[col]['std']
    df = df.withColumn(col+'_ss', (F.col(col) - mean_col) / (2*sttdev_col))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Quantile_Discretizer

In [252]:
if training:
    qds_obj = {}
    qds_paths
    for col in qds_paths.keys():
        qds = QuantileDiscretizer(numBuckets=50, inputCol=col, outputCol=col+"_q")
        qds = qds.fit(df)
        qds.save(qds_paths[col])
        
for col in qds_paths.keys():
    qds = QuantileDiscretizer.load(qds_paths[col])
    df = qds.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Intentions_join

In [253]:
df = df.join(intention_df, 
             on="engaging_user_id", 
             how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [254]:
columns = ["perc_n_interactions", 'perc_n_commented', 'perc_n_liked', 'perc_n_replied', 'perc_n_retweeted']
if training:
    dict_mean_perc = {}
    for col in columns:
        dict_mean_perc[col] = intention_df.select(F.mean(col)).first()[0]
    save_pkl_to_s3(dict_mean_perc, key_impute_perc, bucket)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### NumScale

In [255]:
num_cols_to_scale = ["hashtagSumCount", "hashtagCount",
                     "domainCount", "PhotoCount",
                     "VideoCount", "GIFCount",
                     "linkCount", "total_appearance",
                     "perc_n_interactions",
                     "perc_n_commented",
                     "perc_n_liked",
                     "perc_n_replied",
                     "perc_n_retweeted"]
#Scaling
if training:
    for col in num_cols_to_scale:
        mean_col, sttdev_col = df.select(F.mean(col), F.stddev(col)).first()
        scaling_dict[col] = {'mean': mean_col, 'std': sttdev_col}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Imputation

#### DiffsImputer

In [256]:
columns = ['engaged_with_vs_engaging_follower_diff_log_ss', 
           'engaged_with_vs_engaging_following_diff_log_ss', 
           'engaged_follow_diff_log_ss', 
           'engaging_follow_diff_log_ss', 
           'engaged_follower_diff_engaging_following_log_ss', 
           'engaged_following_diff_engaging_follower_log_ss']
for col in columns:
    df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.col(col)).otherwise(0))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### IntentionsImputer

In [257]:
df = df.withColumn('total_appearance', F.when(F.col("total_appearance").isNotNull(), 
                                                   F.col("total_appearance"))\
                          .otherwise(0))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [258]:
columns = ["perc_n_interactions", 'perc_n_commented', 'perc_n_liked', 'perc_n_replied', 'perc_n_retweeted']
        
dict_mean_perc = pickle.loads(s3_resource.Bucket(bucket).Object(key_impute_perc).get()['Body'].read())
for col in columns:
    df = df.withColumn(col, F.when(F.col(col).isNotNull(), F.col(col))\
                       .otherwise(dict_mean_perc[col]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Scaling after imputed

In [259]:
for col in num_cols_to_scale:
    mean_col = scaling_dict[col]['mean']
    sttdev_col = scaling_dict[col]['std']
    df = df.withColumn(col+'_ss', (F.col(col) - mean_col) / (2*sttdev_col))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Clipping

In [260]:
deviations_to_clip = 5.0
for col in num_cols_to_scale:
    df = df.withColumn(col+'_ss', F.when(F.abs(F.col(col+'_ss'))<5, 
                                         F.col(col+'_ss'))\
                       .otherwise(deviations_to_clip))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Saving scaling_dict

In [261]:
if training: 
    # Saving scaling dictionary to pickle
    save_pkl_to_s3(scaling_dict, key_scaling_features, bucket)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# FeatureSelection

In [262]:
original = ['text_tokens', 'tweet_id', 'engaged_with_user_id', 'engaged_with_user_is_verified',
            'engaging_user_id', 'engaging_user_is_verified', 'engagee_follows_engager']

processed = ['hashtagEncoded', 'hashtagSumCount_ss', 'hashtagCount_ss', #hashtags
             'domainEncoded', 'domainCount_ss', #domains
             'tweetEncoded', # tweet type encoded
             'languageEncoded',# language encoded
             'tweet_timestamp_day_of_week', 'tweet_timestamp_week_of_month', 'tweet_timestamp_hour', #tweet timestamp
             'tweet_timestamp_to_engagee_account_creation_ss', #engagee time in twitter proxy
             'tweet_timestamp_to_engaging_account_creation_ss', #engaging time in twitter proxy
             'engaged_with_vs_engaging_follower_diff_log_ss', #followers engaged
             'engaged_with_vs_engaging_following_diff_log_ss', #followings engaged
             'engaged_follow_diff_log_ss', #diff follows engaged
             'engaging_follow_diff_log_ss', #diff follows engaging
             'engaged_follower_diff_engaging_following_log_ss',  #followings engaging
             'engaged_following_diff_engaging_follower_log_ss', #followers engaging
             'engaged_with_user_follower_count_log_ss', #engaged follower count
             'engaging_user_follower_count_log_ss', #engaging follower count
             'engaged_with_user_following_count_log_ss',  #engaged following count
             'engaging_user_following_count_log_ss', #engaging following count
             'PhotoCount_ss', 'VideoCount_ss', 'GIFCount_ss', 'linkCount_ss', #media
             'engaged_with_user_follower_count_q', 'engaged_with_user_following_count_q', #Quantile
             'engaged_with_user_account_creation_q', 'engaging_user_follower_count_q', #Quantile
             'engaging_user_following_count_q', 'engaging_user_account_creation_q', #Quantile
             'total_appearance_ss', 'perc_n_interactions_ss', 'perc_n_commented_ss', 
             'perc_n_liked_ss', 'perc_n_replied_ss', 'perc_n_retweeted_ss' #intentions
             ]

labels = ['indicator_reply', 'indicator_retweet', 'indicator_retweet_with_comment',
          'indicator_like', 'indicator_interaction']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [263]:
if submission:
    cols_to_select = original + processed
elif test:
    cols_to_select = original + processed
else:
    cols_to_select = original + processed + labels

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [264]:
new_df = df.select(cols_to_select)
columns = columns2cast(new_df)
new_df = cast_array2string(new_df, columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [265]:
dict_types = {'bool': ['engaged_with_user_is_verified', 'engaging_user_is_verified', 'engagee_follows_engager'],
              'id': ['tweet_id', 'engaged_with_user_id', 'engaging_user_id'], 
              'num': ['hashtagSumCount_ss', 'hashtagCount_ss', 
                      'domainCount_ss', 
                      'tweet_timestamp_to_engagee_account_creation_ss', #engagee time in twitter proxy
                      'tweet_timestamp_to_engaging_account_creation_ss', #engaging time in twitter proxy
                      'engaged_with_vs_engaging_follower_diff_log_ss', #followers engaged
                      'engaged_with_vs_engaging_following_diff_log_ss', #followings engaged
                      'engaged_follow_diff_log_ss', #diff follows engaged
                      'engaging_follow_diff_log_ss', #diff follows engaging
                      'engaged_follower_diff_engaging_following_log_ss',  #followings engaging
                      'engaged_following_diff_engaging_follower_log_ss', #followers engaging
                      'engaged_with_user_follower_count_log_ss', #engaged follower count
                      'engaging_user_follower_count_log_ss', #engaging follower count
                      'engaged_with_user_following_count_log_ss',  #engaged following count
                      'engaging_user_following_count_log_ss', #engaging following count
                      'PhotoCount_ss', 'VideoCount_ss', 'GIFCount_ss', 'linkCount_ss', #media
                      'total_appearance_ss', 'perc_n_interactions_ss', 'perc_n_commented_ss', 
                      'perc_n_liked_ss', 'perc_n_replied_ss', 'perc_n_retweeted_ss' #intentions
                     ], 
              'cat': ['tweetEncoded', 'languageEncoded',
                      'tweet_timestamp_day_of_week', 'tweet_timestamp_week_of_month', 'tweet_timestamp_hour',
                      'engaged_with_user_follower_count_q', 'engaged_with_user_following_count_q', #Quantile
                      'engaged_with_user_account_creation_q', 'engaging_user_follower_count_q', #Quantile
                      'engaging_user_following_count_q', 'engaging_user_account_creation_q', #Quantile
                     ], 
              'ors': ['text_tokens'], 
              'unors':['hashtagEncoded', 'domainEncoded']}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [266]:
for key, val in dict_types.items():
    for column in val:
        new_df = new_df.withColumnRenamed(column, column + '_' + key)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [267]:
new_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12434838

# MergeUserBucket

In [268]:
map_user_bucket = spark.read.csv(map_user_bucket_path, schema= StructType([StructField('user_id', StringType()),
                                                                           StructField('final_bucket', IntegerType())]))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [269]:
new_df = new_df.join(map_user_bucket, new_df.engaged_with_user_id_id==map_user_bucket.user_id, how="left")
new_df = new_df.drop("user_id")
new_df = new_df.withColumnRenamed("final_bucket", "engaged_with_user_id_bucket")

new_df = new_df.join(map_user_bucket, new_df.engaging_user_id_id==map_user_bucket.user_id, how="left")
new_df = new_df.drop("user_id")
new_df = new_df.withColumnRenamed("final_bucket", "engaging_user_id_bucket")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Saving_df

In [None]:
print(processed_submission_path)
print(processed_test_path)
print(processed_train_path)
print(processed_val_path)

In [273]:
processed_submission_path = f"hdfs:///submission-{suffix_sample}"
processed_test_path = f"hdfs:///test-{suffix_sample}"
processed_train_path = f"hdfs:///train-{suffix_sample}"
processed_val_path = f"hdfs:///val-{suffix_sample}"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [274]:
if submission:
    new_df.write.option("header","true").csv(processed_submission_path)
    print("Submission saved")
elif test:
    new_df.write.option("header","true").csv(processed_test_path)
    print("Test saved")
elif training:
    new_df.write.option("header","true").csv(processed_train_path)
    print("Train saved")
else:
    new_df.write.option("header","true").csv(processed_val_path)
    print("Valid saved")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test saved

# REPLACE FOR S3-DIST-CP

In [None]:
StructType([StructField("text_tokens_ors",StringType),
            StructField("tweet_id_id",StringType),
            StructField("engaged_with_user_id_id",StringType),
            StructField("engaged_with_user_is_verified_bool",BooleanType),
            StructField("engaging_user_id_id",StringType),
            StructField("engaging_user_is_verified_bool",BooleanType),
            StructField("engagee_follows_engager_bool",BooleanType),
            StructField("hashtagEncoded_unors",StringType),
            StructField("hashtagSumCount_num",IntegerType),
            StructField("hashtagCount_num",IntegerType),
            StructField("domainEncoded_unors",StringType),
            StructField("domainCount_num",IntegerType),
            StructField("tweetEncoded_cat",IntegerType),
            StructField("languageEncoded_cat",StringType),
            StructField("tweet_timestamp_day_of_week_cat",StringType),
            StructField("tweet_timestamp_week_of_month_cat",StringType),
            StructField("tweet_timestamp_hour_cat",StringType),
            StructField("tweet_timestamp_to_engagee_account_creation_ss_num",DoubleType),
            StructField("tweet_timestamp_to_engaging_account_creation_ss_num",DoubleType),
            StructField("engaged_with_vs_engaging_follower_diff_log_ss_num",DoubleType),
            StructField("engaged_with_vs_engaging_following_diff_log_ss_num",DoubleType),
            StructField("engaged_follow_diff_log_ss_num",DoubleType),
            StructField("engaging_follow_diff_log_ss_num",DoubleType),
            StructField("engaged_follower_diff_engaging_following_log_ss_num",DoubleType),
            StructField("engaged_following_diff_engaging_follower_log_ss_num",DoubleType),
            StructField("engaged_with_user_follower_count_log_ss_num",DoubleType),
            StructField("engaging_user_follower_count_log_ss_num",DoubleType),
            StructField("engaged_with_user_following_count_log_ss_num",DoubleType),
            StructField("engaging_user_following_count_log_ss_num",DoubleType),
            StructField("PhotoCount_num",IntegerType),
            StructField("VideoCount_num",IntegerType),
            StructField("GIFCount_num",IntegerType),
            StructField("linkCount_num",IntegerType),
            StructField("engaged_with_user_follower_count_q_cat",DoubleType),
            StructField("engaged_with_user_following_count_q_cat",DoubleType),
            StructField("engaged_with_user_account_creation_q_cat",DoubleType),
            StructField("engaging_user_follower_count_q_cat",DoubleType),
            StructField("engaging_user_following_count_q_cat",DoubleType),
            StructField("engaging_user_account_creation_q_cat",DoubleType),
            StructField("total_appearance_num",LongType),
            StructField("perc_n_interactions_num",DoubleType),
            StructField("perc_n_commented_num",DoubleType),
            StructField("perc_n_liked_num",DoubleType),
            StructField("perc_n_replied_num",DoubleType),
            StructField("perc_n_retweeted_num",DoubleType),
            StructField("indicator_reply",IntegerType),
            StructField("indicator_retweet",IntegerType),
            StructField("indicator_retweet_with_comment",IntegerType),
            StructField("indicator_like",IntegerType),
            StructField("indicator_interaction",IntegerType)])