# Data ingestion

In [1]:
!pip freeze

alembic==0.8.10
apache-airflow==1.9.0
avro==1.8.2
backports-abc==0.5
backports.functools-lru-cache==1.4
backports.shutil-get-terminal-size==1.0.0
backports.weakref==1.0.post1
beautifulsoup4==4.6.0
bleach==1.5.0
brewer2mpl==1.4.1
bs4==0.0.1
cachetools==2.0.1
certifi==2017.11.5
chardet==3.0.4
click==6.7
configparser==3.5.0
crcmod==1.7
croniter==0.3.20
cycler==0.10.0
datalab==1.1.0
decorator==4.1.2
dill==0.2.6
docutils==0.14
entrypoints==0.2.3
enum34==1.1.6
Flask==0.11.1
Flask-Admin==1.4.1
Flask-Cache==0.13.1
Flask-Login==0.2.11
flask-swagger==0.2.13
Flask-WTF==0.14
funcsigs==1.0.0
functools32==3.2.3.post2
future==0.16.0
futures==3.0.5
gapic-google-cloud-datastore-v1==0.15.3
gapic-google-cloud-error-reporting-v1beta1==0.15.3
gapic-google-cloud-logging-v2==0.91.3
geojson==2.3.0
ggplot==0.6.8
gitdb2==2.0.3
GitPython==2.1.8
gmaps==0.7.0
google-api-core==0.1.3
google-api-python-client==1.6.2
google-apitools==0.5.10
google-auth==1.3.0
google-auth-

In [2]:
from datalab import bigquery as bq
from datalab import storage

In [3]:
table = bq.Table('taxi_demo.taxi_data') # you must first go to bigquery, create project, make a query and save the results as a table under the project
gcs_temp_bucket = 'david-taxi-demo'
gcs_temp_dir = 'taxi-data/'
gcs_temp_uri = 'gs://{}/{}'.format(gcs_temp_bucket, gcs_temp_dir)

# Extract Bigquery data
if not list(storage.Bucket(gcs_temp_bucket).items(prefix=gcs_temp_dir)):
  table.extract(destination = gcs_temp_uri + '*.csv.gz', compress=True)

In [4]:
# Load Data into Spark

df =spark.read.format('csv') \
              .options(header='true') \
              .options(inferSchema='true') \
              .load(gcs_temp_uri)
df.printSchema()
# df.takePandas()

root
 |-- pickup_datetime: string (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)



# Data Prep

In [5]:
from pyspark.sql.functions import unix_timestamp

typed_df = df.select(
  unix_timestamp(df.pickup_datetime).cast('timestamp').alias('pickup_timestamp'),
  'pickup_latitude',
  'pickup_longitude') \
  .drop('pickup_datetime').drop('dropoff_datetime') \
  .cache()

typed_df.printSchema()
typed_df.toPandas()

root
 |-- pickup_timestamp: timestamp (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)



Unnamed: 0,pickup_timestamp,pickup_latitude,pickup_longitude
0,2010-03-11 15:15:00,40.758160,-73.937427
1,2010-02-17 02:58:08,40.714128,-74.011567
2,2010-03-19 04:38:43,40.726548,-73.989161
3,2013-08-27 22:28:14,40.757815,-73.969634
4,2010-03-15 22:27:30,40.754440,-73.989572
5,2009-01-17 10:58:42,40.752675,-73.950507
6,2009-01-25 19:19:15,40.811989,-73.929824
7,2009-01-18 15:41:53,40.732855,-74.000064
8,2010-08-01 10:03:00,40.758392,-73.937222
9,2009-09-22 06:45:21,40.703524,-74.009184


In [6]:
%%sql
SELECT COUNT(*) FROM [taxi_demo.taxi_data]
-- Demo you can write bigquery in datalab

f0_
100000


In [7]:
typed_df.registerTempTable('taxi')
spark.sql("""
SELECT COUNT(*) from taxi
""").toPandas()

Unnamed: 0,count(1)
0,100000


