# Spark Playground

## Preparation

In [1]:
import math
import datetime

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import min
from pyspark.sql.types import StringType, BooleanType, TimestampType
from pyspark.sql.functions import udf
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics

import geohash

In [None]:
# Path of the input CSV file (relative path), loaded further down in the
# "Read CSV" cell.
CSV_FILE_PATH = '../data/yellow_sample_01.csv'

### Configure Spark

In [None]:
# Build the Spark configuration step by step: local master ("local[*]"),
# application name, executor memory and event logging.
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName('pyspark')
conf.set('spark.executor.memory', '4g')
conf.set('spark.eventLog.enabled', 'true')

# Entry points used by the remaining cells: the SparkContext and the
# SQLContext built on top of it.
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

### Read CSV

In [3]:
# Read the CSV via the spark-csv data source, with the header and schema
# inference options switched on.
csv_reader = sql_context.read.format('com.databricks.spark.csv')
csv_reader = csv_reader.options(header='true', inferschema='true')
df = csv_reader.load(CSV_FILE_PATH)

### Preprocess Data

In [None]:
# Filter invalid coordinates
def is_float(value):
    """Return True if ``value`` can be converted with ``float()``.

    Args:
        value: Any object, typically a string or number read from a CSV cell.
            May be ``None`` for missing cells.

    Returns:
        bool: ``True`` when ``float(value)`` succeeds, ``False`` otherwise.
    """
    try:
        float(value)
    except (TypeError, ValueError):
        # ValueError: text that is not a number (e.g. 'abc').
        # TypeError: objects float() cannot handle at all, notably None,
        # which would otherwise propagate and abort the calling job.
        return False
    return True
is_float_udf = udf(is_float, BooleanType())

# Keep only rows whose start coordinates are float-convertible and fall
# inside the bounding box lon in [-80, -70], lat in [40, 50].
has_numeric_coords = is_float_udf(df.Start_Lon) & is_float_udf(df.Start_Lat)
lon_in_range = (df.Start_Lon >= -80) & (df.Start_Lon <= -70)
lat_in_range = (df.Start_Lat >= 40) & (df.Start_Lat <= 50)
df = df.filter(has_numeric_coords & lon_in_range & lat_in_range)

