Initialise The PySpark Environment

In [22]:
# Create and initialise a PySpark session using the PySpark API
import os
from pyspark.sql import SparkSession # type: ignore
from pyspark.sql import Window
from pyspark.sql.types import * # type: ignore
from pyspark.sql.functions import * # type: ignore
# Build the session, or reuse the one already running in this kernel.
spark = (
    SparkSession.builder
    .appName("MyPySparkAutomatic")
    .getOrCreate()
)

# From here on `spark` is the entry point for reading, transforming and writing data.
print("SparkSession created successfully!")




SparkSession created successfully!


Read The CSV Data(DataFrame: flight_df_1)

In [23]:
# Read the CSV file into a Spark DataFrame, letting Spark infer the column types.
# PERMISSIVE mode keeps malformed rows instead of failing the read.
file_path = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial\2015-summary.csv"
flight_df_1 = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "PERMISSIVE")
    .load(file_path)
)

# count() is a Spark action (a full scan of the file), so run it once and
# reuse the result for both the display size and the printed total.
flight_row_count = flight_df_1.count()

# Show every row of the DataFrame without truncating cell values
flight_df_1.show(n=flight_row_count, truncate=False)

# Show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", flight_row_count)

# Print the inferred schema of the DataFrame
flight_df_1.printSchema()

+-------------+--------------+-----------+
|COUNTRY_1    |COUNTRY_2     |TOTAL_COUNT|
+-------------+--------------+-----------+
|United States|Canada        |15         |
|India        |United Kingdom|20         |
|Germany      |France        |abc        |
|Brazil       |Argentina     |35         |
|South Africa |Australia     |NULL       |
|NULL         |New Zealand   |50         |
|Japan        |South Korea   |60         |
+-------------+--------------+-----------+

Total number of rows in the DataFrame: 7
root
 |-- COUNTRY_1: string (nullable = true)
 |-- COUNTRY_2: string (nullable = true)
 |-- TOTAL_COUNT: string (nullable = true)



Create The Manual Schema(DataFrame: flight_df_2)

In [24]:

# File paths
file_path_0 = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial"
file_path_1 = os.path.join(file_path_0, "2015-summary.csv")
file_path_2 = os.path.join(file_path_0, "bad_records")

