In [1]:
import numpy as np
from pyspark.sql import SparkSession

In [2]:
spark = (SparkSession
            .builder
            .appName("Python Spark create RDD example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate())

In [3]:
def data_shape(data):
    """return the shape of data"""
    n_rows = data.count()
    n_cols = len(data.columns)

    return n_rows, n_cols


# 1. Clean the AirBnB Data

In [4]:
from pyspark.sql.functions import col, translate, percentile_approx
from pyspark.sql.types import FloatType

def remove_missing_values(data):
    return data.na.drop(subset=['neighbourhood group', 'price'])

def clean_neighbourhood(data):
    return (data.replace(['brookln', 'manhatan'],
                         ['Brooklyn', 'Manhattan'], 
                         "neighbourhood group"))

def clean_price(data):
    return (data
               .withColumn('price', translate('price', '$, ', ''))
               .withColumn('price', col("price").cast(FloatType()))
           )

def remove_price_outlier(data):
    # First find the lower & upper boundary
    q1, q3 = (data
             .select(percentile_approx('price', [0.25, 0.75])
                        .alias('quantiles'))
             .collect())[0][0]
    IQR = q3-q1
    print('Q1  :', q1)
    print('Q3  :', q3)
    print('IQR :', IQR)
    
    # Selanjutnya cari batas bawah dan atas
    lb = q1 - 1.5*IQR
    ub = q3 + 1.5*IQR
    print('Batas bawah :', lb)
    print('Batas atas  :', ub)
    
    return (data
               .filter((col('price')>=lb) & (col('price')<=ub)))

def remove_outlier_availability(data):
    return (data
               .filter((col('availability 365')>=0) \
                            & (col('availability 365')<=365)))

def drop_duplicate_id(data):
    return (data.dropDuplicates(subset = ['id']))
    

In [5]:
raw_data = (spark.read
                .option("multiline","true")
                .option("wholeFile", "true")
                .option("quote", "\"")
                .option("escape", "\"")
                .csv('dataset/Airbnb_Open_Data.csv',
                      header = True,
                      inferSchema = True))

print('Data shape:', data_shape(raw_data))
raw_data.printSchema()

Data shape: (102599, 26)
root
 |-- id: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- host id: long (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- host name: string (nullable = true)
 |-- neighbourhood group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- country: string (nullable = true)
 |-- country code: string (nullable = true)
 |-- instant_bookable: boolean (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- room type: string (nullable = true)
 |-- Construction year: integer (nullable = true)
 |-- price: string (nullable = true)
 |-- service fee: string (nullable = true)
 |-- minimum nights: integer (nullable = true)
 |-- number of reviews: integer (nullable = true)
 |-- last review: string (nullable = true)
 |-- reviews per month: double (nullable = true)
 |-- review rate number: integer (nullable = true)
 |-- c

In [8]:
# Lakukan transformasi dari awal hingga habis
data_clean = (raw_data
                 .transform(remove_missing_values)  
                 .transform(clean_neighbourhood)    
                 .transform(clean_price)  
                 .transform(remove_price_outlier)
                 .transform(remove_outlier_availability)
                 .transform(drop_duplicate_id)
             )

print('Data shape:', data_shape(data_clean))
data_clean.limit(5).toPandas()

Q1  : 340.0
Q3  : 913.0
IQR : 573.0
Batas bawah : -519.5
Batas atas  : 1772.5
Data shape: (98174, 26)


Unnamed: 0,id,NAME,host id,host_identity_verified,host name,neighbourhood group,neighbourhood,lat,long,country,...,service fee,minimum nights,number of reviews,last review,reviews per month,review rate number,calculated host listings count,availability 365,house_rules,license
0,1002403,THE VILLAGE OF HARLEM....NEW YORK !,78829239556,,Elise,Manhattan,Harlem,40.80902,-73.9419,United States,...,$124,3,0,,,5,1,352,"I encourage you to use my kitchen, cooking and...",
1,1002755,,85098326012,unconfirmed,Garry,Brooklyn,Clinton Hill,40.68514,-73.95976,United States,...,$74,30,270,7/5/2019,4.64,4,1,322,,
2,1005202,BlissArtsSpace!,90821839709,unconfirmed,Emma,Brooklyn,Bedford-Stuyvesant,40.68688,-73.95596,United States,...,$212,45,49,10/5/2017,0.4,5,1,219,House Guidelines for our BnB We are delighted ...,
3,1007411,Beautiful 1br on Upper West Side,18824631834,verified,Alan,Manhattan,Upper West Side,40.80316,-73.96545,United States,...,$121,5,53,6/22/2019,0.43,4,1,163,"My ideal guests would be warm, friendly, and r...",
4,1010725,Perfect for Your Parents + Garden,80380130347,verified,Ryan,Brooklyn,Fort Greene,40.69169,-73.97185,United States,...,,2,198,6/28/2019,1.72,5,1,96,"- Please be mindful of the neighbors, quiet ti...",


# 2. Calculate Month-Over-Month Percentage Change in Sales

In [217]:
# Read transactions data
stores = (spark.read
              .option("multiline","true")
              .option("wholeFile", "true")
              .option("quote", "\"")
              .option("escape", "\"")
              .csv('dataset/Global_Superstore2.csv',
                   header = True,
                   inferSchema = True))

print('Data shape:', data_shape(stores))
stores.show(5)

Data shape: (51290, 24)
+------+---------------+----------+----------+------------+-----------+----------------+-----------+-------------+---------------+-------------+-----------+------+-------+----------------+----------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|   Ship Mode|Customer ID|   Customer Name|    Segment|         City|          State|      Country|Postal Code|Market| Region|      Product ID|  Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|Shipping Cost|Order Priority|
+------+---------------+----------+----------+------------+-----------+----------------+-----------+-------------+---------------+-------------+-----------+------+-------+----------------+----------+------------+--------------------+--------+--------+--------+--------+-------------+--------------+
| 32298| CA-2012-124891|31-07-2012|31-07-2012|    Same Day|   RH-19495|     Ric

In [218]:
stores.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Market: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping Cost: double (nullable = true)
 |-- Order Priority: string (nullable = true)



In [219]:
from pyspark.sql.functions import to_date, year, month
from pyspark.sql.types import FloatType

clean_data = (stores
                .withColumn("Order Date", to_date(col("Order Date"), "dd-MM-yyyy"))
                .withColumn("Year", year("Order Date"))
                .withColumn("Month", month("Order Date"))
                .withColumn("Sales", col("Sales").cast(FloatType()))
                .select(['Order Date', 'Year', 'Month', 'Sales']))

print('Data shape:', data_shape(clean_data))
clean_data.show(5)

Data shape: (51290, 4)
+----------+----+-----+--------+
|Order Date|Year|Month|   Sales|
+----------+----+-----+--------+
|2012-07-31|2012|    7| 2309.65|
|2013-02-05|2013|    2|3709.395|
|2013-10-17|2013|   10|5175.171|
|2013-01-28|2013|    1| 2892.51|
|2013-11-05|2013|   11| 2832.96|
+----------+----+-----+--------+
only showing top 5 rows



In [220]:
# Lakukan group by berdasarkan year-month, kemudian order by year-month
grouped_data = (
    clean_data
        .groupBy(['Year', 'Month'])
        .agg(sum('Sales').alias('total_Sales'))
        .sort(['Year', 'Month'], ascending=[True, True])
)

print('Data shape:', data_shape(grouped_data))
grouped_data.show(12)

Data shape: (48, 3)
+----+-----+------------------+
|Year|Month|       total_Sales|
+----+-----+------------------+
|2011|    1| 98898.48894953728|
|2011|    2| 91152.15718114376|
|2011|    3|145729.36794161797|
|2011|    4|116915.76411414146|
|2011|    5|146747.83606886864|
|2011|    6|215207.38047516346|
|2011|    7|115510.41891396046|
|2011|    8|207581.49020028114|
|2011|    9| 290214.4558234811|
|2011|   10|199071.26366591454|
|2011|   11|  298496.536657691|
|2011|   12| 333925.7342272997|
+----+-----+------------------+
only showing top 12 rows



In [221]:
# Create diff
from pyspark.sql.window import Window 
from pyspark.sql.functions import lag, when, isnull, round, concat_ws

window_spec = Window.partitionBy('Year').orderBy('Month')

partition_data = (
    grouped_data
        .withColumn("prev_total_Sales", 
                    lag(col('total_Sales')).over(window_spec))
        .withColumn("diff_total_Sales", 
                    when(isnull(col('total_Sales')-col('prev_total_Sales')), 0)
                    .otherwise(col('total_Sales')-col('prev_total_Sales')))
        .withColumn("pct_change_total_Sales", 
                    when(isnull(col('diff_total_Sales')/col('prev_total_Sales')), 0.0)
                    .otherwise(100.0*col('diff_total_Sales')/col('prev_total_Sales')))
        .withColumn("pct_change_total_Sales", round(col('pct_change_total_Sales'), 2))
        .withColumn("Year-Month", concat_ws("-", *["Year", "Month"]))
        .select(['Year-Month', 'pct_change_total_Sales'])
)

print('Data shape:', data_shape(partition_data))
partition_data.show(12)

Data shape: (48, 2)
+----------+----------------------+
|Year-Month|pct_change_total_Sales|
+----------+----------------------+
|    2011-1|                   0.0|
|    2011-2|                 -7.83|
|    2011-3|                 59.87|
|    2011-4|                -19.77|
|    2011-5|                 25.52|
|    2011-6|                 46.65|
|    2011-7|                -46.33|
|    2011-8|                 79.71|
|    2011-9|                 39.81|
|   2011-10|                -31.41|
|   2011-11|                 49.94|
|   2011-12|                 11.87|
+----------+----------------------+
only showing top 12 rows



# 3. Time to Purchase Duration

In [316]:
# Read data
event_df = spark.read.csv('dataset/event_samples.csv',
                          header = True,
                          inferSchema = True)

print('Data shape:', data_shape(event_df))
event_df.show()

Data shape: (352144, 9)
+-------------------+----------+----------+-------------------+--------------------+-----------+------+-------------------+------------+
|         event_time|event_type|product_id|        category_id|       category_code|      brand| price|            user_id|user_session|
+-------------------+----------+----------+-------------------+--------------------+-----------+------+-------------------+------------+
|2020-09-24 11:57:06|      view|   1996170|2144415922528452715|electronics.telep...|       NULL|  31.9|1515915625519388267|  LJuJVLEjPT|
|2020-09-24 11:57:26|      view|    139905|2144415926932472027|computers.compone...|     zalman| 17.16|1515915625519380411|  tdicluNnRY|
|2020-09-24 11:57:27|      view|    215454|2144415927158964449|                NULL|       NULL|  9.81|1515915625513238515|  4TMArHtXQy|
|2020-09-24 11:57:33|      view|    635807|2144415923107266682|computers.periphe...|     pantum|113.81|1515915625519014356|  aGFYrNgC08|
|2020-09-24 11:57

In [317]:
event_df.dtypes

[('event_time', 'timestamp'),
 ('event_type', 'string'),
 ('product_id', 'int'),
 ('category_id', 'bigint'),
 ('category_code', 'string'),
 ('brand', 'string'),
 ('price', 'double'),
 ('user_id', 'bigint'),
 ('user_session', 'string')]

In [318]:
# Create view table -> groupBy user_id, then product_id -> get minimum event_time
from pyspark.sql.functions import min

view_df = (
    event_df
        .where(col('event_type')=='view')
        .groupBy(['user_id', 'product_id'])
        .agg(min(col('event_time')).alias('view_time'))
)

print('Data shape:', data_shape(view_df))
view_df.show()

Data shape: (230530, 3)
+-------------------+----------+-------------------+
|            user_id|product_id|          view_time|
+-------------------+----------+-------------------+
|1515915625518130982|   4060928|2020-09-24 12:15:29|
|1515915625519398646|    665981|2020-09-24 12:24:27|
|1515915625514902385|    947773|2020-09-24 13:04:33|
|1515915625519432765|   4078967|2020-09-24 13:58:47|
|1515915625519444868|   3605900|2020-09-24 14:34:18|
|1515915625519493078|   1453290|2020-09-24 17:08:31|
|1515915625519454753|   4101380|2020-09-24 17:18:31|
|1515915625519539054|    543436|2020-09-24 20:27:17|
|1515915625519544774|    780880|2020-09-24 21:02:52|
|1515915625519648316|   3829074|2020-09-25 08:31:56|
|1515915625519653795|   1282955|2020-09-25 08:51:40|
|1515915625519687999|    675363|2020-09-25 10:54:02|
|1515915625519694400|    906630|2020-09-25 11:27:50|
|1515915625519697867|    629022|2020-09-25 11:30:27|
|1515915625519704747|   1341724|2020-09-25 12:02:54|
|1515915625519712827| 

In [319]:
# Create purchase table -> groupBy user_id, then product_id -> get minimum event_time
purchase_df = (
    event_df
        .where(col('event_type')=='purchase')
        .groupBy(['user_id', 'product_id'])
        .agg(min(col('event_time')).alias('purchase_time'))
)

print('Data shape:', data_shape(purchase_df))
purchase_df.show()

Data shape: (9078, 3)
+-------------------+----------+-------------------+
|            user_id|product_id|      purchase_time|
+-------------------+----------+-------------------+
|1515915625518130982|   4060928|2020-09-24 12:19:01|
|1515915625514902385|    947773|2020-09-24 13:19:23|
|1515915625519432765|   4078967|2020-09-24 14:04:46|
|1515915625519653795|   1282955|2020-09-25 08:53:58|
|1515915625520215075|   4101558|2020-09-27 11:20:00|
|1515915625520884639|   3829355|2020-09-29 15:27:46|
|1515915625477767168|   4101127|2020-10-01 17:04:01|
|1515915625522360064|     28480|2020-10-04 16:30:58|
|1515915625522796311|   1006767|2020-10-06 06:57:10|
|1515915625523761225|    139144|2020-10-08 21:26:29|
|1515915625523771247|   1850104|2020-10-08 21:33:06|
|1515915625506903608|     52708|2020-10-08 22:26:31|
|1515915625525268971|   1452884|2020-10-12 19:54:20|
|1515915625525995498|   1443627|2020-10-14 14:19:51|
|1515915625519861582|    809948|2020-09-26 02:18:10|
|1515915625519872813|   

In [320]:
# Inner Join multiple tables by 1) user_id then 2) product_id
joined_df = (
    view_df
        .join(purchase_df, ['user_id', 'product_id'], "inner")
        .sort(['user_id', 'product_id'], ascending=[True, True])
        .withColumn('view_purchase_duration', (col('purchase_time')-col('view_time')).cast("long"))
        .withColumn('view_purchase_duration', round(col('view_purchase_duration')/60.0, 4))
        .where(col('view_purchase_duration')>=0)
)

print('Data shape:', data_shape(joined_df))
joined_df.show()

Data shape: (8792, 5)
+-------------------+----------+-------------------+-------------------+----------------------+
|            user_id|product_id|          view_time|      purchase_time|view_purchase_duration|
+-------------------+----------+-------------------+-------------------+----------------------+
|1515915625353286099|   1023383|2020-10-03 11:20:33|2020-10-03 11:23:44|                3.1833|
|1515915625353457259|    137302|2020-09-29 05:51:33|2020-09-29 05:52:55|                1.3667|
|1515915625353534622|   1428321|2020-10-06 08:29:35|2020-10-06 10:01:55|               92.3333|
|1515915625353561691|   1507368|2020-11-24 17:34:42|2020-11-24 17:37:22|                2.6667|
|1515915625353900095|     16237|2020-10-15 05:07:14|2020-10-15 05:09:35|                  2.35|
|1515915625353900095|   1795171|2020-10-15 05:03:23|2020-10-15 05:09:35|                   6.2|
|1515915625354176736|   3791403|2020-09-29 11:03:05|2020-09-29 11:31:11|                  28.1|
|15159156253545613

In [321]:
# Finalize the data: groupBy userId -> takes mean view_purchase_duration
from pyspark.sql.functions import mean

final_df = (
    joined_df
        .groupBy('user_id')
        .agg(mean(col('view_purchase_duration')).alias('mean_view_purchase_duration'))
        .sort(['user_id'], ascending=[True])
)

print('Data shape:', data_shape(final_df))
final_df.show(20)

Data shape: (7847, 2)
+-------------------+---------------------------+
|            user_id|mean_view_purchase_duration|
+-------------------+---------------------------+
|1515915625353286099|                     3.1833|
|1515915625353457259|                     1.3667|
|1515915625353534622|                    92.3333|
|1515915625353561691|                     2.6667|
|1515915625353900095|                      4.275|
|1515915625354176736|                       28.1|
|1515915625354561351|                    18.9167|
|1515915625354822642|                    23.4333|
|1515915625354857951|                    13.3167|
|1515915625355179497|                 37456.5222|
|1515915625355398801|                  1155.5667|
|1515915625355421833|                     4.1667|
|1515915625355635314|                        1.4|
|1515915625355805313|                  16034.125|
|1515915625355947830|                    12.5667|
|1515915625356051774|                    21.0833|
|1515915625356175149|       

In [322]:
# Pertama, baca datasetnya
filename = 'dataset/event_samples.csv'
data = pd.read_csv(filename)

# Validasi data
print('Data shape:', data.shape)

Data shape: (352144, 9)


In [323]:
# Selanjutnya ubah
#   - user_id -> int
#   - product_id -> int
#   - event_time -> date time
data['event_time'] = pd.to_datetime(data['event_time'])
data['user_id'] = data['user_id'].astype('int')
data['product_id'] = data['product_id'].astype('int')

In [324]:
# Create table view:
# - first, filter on view
# - then, Group by user_id -> product_id and take its minimum event_time
# - finally rename the event_time columns
view_data = (data[data['event_type']=='view']
                .groupby(by=['user_id', 'product_id'], as_index=False)
                .agg({'event_time': 'min'})
                .rename(columns={'event_time': 'view_time'}))

# Validasi
print('Data shape:', view_data.shape)
view_data.head()

Data shape: (230530, 3)


Unnamed: 0,user_id,product_id,view_time
0,1515915625353226922,4101974,2020-10-29 11:28:35+00:00
1,1515915625353230067,3506650,2020-10-06 06:30:32+00:00
2,1515915625353230683,124883,2020-11-09 08:52:51+00:00
3,1515915625353230683,125325,2020-11-09 09:08:53+00:00
4,1515915625353230683,254763,2020-11-18 10:51:35+00:00


In [325]:
# Create table purchase:
# - first, filter on purchase
# - then, Group by user_id -> product_id and take its minimum event_time
# - finally rename the event_time columns
purchase_data = (data[data['event_type']=='purchase']
                    .groupby(by=['user_id', 'product_id'], as_index=False)
                    .agg({'event_time': 'min'})
                    .rename(columns={'event_time': 'purchase_time'}))

# Validasi
print('Data shape:', purchase_data.shape)
purchase_data.head()

Data shape: (9078, 3)


Unnamed: 0,user_id,product_id,purchase_time
0,1515915625353286099,1023383,2020-10-03 11:23:44+00:00
1,1515915625353457259,137302,2020-09-29 05:52:55+00:00
2,1515915625353534622,1428321,2020-10-06 10:01:55+00:00
3,1515915625353561691,1507368,2020-11-24 17:37:22+00:00
4,1515915625353900095,16237,2020-10-15 05:09:35+00:00


In [326]:
# Selanjutnya 
# - join kedua data (inner join) menggunakan key user_id & product_id
# - next, sort dari user_id dan product_id
joined_data = (
    pd.merge(view_data, 
             purchase_data,
             on=['user_id', 'product_id'])
    .sort_values(by=['user_id', 'product_id'])
)

# - kemudian cari selisih durasi dalam satuan menit
joined_data['diff_time'] = joined_data['purchase_time'] - joined_data['view_time']
joined_data['diff_time'] = ((joined_data['diff_time']).dt.total_seconds() / 60.0).round(2)

# - dan fitter dimana diff_time >= 0 (valid data)
joined_data = joined_data[joined_data['diff_time']>=0]

# Validasi
print('Data shape:', joined_data.shape)
joined_data.head(20)

Data shape: (8792, 5)


Unnamed: 0,user_id,product_id,view_time,purchase_time,diff_time
0,1515915625353286099,1023383,2020-10-03 11:20:33+00:00,2020-10-03 11:23:44+00:00,3.18
1,1515915625353457259,137302,2020-09-29 05:51:33+00:00,2020-09-29 05:52:55+00:00,1.37
2,1515915625353534622,1428321,2020-10-06 08:29:35+00:00,2020-10-06 10:01:55+00:00,92.33
3,1515915625353561691,1507368,2020-11-24 17:34:42+00:00,2020-11-24 17:37:22+00:00,2.67
4,1515915625353900095,16237,2020-10-15 05:07:14+00:00,2020-10-15 05:09:35+00:00,2.35
5,1515915625353900095,1795171,2020-10-15 05:03:23+00:00,2020-10-15 05:09:35+00:00,6.2
6,1515915625354176736,3791403,2020-09-29 11:03:05+00:00,2020-09-29 11:31:11+00:00,28.1
7,1515915625354561351,3580373,2020-09-25 08:21:08+00:00,2020-09-25 08:40:03+00:00,18.92
8,1515915625354822642,4035841,2020-11-23 09:40:33+00:00,2020-11-23 10:03:59+00:00,23.43
9,1515915625354857951,3557532,2020-11-19 05:51:06+00:00,2020-11-19 06:04:25+00:00,13.32


In [327]:
# Terakhir,
# - Groupby berdasarkan user_id dan agregasi rata-ratanya
# - kemudian urutkan berdasarkan user id
final_data = (
    joined_data
        .groupby(by=['user_id'], as_index=False)
        .agg({'diff_time': 'mean'})
        .rename(columns={'diff_time': 'view_purchase_duration'})
        .sort_values(by='user_id')
)

# Validasi data
print('Data shape:', final_data.shape)
final_data.head(20)

Data shape: (7847, 2)


Unnamed: 0,user_id,view_purchase_duration
0,1515915625353286099,3.18
1,1515915625353457259,1.37
2,1515915625353534622,92.33
3,1515915625353561691,2.67
4,1515915625353900095,4.275
5,1515915625354176736,28.1
6,1515915625354561351,18.92
7,1515915625354822642,23.43
8,1515915625354857951,13.32
9,1515915625355179497,37456.52
