## Import and read

In [1]:
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import *

#Feel free to add other libraries from pyspark

# Reuse the running SparkContext if there is one, otherwise start a new one.
# (Unused alternative kept for reference: conf = SparkConf().setAppName(app_name))
sc = SparkContext.getOrCreate()
# Turn Spark logging off for this context.
sc.setLogLevel("OFF")

# SparkSession used for the DataFrame reads and SQL queries in the rest of the notebook.
ss = SparkSession.builder.getOrCreate()

In [2]:
# Display the SparkContext object as the cell output.
sc

In [3]:
# Locations of the two input CSV files, relative to the notebook's working directory.
Parking_meters = '../Data/Parking_meters.csv'
# NOTE: the name is spelled `transcation_input` (sic); later cells reference it under this spelling.
transcation_input = '../Data/output1month.csv'

In [4]:
# Every meter attribute is read as a nullable string; the names below are
# applied to the CSV columns in this order.
meter_field_names = [
    "POST_ID", "MS_ID", "MS_SPACEID", "CAP_COL",
    "METER_TYPE", "SMART_METE", "ACTIVESENS", "JURISDICTI",
    "ON_OFF_STR", "OSP_ID", "STREET_NUM", "STREETNAME",
    "STREET_SEG", "RATEAREA", "SFPARKAREA", "LOCATION",
]
schema = StructType([StructField(name, StringType(), True) for name in meter_field_names])

# header=True skips the file's own header row; column names come from `schema`.
parking_meters = ss.read.csv(Parking_meters, schema=schema, header=True)
print(len(parking_meters.columns))
parking_meters.show(1)

