In [0]:
spark

Read file in Spark

In [0]:
flight_df=spark.read.format("csv")\
          .option("header","False")\
              .option("inferschema","False")\
                  .option("mode","FAILFAST")\
                      .load("/FileStore/tables/2010_summary.csv")


flight_df.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df_header=spark.read.format("csv")\
          .option("header","True")\
              .option("inferschema","True")\
                  .option("mode","FAILFAST")\
                      .load("/FileStore/tables/2010_summary.csv")

flight_df_header.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df_header.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



Creating Schema Using Struct type, field 

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

In [0]:
my_schema= StructType(
    [
        StructField("Dest_country",StringType(),True),
        StructField("Orgin_country",StringType(),True),
        StructField("cnt",IntegerType(),True)
    ]
)

In [0]:
flight_df_My_schema=spark.read.format("csv")\
          .option("header","False")\
              .option("inferschema","False")\
                  .schema(my_schema)\
                      .option("mode","Permissive")\
                           .load("/FileStore/tables/2010_summary.csv")

                

In [0]:
flight_df_My_schema.show(5)

+-----------------+-------------------+----+
|     Dest_country|      Orgin_country| cnt|
+-----------------+-------------------+----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|null|
|    United States|            Romania|   1|
|    United States|            Ireland| 264|
|    United States|              India|  69|
|            Egypt|      United States|  24|
+-----------------+-------------------+----+
only showing top 5 rows



In [0]:
flight_df_Skiprow=spark.read.format("csv")\
          .option("header","False")\
              .option("skiprow",1)\
                  .option("inferschema","False")\
                      .schema(my_schema)\
                          .option("mode","Permissive")\
                             .load("/FileStore/tables/2010_summary.csv")


flight_df_Skiprow.show(5)

+-----------------+-------------------+----+
|     Dest_country|      Orgin_country| cnt|
+-----------------+-------------------+----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|null|
|    United States|            Romania|   1|
|    United States|            Ireland| 264|
|    United States|              India|  69|
|            Egypt|      United States|  24|
+-----------------+-------------------+----+
only showing top 5 rows



Handling Corrupted Records

In [0]:
#premissive mode
emp_data=spark.read.format("csv")\
                     .option("header",True)\
                         .option("inferschema",True)\
                             .option("mode","Permissive")\
                                 .load("/FileStore/tables/emp.csv")



In [0]:
emp_data.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
#dropmalformed
emp_data_1=spark.read.format("csv")\
                     .option("header",True)\
                         .option("inferschema",True)\
                             .option("mode","DropmalFormed")\
                                 .load("/FileStore/tables/emp.csv")

In [0]:
emp_data_1.show()

+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
emp_schema= StructType(
    [
        StructField("id",IntegerType(),True),
        StructField("name",StringType(),True),
        StructField("age",IntegerType(),True),
        StructField("salary",IntegerType(),True),
        StructField("address",StringType(),True),
        StructField("nominee",StringType(),True),
        StructField("_corrupted_record",StringType(),True)
    ]
)

In [0]:
emp_data_corr=spark.read.format("csv")\
                     .option("header",True)\
                         .option("inferschema",True)\
                             .option("mode","Permissive")\
                                 .schema(emp_schema)\
                                 .load("/FileStore/tables/emp.csv")

emp_data_corr.show(truncate=False)

+---+--------+---+------+------------+--------+-----------------+
|id |name    |age|salary|address     |nominee |_corrupted_record|
+---+--------+---+------+------------+--------+-----------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null             |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null             |
|3  |Pritam  |22 |150000|Bangalore   |India   |nominee3         |
|4  |Prantosh|17 |200000|Kolkata     |India   |nominee4         |
|5  |Vikash  |31 |300000|null        |nominee5|null             |
+---+--------+---+------+------------+--------+-----------------+



Partition and Bucketing

In [0]:
state_data=spark.read.format("csv")\
                 .option("header", True)\
                   .option("inferschema",True)\
                     .option("mode","Permissive")\
                       .load("/FileStore/tables/states_file-2.csv")

In [0]:
state_data.show()

+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  1|  Manish| 26| 75000|  INDIA|     m|
|  2|  Nikita| 23|100000|    USA|     f|
|  3|  Pritam| 22|150000|  INDIA|     m|
|  4|Prantosh| 17|200000|  JAPAN|     m|
|  5|  Vikash| 31|300000|    USA|     m|
|  6|   Rahul| 55|300000|  INDIA|     m|
|  7|    Raju| 67|540000|    USA|     m|
|  8| Praveen| 28| 70000|  JAPAN|     m|
|  9|     Dev| 32|150000|  JAPAN|     m|
| 10|  Sherin| 16| 25000| RUSSIA|     f|
| 11|    Ragu| 12| 35000|  INDIA|     f|
| 12|   Sweta| 43|200000|  INDIA|     f|
| 13| Raushan| 48|650000|    USA|     m|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|
| 15| Prakash| 52|750000|  INDIA|     m|
+---+--------+---+------+-------+------+



