In [1]:
import pyspark
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#MLlib
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

In [2]:
def print_train_info(model):
    """Print a fitted regression model's parameters and its training-summary metrics.

    First prints the model's coefficients and intercept, then reads
    ``model.summary`` and prints its iteration count, RMSE, MAE and r2.

    Args:
        model: a fitted model exposing ``coefficients``, ``intercept`` and
            ``summary`` (in this notebook, the model returned by the
            ``train_*_model`` helpers).
    """
    print("Model:")
    for label, attr in (("Coefficients", "coefficients"), ("Intercept", "intercept")):
        print("%s: %s" % (label, str(getattr(model, attr))))
    print("  ")
    print("Model info")

    # The summary is only fetched after the parameter lines are printed.
    stats = model.summary
    print("numIterations: %d" % stats.totalIterations)
    for label, attr in (("RMSE", "rootMeanSquaredError"),
                        ("MAE", "meanAbsoluteError"),
                        ("r2", "r2")):
        print("%s: %f" % (label, getattr(stats, attr)))

def print_eval_info(model_eval):
    """Print the error metrics held by an evaluation summary.

    Args:
        model_eval: an object exposing ``rootMeanSquaredError``,
            ``meanAbsoluteError`` and ``r2`` (in this notebook, the value
            returned by ``predict``, i.e. ``model.evaluate(test)``).
    """
    print("Eval info:")
    metric_attrs = (
        ("RMSE", "rootMeanSquaredError"),
        ("MAE", "meanAbsoluteError"),
        ("r2", "r2"),
    )
    for label, attr in metric_attrs:
        print("%s: %f" % (label, getattr(model_eval, attr)))

In [13]:
def read_data(sqlContext, filepath = "data/curitiba/prediction_data.csv"):
    """Load the prediction CSV and add an integer ``date_timestamp`` column.

    The file is read with a header row and Spark schema inference. The
    ``date`` column is then cast to ``Integer`` and stored as
    ``date_timestamp``; the other columns are passed through unchanged.

    NOTE(review): the result of the cast depends on the type inference gives
    ``date``. If it is inferred as a timestamp the cast presumably yields epoch
    seconds; if it stays a string the cast likely produces nulls. Confirm
    against the data.

    Args:
        sqlContext: SQL context whose ``read`` reader loads the file.
        filepath: path of the CSV file to load.

    Returns:
        The loaded DataFrame with the extra ``date_timestamp`` column.
    """
    reader = sqlContext.read.format("csv")
    for key, value in (("header", "true"), ("inferSchema", "true")):
        reader = reader.option(key, value)
    raw = reader.load(filepath)

    date_as_int = raw['date'].cast('Integer')
    return raw.withColumn('date_timestamp', date_as_int)

