# Import Libraries and Load data

In [1]:
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import seaborn as sns 
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
from pyecharts.charts import Bar
from pyecharts.charts import Funnel
from pyecharts import options as opts
InteractiveShell.ast_node_interactivity = "all" 

%matplotlib inline

# pyspark 관련
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# SparkSession 만들기

spark = SparkSession.builder.appName('eCommerce - Consumer Behavior Analysis').getOrCreate()

In [3]:
spark

In [4]:
# 폰트 설정

from matplotlib import font_manager, rc
import platform

if platform.system() == 'Windows' :
    font_name = font_manager.FontProperties(fname = 'C:\Windows\\Fonts\\NanumBarunGothic.ttf').get_name()
    rc('font', family = font_name)
else :
    rc('font', family = 'AppleGothic')

In [5]:
#commerce_df = spark.read.option('header', 'true').csv('C:\\Users\\u7rye\\Desktop\\e-commerce\\2019-Nov.csv')
commerce_df = spark.read.option('header', 'true').csv('2019-Nov.csv')
print('Data frame type: ' + str(type(commerce_df)))

Data frame type: <class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
commerce_df.cache()

DataFrame[event_time: string, event_type: string, product_id: string, category_id: string, category_code: string, brand: string, price: string, user_id: string, user_session: string]

# Overview of Dataset

In [7]:
print('Data overview')
commerce_df.printSchema()

Data overview
root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [8]:
# price double 형변환

commerce_df = commerce_df.withColumn('price', commerce_df.price.cast('double'))
print('Change Data overview')
commerce_df.printSchema()

Change Data overview
root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



In [10]:
print(f'There are total {commerce_df.count()} row, Let print first 5 data rows:')
commerce_df.show(5)

print('Low Data describe:')
commerce_df.describe().toPandas()

There are total 67501979 row, Let print first 5 data rows:
+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|                null| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:...|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:...|  

Unnamed: 0,summary,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,count,67501979,67501979,67501979.0,67501979.0,45603808,58283744,67501979.0,67501979.0,67501969
1,mean,,,12514064.889882294,2.0578976443190984e+18,,,292.4593165646144,538639745.6296759,
2,stddev,,,17257413.62984622,2.0125490328842856e+16,,,355.67449958606784,22885161.05152206,
3,min,2019-11-01 00:00:00 UTC,cart,100000000.0,2.053013552226108e+18,accessories.bag,a-case,0.0,100963605.0,0000007c-adbf-4ed7-af17-d1fef9763d67
4,max,2019-11-30 23:59:59 UTC,view,9900463.0,2.1877078610380068e+18,stationery.cartrige,zyxel,2574.07,97129396.0,fffffde2-4522-4b44-8a32-510c55739ba1


'category_code', 'brand', 'user_session'에서 null값이 존재한다. 또한 'price'에서 min이 0.0인만큼 확인이 필요하다.

# Detect missing values and abnormal zeroes

In [25]:
# null, price가 0원인 경우도 함께 확인하기

string_columns = ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'user_id', 'user_session']
numeric_columns = ['price']
missing_values = {}

for index, column in enumerate(commerce_df.columns) :
    if column in string_columns :
        missing_count = commerce_df.filter(col(column).eqNullSafe(None)|col(column).isNull()).count()
        missing_values.update({column:missing_count})
    if column in numeric_columns :
        missing_count = commerce_df.where(col(column).isin([0,None,np.nan])).count()
        missing_values.update({column:missing_count})
        
missing_df = pd.DataFrame.from_dict([missing_values])
missing_df

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,0,0,0,0,21898171,9218235,188088,0,10


In [26]:
missing_df_columns = list(missing_df)
missing_df_values = missing_df.values.tolist()

missing_df_columns
missing_df_values

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

[[0, 0, 0, 0, 21898171, 9218235, 188088, 0, 10]]

In [27]:
# 시각화

bar = (Bar().add_xaxis(['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']) \
       .add_yaxis('null_cnt',[0, 0, 0, 0, 21898171, 9218235, 188088, 0, 10]).set_global_opts(title_opts=opts.TitleOpts(title='Missing Values by Column')))

bar.render_notebook()

## category_code null

In [28]:
commerce_df.filter(col('category_code').isNull()).show()

