# 1. Creating DataFrame Manually with Hardcoded Values:

In [0]:
bdf = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David"), (5, "Eve")]
columns = ["ID", "Name"]

In [0]:
df = spark.createDataFrame(bdf, columns)
df.show()

+---+-------+
| ID|   Name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
+---+-------+



# 2. Creating DataFrame from Pandas: import pandas

In [0]:
import pandas as pd



In [0]:
pdf = pd.DataFrame(bdf, columns=columns)
pdf.head()

Unnamed: 0,ID,Name
0,1,Alice
1,2,Bob
2,3,Charlie
3,4,David
4,5,Eve


# 3. Create DataFrame from Dictionary:

In [0]:
ddict = [
  {"ID":1, 
  "Name" : "Alice"},
  {"ID":2, 
  "Name" : "Bob"},
  {"ID":3, 
  "Name" : "Ava"}
  ]

dfdict = spark.createDataFrame(ddict)

dfdict.show()

+---+-----+
| ID| Name|
+---+-----+
|  1|Alice|
|  2|  Bob|
|  3|  Ava|
+---+-----+



# 4. Create Empty DataFrame:

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
schema = StructType([
    StructField("ID", IntegerType(), True), 
    StructField("Name", StringType(), True)
])

emptydf = spark.createDataFrame([], schema)
emptydf.show()

+---+----+
| ID|Name|
+---+----+
+---+----+



# 5. Creating DataFrame from Structured Data (CSV, JSON, Parquet)

## Reading CSV file into DataFrame

In [0]:
dfcsv = spark.read.csv("dbfs:/FileStore/Stocks20_10Years.csv", header=True, inferSchema=True)
dfcsv.show()

+-------+----------+---------+---------+---------+--------+---------+----------+------------+-------------------+-------------+-----------+----------+
| Symbol|      Date|PrevClose|OpenPrice|HighPrice|LowPrice|LastPrice|ClosePrice|AveragePrice|TotalTradedQuantity| TurnoverInRs|No.ofTrades|    Sector|
+-------+----------+---------+---------+---------+--------+---------+----------+------------+-------------------+-------------+-----------+----------+
|SIEMENS|2014-11-21|    898.1|    902.8|    912.0|   891.6|    908.0|    906.85|      904.29|             202861|1.834459894E8|       5561|Technology|
|SIEMENS|2014-11-24|   906.85|   911.05|   939.35|  908.05|    937.3|     935.1|      926.69|             362855|3.362550064E8|      12234|Technology|
|SIEMENS|2014-11-25|    935.1|    940.0|    942.0|   894.0|    911.0|    916.95|      913.88|             615328|5.623351183E8|      26039|Technology|
|SIEMENS|2014-11-26|   916.95|    909.7|    928.3|   900.0|    902.0|     903.4|      913.13| 

## Reading JSON file into DataFrame

In [0]:
dfjson = spark.read.json("dbfs:/FileStore/data-helpers-spark-labs-2023/data/music.json")
dfjson.show()

+---------------+--------+-----+-------------------+
|         artist|category|price|              title|
+---------------+--------+-----+-------------------+
|        Madonna|     Pop|10.97|       Ray of Light|
|    Miley Cyrus|     Pop|12.99|      Wrecking Ball|
|      Aerosmith|    Rock|14.97|           Big Ones|
|Dexter Freebish|     Rap|14.99|A Life of Saturdays|
| Kendrick Lamar|     Rap|17.99|                DNA|
|      Van Halen|    Rock|11.97|               1984|
|      Lady Gaga|     Pop|16.99|           The Cure|
+---------------+--------+-----+-------------------+



# show() Function in PySpark DataFrames

In [0]:
dfjson.show(n=10,truncate=10, vertical=False)

+----------+--------+-----+----------+
|    artist|category|price|     title|
+----------+--------+-----+----------+
|   Madonna|     Pop|10.97|Ray of ...|
|Miley C...|     Pop|12.99|Wreckin...|
| Aerosmith|    Rock|14.97|  Big Ones|
|Dexter ...|     Rap|14.99|A Life ...|
|Kendric...|     Rap|17.99|       DNA|
| Van Halen|    Rock|11.97|      1984|
| Lady Gaga|     Pop|16.99|  The Cure|
+----------+--------+-----+----------+



In [0]:
dfjson.show(n=5,truncate=25, vertical=True)

-RECORD 0-----------------------
 artist   | Madonna             
 category | Pop                 
 price    | 10.97               
 title    | Ray of Light        
