In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext  # was missing: SQLContext(sc) raised NameError on a fresh kernel

# Reuse the running SparkContext if the kernel already has one, otherwise start one.
sc = SparkContext.getOrCreate()
# SQL entry point used later in the notebook to read CSV files into DataFrames.
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql import Row,Column
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
import csv

## Load data

In [None]:
station_path = 'station.csv'
status_path ='status.csv'
trip_path = 'trip.csv'
weather_path = 'weather.csv'


def _read_csv_rdd(path):
    """Read one CSV file as an RDD of parsed rows.

    Returns a 4-tuple ``(raw, parsed, header, rows)``:
      raw    -- RDD of text lines as returned by ``sc.textFile``
      parsed -- RDD of field lists produced by ``csv.reader`` per partition
      header -- the first parsed row
      rows   -- ``parsed`` with every row equal to ``header`` removed
    """
    raw = sc.textFile(path)
    parsed = raw.mapPartitions(lambda lines: csv.reader(lines))
    header = parsed.first()
    # Drops any row identical to the header, not only the first line.
    rows = parsed.filter(lambda fields: fields != header)
    return raw, parsed, header, rows


station_raw, station_with_header, station_header, station = _read_csv_rdd(station_path)
status_raw, status_with_header, status_header, status = _read_csv_rdd(status_path)
trip_raw, trip_with_header, trip_header, trip = _read_csv_rdd(trip_path)
weather_raw, weather_with_header, weather_header, weather = _read_csv_rdd(weather_path)

## Safe type-conversion helpers

In [None]:
def toIntSafe(inval):
  """Convert ``inval`` to ``int``; return ``None`` if it cannot be converted.

  ``TypeError`` is caught together with ``ValueError`` so that a missing
  value (``None``) yields ``None`` instead of raising.
  """
  try:
    return int(inval)
  except (ValueError, TypeError):
    return None

def toFloatSafe(inval):
  """Convert ``inval`` to ``float``; return ``None`` if it cannot be converted.

  ``TypeError`` is caught together with ``ValueError`` so that a missing
  value (``None``) yields ``None`` instead of raising.
  """
  try:
    return float(inval)
  except (ValueError, TypeError):
    return None

def toTimeSafe(inval):
  """Parse a ``YYYY-MM-DD HH:MM:SS.ffffff`` string into a ``datetime``.

  Returns ``None`` when the text does not match the format, or when the
  input is not a string at all (e.g. ``None``), instead of raising.
  """
  try:
    return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S.%f")
  except (ValueError, TypeError):
    return None

def toLong(inval):
  """Convert a numeric string such as ``"94107.0"`` to an integer.

  The value is parsed as a float first and then truncated toward zero.
  Returns ``None`` when the input is not numeric, is missing (``None``),
  or is not finite (``inf`` / ``nan``).

  ``int`` is used instead of the Python-2-only ``long`` builtin, which
  raises ``NameError`` on Python 3; ``int`` is arbitrary-precision there
  and promotes automatically on Python 2.
  """
  try:
    return int(float(inval))
  except (ValueError, TypeError, OverflowError):
    # ValueError: non-numeric text or nan; TypeError: None; OverflowError: inf.
    return None


def toStringSafe(inval):
  """Return ``str(inval)``, or ``None`` if the conversion raises ``ValueError``."""
  try:
    text = str(inval)
  except ValueError:
    text = None
  return text

def toTime_Safe(inval):
  """Parse a ``M/D/YYYY`` date string (e.g. ``8/29/2013``) into a ``datetime``.

  Returns ``None`` when the text does not match the format, or when the
  input is not a string at all (e.g. ``None``), instead of raising.

  Note: uses the same format string as ``weather_to_time``.
  """
  try:
    return datetime.strptime(inval, "%m/%d/%Y")
  except (ValueError, TypeError):
    return None

def status_to_time(inval):
  """Parse a ``YYYY/MM/DD HH:MM:SS`` string (e.g. ``2013/08/29 12:06:01``).

  Returns the parsed ``datetime``, or ``None`` when the text does not match
  the format or the input is not a string at all (e.g. ``None``).
  """
  try:
    return datetime.strptime(inval, "%Y/%m/%d %H:%M:%S")
  except (ValueError, TypeError):
    return None

def trip_to_time(inval):
  """Parse a ``M/D/YYYY HH:MM`` string (e.g. ``8/29/2013 14:14``).

  Returns the parsed ``datetime``, or ``None`` when the text does not match
  the format or the input is not a string at all (e.g. ``None``).
  """
  try:
    return datetime.strptime(inval, "%m/%d/%Y %H:%M")
  except (ValueError, TypeError):
    return None

