In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=0c952264e954b5c4ad68c1553471b5d6ad362bc9b266a6f48ca95dcf946bafca
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
# Import necessary PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

# Obtain the Spark session used by every later cell; the application name is
# what this job is registered under.
spark = (
    SparkSession.builder
    .appName("Flipkart Sale Discount Prediction")
    .getOrCreate()
)


In [None]:
# Both CSV files are read with the same options: the first line is the header
# and column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}

train_data = spark.read.csv("/content/train.csv", **csv_options)
test_data = spark.read.csv("/content/test.csv", **csv_options)

# Preview the first five rows of each dataset (train first, then test).
for frame in (train_data, test_data):
    frame.show(5)


+-----+--------------------+------+---------+--------+------+---------+-------+---------+----------+-------+-------+-------+-------+-------+----------+
|   id|               title|Rating|maincateg|platform|price1|actprice1|Offer %|norating1|noreviews1|star_5f|star_4f|star_3f|star_2f|star_1f|fulfilled1|
+-----+--------------------+------+---------+--------+------+---------+-------+---------+----------+-------+-------+-------+-------+-------+----------+
|16695|Fashionable & Com...|   3.9|    Women|Flipkart|   698|      999| 0.3013|       38|         7|     17|      9|      6|      3|      3|         0|
| 5120|Combo Pack of 4 C...|   3.8|      Men|Flipkart|   999|     1999| 0.5003|      531|        69|    264|     92|     73|     29|     73|         1|
|18391|Cilia Mode Leo Sn...|   4.4|    Women|Flipkart|  2749|     4999| 0.4501|       17|         4|     11|      3|      2|      1|      0|         1|
|  495|Men Black Sports ...|   4.2|      Men|Flipkart|   518|      724| 0.1585|    46413

In [None]:
# Count the missing values in every column of the training set.
# Printing per-row isNull() flags only reveals the first 20 rows that show()
# displays, so a null further down the dataset would go unnoticed. Aggregating
# yields a single row with the number of nulls per column instead.
from pyspark.sql.functions import col, count, when

# when(...) yields a non-null value only for rows where the column is null,
# and count() counts non-null values, so each cell is that column's null count.
train_data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in train_data.columns]
).show()


+-----+-----+------+---------+--------+------+---------+-------+---------+----------+-------+-------+-------+-------+-------+----------+
|   id|title|Rating|maincateg|platform|price1|actprice1|Offer %|norating1|noreviews1|star_5f|star_4f|star_3f|star_2f|star_1f|fulfilled1|
+-----+-----+------+---------+--------+------+---------+-------+---------+----------+-------+-------+-------+-------+-------+----------+
|false|false| false|    false|   false| false|    false|  false|    false|     false|  false|  false|  false|  false|  false|     false|
|false|false| false|    false|   false| false|    false|  false|    false|     false|  false|  false|  false|  false|  false|     false|
|false|false| false|    false|   false| false|    false|  false|    false|     false|  false|  false|  false|  false|  false|     false|
|false|false| false|    false|   false| false|    false|  false|    false|     false|  false|  false|  false|  false|  false|     false|
|false|false| false|    false|   false| f

In [None]:
# Impute missing ratings: nulls in the 'Rating' column are replaced with the
# column's mean, computed over the training data.
from pyspark.sql.functions import mean

# A global aggregate returns exactly one row; its only field is the mean.
mean_rating = train_data.agg(mean(col("Rating"))).first()[0]

# The dict form restricts the fill to the 'Rating' column only.
train_data = train_data.na.fill({"Rating": mean_rating})


In [None]:
# Derive two extra columns on the training data.
from pyspark.sql.functions import col, when

# 'Price_Difference': actprice1 minus price1.
price_gap = col("actprice1") - col("price1")

# 'Rating_Category': bucket 'Rating' by descending thresholds; the first
# matching branch wins and anything below 3.5 is labelled "Poor".
rating_bucket = (
    when(col("Rating") >= 4.5, "Excellent")
    .when(col("Rating") >= 4, "Good")
    .when(col("Rating") >= 3.5, "Average")
    .otherwise("Poor")
)

train_data = (
    train_data
    .withColumn("Price_Difference", price_gap)
    .withColumn("Rating_Category", rating_bucket)
)


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Input columns that are packed into the model's feature vector.
feature_columns = [
    "price1",
    "norating1",
    "noreviews1",
    "star_5f",
    "star_4f",
    "star_3f",
]

# VectorAssembler combines the listed columns into one "features" vector column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Keep only what the regression needs: the feature vector and the target,
# with "Offer %" renamed to "label". All other columns are dropped here.
train_data = assembler.transform(train_data).select(
    "features",
    col("Offer %").alias("label"),
)
train_data.show(5)

+--------------------+------+
|            features| label|
+--------------------+------+
|[698.0,38.0,7.0,1...|0.3013|
|[999.0,531.0,69.0...|0.5003|
|[2749.0,17.0,4.0,...|0.4501|
|[518.0,46413.0,62...|0.1585|
|[1379.0,77.0,3.0,...|0.4002|
+--------------------+------+
only showing top 5 rows



In [None]:
# Hold out 20% of the assembled rows for evaluation; the fixed seed is passed
# so the split can be repeated.
train, test = train_data.randomSplit(weights=[0.8, 0.2], seed=12345)

# Linear regression reading the "features" vector and predicting "label".
lr = LinearRegression(labelCol="label", featuresCol="features")

# Fit on the 80% training split only.
lr_model = lr.fit(train)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split; transform() adds a "prediction" column.
predictions = lr_model.transform(test)

# Compare "prediction" against "label" using root mean squared error.
evaluator_options = {
    "metricName": "rmse",
    "labelCol": "label",
    "predictionCol": "prediction",
}
evaluator = RegressionEvaluator(**evaluator_options)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 0.1883555768817478
