In [0]:
spark

------------- Create DataFrame

In [0]:
# create the customer data...
customer_data = [(1,'manish','patna',"30-05-2022"),
                (2,'vikash','kolkata',"12-03-2023"),
                (3,'nikita','delhi',"25-06-2023"),
                (4,'rahul','ranchi',"24-03-2023"),
                (5,'mahesh','jaipur',"22-03-2023"),
                (6,'prantosh','kolkata',"18-10-2022"),
                (7,'raman','patna',"30-12-2022"),
                (8,'prakash','ranchi',"24-02-2023"),
                (9,'ragini','kolkata',"03-03-2023"),
                (10,'raushan','jaipur',"05-02-2023")]
customer_schema=['customer_id','customer_name','address','date_of_joining']
cust_df = spark.createDataFrame(customer_data, customer_schema)
cust_df.show()


# creata the sales data..
sales_data = [(1,22,10,"01-06-2022"),
            (1,27,5,"03-02-2023"),
            (2,5,3,"01-06-2023"),
            (5,22,1,"22-03-2023"),
            (7,22,4,"03-02-2023"),
            (9,5,6,"03-03-2023"),
            (2,1,12,"15-06-2023"),
            (1,56,2,"25-06-2023"),
            (5,12,5,"15-04-2023"),
            (11,12,76,"12-03-2023")]
sales_schema=['customer_id','product_id','quantity','date_of_purchase']
sales_df = spark.createDataFrame(sales_data, sales_schema)
sales_df.show()


# create the product data... 
product_data = [(1, 'fanta',20),
                (2, 'dew',22),
                (5, 'sprite',40),
                (7, 'redbull',100),
                (12,'mazza',45),
                (22,'coke',27),
                (25,'limca',21),
                (27,'pepsi',14),
                (56,'sting',10)]
product_schema=['id','name','price']
prod_df = spark.createDataFrame(product_data, product_schema)
prod_df.show()


+-----------+-------------+-------+---------------+
|customer_id|customer_name|address|date_of_joining|
+-----------+-------------+-------+---------------+
|          1|       manish|  patna|     30-05-2022|
|          2|       vikash|kolkata|     12-03-2023|
|          3|       nikita|  delhi|     25-06-2023|
|          4|        rahul| ranchi|     24-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|
|          6|     prantosh|kolkata|     18-10-2022|
|          7|        raman|  patna|     30-12-2022|
|          8|      prakash| ranchi|     24-02-2023|
|          9|       ragini|kolkata|     03-03-2023|
|         10|      raushan| jaipur|     05-02-2023|
+-----------+-------------+-------+---------------+

+-----------+----------+--------+----------------+
|customer_id|product_id|quantity|date_of_purchase|
+-----------+----------+--------+----------------+
|          1|        22|      10|      01-06-2022|
|          1|        27|       5|      03-02-2023|
|          2|   

-------------- Use Simple Inner Join

In [0]:
# import libraries
from pyspark.sql.functions import *

# apply join with customer and sale data frame..
cust_df.alias('ct').join(sales_df.alias('st'), col('ct.customer_id')==col('st.customer_id'), 'inner').show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        22|       1|      22-03-2023|
|          5|       mahesh| jaipur|     22-03-2023|          5|        12|       5|      15

-------------- Join Two or More Table

In [0]:
# import libraries
from pyspark.sql.functions import *

# apply join with customer and sale data frame..
cust_df.alias('ct').join(
                        sales_df.alias('st'),
                        col('ct.customer_id')==col('st.customer_id'),
                        'inner'
                    ).join(
                        prod_df.alias('pt'),
                        col('st.product_id') == col('pt.id'),
                        'inner'
                    ).select(
                        col('ct.customer_id'),
                        col('ct.customer_name'),    
                        col('ct.address'),
                        col('ct.date_of_joining'),
                        col('st.product_id'),
                        col('st.quantity'),
                        col('st.date_of_purchase'),
                        col('pt.name'),
                        col('pt.price'),
                    ).sort(
                        col('ct.customer_id')
                    ).show()


+-----------+-------------+-------+---------------+----------+--------+----------------+------+-----+
|customer_id|customer_name|address|date_of_joining|product_id|quantity|date_of_purchase|  name|price|
+-----------+-------------+-------+---------------+----------+--------+----------------+------+-----+
|          1|       manish|  patna|     30-05-2022|        22|      10|      01-06-2022|  coke|   27|
|          1|       manish|  patna|     30-05-2022|        27|       5|      03-02-2023| pepsi|   14|
|          1|       manish|  patna|     30-05-2022|        56|       2|      25-06-2023| sting|   10|
|          2|       vikash|kolkata|     12-03-2023|         5|       3|      01-06-2023|sprite|   40|
|          2|       vikash|kolkata|     12-03-2023|         1|      12|      15-06-2023| fanta|   20|
|          5|       mahesh| jaipur|     22-03-2023|        12|       5|      15-04-2023| mazza|   45|
|          5|       mahesh| jaipur|     22-03-2023|        22|       1|      22-03

