## Install Spark & Install Packages & Initial Spark

In [1]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 74kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 20.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=ccbe502941fa3f3a7af6405ea5556b8ca309fa94d1b0956b4df55b0deca9484c
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0

In [2]:
import numpy as np
import pandas as pd 
import warnings
import zipfile
import os
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.font_manager import FontProperties
from pyspark.sql import functions as F
from pyspark.sql import Window

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

warnings.filterwarnings('ignore')

%matplotlib inline

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Feature Engineering(Counts & Ratio)")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

## Read Data

In [None]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
train = spark.read.option("header",True) \
    .csv("/content/drive/MyDrive/Colab Notebooks/data/data_format1/train_format1.csv")
test = spark.read.option("header",True) \
    .csv("/content/drive/MyDrive/Colab Notebooks/data/data_format1/test_format1.csv")
train_info = spark.read.option("header",True) \
    .csv("/content/drive/MyDrive/Colab Notebooks/data/data_format1/user_info_format1.csv") 
train_log = spark.read.option("header",True) \
    .csv("/content/drive/MyDrive/Colab Notebooks/data/data_format1/user_log_format1.csv") \
    .withColumnRenamed("seller_id", "merchant_id")

In [None]:
train_info_0 = train_info \
    .withColumn("age", F.when(train_info.age_range == 8, 7).otherwise(train_info.age_range)) \
    .fillna({"age": 0, "gender": 2}) \
    .drop("age_range") 

In [None]:
train_log_0 = train_log \
            .withColumn("action_type_tmp", F.when(train_log.action_type == 0, 1).otherwise(train_log.action_type)) \
            .drop("action_type") \
            .withColumnRenamed("action_type_tmp", "action_type") \
            .fillna({"brand_id": -1})

## Show Data

In [None]:
train.show()

+-------+-----------+-----+
|user_id|merchant_id|label|
+-------+-----------+-----+
|  34176|       3906|    0|
|  34176|        121|    0|
|  34176|       4356|    1|
|  34176|       2217|    0|
| 230784|       4818|    0|
| 362112|       2618|    0|
|  34944|       2051|    0|
| 231552|       3828|    1|
| 231552|       2124|    0|
| 232320|       1168|    0|
| 232320|       4270|    0|
| 167040|        671|    0|
| 101760|       1760|    0|
| 298368|       2981|    0|
|  36480|       4730|    0|
| 299136|       2935|    0|
|  37248|       2615|    0|
| 103296|       2482|    0|
| 299904|       1742|    0|
|  38016|       1028|    0|
+-------+-----------+-----+
only showing top 20 rows



In [None]:
train_info_0.show()

+-------+------+---+
|user_id|gender|age|
+-------+------+---+
| 376517|     1|  6|
| 234512|     0|  5|
| 344532|     0|  5|
| 186135|     0|  5|
|  30230|     0|  5|
| 272389|     1|  6|
| 281071|     0|  4|
| 139859|     0|  7|
| 198411|     1|  5|
|  67037|     1|  4|
| 149002|     2|  5|
|   7468|     0|  4|
|  94292|     0|  4|
| 347414|     1|  6|
| 191719|     0|  4|
| 391524|     1|  5|
| 153790|     0|  6|
| 349112|     1|  3|
| 344766|     0|  6|
|  81816|     0|  5|
+-------+------+---+
only showing top 20 rows



In [None]:
train_log_0.orderBy("merchant_id", "user_id").show()

+-------+-------+------+-----------+--------+----------+-----------+
|user_id|item_id|cat_id|merchant_id|brand_id|time_stamp|action_type|
+-------+-------+------+-----------+--------+----------+-----------+
|    100|1041304|   420|          1|    1662|      1108|          1|
|    100| 472260|   420|          1|    1662|      1108|          1|
|    100| 912479|   993|          1|    1662|      1018|          1|
|    100|  24620|   420|          1|    1662|      1008|          3|
|    100| 912479|   993|          1|    1662|      1016|          1|
|    100|1008023|   629|          1|    1662|      1008|          1|
|    100| 912479|   993|          1|    1662|      1016|          1|
|    100| 918789|   629|          1|    1662|      1008|          1|
|    100| 912479|   993|          1|    1662|      1016|          1|
|    100| 912479|   993|          1|    1662|      1020|          1|
|    100|  83998|   420|          1|    1662|      1108|          1|
|    100|1008023|   629|          

## Feature Engineering (M-B Profile)

