In [None]:
# Load the gzipped trip CSV as an RDD of raw text lines.
# NOTE(review): `sc` is not created in this notebook — presumably the
# SparkContext injected by the kernel; confirm.
data_raw = sc.textFile('data/201508_trip_data.csv.gz')

In [None]:
# Peek at the first five raw lines to check the layout before parsing.
data_raw.take(5)

In [None]:
from pyspark.sql import Row

In [None]:
# Build a (trip_id, duration) DataFrame from the raw CSV text:
#   1. drop the line that starts with 'Trip' (the header),
#   2. split every remaining line on commas,
#   3. keep the first two fields, parsed as integers.
durations = spark.createDataFrame(
    data_raw
    .filter(lambda line: not line.startswith('Trip'))
    .map(lambda line: line.split(','))
    .map(lambda fields: Row(trip_id=int(fields[0]), duration=int(fields[1]))))

In [None]:
from pyspark.sql.functions import mean, stddev, udf
from pyspark.sql.types import FloatType

In [None]:
# Aggregate the standard deviation and mean of the trip durations in a single
# pass; the result is a one-row list whose fields are aliased 'sd' and 'mu'.
stats = durations.select(
    stddev(durations['duration']).alias('sd'),
    mean(durations['duration']).alias('mu'),
).collect()

In [None]:
# `stats` holds a single Row; unpack it positionally in the order the
# aggregates were selected (standard deviation first, then mean).
sd, mu = stats[0]

In [None]:
# Standardise every trip duration to a z-score: (duration - mu) / sd.
# This is plain arithmetic, so it is written as a native column expression
# rather than a Python UDF, which avoids shipping every row through a Python
# worker. The cast keeps the 'z' column a FloatType, as before.
z_vals = durations.select(
    durations.trip_id,
    ((durations.duration - mu) / sd).cast('float').alias('z'))

In [None]:
import numpy as np

In [None]:
# Pull the z-score column to the driver and pack it into a NumPy array so it
# can be masked and plotted locally.
zs = np.array([row[0] for row in z_vals.select('z').collect()])

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Histogram (30 bins) of the z-scores below 5, with a log-scaled count axis.
plt.hist(zs[zs < 5], bins=30)
plt.yscale('log')
# A bare None as the last expression keeps the cell from echoing output.
None

In [None]:
# Extract the third comma-separated field of every raw line (including the
# header line, which is not filtered out here).
dates = data_raw.map(lambda line: line.split(',')[2])

In [None]:
# Inspect the first five extracted date strings.
dates.take(5)

In [None]:
from datetime import datetime

In [None]:
def convert_date(date_str):
    """Parse a 'month/day/year hour:minute' string into a datetime.

    Returns None instead of raising when the string does not match the
    '%m/%d/%Y %H:%M' format.
    """
    try:
        parsed = datetime.strptime(date_str, '%m/%d/%Y %H:%M')
    except ValueError:
        parsed = None
    return parsed

In [None]:
# Parse every date string and discard the entries convert_date could not
# parse (it returns None for those).
# NOTE: this cell rebinds `dates`, so it is meant to be run once after the
# extraction cell.
dates = (dates
         .map(convert_date)
         .filter(lambda parsed: parsed is not None))

In [None]:
# Inspect the first five parsed datetime values.
dates.take(5)

In [None]:
from operator import add

In [None]:
def seqOp(d, hc):
    """Fold one (hour, count) pair into the accumulator dict `d`.

    Used as the within-partition step of aggregateByKey. The pair is unpacked
    inside the body because tuple parameters in a `def` signature are a
    SyntaxError on Python 3; this form works on both Python 2 and 3.

    Returns the same dict object, mutated in place.
    """
    h, c = hc
    d[h] = c
    return d


def combOp(d1, d2):
    """Merge accumulator dict `d2` into `d1` and return `d1`.

    Used as the cross-partition step of aggregateByKey; entries in `d2`
    overwrite entries in `d1` that share a key.
    """
    d1.update(d2)
    return d1

