<a href="https://colab.research.google.com/github/Gowtham933/Datawarehousing-Projects/blob/main/Pyspark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Apache Spark (PySpark)
PySpark, the Python API for Apache Spark, is a powerful tool for big data processing and analytics.
### Advantages of PySpark:
- **Handles Big Data:** PySpark distributes data and computation across a cluster, so it can work with datasets too large to fit on one computer.

- **Works with Other Big Data Tools:** It easily connects with other big data technologies like Hadoop and Hive.

- **Fast Performance:** It processes data quickly through in-memory computation and query optimization (Spark SQL's Catalyst optimizer).

- **All-in-One Platform:** You can run SQL queries and also do advanced data analysis and machine learning all in one place.

- **Easy to Use:** You can write SQL queries just like in a traditional database.

- **Flexible Data Handling:** PySpark's DataFrames make it easier to work with and transform structured data.

- **Uses Python Libraries:** You can use popular Python libraries for data science together with PySpark.

- **Reads Different Data Formats:** It can read and write many types of data files like JSON, Parquet, and more.

- **Strong Community:** There's a big community for support, with lots of documentation and tutorials.

AIM: Create a PySpark session ▶ (DDL) define the table schema and save the data as a Parquet file ▶ load the data into a DataFrame ▶ (DML) run SQL queries to extract the data.

In [2]:
#Install PySpark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=95418efc3ab08d4fd2457a99f99fcf2a570b400400533a033e05c285995ce59d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
# Import the libraries required for this notebook
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
import pandas as pd
import os

In [4]:
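# Initialize the Spark session
# Note: "spark.some.config.option" is the placeholder from the Spark SQL getting-started example; it has no effect
spark = SparkSession.builder \
    .appName("Spark SQL Database") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()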

In [6]:
# Load the CSV file into a pandas DataFrame
df = pd.read_csv('/content/supply_chain_data.csv')

In [7]:
#Defining the Schema
schema = StructType([
    StructField("Product type", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("Availability", IntegerType(), True),
    StructField("Number of products sold", IntegerType(), True),
    StructField("Revenue generated", FloatType(), True),
    StructField("Customer demographics", StringType(), True),
    StructField("Stock levels", IntegerType(), True),
    StructField("Lead times", IntegerType(), True),
    StructField("Order quantities", IntegerType(), True),
    StructField("Shipping times", IntegerType(), True),
    StructField("Shipping carriers", StringType(), True),
    StructField("Shipping costs", FloatType(), True),
    StructField("Supplier name", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Lead time", IntegerType(), True),
    StructField("Production volumes", IntegerType(), True),
    StructField("Manufacturing lead time", IntegerType(), True),
    StructField("Manufacturing costs", FloatType(), True),
    StructField("Inspection results", StringType(), True),
    StructField("Defect rates", FloatType(), True),
    StructField("Transportation modes", StringType(), True),
    StructField("Routes", StringType(), True),
    StructField("Costs", FloatType(), True)
])


In [8]:
# Create a Spark DataFrame from the pandas DataFrame, applying the schema defined above
spark_df = spark.createDataFrame(df, schema=schema)

In [9]:
# Register the DataFrame as a temporary view named Supplychain_data so it can be queried with SQL
spark_df.createOrReplaceTempView("Supplychain_data")

In [12]:
# Run a SQL query against the view to check the results
result = spark.sql("SELECT * FROM Supplychain_data")
result.show()

+------------+-----+---------+------------+-----------------------+-----------------+---------------------+------------+----------+----------------+--------------+-----------------+--------------+-------------+---------+---------+------------------+-----------------------+-------------------+------------------+------------+--------------------+-------+---------+
|Product type|  SKU|    Price|Availability|Number of products sold|Revenue generated|Customer demographics|Stock levels|Lead times|Order quantities|Shipping times|Shipping carriers|Shipping costs|Supplier name| Location|Lead time|Production volumes|Manufacturing lead time|Manufacturing costs|Inspection results|Defect rates|Transportation modes| Routes|    Costs|
+------------+-----+---------+------------+-----------------------+-----------------+---------------------+------------+----------+----------------+--------------+-----------------+--------------+-------------+---------+---------+------------------+---------------------

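### Why Parquet?
- Parquet is a columnar file format that Apache Spark reads and writes natively. Because values are stored column by column, queries read only the columns they need and the data compresses well. The schema is stored with the data, so column names and types survive a save-and-load round trip. These properties make it well suited to big data analytics and to exchanging data with other big data tools.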

In [13]:
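# Save the DataFrame in Parquet format on the local file system
# (Spark writes a directory of part files at this path; mode("overwrite") lets the cell be re-run)
output_path = "/content/Supplychain_data.parquet"
spark_df.write.mode("overwrite").parquet(output_path)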

In [14]:
# List files to ensure the Parquet file is saved
os.listdir('/content')

['.config', 'Supplychain_data.parquet', 'supply_chain_data.csv', 'sample_data']

In [15]:
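import shutil
from google.colab import files

# Spark saves Parquet output as a directory of part files, so zip it before downloading
archive_path = shutil.make_archive(output_path, 'zip', root_dir=output_path)
files.download(archive_path)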


## Spark SQL Session
This section loads the saved Parquet data and runs SQL queries to retrieve the required data.

In [16]:
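# Path to the saved Parquet file
parquet_file_path = "/content/Supplychain_data.parquet"

# Load the Parquet file into a DataFrame
parquet_df = spark.read.parquet(parquet_file_path)

# Register it under the same view name so the SQL queries below read the Parquet data
# (otherwise they would still query the view created from the pandas DataFrame)
parquet_df.createOrReplaceTempView("Supplychain_data")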

In [18]:
# Perform a simple query
result = spark.sql("SELECT * FROM Supplychain_data LIMIT 5")
result.show()


+------------+----+---------+------------+-----------------------+-----------------+---------------------+------------+----------+----------------+--------------+-----------------+--------------+-------------+--------+---------+------------------+-----------------------+-------------------+------------------+------------+--------------------+-------+---------+
|Product type| SKU|    Price|Availability|Number of products sold|Revenue generated|Customer demographics|Stock levels|Lead times|Order quantities|Shipping times|Shipping carriers|Shipping costs|Supplier name|Location|Lead time|Production volumes|Manufacturing lead time|Manufacturing costs|Inspection results|Defect rates|Transportation modes| Routes|    Costs|
+------------+----+---------+------------+-----------------------+-----------------+---------------------+------------+----------+----------------+--------------+-----------------+--------------+-------------+--------+---------+------------------+-----------------------+---

In [69]:
# Example of an aggregate query: total price by product type
Total_price_by_product_type = spark.sql("""
SELECT `Product type`, SUM(Price) AS Total_price
FROM Supplychain_data
GROUP BY `Product type`
""")
Total_price_by_product_type.show()

+------------+------------------+
|Product type|       Total_price|
+------------+------------------+
|    skincare|1890.3731541633606|
|   cosmetics|1491.3875029087067|
|    haircare|1564.4854910373688|
+------------+------------------+