### Market Share

In [None]:
mkt_share_0 = train_log_0.groupBy("merchant_id", "brand_id").agg(F.count(F.lit(1)).alias("mer_brd_num"))
mer_num = train_log_0.groupBy("merchant_id").agg(F.count(F.lit(1)).alias("mer_ttl_num"))
brd_num = train_log_0.groupBy("brand_id").agg(F.count(F.lit(1)).alias("brd_ttl_num"))

mkt_share_1 = mkt_share_0 \
            .join(brd_num, "brand_id", "left") \
            .join(mer_num, "merchant_id", "left") \
            .withColumn("mb_mer_share_on_brd", F.round((F.col("mer_brd_num")/F.col("brd_ttl_num")), 2)) \
            .withColumn("mb_brd_share_on_mer", F.round((F.col("mer_brd_num")/F.col("mer_ttl_num")), 2)) \
            .drop(*["mer_brd_num", "brd_ttl_num", "mer_ttl_num"]) \
            .orderBy("merchant_id")

### User Share

In [None]:
user_share_0 = train_log_0.groupBy("merchant_id", "brand_id").agg(F.countDistinct("user_id").alias("mer_brd_cust"))
mer_user = train_log_0.groupBy("merchant_id").agg(F.countDistinct("user_id").alias("mer_ttl_cust"))
brd_user = train_log_0.groupBy("brand_id").agg(F.countDistinct("user_id").alias("brd_ttl_cust"))

user_share_1 = user_share_0 \
            .join(brd_user, "brand_id", "left") \
            .join(mer_user, "merchant_id", "left") \
            .withColumn("mb_mer_user_share_on_brd", F.round((F.col("mer_brd_cust")/F.col("brd_ttl_cust")), 2)) \
            .withColumn("mb_brd_user_share_on_mer", F.round((F.col("mer_brd_cust")/F.col("mer_ttl_cust")), 2)) \
            .drop(*["mer_brd_cust", "brd_ttl_cust", "mer_ttl_cust"]) \
            .orderBy("merchant_id")

In [None]:
mb_share = mkt_share_1 \
        .join(user_share_1, ["merchant_id", "brand_id"], "left") \
        .orderBy(["merchant_id", "brand_id"])

# +-----------+--------+-------------------+-------------------+------------------------+------------------------+
# |merchant_id|brand_id|mb_mer_share_on_brd|mb_brd_share_on_mer|mb_mer_user_share_on_brd|mb_brd_user_share_on_mer|
# +-----------+--------+-------------------+-------------------+------------------------+------------------------+
# |          1|      -1|                0.0|                0.0|                    0.01|                    0.01|
# |          1|    1104|                1.0|               0.02|                     1.0|                    0.07|
# |          1|    1662|                1.0|               0.98|                     1.0|                    0.99|
# |         10|      -1|                0.0|                0.0|                     0.0|                    0.01|
# |         10|    2183|                1.0|               0.94|                     1.0|                    0.99|
# |         10|    2650|                1.0|               0.05|                     1.0|                    0.18|
# |        100|      -1|                0.0|                0.0|                     0.0|                     0.0|
# |        100|    2276|                0.0|               0.44|                    0.01|                    0.47|
# |        100|    3612|               0.56|               0.56|                    0.65|                    0.61|
# |        100|    8243|                0.0|                0.0|                     0.0|                     0.0|
# +-----------+--------+-------------------+-------------------+------------------------+------------------------+


+-----------+--------+-------------------+-------------------+------------------------+------------------------+
|merchant_id|brand_id|mb_mer_share_on_brd|mb_brd_share_on_mer|mb_mer_user_share_on_brd|mb_brd_user_share_on_mer|
+-----------+--------+-------------------+-------------------+------------------------+------------------------+
|          1|      -1|                0.0|                0.0|                    0.01|                    0.01|
|          1|    1104|                1.0|               0.02|                     1.0|                    0.07|
|          1|    1662|                1.0|               0.98|                     1.0|                    0.99|
|         10|      -1|                0.0|                0.0|                     0.0|                    0.01|
|         10|    2183|                1.0|               0.94|                     1.0|                    0.99|
|         10|    2650|                1.0|               0.05|                     1.0|         

## Feature Engineering (M-C Profile)

### Market Share

