# Linear Regression Example

This example analyzes the bike sharing data set, which records how many bikes were rented during each hour of each day. In addition to the raw counts, the data set also contains information about the weather and holidays.

Using a linear regression, we try to train a model which can be used to predict the number of bikes rented.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load Data from File

The data is stored in a CSV file. We first load every column as a string and then cast the columns to numeric data types. This approach avoids load errors when a value cannot be interpreted as a number: with Spark's default (non-ANSI) cast behavior, such values become `null` instead of `NaN` or an exception.
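
The idea behind this tolerant cast can be illustrated outside Spark. The following is a minimal plain-Python sketch, not Spark code: the helper `to_double` and the sample values are hypothetical, and it only mimics Spark's default (non-ANSI) behavior, where a failed cast yields a missing value rather than an error.

```python
def to_double(text):
    """Return float(text), or None when text is missing or not numeric."""
    if text is None:
        return None
    try:
        return float(text)
    except ValueError:
        return None

# Hypothetical raw string values as they might appear in one CSV row
raw_row = ["17", "0.24", "n/a", ""]
parsed = [to_double(value) for value in raw_row]
print(parsed)
```

Because every value first arrives as a string, a broken entry costs only that one value and does not abort the whole load.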

In [None]:
schema = StructType([
    StructField('row_id',StringType(),True),
    StructField('date',StringType(), True),
    StructField('season',StringType(), True),
    StructField('year',StringType(), True),
    StructField('month',StringType(), True),
    StructField('hour',StringType(), True),
    StructField('holiday',StringType(), True),
    StructField('weekday',StringType(), True),
    StructField('workingday',StringType(), True),
    StructField('weather',StringType(), True),
    StructField('temperature',StringType(), True),
    StructField('apparent_temperature',StringType(), True),
    StructField('humidity',StringType(), True),
    StructField('wind_speed',StringType(), True),
    StructField('casual',StringType(), True),
    StructField('registered',StringType(), True),
    StructField('counter',StringType(), True)
    ])

raw_data = spark.read \
    .schema(schema) \
    .csv('s3://dimajix-training/data/bike-sharing/hour_nohead.csv')

data = raw_data.select(
    raw_data.row_id.cast('int'),
    raw_data.date.cast('string'),
    unix_timestamp(raw_data.date, "yyyy-MM-dd").alias('ts'),
    raw_data.season.cast('double'),
    raw_data.year.cast('double'),
    raw_data.month.cast('double'),
    raw_data.hour.cast('double'),
    raw_data.holiday.cast('double'),
    raw_data.weekday.cast('double'),
    raw_data.workingday.cast('double'),
    raw_data.weather.cast('double'),
    raw_data.temperature.cast('double'),
    raw_data.apparent_temperature.cast('double'),
    raw_data.humidity.cast('double'),
    raw_data.wind_speed.cast('double'),
    raw_data.casual.cast('double'),
    raw_data.registered.cast('double'),
    raw_data.counter.cast('double')
    )

# Inspect Data

Now that we have loaded the data, let's peek inside the DataFrame.

In [None]:
# Inspect first 10 entries of the DataFrame
# YOUR CODE HERE

# Make some Pictures

Just to get a rough feeling for the data, we plot the number of rented bikes over time.

First we need to import matplotlib.pyplot and make all plots appear inline in the notebook.

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

## Make a Plot of Rents per Day
The original data contains rents per hour, but we want the data per day, so we aggregate by day before plotting.

In [None]:
daily = data\
    .groupBy('ts').sum('counter') \
    .orderBy('ts')

# Convert to Pandas    
pdf = daily.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf['ts'],pdf['sum(counter)'])

In [None]:
tmp = data \
    .groupBy('ts').sum('casual') \
    .orderBy('ts')
    
pdf = tmp.toPandas()

plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf['ts'],pdf['sum(casual)'])

In [None]:
tmp = data \
    .groupBy('ts').sum('registered') \
    .orderBy('ts')
    
pdf = tmp.toPandas()

plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf['ts'],pdf['sum(registered)'])    

# Use VectorAssembler

Most Spark ML methods require one feature column of type `Vector`. To generate this feature column from the raw data, Spark provides a `VectorAssembler`, which assembles one feature column from arbitrary source columns. The source columns have to be numeric, boolean, or vector columns; ours were already cast to `double` when loading the data.

We use it to automatically extract the columns

    season, year, month, hour, holiday, weekday, workingday, weather, 
    temperature, apparent_temperature, humidity, wind_speed
    
into the new output column 'features'
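
Conceptually, the assembler picks the configured input columns from each row and packs them, in order, into one vector. A rough plain-Python sketch of that idea follows; the function `assemble` and the sample row are made up for illustration and are not part of the Spark API.

```python
def assemble(row, input_cols):
    """Pack the selected columns of one row into a list of floats, in order."""
    return [float(row[name]) for name in input_cols]

# Hypothetical sample row; the values are invented for illustration
row = {"season": 1.0, "hour": 8.0, "temperature": 0.24, "counter": 16.0}
input_cols = ["season", "hour", "temperature"]

features = assemble(row, input_cols)
print(features)
```

Note that the label column `counter` is deliberately left out of the input columns, since it is the value the model is supposed to predict.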

In [None]:
from pyspark.ml.feature import *

tx = VectorAssembler(inputCols=[
        'season',
        'year',
        'month',
        'hour',
        'holiday',
        'weekday',
        'workingday',
        'weather',
        'temperature',
        'apparent_temperature',
        'humidity',
        'wind_speed'],
        outputCol='features')
td = # YOUR CODE HERE

## Split Train and Test Data

Now that the `VectorAssembler` gives us an easy way to generate features, we split the incoming data first and apply the `VectorAssembler` to each part afterwards.
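
A random split assigns each record independently to one of the parts, so the resulting sizes only approximate the requested ratio, and a fixed seed makes the split reproducible. The plain-Python sketch below illustrates this; `random_split` is a hypothetical helper, and the 80/20 ratio and the seed are assumptions, not values prescribed by this exercise.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to the training part with probability train_fraction."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train, test = random_split(rows, train_fraction=0.8, seed=42)

# The sizes are close to 800 / 200, but usually not exactly that
print(len(train), len(test))
```

In Spark, `DataFrame.randomSplit` performs this kind of weighted random split on a DataFrame.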

In [None]:
train_data, test_data = ... # YOUR CODE HERE

# Print sizes of training and testing sets
# YOUR CODE HERE

## Perform Regression

1. Apply VectorAssembler
2. Perform Fitting
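
To make "fitting" concrete: a linear regression chooses coefficients that minimize the squared error between predictions and labels. For a single feature this has a closed form, sketched below in plain Python. This is only an illustration of the principle with invented data points; Spark's `LinearRegression` handles many features and uses its own solvers.

```python
def fit_line(xs, ys):
    """Ordinary least squares for one feature: returns (slope, intercept)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    variance = sum((x - mean_x) ** 2 for x in xs)
    slope = covariance / variance
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Invented points that lie exactly on the line y = 2x + 1
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]

slope, intercept = fit_line(xs, ys)
print(slope, intercept)
```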

In [None]:
from pyspark.ml.regression import *

asm = # YOUR CODE HERE
regression = # YOUR CODE HERE
model = # YOUR CODE HERE

## Predict

Make predictions from test data and print some results. We use the `test_data` DataFrame (which was not used during training). Since this DataFrame does not already contain the feature column, we also need to apply the previously configured `VectorAssembler`.
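
For a linear model, predicting means computing a weighted sum of the feature values plus an intercept. The sketch below shows this with made-up coefficients; in Spark, a fitted linear regression model exposes the learned values as `coefficients` and `intercept`.

```python
def predict(features, coefficients, intercept):
    """Linear prediction: dot product of weights and features plus intercept."""
    return sum(w * x for w, x in zip(coefficients, features)) + intercept

# Made-up model parameters and feature vector, for illustration only
coefficients = [0.5, 2.0]
intercept = 1.0

predicted = predict([1.0, 2.0], coefficients, intercept)
print(predicted)
```

The order of the feature values must match the order of the coefficients, which is why the same configured `VectorAssembler` has to be used for training and test data.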

In [None]:
# Create features using the VectorAssembler
test_features = # YOUR CODE HERE

# Transform the resulting DataFrame using the trained model
prediction = # YOUR CODE HERE

# Print result
# YOUR CODE HERE

## Evaluation

Finally, let's evaluate the prediction.
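
A common metric for regression is the root mean squared error (RMSE), which is also the default metric of Spark's `RegressionEvaluator`. The sketch below computes it by hand on invented numbers to show what the evaluator's result means: lower is better, and the value is in the same unit as the label (rented bikes).

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between true labels and predictions."""
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Invented labels and predictions, for illustration only
labels = [3.0, 5.0, 7.0]
predictions = [2.0, 5.0, 9.0]

error = rmse(labels, predictions)
print(error)
```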

In [None]:
from pyspark.ml.evaluation import *

# Create evaluator instance
evaluator = # YOUR CODE HERE

# Evaluate predictions
# YOUR CODE HERE

# Adding More Features

We might want to add more features in order to improve prediction quality. We propose the following additional features:

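1. Features for modelling periodic effects within a year. This can be done by adding the following two features (31536000 is the number of seconds in a 365-day year, and 6.28318531 approximates 2π):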
        sin(ts / 31536000 * 6.28318531) 
        cos(ts / 31536000 * 6.28318531)
2. Similarly, for modelling periodic effects within a week, the following features can be used:
        sin(weekday / 7 * 6.28318531)
        cos(weekday / 7 * 6.28318531)
3. And for modelling periodic effects within a single day the following features can be used:
        sin(hour / 24 * 6.28318531)
        cos(hour / 24 * 6.28318531)
4. season, one-hot encoded
5. weather, one-hot encoded
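
The reason for using a sine/cosine pair is that it places periodic values on a circle, so the end of a period ends up next to its start. The following plain-Python sketch (the helper names are illustrative) shows this for the hour of the day: as raw numbers, 23 and 0 are far apart, but their encoded points are close together.

```python
import math

def cyclic(value, period):
    """Encode a periodic value as a point (sin, cos) on the unit circle."""
    angle = value / period * 2 * math.pi
    return math.sin(angle), math.cos(angle)

def distance(a, b):
    """Euclidean distance between two encoded points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])

midnight = cyclic(0, 24)
late_evening = cyclic(23, 24)
noon = cyclic(12, 24)

near = distance(late_evening, midnight)  # one hour apart on the clock
far = distance(noon, midnight)           # opposite sides of the clock
print(near, far)
```

Both features are needed together: the sine alone cannot distinguish, for example, 3:00 from 9:00, because both have the same sine value.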

You can use SQLTransformer for arithmetic transformations and a combination of

    StringIndexer(inputCol='categoricalFeature', outputCol='categoricalIndex')
    OneHotEncoder(inputCol='categoricalIndex', outputCol='categoricalOneHot')
    
for creating one-hot encoded categorical features.
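
As a rough plain-Python illustration of what this combination does: the indexer maps each distinct category to an index (by default, the most frequent category gets index 0), and the encoder turns that index into a vector containing a single 1. The helpers below are hypothetical and simplified; among other things, they return plain lists where Spark produces sparse vectors. Like Spark's `OneHotEncoder` with its default `dropLast=True`, the sketch omits the last category, which is then represented by an all-zero vector.

```python
from collections import Counter

def fit_indexer(values):
    """Map each category to an index, most frequent category first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken by category value here
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: index for index, category in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Turn a category index into a one-hot list, optionally dropping the last slot."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

# Invented category values, for illustration only
weather = ["1", "1", "2", "1", "3", "2"]

mapping = fit_indexer(weather)
encoded = [one_hot(mapping[w], len(mapping)) for w in weather]
print(mapping)
print(encoded)
```

One-hot encoding matters for a linear model because category codes such as 1, 2, 3 would otherwise be treated as an ordered numeric scale.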

We now have many transformations, all of which need to be applied to both the training data and the test data. A `Pipeline` encapsulates multiple feature extraction and model training steps in a single object; fitting it on the training data produces a single pipeline model.
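
The pattern behind a pipeline can be sketched in a few lines of plain Python. The classes below are hypothetical stand-ins, not Spark classes: each stage learns its parameters from the training data in `fit`, and the fitted pipeline then applies exactly the same learned steps to any other data set, such as the test data. As a simplification, `fit` returns the pipeline itself here, whereas Spark returns a separate `PipelineModel`.

```python
class MeanCenterer:
    """Toy stage: learns the mean in fit() and subtracts it in transform()."""

    def fit(self, values):
        self.mean = sum(values) / len(values)
        return self

    def transform(self, values):
        return [v - self.mean for v in values]


class Doubler:
    """Toy stage without any learned state."""

    def fit(self, values):
        return self

    def transform(self, values):
        return [2 * v for v in values]


class ToyPipeline:
    """Chains stages; each stage is fitted on the output of the previous ones."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        for stage in self.stages:
            values = stage.fit(values).transform(values)
        return self

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


train = [1.0, 2.0, 3.0]
test = [4.0, 5.0]

model = ToyPipeline([MeanCenterer(), Doubler()]).fit(train)
print(model.transform(train))
print(model.transform(test))
```

The test values are centered with the mean learned from the training data, not with their own mean, which is exactly the consistency a pipeline is meant to guarantee.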

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages= [
    SQLTransformer(statement="""
        SELECT 
            *,
            sin(ts / 31536000 * 6.28318531) as ts_sin,
            cos(ts / 31536000 * 6.28318531) as ts_cos,
            sin(weekday / 7 * 6.28318531) as wd_sin, 
            cos(weekday / 7 * 6.28318531) as wd_cos,
            sin(hour / 24 * 6.28318531) as hour_sin, 
            cos(hour / 24 * 6.28318531) as hour_cos 
        FROM __THIS__
    """),
    StringIndexer(inputCol='season',outputCol='iseason'),
    OneHotEncoder(inputCol='iseason',outputCol='vseason'),
    StringIndexer(inputCol='weather',outputCol='iweather'),
    OneHotEncoder(inputCol='iweather',outputCol='vweather'),
    VectorAssembler(
        inputCols=[
            'ts_sin', 'ts_cos', 'wd_sin', 'wd_cos', 'hour_sin', 'hour_cos',
            'year', 'month', 'hour', 'holiday', 'weekday', 'workingday',
            'temperature', 'apparent_temperature', 'humidity', 'wind_speed',
            # One-hot encoded categorical features created by the stages above
            'vseason', 'vweather'],
        outputCol='features'),
    LinearRegression(featuresCol='features', labelCol='counter', predictionCol='prediction'),
])

pipeline_model = # YOUR CODE HERE

In [None]:
# Transform the test_data DataFrame using the trained pipeline model
prediction = # YOUR CODE HERE

# Evaluate pipeline model
print(evaluator.evaluate(prediction))