-RECORD 1-----------------------
 artist   | Miley Cyrus         
 category | Pop                 
 price    | 12.99               
 title    | Wrecking Ball       
-RECORD 2-----------------------
 artist   | Aerosmith           
 category | Rock                
 price    | 14.97               
 title    | Big Ones            
-RECORD 3-----------------------
 artist   | Dexter Freebish     
 category | Rap                 
 price    | 14.99               
 title    | A Life of Saturdays 
-RECORD 4-----------------------
 artist   | Kendrick Lamar      
 category | Rap                 
 price    | 17.99               
 title    | DNA                 
only showing top 5 rows



# Loading Data from CSV File into a DataFrame

## 1. Import Required Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

## 2. Define the Schema

In [0]:
schems = StructType([
  StructField("ID", IntegerType(), True)
])

In [0]:
dfjson.printSchema()

root
 |-- artist: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- title: string (nullable = true)



In [0]:
display(dfjson)

artist,category,price,title
Madonna,Pop,10.97,Ray of Light
Miley Cyrus,Pop,12.99,Wrecking Ball
Aerosmith,Rock,14.97,Big Ones
Dexter Freebish,Rap,14.99,A Life of Saturdays
Kendrick Lamar,Rap,17.99,DNA
Van Halen,Rock,11.97,1984
Lady Gaga,Pop,16.99,The Cure


### Interview Question: How Does inferSchema Work?
- Behind the Scenes: When you use inferSchema, Spark runs a job that scans the CSV file from top to bottom to identify the best-suited data type for each column based on the values it encounters.
Does It Make Sense to Use inferSchema?

#### Pros:
- Useful when the schema of the file keeps changing, as it allows Spark to automatically detect the data types.

#### Cons:
- **Performance Impact**: Spark must scan the entire file, which can take extra time, especially for large files.

- **Loss of Control:** You lose the ability to explicitly define the schema, which may lead to incorrect data types if the data is inconsistent.

## 1. Defining Schema Programmatically with StructType

In [0]:
from pyspark.sql.types import *

In [0]:
employeeschema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Salary", DoubleType(), True),
    StructField("Joining_Date", StringType(),True),
    StructField("Department", StringType(), True),
    StructField("Performance_Rating", IntegerType(), True),
    StructField("Email", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("Phone", StringType(), True)
])

In [0]:
df = spark.read.csv("dbfs:/FileStore/PySpark0toHero/large_employee_data.csv", header=True, schema=employeeschema)
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: double (nullable = true)
 |-- Joining_Date: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Performance_Rating: integer (nullable = true)
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Phone: string (nullable = true)



In [0]:
display(df)

ID,Name,Age,Salary,Joining_Date,Department,Performance_Rating,Email,Address,Phone
1,Jessica Smith,50,113764.26,2023-08-06,Engineering,5,austin80@peterson.org,"5311 Kevin Island, Randolphville, VA 25633",+1-059-527-4077x4981
2,Richard Lang,29,75042.96,2020-04-30,Marketing,5,odavis@yahoo.com,"024 Carol Ferry, Mirandaville, MA 28286",(067)974-4914x924
3,Charles Griffin,27,112415.4,2021-01-09,HR,4,nancygallagher@hotmail.com,"947 Rachel Coves Apt. 626, Gregoryhaven, WY 53913",+1-755-052-0789
4,Cindy Henderson,36,46902.56,2020-09-15,Sales,4,tylerbrown@robinson-marshall.net,"04016 Jennifer Port Suite 537, South Jasmineview, ID 44996",+1-542-124-8110x625
5,Jennifer Skinner,47,105806.8,2025-01-10,Finance,1,dmccoy@yahoo.com,"138 Olson Viaduct, Perezberg, AL 30703",001-322-802-9445x243
6,Jay Davis,49,49205.93,2020-04-05,Marketing,4,william77@gmail.com,"566 Johnson Squares, Millerport, NV 63868",+1-799-609-0368x64784
7,Melissa Williams,38,47783.91,2021-02-15,Finance,4,tatejeffery@gmail.com,"656 Abigail Meadow Apt. 185, North Teresa, CA 89004",(962)590-6578
8,Kristen Jacobs,46,116546.52,2020-03-23,HR,5,wgomez@hotmail.com,"Unit 6222 Box 5809, DPO AP 60259",+1-221-426-2441
9,Spencer Henson,51,116163.07,2021-02-09,Sales,2,samantha16@moran.com,"0846 Floyd Fork Apt. 362, Huangmouth, AL 39281",958.724.0047x917
10,Robin Bryant,49,54309.67,2021-03-15,Finance,2,zimmermanchristopher@yahoo.com,"39763 Kyle Loop Apt. 996, Port Thomas, MN 92667",+1-977-396-3526


