In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("ecommerce_dummy") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "1") \
    .config("spark.dynamicAllocation.enabled", "false") \
    .getOrCreate()

25/06/06 13:31:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [87]:
# !gsutil mv gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/external_dataset/customers_500mb.csv gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/customers_500mb.csv

#### Read

In [4]:
# The dataset is in GCS (distributed), we can get the path using gsutil URI provided  
cust_path = 'gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/customers_500mb.csv' 

# define schema coz our data is 500-ish MB
schema = StructType ([
StructField("customer_id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("country", StringType(), True),
StructField("registration_date", StringType(), True),
StructField("is_active", BooleanType(), True),
])

# read data
df = spark.read\
.format('csv')\
.option('header','true')\
.schema(schema)\
.load(cust_path)

In [5]:
df.show(5)

                                                                                

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|   Mumbai|  Telangana|  India|       2023-03-21|     true|
|          1|Customer_1|  Chennai|West Bengal|  India|       2023-05-27|    false|
|          2|Customer_2|     Pune|  Karnataka|  India|       2023-10-11|    false|
|          3|Customer_3|Hyderabad|    Gujarat|  India|       2023-11-11|    false|
|          4|Customer_4|   Mumbai|  Karnataka|  India|       2023-05-09|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



In [6]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- is_active: boolean (nullable = true)



In [6]:
df = df.cache()

#### Process

In [8]:
# check the format of date column (as i run, it takes long time but still okay just few seconds)

df_correct_date = df.withColumn("parsed_date", to_date(col("registration_date"), "yyyy-MM-dd"))
df_correct_date.filter(col("parsed_date").isNull()).select("registration_date").distinct().show(10, False)



+-----------------+
|registration_date|
+-----------------+
+-----------------+



In [None]:
# Since the date format is same in all rows, we change the registration_date col type into date type yyyy-MM-dd 

In [7]:
df = df.withColumn("registration_date", to_date(col("registration_date"), "yyyy-MM-dd"))

In [10]:
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- is_active: boolean (nullable = true)



In [None]:
# Check if there is any null or nan value in each columns

In [11]:
def count_invalid_values(df, cols=None):
    """
    Menghitung jumlah nilai yang dianggap tidak valid (null, NaN, string kosong, "null", "n/a", dst.)
    untuk setiap kolom yang ditentukan (atau semua kolom).
    
    Returns: DataFrame 1 baris dengan jumlah invalid per kolom
    """
    if cols is None:
        cols = df.columns
        
    exprs = []
    for c in cols:
        expr = when(
            col(c).isNull() |
            (trim(col(c)) == "") |
            (lower(trim(col(c))).isin("null", "n/a", "na", "missing", "?")),
            1
        ).otherwise(0)
        exprs.append(sum(expr).alias(c))
    
    return df.select(exprs)

In [12]:
count_invalid_values(df).show()



+-----------+----+----+-----+-------+-----------------+---------+
|customer_id|name|city|state|country|registration_date|is_active|
+-----------+----+----+-----+-------+-----------------+---------+
|          0|   0|   0|    0|      0|                0|        0|
+-----------+----+----+-----+-------+-----------------+---------+



                                                                                

In [None]:
# Since there is no null values then we don't use Fillna. However if there is any invalid value we can use Fillna as in the code below

In [None]:
# def replace_invalid_values(df, replacement_map):
#     """
#     Ganti nilai tidak valid di beberapa kolom sesuai replacement_map.
#     Nilai tidak valid: null, NaN, "", "null", "n/a", "na", "missing", "?"
#     """
#     for c, replacement in replacement_map.items():
#         df = df.withColumn(
#             c,
#             when(
#                 col(c).isNull() |
#                 (trim(col(c)) == "") |
#                 (lower(trim(col(c))).isin("null", "n/a", "na", "missing", "?")),
#                 replacement
#             ).otherwise(col(c))
#         )
#     return df

In [None]:
# values = {
#     'city': 'Unknown',
#     'state': 'Unknown',
#     'country': 'Unknown',
# }

# df_clean = replace_invalid_values(df, values)

In [None]:
# Extract year, month, dayofmonth, and day name from registration_date

In [8]:
df = df.withColumn('registration_year', year(col('registration_date'))) \
       .withColumn('registration_month', month(col('registration_date'))) \
       .withColumn('registration_day', dayofmonth(col('registration_date'))) \
       .withColumn('registration_dayname', date_format(col('registration_date'), 'EEEE'))

In [9]:
df.show(5)



+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+----------------+--------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|registration_day|registration_dayname|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+----------------+--------------------+
|          0|Customer_0|   Mumbai|  Telangana|  India|       2023-03-21|     true|             2023|                 3|              21|             Tuesday|
|          1|Customer_1|  Chennai|West Bengal|  India|       2023-05-27|    false|             2023|                 5|              27|            Saturday|
|          2|Customer_2|     Pune|  Karnataka|  India|       2023-10-11|    false|             2023|                10|              11|           Wednesday|
|          3|Customer_3|Hyderabad|    Gujarat|  Indi

                                                                                

In [10]:
df.groupBy("city").count().orderBy(col('count'), ascending=False).show()



+---------+-------+
|     city|  count|
+---------+-------+
|Ahmedabad|1097162|
|  Kolkata|1096777|
|Hyderabad|1096426|
|    Delhi|1096183|
|   Mumbai|1095815|
|     Pune|1095748|
|  Chennai|1095052|
|Bangalore|1094195|
+---------+-------+



                                                                                

In [20]:
df.groupBy("state").count().orderBy(col('count'), ascending=False).show()



+-----------+-------+
|      state|  count|
+-----------+-------+
| Tamil Nadu|1254446|
|      Delhi|1253174|
|  Telangana|1252686|
|    Gujarat|1252211|
|Maharashtra|1251967|
|  Karnataka|1251570|
|West Bengal|1251304|
+-----------+-------+



                                                                                

In [21]:
df.groupBy("city","state").count().orderBy(col('count'), ascending=False).show()



+---------+-----------+------+
|     city|      state| count|
+---------+-----------+------+
|    Delhi| Tamil Nadu|157252|
|   Mumbai| Tamil Nadu|157161|
|  Kolkata|    Gujarat|157090|
|  Kolkata|Maharashtra|157052|
|Ahmedabad| Tamil Nadu|156982|
|Ahmedabad|      Delhi|156979|
|Ahmedabad|  Karnataka|156958|
|    Delhi|      Delhi|156950|
|  Chennai|West Bengal|156904|
|  Kolkata| Tamil Nadu|156897|
|Hyderabad|Maharashtra|156874|
|Hyderabad|West Bengal|156822|
|Hyderabad|  Telangana|156796|
|Ahmedabad|  Telangana|156756|
|     Pune|      Delhi|156750|
|   Mumbai|      Delhi|156726|
|     Pune| Tamil Nadu|156713|
|     Pune|  Karnataka|156705|
|     Pune|  Telangana|156698|
|Hyderabad|      Delhi|156659|
+---------+-----------+------+
only showing top 20 rows



                                                                                

In [23]:
# count active user and incative user in each state using pivot

In [24]:
df.groupBy("state").pivot("is_active").count().show()



+-----------+------+------+
|      state| false|  true|
+-----------+------+------+
|    Gujarat|626059|626152|
|      Delhi|627162|626012|
|  Karnataka|625222|626348|
|  Telangana|626289|626397|
|Maharashtra|626401|625566|
| Tamil Nadu|627387|627059|
|West Bengal|625316|625988|
+-----------+------+------+



                                                                                

In [43]:
df_date = spark.range(1).select(current_date().alias("today"))

# Tampilkan
df_date.show()

+----------+
|     today|
+----------+
|2025-06-06|
+----------+



In [46]:
# df_recent_cust.unpersist()
# # diunpersist dulu karena mau ganti logic code di bawah

In [11]:
df_recent_cust = df.filter(
    (col("registration_date") >= lit("2023-05-01")) &
    (col("registration_date") <= lit("2023-06-01")) # anggap current datenya ini
).cache()

In [48]:
df_recent_cust.show(5)

+-----------+-----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+----------------+--------------------+
|customer_id|       name|     city|      state|country|registration_date|is_active|registration_year|registration_month|registration_day|registration_dayname|
+-----------+-----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+----------------+--------------------+
|          1| Customer_1|  Chennai|West Bengal|  India|       2023-05-27|    false|             2023|                 5|              27|            Saturday|
|          4| Customer_4|   Mumbai|  Karnataka|  India|       2023-05-09|    false|             2023|                 5|               9|             Tuesday|
|         24|Customer_24|Bangalore| Tamil Nadu|  India|       2023-05-14|     true|             2023|                 5|              14|              Sunday|
|         36|Customer_36|  Chennai| Tamil Nadu

In [49]:
df_recent_cust.count()

768468

In [50]:
df.count()

8767358

In [51]:
# get the oldest and newest customer in each city

In [52]:
df.groupBy('city').agg(min('registration_date').alias('oldest'),max('registration_date').alias('newest')).show()




+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|    Delhi|2023-01-01|2023-12-31|
|  Kolkata|2023-01-01|2023-12-31|
|Hyderabad|2023-01-01|2023-12-31|
|Bangalore|2023-01-01|2023-12-31|
|Ahmedabad|2023-01-01|2023-12-31|
|  Chennai|2023-01-01|2023-12-31|
|   Mumbai|2023-01-01|2023-12-31|
|     Pune|2023-01-01|2023-12-31|
+---------+----------+----------+



                                                                                

In [12]:
df_recent_nonactive_cust = df_recent_cust.filter(col('is_active') == False).cache()
df_recent_active_cust = df_recent_cust.filter(col('is_active') == True).cache()

In [64]:
df_recent_nonactive_cust.count(), df_recent_active_cust.count()

                                                                                

(383654, 384814)

In [63]:
df_recent_active_cust.groupBy("city").pivot("state").count().show()



+---------+-----+-------+---------+-----------+----------+---------+-----------+
|     city|Delhi|Gujarat|Karnataka|Maharashtra|Tamil Nadu|Telangana|West Bengal|
+---------+-----+-------+---------+-----------+----------+---------+-----------+
|    Delhi| 6741|   6948|     6854|       7011|      6923|     6908|       6901|
|  Kolkata| 6825|   6928|     6850|       6894|      6912|     6885|       6894|
|Hyderabad| 7043|   6893|     6760|       6860|      6881|     6902|       6948|
|Bangalore| 6836|   6834|     7052|       6787|      6765|     6895|       6876|
|Ahmedabad| 6758|   6839|     6731|       6778|      6862|     6960|       6905|
|  Chennai| 6764|   6987|     6847|       6836|      6903|     6896|       6961|
|   Mumbai| 6753|   6856|     6784|       6593|      6912|     6867|       6808|
|     Pune| 6814|   6890|     6892|       6946|      6963|     6985|       6918|
+---------+-----+-------+---------+-----------+----------+---------+-----------+



                                                                                

Analyze orders data

In [13]:
# The dataset is in GCS (distributed), we can get the path using gsutil URI provided  
orders_path = 'gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/orders.csv' 

# define schema coz our data is 400-ish MB
schema_order = StructType ([
StructField("order_id", IntegerType(), True),
StructField("customer_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("total_amount", DoubleType(), True),
StructField("status", StringType(), True),
])

# read data
orders = spark.read\
.format('csv')\
.option('header','true')\
.schema(schema_order)\
.load(orders_path)

In [14]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- status: string (nullable = true)



In [15]:
orders.show(5)

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|       0|    3194509|2024-04-12|772.3507972244216|  Shipped|
|       1|    8003925|2024-05-06| 404.843827081314|  Shipped|
|       2|    1946602|2024-01-11| 886.516239811537|Cancelled|
|       3|    8511049|2024-05-28|268.3271760083327|Cancelled|
|       4|    5013836|2024-07-23|408.1673106991602|Cancelled|
+--------+-----------+----------+-----------------+---------+
only showing top 5 rows



In [16]:
orders = orders.cache()

In [25]:
max_amount = orders.agg(max("total_amount")).collect()[0][0]

In [26]:
max_amount

999.9998648332053

In [27]:
orders.filter(col("total_amount") == max_amount).show()

+--------+-----------+----------+-----------------+---------+
|order_id|customer_id|order_date|     total_amount|   status|
+--------+-----------+----------+-----------------+---------+
|  285908|    8125778|2024-10-29|999.9998648332053|Cancelled|
+--------+-----------+----------+-----------------+---------+



In [28]:
orders.select("status").distinct().show()



+---------+
|   status|
+---------+
|Cancelled|
|Delivered|
|  Shipped|
|  Pending|
+---------+



                                                                                

In [29]:
orders.groupBy("status").count().show()



+---------+-------+
|   status|  count|
+---------+-------+
|Cancelled|2190854|
|Delivered|2193938|
|  Shipped|2191748|
|  Pending|2190818|
+---------+-------+



                                                                                

In [30]:
success_orders = orders.filter(col('status') == 'Delivered').cache()

In [32]:
success_orders.show(5)



+--------+-----------+----------+------------------+---------+
|order_id|customer_id|order_date|      total_amount|   status|
+--------+-----------+----------+------------------+---------+
|       9|    8298598|2024-12-08|53.509402682214954|Delivered|
|      14|    7014407|2024-01-22| 431.8540239941009|Delivered|
|      33|     865279|2024-12-27| 534.4659633185123|Delivered|
|      41|     493094|2024-07-02| 90.86529401137149|Delivered|
|      46|    3277795|2024-05-23| 917.1587356598935|Delivered|
+--------+-----------+----------+------------------+---------+
only showing top 5 rows



                                                                                

In [35]:
success_orders.agg(avg(col("total_amount")).alias("Average"), max(col("total_amount")).alias("Maximum"), min(col("total_amount")).alias("Minimum")).show()

+-----------------+-----------------+-----------------+
|          Average|          Maximum|          Minimum|
+-----------------+-----------------+-----------------+
|504.9787530243163|999.9998261750726|10.00002425168607|
+-----------------+-----------------+-----------------+



Join Customers and Orders

In [42]:
cust_order_df = df.join(orders, "customer_id", "inner")

In [43]:
cust_order_df.count()

                                                                                

8767358

In [47]:
cust_order_df = cust_order_df.cache()

25/06/06 14:34:49 WARN CacheManager: Asked to cache already cached data.


In [50]:
cust_order_df.unpersist()

DataFrame[customer_id: int, name: string, city: string, state: string, country: string, registration_date: date, is_active: boolean, registration_year: int, registration_month: int, registration_day: int, registration_dayname: string, order_id: int, order_date: date, total_amount: double, status: string]

In [51]:
selected = cust_order_df \
           .select("customer_id", "name", "city", "state", "registration_date", "is_active", "order_id", "order_date", "total_amount", "status") \
           .cache()

VIP Customers by the count of their orders

In [52]:
selected.groupBy("customer_id").count().orderBy(col('count'), ascending=False).show(100)



+-----------+-----+
|customer_id|count|
+-----------+-----+
|     960513|   10|
|    4685817|    9|
|    4758607|    9|
|    1680051|    9|
|    4177972|    9|
|    7695252|    9|
|    7596142|    9|
|    3952305|    9|
|    4353840|    8|
|    2687241|    8|
|    2727030|    8|
|    4737613|    8|
|     387115|    8|
|    7235039|    8|
|    3346013|    8|
|    3943540|    8|
|    6944921|    8|
|    1920164|    8|
|    3642000|    8|
|    8531908|    8|
|    2698942|    8|
|    2269868|    8|
|    8033718|    8|
|    1102326|    8|
|    8068134|    8|
|    1343906|    8|
|    3708421|    8|
|    1645353|    8|
|    6489121|    8|
|    8159121|    8|
|    1362838|    8|
|    8744405|    8|
|      14218|    8|
|    2984163|    8|
|    3177272|    8|
|    8172823|    8|
|    1572951|    8|
|    4713558|    8|
|    1075371|    8|
|    8433052|    8|
|    2251742|    8|
|    5263797|    8|
|    5304216|    8|
|    6929977|    8|
|    2144260|    8|
|    3778322|    8|
|    4544608|    8|


                                                                                

In [53]:
selected.filter(col("customer_id") == 960513).show()



+-----------+---------------+-------+---------+-----------------+---------+--------+----------+------------------+---------+
|customer_id|           name|   city|    state|registration_date|is_active|order_id|order_date|      total_amount|   status|
+-----------+---------------+-------+---------+-----------------+---------+--------+----------+------------------+---------+
|     960513|Customer_960513|Kolkata|Karnataka|       2023-11-16|     true| 6749527|2024-09-12|177.56366200886345|Cancelled|
|     960513|Customer_960513|Kolkata|Karnataka|       2023-11-16|     true| 7294650|2024-09-05|  737.694216798412|Delivered|
|     960513|Customer_960513|Kolkata|Karnataka|       2023-11-16|     true| 8731277|2024-06-22|249.19149892200264|  Pending|
|     960513|Customer_960513|Kolkata|Karnataka|       2023-11-16|     true|  509741|2024-09-08| 752.8890242063479|  Pending|
|     960513|Customer_960513|Kolkata|Karnataka|       2023-11-16|     true|  701153|2024-10-25|142.48716858710432|Delivered|


                                                                                

Total spent (Success orders only / status is delivered) per customer in Delhi state

In [55]:
selected.filter(col("state") == "Delhi") \
        .filter(col("status") == "Delivered")\
        .groupBy("customer_id")\
        .agg(sum("total_amount").alias("Total Spent (Success Order)"))\
        .orderBy("Total Spent (Success Order)", ascending=False).show(10)



+-----------+---------------------------+
|customer_id|Total Spent (Success Order)|
+-----------+---------------------------+
|    2402967|         3272.0459097804614|
|    6989819|         3206.3600927177818|
|    6674611|          3186.050298286749|
|    4708179|          3117.681295008326|
|    4649616|         3114.4620501476393|
|    4455197|          3099.774482792612|
|    4268463|          3020.268699009355|
|    5817051|         2969.9164323738996|
|    1631735|         2913.8458165055713|
|    7927876|          2884.821448494191|
+-----------+---------------------------+
only showing top 10 rows



                                                                                

How many orders come into the system per month?

In [58]:
order_per_month = selected.withColumn("order_month", month(col("order_date")))\
                          .groupBy("order_month")\
                          .count()\
                          .orderBy(col("order_month"))

In [59]:
order_per_month.show()



+-----------+------+
|order_month| count|
+-----------+------+
|          1|744380|
|          2|694891|
|          3|745393|
|          4|719513|
|          5|745008|
|          6|719718|
|          7|745388|
|          8|743828|
|          9|720546|
|         10|745026|
|         11|721347|
|         12|722320|
+-----------+------+



                                                                                

In [61]:
selected = selected.withColumn("order_month", month(col("order_date")))
selected.show(5)

+-----------+-------------+---------+-----------+-----------------+---------+--------+----------+-----------------+---------+-----------+
|customer_id|         name|     city|      state|registration_date|is_active|order_id|order_date|     total_amount|   status|order_month|
+-----------+-------------+---------+-----------+-----------------+---------+--------+----------+-----------------+---------+-----------+
|        148| Customer_148|Bangalore|Maharashtra|       2023-10-22|     true| 5464329|2024-02-12|920.1930359397587|  Pending|          2|
|       1088|Customer_1088|  Chennai| Tamil Nadu|       2023-01-06|    false| 3130459|2024-11-21|73.31849307250877|Delivered|         11|
|       1088|Customer_1088|  Chennai| Tamil Nadu|       2023-01-06|    false| 3858678|2024-09-13|691.6821804120165|  Shipped|          9|
|       1088|Customer_1088|  Chennai| Tamil Nadu|       2023-01-06|    false| 4225900|2024-04-14| 442.882851154523|  Shipped|          4|
|       1591|Customer_1591|    Del

In [66]:
pivot_ordermonth_status = selected.groupBy("order_month").pivot("status").count()

                                                                                

In [68]:
pivot_ordermonth_status.orderBy("order_month").show()



+-----------+---------+---------+-------+-------+
|order_month|Cancelled|Delivered|Pending|Shipped|
+-----------+---------+---------+-------+-------+
|          1|   186287|   186290| 185731| 186072|
|          2|   173091|   173414| 174035| 174351|
|          3|   186586|   186898| 185826| 186083|
|          4|   180123|   180053| 179408| 179929|
|          5|   186297|   186328| 185944| 186439|
|          6|   179387|   180493| 180482| 179356|
|          7|   185890|   186625| 186300| 186573|
|          8|   185794|   185731| 186147| 186156|
|          9|   180012|   180171| 179832| 180531|
|         10|   186038|   186543| 186479| 185966|
|         11|   180204|   180758| 180522| 179863|
|         12|   181145|   180634| 180112| 180429|
+-----------+---------+---------+-------+-------+



                                                                                

Find customers with high order frequency but low total spent (delivered status only)

In [69]:
delivered_orders = selected.filter(col("status") == "Delivered").cache()

In [78]:
target_customers = delivered_orders\
.groupBy("customer_id")\
.agg(count("customer_id").alias("Freq Order"), sum("total_amount").alias("Total Spent"))\
.orderBy(col("Freq Order").desc(), col("Total Spent").asc())

#### Write to GCS 

Save recent inactive customers <br>
Save customers clean version <br>
Save cust_order_df (join) <br>

In [79]:
output_path = "gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/recent_inactive_customers"
df_recent_nonactive_cust.write.mode('overwrite').parquet(output_path)

                                                                                

In [80]:
output_path_cust_processed = "gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/customers_processed"
df.write.mode('overwrite').parquet(output_path_cust_processed)

                                                                                

In [81]:
output_path_cust_order_join = "gs://dataproc-staging-us-central1-1097408646578-yvstuqxm/ecommerce_dummy/customers_orders"
cust_order_df.write.mode('overwrite').parquet(output_path_cust_order_join)

                                                                                

In [82]:
df.unpersist()
df_recent_nonactive_cust.unpersist()
df_recent_active_cust.unpersist()
df_recent_cust.unpersist()

DataFrame[customer_id: int, name: string, city: string, state: string, country: string, registration_date: date, is_active: boolean, registration_year: int, registration_month: int, registration_day: int, registration_dayname: string]

In [83]:
orders.unpersist()
success_orders.unpersist()
selected.unpersist()
delivered_orders.unpersist()

DataFrame[customer_id: int, name: string, city: string, state: string, registration_date: date, is_active: boolean, order_id: int, order_date: date, total_amount: double, status: string, order_month: int]

In [84]:
print(df_recent_nonactive_cust.is_cached), print(df_recent_active_cust.is_cached), print(df_recent_cust.is_cached), print(df.is_cached)

False
False
False
False


(None, None, None, None)

In [85]:
print(orders.is_cached), print(success_orders.is_cached), print(selected.is_cached), print(delivered_orders.is_cached)

False
False
False
False


(None, None, None, None)

In [86]:
spark.stop()