# Manual schema: TOTAL_COUNT is forced to integer, and the extra
# `_corrupt_record` string column receives the raw text of any row that
# cannot be parsed against this schema.
my_schema = StructType([
    StructField("COUNTRY_1", StringType(), True),
    StructField("COUNTRY_2", StringType(), True),
    StructField("TOTAL_COUNT", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

# Read the CSV with the manual schema in PERMISSIVE mode so bad rows are kept.
# The corrupt-record column name is set explicitly so it does not depend on
# the session default, and the parsed result is cached: Spark advises caching
# before querying the internal corrupt-record column of a raw CSV/JSON read,
# and it also avoids re-parsing the file for each action below.
flight_df_2 = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(my_schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load(file_path_1)
    .cache()
)

# Show the full DataFrame, including the bad records (count computed once)
flight_df_2_row_count = flight_df_2.count()
flight_df_2.show(n=flight_df_2_row_count, truncate=False)

# Keep only the rows that failed to parse and write them out as JSON
bad_df = flight_df_2.filter("`_corrupt_record` IS NOT NULL")
bad_df.write.mode("overwrite").json(file_path_2)

# List the contents of the bad-records output directory
# (`os` is already imported at the top of the notebook).
print("Contents of the bad records directory:")
if os.path.exists(file_path_2):
    for item in os.listdir(file_path_2):
        print(item)
else:
    print("No bad records directory found.")



+-------------+--------------+-----------+------------------+
|COUNTRY_1    |COUNTRY_2     |TOTAL_COUNT|_corrupt_record   |
+-------------+--------------+-----------+------------------+
|United States|Canada        |15         |NULL              |
|India        |United Kingdom|20         |NULL              |
|Germany      |France        |NULL       |Germany,France,abc|
|Brazil       |Argentina     |35         |NULL              |
|South Africa |Australia     |NULL       |NULL              |
|NULL         |New Zealand   |50         |NULL              |
|Japan        |South Korea   |60         |NULL              |
+-------------+--------------+-----------+------------------+

Contents of the bad records directory:
.part-00000-35dc1ebc-8753-4c03-9980-66477141c1a9-c000.json.crc
._SUCCESS.crc
part-00000-35dc1ebc-8753-4c03-9980-66477141c1a9-c000.json
_SUCCESS


How to read json file in pyspark(DataFrame: people_df)

In [25]:
# File paths
file_path_0 = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial"
file_path_1 = os.path.join(file_path_0, "2015-summary.csv")
file_path_2 = os.path.join(file_path_0, "bad_records")
file_path_3 = os.path.join(file_path_0, "multiline.json")

# Read a multi-line JSON document. The JSON reader always infers the schema
# and has no header row, so the CSV-only "header" / "inferSchema" options
# that were set here had no effect and have been dropped.
people_df = (
    spark.read.format("json")
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .load(file_path_3)
)

# count() is an action; run it once and reuse it.
people_row_count = people_df.count()

# Show the DataFrame
people_df.show(n=people_row_count, truncate=False)

# Show the total number of rows in the DataFrame (last expression -> cell output)
people_row_count



+----+------+-------+------+
|age |gender|name   |salary|
+----+------+-------+------+
|30  |NULL  |Alice  |70000 |
|25  |NULL  |Bob    |50000 |
|35  |NULL  |Charlie|60000 |
|40  |NULL  |David  |80000 |
|28  |NULL  |Eva    |55000 |
|22  |Female|NULL   |45000 |
|NULL|Male  |Frank  |75000 |
|29  |Female|Grace  |65000 |
+----+------+-------+------+



8

How to read Parquet file in pyspark(DataFrame: parquet_df)

In [26]:
file_path_0 = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial"
file_path_4 = os.path.join(file_path_0, "part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet")

# Parquet files carry their own schema, so no header/schema options are needed.
parquet_df = spark.read.format("parquet").load(file_path_4)

# count() triggers a Spark job; compute it once and reuse it for both the
# display size and the printed total.
parquet_row_count = parquet_df.count()

# Show the DataFrame
parquet_df.show(n=parquet_row_count, truncate=False)

# Show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", parquet_row_count)


+---------------------------------+---------------------------------+------+
|DEST_COUNTRY_NAME                |ORIGIN_COUNTRY_NAME              |count |
+---------------------------------+---------------------------------+------+
|United States                    |Romania                          |1     |
|United States                    |Ireland                          |264   |
|United States                    |India                            |69    |
|Egypt                            |United States                    |24    |
|Equatorial Guinea                |United States                    |1     |
|United States                    |Singapore                        |25    |
|United States                    |Grenada                          |54    |
|Costa Rica                       |United States                    |477   |
|Senegal                          |United States                    |29    |
|United States                    |Marshall Islands                 |44    |

Finding Metadata Information In Parquet File

In [27]:
########################################################################################################################################################
import pandas as pd
import pyarrow.parquet as pq

# Path to the .parquet file. It is defined once and reused for both the
# pandas read and the pyarrow metadata inspection so the two cannot drift.
file_path = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial\part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet"

try:
    # Read the whole Parquet file into a pandas DataFrame
    df = pd.read_parquet(file_path, engine="pyarrow")

    # ✅ Display all rows and columns (display limits lifted only inside this block)
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):
        print(df)

    # 🔽 Optionally, export to CSV for easier viewing
    #output_csv = file_path.replace(".parquet", ".csv")
    #df.to_csv(output_csv, index=False)
    #print(f"\n✅ Full data exported to: {output_csv}")

except Exception as e:
    # Best-effort preview: report the problem and carry on to the metadata section.
    print("❌ Error reading Parquet file:")
    print(e)


# Open the same file with pyarrow to inspect its footer metadata
parquet_file = pq.ParquetFile(file_path)
metadata = parquet_file.metadata

print("File metadata:")
print(metadata)

# A Parquet file may legitimately contain zero row groups (e.g. an empty
# dataset); indexing row_group(0) would fail in that case, so guard it.
if metadata.num_row_groups > 0:
    first_row_group = metadata.row_group(0)

    print("\nFirst row group metadata:")
    print(first_row_group)

    first_column = first_row_group.column(0)

    print("\nFirst column in first row group:")
    print(first_column)

    print("\nColumn statistics:")
    print(first_column.statistics)
else:
    print("\nParquet file contains no row groups; no row-group metadata to show.")



                     DEST_COUNTRY_NAME                ORIGIN_COUNTRY_NAME  \
0                        United States                            Romania   
1                        United States                            Ireland   
2                        United States                              India   
3                                Egypt                      United States   
4                    Equatorial Guinea                      United States   
5                        United States                          Singapore   
6                        United States                            Grenada   
7                           Costa Rica                      United States   
8                              Senegal                      United States   
9                        United States                   Marshall Islands   
10                              Guyana                      United States   
11                       United States                       Sint Maarten   

How To Write Dataframe on disk(dataframe:parquet_df)

In [28]:
file_path_0 = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial"
file_path_4 = os.path.join(file_path_0, "part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet")

# Example: split parquet_df into 2 partitions and write it to disk as CSV
# (with a header row), replacing any previous output in the target folder.
(
    parquet_df.repartition(2)
    .write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(os.path.join(file_path_0, "parquet_df_write_repartioned_2"))
)


Implementing Partitioning & Bucketing in Pyspark

In [29]:
# Partitioned write: one sub-folder per distinct DEST_COUNTRY_NAME value,
# CSV files with a header row, overwriting any earlier output.
partitioned_writer = (
    parquet_df.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .partitionBy("DEST_COUNTRY_NAME")
)
partitioned_writer.save(os.path.join(file_path_0, "parquet_df_write_partition_by_Example"))

# Bucketing example, intentionally left disabled inside a string literal.
# NOTE(review): bucketBy is given an empty column name here; a real column
# would be required before this snippet could be enabled. Because the string
# is the last expression of the cell, its repr is echoed as the cell output.
"""
parquet_df.write.format("csv") \
    .mode("overwrite") \
    .option("header", "true") \
    .bucketBy(2, "") \
    .saveAsTable("parquet_df_write_bucket_by_Example")
"""

'\nparquet_df.write.format("csv")     .mode("overwrite")     .option("header", "true")     .bucketBy(2, "")     .saveAsTable("parquet_df_write_bucket_by_Example")\n'

Transformation in PySpark: How To Create a DataFrame Using the DataFrame API
             


In [30]:
################################################# DATA ENGINEERING PIPELINE #################################################
# READ  ---->  TRANSFORM  ---->  WRITE
#              (DataFrame API  or  Spark SQL)

# Build a small in-memory DataFrame to practise transformations on.

# Rows as (id, num) tuples
data = [
    (1, 1),
    (2, 1),
    (3, 1),
    (4, 2),
    (5, 1),
    (6, 2),
    (7, 2),
]

# Column names; the column types are inferred from the Python values
columns = ["id", "num"]

example_df = spark.createDataFrame(data, columns)

# Inspect the contents and the inferred schema
example_df.show()
example_df.printSchema()

# `columns` is an attribute (a plain list of names), not a method
print(example_df.columns)

# Row count as the cell's output value
example_df.count()


+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  1|
|  3|  1|
|  4|  2|
|  5|  1|
|  6|  2|
|  7|  2|
+---+---+

root
 |-- id: long (nullable = true)
 |-- num: long (nullable = true)

['id', 'num']


7

Transformation in PySpark:Using Select Method

In [31]:
file_path = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial\employee_data.csv"

# Load the employee CSV with a header row and inferred column types
employee_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "PERMISSIVE")
    .load(file_path)
)

# select() accepts several ways of referring to a column in the same call:
# a plain name, a col() expression, attribute access and item access.
employee_df_1 = employee_df.select(
    "id",
    "salary",
    (col("id") + 5).alias("id_plus_5"),
    employee_df.gender,
    employee_df["address"],
)

# expr() lets SQL snippets be used as column expressions
employee_df_2 = employee_df.select(
    expr("id+5").alias("id_plus_5"),
    expr("salary*2").alias("salary_times_2"),
    expr("concat(name, address)").alias("name_address"),
)

