# Importing necessary libraries

In [0]:
from pyspark.sql import functions as sf
import datetime as dt

from pyspark.sql.window import Window
from pyspark.sql.types import *
import datetime
import pandas as pd
import numpy as np
import sklearn.cluster
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from itertools import product as it_product
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf
from kmodes.kprototypes import KPrototypes
# Session configuration: switch on the Databricks IO cache setting.
spark.conf.set("spark.databricks.io.cache.enabled", "true")
# testing a reduction in the number of DF partitions from 200 down to 20
numSparkPartitions = 20
# number of partitions used by shuffles (joins, aggregations, window functions)
spark.conf.set("spark.sql.shuffle.partitions", numSparkPartitions)
# spark.conf.set("spark.default.parallelism", numSparkPartitions)


**Reading & saving prefixes & suffixes**

In [0]:
# Project folder in the data lake; the final feature table is written underneath it.
SAVE_PREFIX = "adl://outitanliveadlsne.azuredatalakestore.net/DataService/DataScience/Project/Segmentation/" 
#source data
# NOTE(review): EDM_PREFIX is not referenced by the cells visible here - confirm before removing.
EDM_PREFIX = "adl://outitanliveadlsne.azuredatalakestore.net//DataService/DataScience/Student/EdmViews/All/"
# Folder holding the EDM view files read below (already ends with "/").
EDM_PREFIX_2 ="abfss://outitanliveadls@outitanliveadlsgen2.dfs.core.windows.net/DataService/DataScience/Student/EdmViews/All/"
# File extension appended to every EDM view name.
SUFFIX = ".parquet"

**Reading relevant tables from the EDM from datalake**

In [0]:
def _edm_path(table_name):
    """Return the full path of an EDM view file: EDM_PREFIX_2 + table_name + SUFFIX.

    EDM_PREFIX_2 already ends with "/", so the trailing separator is stripped before
    joining. Every EDM table is therefore addressed with a single "/" (previously
    most paths contained "//" while factSMPQ did not).
    """
    return EDM_PREFIX_2.rstrip("/") + "/" + table_name + SUFFIX


# EDM views
dimModPres = spark.read.parquet(_edm_path("adlsfile_edmviews_dim_module_presentation_all"))
dimStudent = spark.read.parquet(_edm_path("adlsfile_edmviews_dim_student_all"))
factSMPQ = spark.read.parquet(_edm_path("adlsfile_edmviews_fact_student_module_presentation_qualification_all"))
factSMPP = spark.read.parquet(_edm_path("adlsfile_edmviews_fact_student_module_presentation_progression_all"))
vleTable = spark.read.parquet(_edm_path("adlsfile_edmviews_fact_student_module_presentation_vle_visits_all"))
factCRMAll = spark.read.parquet(_edm_path("adlsfile_edmviews_fact_student_crm_interactions_all"))
factCRMTicket = spark.read.parquet(_edm_path("adlsfile_edmviews_fact_student_crm_service_ticket_all"))

# Sources stored outside the EDM views folder
feeTable = spark.read.parquet("adl://outitanliveadlsne.azuredatalakestore.net/DataService/DataScience/Student/CirceStudent/All/adlsfile_circestudent_ci_d_posted_stud_fees_all.data")
stdInCare = spark.read.parquet("adl://outitanliveadlsne.azuredatalakestore.net/DataService/DataScience/Student/Voice/All/adlsfile_voice_cx_d_ou_contact_all.data")
# The NSPL postcode lookup sits in the project folder, so reuse SAVE_PREFIX (same path as before)
postCodeTable = spark.read.csv(SAVE_PREFIX + "NSPL_AUG_2021_UK.csv", header=True)

- Set `presCode` to the required presentation (2020J, 2021J or 2022J) and run the whole notebook; the data for that presentation is then created and saved to the data lake automatically.

In [0]:
# Presentation to build the data for; used to filter 'presentation_code' (module table) and 'pres_code_5' (fee table).
presCode = '2022J'

**The following 19 variables were extracted and engineered from the tables mentioned above** (the `C_` prefix stands for *candidate*):
 - C_is_linked_to_qual
 - C_gender 
 - C_hesa_stud_carer 
 - C_career_motivation
 - C_2a_levels
 - C_employment
 - C_study_motivation
 - C_is_international
 - C_age_group
 - C_nation 
 - C_imd_groupings 
 - C_is_linked_to_qual
 - C_n_module_1 
 - C_credit_groups
 - C_course_level_groups
 - C_sum_event_6_months_b4_start
 - C_avg_pre_course_start_vle
 - C_urban_rural
 - C_hesa_stud_in_care
 - C_slc_loan

