In [1]:
import os

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexer
from pyspark.sql import SparkSession, DataFrame, functions as F
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable, DefaultParamsReader
from pyspark.ml import PipelineModel
from pyspark.ml.param.shared import HasOutputCols, Param, Params, TypeConverters
from pyspark import keyword_only

In [2]:
# Never commit storage credentials. The account key is read from the environment
# (e.g. populated from a secret store) instead of being embedded in the notebook.
# NOTE(review): the key that was previously hard-coded here is in version history
# and should be rotated.
spark.conf.set(
  "fs.azure.account.key.sparkmltrainig.blob.core.windows.net",
  os.environ["AZURE_STORAGE_ACCOUNT_KEY"])

In [3]:
# Load the raw CSV (header row, inferred column types) and hold out 10% for testing.
TRAIN_CSV_PATH = "wasbs://data@sparkmltrainig.blob.core.windows.net/train.csv"
data = spark.read.csv(TRAIN_CSV_PATH, header=True, inferSchema=True)

split_weights = [0.9, 0.1]
train_df, test_df = data.randomSplit(split_weights, seed=42)
for split in (train_df, test_df):
  print(split.count())


In [4]:
# Preview the first rows of the training split (explicit show() defaults).
train_df.show(n=20, truncate=True)

In [5]:
# Count the null values in every column, once per split.
for split_df in (train_df, test_df):
  null_counts = [F.count(F.when(F.isnull(name), name)).alias(name) for name in split_df.columns]
  split_df.select(null_counts).show()

In [6]:
def evaluate_initials(df: DataFrame) -> DataFrame:
  """Add an 'Initial' column with the title parsed from 'Name', rare titles grouped.

  The title is the first run of ASCII letters followed by a period in 'Name'.
  Titles listed in ``initial_groups`` are folded into 'Mr', 'Mrs', 'Miss' or
  'Other'. Any other extracted title (e.g. 'Master') is kept unchanged.

  Args:
    df: DataFrame with a string 'Name' column.

  Returns:
    ``df`` with an added 'Initial' column.
  """
  initial_groups = {
    'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
    'Dr': 'Mr', 'Major': 'Mr',
    'Lady': 'Mrs', 'Countess': 'Mrs',
    'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
    'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
  }
  # Raw string: '\.' in a plain literal is an invalid escape sequence and emits a
  # DeprecationWarning (SyntaxWarning on Python 3.12+). The regex is unchanged.
  _df = df.withColumn('Initial', F.regexp_extract(df['Name'], r'([A-Za-z]+)\.', 1))
  # With a dict, replace() takes the replacement values from the dict itself, so no
  # separate `value` is passed. The replacement is limited to the 'Initial' column.
  return _df.replace(initial_groups, subset=['Initial'])
  
def handle_missing_age(df: DataFrame, age_by_initial=None) -> DataFrame:
  """Fill null 'Age' values with a fixed age chosen by the row's 'Initial'.

  Rows that already have an 'Age', or whose 'Initial' is not a key of the mapping,
  keep their original 'Age' (which may still be null).

  Args:
    df: DataFrame with 'Age' and 'Initial' columns (e.g. the output of
      ``evaluate_initials``).
    age_by_initial: optional ``{initial: age}`` mapping. When omitted, the fixed
      per-title ages Mr=33, Mrs=36, Master=5, Miss=22 and Other=46 are used.

  Returns:
    ``df`` with the 'Age' column replaced.
  """
  if age_by_initial is None:
    age_by_initial = {'Mr': 33, 'Mrs': 36, 'Master': 5, 'Miss': 22, 'Other': 46}

  age = df['Age']
  # Build one flat CASE WHEN chain instead of nesting otherwise(when(...)) calls.
  fill = None
  for initial, default_age in age_by_initial.items():
    condition = F.isnull(age) & (df['Initial'] == initial)
    fill = F.when(condition, default_age) if fill is None else fill.when(condition, default_age)

  if fill is None:
    # Empty mapping: there is nothing to impute.
    return df
  return df.withColumn('Age', fill.otherwise(age))

def create_family_size(df: DataFrame) -> DataFrame:
  """Return ``df`` with 'FamilySize' = Parch + SibSp + 1 (the passenger themself)."""
  family_size = F.col('Parch') + F.col('SibSp') + 1
  return df.withColumn('FamilySize', family_size)

