In [1]:
!pip install findspark


Defaulting to user installation because normal site-packages is not writeable


In [2]:
import findspark
findspark.init()


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySparkApp").getOrCreate()


24/02/23 11:44:22 WARN Utils: Your hostname, tejasshinde-Nitro-AN515-55 resolves to a loopback address: 127.0.1.1; using 192.168.1.155 instead (on interface wlp0s20f3)
24/02/23 11:44:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/23 11:44:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Load Clean Data

In [4]:
data_path = "nonull.csv"
df= spark.read.csv(data_path, header=True, inferSchema=True)

                                                                                

In [5]:
df.show()

+-------------------+-------------------+-------------------+-------------------+-------+------+-----------+-----------+---------------+-------------------+
|         event_time|           order_id|         product_id|        category_id|  brand| price|      cat_1|      cat_2|          cat_3|            user_id|
+-------------------+-------------------+-------------------+-------------------+-------+------+-----------+-----------+---------------+-------------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|     hp|152.52|  computers|peripherals|        printer|1515915625509647001|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|samsung|  8.08|electronics|      audio|      headphone|1515915625511889093|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|   beko|231.46| appliances|environment|air_conditioner|1515915625510823948|
|2020-04-30 19:27:36|2298772487720140990|15159662235093354

# Calculate RFM Values

In [5]:
from pyspark.sql.functions import col, max as max_, countDistinct, sum as sum_, datediff, lit

spark = SparkSession.builder.appName("RFM_Scores_Calculation").getOrCreate()

# Calculate recency (number of days since the last order)
max_order_date = df.groupBy("user_id").agg(max_("event_time").alias("last_order_time"))
current_time = df.select(max_("event_time")).head()[0]
df_recency = max_order_date.withColumn("recency", datediff(lit(current_time), col("last_order_time")))

# Calculate frequency (number of orders per user)
df_frequency = df.groupBy("user_id").agg(countDistinct("order_id").alias("frequency"))

# Calculate monetary value (total spending per user)
df_monetary = df.groupBy("user_id").agg(sum_("price").alias("monetary"))

# Aggregate the data to calculate RFM scores
rfm_data = df_recency.join(df_frequency, "user_id").join(df_monetary, "user_id")

rfm_data.show()




24/02/23 11:44:45 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+-------------------+-------------------+-------+---------+------------------+
|            user_id|    last_order_time|recency|frequency|          monetary|
+-------------------+-------------------+-------+---------+------------------+
|1515915625440051468|2020-10-10 16:27:29|     42|       19|           1619.51|
|1515915625440121544|2020-10-04 12:18:21|     48|        7|            878.32|
|1515915625440937295|2020-11-19 15:36:46|      2|       18|1493.3400000000001|
|1515915625440937382|2020-10-25 17:36:25|     27|       10| 757.6599999999999|
|1515915625440937580|2020-10-10 10:52:42|     42|        8|            275.05|
|1515915625440940742|2020-09-16 13:59:35|     66|        2|59.910000000000004|
|1515915625440940932|2020-09-01 13:45:29|     81|        6|1854.2500000000002|
|1515915625440941441|2020-01-16 12:06:23|    310|        1|             80.76|
|1515915625440943934|2020-10-11 17:17:47|     41|       24| 5877.459999999999|
|1515915625440945265|2020-11-18 19:45:39|      3|   

                                                                                

In [8]:
rfm_data.count()

                                                                                

218524

# Download RFM Data

In [20]:
output_path = "RFMData"
rfm_data.coalesce(1).write.csv(output_path, header=True)


# Calculate the minimum, maximum, mean, and average price

In [9]:
from pyspark.sql.functions import max as max_, min as min_, avg as avg_

min_price = df.select(min_("price")).collect()[0][0]
max_price = df.select(max_("price")).collect()[0][0]
mean_price = df.select(mean_("price")).collect()[0][0]
avg_price = df.select(avg_("price")).collect()[0][0]

