In [None]:
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType, BooleanType, FloatType, StructType, StructField
from pyspark.sql.functions import col, to_date, dayofmonth, dayofweek
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## Dataset

We'll consider the San Francisco Fire Department Calls dataset.

```
/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv
```

## Part 1

Write a function named `prepare_dataframe()` that

- reads the csv file
- excludes the records whose `CallType` occurs fewer than `200` times in the entire dataset (it is fine to hardcode the values manually)
- adds a new column `CallDayOfWeek` containing the day of the week (1 = Sunday, 7 = Saturday) derived from the column `CallDate`
- adds a new column `DeltaPriority` containing the difference between `FinalPriority` and `OriginalPriority` (e.g. `DeltaPriority=2` when `FinalPriority=4` and `OriginalPriority=2`)
- keeps only the following columns `CallNumber`, `CallType`, `Priority`, `CallDayOfWeek`, `DeltaPriority`, `Delay`
- removes rows where any column is null (you can use `dropna`)

In [None]:
def prepare_dataframe() -> DataFrame:
    """Load the SF Fire Department calls CSV and build the modelling frame.

    Steps:
      1. Read the CSV with an explicit schema.
      2. Drop the hard-coded rare ``CallType`` values.
      3. Derive ``CallDayOfWeek`` (1 = Sunday ... 7 = Saturday) from ``CallDate``.
      4. Derive ``DeltaPriority`` as ``FinalPriority - OriginalPriority``.
      5. Keep only the columns used downstream.
      6. Drop every row that contains a null.

    Relies on a ``spark`` session being available in the enclosing scope
    (presumably the one the notebook runtime provides).

    Returns:
        A DataFrame with columns ``CallNumber``, ``CallType``, ``Priority``,
        ``CallDayOfWeek``, ``DeltaPriority`` and ``Delay``, free of nulls.
    """
    # TASK 1: explicit schema so the column types are fixed up front.
    fire_schema = StructType([
        StructField('CallNumber', IntegerType(), True),
        StructField('UnitID', StringType(), True),
        StructField('IncidentNumber', IntegerType(), True),
        StructField('CallType', StringType(), True),
        StructField('CallDate', StringType(), True),
        StructField('WatchDate', StringType(), True),
        StructField('CallFinalDisposition', StringType(), True),
        StructField('AvailableDtTm', StringType(), True),
        StructField('Address', StringType(), True),
        StructField('City', StringType(), True),
        StructField('Zipcode', IntegerType(), True),
        StructField('Battalion', StringType(), True),
        StructField('StationArea', StringType(), True),
        StructField('Box', StringType(), True),
        StructField('OriginalPriority', IntegerType(), True),
        StructField('Priority', IntegerType(), True),
        StructField('FinalPriority', IntegerType(), True),
        StructField('ALSUnit', BooleanType(), True),
        StructField('CallTypeGroup', StringType(), True),
        StructField('NumAlarms', IntegerType(), True),
        StructField('UnitType', StringType(), True),
        StructField('UnitSequenceInCallDispatch', IntegerType(), True),
        StructField('FirePreventionDistrict', StringType(), True),
        StructField('SupervisorDistrict', StringType(), True),
        StructField('Neighborhood', StringType(), True),
        StructField('Location', StringType(), True),
        StructField('RowID', StringType(), True),
        StructField('Delay', FloatType(), True),
    ])
    df = spark.read.csv(
        "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv",
        header=True,
        schema=fire_schema,
    )

    # TASK 2: call types occurring fewer than 200 times. The list is
    # hard-coded; it was obtained once with
    # df.groupBy("CallType").count().where(col("count") < 200).
    rare_call_types = [
        "Administrative",
        "Train / Rail Fire",
        "Lightning Strike (Investigation)",
    ]
    df = df.where(~col("CallType").isin(rare_call_types))

    # TASK 3: CallDate is stored US-style (month first), so it must be parsed
    # as MM/dd/yyyy; dayofweek yields 1 for Sunday through 7 for Saturday.
    df = df.withColumn(
        "CallDayOfWeek",
        dayofweek(to_date(col("CallDate"), "MM/dd/yyyy")),
    )

    # TASK 4: positive when the priority was raised during the call.
    df = df.withColumn("DeltaPriority", col("FinalPriority") - col("OriginalPriority"))

    # TASK 5
    df = df.select("CallNumber", "CallType", "Priority", "CallDayOfWeek", "DeltaPriority", "Delay")

    # TASK 6: rows with any null (including unparseable dates) are dropped.
    df = df.dropna()

    return df

