
<span style="color:#007BA7"> Many One-hot Encoding in PySpark </span>

Recently I ran into a single-column data structure at work: a set of survey questions answered by shoppers, many of which allow "check all that apply". The multiple responses to each question are recorded in one column as a <b>comma-separated string</b>. Here is a super simple example of what the data looks like, and what I wanted to achieve:  

<center><img src="https://i.imgur.com/R5KWnew.png"></center>

The raw data contain the answers each user ID gave, separated by commas. The answers are not necessarily in alphabetical order, and the number of answers varies from row to row. To make sense of these data points, I need to transform the values into a multiple-column structure with one binary (1/0) indicator column per answer option. In other words, we are transforming a long dataframe into a wide dataframe. Here are two key questions: 

- How to do that in PySpark? 
- How to process multiple questions at once? 
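To make the target concrete, here is a minimal pure-Python sketch of the same long-to-wide transformation on three made-up rows (they are not taken from the dataset). It assumes answers are matched exactly after trimming and that empty fragments are skipped; the PySpark version in this notebook works on DataFrames rather than dicts.

```python
# Pure-Python sketch of the transformation this notebook performs in PySpark.
# The toy rows below are made up for illustration; they are not from the dataset.
rows = {
    1: "Therapy, date coaching",
    2: "date coaching",
    3: None,  # a skipped question
}

def split_answers(raw):
    """Split a comma-separated response into trimmed answers (None -> no answers)."""
    if raw is None:
        return []
    # drop empty fragments, e.g. from a trailing comma
    return [part.strip() for part in raw.split(",") if part.strip()]

# 1) collect every distinct answer option, sorted so the output columns are stable
options = sorted({ans for raw in rows.values() for ans in split_answers(raw)})

# 2) one 1/0 indicator per option and user: long strings -> wide table
wide = {
    user_id: {opt: int(opt in split_answers(raw)) for opt in options}
    for user_id, raw in rows.items()
}

print(options)   # ['Therapy', 'date coaching']
print(wide[1])   # {'Therapy': 1, 'date coaching': 1}
print(wide[3])   # {'Therapy': 0, 'date coaching': 0}
```

Each key of `options` becomes one column, which is why the width of the result grows with the number of distinct answers.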

We will use the **foreveralone dataset** from Reddit for the demo, as it contains multiple check-all-that-apply questions.
This notebook focuses on feature engineering techniques in PySpark rather than analytics. Here are the 7 steps of the flow:

1. Explore the dataset: identify the N columns that record check-all-that-apply values (in this demo, N = 3). 
2. Put the N column names into a list. 
3. Loop over the columns, splitting and trimming the strings, to collect the unique categorical answers of each column into a list.
4. Write a function that renames all new columns in a systematic way. 
5. Write THE function `Check_All_That_Apply()`. <span style="color:#FF0000">The key of this notebook!</span>
6. Run `Check_All_That_Apply()` for each column in the list to generate N new dataframes.
7. Join all N dataframes, and you are all set! 






In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824024 sha256=a20a84b8de53be5a209aa9fbd9bd091acdc355671e47c15442dba30359c32562
  Stored in directory: /root/.cache/pip/wheels/07

In [2]:
# Only SparkSession and the functions module (aliased as F) are used below.
# Wildcard imports are avoided: `from pyspark.sql.functions import *` shadows
# Python built-ins such as sum(), max() and round().
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.master("local[2]").appName("foreveralone").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/01 18:48:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc = spark.sparkContext

In [5]:
foreveralone_path = '../input/the-demographic-rforeveralone-dataset/foreveralone.csv'
foreveralone_sdf = spark.read.csv(foreveralone_path,inferSchema=True,header=True)
foreveralone_sdf.printSchema()

root
 |-- time: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- sexuallity: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- income: string (nullable = true)
 |-- race: string (nullable = true)
 |-- bodyweight: string (nullable = true)
 |-- virgin: string (nullable = true)
 |-- prostitution_legal: string (nullable = true)
 |-- pay_for_sex: string (nullable = true)
 |-- friends: double (nullable = true)
 |-- social_fear: string (nullable = true)
 |-- depressed: string (nullable = true)
 |-- what_help_from_others: string (nullable = true)
 |-- attempt_suicide: string (nullable = true)
 |-- employment: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- edu_level: string (nullable = true)
 |-- improve_yourself_how: string (nullable = true)



First, let's create a `user_id` column, since the original dataset is missing one. Also, by exploring the values of each column, we have identified that the survey questions below are check-all-that-apply, so we will select these columns along with `user_id`:  
- what_help_from_others
- edu_level
- improve_yourself_how

In [6]:
selected_survey_sdf = foreveralone_sdf\
    .withColumn("user_id", F.monotonically_increasing_id())\
    .select('user_id', 'what_help_from_others', 'edu_level', 'improve_yourself_how')

selected_survey_sdf.show()

+-------+---------------------+--------------------+--------------------+
|user_id|what_help_from_others|           edu_level|improve_yourself_how|
+-------+---------------------+--------------------+--------------------+
|      0| wingman/wingwoman...|    Associate degree|                None|
|      1| wingman/wingwoman...|Some college, no ...|join clubs/socual...|
|      2|    I don't want help|Some college, no ...|      Other exercise|
|      3|        date coaching|Some college, no ...|Joined a gym/go t...|
|      4|    I don't want help|High school gradu...|                None|
|      5|        date coaching|   Bachelor’s degree|Joined a gym/go t...|
|      6| Set me up with a ...|High school gradu...|                None|
|      7| wingman/wingwoman...|Trade/technical/v...|Therapy, join clu...|
|      8| Set me up with a ...|Some college, no ...|Joined a gym/go t...|
|      9| Set me up with a ...|     Master’s degree|             Therapy|
|     10|        date coaching|   Bach
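`monotonically_increasing_id()` guarantees IDs that are unique and increasing, but not consecutive. Per the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The pure-Python model below reproduces that layout for two hypothetical partitions of three rows each; the partition sizes are an assumption, not what Spark actually chose for this CSV.

```python
# Pure-Python model of the ID layout documented for
# pyspark.sql.functions.monotonically_increasing_id(): partition ID in the
# upper 31 bits, record number within the partition in the lower 33 bits.
# The partition sizes passed in below are hypothetical.
def monotonic_ids(partition_sizes):
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for record_number in range(size):
            ids.append((partition_id << 33) + record_number)
    return ids

ids = monotonic_ids([3, 3])
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

Since `user_id` is only used as a join key in this notebook, uniqueness is what matters, and the gaps between partitions are harmless.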

Next, create a column list that contains all the columns you want to process. In this case, the list will be:

In [7]:
columns = ['what_help_from_others', 'edu_level', 'improve_yourself_how']

Next, since all responses are separated by commas, we split them with `split()`, turn each answer into its own row with `explode()`, strip surrounding spaces with `trim()`, and collect the distinct answers of each column:

In [8]:
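list_dict = {}

# For each survey column, collect the sorted list of distinct answer options
for column in columns:
    list_dict[column] = (
        selected_survey_sdf
        .withColumn('label_split', F.explode(F.split(F.col(column), ',')))  # one row per answer
        .withColumn('label_split', F.trim(F.col('label_split')))            # strip surrounding spaces
        .select('label_split')
        .distinct()
        .orderBy('label_split')        # sort after distinct(), which does not keep an earlier sort
        .rdd.flatMap(lambda row: row)  # Row -> its single value
        .collect()
    )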


Next, define a helper function that adds a prefix to the name of every column except `user_id`:

In [9]:
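def prefix_columns(sdf, exclude_cols, prefix):
    """Return sdf with `prefix` prepended to every column name not in exclude_cols."""
    to_prefix_cols = [c for c in sdf.columns if c not in exclude_cols]
    prefixed_sdf = sdf
    # deduplicate names first; withColumnRenamed renames every column with that name
    for col_name in set(to_prefix_cols):
        prefixed_sdf = prefixed_sdf.withColumnRenamed(col_name, prefix + col_name)
    return prefixed_sdf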
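`prefix_columns()` only changes names, so its effect can be previewed without Spark. The hypothetical helper `prefixed_names()` below computes the same old-to-new mapping as a plain dict; the sample names are taken from the `edu_level` output shown later.

```python
# Pure-Python dry run of prefix_columns(): compute the old -> new name mapping
# before renaming anything in Spark. prefixed_names() is a hypothetical helper.
def prefixed_names(columns, exclude_cols, prefix):
    return {c: (c if c in exclude_cols else prefix + c) for c in columns}

mapping = prefixed_names(["associate_degree", "no_degree", "user_id"],
                         exclude_cols=["user_id"], prefix="edu_level__")
print(mapping)
# {'associate_degree': 'edu_level__associate_degree', 'no_degree': 'edu_level__no_degree', 'user_id': 'user_id'}
```

As a design alternative, a complete list of new names can be applied in a single call with `DataFrame.toDF(*new_names)`, which renames columns by position instead of chaining one `withColumnRenamed()` per column.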

Next, write a function named `Check_All_That_Apply(column, list_of_value)` that creates one indicator column per answer option and defines how the new columns are named. Because many new columns will be created, it is important to have a good naming convention in place, so that your coworkers, and you yourself, know what these columns actually mean:

In [10]:
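def Check_All_That_Apply(column, list_of_value):
    """One-hot encode one check-all-that-apply column of selected_survey_sdf.

    Returns user_id plus one 1/0 column per answer option in list_of_value,
    named "<column>__<option in lower case, spaces -> '_', dots removed>".
    """
    df = selected_survey_sdf.select('user_id', column)

    # Split the raw string into an array of trimmed answers so each option is
    # matched exactly; a substring test (Column.contains) would also flag an
    # option that merely appears inside a longer answer.
    answers = F.transform(F.split(F.col(column), ','), lambda x: F.trim(x))

    exprs = [
        F.when(F.array_contains(answers, cat), 1).otherwise(0).alias(cat)
        for cat in list_of_value
    ]

    # Indicator columns first, then user_id (the raw string column is dropped)
    new_df = df.select(*exprs, 'user_id')

    # Normalize the new column names
    for col_name in new_df.columns:
        new_df = new_df.withColumnRenamed(
            col_name, col_name.lower().replace(" ", "_").replace(".", "")
        )

    return prefix_columns(
        sdf=new_df,
        exclude_cols=["user_id"],
        prefix=f"{column}__",
    )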
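A substring test such as `Column.contains()` returns true whenever an option's text appears anywhere in the raw string, so a short option can be flagged for a row that only selected a longer answer starting with the same words. The pure-Python comparison below uses a hypothetical answer string modelled on the `improve_yourself_how` options listed further down (`losing_weight` and `losing_weight_through_better_diet_no_exercise_yet`). In PySpark, exact matching can be expressed with `F.array_contains()` on the split and trimmed array.

```python
# Compare a substring test (like Column.contains) with exact matching on the
# split, trimmed answers. The raw answer string is a hypothetical example.
def substring_match(raw, option):
    return option in raw  # str 'in' is a substring test

def exact_match(raw, option):
    return option in [part.strip() for part in raw.split(",")]

raw = "Losing weight through better diet no exercise yet, Therapy"
print(substring_match(raw, "Losing weight"))  # True  (false positive)
print(exact_match(raw, "Losing weight"))      # False
print(exact_match(raw, "Therapy"))            # True
```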

Next, loop over the column list and run `Check_All_That_Apply(column, list_of_value)` for each column:

In [11]:
sdf_dict = {}

for column in columns:
    sdf_dict[column] = Check_All_That_Apply(column, list_dict[column])

Now we have created 3 Spark DataFrames, each holding one question's answers transformed into a multiple-column structure. Take `edu_level` as an example: every answer option becomes a column, and each value is 1 or 0 (Yes or No) depending on whether the user selected that option. Depending on how the survey was designed, the output can have high cardinality: `improve_yourself_how` alone produces 54 columns (53 indicator columns plus `user_id`).

In [12]:
print("number of new columns = " + str(len(sdf_dict['edu_level'].columns)))
sdf_dict['edu_level'].columns

number of new columns = 13


['edu_level__trade/technical/vocational_training',
 'edu_level__doctorate_degree',
 'edu_level__high_school_graduate',
 'edu_level__professional_degree',
 'edu_level__bachelor’s_degree',
 'edu_level__diploma_or_the_equivalent_(for_example:_ged)',
 'edu_level__associate_degree',
 'edu_level__some_high_school',
 'edu_level__no_diploma',
 'edu_level__no_degree',
 'edu_level__master’s_degree',
 'edu_level__some_college',
 'user_id']
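The `edu_level` columns above include pairs such as `some_college` / `no_degree` and `some_high_school` / `no_diploma`, which suggests that some single answers contain a comma themselves (the raw strings are truncated in the `show()` output, e.g. `Some college, no ...`). Splitting on every comma then breaks one answer into two options. The pure-Python sketch below assumes the full answer was `Some college, no degree` and mirrors the split, trim, and renaming steps.

```python
# Pure-Python illustration (no Spark needed) of what split-on-comma does to an
# answer that itself contains a comma. The raw string is an assumption
# reconstructed from the truncated show() output and the generated columns.
def split_answers(raw):
    """Approximate F.split(col, ',') followed by F.trim on each piece."""
    return [part.strip() for part in raw.split(",")]

def to_column_name(prefix, option):
    """Same renaming rule as Check_All_That_Apply + prefix_columns."""
    return prefix + "__" + option.lower().replace(" ", "_").replace(".", "")

pieces = split_answers("Some college, no degree")
print(pieces)  # ['Some college', 'no degree']
print([to_column_name("edu_level", p) for p in pieces])
# ['edu_level__some_college', 'edu_level__no_degree']
```

If that assumption holds, `edu_level` behaves like a single-choice question, and one-hot encoding the whole string (without splitting) would keep each education level in a single column.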

In [13]:
print("number of new columns = " + str(len(sdf_dict['improve_yourself_how'].columns)))
sdf_dict['improve_yourself_how'].columns

number of new columns = 54


['improve_yourself_how__traveling',
 'improve_yourself_how__practice_various_skills',
 'improve_yourself_how__fashion_makeup_personality_etc',
 'improve_yourself_how__nope_not_fa',
 'improve_yourself_how__trying_to_accept_my_fate',
 'improve_yourself_how__just_b_confident',
 'improve_yourself_how__joined_a_german_language_learning_course',
 'improve_yourself_how__joined_a_gym/go_to_the_gym',
 'improve_yourself_how__none',
 'improve_yourself_how__started_taking_some_medication',
 'improve_yourself_how__losing_weight',
 'improve_yourself_how__diets',
 'improve_yourself_how__non-physical_forms_of_improvement',
 'improve_yourself_how__losing_weight_through_better_diet_no_exercise_yet',
 'improve_yourself_how__join_clubs/socual_clubs/meet_ups',
 'improve_yourself_how__change_of_wardrobe',
 'improve_yourself_how__talking_to_people',
 'improve_yourself_how__therapy',
 'improve_yourself_how__work_out_at_home',
 'improve_yourself_how__clubbing',
 'improve_yourself_how__started_losing_weight',
 

Lastly, we join all the new dataframes on `user_id` to make one wide dataframe:

In [14]:
sdf_list = list(sdf_dict.values())

new_features_sdf = sdf_list[0]

if len(sdf_list) > 1:
    for sdf in sdf_list[1:]:
        new_features_sdf = new_features_sdf.join(sdf, on='user_id', how='left')
    
print("number of columns in new_features_sdf = " + str(len(new_features_sdf.columns)))
new_features_sdf.columns

number of columns in new_features_sdf = 111


['user_id',
 'what_help_from_others__i_used_to_want_all_of_those_things_now_it_is_too_late_for_any_of_them_to_make_a_difference',
 "what_help_from_others__i_don't_want_any_help_i_can't_even_talk_about_it",
 'what_help_from_others__any_help',
 'what_help_from_others__emotional_support',
 'what_help_from_others__just_more_friends/greater_social_life_in_general_tbh',
 'what_help_from_others__i_have_no_idea',
 'what_help_from_others__maybe_to_learn_how_to_be_happy',
 'what_help_from_others__shoulder_to_cry_on',
 'what_help_from_others__i_want_help_but_i_am_not_sure_what_kind_i_always_think_it_would_be_nice_if_a_woman_would_approch_me_but_thats_not_realistic',
 "what_help_from_others__i_don't_know",
 'what_help_from_others__free_event)_and_sell_me_up',
 'what_help_from_others__therapy',
 "what_help_from_others__i_don't_want_help",
 'what_help_from_others__friendship',
 'what_help_from_others__therapy',
 "what_help_from_others__i'm_not_fa_lol",
 'what_help_from_others__im_on_my_own',
 'what_
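In the listing above, `what_help_from_others__therapy` appears twice. One plausible cause (an assumption, since the raw answers are not shown) is two answers that differ only in case or punctuation, such as `Therapy` and `therapy`: `distinct()` keeps both, but the renaming step maps them to the same name, and selecting that column by name is then ambiguous. The hypothetical helper below reuses the notebook's renaming rule to list such collisions before any DataFrame is built.

```python
from collections import Counter

def normalize(option):
    """Same rule as the renaming loop: lower-case, spaces -> '_', dots removed."""
    return option.lower().replace(" ", "_").replace(".", "")

def colliding_names(options, prefix):
    """Return the final column names produced by more than one raw option."""
    counts = Counter(prefix + "__" + normalize(opt) for opt in options)
    return sorted(name for name, n in counts.items() if n > 1)

# Hypothetical raw options that differ only in case
print(colliding_names(["Therapy", "therapy", "Emotional support"], "what_help_from_others"))
# ['what_help_from_others__therapy']
```

One possible remedy is to lower-case the answers (for example with `F.lower()`) both when collecting the options and when matching them, so that such variants merge into a single column.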

### <span style="color:#007BA7"> And that's it! Thank you for reading my first feature engineering notebook in PySpark! Please comment and upvote if you find it useful.<br /><br /><br /><br /></span>