In [0]:
# Base table: one student/module-presentation registration per row for the selected presentation.
# Uses factSMPP (registrations) and dimModPres (module presentation attributes).

# registrations that were active at module start, for student_classification_sk 51
registeredAtStart = (
    factSMPP
    .filter('is_registered_at_start == 1 AND student_classification_sk == 51')
    .select('student_sk', 'module_presentation_sk')
    .persist()
)

# attributes needed from dimModPres (the flag columns are only used for filtering)
selectCols = [
    'module_presentation_sk',
    'module_code',
    'presentation_code',
    'module_credits',
    'module_academic_level_code',
    'module_study_level_description',
    'dummy_module_flag',
    'resit_flag',
    'module_presentation_start_date',
]
myTable = registeredAtStart.join(
    dimModPres.select(selectCols),
    on='module_presentation_sk',
    how='left',
)

# keep non-dummy, non-resit undergraduate modules of the chosen presentation;
# level 'X' and module code 'Z999' are excluded as well
isWantedModule = (
    (sf.col('module_academic_level_code') != 'X')
    & (sf.col('dummy_module_flag') == 'NO')
    & (sf.col('module_code') != 'Z999')
    & (sf.col('resit_flag') == 'NO')
    & (sf.col('presentation_code') == presCode)
    & (sf.col('module_study_level_description') == 'Undergraduate')
)

myTable = (
    myTable
    .filter(isWantedModule)
    # the filter-only columns are not needed any more
    .drop('dummy_module_flag', 'resit_flag', 'presentation_code', 'module_study_level_description')
    # both columns arrive as strings: credits become an integer, the start date a date
    .withColumn('module_credits', sf.col('module_credits').cast('int'))
    .withColumn('module_presentation_start_date', sf.col('module_presentation_start_date').cast(DateType()))
)

In [0]:
# Preview the filtered student/module table built in the previous cell.
display(myTable)

module_presentation_sk,student_sk,module_code,module_credits,module_academic_level_code,module_presentation_start_date
46545,9794637,B207,60,2,2022-10-01
57106,10130232,W111,60,1,2022-10-01
57106,10031728,W111,60,1,2022-10-01
62017,6239368,DE100,60,1,2022-10-01
38010,5480344,MST124,30,1,2022-10-01
95103,10287040,Y032,30,0,2022-10-01
37590,283605,E102,60,1,2022-10-01
62267,9992032,B100,60,1,2022-10-01
37590,10295129,E102,60,1,2022-10-01
22949,7007065,A111,60,1,2022-10-01


In [0]:
#(1)C_is_linked_to_qual: Indicates whether the PI had at least one of their modules associated to a qualification (Yes, No)
# Distinct student/module rows of factSMPQ (student_classification_sk 51) with their link flag.
myFactSMPQ = (
    factSMPQ
    .filter(sf.col('student_classification_sk') == 51)
    .select('student_sk', 'module_presentation_sk', 'is_linked_at_module_presentation_start')
    .distinct()
)

# Left join so registrations without a row in factSMPQ are kept (their flag is null).
myTable = myTable.join(
    myFactSMPQ,
    on=['student_sk', 'module_presentation_sk'],
    how='left',
)

In [0]:
# Row count versus distinct student count: the gap shows how many rows belong to students with more than one row.
studentKeys = myTable.select('student_sk')
print(studentKeys.count())
print(studentKeys.distinct().count())


In [0]:

# Window over all rows of one student: lifts the module-level link flag to student level.
aggWindowSk = Window.partitionBy('student_sk')

# A student can carry several values of 'is_linked_at_module_presentation_start';
# taking the max means a single 1 is enough to treat the student as linked to a qualification.
linkedFlag = (
    sf.max('is_linked_at_module_presentation_start')
    .over(aggWindowSk)
    .cast(IntegerType())
)

myTable = (
    myTable
    .withColumn('linked_to_qual', linkedFlag)
    .withColumn(
        'C_is_linked_to_qual',
        sf.when(sf.col('linked_to_qual') == 1, 'Yes').otherwise('No'),
    )
    # columns that are not needed further
    .drop('module_code', 'is_linked_at_module_presentation_start', 'linked_to_qual')
    # the same student/module combination can appear on several rows, so keep one row per combination
    .dropDuplicates(subset=['student_sk', 'module_presentation_sk'])
)


