## Imports - FIX for 'lit' error

In [0]:
# Quick check
for year in ['2020_21', '2021_22', '2022_23', '2023_24', '2024_25', '2025_26']:
    df = spark.table(f'msme_risk_analytics.bronze_gst_statewise_{year}')
    print(f"{year}: {df.count()} rows")
    df.select(df.columns[0:7]).show(3, truncate=False)

2020_21: 49 rows
+--------+-----------------+----------------------+------------------+------------+------------------+------------+
|State Cd|State            |Financial Year 2020-21|_c3               |_c4         |_c5               |_c6         |
+--------+-----------------+----------------------+------------------+------------+------------------+------------+
|NULL    |NULL             |CGST                  |SGST              |IGST        |CESS              |TOTAL       |
|01      |Jammu and Kashmir|950.9074697           |1552.8399126000002|1084.6129068|59.139085300000005|3647.4993744|
|02      |Himachal Pradesh |1178.0300780999999    |1664.3786552      |4202.2963276|10.2487692        |7054.9538301|
+--------+-----------------+----------------------+------------------+------------+------------------+------------+
only showing top 3 rows
2021_22: 49 rows
+--------+-----------------+----------------------+------------+------------+-----------------+------------+
|State Cd|State      

In [0]:
from pyspark.sql.functions import lit, col, sum as spark_sum, count, avg, desc, current_timestamp, when, round
from functools import reduce
from pyspark.sql import DataFrame

##Read all 6 GST files

In [0]:
gst_files = [
    ('bronze_gst_statewise_2020_21', '2020-21'),
    ('bronze_gst_statewise_2021_22', '2021-22'),
    ('bronze_gst_statewise_2022_23', '2022-23'),
    ('bronze_gst_statewise_2023_24', '2023-24'),
    ('bronze_gst_statewise_2024_25', '2024-25'),
    ('bronze_gst_statewise_2025_26', '2025-26')
]

print(f"Processing {len(gst_files)} GST files...")

Processing 6 GST files...


## Check first file structure

In [0]:
sample = spark.table("msme_risk_analytics.bronze_gst_statewise_2020_21")
print("Available columns in bronze table:")
print(sample.columns)
print(f"\nTotal columns: {len(sample.columns)}")
sample.show(5, truncate=False)

Available columns in bronze table:
['State Cd', 'State', 'Financial Year 2020-21', '_c3', '_c4', '_c5', '_c6', 'Wednesday, April 01, 2020', '_c8', '_c9', '_c10', '_c11', 'Friday, May 01, 2020', '_c13', '_c14', '_c15', '_c16', 'Monday, June 01, 2020', '_c18', '_c19', '_c20', '_c21', 'Wednesday, July 01, 2020', '_c23', '_c24', '_c25', '_c26', 'Saturday, August 01, 2020', '_c28', '_c29', '_c30', '_c31', 'Tuesday, September 01, 2020', '_c33', '_c34', '_c35', '_c36', 'Thursday, October 01, 2020', '_c38', '_c39', '_c40', '_c41', 'Sunday, November 01, 2020', '_c43', '_c44', '_c45', '_c46', 'Tuesday, December 01, 2020', '_c48', '_c49', '_c50', '_c51', 'Friday, January 01, 2021', '_c53', '_c54', '_c55', '_c56', 'Monday, February 01, 2021', '_c58', '_c59', '_c60', '_c61', 'Monday, March 01, 2021', '_c63', '_c64', '_c65', '_c66']

Total columns: 67
+--------+-----------------+----------------------+------------------+------------------+------------------+------------------+-----------------------

## Process all 6 years

In [0]:
dfs = []

for table_name, fiscal_year in gst_files:
    print(f"Processing {table_name} (FY {fiscal_year})...")
    
    df = spark.table(f"msme_risk_analytics.{table_name}")
    
    # Remove header row (where State Cd is NULL)
    df_clean = df.filter(col(df.columns[0]).isNotNull())
    
    # Rename columns
    df_clean = df_clean \
        .withColumnRenamed(df.columns[0], 'state_code') \
        .withColumnRenamed(df.columns[1], 'state_name') \
        .withColumnRenamed(df.columns[2], 'fy_cgst') \
        .withColumnRenamed('_c3', 'fy_sgst') \
        .withColumnRenamed('_c4', 'fy_igst') \
        .withColumnRenamed('_c5', 'fy_cess') \
        .withColumnRenamed('_c6', 'fy_total') \
        .withColumn('fiscal_year', lit(f'FY{fiscal_year}'))
    
    # Cast to double
    df_clean = df_clean \
        .withColumn('fy_cgst', col('fy_cgst').cast('double')) \
        .withColumn('fy_sgst', col('fy_sgst').cast('double')) \
        .withColumn('fy_igst', col('fy_igst').cast('double')) \
        .withColumn('fy_cess', col('fy_cess').cast('double')) \
        .withColumn('fy_total', col('fy_total').cast('double'))
    
    # Select final columns
    df_final = df_clean.select(
        'state_code', 'state_name', 'fiscal_year',
        'fy_cgst', 'fy_sgst', 'fy_igst', 'fy_cess', 'fy_total'
    )
    
    dfs.append(df_final)
    print(f"  ✓ {df_final.count()} states")

