## Read data

In [None]:
import pyspark.sql.functions as F

In [None]:
import os

data_folder = '/mnt/2024-team1/'
csv_data = 'JanBDRcount_transpose.csv'

# Full path of the input CSV on the mounted storage.
raw_path = os.path.join(data_folder, csv_data)

raw_path  # echoed as the cell's output


In [None]:
# Load the CSV, taking column names from its first line. No schema is given,
# so every column comes in as a string (cast to int further below).
df = spark.read.option("header", True).csv(raw_path)

In [None]:
# Render the freshly loaded DataFrame with the notebook's display helper.
display(df)

In [None]:
# Show the column types as read (before the int cast in the next section).
df.printSchema()

## Convert data types to int

In [None]:
from pyspark.sql.types import IntegerType

# Keep the "column" key as-is and cast every other column to integers;
# values that cannot be parsed become null.
df = df.select(
    F.col("column"),
    *(F.col(name).cast(IntegerType()) for name in df.columns[1:]),
)
df.count()

In [None]:
# The table is stored transposed (see the file name): each original column
# is a row named by its "column" field, so dropping those fields means
# filtering their rows out.
to_remove = ['FID', 'IID', 'PAT', 'MAT']
df_dropped = df.where(~F.col('column').isin(*to_remove))

In [None]:
# Inspect the table after the unwanted rows were filtered out.
display(df_dropped)

In [None]:
# Replace remaining nulls in the numeric columns (e.g. values the int cast
# could not parse) with the sentinel 3.
df_fillna = df_dropped.fillna(3)
display(df_fillna)

## Size-up measurement decorator

In [None]:
def measure_size_up(func):
    """Wrap ``func(df)`` so it is timed on growing random samples of ``df``.

    The wrapper ``inner(df, split_count=10, num_of_partition=None)`` calls
    ``func`` on samples of fraction 1/split_count, 2/split_count, ..., 1.0.
    It returns the list of wall-clock run times in seconds, one per sample.

    If ``num_of_partition`` is given, each sample is coalesced or
    repartitioned to that many partitions before timing. Otherwise the
    sample keeps the partitioning of ``df``.

    Spark transformations are lazy. If ``func`` returns an object with a
    ``write`` attribute (a DataFrame), it is executed inside the timed region
    through the ``noop`` sink, so the timing covers the real computation and
    not just plan construction. Any other return value is ignored.
    """
    import functools
    import time

    @functools.wraps(func)
    def inner(df, split_count=10, num_of_partition=None):
        # run time of every sample, in order of increasing sample size
        size_up_li = []

        for i in range(1, split_count + 1):
            df_small = df.sample(i / split_count)

            # Only reshape the sample when a partition count was requested.
            if num_of_partition is not None:
                current = df_small.rdd.getNumPartitions()
                if num_of_partition < current:
                    df_small = df_small.coalesce(num_of_partition)
                elif num_of_partition > current:
                    df_small = df_small.repartition(num_of_partition)

            print(df_small.count())
            print(df_small.rdd.getNumPartitions())

            # perform operations and count the run time
            start_time = time.perf_counter()
            result = func(df_small)
            writer = getattr(result, "write", None)
            if writer is not None:
                # Force full execution of the lazy result without storing it.
                writer.format("noop").mode("overwrite").save()
            size_up_li.append(time.perf_counter() - start_time)

        return size_up_li

    return inner


## Speed-up measurement decorator

In [None]:
def measure_speed_up(func):
    """Wrap ``func(df)`` so it is timed on ``df`` split into a varying number of partitions.

    The wrapper ``inner(df, start_num_of_partition=3, target_num_of_partition=16)``
    runs ``func`` once for each partition count from ``start_num_of_partition``
    to ``target_num_of_partition``, inclusive. It returns the wall-clock run
    times in seconds, in that order.

    Counts up to the current partition count use ``coalesce``, which avoids a
    shuffle. Larger counts use ``repartition``.

    Spark transformations are lazy. If ``func`` returns an object with a
    ``write`` attribute (a DataFrame), it is executed inside the timed region
    through the ``noop`` sink. Any other return value is ignored.

    NOTE(review): the partitioned DataFrame is not cached, so a repartition
    shuffle is re-run inside the timed region.
    """
    import functools
    import time

    @functools.wraps(func)
    def inner(df, start_num_of_partition=3, target_num_of_partition=16):
        num_of_partition = df.rdd.getNumPartitions()
        print("total number of partition: ", num_of_partition)

        # run time for every partition count, in increasing order
        speed_up_li = []

        for i in range(start_num_of_partition, target_num_of_partition + 1):
            if i <= num_of_partition:
                df_small_partition = df.coalesce(i)
            else:
                df_small_partition = df.repartition(i)
            # check we changed the number of partition
            print(df_small_partition.count())
            print(df_small_partition.rdd.getNumPartitions())

            # perform operations and count the run time
            start_time = time.perf_counter()
            result = func(df_small_partition)
            writer = getattr(result, "write", None)
            if writer is not None:
                # Force full execution of the lazy result without storing it.
                writer.format("noop").mode("overwrite").save()
            speed_up_li.append(time.perf_counter() - start_time)

        return speed_up_li

    return inner