In [None]:
# Count trips per (year, month, day, hour), then regroup so each day maps to
# a dict of {hour: count}.
hourly_counts = (
    dates
    .map(lambda x: ((x.year, x.month, x.day, x.hour), 1))
    .reduceByKey(add)
    # kv is ((year, month, day, hour), count). Re-key by (year, month, day)
    # and carry (hour, count) as the value. Indexing replaces the
    # tuple-unpacking lambda, which is a SyntaxError on Python 3.
    .map(lambda kv: (kv[0][:3], (kv[0][3], kv[1])))
    .aggregateByKey({}, seqOp, combOp))

In [None]:
from pyspark.ml.linalg import Vectors

In [None]:
# Turn each day's {hour: count} dict into a dense 24-element vector, using 0
# for hours with no recorded trips. kv is (day, hour_dict); indexing replaces
# the tuple-unpacking lambda, which is a SyntaxError on Python 3.
X = (hourly_counts
     .map(lambda kv: (kv[0],
                      Vectors.dense([kv[1].get(i, 0) for i in range(24)]))))

In [None]:
# Mark the per-day vectors for caching so later actions on X can reuse them.
X.cache()

In [None]:
from pyspark.sql import Row

In [None]:
# Convert the (day, vector) pairs into a DataFrame with a `features` column
# (the 24-hour vector) and a `day` column (the (year, month, day) key).
Xdf = spark.createDataFrame(
    X.map(lambda pair: Row(features=pair[1], day=pair[0])))

In [None]:
from pyspark.ml.clustering import KMeans

In [None]:
# Cache the feature DataFrame: it is scanned again by every KMeans fit and
# cost evaluation below.
# (The earlier `X_norm.cache()` call was removed: `X_norm` is never defined
# in this notebook, so the cell raised NameError on a fresh kernel.)
Xdf.cache()

In [None]:
# Fit one KMeans model per candidate k (2 through 9) and record its cost on
# the training data, for the cost-vs-k plot.
# NOTE(review): computeCost is deprecated in later Spark releases — confirm
# against the Spark version in use.
ks = range(2, 10)
costs = [KMeans().setK(k).fit(Xdf).computeCost(Xdf) for k in ks]

In [None]:
# Plot clustering cost against the number of clusters k.
plt.plot(ks, costs)

In [None]:
# Fit the model used for the rest of the analysis, with k=4 clusters.
# NOTE(review): k=4 is presumably read off the cost-vs-k plot — confirm.
model = KMeans().setK(4).fit(Xdf)

In [None]:
# Wrap each fitted cluster centre in a dense vector; later cells index this
# list by cluster id.
centers = list(map(Vectors.dense, model.clusterCenters()))

In [None]:
# Assign every day to a cluster; later cells read the assignment from the
# `prediction` column. Show the first five rows as a sanity check.
predictions = model.transform(Xdf)
predictions.show(5)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, DateType, IntegerType, StringType
from pyspark.ml.linalg import VectorUDT

In [None]:
from calendar import day_abbr, month_abbr
from datetime import date
def get_weekday(d):
    """Return the abbreviated weekday name (e.g. 'Mon') for a date key.

    `d` is unpacked into date(), so it is expected to be a
    (year, month, day) sequence.
    """
    weekday_index = date(*d).weekday()
    return day_abbr[weekday_index]

In [None]:
# For every day, keep only its weekday abbreviation and its cluster id
# (renamed to `class`).
per_weekday = predictions.select(
    udf(get_weekday, StringType())(predictions['day']).alias('weekday'),
    predictions['prediction'].alias('class'))

In [None]:
# Quick look at the first five (weekday, class) rows.
per_weekday.show(5)

In [None]:
# Number of days per (class, weekday) combination, ordered by class.
per_weekday.groupby('class', 'weekday').count().orderBy(per_weekday['class']).show()

In [None]:
# The same per-(class, weekday) counts, collected to the driver as a pandas
# DataFrame so they can be pivoted and plotted locally.
df = per_weekday.groupby('class', 'weekday').count().orderBy(per_weekday['class']).toPandas()

In [None]:
# Preview the pandas frame of (class, weekday, count) rows.
df.head()

In [None]:
# Bar chart with one group per class and one bar per weekday; (class, weekday)
# combinations with no rows are filled with 0 before plotting.
df.pivot_table(index='class', columns='weekday', values='count').fillna(0).plot.bar()

