In [1]:
# 用以下指令連結spark
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
# Obtain the SparkSession for this notebook (an existing session is reused
# if there is one), registered under the application name 'instacart'.
spark = (
    SparkSession.builder
    .appName('instacart')
    .getOrCreate()
)

In [3]:
# Read the CSV files into DataFrames through the SparkSession.
# An explicit schema is passed to every read so the listed columns get fixed
# names and types; the first line of each file is treated as a header.

# Order line items: the same (order_id, product_id) schema is applied to both
# the "prior" file and the "train" file.
schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("product_id", IntegerType(), True)
)
prior_order = spark.read.csv('./instacart_2017_05_01/order_products__prior.csv', schema=schema, header=True)
train = spark.read.csv('./instacart_2017_05_01/order_products__train.csv', schema=schema, header=True)

# Orders: which user placed each order and which evaluation set it belongs to.
schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("user_id", IntegerType(), True)
    .add("eval_set", StringType(), True)
)
order = spark.read.csv('./instacart_2017_05_01/orders.csv', schema=schema, header=True)

# Products: maps product_id to a readable product_name.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("product_name", StringType(), True)
)
product = spark.read.csv('./instacart_2017_05_01/products.csv', schema=schema, header=True)

In [None]:
# Inspect the data: print the first row of each loaded table, in the order
# orders, prior line items, train line items, products.
for table in (order, prior_order, train, product):
    print(table.first())

In [None]:
# Inspect the data: bring the first 20 prior line items back to the driver
# as a list of Row objects (displayed as the cell output).
prior_order.head(20)

In [None]:
# Render as a table: every prior line item that belongs to order_id 3.
order_3_items = prior_order.where(F.col('order_id') == 3)
order_3_items.show()

In [None]:
# Basic counts.

# Number of distinct customers present in the orders table.
distinct_users = order.select('user_id').distinct()
print(distinct_users.count())

# Number of orders in the orders table whose eval_set is not 'prior'.
# Original author's note: this indicates that each customer's last order was
# set aside as either a test or a train order.
non_prior_orders = order.where(order['eval_set'] != 'prior')
print(non_prior_orders.count())

In [None]:
# Find the customers with the most orders: count rows per user_id, order the
# result by that count from largest to smallest, and show the top 15.

orders_per_user = order.groupBy('user_id').count()
orders_per_user.orderBy(F.desc("count")).show(15)

In [4]:
# Combine the prior and train order data into one customer preference table
# with one row per (user_id, order_id, product_id) line item.

# Drop the orders marked as 'test'. A dedicated name is used for this
# intermediate so `preference` is only ever bound to the line-item table.
non_test_orders = order.filter(order['eval_set'] != 'test')

# Attach user_id to each line item by joining the orders with the prior and
# train line items on order_id, keeping the same three columns in both results.
line_item_columns = ("user_id", "order_id", "product_id")
prefer_prior = non_test_orders.join(prior_order, "order_id", "inner").select(*line_item_columns)
prefer_train = non_test_orders.join(train, "order_id", "inner").select(*line_item_columns)

# Stack both sets of line items. `union` is the canonical name; `unionAll` is a
# legacy alias that was deprecated in Spark 2.x. Like `unionAll`, `union`
# matches columns by position and keeps duplicate rows.
preference = prefer_prior.union(prefer_train)

In [None]:
# Look up how many times one specific customer bought each product.
# user_id is read with an IntegerType schema, so the comparison uses an int
# literal rather than a string that Spark would have to cast implicitly.

preference.filter(preference['user_id'] == 106510).groupBy("product_id").count().show()

In [5]:
# Collapse the line items into one row per (user_id, product_id) pair, with a
# `count` column holding how many line items that pair had.

purchase_key = ["user_id", "product_id"]
preference = preference.groupBy(*purchase_key).count()

# Preview the first 30 aggregated rows (displayed as the cell output).
preference.take(30)

[Row(user_id=160475, product_id=11123, count=7),
 Row(user_id=135004, product_id=29974, count=3),
 Row(user_id=82943, product_id=48398, count=7),
 Row(user_id=157805, product_id=49273, count=2),
 Row(user_id=150317, product_id=26790, count=15),
 Row(user_id=128089, product_id=28204, count=4),
 Row(user_id=181571, product_id=13176, count=3),
 Row(user_id=92678, product_id=32655, count=17),
 Row(user_id=88698, product_id=3896, count=62),
 Row(user_id=8031, product_id=27086, count=2),
 Row(user_id=193965, product_id=33342, count=5),
 Row(user_id=131089, product_id=22035, count=5),
 Row(user_id=869, product_id=29810, count=1),
 Row(user_id=108520, product_id=8277, count=3),
 Row(user_id=194400, product_id=30142, count=9),
 Row(user_id=106483, product_id=13984, count=6),
 Row(user_id=47306, product_id=40229, count=2),
 Row(user_id=46859, product_id=8580, count=4),
 Row(user_id=37498, product_id=7139, count=1),
 Row(user_id=158552, product_id=21479, count=7),
 Row(user_id=73538, product_id=2

In [6]:
# Add the product name by joining the products table on product_id.

preference = preference.join(product, on="product_id", how="inner")

# Mark the joined table for caching so the queries below can reuse it.
# persist() returns the DataFrame, which is what the cell displays.
preference.persist()

DataFrame[product_id: int, user_id: int, count: bigint, product_name: string]

In [11]:
# Query a customer's purchases again, now with product names attached:
# up to 30 rows for user_id 8853.

customer_rows = preference.where(preference['user_id'] == 8853)
customer_rows.take(30)

[Row(product_id=9076, user_id=8853, count=1, product_name='Blueberries'),
 Row(product_id=33000, user_id=8853, count=1, product_name='Pure Irish Butter'),
 Row(product_id=21137, user_id=8853, count=1, product_name='Organic Strawberries'),
 Row(product_id=21288, user_id=8853, count=1, product_name='Blackberries'),
 Row(product_id=3798, user_id=8853, count=1, product_name='Pink Lady Apples'),
 Row(product_id=49621, user_id=8853, count=1, product_name='Challah Bread'),
 Row(product_id=26405, user_id=8853, count=4, product_name='XL Pick-A-Size Paper Towel Rolls'),
 Row(product_id=5769, user_id=8853, count=1, product_name='Organic Chicken Stock'),
 Row(product_id=23423, user_id=8853, count=1, product_name='Original Hawaiian Sweet Rolls'),
 Row(product_id=41787, user_id=8853, count=1, product_name='Bartlett Pears'),
 Row(product_id=38300, user_id=8853, count=1, product_name='Tall Kitchen Bag With Febreze Odor Shield'),
 Row(product_id=13176, user_id=8853, count=1, product_name='Bag of Organi

In [8]:
# Save the preference table as a new CSV output with a header row.
#
# - Uses Spark's built-in CSV writer instead of the legacy external
#   "com.databricks.spark.csv" format name.
# - repartition(1) puts all rows in one partition so a single part file is
#   written; Spark creates a directory at the given path to hold it.
# - mode("overwrite") replaces output from an earlier run. The default mode
#   raises an error when the path already exists, which would break a
#   Restart & Run All of this notebook.

(
    preference
    .repartition(1)
    .write
    .mode("overwrite")
    .csv("prefer_raw.csv", header=True)
)