def weather_to_time(inval):
  """Parse a date-only ``M/D/YYYY`` string (e.g. ``8/29/2013``).

  Returns the parsed ``datetime`` (time part is midnight), or ``None`` when
  the text does not match the format or the input is not a string at all
  (e.g. ``None``).
  """
  try:
    return datetime.strptime(inval, "%m/%d/%Y")
  except (ValueError, TypeError):
    return None

## Extract date parts from the trip timestamps

In [None]:
from time import time
from datetime import datetime

# get weekdays and daily hours from timestamp
def toWeekDay(x):
    """Return the weekday of ``x`` as a string, ``'0'`` (Sunday) to ``'6'`` (Saturday).

    ``x`` must provide ``strftime`` (e.g. a ``datetime``). ``None`` is passed
    through as ``None`` so that a null timestamp does not raise when this
    function is applied to a column containing missing values.
    """
    if x is None:
        return None
    return x.strftime('%w')

# Spark UDF wrapper around toWeekDay; produces a string column.
to_week_day = udf(toWeekDay, StringType())

# Column handles for the two timestamps that get split into parts.
start_ts = trip_df['date']
end_ts = trip_df['end_date']

# (new column name, expression) pairs, in the order the columns are appended.
start_parts = [
    ("year", year(start_ts)),
    ("month", month(start_ts)),
    ("day", dayofmonth(start_ts)),
    ("dayofweek", to_week_day(start_ts)),
    ("hour", hour(start_ts)),
    ("minute", minute(start_ts)),
    ("second", second(start_ts)),
]
end_parts = [
    ("end_year", year(end_ts)),
    ("end_month", month(end_ts)),
    ("end_day", dayofmonth(end_ts)),
    ("end_hour", hour(end_ts)),
    ("end_minute", minute(end_ts)),
    ("end_second", second(end_ts)),
]

# Append the start-date parts, drop the source column, then do the same for
# the end date.
trip_new = trip_df
for part_name, part_expr in start_parts:
    trip_new = trip_new.withColumn(part_name, part_expr)
trip_new = trip_new.drop('date')
for part_name, part_expr in end_parts:
    trip_new = trip_new.withColumn(part_name, part_expr)
trip_new = trip_new.drop('end_date')


## impute NAs

In [None]:
def impute(df):
    """Fill missing values in ``df`` and return the resulting DataFrame.

    Per column:
      * ``max_gust_speed_mph`` is dropped.
      * ``events``: empty strings are replaced with ``"nothing"``.
      * every other column that contains nulls is filled with its most
        frequent non-null value (the mode).

    The mode is computed over non-null rows only. Counting the null group as
    well could select null itself as the "most frequent value", which is not
    a usable fill value. A column that is entirely null is left unchanged.
    """
    for column in df.columns:
        if column == 'max_gust_speed_mph':
            df = df.drop(column)
        elif column == 'events':
            df = df.na.replace("", "nothing", [column])
        elif df.filter(df[column].isNull()).count() != 0:
            mode_row = (df.filter(df[column].isNotNull())
                          .groupby(df[column])
                          .count()
                          .orderBy('count', ascending=False)
                          .first())
            # first() returns None when there are no non-null rows at all.
            if mode_row is not None:
                df = df.na.fill({column: mode_row[0]})
    return df

# Impute the weather table, then split its date into year / month / day
# columns and drop the original date column.
weather_imputed = impute(weather_df)
weather_date = weather_imputed['date']
weather_new_df = (
    weather_imputed
    .withColumn("year", year(weather_date))
    .withColumn("month", month(weather_date))
    .withColumn("day", dayofmonth(weather_date))
    .drop('date')
)


In [None]:
# Lookup table read from CSV with a header row and inferred column types.
# The path is relative, like the other input files, so the notebook does not
# depend on one user's home directory; city_zip.csv is expected next to them.
city_zip_path = 'city_zip.csv'
city_zip = (
    sqlContext.read.format('csv')
    .options(header='true', inferSchema='true')
    .load(city_zip_path)
)

In [None]:
# Attach the city_zip columns to each weather row via the shared zip_code
# key, then drop the key.
weather_with_city = weather_new_df.join(city_zip, on='zip_code')
weather_city = weather_with_city.drop('zip_code')

