In [1]:
pip install pandas pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 3.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ce3edbdba53f560813992c3d3d29d93c7a670f58c32c73e32d1c2c1b03467dc6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
# Spark's `sum` is imported under an alias so it does not shadow Python's
# built-in `sum` for the rest of the kernel session.
from pyspark.sql.functions import col, sum as spark_sum, when
from pyspark.sql.window import Window

# Initialize SparkSession (getOrCreate reuses a session that is already running)
spark = SparkSession.builder.appName("AccountBalance").getOrCreate()

# Sample data: (TransactionDate, AccountNumber, TransactionType, Amount)
data = [
    ("2023-01-01", "100", "Credit", 1000),
    ("2023-01-02", "100", "Credit", 1500),
    ("2023-01-03", "100", "Debit", 1000),
    ("2023-01-02", "200", "Credit", 3500),
    ("2023-01-03", "200", "Debit", 2000),
    ("2023-01-04", "200", "Credit", 3500),
    ("2023-01-13", "300", "Credit", 4000),
    ("2023-01-14", "300", "Debit", 4500),
    ("2023-01-15", "300", "Credit", 1500),
]

# Column names only; Spark infers each column's type from the sample rows
columns = ["TransactionDate", "AccountNumber", "TransactionType", "Amount"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Running-total window: per account, ordered by date, covering every row from
# the start of the partition up to and including the current row.
# The dates here are "YYYY-MM-DD" strings, so string ordering is chronological.
# NOTE: the ordering key is TransactionDate alone, so several transactions of
# one account on the same date have no defined relative order.
windowSpec = (
    Window.partitionBy("AccountNumber")
    .orderBy("TransactionDate")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Signed amount: "Credit" rows add to the balance; every other transaction
# type is subtracted.
signed_amount = when(col("TransactionType") == "Credit", col("Amount")).otherwise(-col("Amount"))

# Calculate balance as the cumulative sum of the signed amounts
df_with_balance = df.withColumn(
    "CurrentBalance",
    spark_sum(signed_amount).over(windowSpec),
)

# Show result
df_with_balance.show()


+---------------+-------------+---------------+------+--------------+
|TransactionDate|AccountNumber|TransactionType|Amount|CurrentBalance|
+---------------+-------------+---------------+------+--------------+
|     2023-01-01|          100|         Credit|  1000|          1000|
|     2023-01-02|          100|         Credit|  1500|          2500|
|     2023-01-03|          100|          Debit|  1000|          1500|
|     2023-01-02|          200|         Credit|  3500|          3500|
|     2023-01-03|          200|          Debit|  2000|          1500|
|     2023-01-04|          200|         Credit|  3500|          5000|
|     2023-01-13|          300|         Credit|  4000|          4000|
|     2023-01-14|          300|          Debit|  4500|          -500|
|     2023-01-15|          300|         Credit|  1500|          1000|
+---------------+-------------+---------------+------+--------------+



## If importing the dataset from an Excel file


In [None]:
from pyspark.sql import SparkSession
# Spark's `sum` is imported under an alias so it does not shadow Python's
# built-in `sum` for the rest of the kernel session.
from pyspark.sql.functions import col, sum as spark_sum, when
from pyspark.sql.window import Window
import pandas as pd

# Initialize SparkSession (getOrCreate reuses a session that is already running)
spark = SparkSession.builder.appName("AccountBalance").getOrCreate()

# Placeholder path: point this at the real workbook before running the cell.
# NOTE: pandas needs an Excel engine installed to read the file
# (openpyxl for .xlsx workbooks).
excel_file_path = "path/to/your/excel/file.xlsx"
pandas_df = pd.read_excel(excel_file_path)

# Fail early, with a clear message, if the sheet lacks a column that the
# balance calculation below reads.
required_columns = ["TransactionDate", "AccountNumber", "TransactionType", "Amount"]
missing_columns = [name for name in required_columns if name not in pandas_df.columns]
if missing_columns:
    raise ValueError(
        f"Excel file {excel_file_path!r} is missing required columns: {missing_columns}"
    )

# Convert Pandas DataFrame to Spark DataFrame
df = spark.createDataFrame(pandas_df)

# Running-total window: per account, ordered by TransactionDate, covering every
# row from the start of the partition up to and including the current row.
# NOTE(review): chronological order depends on how TransactionDate was read
# from the sheet (date values vs. free-form text) - confirm for your file.
windowSpec = (
    Window.partitionBy("AccountNumber")
    .orderBy("TransactionDate")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Signed amount: "Credit" rows add to the balance; every other transaction
# type is subtracted.
signed_amount = when(col("TransactionType") == "Credit", col("Amount")).otherwise(-col("Amount"))

# Calculate balance as the cumulative sum of the signed amounts
df_with_balance = df.withColumn(
    "CurrentBalance",
    spark_sum(signed_amount).over(windowSpec),
)

# Show result
df_with_balance.show()
