# IMPORT LIBRARY

In [17]:
import faulthandler 
from pyspark.sql import SparkSession 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType, DateType, FloatType, TimestampType
from pyspark.sql.functions import col, array_contains, isnan, when, count
from pyspark.sql.functions import lit, concat_ws, concat, collect_list, udf
from pyspark.sql.functions import countDistinct
import plotly.express as px
from sklearn.preprocessing import RobustScaler
import pandas as pd
import os
import seaborn as sns
import matplotlib.pyplot as plt

from tqdm import tqdm


# spark 접속

In [2]:

faulthandler.enable()   
spark = SparkSession.builder.master('local').appName("Python Spark SQL Practice").config("spark.driver.maxResultSize", "64g").getOrCreate()

# DATA LOAD

In [3]:
def search(dirname):
    filenames = os.listdir(dirname)
    file_list = []
    for filename in filenames:
        full_filename = os.path.join(dirname, filename)
        print(full_filename)
        file_list.append(full_filename)
    return file_list
data_path_list = search("D:/workspace/data/")

D:/workspace/data/2019-Dec.csv
D:/workspace/data/2019-Nov.csv
D:/workspace/data/2019-Oct.csv
D:/workspace/data/2020-Apr.csv
D:/workspace/data/2020-Feb.csv
D:/workspace/data/2020-Jan.csv
D:/workspace/data/2020-Mar.csv


## 스키마 타입

In [4]:
schema = StructType() \
      .add("event_time",TimestampType(),True) \
      .add("event_type",StringType(),True) \
      .add("product_id",StringType(),True) \
      .add("category_id",StringType(),True) \
      .add("category_code",StringType(),True) \
      .add("brand",StringType(),True) \
      .add("price",DoubleType(),True) \
      .add("user_id",StringType(),True) \
      .add("user_session",StringType(),True)

## data merge
    - 2019년 10월 ~ 2020년 4월

In [5]:
for i, x in enumerate(data_path_list):
    file_path = x
    df = spark.read.format("csv") \
      .option("header", True) \
      .option('delimiter', ',') \
      .schema(schema) \
      .load(file_path)
    if i == 0:
        merged_df = df
    else:
        merged_df = merged_df.union(df)

In [6]:
merged_df.show()

+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|2019-12-01 09:00:00|      view|   1005105|2232732093077520756|construction.tool...|  apple|1302.48|556695836|ca5eefc5-11f9-450...|
|2019-12-01 09:00:00|      view|  22700068|2232732091643068746|                NULL|  force| 102.96|577702456|de33debe-c7bf-44e...|
|2019-12-01 09:00:01|      view|   2402273|2232732100769874463|appliances.person...|  bosch| 313.52|539453785|5ee185a7-0689-4a3...|
|2019-12-01 09:00:02|  purchase|  26400248|2053013553056579841|computers.periphe...|   NULL| 132.31|535135317|61792a26-672f-4e6...|
|2019-12-01 09:00:02|      view|  20100164|2232732110089618156|    apparel.t

## spark table 생성

In [7]:
merged_df.createOrReplaceTempView("ecommerce")

# 1. ACQUISITION(고객 유치) 

## 1) DAU (Daily Active User)

In [8]:
dau = spark.sql("""
SELECT DATE(event_time) AS event_date, COUNT(DISTINCT user_id) AS DAU
FROM ecommerce
GROUP BY event_date
ORDER BY event_date
""")

In [9]:
dau_df = dau.toPandas()

In [10]:
dau_df.to_csv("dau.csv")

### DAU 그래프

## 2) MAU(Monthly Active User)

### MAU 쿼리

In [11]:
mau = spark.sql("""
SELECT DATE_FORMAT(event_time, 'yyyy-MM') AS event_month, COUNT(DISTINCT user_id) AS MAU
FROM ecommerce
GROUP BY event_month
ORDER BY event_month
""")

In [12]:
mau_df = mau.toPandas()

In [13]:
mau_df.to_csv('mau.csv')

### MAU 그래프

# 2. ACTIVATION(활성화)

## 1) DT(Duration Time, 체류 시간)
    - DT가 1일 이하

### DT 쿼리

In [18]:
duration_under_1day = spark.sql("""WITH paying_sessions AS (
    SELECT DISTINCT user_session
    FROM ecommerce
    WHERE event_type = 'purchase'
), 

session_time AS (
    SELECT e.user_session, 
           MAX(e.event_time) AS max_session_time,
           MIN(e.event_time) AS min_session_time
    FROM ecommerce e
    JOIN paying_sessions p ON e.user_session = p.user_session
    GROUP BY e.user_session
)

SELECT s.user_session, 
       (UNIX_TIMESTAMP(s.max_session_time) - UNIX_TIMESTAMP(s.min_session_time)) AS duration_seconds
FROM session_time s
WHERE (UNIX_TIMESTAMP(s.max_session_time) - UNIX_TIMESTAMP(s.min_session_time)) < 86400""")

In [19]:
duration_under_1day_df = duration_under_1day.toPandas()

In [20]:
duration_under_1day_df.describe()

Unnamed: 0,duration_seconds
count,5267605.0
mean,624.6561
std,2153.458
min,0.0
25%,121.0
50%,251.0
75%,554.0
max,86385.0


In [22]:
total_session_num_df = spark.sql("""SELECT COUNT(*) AS CNT
                              FROM ecommerce
                              GROUP BY user_session
                              """).toPandas()


Py4JJavaError: An error occurred while calling o154.collectToPython.
: java.lang.OutOfMemoryError: Java heap space


