#Basic Tutorial

## Importing Necessary Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.appName("Add your Spark App name").getOrCreate()

##DataFrame Creation from Scratch

### From CSV File

In [0]:
loc_in_which_dataset_is_stored_in_Databricks= "/FileStore/tables/Mobiles_Dataset__2025_-1.csv"

#### Read

In [0]:
df=spark.read.format('csv').option('header',True).option('inferschema',True).load(loc_in_which_dataset_is_stored_in_Databricks)

#   ANOTHER WAY

df = spark.read.csv(loc_in_which_dataset_is_stored_in_Databricks, inferSchema= True, header=True)

OPTIONS
- **header:** This option is used to specify whether the file contains a header or not.
   - The default value is True. tells Spark to use the first row of the CSV file as the column names for the DataFrame
   - If you set it to False, then Spark will not use the first line of the file as the column names, and will instead generate column names of the form "_c0", "_c1", etc.

- **inferSchema:** This option is used to specify whether the data types of the columns should be inferred from the data or not.
  - The default value is False, i.e all columns will be treated as strings.
  - If you set it to True, then Spark will try to infer the data types of the columns from the data. 
      - This can save you the trouble of having to specify the data types manually, but it can also lead to errors if the data types are not correctly inferred.


- **sep:** This option is used to specify the separator used in the CSV file. 
  - The default value is ','.

- **mode:** This option is used to specify the behavior of the file reader when it encounters corrupt records. 
  - The default value is PERMISSIVE, which means that the file reader will attempt to parse corrupt records and assign them to a special column named _corrupt_record.
  - Other options are DROPMALFORMED which drops the rows with malformed records and FAILFAST which throws an exception when it encounters corrupt records.


- **nullValue:** This option is used to specify the string that indicates a null value. 
  - The default value is null.
- **timestampFormat:** This option is used to specify the format of timestamp columns. 
  - The default value is yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
- **dateFormat:** This option is used to specify the format of date columns. 
  - The default value is yyyy-MM-dd.
- **maxColumns:** This option is used to specify the maximum number of columns to read.
- **maxCharsPerColumn:** This option is used to specify the maximum number of characters per column to read.
- **multiLine:** This option is used to specify whether or not to allow multi-line CSV records.

In [0]:
df.display(5)
        #OR
df.show(3)

Company Name,Model Name,Mobile Weight,RAM,Front Camera,Back Camera,Processor,Battery Capacity,Screen Size,Launched Price (Pakistan),Launched Price (India),Launched Price (China),Launched Price (USA),Launched Price (Dubai),Launched Year
Apple,iPhone 16 128GB,174g,6GB,12MP,48MP,A17 Bionic,"3,600mAh",6.1 inches,"PKR 224,999","INR 79,999","CNY 5,799",USD 799,"AED 2,799",2024
Apple,iPhone 16 256GB,174g,6GB,12MP,48MP,A17 Bionic,"3,600mAh",6.1 inches,"PKR 234,999","INR 84,999","CNY 6,099",USD 849,"AED 2,999",2024
Apple,iPhone 16 512GB,174g,6GB,12MP,48MP,A17 Bionic,"3,600mAh",6.1 inches,"PKR 244,999","INR 89,999","CNY 6,499",USD 899,"AED 3,199",2024
Apple,iPhone 16 Plus 128GB,203g,6GB,12MP,48MP,A17 Bionic,"4,200mAh",6.7 inches,"PKR 249,999","INR 89,999","CNY 6,199",USD 899,"AED 3,199",2024
Apple,iPhone 16 Plus 256GB,203g,6GB,12MP,48MP,A17 Bionic,"4,200mAh",6.7 inches,"PKR 259,999","INR 94,999","CNY 6,499",USD 949,"AED 3,399",2024
Apple,iPhone 16 Plus 512GB,203g,6GB,12MP,48MP,A17 Bionic,"4,200mAh",6.7 inches,"PKR 274,999","INR 104,999","CNY 6,999",USD 999,"AED 3,599",2024
Apple,iPhone 16 Pro 128GB,206g,6GB,12MP / 4K,50MP + 12MP,A17 Pro,"4,400mAh",6.1 inches,"PKR 284,999","INR 99,999","CNY 6,999",USD 999,"AED 3,499",2024
Apple,iPhone 16 Pro 256GB,206g,8GB,12MP / 4K,50MP + 12MP,A17 Pro,"4,400mAh",6.1 inches,"PKR 294,999","INR 104,999","CNY 7,099","USD 1,049","AED 3,699",2024
Apple,iPhone 16 Pro 512GB,206g,8GB,12MP / 4K,50MP + 12MP,A17 Pro,"4,400mAh",6.1 inches,"PKR 314,999","INR 114,999","CNY 7,499","USD 1,099","AED 3,899",2024
Apple,iPhone 16 Pro Max 128GB,221g,6GB,12MP / 4K,48MP + 12MP,A17 Pro,"4,500mAh",6.7 inches,"PKR 314,999","INR 109,999","CNY 7,499","USD 1,099","AED 3,799",2024


