In [56]:
!pip install pyspark
!pip install pyngrok



In [57]:
import time
import pyspark
import numpy as np
from pyngrok import ngrok
from pyspark.sql import SparkSession , Window
from pyspark.sql import functions as F


In [58]:
# Create a SparkSession with custom memory settings
spark = SparkSession.builder.appName("instamart_analysis") \
    .config("spark.driver.memory","25g") \
    .getOrCreate()


In [59]:
def show_time(start):
    return time.time()-start

In [60]:
departments_df = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/departments.csv")
products_df = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/products.csv")
prior_product_orders = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/order_products__prior.csv").repartition(12)
train_product_orders = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/order_products__train.csv").repartition(8)
orders_df = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/orders.csv").repartition(8)
aisels_df = spark.read.options(header=True,inferSchema=True).csv("/kaggle/input/instacart-market-basket-analysis/aisles.csv")


                                                                                

In [61]:
# Create a tunnel to the Spark UI
ngrok.set_auth_token('2kvaYw5ZiG5bL8iM8YJBVJPk1Ru_3C16mMgmpKEBYb28PPLUe')  # Optional: set your Ngrok auth token if you have one
tunnel = ngrok.connect(4040)
print("Ngrok tunnel \"{}\" -> \"http://localhost:4040\"".format(tunnel.public_url))


Ngrok tunnel "https://fe03-34-32-219-39.ngrok-free.app" -> "http://localhost:4040"


In [62]:
prior_product_orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



In [63]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)



In [64]:
orders_df.cache()

24/10/08 14:44:56 WARN CacheManager: Asked to cache already cached data.


DataFrame[order_id: int, user_id: int, eval_set: string, order_number: int, order_dow: int, order_hour_of_day: int, days_since_prior_order: double]

In [65]:
train_orders_df = orders_df.filter(orders_df["eval_set"] =='train').drop("eval_set")
prior_orders_df = orders_df.filter(orders_df["eval_set"] == 'prior').drop("eval_set")
train_orders_df.cache()
train_product_orders.cache()
prior_orders_df.cache()
prior_product_orders.cache()

24/10/08 14:44:56 WARN CacheManager: Asked to cache already cached data.
24/10/08 14:44:56 WARN CacheManager: Asked to cache already cached data.
24/10/08 14:44:56 WARN CacheManager: Asked to cache already cached data.
24/10/08 14:44:56 WARN CacheManager: Asked to cache already cached data.


DataFrame[order_id: int, product_id: int, add_to_cart_order: int, reordered: int]

In [66]:
# how often user has reorderd
prior_product_orders.select("reordered","order_id").join(
        prior_orders_df.select("user_id","order_id"),how="left",on="order_id"
    ).select("user_id","reordered") \
     .groupBy("user_id").agg(
            F.count(F.col("reordered")).alias("frequency of reorder")
        )

DataFrame[user_id: int, frequency of reorder: bigint]

In [67]:
# time since privious order
prior_orders_df.select("user_id","days_since_prior_order","order_hour_of_day","order_number","order_id") \
                .withColumn("privious_order_hour",
                            F.lag("order_hour_of_day",1) \
                            .over(Window.partitionBy("user_id").orderBy("order_number"))) \
                .withColumn("time_since_Last_order",
                            F.col("days_since_prior_order") * 24 + 
                            F.col("order_hour_of_day") - 
                            F.col("privious_order_hour") 
                           ) \
                .select("order_id","time_since_last_order")


DataFrame[order_id: int, time_since_last_order: double]

In [68]:
#time of the day user visits
prior_orders_df.select("user_id" , "order_hour_of_day","order_id") \
                .groupBy("user_id","order_hour_of_day") \
                .agg(F.count("order_id").alias("frequency")) \
                .groupBy("user_id") \
                .agg(F.max("frequency").alias("maximum_frquency"))

DataFrame[user_id: int, maximum_frquency: bigint]

In [69]:
# whether user has ordered glutan free , organic , Asian item or not
prior_product_orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



In [70]:
products_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)



In [71]:
# does the user have ordered asian , gluten free, or organic item 
prior_product_orders.select("order_id","product_id") \
            .join(products_df.select("product_id","product_name"), on="product_id", how='left') \
            .join(prior_orders_df.select("user_id","order_id"), on="order_id", how='left') \
            .groupBy("user_id", "order_id") \
            .agg(F.collect_list("product_name").alias("list_of_products")) \
            .withColumn("normalized_list", F.expr("transform(list_of_products, x -> lower(x))")) \
            .withColumn("contains_or_not", 
                F.expr("exists(normalized_list,x -> x like '%organic%')")
              | F.expr("exists(normalized_list, x -> x like '%asian%')")
              | F.expr("exists(normalized_list, x-> x like '%gluten free%')")
            ) \
            .filter(F.col("contains_or_not") == True) \
            .select("user_id", "order_id") 