# PySpark Column Selection & Manipulation: Key Techniques

### 1. Different Methods to Select Columns

In [0]:
from pyspark.sql.functions import col

In [0]:
df.select(col("Name")).show()

+-----------------+
|             Name|
+-----------------+
|    Jessica Smith|
|     Richard Lang|
|  Charles Griffin|
|  Cindy Henderson|
| Jennifer Skinner|
|        Jay Davis|
| Melissa Williams|
|   Kristen Jacobs|
|   Spencer Henson|
|     Robin Bryant|
|     Charles Moss|
|      Ashley Cruz|
|     Andrew Hodge|
|     Sherry Lopez|
|   Jessica Pierce|
|  Gregory Johnson|
|   Margaret Davis|
|Danielle Gonzalez|
|    Breanna Davis|
|     Andrew Mccoy|
+-----------------+
only showing top 20 rows



In [0]:
df.select("Salary").show()

+---------+
|   Salary|
+---------+
|113764.26|
| 75042.96|
| 112415.4|
| 46902.56|
| 105806.8|
| 49205.93|
| 47783.91|
|116546.52|
|116163.07|
| 54309.67|
| 87546.07|
| 91598.35|
| 79847.49|
|111891.14|
| 91420.82|
| 94524.62|
| 40896.36|
| 63401.06|
| 65131.38|
|109317.08|
+---------+
only showing top 20 rows



### 2. Selecting Multiple Columns Together

In [0]:
df.select("ID","Name", "Email", col("Salary")).show()

+---+-----------------+--------------------+---------+
| ID|             Name|               Email|   Salary|
+---+-----------------+--------------------+---------+
|  1|    Jessica Smith|austin80@peterson...|113764.26|
|  2|     Richard Lang|    odavis@yahoo.com| 75042.96|
|  3|  Charles Griffin|nancygallagher@ho...| 112415.4|
|  4|  Cindy Henderson|tylerbrown@robins...| 46902.56|
|  5| Jennifer Skinner|    dmccoy@yahoo.com| 105806.8|
|  6|        Jay Davis| william77@gmail.com| 49205.93|
|  7| Melissa Williams|tatejeffery@gmail...| 47783.91|
|  8|   Kristen Jacobs|  wgomez@hotmail.com|116546.52|
|  9|   Spencer Henson|samantha16@moran.com|116163.07|
| 10|     Robin Bryant|zimmermanchristop...| 54309.67|
| 11|     Charles Moss|    karl63@yahoo.com| 87546.07|
| 12|      Ashley Cruz|gloria41@hotmail.com| 91598.35|
| 13|     Andrew Hodge|darrylrodriguez@y...| 79847.49|
| 14|     Sherry Lopez|moorejennifer@smi...|111891.14|
| 15|   Jessica Pierce|  justin70@yahoo.com| 91420.82|
| 16|  Gre

In [0]:
df.columns

Out[24]: ['ID',
 'Name',
 'Age',
 'Salary',
 'Joining_Date',
 'Department',
 'Performance_Rating',
 'Email',
 'Address',
 'Phone']

### 4.Renaming Columns with alias()

In [0]:
from pyspark.sql.functions import column

In [0]:
df.select(col("Name").alias("Employeename"), 
          col("Salary").alias("EmployeeSalary"),
          column("Department"),
          df.Joining_Date).show()

+-----------------+--------------+-----------+------------+
|     Employeename|EmployeeSalary| Department|Joining_Date|
+-----------------+--------------+-----------+------------+
|    Jessica Smith|     113764.26|Engineering|  2023-08-06|
|     Richard Lang|      75042.96|  Marketing|  2020-04-30|
|  Charles Griffin|      112415.4|         HR|  2021-01-09|
|  Cindy Henderson|      46902.56|      Sales|  2020-09-15|
| Jennifer Skinner|      105806.8|    Finance|  2025-01-10|
|        Jay Davis|      49205.93|  Marketing|  2020-04-05|
| Melissa Williams|      47783.91|    Finance|  2021-02-15|
|   Kristen Jacobs|     116546.52|         HR|  2020-03-23|
|   Spencer Henson|     116163.07|      Sales|  2021-02-09|
|     Robin Bryant|      54309.67|    Finance|  2021-03-15|
|     Charles Moss|      87546.07|Engineering|  2023-09-02|
|      Ashley Cruz|      91598.35|      Sales|  2023-10-30|
|     Andrew Hodge|      79847.49|         HR|  2022-09-01|
|     Sherry Lopez|     111891.14|    Fi

