# Feature Engineering

### Approach
<b>Features To Build </b>
- Total Songs Listened To
- App Page Interactions
    - Thumbs Up
    - Thumbs Down
    - Add Friend
    - Add to playlist
- Help Page Interactions/Error Page Interactions
- User Device Brand
- User Browser


<b> Key Points </b>
- The Feature dataframe will be created based on the userId
- Transformations will need to maximize Pyspark capabilities

In [26]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_unixtime, date_trunc, udf, lit,date_format


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
import plotly.express as px

In [27]:
def create_spark_session(app_name="Sparkify", default_settings = True ,  total_physical_cores=16,driver_memory = 8,executor_memory = 8):
# Calculate available cores for Spark
    try:
        spark.shutdown()
    except  Exception as e:
        print(e)

    if default_settings == False:
        total_physical_cores = input(" Available Cores")
        driver_memory =  input(" Driver Memory Allowance")
        executor_memory = input("Executor Memory Allowance")

    available_cores_for_spark =int( total_physical_cores - 2)
    # Configure Spark session
    spark = (
        SparkSession.builder.appName(app_name)
        .config("spark.driver.memory", str(int(driver_memory)) + "g")
        .config("spark.executor.memory", str(int(executor_memory)) + "g")
        .config("spark.executor.cores", available_cores_for_spark)
        .getOrCreate()
    )

    return  spark

spark = create_spark_session()

local variable 'spark' referenced before assignment


24/01/20 23:37:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [28]:
path = "./data/lg_sparkify_event_data.json"
sdf = spark.read.json(path)

                                                                                

In [29]:

clean_sdf = sdf.dropna(subset="userId")
clean_sdf.createOrReplaceTempView("cleaned_user_log")
unique_users = clean_sdf[["userId"]].distinct()

clean_sdf = clean_sdf.withColumn("ts", (col("ts") / 1000).cast("timestamp"))

# Apply date_format function
clean_sdf = clean_sdf.withColumn("date", date_format(col("ts"), "yyyy-MM-dd"))

# clean_sdf = clean_sdf.withColumn("ts", from_unixtime(col("ts") / 1000))  # Assuming ts is in milliseconds
# clean_sdf = sdf.withColumn("date", date_format(col("ts"), "yyyy-MM-dd"))

In [30]:
def handle_missing_users(
    sdf: DataFrame, unique_users: DataFrame, fill_value=lit(0)
) -> DataFrame:
    """
    Handle missing users in a PySpark DataFrame.

    Parameters:
    - sdf (DataFrame): PySpark DataFrame representing user data. Should have columns 'userId' and 'featur_name'.
    - unique_users (DataFrame): PySpark DataFrame with unique user information.

    Returns:
    - DataFrame: Updated PySpark DataFrame with filled missing users.
    """

    sdf_user_count = sdf.count()

    unique_count = unique_users.count()

    if sdf_user_count != unique_count:
        print(f"Missing Values: {unique_count - sdf_user_count}")
        missing_users = unique_users.select("userId").subtract(sdf.select("userId"))
        # Since the sdf is only two we rename the column based on sdf's second column
        missing_users_sdf = missing_users.withColumn(sdf.columns[1], fill_value)
        filled_missing_users = sdf.union(missing_users_sdf)

        return filled_missing_users
    else:
        return sdf
    
def flag_rows(df, column_name, check_list, flagged_column_name):
    """
    Flag rows in a PySpark DataFrame based on whether the value in a specified column is in a given list.

    Parameters:
    - df (pyspark.sql.DataFrame): The PySpark DataFrame to be modified.
    - column_name (str): The name of the column to check for values.
    - check_list (list): The list of values to check against.
    - flagged_column_name (str): The name of the new column to be created for the flags.

    Returns:
    pyspark.sql.DataFrame: The modified PySpark DataFrame with the new flagged column.

    This function takes a PySpark DataFrame, a column name, a list of values, and a flagged column name. It then adds a new column to the DataFrame
    that contains a flag (1 or 0) based on whether the values in the specified column are present in the given list.
    """

    def check_list_udf(value):
        return 1 if value in check_list else 0

    check_list_udf = udf(check_list_udf, IntegerType())

    df = df.withColumn(flagged_column_name, check_list_udf(df[column_name]))

    return df

