<a href="https://colab.research.google.com/github/Dawson-Stutzman/DatasetGenerator/blob/main/DataSet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# General Setup

In [None]:
!pip install pyspark
#from google.colab import drive
#drive.mount('/content/gdrive')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
#import pyspark.pandas as ps
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, NumericType
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import time
from pyspark.sql.functions import udf, struct, when, exp, pow, lit, least
from pyspark.sql.functions import round as sqlround
from pyspark.sql.functions import sum as sqlsum
# round and sum are default functions, so the sqlsum and sqlround names are needed to differentiate
from pyspark.sql.types import *
from numpy.lib import math as M
from numpy import random as Random
################################
# Spark entry points shared by every cell below.
conf = SparkConf().setAppName("AppGen")
# getOrCreate reuses an already-running context, so re-executing this cell in the
# same notebook session does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)



# Censor Equation Definitions (Needs events fitted)

In [1]:
## NOTE: Right now the equation has been translated into terms of x instead of y for simpler lookups
def eq1(y: float):
  """Converts an event likelihood into the age at which the event occurs (first curve).
  - y: The likelihood of the event (used to find the corresponding age on the hazard function)
  => Returns 18 * sqrt(5) * sqrt(ln(1 / y))
  """
  # The scale factor was changed from 12 to 18.
  scale = 18 * M.sqrt(5)
  log_inverse = M.log(1 / y)
  return scale * M.sqrt(log_inverse)

def eq2(y: float):
  """Converts an event likelihood into the age at which the event occurs (second curve).
  - y: The likelihood of the event (used to find the corresponding age on the hazard function)
  => Returns 60.6115 * ln(1 / y) ** 0.25
  """
  log_inverse = M.log(1 / y)
  fourth_root = M.pow(log_inverse, 0.25)
  return 60.6115 * fourth_root

def eq3(y: float):
  """Converts an event likelihood into the age at which the event occurs (third curve).
  - y: The likelihood of the event (used to find the corresponding age on the hazard function)
  => Returns 80 * sqrt(1 - y)
  """
  remaining = 1 - y
  return 80 * M.sqrt(remaining)

def time_dependency(t: float, c: int, beta: float):
  """Evaluates the time-dependent factor of the event equations.
  - t: The time (age) to evaluate at
  - c: Constant added to the exponent's rate
  - beta: Coefficient; half of it is added to the exponent's rate
  => Returns t^2 * exp((c + beta / 2) * t)
  """
  rate = c + (beta / 2)
  growth = M.exp(rate * t)
  return (t ** 2) * growth

def new_Eq1(dose: float, smoke: int, econ: int, sex: int, t0: float, alpha: float = 1.0, doseBeta: float = 1.0, smokeBeta: float = 1.0, econBeta: float = 1.0, sexBeta: float = 1.0, c: float = 1.0):
  """Evaluates the first event equation for one individual.
  - dose, smoke, econ, sex: The individual's covariates
  - t0: The individual's entrance age
  - alpha: Multiplier applied to the covariate term
  - doseBeta, smokeBeta, econBeta, sexBeta: Coefficients of the matching covariates
  - c: Constant forwarded to time_dependency
  => Returns alpha * exp(weighted covariate sum) * time_dependency(t0, c, doseBeta)

  The coefficients default to 1.0 (the values that used to be hard-coded in the body), so callers
  may omit them; values that are passed are now honored instead of being overwritten.
  """
  # Covariate term
  A = alpha * M.exp((doseBeta * dose) + (smokeBeta * smoke) + (econBeta * econ) + (sexBeta * sex))
  # Time term; doseBeta doubles as the beta of the time dependency
  f = time_dependency(t0, c, doseBeta)
  return A * f