+------------+---------------+-------------+---+------------+-----------+----------+----------------+-----------+-------------------------+----------------------+----------------------+--------------------+----------------------+-------------+
|Company Name|     Model Name|Mobile Weight|RAM|Front Camera|Back Camera| Processor|Battery Capacity|Screen Size|Launched Price (Pakistan)|Launched Price (India)|Launched Price (China)|Launched Price (USA)|Launched Price (Dubai)|Launched Year|
+------------+---------------+-------------+---+------------+-----------+----------+----------------+-----------+-------------------------+----------------------+----------------------+--------------------+----------------------+-------------+
|       Apple|iPhone 16 128GB|         174g|6GB|        12MP|       48MP|A17 Bionic|        3,600mAh| 6.1 inches|              PKR 224,999|            INR 79,999|             CNY 5,799|             USD 799|             AED 2,799|         2024|
|       Apple|iPhone 16 

####Write

In [0]:
df.write.csv("path/to/output.csv", header=True, mode="overwrite").save("path/to/output.csv")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-1888523386317378>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mcsv[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mpath/to/output.csv[39;49m[38;5;124;43m"[39;49m[43m,[49m[43m [49m[43mheader[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m[43m,[49m[43m [49m[43mmode[49m[38;5;241;43m=[39;49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msave[49m([38;5;124m"[39m[38;5;124mpath/to/output.csv[39m[38;5;124m"[39m)

[0;31mAttributeError[0m: 'NoneType' object has no attribute 'save'

- **Mode**: is used to specify the behavior when the output file already exists.
  - The available options are **overwrite**, **append** and **ignore**.
- **.save("path/to/output.csv")**: It is used to save the dataframe to a csv file. 

### From JSON File

In [0]:
df.withColumnRenamed('Launched Year','LAUNCHED YEAR').display()



In [0]:
loc_in_which_dataset_is_stored_in_Databricks= "/FileStore/tables/Mobiles_Dataset__2025_-1.csv"



#### Read

#### Multiple Columns

In [0]:
df.withColumnRenamed("Model Name","MODEL NAME") \
  .withColumnRenamed("Company Name","COMPANY NAME").display()



In [0]:
df=spark.read.format('json').option('multiline',True).load(loc_in_which_dataset_is_stored_in_Databricks)

#   ANOTHER WAY

df = spark.read.option("multiline", True).json(loc_in_which_dataset_is_stored_in_Databricks) 



**OPTIONS**

- **compression :** It is used to specify the codec to use when reading compressed json files.

- **multiLine :** It is used to read json files in multi-line mode.


- **columnNameOfCorruptRecord : ** This option is used to specify the column name for the additional column that is used to capture the malformed JSON records.

- **mode:** This is used to specify the behavior when data or table already exists. 
  - Acceptable options are "overwrite", "append", "ignore", "error".

In [0]:
df.display(5)
        #OR
df.show(3)



####Write

In [0]:
df.write.json("path/to/file.json", compression="gzip",) 



 

- **compression :** This is used to specify the codec to use when writing compressed json files.
- **multiLine :** This is used to write json files in multi-line mode.

- **timeZone :** This is used to specify the time zone to use when writing timestamps.
- **mode :** This is used to specify the behavior when data or table already exists.
  - Acceptable options are 
    - overwrite
    - append
    - ignore
    - error

### From Scratch

In [0]:

data = [
    ["Alice", "HR", 50000, 28],
    ["Bob", "Finance", 60000, None],
    ["Charlie", None, 70000, 35],
    ["David", "HR", None, 40],
    ["Bob", "Finance", 60000, None],
    ["David", "HR", None, 40]
]

In [0]:
Schema = '''
            Name STRING,
            Department STRING,
            Salary INTEGER,
            Age INTEGER
        ''' 

In [0]:
df = spark.createDataFrame(data=data, schema = Schema)

In [0]:
df.printSchema()
df.show(3)
df.display(3)

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)

+-------+----------+------+----+
|   Name|Department|Salary| Age|
+-------+----------+------+----+
|  Alice|        HR| 50000|  28|
|    Bob|   Finance| 60000|null|
|Charlie|      null| 70000|  35|
+-------+----------+------+----+
only showing top 3 rows



Name,Department,Salary,Age
Alice,HR,50000.0,28.0
Bob,Finance,60000.0,
Charlie,,70000.0,35.0
David,HR,,40.0
Bob,Finance,60000.0,
David,HR,,40.0


![](image.png)

##DataFrame Operations

###withColumnRenamed: [ Rename the Existing Column ]

- It Allows us to rename an Existing column in a DataFrame.

- This is especially helpful for - 
  - improving the readability of your data
  - adhering to specific naming standards 
  - preparing for data integration tasks.

#### Syntax

###### `withColumnRenamed(existingName,newNam)`

this function takes two parameters
-  existingName – The existing column we want to change
-  newName – New name of the column





Returns a new DataFrame with a column renamed.




#### Single Column

In [0]:
df=df.withColumnRenamed('Name','NAME')
df.display()

NAME,Department,Salary,Age
Alice,HR,50000.0,28.0
Bob,Finance,60000.0,
Charlie,,70000.0,35.0
David,HR,,40.0
Bob,Finance,60000.0,
David,HR,,40.0


#### Multiple Columns

In [0]:
df=df.withColumnRenamed("Department","DEPARTMENT") \
  .withColumnRenamed("Salary","SALARY")
df.display()

NAME,DEPARTMENT,SALARY,Age
Alice,HR,50000.0,28.0
Bob,Finance,60000.0,
Charlie,,70000.0,35.0
David,HR,,40.0
Bob,Finance,60000.0,
David,HR,,40.0


### WithColumn: [ Add a new column ]

#### With Constant Value

- to add a new column to DataFrame use lit() function by importing from pyspark.sql.functions. 
- lit() function takes a constant value you wanted to add and returns a Column type.
- In case you want to add a NULL/None use lit(None)



`from pyspark.sql.functions import lit`

`df = df.withColumn('New Column's Name',lit("new value")) `

In [0]:
df=df.withColumn('Flag',lit("Y"))
#This will create a new column named 'flag' in which each row has the same value 'Y'

#### With Existing Column

In [0]:
df=df.withColumn("Bonus",df['Salary']*0.25)
df=df.withColumn("Bonus",col('Salary')*0.25)
df.display()
#This will create a new column named 'Bonus'

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus
Alice,HR,50000.0,28.0,Y,12500.0
Bob,Finance,60000.0,,Y,15000.0
Charlie,,70000.0,35.0,Y,17500.0
David,HR,,40.0,Y,
Bob,Finance,60000.0,,Y,15000.0
David,HR,,40.0,Y,


#### Based on a specific Condition

use `when otherwise` and `lit()` together.

Method 1:

In [0]:
df=df.withColumn("YOUNG or OLD", when(df['Age'] < 30, "Young").otherwise("Old")) 
df.display() 

#OR

df=df.withColumn("YOUNG or OLD",\
              when((df['Age'] < 30), "Young Blood") \
              .when((df['Age'] < 50) & (df['Age'] > 40), "Middle Aged") \
               .otherwise("Oldie")\
               )
df.display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
Alice,HR,50000.0,28.0,Y,12500.0,Young
Bob,Finance,60000.0,,Y,15000.0,Old
Charlie,,70000.0,35.0,Y,17500.0,Old
David,HR,,40.0,Y,,Old
Bob,Finance,60000.0,,Y,15000.0,Old
David,HR,,40.0,Y,,Old


NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
Alice,HR,50000.0,28.0,Y,12500.0,Young Blood
Bob,Finance,60000.0,,Y,15000.0,Oldie
Charlie,,70000.0,35.0,Y,17500.0,Oldie
David,HR,,40.0,Y,,Oldie
Bob,Finance,60000.0,,Y,15000.0,Oldie
David,HR,,40.0,Y,,Oldie


Method 2:

In [0]:
df=df.withColumn("YOUNG or OLD", when(col('Age') < 30, "Young").otherwise("Old")) 
df.display() 

#OR

df=df.withColumn("YOUNG or OLD",when((col('Age') < 30), lit("Young Blood")) \
     .when((col('Age') < 50) & (col('Age') > 40), lit("Middle Aged")) \
     .otherwise(lit("Oldie")))
df.display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
Alice,HR,50000.0,28.0,Y,12500.0,Young
Bob,Finance,60000.0,,Y,15000.0,Old
Charlie,,70000.0,35.0,Y,17500.0,Old
David,HR,,40.0,Y,,Old
Bob,Finance,60000.0,,Y,15000.0,Old
David,HR,,40.0,Y,,Old


NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
Alice,HR,50000.0,28.0,Y,12500.0,Young Blood
Bob,Finance,60000.0,,Y,15000.0,Oldie
Charlie,,70000.0,35.0,Y,17500.0,Oldie
David,HR,,40.0,Y,,Oldie
Bob,Finance,60000.0,,Y,15000.0,Oldie
David,HR,,40.0,Y,,Oldie


### WithColumn: [ Update Column ]

#### With Constant Value

- to Update a Column in DataFrame 
we can use lit() function by importing from pyspark.sql.functions. 
- lit() function takes a constant value you wanted to add and returns a Column type.
- In case we want to update a column with NULL/None use lit(None)



`from pyspark.sql.functions import lit`

`df = df.withColumn('Column name to update',lit("new value")) `

In [0]:
df=df.withColumn('Flag',lit("IRRELEVANT"))
df.display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
Alice,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
Bob,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
Charlie,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
David,HR,,40.0,IRRELEVANT,,Oldie
Bob,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
David,HR,,40.0,IRRELEVANT,,Oldie


#### With Existing Column

Method 1:

In [0]:
df=df.withColumn("Age",df['Age']+100)
df.display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,128.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,135.0,IRRELEVANT,17500.0,Oldie
,HR,,140.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,140.0,IRRELEVANT,,Oldie


Method 2:

In [0]:
df=df.withColumn("Age",col('Age')-100)
df.display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
,HR,,40.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,40.0,IRRELEVANT,,Oldie


#### Based on a specific Condition

use `when otherwise` and `lit()` together.

Method 1:

In [0]:
df=df.withColumn("Age", when(df['Age'] == 40, 41).otherwise(df["Age"])) 
df.display() 

#OR

df.withColumn("Age",\
                when(
                      (  (df['Name'] == 'Charlie') & (df['Department'].isNull())  ), "100"
                    ).otherwise(df["Age"]) ).display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie


NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie


Method 2:

In [0]:
df=df.withColumn("Age", when(col('Age') == 40, 41).otherwise(col("Age")))
df.display() 

#OR

df.withColumn("Age",\
                when(
                      (  (col('Name') == 'Charlie') & (col('Department').isNull())  ), "100"
                    ).otherwise(col("Age")) ).display()

NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie


NAME,DEPARTMENT,SALARY,Age,Flag,Bonus,YOUNG or OLD
,HR,50000.0,28.0,IRRELEVANT,12500.0,Young Blood
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,,70000.0,35.0,IRRELEVANT,17500.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie
,Finance,60000.0,,IRRELEVANT,15000.0,Oldie
,HR,,41.0,IRRELEVANT,,Oldie


### drop: [ Remove Column ]

- It Allows us to remove column/columns from a DataFrame.
- It returns a new DataFrame after dropping the given column

#### Single Column

Method 1:

In [0]:

df=df.drop("Flag")
df.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,HR,50000.0,28.0,12500.0,Young Blood
,Finance,60000.0,,15000.0,Oldie
,,70000.0,35.0,17500.0,Oldie
,HR,,41.0,,Oldie
,Finance,60000.0,,15000.0,Oldie
,HR,,41.0,,Oldie


Method 2:

In [0]:
df.drop(col('Flag'))
df.display()


NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,HR,50000.0,28.0,12500.0,Young Blood
,Finance,60000.0,,15000.0,Oldie
,,70000.0,35.0,17500.0,Oldie
,HR,,41.0,,Oldie
,Finance,60000.0,,15000.0,Oldie
,HR,,41.0,,Oldie


#### Multiple Columns

In [0]:
df.drop("Age","Salary").display()

NAME,DEPARTMENT,Bonus,YOUNG or OLD
,HR,12500.0,Young Blood
,Finance,15000.0,Oldie
,,17500.0,Oldie
,HR,,Oldie
,Finance,15000.0,Oldie
,HR,,Oldie


##  Select - [ Column Selection ] 

#### Single Column: 

Method 1:

In [0]:
df.select('Name').display()

Name
""
""
""
""
""
""


Method 2:

In [0]:
df.select(df['Name']).display()

Name
""
""
""
""
""
""


Method 3:

In [0]:
df.select(col('Name')).display()


Name
""
""
""
""
""
""


> it selects a single column from DataFrame df and returns a new DataFrame

#### Multiple Columns: 

Method 1:

In [0]:
df.select('Name','Department').display()

Name,Department
,HR
,Finance
,
,HR
,Finance
,HR


Method 2:

In [0]:

df.select(['Name','Department']).display()

Name,Department
,HR
,Finance
,
,HR
,Finance
,HR


Method 3:

In [0]:
df.select(col('Name'),col('Department')).display()

Name,Department
,HR
,Finance
,
,HR
,Finance
,HR


> it selects multiple columns from DataFrame df and returns a new DataFrame

#### All Columns: 

In [0]:
df.select("*").display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,HR,50000.0,28.0,12500.0,Young Blood
,Finance,60000.0,,15000.0,Oldie
,,70000.0,35.0,17500.0,Oldie
,HR,,41.0,,Oldie
,Finance,60000.0,,15000.0,Oldie
,HR,,41.0,,Oldie


> it selects all columns of the DataFrame df and returns a new DataFrame

#### Using Alias: 

In [0]:
df.select(df['Name'].alias('NAME')).display()

NAME
""
""
""
""
""
""


In [0]:
df.select(col('Name').alias('NAME')).display()

NAME
""
""
""
""
""
""


## Filter - [ Column Selection ] 
  - It is used to filter rows in a DataFrame based on one or more conditions. 
  - It can be used with a variety of conditions, including equality checks, range queries, and string operations.

#### Single Condition

Method 1:

In [0]:
df1 = df.filter(df['Name'] == "Bob")
df1 = df[df['Name']=="Bob"]
df1.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD


Method 2:

In [0]:
f1=df.filter( col("Name")=="Bob" )
f1.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD


#### Multiple condition

In [0]:
f2 = df.filter((df["Department"] < "Finance") | (df["Name"] == "Charlie"))


In [0]:
f2=df.filter((col("Name") == "Bob") | (col("Department") == "Finance"))
f2.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,Finance,60000,,15000.0,Oldie
,Finance,60000,,15000.0,Oldie


> This can be achieved by combining individual conditions using logical operators like 
- & (AND)

`(col("Company Name") == "Apple") & (col("RAM") == "6 GB")`
- | (OR)

`(col("Company Name") == "Apple") | (col("RAM") == "6 GB")`
- ~ (NOT).

`~(col("Company Name") == "Apple")`

#### Multiple Values in a Column 
####  Using isin() 

In [0]:
f3=df.filter( (col("Name").isin("Alice","Bob")) )
f3=df.filter( (df["Name"].isin("Alice","Bob")) )
f3.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD


#### Using Column Functions

##### 4.1 between()
Checks if the columns values are between lower and upper bound. Returns boolean value.

In [0]:
f4=df.filter(df["Salary"].between(4000,7000))
f4=df.filter(col("Salary").between(4000,7000))
f4.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD


##### 4.2 isNull() / isNotNull()
Checks if the DataFrame column has NULL or non NULL values.

In [0]:
f5=df.filter(df['Age'].isNull())
f5=df.filter(col('Age').isNull())
f5.display()

f5=df.filter(df['Salary'].isNotNull())
f6=df.filter(col("Salary").isNotNull())
f6.display()


NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,Finance,60000,,15000.0,Oldie
,Finance,60000,,15000.0,Oldie


NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,HR,50000,28.0,12500.0,Young Blood
,Finance,60000,,15000.0,Oldie
,,70000,35.0,17500.0,Oldie
,Finance,60000,,15000.0,Oldie


##### 4.3 like()
It is similar to SQL LIKE operator that is used to match based on wildcard characters

[ (%) Percentage, (%) Underscore] to filter the rows.

In [0]:
f7=df.filter( df["Department"].like("%R") )
f7=df.filter( col("Department").like("%R") )
f3.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD


In [0]:
df.describe().display()

summary,NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
count,0.0,5,4.0,4.0,4.0,6
mean,,,60000.0,36.25,15000.0,
stddev,,,8164.96580927726,6.184658438426491,2041.241452319315,
min,,Finance,50000.0,28.0,12500.0,Oldie
max,,HR,70000.0,41.0,17500.0,Young Blood


##Join - [ Combine multiple DataFrames.]
- It is used to combine two or DataFrames based on a common key.
- It is a crucial operation in data processing and analysis, allowing us to merge data from different sources and create more meaningful insights.

###Syntax -
`table1.join(table2, on=table1.id == table2.id, how=Join_Method)`

- table1: Left side of the table here table1 is on Left Side
- table 2: Right side of the table here table2 is on the right side 
- on: A Common Key, here we will connect 2 keys of different tables with same name 
- how: Method by which we want to join hee default inner. Must be one of inner, cross, outer,full, full_outer, left, left_outer, right, right_outer,left_semi, and left_anti.

### Join Types -

#### Right Outer Join
- Similar to the left outer join, but returns all rows from the right DataFrame.
- Syntax: 
  - `df1.join(df2, on="key", how="right_outer")` 
  - `df1.join(df2, on="key", how="right")`

####Full Outer Join
- Returns all rows from both DataFrames, with matching rows from both sides where available.
- If there is no match, the result is null on the side of the DataFrame without a match.
- Syntax: 
  - `df1.join(df2, on="key", how="outer") `
  - `df1.join(df2, on="key", how="full")`

#### Inner Join
- Combines rows from both DataFrames that match the join condition.
- combines two DataFrames based on the key (common column) provided and results in rows where there is a matching found. Rows from both DataFrames are dropped with a non-matching key.
- Syntax: 
   - `df1.join(df2, on="key", how="inner")`

#### Cross Join
- A cross join returns the Cartesian product of rows from both DataFrames, which results in a DataFrame with every possible combination of rows from the input DataFrames.


- Combines each row of the first DataFrame with every row of the second DataFrame.
- Often used for exhaustive combinations.
- Syntax: 
  
  - `df1.crossJoin(df2)`

## Sort - [ Arrange rows based on one or more columns ]
- It is used to arrange the rows in a DataFrame in ascending or descending order based on one or more column values.

- By default, the sort() function sorts data in ascending order, but we can specify descending order as well.

###Syntax
`df.sort(col("Column1"),col("column2").desc())`

###Sorting by Single Column


#### Ascending Order

In [0]:
df_sorted = df.sort("Department")
df_sorted.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,,70000.0,35.0,17500.0,Oldie
,Finance,60000.0,,15000.0,Oldie
,Finance,60000.0,,15000.0,Oldie
,HR,50000.0,28.0,12500.0,Young Blood
,HR,,41.0,,Oldie
,HR,,41.0,,Oldie


> it sorts the df DataFrame in ascending order based on the Weight column.

####Descending Order

In [0]:
from pyspark.sql.functions import col

df_sorted_desc = df.sort(col("Age").desc())
df_sorted_desc.display()

NAME,DEPARTMENT,SALARY,Age,Bonus,YOUNG or OLD
,HR,,41.0,,Oldie
,HR,,41.0,,Oldie
,,70000.0,35.0,17500.0,Oldie
,HR,50000.0,28.0,12500.0,Young Blood
,Finance,60000.0,,15000.0,Oldie
,Finance,60000.0,,15000.0,Oldie


> It sorts the df DataFrame in descending order based on the Weight column.

> First, It will sort the df DataFrame first by the Weight column in descending order

> Then, by the Length column in ascending order for rows with the same 'Weight'.

##  Group By - [ Data Aggregation ] 

- DataFrame aggregation is a fundamental operation for summarizing and transforming large datasets by performing aggregate functions on groups of rows, rather than on individual rows, enabling us to aggregate various statistics,metrics and summarize data across different groups or the entire dataset

- In Short, It groups rows that have the same values in specified columns into summary rows. 

In [0]:
df.groupBy("Department").agg(mean("Age")).display()

Department,avg(Age)
HR,36.66666666666666
Finance,
,35.0


- **Grouping**: 
  - We  specify one or more columns in the **groupBy()** function to define the grouping criteria.

  - Rows with identical values in the specified columns are grouped together into distinct groups.


- **Aggregation**: 
  - After grouping the rows, you can apply aggregate functions such as COUNT, SUM, AVG, MIN, MAX, etc., to each group.

  - These aggregate functions compute summary statistics for the rows within each group.

### Aggregate Functions

 > **count():** 
 Counts the number of rows in each group.

> **mean():**	
 Calculates the arithmetic mean (average) of the values in each group.

> **max():**	
 Finds the maximum value in each group.

> **min():**
 Finds the minimum value in each group.

> **sum():**	Calculate the sum of all the values in each group.

> **avg():** Returns the average for values for each group.

> **agg():**
Using it groupBy().agg() function, we can calculate more than one aggregate at a time.



### Single Column: 

#### Without Alias: 

In [0]:
df.groupBy('Department').agg(mean('Salary')).display()

Department,avg(Salary)
HR,50000.0
Finance,60000.0
,70000.0


#### With Alias: 

In [0]:
df.groupBy('Department').agg(avg('Age').alias("Maximun Age in Each Department")).display()

Department,Maximun Age in Each Department
HR,41.0
Finance,
,35.0


### Multiple Column: 

#### Without Alias: 

In [0]:
df.groupBy('Department').agg(mean('Age'),mean('Salary')).display()

Department,avg(Age),avg(Salary)
HR,36.66666666666666,50000.0
Finance,,60000.0
,35.0,70000.0


In [0]:
df.groupBy('Department','YOUNG or OLD').agg(mean('Age'),max('Salary')).display()

Department,YOUNG or OLD,avg(Age),max(Salary)
HR,Young Blood,28.0,50000.0
Finance,Oldie,,60000.0
,Oldie,35.0,70000.0
HR,Oldie,41.0,


#### With Alias: 

In [0]:
df.groupBy('Department').agg(count('Name').alias("Total No. of Employees in each Department")).display()

Department,Total No. of Employees in each Department
HR,0
Finance,0
,0


In [0]:
df.groupBy('Department','YOUNG or OLD').agg(mean('Age').alias('Average Age of an Employee'),min('Salary').alias('Minimum Salary of An Employee')).display()

Department,YOUNG or OLD,Average Age of an Employee,Minimum Salary of An Employee
HR,Young Blood,28.0,50000.0
Finance,Oldie,,60000.0
,Oldie,35.0,70000.0
HR,Oldie,41.0,


### Using filter on aggregate data

In [0]:
df.groupBy('Launched Year').agg(count('Launched Year').alias('No of units sold yearwise')).where(col('No of units sold yearwise')>=10).display()




| Pandas | PySpark | Purpose |
| ----------- | ----------- | ----------- |
|df=pd.read_csv() |df=spark.read.csv() |Read CSV files |
|df.head() |df.show() |Display first few rows |
|df.info() |df.printSchema() |Display DataFrame structure |
|df['column'] |df['column'] |Select a column |
|df[['col1', 'col2']] |df[['col1', 'col2']] |Select multiple columns |
|df[df['col'] > value] |df.filter(df['col'] > value) |Filter rows based on condition |
|df['new_col'] = ... |df.withColumn('new_col', ...) |Add or modify columns |
|df.fillna(value) |df.na.fill(value) |Fill missing values |
|df.groupby('col').mean() |df.groupBy('col').agg(avg('col')) |Group by and aggregate |
|df.drop('column', axis=1) |df.drop('column') |Remove a column |

#Data Cleaning

##1. Start a PySpark Session

In [0]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
	.appName("DataCleaning") \
	.getOrCreate()

## 2.  Get the Data

- Generate a Sample Dataset
You can work with any dataset of your choice. 

- We’ll create a dataset with some typical data quality issues such as missing values, duplicate records, and the like.
- This is Just for Illustration Purpose

In [0]:
import random
import pandas as pd
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Random Data Generation") \
    .getOrCreate()

spark.conf.set("spark.driver.memory", "8g")
spark.conf.set("spark.executor.memory", "8g")


# Function to generate random data with some missing values and duplicates
def generate_data(n):
    customer_ids = [f'C{str(i).zfill(5)}' for i in range(1, 101)]
    product_categories = ['Electronics', 'Books', 'Clothing', 'Groceries', 'Furniture']

    data = []
    for i in range(n):
        customer_id = random.choice(customer_ids) if i % 10 != 0 else None  # Introduce some missing values
        transaction_id = f'T{str(random.randint(10000, 99999))}'
        transaction_date = pd.Timestamp('2023-01-01') + pd.to_timedelta(random.randint(0, 180), unit='d')
        
        # Generate amount as a float and round it correctly
        amount = float("{:.2f}".format(random.uniform(5, 500)))  # Use string formatting to round
        
        product_category = random.choice(product_categories)
        data.append((customer_id, transaction_id, transaction_date, amount, product_category))

    # Introduce duplicates (only duplicate the first 10 rows)
    data.extend(data[:10])

    return data

# Generate 10,000 rows of data using the function
data = generate_data(100)

# Define column names for the DataFrame
columns = ['CustomerID', 'TransactionID', 'TransactionDate', 'Amount', 'ProductCategory']

# Convert the generated data into a Pandas DataFrame
df = pd.DataFrame(data, columns=columns)

# Create a Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df)

# Display the first few rows of the Spark DataFrame
print("Initial DataFrame:")
spark_df.show(5)

# Example: Adding a constant value to the "Amount" column using lit()
from pyspark.sql.functions import lit

spark_df_with_constant = spark_df.withColumn("AdjustedAmount", spark_df["Amount"] + lit(10.0))

# Show updated DataFrame with the new column
print("DataFrame after adding constant value to Amount:")
spark_df_with_constant.show(5)

# Stop the Spark session when done
spark.stop()




[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-164661248898043>:10[0m
[1;32m      5[0m [38;5;66;03m# Initialize Spark session[39;00m
[1;32m      6[0m spark [38;5;241m=[39m SparkSession[38;5;241m.[39mbuilder \
[1;32m      7[0m     [38;5;241m.[39mappName([38;5;124m"[39m[38;5;124mRandom Data Generation[39m[38;5;124m"[39m) \
[1;32m      8[0m     [38;5;241m.[39mgetOrCreate()
[0;32m---> 10[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.driver.memory[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m8g[39m[38;5;124m"[39m)
[1;32m     11[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.executor.memory[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m8g[39m[38;5;124m"[39m)
[1;32m     14[0m [38;5;66;03m# Function to generate random data wit

##3. Handle Null/Missing Values

- Missing values are a common problem in most datasets.
 
- There are 2 ways by which We can handle the missing values by -
    - dropping records with missing values
    (this isn’t normally recommended!).
    - filling them with default values.

### 3.1 Drop rows

In [0]:
spark_df = spark_df.dropna(subset=["CustomerID"])

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-164661248898046>:1[0m
[0;32m----> 1[0m spark_df [38;5;241m=[39m [43mspark_df[49m[38;5;241m.[39mdropna(subset[38;5;241m=[39m[[38;5;124m"[39m[38;5;124mCustomerID[39m[38;5;124m"[39m])

[0;31mNameError[0m: name 'spark_df' is not defined

### 3.2 Fill Empty Rows

In [0]:
spark_df = spark_df.fillna({"CustomerID": "Unknown"})



##4. Remove Duplicates

In [0]:
spark_df = spark_df.dropDuplicates(subset=["TransactionID"])



##5. Transform Columns

###5.1 Numerical Data

#### Normalization-

In [0]:
from pyspark.sql.functions import col, min, max

# Normalize the 'Amount' column

min_amount = spark_df.select(min(col("Amount"))).collect()[0][0]
max_amount = spark_df.select(max(col("Amount"))).collect()[0][0]

# Perform Normalization
spark_df = spark_df.withColumn("Normalized Amount", (col("Amount") - min_amount) / (max_amount - min_amount))

spark_df.show(5)



#### Standardization-

In [0]:
from pyspark.sql.functions import col, mean, stddev

# Standardize the 'Amount' column, by Calculating Mean and Average of Amount

mean_amount = spark_df.select(mean(col("Amount"))).collect()[0][0]
stddev_amount = spark_df.select(stddev(col("Amount"))).collect()[0][0]

# Perform Standardization
spark_df = spark_df.withColumn("Standardized Amount",(col("Amount")-mean_amount)/stddev_amount)

spark_df.show(5)




###5.2 Categorical Data

#### Label Encoding

In [0]:
from pyspark.ml.feature import StringIndexer

# Initialize StringIndexer
indexer = StringIndexer(inputCol="ProductCategory", outputCol="ProductCategory_Index")

# Fit and Transform the DataFrame
df_indexed = indexer.fit(df).transform(df)
df_indexed.show()



####One Hot Encoding

In [0]:
from pyspark.ml.feature import OneHotEncoder

# Initialize OneHotEncoder
encoder = OneHotEncoder(inputCol="Eatable", outputCol="Eatable_OneHot")

# Transform the DataFrame
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
df_encoded.show()



## 6. Remove Outliers 

In [0]:
from pyspark.sql.functions import col, expr

# Calculate Q1, Q3, and IQR
quantiles = spark_df.approxQuantile("Amount", [0.25, 0.75], 0.05)
Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

# Define the upper and lower bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter out the outliers
spark_df = spark_df.filter((col("Amount") >= lower_bound) & (col("Amount") <= upper_bound))



#Reference
- PySpark
  - Cheatsheet
    - [Datacamp Pyspark](https://media.datacamp.com/legacy/image/upload/v1676302905/Marketing/Blog/PySpark_SQL_Cheat_Sheet.pdf)
    - [Spark Playground](https://www.sparkplayground.com/pyspark-syntax-cheatsheet#csv-files)

  - Medium
    - [Pyspark and SQL Comparison - Kaushal Vishal](https://medium.com/@kaushalvishal228/data-analytics-with-pyspark-b129695f9be2)
    - [Data transformation in Pyspark - Jones n tongana](https://medium.com/@jonesntongana345/data-transformation-in-pyspark-a-beginners-guide-42cc284c9d66)
    - [Feature Engineering - Roshmita Dey](https://medium.com/@roshmitadey/feature-engineering-in-pyspark-techniques-for-data-transformation-and-model-improvement-30c0cda4969f#:~:text=Data%20Preprocessing%20and%20Cleaning,and%20filtering%20out%20irrelevant%20data.)
  - Github
    - [Pyspark Tutorial - Coder2j](https://github.com/coder2j/pyspark-tutorial/blob/main/07-DataFrame-Operations.ipynb)
    - [Data Wrangling with pyspark - Amandeep Singh Khanna](https://amandeepsinghkhanna.github.io/data-wrangling-with-pyspark/)
    - [PySpark Tutorial - CERN](https://github.com/cerndb/SparkTraining/blob/master/notebooks/Tutorial-DataFrame.ipynb)
    - [PySpark Full Course - Ansh Lamba](https://github.com/anshlambagit/PySpark-Full-Course/blob/main/DATA%20and%20NOTEBOOK/1_Tutoral.ipynb)

  - Web
    - [Statology](https://www.statology.org/pyspark-guides/)
    - [Sparkcodehub](https://www.sparkcodehub.com/pyspark-tutorial)
    - [Databrewer](https://www.databrewer.co/pyspark-tutorial/getting-started)
    - [Sparkbyexamples](https://sparkbyexamples.com/pyspark-dataframe-tutorial-with-examples/)

  
- SparkSQL
  - Github 
    - [Spark SQL Tutorial - CERN](https://github.com/cerndb/SparkTraining/blob/master/notebooks/Tutorial-SparkSQL.ipynb)

- Other Resources
    - [Machine learning plus](https://www.machinelearningplus.com/category/pyspark/)
    - [data science parichay](https://datascienceparichay.com/career-guide/business-analyst/)
    - [Projectpro](https://www.projectpro.io/article/pyspark-learning-spark-with-python/554)
    - Analytics Vidhya
      - [Feature Engineering for ML](https://www.analyticsvidhya.com/blog/2021/03/step-by-step-process-of-feature-engineering-for-machine-learning-algorithms-in-data-science/)
    - Kaggle
      - [Pandas Notes](https://www.kaggle.com/code/vikasbz/pandas-dataframe-notes)