In [0]:
# Preview after adding 'C_is_linked_to_qual' and dropping duplicate student/module rows.
display(myTable)

student_sk,module_presentation_sk,module_credits,module_academic_level_code,module_presentation_start_date,C_is_linked_to_qual
237,38829,30,0,2022-10-01,No
387,38010,30,1,2022-10-01,Yes
387,52757,30,1,2022-10-01,Yes
387,53144,60,1,2022-10-08,Yes
1953,56138,60,1,2022-10-01,Yes
6054,46453,30,1,2022-10-01,Yes
7242,62267,60,1,2022-10-01,Yes
7547,46005,30,1,2022-10-01,Yes
7732,38289,60,1,2022-10-01,Yes
10877,61433,30,1,2022-10-01,Yes


In [0]:
#(2)C_n_module_1: number of modules the PI is registered on at start, reduced to two bands ('1' or '2+')
aggWindowSk = Window.partitionBy('student_sk')

# rows are unique per student/module at this point, so counting rows per student counts modules
modulesPerStudent = (
    sf.count('module_presentation_sk')
    .over(aggWindowSk)
    .cast(IntegerType())
)

myTable = (
    myTable
    .withColumn('module_count', modulesPerStudent)
    .withColumn(
        'C_n_module_1',
        sf.when(sf.col('module_count') == 1, '1').otherwise('2+'),
    )
)


In [0]:
# Distribution of rows by number of modules per student.
display(myTable.groupBy('module_count').count())

module_count,count
4,704
1,18173
5,20
3,3174
2,15836


In [0]:
#(3)sum of credits for 'C_credit_groups' - <60, 60, >60
aggWindowSk = Window.partitionBy('student_sk')

# total credits over all of the student's modules in this presentation
myTable = myTable.withColumn(
    'credits_this_year',
    sf.sum('module_credits')
    .over(aggWindowSk)
    .cast(IntegerType())
  )


def credit_grp(credits_this_year):
    """Map a total-credits column to its credit band.

    Built from native Spark expressions instead of a Python UDF: the previous lambda
    compared the value directly, which raises a TypeError for a null total, so its
    'Unclassified' branch could never be reached for missing values.

    Args:
        credits_this_year: Column (or column name) holding the summed credits.

    Returns:
        A string Column with '<60', '60', '>60', or 'Unclassified' when the total is null.
    """
    credits = sf.col(credits_this_year) if isinstance(credits_this_year, str) else credits_this_year
    return (
        sf.when(credits < 60, '<60')
        .when(credits == 60, '60')
        .when(credits > 60, '>60')
        .otherwise('Unclassified')
    )


myTable = myTable.withColumn('C_credit_groups', credit_grp(myTable.credits_this_year))

In [0]:
# Distribution of rows by credit band.
display(myTable.groupBy('C_credit_groups').count())

C_credit_groups,count
60,14328
<60,6611
>60,16968


In [0]:
#(4)'module_level' - input for 'C_course_level_groups' (Level_0, Level_1, Level_2, Level_3)
# Highest 'module_academic_level_code' among the student's modules.
# NOTE(review): the code column is not cast to a number in this notebook, so the max is presumably a string comparison - confirm the codes are single characters.
aggWindowSk = Window.partitionBy('student_sk')
highestLevel = sf.max('module_academic_level_code').over(aggWindowSk)
myTable = myTable.withColumn('module_level', highestLevel)



In [0]:
# Distribution of rows by highest module level.
display(myTable.groupBy('module_level').count())

module_level,count
1,31524
2,2214
0,2089
3,2080


In [0]:
# 'C_course_level_groups': label the highest module level as Level_0 ... Level_3.
# Any value other than '0', '1' or '2' (including a missing level) ends up as 'Level_3'.
levelLabel = sf.when(sf.col('module_level') == '0', 'Level_0')
for levelCode in ('1', '2'):
    levelLabel = levelLabel.when(sf.col('module_level') == levelCode, 'Level_' + levelCode)

myTable = myTable.withColumn('C_course_level_groups', levelLabel.otherwise('Level_3'))

In [0]:
#(5)'C_sum_event_6_months_b4_start'- 0 to n
# Number of CRM service tickets created for a student in the 6 months up to (and including)
# the latest start date of the student's modules.