In [None]:
print(f"전체 session 중 1일 이하의 duration time을 갖고 있는 session의 비율은 : {len(duration_under_1day_df)/total_session_num_df.iloc[0,0]}")

## 카테고리별 전환율

In [None]:
main_category_df = spark.sql("""SELECT DISTINCT (SUBSTRING_INDEX(category_code, '.', 1)) AS cnt 
          FROM ecommerce""").toPandas()

In [None]:
category_list = main_category_df.values.tolist()

In [None]:
category_dataframes = []
for x in tqdm(category_list):
     funnel_per_category = spark.sql(f"""
                    SELECT 
                         event_type, COUNT(*) AS CNT
                    FROM 
                         ecommerce
                    WHERE
                         SUBSTRING_INDEX(category_code, '.', 1) = '{x[0]}'
                         
                    GROUP BY 
                         event_type
                                   
                    ORDER BY
                         CNT DESC
                         
                    
     """)
     pdf = funnel_per_category.toPandas()
     pdf["category_code"] = x[0]
     category_dataframes.append(pdf)


In [None]:
df_funnel_per_category = pd.concat(category_dataframes, ignore_index=True)

In [None]:
df_funnel_per_category

## 카테고리별 구매 주기

In [None]:

category_list = [['medicine'],
 ['computers'],
 ['auto'],
 ['stationery'],
 ['sport'],
 ['apparel'],
 ['appliances'],
 ['country_yard'],
 ['furniture'],
 ['accessories'],
 ['kids'],
 ['electronics'],
 ['construction'],
 [None]]

category_dataframes = []

for x in tqdm(category_list):
     purcahse_cycle_per_category = spark.sql(f"""
                                                WITH third AS(
                                                            WITH second AS(
                                                                        WITH first AS(
                                                                                    SELECT 
                                                                                          user_id,
                                                                                          event_time
                                                                                    FROM ecommerce
                                                                                    WHERE SUBSTRING_INDEX(category_code, '.', 1) = '{x[0]}'
                                                                                    AND event_type = 'purchase'
                                                                                    )
                                                                        SELECT 
                                                                              user_id,
                                                                              event_time AS order_date,
                                                                              LAG(event_time) OVER (
                                                                                                      PARTITION BY user_id 
                                                                                                      ORDER BY event_time) AS prev_order_date      
                                                                        FROM first
                                                                        GROUP BY user_id, event_time
                                                                        )
                                                            SELECT
                                                                        user_id,
                                                                        AVG(DATEDIFF(order_date, prev_order_date)) AS user_avg_purchase_cycle
                                                            FROM second
                                                            WHERE prev_order_date IS NOT NULL
                                                            GROUP BY user_id
                                                      )
                                                      SELECT 
                                                            AVG(user_avg_purchase_cycle) AS category_avg_purchase_cycle
                                                      FROM third
     """)
     pdf = purcahse_cycle_per_category.toPandas()
     pdf["category_code"] = x[0]
     category_dataframes.append(pdf)

In [None]:
df_frequency_purchase_per_category = pd.concat(category_dataframes, ignore_index=True)

In [None]:
df_frequency_purchase_per_category

LTR

In [None]:
def calculate_ltr(m):
    query = f"""
                    WITH month_users AS (
                        SELECT user_id, MIN(event_time) AS first_join
                        FROM ecommerce
                        GROUP BY user_id
                        HAVING MONTH(first_join) = {m}
                    ),

                    activity_summary AS (
                        SELECT DATE_FORMAT(event_time, 'yyyy-MM') AS Month,
                                COUNT(DISTINCT s.user_id) AS ActiveUser,
                                COUNT(DISTINCT CASE WHEN s.event_type ='purchase' THEN s.user_id END) AS PayingUser,
                                SUM(DISTINCT CASE WHEN s.event_type = 'purchase' THEN s.price END) AS PayingAmount
                        FROM ecommerce s INNER JOIN month_users m
                        ON s.user_id = m.user_id
                        GROUP BY Month
                    )

                    SELECT Month, ActiveUser, PayingUser, PayingAmount,
                            ROUND(PayingAmount / FIRST(ActiveUser) OVER (),2)  AS AmountPerFirstActiveUser
                    FROM activity_summary 
                    ORDER BY Month
                """
    return spark.sql(query)


In [None]:

months = ['10', '11', '12', '1', '2', '3', '4']
ltrsstr = {}

for month in months:
    globals()[f"ltrstr{month}_df"] = calculate_ltr(month).toPandas()
    ltrsstr[month] = globals()[f"ltrstr{month}_df"].AmountPerFirstActiveUser.sum()

In [None]:
# 시각화 
import matplotlib.pyplot as plt

x = list(ltrsstr.keys())
y = list(ltrsstr.values())

plt.plot(x, y, marker='o', color='skyblue', label="LTR")
plt.title("LTR and CAC", fontsize=16)
plt.xlabel("Month")
plt.ylabel('Value')
plt.xticks(x)
plt.grid(True)
plt.legend()
plt.show()

In [None]:
ltrsstr

In [None]:
rr = spark.sql("""
               WITH rr AS (
                    SELECT user_id, COUNT(*) AS CNT
                    FROM ecommerce
                    WHERE event_type = 'purchase'
                    GROUP BY user_id
                    )
               SELECT (SELECT COUNT(*) FROM rr WHERE CNT > 1) / COUNT(*)
               FROM rr
                """)

In [None]:
rr.toPandas()

In [23]:
spark.stop()