In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import*
import os


In [2]:
# Create (or reuse) the SparkSession for this notebook: application name "challan",
# running against the local master "local[*]".
spark = (
    SparkSession.builder
    .appName("challan")
    .master("local[*]")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/25 17:05:00 WARN Utils: Your hostname, LAPTOP-A2FP9AQK, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/02/25 17:05:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/25 17:05:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Root folder of the project data. It defaults to the original location but can be
# redirected by setting the CHALLAN_BASE_DIR environment variable, so the notebook
# is not tied to one user's home directory.
BASE_DIR = os.environ.get("CHALLAN_BASE_DIR", "/home/somnath/coding/pyspark")

# Input CSV with the daily e-challan figures.
file_path = f"{BASE_DIR}/input_data/echallan_daily_data.csv"
# Output root under which the notebook saves its results.
path = f"{BASE_DIR}/output_data"

In [4]:
# Load the daily e-challan CSV: the first line is the header, column types are
# inferred from the data, and the parser runs in PERMISSIVE mode.
df = spark.read.csv(
    file_path,
    header=True,
    inferSchema=True,
    mode="PERMISSIVE",
)

In [5]:
# Peek at the first few rows as Row objects. collect() would ship every row of the
# dataset to the driver and dump all of them into the cell output; take(5) only
# fetches the rows that are actually displayed.
df.take(5)

[Row(date=datetime.date(2015, 1, 1), totalChallan=120, disposedChallan=29, pendingChallan=91, pendingAmount=588600, disposedAmount=116860, totalAmount=705460, pendingCourt=1, disposedCourt=0, totalCourt=1),
 Row(date=datetime.date(2015, 1, 2), totalChallan=162, disposedChallan=39, pendingChallan=123, pendingAmount=663350, disposedAmount=281425, totalAmount=944775, pendingCourt=1, disposedCourt=1, totalCourt=2),
 Row(date=datetime.date(2015, 1, 3), totalChallan=122, disposedChallan=34, pendingChallan=88, pendingAmount=596900, disposedAmount=286950, totalAmount=883850, pendingCourt=4, disposedCourt=1, totalCourt=5),
 Row(date=datetime.date(2015, 1, 4), totalChallan=116, disposedChallan=27, pendingChallan=89, pendingAmount=967350, disposedAmount=301600, totalAmount=1268950, pendingCourt=3, disposedCourt=2, totalCourt=5),
 Row(date=datetime.date(2015, 1, 5), totalChallan=162, disposedChallan=28, pendingChallan=134, pendingAmount=877654, disposedAmount=103000, totalAmount=980654, pendingCou

In [6]:
# Number of rows in the loaded DataFrame.
df.count()

4061

In [7]:
# Number of columns in df. Note that `columns` holds the count, not the column names.
columns = len(df.columns)
print(columns)

10


In [8]:
# Print each column's name and type; the types come from inferSchema at load time.
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- totalChallan: integer (nullable = true)
 |-- disposedChallan: integer (nullable = true)
 |-- pendingChallan: integer (nullable = true)
 |-- pendingAmount: integer (nullable = true)
 |-- disposedAmount: integer (nullable = true)
 |-- totalAmount: integer (nullable = true)
 |-- pendingCourt: integer (nullable = true)
 |-- disposedCourt: integer (nullable = true)
 |-- totalCourt: integer (nullable = true)



In [9]:
# Display the first five rows without truncating cell values.
df.show(n=5, truncate=False)

+----------+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|date      |totalChallan|disposedChallan|pendingChallan|pendingAmount|disposedAmount|totalAmount|pendingCourt|disposedCourt|totalCourt|
+----------+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|2015-01-01|120         |29             |91            |588600       |116860        |705460     |1           |0            |1         |
|2015-01-02|162         |39             |123           |663350       |281425        |944775     |1           |1            |2         |
|2015-01-03|122         |34             |88            |596900       |286950        |883850     |4           |1            |5         |
|2015-01-04|116         |27             |89            |967350       |301600        |1268950    |3           |2            |5         |
|2015-01-05|162         |28             |134    

In [10]:
# Write the raw daily data as CSV (two partitions, so at most two part files) into
# its own sub-directory. Saving straight into `path` with mode("overwrite") replaces
# the whole output root, which also deletes the other datasets this notebook stores
# underneath it whenever this cell is re-run.
daily_output_path = f"{path}/daily_data"
(
    df.repartition(2)
    .write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(daily_output_path)
)

In [11]:
# Show summary statistics (count, mean, stddev, ...) for df's columns.
df.describe().show()

26/02/25 17:05:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+-----------------+-------------------+--------------------+--------------------+-----------------+------------------+------------------+
|summary|      totalChallan|   disposedChallan|   pendingChallan|      pendingAmount|      disposedAmount|         totalAmount|     pendingCourt|     disposedCourt|        totalCourt|
+-------+------------------+------------------+-----------------+-------------------+--------------------+--------------------+-----------------+------------------+------------------+
|  count|              4061|              4061|             4061|               4061|                4061|                4061|             4061|              4061|              4061|
|   mean|102564.29598621029| 39190.89509972913|63373.40088648116|9.998178130879094E7| 5.655994373356316E7| 1.565417250423541E8|31538.34819010096|5563.7631125338585| 37102.11130263482|
| stddev|  94274.3099047864|30766.668065442063|69079.82083101527| 1.295082265646

In [12]:
# Total every metric column per calendar month; each sum keeps its source column name.
# NOTE(review): the grouping key is month("date") alone, so the same month from
# different years is added together - confirm this is the intended roll-up.
# `sum` is the Spark SQL aggregate pulled in by the wildcard import, not the builtin.
monthly_df = df.groupBy(month("date").alias("month")).agg(
    *(
        sum(metric).alias(metric)
        for metric in (
            "totalChallan",
            "disposedChallan",
            "pendingChallan",
            "pendingAmount",
            "disposedAmount",
            "totalAmount",
            "pendingCourt",
            "disposedCourt",
            "totalCourt",
        )
    )
)

In [13]:
# Total every metric column per calendar year; each sum keeps its source column name.
# `sum` is the Spark SQL aggregate pulled in by the wildcard import, not the builtin.
yearly_totals = [
    sum(metric).alias(metric)
    for metric in (
        "totalChallan",
        "disposedChallan",
        "pendingChallan",
        "pendingAmount",
        "disposedAmount",
        "totalAmount",
        "pendingCourt",
        "disposedCourt",
        "totalCourt",
    )
]
yearly_df = df.groupBy(year("date").alias("year")).agg(*yearly_totals)

In [14]:
# Display five of the monthly aggregate rows without truncating cell values.
monthly_df.show(n=5, truncate=False)

+-----+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|month|totalChallan|disposedChallan|pendingChallan|pendingAmount|disposedAmount|totalAmount|pendingCourt|disposedCourt|totalCourt|
+-----+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|12   |37931314    |13399438       |24531876      |41052856303  |20113892108   |61166748411|10543311    |1786256      |12329567  |
|1    |38347036    |13520202       |24826834      |39402739670  |19561151709   |58963891379|10472471    |1839020      |12311491  |
|6    |34314635    |13577873       |20736762      |31772630055  |20361638384   |52134268439|10701694    |1984370      |12686064  |
|3    |33078759    |13710026       |19368733      |29951514982  |21366102507   |51317617489|10303815    |1894969      |12198784  |
|5    |31136535    |12080554       |19055981      |30211531965  |16956160733   |471

In [15]:
# Save the monthly aggregates as CSV, partitioned on disk by the "month" column.
# The target is derived from the shared output root (`path`) instead of repeating
# the absolute path, so all outputs move together if the root changes.
(
    monthly_df.write.format("csv")
    .mode("overwrite")
    .partitionBy("month")
    .option("header", "true")
    .save(f"{path}/monthly_data")
)

In [16]:
# Save the yearly aggregates as CSV, partitioned on disk by the "year" column.
# The target is derived from the shared output root (`path`) instead of repeating
# the absolute path, so all outputs move together if the root changes.
(
    yearly_df.write.format("csv")
    .mode("overwrite")
    .partitionBy("year")
    .option("header", "true")
    .save(f"{path}/yearly_data")
)

In [17]:
# Print the physical plan Spark builds for the yearly aggregation.
yearly_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[_groupingexpression#1176], functions=[sum(totalChallan#18), sum(disposedChallan#19), sum(pendingChallan#20), sum(pendingAmount#21), sum(disposedAmount#22), sum(totalAmount#23), sum(pendingCourt#24), sum(disposedCourt#25), sum(totalCourt#26)])
   +- Exchange hashpartitioning(_groupingexpression#1176, 200), ENSURE_REQUIREMENTS, [plan_id=378]
      +- HashAggregate(keys=[_groupingexpression#1176], functions=[partial_sum(totalChallan#18), partial_sum(disposedChallan#19), partial_sum(pendingChallan#20), partial_sum(pendingAmount#21), partial_sum(disposedAmount#22), partial_sum(totalAmount#23), partial_sum(pendingCourt#24), partial_sum(disposedCourt#25), partial_sum(totalCourt#26)])
         +- Project [totalChallan#18, disposedChallan#19, pendingChallan#20, pendingAmount#21, disposedAmount#22, totalAmount#23, pendingCourt#24, disposedCourt#25, totalCourt#26, year(date#17) AS _groupingexpression#1176]
            

In [18]:
# Keep only the aggregate row for the year 2015 and display it.
df_2015 = yearly_df.where(yearly_df["year"] == 2015)
df_2015.show()


+----+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|year|totalChallan|disposedChallan|pendingChallan|pendingAmount|disposedAmount|totalAmount|pendingCourt|disposedCourt|totalCourt|
+----+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+
|2015|      112492|          14952|         97540|    219092716|      55412297|  274505013|       70210|         2411|     72621|
+----+------------+---------------+--------------+-------------+--------------+-----------+------------+-------------+----------+



In [19]:
# Persist the yearly aggregates as a CSV-backed table bucketed into 4 buckets on "year".
# - Repartitioning on the bucket column sends all rows of a bucket to the same task,
#   so each bucket is written by one task rather than being split across all four.
# - The table files live in their own directory under the output root. They are kept
#   out of yearly_data because that directory is rewritten with mode("overwrite") by
#   the yearly CSV export, which would delete the table's files along with it.
(
    yearly_df.repartition(4, "year")
    .write.format("CSV")
    .option("header", "true")
    .mode("overwrite")
    .option("path", f"{path}/yearly_data_bucketed")
    .bucketBy(4, "year")
    .saveAsTable("year_bucketed_table")
)

In [20]:
# Read the saved table back through the catalog and display its rows.
spark.table("year_bucketed_table").show()

+----+------------+---------------+--------------+-------------+--------------+------------+------------+-------------+----------+
|year|totalChallan|disposedChallan|pendingChallan|pendingAmount|disposedAmount| totalAmount|pendingCourt|disposedCourt|totalCourt|
+----+------------+---------------+--------------+-------------+--------------+------------+------------+-------------+----------+
|2025|    97884423|       22955259|      74929164| 144158997231|   37850988379|182009985610|    35646527|      1794478|  37441005|
|2018|     4317842|        3366850|        950992|   2016267241|    6686319692|  8702586933|      541373|       173800|    715173|
|2022|    47649749|       23999582|      23650167|  36057961570|   36661025294| 72718986864|    13589745|      3702527|  17292272|
|2023|    67911658|       27837346|      40074312|  65165178521|   42487725509|107652904030|    22325544|      4541276|  26866820|
|2024|    81892378|       26316839|      55575539|  88702822632|   40065723712|1287

In [21]:
# Row count of the bucketed table, which was saved from the per-year aggregate yearly_df.
spark.table("year_bucketed_table").count()

12

In [22]:
# Display the table ordered from the most recent year to the oldest.
spark.sql("SELECT * FROM year_bucketed_table ORDER BY year DESC").show()

+----+------------+---------------+--------------+-------------+--------------+------------+------------+-------------+----------+
|year|totalChallan|disposedChallan|pendingChallan|pendingAmount|disposedAmount| totalAmount|pendingCourt|disposedCourt|totalCourt|
+----+------------+---------------+--------------+-------------+--------------+------------+------------+-------------+----------+
|2026|    11527247|        1602984|       9924263|  19056295705|    2978920858| 22035216563|     1038677|        21577|   1060254|
|2025|    97884423|       22955259|      74929164| 144158997231|   37850988379|182009985610|    35646527|      1794478|  37441005|
|2024|    81892378|       26316839|      55575539|  88702822632|   40065723712|128768546344|    28761158|      3394839|  32155997|
|2023|    67911658|       27837346|      40074312|  65165178521|   42487725509|107652904030|    22325544|      4541276|  26866820|
|2022|    47649749|       23999582|      23650167|  36057961570|   36661025294| 727

In [23]:
# Query the bucketed table, newest year first, and keep the result as a DataFrame.
result_df = spark.sql("SELECT * FROM year_bucketed_table ORDER BY year DESC")

# Save the sorted result as CSV with a header row. The target is derived from the
# shared output root (`path`) instead of repeating the absolute path.
(
    result_df.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save(f"{path}/yearly_data_sorted")
)

In [24]:

# Number of partitions backing result_df.
result_df.rdd.getNumPartitions()

1

In [25]:
# Redistribute result_df across two partitions and confirm the new partition count.
repartition_df = result_df.repartition(numPartitions=2)
repartition_df.rdd.getNumPartitions()

2

In [26]:
# spark.stop()  # Uncomment to shut down the Spark session once the notebook is finished.