In [0]:
# Load the yellow-taxi trips table, then render its rows and print its schema.
df = spark.table("nyc_taxi.idk.yellow_trips_csv_v")
display(df)
df.printSchema()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import radians, sin, cos, atan2, sqrt
R = 3959  # radius of earth in miles

# Basic clean-up plus time-derived features.
#   * keeps fares strictly between 0 and 500 and trips with a positive distance
#   * adds pickup hour / day-of-week, pickup and drop-off epoch seconds, and
#     the trip duration in minutes
#   * removes trips whose duration is not strictly positive (drop-off at or
#     before pick-up): trip_duration_min is used as a model feature, so such
#     rows would otherwise reach training. Rows with a NULL duration are
#     removed by the same comparison; the later null-drop over the feature
#     columns would have discarded them anyway.
df_clean = (
    df
    .filter(
        (F.col("fare_amount") > 0)
        & (F.col("fare_amount") < 500)
        & (F.col("trip_distance") > 0)
    )
    .withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))
    .withColumn("pickup_dow", F.dayofweek(F.col("tpep_pickup_datetime")))
    .withColumn("pickup_ts", F.unix_timestamp("tpep_pickup_datetime"))
    .withColumn("dropoff_ts", F.unix_timestamp("tpep_dropoff_datetime"))
    # epoch seconds -> minutes
    .withColumn("trip_duration_min", (F.col("dropoff_ts") - F.col("pickup_ts")) / 60.0)
    .filter(F.col("trip_duration_min") > 0)
)
# Haversine (great-circle) distance between pick-up and drop-off, scaled by R
# (earth radius in miles). The scratch columns are intermediates only and are
# dropped once straight_line_distance has been computed.
df_clean = (
    df_clean
    # coordinates converted from degrees to radians
    .withColumn("pick_lat", radians(F.col("pickup_latitude")))
    .withColumn("pick_long", radians(F.col("pickup_longitude")))
    .withColumn("drop_lat", radians(F.col("dropoff_latitude")))
    .withColumn("drop_long", radians(F.col("dropoff_longitude")))
    # coordinate deltas (pick-up minus drop-off)
    .withColumn("lat_diff", F.col("pick_lat") - F.col("drop_lat"))
    .withColumn("long_diff", F.col("pick_long") - F.col("drop_long"))
    # a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2)
    .withColumn(
        "a",
        sin(F.col("lat_diff") / 2) ** 2
        + cos(F.col("pick_lat")) * cos(F.col("drop_lat")) * sin(F.col("long_diff") / 2) ** 2,
    )
    # c = 2 * atan2(sqrt(a), sqrt(1 - a)) is the central angle in radians
    .withColumn("c", 2 * atan2(sqrt(F.col("a")), sqrt(1 - F.col("a"))))
    .withColumn("straight_line_distance", F.col("c") * R)
    .drop("pick_lat", "pick_long", "drop_lat", "drop_long", "lat_diff", "long_diff", "a", "c")
)

# Binary flag column: 1 when fare_amount is above 20, otherwise 0.
df_clean = df_clean.withColumn(
    "high_fare",
    F.when(F.col("fare_amount") > 20, 1).otherwise(0),
)
# Numeric model inputs, grouped by theme (list order is unchanged).
numericFeatures = [
    # trip shape
    "passenger_count", "trip_distance", "trip_duration_min",
    # pickup time
    "pickup_hour", "pickup_dow",
    # charges
    # NOTE(review): tip_amount is presumably only known after the ride is paid;
    # confirm it is intended as a predictor of fare_amount.
    "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge",
]

# Categorical model inputs; these are indexed and one-hot encoded downstream.
categoricalFeatures = ["VendorID", "RateCodeID", "store_and_fwd_flag", "payment_type"]

# Discard rows that have a null in any feature column or in the label.
colsToKeepNotNull = [*numericFeatures, *categoricalFeatures, "fare_amount"]
df_clean = df_clean.dropna(subset=colsToKeepNotNull)

# 70/30 train/test split with a fixed seed, then render the cleaned frame.
train_df, test_df = df_clean.randomSplit([0.7, 0.3], seed=42)
display(df_clean)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# One StringIndexer per categorical column, each writing "<column>_idx".
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_idx", handleInvalid="keep")
    for name in categoricalFeatures
]

# One-hot encode every "<column>_idx" into a matching "<column>_oh" vector.
encoder = OneHotEncoder(
    inputCols=[f"{name}_idx" for name in categoricalFeatures],
    outputCols=[f"{name}_oh" for name in categoricalFeatures],
)

# Combine the numeric columns and the one-hot vectors into one feature vector.
assembler = VectorAssembler(
    outputCol="features_unscaled",
    inputCols=[*numericFeatures, *(f"{name}_oh" for name in categoricalFeatures)],
)

# Scale the assembled features by their standard deviation before regression.
# withMean=False keeps vector sparse to avoid huge memory usage
scaler = StandardScaler(
    withStd=True,
    withMean=False,
    inputCol="features_unscaled",
    outputCol="features",
)

# Linear regression predicting fare_amount from the scaled feature vector.
lr = LinearRegression(
    labelCol="fare_amount",
    featuresCol="features",
    predictionCol="prediction",
)

# End-to-end pipeline: index -> one-hot -> assemble -> scale -> regress.
lrPipeline = Pipeline(stages=[*indexers, encoder, assembler, scaler, lr])

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Hyper-parameter grid: 2 regParam x 2 elasticNetParam x 1 maxIter = 4 candidates.
# Alternative, wider value lists are noted in the trailing comments.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.1])         # [0.0, 0.01, 0.1]
    .addGrid(lr.elasticNetParam, [0.0, 0.5])  # [0.0, 0.5, 1.0]
    .addGrid(lr.maxIter, [50])                # [50, 100]
    .build()
)

# RMSE between the "prediction" column and the fare_amount label.
evaluatorRmse = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="fare_amount",
)

# Cross-validate the full pipeline over paramGrid, scoring each candidate
# with evaluatorRmse.
# seed is pinned so the fold assignment, and therefore the selected model, is
# repeatable from run to run; it uses the same value as the seeded
# train/test split.
cvLr = CrossValidator(
    estimator=lrPipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluatorRmse,
    numFolds=2,     # 2 is the minimum; raise for a steadier estimate if compute allows
    parallelism=1,  # 1 fits candidates one at a time; raise to fit several in parallel
    collectSubModels=False,
    seed=42,
)

In [0]:
import os

# NOTE(review): presumably tells Spark ML where to stage temporary files on
# the volume during fitting; confirm against the platform documentation.
os.environ.update(
    {"SPARKML_TEMP_DFS_PATH": "/Volumes/ml_storage/ml_schema/ml_volume/sparkml_tmp"}
)

# Run the cross-validated search on the training split.
cvLrModel = cvLr.fit(train_df)

# The winning fitted pipeline and its last stage (the regression model).
bestLrPipelineModel = cvLrModel.bestModel
bestLrModel = bestLrPipelineModel.stages[-1]