print("Minimum Price:", min_price)
print("Maximum Price:", max_price)
print("Average Price:", avg_price)




Minimum Price: 0.02
Maximum Price: 50925.9
Average Price: 160.222006341935


                                                                                

# Assign a class ID to each user based on their maximum price using the cluster boundaries

In [50]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KMeans_MaxPrice_Grouping").getOrCreate()

# Step 1: Find the maximum price for each distinct user ID
max_prices = df.groupBy("user_id").agg(F.max("price").alias("max_price"))

# Step 2: Prepare the data for KMeans clustering
assembler = VectorAssembler(inputCols=["max_price"], outputCol="features")
data = assembler.transform(max_prices.select("user_id", "max_price"))

# Step 3: Fit a KMeans model to cluster the maximum prices
k = 5 
kmeans = KMeans(k=k, seed=42)
model = kmeans.fit(data)

# Step 4: Analyze the cluster centers to determine the boundaries for the categories
cluster_centers = model.clusterCenters()
cluster_boundaries = sorted([center[0] for center in cluster_centers])

df_with_class = max_prices.withColumn("class_id",
    when(max_prices["max_price"] <= cluster_boundaries[0], "1")
    .when((max_prices["max_price"] > cluster_boundaries[0]) & (max_prices["max_price"] <= cluster_boundaries[1]), "2")
    .when((max_prices["max_price"] > cluster_boundaries[1]) & (max_prices["max_price"] <= cluster_boundaries[2]), "3")
    .when((max_prices["max_price"] > cluster_boundaries[2]) & (max_prices["max_price"] <= cluster_boundaries[3]), "4")
    .otherwise("5")
)

df_with_class.show()


[Stage 368:>                                                        (0 + 8) / 8]

+-------------------+---------+--------+
|            user_id|max_price|class_id|
+-------------------+---------+--------+
|1515915625447920758|   555.53|       3|
|1515915625514155523|   486.09|       3|
|1515915625484629394|   671.27|       3|
|1515915625443650194|   636.55|       3|
|1515915625512376820|   694.42|       3|
|1515915625512375843|  1041.64|       4|
|1515915625496322977|   479.14|       3|
|1515915625514800986|   185.16|       2|
|1515915625476291004|  1388.87|       5|
|1515915625461255450|   555.53|       3|
|1515915625513057327|   486.09|       3|
|1515915625442959158|   601.83|       3|
|1515915625450900260|   763.63|       3|
|1515915625492428068|   763.63|       3|
|1515915625512154161|   925.67|       4|
|1515915625492842511|  1192.11|       4|
|1515915625457032478|  1387.01|       5|
|1515915625475085544|    300.9|       2|
|1515915625461618764|   162.01|       2|
|1515915625512876282|   949.05|       4|
+-------------------+---------+--------+
only showing top

                                                                                

# Group the DataFrame by class_id and calculate the min and max of max_price

In [51]:
min_max_price = df_with_class.groupBy("class_id").agg(F.min("max_price").alias("min_max_price"), F.max("max_price").alias("max_max_price"))

min_max_price.show()


[Stage 371:>                                                        (0 + 8) / 8]

+--------+-------------+-------------+
|class_id|min_max_price|max_max_price|
+--------+-------------+-------------+
|       3|       453.68|       833.31|
|       5|      1377.29|      50925.9|
|       1|         0.02|       137.94|
|       4|       835.63|      1367.12|
|       2|        138.4|       453.47|
+--------+-------------+-------------+



                                                                                

# Assign classes on basis of class ID in new column 

In [52]:
from pyspark.sql.functions import col, when

class_conditions = [
    (col("class_id") == 5, "Upper class"),
    (col("class_id") == 4, "Upper middle class"),
    (col("class_id") == 3, "Middle class"),
    (col("class_id") == 2, "Lower middle class"),
    (col("class_id") == 1, "Lower class")
]

