In [1]:
# vehicle transaction costs project
# This project uses transaction orders to extract useful information about vehicle transaction costs and to create metrics on those costs, particularly the official costs for each order.

In [2]:
# Create the DBFS folder that the CSV inputs are later read from.
dbutils.fs.mkdirs('/FileStore/tables/snap')

In [3]:
# coding=utf-8
import pyspark
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row 
from pyspark.sql import Column
import pyspark.sql.functions as func

# Obtain the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("snap_data_challenge")
    .config("spark.some.config.option", "value")
    .getOrCreate()
)

# Read the two CSV inputs: the first row is the header and column types are inferred.
items = spark.read.csv(
    '/FileStore/tables/snap/order_order_items.csv',
    sep=",", header='true', inferSchema='true'
)
orders = spark.read.csv(
    '/FileStore/tables/snap/order_orders.csv',
    sep=",", header='true', inferSchema='true'
).withColumnRenamed("id", "order_id")  # same key name as items' order_id column

# NOTE: explicit casts of created_at/updated_at to timestamps and of the id
# columns to integers were sketched here but are disabled; the column types
# therefore come from inferSchema alone.

# Expose both DataFrames to SQL cells under the names "items" and "orders".
items.createOrReplaceTempView("items")
orders.createOrReplaceTempView("orders")

In [4]:
# Show the (column name, inferred type) pairs of the orders table.
orders.dtypes

In [5]:
# Show the (column name, inferred type) pairs of the order items table.
items.dtypes

In [6]:
# Explore the orders data: summary statistics for its three key columns.
(orders
    .select('order_id', 'application_id', 'created_at')
    .describe()
    .show())

In [7]:
# Compare the total row count with distinct-key counts to see which columns
# identify an order. print(...) with a single argument is used because it
# behaves the same on Python 2 and Python 3 (the bare print statement is a
# syntax error on Python 3).
print(orders.count())
print(orders.select('order_id').distinct().count())
# order id is the unique identifier in orders dataset
print(orders.select('application_id').distinct().count())
# one application id has several order ids
print(orders.select('application_id', 'created_at').distinct().count())
# 'application_id' and 'created_at' can uniquely identify an order in the data

In [8]:
# Drop every order row that has a null in any of the three key columns, then
# show how many rows remain (the original analysis noted that none of these
# columns had missing values).
orders = orders.na.drop(subset=['application_id', 'order_id', 'created_at'])
orders.count()

In [9]:
# Extract the most recent order for each application.
# max() replaces the earlier orderBy + groupby + first(): row order is not
# guaranteed to survive the groupby shuffle, so first() could return an
# arbitrary timestamp instead of the latest one.
latest_order = orders.groupby('application_id') \
                     .agg(func.max('created_at').alias('created_at'))
# Merge in the order id to prepare for the join with the order items data.
# The join matches on application_id as well as created_at; matching on the
# timestamp alone would also attach orders from other applications that
# happen to share that timestamp.
orders_recent = orders.join(latest_order, ['application_id', 'created_at'], 'inner') \
                      .select('order_id', 'application_id', 'created_at')
# Number of "most recent" orders kept (single-argument print works on Python 2 and 3).
print(orders_recent.count())

In [10]:
# Row count and a sample of the most-recent-order table.
# print(...) with one argument works on both Python 2 and Python 3.
print(orders_recent.count())
orders_recent.show()

In [11]:
# Explore the order items data: total rows, distinct item ids, distinct order ids.
# print(...) with one argument works on both Python 2 and Python 3.
print(items.count())
print(items.select('id').distinct().count())
print(items.select('order_id').distinct().count())
# Drop rows with a missing order_id and report the remaining row count.
items = items.na.drop(subset = ['order_id'])
print(items.count())
# Drop rows with a missing created_at and report the remaining row count.
items = items.na.drop(subset = ['created_at'])
print(items.count())

In [12]:
# List up to 50 distinct item names found in the order items data.
items.select('name').distinct().head(50)

In [13]:
%sql
-- Distinct item_type values in the "items" temp view.
SELECT DISTINCT item_type
FROM items

In [14]:
# Keep only the columns used by the cost classification below.
items = items.select('order_id', 'name', 'item_type', 'price_cents')

In [15]:
# Normalise the two text columns: lower-case them, then strip surrounding whitespace.
for text_column in ['name', 'item_type']:
    normalised = func.trim(func.lower(items[text_column]))
    items = items.withColumn(text_column, normalised)