def create_is_alone(df: DataFrame) -> DataFrame:
  """Return ``df`` with 'IsAlone': 0 when 'FamilySize' > 1, otherwise 1 (including null sizes)."""
  has_family = F.col('FamilySize') > 1
  return df.withColumn('IsAlone', F.when(has_family, 0).otherwise(1))

def create_fare_per_person(df: DataFrame) -> DataFrame:
  """Return ``df`` with 'FarePerPerson' = 'Fare' divided by 'FamilySize'."""
  per_person = F.col('Fare') / F.col('FamilySize')
  return df.withColumn('FarePerPerson', per_person)

def drop_rows_with_null(df: DataFrame, col) -> DataFrame:
  """Return ``df`` without the rows where the given column(s) are null.

  Only SQL NULL is tested (``isNotNull``). Unlike ``dropna``, NaN values in
  numeric columns are kept.

  Args:
    df: input DataFrame.
    col: a single column name, or a list/tuple of column names. A row is dropped
      if any of them is null. An empty list returns ``df`` unchanged.

  Returns:
    The filtered DataFrame.
  """
  names = [col] if isinstance(col, str) else list(col)
  for name in names:
    df = df.filter(df[name].isNotNull())
  return df

def change_to_index(df: DataFrame, col) -> DataFrame:
  """Fit a StringIndexer on ``df[col]`` and return ``df`` with '<col>_indexed' added.

  The indexer is fitted on ``df`` itself, so the label->index mapping depends on
  the DataFrame passed in.
  """
  target = '{0}_indexed'.format(col)
  fitted_indexer = StringIndexer(inputCol=col, outputCol=target).fit(df)
  return fitted_indexer.transform(df)

def change_to_one_hot_encoded(df: DataFrame, cols) -> DataFrame:
  """Index then one-hot encode each column in ``cols``.

  Adds '<col>_indexed' (via ``change_to_index``) and '<col>_encoded' columns.
  Both the indexers and the encoder are fitted on ``df`` itself.
  """
  indexed_df = df
  for name in cols:
    indexed_df = change_to_index(indexed_df, name)
  encoder = OneHotEncoderEstimator(
    inputCols=['{0}_indexed'.format(name) for name in cols],
    outputCols=['{0}_encoded'.format(name) for name in cols])
  return encoder.fit(indexed_df).transform(indexed_df)

In [7]:
# Apply the row-wise feature steps in the same order to both splits. Only the
# training split additionally drops rows with a null 'Embarked'.
feature_steps = [create_family_size, create_is_alone, create_fare_per_person, evaluate_initials, handle_missing_age]
for step in feature_steps:
  train_df = step(train_df)
  test_df = step(test_df)
train_df = drop_rows_with_null(train_df, 'Embarked')

In [8]:
# Fit the categorical encoders on the training split only, then reuse the fitted model
# for the test split. Fitting separately on each split gives frequency-dependent index
# mappings and possibly different one-hot vector sizes, so the test features would not
# line up with the features the model was trained on.
categorical_cols = ['Sex', 'Initial', 'Embarked']
categorical_encoder = Pipeline(stages=[
  # handleInvalid='keep' puts unseen/null categories into an extra bucket instead of
  # failing. The test split is not filtered for a null 'Embarked'.
  StringIndexer(inputCol=c, outputCol='{0}_indexed'.format(c), handleInvalid='keep')
  for c in categorical_cols
] + [
  OneHotEncoderEstimator(
    inputCols=['{0}_indexed'.format(c) for c in categorical_cols],
    outputCols=['{0}_encoded'.format(c) for c in categorical_cols],
    handleInvalid='keep'),
]).fit(train_df)

train_df = categorical_encoder.transform(train_df)
test_df = categorical_encoder.transform(test_df)


In [9]:
# Remove identifier/text columns and the raw categoricals that were encoded above.
cols_to_drop = ['PassengerId', 'Cabin', 'Ticket', 'Name', 'Sex', 'Initial', 'Embarked']
train_df, test_df = (frame.drop(*cols_to_drop) for frame in (train_df, test_df))

In [10]:
# Preview the engineered training features (explicit show() defaults).
train_df.show(n=20, truncate=True)