# 'crm_service_ticket_created_date_sk' is an integer of the form yyyyMMdd, so convert it to a date
crmInteractions = (
    factCRMTicket
    .select('student_sk', 'crm_service_ticket_created_date_sk')
    .withColumn(
        'crm_interaction_date',
        sf.to_date(sf.col('crm_service_ticket_created_date_sk').cast('string'), 'yyyyMMdd'),
    )
    .drop('crm_service_ticket_created_date_sk')
)

# Exactly ONE row per student, carrying the latest module start date.
# (A window max over the distinct student/start-date pairs left one row per distinct start
# date; students whose modules start on different dates then had every ticket duplicated
# in the join below and their count multiplied.)
stdList = myTable.groupBy('student_sk').agg(
    sf.max('module_presentation_start_date').alias('latest_module_presentation_start_date')
)

crmDf = (
    stdList
    # students without any ticket get a null date here and are removed by the filter below;
    # they come back as nulls in the final left join on myTable
    .join(crmInteractions, on='student_sk', how='left')
    # lower bound of the look-back period: 6 months before the latest module start
    .withColumn(
        'mp_start_date_6_months',
        sf.add_months(sf.col('latest_module_presentation_start_date'), -6),
    )
    # keep tickets created inside the look-back period (both bounds inclusive)
    .filter(
        sf.col('crm_interaction_date').between(
            sf.col('mp_start_date_6_months'),
            sf.col('latest_module_presentation_start_date'),
        )
    )
    .groupBy('student_sk')
    .agg(
        sf.count('crm_interaction_date')
        .cast(IntegerType())
        .alias('C_sum_event_6_months_b4_start')
    )
)

myTable = myTable.join(crmDf, on='student_sk', how='left')


In [0]:
#(6)'C_avg_pre_course_start_vle' - 0 to n
# Average number of VLE visits per module that a student made on or before the module start date.
# The result is an integer with one row per student.

# one row per student/module together with the module start date
studentList = myTable.select(
    'student_sk', 'module_presentation_sk', 'module_presentation_start_date'
).dropDuplicates()

# Derive a trimmed copy instead of overwriting the raw vleTable (same pattern as myFactSMPQ / myFeeTable).
# 'vle_site_visit_date_sk' is an integer of the form yyyyMMdd, so convert it to a date.
myVleTable = (
    vleTable
    .filter('student_classification_sk == 51')
    .select('student_sk', 'module_presentation_sk', 'vle_site_visit_date_sk', 'total_vle_site_visits_by_day')
    .withColumn('vle_visit_date', sf.to_date(sf.col('vle_site_visit_date_sk').cast('string'), 'yyyyMMdd'))
    .drop('vle_site_visit_date_sk')
)

vleDf = (
    studentList
    # visits of each student on each of their modules
    .join(myVleTable, on=['student_sk', 'module_presentation_sk'], how='left')
    # keep only visits made on or before the module start date (this also removes student/module pairs without visits)
    .filter(sf.col('vle_visit_date') <= sf.col('module_presentation_start_date'))
    # a single aggregation per student gives both the total visits and the number of
    # modules that have at least one pre-start visit (counted from the VLE data, not from myTable)
    .groupBy('student_sk')
    .agg(
        sf.sum('total_vle_site_visits_by_day').cast(IntegerType()).alias('total_vle_visits'),
        sf.countDistinct('module_presentation_sk').cast('int').alias('vleModuleCount'),
    )
    # average visits per module, rounded and stored as an integer
    .select(
        'student_sk',
        sf.round(sf.col('total_vle_visits') / sf.col('vleModuleCount'))
        .cast('int')
        .alias('C_avg_pre_course_start_vle'),
    )
)

# students without pre-start visits get a null here
myTable = myTable.join(vleDf, on='student_sk', how='left')

# Age is calculated against the latest module start date, so for students with more than one module
# keep the latest 'module_presentation_start_date' in 'latest_module_presentation_start_date'.
# This is done at this stage because the select below reduces the table from student/module level to student level.
aggWindowSk = Window.partitionBy('student_sk')
latestStart = sf.max('module_presentation_start_date').over(aggWindowSk)

selectCols = [
    'student_sk',
    'latest_module_presentation_start_date',
    'C_is_linked_to_qual',
    'C_n_module_1',
    'C_credit_groups',
    'C_course_level_groups',
    'C_sum_event_6_months_b4_start',
    'C_avg_pre_course_start_vle',
]
myTable = (
    myTable
    .withColumn('latest_module_presentation_start_date', latestStart)
    .select(selectCols)
    .distinct()
)


