In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime

# Obtain the Spark session for this notebook; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("BillingCalculation")
    .getOrCreate()
)

def _with_parsed_dates(rows):
    """Return (name, date, value) tuples with the "YYYY-MM-DD" string parsed.

    Each input row is a 3-tuple whose middle element is a date string; the
    output keeps the row order and replaces that string with a
    ``datetime.date`` so the rows fit a DateType column.
    """
    return [
        (name, datetime.strptime(day, "%Y-%m-%d").date(), value)
        for name, day, value in rows
    ]


# Sample billing rates: (employee, date the rate takes effect, rate).
billings_data = _with_parsed_dates([
    ('Sachin', '1990-01-01', 25),
    ('Sehwag', '1989-01-01', 15),
    ('Dhoni', '1989-01-01', 20),
    ('Sachin', '1991-02-05', 30),
])

# Sample work log: (employee, date worked, hours billed).
hours_worked_data = _with_parsed_dates([
    ('Sachin', '1990-07-01', 3),
    ('Sachin', '1990-08-01', 5),
    ('Sehwag', '1990-07-01', 2),
    ('Sachin', '1991-07-01', 4),
])

# Define Schema & Create DataFrames
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Schema for the billing-rate rows: employee, rate start date, rate.
# All three columns are nullable.
billings_schema = (
    StructType()
    .add("emp_name", StringType(), True)
    .add("bill_date", DateType(), True)
    .add("bill_rate", IntegerType(), True)
)

# Schema for the worked-hours rows: employee, date worked, hours billed.
# All three columns are nullable.
hours_schema = (
    StructType()
    .add("emp_name", StringType(), True)
    .add("work_date", DateType(), True)
    .add("bill_hrs", IntegerType(), True)
)

# Turn the sample rows into DataFrames using the explicit schemas.
billings_df = spark.createDataFrame(data=billings_data, schema=billings_schema)
hours_df = spark.createDataFrame(data=hours_worked_data, schema=hours_schema)

# Register both DataFrames as temp views so they can be queried by name
# through spark.sql().
billings_df.createOrReplaceTempView(name="Billings")
hours_df.createOrReplaceTempView(name="Hours")



In [25]:
# Total billed amount per employee.
#
# Step 1 (rate_periods): for every billing row, lead_date is the day before
# the same employee's next bill_date, i.e. the last day that rate applies.
# The employee's most recent rate has no next row, so it gets the open-ended
# sentinel 9999-12-31 (written as a typed DATE literal so it matches the
# column type instead of relying on string-to-date coercion).
#
# Step 2: each Hours row is matched to the one rate period that contains its
# work_date, and bill_rate * bill_hrs is summed per employee. The join is an
# inner join, so employees without any matching Hours row do not appear.
spark.sql(
"""
    with rate_periods as (
        select
            emp_name,
            bill_date,
            bill_rate,
            lead(date_add(bill_date, -1), 1, date '9999-12-31')
                over (partition by emp_name order by bill_date) as lead_date
        from Billings
    )

    select
        Hours.emp_name,
        sum(bill_rate * bill_hrs) as total_bill
    from rate_periods
    join Hours
        on rate_periods.emp_name = Hours.emp_name
        and Hours.work_date between bill_date and lead_date
    group by Hours.emp_name
""").show()

+--------+---------------------------+
|emp_name|sum((bill_rate * bill_hrs))|
+--------+---------------------------+
|  Sachin|                        320|
|  Sehwag|                         30|
+--------+---------------------------+



In [27]:
# Add a "lead_date" column holding the last day each billing rate applies:
# the day before the same employee's next bill_date, or '9999-12-31' when the
# row is that employee's latest rate.
# NOTE: the resulting DataFrame shares its name with the column it adds.
lead_date = billings_df.withColumn(
    "lead_date",
    lead(date_add(col("bill_date"), -1), 1, '9999-12-31').over(
        Window.partitionBy("emp_name").orderBy("bill_date")
    ),
)

In [35]:
# Match every worked-hours row ("h") to the billing-rate period ("l") that
# contains its work_date, then price it as bill_hrs * bill_rate in "total".
# Every column reference is qualified with its alias so it stays unambiguous
# even though both sides carry an emp_name column (both are kept in the
# result). The explicit .show() prints the joined rows; without it the cell
# only builds a lazy DataFrame.
(
    lead_date.alias("l")
    .join(
        hours_df.alias("h"),
        on=(col("l.emp_name") == col("h.emp_name"))
        & col("h.work_date").between(col("l.bill_date"), col("l.lead_date")),
        how="inner",
    )
    .withColumn("total", col("h.bill_hrs") * col("l.bill_rate"))
    .show()
)

+--------+----------+---------+----------+--------+----------+--------+-----+
|emp_name| bill_date|bill_rate| lead_date|emp_name| work_date|bill_hrs|total|
+--------+----------+---------+----------+--------+----------+--------+-----+
|  Sachin|1990-01-01|       25|1991-02-04|  Sachin|1990-07-01|       3|   75|
|  Sachin|1990-01-01|       25|1991-02-04|  Sachin|1990-08-01|       5|  125|
|  Sehwag|1989-01-01|       15|9999-12-31|  Sehwag|1990-07-01|       2|   30|
|  Sachin|1991-02-05|       30|9999-12-31|  Sachin|1991-07-01|       4|  120|
+--------+----------+---------+----------+--------+----------+--------+-----+