In [11]:
# Every remaining column except the label becomes a feature.
cols = list(filter(lambda name: name != 'Survived', train_df.columns))
vec_assembler = VectorAssembler(inputCols=cols, outputCol="features")
vec_train_DF = vec_assembler.transform(train_df)

# NOTE(review): 'Survived' looks like a binary label; LinearRegression is kept to match
# the RMSE evaluation, but LogisticRegression may be the more appropriate model.
lr = LinearRegression(labelCol="Survived", featuresCol="features")
lr_model = lr.fit(vec_train_DF)

In [12]:
# Assemble test features with the same assembler, score them, and preview the predictions.
vec_test_df = vec_assembler.transform(test_df)
pred_df = lr_model.transform(vec_test_df)
pred_df.show(n=20, truncate=True)

In [13]:
# Root-mean-squared error between 'prediction' and the 'Survived' label.
regression_evaluator = RegressionEvaluator(
  labelCol="Survived",
  predictionCol="prediction",
  metricName="rmse",
)
rmse = regression_evaluator.evaluate(pred_df)
print(f"RMSE is {rmse}")

In [14]:
class InitialsTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding an 'Initial' column: the title parsed from 'Name', rare titles grouped.

  The title is the first run of ASCII letters followed by a period in 'Name'. Titles in
  the mapping below are folded into 'Mr', 'Mrs', 'Miss' or 'Other'. Any other title is kept.
  """
  # DefaultParamsReader is deliberately not a base class. It is the MLReader that
  # DefaultParamsReadable.read() returns, not a mixin for the stage itself.

  @keyword_only
  def __init__(self):
    super(InitialsTransformer, self).__init__()

  def _transform(self, df: DataFrame) -> DataFrame:
    initial_groups = {
      'Mlle': 'Miss', 'Mme': 'Miss', 'Ms': 'Miss',
      'Dr': 'Mr', 'Major': 'Mr',
      'Lady': 'Mrs', 'Countess': 'Mrs',
      'Jonkheer': 'Other', 'Col': 'Other', 'Rev': 'Other',
      'Capt': 'Mr', 'Sir': 'Mr', 'Don': 'Mr',
    }
    # Raw string avoids the invalid '\.' escape warning; the regex itself is unchanged.
    with_initial = df.withColumn('Initial', F.regexp_extract(df['Name'], r'([A-Za-z]+)\.', 1))
    # With a dict, replace() takes the replacement values from the dict itself.
    return with_initial.replace(initial_groups, subset=['Initial'])

# Register the class on __main__. Presumably this is so PipelineModel.load can resolve
# the class recorded in saved metadata when this code runs in a notebook. `m` is reused below.
m = __import__("__main__")
setattr(m, 'InitialsTransformer', InitialsTransformer)
  
class MissingAgeTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage that fills null 'Age' values with a fixed age per 'Initial' title.

  Rows that already have an 'Age', or whose title is not one of Mr/Mrs/Master/Miss/Other,
  keep their original 'Age'. The 'Initial' column must already exist.
  """

  @keyword_only
  def __init__(self):
    super(MissingAgeTransformer, self).__init__()

  def _transform(self, df: DataFrame) -> DataFrame:
    age = df['Age']
    initial = df['Initial']
    missing = F.isnull(age)
    # A single flat CASE WHEN chain; the first matching branch wins, as in nested otherwise().
    filled = (
      F.when(missing & (initial == 'Mr'), 33)
       .when(missing & (initial == 'Mrs'), 36)
       .when(missing & (initial == 'Master'), 5)
       .when(missing & (initial == 'Miss'), 22)
       .when(missing & (initial == 'Other'), 46)
       .otherwise(age)
    )
    return df.withColumn('Age', filled)

setattr(m, 'MissingAgeTransformer', MissingAgeTransformer)
  
class FamilySizeTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding 'FamilySize' = Parch + SibSp + 1."""

  @keyword_only
  def __init__(self):
    super(FamilySizeTransformer, self).__init__()

  def _transform(self, df: DataFrame) -> DataFrame:
    family_size = F.col('Parch') + F.col('SibSp') + 1
    return df.withColumn('FamilySize', family_size)

setattr(m, 'FamilySizeTransformer', FamilySizeTransformer)
  
class IsAloneTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding 'IsAlone': 0 when 'FamilySize' > 1, otherwise 1."""

  @keyword_only
  def __init__(self):
    super(IsAloneTransformer, self).__init__()

  def _transform(self, df: DataFrame) -> DataFrame:
    has_family = F.col('FamilySize') > 1
    return df.withColumn('IsAlone', F.when(has_family, 0).otherwise(1))