### 5. Using selectExpr() for Concise Column Selection

selectExpr() allows you to use SQL expressions directly and rename columns concisely:

In [0]:
df.selectExpr("Name as EmployeeName", "Salary as Compensation", "Department").show()

+-----------------+------------+-----------+
|     EmployeeName|Compensation| Department|
+-----------------+------------+-----------+
|    Jessica Smith|   113764.26|Engineering|
|     Richard Lang|    75042.96|  Marketing|
|  Charles Griffin|    112415.4|         HR|
|  Cindy Henderson|    46902.56|      Sales|
| Jennifer Skinner|    105806.8|    Finance|
|        Jay Davis|    49205.93|  Marketing|
| Melissa Williams|    47783.91|    Finance|
|   Kristen Jacobs|   116546.52|         HR|
|   Spencer Henson|   116163.07|      Sales|
|     Robin Bryant|    54309.67|    Finance|
|     Charles Moss|    87546.07|Engineering|
|      Ashley Cruz|    91598.35|      Sales|
|     Andrew Hodge|    79847.49|         HR|
|     Sherry Lopez|   111891.14|    Finance|
|   Jessica Pierce|    91420.82|    Finance|
|  Gregory Johnson|    94524.62|      Sales|
|   Margaret Davis|    40896.36|Engineering|
|Danielle Gonzalez|    63401.06|  Marketing|
|    Breanna Davis|    65131.38|  Marketing|
|     Andr

Summary
- Use col(), column(), or string names to select columns.
- Use expr() and selectExpr() for SQL-like expressions and renaming.
- Use alias() to rename columns.
- Get the list of columns using df.columns.

# PySpark DataFrame Manipulation part 2: Adding, Renaming, and Dropping Columns

## 1. Adding New Columns with withColumn()

### Add a constant value column:

In [0]:
from pyspark.sql import functions as F

newdf = df.withColumn("NewColumn", F.lit(1))
newdf.show(4)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+---------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|               Email|             Address|               Phone|NewColumn|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+---------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|austin80@peterson...|5311 Kevin Island...|+1-059-527-4077x4981|        1|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|    odavis@yahoo.com|024 Carol Ferry, ...|   (067)974-4914x924|        1|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|nancygallagher@ho...|947 Rachel Coves ...|     +1-755-052-0789|        1|
|  4|Cindy Henderson| 36| 46902.56|  2020-09-15|      Sales|                 4|tyl

### Add a column based on an expression:

In [0]:
newdf = df.withColumn("Age>30", F.expr("Age > 30"))
newdf.show(4)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|               Email|             Address|               Phone|Age>30|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|austin80@peterson...|5311 Kevin Island...|+1-059-527-4077x4981|  true|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|    odavis@yahoo.com|024 Carol Ferry, ...|   (067)974-4914x924| false|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|nancygallagher@ho...|947 Rachel Coves ...|     +1-755-052-0789| false|
|  4|Cindy Henderson| 36| 46902.56|  2020-09-15|      Sales|                 4|tylerbrown@robins...|

## 2. Renaming Columns with withColumnRenamed()

### Renaming Column

In [0]:
df.columns

Out[30]: ['ID',
 'Name',
 'Age',
 'Salary',
 'Joining_Date',
 'Department',
 'Performance_Rating',
 'Email',
 'Address',
 'Phone']

In [0]:
newdf = df.withColumnRenamed("Email", "EmailID")

newdf.show(3)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|             EmailID|             Address|               Phone|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|austin80@peterson...|5311 Kevin Island...|+1-059-527-4077x4981|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|    odavis@yahoo.com|024 Carol Ferry, ...|   (067)974-4914x924|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|nancygallagher@ho...|947 Rachel Coves ...|     +1-755-052-0789|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+

## 3. Dropping Columns with drop()

In [0]:
df2 = df.drop("Phone")
df2.show(3)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|               Email|             Address|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|austin80@peterson...|5311 Kevin Island...|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|    odavis@yahoo.com|024 Carol Ferry, ...|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|nancygallagher@ho...|947 Rachel Coves ...|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+
only showing top 3 rows