def new_Eq2(dose: float, smoke: int, econ: int, sex: int, t0: float, alpha: float = 1.0, doseBeta: float = 1.0, smokeBeta: float = 1.0, econBeta: float = 1.0, sexBeta: float = 1.0, c: float = 1.0):
  """Evaluates the second event equation for one individual.
  - dose, smoke, econ, sex: The individual's covariates
  - t0: The individual's entrance age
  - alpha: Multiplier applied to the covariate term
  - doseBeta, smokeBeta, econBeta, sexBeta: Coefficients of the matching covariates
  - c: Constant forwarded to time_dependency
  => Returns alpha * exp(weighted covariate sum) * time_dependency(t0, c, doseBeta)

  The coefficients default to 1.0 (the values that used to be hard-coded in the body), so callers
  may omit them; values that are passed are now honored instead of being overwritten.
  """
  # Covariate term
  A = alpha * M.exp((doseBeta * dose) + (smokeBeta * smoke) + (econBeta * econ) + (sexBeta * sex))
  # Time term; doseBeta doubles as the beta of the time dependency
  f = time_dependency(t0, c, doseBeta)
  return A * f

def new_Eq3(dose: float, smoke: int, econ: int, sex: int, t0: float, alpha: float = 1.0, doseBeta: float = 1.0, smokeBeta: float = 1.0, econBeta: float = 1.0, sexBeta: float = 1.0, c: float = 1.0):
  """Evaluates the third event equation for one individual.
  - dose, smoke, econ, sex: The individual's covariates
  - t0: The individual's entrance age
  - alpha: Multiplier applied to the covariate term
  - doseBeta, smokeBeta, econBeta, sexBeta: Coefficients of the matching covariates
  - c: Constant forwarded to time_dependency
  => Returns alpha * exp(weighted covariate sum) * time_dependency(t0, c, doseBeta)

  The coefficients default to 1.0 (the values that used to be hard-coded in the body), so callers
  may omit them; values that are passed are now honored instead of being overwritten.
  """
  # Covariate term
  A = alpha * M.exp((doseBeta * dose) + (smokeBeta * smoke) + (econBeta * econ) + (sexBeta * sex))
  # Time term; doseBeta doubles as the beta of the time dependency
  f = time_dependency(t0, c, doseBeta)
  return A * f

# Time & Dosage Approximation Definitions