partitionBy

In [0]:
state_data.write.format("csv")\
        .option("header",True)\
            .option("mode","overwrite")\
                .option("path","/FileStore/tables/partion_by_address")\
                    .partitionBy("address")\
                       .save()

In [0]:
dbutils.fs.ls("/FileStore/tables/partion_by_address/",)

Out[28]: [FileInfo(path='dbfs:/FileStore/tables/partion_by_address/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1717313526000),
 FileInfo(path='dbfs:/FileStore/tables/partion_by_address/address=INDIA/', name='address=INDIA/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/partion_by_address/address=JAPAN/', name='address=JAPAN/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/partion_by_address/address=RUSSIA/', name='address=RUSSIA/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/partion_by_address/address=USA/', name='address=USA/', size=0, modificationTime=0)]

bucketBy

In [0]:
state_data.write.format("csv")\
        .option("header",True)\
            .option("mode","overwrite")\
                .option("path","/FileStore/tables/Bucket_by_address")\
                    .bucketBy(3,"id")\
                       .saveAsTable("Bucket_state")

In [0]:
dbutils.fs.ls("/FileStore/tables/Bucket_by_address/")

Out[31]: [FileInfo(path='dbfs:/FileStore/tables/Bucket_by_address/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1717314038000),
 FileInfo(path='dbfs:/FileStore/tables/Bucket_by_address/_committed_1846091711879996821', name='_committed_1846091711879996821', size=306, modificationTime=1717314038000),
 FileInfo(path='dbfs:/FileStore/tables/Bucket_by_address/_started_1846091711879996821', name='_started_1846091711879996821', size=0, modificationTime=1717314038000),
 FileInfo(path='dbfs:/FileStore/tables/Bucket_by_address/part-00000-tid-1846091711879996821-6cad12bd-6e26-4896-9930-71686bcd738b-20-1_00000.c000.csv', name='part-00000-tid-1846091711879996821-6cad12bd-6e26-4896-9930-71686bcd738b-20-1_00000.c000.csv', size=270, modificationTime=1717314038000),
 FileInfo(path='dbfs:/FileStore/tables/Bucket_by_address/part-00000-tid-1846091711879996821-6cad12bd-6e26-4896-9930-71686bcd738b-20-2_00001.c000.csv', name='part-00000-tid-1846091711879996821-6cad12bd-6e26-4896-9930-71686bcd738b-20-2

DataFrame Transformations

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
#emp_data.select("name").show()
emp_data.select(col("name")).show()

+--------+
|    name|
+--------+
|  Manish|
|  Nikita|
|  Pritam|
|Prantosh|
|  Vikash|
+--------+



In [0]:
emp_data.select("id","name","salary").show()

+---+--------+------+
| id|    name|salary|
+---+--------+------+
|  1|  Manish| 75000|
|  2|  Nikita|100000|
|  3|  Pritam|150000|
|  4|Prantosh|200000|
|  5|  Vikash|300000|
+---+--------+------+



Spark SQL

In [0]:
 emp_data.createOrReplaceTempView("emp_table")
 spark.sql("""select * from emp_table""").show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



Alisaing the column

In [0]:
emp_data.select(col("id").alias("empid"),"name","age").show()

+-----+--------+---+
|empid|    name|age|
+-----+--------+---+
|    1|  Manish| 26|
|    2|  Nikita| 23|
|    3|  Pritam| 22|
|    4|Prantosh| 17|
|    5|  Vikash| 31|
+-----+--------+---+



Where and Filter Transformation

In [0]:
emp_data.filter(col("salary")>150000).show()

+---+--------+---+------+-------+--------+
| id|    name|age|salary|address| nominee|
+---+--------+---+------+-------+--------+
|  4|Prantosh| 17|200000|Kolkata|   India|
|  5|  Vikash| 31|300000|   null|nominee5|
+---+--------+---+------+-------+--------+



In [0]:
emp_data.where(col("salary")>150000).show()

+---+--------+---+------+-------+--------+
| id|    name|age|salary|address| nominee|
+---+--------+---+------+-------+--------+
|  4|Prantosh| 17|200000|Kolkata|   India|
|  5|  Vikash| 31|300000|   null|nominee5|
+---+--------+---+------+-------+--------+



In [0]:
emp_data.filter((col("salary")>150000) & (col("age")<18) ).show()

+---+--------+---+------+-------+-------+
| id|    name|age|salary|address|nominee|
+---+--------+---+------+-------+-------+
|  4|Prantosh| 17|200000|Kolkata|  India|
+---+--------+---+------+-------+-------+



Literals

In [0]:
 emp_data.select("*",lit("Kumar").alias("last_name")).show()

+---+--------+---+------+------------+--------+---------+
| id|    name|age|salary|     address| nominee|last_name|
+---+--------+---+------+------------+--------+---------+
|  1|  Manish| 26| 75000|       bihar|nominee1|    Kumar|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|    Kumar|
|  3|  Pritam| 22|150000|   Bangalore|   India|    Kumar|
|  4|Prantosh| 17|200000|     Kolkata|   India|    Kumar|
|  5|  Vikash| 31|300000|        null|nominee5|    Kumar|
+---+--------+---+------+------------+--------+---------+



withColumn transformations

In [0]:
emp_data.withColumn("sur_name",lit("singh")).show()

+---+--------+---+------+------------+--------+--------+
| id|    name|age|salary|     address| nominee|sur_name|
+---+--------+---+------+------------+--------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|   singh|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|   singh|
|  3|  Pritam| 22|150000|   Bangalore|   India|   singh|
|  4|Prantosh| 17|200000|     Kolkata|   India|   singh|
|  5|  Vikash| 31|300000|        null|nominee5|   singh|
+---+--------+---+------+------------+--------+--------+



withColumnRemane transformations

In [0]:
new_emp_data=emp_data.withColumnRenamed("id","employee_id")
new_emp_data.show()

+-----------+--------+---+------+------------+--------+
|employee_id|    name|age|salary|     address| nominee|
+-----------+--------+---+------+------------+--------+
|          1|  Manish| 26| 75000|       bihar|nominee1|
|          2|  Nikita| 23|100000|uttarpradesh|nominee2|
|          3|  Pritam| 22|150000|   Bangalore|   India|
|          4|Prantosh| 17|200000|     Kolkata|   India|
|          5|  Vikash| 31|300000|        null|nominee5|
+-----------+--------+---+------+------------+--------+



Casting Datatype

In [0]:
new_emp_data.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- nominee: string (nullable = true)



In [0]:
new_emp_data.withColumn("employee_id",col("employee_id").cast("string"))\
             .withColumn("salary",col("salary").cast("long"))\
                 .printSchema()

root
 |-- employee_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: long (nullable = true)
 |-- address: string (nullable = true)
 |-- nominee: string (nullable = true)



Removing Column 

In [0]:
new_emp_data.drop("employee_id",col("employee_id")).show()

+--------+---+------+------------+--------+
|    name|age|salary|     address| nominee|
+--------+---+------+------------+--------+
|  Manish| 26| 75000|       bihar|nominee1|
|  Nikita| 23|100000|uttarpradesh|nominee2|
|  Pritam| 22|150000|   Bangalore|   India|
|Prantosh| 17|200000|     Kolkata|   India|
|  Vikash| 31|300000|        null|nominee5|
+--------+---+------+------------+--------+



Union and UnionAll

In [0]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17)]

In [0]:
schema=["id","name","sal","mgr_id"]
manager_df=spark.createDataFrame(data=data,schema=schema)

In [0]:
manager_df.show()

+---+------+-----+------+
| id|  name|  sal|mgr_id|
+---+------+-----+------+
| 10|  Anil|50000|    18|
| 11| Vikas|75000|    16|
| 12| Nisha|40000|    18|
| 13| Nidhi|60000|    17|
| 14| Priya|80000|    18|
| 15| Mohit|45000|    18|
| 16|Rajesh|90000|    10|
| 17| Raman|55000|    16|
| 18|   Sam|65000|    17|
+---+------+-----+------+



In [0]:
manager_df.count()

Out[24]: 9

In [0]:
data1=[(19 ,'Sohan',50000, 18),
(20 ,'Sima',75000,  17)]
manager_df1=spark.createDataFrame(data=data1,schema=schema)

In [0]:
manager_df.union(manager_df1).show()
manager_df.unionAll(manager_df1).show()

+---+------+-----+------+
| id|  name|  sal|mgr_id|
+---+------+-----+------+
| 10|  Anil|50000|    18|
| 11| Vikas|75000|    16|
| 12| Nisha|40000|    18|
| 13| Nidhi|60000|    17|
| 14| Priya|80000|    18|
| 15| Mohit|45000|    18|
| 16|Rajesh|90000|    10|
| 17| Raman|55000|    16|
| 18|   Sam|65000|    17|
| 19| Sohan|50000|    18|
| 20|  Sima|75000|    17|
+---+------+-----+------+

+---+------+-----+------+
| id|  name|  sal|mgr_id|
+---+------+-----+------+
| 10|  Anil|50000|    18|
| 11| Vikas|75000|    16|
| 12| Nisha|40000|    18|
| 13| Nidhi|60000|    17|
| 14| Priya|80000|    18|
| 15| Mohit|45000|    18|
| 16|Rajesh|90000|    10|
| 17| Raman|55000|    16|
| 18|   Sam|65000|    17|
| 19| Sohan|50000|    18|
| 20|  Sima|75000|    17|
+---+------+-----+------+



If-Else in Pysaprk

In [0]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]

In [0]:
schema1=["id","name","age","salary","country","dept"]
employee_df=spark.createDataFrame(data=emp_data,schema=schema1)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2765228542154176>:2[0m
[1;32m      1[0m schema1[38;5;241m=[39m[[38;5;124m"[39m[38;5;124mid[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mname[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mage[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124msalary[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mcountry[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mdept[39m[38;5;124m"[39m]
[0;32m----> 2[0m employee_df[38;5;241m=[39mspark[38;5;241m.[39mcreateDataFrame(data[38;5;241m=[39memp_data,schema[38;5;241m=[39mschema1)

[0;31mNameError[0m: name 'emp_data' is not defined

In [0]:
employee_df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-2765228542154177>:1[0m
[0;32m----> 1[0m [43memployee_df[49m[38;5;241m.[39mshow()

[0;31mNameError[0m: name 'employee_df' is not defined

When-Then and Otherwise

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


employee_df.withColumn("adult",when(col("age")<18,"No")
                               .when(col("age")>18,"Yes")
                               .otherwise("Novalue")
                       ).show()

+----+-------+----+------+-------+-----------+-------+
|  id|   name| age|salary|country|       dept|  adult|
+----+-------+----+------+-------+-----------+-------+
|   1| manish|  26| 20000|  india|         IT|    Yes|
|   2|  rahul|null| 40000|germany|engineering|Novalue|
|   3|  pawan|  12| 60000|  india|      sales|     No|
|   4|roshini|  44|  null|     uk|engineering|    Yes|
|   5|raushan|  35| 70000|  india|      sales|    Yes|
|   6|   null|  29|200000|     uk|         IT|    Yes|
|   7|   adam|  37| 65000|     us|         IT|    Yes|
|   8|  chris|  16| 40000|     us|      sales|     No|
|null|   null|null|  null|   null|       null|Novalue|
|   7|   adam|  37| 65000|     us|         IT|    Yes|
+----+-------+----+------+-------+-----------+-------+



In [0]:
 update_employee_df=employee_df.withColumn("age",when(col("age").isNull(),lit(19))
                        .otherwise(col("age")))
 update_employee_df.show()

+----+-------+---+------+-------+-----------+
|  id|   name|age|salary|country|       dept|
+----+-------+---+------+-------+-----------+
|   1| manish| 26| 20000|  india|         IT|
|   2|  rahul| 19| 40000|germany|engineering|
|   3|  pawan| 12| 60000|  india|      sales|
|   4|roshini| 44|  null|     uk|engineering|
|   5|raushan| 35| 70000|  india|      sales|
|   6|   null| 29|200000|     uk|         IT|
|   7|   adam| 37| 65000|     us|         IT|
|   8|  chris| 16| 40000|     us|      sales|
|null|   null| 19|  null|   null|       null|
|   7|   adam| 37| 65000|     us|         IT|
+----+-------+---+------+-------+-----------+



In [0]:
update_employee_df.withColumn("age_wise",when((col("age")>0) & (col("age")<18), "minor")
                                    .when(col("age")>18 , "adult")
                              
                              ).show()

+----+-------+---+------+-------+-----------+--------+
|  id|   name|age|salary|country|       dept|age_wise|
+----+-------+---+------+-------+-----------+--------+
|   1| manish| 26| 20000|  india|         IT|   adult|
|   2|  rahul| 19| 40000|germany|engineering|   adult|
|   3|  pawan| 12| 60000|  india|      sales|   minor|
|   4|roshini| 44|  null|     uk|engineering|   adult|
|   5|raushan| 35| 70000|  india|      sales|   adult|
|   6|   null| 29|200000|     uk|         IT|   adult|
|   7|   adam| 37| 65000|     us|         IT|   adult|
|   8|  chris| 16| 40000|     us|      sales|   minor|
|null|   null| 19|  null|   null|       null|   adult|
|   7|   adam| 37| 65000|     us|         IT|   adult|
+----+-------+---+------+-------+-----------+--------+



Unique and Sort the Record

In [0]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(15 ,'Mohit',45000,  18),
(13 ,'Nidhi',60000,  17),      
(14 ,'Priya',90000,  18),  
(18 ,'Sam',65000,   17)
     ]

In [0]:
schema_mgr=("id","name","salary","age")
mgr_df=spark.createDataFrame(data=data,schema=schema_mgr)

In [0]:
mgr_df.show()

+---+------+------+---+
| id|  name|salary|age|
+---+------+------+---+
| 10|  Anil| 50000| 18|
| 11| Vikas| 75000| 16|
| 12| Nisha| 40000| 18|
| 13| Nidhi| 60000| 17|
| 14| Priya| 80000| 18|
| 15| Mohit| 45000| 18|
| 16|Rajesh| 90000| 10|
| 17| Raman| 55000| 16|
| 18|   Sam| 65000| 17|
| 15| Mohit| 45000| 18|
| 13| Nidhi| 60000| 17|
| 14| Priya| 90000| 18|
| 18|   Sam| 65000| 17|
+---+------+------+---+



Distinct 

In [0]:
mgr_df.distinct().show()

+---+------+------+---+
| id|  name|salary|age|
+---+------+------+---+
| 10|  Anil| 50000| 18|
| 12| Nisha| 40000| 18|
| 11| Vikas| 75000| 16|
| 13| Nidhi| 60000| 17|
| 15| Mohit| 45000| 18|
| 14| Priya| 80000| 18|
| 16|Rajesh| 90000| 10|
| 17| Raman| 55000| 16|
| 18|   Sam| 65000| 17|
| 14| Priya| 90000| 18|
+---+------+------+---+



In [0]:
#want to see distinct value on Specfic column
mgr_df.select("id","name").distinct().show()

+---+------+
| id|  name|
+---+------+
| 10|  Anil|
| 11| Vikas|
| 12| Nisha|
| 13| Nidhi|
| 15| Mohit|
| 14| Priya|
| 17| Raman|
| 16|Rajesh|
| 18|   Sam|
+---+------+



Droping Duplicate Records

In [0]:
mgr_df.dropDuplicates(["id","name","salary","age"]).show()

+---+------+------+---+
| id|  name|salary|age|
+---+------+------+---+
| 10|  Anil| 50000| 18|
| 12| Nisha| 40000| 18|
| 11| Vikas| 75000| 16|
| 13| Nidhi| 60000| 17|
| 15| Mohit| 45000| 18|
| 14| Priya| 80000| 18|
| 16|Rajesh| 90000| 10|
| 17| Raman| 55000| 16|
| 18|   Sam| 65000| 17|
| 14| Priya| 90000| 18|
+---+------+------+---+



Sort the DataFrame

In [0]:
mgr_df.sort(col("salary"),).show()

+---+------+------+---+
| id|  name|salary|age|
+---+------+------+---+
| 12| Nisha| 40000| 18|
| 15| Mohit| 45000| 18|
| 15| Mohit| 45000| 18|
| 10|  Anil| 50000| 18|
| 17| Raman| 55000| 16|
| 13| Nidhi| 60000| 17|
| 13| Nidhi| 60000| 17|
| 18|   Sam| 65000| 17|
| 18|   Sam| 65000| 17|
| 11| Vikas| 75000| 16|
| 14| Priya| 80000| 18|
| 16|Rajesh| 90000| 10|
| 14| Priya| 90000| 18|
+---+------+------+---+



In [0]:
mgr_df.sort(col("salary").desc()).show()

+---+------+------+---+
| id|  name|salary|age|
+---+------+------+---+
| 16|Rajesh| 90000| 10|
| 14| Priya| 90000| 18|
| 14| Priya| 80000| 18|
| 11| Vikas| 75000| 16|
| 18|   Sam| 65000| 17|
| 18|   Sam| 65000| 17|
| 13| Nidhi| 60000| 17|
| 13| Nidhi| 60000| 17|
| 17| Raman| 55000| 16|
| 10|  Anil| 50000| 18|
| 15| Mohit| 45000| 18|
| 15| Mohit| 45000| 18|
| 12| Nisha| 40000| 18|
+---+------+------+---+



Aggregate Functions in Pysaprk

In [0]:
employee_df.show()

+----+-------+----+------+-------+-----------+
|  id|   name| age|salary|country|       dept|
+----+-------+----+------+-------+-----------+
|   1| manish|  26| 20000|  india|         IT|
|   2|  rahul|null| 40000|germany|engineering|
|   3|  pawan|  12| 60000|  india|      sales|
|   4|roshini|  44|  null|     uk|engineering|
|   5|raushan|  35| 70000|  india|      sales|
|   6|   null|  29|200000|     uk|         IT|
|   7|   adam|  37| 65000|     us|         IT|
|   8|  chris|  16| 40000|     us|      sales|
|null|   null|null|  null|   null|       null|
|   7|   adam|  37| 65000|     us|         IT|
+----+-------+----+------+-------+-----------+



In [0]:
#Count
employee_df.count()

Out[4]: 10

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
employee_df.select(count(col("name"))).show()

+-----------+
|count(name)|
+-----------+
|          8|
+-----------+



In [0]:
#Sum,min,max

employee_df.select(sum(col("salary")).alias("sum_sal"),
                   max(col("salary")).alias("max_sal"),
                   min(col("salary")).alias("min_sal")
                   ).show()

+-------+-------+-------+
|sum_sal|max_sal|min_sal|
+-------+-------+-------+
| 560000| 200000|  20000|
+-------+-------+-------+



GroupBy Function

In [0]:
  employee_df.show()

+----+-------+----+------+-------+-----------+
|  id|   name| age|salary|country|       dept|
+----+-------+----+------+-------+-----------+
|   1| manish|  26| 20000|  india|         IT|
|   2|  rahul|null| 40000|germany|engineering|
|   3|  pawan|  12| 60000|  india|      sales|
|   4|roshini|  44|  null|     uk|engineering|
|   5|raushan|  35| 70000|  india|      sales|
|   6|   null|  29|200000|     uk|         IT|
|   7|   adam|  37| 65000|     us|         IT|
|   8|  chris|  16| 40000|     us|      sales|
|null|   null|null|  null|   null|       null|
|   7|   adam|  37| 65000|     us|         IT|
+----+-------+----+------+-------+-----------+



In [0]:
employee_df1=employee_df.filter(col("name").isNotNull())

In [0]:
employee_df1.groupBy("dept")\
            .agg(sum("salary")).show()

+-----------+-----------+
|       dept|sum(salary)|
+-----------+-----------+
|         IT|     150000|
|engineering|      40000|
|      sales|     170000|
+-----------+-----------+



In [0]:
employee_df1.groupBy("dept","country")\
            .agg(avg("salary").alias("avg_sal")).show()

+-----------+-------+-------+
|       dept|country|avg_sal|
+-----------+-------+-------+
|         IT|  india|20000.0|
|engineering|germany|40000.0|
|      sales|  india|65000.0|
|engineering|     uk|   null|
|         IT|     us|65000.0|
|      sales|     us|40000.0|
+-----------+-------+-------+



In [0]:
#sparkSQL
employee_df1.createOrReplaceTempView("employee_dept")
spark.sql("""
          select dept, sum(salary) as sum_sal
          from employee_dept
          group by dept
          
          """).show()

+-----------+-------+
|       dept|sum_sal|
+-----------+-------+
|         IT| 150000|
|engineering|  40000|
|      sales| 170000|
+-----------+-------+



Joins in Pyspark

In [0]:
customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]

sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]

product_data = [(1, 'fanta',20),
(2, 'dew',22),
(5, 'sprite',40),
(7, 'redbull',100),
(12,'mazza',45),
(22,'coke',27),
(25,'limca',21),
(27,'pepsi',14),
(56,'sting',10)]


customer_schema=['customer_id','customer_name','address','date_of_joining']
sales_schema=['customer_id','product_id','quantity','date_of_purchase']
product_schema=['id','name','price']

In [0]:
customer_df=spark.createDataFrame(data=customer_data,schema=customer_schema)
sales_df=spark.createDataFrame(data=sales_data,schema=sales_schema)
product_df=spark.createDataFrame(data=product_data,schema=product_schema)

In [0]:
 customer_df.show()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          1|       manish|  patna|     30-05-2022|
|          2|       vikash|kolkata|     12-03-2023|
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          7|        raman|  patna|     30-12-2022|
|          8|      prakash| ranchi|     24-02-2023|
|          9|       ragini|kolkata|     03-03-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+



In [0]:
sales_df.show()

+-----------+----------+--------+----------------+
|customer_id|product_id|quantity|date_of_purchase|
+-----------+----------+--------+----------------+
|          1|        22|      10|      01-06-2022|
|          1|        27|       5|      03-02-2023|
|          2|         5|       3|      01-06-2023|
|          5|        22|       1|      22-03-2023|
|          7|        22|       4|      03-02-2023|
|          9|         5|       6|      03-03-2023|
|          2|         1|      12|      15-06-2023|
|          1|        56|       2|      25-06-2023|
|          5|        12|       5|      15-04-2023|
|         11|        12|      76|      12-03-2023|
+-----------+----------+--------+----------------+



In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"inner").show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        12|       5|      15

In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"inner")\
            .select(sales_df["customer_id"],"customer_name","product_id","quantity")\
                .show()

+-----------+-------------+----------+--------+
|customer_id|customer_name|product_id|quantity|
+-----------+-------------+----------+--------+
|          1|       manish|        22|      10|
|          1|       manish|        27|       5|
|          1|       manish|        56|       2|
|          2|       vikash|         5|       3|
|          2|       vikash|         1|      12|
|          5|       mahesh|        22|       1|
|          5|       mahesh|        12|       5|
|          7|        raman|        22|       4|
|          9|       ragini|         5|       6|
+-----------+-------------+----------+--------+



In [0]:
#left Join
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"left")\
            .select(customer_df["customer_id"],"customer_name","product_id","quantity")\
                .show()

+-----------+-------------+----------+--------+
|customer_id|customer_name|product_id|quantity|
+-----------+-------------+----------+--------+
|          1|       manish|        56|       2|
|          1|       manish|        27|       5|
|          1|       manish|        22|      10|
|          2|       vikash|         1|      12|
|          2|       vikash|         5|       3|
|          3|       nikita|      null|    null|
|          4|        rahul|      null|    null|
|          5|       mahesh|        12|       5|
|          5|       mahesh|        22|       1|
|          6|     prantosh|      null|    null|
|          7|        raman|        22|       4|
|          8|      prakash|      null|    null|
|          9|       ragini|         5|       6|
|         10|      raushan|      null|    null|
+-----------+-------------+----------+--------+



In [0]:
#right Join
sales_df.join(product_df,sales_df["product_id"]==product_df["id"],"right")\
                .show()

+-----------+----------+--------+----------------+---+-------+-----+
|customer_id|product_id|quantity|date_of_purchase| id|   name|price|
+-----------+----------+--------+----------------+---+-------+-----+
|          2|         1|      12|      15-06-2023|  1|  fanta|   20|
|       null|      null|    null|            null|  2|    dew|   22|
|          9|         5|       6|      03-03-2023|  5| sprite|   40|
|          2|         5|       3|      01-06-2023|  5| sprite|   40|
|       null|      null|    null|            null|  7|redbull|  100|
|         11|        12|      76|      12-03-2023| 12|  mazza|   45|
|          5|        12|       5|      15-04-2023| 12|  mazza|   45|
|          7|        22|       4|      03-02-2023| 22|   coke|   27|
|          5|        22|       1|      22-03-2023| 22|   coke|   27|
|          1|        22|      10|      01-06-2022| 22|   coke|   27|
|       null|      null|    null|            null| 25|  limca|   21|
|          1|        27|       5| 

In [0]:
#left anti join
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"left_anti").show()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          8|      prakash| ranchi|     24-02-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+



Window Function in Spark

In [0]:
emp_data = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]

schema1_dept=("id","name","salary","dept","gender")

In [0]:
emp_dept_df=spark.createDataFrame(data=emp_data,schema=schema1_dept)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
window_dept=Window.partitionBy("dept").orderBy("salary")

emp_dept_df.withColumn("rn",row_number().over(window_dept))\
            .withColumn("rk",rank().over(window_dept))\
                .withColumn("drn",dense_rank().over(window_dept))\
           .show(truncate=False)

+---+--------+------+---------+------+---+---+---+
|id |name    |salary|dept     |gender|rn |rk |drn|
+---+--------+------+---------+------+---+---+---+
|1  |manish  |50000 |IT       |m     |1  |1  |1  |
|11 |rakhi   |50000 |IT       |f     |2  |1  |1  |
|9  |aditya  |65000 |IT       |m     |3  |3  |2  |
|4  |mukesh  |80000 |IT       |m     |4  |4  |3  |
|8  |rashi   |100000|IT       |f     |5  |5  |4  |
|6  |nikita  |45000 |marketing|f     |1  |1  |1  |
|10 |rahul   |50000 |marketing|m     |2  |2  |2  |
|7  |ragini  |55000 |marketing|f     |3  |3  |3  |
|3  |raushan |70000 |marketing|m     |4  |4  |4  |
|2  |vikash  |60000 |sales    |m     |1  |1  |1  |
|5  |priti   |90000 |sales    |f     |2  |2  |2  |
|12 |akhilesh|90000 |sales    |m     |3  |2  |2  |
+---+--------+------+---------+------+---+---+---+



In [0]:
emp_dept_df.withColumn("drk",dense_rank().over(window_dept))\
            .filter(col("drk")<=2)\
                .show(truncate=False)

+---+--------+------+---------+------+---+
|id |name    |salary|dept     |gender|drk|
+---+--------+------+---------+------+---+
|1  |manish  |50000 |IT       |m     |1  |
|11 |rakhi   |50000 |IT       |f     |1  |
|9  |aditya  |65000 |IT       |m     |2  |
|6  |nikita  |45000 |marketing|f     |1  |
|10 |rahul   |50000 |marketing|m     |2  |
|2  |vikash  |60000 |sales    |m     |1  |
|5  |priti   |90000 |sales    |f     |2  |
|12 |akhilesh|90000 |sales    |m     |2  |
+---+--------+------+---------+------+---+



Lead and Lag

In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=("pid","product_name","sale_date","sales")


In [0]:
product_sale_df=spark.createDataFrame(data=product_data,schema=product_schema)

In [0]:
window_product=Window.partitionBy("pid").orderBy("sale_date")

last_month_df=product_sale_df.withColumn("perivous_month_sale",lag(col("sales"),1).over(window_product))

last_month_df.show()

+---+------------+----------+-------+-------------------+
|pid|product_name| sale_date|  sales|perivous_month_sale|
+---+------------+----------+-------+-------------------+
|  1|      iphone|01-01-2023|1500000|               null|
|  1|      iphone|01-02-2023|1300000|            1500000|
|  1|      iphone|01-03-2023|1600000|            1300000|
|  1|      iphone|01-04-2023|1700000|            1600000|
|  1|      iphone|01-05-2023|1200000|            1700000|
|  1|      iphone|01-06-2023|1100000|            1200000|
|  2|     samsung|01-01-2023|1100000|               null|
|  2|     samsung|01-02-2023|1120000|            1100000|
|  2|     samsung|01-03-2023|1080000|            1120000|
|  2|     samsung|01-04-2023|1800000|            1080000|
|  2|     samsung|01-05-2023| 980000|            1800000|
|  2|     samsung|01-06-2023|1100000|             980000|
|  3|     oneplus|01-01-2023|1100000|               null|
|  3|     oneplus|01-02-2023|1120000|            1100000|
|  3|     onep

In [0]:
last_month_df.withColumn("per_loss_or_gain",
                         ((col("sales")-col("perivous_month_sale"))/col("sales"))*100)\
                         .show()

+---+------------+----------+-------+-------------------+-------------------+
|pid|product_name| sale_date|  sales|perivous_month_sale|   per_loss_or_gain|
+---+------------+----------+-------+-------------------+-------------------+
|  1|      iphone|01-01-2023|1500000|               null|               null|
|  1|      iphone|01-02-2023|1300000|            1500000|-15.384615384615385|
|  1|      iphone|01-03-2023|1600000|            1300000|              18.75|
|  1|      iphone|01-04-2023|1700000|            1600000|   5.88235294117647|
|  1|      iphone|01-05-2023|1200000|            1700000| -41.66666666666667|
|  1|      iphone|01-06-2023|1100000|            1200000| -9.090909090909092|
|  2|     samsung|01-01-2023|1100000|               null|               null|
|  2|     samsung|01-02-2023|1120000|            1100000| 1.7857142857142856|
|  2|     samsung|01-03-2023|1080000|            1120000|-3.7037037037037033|
|  2|     samsung|01-04-2023|1800000|            1080000|       

First_value and Last_Value

In [0]:
product_sale_df.show()

+---+------------+----------+-------+
|pid|product_name| sale_date|  sales|
+---+------------+----------+-------+
|  1|      iphone|01-01-2023|1500000|
|  2|     samsung|01-01-2023|1100000|
|  3|     oneplus|01-01-2023|1100000|
|  1|      iphone|01-02-2023|1300000|
|  2|     samsung|01-02-2023|1120000|
|  3|     oneplus|01-02-2023|1120000|
|  1|      iphone|01-03-2023|1600000|
|  2|     samsung|01-03-2023|1080000|
|  3|     oneplus|01-03-2023|1160000|
|  1|      iphone|01-04-2023|1700000|
|  2|     samsung|01-04-2023|1800000|
|  3|     oneplus|01-04-2023|1170000|
|  1|      iphone|01-05-2023|1200000|
|  2|     samsung|01-05-2023| 980000|
|  3|     oneplus|01-05-2023|1175000|
|  1|      iphone|01-06-2023|1100000|
|  2|     samsung|01-06-2023|1100000|
|  3|     oneplus|01-06-2023|1200000|
+---+------------+----------+-------+



In [0]:
window_dept=Window.partitionBy("pid").orderBy("sale_date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

product_sale_df.withColumn("first_value",first("sales").over(window_dept))\
    .withColumn("last_value",last("sales").over(window_dept))\
      .show()

+---+------------+----------+-------+-----------+----------+
|pid|product_name| sale_date|  sales|first_value|last_value|
+---+------------+----------+-------+-----------+----------+
|  1|      iphone|01-01-2023|1500000|    1500000|   1100000|
|  1|      iphone|01-02-2023|1300000|    1500000|   1100000|
|  1|      iphone|01-03-2023|1600000|    1500000|   1100000|
|  1|      iphone|01-04-2023|1700000|    1500000|   1100000|
|  1|      iphone|01-05-2023|1200000|    1500000|   1100000|
|  1|      iphone|01-06-2023|1100000|    1500000|   1100000|
|  2|     samsung|01-01-2023|1100000|    1100000|   1100000|
|  2|     samsung|01-02-2023|1120000|    1100000|   1100000|
|  2|     samsung|01-03-2023|1080000|    1100000|   1100000|
|  2|     samsung|01-04-2023|1800000|    1100000|   1100000|
|  2|     samsung|01-05-2023| 980000|    1100000|   1100000|
|  2|     samsung|01-06-2023|1100000|    1100000|   1100000|
|  3|     oneplus|01-01-2023|1100000|    1100000|   1200000|
|  3|     oneplus|01-02-

In [0]:
window_3months=Window.partitionBy("pid").orderBy("sale_date").rowsBetween(-2,0)

product_sale_df.withColumn("running_sum",sum("sales").over(window_3months)).show()

+---+------------+----------+-------+-----------+
|pid|product_name| sale_date|  sales|running_sum|
+---+------------+----------+-------+-----------+
|  1|      iphone|01-01-2023|1500000|    1500000|
|  1|      iphone|01-02-2023|1300000|    2800000|
|  1|      iphone|01-03-2023|1600000|    4400000|
|  1|      iphone|01-04-2023|1700000|    4600000|
|  1|      iphone|01-05-2023|1200000|    4500000|
|  1|      iphone|01-06-2023|1100000|    4000000|
|  2|     samsung|01-01-2023|1100000|    1100000|
|  2|     samsung|01-02-2023|1120000|    2220000|
|  2|     samsung|01-03-2023|1080000|    3300000|
|  2|     samsung|01-04-2023|1800000|    4000000|
|  2|     samsung|01-05-2023| 980000|    3860000|
|  2|     samsung|01-06-2023|1100000|    3880000|
|  3|     oneplus|01-01-2023|1100000|    1100000|
|  3|     oneplus|01-02-2023|1120000|    2220000|
|  3|     oneplus|01-03-2023|1160000|    3380000|
|  3|     oneplus|01-04-2023|1170000|    3450000|
|  3|     oneplus|01-05-2023|1175000|    3505000|