# join myTable with Dim Student to get biographics
# ('student_source_postcode' used to be listed twice, which produced two identically named columns)
selectCols = [
    'student_sk',
    'personal_id',
    'student_source_postcode',
    'student_country_description',
    'student_birth_year_month',
    'student_index_multiple_deprivation',
    'student_uk_nation_from_postcode',
    'student_gender_description',
    'student_hesa_is_student_carer_description',
    'student_highest_education_level_on_entry_conformed',
    'student_study_motivation_description',
    'student_career_motivation_description',
    'student_occupation_status_description',
]

# dimStudent is filtered by student_yass_scheme_flag == "NO" and myTable by the module filters,
# so an inner join keeps only the students that satisfy both sets of conditions
myTable = myTable.join(
    dimStudent.filter('student_yass_scheme_flag == "NO"').select(selectCols),
    on='student_sk',
    how='inner'
  )

#some of the postcodes are in lowercase so convert these to uppercase
myTable = myTable.withColumn('pcds', sf.upper('student_source_postcode'))

#(7)'C_slc_loan' - 'Yes', 'No'
# A fee row counts as a loan when its fee code is 'LO' and its amount is above 0.1.
isLoanRow = ((sf.col('fee_code') == 'LO') & (sf.col('amount') > 0.1)).cast(IntegerType())

myFeeTable = (
    feeTable
    .select('personal_id', 'course_code', 'fee_code', 'amount', 'debit_credit', 'pres_code_5')
    # fee rows of the selected presentation only
    .filter(sf.col('pres_code_5') == presCode)
    .withColumn('has_loan', isLoanRow)
    # drop all debits to prevent duplication
    .filter('debit_credit == "CR"')
    # one row per person: 1 if any remaining row is a loan row
    .groupBy('personal_id')
    .agg(sf.max('has_loan').alias('has_loan'))
    .withColumn(
        'C_slc_loan',
        sf.when(sf.col('has_loan') == 1, 'Yes').otherwise('No'),
    )
)

# people without a matching fee row get a null 'C_slc_loan' from this left join
myTable = myTable.join(myFeeTable, on='personal_id', how='left')


#(8)'C_gender'- Female, Male, Unknown
genderCol = sf.col('student_gender_description')
myTable = myTable.withColumn(
    'C_gender',
    sf.when(genderCol.isin('Female', 'Male'), genderCol).otherwise('Unknown')
)

#(9)C_hesa_stud_carer - Yes, No, Unknown
carerCol = sf.col('student_hesa_is_student_carer_description')
myTable = myTable.withColumn(
    'C_hesa_stud_carer',
    sf.when(carerCol.isin('Yes', 'No'), carerCol).otherwise('Unknown')
)

#(10)C_career_motivation - Stated, Unknown
statedCareerMotivations = [
    'Change to a new career area',
    'Develop or progress in your current career',
    'Move into employment for the first time',
]
myTable = myTable.withColumn(
    'C_career_motivation',
    sf.when(
        sf.col('student_career_motivation_description').isin(statedCareerMotivations), 'Stated'
    ).otherwise('Unknown')
)

#(11)C_2a_levels - Yes, No, Unknown
entryLevelCol = sf.col('student_highest_education_level_on_entry_conformed')
entryLevelsYes = [
    'Equivalent to two A Levels',
    'Postgraduate Qualification',
    'Higher Education Qualification',
]
entryLevelsNo = [
    'A Levels',
    'Less than two A Levels',
    'No formal Qualification',
]
myTable = myTable.withColumn(
    'C_2a_levels',
    sf.when(entryLevelCol.isin(entryLevelsYes), 'Yes')
    .when(entryLevelCol.isin(entryLevelsNo), 'No')
    .otherwise('Unknown')
)

#(12)C_employment - FT, PT, NW, Unknown
#FT: "In full-time work/self-employed"
#PT: "In part-time work/self-employed"
#NW: the not-working statuses listed below
#Unknown: everything else (e.g. "Information Refused", "Unclassified", "Not Known")
occupationCol = sf.col('student_occupation_status_description')
notWorkingStatuses = [
    'Looking after the home/family',
    'Unemployed and looking for a job',
    'Unable to work due to long-term sickness or disability',
    'Doing unpaid voluntary work',
    'Not in paid work for some other reason',
    'Retired from paid work',
]
myTable = myTable.withColumn(
    'C_employment',
    sf.when(occupationCol == 'In full-time work/self-employed', 'FT')
    .when(occupationCol == 'In part-time work/self-employed', 'PT')
    .when(occupationCol.isin(notWorkingStatuses), 'NW')
    .otherwise('Unknown')
)

