### 1. Introduction to PySpark

PySpark is the Python API for Apache Spark, an open-source distributed computing framework for big data processing. It lets Python users process and analyze large datasets with Spark's distributed engine.

#### Why Use PySpark?

   - Handles large-scale data efficiently.
   - Provides fault tolerance using RDDs (Resilient Distributed Datasets).
   - Supports distributed computing across multiple machines.
   - Compatible with SQL, Streaming, Machine Learning, and Graph Processing.
   - Works with various data sources (HDFS, MySQL, S3, Cassandra, etc.).

### Installing and Setting Up PySpark

To install PySpark, use the following command:

`pip install pyspark`

In [1]:
import pyspark

In [2]:
import pandas as pd
pd.read_excel(r'C:\Users\dell\Desktop\demo.xlsx')

Unnamed: 0,Name,Age
0,Adam,25
1,Bob,28
2,Jennifer,29
3,Alice,22


In [3]:
from pyspark.sql import SparkSession

- `SparkSession` is the entry point for working with DataFrames and SQL in PySpark.
- It provides methods for reading data, creating DataFrames, running SQL queries, and managing Spark configurations.
- Since Spark 2.0 it is the unified entry point that supersedes `SQLContext` and `HiveContext`; the underlying `SparkContext` is still available as `spark.sparkContext`.

In [4]:

spark=SparkSession.builder.appName('Practice').getOrCreate()

This line creates or retrieves an existing Spark session. Let's break it down:

   - `SparkSession.builder`:
        Configures and initializes a new Spark session.

   - `.appName("Practice")`:
        Assigns the application name "Practice", which identifies the job in the Spark UI.
        If you run multiple Spark jobs, giving them unique names helps with debugging.

   - `.getOrCreate()`:
        Checks whether a Spark session already exists.
        If one is found, it returns that session instead of creating a new one.
        If no session exists, it creates a new Spark session.

#### 💡 Why use `.getOrCreate()`?

If you run the same script multiple times in an interactive environment (e.g., Jupyter Notebook), it avoids creating multiple Spark sessions.

In [5]:
spark

In [6]:
df_pyspark=spark.read.csv(r'C:\Users\dell\Desktop\demo.csv')

In [7]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [8]:
df_pyspark.show()

+--------+---+----------+
|     _c0|_c1|       _c2|
+--------+---+----------+
|    Name|Age|Experience|
|    Adam| 25|         4|
|     Bob| 28|         2|
|Jennifer| 29|         1|
|   Alice| 22|         3|
+--------+---+----------+



In [9]:
df_pyspark=spark.read.option('header','true').csv(r'C:\Users\dell\Desktop\demo.csv')

| Code Snippet | Explanation |
|-------------|------------|
| `df_pyspark =` | Stores the output DataFrame in `df_pyspark`. |
| `spark.read` | Accesses the DataFrameReader to read data. |
| `.option('header', 'true')` | Treats the first row as column headers. |
| `.csv(r'C:\Users\dell\Desktop\demo.csv')` | Reads the CSV file from the specified path. |


In [10]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [11]:
df_pyspark.head(3)

[Row(Name='Adam', Age='25', Experience='4'),
 Row(Name='Bob', Age='28', Experience='2'),
 Row(Name='Jennifer', Age='29', Experience='1')]

In [12]:
# check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)



In [13]:
df_pyspark

DataFrame[Name: string, Age: string, Experience: string]

In [14]:
df_pyspark=spark.read.csv(r'C:\Users\dell\Desktop\demo.csv',header=True,inferSchema=True)

| Code Snippet | Explanation |
|-------------|------------|
| `df_pyspark =` | Stores the output DataFrame in `df_pyspark`. |
| `spark.read.csv()` | Reads the CSV file into a DataFrame. |
| `r'C:\Users\dell\Desktop\demo.csv'` | Specifies the file path (raw string format for Windows). |
| `header=True` | Treats the first row as column headers. |
| `inferSchema=True` | Automatically detects column data types instead of reading everything as strings. |


In [15]:
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|    Adam| 25|         4|
|     Bob| 28|         2|
|Jennifer| 29|         1|
|   Alice| 22|         3|
+--------+---+----------+



## PySpark DataFrame Operations

| Code Snippet | Explanation |
|-------------|------------|
| `df_pyspark.show()` | Displays the first 20 rows of the DataFrame in a tabular format. |
| `df_pyspark.printSchema()` | Prints the schema of the DataFrame, showing column names and data types. |
| `df_pyspark.columns` | Returns a list of all column names in the DataFrame. |
| `df_pyspark.describe().show()` | Displays summary statistics (count, mean, stddev, min, max) for numeric and string columns; mean and stddev are NULL for string columns. |
| `df_pyspark.head()` | Returns the first row of the DataFrame as a `Row` object. |
| `df_pyspark.head(5)` | Returns the first 5 rows as a list of `Row` objects. |


In [16]:
# check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [17]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [18]:
# head(n) returns a list of Row objects, whereas pandas' head() returns a DataFrame
df_pyspark.head(3)

[Row(Name='Adam', Age=25, Experience=4),
 Row(Name='Bob', Age=28, Experience=2),
 Row(Name='Jennifer', Age=29, Experience=1)]

In [19]:
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|    Adam| 25|         4|
|     Bob| 28|         2|
|Jennifer| 29|         1|
|   Alice| 22|         3|
+--------+---+----------+



In [20]:
# select a single column from the DataFrame
df_pyspark.select('Name').show()

+--------+
|    Name|
+--------+
|    Adam|
|     Bob|
|Jennifer|
|   Alice|
+--------+



In [21]:
df_pyspark.select(['Name','Age']).show()

+--------+---+
|    Name|Age|
+--------+---+
|    Adam| 25|
|     Bob| 28|
|Jennifer| 29|
|   Alice| 22|
+--------+---+



In [22]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [23]:
df_pyspark.describe().show()

+-------+--------+-----------------+------------------+
|summary|    Name|              Age|        Experience|
+-------+--------+-----------------+------------------+
|  count|       4|                4|                 4|
|   mean|    NULL|             26.0|               2.5|
| stddev|    NULL|3.162277660168379|1.2909944487358056|
|    min|    Adam|               22|                 1|
|    max|Jennifer|               29|                 4|
+-------+--------+-----------------+------------------+



In [24]:
# add a column to the DataFrame
df_pyspark=df_pyspark.withColumn('Experience After 2 Years',df_pyspark['Experience']+2)

In [25]:
df_pyspark.show()

+--------+---+----------+------------------------+
|    Name|Age|Experience|Experience After 2 Years|
+--------+---+----------+------------------------+
|    Adam| 25|         4|                       6|
|     Bob| 28|         2|                       4|
|Jennifer| 29|         1|                       3|
|   Alice| 22|         3|                       5|
+--------+---+----------+------------------------+



In [26]:
df_pyspark=df_pyspark.drop('Experience After 2 Years')

In [27]:
df_pyspark.show()

+--------+---+----------+
|    Name|Age|Experience|
+--------+---+----------+
|    Adam| 25|         4|
|     Bob| 28|         2|
|Jennifer| 29|         1|
|   Alice| 22|         3|
+--------+---+----------+



In [28]:
df_pyspark=df_pyspark.withColumnRenamed('Name','New Name')

In [29]:
df_pyspark.show()

+--------+---+----------+
|New Name|Age|Experience|
+--------+---+----------+
|    Adam| 25|         4|
|     Bob| 28|         2|
|Jennifer| 29|         1|
|   Alice| 22|         3|
+--------+---+----------+



In [30]:
df_test=spark.read.csv(r'C:\Users\dell\Desktop\test.csv',header=True,inferSchema=True)

In [31]:
df_test

DataFrame[Name: string, Age: int, Experience: int, Salary: int]

In [32]:
df_test.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|     NULL|  36|        10| 60000|
|     NULL|  34|      NULL|  NULL|
+---------+----+----------+------+



In [33]:
# drop the column
df_test.drop('Name').show()

+----+----------+------+
| Age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|  40|        10| 40000|
|NULL|      NULL| 45000|
|  36|        10| 60000|
|  34|      NULL|  NULL|
+----+----------+------+



In [34]:
df_test.show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|     NULL|  36|        10| 60000|
|     NULL|  34|      NULL|  NULL|
+---------+----+----------+------+



In [35]:
df_test.na.drop().show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
+---------+---+----------+------+



In [36]:
# how='all': drop a row only if every column in it is null
df_test.na.drop(how='all').show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|     NULL|  36|        10| 60000|
|     NULL|  34|      NULL|  NULL|
+---------+----+----------+------+



In [37]:
# how='any' (the default): drop a row if any column in it is null
df_test.na.drop(how='any').show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
+---------+---+----------+------+



In [38]:
# thresh=3: keep only rows with at least 3 non-null values (thresh overrides how)
df_test.na.drop(how='all',thresh=3).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
|     NULL| 36|        10| 60000|
+---------+---+----------+------+



In [39]:
# subset: only the listed columns are checked for nulls
df_test.na.drop(how='all',subset=['Experience','Salary']).show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|     NULL|  36|        10| 60000|
+---------+----+----------+------+
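
The interaction of `how`, `thresh` and `subset` can be modelled in a few lines of plain Python. This is only a sketch of the semantics, not Spark's implementation; `drop_nulls` is a hypothetical helper and the rows are copied from `df_test`, with `None` standing for NULL.

```python
# Rows of df_test as plain dictionaries; None stands for NULL.
COLUMNS = ["Name", "Age", "Experience", "Salary"]
ROWS = [
    ("Krish", 31, 10, 30000),
    ("Sudhanshu", 30, 8, 25000),
    ("Sunny", 29, 4, 20000),
    ("Paul", 24, 3, 20000),
    ("Harsha", 21, 1, 15000),
    ("Shubham", 23, 2, 18000),
    ("Mahesh", 40, 10, 40000),
    ("Amit", None, None, 45000),
    (None, 36, 10, 60000),
    (None, 34, None, None),
]
rows = [dict(zip(COLUMNS, r)) for r in ROWS]


def drop_nulls(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of na.drop; not Spark's implementation."""
    cols = subset if subset is not None else COLUMNS
    kept = []
    for row in rows:
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh      # thresh takes precedence over how
        elif how == "any":
            keep = non_null == len(cols)   # drop if any checked value is null
        else:  # how == "all"
            keep = non_null > 0            # drop only if every checked value is null
        if keep:
            kept.append(row)
    return kept


print(len(drop_nulls(rows)))                                              # 7
print(len(drop_nulls(rows, how="all")))                                   # 10
print(len(drop_nulls(rows, how="all", thresh=3)))                         # 8
print(len(drop_nulls(rows, how="all", subset=["Experience", "Salary"])))  # 9
```

The four counts match the number of rows in the four `na.drop` outputs shown above.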



In [40]:
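# filling the null values: a string fill value only applies to string columns,
# so the integer Experience column is left unchanged here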
df_test.na.fill('Missing Values','Experience').show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|     NULL|  36|        10| 60000|
|     NULL|  34|      NULL|  NULL|
+---------+----+----------+------+



In [41]:
df_test.na.fill(50).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
|     Amit| 50|        50| 45000|
|     NULL| 36|        10| 60000|
|     NULL| 34|        50|    50|
+---------+---+----------+------+
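
Both outputs above follow from one rule: a single fill value is applied only to columns whose type matches it. The plain-Python sketch below models that rule on the three rows of `df_test` that contain nulls; `fill_nulls` and `SCHEMA` are hypothetical names, and this is not Spark's implementation.

```python
# Column types as reported for df_test (Name is a string, the rest are integers).
SCHEMA = {"Name": "string", "Age": "int", "Experience": "int", "Salary": "int"}

rows = [
    {"Name": "Amit", "Age": None, "Experience": None, "Salary": 45000},
    {"Name": None, "Age": 36, "Experience": 10, "Salary": 60000},
    {"Name": None, "Age": 34, "Experience": None, "Salary": None},
]


def fill_nulls(rows, value, subset=None):
    """Plain-Python model of na.fill for a single value; not Spark's implementation."""
    value_type = "string" if isinstance(value, str) else "int"
    cols = subset if subset is not None else list(SCHEMA)
    filled = []
    for row in rows:
        new_row = dict(row)
        for c in cols:
            # Only columns whose type matches the fill value are touched.
            if new_row[c] is None and SCHEMA[c] == value_type:
                new_row[c] = value
        filled.append(new_row)
    return filled


# A string value aimed at the integer Experience column changes nothing.
print(fill_nulls(rows, "Missing Values", subset=["Experience"]) == rows)  # True

# An integer value fills Age, Experience and Salary but leaves Name as None.
for row in fill_nulls(rows, 50):
    print(row)
```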



In [42]:
df_test.na.fill({'Age': 50, 'Name': 'unknown','Salary':100000,'Experience':11}).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
|     Amit| 50|        11| 45000|
|  unknown| 36|        10| 60000|
|  unknown| 34|        11|100000|
+---------+---+----------+------+



In [43]:
# replace missing value with the mean of column
from pyspark.ml.feature import Imputer

imputer=Imputer(
    inputCols=['Age','Experience','Salary'],
    outputCols=["{}_imputed".format(c) for c in ['Age','Experience','Salary']]

).setStrategy("mean")

In [44]:
# add imputation cols to df
imputer.fit(df_test).transform(df_test).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|  40|        10| 40000|         40|                10|         40000|
|     Amit|NULL|      NULL| 45000|         29|                 6|         45000|
|     NULL|  36|        10| 60000|         36|                10|         60000|
|     NULL|  34|      NULL| 
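
The imputed values in Amit's row (29 for `Age`, 6 for `Experience`) can be checked by hand. The sketch below recomputes the column means from the non-null values of `df_test` in plain Python; the displayed values are consistent with the mean being truncated to the column's integer type, although that reading is inferred from the output rather than stated in the notebook.

```python
# Non-null values of each column in df_test, copied from the table shown earlier.
age = [31, 30, 29, 24, 21, 23, 40, 36, 34]
experience = [10, 8, 4, 3, 1, 2, 10, 10]

mean_age = sum(age) / len(age)                       # 268 / 9
mean_experience = sum(experience) / len(experience)  # 48 / 8

# Print each mean next to its integer-truncated form.
print(mean_age, int(mean_age))
print(mean_experience, int(mean_experience))
```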

In [45]:
df_test.fillna({"Name": "Unknown"}).show()

+---------+----+----------+------+
|     Name| Age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|  40|        10| 40000|
|     Amit|NULL|      NULL| 45000|
|  Unknown|  36|        10| 60000|
|  Unknown|  34|      NULL|  NULL|
+---------+----+----------+------+



### Filter Operation

In [46]:
df_test=df_test.na.drop()

In [47]:
df_test.show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh| 40|        10| 40000|
+---------+---+----------+------+



In [48]:
# people with a salary less than or equal to 25000
df_test.filter("Salary<=25000").show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [49]:
df_test.filter("Salary<=25000").select(['Name','Age']).show()

+---------+---+
|     Name|Age|
+---------+---+
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
|  Shubham| 23|
+---------+---+



In [50]:
df_test.filter(df_test['Salary']<=25000).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [51]:
df_test.filter((df_test['Salary']<=25000) & (df_test['Age']>=25)).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
+---------+---+----------+------+
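
Column conditions are combined with `&` (and), `|` (or) and `~` (not), and each comparison needs its own parentheses. The reason is Python's operator precedence, which the sketch below makes visible by parsing both spellings with the standard `ast` module. The names `salary` and `age` are placeholders; nothing here touches Spark.

```python
import ast

# Without parentheses, & binds tighter than <= and >=, so Python reads this
# as the chained comparison  salary <= (25000 & age) >= 25.
unparenthesized = ast.parse("salary <= 25000 & age >= 25", mode="eval").body
print(type(unparenthesized).__name__)                 # Compare
print(type(unparenthesized.comparators[0]).__name__)  # BinOp

# With parentheses, the two comparisons are built first and then combined.
parenthesized = ast.parse("(salary <= 25000) & (age >= 25)", mode="eval").body
print(type(parenthesized).__name__)                   # BinOp
print(type(parenthesized.op).__name__)                # BitAnd
```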



### PySpark GroupBy and Aggregate Function

In [52]:
sample_data=spark.read.csv(r'C:\Users\dell\Downloads\sample_data.csv',header=True,inferSchema=True)

In [53]:
sample_data.show()

+-----------+-----------+------+-----+--------+------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|
+-----------+-----------+------+-----+--------+------+
|Electronics|     Mobile| North|  500|       5|    50|
|Electronics|     Laptop| South| 1500|      10|   200|
|  Furniture|      Chair| North|  200|       2|    20|
|  Furniture|      Table|  East|  300|       3|    40|
|   Clothing|      Shirt|  West|  100|       1|    10|
|   Clothing|      Jeans|  West|  150|       2|    15|
|Electronics|     Mobile| South|  700|       7|    70|
+-----------+-----------+------+-----+--------+------+



In [54]:
sample_data.printSchema()

root
 |-- Category: string (nullable = true)
 |-- Subcategory: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Profit: integer (nullable = true)



In [55]:
# GroupBy
sample_data.groupBy('Category').sum().show()

+-----------+----------+-------------+-----------+
|   Category|sum(Sales)|sum(Quantity)|sum(Profit)|
+-----------+----------+-------------+-----------+
|Electronics|      2700|           22|        320|
|   Clothing|       250|            3|         25|
|  Furniture|       500|            5|         60|
+-----------+----------+-------------+-----------+



In [57]:
sample_data.groupBy('Category').agg({'Sales': 'sum'}).show()

+-----------+----------+
|   Category|sum(Sales)|
+-----------+----------+
|Electronics|      2700|
|   Clothing|       250|
|  Furniture|       500|
+-----------+----------+



In [58]:
sample_data.groupBy('Category').agg({'Sales':'sum','Profit':'mean','Quantity':'max'}).show()

+-----------+----------+-------------+------------------+
|   Category|sum(Sales)|max(Quantity)|       avg(Profit)|
+-----------+----------+-------------+------------------+
|Electronics|      2700|           10|106.66666666666667|
|   Clothing|       250|            2|              12.5|
|  Furniture|       500|            3|              30.0|
+-----------+----------+-------------+------------------+



In [59]:
sample_data.filter(sample_data['Category']=='Electronics').show()

+-----------+-----------+------+-----+--------+------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|
+-----------+-----------+------+-----+--------+------+
|Electronics|     Mobile| North|  500|       5|    50|
|Electronics|     Laptop| South| 1500|      10|   200|
|Electronics|     Mobile| South|  700|       7|    70|
+-----------+-----------+------+-----+--------+------+



In [61]:
# Count the number of rows
print("Number of rows:", sample_data.count())

Number of rows: 7


In [62]:
# Distinct categories
sample_data.select("Category").distinct().show()

+-----------+
|   Category|
+-----------+
|Electronics|
|   Clothing|
|  Furniture|
+-----------+



In [63]:
from pyspark.sql.functions import col
sample_data.filter(col("Category") == "Electronics").show()

+-----------+-----------+------+-----+--------+------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|
+-----------+-----------+------+-----+--------+------+
|Electronics|     Mobile| North|  500|       5|    50|
|Electronics|     Laptop| South| 1500|      10|   200|
|Electronics|     Mobile| South|  700|       7|    70|
+-----------+-----------+------+-----+--------+------+



In [64]:
# Add a new column with a constant value
from pyspark.sql.functions import lit
sample_data = sample_data.withColumn("Tax", lit(10))
sample_data.show()

+-----------+-----------+------+-----+--------+------+---+
|   Category|Subcategory|Region|Sales|Quantity|Profit|Tax|
+-----------+-----------+------+-----+--------+------+---+
|Electronics|     Mobile| North|  500|       5|    50| 10|
|Electronics|     Laptop| South| 1500|      10|   200| 10|
|  Furniture|      Chair| North|  200|       2|    20| 10|
|  Furniture|      Table|  East|  300|       3|    40| 10|
|   Clothing|      Shirt|  West|  100|       1|    10| 10|
|   Clothing|      Jeans|  West|  150|       2|    15| 10|
|Electronics|     Mobile| South|  700|       7|    70| 10|
+-----------+-----------+------+-----+--------+------+---+



In [65]:
# Add a column with a calculated value
sample_data = sample_data.withColumn("Net_Profit", col("Profit") - col("Tax"))
sample_data.show()

+-----------+-----------+------+-----+--------+------+---+----------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|Tax|Net_Profit|
+-----------+-----------+------+-----+--------+------+---+----------+
|Electronics|     Mobile| North|  500|       5|    50| 10|        40|
|Electronics|     Laptop| South| 1500|      10|   200| 10|       190|
|  Furniture|      Chair| North|  200|       2|    20| 10|        10|
|  Furniture|      Table|  East|  300|       3|    40| 10|        30|
|   Clothing|      Shirt|  West|  100|       1|    10| 10|         0|
|   Clothing|      Jeans|  West|  150|       2|    15| 10|         5|
|Electronics|     Mobile| South|  700|       7|    70| 10|        60|
+-----------+-----------+------+-----+--------+------+---+----------+



### Sorting Data

In [67]:
# Sort by Sales (ascending)
sample_data.orderBy("Sales").show()

+-----------+-----------+------+-----+--------+------+---+----------+----------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|Tax|Net_Profit|High_Sales|
+-----------+-----------+------+-----+--------+------+---+----------+----------+
|   Clothing|      Shirt|  West|  100|       1|    10| 10|         0|        No|
|   Clothing|      Jeans|  West|  150|       2|    15| 10|         5|        No|
|  Furniture|      Chair| North|  200|       2|    20| 10|        10|        No|
|  Furniture|      Table|  East|  300|       3|    40| 10|        30|        No|
|Electronics|     Mobile| North|  500|       5|    50| 10|        40|        No|
|Electronics|     Mobile| South|  700|       7|    70| 10|        60|       Yes|
|Electronics|     Laptop| South| 1500|      10|   200| 10|       190|       Yes|
+-----------+-----------+------+-----+--------+------+---+----------+----------+



In [68]:
# Sort by Sales (descending)
from pyspark.sql.functions import desc
sample_data.orderBy(desc("Sales")).show()

+-----------+-----------+------+-----+--------+------+---+----------+----------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|Tax|Net_Profit|High_Sales|
+-----------+-----------+------+-----+--------+------+---+----------+----------+
|Electronics|     Laptop| South| 1500|      10|   200| 10|       190|       Yes|
|Electronics|     Mobile| South|  700|       7|    70| 10|        60|       Yes|
|Electronics|     Mobile| North|  500|       5|    50| 10|        40|        No|
|  Furniture|      Table|  East|  300|       3|    40| 10|        30|        No|
|  Furniture|      Chair| North|  200|       2|    20| 10|        10|        No|
|   Clothing|      Jeans|  West|  150|       2|    15| 10|         5|        No|
|   Clothing|      Shirt|  West|  100|       1|    10| 10|         0|        No|
+-----------+-----------+------+-----+--------+------+---+----------+----------+



## Aggregations

In [70]:
# Group by Category and sum Sales
sample_data.groupBy("Category").sum("Sales").show()

+-----------+----------+
|   Category|sum(Sales)|
+-----------+----------+
|Electronics|      2700|
|   Clothing|       250|
|  Furniture|       500|
+-----------+----------+



In [71]:
# Group by Region and calculate average Profit
sample_data.groupBy("Region").avg("Profit").show()

+------+-----------+
|Region|avg(Profit)|
+------+-----------+
| South|      135.0|
|  East|       40.0|
|  West|       12.5|
| North|       35.0|
+------+-----------+



In [72]:
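# Group by Category and Region, and calculate multiple aggregates
# Note: this import shadows Python's built-in sum and max in the notebook
from pyspark.sql.functions import sum, avg, max
sample_data.groupBy("Category", "Region").agg(
    sum("Sales").alias("Total_Sales"),
    avg("Profit").alias("Avg_Profit"),
    max("Quantity").alias("Max_Quantity")
).show()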

+-----------+------+-----------+----------+------------+
|   Category|Region|Total_Sales|Avg_Profit|Max_Quantity|
+-----------+------+-----------+----------+------------+
|  Furniture|  East|        300|      40.0|           3|
|   Clothing|  West|        250|      12.5|           2|
|  Furniture| North|        200|      20.0|           2|
|Electronics| North|        500|      50.0|           5|
|Electronics| South|       2200|     135.0|          10|
+-----------+------+-----------+----------+------------+



In [77]:
sample_data.show()

+-----------+-----------+------+-----+--------+------+---+----------+----------+
|   Category|Subcategory|Region|Sales|Quantity|Profit|Tax|Net_Profit|High_Sales|
+-----------+-----------+------+-----+--------+------+---+----------+----------+
|Electronics|     Mobile| North|  500|       5|    50| 10|        40|        No|
|Electronics|     Laptop| South| 1500|      10|   200| 10|       190|       Yes|
|  Furniture|      Chair| North|  200|       2|    20| 10|        10|        No|
|  Furniture|      Table|  East|  300|       3|    40| 10|        30|        No|
|   Clothing|      Shirt|  West|  100|       1|    10| 10|         0|        No|
|   Clothing|      Jeans|  West|  150|       2|    15| 10|         5|        No|
|Electronics|     Mobile| South|  700|       7|    70| 10|        60|       Yes|
+-----------+-----------+------+-----+--------+------+---+----------+----------+



## Pivot and Unpivot

In [78]:
# Pivot: Total Sales by Region and Category
sample_data.groupBy("Region").pivot("Category").sum("Sales").show()

+------+--------+-----------+---------+
|Region|Clothing|Electronics|Furniture|
+------+--------+-----------+---------+
| South|    NULL|       2200|     NULL|
|  East|    NULL|       NULL|      300|
|  West|     250|       NULL|     NULL|
| North|    NULL|        500|      200|
+------+--------+-----------+---------+



# **What is RDD in PySpark?**  

RDD (**Resilient Distributed Dataset**) is the **fundamental data structure** in PySpark. It is an **immutable**, **distributed**, and **fault-tolerant** collection of objects that can be processed in parallel across multiple nodes in a cluster.  

## **Key Features of RDD**  
- **Immutable** → Once created, it cannot be changed.  
- **Distributed** → Data is distributed across multiple machines.  
- **Lazy Evaluation** → Computations are performed only when an action is triggered.  
- **Fault-Tolerant** → Automatically recovers from failures.  
- **Parallel Processing** → Leverages multiple CPU cores and machines.  

---

## **RDD vs. Pandas DataFrame**  

| Feature | RDD | Pandas DataFrame |
|---------|-----|-----------------|
| **Data Handling** | Works with unstructured and structured data. | Works mainly with structured data (tables). |
| **Storage** | Distributed across multiple machines. | Stored in a single machine's memory. |
| **Performance** | Scales to big data, but gets no automatic query optimization (unlike Spark DataFrames). | Fast for small to medium-sized data. |
| **Operations** | Uses **map, filter, reduce** functions for transformations. | Uses **vectorized operations** (NumPy-based) for speed. |
| **Lazy Evaluation** | Yes, computations run only when an action is called. | No, operations execute immediately. |
| **Ease of Use** | Requires functional programming concepts. | Simple and user-friendly with intuitive syntax. |
| **Schema Support** | No schema; elements can be arbitrary Python objects. | Named columns, each with its own dtype. |

---

## **Example Usage**
### **Creating an RDD in PySpark**
```python
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("RDD Example").getOrCreate()

# Create an RDD from a Python list
rdd = spark.sparkContext.parallelize([("Alice", 25), ("Bob", 30), ("Charlie", 35)])

# Display the RDD content
print(rdd.collect())

# Using Pandas
import pandas as pd

# Create a DataFrame
df = pd.DataFrame([("Alice", 25), ("Bob", 30), ("Charlie", 35)], columns=["Name", "Age"])

# Display the DataFrame
print(df)
```


### When to Use RDD vs. Pandas?

   - Use **RDDs** for **large-scale, distributed data processing**.
   - Use **Pandas** for **small to medium-sized datasets** that fit in **memory**.