In [0]:
# File location and type
file_location = "/FileStore/tables/tips.csv"
file_type = "csv"

# Read through the generic reader so that `file_type` actually selects the
# data source (spark.read.csv would always read CSV and ignore file_type).
# "header" and "inferSchema" are CSV reader options; for other file types
# they are expected to be ignored.
df = (
  spark.read.format(file_type)
  .option("header", True)       # first line holds the column names
  .option("inferSchema", True)  # extra pass over the data to infer column types
  .load(file_location)
)

df.show(5)

In [0]:
# Inspect the column names and the types inferred by inferSchema
df.printSchema()

In [0]:
# Column names in file order (displayed as the cell output)
df.columns

In [0]:
# Handle categorical features
# StringIndexer maps each category string to a numeric index, similar to
# sklearn's LabelEncoder / OrdinalEncoder (it does not one-hot encode)
from pyspark.ml.feature import StringIndexer

In [0]:
# Index one categorical column: fit() learns the distinct values of "sex",
# transform() writes their numeric indices to the new "sex_indexed" column
indexer = StringIndexer(inputCol="sex", outputCol="sex_indexed")
sex_indexer_model = indexer.fit(df)
df_tmp = sex_indexer_model.transform(df)

df_tmp.show(n=3)

In [0]:
# Index several categorical columns at once with the multi-column form of
# StringIndexer; each output column is named "<input>_indexed"
categorical_cols = ["sex", "smoker", "day", "time"]
indexer = StringIndexer(
  inputCols=categorical_cols,
  outputCols=[cat_col + "_indexed" for cat_col in categorical_cols]
)

indexer_model = indexer.fit(df)
df_tmp = indexer_model.transform(df)

df_tmp.show(n=3)

In [0]:
# Spark ML estimators expect all independent features packed into a single
# vector column, with the dependent feature (label) kept in its own column.
# VectorAssembler builds that feature-vector column.
from pyspark.ml.feature import VectorAssembler

label_col = "total_bill"
numeric_feature_cols = ["tip", "size"]

# Input cols: every column except the label. Numeric columns are used as-is;
# every other column is replaced by its "_indexed" counterpart produced by
# the StringIndexer. The label is excluded by name rather than by position,
# so it cannot leak into the features if the CSV column order changes.
list_input_cols = [
  c if c in numeric_feature_cols else c + "_indexed"
  for c in df.columns
  if c != label_col
]

# inputCols -> independent feature columns
# outputCol -> name of the assembled feature-vector column
feature_assembler = VectorAssembler(
  inputCols=list_input_cols,
  outputCol="Independent_Features"
)

# VectorAssembler is a Transformer (no fit step): only transform is needed
output = feature_assembler.transform(df_tmp)

In [0]:
# Peek at the assembled feature vectors
output.select("Independent_Features").show(n=3)

In [0]:
# Keep only what the model needs: the feature vector and the label column
finalized_data = output.select("Independent_Features", "total_bill")
finalized_data.show(n=3)

In [0]:
# shape function
def spark_shape(self):
  """Return ``(row_count, column_count)`` for a Spark DataFrame, like pandas' ``.shape``.

  The argument is named ``self`` (presumably so the function can also be
  attached to the DataFrame class as a method). ``self.count()`` runs a
  Spark job, so this is not free on large data.
  """
  n_rows = self.count()
  n_cols = len(self.columns)
  return n_rows, n_cols

In [0]:
# Train/test split: 75% train / 25% test, seeded so the split is repeatable
# for the same input data
split_weights = [0.75, 0.25]
train_data, test_data = finalized_data.randomSplit(split_weights, seed=0)

print(f"train shape = {spark_shape(train_data)}")
print(f"test shape  = {spark_shape(test_data)}")

In [0]:
# Linear regression
from pyspark.ml.regression import LinearRegression

# Unfitted estimator. regParam is left at its default (0.0 per the Spark docs),
# so this is unregularized least squares: elasticNetParam has no effect, and
# standardization does not change the returned coefficients (Spark reports
# them on the original feature scale).
lin_reg = LinearRegression(
  featuresCol="Independent_Features",
  labelCol="total_bill",
  elasticNetParam=0,
  standardization=True
)

# Fitted model, kept under its own name so the estimator and the trained
# model are never referred to by the same variable
linReg_model = lin_reg.fit(train_data)

In [0]:
# Print the coefficients and intercept for linear regression
# (presumably one coefficient per entry of the assembled feature vector, in
# the same order as the assembler's inputCols — verify)
print(f"Coefficients: {linReg_model.coefficients}")
print(f"Intercept: {linReg_model.intercept}")

# Prediction
# evaluate() scores the held-out test_data and returns a summary object that
# exposes both the predictions and the regression metrics printed below
pred_results = linReg_model.evaluate(test_data)

# Metrics, computed on the test split only
print(f"\nRMSE: {pred_results.rootMeanSquaredError}")
print(f"MAE: {pred_results.meanAbsoluteError}")
print(f"r2: {pred_results.r2}")

In [0]:
# First test-set rows together with the model's predictions
pred_results.predictions.show(n=5)