In [8]:
from pyspark.sql.functions import log

bucket_df = spark.sql("""
SELECT
    *,
    HOUR(pickup_timestamp) AS pickup_hour,
    INT(DATE_FORMAT(pickup_timestamp, 'u')) AS day_of_week,
    ROUND(pickup_latitude, 3) AS lat,
    ROUND(pickup_longitude, 3) AS long
FROM taxi
""").cache()

traffic_df = bucket_df.groupby('pickup_hour', 'day_of_week', 'lat', 'long').count()
traffic_df = traffic_df.withColumn('log_pickups', log(traffic_df['count']))
traffic_df.printSchema()
traffic_df.cache().describe().toPandas()

root
 |-- pickup_hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- count: long (nullable = false)
 |-- log_pickups: double (nullable = true)



Unnamed: 0,summary,pickup_hour,day_of_week,lat,long,count,log_pickups
0,count,79297.0,79297.0,79297.0,79297.0,79297.0,79297.0
1,mean,13.333530902808429,4.040682497446309,40.7540173020417,-73.9778562997339,1.261081755930237,0.1582473364783048
2,stddev,6.467981196182328,1.9810529272219044,0.0245775834906546,0.0240484496377034,0.6465106293271085,0.3400898773154197
3,min,0.0,1.0,40.333,-74.313,1.0,0.0
4,max,23.0,7.0,41.066,-73.518,20.0,2.995732273553991


# Spark MLLib API

### Transformers
### Estimators
### Model
### Pipelines

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# Create toy training data
training = spark.createDataFrame(
  [
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)],
  ["id", "text", "label"])

# Configure an ML pipeline, with 3 stages: tokenizer, hashingTF, and logisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])


# Train the model
model = pipeline.fit(training)

# Create toy test data
test = spark.createDataFrame(
  [
    (4, "spark, u j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadop")
  ],
  ["id", "text"])

prediction = model.transform(test)
print('prediction: {}'.format(prediction))

prediction: DataFrame[id: bigint, text: string, words: array<string>, features: vector, rawPrediction: vector, probability: vector, prediction: double]


# Random Forest Regression

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

label_column = 'log_pickups'
feature_column = 'features'

# Split data into train, validation and test set
training, validation, test = traffic_df.randomSplit([.33, .33, .33])

# Assemble features into a vector column
assembler = VectorAssembler(
  inputCols=['pickup_hour', 'day_of_week', 'lat', 'long'],
  outputCol=feature_column
)

# Instantiate a Random Forest Regressor
rf = RandomForestRegressor(
  featuresCol=assembler.getOutputCol(),
  labelCol=label_column
)

# Build a pipeline
pipeline = Pipeline(stages=[assembler, rf])

# Run the pipeline
model = pipeline.fit(training)
predictions = model.transform(validation)

In [11]:
predictions.select('prediction', label_column, feature_column) \
           .sample(False, 0.1) \
           .toPandas()

Unnamed: 0,prediction,log_pickups,features
0,0.116203,0.000000,"[0.0, 3.0, 40.758, -73.99]"
1,0.145062,0.000000,"[14.0, 5.0, 40.72, -73.99]"
2,0.193374,0.000000,"[15.0, 5.0, 40.775, -73.967]"
3,0.189222,0.000000,"[15.0, 7.0, 40.78, -73.96]"
4,0.193433,1.609438,"[18.0, 3.0, 40.734, -73.99]"
5,0.052273,0.000000,"[21.0, 7.0, 40.714, -73.96]"
6,0.174991,0.000000,"[22.0, 2.0, 40.738, -74.002]"
7,0.195866,0.000000,"[22.0, 3.0, 40.759, -73.966]"
8,0.084353,0.000000,"[3.0, 7.0, 40.715, -73.992]"
9,0.150730,0.000000,"[5.0, 7.0, 40.747, -73.988]"


# Evaluate model

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

r2_evaluator = RegressionEvaluator(
  labelCol=label_column, predictionCol='prediction', metricName='r2'
)