In [None]:
mkt_share_2 = train_log_0.groupBy("merchant_id", "cat_id").agg(F.count(F.lit(1)).alias("mer_cat_num"))
mer_num = train_log_0.groupBy("merchant_id").agg(F.count(F.lit(1)).alias("mer_ttl_num"))
cat_num = train_log_0.groupBy("cat_id").agg(F.count(F.lit(1)).alias("cat_ttl_num"))

mkt_share_3 = mkt_share_2 \
            .join(cat_num, "cat_id", "left") \
            .join(mer_num, "merchant_id", "left") \
            .withColumn("mc_mer_share_on_cat", F.round((F.col("mer_cat_num")/F.col("cat_ttl_num")), 2)) \
            .withColumn("mc_cat_share_on_mer", F.round((F.col("mer_cat_num")/F.col("mer_ttl_num")), 2)) \
            .drop(*["mer_cat_num", "cat_ttl_num", "mer_ttl_num"]) \
            .orderBy("merchant_id")

### User Share

In [None]:
user_share_2 = train_log_0.groupBy("merchant_id", "cat_id").agg(F.countDistinct("user_id").alias("mer_cat_cust"))
mer_user = train_log_0.groupBy("merchant_id").agg(F.countDistinct("user_id").alias("mer_ttl_cust"))
cat_user = train_log_0.groupBy("cat_id").agg(F.countDistinct("user_id").alias("cat_ttl_cust"))

user_share_3 = user_share_2 \
            .join(cat_user, "cat_id", "left") \
            .join(mer_user, "merchant_id", "left") \
            .withColumn("mc_mer_user_share_on_cat", F.round((F.col("mer_cat_cust")/F.col("cat_ttl_cust")), 2)) \
            .withColumn("mc_cat_user_share_on_mer", F.round((F.col("mer_cat_cust")/F.col("mer_ttl_cust")), 2)) \
            .drop(*["mer_cat_cust", "cat_ttl_cust", "mer_ttl_cust"]) \
            .orderBy("merchant_id")

In [None]:
mc_share = mkt_share_3 \
        .join(user_share_3, ["merchant_id", "cat_id"], "left") \
        .orderBy(["merchant_id", "cat_id"])

# +-----------+------+-------------------+-------------------+------------------------+------------------------+
# |merchant_id|cat_id|mc_mer_share_on_cat|mc_cat_share_on_mer|mc_mer_user_share_on_cat|mc_cat_user_share_on_mer|
# +-----------+------+-------------------+-------------------+------------------------+------------------------+
# |          1|  1028|               0.21|               0.16|                    0.27|                    0.35|
# |          1|   103|               0.01|                0.0|                    0.01|                     0.0|
# |          1|  1036|               0.12|               0.02|                    0.14|                    0.07|
# |          1|  1130|               0.02|               0.01|                    0.03|                    0.04|
# |          1|  1159|               0.04|                0.0|                    0.06|                    0.01|
# |          1|  1174|               0.14|                0.1|                    0.16|                    0.27|
# |          1|  1197|               0.01|                0.0|                    0.01|                    0.01|
# |          1|  1228|               0.03|               0.01|                    0.05|                    0.05|
# |          1|   125|                0.0|                0.0|                     0.0|                     0.0|
# |          1|  1252|               0.03|               0.03|                    0.06|                    0.09|
# +-----------+------+-------------------+-------------------+------------------------+------------------------+


+-----------+------+-------------------+-------------------+------------------------+------------------------+
|merchant_id|cat_id|mc_mer_share_on_cat|mc_cat_share_on_mer|mc_mer_user_share_on_cat|mc_cat_user_share_on_mer|
+-----------+------+-------------------+-------------------+------------------------+------------------------+
|          1|  1028|               0.21|               0.16|                    0.27|                    0.35|
|          1|   103|               0.01|                0.0|                    0.01|                     0.0|
|          1|  1036|               0.12|               0.02|                    0.14|                    0.07|
|          1|  1130|               0.02|               0.01|                    0.03|                    0.04|
|          1|  1159|               0.04|                0.0|                    0.06|                    0.01|
|          1|  1174|               0.14|                0.1|                    0.16|                    0.27|
|

## Save Parquet

In [None]:
mb_share.coalesce(10) \
        .write.format("parquet") \
        .mode("overwrite") \
        .save("./drive/MyDrive/Colab Notebooks/data/feature_mb")

In [None]:
mc_share.coalesce(10) \
        .write.format("parquet") \
        .mode("overwrite") \
        .save("./drive/MyDrive/Colab Notebooks/data/feature_mc")