# Display both results without truncating values
employee_df_1.show(truncate=False)
employee_df_2.show(truncate=False)





+---+------+---------+------+-------+
|id |salary|id_plus_5|gender|address|
+---+------+---------+------+-------+
|1  |75000 |6        |m     |INDIA  |
|2  |100000|7        |f     |USA    |
|3  |150000|8        |m     |INDIA  |
|4  |200000|9        |m     |JAPAN  |
|5  |300000|10       |m     |USA    |
|6  |300000|11       |m     |INDIA  |
|7  |540000|12       |m     |USA    |
|8  |70000 |13       |m     |JAPAN  |
|9  |150000|14       |m     |JAPAN  |
|10 |25000 |15       |f     |RUSSIA |
|11 |35000 |16       |f     |INDIA  |
|12 |200000|17       |f     |INDIA  |
|13 |650000|18       |m     |USA    |
|14 |95000 |19       |m     |RUSSIA |
|15 |750000|20       |m     |INDIA  |
+---+------+---------+------+-------+

+---------+--------------+-------------+
|id_plus_5|salary_times_2|name_address |
+---------+--------------+-------------+
|6        |150000        |ManishINDIA  |
|7        |200000        |NikitaUSA    |
|8        |300000        |PritamINDIA  |
|9        |400000        |Prant

Transformation in PySpark Using Spark SQL:Query using the select statement 

In [32]:
# Register the DataFrame as a temporary view so it can be queried with SQL
employee_df.createOrReplaceTempView("employee_tbl")

# Run the SQL query against the view; the result is a regular DataFrame
high_salary_query = """select * from employee_tbl where salary > 70000"""
employee_tbl_1 = spark.sql(high_salary_query)

employee_tbl_1.show(truncate=False)

+---+--------+---+------+-------+------+
|id |name    |age|salary|address|gender|
+---+--------+---+------+-------+------+
|1  |Manish  |26 |75000 |INDIA  |m     |
|2  |Nikita  |23 |100000|USA    |f     |
|3  |Pritam  |22 |150000|INDIA  |m     |
|4  |Prantosh|17 |200000|JAPAN  |m     |
|5  |Vikash  |31 |300000|USA    |m     |
|6  |Rahul   |55 |300000|INDIA  |m     |
|7  |Raju    |67 |540000|USA    |m     |
|9  |Dev     |32 |150000|JAPAN  |m     |
|12 |Sweta   |43 |200000|INDIA  |f     |
|13 |Raushan |48 |650000|USA    |m     |
|14 |Mukesh  |36 |95000 |RUSSIA |m     |
|15 |Prakash |52 |750000|INDIA  |m     |
+---+--------+---+------+-------+------+



Transformation in Pyspark: Using Filter,Aliases,Literal,Casting,etc

In [33]:
# Aliasing: derive a new column and give it an explicit name
employee_df_4 = employee_df.select(
    "id",
    "salary",
    (col("id") + 5).alias("id_plus_5"),
    employee_df.gender,
    employee_df["address"],
)

# Filtering with the DataFrame API
employee_df_5 = employee_df.filter(col("address") == "JAPAN")
employee_df_6 = (
    employee_df
    .filter((col("address") == "JAPAN") & (col("salary") > 70000))
    .select("id", "salary", (col("id") + 5).alias("id_plus_5"))
)
# Same result using a SQL-string predicate. The filter is applied BEFORE the
# projection because `address` is not among the selected columns; filtering
# afterwards only worked by relying on Spark resolving the dropped column.
employee_df_7 = (
    employee_df
    .where("address = 'JAPAN' and salary > 70000")
    .select("id", "salary", (col("id") + 5).alias("id_plus_5"))
)

# lit() creates a column holding a constant value (two equivalent spellings)
employee_df_8 = employee_df.select("*", lit("Gupta").alias("last_name"))
employee_df_9 = employee_df.withColumn("last_name", lit("Gupta"))

# Renaming columns. This starts from employee_df_9 because `last_name` only
# exists there; on employee_df the last_name rename silently did nothing.
employee_df_10 = (
    employee_df_9.withColumnRenamed("id", "emp_id")
    .withColumnRenamed("salary", "emp_salary")
    .withColumnRenamed("address", "emp_address")
    .withColumnRenamed("gender", "emp_gender")
    .withColumnRenamed("name", "emp_name")
    .withColumnRenamed("last_name", "emp_last_name")
    .withColumnRenamed("age", "emp_age")
)

# Casting columns: id -> string, salary -> long
employee_df_11 = (
    employee_df
    .withColumn("id", col("id").cast(StringType()))
    .withColumn("salary", col("salary").cast("long"))
)

# Dropping columns. Also based on employee_df_9 so that dropping `last_name`
# is a real operation; the remaining column is `salary` either way.
employee_df_12 = employee_df_9.drop("last_name", "age", "address", "gender", "name", "id")

# Show the results (employee_df last, to confirm the source frame is unchanged)
employee_df_4.show(truncate=False)
employee_df_5.show(truncate=False)
employee_df_6.show(truncate=False)
employee_df_7.show(truncate=False)
employee_df_8.show(truncate=False)
employee_df_9.show(truncate=False)
employee_df_10.show(truncate=False)
employee_df_11.printSchema()
employee_df_12.show(truncate=False)
employee_df.show(truncate=False)


+---+------+---------+------+-------+
|id |salary|id_plus_5|gender|address|
+---+------+---------+------+-------+
|1  |75000 |6        |m     |INDIA  |
|2  |100000|7        |f     |USA    |
|3  |150000|8        |m     |INDIA  |
|4  |200000|9        |m     |JAPAN  |
|5  |300000|10       |m     |USA    |
|6  |300000|11       |m     |INDIA  |
|7  |540000|12       |m     |USA    |
|8  |70000 |13       |m     |JAPAN  |
|9  |150000|14       |m     |JAPAN  |
|10 |25000 |15       |f     |RUSSIA |
|11 |35000 |16       |f     |INDIA  |
|12 |200000|17       |f     |INDIA  |
|13 |650000|18       |m     |USA    |
|14 |95000 |19       |m     |RUSSIA |
|15 |750000|20       |m     |INDIA  |
+---+------+---------+------+-------+