In [16]:
# Reclassify the items into six types: Documentation Fee, Electronic Title,
# CA Tire, License, registration/transfer related (Smog, Transfer,
# Registration), and Vehicle. Each type gets its own column holding the
# item's price_cents when the item belongs to that type and 0 otherwise.

# Types recognised by a keyword appearing anywhere in the item name.
for keyword in ['doc', 'title', 'tire', 'license']:
    price_if_match = func.when(items['name'].contains(keyword), items['price_cents']).otherwise(0)
    items = items.withColumn(keyword, price_if_match)

# The vehicle itself is recognised by an exact name match.
vehicle_price = func.when(items['name'] == "vehicle", items['price_cents']).otherwise(0)
items = items.withColumn('vehicle', vehicle_price)

# Registration / transfer / smog: the name contains any one of these keywords.
matches_any = None
for keyword in ['reg', 'trans', 'smog', 'reg/trans']:
    hit = items['name'].contains(keyword)
    matches_any = hit if matches_any is None else (matches_any | hit)
items = items.withColumn('reg_trans_smog', func.when(matches_any, items['price_cents']).otherwise(0))

In [17]:
# Data quality check: eyeball the first rows of the reg_trans_smog column.
items.select('reg_trans_smog').show()

In [18]:
# Collapse the item rows to one row per order, summing each of the six
# category columns.
fee_columns = ['doc', 'title', 'tire', 'license', 'reg_trans_smog', 'vehicle']
fee_totals = [func.sum(items[fee]).alias(fee) for fee in fee_columns]
items = items.groupby('order_id').agg(*fee_totals)

In [19]:
# total_official = reg_trans_smog + tire + license per order
# (the doc, title and vehicle columns are not included in this sum).
items = items.withColumn('total_official',items['reg_trans_smog'] + items['tire'] + items['license'])

In [20]:
# Keep the fee totals only for each application's most recent order by inner
# joining against orders_recent, then sort by order id.
output_columns = ['order_id', 'application_id', 'created_at',
                  'doc', 'title', 'tire', 'license',
                  'reg_trans_smog', 'vehicle', 'total_official']
res = items.join(orders_recent, 'order_id', 'inner') \
           .select(output_columns) \
           .orderBy('order_id')

In [21]:
# Data quality check: sort the result by order_id and display the first rows.
res = res.orderBy('order_id')
res.show()

In [22]:
# Number of result rows, followed by summary statistics of reg_trans_smog.
# print(...) with one argument works on both Python 2 and Python 3.
print(res.count())
res.select('reg_trans_smog').describe().show()

In [23]:
# Export results: register the result as the temp view "res" so it can be queried from SQL.
res.createOrReplaceTempView("res")

In [24]:
%sql
-- Display every row and column of the "res" temp view.
SELECT *
FROM res

In [25]:
# test code
# orders.withColumn("first", )
# have row number and only keep the first one by filter
# The "orders" temp view was registered after `id` was renamed to `order_id`,
# so the queries below select `order_id`; selecting `id` cannot be resolved.
# first_value(...) over the descending window stamps every row with the latest
# created_at of its application.
orders = spark.sql("SELECT order_id, status, person_id, application_id, vehicle_id, first_value(created_at) over(partition by application_id order by created_at desc) as created_at, updated_at, offer_id, dealer_id, vehicle_mileage, contract_package_id, miles_per_year, dealer_person_id, subscription_id FROM orders")


# Count how many rows share each (application_id, created_at) pair.
cnt_applicationID_created = orders.groupby('application_id','created_at').count()
# cnt_applicationID_created.show()
# cnt_applicationID_created.orderBy('count', ascending = False).show()
cnt_applicationID_created.orderBy('created_at', ascending = False).show()
# cnt_applicationID_created.count()

# latest_order = orders.select('application_id','created_at', 'order_id') \
#                       .groupby('application_id','created_at') \
#                       .count() \
#                       .orderBy('count', ascending = False) \
#                       .groupby('application_id') \
#                       .agg(func.first('created_at'))

# another way to write latest_order
latest_order = spark.sql("SELECT order_id, application_id, first_value(created_at) over(partition by application_id order by created_at desc) as created_at, updated_at, offer_id, dealer_id, vehicle_mileage, contract_package_id, miles_per_year, dealer_person_id, subscription_id FROM orders")

# Build a UDF that removes every comma from a string, then apply it to each
# column of `df`, keeping the original column names.
# NOTE(review): `re`, `StringType` and `df` are not imported or defined anywhere
# in the visible notebook — confirm where they are meant to come from before
# running this cell.
udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
new_df = df.select(*[udf(column).alias(column) for column in df.columns])