rmse_evaluator = RegressionEvaluator(
  labelCol=label_column, predictionCol='prediction', metricName='rmse'
)

print('r2: {}'.format(r2_evaluator.evaluate(predictions)))
print('r2: {}'.format(rmse_evaluator.evaluate(predictions)))

r2: 0.0294582456608
r2: 0.333937518821


# Visualise our predictions

In [13]:
!bash -c 'pip install gmaps -r <(pip freeze)' > /dev/null
!jupyter nbextension enable --py --sys-prefix gmaps

[31mDouble requirement given: gmaps==0.7.0 (from -r /dev/fd/63 (line 43)) (already in gmaps, name='gmaps')[0m
Enabling notebook extension jupyter-gmaps/extension...
      - Validating: [32mOK[0m


In [16]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

def repeat(df, col='predicted_pickups', tmp_col='_list_col'):
  col_range = udf(lambda n: range(n), ArrayType(IntegerType(), False))
  tmp_df = df.withColumn(tmp_col, col_range(df[col].cast('int')))
  return tmp_df.withColumn('count', explode(tmp_col))

def fuzz(col, precision=3):
  return col + (rand() / (10 ** precision))

def simulate(day, hour, sample_rate=.01):
  tmp_df = predictions.filter(
    (predictions.day_of_week == day) & (predictions.pickup_hour == hour)) \
    .withColumn('predicted_pickups', exp(predictions.prediction))
    
  simulated_df = repeat(tmp_df).sample(False, sample_rate).select('lat', 'long')
  return simulated_df.select(
    fuzz(simulated_df.lat).alias('lat'),
    fuzz(simulated_df.long).alias('long')
  )

actual_validation = bucket_df.sample(False, 0.333).cache()

def get_actual(day, hour, sample_rate=.01):
  return actual_validation.filter(
    (actual_validation.day_of_week == day) & (actual_validation.pickup_hour == hour)
  ).sample(False, sample_rate) \
  .select(
    actual_validation.pickup_latitude.alias('lat'),
    actual_validation.pickup_longitude.alias('long')
  )
  

### Predicted Thursday afternoon rush hour pickups

In [None]:
import gmaps 

gmaps.configure(api_key="AIzaSyDwk3ohLTspbB6X53zmsUvzUbROHeR8c9U")

m = gmaps.Map()
predicted = simulate(day=4, hour=17).toPandas()
m.addLayer(gmaps.heatmap_layer(predicted))
m

### Actual Thursday afternoon rush hour pickups

In [None]:
import gmaps 

m = gmaps.Map()
actual = get_actual(day=4, hour=17).toPandas()
m.addLayer(gmaps.heatmap_layer(actual))
m

# Iteration 2: Random Forest with parameter tuning

In [None]:
# The parameters which we can tune in a random forest model:
import pprint
pprint.pprint({k.name: v for k,v in rf.extractParamMap().items()})

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

tvTraining = training.sample(False, 0.01)

# Build param grid
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5, 15]) \
                              .addGrid(rf.numTrees, [5, 20]) \
                              .build()
    
# Run automate train validation split over grid
tvSplit = TrainValidationSplit(
  estimator=pipeline,
  estimatorParamMaps=paramGrid,
  evaluator=r2_evaluator
)

# Select the best model
model = tvSplit.fit(tvTraining).bestModel
predictions = model.transform(validation)

print('r2: {}'.format(r2_evaluator.evaluate(predictions)))
print('rmse: {}'.format(rmse_evaluator.evaluate(predictions)))

# Save model

In [None]:
model.save('gs://{}/model/'.format(gcs_temp_bucket))
test.write.save('gs://{}/test-data/'.format(gcs_temp_bucket))

# Load model

In [None]:
from pyspark.ml import PipelineModel

model = PipelineModel.load('gs://{}/model/'.format(gcs_temp_bucket))
data = spark.read.load('gs://{}/test-data/'.format(gcs_temp_bucket))

In [None]:
predictions = model.transform(data)
predictions.toPandas()

print('r2: {}'.format())