16
+---------+-----+----------+-------+----------+----------+----------+----------+----------+------+----------+----------+----------+--------+----------+--------------------+
|  POST_ID|MS_ID|MS_SPACEID|CAP_COL|METER_TYPE|SMART_METE|ACTIVESENS|JURISDICTI|ON_OFF_STR|OSP_ID|STREET_NUM|STREETNAME|STREET_SEG|RATEAREA|SFPARKAREA|            LOCATION|
+---------+-----+----------+-------+----------+----------+----------+----------+----------+------+----------+----------+----------+--------+----------+--------------------+
|401-06340|    -|         0|   Grey|        SS|         N|         N|     SFMTA|        ON|     0|       634|  ELLIS ST|   5177000|  Area 3|          |(37.78436, -122.4...|
+---------+-----+----------+-------+----------+----------+----------+----------+----------+------+----------+----------+----------+--------+----------+--------------------+
only showing top 1 row



In [5]:
# Narrow the meter table to the eight columns inspected in the next cells.
parking_meters_selected = parking_meters.select(
    'POST_ID', 'MS_ID', 'MS_SPACEID', 'CAP_COL',
    'METER_TYPE', 'SMART_METE', 'ACTIVESENS', 'ON_OFF_STR',
)
parking_meters_selected.show(3)

+---------+-----+----------+-------+----------+----------+----------+----------+
|  POST_ID|MS_ID|MS_SPACEID|CAP_COL|METER_TYPE|SMART_METE|ACTIVESENS|ON_OFF_STR|
+---------+-----+----------+-------+----------+----------+----------+----------+
|401-06340|    -|         0|   Grey|        SS|         N|         N|        ON|
|104-03190|    -|         0|   Grey|        SS|         N|         Y|        ON|
|352-04350|    -|         0|   Grey|        SS|         N|         N|        ON|
+---------+-----+----------+-------+----------+----------+----------+----------+
only showing top 3 rows



In [6]:
# Keep only the meters whose cap colour is Green or Grey.
has_green_or_grey_cap = parking_meters_selected['CAP_COL'].isin('Green', 'Grey')
parking_meters_cars = parking_meters_selected.filter(has_green_or_grey_cap)
print(parking_meters_cars.count())
parking_meters_cars.show(5)


22426
+---------+-----+----------+-------+----------+----------+----------+----------+
|  POST_ID|MS_ID|MS_SPACEID|CAP_COL|METER_TYPE|SMART_METE|ACTIVESENS|ON_OFF_STR|
+---------+-----+----------+-------+----------+----------+----------+----------+
|401-06340|    -|         0|   Grey|        SS|         N|         N|        ON|
|104-03190|    -|         0|   Grey|        SS|         N|         Y|        ON|
|352-04350|    -|         0|   Grey|        SS|         N|         N|        ON|
|116-03980|    -|         0|   Grey|        SS|         N|         N|        ON|
|224-27570|    -|         0|   Grey|        SS|         N|         N|        ON|
+---------+-----+----------+-------+----------+----------+----------+----------+
only showing top 5 rows



In [7]:
# Projection of the Green/Grey meters without the MS_ID / MS_SPACEID columns.
parking_meters_tojoin = parking_meters_cars.select(
    'POST_ID', 'CAP_COL', 'METER_TYPE',
    'SMART_METE', 'ACTIVESENS', 'ON_OFF_STR',
)
parking_meters_tojoin.show(5)


+---------+-------+----------+----------+----------+----------+
|  POST_ID|CAP_COL|METER_TYPE|SMART_METE|ACTIVESENS|ON_OFF_STR|
+---------+-------+----------+----------+----------+----------+
|401-06340|   Grey|        SS|         N|         N|        ON|
|104-03190|   Grey|        SS|         N|         Y|        ON|
|352-04350|   Grey|        SS|         N|         N|        ON|
|116-03980|   Grey|        SS|         N|         N|        ON|
|224-27570|   Grey|        SS|         N|         N|        ON|
+---------+-------+----------+----------+----------+----------+
only showing top 5 rows



In [8]:
# Quick look at the transaction file through Spark's CSV reader with schema
# inference. No explicit schema is passed to this read: the column names come
# from the file header and the types are inferred (the two session timestamps
# are inferred as plain strings, as the printed schema shows).
trans = ss.read.csv(transcation_input, header=True, inferSchema=True)
print(len(trans.columns))
trans.printSchema()
trans.show(1)

7
root
 |-- POST_ID: string (nullable = true)
 |-- STREET_BLOCK: string (nullable = true)
 |-- PAYMENT_TYPE: string (nullable = true)
 |-- SESSION_START_DT: string (nullable = true)
 |-- SESSION_END_DT: string (nullable = true)
 |-- METER_EVENT_TYPE: string (nullable = true)
 |-- GROSS_PAID_AMT: double (nullable = true)

+---------+--------------+------------+--------------------+--------------------+----------------+--------------+
|  POST_ID|  STREET_BLOCK|PAYMENT_TYPE|    SESSION_START_DT|      SESSION_END_DT|METER_EVENT_TYPE|GROSS_PAID_AMT|
+---------+--------------+------------+--------------------+--------------------+----------------+--------------+
|490-22190|IRVING ST 2200|        CASH|19-OCT-18 11.00.0...|19-OCT-18 11.09.2...|              NS|          0.35|
+---------+--------------+------------+--------------------+--------------------+----------------+--------------+
only showing top 1 row



## Preprocessing

In [9]:
def toTimeSafe(inval):
    """Parse a timestamp such as ``19-OCT-18 11.00.01 AM`` into a ``datetime``.

    Args:
        inval: Text in ``%d-%b-%y %I.%M.%S %p`` format.

    Returns:
        The parsed ``datetime``, or ``None`` when ``inval`` is not a string
        or does not match the format.
    """
    # Imported locally so the helper does not rely on another cell having
    # imported ``datetime`` before the first call.
    from datetime import datetime

    try:
        return datetime.strptime(inval, "%d-%b-%y %I.%M.%S %p")
    except (ValueError, TypeError):
        # ValueError: text that does not match the format.
        # TypeError: non-string input such as None.
        return None

def toFloatSafe(inval):
    """Convert ``inval`` to ``float`` without raising.

    Args:
        inval: Value to convert, normally the text of a CSV field.

    Returns:
        The float value, or ``None`` when ``inval`` cannot be converted
        (malformed text, empty string, or a non-numeric object such as None).
    """
    try:
        return float(inval)
    except (ValueError, TypeError):
        # ValueError: unparsable text; TypeError: unsupported type such as None.
        return None

In [10]:
def stringToPost(row):
    """Turn one comma-separated transaction line into a seven-field ``Row``.

    The line is split on every comma and leading/trailing double quotes are
    removed from each field. Fields 3 and 4 are parsed with ``toTimeSafe`` and
    field 6 with ``toFloatSafe``; the remaining fields stay as text.

    Because the split is on every comma, a quoted field that itself contains a
    comma is not supported.
    """
    fields = [part.strip('"') for part in row.split(",")]
    return Row(
        fields[0],
        fields[1],
        fields[2],
        toTimeSafe(fields[3]),
        toTimeSafe(fields[4]),
        fields[5],
        toFloatSafe(fields[6]),
    )

In [11]:
# Load the transaction file as an RDD of raw text lines; the header line is still included here.
trans = sc.textFile(transcation_input)


In [12]:
# Peek at the raw transaction lines once the header has been removed.
raw_lines = sc.textFile(transcation_input)
tmp = raw_lines.first()  # the first line of the file is the CSV header
# Every line equal to the header text is dropped, not only the first one.
trans = raw_lines.filter(lambda line: line != tmp)
trans.take(2)

['490-22190,IRVING ST 2200,CASH,19-OCT-18 11.00.01 AM,19-OCT-18 11.09.21 AM,NS,0.35',
 '823-00160,CHESTNUT ST 0,CASH,13-OCT-18 03.30.34 PM,13-OCT-18 04.33.25 PM,AT,0.05']

In [13]:
# Re-read the file, drop the header line and parse every remaining line into a Row.
raw_lines = sc.textFile(transcation_input)
tmp = raw_lines.first()  # the first line of the file is the CSV header
trans = raw_lines.filter(lambda line: line != tmp)
# Despite its name, `tran_df` is an RDD of Rows; it becomes a DataFrame in the next cell.
tran_df = trans.map(stringToPost)

In [14]:
from datetime import datetime

# Column layout of the parsed transaction rows: three text fields, the two
# session timestamps, the meter event type and the amount paid.
schema = (
    StructType()
    .add("POST_ID", StringType(), True)
    .add("STREET_BLOCK", StringType(), True)
    .add("PAYMENT_TYPE", StringType(), True)
    .add("SESSION_START_DT", TimestampType(), True)
    .add("SESSION_END_DT", TimestampType(), True)
    .add("METER_EVENT_TYPE", StringType(), True)
    .add("GROSS_PAID_AMT", DoubleType(), True)
)
transaction_df = ss.createDataFrame(tran_df, schema)
transaction_df.show(10)

+---------+--------------------+------------+-------------------+-------------------+----------------+--------------+
|  POST_ID|        STREET_BLOCK|PAYMENT_TYPE|   SESSION_START_DT|     SESSION_END_DT|METER_EVENT_TYPE|GROSS_PAID_AMT|
+---------+--------------------+------------+-------------------+-------------------+----------------+--------------+
|490-22190|      IRVING ST 2200|        CASH|2018-10-19 11:00:01|2018-10-19 11:09:21|              NS|          0.35|
|823-00160|       CHESTNUT ST 0|        CASH|2018-10-13 15:30:34|2018-10-13 16:33:25|              AT|          0.05|
|490-21250|      IRVING ST 2100|        CASH|2018-10-29 12:40:30|2018-10-29 13:07:10|              NS|           1.0|
|440-37030|     GEARY BLVD 3700| CREDIT CARD|2018-10-30 13:36:13|2018-10-30 13:49:33|              NS|           0.5|
|540-00100|         LAGUNA ST 0| PAY BY CELL|2018-10-05 09:42:00|2018-10-05 10:12:00|              AT|          1.12|
|700-12080|    VALENCIA ST 1200| PAY BY CELL|2018-10-12 

In [15]:
# Confirm that the session columns are timestamps and the amount is a double.
transaction_df.printSchema()

root
 |-- POST_ID: string (nullable = true)
 |-- STREET_BLOCK: string (nullable = true)
 |-- PAYMENT_TYPE: string (nullable = true)
 |-- SESSION_START_DT: timestamp (nullable = true)
 |-- SESSION_END_DT: timestamp (nullable = true)
 |-- METER_EVENT_TYPE: string (nullable = true)
 |-- GROSS_PAID_AMT: double (nullable = true)



## Aggregate

In [16]:
# Start the aggregation stage from the parsed transactions (same DataFrame object, new name).
revenue_df = transaction_df

In [17]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, date_format, col

# Derive the time features used for grouping:
#   starttime_date - session start date formatted with 'yyyy-M-d'
#   starttime_hour - two-digit hour of the session start
#   dow            - day-of-week number of the session start ('u' pattern; the
#                    sample output shows Friday 2018-10-19 as 5)
#   endtime_hour   - two-digit hour of the session end
#   timeDiff       - session length (end minus start, via unix_timestamp)
revenue_df = (
    revenue_df
    .withColumn('starttime_date', date_format('SESSION_START_DT', 'yyyy-M-d'))
    .withColumn('starttime_hour', date_format('SESSION_START_DT', 'HH').cast('string'))
    .withColumn('dow', date_format('SESSION_START_DT', 'u').cast('string'))
    .withColumn('endtime_hour', date_format('SESSION_END_DT', 'HH').cast('string'))
    .withColumn('timeDiff', unix_timestamp('SESSION_END_DT') - unix_timestamp('SESSION_START_DT'))
)
revenue_df.drop('SESSION_START_DT', 'SESSION_END_DT').show()

+---------+--------------------+------------+----------------+--------------+--------------+--------------+---+------------+--------+
|  POST_ID|        STREET_BLOCK|PAYMENT_TYPE|METER_EVENT_TYPE|GROSS_PAID_AMT|starttime_date|starttime_hour|dow|endtime_hour|timeDiff|
+---------+--------------------+------------+----------------+--------------+--------------+--------------+---+------------+--------+
|490-22190|      IRVING ST 2200|        CASH|              NS|          0.35|    2018-10-19|            11|  5|          11|     560|
|823-00160|       CHESTNUT ST 0|        CASH|              AT|          0.05|    2018-10-13|            15|  6|          16|    3771|
|490-21250|      IRVING ST 2100|        CASH|              NS|           1.0|    2018-10-29|            12|  1|          13|    1600|
|440-37030|     GEARY BLVD 3700| CREDIT CARD|              NS|           0.5|    2018-10-30|            13|  2|          13|     800|
|540-00100|         LAGUNA ST 0| PAY BY CELL|              AT|

In [18]:
def _count_where(column_name, value):
    """Count the rows of a group whose `column_name` equals `value`."""
    return count(when(col(column_name) == value, True))


# One row per (street block, day of week, start hour) with the average amount
# paid, the number of sessions started, the average session length and the
# session counts broken down by meter event type and by payment type.
wanted_df = revenue_df.groupBy("STREET_BLOCK", "dow", "starttime_hour").agg(
    avg('GROSS_PAID_AMT').alias('avgRevenue'),
    count('starttime_hour').alias('numNewTrans'),
    avg('timeDiff').alias('avgDur'),
    _count_where('METER_EVENT_TYPE', 'NS').alias('numNS'),
    _count_where('METER_EVENT_TYPE', 'AT').alias('numAT'),
    _count_where('PAYMENT_TYPE', 'CASH').alias('numCASH'),
    _count_where('PAYMENT_TYPE', 'CREDIT CARD').alias('numCC'),
    _count_where('PAYMENT_TYPE', 'PAY BY CELL').alias('numPhone'),
    _count_where('PAYMENT_TYPE', 'SMART CARD').alias('numSmartCard'),
)

In [19]:
# Spot-check two aggregated rows.
wanted_df.show(2)

+----------------+---+--------------+------------------+-----------+------------------+-----+-----+-------+-----+--------+------------+
|    STREET_BLOCK|dow|starttime_hour|        avgRevenue|numNewTrans|            avgDur|numNS|numAT|numCASH|numCC|numPhone|numSmartCard|
+----------------+---+--------------+------------------+-----------+------------------+-----+-----+-------+-----+--------+------------+
|    BROADWAY 100|  3|            10|           3.66375|         32|            6322.5|   19|   13|      5|   22|       5|           0|
|PRESIDIO AVE 500|  1|            12|1.3291666666666666|         12|2544.3333333333335|   10|    2|      5|    7|       0|           0|
+----------------+---+--------------+------------------+-----------+------------------+-----+-----+-------+-----+--------+------------+
only showing top 2 rows



In [20]:
# Number of sessions that ended in each (street block, day of week, end hour)
# bucket. The block and day-of-week columns are renamed so they do not clash
# with the same-named columns of `wanted_df` when the two frames are joined.
wanted_df_derv = (
    revenue_df
    .groupBy("STREET_BLOCK", "dow", "endtime_hour")
    .count()
    .withColumnRenamed("STREET_BLOCK", 'block_derv')
    .withColumnRenamed('dow', 'dow_derv')
)

In [21]:
# Pair every start-hour bucket with the end-hour bucket of the same block,
# day of week and hour.
# NOTE(review): no join type is given, so this is an inner join - buckets in
# which no session ended are dropped. Confirm that this is intended.
same_bucket = [
    wanted_df['STREET_BLOCK'] == wanted_df_derv['block_derv'],
    wanted_df['dow'] == wanted_df_derv['dow_derv'],
    wanted_df['starttime_hour'] == wanted_df_derv['endtime_hour'],
]
wanted_df = wanted_df.join(wanted_df_derv, same_bucket).drop('endtime_hour')


In [22]:
# 'count' comes from the end-hour grouping: the number of sessions that ended in the bucket.
wanted_df = wanted_df.withColumnRenamed('count','numEndTrans')
# turnover = sessions started minus sessions ended in the same block / day of week / hour.
wanted_df = wanted_df.withColumn('turnover', wanted_df['numNewTrans'] - wanted_df['numEndTrans'])

In [23]:
# Remove the renamed join-key columns that came from the end-hour frame, then preview the result.
wanted_df = wanted_df.drop('block_derv','dow_derv')
wanted_df.show()


+-------------+---+--------------+------------------+-----------+------------------+-----+-----+-------+-----+--------+------------+-----------+--------+
| STREET_BLOCK|dow|starttime_hour|        avgRevenue|numNewTrans|            avgDur|numNS|numAT|numCASH|numCC|numPhone|numSmartCard|numEndTrans|turnover|
+-------------+---+--------------+------------------+-----------+------------------+-----+-----+-------+-----+--------+------------+-----------+--------+
|    01ST ST 0|  4|            08| 3.068888888888889|          9|2946.6666666666665|    9|    0|      0|    4|       3|           2|         12|      -3|
|  01ST ST 200|  4|            13|3.7371428571428575|         21|            5248.0|   19|    2|      4|   11|       4|           2|         23|      -2|
|03RD AVE 1300|  5|            16|1.4833333333333336|         21| 4053.095238095238|   13|    8|     11|    7|       3|           0|         15|       6|
| 03RD ST 4700|  3|            17|0.2833333333333333|          3|232.6666666

In [25]:
# Rate schedules: the "Post ID" header is renamed so the column matches the
# POST_ID key used by the other tables.
schedules = ss.read.csv('../Data/Meter_Rate_Schedules.csv', header=True).withColumnRenamed("Post ID", "POST_ID")
# Meter inventory, this time with the column names taken from the file's own
# header row. The path comes from the `Parking_meters` constant defined with
# the other input paths instead of being spelled out a second time.
meters = ss.read.csv(Parking_meters, header=True)

In [26]:
# Normalise the schedule column names: runs of whitespace become "_" and the
# name is upper-cased.
# The loop variable must not be called `col`: at notebook level that would
# overwrite pyspark.sql.functions.col, and any cell calling col(...) would
# fail when (re-)run after this one.
for column_name in schedules.columns:
    schedules = schedules.withColumnRenamed(column_name, "_".join(column_name.split()).upper())

In [27]:
# Register the three inputs as tables for the SQL cells below.
# - "Schedules" has to exist because the schedule-aggregation query reads FROM Schedules.
# - mode("overwrite") replaces a table left behind by an earlier run; without
#   it saveAsTable raises as soon as the table already exists, which breaks
#   Restart & Run All.
schedules.write.mode("overwrite").saveAsTable("Schedules")
# Only the distinct (block, meter) pairs of the transactions are needed.
transaction_df.select(['STREET_BLOCK', 'POST_ID']).distinct().write.mode("overwrite").saveAsTable("Transactions")
meters.write.mode("overwrite").saveAsTable("Meters")

In [None]:
# One row per POST_ID:
#   RATE         - mean of the RATE values over all schedule rows of that meter
#   HAS_OVERRIDE - 1 when the meter has two or more schedule rows, else 0
# The window functions attach both values to every schedule row; the outer
# GROUP BY then collapses the duplicates.
sched_base = ss.sql("""
SELECT POST_ID, AVG_RATE AS RATE, HAS_OVERRIDE FROM (

    SELECT POST_ID, AVG_RATE, CASE WHEN num >= 2 THEN 1 ELSE 0 END HAS_OVERRIDE FROM
    (
        SELECT *,
        count(1) OVER(
        PARTITION BY POST_ID
        ) num,
        AVG(RATE) OVER(
        PARTITION BY POST_ID
        ) AVG_RATE

        FROM Schedules
    )    
)
GROUP BY 1, 2, 3
""")
# Overwrite so that re-running the notebook does not fail on an existing table.
sched_base.write.mode("overwrite").saveAsTable('Schedules_Base')

In [None]:
# Join the per-meter rate summary with the Green/Grey meters and turn the cap
# colour into two 0/1 indicator columns (GREEN, GREY).
# NOTE(review): this query filters on CAP_COLOR, while the explicit schema used
# for the first read of the same CSV names that column CAP_COL. The Meters
# table takes its names from the file header - confirm the header really says
# CAP_COLOR.
meter_dummy = ss.sql("""
SELECT POST_ID,
RATE,
HAS_OVERRIDE,
CASE WHEN CAP_COLOR = 'Green' THEN 1 ELSE 0 END GREEN,
CASE WHEN CAP_COLOR = 'Grey' THEN 1 ELSE 0 END GREY
FROM
Schedules_Base
join
(
    SELECT * FROM Meters
    WHERE (Meters.CAP_COLOR = 'Green' OR Meters.CAP_COLOR = 'Grey')
)
USING(POST_ID)
""")

In [None]:
# Distinct (STREET_BLOCK, POST_ID) pairs from the Transactions table, ordered by block.
block_post = ss.sql("""
    SELECT STREET_BLOCK, POST_ID
    FROM Transactions
    GROUP BY 1,2
    ORDER BY 1
""")

In [None]:
# Attach the per-meter rate and colour flags to each (block, meter) pair;
# being an inner join, pairs without a matching row in `meter_dummy` are dropped.
block_post_full = block_post.join(meter_dummy, 'POST_ID', 'inner')

In [None]:
# Persist the joined frame for the SQL aggregation that reads FROM Post_block.
# mode("overwrite") keeps the cell re-runnable: without it saveAsTable raises
# when the table already exists from an earlier run.
block_post_full.write.mode("overwrite").saveAsTable("Post_block")

In [None]:
# Roll the meters up to one row per street block:
#   RATE      - highest per-meter RATE on the block
#   NUM_OR    - number of meters on the block with HAS_OVERRIDE = 1
#   NUM_GREEN - number of Green-cap meters
#   NUM_GREY  - number of Grey-cap meters
block_info = ss.sql("""
SELECT STREET_BLOCK,
MAX(RATE) RATE,
SUM(HAS_OVERRIDE) NUM_OR,
SUM(GREEN) NUM_GREEN,
SUM(GREY) NUM_GREY
FROM Post_block
GROUP BY 1
""")

In [None]:
# Preview the per-block summary.
block_info.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack the per-block columns into a single "features" vector for clustering.
va = VectorAssembler(outputCol="features", inputCols=block_info.columns[1:]) # every column except the first one (STREET_BLOCK)
block_feat = va.transform(block_info)

In [None]:
from pyspark.ml.clustering import KMeans
# Cluster the street blocks on their "features" vector into k = 90 groups.
# NOTE(review): the notebook does not say why 90 clusters were chosen - confirm.
# A fixed seed makes the cluster assignment reproducible across runs.
kmeans = KMeans(k=90, maxIter=200, tol=0.1, seed=42)
model = kmeans.fit(block_feat)

In [None]:
# Block-level attributes plus the cluster each block was assigned to.
# No .show() at the end of the chain: show() returns None, which would leave
# `kmeans_df` unusable for the preview and the join in the following cells.
# KMeans writes its output to a column named "prediction" by default.
kmeans_df = model.transform(block_feat).select('STREET_BLOCK', 'RATE', 'NUM_OR', 'NUM_GREEN', 'NUM_GREY', 'prediction')\
    .withColumnRenamed('prediction', 'CLUSTER')

In [None]:
# Preview the cluster assignment of five blocks.
kmeans_df.show(5)

In [None]:
# Add the block attributes and cluster id to every hourly bucket; blocks
# missing from `kmeans_df` are dropped by the inner join.
final_df = wanted_df.join(kmeans_df, on="STREET_BLOCK", how="inner")

In [None]:
# Preview the joined frame.
final_df.show(5)

In [None]:
# Turnover divided by the block's number of Green plus Grey meters.
# NOTE(review): the result is neither assigned nor aliased, so no later cell can use it - confirm intent.
final_df.select("*", final_df.turnover/(final_df.NUM_GREEN + final_df.NUM_GREY))

## Modeling

In [None]:
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    """Replace each string column in `cols` with its StringIndexer index.

    Args:
        df: Input DataFrame.
        cols: Names of the string columns to index.

    Returns:
        A new DataFrame in which every listed column keeps its name but holds
        the numeric index assigned by a StringIndexer fitted with
        stringOrderType="alphabetDesc".
    """
    indexed = df
    for name in cols:
        temp_name = name + "-num"
        indexer = StringIndexer(inputCol=name, outputCol=temp_name, stringOrderType="alphabetDesc")
        # Fit on the current frame, write the indices to a temporary column,
        # drop the original string column and give the indices its name.
        indexed = (
            indexer.fit(indexed)
            .transform(indexed)
            .drop(name)
            .withColumnRenamed(temp_name, name)
        )
    return indexed

# NOTE(review): `anotherdf` is not defined in any visible cell of this notebook;
# unless it is defined elsewhere, this raises NameError on a fresh kernel.
# Confirm which DataFrame is meant to be indexed here.
stringindex_anotherdf = indexStringColumns(anotherdf, ['STREET_BLOCK'])

In [None]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    """One-hot encode each column in `cols`, keeping the column's name.

    Args:
        df: Input DataFrame; the listed columns are expected to hold category
            indices (for example the output of `indexStringColumns`).
        cols: Names of the columns to encode.

    Returns:
        A new DataFrame in which every listed column holds its one-hot vector.
        All categories are kept because the encoder is built with dropLast=False.
    """
    newdf = df
    for c in cols:
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        # In Spark 3.x OneHotEncoder is an Estimator and must be fitted before
        # it can transform; in Spark 2.x it is a plain Transformer without fit().
        encoder = onehotenc.fit(newdf) if hasattr(onehotenc, "fit") else onehotenc
        # Write the vectors to a temporary "-onehot" column, drop the original
        # column and give the vectors its name.
        newdf = encoder.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

# One-hot encode the indexed STREET_BLOCK column.
onehot_anotherdf = oneHotEncodeColumns(stringindex_anotherdf, ['STREET_BLOCK'])

In [None]:
# Inspect one encoded row.
onehot_anotherdf.show(1)

In [None]:
# Merging the data with Vector Assembler.
# NOTE(review): "SESSION_START_DT_h", "SESSION_START_DT_dof", "revenue" and
# "numOfTransaction" are not produced by any visible cell of this notebook
# (the aggregation above yields starttime_hour, dow, avgRevenue, numNewTrans, ...).
# Confirm that `onehot_anotherdf` really carries these columns.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["SESSION_START_DT_h", "SESSION_START_DT_dof", "revenue","numNS","numAT","numCASH","numCC","numPhone","numSmartCard","STREET_BLOCK"],
    outputCol="features")

output = assembler.transform(onehot_anotherdf)

# va = VectorAssembler(outputCol="features", inputCols=anotherdf.columns[1:]) #except the last col.
# Keep only the assembled feature vector and the "numOfTransaction" column.
penlpoints = output.select("features", "numOfTransaction")
penlpoints.show(3)

In [30]:
# Mean turnover over all buckets. A DataFrame has no .avg() method, so the
# average is computed with agg() and the single value is taken from the
# one-row result.
wanted_df.agg({'turnover': 'avg'}).first()[0]

KeyboardInterrupt: 