#(13)C_occupation_type - Stated, Unknown: deliberately NOT created.
#It would only replicate the information of C_employment, and the EDM dimStudent table has no
#'Occupation_type' variable (it was present in the SAS dimStudent table).

#(14)C_study_motivation - Stated, Unknown
statedStudyMotivations = [
    'Mainly personal development',
    'Mainly employment/career',
    'Employment/career & personal dev equally important',
]
myTable = myTable.withColumn(
    'C_study_motivation',
    sf.when(
        sf.col('student_study_motivation_description').isin(statedStudyMotivations), 'Stated'
    ).otherwise('Unknown')
)

#(15)C_is_international - Yes, No, Unknown
# a missing country is first replaced by '(Unknown)' so that it maps to 'Unknown' and not to 'Yes'
myTable = myTable.fillna('(Unknown)', subset = ['student_country_description'])
countryCol = sf.col('student_country_description')
myTable = myTable.withColumn(
    'C_is_international',
    sf.when(countryCol == 'United Kingdom', 'No')
    .when(countryCol == '(Unknown)', 'Unknown')
    .otherwise('Yes')
)

#(16)'C_age_group'- 18-20, 21-25, 26-30, 31-35, 36-45, 46+
# use 'latest_module_presentation_start_date' for age calculation

# age in whole years: days between birth year-month and latest module start, divided by 365
myTable = myTable.withColumn(
    'student_age',
    (
      sf.datediff(
        sf.col('latest_module_presentation_start_date'),
        sf.col('student_birth_year_month')
      ) / 365
    ).cast(IntegerType())
  )
# only students aged 18 or over are kept (rows with a null age are dropped by this filter too)
myTable = myTable.filter(myTable.student_age >= 18)


def age_range(student_age):
    """Map an age column (whole years) to its age band.

    Uses native Spark expressions instead of a Python UDF, so it is null-safe and the
    rows are not passed through the Python interpreter.

    Args:
        student_age: Column (or column name) holding the age in years.

    Returns:
        A string Column with '18-20', '21-25', '26-30', '31-35', '36-45' or '46+';
        '' for ages below 18 or a null age.
    """
    age = sf.col(student_age) if isinstance(student_age, str) else student_age
    return (
        sf.when((age >= 18) & (age < 21), '18-20')
        .when((age >= 21) & (age < 26), '21-25')
        .when((age >= 26) & (age < 31), '26-30')
        .when((age >= 31) & (age < 36), '31-35')
        .when((age >= 36) & (age < 46), '36-45')
        .when(age >= 46, '46+')
        .otherwise('')
    )


myTable = myTable.withColumn('C_age_group', age_range(myTable.student_age))


#(17)C_nation - England, Foreign, Northern Ireland, Scotland, Wales, Unclassified
# null values become 'Foreign' because a postcode may not be available for foreign students;
# every other value that is not one of the four nations (e.g. 'Isle of Man', 'Channel Islands', '(Unknown)') becomes 'Unclassified'
myTable = myTable.fillna('Foreign', subset = ['student_uk_nation_from_postcode'])

nationCol = sf.col('student_uk_nation_from_postcode')
keptNations = ['England', 'Northern Ireland', 'Scotland', 'Wales', 'Foreign']
myTable = myTable.withColumn(
    'C_nation',
    sf.when(nationCol.isin(keptNations), nationCol).otherwise('Unclassified')
)

