# Pre-processing

In [1]:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import java.io.File
import scala.collection.mutable.ListBuffer

In [2]:
val spark = SparkSession.builder.master("local[*]").appName("SparkPreProcessing").getOrCreate()
import spark.implicits._

spark = org.apache.spark.sql.SparkSession@4ce3d48c


org.apache.spark.sql.SparkSession@4ce3d48c

In [3]:
// http://alvinalexander.com/scala/how-to-list-files-in-directory-filter-names-scala/

def getListOfFiles(dir: String):List[File] = {
    val d = new File(dir)
    if (d.exists && d.isDirectory) {
        d.listFiles.filter(_.isFile).toList
    } else {
        List[File]()
    }
}

getListOfFiles: (dir: String)List[java.io.File]


In [4]:
val folder = "./2019/"
val csvFiles = getListOfFiles(folder)

folder = ./2019/
csvFiles = List()


List()

## Desired dataset

We want a simple dataset with the following structure:
- station_id
- time
- (weather)
- number of bikes available (target)

It will be the input of our learning algorithm. Then, we also have to store some informations about the stations, in another dataset, with the following structure:
- station_id
- name
- latitude
- longitude

## Generating a single .csv for the stations

This .csv consists of four columns:
- id
- name
- latitude
- longitude

We get all stations from all .csv files within the folder defined above.

In [19]:
val newColumns = Seq("id",
                     "name",
                     "latitude",
                     "longitude"
                    )

val oldStartColumns = Seq("start station id",
                          "start station name",
                          "start station latitude",
                          "start station longitude"
                         )

val oldEndColumns = Seq("end station id",
                        "end station name",
                        "end station latitude",
                        "end station longitude"
                       )

val columnsStartList = oldStartColumns.zip(newColumns).map(f => { col(f._1).as(f._2) })

val columnsEndList = oldEndColumns.zip(newColumns).map(f => { col(f._1).as(f._2) })

newColumns = List(id, name, latitude, longitude)
oldStartColumns = List(start station id, start station name, start station latitude, start station longitude)
oldEndColumns = List(end station id, end station name, end station latitude, end station longitude)
columnsStartList = List(start station id AS `id`, start station name AS `name`, start station latitude AS `latitude`, start station longitude AS `longitude`)
columnsEndList = List(end station id AS `id`, end station name AS `name`, end station latitude AS `latitude`, end station longitude AS `longitude`)


List(end station id AS `id`, end station name AS `name`, end station latitude AS `latitude`, end station longitude AS `longitude`)

In [5]:
var stationsDf = Seq.empty[(String,String,String,String)].toDF(newColumns:_*)

for (csvFile <- csvFiles) {
    val df = spark.read
                  .option("header", "true")
                  .csv(csvFile.getPath())

    val startStations = df.select(df("start station id"),
                                  df("start station name"),
                                  df("start station latitude"),
                                  df("start station longitude")
                                 )
                           .distinct()
                           .select(columnsStartList:_*)
    
    val endStations = df.select(df("end station id"),
                                df("end station name"),
                                df("end station latitude"),
                                df("end station longitude")
                               )
                       .distinct()
                       .select(columnsEndList:_*)
    
    stationsDf = stationsDf.union(startStations).union(endStations).distinct()
    
}

Compile Error: <console>:39: error: not found: value newColumns
       var stationsDf = Seq.empty[(String,String,String,String)].toDF(newColumns:_*)
                                                                      ^
<console>:54: error: not found: value columnsStartList
                                  .select(columnsStartList:_*)
                                          ^
<console>:62: error: not found: value columnsEndList
                              .select(columnsEndList:_*)
                                      ^


In [6]:
stationsDf.repartition(1)
          .write
          .format("com.databricks.spark.csv")
          .option("header", "true")
          .save("stations")

Compile Error: <console>:38: error: not found: value stationsDf
       stationsDf.repartition(1)
       ^


In [7]:
// Only after running the last cells and renaming the output as "stations.csv"

val stationsDf = spark.read
                   .option("header", "true")
                   .csv("stations.csv")

org.apache.spark.sql.AnalysisException: Path does not exist: file:/media/sf_codes/id2221_project/data/stations.csv;

In [8]:
stationsDf.count()

Unknown Error: lastException: Throwable = null
<console>:38: error: not found: value stationsDf
       stationsDf.count()
       ^


## Getting the number of bikes available at the beginning

To determine the number of bikes available at the beginning, we just have to select the first start station of each bike and add +1 to the initial number of bikes available in this station.

In [10]:
// Let's test with a single .csv

val df = spark.read
              .option("header", "true")
              .csv("./raw_data/2019/201901-citibike-tripdata.csv")

df = [tripduration: string, starttime: string ... 13 more fields]


lastException: Throwable = null


[tripduration: string, starttime: string ... 13 more fields]

In [11]:
df.printSchema()

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: string (nullable = true)



In [11]:
val newDf = df.select(df("starttime").as("time"),
                      df("start station id").as("station_id"),
                      df("bikeid").as("bike_id"))

newDf = [time: string, station_id: string ... 1 more field]


[time: string, station_id: string ... 1 more field]

In [178]:
// https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group

val bikeIdStationIdPairs = newDf.orderBy("time")
                                .groupBy("bike_id")
                                .agg(first("station_id").alias("station_id"))
                                .orderBy(asc("station_id"))
                                .rdd

bikeIdStationIdPairs = MapPartitionsRDD[118] at rdd at <console>:41


MapPartitionsRDD[118] at rdd at <console>:41

In [179]:
val stationIdInitialNbOfBikesPairs = bikeIdStationIdPairs.filter(_(1) != "NULL")
                                                         .map(x => (x(1), 1))
                                                         .reduceByKey(_ + _)
//                                                          .sortBy(_._1)

stationIdInitialNbOfBikesPairs = ShuffledRDD[121] at reduceByKey at <console>:41


ShuffledRDD[121] at reduceByKey at <console>:41