+--------------------+----------+----------+-------------------+-------------+---------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|category_code|    brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+-------------+---------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|         null|    creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:...|      view|  15900065|2053013558190408249|         null|  rondell| 30.86|518574284|5e6ef132-4d7c-473...|
|2019-11-01 00:00:...|      view|  12708937|2053013553559896355|         null| michelin| 72.72|532364121|0a899268-31eb-46d...|
|2019-11-01 00:00:...|      view|  34600011|2060981320581906480|         null|     null| 20.54|512416379|4dfe2c67-e537-4dc...|
|2019-11-01 00:00:...|      view|  24900193|2053013562183385881|         null|     null|  1.09|512651494|f603c8

In [29]:
commerce_df_fill_code = commerce_df.fillna('null')
commerce_df_fill_code.filter(col('category_code') == 'null').show()

+--------------------+----------+----------+-------------------+-------------+---------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|category_code|    brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+-------------+---------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|         null|    creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:...|      view|  15900065|2053013558190408249|         null|  rondell| 30.86|518574284|5e6ef132-4d7c-473...|
|2019-11-01 00:00:...|      view|  12708937|2053013553559896355|         null| michelin| 72.72|532364121|0a899268-31eb-46d...|
|2019-11-01 00:00:...|      view|  34600011|2060981320581906480|         null|     null| 20.54|512416379|4dfe2c67-e537-4dc...|
|2019-11-01 00:00:...|      view|  24900193|2053013562183385881|         null|     null|  1.09|512651494|f603c8

In [31]:
commerce_df_fill_code.groupBy(col('category_id')).agg(countDistinct(col('category_code')).alias('cnt_code')).filter(col('cnt_code')>=2).show()

+-----------+--------+
|category_id|cnt_code|
+-----------+--------+
+-----------+--------+



In [34]:
commerce_df = commerce_df_fill_code

## brand null

In [37]:
commerce_df.filter(col('brand') == 'null').show()

+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+
|2019-11-01 00:00:...|      view|  17200570|2053013559792632471|furniture.living_...| null|437.33|518780843|aa806835-b14c-45a...|
|2019-11-01 00:00:...|      view|   2701517|2053013563911439225|appliances.kitche...| null|155.11|518427361|c89b0d96-247f-404...|
|2019-11-01 00:00:...|      view|  16700260|2053013559901684381|furniture.kitchen...| null| 31.64|566255262|173d7b72-1db7-463...|
|2019-11-01 00:00:...|      view|  34600011|2060981320581906480|                null| null| 20.54|512416379|4dfe2c67-e537-4dc...|
|2019-11-01 00:00:...|      view|  24900193|2053013562183385881|                null| null

## price 0

In [38]:
commerce_df.filter(col('price') == 0).show()

+--------------------+----------+----------+-------------------+--------------------+-----+-----+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|brand|price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+-----+-----+---------+--------------------+
|2019-11-01 00:38:...|      view|  33100000|2058719826188173878|                null| null|  0.0|546996930|969ea68f-a919-4d3...|
|2019-11-01 00:42:...|      view|  33100000|2058719826188173878|                null| null|  0.0|546996930|b1ab3863-bbf5-437...|
|2019-11-01 01:07:...|      view|  12720812|2053013553559896355|                null| null|  0.0|516269492|9bf68f2a-fd78-4b1...|
|2019-11-01 01:07:...|      view|  12720812|2053013553559896355|                null| null|  0.0|516269492|9bf68f2a-fd78-4b1...|
|2019-11-01 01:26:...|      view|  38900075|2085718636156158307|                null| null|  0.0|

- 같은 product_id를 가지고 있음에도 price가 존재하는 경우, 존재하지 않는 경우가 있다.
- 같은 product_id, category_code를 가진 경우에 price가 다른 경우도 있다. 할인, 이벤트 등을 고려하여 날짜와 함께 보는 것이 좋겠다.

이 경우에 대해서는 'product_id', 'category_id'가 동일한 행이 존재하는지 확인해서 price를 채워주거나 만약 시기별로 할인율 등이 반영되어 가격이 다르다면 어떻게 처리할지 확인해봐야 한다.

In [39]:
commerce_df.groupBy(col('product_id')).agg(countDistinct(col('price')).alias('cnt_price')).filter(col('cnt_price')>=2).show()

