<a href="https://colab.research.google.com/github/Geetanjali-18/CHS-PowerBi/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 ### In PySpark, create a program that reads a CSV file containing sales data, performs data cleaning by handling missing values and removing duplicates, calculates the total sales amount for each product, and finally, outputs the results to a new CSV file. Ensure to use transformations and actions in your PySpark script

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a9b64c4779ecf0e87d06c72c0ecda61c955c80b171443184679c6962d8849043
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

In [None]:
# create a spark session
spark = SparkSession.builder.appName("Sales Data Cleaning").getOrCreate()

In [None]:
# Define paths for input and output files
input_path = '/sale data.csv'
output_path = '/content/output/cleaned_csv.csv'

In [None]:
# Read csv files to a dataframe
sales_df = spark.read\
.option("header", True)\
.csv(input_path)

In [None]:
sales_df.describe()

DataFrame[summary: string, product_id: string, sales_amount: string, product_name: string]

In [None]:
sales_df.show()

+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         0|          72|       Pants|
|         1|          48|         Hat|
|         2|          23|       Shoes|
|         3|          81|       Shirt|
|         4|        NULL|       Pants|
|         6|          64|       Shoes|
|         7|          92|       Shirt|
+----------+------------+------------+



## Handling missing values:
1. Dropping columns
2. Dropping Rows
3. Various parameter in dropping functionalities
4. Handling missing values by mean, mode median.

In [None]:
# 1. Dropping columns
cleaned_df = sales_df.drop('sales_amount')

In [None]:
cleaned_df.show()

+----------+------------+
|product_id|product_name|
+----------+------------+
|         0|       Pants|
|         1|         Hat|
|         2|       Shoes|
|         3|       Shirt|
|         4|       Pants|
|         6|       Shoes|
|         7|       Shirt|
+----------+------------+



In [None]:
# 2. Dropping rows
cleaned_df = sales_df.na.drop()

In [None]:
cleaned_df.show()

+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         0|          72|       Pants|
|         1|          48|         Hat|
|         2|          23|       Shoes|
|         3|          81|       Shirt|
|         6|          64|       Shoes|
|         7|          92|       Shirt|
+----------+------------+------------+



In [None]:
# there are two parameter - "how" and "threshhold"
# - how - any or all
# when how = "any" - row having any null should be dropped
# when how = "all" - row having all null should be removed
cleaned_df = sales_df.na.drop(how = "any").show()

+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         0|          72|       Pants|
|         1|          48|         Hat|
|         2|          23|       Shoes|
|         3|          81|       Shirt|
|         6|          64|       Shoes|
|         7|          92|       Shirt|
+----------+------------+------------+



In [None]:
sales_df.na.drop(how = "all").show()

+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         0|          72|       Pants|
|         1|          48|         Hat|
|         2|          23|       Shoes|
|         3|          81|       Shirt|
|         4|        NULL|       Pants|
|         6|          64|       Shoes|
|         7|          92|       Shirt|
+----------+------------+------------+



In [None]:
# thresh - how many null value row will be removed
sales_df.na.drop(thresh = 0).show()

+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         0|          72|       Pants|
|         1|          48|         Hat|
|         2|          23|       Shoes|
|         3|          81|       Shirt|
|         4|        NULL|       Pants|
|         6|          64|       Shoes|
|         7|          92|       Shirt|
+----------+------------+------------+



In [None]:
# 3. Various parameter in dropping functionalities
sales_df.na.fill("values missing", "sales_amount").show()

+----------+--------------+------------+
|product_id|  sales_amount|product_name|
+----------+--------------+------------+
|         0|            72|       Pants|
|         1|            48|         Hat|
|         2|            23|       Shoes|
|         3|            81|       Shirt|
|         4|values missing|       Pants|
|         6|            64|       Shoes|
|         7|            92|       Shirt|
+----------+--------------+------------+



In [None]:
# 4. Handling missing value by mean
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['sales_amount'],
    outputCols = ["{}_imputed".format(c) for c in ['sales_amount']]
).setStrategy("mean")

## Handling duplicate value


In [None]:
sales_df.dropDuplicates(['product_name']).show()


+----------+------------+------------+
|product_id|sales_amount|product_name|
+----------+------------+------------+
|         1|          48|         Hat|
|         0|          72|       Pants|
|         3|          81|       Shirt|
|         2|          23|       Shoes|
+----------+------------+------------+



In [None]:
# Calculate total sale amount each product
sales = sales_df.groupBy("product_name")\
.agg({"sales_amount":"sum"}).show()

+------------+-----------------+
|product_name|sum(sales_amount)|
+------------+-----------------+
|         Hat|             48.0|
|       Pants|             72.0|
|       Shirt|            173.0|
|       Shoes|             87.0|
+------------+-----------------+



In [None]:
# Save result in new file
sales_df.write.csv(output_path, header=True)

In [None]:
spark.stop()

In [None]:
print("Preprocessed data written to ", output_path)

Preprocessed data written to  /content/output/cleaned_csv.csv