#(18)'C_imd_groupings' - 0-20%, 20-40%, 40-60%, 60-80%, 80-100%, Unclassified
def imd_percentile(student_index_multiple_deprivation):
    """Map the index-of-multiple-deprivation column to its 20%-wide band.

    Uses native Spark expressions instead of a Python UDF: the previous lambda compared the
    value directly, which raises a TypeError for a null value instead of returning
    'Unclassified'.

    Args:
        student_index_multiple_deprivation: Column (or column name) with the index; the
            bands assume a value between 0 and 1.

    Returns:
        A string Column with '0-20%', '20-40%', '40-60%', '60-80%', '80-100%', or
        'Unclassified' for null values and values outside [0, 1].
    """
    imd = (
        sf.col(student_index_multiple_deprivation)
        if isinstance(student_index_multiple_deprivation, str)
        else student_index_multiple_deprivation
    )
    return (
        sf.when((imd >= 0) & (imd < 0.20), '0-20%')
        .when((imd >= 0.20) & (imd < 0.40), '20-40%')
        .when((imd >= 0.40) & (imd < 0.60), '40-60%')
        .when((imd >= 0.60) & (imd < 0.80), '60-80%')
        .when((imd >= 0.80) & (imd <= 1.00), '80-100%')
        .otherwise('Unclassified')
    )


myTable = myTable.withColumn('C_imd_groupings', imd_percentile(myTable.student_index_multiple_deprivation))

# Keep the person identifier, the engineered C_ features and the postcode ('pcds' is still needed for the urban/rural lookup)
selectCols = [
    'personal_id',
    'C_gender',
    'C_hesa_stud_carer',
    'C_career_motivation',
    'C_2a_levels',
    'C_employment',
    'C_study_motivation',
    'C_is_international',
    'C_age_group',
    'C_nation',
    'C_imd_groupings',
    'C_is_linked_to_qual',
    'C_n_module_1',
    'C_credit_groups',
    'C_course_level_groups',
    'C_sum_event_6_months_b4_start',
    'C_avg_pre_course_start_vle',
    'C_slc_loan',
    'pcds',
]

# identical rows are collapsed into one
myTable = myTable.select(*selectCols).distinct()

#(19) C_urban_rural - Urban, Rural, Unclassified
# only the postcode ('pcds') and the rural/urban indicator ('ru11ind') are needed from the lookup
selectCols = ['pcds', 'ru11ind']
postCodeTable = postCodeTable.select(selectCols)

# look up the indicator for each student's (upper-cased) postcode
myTable = myTable.join(postCodeTable, on = 'pcds', how ='left')

# indicator codes treated as urban / rural; any other code or a missing postcode match is 'Unclassified'
urbanCodes = ['A1', 'B1', 'C1', 'C2', '1', '2']
ruralCodes = ['D1', 'D2', 'E1', 'E2', 'F1', 'F2', '3', '4', '5', '6', '7', '8']

myTable = myTable.withColumn(
    'C_urban_rural',
    sf.when(sf.col('ru11ind').isin(urbanCodes), 'Urban')
    .when(sf.col('ru11ind').isin(ruralCodes), 'Rural')
    .otherwise('Unclassified')
)

## (20)C_hesa_stud_in_care: Indicates whether the PI declared ever being in care at start (Yes, No, Unknown)
#'Yes': when 'hesa_stud_in_care' = '01', '02', '03', '04', '96', '97'
#'No': when 'hesa_stud_in_care' = '05'
#'Unknown': every other value, e.g. '98', '99' or null (no matching contact row)
#https://www.hesa.ac.uk/collection/c20051/a/careleaver - link for the description of hesa_in_care codes
#01 - care leaver(16+)
#02 - Looked after in Scotland
#03 - In care in the rest of UK
#04 - UCAS defined care leaver
#05 - not a care leaver
#96 - 'No to other options but I have been in care for over 3 months'(source = Johny's sas code)
#97 - 'I have been in care for 3 months or more outside the UK'(source = Johny's sas code)
#98 - Information refused
#99 - Not Known

# Derive a trimmed copy instead of overwriting the raw stdInCare (same pattern as myFactSMPQ / myFeeTable);
# 'person_uid' is renamed to 'personal_id' so it can be joined on myTable.
myStdInCare = (
    stdInCare
    .select(sf.col('person_uid').alias('personal_id'), 'hesa_stud_in_care')
    # NOTE(review): if a person has several rows with different codes, the row kept here is arbitrary - confirm the source has one code per person
    .dropDuplicates(subset = ['personal_id'])
)

myTable = myTable.join(myStdInCare, on='personal_id', how='left')

inCareCode = sf.col('hesa_stud_in_care')
myTable = myTable.withColumn(
    'C_hesa_stud_in_care',
    sf.when(inCareCode.isin('01', '02', '03', '04', '96', '97'), 'Yes')
    .when(inCareCode == '05', 'No')
    .otherwise('Unknown')
)

##drop the helper columns 'ru11ind', 'pcds', 'hesa_stud_in_care' from myTable
myTable = myTable.drop('ru11ind', 'pcds', 'hesa_stud_in_care')