+----------+---------+
|product_id|cnt_price|
+----------+---------+
|   3200376|        7|
|   4200507|        3|
|  12708642|        5|
|   1005171|       80|
|  21405316|        2|
|  12500482|        2|
|   1004042|       15|
|  12711438|        3|
|  12300845|        4|
|   5301409|       15|
|  12701113|        2|
|  15200136|       16|
|   7002066|       46|
|  10201260|        4|
|   1004266|       13|
|  19200229|       11|
| 100001427|        2|
|   2402648|        2|
| 100027780|        2|
|  11700020|        2|
+----------+---------+
only showing top 20 rows



한 개의 'product_id'에 여러개의 가격이 존재한다.

## user_session null

In [41]:
commerce_df.filter(col('user_session') == 'null').show()

+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|user_session|
+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+
|2019-11-09 15:32:...|      cart|  19700004|2053013559104766575|                null|kabrita|  37.77|539704497|        null|
|2019-11-09 17:15:...|      cart|   1005083|2053013555631882655|electronics.smart...|  honor| 566.27|568843390|        null|
|2019-11-13 04:02:...|      cart|   4804008|2053013554658804075|electronics.audio...|bluedio|  97.81|570411102|        null|
|2019-11-13 07:18:...|      cart|   1004767|2053013555631882655|electronics.smart...|samsung| 243.51|570878749|        null|
|2019-11-23 12:53:...|      cart|   7600528|2053013552821698803|                null|tp-link|  16.73|575357602|        null|


# Funnel

구매까지의 funnel ( view -> cart -> purchase )

In [56]:
checked_event_type_cnt_all = commerce_df.groupBy(col('event_type')).count().sort(desc('count'))
    
checked_event_type_cnt_all_df = checked_event_type.toPandas()
checked_event_type_cnt_all_df

Unnamed: 0,event_type,count
0,view,63556110
1,cart,3028930
2,purchase,916939


In [57]:
event_type_funnel_cnt_all = checked_event_type_df.values.tolist()
event_type_funnel_cnt_all

[['view', 63556110], ['cart', 3028930], ['purchase', 916939]]

In [58]:
funnel = (Funnel().add('count all', event_type_funnel_cnt_all).set_global_opts(title_opts=opts.TitleOpts(title="Purchase Funnel (all count)")))

funnel.render_notebook()

1. 품목별 - 품목별로 퍼널이 어떻게 되는지 
- category_id별 카운트가 많은 순.
- 품목별로 전환율이 가장 좋은 퍼널은 무엇인지. -> 가장 잘팔리는 물건

2. price별 
- price를 구간대로 나눠서 퍼널이 어떻게 되는지

3. 요일별
- 요일별로 퍼널이 어떻게 되는지

4. 세선별
- 세선별로 그 안에서 퍼널이 얼만큼 이루어지는지



## 품목별 Funnel

In [64]:
checked_event_type_by_category_pivot = commerce_df.groupBy('category_id').pivot('event_type').count()
checked_event_type_by_category_pivot.show()

+-------------------+----+--------+------+
|        category_id|cart|purchase|  view|
+-------------------+----+--------+------+
|2085718636156158307|2076|     577|143711|
|2134905045328592909|  86|      17|  3406|
|2053013556252639687|3592|     879| 92865|
|2137134549706998477|   3|       1|   714|
|2053013561696846593| 171|      44|  9227|
|2152167773222993940|2187|     830|106512|
|2053013555413778833| 766|     299| 24363|
|2151564008329576933|  23|       7|  2105|
|2053013562250494749| 434|      53| 31228|
|2053013552888807671|4321|    1129| 92383|
|2164478103445832281|   2|    null|   435|
|2075962341790319330|  77|      11|  3298|
|2079713983761220055| 331|      75| 14296|
|2070004998778912979|  22|       4|  1622|
|2141229835995840847| 242|      80|  9697|
|2100064858975240628| 515|     175| 13939|
|2053013552603594983| 340|      80| 13554|
|2146430800906682785| 127|      10|  3834|
|2053013566587404305| 172|      33| 11049|
|2053013564968403895| 451|     122| 30807|
+----------

In [67]:
checked_event_type_by_category = checked_event_type_by_category_pivot.select('category_id', 'view', 'cart', 'purchase')

checked_event_type_by_category.show()