# Parse date times
def _parse_timestamp(text):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a ``datetime.datetime``."""
    return datetime.datetime.strptime(text, '%Y-%m-%d %H:%M:%S')

parse_data_udf = udf(_parse_timestamp, TimestampType())

# Replace the pickup and drop-off columns with their parsed timestamps.
df = df.withColumn('Trip_Pickup_DateTime',
                   parse_data_udf(df['Trip_Pickup_DateTime']))
df = df.withColumn('Trip_Dropoff_DateTime',
                   parse_data_udf(df['Trip_Dropoff_DateTime']))

### Discretize Coordinates

In [None]:
def _encode_geohash(lat, lon):
    """Encode a latitude/longitude pair with ``geohash.encode`` (third argument 8)."""
    return geohash.encode(float(lat), float(lon), 8)

geohash_udf = udf(_encode_geohash, StringType())

# Add the geohash of the start position as a new string column.
start_geohash = geohash_udf(df['Start_Lat'], df['Start_Lon'])
discretized_df = df.withColumn('Start_Geohash', start_geohash)

## Regression

In [None]:
def extract_feature(row):
    """Reduce a trip row to its grouping features.

    Args:
        row: Row exposing ``Trip_Pickup_DateTime`` (an object with ``hour``,
            ``day``, ``month`` and ``year`` attributes) and ``Start_Geohash``.

    Returns:
        Row with the fields ``geohash``, ``hour``, ``day``, ``month``, ``year``.
    """
    pickup = row.Trip_Pickup_DateTime
    hour, day, month, year = pickup.hour, pickup.day, pickup.month, pickup.year
    return Row(geohash=row.Start_Geohash, hour=hour, day=day, month=month, year=year)

# Go through the underlying RDD explicitly: DataFrame.map only exists on
# Spark 1.x, whereas DataFrame.rdd.map behaves the same there and also works
# on Spark 2.x and later.
feature_df = discretized_df.rdd.map(extract_feature).toDF()

# Count pickups per (hour, day, month, year, geohash) combination and expose
# the count as 'pickup_count'.
grouped_feature_df = (feature_df
                      .groupby('hour', 'day', 'month', 'year', 'geohash')
                      .count()
                      .withColumnRenamed('count', 'pickup_count'))

In [None]:
def create_point(row):
    """Turn an aggregated feature row into an MLlib ``LabeledPoint``.

    Args:
        row: Row with ``geohash``, ``hour``, ``day``, ``month``, ``year`` and
            ``pickup_count`` fields.

    Returns:
        LabeledPoint whose label is ``pickup_count`` and whose features are
        [lat, lon, hour, day, month, year, day_of_week].
    """
    # The decoded geohash is unpacked as a (lat, lon) pair.
    latitude, longitude = geohash.decode(row.geohash)
    # date.weekday(): Monday is 0 ... Sunday is 6.
    weekday = datetime.date(row.year, row.month, row.day).weekday()

    features = [latitude, longitude, row.hour, row.day, row.month, row.year, weekday]
    return LabeledPoint(row.pickup_count, features)

# Fixed seed so the train/test split (and therefore the reported metrics)
# is the same on every run of the notebook.
SPLIT_SEED = 42

# Map over the underlying RDD explicitly: DataFrame.map only exists on
# Spark 1.x, DataFrame.rdd.map works on 1.x and later versions alike.
points = grouped_feature_df.rdd.map(create_point)

# 70 % of the points for training, 30 % for evaluation.
(training_data, test_data) = points.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [None]:
def print_evaluation(test_data, model):
    """Print regression metrics of ``model`` evaluated on ``test_data``.

    Args:
        test_data: RDD of ``LabeledPoint`` objects (``features`` and ``label``).
        model: Trained MLlib regression model whose ``predict`` accepts an
            RDD of feature vectors.

    Prints the mean squared error, root mean squared error and explained
    variance computed by ``RegressionMetrics``.
    """
    features = test_data.map(lambda p: p.features)
    labels = test_data.map(lambda p: p.label)

    # Predict on the whole RDD instead of calling model.predict inside a
    # map(): JVM-backed models (e.g. the random forest) cannot be used from
    # within an RDD transformation in PySpark.
    predictions = model.predict(features)

    # RegressionMetrics expects (prediction, observation) pairs.
    predictions_and_labels = labels.zip(predictions).map(
        lambda label_pred: (float(label_pred[1]), label_pred[0]))
    metrics = RegressionMetrics(predictions_and_labels)

    print('Test Mean Squared Error = %s' % metrics.meanSquaredError)
    print('Test Root Mean Squared Error = %s' % metrics.rootMeanSquaredError)
    print('Explained Variance = %s' % metrics.explainedVariance)

### Linear regression

In [None]:
# Train a linear regression model with stochastic gradient descent using
# 100 iterations and a step size of 1e-8, then report its test metrics.
sgd_params = dict(iterations=100, step=1e-8)
lr_model = LinearRegressionWithSGD.train(training_data, **sgd_params)
print_evaluation(test_data, lr_model)

### Random Forest Regression

In [None]:
# Fixed seed so the trained forest is the same on every run.
RF_SEED = 42

# Train a small random forest regressor (3 trees, depth <= 4) on the
# training split. No feature is declared categorical.
rf_model = RandomForest.trainRegressor(training_data, categoricalFeaturesInfo={},
                                       numTrees=3, featureSubsetStrategy="auto",
                                       impurity='variance', maxDepth=4, maxBins=32,
                                       seed=RF_SEED)

# Backward-compatible alias: the model was previously bound to the
# misleading name `knn_model` although it is a random forest.
knn_model = rf_model

print_evaluation(test_data, rf_model)

### *TODO* K-Nearest Neighbors regression