class_cat = df_with_class.withColumn("class_category", 
                   when(class_conditions[0][0], class_conditions[0][1])
                   .when(class_conditions[1][0], class_conditions[1][1])
                   .when(class_conditions[2][0], class_conditions[2][1])
                   .when(class_conditions[3][0], class_conditions[3][1])
                   .when(class_conditions[4][0], class_conditions[4][1])
                   .otherwise(None))


In [53]:
class_cat.show()



+-------------------+---------+--------+------------------+
|            user_id|max_price|class_id|    class_category|
+-------------------+---------+--------+------------------+
|1515915625447920758|   555.53|       3|      Middle class|
|1515915625514155523|   486.09|       3|      Middle class|
|1515915625484629394|   671.27|       3|      Middle class|
|1515915625443650194|   636.55|       3|      Middle class|
|1515915625512376820|   694.42|       3|      Middle class|
|1515915625512375843|  1041.64|       4|Upper middle class|
|1515915625496322977|   479.14|       3|      Middle class|
|1515915625514800986|   185.16|       2|Lower middle class|
|1515915625476291004|  1388.87|       5|       Upper class|
|1515915625461255450|   555.53|       3|      Middle class|
|1515915625513057327|   486.09|       3|      Middle class|
|1515915625442959158|   601.83|       3|      Middle class|
|1515915625450900260|   763.63|       3|      Middle class|
|1515915625492428068|   763.63|       3|

                                                                                

# Join class category to original dataframe

In [54]:
classdf = df.join(class_cat, "user_id", "left")
classdf.show()




+-------------------+-------------------+-------------------+-------------------+-------------------+----------+-------+-------------+-------------+-------------+---------+--------+------------------+
|            user_id|         event_time|           order_id|         product_id|        category_id|     brand|  price|        cat_1|        cat_2|        cat_3|max_price|class_id|    class_category|
+-------------------+-------------------+-------------------+-------------------+-------------------+----------+-------+-------------+-------------+-------------+---------+--------+------------------+
|1515915625441612480|2020-02-10 16:48:37|2348791608045994250|1515966223509122835|2268105430162997728|       neo|  20.81|  electronics|        audio|    headphone|  1337.71|       4|Upper middle class|
|1515915625443940057|2020-02-08 11:57:41|2348791388851666961|1515966223509335434|2309018577547559593|       ggg|  30.07|      apparel|        shirt|miscellaneous|   694.42|       3|      Middle cl

                                                                                

# Drop the product_id column

In [55]:
classdf = classdf.drop("product_id")


In [56]:
classdf = classdf.withColumnRenamed("max_price", "max_purchase_by_user")

# Categorize customers on basis RFM score 

In [68]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Customer_Value_Categorization_KMeans").getOrCreate()

# Step 1: Normalize the RFM scores
normalized_rfm = rfm_data.withColumn("recency", F.log1p("recency")).withColumn("frequency", F.log1p("frequency")).withColumn("monetary", F.log1p("monetary"))

# Step 2: Combine RFM scores into a single feature vector
assembler = VectorAssembler(inputCols=["recency", "frequency", "monetary"], outputCol="features")
rfm_vector = assembler.transform(rfm_data)

# Step 3: Apply KMeans clustering
k = 3
kmeans = KMeans(k=k, seed=42)
model = kmeans.fit(rfm_vector)

# Step 4: Make predictions
predictions = model.transform(rfm_vector)

# Step 5: Assign categories based on cluster assignments
df_with_categories = predictions.withColumn("category",
    F.when(predictions["prediction"] == 0, "Low-Value")
    .when(predictions["prediction"] == 1, "High-Value")
    .otherwise("Mid-Value")
)

df_with_categories.show()