+-------------------+------+----+--------+
|        category_id|  view|cart|purchase|
+-------------------+------+----+--------+
|2085718636156158307|143711|2076|     577|
|2134905045328592909|  3406|  86|      17|
|2053013556252639687| 92865|3592|     879|
|2137134549706998477|   714|   3|       1|
|2053013561696846593|  9227| 171|      44|
|2152167773222993940|106512|2187|     830|
|2053013555413778833| 24363| 766|     299|
|2151564008329576933|  2105|  23|       7|
|2053013562250494749| 31228| 434|      53|
|2053013552888807671| 92383|4321|    1129|
|2164478103445832281|   435|   2|    null|
|2075962341790319330|  3298|  77|      11|
|2079713983761220055| 14296| 331|      75|
|2070004998778912979|  1622|  22|       4|
|2141229835995840847|  9697| 242|      80|
|2100064858975240628| 13939| 515|     175|
|2053013552603594983| 13554| 340|      80|
|2146430800906682785|  3834| 127|      10|
|2053013566587404305| 11049| 172|      33|
|2053013564968403895| 30807| 451|     122|
+----------

### 전환율

In [72]:
checked_event_type_by_category = checked_event_type_by_category.withColumn('view_cart_rate', round(col('cart')/col('view') , 3)).withColumn('cart_purchase_rate', round(col('purchase')/col('cart') , 3))

checked_event_type_by_category.show()

+-------------------+------+----+--------+--------------+------------------+
|        category_id|  view|cart|purchase|view_cart_rate|cart_purchase_rate|
+-------------------+------+----+--------+--------------+------------------+
|2085718636156158307|143711|2076|     577|         0.014|             0.278|
|2134905045328592909|  3406|  86|      17|         0.025|             0.198|
|2053013556252639687| 92865|3592|     879|         0.039|             0.245|
|2137134549706998477|   714|   3|       1|         0.004|             0.333|
|2053013561696846593|  9227| 171|      44|         0.019|             0.257|
|2152167773222993940|106512|2187|     830|         0.021|              0.38|
|2053013555413778833| 24363| 766|     299|         0.031|              0.39|
|2151564008329576933|  2105|  23|       7|         0.011|             0.304|
|2053013562250494749| 31228| 434|      53|         0.014|             0.122|
|2053013552888807671| 92383|4321|    1129|         0.047|             0.261|

In [87]:
view_cart_rate_top_10 = checked_event_type_by_category.sort(desc('view_cart_rate')).limit(10)

view_cart_rate_top_10.show()

+-------------------+--------+-------+--------+--------------+------------------+
|        category_id|    view|   cart|purchase|view_cart_rate|cart_purchase_rate|
+-------------------+--------+-------+--------+--------------+------------------+
|2053013552662315243|   36207|   3669|     771|         0.101|              0.21|
|2053013553375346967|  317480|  27948|    8220|         0.088|             0.294|
|2053013553559896355| 2952969| 256794|   67813|         0.087|             0.264|
|2053013556344914381|   29296|   2463|     819|         0.084|             0.333|
|2185524688778691138|      99|      8|       1|         0.081|             0.125|
|2053013554658804075| 1644910| 131133|   40834|          0.08|             0.311|
|2053013555631882655|14832387|1159966|  382647|         0.078|              0.33|
|2053013559071212141|    1432|    110|      27|         0.077|             0.245|
|2053013558978937451|    3699|    283|      59|         0.077|             0.208|
|214948485209155

In [82]:
cart_purchase_rate_top_10 = checked_event_type_by_category.sort(desc('cart_purchase_rate')).limit(10)

cart_purchase_rate_top_10.show()

+-------------------+----+----+--------+--------------+------------------+
|        category_id|view|cart|purchase|view_cart_rate|cart_purchase_rate|
+-------------------+----+----+--------+--------------+------------------+
|2149939352476582010| 555|   1|       5|         0.002|               5.0|
|2175419595697947312| 158|   2|       3|         0.013|               1.5|
|2166064779414732801|  82|   1|       1|         0.012|               1.0|
|2181922923097358794| 134|   1|       1|         0.007|               1.0|
|2187707789055361298| 182|   1|       1|         0.005|               1.0|
|2053013560388223675|  33|   1|       1|          0.03|               1.0|
|2079713978711277851|3709|  18|      12|         0.005|             0.667|
|2134904982346924669| 303|   3|       2|          0.01|             0.667|
|2075962341706433246|1425|   7|       4|         0.005|             0.571|
|2142047355153678986|1076|   7|       4|         0.007|             0.571|
+-------------------+----