print(f"\n✅ All files processed")

Processing bronze_gst_statewise_2020_21 (FY 2020-21)...
  ✓ 42 states
Processing bronze_gst_statewise_2021_22 (FY 2021-22)...
  ✓ 42 states
Processing bronze_gst_statewise_2022_23 (FY 2022-23)...
  ✓ 42 states
Processing bronze_gst_statewise_2023_24 (FY 2023-24)...
  ✓ 42 states
Processing bronze_gst_statewise_2024_25 (FY 2024-25)...
  ✓ 42 states
Processing bronze_gst_statewise_2025_26 (FY 2025-26)...
  ✓ 40 states

✅ All files processed


## Union all years

In [0]:
print("\nUnifying all years...")
unified_gst = reduce(DataFrame.unionByName, dfs)
print(f"Total records after union: {unified_gst.count()}")

# Show sample
unified_gst.show(10, truncate=False)


Unifying all years...
Total records after union: 250
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+
|state_code|state_name       |fiscal_year|fy_cgst           |fy_sgst           |fy_igst           |fy_cess           |fy_total          |
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+
|01        |Jammu and Kashmir|FY2020-21  |950.9074697       |1552.8399126000002|1084.6129068      |59.139085300000005|3647.4993744      |
|02        |Himachal Pradesh |FY2020-21  |1178.0300780999999|1664.3786552      |4202.2963276      |10.2487692        |7054.9538301      |
|03        |Punjab           |FY2020-21  |3489.6770088      |5521.5639123      |4727.9776606000005|173.8617595       |13913.0803412     |
|04        |Chandigarh       |FY2020-21  |302.0597935       |409.98266479999995|927.2779292999999 |11.3574026        |

## Clean GST data

In [0]:
print("\nCleaning data...")

silver_gst = unified_gst \
    .dropna(subset=['state_name']) \
    .filter(col('state_name') != 'CBIC') \
    .filter(col('state_name') != 'Other Territory') \
    .filter(col('state_name') != 'Grand Total') \
    .withColumn('load_timestamp', current_timestamp())

print(f"Records after cleaning: {silver_gst.count()}")

# Show cleaned data
silver_gst.show(10, truncate=False)


Cleaning data...
Records after cleaning: 224
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+--------------------------+
|state_code|state_name       |fiscal_year|fy_cgst           |fy_sgst           |fy_igst           |fy_cess           |fy_total          |load_timestamp            |
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+--------------------------+
|01        |Jammu and Kashmir|FY2020-21  |950.9074697       |1552.8399126000002|1084.6129068      |59.139085300000005|3647.4993744      |2026-01-29 13:30:49.784717|
|02        |Himachal Pradesh |FY2020-21  |1178.0300780999999|1664.3786552      |4202.2963276      |10.2487692        |7054.9538301      |2026-01-29 13:30:49.784717|
|03        |Punjab           |FY2020-21  |3489.6770088      |5521.5639123      |4727.9776606000005|173.8617595       |13913.08034

## STATE ANALYSIS - Top GST Collection States (TARGET MARKETS)

In [0]:
print("\n" + "="*70)
print("COMPLY - TARGET MARKET ANALYSIS")
print("="*70)

# Calculate total collection per state across all years
state_rankings = silver_gst \
    .groupBy('state_name') \
    .agg(
        spark_sum('fy_total').alias('total_collection_6years'),
        avg('fy_total').alias('avg_annual_collection'),
        count('*').alias('num_years')
    ) \
    .withColumn('total_collection_crores', round(col('total_collection_6years'), 2)) \
    .orderBy(desc('total_collection_6years'))

print("\n🎯 TOP 15 STATES BY GST COLLECTION (Target Markets):")
state_rankings.select('state_name', 'total_collection_crores', 'avg_annual_collection', 'num_years').show(15, truncate=False)


COMPLY - TARGET MARKET ANALYSIS