+---+--------+---+------+-------+------+
|id |name    |age|salary|address|gender|
+---+--------+---+------+-------+------+
|4  |Prantosh|17 |200000|JAPAN  |m     |
|8  |Praveen |28 |70000 |JAPAN  |m     |
|9  |Dev     |32 |150000|JAPAN  |m     |
+---+--------+---+------+------

Transformation in Pyspark: Union & Union All (same in the DataFrame API but different in Spark SQL)

In [34]:
# ---- manager_df_1: first set of rows (the last row is an exact duplicate) ----
data = [
    (10, 'Anil', 50000, 18),
    (11, 'Vikas', 75000, 16),
    (12, 'Nisha', 40000, 18),
    (13, 'Nidhi', 60000, 17),
    (14, 'Priya', 80000, 18),
    (15, 'Mohit', 45000, 18),
    (16, 'Rajesh', 90000, 10),
    (17, 'Raman', 55000, 16),
    (18, 'Sam', 65000, 17),
    (18, 'Sam', 65000, 17),
]
# Column names for the DataFrame
schema = ['id', 'name', 'sal', 'mngr_id']

manager_df_1 = spark.createDataFrame(data, schema)

# Contents, schema and row count
manager_df_1.show(truncate=False)
manager_df_1.printSchema()
print("Total number of rows in the DataFrame:", manager_df_1.count())


# ---- manager_df_2: second set of rows, same column layout ----
data1 = [
    (19, 'Sohan', 50000, 18),
    (20, 'Sima', 75000, 17),
]
schema1 = ['id', 'name', 'sal', 'mngr_id']

manager_df_2 = spark.createDataFrame(data1, schema1)

# Contents, schema and row count
manager_df_2.show(truncate=False)
manager_df_2.printSchema()
print("Total number of rows in the DataFrame:", manager_df_2.count())


# ---- Combine the two DataFrames three ways ----
manager_df_union = manager_df_1.union(manager_df_2)
manager_df_unionAll = manager_df_1.unionAll(manager_df_2)
manager_df_unionByName = manager_df_1.unionByName(manager_df_2)

# Show each combined DataFrame followed by its row count
for _combined_df in (manager_df_union, manager_df_unionAll, manager_df_unionByName):
    _combined_df.show(truncate=False)
    print("Total number of rows in the DataFrame:", _combined_df.count())




+---+------+-----+-------+
|id |name  |sal  |mngr_id|
+---+------+-----+-------+
|10 |Anil  |50000|18     |
|11 |Vikas |75000|16     |
|12 |Nisha |40000|18     |
|13 |Nidhi |60000|17     |
|14 |Priya |80000|18     |
|15 |Mohit |45000|18     |
|16 |Rajesh|90000|10     |
|17 |Raman |55000|16     |
|18 |Sam   |65000|17     |
|18 |Sam   |65000|17     |
+---+------+-----+-------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- mngr_id: long (nullable = true)

Total number of rows in the DataFrame: 10
+---+-----+-----+-------+
|id |name |sal  |mngr_id|
+---+-----+-----+-------+
|19 |Sohan|50000|18     |
|20 |Sima |75000|17     |
+---+-----+-----+-------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- mngr_id: long (nullable = true)

Total number of rows in the DataFrame: 2
+---+------+-----+-------+
|id |name  |sal  |mngr_id|
+---+------+-----+-------+
|10 |Anil  |

Transformation in Pyspark: Case (if-else comparison using when/otherwise)

In [35]:

# Create data for DataFrame (includes null values and one fully-null row
# so the null handling of when/otherwise can be observed)
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]
# Create a schema for the DataFrame
schema = ['id', 'name', 'age', 'salary', 'address', 'department']
# Create a DataFrame using the DataFrame API
emp_df= spark.createDataFrame(emp_data, schema)
# Show the DataFrame
emp_df.show(truncate=False)
# Show the schema of the DataFrame
emp_df.printSchema()
# Show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", emp_df.count())


# Adult flag. The boundary is inclusive (age >= 18): with the previous
# `age > 18` test, someone aged exactly 18 was labelled "No".
emp_df_1 = emp_df.withColumn(
    "is_adult",
    when(col("age").isNull(), None)           # age unknown -> null
    .when(col("age") >= 18, "Yes")            # 18 or older -> "Yes"
    .otherwise("No")                          # younger than 18 -> "No"
)


# Age band. The bands are contiguous so age 18 lands in "medium" rather than
# falling through to "major". There is deliberately no otherwise(): a null
# (or negative) age matches no branch and yields null instead of being
# labelled "major".
emp_df_2 = emp_df.withColumn(
    "is_adult",
    when((col("age") >= 0) & (col("age") < 18), "minor")
    .when((col("age") >= 18) & (col("age") < 30), "medium")
    .when(col("age") >= 30, "major")
)

# Show the DataFrame
emp_df_1.show(truncate=False)
emp_df_2.show(truncate=False)






+----+-------+----+------+-------+-----------+
|id  |name   |age |salary|address|department |
+----+-------+----+------+-------+-----------+
|1   |manish |26  |20000 |india  |IT         |
|2   |rahul  |NULL|40000 |germany|engineering|
|3   |pawan  |12  |60000 |india  |sales      |
|4   |roshini|44  |NULL  |uk     |engineering|
|5   |raushan|35  |70000 |india  |sales      |
|6   |NULL   |29  |200000|uk     |IT         |
|7   |adam   |37  |65000 |us     |IT         |
|8   |chris  |16  |40000 |us     |sales      |
|NULL|NULL   |NULL|NULL  |NULL   |NULL       |
|7   |adam   |37  |65000 |us     |IT         |
+----+-------+----+------+-------+-----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: long (nullable = true)
 |-- address: string (nullable = true)
 |-- department: string (nullable = true)

Total number of rows in the DataFrame: 10
+----+-------+----+------+-------+-----------+--------+
|id  |name   |age |

Transformation in Pyspark: Case (Unique & Sorted Records in a DataFrame)

