# read_csv_data

In [0]:
flight_df = spark.read.format("csv")\
            .option("header","false")\
            .option("inferschema","false")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/2010_summary.csv")

flight_df.show(10)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
+-----------------+-------------------+-----+
only showing top 10 rows



In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
schema = StructType([
    StructField('Destination',StringType(),True),
    StructField('Origin',StringType(),True),
    StructField('Count',IntegerType(),True)
])

# flight_df_2 = spark.read.format("csv")\
#             .option("header","false")\
#             .option("inferschema","false")\
#             .schema(schema)\
#             .option("mode","FAILFAST")\
#             .load("/FileStore/tables/2010_summary.csv")

# Above code fails because first line of the file is a hearder which contain the name of the column. 
# If we want to run the code so 
    # either chanege the header option to true which will not read the first line 
    # OR change the mode to PERMISSIVE so the even if there are malformed data, then also let the dataframe create.  
    # OR use skiprow option to skip the n first line of the data  


flight_df_2 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","false")\
            .schema(schema)\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/2010_summary.csv")

flight_df_2.show(10)

+-----------------+----------------+-----+
|      Destination|          Origin|Count|
+-----------------+----------------+-----+
|    United States|         Romania|    1|
|    United States|         Ireland|  264|
|    United States|           India|   69|
|            Egypt|   United States|   24|
|Equatorial Guinea|   United States|    1|
|    United States|       Singapore|   25|
|    United States|         Grenada|   54|
|       Costa Rica|   United States|  477|
|          Senegal|   United States|   29|
|    United States|Marshall Islands|   44|
+-----------------+----------------+-----+
only showing top 10 rows



# Handling Corrupt records


In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

empSchemaWithCorruptRecord = StructType([
    StructField('Id',IntegerType(),True),
    StructField('Name',StringType(),True),
    StructField('Age',IntegerType(),True),
    StructField('Salary',IntegerType(),True),
    StructField('Address',StringType(),True),
    StructField('Nominee',StringType(),True),
    StructField('_corrupt_record',StringType(),True)
])

First let's see the different mode in which we can read the records

In [0]:

emp_df_1 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/emp_data.csv")