DataFrame[user_id: int, order_id: int]

In [72]:
# feature based on order size 
prior_product_orders.select("product_id","order_id") \
                    .join(prior_orders_df.select("user_id","order_id") , on="order_id", how="left") \
                    .groupBy("user_id",'order_id') \
                    .agg(
                            F.count(F.col("product_id")).alias("count_of_product")
                        ) \
                    .groupBy("user_id") \
                    .agg(
                            F.max(F.col("count_of_product")).alias("max_count_of_products"),
                            F.min(F.col("count_of_product")).alias("min_count_of_products"),
                            F.mean(F.col("count_of_product")).alias("mean_count_of_products")
                        ) 

DataFrame[user_id: int, max_count_of_products: bigint, min_count_of_products: bigint, mean_count_of_products: double]

In [73]:
# How many of the user’s orders contained no previously purchased items
prior_product_orders.select("order_id","reordered") \
                    .join(prior_orders_df.select("order_id","user_id") , on = 'order_id' , how = 'left') \
                    .groupBy("user_Id","order_id") \
                    .agg(
                            F.collect_list(F.col("reordered")).alias("reordered_array")
                        ) \
                    .withColumn("doesnt_contains_reordered" ,
                            F.when(F.array_contains("reordered_array",1),0).otherwise(1)
                        ) 

DataFrame[user_Id: int, order_id: int, reordered_array: array<int>, doesnt_contains_reordered: int]

In [74]:
# how often the item has purchaced 
prior_product_orders.select("product_id","order_id") \
                     .groupBy("product_id") \
                     .agg(
                             F.count(F.col("order_id")).alias("product_count")
                        ) 

DataFrame[product_id: int, product_count: bigint]

In [75]:
# position of product 
prior_product_orders.select("product_id","add_to_cart_order") \
                    .groupBy("product_id") \
                    .agg(
                            F.mean(F.col("add_to_cart_order")).alias("product_mean_of_position")
                        ) 

DataFrame[product_id: int, product_mean_of_position: double]

In [76]:
# How many users buy it as "one shot" item
prior_product_orders.select("order_id","product_id") \
                    .groupBy("order_id") \
                    .agg(F.collect_list("product_id").alias("list_of_products")) \
                    .withColumn("is_one_shot_order",
                                   F.when(F.size(F.col("list_of_products")) == 1,1).otherwise(0)
                               ) \
                    .withColumn("product_id",F.explode(F.col("list_of_products"))) \
                    .join(prior_orders_df.select("user_id","order_id"),on="order_id",how='left') \
                    .groupBy("product_id","user_id") \
                    .agg(F.collect_list(F.col("is_one_shot_order")).alias("is_one_shot_order_list")) \
                    .withColumn("has_user_purchased_one_shot",F.when(F.array_contains("is_one_shot_order_list",1),1).otherwise(0)) \
                    .groupBy("product_id") \
                    .agg(
                            F.sum(F.col("has_user_purchased_one_shot")).alias("number_of_user_purchased_item")
                        ) 

DataFrame[product_id: int, number_of_user_purchased_item: bigint]

In [95]:
# Stats on the number of items that co-occur with this item

# 1. number of time that a item has co occured.
# Perform a self-join on prior_product_orders
result_df = (
    prior_product_orders
    .select("product_id", "order_id")
    .alias("df1")
    .join(
        prior_product_orders.select("product_id", "order_id")
        .withColumnRenamed("product_id", "product_id_1")
        .alias("df2"),
        (F.col("df1.order_id") == F.col("df2.order_id")) & (F.col("df1.product_id") != F.col("df2.product_id_1")),
        "left"
    )
    .groupBy("df1.product_id")
    .agg(F.count(F.col("df2.product_id_1")).alias("number_of_product_co_occurred"))
)

# 2 average number of items that is co ocuured with this item in single order

result_df = (
                prior_product_orders.select("product_id","order_id").alias("ppo1") 
                .join(
                    prior_product_orders.select("product_id","order_id")
                    .alias("ppo2"),
                    (F.col("ppo1.order_id") == F.col("ppo2.order_id")) & 
                    (F.col("ppo1.product_id") != F.col("ppo2.product_id")),
                    how='left'
                ) 
                .groupBy("ppo1.product_id","ppo1.order_id")
                .agg(F.count(F.col("ppo2.product_id")).alias("count_of_co_ocuured_product_per_order"))
                .groupBy("ppo1.product_id")
                .agg(
                    F.mean(F.col("count_of_co_ocuured_product_per_order")).alias("mean_of_co_ocuured_product_per_order"),
                    F.min(F.col("count_of_co_ocuured_product_per_order")).alias("min_of_co_ocuured_product_per_order"),
                    F.max(F.col("count_of_co_ocuured_product_per_order")).alias("max_of_co_ocuured_product_per_order"),

                )
)