--------------- Work With Others Types (Left, Right, Full Outer, Left Semi, Left Anti, Cross) Of Joins

In [0]:
# use left join...
cust_df.join(
            sales_df,
            cust_df.customer_id == sales_df.customer_id,
            'left'
        ).show()


# use right join...
sales_df.join(
            prod_df,
            sales_df.product_id == prod_df.id,
            'right'
        ).show()


# use outer join...
cust_df.join(
            sales_df,
            cust_df.customer_id == sales_df.customer_id,
            'outer'
        ).show()


# use left semi join...
cust_df.join(
            sales_df,
            cust_df.customer_id == sales_df.customer_id,
            'left_semi'
        ).show()
    

# use left anti join...
cust_df.join(
            sales_df,
            cust_df.customer_id == sales_df.customer_id,
            'left_anti'
        ).show()


# use cross join or cartision product...
cust_df.crossJoin(
            sales_df
        ).show()

+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|customer_id|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+-----------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|          1|        56|       2|      25-06-2023|
|          1|       manish|  patna|     30-05-2022|          1|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|          1|        22|      10|      01-06-2022|
|          2|       vikash|kolkata|     12-03-2023|          2|         1|      12|      15-06-2023|
|          2|       vikash|kolkata|     12-03-2023|          2|         5|       3|      01-06-2023|
|          3|       nikita|  delhi|     25-06-2023|       null|      null|    null|            null|
|          5|       mahesh| jaipur|     22-03-2023|          5|        12|       5|      15

---------------- Use Window Functions (Row_Number, Rank, Dense_Rank, Lead, Lag, First, last, Nth, rowBetween and rangeBetween)

In [0]:
# use group by after join in df..
join_df = cust_df.join(
                sales_df,
                cust_df.customer_id == sales_df.customer_id,
                'left'
            ).select(
                cust_df.customer_id,
                cust_df.customer_name,
                cust_df.address,
                cust_df.date_of_joining,
                sales_df.product_id,
                sales_df.quantity,
                sales_df.date_of_purchase
            )
join_df.show()
join_df.groupBy(
            join_df.customer_id, join_df.customer_name
        ).agg(
            sum(join_df.quantity).alias('total_quantity')
        ).show()



# import window library
from pyspark.sql.window import Window
# use row_number to set unique number for each group as window..
join_df.withColumn('row_number', row_number().over(Window.partitionBy('customer_id').orderBy('customer_id')))\
       .withColumn('rank', rank().over(Window.orderBy('customer_id')))\
       .withColumn('dense_rank', dense_rank().over(Window.orderBy('customer_id')))\
       .sort('customer_id')\
       .show()


# use lead and lag, first and last on the sales data..
from pyspark.sql.functions import *
final_sales_df = sales_df.join(
                            prod_df,
                            sales_df.product_id == prod_df.id,
                            'inner'
                        ).select(
                            sales_df.customer_id,
                            sales_df.product_id,
                            sales_df.quantity,
                            sales_df.date_of_purchase,
                            prod_df.name.alias('prod_name'),
                            prod_df.price.alias('unit_price'),
                            (sales_df.quantity * prod_df.price).alias('total_amt')
                        )


# use lag (work with backword value) and similarly lead (work wih forward value) as window function..                        
final_sales_df.withColumn(
                'previous_sales',
                lag('total_amt', 1).over(
                    Window.partitionBy('product_id').orderBy('date_of_purchase')
                )
            ).withColumn(
                'sales_gain_or_loss_percent',
                (((col('total_amt') - col('previous_sales'))/col('total_amt'))*100)       
            ).show()


# use first (return the first value from each window) and last (return the last value from each window) as window function.
final_sales_df.withColumn(
                'first_amt_of_each_prod',
                first('total_amt').over(
                    Window.partitionBy('product_id').orderBy('date_of_purchase')
                )
            ).withColumn(
                'last_amt_of_each_prod',
                last('total_amt').over(
                    Window.partitionBy('product_id').orderBy('date_of_purchase')
                )     
            ).show()


