# Finding Repeat Customers for Distributed Discounts

This notebook applies Spark to a business question that can be answered with data. Through earlier analysis, "Distributed Discounts" has recognized the potential of big data and Spark. Encouraged by those findings, the company has intensified its data collection with the aim of forecasting whether a first-time customer will become a repeat buyer. By accurately predicting customer behavior, it plans to optimize its advertising strategy. This notebook explores the data and builds a machine learning model that predicts which customers will become repeat buyers.


### Data Dictionary

- `capstone_customers.csv`: A dataset containing historical data about Distributed Discounts' customers. It contains fields for `customer_id`, `customer_type`, and whether or not the customer was a `repeat_customer`.
- `capstone_invoices.csv`: A dataset containing historical data about orders made from Distributed Discounts. Each row contains information about the order, including a `customer_id` field which can be joined with the customers data.
- `capstone_recent_customers.csv`: A dataset containing information about recent customers. It contains the same fields as the `capstone_customers` dataset, except for `repeat_customer`, which is missing because these are brand new customers. The goal is to use the machine learning model to predict whether these customers will be repeat customers.
- `capstone_recent_invoices.csv`: A dataset containing information about orders made by recent customers. This dataset is very similar to `capstone_invoices.csv`, but it contains data that can be joined to the recent customers data.

### Create a SparkSession and Import Your Data

In [1]:
import os
import pyspark
from pyspark import SparkContext 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.mllib.stat import Statistics
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

%env JAVA_HOME = /usr/lib/jvm/java-8-openjdk-amd64

env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64


In [2]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Customers") \
      .config("spark.driver.memory", "4g") \
      .config("spark.executor.memory", "4g") \
      .getOrCreate() 
        
sc = spark.sparkContext

In [3]:
# Read the customers file
customers = spark.read.csv("capstone_customers.csv", header=True, inferSchema='True')

# Read the invoice file
invoices = spark.read.csv("capstone_invoices.csv", header=True, inferSchema='True')

# Read the recent customers file
recent_customers = spark.read.csv("capstone_recent_customers.csv", header=True, inferSchema='True')

# Read the recent invoices file
recent_invoices = spark.read.csv("capstone_recent_invoices.csv", header=True, inferSchema='True')

Examine the data in each of the dataframes

In [4]:
# Display the top 5 records in the customers DataFrame
print("Top 5 records in the customers DataFrame:")
customers.show(5)

# Display the total number of rows in the customers DataFrame
print("Total number of rows in the customers DataFrame:", customers.count())
print()

# Display the top 5 records in the invoice DataFrame
print("Top 5 records in the invoice DataFrame:")
invoices.show(5)

# Display the total number of rows in the invoice DataFrame
print("Total number of rows in the invoice DataFrame:", invoices.count())
print()

# Display the top 5 records in the recent customers DataFrame
print("Top 5 records in the recent customers DataFrame:")
recent_customers.show(5)

# Display the total number of rows in the recent customers DataFrame
print("Total number of rows in the recent customers DataFrame:", recent_customers.count())
print()

# Display the top 5 records in the recent invoices DataFrame
print("Top 5 records in the recent invoices DataFrame:")
recent_invoices.show(5)

# Display the total number of rows in the recent invoices DataFrame
print("Total number of rows in the recent invoices DataFrame:", recent_invoices.count())
print()

Top 5 records in the customers DataFrame:
+-----------+-------------+---------------+
|customer_id|customer_type|repeat_customer|
+-----------+-------------+---------------+
|      81769|   non-member|              0|
|      59586|       member|              0|
|      27974|   non-member|              0|
|      71976|   non-member|              0|
|      52509|   non-member|              0|
+-----------+-------------+---------------+
only showing top 5 rows

Total number of rows in the customers DataFrame: 652

Top 5 records in the invoice DataFrame:
+-------------+----------+-----------+------------------+------------+------+
|   invoice_id|product_id|customer_id|days_until_shipped|product_line| total|
+-------------+----------+-----------+------------------+------------+------+
|817-69-8206-B|      H013|      81769|                 7|        Home| 92.66|
|595-86-2894-C|      E305|      59586|                 8| Electronics| 429.9|
|279-74-2924-B|      B002|      27974|               

Checking for null values in all four dataframes

In [5]:
# Define a list of DataFrames
dataframes = [customers, invoices, recent_customers, recent_invoices]

# Iterate over each DataFrame
for df in dataframes:
    print(f"Null value counts in the {df} DataFrame:")
    df.select([sum(col(col_name).isNull().cast("integer")).alias(col_name) for col_name in df.columns]).show()
    print()

Null value counts in the DataFrame[customer_id: int, customer_type: string, repeat_customer: int] DataFrame:
+-----------+-------------+---------------+
|customer_id|customer_type|repeat_customer|
+-----------+-------------+---------------+
|          1|            1|              0|
+-----------+-------------+---------------+


Null value counts in the DataFrame[invoice_id: string, product_id: string, customer_id: int, days_until_shipped: int, product_line: string, total: double] DataFrame:
+----------+----------+-----------+------------------+------------+-----+
|invoice_id|product_id|customer_id|days_until_shipped|product_line|total|
+----------+----------+-----------+------------------+------------+-----+
|         0|         0|          1|                 1|           0|    0|
+----------+----------+-----------+------------------+------------+-----+


Null value counts in the DataFrame[customer_id: int, customer_type: string] DataFrame:
+-----------+-------------+
|customer_id|cus

Since there aren't many null values, they were dropped below:

In [6]:
# Iterate over each DataFrame
for i in range(len(dataframes)):
    # Drop rows with null values and update the original DataFrame reference
    dataframes[i] = dataframes[i].na.drop()

# Retrieve the modified DataFrames
customers, invoices, recent_customers, recent_invoices = dataframes

The invoices and customers DataFrames share the `customer_id` field, so an inner join on that field combines them.

In [7]:
# Perform inner join on the customer_id column
joined_customers = invoices.join(customers, "customer_id", "inner")
joined_customers.limit(5).toPandas()

Unnamed: 0,customer_id,invoice_id,product_id,days_until_shipped,product_line,total,customer_type,repeat_customer
0,81769,817-69-8206-B,H013,7,Home,92.66,non-member,0
1,59586,595-86-2894-C,E305,8,Electronics,429.9,member,0
2,27974,279-74-2924-B,B002,7,Health,269.25,non-member,0
3,71976,719-76-3868-C,H246,6,Home,148.14,non-member,0
4,52509,525-09-8450-B,E302,4,Electronics,172.8,non-member,0


And do the same for the recent unlabeled data...

In [8]:
# Perform inner join on the customer_id column
joined_recent_customers = recent_invoices.join(recent_customers, "customer_id", "inner")
joined_recent_customers.limit(5).toPandas()

Unnamed: 0,customer_id,invoice_id,product_id,days_until_shipped,product_line,total,customer_type
0,80270,802-70-5316-A,B004,7,Health,499.66,member
1,87546,875-46-5808-B,E303,6,Electronics,304.4,non-member
2,29121,291-21-5991-B,H014,9,Home,368.13,non-member
3,50002,500-02-2261-C,B006,8,Health,724.0,member
4,32678,326-78-5178-C,T202,5,Travel,476.84,non-member


In [9]:
# Check out summary statistics
joined_customers.describe().toPandas().set_index('summary').transpose()

summary,count,mean,stddev,min,max
customer_id,651,50104.80798771121,23185.021922485896,10117,89804
invoice_id,651,,,101-17-6199-A,898-04-2717-A
product_id,651,,,A050,T260
days_until_shipped,651,6.443932411674347,1.6560859941561532,1,11
product_line,651,,,Electronics,Travel
total,651,310.5081259600615,238.36422598430727,2.9,987.0
customer_type,651,,,member,non-member
repeat_customer,651,0.2872503840245776,0.4528273200976013,0,1


The `customer_id` and `invoice_id` columns likely won't be useful to the analysis, so they will be dropped. The other non-numeric columns are examined below to see how much feature space will be needed to encode them.

In [10]:
# Select the columns you want to examine
column_names = ['customer_type', 'product_line', 'product_id']

# Count the occurrences for each unique value in each column
count_results = []
for column_name in column_names:
    counts = joined_customers.groupBy(column_name).agg(count('*').alias('count'))
    count_results.append(counts)

# Show the unique values and their counts for each column
for i, column_name in enumerate(column_names):
    print(f"Counts for {column_name}:")
    count_results[i].show()

Counts for customer_type:
+-------------+-----+
|customer_type|count|
+-------------+-----+
|   non-member|  321|
|       member|  330|
+-------------+-----+

Counts for product_line:
+--------------+-----+
|  product_line|count|
+--------------+-----+
|          Home|  112|
|       Fashion|  113|
|        Travel|  108|
|        Health|  100|
|   Electronics|  102|
|Food and Drink|  116|
+--------------+-----+

Counts for product_id:
+----------+-----+
|product_id|count|
+----------+-----+
|      A501|   15|
|      T203|    8|
|      B002|    5|
|      A056|   17|
|      F470|   16|
|      T027|   20|
|      T020|   10|
|      F403|   11|
|      A050|   14|
|      H245|   13|
|      F040|   10|
|      E305|   23|
|      T201|   12|
|      H015|    8|
|      F324|   10|
|      B000|   12|
|      T260|   14|
|      F406|    8|
|      B003|   15|
|      H009|   13|
+----------+-----+
only showing top 20 rows



The `product_id` column has a large number of unique values relative to the size of the data. Including it as a feature would create a high-dimensional feature space and a more complex model, so this column will be dropped as well.

In [11]:
# Store columns to drop
cols_to_drop = ['customer_id','invoice_id','product_id']

# Drop the columns and examine new dataset
# Use `c` as the loop variable so it does not shadow the imported `col` function
joined_customers = joined_customers.select([c for c in joined_customers.columns if c not in cols_to_drop])
joined_customers.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,repeat_customer
0,7,Home,92.66,non-member,0
1,8,Electronics,429.9,member,0
2,7,Health,269.25,non-member,0
3,6,Home,148.14,non-member,0
4,4,Electronics,172.8,non-member,0


Rename the `repeat_customer` column to `label`, since that is what we are trying to predict.

In [12]:
old = 'repeat_customer'
new = 'label'
joined_customers = joined_customers.withColumnRenamed(old, new)

Now, it's time to start the preprocessing pipeline. We'll start by using the StringIndexer to handle the non-numeric values.

In [13]:
# Create StringIndexer instances for each column with string data
indexer1 = StringIndexer(inputCol="product_line", outputCol="product_line_indexed")
indexer2 = StringIndexer(inputCol="customer_type", outputCol="customer_type_indexed")

# Fit and transform the DataFrame with StringIndexer
indexed_df = indexer1.fit(joined_customers).transform(joined_customers)
indexed_df = indexer2.fit(indexed_df).transform(indexed_df)

# Show the transformed DataFrame
indexed_df.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed
0,7,Home,92.66,non-member,0,2.0,1.0
1,8,Electronics,429.9,member,0,4.0,0.0
2,7,Health,269.25,non-member,0,5.0,1.0
3,6,Home,148.14,non-member,0,2.0,1.0
4,4,Electronics,172.8,non-member,0,4.0,1.0
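
The indices above follow from the StringIndexer's default ordering, which gives index 0 to the most frequent label and counts upward as labels become rarer. The following is a rough pure-Python sketch of that ordering, not Spark's implementation. It uses the `product_line` counts printed earlier in this notebook, and the helper name `frequency_index` is made up for illustration.

```python
# Pure-Python sketch of frequency-descending label indexing.
# The counts are copied from the product_line table printed earlier in this notebook.
product_line_counts = {
    "Home": 112,
    "Fashion": 113,
    "Travel": 108,
    "Health": 100,
    "Electronics": 102,
    "Food and Drink": 116,
}


def frequency_index(counts):
    """Hypothetical helper: map each label to its rank by descending count."""
    # Sort by count (largest first); the label itself only breaks exact ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(rank) for rank, label in enumerate(ordered)}


product_line_index = frequency_index(product_line_counts)
print(product_line_index)
```

Because the ordering depends on the counts in the data the indexer is fit on, an indexer fit on a different subset (such as a training split) can assign different indices to the same label.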


Now that the columns are indexed, the OneHotEncoderEstimator will convert the indices to binary vectors.

In [14]:
# Create OneHotEncoderEstimator instance
encoder = OneHotEncoderEstimator(
    inputCols=["product_line_indexed", "customer_type_indexed"],
    outputCols=["product_line_encoded", "customer_type_encoded"])

# Fit and transform the DataFrame with OneHotEncoderEstimator
encoded_df = encoder.fit(indexed_df).transform(indexed_df)

# Show the transformed DataFrame
encoded_df.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded
0,7,Home,92.66,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0)
1,8,Electronics,429.9,member,0,4.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(1.0)
2,7,Health,269.25,non-member,0,5.0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(0.0)
3,6,Home,148.14,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0)
4,4,Electronics,172.8,non-member,0,4.0,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(0.0)
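
Note that the Health row above is encoded as all zeros. By default the encoder drops the last category, so six product lines need only five slots and the highest index is represented by the zero vector. Here is a minimal pure-Python sketch of that behavior. The helper `one_hot_drop_last` is hypothetical and is not part of the Spark API.

```python
def one_hot_drop_last(index, num_categories):
    """Hypothetical helper: one-hot encode an index, dropping the last category."""
    # One slot fewer than the number of categories.
    vector = [0.0] * (num_categories - 1)
    # The last category has no slot of its own and stays all zeros.
    if index < num_categories - 1:
        vector[int(index)] = 1.0
    return vector


home_vector = one_hot_drop_last(2.0, 6)        # Home has index 2 of 6 product lines
health_vector = one_hot_drop_last(5.0, 6)      # Health has the last index
member_vector = one_hot_drop_last(0.0, 2)      # member has index 0 of 2 customer types
non_member_vector = one_hot_drop_last(1.0, 2)  # non-member has the last index

print(home_vector, health_vector, member_vector, non_member_vector)
```

Dropping one category avoids a redundant column, since the last category is implied whenever all the other slots are zero.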


Now that the non-numerical values are ready, we can assemble the numerical columns into a single vector.

In [15]:
# Create VectorAssembler instance
numerical_assembler = VectorAssembler(
    inputCols=['days_until_shipped', 'total'],
    outputCol='numerical_vectorized_features')

# Transform the DataFrame with VectorAssembler
assembler_df = numerical_assembler.transform(encoded_df)

# Display the first 5 rows of the transformed DataFrame
assembler_df.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded,numerical_vectorized_features
0,7,Home,92.66,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[7.0, 92.66]"
1,8,Electronics,429.9,member,0,4.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(1.0),"[8.0, 429.9]"
2,7,Health,269.25,non-member,0,5.0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(0.0),"[7.0, 269.25]"
3,6,Home,148.14,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[6.0, 148.14]"
4,4,Electronics,172.8,non-member,0,4.0,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(0.0),"[4.0, 172.8]"


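Now we can scale the vector. With its default settings, the StandardScaler divides each feature by its standard deviation without subtracting the mean, so the scaled values stay positive.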

In [16]:
scaler = StandardScaler().\
    setInputCol('numerical_vectorized_features').\
    setOutputCol('scaled_features')
    
scaler_model = scaler.fit(assembler_df)
scaler_df = scaler_model.transform(assembler_df)
pd.set_option('display.max_colwidth', 100)
scaler_df.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded,numerical_vectorized_features,scaled_features
0,7,Home,92.66,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[7.0, 92.66]","[4.226833645535901, 0.3887328294225673]"
1,8,Electronics,429.9,member,0,4.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(1.0),"[8.0, 429.9]","[4.830667023469601, 1.8035424494794052]"
2,7,Health,269.25,non-member,0,5.0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(0.0),"[7.0, 269.25]","[4.226833645535901, 1.1295738649042333]"
3,6,Home,148.14,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[6.0, 148.14]","[3.623000267602201, 0.621485876868758]"
4,4,Electronics,172.8,non-member,0,4.0,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(0.0),"[4.0, 172.8]","[2.4153335117348007, 0.7249409985346389]"
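
The scaled values above can be checked by hand. With its default settings the StandardScaler divides each feature by its sample standard deviation and does not center it. The sketch below assumes those defaults and uses the standard deviations from the `describe()` output earlier in this notebook. The function name `scale_without_centering` is made up for illustration.

```python
# Sample standard deviations copied from the describe() output earlier in this notebook.
std_days_until_shipped = 1.6560859941561532
std_total = 238.36422598430727


def scale_without_centering(values, stds):
    """Hypothetical helper: divide each value by its column's standard deviation."""
    return [value / std for value, std in zip(values, stds)]


# First row of numerical_vectorized_features: [days_until_shipped, total]
first_row_scaled = scale_without_centering(
    [7.0, 92.66], [std_days_until_shipped, std_total]
)
print(first_row_scaled)
```

The result should agree with the first row of `scaled_features` up to floating-point rounding.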


Finally, the VectorAssembler is used to combine them all into one feature vector.

In [17]:
assembler = VectorAssembler().\
    setInputCols(['scaled_features', 'product_line_encoded' ,
                  'customer_type_encoded']).setOutputCol('features')

input_df = assembler.transform(scaler_df)
input_df.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded,numerical_vectorized_features,scaled_features,features
0,7,Home,92.66,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[7.0, 92.66]","[4.226833645535901, 0.3887328294225673]","(4.226833645535901, 0.3887328294225673, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
1,8,Electronics,429.9,member,0,4.0,0.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(1.0),"[8.0, 429.9]","[4.830667023469601, 1.8035424494794052]","(4.830667023469601, 1.8035424494794052, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0)"
2,7,Health,269.25,non-member,0,5.0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(0.0),"[7.0, 269.25]","[4.226833645535901, 1.1295738649042333]","(4.226833645535901, 1.1295738649042333, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
3,6,Home,148.14,non-member,0,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[6.0, 148.14]","[3.623000267602201, 0.621485876868758]","(3.623000267602201, 0.621485876868758, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
4,4,Electronics,172.8,non-member,0,4.0,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(0.0),"[4.0, 172.8]","[2.4153335117348007, 0.7249409985346389]","(2.4153335117348007, 0.7249409985346389, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)"


In [18]:
# Create the pipeline
pipeline_stages = Pipeline().setStages([indexer1,  
                                        indexer2, 
                                        encoder,
                                        numerical_assembler,
                                        scaler, 
                                        assembler])

Since the train set, the test set, and the dataset we are making predictions on all need the same transformations, the pipeline collects the steps above so they can be applied consistently to each of them.
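
The pattern behind a pipeline is to learn parameters once and then reuse them on every dataset. As a simplified illustration only, the sketch below uses a hypothetical `UnitStdScaler` class and made-up numbers. It is not how Spark implements its stages.

```python
import statistics


class UnitStdScaler:
    """Hypothetical stand-in for one pipeline stage."""

    def fit(self, values):
        # Learn the sample standard deviation from the data passed to fit().
        self.std = statistics.stdev(values)
        return self

    def transform(self, values):
        # Reuse the learned statistic on any dataset.
        return [value / self.std for value in values]


# Made-up numbers, for illustration only.
train_totals = [10.0, 20.0, 30.0, 40.0]
test_totals = [15.0, 35.0]

scaler_sketch = UnitStdScaler().fit(train_totals)   # fit on the training data only
train_scaled = scaler_sketch.transform(train_totals)
test_scaled = scaler_sketch.transform(test_totals)  # same learned std, different data
print(scaler_sketch.std, test_scaled)
```

Fitting on the training data alone keeps information from the test set and from the new customers out of the learned statistics.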

In [19]:
# Create the train/test split and print the sizes
train, test = joined_customers.randomSplit([0.8, 0.2], seed=2022)
print("Training Data Size: {}".format(str(train.count())))
print("Testing Data Size: {}".format(str(test.count())))

Training Data Size: 513
Testing Data Size: 138


Now that the data has been split, the pipeline_stages are fit on the training set and then applied to both the training and test sets.

In [20]:
pipeline_model = pipeline_stages.fit(train)
train = pipeline_model.transform(train)
test = pipeline_model.transform(test)
train.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,label,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded,numerical_vectorized_features,scaled_features,features
0,1,Food and Drink,2.9,non-member,0,0.0,1.0,"(1.0, 0.0, 0.0, 0.0, 0.0)",(0.0),"[1.0, 2.9]","[0.6057155110261719, 0.012079192890247009]","(0.6057155110261719, 0.012079192890247009, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)"
1,1,Food and Drink,5.8,member,0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[1.0, 5.8]","[0.6057155110261719, 0.024158385780494018]","(0.6057155110261719, 0.024158385780494018, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)"
2,2,Food and Drink,5.8,member,0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[2.0, 5.8]","[1.2114310220523439, 0.024158385780494018]","(1.2114310220523439, 0.024158385780494018, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)"
3,2,Food and Drink,13.14,member,0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[2.0, 13.14]","[1.2114310220523439, 0.054731239509601966]","(1.2114310220523439, 0.054731239509601966, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)"
4,2,Food and Drink,20.01,member,1,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[2.0, 20.01]","[1.2114310220523439, 0.08334643094270437]","(1.2114310220523439, 0.08334643094270437, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)"


Now the data is ready, and a logistic regression model can be trained on it.

In [21]:
# Create Logistic Regression model and fit it on the train set
lr = LogisticRegression(
    featuresCol = 'features', labelCol = 'label', maxIter = 5)
lr_model = lr.fit(train)

In [22]:
# Make predictions on the test data set
predictions = lr_model.transform(test)

# Make predictions on the training set
trainPredictions = lr_model.transform(train)

# View the predictions
predictions.select(
    'label','features','rawPrediction','prediction','probability').limit(5).toPandas()

Unnamed: 0,label,features,rawPrediction,prediction,probability
0,0,"(0.6057155110261719, 0.023950123834110448, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","[2.8720181873938664, -2.8720181873938664]",0.0,"[0.9464457344717744, 0.0535542655282256]"
1,0,"(1.2114310220523439, 0.024158385780494018, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","[3.43754211161333, -3.43754211161333]",0.0,"[0.9688574401114208, 0.031142559888579176]"
2,0,"(1.8171465330785157, 0.024158385780494018, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)","[3.189254864919419, -3.189254864919419]",0.0,"[0.9604279104553272, 0.039572089544672784]"
3,0,"(1.8171465330785157, 0.1766061305332666, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0)","[3.5586447068133484, -3.5586447068133484]",0.0,"[0.9723111135336499, 0.027688886466350208]"
4,0,"(1.8171465330785157, 0.23612739480969067, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","[2.6201373441560163, -2.6201373441560163]",0.0,"[0.9321463938870233, 0.06785360611297671]"
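
The `rawPrediction`, `probability`, and `prediction` columns above are related through the logistic (sigmoid) function. The sketch below reproduces the first row by hand, assuming the default decision threshold of 0.5. The variable names are illustrative.

```python
import math


def sigmoid(margin):
    """Logistic function: maps a real-valued margin to a probability."""
    return 1.0 / (1.0 + math.exp(-margin))


# rawPrediction for the first test row, copied from the output above.
# The two entries are the margins for class 0 and class 1.
raw_prediction = [2.8720181873938664, -2.8720181873938664]

# Applying the sigmoid to each margin gives the class probabilities.
probability = [sigmoid(raw_prediction[0]), sigmoid(raw_prediction[1])]

# Predict class 1 only when its probability exceeds the assumed 0.5 threshold.
prediction = 1.0 if probability[1] > 0.5 else 0.0
print(probability, prediction)
```

A large positive margin for class 0, as in this row, means the model is confident the customer will not return.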


Now that the model has been trained, it's time to evaluate it.

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Check the ROC
evaluator = BinaryClassificationEvaluator()

print("Test Area under ROC: {}".format(evaluator.evaluate(predictions)))
print("Train Area under ROC: {}".format(evaluator.evaluate(trainPredictions)))

Test Area under ROC: 0.9689199689199689
Train Area under ROC: 0.9631895594224388


In [30]:
# Check the accuracy of the model on the test set
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Test Accuracy : ",accuracy)

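# BinaryClassificationEvaluator defaults to areaUnderROC, so this value is the
# train area under ROC (the same number as in the previous cell), not an accuracy
trainAUC = evaluator.evaluate(trainPredictions)
print("Train Area under ROC: {:.4f}".format(trainAUC))

Test Accuracy :  0.9202898550724637
Train Area under ROC: 0.9632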
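
Accuracy and area under ROC measure different things, and `BinaryClassificationEvaluator` reports area under ROC unless told otherwise. The pure-Python sketch below computes both on a made-up toy example to show that they need not agree. The labels, scores, and function names are hypothetical and are not taken from this dataset.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of rows whose thresholded score matches the label."""
    predictions = [1 if score > threshold else 0 for score in scores]
    correct = len([1 for label, pred in zip(labels, predictions) if label == pred])
    return correct / len(labels)


def area_under_roc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    positives = [score for label, score in zip(labels, scores) if label == 1]
    negatives = [score for label, score in zip(labels, scores) if label == 0]
    wins = 0.0
    for pos in positives:
        for neg in negatives:
            if pos > neg:
                wins += 1.0
            elif pos == neg:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores, for illustration only.
toy_labels = [0, 0, 0, 1, 1]
toy_scores = [0.1, 0.2, 0.6, 0.55, 0.9]

toy_accuracy = accuracy(toy_labels, toy_scores)
toy_auc = area_under_roc(toy_labels, toy_scores)
print(toy_accuracy, toy_auc)
```

Accuracy depends on the chosen threshold, while area under ROC depends only on how the scores rank positives against negatives.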


In [31]:
# Store coefficients
weights = lr_model.coefficients

# Put the order of the features for the index of a pandas df to examine weights
print("Model Weights:")
pd.DataFrame([float(w) for w in weights], columns=['Feature Weight'], 
             index = ['days_until_shipped', 'total', 'product0', 
                      'product1', 'product2', 'product3','product4','member'])

Model Weights:


Unnamed: 0,Feature Weight
days_until_shipped,-0.93465
total,2.919225
product0,-0.764751
product1,-0.550942
product2,-0.163701
product3,-0.743172
product4,0.233005
member,0.814419


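The coefficients in a logistic regression model represent the direction and magnitude of the relationship between each feature and the log-odds of the target variable. Because the numerical features were scaled to unit standard deviation, their weights are roughly comparable. The total cost has the largest absolute weight (about 2.92), which tells us it has the strongest effect on the predicted outcome, while the negative weight on `days_until_shipped` (about -0.93) means that longer shipping times lower the predicted probability of a repeat purchase.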

Now, it's time to make predictions on the unlabeled data. First, let's examine that dataset again.

In [32]:
joined_recent_customers.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type
0,7,Health,499.66,member
1,6,Electronics,304.4,non-member
2,9,Home,368.13,non-member
3,8,Health,724.0,member
4,5,Travel,476.84,non-member


In [33]:
# Dropping the same columns as we did with the dataset used to train the model
cols_to_drop = ['customer_id','invoice_id','product_id']
# Use `c` as the loop variable so it does not shadow the imported `col` function
joined_recent_customers = joined_recent_customers.select(
    [c for c in joined_recent_customers.columns if c not in cols_to_drop])
joined_recent_customers.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type
0,7,Health,499.66,member
1,6,Electronics,304.4,non-member
2,9,Home,368.13,non-member
3,8,Health,724.0,member
4,5,Travel,476.84,non-member


In [34]:
# Using the pipeline to transform the unlabeled data
new_customers = pipeline_model.transform(joined_recent_customers)
new_customers.limit(5).toPandas()

Unnamed: 0,days_until_shipped,product_line,total,customer_type,product_line_indexed,customer_type_indexed,product_line_encoded,customer_type_encoded,numerical_vectorized_features,scaled_features,features
0,7,Health,499.66,member,5.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[7.0, 499.66]","[4.240008577183204, 2.081203282600283]","(4.240008577183204, 2.081203282600283, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)"
1,6,Electronics,304.4,non-member,3.0,1.0,"(0.0, 0.0, 0.0, 1.0, 0.0)",(0.0),"[6.0, 304.4]","[3.6342930661570314, 1.2678987295831687]","(3.6342930661570314, 1.2678987295831687, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)"
2,9,Home,368.13,non-member,2.0,1.0,"(0.0, 0.0, 1.0, 0.0, 0.0)",(0.0),"[9.0, 368.13]","[5.451439599235547, 1.533349406443666]","(5.451439599235547, 1.533349406443666, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)"
3,8,Health,724.0,member,5.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0)",(1.0),"[8.0, 724.0]","[4.8457240882093755, 3.015632983634081]","(4.8457240882093755, 3.015632983634081, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)"
4,5,Travel,476.84,non-member,4.0,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0)",(0.0),"[5.0, 476.84]","[3.02857755513086, 1.9861525302708218]","(3.02857755513086, 1.9861525302708218, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)"


In [35]:
# Make predictions on the new, unlabeled customers
new_predictions = lr_model.transform(new_customers)
new_predictions.select(
    'features','rawPrediction','prediction','probability').limit(10).toPandas()

Unnamed: 0,features,rawPrediction,prediction,probability
0,"(4.240008577183204, 2.081203282600283, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","[-1.3159461987274026, 1.3159461987274026]",1.0,"[0.21149353101618468, 0.7885064689838153]"
1,"(3.6342930661570314, 1.2678987295831687, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","[2.049732316629971, -2.049732316629971]",0.0,"[0.885920568010669, 0.11407943198933096]"
2,"(5.451439599235547, 1.533349406443666, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","[2.3937462807555794, -2.3937462807555794]",0.0,"[0.9163491813783053, 0.08365081862169453]"
3,"(4.8457240882093755, 3.015632983634081, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","[-3.477625007360781, 3.477625007360781]",1.0,"[0.029955616008334535, 0.9700443839916656]"
4,"(3.02857755513086, 1.9861525302708218, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0)","[-1.589321150216904, 1.589321150216904]",1.0,"[0.16947942777853514, 0.8305205722214648]"
5,"(4.8457240882093755, 1.3689057735791996, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","[2.8871337876182936, -2.8871337876182936]",0.0,"[0.9472067369422913, 0.05279326305770873]"
6,"(4.240008577183204, 2.148763458007113, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0)","[-0.7699974105299392, 0.7699974105299392]",1.0,"[0.3164796664183253, 0.6835203335816747]"
7,"(4.8457240882093755, 3.612095198076623, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0)","[-4.240712629515472, 4.240712629515472]",1.0,"[0.014192987131203627, 0.9858070128687965]"
8,"(2.4228620441046878, 0.5946295093143665, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0)","[1.3253017037903225, -1.3253017037903225]",0.0,"[0.790062419540656, 0.20993758045934394]"
9,"(3.6342930661570314, 1.2333272464834963, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0)","[1.3578143635698199, -1.3578143635698199]",0.0,"[0.7954042448982543, 0.20459575510174569]"


The predictions look consistent with the model weights. The second number in the features column is the scaled total cost, which has the largest impact on the model's predictions. In the rows above, a higher total generally comes with a higher predicted probability that the customer will return, which suggests a strong positive relationship between the total cost and the predicted outcome.

It is important to consider that these observations are specific to the analyzed model and dataset. The coefficients' interpretations can vary based on the model's assumptions and the characteristics of the data. 