cart에 담은 category보다 purchase가 더 많은 품목 존재. view에서 바로 구매가 가능한 것 같다.

In [84]:
category_id_code = commerce_df.select('category_id', 'category_code')

In [88]:
view_cart_rate_top_10 = view_cart_rate_top_10.join(category_id_code, view_cart_rate_top_10.category_id == category_id_code.category_id, 'left')

Py4JJavaError: An error occurred while calling o495.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 222.0 failed 1 times, most recent failure: Lost task 20.0 in stage 222.0 (TID 12546) (7ryean executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.lang.reflect.Array.newInstance(Array.java:75)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2106)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2142)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.hasNext(MemoryStore.scala:753)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2250/2019946363.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3709)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2735)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2735)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2942)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
	at sun.reflect.GeneratedMethodAccessor164.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.lang.reflect.Array.newInstance(Array.java:75)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2106)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2142)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.hasNext(MemoryStore.scala:753)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.executor.Executor$TaskRunner$$Lambda$2250/2019946363.apply(Unknown Source)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)


In [89]:
view_cart_rate_top_10_pd = view_cart_rate_top_10.toPandas()

view_cart_rate_top_10_pd

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:53362)
Traceback (most recent call last):
  File "C:\Users\u7rye\anaconda3\envs\test_pyspark\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] 현재 연결은 원격 호스트에 의해 강제로 끊겼습니다

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\u7rye\anaconda3\envs\test_pyspark\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\u7rye\anaconda3\envs\test_pyspark\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\u7rye\anaconda3\envs\test_pyspark\

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:53362)

- view -> cart -> purchase 의 전환율
- view -> purchase 의 전환율
을 비교하여 보자.

In [76]:
checked_event_type_by_category_session_pivot = commerce_df.groupBy('category_id', 'user_session').pivot('event_type').count()

In [77]:
checked_event_type_by_category_session_pivot.show()

+-------------------+--------------------+----+--------+----+
|        category_id|        user_session|cart|purchase|view|
+-------------------+--------------------+----+--------+----+
|2053013555631882655|3bd7e141-e90a-45f...|null|    null|   2|
|2053013555631882655|556f4f10-ea00-4d3...|null|    null|   4|
|2053013555631882655|b5d98282-9beb-40a...|null|    null|   1|
|2053013555631882655|c7ed7046-a26e-480...|null|    null|   4|
|2053013558920217191|bfe5920a-017c-434...|null|    null|  12|
|2053013560899928785|8bab759d-3f31-417...|null|    null|   1|
|2146660887203676486|b9becb3b-d826-4e3...|null|    null|   2|
|2172371436436455782|1e961854-1f8d-47e...|null|    null|   5|
|2053013558920217191|5a9b7e4e-dd00-448...|null|    null|   4|
|2053013553853497655|d7a842d0-ad3a-462...|null|       1|   1|
|2053013555631882655|a5536303-5e16-449...|null|    null|   9|
|2053013555631882655|cec1fb34-aeb2-4b8...|null|    null|   2|
|2053013556311359947|8ce73e11-9327-441...|null|    null|   3|
|2053013

In [None]:
checked_event_type_by_category_session_pivot = checked_event_type_by_category_session_pivot.select('category_id', 'user_session', 'view', 'cart', 'purchase')

checked_event_type_by_category_session_pivot.show()

In [None]:
bar = (Bar().add_xaxis(funnel_event_type).add_yaxis('a',funnel_event_cnt).set_global_opts(title_opts=opts.TitleOpts(title='퍼널분석')))

bar.render_notebook()

### user_session에 관한 분석

In [11]:
checked_count_user = commerce_df.select(count('user_id'), count('user_session'), \
                                            countDistinct('user_id'), countDistinct('user_session'))

checked_count_user.show()

+--------------+-------------------+-----------------------+----------------------------+
|count(user_id)|count(user_session)|count(DISTINCT user_id)|count(DISTINCT user_session)|
+--------------+-------------------+-----------------------+----------------------------+
|      67501979|           67501969|                3696117|                    13776050|
+--------------+-------------------+-----------------------+----------------------------+