### Drop multiple columns:

In [0]:
df3 = df.drop("Phone", "Email")
df3.show(3)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|             Address|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|5311 Kevin Island...|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|024 Carol Ferry, ...|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|947 Rachel Coves ...|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+
only showing top 3 rows



## 4. Immutability of DataFrames

In Spark, DataFrames are immutable by nature. This means that after creating a DataFrame, its contents cannot be changed. All transformations like adding, renaming, or dropping columns result in a new DataFrame, keeping the original one intact.

**Key Points**
- Use withColumn() for adding columns, with lit() for constant values and expressions for computed values.
- Use withColumnRenamed() to rename columns and backticks for special characters or spaces.
- Use drop() to remove one or more columns.
- DataFrames are immutable in Spark—transformations result in new DataFrames, leaving the original unchanged.

# Day2 - 19-02-2025

# 1. Changing Data Types (Schema Transformation)

In [0]:
from pyspark.sql.functions import col

In [0]:
df.printSchema()

In [0]:
df = df.withColumn("Salary", col("Salary").cast("double"))

df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: double (nullable = true)
 |-- Joining_Date: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Performance_Rating: integer (nullable = true)
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Phone: string (nullable = true)



In [0]:
df = df.withColumn("Phone").cast("string")

df.printSchema()

# 2. Filtering Data

In [0]:
df.count()

Out[40]: 50000

In [0]:
filtered_df = df.filter(col("Salary")>75000.0)
filtered_df.count()

Out[43]: 25117

In [0]:
filtered_df_age = df.filter(col("Age")> 35)
filtered_df_age.count()

Out[44]: 32041

In [0]:
filtered_df_null = df.filter(df["Age"].isNotNull()) 
filtered_df_null.count()

Out[46]: 50000

# 3. Multiple Filters (Chaining Conditions)

In [0]:
df.show(3)

+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+
| ID|           Name|Age|   Salary|Joining_Date| Department|Performance_Rating|               Email|             Address|               Phone|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+
|  1|  Jessica Smith| 50|113764.26|  2023-08-06|Engineering|                 5|austin80@peterson...|5311 Kevin Island...|+1-059-527-4077x4981|
|  2|   Richard Lang| 29| 75042.96|  2020-04-30|  Marketing|                 5|    odavis@yahoo.com|024 Carol Ferry, ...|   (067)974-4914x924|
|  3|Charles Griffin| 27| 112415.4|  2021-01-09|         HR|                 4|nancygallagher@ho...|947 Rachel Coves ...|     +1-755-052-0789|
+---+---------------+---+---------+------------+-----------+------------------+--------------------+--------------------+--------------------+

In [0]:
filtered_df_dept = df.filter((col("Department") == 'Engineering') & (col("Salary")>20000))
filtered_df_dept.count()

Out[53]: 10063

In [0]:
df['Address'].isNull()

Out[57]: Column<'(Address IS NULL)'>

# 5. Handling Unique or Distinct Data

In [0]:
unique_df = df.distinct()

unique_df.count()

Out[58]: 50000

In [0]:
unique_dept = df.select("Department").distinct()

unique_dept.show()

+-----------+
| Department|
+-----------+
|      Sales|
|Engineering|
|         HR|
|    Finance|
|  Marketing|
+-----------+



In [0]:
unique_df = df.dropDuplicates(["Email"]) 
unique_df.count()


Out[62]: 48779

In [0]:
# Remove duplicates based on both 'Phone' and 'Email'

unique_df_dupli = df.dropDuplicates(['Phone', 'Email'])
unique_df_dupli.count()

Out[64]: 50000

# 6. Counting Distinct Values

In [0]:
df.select("Department").distinct().count()

Out[65]: 5

In [0]:
df.select("Department", "Performance_Rating").distinct().count()

Out[68]: 25

Mastering PySpark DataFrame Operations
1. **Changing Data Types**: Easily modify column types using .cast(). E.g., change 'Salary' to double or 'Phone' to string for better data handling.
2. **Filtering Data**: Use .filter() or .where() to extract specific rows. For example, filter employees with a salary over 50,000 or non-null Age.
3. **Multiple Conditions**: Chain filters with & and | to apply complex conditions, such as finding employees over 30 in the IT department.
4. **Handling NULLs**: Use .isNull() and .isNotNull() to filter rows with missing or available values, such as missing addresses or valid emails.
5. **Unique/Distinct Values**: Use .distinct() to get unique rows or distinct values in a column. Remove duplicates based on specific fields like Email or Phone using .dropDuplicates().
6. **Count Distinct Values**: Count distinct values in one or multiple columns to analyze data diversity, such as counting unique departments or combinations of Department and Performance_Rating.