In [8]:
def data_pre_proc(df, string_columns = None, features = None):
    """Index categorical columns and assemble the model's ``features`` vector.

    Each column in ``string_columns`` is label-encoded by a ``StringIndexer``
    into ``<column>_index``. The columns listed in ``features`` are then
    combined, in order, by a ``VectorAssembler`` into a single ``features``
    column. All stages run inside one ``Pipeline``, so each indexer is fitted
    exactly once, by the pipeline itself.

    NOTE(review):
    - In this notebook the indexers are fitted on the full DataFrame before the
      train/test split, so the index mapping also sees test rows.
    - StringIndexer produces ordinal integer codes. Feeding them straight into
      a linear regression treats category codes as magnitudes; consider
      OneHotEncoder for ``route`` / ``week_day``.
    - The fitted PipelineModel is not returned, so the same encoding cannot be
      reapplied to new data at prediction time.

    Args:
        df: input Spark DataFrame (presumably the output of ``read_data``).
        string_columns: columns to index. ``None`` means the default
            ``["route", "week_day", "difference_previous_schedule",
            "difference_next_schedule"]``.
        features: columns to assemble, in order. ``None`` means the default
            ``["route_index", "date_timestamp", "week_day_index",
            "group_15_minutes", "difference_next_schedule_index",
            "difference_previous_schedule_index"]``.

    Returns:
        ``df`` with the ``*_index`` columns and a ``features`` vector column added.
    """
    # None sentinels instead of list literals: default arguments are evaluated
    # once and shared across calls, so mutable defaults are a latent hazard.
    if string_columns is None:
        string_columns = ["route", "week_day", "difference_previous_schedule", "difference_next_schedule"]
    if features is None:
        features = ["route_index", "date_timestamp", "week_day_index", "group_15_minutes",
                    "difference_next_schedule_index", "difference_previous_schedule_index"]

    # Unfitted indexers: Pipeline.fit fits each one, instead of a separate
    # eager .fit(df) per column before the pipeline is even built.
    indexers = [StringIndexer(inputCol=column, outputCol=column + "_index")
                for column in string_columns]
    # list(...) copies the caller's sequence (and accepts tuples) so the
    # assembler never aliases the argument.
    assembler = VectorAssembler(inputCols=list(features), outputCol='features')

    pipeline = Pipeline(stages=indexers + [assembler])
    return pipeline.fit(df).transform(df)

In [11]:
def train_duration_model(training_df, max_iter=10, reg_param=0.01, elastic_net_param=1.0):
    """Fit a regularised linear regression that predicts ``duration``.

    With the default ``elastic_net_param=1.0`` the penalty is pure L1 (lasso).
    The defaults reproduce the previously hard-coded hyperparameters.

    NOTE(review): the recorded run reports ``numIterations: 11`` with
    ``maxIter=10``. That may mean the optimizer stopped at the iteration cap
    rather than converging; consider a larger ``max_iter``.

    Args:
        training_df: DataFrame with a ``features`` vector column and a numeric
            ``duration`` label column.
        max_iter: maximum number of optimizer iterations.
        reg_param: regularisation strength.
        elastic_net_param: L1/L2 mixing (1.0 = L1 only, 0.0 = L2 only).

    Returns:
        The fitted model returned by ``LinearRegression.fit``.
    """
    duration_lr = LinearRegression(maxIter=max_iter, regParam=reg_param,
                                   elasticNetParam=elastic_net_param,
                                   labelCol="duration", featuresCol="features")
    return duration_lr.fit(training_df)

In [18]:
def train_crowdedness_model(training_df, max_iter=10, reg_param=0.3, elastic_net_param=1.0):
    """Fit a regularised linear regression that predicts ``totalpassengers``.

    With the default ``elastic_net_param=1.0`` the penalty is pure L1 (lasso).
    The defaults reproduce the previously hard-coded hyperparameters.

    NOTE(review): the recorded run reports ``numIterations: 11`` with
    ``maxIter=10``. That may mean the optimizer stopped at the iteration cap
    rather than converging; consider a larger ``max_iter``.

    Args:
        training_df: DataFrame with a ``features`` vector column and a numeric
            ``totalpassengers`` label column.
        max_iter: maximum number of optimizer iterations.
        reg_param: regularisation strength.
        elastic_net_param: L1/L2 mixing (1.0 = L1 only, 0.0 = L2 only).

    Returns:
        The fitted model returned by ``LinearRegression.fit``.
    """
    crowdedness_lr = LinearRegression(maxIter=max_iter, regParam=reg_param,
                                      elasticNetParam=elastic_net_param,
                                      labelCol="totalpassengers", featuresCol="features")
    return crowdedness_lr.fit(training_df)

In [21]:
def predict(model, test):
    """Evaluate ``model`` on ``test`` and return the evaluation result.

    Despite the name, this does not return a predictions DataFrame. It returns
    whatever ``model.evaluate(test)`` returns. In this notebook that result is
    passed to ``print_eval_info``, which reads its RMSE, MAE and r2 attributes.

    Args:
        model: a fitted model with an ``evaluate`` method.
        test: the dataset to evaluate on.
    """
    return model.evaluate(test)

