In [1]:
#구글 드라이브 연동
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=be13a43a686bddd8d662dcf58307e70ed10236d517a06801827a03f4a6c9c65d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [4]:
spark = SparkSession.builder.getOrCreate()

# 전체데이터 로드

In [6]:
import os
os.chdir('/content/drive/MyDrive/ecommerce')

In [7]:
new_df = spark.read.option('header', True).option('inferSchema', True).csv('/content/drive/MyDrive/ecommerce/new.csv')
new = new_df.createOrReplaceTempView('new')

In [8]:
# 컬럼 확인
item = spark.sql("""
SELECT *
FROM new
limit 10;
""")
item.show()

+-------------------+----------+----------+-------------------+-------------+------+-----+-------+------------+
|         event_time|event_type|product_id|        category_id|category_code| brand|price|user_id|user_session|
+-------------------+----------+----------+-------------------+-------------+------+-----+-------+------------+
|2020-01-14 16:15:21|      view|   5865526|1487580008447738866|         NULL|   cnd| 10.0| 465496|           1|
|2020-01-14 16:25:35|      view|   5769989|1487580008447738866|         NULL|   cnd| 10.0| 465496|           2|
|2020-01-14 16:27:31|      view|   5865524|1487580008447738866|         NULL|   cnd| 10.0| 465496|           2|
|2019-12-22 12:50:22|      view|   5746011|1487580009051717646|         NULL|runail|34.92|2963072|          38|
|2019-12-22 12:50:45|      view|   5707747|1487580009051717646|         NULL|  NULL|73.02|2963072|          38|
|2019-12-22 12:50:58|      view|   5746011|1487580009051717646|         NULL|runail|34.92|2963072|      

In [None]:
# 컬럼 확인
result = spark.sql("""
SELECT count(distinct user_id)
FROM new
""")
result.show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                1639358|
+-----------------------+



In [None]:
result = spark.sql("""
SELECT count(distinct product_id, user_session)
FROM new
WHERE event_type = 'purchase'
""")
result.show()

+----------------------------------------+
|count(DISTINCT product_id, user_session)|
+----------------------------------------+
|                                 1279833|
+----------------------------------------+



In [None]:
result = spark.sql("""
SELECT count(distinct user_id, user_session)
FROM new
WHERE event_type = 'purchase'
""")
result.show()

+-------------------------------------+
|count(DISTINCT user_id, user_session)|
+-------------------------------------+
|                               157371|
+-------------------------------------+



# 구매된 제품수 확인


In [None]:
# 전체 제품수
item = spark.sql("""
SELECT count(*)
FROM new
WHERE event_type = 'purchase'
""")
item.show()

+--------+
|count(1)|
+--------+
| 1286881|
+--------+



In [None]:
# 가격 구간별 제품 수
item = spark.sql("""
SELECT ( CASE
        WHEN price <= 15 THEN 0
        WHEN 15 < price AND price <= 75 THEN 1
        ELSE 2
        END) price_group
        , count(distinct product_id, user_id) case
        , ROUND(SUM(price), 2) AS sales
FROM new
WHERE event_type = 'purchase'
GROUP BY price_group
ORDER BY case DESC;
""")
item.show()

+-----------+-------+----------+
|price_group|   case|     sales|
+-----------+-------+----------+
|          0|1170478|4538650.63|
|          1|  52003|1375177.48|
|          2|   3390| 438002.18|
+-----------+-------+----------+



In [None]:
# 가격 구간별 제품 수
item = spark.sql("""
SELECT ( CASE
        WHEN price <= 15 THEN 0
        WHEN 15 < price AND price <= 75 THEN 1
        ELSE 2
        END) price_group
        , count(distinct p2roduct_id, user_id) case
FROM new
WHERE event_type = 'purchase'
GROUP BY price_group
ORDER BY case DESC;
""")
item.show()

+-----------+-------+
|price_group|   case|
+-----------+-------+
|          0|1170478|
|          1|  52003|
|          2|   3390|
+-----------+-------+



In [None]:
# 가격 구간별 제품 수
item = spark.sql("""
WITH p_group AS (
  SELECT product_id
       , price
       , CASE
           WHEN price <= 15 THEN 'low'
           WHEN price > 15 AND price <= 75 THEN 'middle'
           ELSE 'high'
         END AS price_group
  FROM new
  WHERE event_type = 'purchase'
)
SELECT price_group
     , ROUND(SUM(price), 2) AS sales
     , COUNT(product_id) AS cnt_group
FROM p_group
GROUP BY price_group
ORDER BY sales DESC NULLS LAST;
""")
item.show()

+-----------+----------+---------+
|price_group|     sales|cnt_group|
+-----------+----------+---------+
|        low|4538650.63|  1227556|
|     middle|1375177.48|    55758|
|       high| 438002.18|     3567|
+-----------+----------+---------+



