In [0]:
spark


 1.Header:-Specifies whether the first line of the CSV file contains the column names. "true" or "false"
 2.InferSchema:-Automatically infers the data types of each column."true" or "false" if inferSchema is set to "true", Spark will attempt to determine the data type of each column (e.g., Integer, Double, String) based on the data.If inferSchema is set to "false", all columns will be read as StringType.
 3.Mode:-Determines the behavior of Spark when encountering corrupt or missing data."PERMISSIVE", "DROPMALFORMED", "FAILFAST"
 4.PERMISSIVE:- The default mode. When corrupt records are encountered, they are placed in a special column called _corrupt_record. Missing data is set to null.
 5.DROPMALFORMED:- Drops the entire row if any column is corrupt.
 FAILFAST:- Throws an exception immediately if any row is corrupt.


In [0]:
flight_df = spark.read.format("csv")\
    .option("header","false")\
    .option("inferschema","false")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2010_summary-3.csv")
flight_df.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df = spark.read.format("csv")\
    .option("header","True")\
    .option("inferschema","false")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2010_summary-3.csv")
flight_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [0]:
# manually schema created
# StructField: Defines a single field in the schema, including the field's name, data type, and nullability.
# StructType: Defines the overall schema as a collection of StructField objects, representing the structure of a DataFrame.
my_schema = StructType(
 [ 
   StructField("DEST_COUNTRY_NAME", StringType(), True),
   StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
   StructField("count",IntegerType(), True)
   ])


In [0]:
   flight_df = spark.read.format("csv")\
               .option("header","True")\
               .option("inferschema","True")\
               .schema(my_schema)\
               .option("mode","PERMISSIVE")\
               .load("/FileStore/tables/2010_summary-3.csv")
   flight_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# how to handle corrupted data

employee_df = spark.read.format("csv")\
            .option("header","True")\
            .option("inferschema","True")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/employee.csv")
employee_df.show(5)

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
emp_schema = StructType(
 [ 
   StructField("id", IntegerType(), True),
   StructField("name",StringType(), True),
   StructField("age",IntegerType(), True),
   StructField("salary",IntegerType(), True),
   StructField("address",StringType(), True),
   StructField("nominee",StringType(), True),
   StructField("_corrupt_record",StringType(), True)
   ])

In [0]:
employee_df =spark.read.format("csv")\
            .option("header","True")\
            .option("inferschema","True")\
            .schema(emp_schema)\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/employee.csv")
employee_df.show(truncate = False)

+---+--------+---+------+------------+--------+-------------------------------------------+
|id |name    |age|salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



In [0]:
# store currupt data file at particular address

In [0]:
employee_df =spark.read.format("csv")\
            .option("header","True")\
            .option("inferschema","True")\
            .schema(emp_schema)\
            .option("badRecordsPath","/FileStore/tables/bad_records")\
            .load("/FileStore/tables/employee.csv")
employee_df.show(truncate= False)

+---+------+---+------+------------+--------+---------------+
|id |name  |age|salary|address     |nominee |_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|1  |Manish|26 |75000 |bihar       |nominee1|null           |
|2  |Nikita|23 |100000|uttarpradesh|nominee2|null           |
|5  |Vikash|31 |300000|null        |nominee5|null           |
+---+------+---+------+------------+--------+---------------+



In [0]:
#Chheck how many files are there 
# bad records are stored in json format if you want check your records then read the json file

In [0]:
%fs
ls /FileStore/tables/bad_records

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20240616T180632/,20240616T180632/,0,0
dbfs:/FileStore/tables/bad_records/20240817T183100/,20240817T183100/,0,0
dbfs:/FileStore/tables/bad_records/20240825T065318/,20240825T065318/,0,0
dbfs:/FileStore/tables/bad_records/20240825T070843/,20240825T070843/,0,0
dbfs:/FileStore/tables/bad_records/20240826T080454/,20240826T080454/,0,0


In [0]:
%fs
ls /FileStore/tables/bad_records/20240616T180632/

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20240616T180632/bad_records/,bad_records/,0,0


In [0]:
%fs
ls /FileStore/tables/bad_records/20240616T180632/bad_records/

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20240616T180632/bad_records/part-00000-aa7af7f9-794e-416d-85d1-c037ecc2298a,part-00000-aa7af7f9-794e-416d-85d1-c037ecc2298a,484,1718561194000


In [0]:
bad_record_df = spark.read.format("json").load("/FileStore/tables/bad_records/20240616T180632/bad_records/")
bad_record_df.show(truncate = False)

+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                               |reason                                                                                                                          |record                                     |
+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/employee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/employee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India

In [0]:
 # write dataframe

df = spark.read.format("csv")\
    .option("header","True")\
    .option("inferschema","false")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/csv_write-1.csv")
df.show(5)




+---+----------+--------+--------+----------+-----------+
| id|      name|     age|  salary|   address|     gender|
+---+----------+--------+--------+----------+-----------+
|  1|    Manish|      26|   75000|     INDIA|          m|
|  2|    Nikita|      23|  100000|       USA|          f|
|  3|    Pritam|      22|  150000|     INDIA|          m|
|  4|  Prantosh|      17|  200000|     JAPAN|          m|
|  5|    Vikash|      31|  300000|       USA|          m|
+---+----------+--------+--------+----------+-----------+
only showing top 5 rows



