In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/17 04:53:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/17 04:53:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
status = sc.textFile("../Data/bike_share/status_million.csv")

## schema : station_id (int), num_bikes_available (int), num_docks_available (int), timestamp(timestamp)

In [3]:
from datetime import datetime
def toIntSafe(inval):
    try:
        return int(inval)
    except:
        return None

def toTimeSafe(inval):
    inval = inval.strip("\"") # Timestamp starting and ending with a double quotation mark.
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S")
    except:
        return None

In [4]:
schema = StructType([ StructField("station_id", IntegerType(), False),
                      StructField("num_bikes_available", IntegerType(), True),
                      StructField("num_docks_available", IntegerType(), True),
                      StructField("timestamp", TimestampType(), True)
                    ])

In [5]:
status_transformed = status.map(lambda x : x.split(","))\
                           .map(lambda x : (int(x[0]), toIntSafe(x[1]), toIntSafe(x[2]), toTimeSafe(x[3])))

In [6]:
status_transformed.partitionBy(10).cache()

MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:145

In [7]:
status_transformed.count()

                                                                                

1000000

In [8]:
status_df = ss.createDataFrame(status_transformed, schema)

In [9]:
status_df.show(5)

[Stage 1:>                                                          (0 + 1) / 1]

+----------+-------------------+-------------------+-------------------+
|station_id|num_bikes_available|num_docks_available|          timestamp|
+----------+-------------------+-------------------+-------------------+
|        10|                  7|                  8|2014-12-30 15:37:02|
|        10|                  7|                  8|2014-12-30 15:35:02|
|        10|                  7|                  8|2014-12-30 15:34:02|
|        10|                  7|                  8|2014-12-30 15:33:02|
|        10|                  7|                  8|2014-12-30 15:32:02|
+----------+-------------------+-------------------+-------------------+
only showing top 5 rows



                                                                                

In [10]:
status_df.filter('station_id == 10').orderBy('timestamp').show()



+----------+-------------------+-------------------+-------------------+
|station_id|num_bikes_available|num_docks_available|          timestamp|
+----------+-------------------+-------------------+-------------------+
|        10|                  9|                  6|2014-09-01 00:00:03|
|        10|                  9|                  6|2014-09-01 00:01:02|
|        10|                  9|                  6|2014-09-01 00:02:02|
|        10|                  9|                  6|2014-09-01 00:03:03|
|        10|                  9|                  6|2014-09-01 00:04:02|
|        10|                  9|                  6|2014-09-01 00:05:02|
|        10|                  9|                  6|2014-09-01 00:06:02|
|        10|                  9|                  6|2014-09-01 00:07:02|
|        10|                  9|                  6|2014-09-01 00:08:02|
|        10|                  9|                  6|2014-09-01 00:09:03|
|        10|                  9|                  6

                                                                                

## Return the current and previous number of bike at station_id, 10 at each time data was collected order by timestamp.


In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [12]:
status_df.filter("station_id == 10")\
         .select('station_id', 'timestamp', 'num_bikes_available',\
                 lag('num_bikes_available',1).over(Window.partitionBy('station_id').orderBy('timestamp'))\
                 .alias("prev_num_bikes_available"))\
         .show()



+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|prev_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                    null|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
|        10|2014-09-01 00:05:02|                  9|                       9|
|        10|2014-09-01 00:06:02|                  9|                       9|
|        10|2014-09-01 00:07:02|                  9|                       9|
|        10|2014-09-01 00:08:02|                  9|                       9|
|        10|2014-09-01 00:09:03|                  9|            

                                                                                

In [13]:
status_df.filter("station_id == 10")\
         .select('station_id', 'timestamp', 'num_bikes_available',\
                 lead('num_bikes_available', 1).over(Window.partitionBy('station_id').orderBy('timestamp'))\
                 .alias("next_num_bikes_available"))\
         .show()



+----------+-------------------+-------------------+------------------------+
|station_id|          timestamp|num_bikes_available|next_num_bikes_available|
+----------+-------------------+-------------------+------------------------+
|        10|2014-09-01 00:00:03|                  9|                       9|
|        10|2014-09-01 00:01:02|                  9|                       9|
|        10|2014-09-01 00:02:02|                  9|                       9|
|        10|2014-09-01 00:03:03|                  9|                       9|
|        10|2014-09-01 00:04:02|                  9|                       9|
|        10|2014-09-01 00:05:02|                  9|                       9|
|        10|2014-09-01 00:06:02|                  9|                       9|
|        10|2014-09-01 00:07:02|                  9|                       9|
|        10|2014-09-01 00:08:02|                  9|                       9|
|        10|2014-09-01 00:09:03|                  9|            

                                                                                

In [14]:
ss.stop()