In [1]:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier  as RF
from sklearn.model_selection import KFold, train_test_split
import datetime

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
def map5eval(preds, dtrain):
    actual = dtrain.get_label()
    predicted = preds.argsort(axis=1)[:,-np.arange(5)]
    metric = 0.
    for i in range(5):
        metric += np.sum(actual==predicted[:,i])/(i+1)
    metric /= actual.shape[0]
    return 'MAP@5', -metric

In [3]:
DATA_FILE = "../data/train.csv"
COLS = ['site_name', 'user_location_region', 'is_package', 'srch_adults_cnt', 'srch_children_cnt','srch_destination_id', 'hotel_market', 'hotel_country', 'hotel_cluster']

In [4]:
params = {}
params['objective'] = 'multi:softprob'
params['eval_metric'] = 'mlogloss'
params['num_class'] = 100

In [14]:
def read_pandas_df(num_rows=100000, chunk_size=100000):
    
    tot_rows = 0
    df_train = pd.DataFrame(columns=COLS)
    train_chunk = pd.read_csv(DATA_FILE, chunksize=chunk_size)
    i = 0
    for chunk in train_chunk:
        df_train = pd.concat([df_train, chunk[chunk['is_booking'] == 1][COLS]])
        tot_rows += df_train.shape[0]
        i = i + 1
        if i % 10 == 0:
            print("Rows loaded: " + str(i / 10) + "mn")
        
        if (num_rows - tot_rows) < 1000:
            break
    
    return df_train

In [23]:
def build_RF_with_sklearn(df):
    for column in df:
        df[column] = df[column].astype(str).astype(int)

    # print(df_train.shape())
    X = df.drop(['hotel_cluster'],axis=1)
    y = df['hotel_cluster'].values
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
    start = datetime.datetime.now()
    clf = RF(n_jobs=-1)
    clf.fit(X_train, y_train)
    pred = clf.predict(X_test)
    end = datetime.datetime.now()
    time_taken = (end-start).total_seconds()/60
    num_rows = X.shape[0]
    print('Training {:,} rows took {} minutes with all cores'.format(num_rows, time_taken))
    return time_taken

In [36]:
pandas_outputs = []

In [37]:
rows = [i*1e6 for i in range(3,20)]
data = []
current_nrows = 0
for r in rows:
    try:
        df = read_pandas_df(num_rows=r, chunk_size=1000000)
        time_taken = build_RF_with_sklearn(df)
        pandas_outputs.append({'TimeTaken': round(time_taken, 4), 'NumRows': df.shape[0]})
    except:
        continue

Training 715,472 rows took 1.0411282833333333 minutes with all cores
Rows loaded: 1.0mn
Training 795,034 rows took 1.0911258666666666 minutes with all cores
Rows loaded: 1.0mn
Training 876,862 rows took 1.2374210499999998 minutes with all cores
Rows loaded: 1.0mn
Training 957,961 rows took 1.32568415 minutes with all cores
Rows loaded: 1.0mn
Training 1,039,190 rows took 1.5135286833333335 minutes with all cores
Rows loaded: 1.0mn
Training 1,120,113 rows took 1.6149491833333334 minutes with all cores
Rows loaded: 1.0mn
Training 1,199,323 rows took 2.0890350833333335 minutes with all cores
Rows loaded: 1.0mn
Training 1,278,563 rows took 2.2376370166666666 minutes with all cores
Rows loaded: 1.0mn
Training 1,356,603 rows took 2.3325253 minutes with all cores
Rows loaded: 1.0mn
Training 1,356,603 rows took 2.3295823500000004 minutes with all cores
Rows loaded: 1.0mn
Training 1,434,612 rows took 2.3590483333333334 minutes with all cores
Rows loaded: 1.0mn
Training 1,513,961 rows took 2.4693

In [None]:
def build_model_with_sparkMLlib(nrows):
    
    # Load and parse the data file, converting it to a DataFrame.
    schema = "`site_name` INT, `user_location_region` INT, `is_package` INT,`srch_adults_cnt` INT, `srch_children_cnt` INT,`srch_destination_id` INT, `hotel_market` INT, `hotel_country` INT,`hotel_cluster` INT"
    sdf = spark.read.schema(schema).csv(data)
    sdf = sdf.dropna()
    sdf_sample = sdf.sample(fraction=0.01).cache()
    
    
    feature_cols = list(set(train_cols)-set(['hotel_cluster']))
    print(feature_cols)
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    transformed_train_data = assembler.transform(sdf_sample).cache()
    
    (train, test) = transformed_train_data.randomSplit([0.8, 0.2])
    
    rf = RandomForestClassifier(labelCol='hotel_cluster', featuresCol='features')
    rf.fit(train)

In [None]:
# create SparkSession
spark = SparkSession.builder.master("local[*]").appName("SimpleApp").getOrCreate()

In [None]:
# Load and parse the data file, converting it to a DataFrame.
schema = "`site_name` INT, `user_location_region` INT, `is_package` INT,`srch_adults_cnt` INT, `srch_children_cnt` INT,`srch_destination_id` INT, `hotel_market` INT, `hotel_country` INT,`hotel_cluster` INT"
sdf = spark.read.schema(schema).csv(data)
sdf = sdf.dropna()
sdf_sample = sdf.sample(fraction=0.01).cache()

In [None]:
feature_cols = list(set(train_cols)-set(['hotel_cluster']))
print(feature_cols)
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
transformed_train_data = assembler.transform(sdf_sample).cache()

In [None]:
(train, test) = transformed_train_data.randomSplit([0.8, 0.2])

In [None]:
rf = RandomForestClassifier(labelCol='hotel_cluster', featuresCol='features')
rf.fit(train)