## Part 2

Write a function named `define_pipeline()` that

- returns a `Pipeline` object
- the object is initialized with stages that predict the `Delay` via a linear regression based on the following features: `CallType`, `Priority`, `CallDayOfWeek`, `DeltaPriority`. Two of these features can be considered categorical; which two?

In [None]:
def define_pipeline() -> Pipeline:
    """Build the (unfitted) pipeline that predicts ``Delay`` by linear regression.

    ``CallType`` and ``CallDayOfWeek`` are treated as categorical: they are
    string-indexed and then one-hot encoded so that the regression does not
    read the arbitrary index numbers as an ordered, continuous quantity.
    ``Priority`` and ``DeltaPriority`` are used as numeric features.

    Returns:
        A ``Pipeline`` with stages indexer -> encoder -> assembler -> regressor.
    """
    categorical_cols = ["CallType", "CallDayOfWeek"]
    numeric_cols = ["Priority", "DeltaPriority"]
    indexed_cols = [f"{name}_indexed" for name in categorical_cols]
    encoded_cols = [f"{name}_encoded" for name in categorical_cols]

    # Map each category to an integer index.
    indexer = StringIndexer(
        inputCols=categorical_cols,
        outputCols=indexed_cols,
    )

    # Expand each index into a sparse indicator vector.
    encoder = OneHotEncoder(
        inputCols=indexed_cols,
        outputCols=encoded_cols,
    )

    assembler = VectorAssembler(
        inputCols=encoded_cols + numeric_cols,
        outputCol="features",
    )

    regressor = LinearRegression(
        labelCol="Delay",
        featuresCol="features",
    )

    return Pipeline(stages=[indexer, encoder, assembler, regressor])

## Part 3

Write a function named `fit_model(df, pipeline)` that
- takes as input
  - `df`, a `DataFrame` object that corresponds to the one that is returned by `prepare_dataframe()`
  - `pipeline`, a `Pipeline` object that corresponds to the one that is returned by `define_pipeline()`
- splits `df` in _train_ and _test_ dataset (80% and 20% respectively using a seed of `42`)
- fits the `pipeline` on the _train_ dataset and runs prediction on the _test_ dataset
- returns the `DataFrame` of the predictions made of only 2 columns: `DelayLabel`, `DelayPrediction`

In [None]:
def fit_model(df: DataFrame, pipeline: Pipeline) -> DataFrame:
    """Fit ``pipeline`` on 80% of ``df`` and predict on the remaining 20%.

    Args:
        df: The prepared DataFrame to split into train and test sets.
        pipeline: The unfitted pipeline to train.

    Returns:
        The test-set predictions, restricted to the ``Delay`` (label) and
        ``prediction`` columns.
    """
    # Fixed seed keeps the split reproducible.
    train_split, test_split = df.randomSplit([0.8, 0.2], seed=42)

    fitted_pipeline = pipeline.fit(train_split)

    return fitted_pipeline.transform(test_split).select("Delay", "prediction")


## Part 4
Write a function `evaluate_r2(predictions_df)` that
- takes as input a `DataFrame` object that corresponds to the one that is returned by `fit_model()`, ie made of 2 columns `DelayLabel` and `DelayPrediction`
- computes the R squared metric and returns the R2 value

In [None]:
def evaluate_r2(predictions_df: DataFrame) -> float:
    """Compute the R squared metric of a predictions DataFrame.

    Accepts either column naming: ``DelayLabel`` / ``DelayPrediction`` or
    ``Delay`` / ``prediction``.

    Args:
        predictions_df: DataFrame holding one label column and one
            prediction column under either of the namings above.

    Returns:
        The R2 value.
    """
    available = set(predictions_df.columns)
    # Prefer the renamed columns when both are present; otherwise fall back
    # to the raw label / prediction names.
    if {"DelayLabel", "DelayPrediction"} <= available:
        label_col, prediction_col = "DelayLabel", "DelayPrediction"
    else:
        label_col, prediction_col = "Delay", "prediction"

    evaluator = RegressionEvaluator(
        predictionCol=prediction_col,
        labelCol=label_col,
        metricName="r2",
    )

    return evaluator.evaluate(predictions_df)

In [None]:
# End-to-end run: prepare the data, build the pipeline, fit and predict,
# then report the R2 of the predictions.
df = prepare_dataframe()
pipeline = define_pipeline()
predictions = fit_model(df, pipeline)
r2 = evaluate_r2(predictions)

display("r2: ", r2)

'r2: '0.004543142194121441