# Goal of this notebook

This notebook creates and saves all of our stratified samples of the istdaten dataset.

You may want to run the two stratifications in separate sessions (on different clusters), because both may not fit in memory at the same time (unfortunately, they involve a lot of shuffling).

# Init spark

In [None]:
%%local
import json
import os

# JupyterHub user name; a KeyError here means the notebook is not running under JupyterHub.
username = os.environ['JUPYTERHUB_USER']

# Set the application name to "<your_gaspar_id>-finalproject3" and size the Spark session.
# The payload is serialized with json.dumps (instead of %-interpolating into a JSON
# string) so that a user name containing quotes or backslashes still yields valid JSON.
session_config = {
    "name": "%s-finalproject3" % username,
    "executorMemory": "6G",
    "executorCores": 4,
    "numExecutors": 10,
    "driverMemory": "4G",
}
get_ipython().run_cell_magic('configure', line="-f", cell=json.dumps(session_config))

In [None]:
%%send_to_spark -i username -t str -n username

In [None]:
# Sanity check: report the Spark version of the session we are attached to.
spark_version = spark.version
print('We are using Spark %s' % spark_version)

In [None]:
# Register the project's helper module with the Spark context; it is imported
# as `data_utils` in the imports cell below.
data_utils_path = 'hdfs:///user/boesinge/finalproject/data_utils.py'
spark.sparkContext.addPyFile(data_utils_path)

## Imports

In [4]:
import data_utils
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pyspark

from pyspark.ml.feature import StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler
from scipy.stats import gamma, zscore

from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.window import Window

from functools import reduce
from pyspark.sql import DataFrame

import matplotlib
matplotlib.use('agg')
import matplotlib.pylab as plt

from pandas.compat import StringIO
pd.set_option('display.max_columns', 500)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load Istdaten

In [5]:
# Load the raw istdaten table (ORC files).
istdaten_df = sqlContext.read.format("orc").load('/data/sbb/orc/istdaten')

# By looking at the arrival and departure values we deduce the following formats:
# scheduled times have minute resolution, actual times also carry seconds.
scheduled_format = "dd.MM.yyyy HH:mm"
actual_format   = "dd.MM.yyyy HH:mm:ss"

# (original column name, name used in the rest of this notebook)
column_renames = [
    ('BETRIEBSTAG'        , 'trip_date'),
    ('FAHRT_BEZEICHNER'   , 'trip_id'),
    ('BETREIBER_ABK'      , 'operator_smallname'),
    ('BETREIBER_NAME'     , 'operator_name'),
    ('PRODUKT_ID'         , 'transport_type'),
    ('LINIEN_ID'          , 'line_id'),
    ('LINIEN_TEXT'        , 'line_text'),
    ('VERKEHRSMITTEL_TEXT', 'service_type'),
    ('ZUSATZFAHRT_TF'     , 'additional_trip'),
    ('FAELLT_AUS_TF'      , 'trip_failed'),
    ('HALTESTELLEN_NAME'  , 'stop_name'),
    ('ANKUNFTSZEIT'       , 'scheduled_arrival_time'),
    ('AN_PROGNOSE'        , 'actual_arrival_time'),
    ('AN_PROGNOSE_STATUS' , 'actual_arrtime_measured'),
    ('ABFAHRTSZEIT'       , 'scheduled_departure_time'),
    ('AB_PROGNOSE'        , 'actual_departure_time'),
    ('AB_PROGNOSE_STATUS' , 'actual_deptime_measured'),
    ('DURCHFAHRT_TF'      , 'not_stopping_here'),
    ('BPUIC'              , 'stop_id'),
    ('BETREIBER_ID'       , 'operator_id'),
    ('UMLAUF_ID'          , 'circuit_id'),
]
for original_name, new_name in column_renames:
    istdaten_df = istdaten_df.withColumnRenamed(original_name, new_name)