+-------------------+-------------------+-------+---------+------------------+--------------------+----------+----------+
|            user_id|    last_order_time|recency|frequency|          monetary|            features|prediction|  category|
+-------------------+-------------------+-------+---------+------------------+--------------------+----------+----------+
|1515915625440051468|2020-10-10 16:27:29|     42|       19|           1619.51| [42.0,19.0,1619.51]|         2| Mid-Value|
|1515915625440121544|2020-10-04 12:18:21|     48|        7|            878.32|   [48.0,7.0,878.32]|         0| Low-Value|
|1515915625440937295|2020-11-19 15:36:46|      2|       18|1493.3400000000001|[2.0,18.0,1493.34...|         2| Mid-Value|
|1515915625440937382|2020-10-25 17:36:25|     27|       10| 757.6599999999999|[27.0,10.0,757.65...|         0| Low-Value|
|1515915625440937580|2020-10-10 10:52:42|     42|        8|            275.05|   [42.0,8.0,275.05]|         0| Low-Value|
|1515915625440940742|202

                                                                                

# Chech RFM values for assigned categories

In [65]:
max = df_with_categories.groupBy("prediction").agg({"recency": "max"})

max.show()



+----------+------------+
|prediction|max(recency)|
+----------+------------+
|         1|         260|
|         2|         313|
|         0|         321|
+----------+------------+



                                                                                

In [67]:
max = df_with_categories.groupBy("prediction").agg({"frequency": "max"})

max.show()



+----------+--------------+
|prediction|max(frequency)|
+----------+--------------+
|         1|            79|
|         2|            52|
|         0|            43|
+----------+--------------+



                                                                                

In [63]:
max = df_with_categories.groupBy("prediction").agg({"monetary": "max"})

max.show()



+----------+-------------+
|prediction|max(monetary)|
+----------+-------------+
|         1|     52328.31|
|         2|      3414.71|
|         0|      1409.64|
+----------+-------------+



                                                                                

# Using category create the category_value column

In [69]:
from pyspark.sql.functions import when

category_conditions = [
    (col("category") == "High-Value", 1),
    (col("category") == "Mid-Value", 2),
    (col("category") == "Low-Value", 3)
]

df_with_categories = df_with_categories.withColumn("category_value", 
                                 when(category_conditions[0][0], category_conditions[0][1])
                                 .when(category_conditions[1][0], category_conditions[1][1])
                                 .when(category_conditions[2][0], category_conditions[2][1])
                                 .otherwise(None))

df_with_categories.show()




+-------------------+-------------------+-------+---------+------------------+--------------------+----------+----------+--------------+
|            user_id|    last_order_time|recency|frequency|          monetary|            features|prediction|  category|category_value|
+-------------------+-------------------+-------+---------+------------------+--------------------+----------+----------+--------------+
|1515915625440051468|2020-10-10 16:27:29|     42|       19|           1619.51| [42.0,19.0,1619.51]|         2| Mid-Value|             2|
|1515915625440121544|2020-10-04 12:18:21|     48|        7|            878.32|   [48.0,7.0,878.32]|         0| Low-Value|             3|
|1515915625440937295|2020-11-19 15:36:46|      2|       18|1493.3400000000001|[2.0,18.0,1493.34...|         2| Mid-Value|             2|
|1515915625440937382|2020-10-25 17:36:25|     27|       10| 757.6599999999999|[27.0,10.0,757.65...|         0| Low-Value|             3|
|1515915625440937580|2020-10-10 10:52:42|

                                                                                

# Drop unnecessary columns

In [70]:
df_cat = df_with_categories.drop("last_order_time", "features", "prediction")

In [71]:
df_cat.show()



+-------------------+-------+---------+------------------+----------+--------------+
|            user_id|recency|frequency|          monetary|  category|category_value|
+-------------------+-------+---------+------------------+----------+--------------+
|1515915625440051468|     42|       19|           1619.51| Mid-Value|             2|
|1515915625440121544|     48|        7|            878.32| Low-Value|             3|
|1515915625440937295|      2|       18|1493.3400000000001| Mid-Value|             2|
|1515915625440937382|     27|       10| 757.6599999999999| Low-Value|             3|
|1515915625440937580|     42|        8|            275.05| Low-Value|             3|
|1515915625440940742|     66|        2|59.910000000000004| Low-Value|             3|
|1515915625440940932|     81|        6|1854.2500000000002| Mid-Value|             2|
|1515915625440941441|    310|        1|             80.76| Low-Value|             3|
|1515915625440943934|     41|       24| 5877.459999999999|High-Va

                                                                                

# Join category dataframe with original dataframe

In [72]:
catdf = classdf.join(df_cat, "user_id", "left")
catdf.show()


[Stage 1141:>                                                       (0 + 8) / 8]

+-------------------+-------------------+-------------------+-------------------+----------+-------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+------------------+----------+--------------+
|            user_id|         event_time|           order_id|        category_id|     brand|  price|        cat_1|        cat_2|        cat_3|max_purchase_by_user|class_id|    class_category|recency|frequency|          monetary|  category|category_value|
+-------------------+-------------------+-------------------+-------------------+----------+-------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+------------------+----------+--------------+
|1515915625441612480|2020-02-10 16:48:37|2348791608045994250|2268105430162997728|       neo|  20.81|  electronics|        audio|    headphone|             1337.71|       4|Upper middle class|     18|       27|3760.3399999999997|High-Va

                                                                                

# Download file

In [73]:
# output_path = "CrystalClean"
# catdf.coalesce(1).write.csv(output_path, header=True)


                                                                                

# Check null values in dataframe 

In [9]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in catdf.columns]

null_cnt_df = catdf.agg(*null_cnt)

null_cnt_df.show()



+-------+----------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+--------+--------------+
|user_id|event_time|order_id|category_id|brand|price|cat_1|cat_2|cat_3|max_purchase_by_user|class_id|class_category|recency|frequency|monetary|category|category_value|
+-------+----------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+--------+--------------+
|      0|         0|       0|          0|    0|    0|    0|    0|    0|                   0|       0|             0|      0|        0|       0|       0|             0|
+-------+----------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+--------+--------------+



                                                                                

In [8]:
catdf.count()

2060046

# Load the clean file 

In [7]:
data_path = "crystalClean.csv"
catdf= spark.read.csv(data_path, header=True, inferSchema=True)

                                                                                

# Create new column Date from event_time column

In [10]:
from pyspark.sql.functions import col, to_date

newdf = catdf.withColumn("Date", to_date(col("event_time")))
newdf.show()


+-------------------+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+
|            user_id|         event_time|           order_id|        category_id|      brand| price|        cat_1|        cat_2|        cat_3|max_purchase_by_user|class_id|    class_category|recency|frequency|monetary| category|category_value|      Date|
+-------------------+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+
|1515915625440051468|2020-02-04 15:00:09|2348787268686184498|2268105402673529732|    rowenta| 25.44|miscellaneous|miscellaneous|miscellaneous|              403.45|       2|Lower middle class|     42|       19| 1619.51|Mid-Value|       

In [11]:
newdf.count()

2060046

# Check unique user IDs 

In [12]:
from pyspark.sql import functions as F

# Count the number of unique user_ids
unique_user_count = newdf.select(F.countDistinct("user_id")).first()[0]

# Print the count
print("Number of unique user_ids:", unique_user_count)




Number of unique user_ids: 218524


                                                                                

# Create new column Time (Hour, Minute, Second) from event_time

In [13]:
from pyspark.sql.functions import concat, format_string, hour, minute, second

newdf = newdf.withColumn("Time", format_string("%02d:%02d:%02d", hour("event_time"), minute("event_time"), second("event_time")))

newdf.show()


+-------------------+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+--------+
|            user_id|         event_time|           order_id|        category_id|      brand| price|        cat_1|        cat_2|        cat_3|max_purchase_by_user|class_id|    class_category|recency|frequency|monetary| category|category_value|      Date|    Time|
+-------------------+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+--------+
|1515915625440051468|2020-02-04 15:00:09|2348787268686184498|2268105402673529732|    rowenta| 25.44|miscellaneous|miscellaneous|miscellaneous|              403.45|       2|Lower middle class|     42|       19

# Drop event_time column

In [14]:
newdf = newdf.drop("event_time")

# Create spending column which contains total spend by unique user

In [15]:
from pyspark.sql import Window
from pyspark.sql import functions as F

purchase = newdf.withColumn("spending", F.sum("price").over(Window.partitionBy("user_id")))

purchase.show()




+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+--------+-----------------+
|            user_id|           order_id|        category_id|      brand| price|        cat_1|        cat_2|        cat_3|max_purchase_by_user|class_id|    class_category|recency|frequency|monetary| category|category_value|      Date|    Time|         spending|
+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+---------+--------------+----------+--------+-----------------+
|1515915625440051468|2348787268686184498|2268105402673529732|    rowenta| 25.44|miscellaneous|miscellaneous|miscellaneous|              403.45|       2|Lower middle class|     42|       19| 1619.51|Mid-Value|      

                                                                                

# Check min, max, avg of spending column

In [16]:
from pyspark.sql.functions import max as max_, min as min_, avg as avg_

min_price = purchase.select(min_("spending")).collect()[0][0]
max_price = purchase.select(max_("spending")).collect()[0][0]
avg_price = purchase.select(avg_("spending")).collect()[0][0]

print("Minimum Price:", min_price)
print("Maximum Price:", max_price)
print("Average Price:", avg_price)




Minimum Price: 0.02
Maximum Price: 52328.31
Average Price: 2203.42764967584


                                                                                

In [17]:
purchase = purchase.withColumnRenamed("category", "customer_type")
purchase = purchase.withColumnRenamed("category_value", "cust_value")

In [18]:
purchase.show()



+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+-------------+----------+----------+--------+-----------------+
|            user_id|           order_id|        category_id|      brand| price|        cat_1|        cat_2|        cat_3|max_purchase_by_user|class_id|    class_category|recency|frequency|monetary|customer_type|cust_value|      Date|    Time|         spending|
+-------------------+-------------------+-------------------+-----------+------+-------------+-------------+-------------+--------------------+--------+------------------+-------+---------+--------+-------------+----------+----------+--------+-----------------+
|1515915625440051468|2348787268686184498|2268105402673529732|    rowenta| 25.44|miscellaneous|miscellaneous|miscellaneous|              403.45|       2|Lower middle class|     42|       19| 1619.51|    Mid-Value|  

                                                                                

# Create FDF dataframe

In [19]:
fdf = purchase

In [24]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in fdf.columns]

null_cnt_df = fdf.agg(*null_cnt)

null_cnt_df.show()



+-------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+-------------+----------+----+----+--------+
|user_id|order_id|category_id|brand|price|cat_1|cat_2|cat_3|max_purchase_by_user|class_id|class_category|recency|frequency|monetary|customer_type|cust_value|Date|Time|spending|
+-------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+-------------+----------+----+----+--------+
|      0|       0|          0|    0|    0|    0|    0|    0|                   0|       0|             0|      0|        0|       0|            0|         0|   0|   0|       0|
+-------+--------+-----------+-----+-----+-----+-----+-----+--------------------+--------+--------------+-------+---------+--------+-------------+----------+----+----+--------+



                                                                                

In [22]:
fdf.dtypes

[('user_id', 'bigint'),
 ('order_id', 'bigint'),
 ('category_id', 'bigint'),
 ('brand', 'string'),
 ('price', 'double'),
 ('cat_1', 'string'),
 ('cat_2', 'string'),
 ('cat_3', 'string'),
 ('max_purchase_by_user', 'double'),
 ('class_id', 'int'),
 ('class_category', 'string'),
 ('recency', 'int'),
 ('frequency', 'int'),
 ('monetary', 'double'),
 ('customer_type', 'string'),
 ('cust_value', 'int'),
 ('Date', 'date'),
 ('Time', 'string'),
 ('spending', 'double')]

In [23]:
output_path = "Crystal_Clean_Data"
fdf.coalesce(1).write.csv(output_path, header=True)


                                                                                