# Sample Data Creation

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, asc, concat, initcap, lower, upper, instr, length, lit

In [0]:
data = [
    ("USA", "North America", 100, 50.5),
    ("India", "Asia", 300, 20.0), 
    ("Germany", "Europe", 200, 30.5), 
    ("Australia", "Oceania", 150, 60.0), 
    ("Japan", "Asia", 120, 45.0), 
    ("Brazil", "South America", 180, 25.0)
]

In [0]:
columns = ["Country", "Region", "UnitsSold", "UnitPrice"]

df = spark.createDataFrame(data,columns)

df.show()

+---------+-------------+---------+---------+
|  Country|       Region|UnitsSold|UnitPrice|
+---------+-------------+---------+---------+
|      USA|North America|      100|     50.5|
|    India|         Asia|      300|     20.0|
|  Germany|       Europe|      200|     30.5|
|Australia|      Oceania|      150|     60.0|
|    Japan|         Asia|      120|     45.0|
|   Brazil|South America|      180|     25.0|
+---------+-------------+---------+---------+



## Sorting the DataFrame

In [0]:
# 1. Sort by a single column (ascending order):
df.orderBy("Country").show(5)

+---------+-------------+---------+---------+
|  Country|       Region|UnitsSold|UnitPrice|
+---------+-------------+---------+---------+
|Australia|      Oceania|      150|     60.0|
|   Brazil|South America|      180|     25.0|
|  Germany|       Europe|      200|     30.5|
|    India|         Asia|      300|     20.0|
|    Japan|         Asia|      120|     45.0|
+---------+-------------+---------+---------+
only showing top 5 rows



In [0]:
# 2. Sort by multiple columns:
df.orderBy("Country", "UnitsSold").show()

+---------+-------------+---------+---------+
|  Country|       Region|UnitsSold|UnitPrice|
+---------+-------------+---------+---------+
|Australia|      Oceania|      150|     60.0|
|   Brazil|South America|      180|     25.0|
|  Germany|       Europe|      200|     30.5|
|    India|         Asia|      300|     20.0|
|    Japan|         Asia|      120|     45.0|
|      USA|North America|      100|     50.5|
+---------+-------------+---------+---------+



In [0]:
df.orderBy(desc("Country")).limit(3).show()

+-------+-------------+---------+---------+
|Country|       Region|UnitsSold|UnitPrice|
+-------+-------------+---------+---------+
|    USA|North America|      100|     50.5|
|  Japan|         Asia|      120|     45.0|
|  India|         Asia|      300|     20.0|
+-------+-------------+---------+---------+



## 3. Sort by a column in descending order and limit:

In [0]:
df.createOrReplaceTempView("df")

In [0]:
df.show()

+---------+-------------+---------+---------+
|  Country|       Region|UnitsSold|UnitPrice|
+---------+-------------+---------+---------+
|      USA|North America|      100|     50.5|
|    India|         Asia|      300|     20.0|
|  Germany|       Europe|      200|     30.5|
|Australia|      Oceania|      150|     60.0|
|    Japan|         Asia|      120|     45.0|
|   Brazil|South America|      180|     25.0|
+---------+-------------+---------+---------+



In [0]:
%sql
show tables;

database,tableName,isTemporary
,df,True


In [0]:
df.columns

Out[86]: ['Country', 'Region', 'UnitsSold', 'UnitPrice']

In [0]:
%sql
insert into df values ("Sri Lanka", 'South Asia', 125, 26.25);

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2308204184109311>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-2308204184109311>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;

In [0]:
spark.sql("insert into df values ('Sri Lanka', 'South Asia', 125, 26.00);")
df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2308204184109312>:1[0m
[0;32m----> 1[0m spark[38;5;241m.[39msql([38;5;124m"[39m[38;5;124minsert into df values ([39m[38;5;124m'[39m[38;5;124mSri Lanka[39m[38;5;124m'[39m[38;5;124m, [39m[38;5;124m'[39m[38;5;124mSouth Asia[39m[38;5;124m'[39m[38;5;124m, 125, 26.00);[39m[38;5;124m"[39m)
[1;32m      2[0m df[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m

# Day2 - 19-02-2025 - 22

# String Functions