In [36]:
# Sample rows containing exact duplicates, plus one near-duplicate
# (id 14 / 'Priya' appears with two different `sal` values).
data = [
    (10, 'Anil', 50000, 18),
    (11, 'Vikas', 75000, 16),
    (12, 'Nisha', 40000, 18),
    (13, 'Nidhi', 60000, 17),
    (14, 'Priya', 80000, 18),
    (15, 'Mohit', 45000, 18),
    (16, 'Rajesh', 90000, 10),
    (17, 'Raman', 55000, 16),
    (18, 'Sam', 65000, 17),
    (15, 'Mohit', 45000, 18),
    (13, 'Nidhi', 60000, 17),
    (14, 'Priya', 90000, 18),
    (18, 'Sam', 65000, 17),
]
# Column names for the DataFrame
schema = ['id', 'name', 'sal', 'mngr_id']

mngr_df = spark.createDataFrame(data, schema)

# Contents, schema and row count of the raw data
mngr_df.show(truncate=False)
mngr_df.printSchema()
print("Total number of rows in the DataFrame:", mngr_df.count())


# Unique records / removing duplicates
# 1) distinct over every column
mngr_df_1 = mngr_df.distinct()
# 2) distinct over a projection of just id and name
mngr_df_2 = mngr_df.select("id", "name").distinct()
# 3) dropDuplicates with an explicit subset; here the subset lists all four columns
mngr_df_3 = mngr_df.dropDuplicates(["id", "name", "sal", "mngr_id"])


# Sorting: highest `sal` first, ties ordered by `name` ascending
mngr_df_4 = mngr_df_1.sort(
    col("sal").desc(),
    col("name").asc(),
)


# Display each result
for _result_df in (mngr_df_1, mngr_df_2, mngr_df_3, mngr_df_4):
    _result_df.show(truncate=False)








+---+------+-----+-------+
|id |name  |sal  |mngr_id|
+---+------+-----+-------+
|10 |Anil  |50000|18     |
|11 |Vikas |75000|16     |
|12 |Nisha |40000|18     |
|13 |Nidhi |60000|17     |
|14 |Priya |80000|18     |
|15 |Mohit |45000|18     |
|16 |Rajesh|90000|10     |
|17 |Raman |55000|16     |
|18 |Sam   |65000|17     |
|15 |Mohit |45000|18     |
|13 |Nidhi |60000|17     |
|14 |Priya |90000|18     |
|18 |Sam   |65000|17     |
+---+------+-----+-------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- mngr_id: long (nullable = true)

Total number of rows in the DataFrame: 13
+---+------+-----+-------+
|id |name  |sal  |mngr_id|
+---+------+-----+-------+
|10 |Anil  |50000|18     |
|11 |Vikas |75000|16     |
|12 |Nisha |40000|18     |
|13 |Nidhi |60000|17     |
|14 |Priya |80000|18     |
|15 |Mohit |45000|18     |
|16 |Rajesh|90000|10     |
|17 |Raman |55000|16     |
|18 |Sam   |65000|17     |
|14 |Priya |90000|18     |
+--

Transformation in Pyspark: Aggregate function

In [37]:
# Sample employee rows, including nulls and one fully-null row
empl_data = [
    (1, 'manish', 26, 20000, 'india', 'IT'),
    (2, 'rahul', None, 40000, 'germany', 'engineering'),
    (3, 'pawan', 12, 60000, 'india', 'sales'),
    (4, 'roshini', 44, None, 'uk', 'engineering'),
    (5, 'raushan', 35, 70000, 'india', 'sales'),
    (6, None, 29, 200000, 'uk', 'IT'),
    (7, 'adam', 37, 65000, 'us', 'IT'),
    (8, 'chris', 16, 40000, 'us', 'sales'),
    (None, None, None, None, None, None),
    (7, 'adam', 37, 65000, 'us', 'IT'),
]
# Column names for the DataFrame
schema = ['id', 'name', 'age', 'salary', 'address', 'department']

empl_df = spark.createDataFrame(empl_data, schema)

# Contents, schema and row count
empl_df.show(truncate=False)
empl_df.printSchema()
print("Total number of rows in the DataFrame:", empl_df.count())


# count as an aggregate expression: over all rows, over a single column,
# and over the distinct values of a column.
# NOTE: count/min/max/avg here are the pyspark.sql.functions versions
# brought in by the notebook's star import, not the Python builtins.
empl_df_1 = empl_df.select(count("*"))
empl_df_2 = empl_df.select(count("name"))
empl_df_3 = empl_df.select(
    countDistinct("address").alias("distinct_address_count")
)

# Minimum, maximum and average of the salary column
empl_df_4 = empl_df.select(
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary"),
    avg("salary").alias("avg_salary"),
)

# Display each aggregate result
for _agg_df in (empl_df_1, empl_df_2, empl_df_3, empl_df_4):
    _agg_df.show(truncate=False)


+----+-------+----+------+-------+-----------+
|id  |name   |age |salary|address|department |
+----+-------+----+------+-------+-----------+
|1   |manish |26  |20000 |india  |IT         |
|2   |rahul  |NULL|40000 |germany|engineering|
|3   |pawan  |12  |60000 |india  |sales      |
|4   |roshini|44  |NULL  |uk     |engineering|
|5   |raushan|35  |70000 |india  |sales      |
|6   |NULL   |29  |200000|uk     |IT         |
|7   |adam   |37  |65000 |us     |IT         |
|8   |chris  |16  |40000 |us     |sales      |
|NULL|NULL   |NULL|NULL  |NULL   |NULL       |
|7   |adam   |37  |65000 |us     |IT         |
+----+-------+----+------+-------+-----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- salary: long (nullable = true)
 |-- address: string (nullable = true)
 |-- department: string (nullable = true)

Total number of rows in the DataFrame: 10
+--------+
|count(1)|
+--------+
|10      |
+--------+

+-----------+
|coun

Transformation in Pyspark: Groupby

In [38]:
# Sample rows: (id, name, salary, department)
data = [
    (1, 'manish', 50000, "IT"),
    (2, 'vikash', 60000, "sales"),
    (3, 'raushan', 70000, "marketing"),
    (4, 'mukesh', 80000, "IT"),
    (5, 'pritam', 90000, "sales"),
    (6, 'nikita', 45000, "marketing"),
    (7, 'ragini', 55000, "marketing"),
    (8, 'rakesh', 100000, "IT"),
    (9, 'aditya', 65000, "IT"),
    (10, 'rahul', 50000, "marketing"),
]
# Column names for the DataFrame
schema = ['id', 'name', 'salary', 'department']