🎯 TOP 15 STATES BY GST COLLECTION (Target Markets):
+--------------+-----------------------+---------------------+---------+
|state_name    |total_collection_crores|avg_annual_collection|num_years|
+--------------+-----------------------+---------------------+---------+
|Maharashtra   |1585953.34             |264325.5558643833    |6        |
|Karnataka     |714531.3               |119088.55022526666   |6        |
|Gujarat       |641467.75              |106911.29206128333   |6        |
|Tamil Nadu    |603920.84              |100653.47319235001   |6        |
|Haryana       |515584.08              |85930.68068826666    |6        |
|Uttar Pradesh |513877.86              |85646.31019108334    |6        |
|Delhi         |338693.91              |56448.984440633336   |6        |
|West Bengal   |322739.07              |53789.84475923333    |6        |
|Telangana     |299731.21              |49955.201674949996   |6        |
|Odisha        |279238.42             

## P ANALYSIS - Year-over-Year Growth

In [0]:
print("\n📊 YEAR-OVER-YEAR GST GROWTH ANALYSIS:")

yoy_growth = silver_gst \
    .groupBy('fiscal_year') \
    .agg(
        spark_sum('fy_total').alias('total_collection'),
        count('state_name').alias('num_states')
    ) \
    .orderBy('fiscal_year')

yoy_growth.show()


📊 YEAR-OVER-YEAR GST GROWTH ANALYSIS:
+-----------+------------------+----------+
|fiscal_year|  total_collection|num_states|
+-----------+------------------+----------+
|  FY2020-21| 862963.2166596998|        37|
|  FY2021-22|   1093834.6840593|        37|
|  FY2022-23|1320434.9500000002|        37|
|  FY2023-24|1518126.8299999998|        37|
|  FY2024-25|1673223.6799999997|        38|
|  FY2025-26|        1167327.36|        38|
+-----------+------------------+----------+



## Write Silver table

In [0]:
print("\nWriting to Silver layer...")

silver_gst.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .saveAsTable('msme_risk_analytics.silver_gst_unified')

print("✅ Silver table created: msme_risk_analytics.silver_gst_unified")


Writing to Silver layer...
✅ Silver table created: msme_risk_analytics.silver_gst_unified


##  Verify final table

In [0]:
final_check = spark.table('msme_risk_analytics.silver_gst_unified')

print(f"\n{'='*70}")
print("FINAL SILVER GST TABLE SUMMARY")
print(f"{'='*70}")
print(f"Total records: {final_check.count()}")
print(f"Columns: {len(final_check.columns)}")
print(f"Unique states: {final_check.select('state_name').distinct().count()}")
print(f"Fiscal years: {final_check.select('fiscal_year').distinct().count()}")

print("\nColumn data types:")
final_check.printSchema()

print("\nSample data:")
final_check.show(10, truncate=False)

print("\nData quality check:")
final_check.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in final_check.columns
]).show(vertical=True)


FINAL SILVER GST TABLE SUMMARY
Total records: 224
Columns: 9
Unique states: 39
Fiscal years: 6

Column data types:
root
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- fiscal_year: string (nullable = true)
 |-- fy_cgst: double (nullable = true)
 |-- fy_sgst: double (nullable = true)
 |-- fy_igst: double (nullable = true)
 |-- fy_cess: double (nullable = true)
 |-- fy_total: double (nullable = true)
 |-- load_timestamp: timestamp (nullable = true)


Sample data:
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+--------------------------+
|state_code|state_name       |fiscal_year|fy_cgst           |fy_sgst           |fy_igst           |fy_cess           |fy_total          |load_timestamp            |
+----------+-----------------+-----------+------------------+------------------+------------------+------------------+------------------+-------------------------

## EXPORT TOP 10 STATES FOR Z STRATEGY

In [0]:
print("\n🎯 Z COMPLY - TOP 10 TARGET STATES:")

top_10_states = state_rankings.limit(10)
top_10_states.show(10, truncate=False)

# Save for reference
top_10_states.write \
    .format('delta') \
    .mode('overwrite') \
    .saveAsTable('msme_risk_analytics.zhagaran_target_states')

print("\n✅ Target states saved: msme_risk_analytics.zhagaran_target_states")
print("\n📋 NEXT STEP: Use these states for Z Comply MVP launch strategy")


🎯 Z COMPLY - TOP 10 TARGET STATES:
+-------------+-----------------------+---------------------+---------+-----------------------+
|state_name   |total_collection_6years|avg_annual_collection|num_years|total_collection_crores|
+-------------+-----------------------+---------------------+---------+-----------------------+
|Maharashtra  |1585953.3351862999     |264325.5558643833    |6        |1585953.34             |
|Karnataka    |714531.3013516         |119088.55022526666   |6        |714531.3               |
|Gujarat      |641467.7523677         |106911.29206128333   |6        |641467.75              |
|Tamil Nadu   |603920.8391541         |100653.47319235001   |6        |603920.84              |
|Haryana      |515584.08412959997     |85930.68068826666    |6        |515584.08              |
|Uttar Pradesh|513877.86114650004     |85646.31019108334    |6        |513877.86              |
|Delhi        |338693.90664380003     |56448.984440633336   |6        |338693.91              |
|Wes