In [2]:
from pyspark.sql import SQLContext
from pyspark.sql import functions as func
from pyspark.sql.types import *

### Read Data

In [1]:
import os
from getpass import getpass

# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext provided by the pyspark kernel — confirm. SQLContext is deprecated
# since Spark 3.0 (SparkSession.builder.getOrCreate() is the modern entry point and
# exposes the same .read API used below).
spark = SQLContext(sc)

# MySQL config. Credentials are taken from the environment (or an interactive
# prompt) instead of being hard-coded, so they cannot leak when the notebook is
# committed or shared.
_mysql_password = os.environ.get("MYSQL_PASSWORD")
if _mysql_password is None:
    _mysql_password = getpass("MySQL password: ")

prop = {'user': os.environ.get("MYSQL_USER", "root"),
        'password': _mysql_password,
        'driver': 'com.mysql.cj.jdbc.Driver'}

db_name = "big_data_5003"
tb_name = "SeoulBike"

url = 'jdbc:mysql://127.0.0.1:3306/%s' % db_name

In [2]:
# Load the source table from MySQL over JDBC and keep it cached in memory.
df = spark.read.jdbc(url, tb_name, properties=prop).cache()
df

DataFrame[id: int, Date: string, RentedBikeCount: int, Hour: int, Temperaturee: double, Humidity_pct: int, WindSpeed_m_per_s: double, Visibility_10m: int, DewPointTemperature: double, SolarRadiation: double, Rainfall_mm: double, Snowfall_cm: double, Seasons: string, Holiday: string, FunctioningDay: string]

In [3]:
# Snapshot the MySQL table to CSV. DataFrameWriter.csv returns None, so nothing is
# assigned. Spark writes a directory of part files at this path; mode("overwrite")
# lets the cell be re-run (the default "error" mode fails once the path exists).
df.write.mode("overwrite").csv("data/SeoulBike_new.csv", header=True, sep=",")

In [1]:
# Re-load the exported CSV snapshot: the first line supplies column names and
# column types are inferred from the data.
df = (spark.read
      .option("header", True)
      .option("sep", ",")
      .option("inferSchema", True)
      .csv("data/SeoulBike_new.csv"))

In [3]:
# Inspect the column types Spark inferred from the CSV.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- RentedBikeCount: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperaturee: double (nullable = true)
 |-- Humidity_pct: integer (nullable = true)
 |-- WindSpeed_m_per_s: double (nullable = true)
 |-- Visibility_10m: integer (nullable = true)
 |-- DewPointTemperature: double (nullable = true)
 |-- SolarRadiation: double (nullable = true)
 |-- Rainfall_mm: double (nullable = true)
 |-- Snowfall_cm: double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- FunctioningDay: string (nullable = true)



In [4]:
# Total number of rows loaded.
df.count()

8760

In [5]:
# Peek at the first two rows without truncating cell values.
df.show(n=2, truncate=False)

+---+----------+---------------+----+------------+------------+-----------------+--------------+-------------------+--------------+-----------+-----------+-------+----------+--------------+
|id |Date      |RentedBikeCount|Hour|Temperaturee|Humidity_pct|WindSpeed_m_per_s|Visibility_10m|DewPointTemperature|SolarRadiation|Rainfall_mm|Snowfall_cm|Seasons|Holiday   |FunctioningDay|
+---+----------+---------------+----+------------+------------+-----------------+--------------+-------------------+--------------+-----------+-----------+-------+----------+--------------+
|1  |01/12/2017|254            |0   |-5.2        |37          |2.2              |2000          |-17.6              |0.0           |0.0        |0.0        |Winter |No Holiday|Yes           |
|2  |01/12/2017|204            |1   |-5.5        |38          |0.8              |2000          |-17.6              |0.0           |0.0        |0.0        |Winter |No Holiday|Yes           |
+---+----------+---------------+----+------------+

### Feature Engineering
- Time features: from `Date`, extract the month, the day of the week, and a weekday/weekend flag.

In [5]:
# Parse the dd/MM/yyyy string into a date column, then derive the calendar
# month number from it.
df = (
    df.withColumn("Date", func.to_date(func.col("Date"), "dd/MM/yyyy"))
      .withColumn("Month", func.month(func.col("Date")))
)

In [6]:
# extract the weekday component
@func.udf(StringType())
def getWeekday(x):
    """Map a Spark ``dayofweek`` number (1 = Sunday ... 7 = Saturday) to its name.

    Returns None for a null input (e.g. a Date that failed to parse) instead of
    raising ``TypeError`` from ``int(None)`` inside an executor, which would
    fail the whole job.
    """
    if x is None:
        return None
    dayofweek = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
    return dayofweek[int(x) - 1]

df = (
    df.withColumn("Weekday", getWeekday(func.dayofweek("Date")))
      # Weekend flag as a built-in expression rather than a Python UDF (no per-row
      # round trip to Python). A null Weekday still yields 0, as the UDF did.
      .withColumn("Weekend",
                  func.when(func.col("Weekday").isin("Saturday", "Sunday"), 1).otherwise(0))
)

In [7]:
# Date coverage of the dataset (earliest and latest Date).
df.agg(func.min("Date"), func.max("Date")).show(1, False)

+----------+----------+
|min(Date) |max(Date) |
+----------+----------+
|2017-12-01|2018-11-30|
+----------+----------+



