# Sample dataset generator

Sample a big dataset to create small(er) datasets. Run this notebook on the cluster.

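Produces a folder of parquet part-files on HDFS at `OUTPUT_PATH/OUTPUT_DIR`.

To bring the sample to the local filesystem, copy the whole folder with `hdfs dfs -get OUTPUT_PATH/OUTPUT_DIR <local_dir>`. Do not use `hdfs dfs -getmerge`: it concatenates the part-files byte by byte, and the result is not a valid parquet file, because each part carries its own footer metadata.

To load the folder locally, use pandas. Its parquet engines read every part-file in a directory as a single table:
```python
df = pd.read_parquet(path)  # path = <local_dir>
```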

In [1]:
import os
from pyspark.sql.functions import lit
from pyspark.sql.functions import rand

Useful links about the G1 garbage collector:
- https://www.oracle.com/technical-resources/articles/java/g1gc.html for an overview
- https://spark.apache.org/docs/latest/tuning.html#memory-management-overview for a description of the tuning options
- https://spark.apache.org/docs/latest/configuration.html#runtime-environment for the `extraJavaOptions` settings

In [2]:
# JVM options for the G1 garbage collector.
# Each `spark.conf.set` call *replaces* the value of its key, so all flags for the same
# key must be passed together as one space-separated string. Setting them one call at a
# time would keep only the last flag.
G1_OPTIONS = " ".join([
    "-XX:+UseG1GC",              # Use G1 as garbage collector
    "-XX:G1HeapRegionSize=32m",  # Largest G1 region size (G1 regions range from 1 MB to 32 MB on JDK 8)
])

# Logging of GC usage: debug to find the bottleneck
GC_LOG_OPTIONS = "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

# NOTE(review): JVM and memory options are normally read when the JVMs start. The Spark
# configuration docs state that `spark.driver.extraJavaOptions` must not be set from a
# client-mode application (the driver JVM is already running), and deploy-related
# properties may not take effect when set at runtime. Consider passing these via
# spark-submit or SparkSession.builder, and confirm they actually apply on this cluster.
spark.conf.set("spark.executor.extraJavaOptions", f"{G1_OPTIONS} {GC_LOG_OPTIONS}")
spark.conf.set("spark.driver.extraJavaOptions", G1_OPTIONS)

# Increase executor memory
spark.conf.set("spark.executor.memory", "64g")

# Enable Spark event logging and write the event logs to this HDFS directory
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs://BigDataHA/user/s277596/spark/spark2ApplicationHistory")

In [3]:
INPUT_PATH = "hdfs://BigDataHA/user/s277309/recsys_data/"
OUTPUT_PATH = "recsys_data_sample_generated"

# Columns to blank out in the sample. The listed columns are NOT removed from the
# schema: each one is overwritten with the constant `0` (and then cast to the
# column's dtype). This keeps the column layout compatible with our importer.
FILL_NULL_COLUMNS = []

# If True, replace each engagement timestamp with the integer 1 if present and 0 if
# absent (saves disk space)
NO_TIMESTAMP = False

TOTAL_ROWS = 747_694_282  # Total number of rows in the full dataset
SAMPLE_ROWS = 2_000_000   # Number of rows to sample (SAMPLE_ROWS / TOTAL_ROWS is the sampling fraction)

# If True, take the sample from a time window (ordered by `tweet_timestamp`) instead of
# sampling rows at random. This is necessary for correct training and testing conditions.
TIME_WINDOW = True

In [4]:
# Input columns, in the order in which they appear in each split input line
features = [
    # Tweet features
    "text_tokens",      # List[long]    Ordered list of Bert ids corresponding to Bert tokenization of Tweet text
    "hashtags",         # List[string]  Tab separated list of hashtags (identifiers) present in the tweet
    "tweet_id",         # String        Tweet identifier (unique)
    "present_media",    # List[String]  Tab separated list of media types. Media type can be in (Photo, Video, Gif)
    "present_links",    # List[string]  Tab separated list of links (identifiers) included in the Tweet
    "present_domains",  # List[string]  Tab separated list of domains included in the Tweet (twitter.com, dogs.com)
    "tweet_type",       # String        Tweet type, can be either Retweet, Quote, Reply, or Toplevel
    "language",         # String        Identifier corresponding to the inferred language of the Tweet
    "tweet_timestamp",  # Long          Unix timestamp, in sec of the creation time of the Tweet

    # Engaged-with User (i.e., Engagee) Features
    "engaged_with_user_id",                 # String    User identifier
    "engaged_with_user_follower_count",     # Long      Number of followers of the user
    "engaged_with_user_following_count",    # Long      Number of accounts the user is following
    "engaged_with_user_is_verified",        # Bool      Is the account verified?
    "engaged_with_user_account_creation",   # Long      Unix timestamp, in seconds, of the creation time of the account

    # Engaging User (i.e., Engager) Features
    "engaging_user_id",                     # String    User identifier
    "engaging_user_follower_count",         # Long      Number of followers of the user
    "engaging_user_following_count",        # Long      Number of accounts the user is following
    "engaging_user_is_verified",            # Bool      Is the account verified?
    "engaging_user_account_creation",       # Long      Unix timestamp, in seconds, of the creation time of the account

    # Engagement features
    "engagee_follows_engager",  # Bool  Engagee follows engager?
]

# Feature name -> position of the field in a split input line
features_idx = {name: position for position, name in enumerate(features)}

# Label name -> position of the field in a split input line (the labels follow the features)
labels_idx = {
    # Engagement features (cont.)
    "reply_timestamp": 20,                  # Long      Unix timestamp (in seconds) of one of the replies, if there is at least one
    "retweet_timestamp": 21,                # Long      Unix timestamp (in seconds) of the retweet by the engaging user, if there is at least one
    "retweet_with_comment_timestamp": 22,   # Long      Unix timestamp (in seconds) of one of the retweet with comment by the engaging user, if there is at least one
    "like_timestamp": 23,                   # Long      Unix timestamp (in seconds) of the like by the engaging user, if they liked the tweet
}

# The hard-coded label positions must be exactly the ones right after the last feature;
# fail fast if a feature is added or removed without updating them.
assert sorted(labels_idx.values()) == list(range(len(features), len(features) + len(labels_idx))), (
    "labels_idx must point at the columns that directly follow `features`"
)

In [5]:
# Spark SQL type each column is cast to, matched by position with the column order
# (the 20 features followed by the 4 engagement labels).
dtypes = (
    ["string"] * 8                                   # text_tokens ... language
    + ["int"]                                        # tweet_timestamp
    + ["string", "int", "int", "boolean", "int"]     # engaged_with_user_{id, follower_count, following_count, is_verified, account_creation}
    + ["string", "int", "int", "boolean", "int"]     # engaging_user_{id, follower_count, following_count, is_verified, account_creation}
    + ["boolean"]                                    # engagee_follows_engager
    + ["int"] * 4                                    # reply / retweet / retweet_with_comment / like timestamps
)

In [6]:
# Read data
lines_rdd = sc.textFile(INPUT_PATH)

# Split each line into its fields.
# Fields are separated by the control character U+0001 ("\x01", byte 0x01 in UTF-8),
# not by the digit "1" (0x31). See https://recsys-twitter.com/code/snippets
fields_rdd = lines_rdd.map(lambda line: line.strip().split("\x01"))

# Adapted from: https://github.com/MAL-TO/recsys-2021/blob/a047fba6385453b90f6754ac7ebe36eaf622cb2c/andrea/recsys-2021/src/data/make_dataset.py
# Used when NO_TIMESTAMP is set, to replace the target timestamps with presence flags
def timestamp_to_bool(l):
    """Replace every target timestamp in `l` with a 0/1 presence flag, in place.

    For each position listed in `labels_idx`, an empty value becomes `0` and a
    non-empty value becomes `1`.

    Args:
        l: the split fields of one input line (mutated).

    Returns:
        The same list object `l`, so the function can be used directly in `RDD.map`.
    """
    for target_position in labels_idx.values():
        is_present = len(l[target_position]) > 0
        l[target_position] = 1 if is_present else 0
    return l

# Optionally replace the target timestamps with 0/1 presence flags
if NO_TIMESTAMP:
    fields_rdd = fields_rdd.map(timestamp_to_bool)

# Transform to Spark dataframe. With only column names given, Spark infers the column
# types from the data; the casts below set the intended types.
schema = features + list(labels_idx.keys())  # Column names, in input-field order
df = spark.createDataFrame(fields_rdd, schema)

# `dtypes` is matched to `schema` by position, so both must have the same length.
assert len(dtypes) == len(schema), f"{len(dtypes)} dtypes for {len(schema)} columns"
# A misspelled name in FILL_NULL_COLUMNS must not go unnoticed.
unknown_fill_columns = set(FILL_NULL_COLUMNS) - set(schema)
assert not unknown_fill_columns, f"Unknown columns in FILL_NULL_COLUMNS: {sorted(unknown_fill_columns)}"

# Overwrite the FILL_NULL_COLUMNS with the constant 0 (the columns are kept for importer
# compatibility) and cast every column to its target type in one projection. Chaining
# one `withColumn` per column builds a deep query plan, which the PySpark docs warn against.
df = df.select([
    (lit(0) if column in FILL_NULL_COLUMNS else df[column]).cast(dtype).alias(column)
    for column, dtype in zip(schema, dtypes)
])

In [None]:
# Superseded draft of the sampling step, kept for reference only (fully commented out, never runs).
# It branched on TIME_WINDOW (random sample otherwise) and kept the *latest* SAMPLE_ROWS
# tweets (ORDER BY tweet_timestamp DESC), whereas the cells below sort ascending.
# %%time
# # Sample data
# OUTPUT_DIR = f"sample_{SAMPLE_ROWS/TOTAL_ROWS:.4f}"
# if TIME_WINDOW:
#     # Reversing the order since first samples are less meaningful (see stationarity eda)
#     df.createOrReplaceTempView("dataframe")
#     query = f"SELECT * FROM dataframe ORDER BY tweet_timestamp DESC LIMIT {SAMPLE_ROWS}"
#     sample_df = spark.sql(query)
# else:
#     sample_df = df.sample(withReplacement=False, fraction=SAMPLE_ROWS/TOTAL_ROWS)
# sample_df.show()

In [None]:
%%time
# Sample data
OUTPUT_DIR = f"sample_{SAMPLE_ROWS/TOTAL_ROWS:.4f}"

df.createOrReplaceTempView("dataframe")
query = "SELECT * FROM dataframe ORDER BY tweet_timestamp"
sorted_df = spark.sql(query)
sorted_df.show()

In [None]:
%%time
sorted_df.createOrReplaceTempView("sorted")
query = f"SELECT * FROM sorted LIMIT {SAMPLE_ROWS}"
sample_df = spark.sql(query)
sample_df.show()

In [None]:
# Write the sample to HDFS as a folder of parquet part-files
sample_path = os.path.join(OUTPUT_PATH, OUTPUT_DIR)
sample_df.write.parquet(sample_path)

In [None]:
print(os.path.join(OUTPUT_PATH, OUTPUT_DIR))