In [1]:
import pandas as pd
from pyrecdp.autofe import FeatureWrangler
from utils import Timer

# Input file and the number of leading rows to load from it.
csv_path = "nyc_taxi_fare_cleaned.csv"
row_limit = 10000

# Step 1: read the first `row_limit` rows of the CSV into a pandas frame.
with Timer("read train data from csv"):
    train_data = pd.read_csv(csv_path, nrows=row_limit)
    print(f"train_data shape is {train_data.shape}")

# Step 2: build the auto feature-engineering pipeline around the raw frame,
# with "fare_amount" declared as the label column.
with Timer("initiate autofe pipeline"):
    pipeline = FeatureWrangler(dataset=train_data, label="fare_amount")

# Step 3: execute the pipeline on the Spark engine. `ret` is the transformed
# frame that the later training cells consume.
with Timer("transform"):
    ret = pipeline.fit_transform(engine_type="spark")

print(f"transformed shape is {ret.shape}")
ret

train_data shape is (10000, 7)
read train data from csv took 0.01138334535062313 sec




After analysis, decided pipeline includes below steps:

Stage 0: [<class 'pyrecdp.primitives.generators.dataframe.DataframeConvertFeatureGenerator'>]
Stage 1: [<class 'pyrecdp.primitives.generators.fillna.FillNaFeatureGenerator'>, <class 'pyrecdp.primitives.generators.type.TypeInferFeatureGenerator'>, <class 'pyrecdp.primitives.generators.geograph.CoordinatesInferFeatureGenerator'>]
Stage 2: [<class 'pyrecdp.primitives.generators.datetime.DatetimeFeatureGenerator'>, <class 'pyrecdp.primitives.generators.geograph.GeoFeatureGenerator'>]
Stage 3: [<class 'pyrecdp.primitives.generators.drop.DropUselessFeatureGenerator'>, <class 'pyrecdp.primitives.generators.name.RenameFeatureGenerator'>]
Stage 4: [<class 'pyrecdp.primitives.generators.dataframe.DataframeTransformFeatureGenerator'>]
Stage 5: [<class 'pyrecdp.primitives.generators.category.CategoryFeatureGenerator'>]
Stage 6: []
Stage 7: [<class 'pyrecdp.primitives.generators.type.TypeCheckFeatureGenerator'>, <class 'pyrecdp.primitives.gene

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/19 00:12:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
per core memory size is 6.273 GB and shuffle_disk maximum capacity is 8589934592.000 GB


                                                                                

DataframeConvert partition pandas dataframe to spark RDD took 2.885 secs


                                                                                

DataframeTransform took 3.501 secs, processed 10000 rows with num_partitions as 200
DataframeTransform combine to one pandas dataframe took 0.062 secs
transform took 10.461571127176285 sec
transformed shape is (10000, 12)


Unnamed: 0,fare_amount,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,pickup_datetime__day,pickup_datetime__month,pickup_datetime__weekday,pickup_datetime__year,pickup_datetime__hour,pickup_datetime__part_of_day__idx
0,4.5,-73.844311,40.721319,-73.841610,40.712278,1,15,6,0,2009,17,0
1,16.9,-74.016048,40.711303,-73.979268,40.782004,1,5,1,1,2010,16,1
2,5.7,-73.982738,40.761270,-73.991242,40.750562,2,18,8,3,2011,0,2
3,7.7,-73.987130,40.733143,-73.991567,40.758092,1,21,4,5,2012,4,3
4,5.3,-73.968095,40.768008,-73.956655,40.783762,1,9,3,1,2010,7,4
...,...,...,...,...,...,...,...,...,...,...,...,...
9995,16.9,-73.994594,40.732393,-73.974557,40.788259,1,30,1,5,2010,22,6
9996,5.5,-74.001017,40.746352,-73.990873,40.739497,1,23,1,3,2014,18,0
9997,4.5,-74.005530,40.720826,-73.996565,40.716309,1,5,5,3,2011,19,0
9998,8.0,-74.001850,40.745591,-74.006125,40.723338,1,4,8,6,2013,11,7


In [2]:
## Convert

from pyrecdp.utils import create_spark_context
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import RegressionMetrics
from xgboost.spark import SparkXGBRegressor

# Fixed seed so the hold-out split is the same on every run of this cell.
SPLIT_SEED = 0

# Hold out 10% of the transformed rows for evaluation and train on the rest.
# Relies on `ret` (and `Timer`) defined by the feature-engineering cell.
test_sample = ret.sample(frac=0.1, random_state=SPLIT_SEED)
train_sample = ret.drop(test_sample.index)
spark = create_spark_context()

with Timer("convert to spark dataframe"):
    train_sparkdf = spark.createDataFrame(train_sample)
    test_sparkdf = spark.createDataFrame(test_sample)

### Train

# Every column except the label and the listed exclusion becomes a feature.
# NOTE(review): the transformed frame displayed by the previous cell carries a
# column named `pickup_datetime__part_of_day__idx`, which this exclusion does
# not match, so that column is kept as a feature - confirm this is intended.
excluded_columns = ('fare_amount', 'pickup_datetime__part_of_day')
feature_list = [str(col) for col in ret.columns if col not in excluded_columns]
print(feature_list)

# Pack the feature columns into the single "features" vector column.
vecAssembler = VectorAssembler(outputCol="features")
vecAssembler.setInputCols(feature_list)

with Timer("vecAssembler.transform"):
    train_sparkdf = vecAssembler.transform(train_sparkdf)
    test_sparkdf = vecAssembler.transform(test_sparkdf)

try:
    xgb_regressor = SparkXGBRegressor(num_workers=2, label_col="fare_amount")
    print("start to fit:")
    with Timer("fit"):
        xgb_regressor_model = xgb_regressor.fit(train_sparkdf)

    print("start to predict:")
    with Timer("predict transform"):
        prediction = xgb_regressor_model.transform(test_sparkdf)
except BaseException:
    # Release the Spark session on any failure (including a manual
    # interrupt), then re-raise so the error is visible instead of being
    # swallowed and resurfacing later as an undefined `prediction`.
    spark.stop()
    raise


# calculate rmse

try:
    # RegressionMetrics expects (prediction, observation) pairs, in that order.
    valuesAndPreds = prediction.select(['prediction', 'fare_amount'])
    valuesAndPreds = valuesAndPreds.rdd.map(tuple)
    metrics = RegressionMetrics(valuesAndPreds)

    # Squared Error
    print("MSE = %s" % metrics.meanSquaredError)
    print("RMSE = %s" % metrics.rootMeanSquaredError)

    # Mean absolute error
    print("MAE = %s" % metrics.meanAbsoluteError)
except BaseException:
    # Same policy as above: clean up Spark, but never hide the failure.
    spark.stop()
    raise

Will assign 48 cores and 308340 M memory for spark


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


convert to spark dataframe took 15.816643935628235 sec
['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'pickup_datetime__day', 'pickup_datetime__month', 'pickup_datetime__weekday', 'pickup_datetime__year', 'pickup_datetime__hour']
vecAssembler.transform took 0.23228257056325674 sec
start to fit:
23/01/18 21:35:05 WARN TaskSetManager: Stage 0 contains a task of very large size (99086 KiB). The maximum recommended task size is 1000 KiB.


[21:35:24] task 1 got new rank 0                                    (0 + 2) / 2]
[21:35:25] task 0 got new rank 1


fit took 582.3093842621893 sec
start to predict:
predict transform took 0.15532350074499846 sec




23/01/18 21:44:46 WARN TaskSetManager: Stage 3 contains a task of very large size (11012 KiB). The maximum recommended task size is 1000 KiB.
23/01/18 21:44:47 WARN TaskSetManager: Stage 4 contains a task of very large size (11012 KiB). The maximum recommended task size is 1000 KiB.




MSE = 16.02380323707701
RMSE = 4.002974298827937
MAE = 1.8119170106486322




In [5]:
from utils import Timer
import pandas as pd
from sklearn.metrics import mean_squared_error
import lightgbm as lgbm
import numpy as np
import re

# Fixed seed so the hold-out split is the same on every run of this cell.
SPLIT_SEED = 0

# LightGBM training parameters.
# The original dict also set 'subsample': 0.8 and 'num_rounds': 1000. Both are
# aliases of keys still present below ('bagging_fraction' and
# 'num_boost_round'). LightGBM ignores an alias when the primary name is also
# given, so the effective configuration is unchanged by removing them.
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'nthread': 4,
    'num_leaves': 31,
    'learning_rate': 0.05,
    'max_depth': -1,
    'bagging_fraction': 1,
    'max_bin': 5000,
    'bagging_freq': 20,
    'colsample_bytree': 0.6,
    'metric': 'rmse',
    'min_split_gain': 0.5,
    'min_child_weight': 1,
    'min_child_samples': 10,
    'scale_pos_weight': 1,
    'zero_as_missing': True,
    'seed': 0,
    'num_boost_round': 1000,
    'early_stopping_rounds': 50,
}

