In [84]:
# products_df
# transactions_df
# customers_df

StatementMeta(taxprocessing, 9, 2, Finished, Available)

In [85]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Path to the JDBC driver JAR file in ADLS
jdbc_driver_path = "abfss://mycontainer@adlsgen21507.dfs.core.windows.net/synapse/workspaces/spark-notebook-taxprocessing/mssql-jdbc-12.6.2.jre11.jar"

# Initialize SparkSession with the JDBC driver JAR
spark = SparkSession.builder \
    .appName("tax-processing") \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

df1 = spark.read.load('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/transactions.csv', format='csv', header=True,inferSchma = True)
display(df1.limit(10))

StatementMeta(taxprocessing, 9, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1ae371c2-833f-4b36-b2bc-ca88fd6dee63)

In [86]:
df1.printSchema() 

StatementMeta(taxprocessing, 9, 4, Finished, Available)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: string (nullable = true)



In [87]:
StructTypeSchema = StructType([
    StructField("product_id", StringType(), True),  # Optional field
    StructField("quantity", IntegerType(), True),   # Optional field (corrected typo)
    StructField("price", StringType(), True)       # Optional field
])
df1 = df1.withColumn('propStruct', from_json(df1.items,StructTypeSchema))

df1.show() 

StatementMeta(taxprocessing, 9, 5, Finished, Available)