# 구매주기 확인


In [None]:
# 아무런 조건 없이 1번 이상 구매한 사람들 list
cycle = spark.sql("""
WITH cycle AS (
  SELECT brand
       , product_id
       , user_id
       , MIN(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS first_purchase
       , MAX(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS last_purchase
       , COUNT(DISTINCT user_id, user_session) AS cnt_purchase
  FROM new
  WHERE event_type = 'purchase'
  GROUP BY brand, product_id, user_id
  ORDER BY first_purchase
)
SELECT *
FROM (
    SELECT *
          , CASE WHEN DATE_ADD(first_purchase, 1) <= last_purchase THEN 'Y' ELSE 'N' END AS repurchase
          , DATEDIFF(last_purchase, first_purchase) AS inverval
          , CASE WHEN DATEDIFF(last_purchase, first_purchase) = 0 THEN 0
                  ELSE DATEDIFF(last_purchase, first_purchase) / (cnt_purchase - 1) END AS purchase_cycle
    FROM cycle
) sub

""")
cycle.show()

+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
|brand|product_id|  user_id|first_purchase|last_purchase|cnt_purchase|repurchase|inverval|purchase_cycle|
+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
| null|      8098|555438679|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      6846|538234633|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|     36747|555571205|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|     35344|555448661|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      9139|530495486|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      7527|361249478|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4542|531662481|    2019-10-01|   

In [None]:
cycle.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/content/drive/MyDrive/cosmetic/all_repurch.csv")

In [None]:
repurch1 = spark.read.csv('/content/drive/MyDrive/cosmetic/all_repurch.csv/part-00000-455c4a15-147b-40c9-b711-ebc04bbe9d66-c000.csv', inferSchema=True, header = True)
repurch1.show()

+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
|brand|product_id|  user_id|first_purchase|last_purchase|cnt_purchase|repurchase|inverval|purchase_cycle|
+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
| null|      4542|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4592|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4642|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|408145588|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4664|531408728|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4681|550444466|    2019-10-01|   

In [None]:
#1회성 구매
repurch1.createOrReplaceTempView('repurch1')

result = spark.sql("""
SELECT count(*)
FROM repurch1
where repurchase = 'N'
""")
result.show()

+--------+
|count(1)|
+--------+
| 1189306|
+--------+



In [None]:
#구매자
repurch1.createOrReplaceTempView('repurch1')

result = spark.sql("""
SELECT count(*)
FROM repurch1
""")
result.show()

+--------+
|count(1)|
+--------+
| 1225944|
+--------+



In [None]:
#1회성 구매한 case와 재구매 case수 구하기
repurch1.createOrReplaceTempView('repurch1')

result = spark.sql("""
SELECT COUNT(product_id) AS pur_num,
       COUNT(CASE WHEN repurchase = 'Y' THEN product_id END) AS re_pur_num
FROM repurch1;
""")
result.show()

+-------+----------+
|pur_num|re_pur_num|
+-------+----------+
|1225944|     36638|
+-------+----------+



In [None]:
#1회성 구매한 case와 재구매 case수 구하기
repurch1.createOrReplaceTempView('repurch1')

result = spark.sql("""
SELECT COUNT(distinct user_id) AS pur_num,
       COUNT(distinct CASE WHEN repurchase = 'Y' THEN user_id END) AS re_pur_num
FROM repurch1;
""")
result.show()

+-------+----------+
|pur_num|re_pur_num|
+-------+----------+
| 110518|     10135|
+-------+----------+



In [None]:
repurch.createOrReplaceTempView('repurch')
cycle1 = spark.sql("""
  WITH cycle AS (
  SELECT user_id
       , MIN(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS first_purchase
       , MAX(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS last_purchase
       , COUNT(DISTINCT user_id, user_session) AS cnt_purchase
  FROM new
  WHERE event_type = 'purchase'
  GROUP BY user_id
  ORDER BY first_purchase
)
SELECT COUNT(*)
FROM (
    SELECT *
          , CASE WHEN DATE_ADD(first_purchase, 1) <= last_purchase THEN 'Y' ELSE 'N' END AS repurchase
          , DATEDIFF(last_purchase, first_purchase) AS inverval
          , CASE WHEN DATEDIFF(last_purchase, first_purchase) = 0 THEN 0
                  ELSE DATEDIFF(last_purchase, first_purchase) / (cnt_purchase - 1) END AS purchase_cycle
    FROM cycle
    WHERE repurchase = 'Y'
) sub
""")
cycle1.show()

# 가격 구간별 구매주기
- $75 이상인 제품군 관련 로그데이터 로드
- 1단계: (마지막구매 - 첫구매) > 1인 조건에 따라
- 2단계: 재구매자면 'Y', 일회성 구매자면 'N' 라는 값을 갖는 'repurchase' 컬럼 생성
- 3단계: 재구매 한정, 구매 주기 확인  

