In [0]:
#Importing modules 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window 

# Get the active Spark session, or start a new one, under the app name 'read'.
spark = (
    SparkSession.builder
    .appName('read')
    .getOrCreate()
)


In [0]:
# Price fixture: one row per (product, date window) with the unit price that
# applies in that window. Dates are supplied as ISO-formatted strings.
schema_prices = " product_id int, start_date string, end_date string, price int "

prices = [
    (1, "2019-02-17", "2019-02-28", 5),
    (1, "2019-03-01", "2019-03-22", 20),
    (2, "2019-02-01", "2019-02-20", 15),
    (2, "2019-02-21", "2019-03-31", 30),
]

df_prices = spark.createDataFrame(prices, schema=schema_prices)
df_prices.display()


# Sales fixture: one row per purchase, giving the product, the purchase date
# (ISO-formatted string) and the number of units sold.
schema_unitssold = " product_id int, purchase_date string, units int"

unitssold = [
    (1, "2019-02-25", 100),
    (1, "2019-03-01", 15),
    (2, "2019-02-10", 200),
    (2, "2019-03-22", 30),
]

df_unitssold = spark.createDataFrame(unitssold, schema=schema_unitssold)
df_unitssold.display()
        




product_id,start_date,end_date,price
1,2019-02-17,2019-02-28,5
1,2019-03-01,2019-03-22,20
2,2019-02-01,2019-02-20,15
2,2019-02-21,2019-03-31,30


product_id,purchase_date,units
1,2019-02-25,100
1,2019-03-01,15
2,2019-02-10,200
2,2019-03-22,30


In [0]:

# Convert the string date columns to real DATE columns.
# withColumn() returns a NEW DataFrame, so the result has to be bound to a name;
# otherwise the cast is discarded and later cells keep comparing strings.
# Re-running this cell is harmless: casting a DATE column to date is a no-op.
df_unitssold = df_unitssold.withColumn("purchase_date", col("purchase_date").cast("date"))
df_unitssold.display()

df_prices = df_prices.withColumn("start_date", col("start_date").cast("date"))
df_prices.display()

# Built on the frame that already has start_date cast, so both bounds end up as DATE.
df_prices = df_prices.withColumn("end_date", col("end_date").cast("date"))
df_prices.display()

product_id,purchase_date,units
1,2019-02-25,100
1,2019-03-01,15
2,2019-02-10,200
2,2019-03-22,30


product_id,start_date,end_date,price
1,2019-02-17,2019-02-28,5
1,2019-03-01,2019-03-22,20
2,2019-02-01,2019-02-20,15
2,2019-02-21,2019-03-31,30


product_id,start_date,end_date,price
1,2019-02-17,2019-02-28,5
1,2019-03-01,2019-03-22,20
2,2019-02-01,2019-02-20,15
2,2019-02-21,2019-03-31,30


In [0]:
# Pair every price window with every sale of the same product. The date-window
# match is applied in a later step, so each sale appears once per price row here.
same_product = df_prices.product_id == df_unitssold.product_id

df_joined = (
    df_prices
    .join(df_unitssold, on=same_product, how="inner")
    .select(
        df_prices.product_id.alias("Id"),
        df_prices.start_date,
        df_prices.end_date,
        df_prices.price,
        df_unitssold.purchase_date,
        df_unitssold.units,
    )
)
df_joined.display()


Id,start_date,end_date,price,purchase_date,units
1,2019-02-17,2019-02-28,5,2019-02-25,100
1,2019-02-17,2019-02-28,5,2019-03-01,15
1,2019-03-01,2019-03-22,20,2019-02-25,100
1,2019-03-01,2019-03-22,20,2019-03-01,15
2,2019-02-01,2019-02-20,15,2019-02-10,200
2,2019-02-01,2019-02-20,15,2019-03-22,30
2,2019-02-21,2019-03-31,30,2019-02-10,200
2,2019-02-21,2019-03-31,30,2019-03-22,30


In [0]:

# Keep only the pairings where the purchase falls inside the price window.
# Column.between is inclusive on both ends, i.e. start_date <= purchase_date <= end_date.
purchased_in_window = col("purchase_date").between(col("start_date"), col("end_date"))

df_result1 = df_joined.where(purchased_in_window)
df_result1.display()


Id,start_date,end_date,price,purchase_date,units
1,2019-02-17,2019-02-28,5,2019-02-25,100
1,2019-03-01,2019-03-22,20,2019-03-01,15
2,2019-02-01,2019-02-20,15,2019-02-10,200
2,2019-02-21,2019-03-31,30,2019-03-22,30


In [0]:

# Units-weighted average selling price per product:
#   sum(units * price) / sum(units), rounded to 2 decimal places.
# Note: `sum` and `round` here are the pyspark.sql.functions versions pulled in
# by the star import, not the Python builtins.
total_revenue = sum(col("units") * col("price"))
total_units = sum(col("units"))

df_final = (
    df_result1
    .groupBy("Id")
    .agg(round(total_revenue / total_units, 2).alias("avg_price"))
)
df_final.display()

Id,avg_price
1,6.96
2,16.96


SQL Solution 

In [0]:
# Expose both DataFrames to Spark SQL under the table names used by the query below.
for view_name, frame in (("prices", df_prices), ("units", df_unitssold)):
    frame.createOrReplaceTempView(view_name)

In [0]:
# SQL version of the units-weighted average price per product.
# - The table alias is declared and referenced with the same case (`p`), so the
#   query does not depend on spark.sql.caseSensitive being false.
# - Grouping names the column explicitly instead of using the ordinal `1`,
#   which depends on spark.sql.groupByOrdinal and hides the intent.
# `between` is inclusive, matching the >= / <= filter of the DataFrame solution.
df_result = spark.sql("""
    select p.product_id,
           round(sum(u.units * p.price) / sum(u.units), 2) as average_price
    from prices p
    join units u
      on p.product_id = u.product_id
    where u.purchase_date between p.start_date and p.end_date
    group by p.product_id
""")
df_result.display()

product_id,average_price
1,6.96
2,16.96
