In [1]:
sc

<pyspark.context.SparkContext at 0x2aeffbfdc890>

In [2]:
# Define functions to parse txt files containing items, stores, customers, and transactions
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql import functions as F
def parseStore(s):
    """Turn one pipe-delimited line of the store file into a Row.

    The line is split on '|' and the first six fields are used, in order:
    store_num, store_name, store_zone, store_city, store_state, store_type.
    store_num and store_type are converted to int; the remaining fields are
    kept as the strings produced by the split.
    """
    fields = s.split('|')
    return Row(
        store_num=int(fields[0]),
        store_name=fields[1],
        store_zone=fields[2],
        store_city=fields[3],
        store_state=fields[4],
        store_type=int(fields[5]),
    )
def parseItem(s):
    """Turn one pipe-delimited line of the item file into a Row.

    The line is split on '|' and the first twelve fields are used, in order.
    item_number, dept_num, categ_num and class_num are converted to int,
    item_unt_qty to float; every other field is kept as a string.
    """
    fields = s.split('|')
    return Row(
        item_number=int(fields[0]),
        dept_categ_class=fields[1],
        item_des=fields[2],
        item_unt_qty=float(fields[3]),
        size_unit_desc=fields[4],
        brand_code=fields[5],
        dept_num=int(fields[6]),
        dept_name=fields[7],
        categ_num=int(fields[8]),
        categ_name=fields[9],
        class_num=int(fields[10]),
        class_name=fields[11],
    )
def parseCustomer(s):
    """Turn one pipe-delimited line of the customer file into a Row.

    The line is split on '|' and the first fourteen fields are used, in
    order. hshld_acct and the three wine_email_* fields are converted to
    int, application_date is parsed with the '%Y-%m-%d' format into a
    datetime, and fields 1 through 9 are kept as strings.
    """
    fields = s.split('|')
    return Row(
        hshld_acct=int(fields[0]),
        birth_yr_head_hh=fields[1],
        hh_income=fields[2],
        hh_size=fields[3],
        adult_count=fields[4],
        child_count=fields[5],
        birth_yr_oldest=fields[6],
        birth_yr_youngest=fields[7],
        bad_address=fields[8],
        privacy=fields[9],
        application_date=datetime.strptime(fields[10], '%Y-%m-%d'),
        wine_email_sent=int(fields[11]),
        wine_email_open=int(fields[12]),
        wine_email_click=int(fields[13]),
    )
def parsePostrans(s):
    """Turn one pipe-delimited line of the transaction file into a Row.

    The line is split on '|' and the first eleven fields are used, in order.
    Account, transaction, store, item and unit_count fields are converted to
    int; net_sales, gross_sales and manuf_coupon to float; trans_date is
    parsed with the '%Y-%m-%d' format into a datetime; dept_categ_class is
    kept as a string.
    """
    fields = s.split('|')
    return Row(
        hshld_acct=int(fields[0]),
        acct_num=int(fields[1]),
        trans_num=int(fields[2]),
        trans_date=datetime.strptime(fields[3], '%Y-%m-%d'),
        store_num=int(fields[4]),
        item_number=int(fields[5]),
        dept_categ_class=fields[6],
        unit_count=int(fields[7]),
        net_sales=float(fields[8]),
        gross_sales=float(fields[9]),
        manuf_coupon=float(fields[10]),
    )

In [3]:
# Load each raw text file under `path` as an RDD of lines
path='/public/jcarrol5/data/Wegmans/'
storeRDD, itemRDD, customerRDD, postransRDD = (
    sc.textFile(path + file_name)
    for file_name in ('wegmans_store_master.txt',
                      'wegmans_item_master.txt',
                      'wegmans_customer_master.txt',
                      'partial_transaction.dat')
)

In [4]:
# Apply the matching parser to every line of each raw RDD to get RDDs of Rows
storeRowRDD, itemRowRDD, customerRowRDD, postransRowRDD = (
    lines.map(parser)
    for lines, parser in ((storeRDD, parseStore),
                          (itemRDD, parseItem),
                          (customerRDD, parseCustomer),
                          (postransRDD, parsePostrans))
)

In [5]:
# Build one DataFrame per Row RDD (same order as before: store, item, customer, transactions)
storeDF, itemDF, customerDF, postransDF = (
    sqlContext.createDataFrame(rows)
    for rows in (storeRowRDD, itemRowRDD, customerRowRDD, postransRowRDD)
)

In [6]:
# Preview the first five transaction line items
postransDF.show(n=5)

+--------+----------------+-----------+----------+-----------+------------+---------+---------+--------------------+---------+----------+
|acct_num|dept_categ_class|gross_sales|hshld_acct|item_number|manuf_coupon|net_sales|store_num|          trans_date|trans_num|unit_count|
+--------+----------------+-----------+----------+-----------+------------+---------+---------+--------------------+---------+----------+
|     559|          010327|       1.99|       559|      16705|         0.0|     1.99|       64|2013-06-01 00:00:...|   174758|         1|
|     559|          010640|       2.29|       559|      85189|         0.0|     2.29|       64|2013-06-01 00:00:...|   174758|         1|
|     559|          021410|       5.99|       559|      24159|         0.0|     5.99|       64|2013-06-01 00:00:...|   174758|         1|
|     559|          022036|       3.49|       559|      33378|         0.0|     3.49|       64|2013-06-01 00:00:...|   174758|         1|
|     559|          022036|       