## Measure one-hot encoding

### Size-up

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import arrays_zip, col, explode


@measure_size_up
def one_hot_encoding(df_fillna):
    """One-hot encode the transposed table and return it in long form.

    Each row of ``df_fillna`` is one feature, named by its "column" field.
    Every other column holds that feature's integer value for one record
    (presumably one sample; confirm against the source CSV).

    * The SEX row (values 1/2) becomes a 2-slot vector per record.
    * Every row except SEX and PHENOTYPE (values 0/1/2) becomes a 3-slot vector.
    * Each vector is exploded into one row per slot, numbered by "index".
    * The PHENOTYPE row is kept unencoded, with index 0.

    Returns the union of the three parts as a lazy DataFrame; no Spark action
    is triggered here. Because of ``@measure_size_up``, the public call
    ``one_hot_encoding(df)`` runs the size-up benchmark instead and returns
    its run times.
    """
    # Per-record value columns. Taken from the argument rather than the
    # notebook-global ``df`` so the function depends only on its input.
    cols = df_fillna.columns[1:]

    df_gene = df_fillna.filter(~F.col('column').isin(['SEX', 'PHENOTYPE']))
    df_sex = df_fillna.filter('column = "SEX"')
    df_target = df_fillna.filter('column = "PHENOTYPE"')

    def one_hot_encode_sex(c):
        # Anything other than 1/2 maps to [0, 0] rather than null, matching the
        # gene fallback. arrays_zip yields null if any input array is null,
        # which would collapse the whole SEX row during the explode below.
        return (
            F.when(df_sex[c] == 1, F.lit([1, 0]))
            .when(df_sex[c] == 2, F.lit([0, 1]))
            .otherwise(F.lit([0, 0]))
            .alias(c)
        )

    def one_hot_encode_gene(c):
        # 0/1/2 -> one-hot; any other value (e.g. the fill value 3) -> all zeros.
        return (
            F.when(df_gene[c] == 0, F.lit([1, 0, 0]))
            .when(df_gene[c] == 1, F.lit([0, 1, 0]))
            .when(df_gene[c] == 2, F.lit([0, 0, 1]))
            .otherwise(F.lit([0, 0, 0]))
            .alias(c)
        )

    def explode_onehot(df_onehot):
        # Zip the per-record vectors position-wise, then emit one row per
        # position, so each output row holds one slot of every vector.
        # ref: https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows
        # ref: https://stackoverflow.com/questions/69162207/pyspark-explode-list-creating-column-with-index-in-list
        return (
            df_onehot
            .withColumn("tmp", arrays_zip(*cols))
            .select(col("column"), F.posexplode_outer("tmp").alias("index", "tmp"))
            .select(col("column"), col("index"), *[col(f"tmp.{c}") for c in cols])
        )

    df_sex_onehot = df_sex.select(F.col("column"), *map(one_hot_encode_sex, cols))
    df_gene_onehot = df_gene.select(F.col("column"), *map(one_hot_encode_gene, cols))

    df_feat = explode_onehot(df_sex_onehot).union(explode_onehot(df_gene_onehot))

    ## Deal with target column: no encoding, a single slot at index 0.
    # Columns are listed in the same order as in df_feat because union()
    # matches columns by position, not by name.
    df_target_with_index = df_target.withColumn("index", F.lit(0)).select(
        F.col("column"), F.col("index"), *[F.col(c) for c in cols]
    )

    ## Combine features and target columns
    df_all = df_feat.union(df_target_with_index)
    return df_all

In [None]:
# Run the size-up benchmark: one run time (seconds) per sample fraction.
size_up_li = one_hot_encoding(df_fillna)

In [None]:
# Show the measured size-up run times.
size_up_li

In [None]:
import matplotlib.pyplot as plt

# Run times (seconds) recorded from an earlier run of the size-up cell above.
# They are hard-coded so the plot can be redrawn without re-running the
# benchmark, and they overwrite the value computed by that cell.
# NOTE(review): if these came from the version of the benchmark that triggered
# no Spark action inside the timed region, they measure only DataFrame/plan
# construction. That would explain why they are flat across data sizes.
size_up_li = [224.73626255989075,
 230.8692979812622,
 230.81820130348206,
 233.47459483146667,
 222.6825487613678,
 223.79688024520874,
 240.78272604942322,
 252.61592864990234,
 234.54719829559326,
 230.66714191436768]

# The i-th measurement used i / len(size_up_li) of the data (one entry per
# split in measure_size_up); scale to percent so the axis matches its label.
percent_of_data = [100 * i / len(size_up_li) for i in range(1, len(size_up_li) + 1)]