dept_df = spark.createDataFrame(data, schema)

# Contents, schema and row count
dept_df.show(truncate=False)
dept_df.printSchema()
print("Total number of rows in the DataFrame:", dept_df.count())


# Group by department and compute, per group: the number of rows and the
# average, minimum and maximum salary.
dept_df_2 = (
    dept_df
    .groupBy("department")
    .agg(
        count("*").alias("count"),
        avg("salary").alias("avg_salary"),
        min("salary").alias("min_salary"),
        max("salary").alias("max_salary"),
    )
)

# Show the grouped result
dept_df_2.show(truncate=False)







+---+-------+------+----------+
|id |name   |salary|department|
+---+-------+------+----------+
|1  |manish |50000 |IT        |
|2  |vikash |60000 |sales     |
|3  |raushan|70000 |marketing |
|4  |mukesh |80000 |IT        |
|5  |pritam |90000 |sales     |
|6  |nikita |45000 |marketing |
|7  |ragini |55000 |marketing |
|8  |rakesh |100000|IT        |
|9  |aditya |65000 |IT        |
|10 |rahul  |50000 |marketing |
+---+-------+------+----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- department: string (nullable = true)

Total number of rows in the DataFrame: 10
+----------+-----+----------+----------+----------+
|department|count|avg_salary|min_salary|max_salary|
+----------+-----+----------+----------+----------+
|IT        |4    |73750.0   |50000     |100000    |
|sales     |2    |75000.0   |60000     |90000     |
|marketing |4    |55000.0   |45000     |70000     |
+----------+-----+----------+----------+-----

Transformation in Pyspark: Joins

In [39]:
#Create 'costomer_data' data for dataframe
customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]
# Create a schema for the DataFrame
customer_schema=['customer_id','customer_name','address','date_of_joining']
# Create a DataFrame using the DataFrame API
customer_df = spark.createDataFrame(customer_data, customer_schema)
# Show the DataFrame
customer_df.show(truncate=False)
#show the schema of the DataFrame
customer_df.printSchema()
#show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", customer_df.count())
# Create a new DataFrame with the date column converted to a date type





#Create 'sales_data' data for dataframe
sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]
# Create a schema for the DataFrame
sales_schema=['customer_id','product_id','quantity','date_of_purchase']
# Create a DataFrame using the DataFrame API
sales_df = spark.createDataFrame(sales_data, sales_schema)
# Show the DataFrame
sales_df.show(truncate=False)
#show the schema of the DataFrame
sales_df.printSchema()
#show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", sales_df.count())



# Product catalogue for the join examples: (id, name, price)
product_data = [
    (1, 'fanta', 20),
    (2, 'dew', 22),
    (5, 'sprite', 40),
    (7, 'redbull', 100),
    (12, 'mazza', 45),
    (22, 'coke', 27),
    (25, 'limca', 21),
    (27, 'pepsi', 14),
    (56, 'sting', 10),
]

# Only column names are supplied, so Spark infers each column's type from the tuples
product_schema = ['id', 'name', 'price']
product_df = spark.createDataFrame(product_data, product_schema)

# Inspect the result: all rows untruncated, the inferred schema, then the row count
product_df.show(truncate=False)
product_df.printSchema()
print("Total number of rows in the DataFrame:", product_df.count())




# Join customers to sales on the single column customer_id, once per join type.
# Every keyed result keeps only the sales-side product_id, sorted ascending.
customer_sales_join_cond = customer_df.customer_id == sales_df.customer_id

# Inner Join
customer_sales_inner_df = (
    customer_df.join(sales_df, customer_sales_join_cond, "inner")
    .select(sales_df.product_id)
    .sort(col("product_id").asc())
)

# Left Join
customer_sales_left_df = (
    customer_df.join(sales_df, customer_sales_join_cond, "left")
    .select(sales_df.product_id)
    .sort(col("product_id").asc())
)

# Right Join
customer_sales_right_df = (
    customer_df.join(sales_df, customer_sales_join_cond, "right")
    .select(sales_df.product_id)
    .sort(col("product_id").asc())
)

# Outer Join
customer_sales_outer_df = (
    customer_df.join(sales_df, customer_sales_join_cond, "outer")
    .select(sales_df.product_id)
    .sort(col("product_id").asc())
)

# Cross Join: no condition, every customer row is paired with every sales row
customer_sales_cross_df = customer_df.crossJoin(sales_df)


# Show each result in the same order the joins were built
for joined_df in (
    customer_sales_inner_df,
    customer_sales_left_df,
    customer_sales_right_df,
    customer_sales_outer_df,
    customer_sales_cross_df,
):
    joined_df.show(truncate=False)


+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|1          |manish       |patna  |30-05-2022     |
|2          |vikash       |kolkata|12-03-2023     |
|3          |nikita       |delhi  |25-06-2023     |
|4          |rahul        |ranchi |24-03-2023     |
|5          |mahesh       |jaipur |22-03-2023     |
|6          |prantosh     |kolkata|18-10-2022     |
|7          |raman        |patna  |30-12-2022     |
|8          |prakash      |ranchi |24-02-2023     |
|9          |ragini       |kolkata|03-03-2023     |
|10         |raushan      |jaipur |05-02-2023     |
+-----------+-------------+-------+---------------+