In [21]:
# 'user_session'에 따른 행의 갯수를 파악하고 세부 내역 확인을 통해 관게를 파악

checked_user_relation = commerce_df.select(commerce_df.user_session) \
                        .groupBy(commerce_df.user_session) \
                        .count() \
                        .orderBy("count", ascending=False) \
                        .limit(5)

checked_user_relation.show(truncate=False)

+------------------------------------+-----+
|user_session                        |count|
+------------------------------------+-----+
|d99d91bf-40f8-4e29-9593-54b4a1826542|4128 |
|fc749a4e-c432-4dae-a0a1-04de89f1e4ea|2466 |
|b556f0c7-3a23-44f5-9f34-e713fefa9686|1963 |
|d6433d7b-3846-456a-88de-748c3fac2675|1658 |
|88206fc3-b5ea-4e3b-be68-67edfbf7009b|1373 |
+------------------------------------+-----+



In [40]:
checked_user_relation_detail = commerce_df.select(commerce_df.user_id) \
                            .distinct() \
                            .filter(commerce_df.user_session == 'd99d91bf-40f8-4e29-9593-54b4a1826542') \

checked_user_relation_detail.show(truncate=False)

+---------+
|user_id  |
+---------+
|573277455|
+---------+



'user_id'와 'user_session'이 1:N 관계이다.

그렇다면 세션이 null인 경우는 어떤 경우일지?

In [14]:
# user_session이 null인 경우 확인하기

commerce_df.filter(commerce_df.user_session.isNull()).show()

+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+
|          event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|user_session|
+--------------------+----------+----------+-------------------+--------------------+-------+-------+---------+------------+
|2019-11-09 15:32:...|      cart|  19700004|2053013559104766575|                null|kabrita|  37.77|539704497|        null|
|2019-11-09 17:15:...|      cart|   1005083|2053013555631882655|electronics.smart...|  honor| 566.27|568843390|        null|
|2019-11-13 04:02:...|      cart|   4804008|2053013554658804075|electronics.audio...|bluedio|  97.81|570411102|        null|
|2019-11-13 07:18:...|      cart|   1004767|2053013555631882655|electronics.smart...|samsung| 243.51|570878749|        null|
|2019-11-23 12:53:...|      cart|   7600528|2053013552821698803|                null|tp-link|  16.73|575357602|        null|


모든 경우가 'event_type'이 'cart'인 경우에서 발생했다. 'user_session' 에 대해서는 일정 시간 이상 행동이 없는 경우 종료된다고 나와있기 때문에 아마 선행된 view와 연결된 경우라고 생각된다.

In [None]:
commerce_df.filter(commerce_df.user_id == '539704497').show()

In [16]:
# id, session 관계 확인 - 수정

checked_id_session = commerce_df.select(commerce_df.user_id, commerce_df.user_session) \
    .distinct() \
    .count()

checked_id_session.show()

+---------+-----+
|  user_id|count|
+---------+-----+
|568778435|22542|
|569335945|14810|
|512475445| 6074|
|568793129| 4453|
|567475167| 3617|
+---------+-----+



In [29]:
# id, session 관계 확인

checked_id_session = commerce_df.select(commerce_df.user_id, commerce_df.user_session) \
    .distinct() \
    .groupBy(commerce_df.user_id, commerce_df.user_session) \
    .count() \
    .orderBy("count", ascending=False) \
    .limit(5)
    
checked_id_session.show()

+---------+--------------------+-----+
|  user_id|        user_session|count|
+---------+--------------------+-----+
|520772685|816a59f3-f5ae-4cc...|    1|
|513200477|742aba02-727b-4d1...|    1|
|553802615|e09684bb-0c95-4f6...|    1|
|542346595|75c35801-ce60-44b...|    1|
|558726315|e43aa696-aefc-406...|    1|
+---------+--------------------+-----+



event type 관련 확인하기.

In [15]:
checked_event_type = commerce_df.select(commerce_df.event_type, commerce_df.user_id, commerce_df.user_session) \
    .distinct() \
    .groupBy(commerce_df.event_type) \
    .count() \
    .orderBy("count", ascending=False)
    
checked_event_type_df = checked_event_type.toPandas()
checked_event_type_df

