In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY="5g"
spark = SparkSession.builder.appName("sparkml_text_fare_prediction_v3")\
                .config("spark.executor.memory",MAX_MEMORY)\
                .config("spark.driver.memory",MAX_MEMORY)\
                .getOrCreate()

22/05/24 11:31:24 WARN Utils: Your hostname, ihyeonmin-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.11 instead (on interface en0)
22/05/24 11:31:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/24 11:31:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data_dir = "/Users/ihyeonmin/Desktop/study/data-engineering/01-spark/data/trips/"

In [5]:
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

                                                                                

In [6]:
toy_df = train_df.sample(False,0.1,seed=1)

In [8]:
toy_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [9]:
# one hot encoding
# ex) Wednesday -> 4 -> [0,0,0,1,0,0,0]

In [10]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
    ]
stages=[]

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c,outputCol=c+"_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()],outputCols=[c+"_onehot"])
    stages += [cat_indexer,onehot_encoder]
print(stages)

[StringIndexer_95737b30b350, OneHotEncoder_477665edd5d6, StringIndexer_4a5068563b97, OneHotEncoder_161e917430a4, StringIndexer_9134ddb74dc1, OneHotEncoder_a2d97a80e409]


In [11]:
from pyspark.ml.feature import VectorAssembler,StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "total_amount"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n],outputCol=n+"_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(),outputCol=n+"_scaled")
    stages+=[num_assembler,num_scaler]

In [12]:
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n+"_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs,outputCol="feature_vector")
stages += [assembler]

## HyperParameter Tuning

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(
    maxIter=30,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

cv_stage = stages+[lr]

In [21]:
cv_pipeline = Pipeline(stages=cv_stage)

In [22]:
param_Grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam,[0.1,0.2,0.3,0.4,0.5])\
                .addGrid(lr.regParam,[0.01,0.02,0.03,0.04,0.05])\
                .build()

In [24]:
cross_val = CrossValidator(estimator=cv_pipeline,
                           estimatorParamMaps=param_Grid,
                           evaluator=RegressionEvaluator(labelCol="total_amount"),
                           numFolds=5
)

In [25]:
cv_model = cross_val.fit(toy_df)

Exception ignored in: <function JavaWrapper.__del__ at 0x114b14e50>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/pyspark/ml/wrapper.py", line 39, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LinearRegression' object has no attribute '_java_obj'
22/05/24 11:42:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/24 11:42:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/24 11:42:05 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/24 11:42:05 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [27]:
alpha=cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

## Training

In [28]:
from pyspark.ml import Pipeline

transform_stages=stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [29]:
vtrain_df = fitted_transformer.transform(train_df)

In [30]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=5,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector",
    elasticNetParam=alpha,
    regParam=reg_param
)

In [31]:
vtrain_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- total_amount_vector: vector (nullable = true)
 |-- total_amount_scaled: vector (nullable = true)
 |-- feature_vector: vector (nullable

In [32]:
model = lr.fit(vtrain_df)

                                                                                

In [33]:
vtest_df = fitted_transformer.transform(test_df)

In [34]:
predictions=model.transform(vtest_df)

In [35]:
predictions.cache()

DataFrame[passenger_count: double, pickup_location_id: bigint, dropoff_location_id: bigint, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, total_amount_vector: vector, total_amount_scaled: vector, feature_vector: vector, prediction: double]

In [36]:
predictions.select(["trip_distance","day_of_week","total_amount","prediction"]).show()

[Stage 3046:>                                                       (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|         13.9|   Saturday|         4.3|31.225308046786118|
|          3.3|     Sunday|         5.8| 8.651171994667823|
|          5.5|     Sunday|       26.75|26.470713602222297|
|          7.9|   Saturday|        30.3|31.141557171024292|
|          0.4|  Wednesday|         7.8| 8.344974111154128|
|          1.0|   Saturday|        11.8|11.834671791881147|
|          1.3|   Saturday|       12.85|12.875437500475657|
|          9.4|   Saturday|        39.8| 39.60780364855241|
|          1.5|  Wednesday|        13.3|13.367471378095443|
|          4.7|     Sunday|       26.15|25.669476520322217|
|          0.6|   Saturday|       11.75|  11.5852168041755|
|          1.1|    Tuesday|        11.8|11.976043158727427|
|          3.7|     Friday|       24.35|23.667212807541105|
|          2.4|   Saturday|       14.75|

                                                                                

In [37]:
# hyper parameter 적용 전 : 1.3337457020031904
model.summary.rootMeanSquaredError

1.3454024138712863

In [38]:
# hyper parameter 적용 전 : 0.9892934761685851
model.summary.r2

0.9891055119394475

In [39]:
model_dir = "/Users/ihyeonmin/Desktop/study/data-engineering/01-spark/data/trips/model"
model.save(model_dir)

                                                                                

In [40]:
# model load
from pyspark.ml.regression import LinearRegressionModel

In [41]:
lr_model = LinearRegressionModel().load(model_dir)

In [42]:
predictions=lr_model.transform(vtest_df)

In [43]:
predictions.show() 

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|total_amount_vector| total_amount_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+--------------------

In [44]:
spark.stop()