# Hold out 10% of the transformed rows for validation and train on the rest.
# Relies on `ret` defined by the feature-engineering cell.
test_sample = ret.sample(frac=0.1, random_state=SPLIT_SEED)
train_sample = ret.drop(test_sample.index)

# Split each sample into the feature matrix and the label vector.
x_train = train_sample.drop(columns=['fare_amount'])
y_train = train_sample['fare_amount'].values

x_val = test_sample.drop(columns=['fare_amount'])
y_val = test_sample['fare_amount'].values

# The validation set references the training set so both share one binning.
lgbm_train = lgbm.Dataset(x_train, y_train)
lgbm_val = lgbm.Dataset(x_val, y_val, reference=lgbm_train)

with Timer("train"):
    # Log the validation metric every 100 rounds via the callback API, which
    # replaces the removed `verbose_eval` argument.
    model = lgbm.train(
        params=params,
        train_set=lgbm_train,
        valid_sets=[lgbm_val],
        callbacks=[lgbm.log_evaluation(period=100)],
    )

with Timer("predict"):
    # Predict with the best iteration recorded during training.
    pred = model.predict(x_val, num_iteration=model.best_iteration)

with Timer("calculate rmse"):
    rmse = np.sqrt(mean_squared_error(y_val, pred))

print('LightGBM RMSE', rmse)



You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 20091
[LightGBM] [Info] Number of data points in the train set: 48884359, number of used features: 10
[LightGBM] [Info] Start training from score 11.324084
Training until validation scores don't improve for 50 rounds
[100]	valid_0's rmse: 4.57764
[200]	valid_0's rmse: 4.22901
[300]	valid_0's rmse: 4.08987
[400]	valid_0's rmse: 4.0124
[500]	valid_0's rmse: 3.96486
[600]	valid_0's rmse: 3.92968
[700]	valid_0's rmse: 3.90212
[800]	valid_0's rmse: 3.87272
[900]	valid_0's rmse: 3.85179
[1000]	valid_0's rmse: 3.83922
Did not meet early stopping. Best iteration is:
[1000]	valid_0's rmse: 3.83922
train took 905.4738801121712 sec
predict took 129.27908113691956 sec
calculate rmse took 0.05247993487864733 sec
LightGBM RMSE 3.8392170960024345