Unnamed: 0,event_type,count
0,view,13767353
1,cart,1743354
2,purchase,773214


In [16]:
checked_event_type_not_distinct = commerce_df.select(commerce_df.event_type, commerce_df.user_id, commerce_df.user_session) \
    .groupBy(commerce_df.event_type) \
    .count() \
    .orderBy("count", ascending=False)
    
checked_event_type_not_distinct_df = checked_event_type_not_distinct.toPandas()
checked_event_type_not_distinct_df

Unnamed: 0,event_type,count
0,view,63556110
1,cart,3028930
2,purchase,916939


In [None]:
# Top 5 workout types

highest_sport_users_df = ranked_sport_users_df.limit(5).toPandas()

# Rename column name : 'count' --> Users count
highest_sport_users_df.rename(columns = {'count':'Users count'}, inplace = True)

# Caculate the total users, we will this result to compute percentage later
total_sports_users = ranked_sport_users_df.groupBy().sum().collect()[0][0]

In [125]:
# 퍼널 차트 그려보기

attr = ["A", "B", "C", "D", "E", "F"]
value = [20, 40, 60, 80, 100, 120]
funnel = Funnel("퍼널 그래프")
funnel.add(
    "퍼널",
    attr,
    value,
    is_label_show=True,
    label_pos="inside",
    label_text_color="#fff",
)
funnel.width=700
funnel.height=500
funnel

NameError: name 'Funnel' is not defined

In [124]:
!pip install pyecharts

Collecting pyecharts
  Downloading pyecharts-1.9.1-py3-none-any.whl (135 kB)
     -------------------------------------- 135.6/135.6 kB 4.0 MB/s eta 0:00:00
Collecting prettytable
  Downloading prettytable-3.4.1-py3-none-any.whl (26 kB)
Collecting simplejson
  Downloading simplejson-3.17.6-cp39-cp39-win_amd64.whl (75 kB)
     ---------------------------------------- 75.8/75.8 kB 4.1 MB/s eta 0:00:00
Installing collected packages: simplejson, prettytable, pyecharts
Successfully installed prettytable-3.4.1 pyecharts-1.9.1 simplejson-3.17.6




In [127]:
from pyecharts import Funnel

ModuleNotFoundError: No module named 'pyecharts'

In [None]:
# event_type 종류 확인

commerce_df.select('event_type').distinct().show()

kaggle data 설명에는 event_type에 'remove_from_cart'가 있었으나 해당 데이터셋에서는 확인되지 않았음.

In [None]:
# evnet_type == remove_from_cart 값 확인

df_event_type_remove_cart = commerce_df.select('*').where(commerce_df.event_type == 'remove_from_cart')
df_event_type_remove_cart.show()

### 전체 데이터셋 시각화

In [None]:
print(commerce_df_pandas.info())

In [None]:
# evnet_type == view 값 확인

df_event_type_view = commerce_df.select('*').where(commerce_df.event_type == 'view')
df_event_type_view.show()

In [None]:
# evnet_type == cart 값 확인

df_event_type_cart = commerce_df.select('*').where(commerce_df.event_type == 'cart')
df_event_type_cart.show()

In [None]:
# evnet_type == purchase 값 확인

df_event_type_purchase = commerce_df.select('*').where(commerce_df.event_type == 'purchase')
df_event_type_purchase.show()

kaggle 데이터 설명에는 event_type에 'remove_from_cart'가 있다고 하였지만 실제로는 확인되지 않음.

In [None]:
df_event_type_purchase.count()

In [None]:
df_event_type_cart.count()

In [None]:
df_event_type_view.count()

### 매출 시각화

In [None]:
sum_sales_by_date = df_sales.groupby('event_date')['price'].sum()
sum_sales_by_date.head()

In [None]:
# 그래프 그리기

sum_sales_by_date.plot()
plt.title("일자별 총 매출")
plt.xlabel("Date")
plt.ylabel("Sales")

plt.show()

In [None]:
# brand null값 확인

commerce_df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in commerce_df.columns)).show()

'category_id', 'category_code', 'brand'의 미싱값이 이상함. id, brand는 있는데 코드는 없거나. id, code, 

In [None]:
# category_code, brand 종류 확인

commerce_df.select("category_code","brand").distinct().show()