+--------------------+-----------+----------+--------------------+--------------------+
|      transaction_id|customer_id|      date|               items|          propStruct|
+--------------------+-----------+----------+--------------------+--------------------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|{'product_id': 'F...|{FUR-FU-10004665,...|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|{'product_id': 'O...|{OFF-AR-10002053,...|
|K-12722-TEC-PH-10...|    K-12722|2023-06-20|{'product_id': 'T...|{TEC-PH-10001459,...|
|WB-64368-OFF-BI-1...|   WB-64368|2022-10-21|{'product_id': 'O...|{OFF-BI-10003655,...|
|WB-64368-OFF-LA-1...|   WB-64368|2024-04-23|{'product_id': 'O...|{OFF-LA-10002381,...|
|WB-64368-OFF-PA-1...|   WB-64368|2022-08-11|{'product_id': 'O...|{OFF-PA-10003129,...|
|J-14825-FUR-BO-10...|    J-14825|2023-06-18|{'product_id': 'F...|{FUR-BO-10000468,...|
|J-14825-TEC-MA-10...|    J-14825|2023-04-20|{'product_id': 'T...|{TEC-MA-10001031,...|
|J-14825-OFF-PA-10...|    J-1482

In [88]:
df1.printSchema()

StatementMeta(taxprocessing, 9, 6, Finished, Available)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: string (nullable = true)
 |-- propStruct: struct (nullable = true)
 |    |-- product_id: string (nullable = true)
 |    |-- quantity: integer (nullable = true)
 |    |-- price: string (nullable = true)



In [89]:
transactions_df = df1.select(col('transaction_id'),col('customer_id'),col('date'),col('propStruct.product_id').alias('product_id'),col('propStruct.quantity').alias('quantity'),col('propStruct.price').alias('price'))

StatementMeta(taxprocessing, 9, 7, Finished, Available)

In [90]:
transactions_df.show() 

StatementMeta(taxprocessing, 9, 8, Finished, Available)

+--------------------+-----------+----------+---------------+--------+--------+
|      transaction_id|customer_id|      date|     product_id|quantity|   price|
+--------------------+-----------+----------+---------------+--------+--------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|FUR-FU-10004665|       3|   $94.2|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|OFF-AR-10002053|       2|  $16.02|
|K-12722-TEC-PH-10...|    K-12722|2023-06-20|TEC-PH-10001459|       2| $164.88|
|WB-64368-OFF-BI-1...|   WB-64368|2022-10-21|OFF-BI-10003655|       5|  $16.34|
|WB-64368-OFF-LA-1...|   WB-64368|2024-04-23|OFF-LA-10002381|       3| $242.94|
|WB-64368-OFF-PA-1...|   WB-64368|2022-08-11|OFF-PA-10003129|       2|   $32.4|
|J-14825-FUR-BO-10...|    J-14825|2023-06-18|FUR-BO-10000468|       1| $13.494|
|J-14825-TEC-MA-10...|    J-14825|2023-04-20|TEC-MA-10001031|       3| $12.224|
|J-14825-OFF-PA-10...|    J-14825|2022-09-10|OFF-PA-10002230|       2|  $191.6|
|J-14825-OFF-AR-10...|    J-14825|2021-0

In [91]:
transactions_df = transactions_df.withColumn("price",regexp_replace("price","\\$","").cast('float'))\
                    .withColumn("Sales", expr('quantity * price'))

StatementMeta(taxprocessing, 9, 9, Finished, Available)

In [92]:
transactions_df.printSchema()

StatementMeta(taxprocessing, 9, 10, Finished, Available)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- Sales: float (nullable = true)



In [93]:
transactions_df.show(10)

StatementMeta(taxprocessing, 9, 11, Finished, Available)

+--------------------+-----------+----------+---------------+--------+------+---------+
|      transaction_id|customer_id|      date|     product_id|quantity| price|    Sales|
+--------------------+-----------+----------+---------------+--------+------+---------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|FUR-FU-10004665|       3|  94.2|282.59998|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|OFF-AR-10002053|       2| 16.02|    32.04|
|K-12722-TEC-PH-10...|    K-12722|2023-06-20|TEC-PH-10001459|       2|164.88|   329.76|
|WB-64368-OFF-BI-1...|   WB-64368|2022-10-21|OFF-BI-10003655|       5| 16.34|     81.7|
|WB-64368-OFF-LA-1...|   WB-64368|2024-04-23|OFF-LA-10002381|       3|242.94|   728.82|
|WB-64368-OFF-PA-1...|   WB-64368|2022-08-11|OFF-PA-10003129|       2|  32.4|     64.8|
|J-14825-FUR-BO-10...|    J-14825|2023-06-18|FUR-BO-10000468|       1|13.494|   13.494|
|J-14825-TEC-MA-10...|    J-14825|2023-04-20|TEC-MA-10001031|       3|12.224|   36.672|
|J-14825-OFF-PA-10...|    J-1482

### Total Sales By Transactions

In [94]:
total_sales_by_transaction = transactions_df \
    .groupBy(col('transaction_id')) \
    .agg(sum(col('Sales')).alias('total_sales')) \
    .orderBy(col('total_sales'), ascending = False) \
    .select('transaction_id', 'total_sales')

StatementMeta(taxprocessing, 9, 12, Finished, Available)

In [95]:
total_sales_by_transaction.show(50)

StatementMeta(taxprocessing, 9, 13, Finished, Available)

+--------------------+------------+
|      transaction_id| total_sales|
+--------------------+------------+
|AP-85302-FUR-FU-1...|113192.40625|
|AP-77154-FUR-FU-1...|113192.40625|
|HP-67453-FUR-FU-1...|113192.40625|
|MP-25617-FUR-FU-1...|113192.40625|
|O-72939-FUR-FU-10...|113192.40625|
|HP-12914-FUR-FU-1...|113192.40625|
|WB-78345-FUR-FU-1...|113192.40625|
|MP-43972-FUR-FU-1...|113192.40625|
|M-37810-FUR-FU-10...|113192.40625|
|T-98316-FUR-FU-10...|113192.40625|
|G-76821-FUR-FU-10...|113192.40625|
|UP-14755-FUR-FU-1...|113192.40625|
|AP-24050-FUR-FU-1...|113192.40625|
|O-66967-FUR-FU-10...|113192.40625|
|N-87419-FUR-FU-10...|113192.40625|
|K-16342-FUR-FU-10...|113192.40625|
|UP-40869-FUR-FU-1...|113192.40625|
|R-78076-FUR-FU-10...|113192.40625|
|P-24194-FUR-FU-10...|113192.40625|
|R-32598-FUR-FU-10...|113192.40625|
|M-60435-FUR-FU-10...|90553.921875|
|WB-50343-FUR-FU-1...|90553.921875|
|J-32318-FUR-FU-10...|90553.921875|
|J-69369-FUR-FU-10...|90553.921875|
|UP-49535-FUR-FU-1...|90553.

In [96]:
products_df = spark.read.load('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/products (1).csv', format='csv', header=True,inferSchma = True)
display(products_df.limit(10))

StatementMeta(taxprocessing, 9, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 220df194-420e-47e5-bf40-29fae4504a25)

In [97]:
# Scale the random values to be between 0.15 and 0.42

random_size = 7.24

products_df = products_df.withColumn("tax_percentage",rand() * random_size) \
                .withColumn("price",regexp_replace("price","\\$","").cast('float'))

products_df.show(10) 

StatementMeta(taxprocessing, 9, 15, Finished, Available)

+---------------+--------------------+--------------------+-------+------------------+
|     product_id|         description|          attributes|  price|    tax_percentage|
+---------------+--------------------+--------------------+-------+------------------+
|FUR-BO-10000112|Bush Birmingham C...|{'product_categor...|   46.0|0.6255698915787462|
|FUR-BO-10000330|Sauder Camden Cou...|{'product_categor...|1024.38|6.0233799587951555|
|FUR-BO-10000362|Sauder Inglewood ...|{'product_categor...| 17.216|3.2057056733911056|
|FUR-BO-10000468|O'Sullivan 2-Shel...|{'product_categor...| 13.494| 2.267389661472281|
|FUR-BO-10000711|Hon Metal Bookcas...|{'product_categor...|  91.36|1.5886114570735634|
|FUR-BO-10000780|O'Sullivan Planta...|{'product_categor...|  104.8|  6.92059361511338|
|FUR-BO-10001337|O'Sullivan Living...|{'product_categor...|  14.94|0.7627121867047044|
|FUR-BO-10001519|O'Sullivan 3-Shel...|{'product_categor...| 263.88| 3.081862475839149|
|FUR-BO-10001567|Bush Westfield Co...|{'pro

#### Total Tax per transaction

In [98]:
# joining on 2 condititons for unique product id.
tax_per_transaction = transactions_df.join(products_df,how='inner',on = (products_df["product_id"] == transactions_df["product_id"]) & (products_df["price"] == transactions_df["price"]))

StatementMeta(taxprocessing, 9, 16, Finished, Available)

In [99]:
tax_per_transaction = tax_per_transaction.withColumn("total_tax_amount",expr('tax_percentage * quantity'))

StatementMeta(taxprocessing, 9, 17, Finished, Available)

In [100]:
tax_per_transaction.show(10) 

StatementMeta(taxprocessing, 9, 18, Finished, Available)

+--------------------+-----------+----------+---------------+--------+------+---------+---------------+--------------------+--------------------+------+------------------+------------------+
|      transaction_id|customer_id|      date|     product_id|quantity| price|    Sales|     product_id|         description|          attributes| price|    tax_percentage|  total_tax_amount|
+--------------------+-----------+----------+---------------+--------+------+---------+---------------+--------------------+--------------------+------+------------------+------------------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|FUR-FU-10004665|       3|  94.2|282.59998|FUR-FU-10004665|3M Polarizing Tas...|{'product_categor...|  94.2| 0.736568189408735| 2.209704568226205|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|OFF-AR-10002053|       2| 16.02|    32.04|OFF-AR-10002053|Premium Writing P...|{'product_categor...| 16.02|5.5961898935971215|11.192379787194243|
|K-12722-TEC-PH-10...|    K-12722|2023-06-20|

In [101]:
tax_per_transactions1 = tax_per_transaction

StatementMeta(taxprocessing, 9, 19, Finished, Available)

### total sales per day

In [102]:
tax_sales_per_day = tax_per_transaction.groupBy('date') \
    .agg(sum('sales').alias('sales per day'), sum('total_tax_amount').alias('tax per day')) \
    .orderBy('date',asc = False) \
    .select('date', 'sales per day', 'tax per day')

StatementMeta(taxprocessing, 9, 20, Finished, Available)

In [103]:
tax_sales_per_day.show()

StatementMeta(taxprocessing, 9, 21, Finished, Available)

+----------+------------------+------------------+
|      date|     sales per day|       tax per day|
+----------+------------------+------------------+
|2021-06-13| 129953.5689303875| 1691.291028292776|
|2021-06-14| 77943.08056163788|1738.4513650609929|
|2021-06-15|156190.86480140686| 2035.385420220091|
|2021-06-16|182265.31578683853|2124.7180420447326|
|2021-06-17|107526.87702465057|1955.6175123696835|
|2021-06-18| 261187.1218674183| 2115.835352086349|
|2021-06-19| 203410.3118209839|  2123.59971041166|
|2021-06-20| 99931.48541533947| 1917.952670657346|
|2021-06-21|136170.97430610657| 1888.589689675233|
|2021-06-22|132143.95849847794|1855.7527142184294|
|2021-06-23|113544.32037830353|1980.4109949607152|
|2021-06-24| 134083.1011686325|1796.5892024171562|
|2021-06-25|141890.91727018356|1978.6751708511738|
|2021-06-26| 181475.4420375824|1872.5940644559723|
|2021-06-27|159079.35660982132|1557.9272651506003|
|2021-06-28| 196249.1378853321| 2254.435841311728|
|2021-06-29| 152241.3582712412|

### customer lifetime value

In [104]:
customers_df = spark.read.load('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/customers (1).csv', format='csv', header=True)
display(customers_df.limit(10))

StatementMeta(taxprocessing, 9, 22, Finished, Available)

SynapseWidget(Synapse.DataFrame, f2f4a4ec-e8b6-489e-a67c-d4f6d7e7e7ec)

In [105]:
CLV = tax_per_transaction.join(customers_df, on=tax_per_transaction['customer_id'] == customers_df['customer_id'], how='inner') \
    .groupBy(tax_per_transaction['customer_id']) \
    .agg(sum('Sales').alias('CLV')) \
    .select('customer_id', 'CLV')


StatementMeta(taxprocessing, 9, 23, Finished, Available)

In [106]:
CLV.show(10)

StatementMeta(taxprocessing, 9, 24, Finished, Available)

+-----------+------------------+
|customer_id|               CLV|
+-----------+------------------+
|   MP-68708| 263.4419975280762|
|    A-16437| 4143.990013122559|
|    C-14168| 617.5999908447266|
|    M-86002| 7863.560287475586|
|    A-68927|503.90000915527344|
|    K-24682|441.79798889160156|
|    M-75143| 80.95000076293945|
|    U-34709|             282.0|
|    J-60970| 12324.62395477295|
|    T-83293| 537.6900119781494|
+-----------+------------------+
only showing top 10 rows



In [107]:
StructTypeSchema1 = StructType([
    StructField('product_category',StringType(),True),
    StructField('sub_category',StringType(),True)
    ])

products_df = products_df.withColumn('PropStruct',from_json(products_df['attributes'],schema=StructTypeSchema1))

products_df.printSchema()

StatementMeta(taxprocessing, 9, 25, Finished, Available)

root
 |-- product_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- attributes: string (nullable = true)
 |-- price: float (nullable = true)
 |-- tax_percentage: double (nullable = false)
 |-- PropStruct: struct (nullable = true)
 |    |-- product_category: string (nullable = true)
 |    |-- sub_category: string (nullable = true)



In [108]:
products_df = products_df.select('product_id','description','PropStruct.product_category','PropStruct.sub_category','price','tax_percentage')

StatementMeta(taxprocessing, 9, 26, Finished, Available)

In [109]:
products_df.show() 

StatementMeta(taxprocessing, 9, 27, Finished, Available)

+---------------+--------------------+----------------+------------+--------+------------------+
|     product_id|         description|product_category|sub_category|   price|    tax_percentage|
+---------------+--------------------+----------------+------------+--------+------------------+
|FUR-BO-10000112|Bush Birmingham C...|       Furniture|   Bookcases|    46.0|0.6255698915787462|
|FUR-BO-10000330|Sauder Camden Cou...|       Furniture|   Bookcases| 1024.38|6.0233799587951555|
|FUR-BO-10000362|Sauder Inglewood ...|       Furniture|   Bookcases|  17.216|3.2057056733911056|
|FUR-BO-10000468|O'Sullivan 2-Shel...|       Furniture|   Bookcases|  13.494| 2.267389661472281|
|FUR-BO-10000711|Hon Metal Bookcas...|       Furniture|   Bookcases|   91.36|1.5886114570735634|
|FUR-BO-10000780|O'Sullivan Planta...|       Furniture|   Bookcases|   104.8|  6.92059361511338|
|FUR-BO-10001337|O'Sullivan Living...|       Furniture|   Bookcases|   14.94|0.7627121867047044|
|FUR-BO-10001519|O'Sullivan 3-

In [110]:
column_names = tax_per_transaction.columns
print("Column names:", column_names)

StatementMeta(taxprocessing, 9, 28, Finished, Available)

Column names: ['transaction_id', 'customer_id', 'date', 'product_id', 'quantity', 'price', 'Sales', 'product_id', 'description', 'attributes', 'price', 'tax_percentage', 'total_tax_amount']


In [111]:
new_columns = ['transaction_id', 'customer_id', 'date', 'product_id_', 'quantity', 'price', 'Sales', 'product_id', 'description', 'attributes', 'price', 'tax_percentage', 'total_tax_amount']
tax_per_transaction = tax_per_transaction.toDF(*new_columns)

StatementMeta(taxprocessing, 9, 29, Finished, Available)

In [112]:
products_performance = tax_per_transaction.join(products_df, on=tax_per_transaction['product_id'] == products_df['product_id'], how='inner') \
    .groupBy(tax_per_transaction['description'],products_df['product_category'],products_df['sub_category']) \
    .agg(sum('Sales').alias('product sales performance')) \
    .select('description','product_category','sub_category','product sales performance')

StatementMeta(taxprocessing, 9, 30, Finished, Available)

In [113]:
products_performance.show()

StatementMeta(taxprocessing, 9, 31, Finished, Available)

+--------------------+----------------+------------+-------------------------+
|         description|product_category|sub_category|product sales performance|
+--------------------+----------------+------------+-------------------------+
|Recycled Interoff...| Office Supplies|   Envelopes|        15785.47216796875|
|Martin Yale Chadl...| Office Supplies|    Supplies|       299020.27392578125|
|           Avery 482| Office Supplies|      Labels|        1539032.826171875|
|Eldon Stackable T...|       Furniture| Furnishings|         208488.955078125|
|Adams Telephone M...| Office Supplies|       Paper|       124472.37994384766|
|Portfile Personal...| Office Supplies|     Storage|       144526.40383911133|
|Rediform S.O.S. 1...| Office Supplies|       Paper|       26766.478912353516|
|Southworth 100% R...| Office Supplies|       Paper|       13900.895515441895|
|Peel-Off China Ma...| Office Supplies|         Art|       17472.000549316406|
|Plantronics Savi ...|      Technology| Accessories|

### Product Performance within category

In [114]:

from pyspark.sql import Window
pp_wrt_category_wind = Window.partitionBy('product_category').orderBy(desc(col('product sales performance')))

pp_wrt_category = products_performance.withColumn('category_rank',rank().over(pp_wrt_category_wind))




StatementMeta(taxprocessing, 9, 32, Finished, Available)

In [115]:
pp_wrt_category.filter(col('category_rank') == 1).show() 

StatementMeta(taxprocessing, 9, 33, Finished, Available)

+--------------------+----------------+------------+-------------------------+-------------+
|         description|product_category|sub_category|product sales performance|category_rank|
+--------------------+----------------+------------+-------------------------+-------------+
|Eldon Regeneratio...|       Furniture| Furnishings|         6361413.03515625|            1|
|           Avery 506| Office Supplies|      Labels|         4367987.43359375|            1|
|    Samsung Convoy 3|      Technology|      Phones|         1345746.90234375|            1|
+--------------------+----------------+------------+-------------------------+-------------+



#### Extra Discounts.

In [116]:
# more than 5 transaction and purchase value should be more than 1k, then he get's a discount of 5 %:-
tax_per_transaction.show() 

master_customer = tax_per_transaction.groupBy('customer_id').agg(count('transaction_id').alias('cnt_of_transaction'), sum('Sales').alias('total_sales'))
master_customer.select('customer_id','cnt_of_transaction','total_sales').show() 

StatementMeta(taxprocessing, 9, 34, Finished, Available)

+--------------------+-----------+----------+---------------+--------+-------+---------+---------------+--------------------+--------------------+-------+-------------------+------------------+
|      transaction_id|customer_id|      date|    product_id_|quantity|  price|    Sales|     product_id|         description|          attributes|  price|     tax_percentage|  total_tax_amount|
+--------------------+-----------+----------+---------------+--------+-------+---------+---------------+--------------------+--------------------+-------+-------------------+------------------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|FUR-FU-10004665|       3|   94.2|282.59998|FUR-FU-10004665|3M Polarizing Tas...|{'product_categor...|   94.2|  0.736568189408735| 2.209704568226205|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|OFF-AR-10002053|       2|  16.02|    32.04|OFF-AR-10002053|Premium Writing P...|{'product_categor...|  16.02| 5.5961898935971215|11.192379787194243|
|K-12722-TEC-PH-10...|    K-12

In [117]:
master_customer.printSchema() 

StatementMeta(taxprocessing, 9, 35, Finished, Available)

root
 |-- customer_id: string (nullable = true)
 |-- cnt_of_transaction: long (nullable = false)
 |-- total_sales: double (nullable = true)



In [118]:
master_customer_lst = master_customer.filter( (col('cnt_of_transaction') >= 10) & (col('total_sales') >= 1000)).select('customer_id', 'cnt_of_transaction', 'total_sales')

StatementMeta(taxprocessing, 9, 36, Finished, Available)

In [119]:
master_customer_lst.show()

StatementMeta(taxprocessing, 9, 37, Finished, Available)

+-----------+------------------+------------------+
|customer_id|cnt_of_transaction|       total_sales|
+-----------+------------------+------------------+
|    J-60970|                10| 12324.62395477295|
|    R-22742|                10|10108.898056983948|
|    M-11634|                11| 62333.43787097931|
|    G-97345|                10| 2621.396004676819|
|    K-14481|                10| 2833.192036628723|
|   TN-29615|                10|1418.0240507125854|
|   MP-60246|                10|26578.737815856934|
|    K-68635|                10| 13683.10628604889|
|    K-75826|                12| 5943.254018783569|
|    R-37611|                10|3106.5250129699707|
|    K-62828|                10| 1870.259994506836|
|    G-50622|                10| 8481.289985656738|
|    A-20491|                10| 7816.653943061829|
|    R-89018|                10| 7111.196122646332|
|   AP-78139|                11|14952.029636383057|
|    M-51056|                10|  8421.12396812439|
|    K-66038

In [120]:
# # Join tax_per_transaction with master_customer_lst on customer_id
joined_df = tax_per_transaction.join(master_customer_lst, on=['customer_id'], how='left')

joined_df = joined_df.select("transaction_id","customer_id","sales","cnt_of_transaction")
joined_df.show()

# # Apply discount based on condition
discounted_df = joined_df.withColumn(
    "Price Discount",
    when(col("cnt_of_transaction").isNotNull(), col('sales') * 0.05).otherwise(0)
)

display(discounted_df.limit(10)) 

StatementMeta(taxprocessing, 9, 38, Finished, Available)

+--------------------+-----------+---------+------------------+
|      transaction_id|customer_id|    sales|cnt_of_transaction|
+--------------------+-----------+---------+------------------+
|WB-64368-OFF-BI-1...|   WB-64368|     81.7|              null|
|WB-64368-OFF-LA-1...|   WB-64368|   728.82|              null|
|WB-64368-OFF-PA-1...|   WB-64368|     64.8|              null|
|A-30388-FUR-CH-10...|    A-30388|28.223999|              null|
|A-30388-OFF-BI-10...|    A-30388|  1126.48|              null|
|A-30388-FUR-BO-10...|    A-30388|151.72499|              null|
|K-12722-FUR-FU-10...|    K-12722|282.59998|              null|
|K-12722-OFF-AR-10...|    K-12722|    32.04|              null|
|K-12722-TEC-PH-10...|    K-12722|   329.76|              null|
|J-14825-FUR-BO-10...|    J-14825|   13.494|              null|
|J-14825-TEC-MA-10...|    J-14825|   36.672|              null|
|J-14825-OFF-PA-10...|    J-14825|    383.2|              null|
|J-14825-OFF-AR-10...|    J-14825|  217.

SynapseWidget(Synapse.DataFrame, 1f98b2bc-c855-4fe0-a103-b3d066c40f27)

#### process data related to refunds.

In [121]:
refunds_df = spark.read.load('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/refunds (3).csv', format='csv', inferSchema = True, header=True)
display(refunds_df.limit(10))

StatementMeta(taxprocessing, 9, 39, Finished, Available)

SynapseWidget(Synapse.DataFrame, b4d103d1-acae-464c-a65e-2ef054138916)

In [122]:
refunds_df = refunds_df.withColumn("total_Refund",regexp_replace("refund_amount","\\$","").cast('float'))\
                .withColumn("tax_refund",regexp_replace("tax_refund_amount","\\$","").cast('float'))

refunds_df.show()  



StatementMeta(taxprocessing, 9, 40, Finished, Available)

+--------------------+--------------------+-----------+---------------+----------+--------+-------------+-----------------+------------+----------+
|           refund_id|      transaction_id|customer_id|     product_id|      date|quantity|refund_amount|tax_refund_amount|total_Refund|tax_refund|
+--------------------+--------------------+-----------+---------------+----------+--------+-------------+-----------------+------------+----------+
|B-46832-B-46832-O...|B-46832-OFF-PA-10...|    B-46832|OFF-PA-10002741|2024-02-17|       4|      $163.88|           $16.39|      163.88|     16.39|
|C-87776-C-87776-O...|C-87776-OFF-EN-10...|    C-87776|OFF-EN-10000781|2021-06-20|       1|      $213.12|           $21.31|      213.12|     21.31|
|AP-38866-AP-38866...|AP-38866-FUR-FU-1...|   AP-38866|FUR-FU-10003930|2021-11-10|       1|       $16.22|            $1.62|       16.22|      1.62|
|H-69793-H-69793-O...|H-69793-OFF-AR-10...|    H-69793|OFF-AR-10002445|2022-05-19|       1|        $75.6|       

In [123]:
refunds_df = refunds_df.select("transaction_id","total_refund","tax_refund")

StatementMeta(taxprocessing, 9, 41, Finished, Available)

In [124]:
# tax_per_transactions1.show()

# new_columns = ['transaction_id', 'customer_id', 'date', 'product_id_', 'quantity', 'price', 'Sales', 'product_id', 'description', 'attributes', 'price', 'tax_percentage', 'total_tax_amount']
# tax_per_transactions1 = tax_per_transactions1.toDF(*new_columns)

StatementMeta(taxprocessing, 9, 42, Finished, Available)

In [125]:
tax_per_transaction1 = tax_per_transaction.select('transaction_id','Sales','total_tax_amount')

StatementMeta(taxprocessing, 9, 43, Finished, Available)

In [126]:
# Joining transactions_df and refunds_df
# joined_df = tax_per_transaction1.join(refunds_df, on=["transaction_id"], how="left")

# display(joined_df) 

refunds_df.show()
tax_per_transaction1.show() 


StatementMeta(taxprocessing, 9, 44, Finished, Available)

+--------------------+------------+----------+
|      transaction_id|total_refund|tax_refund|
+--------------------+------------+----------+
|B-46832-OFF-PA-10...|      163.88|     16.39|
|C-87776-OFF-EN-10...|      213.12|     21.31|
|AP-38866-FUR-FU-1...|       16.22|      1.62|
|H-69793-OFF-AR-10...|        75.6|      7.56|
|B-98981-TEC-PH-10...|       441.5|     44.15|
|M-71343-OFF-PA-10...|        23.7|      2.37|
|B-86111-OFF-AR-10...|       26.38|      2.64|
|M-27753-FUR-FU-10...|        4.67|      0.47|
|AP-86470-FUR-CH-1...|       51.26|      5.13|
|K-58243-TEC-MA-10...|        53.6|      5.36|
|P-58172-OFF-AP-10...|      126.84|     12.68|
|TN-18525-TEC-PH-1...|        32.4|      3.24|
|M-38704-OFF-PA-10...|        2.38|      0.24|
|K-49635-TEC-AC-10...|       19.46|      1.95|
|AP-75575-OFF-ST-1...|       15.12|      1.51|
|M-81003-FUR-FU-10...|      286.86|     28.69|
|G-60190-TEC-PH-10...|       46.44|      4.64|
|K-75031-OFF-BI-10...|        62.1|      6.21|
|AP-34759-OFF

In [127]:
joined_df = tax_per_transaction1.join(refunds_df, on=["transaction_id"], how="left")

joined_df = joined_df.fillna({'total_refund': 0,'tax_refund':0 })

# # Selecting all columns from transactions_df and specific columns from refunds_df
Refunds_data_adjust = joined_df.select(tax_per_transaction1.columns +
                             ["total_refund",
                              "tax_refund"])

# # # Show or perform further actions on result_df

Refunds_data_adjust = Refunds_data_adjust.select("transaction_id", "Sales",  "total_tax_amount",expr("Sales-total_refund").alias("amount_after_Refund"),expr("total_tax_amount - tax_refund").alias("adjusted_tax"))
Refunds_data_adjust.show() 

### ################### amount will be same if there is no refund ######################

StatementMeta(taxprocessing, 9, 45, Finished, Available)

+--------------------+---------+------------------+-------------------+------------------+
|      transaction_id|    Sales|  total_tax_amount|amount_after_Refund|      adjusted_tax|
+--------------------+---------+------------------+-------------------+------------------+
|K-12722-FUR-FU-10...|282.59998| 2.209704568226205|          282.59998| 2.209704568226205|
|K-12722-OFF-AR-10...|    32.04|11.192379787194243|              32.04|11.192379787194243|
|K-12722-TEC-PH-10...|   329.76| 4.075136240604813|             329.76| 4.075136240604813|
|WB-64368-OFF-BI-1...|     81.7| 21.97738979866459|               81.7| 21.97738979866459|
|WB-64368-OFF-LA-1...|   728.82|15.607910331159346|             728.82|15.607910331159346|
|WB-64368-OFF-PA-1...|     64.8| 4.489270427066227|               64.8| 4.489270427066227|
|J-14825-FUR-BO-10...|   13.494| 2.267389661472281|             13.494| 2.267389661472281|
|J-14825-TEC-MA-10...|   36.672|2.6389683226659026|             36.672|2.6389683226659026|

In [128]:
# group by tax rate, total_sales.
# if tax rates high does the sales decreases. 


# prmotion discount vs non-promotion discount period on sales, tax collection((pivoting is better option))

# total customers by cvc.
# categories by customers 

# total_sales, total_tax by geographic (ranking 2 columns(asc and desc)) ..... orderby total_sales,total_tax(asc), total_tax(desc)

StatementMeta(taxprocessing, 9, 46, Finished, Available)

In [129]:
products_df.show() 

StatementMeta(taxprocessing, 9, 47, Finished, Available)

+---------------+--------------------+----------------+------------+--------+------------------+
|     product_id|         description|product_category|sub_category|   price|    tax_percentage|
+---------------+--------------------+----------------+------------+--------+------------------+
|FUR-BO-10000112|Bush Birmingham C...|       Furniture|   Bookcases|    46.0|0.6255698915787462|
|FUR-BO-10000330|Sauder Camden Cou...|       Furniture|   Bookcases| 1024.38|6.0233799587951555|
|FUR-BO-10000362|Sauder Inglewood ...|       Furniture|   Bookcases|  17.216|3.2057056733911056|
|FUR-BO-10000468|O'Sullivan 2-Shel...|       Furniture|   Bookcases|  13.494| 2.267389661472281|
|FUR-BO-10000711|Hon Metal Bookcas...|       Furniture|   Bookcases|   91.36|1.5886114570735634|
|FUR-BO-10000780|O'Sullivan Planta...|       Furniture|   Bookcases|   104.8|  6.92059361511338|
|FUR-BO-10001337|O'Sullivan Living...|       Furniture|   Bookcases|   14.94|0.7627121867047044|
|FUR-BO-10001519|O'Sullivan 3-

In [130]:
products_df_tax_round = products_df.withColumn("Round_Tax", round(col("tax_percentage")))
products_df_tax_round.show() 

StatementMeta(taxprocessing, 9, 48, Finished, Available)

+---------------+--------------------+----------------+------------+--------+------------------+---------+
|     product_id|         description|product_category|sub_category|   price|    tax_percentage|Round_Tax|
+---------------+--------------------+----------------+------------+--------+------------------+---------+
|FUR-BO-10000112|Bush Birmingham C...|       Furniture|   Bookcases|    46.0|0.6255698915787462|      1.0|
|FUR-BO-10000330|Sauder Camden Cou...|       Furniture|   Bookcases| 1024.38|6.0233799587951555|      6.0|
|FUR-BO-10000362|Sauder Inglewood ...|       Furniture|   Bookcases|  17.216|3.2057056733911056|      3.0|
|FUR-BO-10000468|O'Sullivan 2-Shel...|       Furniture|   Bookcases|  13.494| 2.267389661472281|      2.0|
|FUR-BO-10000711|Hon Metal Bookcas...|       Furniture|   Bookcases|   91.36|1.5886114570735634|      2.0|
|FUR-BO-10000780|O'Sullivan Planta...|       Furniture|   Bookcases|   104.8|  6.92059361511338|      7.0|
|FUR-BO-10001337|O'Sullivan Living...

In [131]:
transaction_history = tax_per_transaction.select("product_id","Sales","total_tax_amount")
transaction_history.show() 

Tax_bracket_analysis = transaction_history.join(products_df_tax_round, how = "inner", on = ["product_id"]).groupBy("Round_Tax")\
                        .agg(sum("total_tax_amount").alias("tax_collected_per_tbracket") \
                        , sum("Sales").alias("total_sales_by_tax_bracket"))

Tax_bracket_analysis.show()


StatementMeta(taxprocessing, 9, 49, Finished, Available)

+---------------+---------+------------------+
|     product_id|    Sales|  total_tax_amount|
+---------------+---------+------------------+
|FUR-FU-10004665|282.59998| 2.209704568226205|
|OFF-AR-10002053|    32.04|11.192379787194243|
|TEC-PH-10001459|   329.76| 4.075136240604813|
|OFF-BI-10003655|     81.7| 21.97738979866459|
|OFF-LA-10002381|   728.82|15.607910331159346|
|OFF-PA-10003129|     64.8| 4.489270427066227|
|FUR-BO-10000468|   13.494| 2.267389661472281|
|TEC-MA-10001031|   36.672|2.6389683226659026|
|OFF-PA-10002230|    383.2|13.105241741438716|
|OFF-AR-10003958|  217.472|24.389014808539322|
|FUR-FU-10001588|    22.08|14.002659891527438|
|FUR-CH-10004997|28.223999| 8.716332734803517|
|OFF-BI-10003718|  1126.48|31.809597208205762|
|FUR-BO-10003034|151.72499|1.4991464945264057|
|FUR-TA-10000577|     98.8|11.912806237403753|
|TEC-PH-10002200|     30.8|18.765817542246108|
|TEC-AC-10003289|  1463.88|  3.69255240323035|
|TEC-MA-10001148|  3357.72|20.388414051589336|
|OFF-BI-10003

#### promotional season vs non-promotion data

In [132]:
promotional_data = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('quote', '"')
    .option('escape', '"')
    .load('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/promotions (1).csv')
)

display(promotional_data.limit(10))


StatementMeta(taxprocessing, 9, 50, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5781cff6-cf8c-4323-8b6c-383557e5902f)

In [133]:
promotional_data.printSchema() 

StatementMeta(taxprocessing, 9, 51, Finished, Available)

root
 |-- promo_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- discount: double (nullable = true)
 |-- applicable_products: string (nullable = true)
 |-- start_date: integer (nullable = true)
 |-- start_date_month: integer (nullable = true)
 |-- end_date: integer (nullable = true)
 |-- end_date_month: integer (nullable = true)



In [134]:
# promotional_data = promotional_data.withColumn("array_column", array(promotional_data["applicable_products"]))

from pyspark.sql.functions import split, col

# using split()
# Assuming promotional_data is your DataFrame with "applicable_products" as array of strings
promotional_data = promotional_data.withColumn("applicable_products", regexp_replace(col("applicable_products"), r"[\[\]]", ""))
promotional_data = promotional_data.withColumn("applicable_products_array", split(col("applicable_products"), ",\s*"))
promotional_data.printSchema()

StatementMeta(taxprocessing, 9, 52, Finished, Available)

root
 |-- promo_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- discount: double (nullable = true)
 |-- applicable_products: string (nullable = true)
 |-- start_date: integer (nullable = true)
 |-- start_date_month: integer (nullable = true)
 |-- end_date: integer (nullable = true)
 |-- end_date_month: integer (nullable = true)
 |-- applicable_products_array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [135]:
promotional_data.printSchema() 

StatementMeta(taxprocessing, 9, 53, Finished, Available)

root
 |-- promo_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- discount: double (nullable = true)
 |-- applicable_products: string (nullable = true)
 |-- start_date: integer (nullable = true)
 |-- start_date_month: integer (nullable = true)
 |-- end_date: integer (nullable = true)
 |-- end_date_month: integer (nullable = true)
 |-- applicable_products_array: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [136]:
promotional_data.select("applicable_products_array").show(truncate = False) 

StatementMeta(taxprocessing, 9, 54, Finished, Available)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|applicable_products_array                                                                                                                                                 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[OFF-BI-10001072]                                                                                                                                                         |
|[FUR-FU-10003268, OFF-PA-10000743, TEC-AC-10001445, OFF-ST-10002615, OFF-AP-10000326, OFF-EN-10001535]                                                                    |
|[OFF-BI-10004506, TEC-MA-10004552, OFF-BI-10000494, OFF-PA-10002764]                                                                  

In [137]:
from pyspark.sql.functions import explode, col


exploded_df = promotional_data.select("*",explode(col("applicable_products_array")).alias("exploded_element"))




StatementMeta(taxprocessing, 9, 55, Finished, Available)

In [138]:
exploded_df.show() 

StatementMeta(taxprocessing, 9, 56, Finished, Available)

+--------------------+--------------------+--------+--------------------+----------+----------------+--------+--------------+-------------------------+----------------+
|            promo_id|         description|discount| applicable_products|start_date|start_date_month|end_date|end_date_month|applicable_products_array|exploded_element|
+--------------------+--------------------+--------+--------------------+----------+----------------+--------+--------------+-------------------------+----------------+
|82a124c2-283d-45b...|         Diwali Sale|   13.57|     OFF-BI-10001072|        15|              10|      15|            11|        [OFF-BI-10001072]| OFF-BI-10001072|
|87e92b81-64b1-4a5...|        Holi Special|   11.86|FUR-FU-10003268, ...|        10|               3|      20|             3|     [FUR-FU-10003268,...| FUR-FU-10003268|
|87e92b81-64b1-4a5...|        Holi Special|   11.86|FUR-FU-10003268, ...|        10|               3|      20|             3|     [FUR-FU-10003268,...| OFF

In [139]:
transaction_data_after_promotion_discount = tax_per_transaction.select("transaction_id","date","product_id","Sales")

transaction_data_after_promotion_discount.show() 

StatementMeta(taxprocessing, 9, 57, Finished, Available)

+--------------------+----------+---------------+---------+
|      transaction_id|      date|     product_id|    Sales|
+--------------------+----------+---------------+---------+
|K-12722-FUR-FU-10...|2022-07-30|FUR-FU-10004665|282.59998|
|K-12722-OFF-AR-10...|2022-09-26|OFF-AR-10002053|    32.04|
|K-12722-TEC-PH-10...|2023-06-20|TEC-PH-10001459|   329.76|
|WB-64368-OFF-BI-1...|2022-10-21|OFF-BI-10003655|     81.7|
|WB-64368-OFF-LA-1...|2024-04-23|OFF-LA-10002381|   728.82|
|WB-64368-OFF-PA-1...|2022-08-11|OFF-PA-10003129|     64.8|
|J-14825-FUR-BO-10...|2023-06-18|FUR-BO-10000468|   13.494|
|J-14825-TEC-MA-10...|2023-04-20|TEC-MA-10001031|   36.672|
|J-14825-OFF-PA-10...|2022-09-10|OFF-PA-10002230|    383.2|
|J-14825-OFF-AR-10...|2021-09-17|OFF-AR-10003958|  217.472|
|J-14825-FUR-FU-10...|2021-12-03|FUR-FU-10001588|    22.08|
|A-30388-FUR-CH-10...|2024-05-01|FUR-CH-10004997|28.223999|
|A-30388-OFF-BI-10...|2021-09-03|OFF-BI-10003718|  1126.48|
|A-30388-FUR-BO-10...|2023-12-28|FUR-BO-

In [140]:

# Create a temporary view
transaction_data_after_promotion_discount.createOrReplaceTempView("sales_after_promo")

exploded_df.createOrReplaceTempView("promo_data") 

StatementMeta(taxprocessing, 9, 58, Finished, Available)

In [141]:
# display(transaction_data_after_promotion_discount )

StatementMeta(taxprocessing, 9, 59, Finished, Available)

In [142]:
%%sql
select * from sales_after_promo limit 10;

select * from promo_data limit 10;

StatementMeta(, 9, 61, Finished, Available)

<Spark SQL result set with 10 rows and 4 fields>

<Spark SQL result set with 10 rows and 10 fields>

In [143]:
%%sql 
-- with cte1 as (
--     select product_id,sum(discounted_price) as discounted_sales
--     from 
--     (SELECT 
--         t1.product_id,t1.Sales,t1.date, 
--         t2.*,
--         t1.Sales - COALESCE(t2.discount, 0) as discounted_price
--     FROM sales_after_promo t1
--     LEFT JOIN promo_data t2 ON t2.exploded_element = t1.product_id
--         AND MONTH(t1.date) BETWEEN t2.start_date_month AND t2.end_date_month
--         AND DAY(t1.date) BETWEEN t2.start_date AND t2.end_date
--     where t2.discount is not null)k 
--     group by product_id),
--     cte2 as (

--         select product_id,sum(sales) as sales_without_discount
--     from 
--     (SELECT 
--         t1.product_id,t1.Sales,t1.date, 
--         t2.*,
--         t1.Sales - COALESCE(t2.discount, 0) as discounted_price
--     FROM sales_after_promo t1
--     LEFT JOIN promo_data t2 ON t2.exploded_element = t1.product_id
--         AND MONTH(t1.date) BETWEEN t2.start_date_month AND t2.end_date_month
--         AND DAY(t1.date) BETWEEN t2.start_date AND t2.end_date
--     where t2.discount is null)k 
--     group by product_id)
    
-- select cte1.product_id,cte1.discounted_sales,cte2.sales_without_discount from cte1 inner join cte2 on cte1.product_id = cte2.product_id;

WITH cte AS (
    SELECT 
        t1.product_id,
        SUM(CASE WHEN t2.discount IS NOT NULL THEN t1.Sales - COALESCE(t2.discount, 0) ELSE t1.Sales END) AS discounted_sales,
        SUM(CASE WHEN t2.discount IS NULL THEN t1.Sales END) AS sales_without_discount
    FROM sales_after_promo t1
    LEFT JOIN promo_data t2 ON t2.exploded_element = t1.product_id
                            AND MONTH(t1.date) BETWEEN t2.start_date_month AND t2.end_date_month
                            AND DAY(t1.date) BETWEEN t2.start_date AND t2.end_date
    GROUP BY t1.product_id
)

SELECT 
    cte.product_id,
    cte.discounted_sales,
    cte.sales_without_discount
FROM cte;



    

StatementMeta(taxprocessing, 9, 62, Finished, Available)

<Spark SQL result set with 1000 rows and 3 fields>

In [144]:
# converting back to dataframe:-

seasonal_non_seasonal_prom = spark.sql(f'''
WITH cte AS (
    SELECT 
        t1.product_id,
        SUM(CASE WHEN t2.discount IS NOT NULL THEN t1.Sales - COALESCE(t2.discount, 0) ELSE t1.Sales END) AS discounted_sales,
        SUM(CASE WHEN t2.discount IS NULL THEN t1.Sales END) AS sales_without_discount
    FROM sales_after_promo t1
    LEFT JOIN promo_data t2 ON t2.exploded_element = t1.product_id
                            AND MONTH(t1.date) BETWEEN t2.start_date_month AND t2.end_date_month
                            AND DAY(t1.date) BETWEEN t2.start_date AND t2.end_date
    GROUP BY t1.product_id
)

SELECT 
    cte.product_id,
    cte.discounted_sales,
    cte.sales_without_discount
FROM cte;
''')


seasonal_non_seasonal_prom.show() 

StatementMeta(taxprocessing, 9, 63, Finished, Available)

+---------------+------------------+----------------------+
|     product_id|  discounted_sales|sales_without_discount|
+---------------+------------------+----------------------+
|TEC-MA-10001047|117431.99615478516|    117431.99615478516|
|OFF-PA-10002615| 7654.500104904175|     7654.500104904175|
|OFF-EN-10002600| 11281.58946609497|     11281.58946609497|
|OFF-AR-10003504| 48703.20068359375|     48703.20068359375|
|FUR-FU-10000409|1810.5600490570068|    1810.5600490570068|
|FUR-CH-10003833|131359.83758544922|    131359.83758544922|
|OFF-AP-10002495| 376902.3908691406|     376902.3908691406|
|OFF-PA-10004735| 2998.270025253296|     2998.270025253296|
|FUR-CH-10000863| 2198.016058444977|     2198.016058444977|
|OFF-PA-10000575|174746.60717773438|    174746.60717773438|
|OFF-LA-10004008| 9774.383605957031|     9774.383605957031|
|OFF-PA-10001457|42946.078216552734|    42946.078216552734|
|FUR-TA-10004086|  14201.1801071167|      14201.1801071167|
|OFF-AR-10003876|  74852.1893157959|    

##### customers_segmentation

In [145]:
tax_per_transaction.show()

StatementMeta(taxprocessing, 9, 64, Finished, Available)

+--------------------+-----------+----------+---------------+--------+-------+---------+---------------+--------------------+--------------------+-------+-------------------+------------------+
|      transaction_id|customer_id|      date|    product_id_|quantity|  price|    Sales|     product_id|         description|          attributes|  price|     tax_percentage|  total_tax_amount|
+--------------------+-----------+----------+---------------+--------+-------+---------+---------------+--------------------+--------------------+-------+-------------------+------------------+
|K-12722-FUR-FU-10...|    K-12722|2022-07-30|FUR-FU-10004665|       3|   94.2|282.59998|FUR-FU-10004665|3M Polarizing Tas...|{'product_categor...|   94.2|  0.736568189408735| 2.209704568226205|
|K-12722-OFF-AR-10...|    K-12722|2022-09-26|OFF-AR-10002053|       2|  16.02|    32.04|OFF-AR-10002053|Premium Writing P...|{'product_categor...|  16.02| 5.5961898935971215|11.192379787194243|
|K-12722-TEC-PH-10...|    K-12

In [146]:
tax_per_transaction_r = tax_per_transaction.select("customer_id","Sales","attributes")

StatementMeta(taxprocessing, 9, 65, Finished, Available)

In [147]:
tax_per_transaction_r.printSchema() 

StatementMeta(taxprocessing, 9, 66, Finished, Available)

root
 |-- customer_id: string (nullable = true)
 |-- Sales: float (nullable = true)
 |-- attributes: string (nullable = true)



In [148]:
tax_per_transaction_r.show(truncate=False) 

StatementMeta(taxprocessing, 9, 67, Finished, Available)

+-----------+---------+------------------------------------------------------------------+
|customer_id|Sales    |attributes                                                        |
+-----------+---------+------------------------------------------------------------------+
|K-12722    |282.59998|{'product_category': 'Furniture', 'sub_category': 'Furnishings'}  |
|K-12722    |32.04    |{'product_category': 'Office Supplies', 'sub_category': 'Art'}    |
|K-12722    |329.76   |{'product_category': 'Technology', 'sub_category': 'Phones'}      |
|WB-64368   |81.7     |{'product_category': 'Office Supplies', 'sub_category': 'Binders'}|
|WB-64368   |728.82   |{'product_category': 'Office Supplies', 'sub_category': 'Labels'} |
|WB-64368   |64.8     |{'product_category': 'Office Supplies', 'sub_category': 'Paper'}  |
|J-14825    |13.494   |{'product_category': 'Furniture', 'sub_category': 'Bookcases'}    |
|J-14825    |36.672   |{'product_category': 'Technology', 'sub_category': 'Machines'}    |

In [149]:
StructTypeSchema_r = StructType([
    StructField("product_category", StringType(), True),  # Optional field
    StructField("sub_category", StringType(), True)      # Optional field
])
tax_per_transaction_r = tax_per_transaction_r.withColumn('propStruct', from_json(tax_per_transaction_r.attributes,StructTypeSchema_r))

tax_per_transaction_r = tax_per_transaction_r.select("customer_id","Sales","propStruct.product_category","propStruct.sub_category")

tax_per_transaction_r.show() 

StatementMeta(taxprocessing, 9, 68, Finished, Available)

+-----------+---------+----------------+------------+
|customer_id|    Sales|product_category|sub_category|
+-----------+---------+----------------+------------+
|    K-12722|282.59998|       Furniture| Furnishings|
|    K-12722|    32.04| Office Supplies|         Art|
|    K-12722|   329.76|      Technology|      Phones|
|   WB-64368|     81.7| Office Supplies|     Binders|
|   WB-64368|   728.82| Office Supplies|      Labels|
|   WB-64368|     64.8| Office Supplies|       Paper|
|    J-14825|   13.494|       Furniture|   Bookcases|
|    J-14825|   36.672|      Technology|    Machines|
|    J-14825|    383.2| Office Supplies|       Paper|
|    J-14825|  217.472| Office Supplies|         Art|
|    J-14825|    22.08|       Furniture| Furnishings|
|    A-30388|28.223999|       Furniture|      Chairs|
|    A-30388|  1126.48| Office Supplies|     Binders|
|    A-30388|151.72499|       Furniture|   Bookcases|
|   AP-31244|     98.8|       Furniture|      Tables|
|   AP-31244|     30.8|     

In [150]:
tax_per_transaction_r = tax_per_transaction_r.groupBy(col('customer_id'),col('product_category')).agg(sum(col('Sales')).alias("Total Sales per category")).orderBy(col('customer_id'))

tax_per_transaction_r.show()

StatementMeta(taxprocessing, 9, 69, Finished, Available)

+-----------+----------------+------------------------+
|customer_id|product_category|Total Sales per category|
+-----------+----------------+------------------------+
|    A-10042|       Furniture|      1967.9759826660156|
|    A-10054| Office Supplies|      1766.5380601882935|
|    A-10110|      Technology|       47.20800018310547|
|    A-10110| Office Supplies|      2754.0660400390625|
|    A-10110|       Furniture|      297.40802001953125|
|    A-10238| Office Supplies|       437.8879985809326|
|    A-10238|       Furniture|       627.2640218734741|
|    A-10281| Office Supplies|       72.03000354766846|
|    A-10329|       Furniture|      238.72000122070312|
|    A-10329| Office Supplies|      1121.8600788116455|
|    A-10330|       Furniture|      19.440000534057617|
|    A-10330| Office Supplies|       1703.531979560852|
|    A-10369|       Furniture|       459.6000061035156|
|    A-10369| Office Supplies|      3348.1079711914062|
|    A-10415| Office Supplies|      1032.9600296

In [151]:
pivot_df = tax_per_transaction_r.groupBy("customer_id") \
    .pivot("product_category") \
    .agg(sum(col("Total Sales per category"))).fillna(0).withColumn("Total Sales", expr('`Furniture` + `Office Supplies` + `Technology`'))

pivot_df.show() 

StatementMeta(taxprocessing, 9, 70, Finished, Available)

+-----------+------------------+------------------+------------------+------------------+
|customer_id|         Furniture|   Office Supplies|        Technology|       Total Sales|
+-----------+------------------+------------------+------------------+------------------+
|    A-10042|1967.9759826660156|               0.0|               0.0|1967.9759826660156|
|    A-10054|               0.0|1766.5380601882935|               0.0|1766.5380601882935|
|    A-10110|297.40802001953125|2754.0660400390625| 47.20800018310547| 3098.682060241699|
|    A-10238| 627.2640218734741| 437.8879985809326|               0.0|1065.1520204544067|
|    A-10281|               0.0| 72.03000354766846|               0.0| 72.03000354766846|
|    A-10329|238.72000122070312|1121.8600788116455|               0.0|1360.5800800323486|
|    A-10330|19.440000534057617| 1703.531979560852|               0.0|1722.9719800949097|
|    A-10369| 459.6000061035156|3348.1079711914062|               0.0| 3807.707977294922|
|    A-104

In [152]:
pivot_df.printSchema()

StatementMeta(taxprocessing, 9, 71, Finished, Available)

root
 |-- customer_id: string (nullable = true)
 |-- Furniture: double (nullable = false)
 |-- Office Supplies: double (nullable = false)
 |-- Technology: double (nullable = false)
 |-- Total Sales: double (nullable = false)



#### highperforming vs lowperforming demographics

In [153]:
data1 = tax_per_transaction.select("customer_id","Sales","total_tax_amount")

StatementMeta(taxprocessing, 9, 72, Finished, Available)

In [154]:
data2 = customers_df.select("customer_id","geographic_region")

StatementMeta(taxprocessing, 9, 73, Finished, Available)

In [155]:
demographics_sales = data1.join(data2, how="inner", on=["customer_id"]).select(data1["*"], data2["geographic_region"])

demographics_sales = demographics_sales.groupBy("customer_id","geographic_region").agg(sum('Sales').alias('Total Sales'), sum('total_tax_amount').alias('Total tax collected'))
demographics_sales.drop("customer_id")
demographics_sales.show(50)

StatementMeta(taxprocessing, 9, 74, Finished, Available)

+-----------+-----------------+------------------+-------------------+
|customer_id|geographic_region|       Total Sales|Total tax collected|
+-----------+-----------------+------------------+-------------------+
|    M-72432|        Rajasthan| 63.35999870300293|  33.11345924713181|
|    R-53585|            Assam| 3299.672128677368| 40.635079408158276|
|    J-55369|        Telangana|   1652.5439453125| 100.44100867579893|
|    R-72093| Himachal Pradesh| 586.1354942321777| 24.827707497535037|
|    K-56609|     Chhattisgarh| 127.0719985961914| 14.351543573377088|
|    S-79120| Himachal Pradesh| 9833.162036895752|  57.67152744558103|
|   TN-40501|        Rajasthan|  13157.3638215065|   74.5063380871759|
|   AP-20963|      Maharashtra| 584.3720264434814| 26.671191259163088|
|    K-36818|    Uttar Pradesh|3535.6358394622803|  56.80041557675721|
|    C-53437|         Nagaland| 7229.258033752441|  37.51268749332313|
|    O-20893|        Karnataka|2627.2899436950684| 106.61434420029298|
|    O

In [156]:
# demographics_sales.drop('customer_id')
high_performing_region = demographics_sales.orderBy(desc("Total Sales"),desc("Total tax collected")).limit(5).withColumn("Performance", lit("High Performing")).select("geographic_region","Performance")
low_performing_region = demographics_sales.orderBy(asc("Total Sales"),asc("Total tax collected")).limit(5).withColumn("Performance", lit("Low Performing")).select("geographic_region","Performance") 

StatementMeta(taxprocessing, 9, 75, Finished, Available)

In [157]:
high_performing_region.show()
low_performing_region.show() 

StatementMeta(taxprocessing, 9, 76, Finished, Available)

+-----------------+---------------+
|geographic_region|    Performance|
+-----------------+---------------+
|      Maharashtra|High Performing|
|        Telangana|High Performing|
|          Haryana|High Performing|
|   Andhra Pradesh|High Performing|
|      Uttarakhand|High Performing|
+-----------------+---------------+

+-----------------+--------------+
|geographic_region|   Performance|
+-----------------+--------------+
|           Kerala|Low Performing|
|        Rajasthan|Low Performing|
|        Rajasthan|Low Performing|
|           Odisha|Low Performing|
|    Uttar Pradesh|Low Performing|
+-----------------+--------------+



In [158]:
# products_df
# transactions_df
# customers_df
# tax_per_transaction
# tax_sales_per_day
# CLV
# products_performance
# pp_wrt_category
# discounted_df 
# Refunds_data_adjust
# Tax_bracket_analysis
# seasonal_non_seasonal_prom
# pivot_df

# Example for transactions_df
# transactions_df.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/transactions_df.csv', header=True)

# Example for customers_df
# customers_df.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/customers_df.csv', header=True)



StatementMeta(taxprocessing, 9, 77, Finished, Available)

In [159]:
# # Example for tax_per_transaction
# tax_per_transaction.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/tax_per_transaction.csv', header=True)

# Example for tax_sales_per_day
# tax_sales_per_day.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/tax_sales_per_day.csv', header=True)



StatementMeta(taxprocessing, 9, 78, Finished, Available)

In [160]:
# # Example for CLV
# CLV.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/CLV.csv', header=True)



StatementMeta(taxprocessing, 9, 79, Finished, Available)

In [161]:
# # Example for products_performance
# products_performance.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/products_performance.csv', header=True)

# # Example for pp_wrt_category
# pp_wrt_category.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/pp_wrt_category.csv', header=True)

# # Example for discounted_df
# discounted_df.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/discounted_df.csv', header=True)

# # Example for Refunds_data_adjust
# Refunds_data_adjust.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/Refunds_data_adjust.csv', header=True)

# # Example for Tax_bracket_analysis
# Tax_bracket_analysis.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/Tax_bracket_analysis.csv', header=True)

# # Example for seasonal_non_seasonal_prom
# seasonal_non_seasonal_prom.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/seasonal_non_seasonal_prom.csv', header=True)

# # Example for pivot_df
# pivot_df.write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/pivot_df.csv', header=True)

StatementMeta(taxprocessing, 9, 80, Finished, Available)

In [162]:
tax_sales_per_day.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/tax_sales_per_day', header=True)
CLV.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/CLV', header=True)
products_performance.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/products_performance', header=True)
pp_wrt_category.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/pp_wrt_category', header=True)
discounted_df.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/discounted_df', header=True)
Refunds_data_adjust.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/Refunds_data_adjust', header=True)
Tax_bracket_analysis.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/Tax_bracket_analysis', header=True)
seasonal_non_seasonal_prom.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/seasonal_non_seasonal_prom', header=True)
pivot_df.coalesce(1).write.csv('abfss://mycontainer@adlsgen21507.dfs.core.windows.net/Final_Output/pivot_df', header=True)

StatementMeta(taxprocessing, 9, 81, Finished, Available)

In [171]:
jdbc_url = "jdbc:sqlserver://ASUS-A15\GOLD:1433;databaseName=tax_processing_data;"
table_name1 = "tax_sales_per_day_tbl"
table_name2 = "CLV_tbl"
table_name3 = "products_performance_tbl"
connection_properties = {
    "user": "sa",
    "password": "Aadil111",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Write DataFrame df1 to SQL Server table
tax_sales_per_day.write.jdbc(url=jdbc_url, table=table_name1, mode="overwrite", properties=connection_properties)

# Write DataFrame df2 to SQL Server table
CLV.write.jdbc(url=jdbc_url, table=table_name2, mode="overwrite", properties=connection_properties)

# Write DataFrame df3 to SQL Server table
products_performance.write.jdbc(url=jdbc_url, table=table_name3, mode="overwrite", properties=connection_properties)

StatementMeta(taxprocessing, 9, 90, Finished, Available)

Py4JJavaError: An error occurred while calling o11647.jdbc.
: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host ASUS-A15, port 1433 has failed. Error: "ASUS-A15. Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
	at com.microsoft.sqlserver.jdbc.SQLServerException.ConvertConnectExceptionToSQLServerException(SQLServerException.java:285)
	at com.microsoft.sqlserver.jdbc.SocketFinder.findSocket(IOBuffer.java:2462)
	at com.microsoft.sqlserver.jdbc.TDSChannel.open(IOBuffer.java:668)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:2695)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2362)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2213)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1276)
	at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:861)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:152)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:214)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:67)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:152)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:145)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:145)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:129)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:123)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:200)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:897)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:412)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:379)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:794)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