# Convert the four timestamp strings to unix epoch seconds, each with its own format.
time_column_formats = [
    ('actual_arrival_time'     , actual_format),
    ('scheduled_arrival_time'  , scheduled_format),
    ('actual_departure_time'   , actual_format),
    ('scheduled_departure_time', scheduled_format),
]
for time_column, time_format in time_column_formats:
    istdaten_df = istdaten_df.withColumn(time_column, F.unix_timestamp(time_column, time_format))

istdaten_df = (istdaten_df
    # Drop additional trips and failed (cancelled) trips.
    .filter(F.col('additional_trip') == False)
    .filter(F.col('trip_failed') == False)
    # Keep only rows whose actual departure/arrival status is REAL or GESCHAETZT.
    .filter(F.col('actual_deptime_measured').isin(['REAL', 'GESCHAETZT']))
    .filter(F.col('actual_arrtime_measured').isin(['REAL', 'GESCHAETZT']))
    # Keep only transport types which are useful to us.
    .filter(F.col('service_type').isin(data_utils.istdaten_to_groups.keys()))
    # Filter only weekdays: dayofweek() returns an integer with 1 = Sunday ... 7 = Saturday,
    # so 2..6 is Monday..Friday. Integer literals avoid relying on implicit string coercion.
    .filter(F.dayofweek(F.from_unixtime("scheduled_departure_time")).isin([2, 3, 4, 5, 6]))
    # hour() yields 0..23, so `<= 24` never rejects a non-null time; in effect this only
    # drops rows whose scheduled_arrival_time is NULL.
    # NOTE(review): the original intent was "keep only schedules in a single day" --
    # this condition does not enforce that; confirm what is actually wanted.
    .filter(F.hour(F.from_unixtime("scheduled_arrival_time")) <= 24)
    # Keep only non-null service and transport types.
    .filter(~(F.col('transport_type').isNull()))
    .filter(~(F.col('service_type').isNull()))
    # Derive the transport group and normalize the stop id with the project helpers.
    .withColumn('transport_group', data_utils.istdaten_group('service_type'))
    .withColumn('stop_id', data_utils.normalize_id('stop_id'))
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Sampling some data from each transport category

In [None]:
# Stratified sample keyed on the transport group, written out (overwriting any previous run).
# NOTE(review): 100000 is presumably the per-group sample size -- confirm in data_utils.strat_sample.
transport_group_sample_path = "/user/boesinge/finalproject/istdaten_transport_group.parquet"
istdaten_sampled = data_utils.strat_sample(istdaten_df, 'transport_group', 100000)
istdaten_sampled.write.mode("overwrite").save(transport_group_sample_path)

## Compute arrival/departure edges and their delays

In [None]:
# Two steps: first build the arrival/departure frame, then compute delays on top of it.
arrival_departure = data_utils.compute_arrival_departure(istdaten_df)
delays = data_utils.compute_delays(arrival_departure)

## Compute the time category of each scheduled arrival

In [None]:
# Tag each row with a time category derived from the hour of its scheduled arrival.
scheduled_arrival_hour = F.hour(F.from_unixtime('scheduled_arrival_time'))
delays = delays.withColumn('time_category', data_utils.time_cat(scheduled_arrival_hour))

## Combine transport_group and time_category into a single stratification key

In [None]:
# Join transport_group and time_category with a comma into one 'fullgroup' column,
# which serves as the stratification key for the sampling below.
fullgroup_key = F.concat_ws(',', F.col('transport_group'), F.col('time_category'))
merged_col = delays.withColumn('fullgroup', fullgroup_key)

## Sampling and saving 

In [None]:
# Stratified sample over the combined (transport group, time category) key.
# NOTE(review): 1000 is presumably the per-stratum sample size -- confirm in data_utils.strat_sample.
strata_column = 'fullgroup'
istdaten_sampled_full = data_utils.strat_sample(merged_col, strata_column, 1000)

In [7]:
# Persist the full stratified sample, replacing the output of any previous run.
full_sample_path = "/user/boesinge/finalproject/istdaten_sampled.parquet"
istdaten_sampled_full.write.mode("overwrite").save(full_sample_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…