# use rowBetween and rangeBetween with first and last as window function
"""
because if we check that find first and last value in previous step.. then last values is getting as current value
of each window where first work as unboundedPreceding value and last work as current value  in each window.
So we will use rowBetween with first and last then first work as unboundedPreceding value and last work as unboundedFollowing
value in each window.
"""
final_sales_df.withColumn(
                'first_amt_of_each_prod',
                first('total_amt').over(
                    Window.partitionBy('product_id').orderBy('date_of_purchase').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
                )
            ).withColumn(
                'last_amt_of_each_prod',
                last('total_amt').over(
                    Window.partitionBy('product_id').orderBy('date_of_purchase').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
                )     
            ).show()

+-----------+-------------+-------+---------------+----------+--------+----------------+
|customer_id|customer_name|address|date_of_joining|product_id|quantity|date_of_purchase|
+-----------+-------------+-------+---------------+----------+--------+----------------+
|          1|       manish|  patna|     30-05-2022|        56|       2|      25-06-2023|
|          1|       manish|  patna|     30-05-2022|        27|       5|      03-02-2023|
|          1|       manish|  patna|     30-05-2022|        22|      10|      01-06-2022|
|          2|       vikash|kolkata|     12-03-2023|         1|      12|      15-06-2023|
|          2|       vikash|kolkata|     12-03-2023|         5|       3|      01-06-2023|
|          3|       nikita|  delhi|     25-06-2023|      null|    null|            null|
|          4|        rahul| ranchi|     24-03-2023|      null|    null|            null|
|          5|       mahesh| jaipur|     22-03-2023|        12|       5|      15-04-2023|
|          5|       m

-------------- Work On Different Data To Know More About The RowBetween And RangeBetween

In [0]:
# create data frame..
emp_data = [(1,"manish","11-07-2023","10:20"),
            (1,"manish","11-07-2023","11:20"),
            (2,"rajesh","11-07-2023","11:20"),
            (1,"manish","11-07-2023","11:50"),
            (2,"rajesh","11-07-2023","13:20"),
            (1,"manish","11-07-2023","19:20"),
            (2,"rajesh","11-07-2023","17:20"),
            (1,"manish","12-07-2023","10:32"),
            (1,"manish","12-07-2023","12:20"),
            (3,"vikash","12-07-2023","09:12"),
            (1,"manish","12-07-2023","16:23"),
            (3,"vikash","12-07-2023","18:08")]

emp_schema = ["id", "name", "date", "time"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)
emp_df.show()


# use rowBetween to find the first entry and last exit of employee in office for tracking of employee moments.
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# combine the date and time string and convert into timestamp type..
emp_df1 = emp_df.withColumn(
                    'timestamp',
                    to_timestamp(concat_ws(' ', 'date', 'time'), "dd-MM-yyyy HH:mm")
                )
emp_df1.show()
emp_df1.printSchema()

# use window function..
window_func = Window.partitionBy('id', 'date').orderBy('date').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
emp_df1 = emp_df1.withColumn(
                    'entry time',
                    first('timestamp').over(window_func)
                ).withColumn(
                    'exit time',
                    last('timestamp').over(window_func)
                )
emp_df1.show()

# find distinct records..
emp_df1 = emp_df1.select('id', 'name', 'date', 'entry time', 'exit time').distinct()
emp_df1.withColumn(
            'time_diff',
            (col('exit time') - col('entry time'))
        ).show(truncate=False)

+---+------+----------+-----+
| id|  name|      date| time|
+---+------+----------+-----+
|  1|manish|11-07-2023|10:20|
|  1|manish|11-07-2023|11:20|
|  2|rajesh|11-07-2023|11:20|
|  1|manish|11-07-2023|11:50|
|  2|rajesh|11-07-2023|13:20|
|  1|manish|11-07-2023|19:20|
|  2|rajesh|11-07-2023|17:20|
|  1|manish|12-07-2023|10:32|
|  1|manish|12-07-2023|12:20|
|  3|vikash|12-07-2023|09:12|
|  1|manish|12-07-2023|16:23|
|  3|vikash|12-07-2023|18:08|
+---+------+----------+-----+

+---+------+----------+-----+-------------------+
| id|  name|      date| time|          timestamp|
+---+------+----------+-----+-------------------+
|  1|manish|11-07-2023|10:20|2023-07-11 10:20:00|
|  1|manish|11-07-2023|11:20|2023-07-11 11:20:00|
|  2|rajesh|11-07-2023|11:20|2023-07-11 11:20:00|
|  1|manish|11-07-2023|11:50|2023-07-11 11:50:00|
|  2|rajesh|11-07-2023|13:20|2023-07-11 13:20:00|
|  1|manish|11-07-2023|19:20|2023-07-11 19:20:00|
|  2|rajesh|11-07-2023|17:20|2023-07-11 17:20:00|
|  1|manish|12-07-2