### <center>Time Approximation Function</center>
---
\begin{matrix}
  t_0 & \text{Entrance time}\\
  t_1 & \text{Approximated exit time}\\
  s & \text{Seed for the random draw of } u\\
  a & \text{Individual's covariates}\\
  b & \text{Constant for fitting equations}\\
  u & \text{Random likelihood of event}\\
\end{matrix}
\begin{gather*}
  \\t_1 = \frac{2}{b} \Biggl(\ln \Bigl( -\frac{b}{2}\ln(u) + e^{a + \frac{b}{2} t_0} \Bigr) - a \Biggr)
\end{gather*}
\begin{matrix}\\
⇒ & t_1
\end{matrix}

In [None]:
def Get_Time_For_Dosage(t0, s: int, a=None, b=None):
  """Approximates the age at which the individual's event occurs, using a seeded uniform draw.
  - t0: The individual's entrance age
  - s: The seed for random event recreation
  - a: Covariates of an individual (defaults to -2.5)
  - b: Constant to fit the equation (defaults to 0.05 * t0 / 50 + 0.05 * t0 / 60)
  => approximate time of event as a float

  The same (t0, s, a, b) always produces the same result because the global NumPy
  generator is re-seeded with s immediately before the draw.
  """
  if b is None:
    b = (0.05 * t0 / 50) + (0.05 * t0 / 60)
  if a is None:
    a = -2.5
  Random.seed(s)
  u = Random.uniform(0.0, 1.0)
  if b == 0:
    # Limit of the expression below as b -> 0 (e.g. default b with t0 == 0);
    # evaluating it directly would divide by zero.
    return t0 - M.log(u) * M.exp(-a)
  return 2/b * ( M.log( ((-b / 2.0) * M.log(u)) + (M.exp(a + (b/2) * t0)) ) - a )

### <center>Dosage Functions</center>
---
\begin{matrix}
  t_0 & \text{Age at entrance}\\
  s & \text{Seed for recreatable dosage data}\\
  dx & \text{Dosage step size}\\\\
  ⇒ & \text{A dictionary of the individual's dosages at each step}
\end{matrix}

In [None]:

def Get_Dose_History(s, t0, dx=None):
  """Finds the dosage history of the individual while in the trial.
  - s: The seed for random event recreation
  - t0: The entrance age of the individual
  - dx: The age step between dosage samples (defaults to 0.1); must be positive
  => dictionary mapping age to the cumulative dosage at that age, in increasing age order
     after the two leading zero-dose entries (ages 0 and t0 - dx)
  Raises ValueError when dx is not positive, since the age would never reach the exit time.
  """
  t1 = Get_Time_For_Dosage(a=-2.5, t0=t0, s=s)
  if dx is None:
    dx = 0.1
  if dx <= 0:
    raise ValueError("dx must be positive")
  # Re-seed so the dosage increments are reproducible for a given seed.
  Random.seed(s)
  dose_history = {0: 0.00, t0 - dx: 0.00}
  dose = 0
  age = t0

  while age <= t1:
    # Track the running total directly instead of rebuilding a list of all
    # recorded values on every step to read the last one.
    dose = dose + dx * Random.uniform(0.0, 1.0)
    dose_history[age] = dose
    age += dx
  return dose_history

def Get_Dose(s, t0, dx=None):
  """Finds the dosage of the individual at the time of exit.
  - s: The seed for random event recreation
  - t0: The entrance age of the individual
  - dx: The interval to increase the age by
  => recreatable approximate dosage of an individual by time of exit"""
  dose_history = Get_Dose_History(s=s, t0=t0, dx=dx)
  # Dictionaries keep insertion order, so the last value is the most recently recorded dose.
  return [*dose_history.values()][-1]


def Get_Initial_Dose(s, t0, dx=None):
  """Draws a recreatable initial dosage for the given seed.
  - s: The seed for random event recreation
  - t0: Unused; kept so the signature matches the other dosage functions
  - dx: Unused; kept so the signature matches the other dosage functions
  => a uniform random dosage between 0.0 and 30.0

  This function used to be indented beneath the return statement of Get_Dose, where it
  was never defined, and it called the nonexistent Random.Seed.
  """
  Random.seed(s)
  return Random.uniform(0.0, 30.0)

### <center>Dosage Visualization</center>
---
\begin{matrix}
  t_0 & \text{Entrance age}\\
  s & \text{Seed for recreatable random dosage data}\\
  dx & \text{Dosage step size}\\\\
  ⇒ & \text{A graph representing dosage vs. age}
\end{matrix}

In [None]:
def Get_Dose_Graph(s, t0, dx=None):
  """Plots the individual's dosage against age for the time spent in the trial.
  - s: The seed for random event recreation
  - t0: The entrance age of the individual
  - dx: The interval to increase the age by
  => shows the graph (titled with the seed); nothing is returned"""
  history = Get_Dose_History(s=s, t0=t0, dx=dx)
  history[0.0] = 0.00
  # Extend the curve 10 years past the last recorded age at the final dosage.
  final_age = [*history.keys()][-1]
  final_dose = [*history.values()][-1]
  history[final_age + 10] = final_dose

  figure = plt.figure()
  axes = figure.add_subplot(1, 1, 1)
  axes.plot(history.keys(), history.values())
  axes.set_title("Seed " + str(s))
  plt.tight_layout()
  plt.show()

# Dataset Generation Function Definition

In [None]:
import random
def Create_Dataset_SparkContext(Set_Size, Input=None, Output=None, Decimal_Precision=None, sort=None, ascending=None):
  """Creates a dataset of random individuals and evaluates the three event equations for each of them inside a Spark map.
  - Set_Size: The number of individuals to generate (not used when Input is given)
  - Input: Specifies where the input .parquet file should be read from. If left empty, the function will create its own starting data
  - Output: Only read by the disabled censoring stage at the end of this function
  - Decimal_Precision: The number of decimal places to round to (defaults to 12); only read by the disabled censoring stage
  - sort, ascending: Only read by the disabled censoring stage
  => None; the resulting DataFrame is printed with df.show()

  Input used to be read and then discarded in favour of an empty generated list; rows read
  from the parquet file are now the ones that get mapped.
  """
  Precision = 12 if Decimal_Precision is None else Decimal_Precision # Set the decimal place precision for the output
  columns = ["-Index", "Included", "Smoker_Status", "Entrance_Age", "Dosage", "Sex", "Economic_Status", "Dosage_Seed", "Event1", "Event2", "Event3", "Earliest_Event_Age"]
  # The event equations are evaluated with unit coefficients; they are passed explicitly
  # so that every parameter the equations declare is supplied by the call.
  hazard_params = {"alpha": 1.0, "doseBeta": 1.0, "smokeBeta": 1.0, "econBeta": 1.0, "sexBeta": 1.0, "c": 1.0}

  if Input is not None:
    # Rows of a DataFrame support the same x["column"] lookups as the generated dictionaries.
    RDD = sqlContext.read.parquet(Input).rdd
  else:
    # Create a dataset for possible individuals before censoring is done.
    data = []
    for i in range(0, Set_Size):
      Entrance_Age = Random.uniform(18, 80)
      seed = random.randint(0, 1000000)
      dose = Get_Dose(t0=Entrance_Age, s=seed)
      data.append({"-Index": i, "Included": 1,
                   "Smoker_Status": Random.randint(0, 3),
                   "Entrance_Age": Entrance_Age,
                   "Dosage": dose,
                   "Sex": Random.randint(0, 2),
                   "Economic_Status": Random.randint(0, 7),
                   "Dosage_Seed": seed,
                   "Event1": 0.0, "Event2": 0.0, "Event3": 0.0,
                   "Earliest_Event_Age": 0.0})
    RDD = sc.parallelize(data)

  def with_events(x):
    # Rebuild the record as a tuple in column order, replacing the three event placeholders.
    covariates = {"dose": x["Dosage"], "smoke": x["Smoker_Status"], "econ": x["Economic_Status"], "sex": x["Sex"], "t0": x["Entrance_Age"]}
    return (x["-Index"],
            x["Included"],
            x["Smoker_Status"],
            x["Entrance_Age"],
            x["Dosage"],
            x["Sex"],
            x["Economic_Status"],
            x["Dosage_Seed"],
            new_Eq1(**covariates, **hazard_params),
            new_Eq2(**covariates, **hazard_params),
            new_Eq3(**covariates, **hazard_params),
            x["Earliest_Event_Age"])

  df = RDD.map(with_events).toDF(columns)
  df.show()
  # NOTE: the censoring / rounding / sorting / output stage below is disabled (kept as a bare
  # string, which is a no-op). NOTE(review): its Earliest_Event expression maps both the Event2
  # match and the fallback to 2, and it writes df rather than df_filtered - confirm before re-enabling.
  '''
  df = df.withColumn("Included", when((df["Earliest_Event_Age"] > df["Entrance_Age"]), 0).otherwise(1))# Censor cases where Individuals have not yet reached an event
  df = df.withColumn('Earliest_Event', when(df["Earliest_Event_Age"] == df["Event1"], 1).when(df["Earliest_Event_Age"] == df["Event2"], 2).otherwise(2)) # I assumed that people over 80 would automatically be removed from our set, so the person years are 80 - entrance age
  df = df.withColumn("Person_Years", sqlround((df["Earliest_Event_Age"] - df["Entrance_Age"]), Precision))
  df = df.withColumn("Dosage", sqlround(df["Dosage"], Precision))
  df = df.withColumn("Earliest_Event_Age", sqlround(df["Earliest_Event_Age"], Precision))
  df = df.withColumn("Event1", sqlround(df["Event1"], Precision))
  df = df.withColumn("Event2", sqlround(df["Event2"], Precision))
  df = df.withColumn("Event3", sqlround(df["Event3"], Precision))
  df = df.withColumn("Entrance_Age", sqlround(df["Entrance_Age"], Precision))
  df_filtered=df.filter(df.Included == 0 )
  df_filtered=df_filtered.drop('Included')
  if sort != None:
    if ascending == None: ascending = False
    df_filtered = df_filtered.orderBy(sort, ascending=ascending)
  else:
    if ascending == None: ascending = False
    df_filtered = df_filtered.orderBy('-Index', ascending=True)
  if ascending == True and sort==None:
    df_filtered = df_filtered.orderBy('-Index', ascending=False)
  if Output is None:
    print("debug")
    df_filtered.show()
    df_filtered.select(sqlround(sqlsum(df["Person_Years"]), Precision).alias('Total_Person_Years')).show()
  else:
    df.write.parquet(Output, 'overwrite')
    '''

def Create_Dataset_SparkContext_2(Set_Size, Input=None, Output=None, Decimal_Precision=None, sort=None, ascending=None):
  """Creates a dataset of random individuals, evaluating the three event equations on the driver while each individual is generated.
  - Set_Size: The number of individuals to generate (not used when Input is given)
  - Input: Specifies where the input .parquet file should be read from. If left empty, the function will create its own starting data
  - Output: Only read by the disabled censoring stage at the end of this function
  - Decimal_Precision: The number of decimal places to round to (defaults to 12); only read by the disabled censoring stage
  - sort, ascending: Only read by the disabled censoring stage
  => None; the resulting DataFrame is printed with df.show()

  Input used to be read and then overwritten by a DataFrame built from an empty list;
  the DataFrame read from the parquet file is now the one that is shown.
  """
  Precision = 12 if Decimal_Precision is None else Decimal_Precision # Set the decimal place precision for the output
  # The event equations are evaluated with unit coefficients; they are passed explicitly
  # so that every parameter the equations declare is supplied by the call.
  hazard_params = {"alpha": 1.0, "doseBeta": 1.0, "smokeBeta": 1.0, "econBeta": 1.0, "sexBeta": 1.0, "c": 1.0}

  if Input is not None:
    df = sqlContext.read.parquet(Input)
  else:
    # Create a dataset for possible individuals before censoring is done.
    data = []
    for i in range(0, Set_Size):
      Entrance_Age = Random.uniform(18, 80)
      seed = random.randint(0, 1000000)
      dose = Get_Dose(t0=Entrance_Age, s=seed)
      smoker_status = Random.randint(0, 3)
      econ_status = Random.randint(0, 7)
      sex = Random.randint(0, 2)
      data.append({"-Index": i, "Included": 1,
                   "Smoker_Status": smoker_status,
                   "Entrance_Age": Entrance_Age,
                   "Dosage": dose,
                   "Sex": sex,
                   "Economic_Status": econ_status,
                   "Dosage_Seed": seed,
                   "Event1": new_Eq1(dose, smoker_status, econ_status, sex, Entrance_Age, **hazard_params),
                   "Event2": new_Eq2(dose, smoker_status, econ_status, sex, Entrance_Age, **hazard_params),
                   "Event3": new_Eq3(dose, smoker_status, econ_status, sex, Entrance_Age, **hazard_params),
                   "Earliest_Event_Age": 0.0})
    df = sc.parallelize(data).toDF()
  df.show()
  # NOTE: the censoring / rounding / sorting / output stage below is disabled (kept as a bare
  # string, which is a no-op). NOTE(review): its Earliest_Event expression maps both the Event2
  # match and the fallback to 2, and it writes df rather than df_filtered - confirm before re-enabling.
  '''
  df = df.withColumn("Included", when((df["Earliest_Event_Age"] > df["Entrance_Age"]), 0).otherwise(1))# Censor cases where Individuals have not yet reached an event
  df = df.withColumn('Earliest_Event', when(df["Earliest_Event_Age"] == df["Event1"], 1).when(df["Earliest_Event_Age"] == df["Event2"], 2).otherwise(2)) # I assumed that people over 80 would automatically be removed from our set, so the person years are 80 - entrance age
  df = df.withColumn("Person_Years", sqlround((df["Earliest_Event_Age"] - df["Entrance_Age"]), Precision))
  df = df.withColumn("Dosage", sqlround(df["Dosage"], Precision))
  df = df.withColumn("Earliest_Event_Age", sqlround(df["Earliest_Event_Age"], Precision))
  df = df.withColumn("Event1", sqlround(df["Event1"], Precision))
  df = df.withColumn("Event2", sqlround(df["Event2"], Precision))
  df = df.withColumn("Event3", sqlround(df["Event3"], Precision))
  df = df.withColumn("Entrance_Age", sqlround(df["Entrance_Age"], Precision))
  df_filtered=df.filter(df.Included == 0 )
  df_filtered=df_filtered.drop('Included')
  if sort != None:
    if ascending == None: ascending = False
    df_filtered = df_filtered.orderBy(sort, ascending=ascending)
  else:
    if ascending == None: ascending = False
    df_filtered = df_filtered.orderBy('-Index', ascending=True)
  if ascending == True and sort==None:
    df_filtered = df_filtered.orderBy('-Index', ascending=False)
  if Output is None:
    print("debug")
    df_filtered.show()
    df_filtered.select(sqlround(sqlsum(df["Person_Years"]), Precision).alias('Total_Person_Years')).show()
  else:
    df.write.parquet(Output, 'overwrite')
    '''

# Testing Code Functionality

In [None]:
#Time1 = 10
#Seed1 = 394090
#print("Age Approximation: " + str(Get_Time_For_Dosage(t0=Time1, s=Seed1)))
#print("Final Dosage: " + str(Get_Dose(t0=Time1, s=Seed1)))
#Get_Dose_Graph(t0=Time1, s=Seed1)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext(Set_Size=1000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext_2(Set_Size=1000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext(Set_Size=10000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext_2(Set_Size=10000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext(Set_Size=100000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext_2(Set_Size=100000)

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext(Set_Size=1000000)

+------+--------+-------------+------------------+-------------------+---+---------------+-----------+--------------------+--------------------+--------------------+------------------+
|-Index|Included|Smoker_Status|      Entrance_Age|             Dosage|Sex|Economic_Status|Dosage_Seed|              Event1|              Event2|              Event3|Earliest_Event_Age|
+------+--------+-------------+------------------+-------------------+---+---------------+-----------+--------------------+--------------------+--------------------+------------------+
|     0|       1|            0| 27.43032765848195| 1.2711049796363065|  1|              4|     371587|2.945969098778789E23|2.945969098778789E23|2.945969098778789E23|               0.0|
|     1|       1|            0|  72.7569248288224|0.04543434437376122|  1|              0|     454307|3.755532256546205...|3.755532256546205...|3.755532256546205...|               0.0|
|     2|       1|            0| 19.63633943388458|  3.868933675917663|  0| 

In [None]:
%%timeit -r8 -n2
Create_Dataset_SparkContext_2(Set_Size=1000000)

+------+--------------------+-----------+------------------+---------------+-----------------+--------------------+--------------------+--------------------+--------+---+-------------+
|-Index|              Dosage|Dosage_Seed|Earliest_Event_Age|Economic_Status|     Entrance_Age|              Event1|              Event2|              Event3|Included|Sex|Smoker_Status|
+------+--------------------+-----------+------------------+---------------+-----------------+--------------------+--------------------+--------------------+--------+---+-------------+
|     0| 0.08685632001013331|     427227|               0.0|              5|73.24120347109351|3.308790808788943E54|3.308790808788943E54|3.308790808788943E54|       1|  0|            2|
|     1| 0.06365393689557096|     538499|               0.0|              2|67.34575691588756|7.227438225628121E48|7.227438225628121E48|7.227438225628121E48|       1|  0|            1|
|     2|  4.5426094003917274|     781847|               0.0|              5