In [None]:
# For every day, keep its month abbreviation and its cluster id (renamed to
# `class`). The month is element 1 of the `day` key.
per_month = predictions.select(
    udf(lambda day_key: month_abbr[day_key[1]],
        StringType())(predictions['day']).alias('month'),
    predictions['prediction'].alias('class'))

In [None]:
# Count days per (month, class) and collect the result to the driver as a
# pandas DataFrame.
mdf = per_month.groupBy('month', 'class').count().toPandas()

In [None]:
# Preview the pandas frame of (month, class, count) rows.
mdf.head()

In [None]:
# Bar chart with one group per class and one bar per month.
mdf.pivot_table(index='class', values='count', columns='month').plot.bar()
# Anchor the legend at axes coordinate (1, 1).
plt.legend(bbox_to_anchor=(1,1))

In [None]:
# Draw each cluster centre (a 24-element vector) as its own line, labelled
# with the cluster id.
for n in range(len(centers)):
    center = centers[n]
    plt.plot(center, label='class {}'.format(n))
plt.legend()

In [None]:
def dist_from_center(x, cluster):
    """Return the 2-norm of the difference between `x` and a cluster centre.

    `cluster` indexes the module-level `centers` list; `x` must support
    subtraction with an entry of that list. The result is returned as a
    plain Python float.
    """
    offset = x - centers[cluster]
    return float(offset.norm(2))

In [None]:
# Wrap dist_from_center as a Spark UDF that returns a double.
d_udf = udf(dist_from_center, DoubleType())

In [None]:
# For every day, compute the distance between its feature vector and the
# centre of its assigned cluster, keeping the day key and cluster id too.
dists = predictions.select(
    d_udf(predictions.features, predictions.prediction).alias('dist'),
    predictions['day'],
    predictions['prediction'])

In [None]:
# Show the five days with the largest distance to their cluster centre.
dists.orderBy(dists.dist.desc()).show(5)

In [None]:
from pyspark.sql.functions import stddev, mean

In [None]:
# Per-cluster standard deviation ('sd') and mean ('mu') of the
# distance-to-centre values.
dist_stats = (dists
              .groupby('prediction')
              .agg(stddev('dist').alias('sd'),
                   mean('dist').alias('mu')))

In [None]:
# Display the per-cluster distance statistics.
dist_stats.show()

In [None]:
# Attach each cluster's (sd, mu) to every day by joining on the cluster id,
# then drop dist_stats' copy of `prediction` so only one such column remains.
joined = dists.join(dist_stats, dist_stats.prediction == dists.prediction).drop(dist_stats.prediction)
joined.show()

In [None]:
# Absolute z-score of each day's distance relative to its own cluster:
# |dist - mu| / sd. Note this rebinds the `z_vals` name used earlier for the
# trip-duration z-scores.
z_vals = joined.select(
    udf(lambda dist, cluster_mu, cluster_sd: abs(dist - cluster_mu)/cluster_sd,
        FloatType())(joined.dist, joined.mu, joined.sd).alias('z'),
    joined.day,
    "prediction")

In [None]:
# Show the five days with the largest within-cluster z-score.
z_vals.orderBy(z_vals.z.desc()).show(5)

In [None]:
# Plot the hourly profile of two hand-picked outlier days together with the
# mean profile of the cluster(s) they were assigned to.
outlier_days = (Row(2015, 6, 1), Row(2014, 10, 20))
outlier_rows = (predictions.rdd
                .filter(lambda x: x.day in outlier_days)
                .collect())

# Fields are read by name rather than by position so the cell does not depend
# on the column order of `predictions`.
outlier_clusters = []
for row in outlier_rows:
    plt.plot(row.features, label='outlier: {}'.format(row.day))
    if row.prediction not in outlier_clusters:
        outlier_clusters.append(row.prediction)

# Draw the mean of every cluster that contains an outlier. Previously only
# the cluster of the last row iterated was drawn, via the leaked loop
# variable, which also raised NameError when no row matched.
for cluster in outlier_clusters:
    if len(outlier_clusters) == 1:
        mean_label = 'cluster mean'
    else:
        mean_label = 'cluster {} mean'.format(cluster)
    plt.plot(centers[cluster], label=mean_label)

plt.legend(loc='lower left')
plt.xlabel('hour')
plt.ylabel('trip count')