In [1]:
from pyspark.sql import SparkSession
from pyspark import  SparkContext
import os
import pandas as pd

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
spark = SparkSession.builder.master("yarn").getOrCreate()

/spark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 02:33:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/15 02:33:38 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
import pandas as pd

# Sample data for Product table
product_data = {
    'product_id': [1, 2, 3],
    'product_name': ['LC Phone', 'LC T-Shirt', 'LC Keychain']
}

# Sample data for Sales table
sales_data = {
    'product_id': [1, 2, 3],
    'period_start': ['2019-01-25', '2018-12-01', '2019-12-01'],
    'period_end': ['2019-02-28', '2020-01-01', '2020-01-31'],
    'average_daily_sales': [100, 10, 1]
}

# Convert date strings to datetime objects
sales_data['period_start'] = pd.to_datetime(sales_data['period_start'])
sales_data['period_end'] = pd.to_datetime(sales_data['period_end'])

# Create DataFrame for Product and Sales tables
product_df = pd.DataFrame(product_data)
sales_df = pd.DataFrame(sales_data)


df_person = spark.createDataFrame(product_df)
df_person.createOrReplaceTempView("Product")

df_person = spark.createDataFrame(sales_df)
df_person.createOrReplaceTempView("Sales")



In [4]:
print(product_df)
print()
print(sales_df)

   product_id product_name
0           1     LC Phone
1           2   LC T-Shirt
2           3  LC Keychain

   product_id period_start period_end  average_daily_sales
0           1   2019-01-25 2019-02-28                  100
1           2   2018-12-01 2020-01-01                   10
2           3   2019-12-01 2020-01-31                    1


In [101]:
query = """
    select
        *,
        explode(sequence(year(period_start), year(period_end))) as year_gen
    from Sales s
"""

In [102]:
spark.sql(query).show()

+----------+-------------------+-------------------+-------------------+--------+
|product_id|       period_start|         period_end|average_daily_sales|year_gen|
+----------+-------------------+-------------------+-------------------+--------+
|         1|2019-01-25 00:00:00|2019-02-28 00:00:00|                100|    2019|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2018|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2019|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2020|
|         3|2019-12-01 00:00:00|2020-01-31 00:00:00|                  1|    2019|
|         3|2019-12-01 00:00:00|2020-01-31 00:00:00|                  1|    2020|
+----------+-------------------+-------------------+-------------------+--------+



In [86]:
query_1 = f"""
    select
        *,
        max(year_gen) over(partition by product_id) = year_gen as is_max,
        min(year_gen) over(partition by product_id) = year_gen as is_min
    from (
        {query}
    )
"""

In [87]:
spark.sql(query_1).show()

+----------+-------------------+-------------------+-------------------+--------+------+------+
|product_id|       period_start|         period_end|average_daily_sales|year_gen|is_max|is_min|
+----------+-------------------+-------------------+-------------------+--------+------+------+
|         1|2019-01-25 00:00:00|2019-02-28 00:00:00|                100|    2019|  true|  true|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2018| false|  true|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2019| false| false|
|         2|2018-12-01 00:00:00|2020-01-01 00:00:00|                 10|    2020|  true| false|
|         3|2019-12-01 00:00:00|2020-01-31 00:00:00|                  1|    2019| false|  true|
|         3|2019-12-01 00:00:00|2020-01-31 00:00:00|                  1|    2020|  true| false|
+----------+-------------------+-------------------+-------------------+--------+------+------+



In [96]:
query_2 = f"""
    select
        t.product_id,
        p.product_name,
        t.year_gen as year_report,
        case
            when (is_max and is_min) then average_daily_sales * (
                date_diff(
                    period_end, 
                    period_start
                ) + 1
            )
            
            when (is_min) then average_daily_sales * date_diff(
                to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), 
                period_start
            )

            when (is_max) then average_daily_sales * (
                date_diff(
                    period_end, 
                    to_date(concat(year_gen, '-01-01'), 'yyyy-MM-dd')
                ) + 1
            )
            
            else average_daily_sales * date_diff(
                to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), 
                to_date(concat(year_gen, '-01-01'), 'yyyy-MM-dd')
            )
            
        end as total_amount
    from (
        {query_1}
    ) t left join Product p on t.product_id = p.product_id
    order by t.product_id asc, year_report asc
"""

In [100]:
print(query_2)
spark.sql(query_2).show()
spark.sql(query_2).explain(extended=True)



    select
        t.product_id,
        p.product_name,
        t.year_gen as year_report,
        case
            when (is_max and is_min) then average_daily_sales * (
                date_diff(
                    period_end, 
                    period_start
                ) + 1
            )
            
            when (is_min) then average_daily_sales * date_diff(
                to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), 
                period_start
            )

            when (is_max) then average_daily_sales * (
                date_diff(
                    period_end, 
                    to_date(concat(year_gen, '-01-01'), 'yyyy-MM-dd')
                ) + 1
            )
            
            else average_daily_sales * date_diff(
                to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), 
                to_date(concat(year_gen, '-01-01'), 'yyyy-MM-dd')
            )
            
        end as total_amount
    from (
        
    selec

In [75]:
print(query_2)


    select
        *,
        case
            when is_max and is_min then average_daily_sales * (date_diff(period_end, period_start) + 1)
            when is_min then average_daily_sales * (date_diff(to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), period_start))
            when is_max then average_daily_sales * (date_diff(to_date(concat(year_gen + 1, '-01-01'), 'yyyy-MM-dd'), period_start))
        end as value
    from (
        
    select
        *,
        max(year_gen) over(partition by product_id) = year_gen as is_max,
        min(year_gen) over(partition by product_id) = year_gen as is_min
    from (
        
    select
        *,
        Year(period_end) - Year(period_start) + 1 as diff_year,
        date_diff(period_end, period_start) as diff_date,
        date_add(period_end, 1) as date_end_cal,
        Year(period_end) - Year(period_start) as date_gen,
        explode(sequence(year(period_start), year(period_end))) as year_gen,
        average_daily_sales * 365 as 