In [None]:
# Rename the trip table's `id` column to `user_id` before joining.
# NOTE(review): presumably this avoids a clash with an `id` column in station_df - confirm.
trip_delete = trip_new.withColumnRenamed('id', 'user_id')

# Join each trip to the station whose name equals the trip's start station.
start_station_matches = trip_delete['start_station_name'] == station_df['name']
trip_station = trip_delete.join(station_df, on=start_station_matches)

## Clean: filter trips by duration

In [None]:
# Keep only rows whose duration is at most 340.
# NOTE(review): the unit of `duration` and the rationale for 340 are not visible here - confirm.
duration_within_limit = all_table['duration'] <= 340
all_table_new = all_table.where(duration_within_limit)

In [None]:
#converting strings to numeric values
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    """Replace each column in ``cols`` with its StringIndexer-encoded version.

    For every name in ``cols`` a ``StringIndexer`` is fitted on the current
    DataFrame, the indexed values are written to a temporary ``<name>-num``
    column, the original column is dropped, and the temporary column is
    renamed back to ``<name>``. Returns the resulting DataFrame; ``df`` is
    returned unchanged when ``cols`` is empty.
    """
    indexed = df
    for name in cols:
        tmp_name = name + "-num"
        model = StringIndexer(inputCol=name, outputCol=tmp_name).fit(indexed)
        indexed = (
            model.transform(indexed)
            .drop(name)
            .withColumnRenamed(tmp_name, name)
        )
    return indexed


In [None]:
# Columns that are index-encoded here (and one-hot encoded in a later cell).
columns_name_string = [
    'city',
    'events',
    'subscription_type',
    'start_station_name',
    'dayofweek',
]
dfnumeric = indexStringColumns(df=all_table_new, cols=columns_name_string)

In [None]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    """Replace each column in ``cols`` with its one-hot encoded vector.

    For every name in ``cols`` a ``OneHotEncoder`` (with ``dropLast=False``,
    so every category keeps its own slot) writes the encoded vector to a
    temporary ``<name>-onehot`` column, the original column is dropped, and
    the temporary column is renamed back to ``<name>``. Returns the resulting
    DataFrame; ``df`` is returned unchanged when ``cols`` is empty.

    NOTE(review): the encoder is used as a plain transformer (no ``fit``),
    which matches the Spark 2.x API - confirm the Spark version in use.
    """
    encoded = df
    for name in cols:
        tmp_name = name + "-onehot"
        encoder = OneHotEncoder(inputCol=name, outputCol=tmp_name, dropLast=False)
        encoded = (
            encoder.transform(encoded)
            .drop(name)
            .withColumnRenamed(tmp_name, name)
        )
    return encoded

# One-hot encode the same columns that were index-encoded into dfnumeric.
dfhot = oneHotEncodeColumns(df=dfnumeric, cols=columns_name_string)

In [None]:
# Columns excluded from the modelling table.
drop_name = [
    'user_id', 'start_station_id', 'zip_code', 'bike_id', 'id', 'lat', 'long',
    'end_station_name', 'name', 'installation_date', 'end_station_id',
    # trip end-time parts, plus the minute/second parts of the start time
    'end_year', 'end_month', 'end_day', 'end_hour', 'end_minute', 'end_second',
    'minute', 'second',
    # max_* / min_* columns
    'max_temperature_f', 'min_temperature_f',
    'max_dew_point_f', 'min_dew_point_f',
    'max_humidity', 'min_humidity',
    'max_sea_level_pressure_inches', 'min_sea_level_pressure_inches',
    'max_visibility_miles', 'min_visibility_miles',
    'max_wind_Speed_mph',
]

# Keep every remaining column, in its existing order.
kept_columns = [name for name in dfhot.columns if name not in drop_name]
df_new = dfhot.select(kept_columns)

In [None]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler
# Name of the column to predict; every other column of df_new is a feature.
response = 'duration'

input_columns = [c for c in df_new.columns if c != response]

# Assemble all feature columns into a single "features" vector column and
# expose the response as "label". `response` is used throughout instead of a
# repeated 'duration' literal, so changing it above cannot leave select() and
# the rename pointing at a different column.
va = VectorAssembler(outputCol="features", inputCols=input_columns)
lp = (
    va.transform(df_new)
    .select("features", response)
    .withColumnRenamed(response, "label")
)

In [None]:
# Random 80% / 20% split of the labelled data with a fixed seed (1).
# NOTE(review): presumably `a` is the training set and `b` the test set - confirm.
a, b = lp.randomSplit(weights=[0.8, 0.2], seed=1)