In [0]:
 df.write.format("csv")\
            .option("header","True")\
            .option("mode","overwrite")\
            .option("path","/FileStore/tables/csv_write_2/")\
            .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2014750164118794>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      2[0m [43m           [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mTrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m [43m           [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mmode[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43moverwrite[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      4[0m [43m 

In [0]:
dbutils.fs.ls("/FileStore/tables/csv_write_1/")



In [0]:
 df.repartition(3).write.format("csv")\
            .option("header","True")\
            .option("mode","overwrite")\
            .option("path","/FileStore/tables/csv_write_repartition1/")\
            .save()



In [0]:
dbutils.fs.ls("/FileStore/tables/csv_write_repartition1/")



In [0]:
#jon read
data = spark.read.format("json")\
    .option("inferschema","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/sample.json")
data.show(5)



In [0]:
# partitionBy and Bucketing

df1 = spark.read.format("csv")\
    .option("header","True")\
    .option("inferschema","false")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/csv_write-4.csv")
df1.show(5)



In [0]:
df1.write.format("csv")\
        .option("header","True")\
        .option("mode","overwrite")\
        .option("path","/FileStore/tables/partition_by_address2/")\
        .partitionBy("address")\
        .save()



In [0]:
dbutils.fs.ls("/FileStore/tables/partition_by_address2/")



In [0]:
df1.write.format("csv")\
        .option("header","True")\
        .option("mode","overwrite")\
        .option("path","/FileStore/tables/partition_by_address_gender1/")\
        .partitionBy("address","gender")\
        .save()




In [0]:
dbutils.fs.ls("/FileStore/tables/partition_by_address_gender1/")



In [0]:
dbutils.fs.ls("dbfs:/FileStore/tables/partition_by_address_gender1/address=INDIA/")



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
df1.write.format("csv")\
        .option("header","True")\
        .option("mode","overwrite")\
        .option("path","/FileStore/tables/bucket_by_id1/")\
        .bucketBy(3,"id")\
        .saveAsTable("bucket_by_id_table1")



In [0]:
dbutils.fs.ls("/FileStore/tables/bucket_by_id1/")



In [0]:
# create dataframe

my_data = [(1,  1),
(2,  1),
(3,  1),
(4,  2),
(5,  1),
(6,  2),
(7,  2)]



In [0]:
my_schema = ['id','num']



In [0]:
my_df = spark.createDataFrame(data = my_data, schema= my_schema)



In [0]:
my_df.show()



In [0]:
#dataframe transformation

 # write dataframe

employee_df = spark.read.format("csv")\
            .option("header","True")\
            .option("inferschema","True")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/employee.csv")
employee_df.show(5)






In [0]:
employee_df.printSchema()



In [0]:
employee_df.select("name").show()



In [0]:
employee_df.select(col("name")).show()



In [0]:
employee_df.select("id +5").show()



In [0]:
employee_df.select(col("id") +5 ).show()



In [0]:
employee_df.select("id","name","age").show()



In [0]:
employee_df.select(col("id"),col("name"),col("age")).show()



In [0]:
employee_df.select("id",col("name"),employee_df["salary"],employee_df.address).show()



In [0]:
employee_df.select(expr("id as employee_id"),expr("name as employee_name"),expr("concat(name,address)")).show()



In [0]:
employee_df.select("*").show()



Spark SQL


TempView

In [0]:
employee_df.createOrReplaceTempView("employee_tbl")



In [0]:
spark.sql("""
select * from employee_tbl
""").show()



col function

In [0]:
employee_df.select(col("id").alias("employee_id"),"name","age").show()



filter

In [0]:
employee_df.filter(col("salary")>150000).show()



In [0]:
employee_df.where(col("salary")>150000).show()



In [0]:
employee_df.filter((col("salary")>150000) & (col("age")<18)).show()



lit function

In [0]:
employee_df.select("*",lit("kumar").alias("last_name")).show()



withColumn

In [0]:
employee_df.withColumn("sur_name", lit("singh")).show()



withColumnRenamed

In [0]:
employee_df.withColumnRenamed("id","employee_id").show()



In [0]:
employee_df.printSchema()



Change DataType

In [0]:
employee_df.withColumn("id",col("id").cast("string"))\
    .withColumn("salary",col("salary").cast("long"))\
    .printSchema()



drop

In [0]:
employee_df.drop("id",col("name")).show()



Spark Sql

In [0]:
spark.sql("""
          select * from employee_tbl where salary>150000 and age<18
          """).show()




In [0]:
spark.sql("""
          select *, "kumar" as last_name from employee_tbl where salary>150000 and age<18
          """).show()




In [0]:
spark.sql("""
          select *, "kumar" as last_name, concat(name,last_name ) as full_name from employee_tbl where salary>150000 and age<18
          """).show()




Union and Unionall and UnionByName

In [0]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17)]

schema = ['id','Name','sal','manager_id']

manager_df = spark.createDataFrame(data=data,schema=schema)



In [0]:
manager_df.count()



In [0]:
data1=[(19 ,'Sohan',50000, 18),
(20 ,'Sima',75000,  17)]

schema1 = ['id','Name','sal','manager_id']

manager_df1 = spark.createDataFrame(data=data1,schema=schema1)



In [0]:
manager_df1.show()



In [0]:
manager_df.union(manager_df1).show()



In [0]:
manager_df.union(manager_df1).show()



In [0]:
duplicate_data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(18 ,'Sam',65000,   17),
(18 ,'Sam',65000,   17)]
schema = ['id','Name','sal','manager_id']

duplicate_manager_df = spark.createDataFrame(data=duplicate_data,schema=schema)



In [0]:
duplicate_manager_df.show()



In [0]:
duplicate_manager_df.union(manager_df1).count()



In [0]:
duplicate_manager_df.unionAll(manager_df1).count()



In [0]:
manager_df1.createOrReplaceTempView("manager_df1_tbl")
duplicate_manager_df.createOrReplaceTempView("duplicate_manager_df_tbl")



In [0]:
spark.sql("""
select * from manager_df1_tbl
union
select * from duplicate_manager_df_tbl
""").count()



In [0]:
spark.sql("""
select * from manager_df1_tbl
union all
select * from duplicate_manager_df_tbl
""").count()



In [0]:

wrong_column_data=[(19 ,50000, 18,'Sohan'),
(20 ,75000,  17,'Sima')]

wrong_schema=['id','sal','manager_id','Name']

wrong_manager_df = spark.createDataFrame(data=wrong_column_data,schema=wrong_schema)



In [0]:
manager_df1.union(wrong_manager_df).show()



In [0]:
manager_df1.show()



In [0]:
wrong_manager_df.show()



In [0]:
manager_df1.unionByName(wrong_manager_df).show()



When and Otherwise

In [0]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]

schema = ['id','Name','age','salary','country','dept']

emp_df = spark.createDataFrame(data=emp_data,schema=schema)

In [0]:
emp_df.show()

+----+-------+----+------+-------+-----------+
|  id|   Name| age|salary|country|       dept|
+----+-------+----+------+-------+-----------+
|   1| manish|  26| 20000|  india|         IT|
|   2|  rahul|null| 40000|germany|engineering|
|   3|  pawan|  12| 60000|  india|      sales|
|   4|roshini|  44|  null|     uk|engineering|
|   5|raushan|  35| 70000|  india|      sales|
|   6|   null|  29|200000|     uk|         IT|
|   7|   adam|  37| 65000|     us|         IT|
|   8|  chris|  16| 40000|     us|      sales|
|null|   null|null|  null|   null|       null|
|   7|   adam|  37| 65000|     us|         IT|
+----+-------+----+------+-------+-----------+



In [0]:
emp_df.withColumn("adult",when(col("age")<18,"No")
                  .when(col("age")>18,"Yes")
                  .otherwise("Novalue")).show()

+----+-------+----+------+-------+-----------+-------+
|  id|   Name| age|salary|country|       dept|  adult|
+----+-------+----+------+-------+-----------+-------+
|   1| manish|  26| 20000|  india|         IT|    Yes|
|   2|  rahul|null| 40000|germany|engineering|Novalue|
|   3|  pawan|  12| 60000|  india|      sales|     No|
|   4|roshini|  44|  null|     uk|engineering|    Yes|
|   5|raushan|  35| 70000|  india|      sales|    Yes|
|   6|   null|  29|200000|     uk|         IT|    Yes|
|   7|   adam|  37| 65000|     us|         IT|    Yes|
|   8|  chris|  16| 40000|     us|      sales|     No|
|null|   null|null|  null|   null|       null|Novalue|
|   7|   adam|  37| 65000|     us|         IT|    Yes|
+----+-------+----+------+-------+-----------+-------+



In [0]:
emp_df.withColumn("age",when(col("age").isNull(),lit(19))
                  .otherwise(col("age")))\
    .withColumn("adult",when(col("age")>18,"Yes")
    .otherwise("No")).show()

+----+-------+---+------+-------+-----------+-----+
|  id|   Name|age|salary|country|       dept|adult|
+----+-------+---+------+-------+-----------+-----+
|   1| manish| 26| 20000|  india|         IT|  Yes|
|   2|  rahul| 19| 40000|germany|engineering|  Yes|
|   3|  pawan| 12| 60000|  india|      sales|   No|
|   4|roshini| 44|  null|     uk|engineering|  Yes|
|   5|raushan| 35| 70000|  india|      sales|  Yes|
|   6|   null| 29|200000|     uk|         IT|  Yes|
|   7|   adam| 37| 65000|     us|         IT|  Yes|
|   8|  chris| 16| 40000|     us|      sales|   No|
|null|   null| 19|  null|   null|       null|  Yes|
|   7|   adam| 37| 65000|     us|         IT|  Yes|
+----+-------+---+------+-------+-----------+-----+



In [0]:
emp_df.withColumn("age_wise",when((col("age")>0) & (col("age")<10),"Minor").when((col("age")>18) & (col("age")<30),"Mid").otherwise("major")).show()

+----+-------+----+------+-------+-----------+--------+
|  id|   Name| age|salary|country|       dept|age_wise|
+----+-------+----+------+-------+-----------+--------+
|   1| manish|  26| 20000|  india|         IT|     Mid|
|   2|  rahul|null| 40000|germany|engineering|   major|
|   3|  pawan|  12| 60000|  india|      sales|   major|
|   4|roshini|  44|  null|     uk|engineering|   major|
|   5|raushan|  35| 70000|  india|      sales|   major|
|   6|   null|  29|200000|     uk|         IT|     Mid|
|   7|   adam|  37| 65000|     us|         IT|   major|
|   8|  chris|  16| 40000|     us|      sales|   major|
|null|   null|null|  null|   null|       null|   major|
|   7|   adam|  37| 65000|     us|         IT|   major|
+----+-------+----+------+-------+-----------+--------+



In [0]:
emp_df.createOrReplaceTempView("table")

In [0]:
spark.sql("""
          select *,
          case when age<18 then 'minor'

          when age>18  then 'major'
          else 'novalue'
          end as 
          
          from table
          """).show()

+----+-------+----+------+-------+-----------+-------+
|  id|   Name| age|salary|country|       dept|     as|
+----+-------+----+------+-------+-----------+-------+
|   1| manish|  26| 20000|  india|         IT|  major|
|   2|  rahul|null| 40000|germany|engineering|novalue|
|   3|  pawan|  12| 60000|  india|      sales|  minor|
|   4|roshini|  44|  null|     uk|engineering|  major|
|   5|raushan|  35| 70000|  india|      sales|  major|
|   6|   null|  29|200000|     uk|         IT|  major|
|   7|   adam|  37| 65000|     us|         IT|  major|
|   8|  chris|  16| 40000|     us|      sales|  minor|
|null|   null|null|  null|   null|       null|novalue|
|   7|   adam|  37| 65000|     us|         IT|  major|
+----+-------+----+------+-------+-----------+-------+



In [0]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(15 ,'Mohit',45000,  18),
(13 ,'Nidhi',60000,  17),      
(14 ,'Priya',90000,  18),  
(18 ,'Sam',65000,   17)
     ]

schema=['id','Name','sal','mngr_id']

manager_df = spark.createDataFrame(data=data, schema=schema)

In [0]:
manager_df.show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 10|  Anil|50000|     18|
| 11| Vikas|75000|     16|
| 12| Nisha|40000|     18|
| 13| Nidhi|60000|     17|
| 14| Priya|80000|     18|
| 15| Mohit|45000|     18|
| 16|Rajesh|90000|     10|
| 17| Raman|55000|     16|
| 18|   Sam|65000|     17|
| 15| Mohit|45000|     18|
| 13| Nidhi|60000|     17|
| 14| Priya|90000|     18|
| 18|   Sam|65000|     17|
+---+------+-----+-------+



In [0]:
manager_df.distinct().show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 10|  Anil|50000|     18|
| 12| Nisha|40000|     18|
| 11| Vikas|75000|     16|
| 13| Nidhi|60000|     17|
| 15| Mohit|45000|     18|
| 14| Priya|80000|     18|
| 16|Rajesh|90000|     10|
| 17| Raman|55000|     16|
| 18|   Sam|65000|     17|
| 14| Priya|90000|     18|
+---+------+-----+-------+



In [0]:
manager_df.distinct().count()

Out[28]: 10

In [0]:
manager_df.count()

Out[30]: 13

In [0]:
manager_df.select("id","name").distinct().show()

+---+------+
| id|  name|
+---+------+
| 10|  Anil|
| 11| Vikas|
| 12| Nisha|
| 13| Nidhi|
| 15| Mohit|
| 14| Priya|
| 17| Raman|
| 16|Rajesh|
| 18|   Sam|
+---+------+



In [0]:
dropped_mngr_data= manager_df.drop_duplicates(['id','name','sal','mngr_id'])

In [0]:
dropped_mngr_data.show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 10|  Anil|50000|     18|
| 12| Nisha|40000|     18|
| 11| Vikas|75000|     16|
| 13| Nidhi|60000|     17|
| 15| Mohit|45000|     18|
| 14| Priya|80000|     18|
| 16|Rajesh|90000|     10|
| 17| Raman|55000|     16|
| 18|   Sam|65000|     17|
| 14| Priya|90000|     18|
+---+------+-----+-------+



In [0]:
manager_df.sort(col("sal")).show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 12| Nisha|40000|     18|
| 15| Mohit|45000|     18|
| 15| Mohit|45000|     18|
| 10|  Anil|50000|     18|
| 17| Raman|55000|     16|
| 13| Nidhi|60000|     17|
| 13| Nidhi|60000|     17|
| 18|   Sam|65000|     17|
| 18|   Sam|65000|     17|
| 11| Vikas|75000|     16|
| 14| Priya|80000|     18|
| 14| Priya|90000|     18|
| 16|Rajesh|90000|     10|
+---+------+-----+-------+



In [0]:
manager_df.sort(col("sal").desc()).show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 14| Priya|90000|     18|
| 16|Rajesh|90000|     10|
| 14| Priya|80000|     18|
| 11| Vikas|75000|     16|
| 18|   Sam|65000|     17|
| 18|   Sam|65000|     17|
| 13| Nidhi|60000|     17|
| 13| Nidhi|60000|     17|
| 17| Raman|55000|     16|
| 10|  Anil|50000|     18|
| 15| Mohit|45000|     18|
| 15| Mohit|45000|     18|
| 12| Nisha|40000|     18|
+---+------+-----+-------+



In [0]:
manager_df.sort(col("sal").desc(),col("name").desc()).show()

+---+------+-----+-------+
| id|  Name|  sal|mngr_id|
+---+------+-----+-------+
| 16|Rajesh|90000|     10|
| 14| Priya|90000|     18|
| 14| Priya|80000|     18|
| 11| Vikas|75000|     16|
| 18|   Sam|65000|     17|
| 18|   Sam|65000|     17|
| 13| Nidhi|60000|     17|
| 13| Nidhi|60000|     17|
| 17| Raman|55000|     16|
| 10|  Anil|50000|     18|
| 15| Mohit|45000|     18|
| 15| Mohit|45000|     18|
| 12| Nisha|40000|     18|
+---+------+-----+-------+



Aggregation

In [0]:
emp_df.count()

Out[7]: 10

In [0]:
emp_df.select(count("id")).show()

+---------+
|count(id)|
+---------+
|        9|
+---------+



In [0]:
emp_df.select(sum("salary").alias("total_salary"),max("salary"),min("salary")).show()

+------------+-----------+-----------+
|total_salary|max(salary)|min(salary)|
+------------+-----------+-----------+
|      560000|     200000|      20000|
+------------+-----------+-----------+



In [0]:
emp_df.select(sum("salary"),count("salary"),avg("salary").cast("int").alias("avg_salary")).show()

+-----------+-------------+----------+
|sum(salary)|count(salary)|avg_salary|
+-----------+-------------+----------+
|     560000|            8|     70000|
+-----------+-------------+----------+



In [0]:
emp_df.groupBy("dept")\
    .agg(sum("salary")).show()

+-----------+-----------+
|       dept|sum(salary)|
+-----------+-----------+
|         IT|     350000|
|engineering|      40000|
|      sales|     170000|
|       null|       null|
+-----------+-----------+



In [0]:
spark.sql("""
          select dept, sum("salary")
          from table
          group by dept
          """)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-179311729105774>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[38;5;124;43m"""[39;49m
[1;32m      2[0m [38;5;124;43m          select dept, sum([39;49m[38;5;124;43m"[39;49m[38;5;124;43msalary[39;49m[38;5;124;43m"[39;49m[38;5;124;43m)[39;49m
[1;32m      3[0m [38;5;124;43m          from table[39;49m
[1;32m      4[0m [38;5;124;43m          group by dept[39;49m
[1;32m      5[0m [38;5;124;43m          [39;49m[38;5;124;43m"""[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m   

Joins


In [0]:

customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]

customer_schema=['customer_id','customer_name','address','date_of_joining']

customer_df = spark.createDataFrame(data = customer_data, schema = customer_schema)

In [0]:
customer_df.show()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          1|       manish|  patna|     30-05-2022|
|          2|       vikash|kolkata|     12-03-2023|
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          7|        raman|  patna|     30-12-2022|
|          8|      prakash| ranchi|     24-02-2023|
|          9|       ragini|kolkata|     03-03-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+



In [0]:
sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]

sales_schema=['customer_id','product_id','quantity','date_of_purchase']

sales_df = spark.createDataFrame(data= sales_data, schema=sales_schema)

In [0]:
sales_df.show()

+-----------+----------+--------+----------------+
|customer_id|product_id|quantity|date_of_purchase|
+-----------+----------+--------+----------------+
|          1|        22|      10|      01-06-2022|
|          1|        27|       5|      03-02-2023|
|          2|         5|       3|      01-06-2023|
|          5|        22|       1|      22-03-2023|
|          7|        22|       4|      03-02-2023|
|          9|         5|       6|      03-03-2023|
|          2|         1|      12|      15-06-2023|
|          1|        56|       2|      25-06-2023|
|          5|        12|       5|      15-04-2023|
|         11|        12|      76|      12-03-2023|
+-----------+----------+--------+----------------+



In [0]:
product_data = [(1, 'fanta',20),
(2, 'dew',22),
(5, 'sprite',40),
(7, 'redbull',100),
(12,'mazza',45),
(22,'coke',27),
(25,'limca',21),
(27,'pepsi',14),
(56,'sting',10)]

product_schema=['id','name','price']

product_df = spark.createDataFrame(data=product_data,schema=product_schema)

In [0]:
product_df.show()

+---+-------+-----+
| id|   name|price|
+---+-------+-----+
|  1|  fanta|   20|
|  2|    dew|   22|
|  5| sprite|   40|
|  7|redbull|  100|
| 12|  mazza|   45|
| 22|   coke|   27|
| 25|  limca|   21|
| 27|  pepsi|   14|
| 56|  sting|   10|
+---+-------+-----+



In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"inner").show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        12|       5|      15

In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"inner")\
    .select(sales_df["customer_id"]).show()

+-----------+
|customer_id|
+-----------+
|          1|
|          1|
|          1|
|          2|
|          2|
|          5|
|          5|
|          7|
|          9|
+-----------+



In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"inner")\
    .select(sales_df["product_id"]).sort("product_id").show()

+----------+
|product_id|
+----------+
|         1|
|         5|
|         5|
|        12|
|        22|
|        22|
|        22|
|        27|
|        56|
+----------+



In [0]:
customer_df.join(sales_df,(sales_df["customer_id"]==customer_df["customer_id"])& (sales_df["product_id"]==customer_df["product_id"]),"inner")\
.select(sales_df["product_id"]).sort("product_id").show()

[0;36m  File [0;32m<command-3114273147345721>:1[0;36m[0m
[0;31m    customer_df.join(sales_df,(sales_df["customer_id"]==customer_df["customer_id"])& (sales_df["product_id"]==customer_df["product_id"]),"inner"))\[0m
[0m                                                                                                                                                 ^[0m
[0;31mSyntaxError[0m[0;31m:[0m unmatched ')'


In [0]:
sales_df.join(product_df,sales_df["product_id"]==product_df["id"],"right").show()

+-----------+----------+--------+----------------+---+-------+-----+
|customer_id|product_id|quantity|date_of_purchase| id|   name|price|
+-----------+----------+--------+----------------+---+-------+-----+
|          2|         1|      12|      15-06-2023|  1|  fanta|   20|
|       null|      null|    null|            null|  2|    dew|   22|
|          9|         5|       6|      03-03-2023|  5| sprite|   40|
|          2|         5|       3|      01-06-2023|  5| sprite|   40|
|       null|      null|    null|            null|  7|redbull|  100|
|         11|        12|      76|      12-03-2023| 12|  mazza|   45|
|          5|        12|       5|      15-04-2023| 12|  mazza|   45|
|          7|        22|       4|      03-02-2023| 22|   coke|   27|
|          5|        22|       1|      22-03-2023| 22|   coke|   27|
|          1|        22|      10|      01-06-2022| 22|   coke|   27|
|       null|      null|    null|            null| 25|  limca|   21|
|          1|        27|       5| 

In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"outer").show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          3|       nikita|  delhi|     25-06-2023|       null|      null|    null|            null|
|          4|        rahul| ranchi|     24-03-2023|       null|      null|    null|        

In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"left_semi").show()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          1|       manish|  patna|     30-05-2022|
|          2|       vikash|kolkata|     12-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|
|          7|        raman|  patna|     30-12-2022|
|          9|       ragini|kolkata|     03-03-2023|
+-----------+-------------+-------+---------------+



In [0]:
customer_df.join(sales_df,sales_df["customer_id"]==customer_df["customer_id"],"left_anti").show()

+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          8|      prakash| ranchi|     24-02-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+



In [0]:
customer_df.crossJoin(sales_df).show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          2|         5|       3|      01-06-2023|
|          1|       manish|  patna|     30-05-2022|          5|        22|       1|      22-03-2023|
|          1|       manish|  patna|     30-05-2022|          7|        22|       4|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          9|         5|       6|      03-03-2023|
|          1|       manish|  patna|     30-05-2022|          2|         1|      12|      15

In [0]:
customer_df.crossJoin(sales_df).count()

Out[40]: 100

In [0]:
emp_data = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]

emp_schema= ['id','name','salary','dept','gender']

emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)
emp_df = emp_df.select('id','name','salary','dept','gender')
emp_df.show()

+---+--------+------+---------+------+
| id|    name|salary|     dept|gender|
+---+--------+------+---------+------+
|  1|  manish| 50000|       IT|     m|
|  2|  vikash| 60000|    sales|     m|
|  3| raushan| 70000|marketing|     m|
|  4|  mukesh| 80000|       IT|     m|
|  5|   priti| 90000|    sales|     f|
|  6|  nikita| 45000|marketing|     f|
|  7|  ragini| 55000|marketing|     f|
|  8|   rashi|100000|       IT|     f|
|  9|  aditya| 65000|       IT|     m|
| 10|   rahul| 50000|marketing|     m|
| 11|   rakhi| 50000|       IT|     f|
| 12|akhilesh| 90000|    sales|     m|
+---+--------+------+---------+------+



Window Function

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


In [0]:
from pyspark.sql.functions import sum, col

emp_df.groupBy("dept").agg(sum(col("salary").cast("long"))).show()


+---------+---------------------------+
|     dept|sum(CAST(salary AS BIGINT))|
+---------+---------------------------+
|       IT|                     345000|
|marketing|                     220000|
|    sales|                     240000|
+---------+---------------------------+



In [0]:
from pyspark.sql.window import Window

window = Window.partitionBy("dept").orderBy("salary")

emp_df.withColumn("row_number",row_number().over(window))\
  .withColumn("Rank",rank().over(window))\
  .withColumn("Dense_Rank",dense_rank().over(window))\
  .show(truncate=False)

+---+--------+------+---------+------+----------+----+----------+
|id |name    |salary|dept     |gender|row_number|Rank|Dense_Rank|
+---+--------+------+---------+------+----------+----+----------+
|1  |manish  |50000 |IT       |m     |1         |1   |1         |
|11 |rakhi   |50000 |IT       |f     |2         |1   |1         |
|9  |aditya  |65000 |IT       |m     |3         |3   |2         |
|4  |mukesh  |80000 |IT       |m     |4         |4   |3         |
|8  |rashi   |100000|IT       |f     |5         |5   |4         |
|6  |nikita  |45000 |marketing|f     |1         |1   |1         |
|10 |rahul   |50000 |marketing|m     |2         |2   |2         |
|7  |ragini  |55000 |marketing|f     |3         |3   |3         |
|3  |raushan |70000 |marketing|m     |4         |4   |4         |
|2  |vikash  |60000 |sales    |m     |1         |1   |1         |
|5  |priti   |90000 |sales    |f     |2         |2   |2         |
|12 |akhilesh|90000 |sales    |m     |3         |2   |2         |
+---+-----

In [0]:
window = Window.partitionBy("dept","gender").orderBy("salary")

emp_df.withColumn("row_number",row_number().over(window))\
  .withColumn("Rank",rank().over(window))\
  .withColumn("Dense_Rank",dense_rank().over(window))\
  .show(truncate=False)

+---+--------+------+---------+------+----------+----+----------+
|id |name    |salary|dept     |gender|row_number|Rank|Dense_Rank|
+---+--------+------+---------+------+----------+----+----------+
|11 |rakhi   |50000 |IT       |f     |1         |1   |1         |
|8  |rashi   |100000|IT       |f     |2         |2   |2         |
|1  |manish  |50000 |IT       |m     |1         |1   |1         |
|9  |aditya  |65000 |IT       |m     |2         |2   |2         |
|4  |mukesh  |80000 |IT       |m     |3         |3   |3         |
|6  |nikita  |45000 |marketing|f     |1         |1   |1         |
|7  |ragini  |55000 |marketing|f     |2         |2   |2         |
|10 |rahul   |50000 |marketing|m     |1         |1   |1         |
|3  |raushan |70000 |marketing|m     |2         |2   |2         |
|5  |priti   |90000 |sales    |f     |1         |1   |1         |
|2  |vikash  |60000 |sales    |m     |1         |1   |1         |
|12 |akhilesh|90000 |sales    |m     |2         |2   |2         |
+---+-----

In [0]:
window = Window.partitionBy("dept","gender").orderBy("salary")

emp_df.withColumn("row_number",row_number().over(window))\
  .withColumn("Rank",rank().over(window))\
  .withColumn("Dense_Rank",dense_rank().over(window))\
      .filter(col("Dense_Rank")<=2)\
  .show(truncate=False)
  

+---+--------+------+---------+------+----------+----+----------+
|id |name    |salary|dept     |gender|row_number|Rank|Dense_Rank|
+---+--------+------+---------+------+----------+----+----------+
|11 |rakhi   |50000 |IT       |f     |1         |1   |1         |
|8  |rashi   |100000|IT       |f     |2         |2   |2         |
|1  |manish  |50000 |IT       |m     |1         |1   |1         |
|9  |aditya  |65000 |IT       |m     |2         |2   |2         |
|6  |nikita  |45000 |marketing|f     |1         |1   |1         |
|7  |ragini  |55000 |marketing|f     |2         |2   |2         |
|10 |rahul   |50000 |marketing|m     |1         |1   |1         |
|3  |raushan |70000 |marketing|m     |2         |2   |2         |
|5  |priti   |90000 |sales    |f     |1         |1   |1         |
|2  |vikash  |60000 |sales    |m     |1         |1   |1         |
|12 |akhilesh|90000 |sales    |m     |2         |2   |2         |
+---+--------+------+---------+------+----------+----+----------+



In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema= ["product_id","product_name","sales_date","sales"]
product_df = spark.createDataFrame(data=product_data, schema=product_schema)

In [0]:
product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         1|      iphone|01-01-2023|1500000|
|         2|     samsung|01-01-2023|1100000|
|         3|     oneplus|01-01-2023|1100000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [0]:
# Define the window specification
window = Window.partitionBy("product_id").orderBy("sales_date")

# Create a new column with the sales value of the previous month
last_month_df = product_df.withColumn("previous_month_sales", lag(col("sales"), 1).over(window))

# Show the DataFrame with the new column
last_month_df.show()

+----------+------------+----------+-------+--------------------+
|product_id|product_name|sales_date|  sales|previous_month_sales|
+----------+------------+----------+-------+--------------------+
|         1|      iphone|01-01-2023|1500000|                null|
|         1|      iphone|01-02-2023|1300000|             1500000|
|         1|      iphone|01-03-2023|1600000|             1300000|
|         1|      iphone|01-04-2023|1700000|             1600000|
|         1|      iphone|01-05-2023|1200000|             1700000|
|         1|      iphone|01-06-2023|1100000|             1200000|
|         2|     samsung|01-01-2023|1100000|                null|
|         2|     samsung|01-02-2023|1120000|             1100000|
|         2|     samsung|01-03-2023|1080000|             1120000|
|         2|     samsung|01-04-2023|1800000|             1080000|
|         2|     samsung|01-05-2023| 980000|             1800000|
|         2|     samsung|01-06-2023|1100000|              980000|
|         

In [0]:
#what is the percentage of loss or gain based on previous month sales?
last_month_df.withColumn("per_loss_gain", 
                         round(((col("sales")-col("previous_month_sales"))/col("sales"))*100,2))\
                        .show()

+----------+------------+----------+-------+--------------------+-------------+
|product_id|product_name|sales_date|  sales|previous_month_sales|per_loss_gain|
+----------+------------+----------+-------+--------------------+-------------+
|         1|      iphone|01-01-2023|1500000|                null|         null|
|         1|      iphone|01-02-2023|1300000|             1500000|       -15.38|
|         1|      iphone|01-03-2023|1600000|             1300000|        18.75|
|         1|      iphone|01-04-2023|1700000|             1600000|         5.88|
|         1|      iphone|01-05-2023|1200000|             1700000|       -41.67|
|         1|      iphone|01-06-2023|1100000|             1200000|        -9.09|
|         2|     samsung|01-01-2023|1100000|                null|         null|
|         2|     samsung|01-02-2023|1120000|             1100000|         1.79|
|         2|     samsung|01-03-2023|1080000|             1120000|         -3.7|
|         2|     samsung|01-04-2023|1800

In [0]:
#what is the percentage of sales each month based on last 9 month sales.
window = Window.partitionBy("product_id")

last_six_month_df = product_df.withColumn("previous_six_month_total_sales", sum("sales").over(window))\
.withColumn("perc_sales_each_month", round((col("sales")/col("previous_six_month_total_sales")) * 100, 2) ).show()

+----------+------------+----------+-------+------------------------------+---------------------+
|product_id|product_name|sales_date|  sales|previous_six_month_total_sales|perc_sales_each_month|
+----------+------------+----------+-------+------------------------------+---------------------+
|         1|      iphone|01-01-2023|1500000|                       8400000|                17.86|
|         1|      iphone|01-02-2023|1300000|                       8400000|                15.48|
|         1|      iphone|01-03-2023|1600000|                       8400000|                19.05|
|         1|      iphone|01-04-2023|1700000|                       8400000|                20.24|
|         1|      iphone|01-05-2023|1200000|                       8400000|                14.29|
|         1|      iphone|01-06-2023|1100000|                       8400000|                 13.1|
|         2|     samsung|01-01-2023|1100000|                       7180000|                15.32|
|         2|     sam

In [0]:
# How can I compare the current month's sales with the next month's sales for each product.


# Create a window specification
windowSpec = Window.partitionBy("product_id").orderBy("sales_date")

# Apply lead function to get the next sales value
product_df = product_df.withColumn("next_sales", lead("sales", 1).over(windowSpec))

# Show the result
product_df.show()

+----------+------------+----------+-------+----------+
|product_id|product_name|sales_date|  sales|next_sales|
+----------+------------+----------+-------+----------+
|         1|      iphone|01-01-2023|1500000|   1300000|
|         1|      iphone|01-02-2023|1300000|   1600000|
|         1|      iphone|01-03-2023|1600000|   1700000|
|         1|      iphone|01-04-2023|1700000|   1200000|
|         1|      iphone|01-05-2023|1200000|   1100000|
|         1|      iphone|01-06-2023|1100000|      null|
|         2|     samsung|01-01-2023|1100000|   1120000|
|         2|     samsung|01-02-2023|1120000|   1080000|
|         2|     samsung|01-03-2023|1080000|   1800000|
|         2|     samsung|01-04-2023|1800000|    980000|
|         2|     samsung|01-05-2023| 980000|   1100000|
|         2|     samsung|01-06-2023|1100000|      null|
|         3|     oneplus|01-01-2023|1100000|   1120000|
|         3|     oneplus|01-02-2023|1120000|   1160000|
|         3|     oneplus|01-03-2023|1160000|   1

In [0]:
product_data = [
(2,"samsung","01-01-1995",11000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-01-2006",15000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(3,"oneplus","01-01-2010",23000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]

product_df2 = spark.createDataFrame(data=product_data,schema=product_schema)

product_df2.show()


+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         2|     samsung|01-01-1995|  11000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-01-2006|  15000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         3|     oneplus|01-01-2010|  23000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [0]:
window= Window.partitionBy("product_id").orderBy("sales_date")

In [0]:
# find out the difference in sales of each product from their first of each product from their first month sales to latest sales?
 product_df2.withColumn("first_sales",first("sales").over(window))\
    .withColumn("latest_sales",last("sales").over(window)).show()

+----------+------------+----------+-------+-----------+------------+
|product_id|product_name|sales_date|  sales|first_sales|latest_sales|
+----------+------------+----------+-------+-----------+------------+
|         1|      iphone|01-01-2006|  15000|      15000|       15000|
|         1|      iphone|01-02-2023|1300000|      15000|     1300000|
|         1|      iphone|01-03-2023|1600000|      15000|     1600000|
|         1|      iphone|01-04-2023|1700000|      15000|     1700000|
|         1|      iphone|01-05-2023|1200000|      15000|     1200000|
|         1|      iphone|01-06-2023|1100000|      15000|     1100000|
|         2|     samsung|01-01-1995|  11000|      11000|       11000|
|         2|     samsung|01-02-2023|1120000|      11000|     1120000|
|         2|     samsung|01-03-2023|1080000|      11000|     1080000|
|         2|     samsung|01-04-2023|1800000|      11000|     1800000|
|         2|     samsung|01-05-2023| 980000|      11000|      980000|
|         2|     sam

In [0]:
window= Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

In [0]:
product_df2.withColumn("first_sales",first("sales").over(window))\
    .withColumn("latest_sales",last("sales").over(window)).show()

+----------+------------+----------+-------+-----------+------------+
|product_id|product_name|sales_date|  sales|first_sales|latest_sales|
+----------+------------+----------+-------+-----------+------------+
|         1|      iphone|01-01-2006|  15000|      15000|     1100000|
|         1|      iphone|01-02-2023|1300000|      15000|     1100000|
|         1|      iphone|01-03-2023|1600000|      15000|     1100000|
|         1|      iphone|01-04-2023|1700000|      15000|     1100000|
|         1|      iphone|01-05-2023|1200000|      15000|     1100000|
|         1|      iphone|01-06-2023|1100000|      15000|     1100000|
|         2|     samsung|01-01-1995|  11000|      11000|     1100000|
|         2|     samsung|01-02-2023|1120000|      11000|     1100000|
|         2|     samsung|01-03-2023|1080000|      11000|     1100000|
|         2|     samsung|01-04-2023|1800000|      11000|     1100000|
|         2|     samsung|01-05-2023| 980000|      11000|     1100000|
|         2|     sam

In [0]:
emp_data = [(1,"manish","11-07-2023","10:20"),
        (1,"manish","11-07-2023","11:20"),
        (2,"rajesh","11-07-2023","11:20"),
        (1,"manish","11-07-2023","11:50"),
        (2,"rajesh","11-07-2023","13:20"),
        (1,"manish","11-07-2023","19:20"),
        (2,"rajesh","11-07-2023","17:20"),
        (1,"manish","12-07-2023","10:32"),
        (1,"manish","12-07-2023","12:20"),
        (3,"vikash","12-07-2023","09:12"),
        (1,"manish","12-07-2023","16:23"),
        (3,"vikash","12-07-2023","18:08")]

emp_schema = ["id", "name", "date", "time"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

emp_df.show()

+---+------+----------+-----+
| id|  name|      date| time|
+---+------+----------+-----+
|  1|manish|11-07-2023|10:20|
|  1|manish|11-07-2023|11:20|
|  2|rajesh|11-07-2023|11:20|
|  1|manish|11-07-2023|11:50|
|  2|rajesh|11-07-2023|13:20|
|  1|manish|11-07-2023|19:20|
|  2|rajesh|11-07-2023|17:20|
|  1|manish|12-07-2023|10:32|
|  1|manish|12-07-2023|12:20|
|  3|vikash|12-07-2023|09:12|
|  1|manish|12-07-2023|16:23|
|  3|vikash|12-07-2023|18:08|
+---+------+----------+-----+



In [0]:
# send a mail to employee who all have not completed 8 hours in office when they come to office?
emp_df = emp_df.withColumn("timestamp",
                           from_unixtime(unix_timestamp(expr("concat(date,' ',time)"),"dd-MM-yyyy HH:mm")))

In [0]:
emp_df.show()

+---+------+----------+-----+-------------------+
| id|  name|      date| time|          timestamp|
+---+------+----------+-----+-------------------+
|  1|manish|11-07-2023|10:20|2023-07-11 10:20:00|
|  1|manish|11-07-2023|11:20|2023-07-11 11:20:00|
|  2|rajesh|11-07-2023|11:20|2023-07-11 11:20:00|
|  1|manish|11-07-2023|11:50|2023-07-11 11:50:00|
|  2|rajesh|11-07-2023|13:20|2023-07-11 13:20:00|
|  1|manish|11-07-2023|19:20|2023-07-11 19:20:00|
|  2|rajesh|11-07-2023|17:20|2023-07-11 17:20:00|
|  1|manish|12-07-2023|10:32|2023-07-12 10:32:00|
|  1|manish|12-07-2023|12:20|2023-07-12 12:20:00|
|  3|vikash|12-07-2023|09:12|2023-07-12 09:12:00|
|  1|manish|12-07-2023|16:23|2023-07-12 16:23:00|
|  3|vikash|12-07-2023|18:08|2023-07-12 18:08:00|
+---+------+----------+-----+-------------------+



In [0]:
window = Window.partitionBy("id","date").orderBy("date").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

In [0]:
new_df = emp_df.withColumn("login", first("timestamp").over(window)) \
    .withColumn("logout", last("timestamp").over(window)) \
    .withColumn("login", to_timestamp("login", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("logout", to_timestamp("logout", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("total_time", col("logout") - col("login")).show()

+---+------+----------+-----+-------------------+-------------------+-------------------+--------------------+
| id|  name|      date| time|          timestamp|              login|             logout|          total_time|
+---+------+----------+-----+-------------------+-------------------+-------------------+--------------------+
|  1|manish|11-07-2023|10:20|2023-07-11 10:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|11:20|2023-07-11 11:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|11:50|2023-07-11 11:50:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|19:20|2023-07-11 19:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|12-07-2023|10:32|2023-07-12 10:32:00|2023-07-12 10:32:00|2023-07-12 16:23:00|INTERVAL '0 05:51...|
|  1|manish|12-07-2023|12:20|2023-07-12 12:20:00|2023-07-12 10:32:00|2023-07-12 16:23:00|INTERVAL '0 05:51...|
|