## Ingestion and Cleaning

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer



# Start a Spark session for this notebook (an already-running one is reused).
spark = (
    SparkSession.builder
    .appName("EcommerceETL")
    .getOrCreate()
)

# Step 1: load the raw CSV, using the first row as the header and letting
# Spark infer the column types, then discard every row containing a null.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("data/Ecommerce.csv")
df_clean = df.dropna()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/11 19:03:34 WARN Utils: Your hostname, Saikirans-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.78.66.59 instead (on interface en0)
25/07/11 19:03:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/11 19:03:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Inspect the column names and the types Spark inferred while reading the CSV.
df_clean.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [4]:
# Preview the first five rows of the cleaned data.
df_clean.show(5)


+---------+---------+--------------------+--------+-------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-------------+---------+----------+--------------+
only showing top 5 rows


In [5]:
# Cancelled orders are identified by an InvoiceNo that starts with "C"
# (for example "C54321"). Flag them with 1; every other row gets 0.
is_cancelled = col("InvoiceNo").startswith("C")
cancelled_flag = when(is_cancelled, 1).otherwise(0)
df_clean = df_clean.withColumn("Cancelled", cancelled_flag)

# Preview the new column, then count how many rows fall in each class.
df_clean.show(5)
df_clean.groupBy('Cancelled').count().show()

+---------+---------+--------------------+--------+-------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|       Country|Cancelled|
+---------+---------+--------------------+--------+-------------+---------+----------+--------------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/10 8:26|     2.55|     17850|United Kingdom|        0|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|        0|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/10 8:26|     2.75|     17850|United Kingdom|        0|
|   536365|   84029G|KNITTED UNION FLA...|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|        0|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01/12/10 8:26|     3.39|     17850|United Kingdom|        0|
+---------+---------+--------------------+--------+-------------+---------+----------+--------------+---

In [6]:
# Re-check the schema now that the Cancelled flag column has been added.
df_clean.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Cancelled: integer (nullable = false)



In [7]:

# Encode the StockCode and CustomerID identifiers as numeric indices, because
# the feature vector assembled later needs numeric inputs. StockCode is a
# string column; CustomerID is an integer ID that is indexed the same way.
# handleInvalid="skip" drops rows whose value was not seen when the indexer
# was fitted instead of raising an error.
stock_indexer = StringIndexer(
    inputCol="StockCode",
    outputCol="StockCode_index",
    handleInvalid="skip",
)
customer_indexer = StringIndexer(
    inputCol="CustomerID",
    outputCol="CustomerId_Index",
    handleInvalid="skip",
)


## Feature 1: Predict whether a transaction is cancelled or not

In [8]:
# Step 2: Feature Engineering (predict whether a transaction was cancelled or not)

# Input features: the two raw numeric columns plus the indexed identifier
# columns produced by customer_indexer and stock_indexer.
feature_cols = ["Quantity", "UnitPrice", "CustomerId_Index", "StockCode_index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Index the target label.
# StringIndexer orders values by descending frequency by default, so the
# meaning of label 1.0 would depend on which class happens to be more common.
# Alphabetical ordering pins the mapping: Cancelled 0 -> 0.0, Cancelled 1 -> 1.0.
indexer = StringIndexer(
    inputCol="Cancelled",
    outputCol="label",
    stringOrderType="alphabetAsc",
)

# Full pipeline: index both identifiers, assemble the feature vector, then
# index the label.
pipeline = Pipeline(stages=[customer_indexer, stock_indexer, assembler, indexer])

# Fit the pipeline and transform the data.
# NOTE(review): the pipeline is fitted on all of df_clean, i.e. before the
# train/test split performed later in the notebook, so the indexers also see
# the rows that end up in the test set. Consider splitting first and fitting
# on the training portion only.
data = pipeline.fit(df_clean).transform(df_clean)


In [9]:
# Step 3: Train-Test Split
# 80% of the rows go to training and 20% to testing; the seed is fixed so the
# split can be reproduced.
split_weights = [0.8, 0.2]
train, test = data.randomSplit(split_weights, seed=42)

In [10]:
# Step 4: Train Logistic Regression Model
from pyspark.ml.classification import LogisticRegression

# Configure the classifier: read the assembled "features" vector, learn the
# indexed "label" column, and stop after at most 20 iterations.
lr = LogisticRegression(
    maxIter=20,
    featuresCol="features",
    labelCol="label",
)

# Fit on the training split, then score the held-out test split.
model = lr.fit(train)
predictions = model.transform(test)

# Show the first ten scored rows without truncating column contents.
predictions.show(n=10, truncate=False)

25/07/11 19:03:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/07/11 19:03:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+---------+---------+----------------------------------+--------+-------------+---------+----------+--------------+---------+----------------+---------------+------------------------+-----+----------------------------------------+------------------------------------------+----------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate  |UnitPrice|CustomerID|Country       |Cancelled|CustomerId_Index|StockCode_index|features                |label|rawPrediction                           |probability                               |prediction|
+---------+---------+----------------------------------+--------+-------------+---------+----------+--------------+---------+----------------+---------------+------------------------+-----+----------------------------------------+------------------------------------------+----------+
|536365   |71053    |WHITE METAL LANTERN               |6       |01/12/10 8:26|3.39     |17850     |United Kingdom|0        |231.0           |403

In [None]:
# Step 5: Evaluate
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve is the evaluator's default metric; it is spelled
# out here so the printed number is unambiguous.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)

# NOTE(review): an AUC this close to 1.0 usually means one of the features
# encodes the label almost directly (possibly Quantity, if cancelled rows
# carry negative quantities - confirm against the data) rather than genuine
# predictive power.
print("AUC:", auc)


                                                                                

AUC: 0.9999997305425765


25/07/12 13:33:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 858212 ms exceeds timeout 120000 ms
25/07/12 13:33:14 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/12 13:33:23 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$