setattr(m, 'IsAloneTransformer', IsAloneTransformer)
  
class FarePerPersonTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding 'FarePerPerson' = 'Fare' / 'FamilySize'."""

  @keyword_only
  def __init__(self):
    super(FarePerPersonTransformer, self).__init__()

  def _transform(self, df: DataFrame) -> DataFrame:
    per_person = F.col('Fare') / F.col('FamilySize')
    return df.withColumn('FarePerPerson', per_person)

setattr(m, 'FarePerPersonTransformer', FarePerPersonTransformer)
  
class DropRowsWithNullTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage that removes rows whose `col` column is null.

  Set the column with ``setCol(name)``, or pass ``col=name`` to the constructor.
  """

  col = Param(Params._dummy(), "col", "column", typeConverter=TypeConverters.toString)

  def __init__(self, col=None):
    super(DropRowsWithNullTransformer, self).__init__()
    # Optional convenience. The default None keeps no-argument construction
    # (and setting the column later via setCol) working exactly as before.
    if col is not None:
      self.setCol(col)

  def setCol(self, value):
    return self._set(col=value)

  def getCol(self):
    return self.getOrDefault(self.col)

  def _transform(self, df: DataFrame) -> DataFrame:
    return df.filter(df[self.getCol()].isNotNull())

setattr(m, 'DropRowsWithNullTransformer', DropRowsWithNullTransformer)
  
class CategoryToIndexTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding '<col>_indexed', a StringIndexer encoding of the `col` column.

  NOTE(review): the StringIndexer is fitted inside _transform, i.e. on whatever DataFrame
  is being transformed. Each call therefore learns its own label->index mapping, so train,
  test and inference data can disagree. A plain StringIndexer stage in the Pipeline is
  fitted only once.
  """

  col = Param(Params._dummy(), "col", "column", typeConverter=TypeConverters.toString)

  def __init__(self):
    super(CategoryToIndexTransformer, self).__init__()

  def setCol(self, value):
    return self._set(col=value)

  def getCol(self):
    return self.getOrDefault(self.col)

  def _transform(self, df: DataFrame) -> DataFrame:
    source = self.getCol()
    fitted = StringIndexer(inputCol=source, outputCol='{0}_indexed'.format(source)).fit(df)
    return fitted.transform(df)

setattr(m, 'CategoryToIndexTransformer', CategoryToIndexTransformer)
  
class CategoryToOneHotEncodedTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage adding '<col>_indexed' and '<col>_encoded' for every column in `cols`.

  NOTE(review): both the per-column StringIndexer and the OneHotEncoderEstimator are fitted
  inside _transform, on the DataFrame being transformed. Encodings (and one-hot sizes) can
  therefore differ between train, test and inference data.
  """

  cols = Param(Params._dummy(), "cols", "columns", typeConverter=TypeConverters.toListString)

  def __init__(self):
    super(CategoryToOneHotEncodedTransformer, self).__init__()

  def setCols(self, value):
    return self._set(cols=value)

  def getCols(self):
    return self.getOrDefault(self.cols)

  def _transform(self, df: DataFrame) -> DataFrame:
    categorical = self.getCols()
    for name in categorical:
      df = CategoryToIndexTransformer().setCol(name).transform(df)
    encoder = OneHotEncoderEstimator(
      inputCols=['{0}_indexed'.format(name) for name in categorical],
      outputCols=['{0}_encoded'.format(name) for name in categorical])
    return encoder.fit(df).transform(df)

setattr(m, 'CategoryToOneHotEncodedTransformer', CategoryToOneHotEncodedTransformer)
  
class DropStringColsTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
  """Pipeline stage that drops every column named in the `cols` param."""

  cols = Param(Params._dummy(), "cols", "columns", typeConverter=TypeConverters.toListString)

  def __init__(self):
    super(DropStringColsTransformer, self).__init__()

  def setCols(self, value):
    return self._set(cols=value)

  def getCols(self):
    return self.getOrDefault(self.cols)

  def _transform(self, df: DataFrame) -> DataFrame:
    unwanted = self.getCols()
    return df.drop(*unwanted)