plt.plot(percent_of_data, size_up_li)
plt.xlabel("percent of data")
plt.ylabel("Run Time (sec)")
plt.show()

### Speed-up

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import arrays_zip, col, explode


@measure_speed_up
def one_hot_encoding(df_fillna):
    """One-hot encode the transposed table and return it in long form.

    Each row of ``df_fillna`` is one feature, named by its "column" field.
    Every other column holds that feature's integer value for one record
    (presumably one sample; confirm against the source CSV).

    * The SEX row (values 1/2) becomes a 2-slot vector per record.
    * Every row except SEX and PHENOTYPE (values 0/1/2) becomes a 3-slot vector.
    * Each vector is exploded into one row per slot, numbered by "index".
    * The PHENOTYPE row is kept unencoded, with index 0.

    Returns the union of the three parts as a lazy DataFrame; no Spark action
    is triggered here. Because of ``@measure_speed_up``, the public call
    ``one_hot_encoding(df)`` runs the speed-up benchmark instead and returns
    its run times.
    """
    # Per-record value columns. Taken from the argument rather than the
    # notebook-global ``df`` so the function depends only on its input.
    cols = df_fillna.columns[1:]

    df_gene = df_fillna.filter(~F.col('column').isin(['SEX', 'PHENOTYPE']))
    df_sex = df_fillna.filter('column = "SEX"')
    df_target = df_fillna.filter('column = "PHENOTYPE"')

    def one_hot_encode_sex(c):
        # Anything other than 1/2 maps to [0, 0] rather than null, matching the
        # gene fallback. arrays_zip yields null if any input array is null,
        # which would collapse the whole SEX row during the explode below.
        return (
            F.when(df_sex[c] == 1, F.lit([1, 0]))
            .when(df_sex[c] == 2, F.lit([0, 1]))
            .otherwise(F.lit([0, 0]))
            .alias(c)
        )

    def one_hot_encode_gene(c):
        # 0/1/2 -> one-hot; any other value (e.g. the fill value 3) -> all zeros.
        return (
            F.when(df_gene[c] == 0, F.lit([1, 0, 0]))
            .when(df_gene[c] == 1, F.lit([0, 1, 0]))
            .when(df_gene[c] == 2, F.lit([0, 0, 1]))
            .otherwise(F.lit([0, 0, 0]))
            .alias(c)
        )

    def explode_onehot(df_onehot):
        # Zip the per-record vectors position-wise, then emit one row per
        # position, so each output row holds one slot of every vector.
        # ref: https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows
        # ref: https://stackoverflow.com/questions/69162207/pyspark-explode-list-creating-column-with-index-in-list
        return (
            df_onehot
            .withColumn("tmp", arrays_zip(*cols))
            .select(col("column"), F.posexplode_outer("tmp").alias("index", "tmp"))
            .select(col("column"), col("index"), *[col(f"tmp.{c}") for c in cols])
        )

    df_sex_onehot = df_sex.select(F.col("column"), *map(one_hot_encode_sex, cols))
    df_gene_onehot = df_gene.select(F.col("column"), *map(one_hot_encode_gene, cols))

    df_feat = explode_onehot(df_sex_onehot).union(explode_onehot(df_gene_onehot))

    ## Deal with target column: no encoding, a single slot at index 0.
    # Columns are listed in the same order as in df_feat because union()
    # matches columns by position, not by name.
    df_target_with_index = df_target.withColumn("index", F.lit(0)).select(
        F.col("column"), F.col("index"), *[F.col(c) for c in cols]
    )

    ## Combine features and target columns
    df_all = df_feat.union(df_target_with_index)
    return df_all

In [None]:
# Run the speed-up benchmark: one run time (seconds) per partition count.
speed_up_li = one_hot_encoding(df_fillna)
speed_up_li


In [None]:
import matplotlib.pyplot as plt

# Run times (seconds) recorded from an earlier run of the speed-up cell above.
# They are hard-coded so the plot can be redrawn without re-running the
# benchmark, and they overwrite the value computed by that cell.
speed_up_li = [182.29031944274902,
 170.90971517562866,
 166.1481957435608,
 163.8801667690277,
 164.40883135795593,
 163.40291213989258,
 164.29751634597778,
 164.64252924919128,
 163.68030381202698,
 168.63783502578735,
 166.7928307056427,
 168.07008266448975,
 165.91625785827637,
 164.4101107120514]

# measure_speed_up starts at 3 partitions and adds one per measurement.
first_num_of_partition = 3
plt.plot(range(first_num_of_partition, first_num_of_partition + len(speed_up_li)), speed_up_li)
# The benchmark varies the number of partitions, not clusters.
plt.xlabel("Number of Partitions")
plt.ylabel("Run Time (sec)")
plt.show()