In [7]:
# For the first question:
# sum gross_sales over all line items that share the same transaction number
# NOTE(review): this groups on trans_num alone, while the second-question cell
# groups on (trans_num, store_num) — confirm trans_num is unique across stores
trans_rev = (
    postransDF
    .groupBy('trans_num')
    .agg(F.sum('gross_sales').alias('total'))
    .withColumnRenamed("trans_num", "new_trans_num")
    .cache()  # reused by several later cells
)
trans_rev.show(n=5)

+-------------+------------------+
|new_trans_num|             total|
+-------------+------------------+
|     36122148|61.120000000000005|
|     46382669|43.870000000000005|
|     24061893|23.509999999999998|
|    131539883|             21.07|
|      3401555|188.72999999999996|
+-------------+------------------+
only showing top 5 rows



In [10]:
# For the first question (method 1):
# show the max, min and average transaction total, one table per statistic
for summarize in (F.max, F.min, F.avg):
    trans_rev.select(summarize('total')).show()

+----------+
|max(total)|
+----------+
|   1746.08|
+----------+

+----------+
|min(total)|
+----------+
|   -330.09|
+----------+

+-----------------+
|       avg(total)|
+-----------------+
|50.65479100251678|
+-----------------+



In [11]:
# For the first question (method 2):
# order the totals and show the first row, which also displays the transaction number
max_trans_rev = trans_rev.orderBy(F.desc('total'))
max_trans_rev.show(n=1)
min_trans_rev = trans_rev.orderBy(F.asc('total'))
min_trans_rev.show(n=1)
trans_rev.agg(F.avg('total')).show()
# Result: the max is 1746.08, the min is -330.09, and the average is 50.65

+-------------+-------+
|new_trans_num|  total|
+-------------+-------+
|     23315450|1746.08|
+-------------+-------+
only showing top 1 row

+-------------+-------+
|new_trans_num|  total|
+-------------+-------+
|   2133235197|-330.09|
+-------------+-------+
only showing top 1 row

+-----------------+
|       avg(total)|
+-----------------+
|50.65479100251678|
+-----------------+



In [12]:
# For the second question:
# sum gross_sales for each (trans_num, store_num) pair
trans_rev_withstore = (
    postransDF
    .select('store_num', 'trans_num', 'gross_sales')
    .groupBy('trans_num', 'store_num')
    .agg(F.sum('gross_sales').alias('total'))
)
trans_rev_withstore.show(n=20)

+----------+---------+------------------+
| trans_num|store_num|             total|
+----------+---------+------------------+
|  31726186|        3|             56.21|
|  41151856|       63|             71.41|
| 146113579|       25|126.18999999999994|
|  96284562|       19|45.010000000000005|
|  27437815|       25|            153.69|
| 135300280|       64|             25.75|
|2145152018|       24|13.150000000000002|
|  84340118|       24|55.430000000000014|
|  98796177|       25|             117.1|
| 101437413|       25|             121.0|
|2137293893|       24| 95.55999999999999|
|  26932643|       25|-1.790000000000001|
|   5371800|       25|             90.04|
| 142020646|       81|             68.96|
|2144686640|       24|33.629999999999995|
|  88210115|       81|             45.96|
| 109897628|        4|             12.69|
|  43336551|       24|140.37000000000003|
|   2200409|       81|             10.99|
|  52714342|       18|14.379999999999999|
+----------+---------+------------

In [14]:
# For the second question:
# average the per-transaction totals within each store
average_trans_store = (
    trans_rev_withstore
    .groupBy('store_num')
    .agg(F.avg('total').alias('average'))
    .cache()
)
average_trans_store.show(n=50)
# The average amount spent per transaction for each store is shown below

+---------+------------------+
|store_num|           average|
+---------+------------------+
|       26| 63.31999999999998|
|       65| 39.70462025316455|
|       19|39.743137977227036|
|       22|49.345414937759315|
|       34|25.081875000000004|
|       84|         43.669375|
|       31|17.246000000000002|
|       39|28.879090909090905|
|       25| 64.72486446616263|
|       71| 31.75280701754387|
|       68|  45.4400885668277|
|        6| 66.28145695364238|
|       87|36.559411764705885|
|       63| 54.10971262341327|
|       51|36.441538461538464|
|       17|43.028749999999995|
|       33|41.105714285714285|
|       88| 56.06238095238096|
|        1|28.467916666666664|
|       89|           25.4556|
|       67|51.780608734009675|
|        3|46.872894115663456|
|       37|          31.19375|
|       83|28.361639344262297|
|       12| 46.86992916934965|
|       74| 65.98292355371898|
|       62|39.638707328174796|
|       11| 63.87691033138401|
|       35|             63.08|
|       