In [1]:
import os

from pyspark.sql import SparkSession

In [18]:
# Create the Spark session for this notebook, or reuse one that already exists.
# 'local[*]' runs Spark on this machine using all available cores.
spark = (
    SparkSession.builder
    .appName('Ai_pipeline_project')
    .master('local[*]')
    .getOrCreate()
)

In [20]:
# Bare expression: as the last line of the cell, the notebook displays the session object.
spark

In [22]:
# Smoke test: build a tiny DataFrame with ids 0..4 and print it to confirm the session works.
spark.range(0, 5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [4]:
# Location of the raw sales CSV. Set the RAW_SALES_PATH environment variable to
# run this notebook against a file elsewhere; when it is unset, the original
# path is used so existing behaviour is unchanged.
RAW_SALES_PATH = os.environ.get(
    'RAW_SALES_PATH',
    r"C:\Users\ak810\OneDrive\Desktop\raw_sale.csv",
)

# Read the raw CSV: the first row supplies column names and Spark infers the
# column types from the data.
df = spark.read.csv(RAW_SALES_PATH, header=True, inferSchema=True)

# Preview the first five rows.
df.show(5)

+--------+----------+--------+----------+--------+----------+
|order_id|order_date|store_id|product_id|quantity|unit_price|
+--------+----------+--------+----------+--------+----------+
|       1|01-01-2024|     101|       501|       2|       500|
|       2|01-01-2024|     101|       502|       1|      1200|
|       3|02-01-2024|     102|       503|       5|       300|
|       4|03-01-2024|     101|       501|      20|       500|
|       5|03-01-2024|     102|       503|       1|       300|
+--------+----------+--------+----------+--------+----------+
only showing top 5 rows


In [5]:
# Print the column names and types of the freshly loaded frame.
df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)



In [6]:
from pyspark.sql.functions import *

In [7]:
# Cleaned frame derived from `df`:
#   * order_date   - parsed from a 'dd-MM-yyyy' string into a date column
#   * total_amount - quantity multiplied by unit_price for each row
df_silver = (
    df
    .withColumn('order_date', to_date(col('order_date'), 'dd-MM-yyyy'))
    .withColumn('total_amount', col('quantity') * col('unit_price'))
)

# Confirm the new column types, then preview the first five rows.
df_silver.printSchema()
df_silver.show(5)

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_amount: integer (nullable = true)

+--------+----------+--------+----------+--------+----------+------------+
|order_id|order_date|store_id|product_id|quantity|unit_price|total_amount|
+--------+----------+--------+----------+--------+----------+------------+
|       1|2024-01-01|     101|       501|       2|       500|        1000|
|       2|2024-01-01|     101|       502|       1|      1200|        1200|
|       3|2024-01-02|     102|       503|       5|       300|        1500|
|       4|2024-01-03|     101|       501|      20|       500|       10000|
|       5|2024-01-03|     102|       503|       1|       300|         300|
+--------+----------+--------+----------+--------+----------+------------+
only showing top 5 rows


In [8]:
from pyspark.sql.functions import monotonically_increasing_id

In [9]:
# Append a 'row_id' column as the last column, keeping every existing column.
# NOTE: per the Spark documentation, monotonically_increasing_id() yields unique,
# increasing 64-bit ids that are not guaranteed to be consecutive and depend on
# how the data is partitioned.
df_silver_id = df_silver.select(
    '*',
    monotonically_increasing_id().alias('row_id'),
)

# Preview the first five rows with the new id.
df_silver_id.show(5)

+--------+----------+--------+----------+--------+----------+------------+------+
|order_id|order_date|store_id|product_id|quantity|unit_price|total_amount|row_id|
+--------+----------+--------+----------+--------+----------+------------+------+
|       1|2024-01-01|     101|       501|       2|       500|        1000|     0|
|       2|2024-01-01|     101|       502|       1|      1200|        1200|     1|
|       3|2024-01-02|     102|       503|       5|       300|        1500|     2|
|       4|2024-01-03|     101|       501|      20|       500|       10000|     3|
|       5|2024-01-03|     102|       503|       1|       300|         300|     4|
+--------+----------+--------+----------+--------+----------+------------+------+
only showing top 5 rows


In [10]:
# Print the schema to check the type of the added row_id column.
df_silver_id.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: integer (nullable = true)
 |-- total_amount: integer (nullable = true)
 |-- row_id: long (nullable = false)



In [11]:
# Collect only the row key and the two numeric columns used for anomaly
# detection into a pandas DataFrame on the driver.
pdf = (
    df_silver_id
    .select('row_id', 'quantity', 'total_amount')
    .toPandas()
)

# Display the collected frame.
pdf


Unnamed: 0,row_id,quantity,total_amount
0,0,2,1000
1,1,1,1200
2,2,5,1500
3,3,20,10000
4,4,1,300
5,5,50,60000
6,6,3,2100
7,7,100,70000
8,8,2,1000
9,9,4,1200


In [12]:
from sklearn.ensemble import IsolationForest

# Isolation Forest configured to treat 10% of the rows as outliers
# (contamination) with a fixed seed so repeated runs give the same result.
model = IsolationForest(
    contamination=0.1,
    random_state=42,
)


In [13]:
# Fit the model on the two numeric features and store its raw label per row.
# The next cell treats a label of -1 as an anomaly.
pdf['anomaly_flag'] = model.fit_predict(pdf.loc[:, ['quantity', 'total_amount']])

In [14]:
# Turn the raw label into a 0/1 indicator: 1 where anomaly_flag is -1, else 0.
pdf['is_anomaly'] = pdf['anomaly_flag'].eq(-1).astype(int)

In [15]:
# Display the pandas frame, now including the anomaly_flag and is_anomaly columns.
pdf

Unnamed: 0,row_id,quantity,total_amount,anomaly_flag,is_anomaly
0,0,2,1000,1,0
1,1,1,1200,1,0
2,2,5,1500,1,0
3,3,20,10000,1,0
4,4,1,300,1,0
5,5,50,60000,1,0
6,6,3,2100,1,0
7,7,100,70000,-1,1
8,8,2,1000,1,0
9,9,4,1200,1,0


In [26]:
# Take an independent (deep) copy of the scored frame as the final output, so
# later changes to `pdf` do not affect it.
gold_df = pdf.copy(deep=True)

# Display the result.
gold_df

Unnamed: 0,row_id,quantity,total_amount,anomaly_flag,is_anomaly
0,0,2,1000,1,0
1,1,1,1200,1,0
2,2,5,1500,1,0
3,3,20,10000,1,0
4,4,1,300,1,0
5,5,50,60000,1,0
6,6,3,2100,1,0
7,7,100,70000,-1,1
8,8,2,1000,1,0
9,9,4,1200,1,0