In [0]:
# Preview the feature table (personal_id plus the C_ columns) before missing values are filled.
display(myTable)


personal_id,C_gender,C_hesa_stud_carer,C_career_motivation,C_2a_levels,C_employment,C_study_motivation,C_is_international,C_age_group,C_nation,C_imd_groupings,C_is_linked_to_qual,C_n_module_1,C_credit_groups,C_course_level_groups,C_sum_event_6_months_b4_start,C_avg_pre_course_start_vle,C_slc_loan,C_urban_rural,C_hesa_stud_in_care
A1725329,Female,No,Unknown,Yes,FT,Stated,No,36-45,England,20-40%,Yes,1,60,Level_1,3.0,,Yes,Urban,Unknown
A2287027,Female,Yes,Unknown,No,NW,Stated,No,31-35,England,20-40%,Yes,1,60,Level_1,4.0,2.0,Yes,Urban,Unknown
A3480845,Male,No,Unknown,No,FT,Stated,No,31-35,England,60-80%,Yes,1,<60,Level_1,2.0,5.0,Yes,Urban,Unknown
A3600475,Female,No,Unknown,No,PT,Unknown,No,31-35,Wales,40-60%,Yes,1,60,Level_1,6.0,4.0,Yes,Rural,Unknown
A4268542,Female,Unknown,Unknown,Yes,FT,Stated,No,46+,Wales,40-60%,Yes,2+,>60,Level_1,11.0,3.0,Yes,Rural,No
A4362535,Female,No,Stated,Unknown,FT,Stated,No,36-45,England,80-100%,Yes,2+,>60,Level_1,3.0,1.0,Yes,Urban,No
A4480910,Female,No,Stated,Yes,FT,Stated,No,31-35,Wales,60-80%,Yes,1,60,Level_1,6.0,26.0,Yes,Urban,No
A4720833,Male,Yes,Unknown,Yes,FT,Unknown,No,46+,Scotland,80-100%,No,1,60,Level_2,5.0,1.0,No,Urban,No
A7857571,Female,No,Unknown,Yes,FT,Unknown,No,46+,England,60-80%,Yes,1,60,Level_1,,2.0,No,Urban,Unknown
A8773343,Female,No,Unknown,No,NW,Stated,No,36-45,Scotland,0-20%,Yes,1,60,Level_1,1.0,7.0,No,Urban,No


In [0]:
# Row count versus distinct personal_id count: the two numbers match when every person has exactly one row.
personIds = myTable.select('personal_id')
print(personIds.count())
print(personIds.distinct().count())


In [0]:
#check for nan & null values in myTable
from pyspark.sql.functions import col,isnan, when, count

# one aggregate per column: counts the rows where the column is NaN or null
missingCounts = []
for columnName in myTable.columns:
    isMissing = isnan(columnName) | col(columnName).isNull()
    missingCounts.append(count(when(isMissing, columnName)).alias(columnName))

display(myTable.select(missingCounts))


personal_id,C_gender,C_hesa_stud_carer,C_career_motivation,C_2a_levels,C_employment,C_study_motivation,C_is_international,C_age_group,C_nation,C_imd_groupings,C_is_linked_to_qual,C_n_module_1,C_credit_groups,C_course_level_groups,C_sum_event_6_months_b4_start,C_avg_pre_course_start_vle,C_slc_loan,C_urban_rural,C_hesa_stud_in_care
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,3971,5710,1486,0,0


In [0]:
# People without a matching fee row have no 'C_slc_loan' value; treat them as not having a loan ('No').
myTable = myTable.fillna({'C_slc_loan': 'No'})

In [0]:
# Missing 'C_avg_pre_course_start_vle' / 'C_sum_event_6_months_b4_start' values are set to 0:
# no VLE data means the student has not visited the VLE site yet, and the same reasoning applies to the CRM events.
myTable = myTable.fillna({
    'C_avg_pre_course_start_vle': 0,
    'C_sum_event_6_months_b4_start': 0,
})


In [0]:
# Save the feature table under SAVE_PREFIX, overwriting any previous output for the same presentation.
# The folder name is derived from presCode: with the old hard-coded "segmentation_2022J",
# a run for another presentation (e.g. 2020J) would have overwritten the 2022J data.
myTable.write.mode("overwrite").parquet(f"{SAVE_PREFIX}segmentation_{presCode}")