In [26]:
def save_model(model, filepath):
    """Persist ``model`` to ``filepath``, overwriting whatever is already there.

    Args:
        model: a model exposing ``write()``; the returned writer must support
            ``overwrite()`` and ``save(path)``.
        filepath: destination path.
    """
    writer = model.write().overwrite()
    writer.save(filepath)

In [27]:
def load_model(filepath, model = LinearRegressionModel):
    """Load a previously saved model from ``filepath``.

    Args:
        filepath: path the model was saved to (e.g. by ``save_model``).
        model: despite the name, the model *class* (not an instance) whose
            ``load`` method reads the files. Defaults to ``LinearRegressionModel``.

    Returns:
        Whatever ``model.load(filepath)`` returns.
    """
    model_class = model
    loaded = model_class.load(filepath)
    return loaded

Main:

In [None]:
# Only one SparkContext may be active per process, so a plain
# SparkContext('local[*]') raises if this cell is re-run in the same kernel.
# getOrCreate reuses a running context instead. If one already exists, the
# conf passed here is ignored.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))
sqlContext = pyspark.SQLContext(sc)

In [22]:
# Where the trained models are written (save_model overwrites existing files)
# and then read back from.
DURATION_MODEL_PATH = "data/models/duration_lasso_model"
CROWDEDNESS_MODEL_PATH = "data/models/crowdedness_lasso_model"

data = read_data(sqlContext)

preproc_data = data_pre_proc(data)

# Fixed seed so the 60/40 split is repeatable for the same input data.
train, test = preproc_data.randomSplit([0.6, 0.4], seed=0)

# Single-argument print(...) prints the same text on Python 2 and 3; the
# `print "..."` statement form is a SyntaxError on Python 3.
print("Duration model")
duration_model = train_duration_model(train)
print_train_info(duration_model)

duration_model_eval = predict(duration_model, test)
print_eval_info(duration_model_eval)
# Save the model trained above before loading, so this cell does not depend
# on a model file left behind by an earlier session. Then check that the
# round trip preserved the fitted parameters.
save_model(duration_model, DURATION_MODEL_PATH)
loaded_duration_model = load_model(DURATION_MODEL_PATH)
assert loaded_duration_model.intercept == duration_model.intercept
assert (loaded_duration_model.coefficients.toArray().tolist()
        == duration_model.coefficients.toArray().tolist())
print("")

print("Crowdedness model")
crowdedness_model = train_crowdedness_model(train)
print_train_info(crowdedness_model)

crowdedness_model_eval = predict(crowdedness_model, test)
print_eval_info(crowdedness_model_eval)
save_model(crowdedness_model, CROWDEDNESS_MODEL_PATH)
loaded_crowdedness_model = load_model(CROWDEDNESS_MODEL_PATH)
assert loaded_crowdedness_model.intercept == crowdedness_model.intercept
assert (loaded_crowdedness_model.coefficients.toArray().tolist()
        == crowdedness_model.coefficients.toArray().tolist())

Duration model
Model:
Coefficients: [-0.0700377141175,3.00967333739e-07,-0.531780521136,-2.41339298608e-05,-0.000596150175885,-0.00053062218407]
Intercept: -401.380429639
  
Model info
numIterations: 11
RMSE: 20.849891
MAE: 13.726247
r2: 0.054802
Eval info:
RMSE: 20.260875
MAE: 13.681778
r2: 0.053832

Crowdedness model
Model:
Coefficients: [0.00915223769642,0.0,-1.58388101245,-0.000145362432559,4.31352067881e-06,0.000313375439569]
Intercept: 21.1096148246
  
Model info
numIterations: 11
RMSE: 14.368144
MAE: 9.658412
r2: 0.093718
Eval info:
RMSE: 14.005498
MAE: 9.558157
r2: 0.095711