In [None]:
high_price_cycle = spark.sql("""
WITH cycle AS (
  SELECT brand
       , product_id
       , user_id
       , MIN(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS first_purchase
       , MAX(DATE_FORMAT(event_time, 'yyyy-MM-dd')) AS last_purchase
       , COUNT(DISTINCT user_id, user_session) AS cnt_purchase
  FROM new
  WHERE event_type = 'purchase'
  AND price > 75
  GROUP BY brand, product_id, user_id
  ORDER BY first_purchase
)
SELECT *
FROM (
    SELECT *
          , CASE WHEN DATE_ADD(first_purchase, 1) <= last_purchase THEN 'Y' ELSE 'N' END AS repurchase
          , DATEDIFF(last_purchase, first_purchase) AS inverval
          , CASE WHEN DATEDIFF(last_purchase, first_purchase) = 0 THEN 0
                  ELSE DATEDIFF(last_purchase, first_purchase) / (cnt_purchase - 1) END AS purchase_cycle
    FROM cycle
) sub

""")

high_price_cycle.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/content/drive/MyDrive/cosmetic/high_75.csv")

In [None]:
# 가격별 구매리스트 파일 불러오기
repurch = spark.read.csv('/content/drive/MyDrive/cosmetic/all_repurch.csv/part-00000-455c4a15-147b-40c9-b711-ebc04bbe9d66-c000.csv', inferSchema=True, header = True)
repurch.show()
repurch.createOrReplaceTempView('repurch')

+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
|brand|product_id|  user_id|first_purchase|last_purchase|cnt_purchase|repurchase|inverval|purchase_cycle|
+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
| null|      4542|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4592|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4642|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|408145588|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4664|531408728|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4681|550444466|    2019-10-01|   

In [None]:
# 가격별 구매리스트 파일 불러오기
# 저가 상품 로그데이터 로드
repurch = spark.read.csv('/content/drive/MyDrive/cosmetic/low_15.csv/part-00000-324d76a0-eb72-4964-bf26-9f4dd69cf47a-c000.csv', inferSchema=True, header = True)
repurch.show()
repurch.createOrReplaceTempView('repurch')

+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
|brand|product_id|  user_id|first_purchase|last_purchase|cnt_purchase|repurchase|inverval|purchase_cycle|
+-----+----------+---------+--------------+-------------+------------+----------+--------+--------------+
| null|      4542|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4592|550444466|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4642|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|408145588|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4645|554832207|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4664|531408728|    2019-10-01|   2019-10-01|           1|         N|       0|           0.0|
| null|      4681|550444466|    2019-10-01|   

In [None]:
# 잘못구한듯,,,, distinct 제외
# 재구매회원
repurch.createOrReplaceTempView('repurch')

result = spark.sql("""
SELECT COUNT(DISTINCT user_id) AS pur_num,
       COUNT(DISTINCT CASE WHEN repurchase = 'Y' THEN user_id END) AS re_pur_num
FROM repurch;
""")
result.show()

+-------+----------+
|pur_num|re_pur_num|
+-------+----------+
|  92587|      7211|
+-------+----------+



In [None]:
# Low군 구매/ 재구매
repurch.createOrReplaceTempView('repurch')

result = spark.sql("""
SELECT COUNT(distinct user_id, product_id) AS pur_num,
       COUNT(distinct CASE WHEN repurchase = 'Y' THEN product_id END) AS re_pur_num
FROM repurch;
""")
result.show()

+-------+----------+
|pur_num|re_pur_num|
+-------+----------+
|1170649|     10084|
+-------+----------+



In [None]:
# 평균 구매주기
result = spark.sql("""
SELECT avg(purchase_cycle)
FROM repurch
WHERE repurchase = 'Y'
""")
result.show()

+-------------------+
|avg(purchase_cycle)|
+-------------------+
|  34.50546095408604|
+-------------------+



In [None]:
# 구간별 회원수
result = spark.sql("""
SELECT  interval_4
		,COUNT(user_id) AS cnt
   FROM  (
		SELECT  *
				,CASE WHEN inverval <= 7 THEN '7일 이내'
					  WHEN inverval <= 14 THEN '14일 이내'
					  WHEN inverval <= 21 THEN '21일 이내'
					  WHEN inverval <= 28 THEN '28일 이내'
					  ELSE '29일 이후' END AS interval_4
		  FROM  repurch
		 WHERE  inverval > 0
		 )AS A
  GROUP BY  interval_4
  ORDER BY cnt desc
""")
result.show()

+----------+---+
|interval_4|cnt|
+----------+---+
| 29일 이후|787|
|  7일 이내|276|
| 21일 이내| 84|
| 28일 이내| 82|
| 14일 이내| 79|
+----------+---+