## Build Features

### Song Counts

In [31]:
song_counts = (
    clean_sdf[["userId", "artist"]].dropna(subset="artist").groupBy("userId").count()
)

song_counts = handle_missing_users(song_counts, unique_users)

# song_counts.show()



Missing Values: 17


                                                                                

### Distinct Artist

In [32]:
distinct_artist = (
    clean_sdf.filter(clean_sdf["artist"].isNotNull())
    .groupBy("userId")
    .agg(F.countDistinct("artist").alias("distinct_artist"))
)

distinct_artist = handle_missing_users(distinct_artist, unique_users)

# distinct_artist.show()

                                                                                

Missing Values: 17


### User Level

In [33]:
user_level = (
    clean_sdf[["userId", "level", "ts"]]
    .orderBy("ts", ascending=False)
    .dropDuplicates(subset=["userId"])
    .select("userId", "level")
)

level_flag_udf = udf(lambda x: 1 if x == "paid" else 0, IntegerType())

# one-hot encode
user_level = user_level.withColumn(
    "level_flag", level_flag_udf(user_level["level"])
).select("userId", "level_flag")

user_level = handle_missing_users(user_level, unique_users)
# user_level.show()

                                                                                

### Positive App Usage

In [34]:
positive_usage_list = ["Thumbs Up", "Thumbs Down", "Add Friend", "Add to playlist"]

positive_usage = (
    clean_sdf[["userId", "page"]]
    .filter(col("page").isin(positive_usage_list))
    .groupBy("userId")
    .count()
)

positive_usage = positive_usage.withColumnRenamed("count", "pos_interactions")

positive_usage = handle_missing_users(positive_usage, unique_users)
# positive_usage.show()



Missing Values: 318


                                                                                

### Negative Interactions

In [35]:
neg_interactions_list = ["Error", "Help"]

neg_interactions = (
    clean_sdf[["userId", "page"]]
    .filter(col("page").isin(neg_interactions_list))
    .groupBy("userId")
    .count()
)

neg_interactions = neg_interactions.withColumnRenamed("count", "neg_interactions")

neg_interactions = handle_missing_users(neg_interactions, unique_users)
# neg_interactions.show()

                                                                                

Missing Values: 3412


### Unique Locations

In [36]:
unique_locations = (
    clean_sdf.filter(clean_sdf["location"].isNotNull())
    .groupBy("userId")
    .agg(F.countDistinct("location").alias("unique_locations"))
)


unique_locations = handle_missing_users(unique_locations, unique_users)




Missing Values: 1


                                                                                

### Avg Daily Listens

In [37]:
# Calculate the average daily listens per user
average_daily_listens = (
    clean_sdf.dropna(subset="artist")
    .groupBy("userId", "date")
    .agg(F.count("artist").alias("daily_listens"))
    .groupBy("userId")
    .agg(F.avg("daily_listens").alias("avg_daily_listens"))
)

average_daily_listens = average_daily_listens.withColumn("avg_daily_listens", F.round("avg_daily_listens"))


### Page Counts

In [38]:
page_filter = ["Cancel", "Cancellation Confirmation", "NextSong"]
page_count_df = (
    clean_sdf[["userId", "page"]]
    .filter(~col("page").isin(page_filter))
    .toPandas()
    .groupby("userId")
    .value_counts()
    .reset_index()
    .pivot(columns="page", values="count", index="userId")
    .fillna(int(0))
)


# page_count_corr = page_count_df.corr()

# # sns.heatmap(
# #     page_count_corr,
# #     annot=False,
# #     cmap="coolwarm",
# #     fmt=".2f",
# #     linewidths=0.5,
# # )

page_count = spark.createDataFrame(page_count_df.reset_index())
# page_count.show()

                                                                                

## Device and Browsers