emp_df_1.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1989740164521342>:7[0m
[1;32m      1[0m emp_df_1 [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)\
[1;32m      2[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      3[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferschema[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      4[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmode[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m)\
[1;32m      5[0m             [38;5;241m.[39mload([38;5;124m"[39m[38;5;124m/FileStore/tables/emp_data.csv[39

In [0]:

emp_df_2 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/emp_data.csv")

emp_df_2.show()

+---+--------+---+------+---------+--------+
| id|    name|age|salary|  address| nominee|
+---+--------+---+------+---------+--------+
|  1|  Manish| 26| 75000|    Bihar|nominee1|
|  2|  Nikita| 23|100000|       UP|nominee2|
|  3|  Pritam| 22|150000|Bangalore|   India|
|  4|Prantosh| 17|200000|  Kolkata|   India|
|  5|  Vikash| 31|300000|     null|nominee5|
+---+--------+---+------+---------+--------+



In [0]:

emp_df_3 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","DROPMALFORMED")\
            .load("/FileStore/tables/emp_data.csv")

emp_df_3.show()

+---+------+---+------+-------+--------+
| id|  name|age|salary|address| nominee|
+---+------+---+------+-------+--------+
|  1|Manish| 26| 75000|  Bihar|nominee1|
|  2|Nikita| 23|100000|     UP|nominee2|
|  5|Vikash| 31|300000|   null|nominee5|
+---+------+---+------+-------+--------+



Write the currupt data into a different column.

In [0]:

# For this we need to a special column called '_currupt_record' in the schema and its type should be the string type.  

emp_df_4 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .schema(empSchemaWithCorruptRecord)\
            .load("/FileStore/tables/emp_data.csv")

emp_df_4.show()

+---+--------+---+------+---------+--------+--------------------+
| Id|    Name|Age|Salary|  Address| Nominee|     _corrupt_record|
+---+--------+---+------+---------+--------+--------------------+
|  1|  Manish| 26| 75000|    Bihar|nominee1|                null|
|  2|  Nikita| 23|100000|       UP|nominee2|                null|
|  3|  Pritam| 22|150000|Bangalore|   India|3,Pritam,22,15000...|
|  4|Prantosh| 17|200000|  Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|     null|nominee5|                null|
+---+--------+---+------+---------+--------+--------------------+



Save the currupt data into a location. 

In [0]:

# For this we need to a special column called '_currupt_record' in the schema and its type should be the string type and we pass one more option called 'badRecordsPath' along with the path where we want to store the records. 
# REMEMBER that we don't specify the mode when we save the records. 

emp_df_5 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option('badRecordsPath',"/FileStore/tables/bad_records")\
            .schema(empSchemaWithCorruptRecord)\
            .load("/FileStore/tables/emp_data.csv")

emp_df_5.show()

+---+------+---+------+-------+--------+---------------+
| Id|  Name|Age|Salary|Address| Nominee|_corrupt_record|
+---+------+---+------+-------+--------+---------------+
|  1|Manish| 26| 75000|  Bihar|nominee1|           null|
|  2|Nikita| 23|100000|     UP|nominee2|           null|
|  5|Vikash| 31|300000|   null|nominee5|           null|
+---+------+---+------+-------+--------+---------------+



In [0]:
%fs
ls /FileStore/tables/bad_records/20250129T165623/bad_records/part-00000-e7ddbfee-0dc2-4266-9b67-cfba8ed64918

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20250129T165623/bad_records/part-00000-e7ddbfee-0dc2-4266-9b67-cfba8ed64918,part-00000-e7ddbfee-0dc2-4266-9b67-cfba8ed64918,484,1738169786000


In [0]:
currupt_data = spark.read.json("/FileStore/tables/bad_records/20250129T165623/bad_records/part-00000-e7ddbfee-0dc2-4266-9b67-cfba8ed64918")
currupt_data.show(truncate=False)

+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                               |reason                                                                                                                          |record                                     |
+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/emp_data.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/emp_data.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India

# Read JSON File

In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1727404507000
dbfs:/FileStore/tables/Multi_line_correct.json,Multi_line_correct.json,310,1738255715000
dbfs:/FileStore/tables/Multi_line_incorrect.json,Multi_line_incorrect.json,274,1738255715000
dbfs:/FileStore/tables/bad_records/,bad_records/,0,0
dbfs:/FileStore/tables/corrupted.json,corrupted.json,218,1738254523000
dbfs:/FileStore/tables/emp_data.csv,emp_data.csv,222,1738169182000
dbfs:/FileStore/tables/line_delimited.json,line_delimited.json,244,1738254522000
dbfs:/FileStore/tables/single_file.json,single_file.json,232,1738254523000


line delimited json means that there would be only one record in a single line. Spark is good in reading line delimited json. In multi-line json, spark have to consider the whole document as a object and then it has to find where is the end of the records.  

In [0]:
line_delimited_json = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .load("/FileStore/tables/line_delimited.json")
line_delimited_json.show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [0]:
line_delimited_json_with_extra_field = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .load("/FileStore/tables/single_file.json")
line_delimited_json_with_extra_field.show()

+---+------+--------+------+
|age|gender|    name|salary|
+---+------+--------+------+
| 20|  null|  Manish| 20000|
| 25|  null|  Nikita| 21000|
| 16|  null|  Pritam| 22000|
| 35|  null|Prantosh| 25000|
| 67|     M|  Vikash| 40000|
+---+------+--------+------+



By default, multiline option is set to default. 

In [0]:
Multi_line_correct_json = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .option('multiline','true')\
                                .load("/FileStore/tables/Multi_line_correct.json")
Multi_line_correct_json.show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [0]:
Multi_line_incorrect_json = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .option('multiline','true')\
                                .load("/FileStore/tables/Multi_line_incorrect.json")
Multi_line_incorrect_json.show()

+---+------+------+
|age|  name|salary|
+---+------+------+
| 20|Manish| 20000|
+---+------+------+



In [0]:
incorrect_json = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .load("/FileStore/tables/corrupted.json")
incorrect_json.show()

+--------------------+----+--------+------+
|     _corrupt_record| age|    name|salary|
+--------------------+----+--------+------+
|                null|  20|  Manish| 20000|
|                null|  25|  Nikita| 21000|
|                null|  16|  Pritam| 22000|
|                null|  35|Prantosh| 25000|
|{"name":"Vikash",...|null|    null|  null|
+--------------------+----+--------+------+



In [0]:
nested_json = spark.read.format("json")\
                                .option('inferSchema','true')\
                                .option('mode','PERMISSIVE')\
                                .option('multiline','true')\
                                .load("/FileStore/tables/nested_json.json")
nested_json.show()

+----+-------+--------------------+-------------+-------------+-------------+------+
|code|message|         restaurants|results_found|results_shown|results_start|status|
+----+-------+--------------------+-------------+-------------+-------------+------+
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17066603}, b9...|         6835|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17093124}, b9...|         8680|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17580142}, b9...|          943|           20|            1|  null|
|null|   null|                  []|            0|            0|  

# Write Dataframe API

In [0]:
customer = spark.read.format('csv')\
                     .option('header','true')\
                     .option('inferSchema','true')\
                     .load("/FileStore/tables/customer.csv")

customer.show() 

+---+----------+--------+--------+----------+-----------+
| id|      name|     age|  salary|   address|     gender|
+---+----------+--------+--------+----------+-----------+
|  1|    Manish|    26.0| 75000.0|     INDIA|          m|
|  2|    Nikita|    23.0|100000.0|       USA|          f|
|  3|    Pritam|    22.0|150000.0|     INDIA|          m|
|  4|  Prantosh|    17.0|200000.0|     JAPAN|          m|
|  5|    Vikash|    31.0|300000.0|       USA|          m|
|  6|     Rahul|    55.0|300000.0|     INDIA|          m|
|  7|      Raju|    67.0|540000.0|       USA|          m|
|  8|   Praveen|    28.0| 70000.0|     JAPAN|          m|
|  9|       Dev|    32.0|150000.0|     JAPAN|          m|
| 10|    Sherin|    16.0| 25000.0|    RUSSIA|          f|
| 11|      Ragu|    12.0| 35000.0|     INDIA|          f|
| 12|     Sweta|    43.0|200000.0|     INDIA|          f|
| 13|   Raushan|    48.0|650000.0|       USA|          m|
| 14|    Mukesh|    36.0| 95000.0|    RUSSIA|          m|
| 15|   Prakas

In [0]:
customer.printSchema()

root
 |-- id: integer (nullable = true)
 |--  name: string (nullable = true)
 |--      age: double (nullable = true)
 |--   salary: double (nullable = true)
 |--    address: string (nullable = true)
 |--     gender: string (nullable = true)



In [0]:
customer.write.format('csv')\
              .option('header','true')\
              .option('mode','overwrite')\
              .option("path","/FileStore/tables/write_csv")\
              .save()

# PartitionBy or BucketBy

In [0]:
new_customer = spark.read.format('csv')\
                     .option('header','true')\
                     .option('inferSchema','true')\
                     .load("/FileStore/tables/new_customer.csv")

new_customer.show() 

+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  1|  Manish| 26| 75000|  INDIA|     m|
|  2|  Nikita| 23|100000|    USA|     f|
|  3|  Pritam| 22|150000|  INDIA|     m|
|  4|Prantosh| 17|200000|  JAPAN|     m|
|  5|  Vikash| 31|300000|    USA|     m|
|  6|   Rahul| 55|300000|  INDIA|     m|
|  7|    Raju| 67|540000|    USA|     m|
|  8| Praveen| 28| 70000|  JAPAN|     m|
|  9|     Dev| 32|150000|  JAPAN|     m|
| 10|  Sherin| 16| 25000| RUSSIA|     f|
| 11|    Ragu| 12| 35000|  INDIA|     f|
| 12|   Sweta| 43|200000|  INDIA|     f|
| 13| Raushan| 48|650000|    USA|     m|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|
| 15| Prakash| 52|750000|  INDIA|     m|
+---+--------+---+------+-------+------+



In [0]:
new_customer.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- gender: string (nullable = true)



In [0]:
new_customer.write.format('csv')\
              .option('header','true')\
              .option('mode','overwrite')\
              .partitionBy('address')\
              .option("path","/FileStore/tables/partitionByAddress")\
              .save()

# partitionBy('column_1','column_2',...) if we provides a sequence of columns then data is furthere subdivided into the sub-folders.

In [0]:
new_customer.write.format('csv')\
              .option('header','true')\
              .option('mode','overwrite')\
              .bucketBy(3,'id')\
              .option("path","/FileStore/tables/bucketById")\
              .saveAsTable('bucketByIdTable')

# .bucketBy(3,'id')\ 3 means divide whole data into 3 bucket based on id column 
# if we use .save() method to save the data, then it will give us error - 'save' does not support bucketBy right now. So we need to save this data as a table which will be stored into the hive store

# Dataframe Transformations in Spark

In [0]:
from pyspark.sql import functions as func 

In [0]:
new_customer.select(func.col('salary')+1).show()

+------+
|salary|
+------+
| 75000|
|100000|
|150000|
|200000|
|300000|
|300000|
|540000|
| 70000|
|150000|
| 25000|
| 35000|
|200000|
|650000|
| 95000|
|750000|
+------+



In [0]:
# multiple ways to select a columns

new_customer.select('id',func.col('name'),new_customer['age'],new_customer.salary).show()

+---+--------+---+------+
| id|    name|age|salary|
+---+--------+---+------+
|  1|  Manish| 26| 75000|
|  2|  Nikita| 23|100000|
|  3|  Pritam| 22|150000|
|  4|Prantosh| 17|200000|
|  5|  Vikash| 31|300000|
|  6|   Rahul| 55|300000|
|  7|    Raju| 67|540000|
|  8| Praveen| 28| 70000|
|  9|     Dev| 32|150000|
| 10|  Sherin| 16| 25000|
| 11|    Ragu| 12| 35000|
| 12|   Sweta| 43|200000|
| 13| Raushan| 48|650000|
| 14|  Mukesh| 36| 95000|
| 15| Prakash| 52|750000|
+---+--------+---+------+



Alias

In [0]:
new_customer.select(func.col('id').alias("customer_id")).show()

+-----------+
|customer_id|
+-----------+
|          1|
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
|         10|
|         11|
|         12|
|         13|
|         14|
|         15|
+-----------+



Filter or Where

both are same. We can do same things using both 

In [0]:
new_customer.filter((func.col('salary')>150000) & (func.col('age')<18)).show()

+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  4|Prantosh| 17|200000|  JAPAN|     m|
+---+--------+---+------+-------+------+



Literal - It is used to assige the default value to the a culumn or row

In [0]:
new_customer.select(func.col('id'),func.lit('ab_ra_ka_dabra').alias('last_name')).show()

+---+--------------+
| id|     last_name|
+---+--------------+
|  1|ab_ra_ka_dabra|
|  2|ab_ra_ka_dabra|
|  3|ab_ra_ka_dabra|
|  4|ab_ra_ka_dabra|
|  5|ab_ra_ka_dabra|
|  6|ab_ra_ka_dabra|
|  7|ab_ra_ka_dabra|
|  8|ab_ra_ka_dabra|
|  9|ab_ra_ka_dabra|
| 10|ab_ra_ka_dabra|
| 11|ab_ra_ka_dabra|
| 12|ab_ra_ka_dabra|
| 13|ab_ra_ka_dabra|
| 14|ab_ra_ka_dabra|
| 15|ab_ra_ka_dabra|
+---+--------------+



Add a new column using withColumn

In [0]:
new_customer.withColumn('sur_name',func.lit('KAKE')).show()

+---+--------+---+------+-------+------+--------+
| id|    name|age|salary|address|gender|sur_name|
+---+--------+---+------+-------+------+--------+
|  1|  Manish| 26| 75000|  INDIA|     m|    KAKE|
|  2|  Nikita| 23|100000|    USA|     f|    KAKE|
|  3|  Pritam| 22|150000|  INDIA|     m|    KAKE|
|  4|Prantosh| 17|200000|  JAPAN|     m|    KAKE|
|  5|  Vikash| 31|300000|    USA|     m|    KAKE|
|  6|   Rahul| 55|300000|  INDIA|     m|    KAKE|
|  7|    Raju| 67|540000|    USA|     m|    KAKE|
|  8| Praveen| 28| 70000|  JAPAN|     m|    KAKE|
|  9|     Dev| 32|150000|  JAPAN|     m|    KAKE|
| 10|  Sherin| 16| 25000| RUSSIA|     f|    KAKE|
| 11|    Ragu| 12| 35000|  INDIA|     f|    KAKE|
| 12|   Sweta| 43|200000|  INDIA|     f|    KAKE|
| 13| Raushan| 48|650000|    USA|     m|    KAKE|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|    KAKE|
| 15| Prakash| 52|750000|  INDIA|     m|    KAKE|
+---+--------+---+------+-------+------+--------+



Rename a column

In [0]:
new_customer.withColumnRenamed(existing='id',new='customer_id').show()

+-----------+--------+---+------+-------+------+
|customer_id|    name|age|salary|address|gender|
+-----------+--------+---+------+-------+------+
|          1|  Manish| 26| 75000|  INDIA|     m|
|          2|  Nikita| 23|100000|    USA|     f|
|          3|  Pritam| 22|150000|  INDIA|     m|
|          4|Prantosh| 17|200000|  JAPAN|     m|
|          5|  Vikash| 31|300000|    USA|     m|
|          6|   Rahul| 55|300000|  INDIA|     m|
|          7|    Raju| 67|540000|    USA|     m|
|          8| Praveen| 28| 70000|  JAPAN|     m|
|          9|     Dev| 32|150000|  JAPAN|     m|
|         10|  Sherin| 16| 25000| RUSSIA|     f|
|         11|    Ragu| 12| 35000|  INDIA|     f|
|         12|   Sweta| 43|200000|  INDIA|     f|
|         13| Raushan| 48|650000|    USA|     m|
|         14|  Mukesh| 36| 95000| RUSSIA|     m|
|         15| Prakash| 52|750000|  INDIA|     m|
+-----------+--------+---+------+-------+------+



casting the column data type

In [0]:
new_customer.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- gender: string (nullable = true)



In [0]:
new_customer.withColumn('id',func.col('id').cast('string')).printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- gender: string (nullable = true)



Drop a column

In [0]:
new_customer.drop('age').show()

+---+--------+------+-------+------+
| id|    name|salary|address|gender|
+---+--------+------+-------+------+
|  1|  Manish| 75000|  INDIA|     m|
|  2|  Nikita|100000|    USA|     f|
|  3|  Pritam|150000|  INDIA|     m|
|  4|Prantosh|200000|  JAPAN|     m|
|  5|  Vikash|300000|    USA|     m|
|  6|   Rahul|300000|  INDIA|     m|
|  7|    Raju|540000|    USA|     m|
|  8| Praveen| 70000|  JAPAN|     m|
|  9|     Dev|150000|  JAPAN|     m|
| 10|  Sherin| 25000| RUSSIA|     f|
| 11|    Ragu| 35000|  INDIA|     f|
| 12|   Sweta|200000|  INDIA|     f|
| 13| Raushan|650000|    USA|     m|
| 14|  Mukesh| 95000| RUSSIA|     m|
| 15| Prakash|750000|  INDIA|     m|
+---+--------+------+-------+------+



# Union, UnionAll and UnionByName


* Both union and unionAll is same in pyspark dataframe API, But is SQL, union removes the duplicate records and unionAll keeps them.
* Union and UnionAll just stack up the dataframe one upon another, so sequence of column becomes important in that case. This problem is solved by the UnionByName



In [0]:
new_customer_1 = new_customer.filter(func.col('age')<18)
new_customer_2 = new_customer.filter(func.col('age')>=18)

In [0]:
print(new_customer_1.count())
print(new_customer_2.count())

3
12


In [0]:
new_customer_1.union(new_customer_2).count()

Out[29]: 15

UnionByName

In [0]:
new_customer_3 = new_customer.filter(func.col('age')<18).select('name','age','salary')
new_customer_4 = new_customer.filter(func.col('age')>=18).select('salary','age','name')

In [0]:
new_customer_3.show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|Prantosh| 17|200000|
|  Sherin| 16| 25000|
|    Ragu| 12| 35000|
+--------+---+------+



In [0]:
new_customer_4.show()

+------+---+-------+
|salary|age|   name|
+------+---+-------+
| 75000| 26| Manish|
|100000| 23| Nikita|
|150000| 22| Pritam|
|300000| 31| Vikash|
|300000| 55|  Rahul|
|540000| 67|   Raju|
| 70000| 28|Praveen|
|150000| 32|    Dev|
|200000| 43|  Sweta|
|650000| 48|Raushan|
| 95000| 36| Mukesh|
|750000| 52|Prakash|
+------+---+-------+



In [0]:
# showing the shortcoming of union or unionAll

new_customer_3.union(new_customer_4).show()

+--------+---+-------+
|    name|age| salary|
+--------+---+-------+
|Prantosh| 17| 200000|
|  Sherin| 16|  25000|
|    Ragu| 12|  35000|
|   75000| 26| Manish|
|  100000| 23| Nikita|
|  150000| 22| Pritam|
|  300000| 31| Vikash|
|  300000| 55|  Rahul|
|  540000| 67|   Raju|
|   70000| 28|Praveen|
|  150000| 32|    Dev|
|  200000| 43|  Sweta|
|  650000| 48|Raushan|
|   95000| 36| Mukesh|
|  750000| 52|Prakash|
+--------+---+-------+



In [0]:
# to overcome the above problem we use unionByName

new_customer_3.unionByName(new_customer_4).show()

+--------+---+------+
|    name|age|salary|
+--------+---+------+
|Prantosh| 17|200000|
|  Sherin| 16| 25000|
|    Ragu| 12| 35000|
|  Manish| 26| 75000|
|  Nikita| 23|100000|
|  Pritam| 22|150000|
|  Vikash| 31|300000|
|   Rahul| 55|300000|
|    Raju| 67|540000|
| Praveen| 28| 70000|
|     Dev| 32|150000|
|   Sweta| 43|200000|
| Raushan| 48|650000|
|  Mukesh| 36| 95000|
| Prakash| 52|750000|
+--------+---+------+



# CASE and WHEN in Spark

* We use the CASE and WHEN for if and else in spark for conditional statements 

In [0]:
new_customer.withColumn("Adult", 
                        func.when(func.col('age')<18, 'No')\
                        .when(func.col('age')>=18, 'Yes')\
                        .otherwise('No_Value')
                        ).show()

+---+--------+---+------+-------+------+-----+
| id|    name|age|salary|address|gender|Adult|
+---+--------+---+------+-------+------+-----+
|  1|  Manish| 26| 75000|  INDIA|     m|  Yes|
|  2|  Nikita| 23|100000|    USA|     f|  Yes|
|  3|  Pritam| 22|150000|  INDIA|     m|  Yes|
|  4|Prantosh| 17|200000|  JAPAN|     m|   No|
|  5|  Vikash| 31|300000|    USA|     m|  Yes|
|  6|   Rahul| 55|300000|  INDIA|     m|  Yes|
|  7|    Raju| 67|540000|    USA|     m|  Yes|
|  8| Praveen| 28| 70000|  JAPAN|     m|  Yes|
|  9|     Dev| 32|150000|  JAPAN|     m|  Yes|
| 10|  Sherin| 16| 25000| RUSSIA|     f|   No|
| 11|    Ragu| 12| 35000|  INDIA|     f|   No|
| 12|   Sweta| 43|200000|  INDIA|     f|  Yes|
| 13| Raushan| 48|650000|    USA|     m|  Yes|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|  Yes|
| 15| Prakash| 52|750000|  INDIA|     m|  Yes|
+---+--------+---+------+-------+------+-----+



# Unique and Sorting the records

In [0]:

data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(15 ,'Mohit',45000,  18),
(13 ,'Nidhi',60000,  17),      
(14 ,'Priya',90000,  18),  
(18 ,'Sam',65000,   17)
     ]

schema = ["Id","Name","Salary","Manager_Id"]

manager_df = spark.createDataFrame(data=data,schema=schema)

manager_df.show()


+---+------+------+----------+
| Id|  Name|Salary|Manager_Id|
+---+------+------+----------+
| 10|  Anil| 50000|        18|
| 11| Vikas| 75000|        16|
| 12| Nisha| 40000|        18|
| 13| Nidhi| 60000|        17|
| 14| Priya| 80000|        18|
| 15| Mohit| 45000|        18|
| 16|Rajesh| 90000|        10|
| 17| Raman| 55000|        16|
| 18|   Sam| 65000|        17|
| 15| Mohit| 45000|        18|
| 13| Nidhi| 60000|        17|
| 14| Priya| 90000|        18|
| 18|   Sam| 65000|        17|
+---+------+------+----------+



Show only the unique records

In [0]:
manager_df.distinct().show()

+---+------+------+----------+
| Id|  Name|Salary|Manager_Id|
+---+------+------+----------+
| 10|  Anil| 50000|        18|
| 12| Nisha| 40000|        18|
| 11| Vikas| 75000|        16|
| 13| Nidhi| 60000|        17|
| 15| Mohit| 45000|        18|
| 14| Priya| 80000|        18|
| 16|Rajesh| 90000|        10|
| 17| Raman| 55000|        16|
| 18|   Sam| 65000|        17|
| 14| Priya| 90000|        18|
+---+------+------+----------+



Sort the dataframe

In [0]:
from pyspark.sql import functions as func
manager_df.sort(func.col('salary')).show()

+---+------+------+----------+
| Id|  Name|Salary|Manager_Id|
+---+------+------+----------+
| 12| Nisha| 40000|        18|
| 15| Mohit| 45000|        18|
| 15| Mohit| 45000|        18|
| 10|  Anil| 50000|        18|
| 17| Raman| 55000|        16|
| 13| Nidhi| 60000|        17|
| 13| Nidhi| 60000|        17|
| 18|   Sam| 65000|        17|
| 18|   Sam| 65000|        17|
| 11| Vikas| 75000|        16|
| 14| Priya| 80000|        18|
| 16|Rajesh| 90000|        10|
| 14| Priya| 90000|        18|
+---+------+------+----------+



In [0]:
manager_df.sort( func.col('Salary').desc() , func.col('Name').desc() ).show()

+---+------+------+----------+
| Id|  Name|Salary|Manager_Id|
+---+------+------+----------+
| 16|Rajesh| 90000|        10|
| 14| Priya| 90000|        18|
| 14| Priya| 80000|        18|
| 11| Vikas| 75000|        16|
| 18|   Sam| 65000|        17|
| 18|   Sam| 65000|        17|
| 13| Nidhi| 60000|        17|
| 13| Nidhi| 60000|        17|
| 17| Raman| 55000|        16|
| 10|  Anil| 50000|        18|
| 15| Mohit| 45000|        18|
| 15| Mohit| 45000|        18|
| 12| Nisha| 40000|        18|
+---+------+------+----------+



# Aggregation in Spark

Count - It is an action (df.count()) as well as transformation (df.select(count('column')).some_action)


In [0]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]

schema = ['Id','Name','Age','Salary','Country','Department']
emp_df = spark.createDataFrame(data=emp_data,schema=schema)
emp_df.show()

+----+-------+----+------+-------+-----------+
|  Id|   Name| Age|Salary|Country| Department|
+----+-------+----+------+-------+-----------+
|   1| manish|  26| 20000|  india|         IT|
|   2|  rahul|null| 40000|germany|engineering|
|   3|  pawan|  12| 60000|  india|      sales|
|   4|roshini|  44|  null|     uk|engineering|
|   5|raushan|  35| 70000|  india|      sales|
|   6|   null|  29|200000|     uk|         IT|
|   7|   adam|  37| 65000|     us|         IT|
|   8|  chris|  16| 40000|     us|      sales|
|null|   null|null|  null|   null|       null|
|   7|   adam|  37| 65000|     us|         IT|
+----+-------+----+------+-------+-----------+



In [0]:
emp_df.count() # it will count null records also in the count value 

Out[12]: 10

In [0]:
emp_df.select(func.count('*')).show()

+--------+
|count(1)|
+--------+
|      10|
+--------+



In [0]:
from pyspark.sql import functions as func
emp_df.select(func.count('Name')).show() # it will skip null records in the count value. 

# This is the difference if we run the count function on a specific column. it will only count the non-null value. But if we run the count function in all the columns then it consider the null values also.

+-----------+
|count(Name)|
+-----------+
|          8|
+-----------+



# Group By

In [0]:
emp_df.groupBy('Country').agg(func.sum('Salary')).show()

+-------+-----------+
|Country|sum(Salary)|
+-------+-----------+
|  india|     150000|
|germany|      40000|
|     uk|     200000|
|     us|     170000|
|   null|       null|
+-------+-----------+



# Window functions

In [0]:
emp_data = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]

schema = ['Id','Name','Salary','Department','Gender']
emp_df = spark.createDataFrame(data=emp_data,schema=schema)
emp_df.show()

+---+--------+------+----------+------+
| Id|    Name|Salary|Department|Gender|
+---+--------+------+----------+------+
|  1|  manish| 50000|        IT|     m|
|  2|  vikash| 60000|     sales|     m|
|  3| raushan| 70000| marketing|     m|
|  4|  mukesh| 80000|        IT|     m|
|  5|   priti| 90000|     sales|     f|
|  6|  nikita| 45000| marketing|     f|
|  7|  ragini| 55000| marketing|     f|
|  8|   rashi|100000|        IT|     f|
|  9|  aditya| 65000|        IT|     m|
| 10|   rahul| 50000| marketing|     m|
| 11|   rakhi| 50000|        IT|     f|
| 12|akhilesh| 90000|     sales|     m|
+---+--------+------+----------+------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

# over here, we are saying that create a window based on the department column i.e. combine all the entries into one window with same department
window = Window.partitionBy("Department")

# over here, we are creating a new column called total_salary and putting the total salary of the department which the person belongs to. This is where the window function is helping us.
  
emp_df.withColumn('total_Salary',func.sum(func.col('Salary')).over(window)).show(truncate=False)



+---+--------+------+----------+------+------------+
|Id |Name    |Salary|Department|Gender|total_Salary|
+---+--------+------+----------+------+------------+
|1  |manish  |50000 |IT        |m     |345000      |
|4  |mukesh  |80000 |IT        |m     |345000      |
|8  |rashi   |100000|IT        |f     |345000      |
|9  |aditya  |65000 |IT        |m     |345000      |
|11 |rakhi   |50000 |IT        |f     |345000      |
|3  |raushan |70000 |marketing |m     |220000      |
|6  |nikita  |45000 |marketing |f     |220000      |
|7  |ragini  |55000 |marketing |f     |220000      |
|10 |rahul   |50000 |marketing |m     |220000      |
|2  |vikash  |60000 |sales     |m     |240000      |
|5  |priti   |90000 |sales     |f     |240000      |
|12 |akhilesh|90000 |sales     |m     |240000      |
+---+--------+------+----------+------+------------+



* Row Number - it gives the ranking based on the order by, even if there are ties between the records then also it will keep giving the rank as incremently
* Rank - It maintain the gaps while giving the rank like if there are 3 person with salary 50K, 50K and 60K, then rank would be 1,1, and 3 respectively. It has skip the 2nd rank. 
* Dense Rank - It does not maintain the gaps while giving the rank like if there are 3 person with salary 50K, 50K and 60K, then rank would be 1,1, and 2 respectively. It does not skip the 2nd rank.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

window = Window.partitionBy("Department").orderBy('Salary')
  
emp_df.withColumn('Row_number',func.row_number().over(window))\
    .withColumn('Rank',func.rank().over(window))\
    .withColumn('Dense_rank',func.dense_rank().over(window))\
    .show(truncate=False)


+---+--------+------+----------+------+----------+----+----------+
|Id |Name    |Salary|Department|Gender|Row_number|Rank|Dense_rank|
+---+--------+------+----------+------+----------+----+----------+
|1  |manish  |50000 |IT        |m     |1         |1   |1         |
|11 |rakhi   |50000 |IT        |f     |2         |1   |1         |
|9  |aditya  |65000 |IT        |m     |3         |3   |2         |
|4  |mukesh  |80000 |IT        |m     |4         |4   |3         |
|8  |rashi   |100000|IT        |f     |5         |5   |4         |
|6  |nikita  |45000 |marketing |f     |1         |1   |1         |
|10 |rahul   |50000 |marketing |m     |2         |2   |2         |
|7  |ragini  |55000 |marketing |f     |3         |3   |3         |
|3  |raushan |70000 |marketing |m     |4         |4   |4         |
|2  |vikash  |60000 |sales     |m     |1         |1   |1         |
|5  |priti   |90000 |sales     |f     |2         |2   |2         |
|12 |akhilesh|90000 |sales     |m     |3         |2   |2      

# Lead and Lag in spark

* Lead - Suppose we have monthly sales data with columns month and sales like sales in Jan, Feb, Mar and so on. If we want to get the <b>next</b> month sales for each row, then we use Lead function.
* Lag - Suppose we have monthly sales data with columns month and sales like sales in Jan, Feb, Mar and so on. If we want to get the <b>previous</b> month sales for each row, then we use Lag function. 

* Note - These functions works with Window function.

In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

schema = ["Id","Company","Sales_date","Sales"]

product_df = spark.createDataFrame(data=product_data,schema=schema)
product_df.show()

+---+-------+----------+-------+
| Id|Company|Sales_date|  Sales|
+---+-------+----------+-------+
|  1| iphone|01-01-2023|1500000|
|  2|samsung|01-01-2023|1100000|
|  3|oneplus|01-01-2023|1100000|
|  1| iphone|01-02-2023|1300000|
|  2|samsung|01-02-2023|1120000|
|  3|oneplus|01-02-2023|1120000|
|  1| iphone|01-03-2023|1600000|
|  2|samsung|01-03-2023|1080000|
|  3|oneplus|01-03-2023|1160000|
|  1| iphone|01-04-2023|1700000|
|  2|samsung|01-04-2023|1800000|
|  3|oneplus|01-04-2023|1170000|
|  1| iphone|01-05-2023|1200000|
|  2|samsung|01-05-2023| 980000|
|  3|oneplus|01-05-2023|1175000|
|  1| iphone|01-06-2023|1100000|
|  2|samsung|01-06-2023|1100000|
|  3|oneplus|01-06-2023|1200000|
+---+-------+----------+-------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

window  = Window.partitionBy('Id').orderBy('Sales_date')

last_month_df = product_df.withColumn('Previous_month_sales',func.lag(func.col('Sales'),1,None).over(window))
last_month_df.show()

'''
pyspark.sql.functions.lag(col,offset,default)
Parameters: 
    col:Column or str - name of column or expression
    offset:int, optional default 1 - number of row to extend
    default: optional - default value if no previous record found
'''

+---+-------+----------+-------+--------------------+
| Id|Company|Sales_date|  Sales|Previous_month_sales|
+---+-------+----------+-------+--------------------+
|  1| iphone|01-01-2023|1500000|                null|
|  1| iphone|01-02-2023|1300000|             1500000|
|  1| iphone|01-03-2023|1600000|             1300000|
|  1| iphone|01-04-2023|1700000|             1600000|
|  1| iphone|01-05-2023|1200000|             1700000|
|  1| iphone|01-06-2023|1100000|             1200000|
|  2|samsung|01-01-2023|1100000|                null|
|  2|samsung|01-02-2023|1120000|             1100000|
|  2|samsung|01-03-2023|1080000|             1120000|
|  2|samsung|01-04-2023|1800000|             1080000|
|  2|samsung|01-05-2023| 980000|             1800000|
|  2|samsung|01-06-2023|1100000|              980000|
|  3|oneplus|01-01-2023|1100000|                null|
|  3|oneplus|01-02-2023|1120000|             1100000|
|  3|oneplus|01-03-2023|1160000|             1120000|
|  3|oneplus|01-04-2023|1170

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

window  = Window.partitionBy('Id').orderBy('Sales_date')

next_month_df = product_df.withColumn('Next_month_sales',func.lead(func.col('Sales'),1,None).over(window))
next_month_df.show()

'''
pyspark.sql.functions.lead(col,offset,default)
Parameters: 
    col:Column or str - name of column or expression
    offset:int, optional default 1 - number of row to extend
    default: optional - default value if no next record found
'''

+---+-------+----------+-------+----------------+
| Id|Company|Sales_date|  Sales|Next_month_sales|
+---+-------+----------+-------+----------------+
|  1| iphone|01-01-2023|1500000|         1300000|
|  1| iphone|01-02-2023|1300000|         1600000|
|  1| iphone|01-03-2023|1600000|         1700000|
|  1| iphone|01-04-2023|1700000|         1200000|
|  1| iphone|01-05-2023|1200000|         1100000|
|  1| iphone|01-06-2023|1100000|            null|
|  2|samsung|01-01-2023|1100000|         1120000|
|  2|samsung|01-02-2023|1120000|         1080000|
|  2|samsung|01-03-2023|1080000|         1800000|
|  2|samsung|01-04-2023|1800000|          980000|
|  2|samsung|01-05-2023| 980000|         1100000|
|  2|samsung|01-06-2023|1100000|            null|
|  3|oneplus|01-01-2023|1100000|         1120000|
|  3|oneplus|01-02-2023|1120000|         1160000|
|  3|oneplus|01-03-2023|1160000|         1170000|
|  3|oneplus|01-04-2023|1170000|         1175000|
|  3|oneplus|01-05-2023|1175000|         1200000|


# Rows Between in Spark

In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

schema = ["Id","Company","Sales_date","Sales"]

product_df = spark.createDataFrame(data=product_data,schema=schema)
product_df.show()

+---+-------+----------+-------+
| Id|Company|Sales_date|  Sales|
+---+-------+----------+-------+
|  1| iphone|01-01-2023|1500000|
|  2|samsung|01-01-2023|1100000|
|  3|oneplus|01-01-2023|1100000|
|  1| iphone|01-02-2023|1300000|
|  2|samsung|01-02-2023|1120000|
|  3|oneplus|01-02-2023|1120000|
|  1| iphone|01-03-2023|1600000|
|  2|samsung|01-03-2023|1080000|
|  3|oneplus|01-03-2023|1160000|
|  1| iphone|01-04-2023|1700000|
|  2|samsung|01-04-2023|1800000|
|  3|oneplus|01-04-2023|1170000|
|  1| iphone|01-05-2023|1200000|
|  2|samsung|01-05-2023| 980000|
|  3|oneplus|01-05-2023|1175000|
|  1| iphone|01-06-2023|1100000|
|  2|samsung|01-06-2023|1100000|
|  3|oneplus|01-06-2023|1200000|
+---+-------+----------+-------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

window  = Window.partitionBy('Id').orderBy('Sales_date').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

month_df = product_df.withColumn('first_sales',func.first(func.col('Sales')).over(window))\
                          .withColumn('last_sales',func.last(func.col('Sales')).over(window))
month_df.show()


+---+-------+----------+-------+-----------+----------+
| Id|Company|Sales_date|  Sales|first_sales|last_sales|
+---+-------+----------+-------+-----------+----------+
|  1| iphone|01-01-2023|1500000|    1500000|   1100000|
|  1| iphone|01-02-2023|1300000|    1500000|   1100000|
|  1| iphone|01-03-2023|1600000|    1500000|   1100000|
|  1| iphone|01-04-2023|1700000|    1500000|   1100000|
|  1| iphone|01-05-2023|1200000|    1500000|   1100000|
|  1| iphone|01-06-2023|1100000|    1500000|   1100000|
|  2|samsung|01-01-2023|1100000|    1100000|   1100000|
|  2|samsung|01-02-2023|1120000|    1100000|   1100000|
|  2|samsung|01-03-2023|1080000|    1100000|   1100000|
|  2|samsung|01-04-2023|1800000|    1100000|   1100000|
|  2|samsung|01-05-2023| 980000|    1100000|   1100000|
|  2|samsung|01-06-2023|1100000|    1100000|   1100000|
|  3|oneplus|01-01-2023|1100000|    1100000|   1200000|
|  3|oneplus|01-02-2023|1120000|    1100000|   1200000|
|  3|oneplus|01-03-2023|1160000|    1100000|   1

Q - Based on the given emp_df, for every day, find the users on who have worked less then 8 hours. 

In [0]:
from pyspark.sql import functions as func

emp_data = [(1,"manish","11-07-2023","10:20"),
        (1,"manish","11-07-2023","11:20"),
        (2,"rajesh","11-07-2023","11:20"),
        (1,"manish","11-07-2023","11:50"),
        (2,"rajesh","11-07-2023","13:20"),
        (1,"manish","11-07-2023","19:20"),
        (2,"rajesh","11-07-2023","17:20"),
        (1,"manish","12-07-2023","10:32"),
        (1,"manish","12-07-2023","12:20"),
        (3,"vikash","12-07-2023","09:12"),
        (1,"manish","12-07-2023","16:23"),
        (3,"vikash","12-07-2023","18:08")]

emp_schema = ["Id", "Name", "Date", "Time"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

emp_df = emp_df.withColumn("Time",func.from_unixtime(func.unix_timestamp(func.expr(" Concat(date,' ', Time) "),"dd-MM-yyyy HH:mm")))
emp_df.show()

+---+------+----------+-------------------+
| Id|  Name|      Date|               Time|
+---+------+----------+-------------------+
|  1|manish|11-07-2023|2023-07-11 10:20:00|
|  1|manish|11-07-2023|2023-07-11 11:20:00|
|  2|rajesh|11-07-2023|2023-07-11 11:20:00|
|  1|manish|11-07-2023|2023-07-11 11:50:00|
|  2|rajesh|11-07-2023|2023-07-11 13:20:00|
|  1|manish|11-07-2023|2023-07-11 19:20:00|
|  2|rajesh|11-07-2023|2023-07-11 17:20:00|
|  1|manish|12-07-2023|2023-07-12 10:32:00|
|  1|manish|12-07-2023|2023-07-12 12:20:00|
|  3|vikash|12-07-2023|2023-07-12 09:12:00|
|  1|manish|12-07-2023|2023-07-12 16:23:00|
|  3|vikash|12-07-2023|2023-07-12 18:08:00|
+---+------+----------+-------------------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as func

window = Window.partitionBy('Name','Date').orderBy('Time').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

short_time_emp_df = emp_df.withColumn('EntryTime',func.first(func.col('Time')).over(window))\
                          .withColumn('ExitTime',func.last(func.col('Time')).over(window))\
                          .withColumn('EntryTime',func.to_timestamp(func.col('EntryTime'),"yyyy-MM-dd HH:mm:ss"))\
                          .withColumn('ExitTime',func.to_timestamp(func.col('ExitTime'),"yyyy-MM-dd HH:mm:ss"))\
                          .withColumn('TotalTime',func.col('ExitTime')-func.col('EntryTime'))

short_time_emp_df.show()


+---+------+----------+-------------------+-------------------+-------------------+--------------------+
| Id|  Name|      Date|               Time|          EntryTime|           ExitTime|           TotalTime|
+---+------+----------+-------------------+-------------------+-------------------+--------------------+
|  1|manish|11-07-2023|2023-07-11 10:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|2023-07-11 11:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|2023-07-11 11:50:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|11-07-2023|2023-07-11 19:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|INTERVAL '0 09:00...|
|  1|manish|12-07-2023|2023-07-12 10:32:00|2023-07-12 10:32:00|2023-07-12 16:23:00|INTERVAL '0 05:51...|
|  1|manish|12-07-2023|2023-07-12 12:20:00|2023-07-12 10:32:00|2023-07-12 16:23:00|INTERVAL '0 05:51...|
|  1|manish|12-07-2023|2023-07-12 16:23:00|2023-07-12 1