### Imports & Setup

In [1]:
import os
import time

from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *

In [2]:
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

### Mongo

In [3]:
spark_meters = SparkSession \
    .builder \
    .appName("test_connection") \
    .config("spark.mongodb.input.uri", "mongodb://ec2-18-191-205-220.us-east-2.compute.amazonaws.com/finalProject.meters")\
    .getOrCreate()

df_meters = spark_meters.read.format("com.mongodb.spark.sql.DefaultSource").load()

spark_schedules = SparkSession \
    .builder \
    .appName("test_connection") \
    .config("spark.mongodb.input.uri", "mongodb://ec2-18-191-205-220.us-east-2.compute.amazonaws.com/finalProject.schedules")\
    .getOrCreate()

df_schedules = spark_schedules.read.format("com.mongodb.spark.sql.DefaultSource").load()

spark_transactions = SparkSession \
    .builder \
    .appName("test_connection") \
    .config("spark.mongodb.input.uri", "mongodb://ec2-18-191-205-220.us-east-2.compute.amazonaws.com/finalProject.transactions")\
    .getOrCreate()

df_transactions = spark_transactions.read.format("com.mongodb.spark.sql.DefaultSource").load()

### Exploration of `DF`s

Number of observations in each `df`

```
df_meters: 28997
df_schedules: 60485
df_transactions: 42263968
```

In [4]:
df_meters.persist(StorageLevel.MEMORY_AND_DISK)
df_schedules.persist(StorageLevel.MEMORY_AND_DISK)
df_transactions.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[GROSS_PAID_AMT: double, METER_EVENT_TYPE: string, PAYMENT_TYPE: string, POST_ID: string, SESSION_END_DT: string, SESSION_START_DT: string, STREET_BLOCK: string, _id: struct<oid:string>]

### Processing

In [5]:
start_time = time.time()

df_transactions.distinct() \
               .groupBy('PAYMENT_TYPE') \
               .count() \
               .orderBy(['count', 'PAYMENT_TYPE'], ascending=[True, False]) \
               .show(2)

end_time = time.time()
elapsed = end_time - start_time
print "Elapsed: " + str(elapsed) + " seconds."

+------------+-------+
|PAYMENT_TYPE|  count|
+------------+-------+
|  SMART CARD|1824697|
| PAY BY CELL|4633609|
+------------+-------+
only showing top 2 rows

Elapsed: 205.944715977 seconds.


**`df_meters`**
- Trial 1 (no cacheing) 
    - `4.386 seconds`
- Trial 2 (cached)
    - `5.76 seconds`
    - `1.21 seconds`
**`df_transactions`**
- Trial 1 (no cacheing)
    - `175 seconds`
- Trial 2 (StorageLevel.MEMORY_AND_DISK)
    - `185.9 seconds`
    - `1.40 seconds`
- Trial 3 (different action -- `.distinct()` then `.count()`. Before, just `count()`)
    - `24 seconds`
    - `16.79 seconds`
- Trial 4 (Trial 3, `.distinct()` then `show()`)
    - `10 seconds`