In [5]:
# One-hot encode Weekday: one count column per day name (0/1 per id, assuming id
# is unique — TODO confirm). Passing the pivot values explicitly skips the extra
# Spark job pivot() otherwise runs to discover distinct values, and guarantees all
# seven columns exist (in the same sorted order pivot() would produce) even if a
# day is missing from the data.
WEEKDAY_NAMES = ["Friday", "Monday", "Saturday", "Sunday", "Thursday", "Tuesday", "Wednesday"]
weekday = df.groupBy("id") \
            .pivot("Weekday", WEEKDAY_NAMES) \
            .agg(func.count("Weekday")) \
            .fillna(0)

In [6]:
# Preview the weekday dummy columns.
weekday.show(n=2, truncate=False)

+----+------+------+--------+------+--------+-------+---------+
|id  |Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|
+----+------+------+--------+------+--------+-------+---------+
|2122|0     |0     |0       |0     |0       |1      |0        |
|6397|1     |0     |0       |0     |0       |0      |0        |
+----+------+------+--------+------+--------+-------+---------+
only showing top 2 rows



In [7]:
# Attach the weekday dummy columns to each row, matched on id.
df = df.join(weekday, "id", "inner")

- One-hot encoding: `Weekday` (done above), `Seasons`, and `Holiday` (kept as a single `has_holiday` flag)

In [8]:
# One-hot encode Seasons. Explicit pivot values avoid an extra distinct-value job
# and guarantee every season column exists even if a season is absent.
seasons = df.groupBy("id") \
            .pivot("Seasons", ["Autumn", "Spring", "Summer", "Winter"]) \
            .agg(func.count("Seasons")) \
            .fillna(0)

# Pivoting Holiday yields a "Holiday" and a "No Holiday" column; only the former
# is kept, renamed to has_holiday. Listing the values explicitly guarantees the
# "Holiday" column exists. Without them, a dataset with no holiday rows would make
# the rename a silent no-op and the select below would fail.
holiday = df.groupBy("id") \
            .pivot("Holiday", ["Holiday", "No Holiday"]) \
            .agg(func.count("Holiday")) \
            .fillna(0) \
            .withColumnRenamed("Holiday", "has_holiday")

df = df.join(seasons, on="id", how="inner") \
       .join(holiday.select("id", "has_holiday"), on="id", how="inner")

- Binary encoding: `FunctioningDay` (`Yes` → 1, `No` → 0)

In [9]:
# How many rows fall on non-functioning days?
df.where(func.col("FunctioningDay") == "No").count()

295

In [10]:
# Binary-encode FunctioningDay: "Yes" -> 1, anything else (including null) -> 0.
# A built-in when/otherwise expression replaces the Python UDF (no per-row trip
# to Python). The dtype guard keeps the cell idempotent: re-encoding the
# already-integer column would otherwise turn every value into 0.
if dict(df.dtypes)["FunctioningDay"] == "string":
    df = df.withColumn("FunctioningDay",
                       func.when(func.col("FunctioningDay") == "Yes", 1).otherwise(0))

In [11]:
# Features = every column except the label, the identifier/raw date, and the raw
# categorical strings that have been encoded into numeric columns.
feature_col = list(filter(
    lambda name: name not in {"RentedBikeCount", "id", "Date", "Seasons", "Holiday", "Weekday"},
    df.columns,
))

In [12]:
# Display the selected feature names.
feature_col

['Hour',
 'Temperaturee',
 'Humidity_pct',
 'WindSpeed_m_per_s',
 'Visibility_10m',
 'DewPointTemperature',
 'SolarRadiation',
 'Rainfall_mm',
 'Snowfall_cm',
 'FunctioningDay',
 'Month',
 'Weekend',
 'Friday',
 'Monday',
 'Saturday',
 'Sunday',
 'Thursday',
 'Tuesday',
 'Wednesday',
 'Autumn',
 'Spring',
 'Summer',
 'Winter',
 'has_holiday']

In [13]:
# Number of selected features.
len(feature_col)

24

In [14]:
# Model-ready table: the feature columns followed by the label column.
data = df.select(feature_col + ["RentedBikeCount"])

In [15]:
# Preview the model-ready table.
data.show(n=2, truncate=False)

+----+------------+------------+-----------------+--------------+-------------------+--------------+-----------+-----------+--------------+-----+-------+------+------+--------+------+--------+-------+---------+------+------+------+------+-----------+---------------+
|Hour|Temperaturee|Humidity_pct|WindSpeed_m_per_s|Visibility_10m|DewPointTemperature|SolarRadiation|Rainfall_mm|Snowfall_cm|FunctioningDay|Month|Weekend|Friday|Monday|Saturday|Sunday|Thursday|Tuesday|Wednesday|Autumn|Spring|Summer|Winter|has_holiday|RentedBikeCount|
+----+------------+------------+-----------------+--------------+-------------------+--------------+-----------+-----------+--------------+-----+-------+------+------+--------+------+--------+-------+---------+------+------+------+------+-----------+---------------+
|3   |1.9         |91          |1.4              |218           |0.5                |0.0           |0.0        |0.9        |1             |12   |0      |0     |0     |0       |0     |1       |0      

In [17]:
# Export the model-ready table as a single part file (repartition(1)); Spark still
# writes a directory at this path. mode("overwrite") makes the cell re-runnable
# (the default "error" mode fails once the path exists).
data.repartition(1).write.mode("overwrite").csv("data/preprocessed_data.csv", header=True, sep=",")