In [39]:
def ua_check(sdf):
    from ua_parser import user_agent_parser

    def device_check(ua):
        if ua:
            ua = ua.replace("\\", "")
            parsed_string = user_agent_parser.Parse(ua)
            return parsed_string["os"]["family"]
        else:
            return "Other"

    def browser_check(ua):
        if ua:
            ua = ua.replace("\\", "")
            parsed_string = user_agent_parser.Parse(ua)
            return parsed_string["user_agent"]["family"]
        else:
            return "Other"

    device_check_udf = udf(device_check, StringType())
    browse_brand_udf = udf(browser_check, StringType())

    sdf = sdf.withColumn("device", device_check_udf(sdf["userAgent"]))
    sdf = sdf.withColumn("browser", browse_brand_udf(sdf["userAgent"]))

    return sdf





In [40]:

cols_to_encode = ["device","browser"]

def one_hot_encode(sdf,cols_to_encode):
    conditions = {}
    categories = {}

    for col in cols_to_encode:
        cats = [row[0] for row in sdf.select(col).distinct().collect()]
        conditions[col] = {f"{col} == '{cat}'": idx for idx, cat in enumerate(cats)}
        categories[col] = {cat: idx for idx, cat in enumerate(cats)}

    for col, cats_dict in categories.items():
        for cat, idx in cats_dict.items():
            expression = F.when(sdf[col] == cat, 1).otherwise(0)
            sdf = sdf.withColumn(f"{col}_{cat}", expression)
        sdf = sdf.drop(col)

    return sdf

ua_sdf = one_hot_encode(ua_check(sdf).select("userId","device","browser"),cols_to_encode)


                                                                                

## Time Since Registration

In [41]:
time_since_registration = sdf.groupBy('userId', 'ts', 'registration').count() \
    .withColumn('life_time', (F.col('ts') - F.col('registration')) / 1000) \
    .groupBy('userId').agg(F.max('life_time').alias('time_since_registration'))

## Label

In [42]:
labels = (
    clean_sdf[["userId", "page"]]
    .filter(col("page").isin(["Cancellation Confirmation"]))
    .drop_duplicates(["userId"])
)


labels = labels.withColumn("label", lit(1))
labels_sdf = labels.drop("page")

labels_sdf = handle_missing_users(labels_sdf, unique_users)





Missing Values: 17275


                                                                                

In [43]:
dfs = [
    song_counts,
    # song_listened_mean,
    average_daily_listens,
    user_level,
    positive_usage,
    neg_interactions,
    #unique_locations,
    distinct_artist,
    page_count,
    time_since_registration,
   ua_sdf
]

processed_features = labels_sdf
for df in dfs:
    processed_features = processed_features.join(df, "userId", "left_outer")
    del df

In [44]:
processed_features = processed_features.dropDuplicates().dropna()

### Export Features For Model Testing

In [45]:
from datetime import datetime
processed_features.toPandas().to_csv(f"./data/lg_all_features.csv")
                                
spark.stop()        
                                
#processed_features.write.csv(f"/Users/jacobfletcher/git/churn_project/data/processed_features_{datetime.now()}", mode='overwrite',header= True)

24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/01/20 23:59:53 WARN RowBasedKeyValueBatch: Calling spill() on

### Feature Correalations

In [None]:
pf_df = processed_features.toPandas()

In [None]:
del processed_features

In [None]:
def display_distributions(df):
    sns.set(style="whitegrid")

    numerical_columns = df.select_dtypes(include=['float64', 'int64']).columns
    num_numerical_columns = len(numerical_columns)
    
    # Determine the number of rows and columns for subplots
    num_rows = (num_numerical_columns - 1) // 3 + 1
    num_cols = min(3, num_numerical_columns)

    plt.figure(figsize=(15, 5 * num_rows))

    for i, column in enumerate(numerical_columns, start=1):
        plt.subplot(num_rows, num_cols, i)
        sns.histplot(df[column], kde=True)
        plt.title(f'Distribution of {column}')

    plt.tight_layout()
    plt.show()



display_distributions(pf_df)