root
 |-- customer_id: long (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- date_of_joining: string (nullable = true)

Total number of rows in the DataFrame: 10
+-----------+----------+--------+----------------+


Transformation in Pyspark: Window Function

In [40]:
# Employee records for the window-function examples: (id, name, salary, department, gender)
e_data = [
    (1, 'manish', 50000, 'IT', 'm'),
    (2, 'vikash', 60000, 'sales', 'm'),
    (3, 'raushan', 70000, 'marketing', 'm'),
    (4, 'mukesh', 80000, 'IT', 'm'),
    (5, 'priti', 90000, 'sales', 'f'),
    (6, 'nikita', 45000, 'marketing', 'f'),
    (7, 'ragini', 55000, 'marketing', 'f'),
    (8, 'rashi', 100000, 'IT', 'f'),
    (9, 'aditya', 65000, 'IT', 'm'),
    (10, 'rahul', 50000, 'marketing', 'm'),
    (11, 'rakhi', 50000, 'IT', 'f'),
    (12, 'akhilesh', 90000, 'sales', 'm'),
]

# Only column names are supplied, so Spark infers each column's type from the tuples
schema = ['id', 'name', 'salary', 'department', 'gender']
e_df = spark.createDataFrame(e_data, schema)

# Inspect the result: all rows untruncated, the inferred schema, then the row count
e_df.show(truncate=False)
e_df.printSchema()
print("Total number of rows in the DataFrame:", e_df.count())



# Per-product sales figures for the window-function examples:
# (product_id, product_name, sales_date as a string, sales).
# NOTE: product_data / product_schema / product_df were already defined by the joins
# example with columns (id, name, price); from here on they refer to this sales dataset.
product_data = [
    (1, "iphone", "01-01-2023", 1500000),
    (2, "samsung", "01-01-2023", 1100000),
    (3, "oneplus", "01-01-2023", 1100000),
    (1, "iphone", "01-02-2023", 1300000),
    (2, "samsung", "01-02-2023", 1120000),
    (3, "oneplus", "01-02-2023", 1120000),
    (1, "iphone", "01-03-2023", 1600000),
    (2, "samsung", "01-03-2023", 1080000),
    (3, "oneplus", "01-03-2023", 1160000),
    (1, "iphone", "01-04-2023", 1700000),
    (2, "samsung", "01-04-2023", 1800000),
    (3, "oneplus", "01-04-2023", 1170000),
    (1, "iphone", "01-05-2023", 1200000),
    (2, "samsung", "01-05-2023", 980000),
    (3, "oneplus", "01-05-2023", 1175000),
    (1, "iphone", "01-06-2023", 1100000),
    (2, "samsung", "01-06-2023", 1100000),
    (3, "oneplus", "01-06-2023", 1200000),
]

# Only column names are supplied, so Spark infers each column's type from the tuples
product_schema = ['product_id', 'product_name', 'sales_date', 'sales']
product_df = spark.createDataFrame(product_data, product_schema)

# Inspect the result: rows untruncated, the inferred schema, then the row count
product_df.show(truncate=False)
product_df.printSchema()
print("Total number of rows in the DataFrame:", product_df.count())





# Employee time entries for the window-function examples:
# (id, name, date as a string, time as an "HH:mm" string); one employee has several entries per date
empls_data = [
    (1, "manish", "11-07-2023", "10:20"),
    (1, "manish", "11-07-2023", "11:20"),
    (2, "rajesh", "11-07-2023", "11:20"),
    (1, "manish", "11-07-2023", "11:50"),
    (2, "rajesh", "11-07-2023", "13:20"),
    (1, "manish", "11-07-2023", "19:20"),
    (2, "rajesh", "11-07-2023", "17:20"),
    (1, "manish", "12-07-2023", "10:32"),
    (1, "manish", "12-07-2023", "12:20"),
    (3, "vikash", "12-07-2023", "09:12"),
    (1, "manish", "12-07-2023", "16:23"),
    (3, "vikash", "12-07-2023", "18:08"),
]

# Only column names are supplied, so Spark infers each column's type from the tuples
emp_schema = ["id", "name", "date", "time"]
empls_df = spark.createDataFrame(data=empls_data, schema=emp_schema)

# Inspect the result: all rows untruncated, the inferred schema, then the row count
empls_df.show(truncate=False)
empls_df.printSchema()
print("Total number of rows in the DataFrame:", empls_df.count())






##0.Ranking functions over a per-department window
# Rows are grouped by department and ordered by salary, highest first
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Add the four ranking columns one by one; each is evaluated over the same window,
# so they can be compared side by side for rows that share a salary
e_window_df = (
    e_df
    .withColumn("row_number", row_number().over(window_spec))
    .withColumn("rank", rank().over(window_spec))
    .withColumn("dense_rank", dense_rank().over(window_spec))
    .withColumn("ntile", ntile(3).over(window_spec))
)

# Display every row without truncating cell contents
e_window_df.show(truncate=False)







##1.Create a new column with the percentage gain/loss in sales of each product compared with its previous month
# Step 0: Parse sales_date into a real date so the window orders chronologically (assumes the strings are dd-MM-yyyy, one row per month - confirm)
#product_month_df = product_df.withColumn("sales_month", to_date(col("sales_date"), "dd-MM-yyyy"))
# Step 1: Define the lag window (ordered by month per product)
#window_spec_lag = Window.partitionBy("product_id").orderBy("sales_month")
# Step 2: Get previous month's sales
#product_previous_df = product_month_df.withColumn("previous_month_sales", lag("sales").over(window_spec_lag))
# Step 3: Calculate percentage gain/loss (stays null for each product's first month, which has no previous row)
#per_loss_gain_df = product_previous_df.withColumn("percentage_loss_gain", round(((col("sales") - col("previous_month_sales")) / col("previous_month_sales")) * 100, 2))
# Show result
#per_loss_gain_df.select("product_id", "sales_month", "sales", "previous_month_sales", "percentage_loss_gain").show()


##2.Create a new column with the percentage that each month's sales contribute to the total sales of that product
# Step 1: Parse sales_date and truncate it to month level (assumes the strings are dd-MM-yyyy - confirm; trunc() cannot parse that format by itself)
#product_month_df = product_df.withColumn("sales_month", trunc(to_date(col("sales_date"), "dd-MM-yyyy"), "month"))
# Step 2: Define window partitioned by product_id only (NOTE(review): adding orderBy here would turn sum() into a running total instead of the product total)
#window_spec_total = Window.partitionBy("product_id")
# Step 3: Calculate the total sales of each product across all months
#sum_sales_df = product_month_df.withColumn("sum_sales", sum(col("sales")).over(window_spec_total))
# Step 4: Calculate percentage
#per_sales_each_month_df = sum_sales_df.withColumn("percentage_sales_each_month",(col("sales") / col("sum_sales") * 100).cast("double"))
# Optional: Round the percentage to 2 decimals
#per_sales_each_month_df = per_sales_each_month_df.withColumn("percentage_sales_each_month", round("percentage_sales_each_month", 2))
# Show final result
#per_sales_each_month_df.select("product_id", "sales_month", "sales", "sum_sales", "percentage_sales_each_month").show()



##3.Create new columns with the first and latest sales of each product, and find the difference between its last-month and first-month sales
# Use product_df (columns product_id, product_name, sales_date, sales); sales_df from the joins example has no sales_date or sales_amount column
#Ensure sales_date is of DateType (assumes the strings are dd-MM-yyyy - confirm)
#product_dated_df = product_df.withColumn("sales_date", to_date(col("sales_date"), "dd-MM-yyyy"))
# Define window partitioned by product and ordered by sales_date, spanning the whole partition
#w = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# Add first and last sales for each product
#sales_df_with_extremes = product_dated_df.withColumn("first_sale", first("sales").over(w)).withColumn("last_sale", last("sales").over(w))
# Calculate difference
#sales_df_final = sales_df_with_extremes.withColumn("sales_difference", col("last_sale") - col("first_sale"))
# Show only unique result per product (optional)
#sales_df_final.select("product_id", "first_sale", "last_sale", "sales_difference").distinct().show()




##4.Send an email to every employee who has not completed the compulsory 8 hours of office work on a day they were in the office (TODO: not implemented yet)




##5.Find out the performance of the sales based on the last three months' average sales of each product
# Use product_df (columns product_id, product_name, sales_date, sales); sales_df from the joins example has no sales_date or sales_amount column
# Ensure the sales_date column is in DateType (assumes the strings are dd-MM-yyyy - confirm)
#product_dated_df = product_df.withColumn("sales_date", to_date(col("sales_date"), "dd-MM-yyyy"))
# Define a window partitioned by product_id and ordered by date; rowsBetween counts rows, not days, so with one row per month the previous three months are rows -3..-1
#window_spec_avg = Window.partitionBy("product_id").orderBy(col("sales_date")).rowsBetween(-3, -1)
# Calculate 3-month rolling average (excluding the current row)
#sales_df_with_avg = product_dated_df.withColumn("avg_sales_last_3_months",avg("sales").over(window_spec_avg))
# Calculate performance compared to average (null for each product's first month, which has no earlier rows)
#sales_df_with_perf = sales_df_with_avg.withColumn("performance_vs_avg",when(col("avg_sales_last_3_months").isNull(), None).when(col("sales") > col("avg_sales_last_3_months"), "Above Average").when(col("sales") < col("avg_sales_last_3_months"), "Below Average").otherwise("Average"))
#sales_df_with_perf.select("product_id", "sales_date", "sales", "avg_sales_last_3_months", "performance_vs_avg").show()
# Alternative that keeps only the rolling average, without the performance label:
#last_3_months_avg_sales_df = product_dated_df.withColumn("last_3_months_avg_sales", avg(col("sales")).over(window_spec_avg))







+---+--------+------+----------+------+
|id |name    |salary|department|gender|
+---+--------+------+----------+------+
|1  |manish  |50000 |IT        |m     |
|2  |vikash  |60000 |sales     |m     |
|3  |raushan |70000 |marketing |m     |
|4  |mukesh  |80000 |IT        |m     |
|5  |priti   |90000 |sales     |f     |
|6  |nikita  |45000 |marketing |f     |
|7  |ragini  |55000 |marketing |f     |
|8  |rashi   |100000|IT        |f     |
|9  |aditya  |65000 |IT        |m     |
|10 |rahul   |50000 |marketing |m     |
|11 |rakhi   |50000 |IT        |f     |
|12 |akhilesh|90000 |sales     |m     |
+---+--------+------+----------+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- department: string (nullable = true)
 |-- gender: string (nullable = true)

Total number of rows in the DataFrame: 12
+----------+------------+----------+-------+
|product_id|product_name|sales_date|sales  |
+----------+------------+----------+-

Transformation in PySpark: Nested JSON Flattening

In [None]:

# Read the nested restaurant JSON file into a DataFrame.
# NOTE(review): the base directory is an absolute, machine-specific path; consider
# moving it to a single configurable constant so the notebook runs elsewhere.
file_path_0 = r"C:\Users\Shivam Gupta\OneDrive\Documents\Shivam_Developement\PYTHON\python_tutorial"
file_path_1 = os.path.join(file_path_0, "resturant_json_data.json")

# Only options the JSON reader understands are set:
#   multiLine  - a single JSON document may span several lines of the file
#   mode       - PERMISSIVE keeps malformed records instead of failing the read
# The former "header" and "inferSchema" options belong to the CSV reader and have no
# effect on JSON (the schema is always inferred when none is supplied), so they were dropped.
nested_json_df = (
    spark.read.format("json")
    .option("multiLine", "true")
    .option("mode", "PERMISSIVE")
    .load(file_path_1)
)

# Show the DataFrame without truncating the (very wide) nested columns
nested_json_df.show(truncate=False)
# Show the inferred, nested schema of the DataFrame
nested_json_df.printSchema()
# Show the total number of rows in the DataFrame
print("Total number of rows in the DataFrame:", nested_json_df.count())





##Flattening the nested JSON DataFrame
# Step 1: explode `restaurants` so each element becomes its own row, appended as the
# extra column `new_restaurant` (assumes `restaurants` is an array column - confirm against the schema)
exploded_restaurant = explode(col("restaurants")).alias("new_restaurant")
nested_json_df_0 = nested_json_df.select("*", exploded_restaurant)
#nested_json_df_0.printSchema()  # uncomment to inspect the schema before the original column is dropped

# Step 2: drop the original `restaurants` column, now replaced by `new_restaurant`
nested_json_df_1 = nested_json_df_0.drop("restaurants")
nested_json_df_1.printSchema()

# Step 3: simple example of reaching one nested field through its dotted path
nested_json_df_2 = nested_json_df_1.select("new_restaurant.restaurant.R.res_id")
nested_json_df_2.show(truncate=False)














+----+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------