<a href="https://colab.research.google.com/github/Manya123-max/Big-Data-Framework/blob/main/BDF4_SPARK_CREATING_AND__BUILDING_SPARK_APPLICATIONS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**AIM:** To create and build a Spark session and application using PySpark, perform operations on a
sample dataset, and verify the setup.


**STEPS**

**Step 1:** Install and Set Up Dependencies

Purpose: Ensures all prerequisites for running Spark are met.

Tasks Performed:
1. Installing Java 8, a prerequisite for Spark.
2. Downloading and extracting Apache Spark 3.0.0, prebuilt for Hadoop 2.7.
3. Installing Findspark, which simplifies Spark initialization in Python by making the PySpark libraries bundled with the Spark download (the Python API for Spark) importable.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq
!wget -q http://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install findspark



**Step 2**: Set Environment Variables

1. Set Java and Spark Home Paths:
Define the paths for the Java and Spark installations.
2. Initialize Findspark:
Findspark is initialized to link Python to the Spark environment. The call itself appears in the Step 3 code cell.


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
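
Before Findspark is initialized, it can help to confirm that both paths actually exist, since a wrong path tends to surface later as a less obvious error. The following is a minimal sketch; the helper `missing_dirs` is a hypothetical name and is not part of Findspark or PySpark.

```python
import os

def missing_dirs(var_names, environ=os.environ):
    """Return the names of environment variables that are unset or
    do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# Report any variable that still needs attention
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
print("Missing or invalid:", problems)
```

An empty list means both directories were found; any name in the list points to a path worth rechecking against the download and extraction step.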

**Step 3**: Build a SparkSession
1. Create a Spark Session:

A SparkSession is built explicitly with the following parameters:

*  master("local[*]"): Indicates that the application will run locally, utilizing all available CPU cores.
*  appName("MySparkApp"): Names the Spark application "MySparkApp".
*  config: Sets additional configuration options (e.g., specifying the classpath for driver dependencies). The key "spark.some.config.option" used here is only a placeholder.

In [None]:
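# The module name is lowercase; findspark.init() makes the Spark installation in SPARK_HOME importable
import findspark
findspark.init()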
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("MySparkApp").config("spark.some.config.option", "some-value").getOrCreate()

# Use Spark!
**Step 4:**
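1. Findspark, initialized in Step 3, allows PySpark to locate the installed Spark directory.
2. Imports the required modules and obtains a local SparkSession. The "local[*]" argument uses all available CPU cores for processing. Because a session already exists from Step 3, getOrCreate() returns that session rather than creating a new one.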

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

In [None]:
# Start a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Step 5**: E-Commerce Data Analysis



1. Define a Sample Dataset

A Python list of tuples is created to represent the dataset. Each tuple holds a customer ID, country, purchase date, quantity, sale amount, and product.

In [None]:
data = [
    ("C001", "USA", "2024-10-01", 5, 100, "Laptop"),
    ("C002", "USA", "2024-10-02", 3, 60, "Smartphone"),
    ("C003", "India", "2024-10-05", 10, 200, "Tablet"),
    ("C004", "India", "2024-10-06", 7, 140, "Smartwatch"),
    ("C005", "USA", "2024-10-10", 2, 40, "Headphones"),
    ("C006", "India", "2024-10-11", 1, 20, "Smartphone"),
    ("C007", "UK", "2024-10-01", 8, 160, "Laptop"),
    ("C008", "UK", "2024-10-15", 6, 120, "Smartwatch"),
    ("C009", "USA", "2024-10-14", 4, 80, "Headphones"),
    ("C010", "India", "2024-10-12", 9, 180, "Tablet")
]

2. Creates a DataFrame with specified columns.

In [None]:
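# Create DataFrame from the sample data
# Note: each tuple has six fields but only five names are listed, so Spark
# auto-names the sixth column (the product) "_6", as the output below shows
columns = ["Customer_ID", "Country", "Purchase_Date", "Quantity", "Sale_Amount"]
df = spark.createDataFrame(data, columns)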
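
Each tuple in the dataset has six fields while the column list has five names, which is why the product field shows up as `_6` in the output. A small pre-check in plain Python can catch this kind of mismatch before the DataFrame is built. The helper `check_columns` and the sixth name `Product` are hypothetical, chosen only for illustration.

```python
def check_columns(rows, names):
    """Return the set of row widths that differ from len(names)."""
    return {len(row) for row in rows if len(row) != len(names)}

# Two rows copied from the sample dataset
rows = [
    ("C001", "USA", "2024-10-01", 5, 100, "Laptop"),
    ("C002", "USA", "2024-10-02", 3, 60, "Smartphone"),
]
five_names = ["Customer_ID", "Country", "Purchase_Date", "Quantity", "Sale_Amount"]
six_names = five_names + ["Product"]  # "Product" is an assumed name

print(check_columns(rows, five_names))  # {6}: rows are wider than the name list
print(check_columns(rows, six_names))   # set(): every field has a name
```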

3. Displays the first 5 rows of the dataset.

In [None]:
# Display first 5 rows
print("First 5 Rows:")
df.show(5)

First 5 Rows:
+-----------+-------+-------------+--------+-----------+----------+
|Customer_ID|Country|Purchase_Date|Quantity|Sale_Amount|        _6|
+-----------+-------+-------------+--------+-----------+----------+
|       C001|    USA|   2024-10-01|       5|        100|    Laptop|
|       C002|    USA|   2024-10-02|       3|         60|Smartphone|
|       C003|  India|   2024-10-05|      10|        200|    Tablet|
|       C004|  India|   2024-10-06|       7|        140|Smartwatch|
|       C005|    USA|   2024-10-10|       2|         40|Headphones|
+-----------+-------+-------------+--------+-----------+----------+
only showing top 5 rows



4. Displays the schema of the DataFrame, showing column names and data types.

In [None]:
# Print schema
print("Schema:")
df.printSchema()

Schema:
root
 |-- Customer_ID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchase_Date: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Sale_Amount: long (nullable = true)
 |-- _6: string (nullable = true)



**OPERATIONS**

1. Country with the Highest Purchase

*  Groups data by country and calculates the total sales for each country.
*  Orders the results in descending order of sales and retrieves the top result.
*  Purpose: Identify the country with the highest total sales.










In [None]:
# 1. Country with the highest purchase
highest_purchase_country = df.groupBy("Country").sum("Sale_Amount").orderBy(col("sum(Sale_Amount)").desc()).limit(1)
highest_purchase_country.show()

+-------+----------------+
|Country|sum(Sale_Amount)|
+-------+----------------+
|  India|             540|
+-------+----------------+
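
The aggregate can be cross-checked without Spark. The sketch below recomputes the per-country totals from the (country, sale amount) pairs of the sample data using only the standard library. It is a sanity check on the numbers, not a substitute for the DataFrame query.

```python
from collections import defaultdict

# (Country, Sale_Amount) pairs taken from the sample dataset
sales = [
    ("USA", 100), ("USA", 60), ("India", 200), ("India", 140), ("USA", 40),
    ("India", 20), ("UK", 160), ("UK", 120), ("USA", 80), ("India", 180),
]

# Accumulate the total sale amount per country
totals = defaultdict(int)
for country, amount in sales:
    totals[country] += amount

top_country = max(totals, key=totals.get)
print(dict(totals))  # {'USA': 280, 'India': 540, 'UK': 280}
print(top_country)   # India
```

USA and UK tie at 280. That does not affect the top result here, but if the tie were for first place, the ordering among tied rows would not be guaranteed and a single-row limit could return either one.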



2. Number of Customers from Each Country


*   Counts the number of customers for each country using groupBy and agg.

*  Renames the resulting column for better readability.
*   Purpose: Determine the customer distribution by country.




In [None]:
# 2. Number of customers from each country
customers_per_country = df.groupBy("Country").agg({"Customer_ID": "count"}).withColumnRenamed("count(Customer_ID)", "No_of_Customers")
customers_per_country.show()

+-------+---------------+
|Country|No_of_Customers|
+-------+---------------+
|  India|              4|
|    USA|              4|
|     UK|              2|
+-------+---------------+



3. Maximum Quantity Purchased by Each Customer


*  Finds the maximum quantity purchased by each customer.
*  Purpose: Identify customers with the highest single purchase quantity.



In [None]:
# 3. Max quantity purchased by each customer
max_quantity_per_customer = df.groupBy("Customer_ID").agg({"Quantity": "max"}).withColumnRenamed("max(Quantity)", "Max_Quantity")
max_quantity_per_customer.show()

+-----------+------------+
|Customer_ID|Max_Quantity|
+-----------+------------+
|       C006|           1|
|       C010|           9|
|       C007|           8|
|       C003|          10|
|       C004|           7|
|       C009|           4|
|       C008|           6|
|       C005|           2|
|       C001|           5|
|       C002|           3|
+-----------+------------+



4. Maximum Sale within a Date Range

*  Converts the Purchase_Date column to a proper date format.

*  Filters rows to include only those within the specified date range.
*  Calculates the maximum sale amount within the range.


*  Purpose: Analyze peak sales during a specific period.



In [None]:
# 4. Max sale between 01.10.2024 and 15.10.2024
df = df.withColumn("Purchase_Date", to_date(col("Purchase_Date"), "yyyy-MM-dd"))
max_sale_in_date_range = df.filter((col("Purchase_Date") >= "2024-10-01") & (col("Purchase_Date") <= "2024-10-15")) \
    .agg({"Sale_Amount": "max"}).withColumnRenamed("max(Sale_Amount)", "Max_Sale_Between_01_10_24_and_15_10_24")
max_sale_in_date_range.show()

+--------------------------------------+
|Max_Sale_Between_01_10_24_and_15_10_24|
+--------------------------------------+
|                                   200|
+--------------------------------------+
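
The filter logic can be illustrated in plain Python: parse each string into a date, keep the rows whose date lies within the inclusive bounds, then take the maximum. This sketch uses `datetime.strptime` with `%Y-%m-%d`, the standard-library counterpart of the `yyyy-MM-dd` pattern passed to `to_date`. The helper name `max_sale_between` is hypothetical.

```python
from datetime import date, datetime

# (Purchase_Date, Sale_Amount) pairs taken from the sample dataset
purchases = [
    ("2024-10-01", 100), ("2024-10-02", 60), ("2024-10-05", 200),
    ("2024-10-06", 140), ("2024-10-10", 40), ("2024-10-11", 20),
    ("2024-10-01", 160), ("2024-10-15", 120), ("2024-10-14", 80),
    ("2024-10-12", 180),
]

def max_sale_between(rows, start, end):
    """Largest sale whose date lies in [start, end], or None if no row matches."""
    in_range = [
        amount for day, amount in rows
        if start <= datetime.strptime(day, "%Y-%m-%d").date() <= end
    ]
    return max(in_range) if in_range else None

print(max_sale_between(purchases, date(2024, 10, 1), date(2024, 10, 15)))  # 200
print(max_sale_between(purchases, date(2024, 10, 6), date(2024, 10, 11)))  # 140
```

Every row of the sample falls between 1 and 15 October 2024, so the first call keeps all ten rows. The narrower second range shows the filter actually excluding data.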



5. Minimum Sale within a Date Range


*  Similar to the previous query but calculates the minimum sale amount.




In [None]:
# 5. Min sale between 01.10.2024 and 15.10.2024
min_sale_in_date_range = df.filter((col("Purchase_Date") >= "2024-10-01") & (col("Purchase_Date") <= "2024-10-15")) \
    .agg({"Sale_Amount": "min"}).withColumnRenamed("min(Sale_Amount)", "Min_Sale_Between_01_10_24_and_15_10_24")
min_sale_in_date_range.show()

+--------------------------------------+
|Min_Sale_Between_01_10_24_and_15_10_24|
+--------------------------------------+
|                                    20|
+--------------------------------------+



6. Total Sales

*  Calculates the total sales across all transactions.

*   Purpose: Summarize overall sales performance.

In [None]:
# 6. Total Sale
total_sale = df.agg({"Sale_Amount": "sum"}).withColumnRenamed("sum(Sale_Amount)", "Total_Sale")
total_sale.show()

+----------+
|Total_Sale|
+----------+
|      1100|
+----------+



7. Country with the Most Purchases


*   Repeats the query from operation 1 (total Sale_Amount per country, sorted in descending order); the matching result confirms that the two are consistent.


In [None]:
# 7. Which country makes the most purchases?
country_with_most_purchase = df.groupBy("Country").sum("Sale_Amount").orderBy(col("sum(Sale_Amount)").desc()).limit(1)
country_with_most_purchase.show()

+-------+----------------+
|Country|sum(Sale_Amount)|
+-------+----------------+
|  India|             540|
+-------+----------------+



8. Earliest Purchase by Each Customer


*   Identifies the earliest purchase date for each customer.

*   Purpose: Understand customer purchase history.







In [None]:
# 8. Earliest purchase made by each customer on the e-commerce platform
earliest_purchase = df.groupBy("Customer_ID").agg({"Purchase_Date": "min"}).withColumnRenamed("min(Purchase_Date)", "Earliest_Purchase")
earliest_purchase.show()

+-----------+-----------------+
|Customer_ID|Earliest_Purchase|
+-----------+-----------------+
|       C006|       2024-10-11|
|       C010|       2024-10-12|
|       C007|       2024-10-01|
|       C003|       2024-10-05|
|       C004|       2024-10-06|
|       C009|       2024-10-14|
|       C008|       2024-10-15|
|       C005|       2024-10-10|
|       C001|       2024-10-01|
|       C002|       2024-10-02|
+-----------+-----------------+



9. Customer Purchase Frequency


*  Counts the number of purchases per customer.
*  Purpose: Measure customer engagement on the platform.







In [None]:
# 9. How often a customer buys something
# Calculate the number of purchases per customer
purchase_frequency = df.groupBy("Customer_ID").agg({"Purchase_Date": "count"}).withColumnRenamed("count(Purchase_Date)", "Purchases_Per_Customer")
purchase_frequency.show()

+-----------+----------------------+
|Customer_ID|Purchases_Per_Customer|
+-----------+----------------------+
|       C006|                     1|
|       C010|                     1|
|       C007|                     1|
|       C003|                     1|
|       C004|                     1|
|       C009|                     1|
|       C008|                     1|
|       C005|                     1|
|       C001|                     1|
|       C002|                     1|
+-----------+----------------------+
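
Every customer ID appears exactly once in the sample, so each count is 1. The plain-Python sketch below reproduces the count with `collections.Counter`, then appends a hypothetical repeat purchase for C001 (not part of the dataset) to show how a returning customer would register.

```python
from collections import Counter

# Customer IDs taken from the sample dataset, one per purchase row
customer_ids = ["C001", "C002", "C003", "C004", "C005",
                "C006", "C007", "C008", "C009", "C010"]

frequency = Counter(customer_ids)
print(set(frequency.values()))  # {1}: one purchase per customer

# Hypothetical extra purchase, added only for illustration
frequency_with_repeat = Counter(customer_ids + ["C001"])
print(frequency_with_repeat["C001"])  # 2
```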



**Step 6**: Stop Spark Session

Terminates the Spark session and releases resources.

In [None]:
# Stop SparkSession
spark.stop()

# **Results**

This PySpark-based application accomplishes the following:


1.   The Spark session was initialized successfully after the installation and environment setup.
2.  The analytical queries ran successfully and summarized the sample dataset: total and per-country sales, customer counts, per-customer quantities and purchase dates, and the maximum and minimum sale within a date range.