- Trial 5 (`.distinct()` then `.orderBy(['POST_ID', 'SESSION_END_DT'], ascending=[True, False]) \` -- an intensive process
    - `16 seconds`
    - `15 seconds`
- Trial 6 (`.distinct()`, `.groupBy()`, `.count()`, `.orderBy()`, `.show(2)`
    - `16 seconds`

In [6]:
df_meters.cache()

DataFrame[ACTIVESENS: string, CAP_COLOR: string, JURISDICTI: string, LOCATION: string, METER_TYPE: string, MS_ID: string, MS_SPACEID: int, ON_OFF_STR: string, OSP_ID: int, POST_ID: string, RATEAREA: string, SFPARKAREA: string, SMART_METE: string, STREETNAME: string, STREET_NUM: int, STREET_SEG: int, _id: struct<oid:string>]

## Import and reading

In [7]:
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import *

#Feel free to add other libraries from pyspark

# conf = SparkConf().setAppName(app_name)
sc = SparkContext.getOrCreate()
sc.setLogLevel("OFF")

ss = SparkSession.builder.getOrCreate()

In [8]:
def toTimeSafe(inval):
    try:
        return datetime.strptime(inval, "%d-%b-%y %I.%M.%S %p")
    except ValueError:
        return None

def toFloatSafe(inval):
    try:
        return float(inval)
    except ValueError:
        return None

In [9]:
df_transactions.printSchema()

root
 |-- GROSS_PAID_AMT: double (nullable = true)
 |-- METER_EVENT_TYPE: string (nullable = true)
 |-- PAYMENT_TYPE: string (nullable = true)
 |-- POST_ID: string (nullable = true)
 |-- SESSION_END_DT: string (nullable = true)
 |-- SESSION_START_DT: string (nullable = true)
 |-- STREET_BLOCK: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [10]:
df_transactions.show(5)

+--------------+----------------+------------+---------+--------------------+--------------------+---------------+--------------------+
|GROSS_PAID_AMT|METER_EVENT_TYPE|PAYMENT_TYPE|  POST_ID|      SESSION_END_DT|    SESSION_START_DT|   STREET_BLOCK|                 _id|
+--------------+----------------+------------+---------+--------------------+--------------------+---------------+--------------------+
|          0.35|              NS|        CASH|568-22390|02-MAR-18 10.17.4...|02-MAR-18 10.05.4...|MISSION ST 2200|[5c40f9e517f951ee...|
|           0.1|              NS|        CASH|680-17110|26-FEB-18 05.18.0...|26-FEB-18 05.15.2...|TARAVAL ST 1700|[5c40f9e517f951ee...|
|          1.25|              NS|  SMART CARD|360-05240|15-FEB-18 09.51.5...|15-FEB-18 09.18.3...| CLEMENT ST 500|[5c40f9e517f951ee...|
|          3.75|              NS| CREDIT CARD|321-03100|06-JUN-18 12.59.5...|06-JUN-18 11.07.2...|  BALBOA ST 300|[5c40f9e517f951ee...|
|          1.12|              AT| PAY BY CELL|46

In [11]:
from datetime import datetime
from pyspark.sql.functions import udf
func =  udf (lambda x: datetime.strptime(x.lower(), '%d-%b-%y %H.%M.%S %p'), TimestampType())

In [12]:
df_transactions = df_transactions.withColumn('SESSION_START_DT', func(col('SESSION_START_DT')))
df_transactions = df_transactions.withColumn('SESSION_END_DT', func(col('SESSION_END_DT')))

In [13]:
df_transactions.drop('_id').show(5)

+--------------+----------------+------------+---------+-------------------+-------------------+---------------+
|GROSS_PAID_AMT|METER_EVENT_TYPE|PAYMENT_TYPE|  POST_ID|     SESSION_END_DT|   SESSION_START_DT|   STREET_BLOCK|
+--------------+----------------+------------+---------+-------------------+-------------------+---------------+
|          0.35|              NS|        CASH|568-22390|2018-03-02 10:17:46|2018-03-02 10:05:46|MISSION ST 2200|
|           0.1|              NS|        CASH|680-17110|2018-02-26 05:18:08|2018-02-26 05:15:28|TARAVAL ST 1700|
|          1.25|              NS|  SMART CARD|360-05240|2018-02-15 09:51:57|2018-02-15 09:18:37| CLEMENT ST 500|
|          3.75|              NS| CREDIT CARD|321-03100|2018-06-06 12:59:52|2018-06-06 11:07:22|  BALBOA ST 300|
|          1.12|              AT| PAY BY CELL|462-05360|2018-01-20 02:20:45|2018-01-20 01:50:45|  HAIGHT ST 500|
+--------------+----------------+------------+---------+-------------------+-------------------+

In [15]:
revenue_df = df_transactions.drop('_id')

In [16]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, date_format, col

revenue_df = revenue_df.withColumn('starttime_date',date_format('SESSION_START_DT','yyyy-M-d'))
revenue_df = revenue_df.withColumn('starttime_hour',date_format('SESSION_START_DT','HH').cast('string'))
revenue_df = revenue_df.withColumn('dow',date_format('SESSION_START_DT','u').cast('string'))
revenue_df = revenue_df.withColumn('endtime_hour',date_format('SESSION_END_DT','HH').cast('string'))
revenue_df = revenue_df.withColumn('timeDiff', unix_timestamp('SESSION_END_DT')- unix_timestamp('SESSION_START_DT'))
revenue_df.drop('SESSION_START_DT','SESSION_END_DT').show()

+--------------+----------------+------------+---------+--------------------+--------------+--------------+---+------------+--------+
|GROSS_PAID_AMT|METER_EVENT_TYPE|PAYMENT_TYPE|  POST_ID|        STREET_BLOCK|starttime_date|starttime_hour|dow|endtime_hour|timeDiff|
+--------------+----------------+------------+---------+--------------------+--------------+--------------+---+------------+--------+
|          0.35|              NS|        CASH|568-22390|     MISSION ST 2200|      2018-3-2|            10|  5|          10|     720|
|           0.1|              NS|        CASH|680-17110|     TARAVAL ST 1700|     2018-2-26|            05|  1|          05|     160|
|          1.25|              NS|  SMART CARD|360-05240|      CLEMENT ST 500|     2018-2-15|            09|  4|          09|    2000|
|          3.75|              NS| CREDIT CARD|321-03100|       BALBOA ST 300|      2018-6-6|            11|  3|          12|    6750|
|          1.12|              AT| PAY BY CELL|462-05360|      

In [17]:
wanted_df = revenue_df.groupBy("STREET_BLOCK","dow","starttime_hour")\
                      .agg(avg('GROSS_PAID_AMT').alias('avgRevenue'), 
                      count('starttime_hour').alias('numNewTrans'),
                      avg('timeDiff').alias('avgDur'),
                      count(when(col('METER_EVENT_TYPE')=='NS', True)).alias('numNS'),
                      count(when(col('METER_EVENT_TYPE')=='AT', True)).alias('numAT'),
                      count(when(col('PAYMENT_TYPE')=='CASH', True)).alias('numCASH'),
                      count(when(col('PAYMENT_TYPE')=='CREDIT CARD', True)).alias('numCC'),
                      count(when(col('PAYMENT_TYPE')=='PAY BY CELL', True)).alias('numPhone'),
                      count(when(col('PAYMENT_TYPE')=='SMART CARD', True)).alias('numSmartCard'))

In [18]:
wanted_df.show(2)

+--------------------+---+--------------+------------------+-----------+-------------------+-----+-----+-------+-----+--------+------------+
|        STREET_BLOCK|dow|starttime_hour|        avgRevenue|numNewTrans|             avgDur|numNS|numAT|numCASH|numCC|numPhone|numSmartCard|
+--------------------+---+--------------+------------------+-----------+-------------------+-----+-----+-------+-----+--------+------------+
|        BERRY ST 400|  1|            12|1.1257715430861723|        499|-24278.096192384768|  416|   83|    190|  196|      94|          19|
|SOUTH VAN NESS AVE 0|  4|            03|1.9535009671179882|        517|  2503.413926499033|  416|  101|    203|  253|      44|          17|
+--------------------+---+--------------+------------------+-----------+-------------------+-----+-----+-------+-----+--------+------------+
only showing top 2 rows



In [19]:
wanted_df_derv = revenue_df.groupBy("STREET_BLOCK","dow","endtime_hour").count()
wanted_df_derv = wanted_df_derv.withColumnRenamed('STREET_BLOCK', 'STREET_BLOCK_derv')
wanted_df_derv = wanted_df_derv.withColumnRenamed('dow', 'dow_derv')

In [20]:
wanted_df = wanted_df.join(wanted_df_derv, 
                           (wanted_df.STREET_BLOCK == wanted_df_derv.STREET_BLOCK_derv) &
                           (wanted_df.dow == wanted_df_derv.dow_derv) &
                           (wanted_df.starttime_hour == wanted_df_derv.endtime_hour)).drop('endtime_hour')

In [21]:
wanted_df = wanted_df.withColumnRenamed('count','numEndTrans')
wanted_df = wanted_df.withColumn('turnover', wanted_df['numNewTrans'] - wanted_df['numEndTrans'])

In [22]:
wanted_df = wanted_df.drop('STREET_BLOCK_derv','dow_derv')
wanted_df.drop('block','avgRevenue','avgDur').show()

+-------------+---+--------------+-----------+-----+-----+-------+-----+--------+------------+-----------+--------+
| STREET_BLOCK|dow|starttime_hour|numNewTrans|numNS|numAT|numCASH|numCC|numPhone|numSmartCard|numEndTrans|turnover|
+-------------+---+--------------+-----------+-----+-----+-------+-----+--------+------------+-----------+--------+
|    01ST ST 0|  4|            08|        193|  160|   33|     46|   60|      28|          59|        259|     -66|
|  01ST ST 200|  3|            02|        281|  247|   34|     47|  145|      52|          37|        535|    -254|
|  01ST ST 200|  4|            04|        277|  253|   24|     61|  192|      12|          12|        112|     165|
|    02ND ST 0|  2|            08|        319|  315|    4|      3|  218|      75|          23|         62|     257|
|  02ND ST 300|  6|            05|        500|  405|   95|    115|  294|      88|           3|        492|       8|
|03RD AVE 1300|  4|            03|        233|  184|   49|     74|  127|

In [23]:
df_schedules = df_schedules.withColumnRenamed("Post ID", "POST_ID")

In [24]:
for col in df_schedules.columns:
    df_schedules = df_schedules.withColumnRenamed(col, "_".join(col.split()).upper())

In [25]:
df_transactions.select(['STREET_BLOCK', 'POST_ID']).distinct().write.saveAsTable("Transactions")

AnalysisException: u'Table `Transactions` already exists.;'

In [None]:
df_meters.write.saveAsTable("Meters")

In [None]:
df_schedules.write.saveAsTable("Schedules")

In [None]:
sched_base = ss.sql("""
SELECT POST_ID, AVG_RATE AS RATE, HAS_OVERRIDE FROM (

    SELECT POST_ID, AVG_RATE, CASE WHEN num >= 2 THEN 1 ELSE 0 END HAS_OVERRIDE FROM
    (
        SELECT *,
        count(1) OVER(
        PARTITION BY POST_ID
        ) num,
        AVG(RATE) OVER(
        PARTITION BY POST_ID
        ) AVG_RATE

        FROM Schedules
    )    
)
GROUP BY 1, 2, 3
""")
sched_base.write.saveAsTable('Schedules_Base')

In [None]:
meter_dummy = ss.sql("""
SELECT POST_ID,
RATE,
HAS_OVERRIDE,
CASE WHEN CAP_COLOR = 'Green' THEN 1 ELSE 0 END GREEN,
CASE WHEN CAP_COLOR = 'Grey' THEN 1 ELSE 0 END GREY
FROM
Schedules_Base
join
(
    SELECT * FROM Meters
    WHERE (Meters.CAP_COLOR = 'Green' OR Meters.CAP_COLOR = 'Grey')
)
USING(POST_ID)
""")

In [None]:
block_post = ss.sql("""
    SELECT STREET_BLOCK, POST_ID
    FROM Transactions
    GROUP BY 1,2
    ORDER BY 1
""")

In [None]:
block_post_full = block_post.join(meter_dummy, how='inner', on='POST_ID')

In [None]:
block_post_full.write.saveAsTable("Post_block")

In [None]:
block_info = ss.sql("""
SELECT STREET_BLOCK,
MAX(RATE) RATE,
SUM(HAS_OVERRIDE) NUM_OR,
SUM(GREEN) NUM_GREEN,
SUM(GREY) NUM_GREY
FROM Post_block
GROUP BY 1
""")

In [None]:
block_info.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(outputCol="features", inputCols=block_info.columns[1:]) #except the last col.
block_feat = va.transform(block_info)

In [None]:
from pyspark.ml.clustering import KMeans
kmeans =  KMeans(k = 90, maxIter = 200, tol = 0.1) # k = 10 as there are 10 different handwritten numbers.
model = kmeans.fit(block_feat)

In [None]:
kmeans_df = model.transform(block_feat).select('STREET_BLOCK', 'RATE', 'NUM_OR', 'NUM_GREEN', 'NUM_GREY', 'PREDICTION')\
    .withColumnRenamed('PREDICTION', 'CLUSTER')

In [None]:
kmeans_df.show(5)

In [None]:
final_df = wanted_df.join(kmeans_df, "STREET_BLOCK", "inner")

In [None]:
final_df.show(5)

In [None]:
finalal_df = final_df.drop("STREET_BLOCK")

In [None]:
finalal_df.show(5)

In [None]:
finalal_df.printSchema()

## Modeling

In [None]:
from pyspark.ml.feature import StringIndexer

def indexStringColumns(df, cols):
    #variable newdf will be updated several times
    newdf = df
    
    for c in cols:
        #For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c+"-num",stringOrderType="alphabetDesc")
        sm = si.fit(newdf)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-num" 
        #and then drops the original columns.
        #and drop the "-num" suffix. 
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

stringindex_anotherdf = indexStringColumns(finalal_df, ['CLUSTER','dow','starttime_hour'])

In [None]:
from pyspark.ml.feature import OneHotEncoder
def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        #For each given colum, create OneHotEncoder. 
        #dropLast : Whether to drop the last category in the encoded vector (default: true)
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        #Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot" 
        #and then drops the original columns.
        #and drop the "-onehot" suffix. 
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

onehot_anotherdf = oneHotEncodeColumns(stringindex_anotherdf, ['CLUSTER','dow','starttime_hour'])

In [None]:
onehot_anotherdf.show(1)

In [None]:
onehot_anotherdf.printSchema()

In [None]:
# Merging the data with Vector Assembler.
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["dow","starttime_hour","avgRevenue","numNewTrans","avgDur","numNS","numAT","numCASH","numCC","numPhone","numSmartCard","numEndTrans","RATE","NUM_OR","NUM_GREEN","NUM_GREY","CLUSTER"],
    outputCol="features")

output = assembler.transform(onehot_anotherdf)

# va = VectorAssembler(outputCol="features", inputCols=anotherdf.columns[1:]) #except the last col.
penlpoints = output.select("features", "turnover")
penlpoints.show(3)

In [None]:
splits = penlpoints.randomSplit([0.8, 0.2])

#cache() : the algorithm is interative and training and data sets are going to be reused many times.
train = splits[0].cache()
test = splits[1].cache()

In [None]:
# Train the data.
from pyspark.ml.regression import DecisionTreeRegressor
# Paramenters
#maxDepth : maximum tree depth (default : 5).
#maxBins : maximum number of bins when binning continuous features (default : 32).
#minInstancesPerNode : minimum number of dataset samples each branch needs to have after a split (default : 1).
#minInfoGain : minimum information gain for a split (default : 0).
dt = DecisionTreeRegressor(labelCol="turnover")
dtmodel = dt.fit(train)

In [None]:
#Test data.
dtpredicts = dtmodel.transform(test)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="turnover", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dtpredicts)
print(rmse)