setattr(m, 'DropStringColsTransformer', DropStringColsTransformer)

In [15]:
# Reload the raw CSV and redo the 90/10 split (seed 42) for the Pipeline-based workflow.
data = spark.read.csv(
  path="wasbs://data@sparkmltrainig.blob.core.windows.net/train.csv",
  header=True,
  inferSchema=True,
)
train_df, test_df = data.randomSplit(weights=[0.9, 0.1], seed=42)


In [16]:
# Feature-engineering stages (custom transformers defined above).
init_trans = InitialsTransformer()
missing_age_trans = MissingAgeTransformer()
family_size_trans = FamilySizeTransformer()
is_alone_trans = IsAloneTransformer()
fare_per_person_trans = FarePerPersonTransformer()
drop_rows_with_null_trans = DropRowsWithNullTransformer()
drop_rows_with_null_trans.setCol('Embarked')

# Categorical encoding must be fitted once, on the training data, and then reused.
# CategoryToOneHotEncodedTransformer re-fits StringIndexer/OneHotEncoderEstimator inside
# transform(), so the saved model would learn a fresh encoding from whatever data it scores.
# Built-in estimator stages are fitted by Pipeline.fit and frozen in the PipelineModel.
# handleInvalid='keep' sends unseen/null categories to an extra bucket instead of failing.
categorical_cols = ['Sex', 'Initial', 'Embarked']
indexer_stages = [
  StringIndexer(inputCol=c, outputCol='{0}_indexed'.format(c), handleInvalid='keep')
  for c in categorical_cols
]
one_hot_stage = OneHotEncoderEstimator(
  inputCols=['{0}_indexed'.format(c) for c in categorical_cols],
  outputCols=['{0}_encoded'.format(c) for c in categorical_cols],
  handleInvalid='keep')

drop_trans = DropStringColsTransformer()
drop_trans.setCols(['PassengerId', 'Cabin', 'Ticket', 'Name', 'Sex', 'Initial', 'Embarked'])

cols = ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare', 'FamilySize', 'IsAlone', 'FarePerPerson', 'Sex_indexed', 'Initial_indexed', 'Embarked_indexed', 'Sex_encoded', 'Initial_encoded', 'Embarked_encoded']
vec_assembler = VectorAssembler(inputCols=cols, outputCol="Features")
# NOTE(review): 'Survived' looks like a binary label; LinearRegression is kept to match the
# RMSE evaluation below, but LogisticRegression may be the more appropriate model.
lr = LinearRegression(featuresCol="Features", labelCol="Survived")

stages = [
  init_trans, family_size_trans, is_alone_trans, fare_per_person_trans, missing_age_trans,
  drop_rows_with_null_trans, *indexer_stages, one_hot_stage, drop_trans, vec_assembler, lr,
]
pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(train_df)

In [17]:
# Persist the fitted pipeline; overwrite() replaces a model saved by an earlier run.
pipelinePath = "/tmp/lr_pipeline_model"
model_writer = pipelineModel.write().overwrite()
model_writer.save(pipelinePath)

In [18]:

#stages = [init_trans, family_size_trans, is_alone_trans, fare_per_person_trans, missing_age_trans, drop_rows_with_null_trans, cat_to_one_how_trans, drop_trans, vec_assembler]

#test_df_transformed = vec_assembler.transform(drop_trans.transform(cat_to_one_how_trans.transform(drop_rows_with_null_trans.transform(missing_age_trans.transform(fare_per_person_trans.transform(is_alone_trans.transform(family_size_trans.transform(init_trans.transform(test_df)))))))))

In [19]:
# Preview the raw held-out split (explicit show() defaults).
test_df.show(n=20, truncate=True)

In [20]:
# Load the persisted pipeline back from disk and score the held-out split with it.
savedPipelineModel = PipelineModel.load(pipelinePath)
pred_df = savedPipelineModel.transform(
  test_df
)


In [21]:
# Root-mean-squared error of the reloaded pipeline's predictions against 'Survived'.
regression_evaluator = RegressionEvaluator(
  labelCol="Survived",
  predictionCol="prediction",
  metricName="rmse",
)
rmse = regression_evaluator.evaluate(pred_df)
print(f"RMSE is {rmse}")