<a href="https://colab.research.google.com/github/beingmechon/DataAnalysisWithPython/blob/main/Electoral_Bond_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install required packages

In [324]:
! sudo apt-get install -q graphviz libgraphviz-dev pkg-config

Reading package lists...
Building dependency tree...
Reading state information...
pkg-config is already the newest version (0.29.2-1ubuntu3).
graphviz is already the newest version (2.42.2-6).
libgraphviz-dev is already the newest version (2.42.2-6).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [322]:
! pip install -q tabula-py squarify plotly dash pyspark pyvis networkx[default,extra]

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for pygraphviz (pyproject.toml) ... [?25l[?25hdone


In [204]:
import tabula
import squarify
import numpy as np
import pandas as pd
import plotly.express as px
import matplotlib.pyplot as plt
from dash import Dash, dcc, html, Input, Output


In [205]:
def pdf_to_df(path):
  """Read every table of a PDF and stack them into one DataFrame.

  tabula returns one DataFrame per detected table. The column labels of the
  first table are applied to all later tables so they concatenate into a
  single frame with a consistent header.

  Args:
    path: Location of the PDF, passed unchanged to ``tabula.read_pdf``.

  Returns:
    A pandas DataFrame containing the rows of all tables, re-indexed from 0.

  Raises:
    ValueError: If no table is extracted, or if a later table does not have
      the same number of columns as the first one.
  """
  tables = tabula.read_pdf(path, pages='all', lattice=True)

  # Fail with an explicit message instead of pandas' generic
  # "No objects to concatenate".
  if not tables:
    raise ValueError(f"No tables could be extracted from {path!r}")

  header = tables[0].columns
  for table in tables[1:]:
    table.columns = header

  return pd.concat(tables, ignore_index=True)

In [206]:
# !wget -O ECI-Enchasers.pdf https://raw.githubusercontent.com/beingmechon/DataAnalysisWithPython/main/electoral_bond/ECI-Enchasers.pdf
# !wget -O ECI-Purchasers.pdf https://raw.githubusercontent.com/beingmechon/DataAnalysisWithPython/main/electoral_bond/ECI-Purchasers.pdf

In [207]:
# Raw GitHub URLs of the two source PDFs (encashment list and purchase list).
enchaser_pdf = "https://raw.githubusercontent.com/beingmechon/DataAnalysisWithPython/main/electoral_bond/ECI-Enchasers.pdf"
purchaser_pdf = "https://raw.githubusercontent.com/beingmechon/DataAnalysisWithPython/main/electoral_bond/ECI-Purchasers.pdf"

In [208]:
# Parse each source PDF into a single pandas DataFrame (encashments first, then purchases).
enchasers, purchasers = (pdf_to_df(pdf_url) for pdf_url in (enchaser_pdf, purchaser_pdf))

## Explore and Wrangle the data

### Explore dataframes

In [209]:
# Preview the first rows of the encashment table.
enchasers.head()

Unnamed: 0,Sr No.,Date of\rEncashment,Name of the Political Party,Account no. of\rPolitical Party,Prefix,Bond\rNumber,Denominations,Pay Branch\rCode,Pay Teller
0,1,12/Apr/2019,ALL INDIA ANNA DRAVIDA MUNNETRA KAZHAGAM,*******5199,OC,775,10000000,800,2770121
1,2,12/Apr/2019,ALL INDIA ANNA DRAVIDA MUNNETRA KAZHAGAM,*******5199,OC,3975,10000000,800,2770121
2,3,12/Apr/2019,ALL INDIA ANNA DRAVIDA MUNNETRA KAZHAGAM,*******5199,OC,3967,10000000,800,2770121
3,4,12/Apr/2019,ALL INDIA ANNA DRAVIDA MUNNETRA KAZHAGAM,*******5199,TL,10418,1000000,800,2770121
4,5,12/Apr/2019,ALL INDIA ANNA DRAVIDA MUNNETRA KAZHAGAM,*******5199,TL,126,1000000,800,2770121


In [210]:
# Preview the first rows of the purchase table.
purchasers.head()

Unnamed: 0,Sr No.,Reference No (URN),Journal Date,Date of\rPurchase,Date of Expiry,Name of the Purchaser,Prefix,Bond\rNumber,Denominations,Issue Branch Code,Issue Teller,Status
0,1,1201904120000001166,12/Apr/2019,12/Apr/2019,26/Apr/2019,A B C INDIA LIMITED,TL,11448,1000000,1,5899230,Paid
1,2,1201904120000001166,12/Apr/2019,12/Apr/2019,26/Apr/2019,A B C INDIA LIMITED,TL,11447,1000000,1,5899230,Paid
2,3,1201904120000001166,12/Apr/2019,12/Apr/2019,26/Apr/2019,A B C INDIA LIMITED,TL,11441,1000000,1,5899230,Paid
3,4,1201904120000001166,12/Apr/2019,12/Apr/2019,26/Apr/2019,A B C INDIA LIMITED,OL,1113,100000,1,5899230,Paid
4,5,1201904120000001166,12/Apr/2019,12/Apr/2019,26/Apr/2019,A B C INDIA LIMITED,OL,1118,100000,1,5899230,Paid


In [211]:
# Summary statistics of the numeric purchase columns.
purchasers.describe()

Unnamed: 0,Sr No.,Bond\rNumber,Issue Branch Code,Issue Teller
count,18871.0,18871.0,18871.0,18871.0
mean,9436.0,12181.003338,480.06555,5895737.0
std,5447.732801,6573.754865,395.446834,1825395.0
min,1.0,8.0,1.0,1013030.0
25%,4718.5,8317.5,1.0,5054982.0
50%,9436.0,12350.0,509.0,6405134.0
75%,14153.5,14764.0,813.0,7273126.0
max,18871.0,71548.0,1355.0,8492239.0


In [212]:
# Count missing values per purchase column.
purchasers.isnull().sum()

Sr No.                   0
Reference No  (URN)      0
Journal Date             0
Date of\rPurchase        0
Date of Expiry           0
Name of the Purchaser    0
Prefix                   0
Bond\rNumber             0
Denominations            0
Issue Branch Code        0
Issue Teller             0
Status                   0
dtype: int64

In [213]:
# Summary statistics of the numeric encashment columns.
enchasers.describe()

Unnamed: 0,Sr No.,Bond\rNumber,Pay Branch\rCode,Pay Teller
count,20421.0,20421.0,20421.0,20421.0
mean,10211.0,11783.43514,559.467852,5293055.0
std,5895.179259,6525.685559,297.836567,1953104.0
min,1.0,10.0,1.0,149845.0
25%,5106.0,7797.0,691.0,3490777.0
50%,10211.0,11913.0,691.0,5614473.0
75%,15316.0,14585.0,691.0,7516991.0
max,20421.0,71548.0,2295.0,8212066.0


In [214]:
# Count missing values per encashment column.
enchasers.isnull().sum()

Sr No.                             0
Date of\rEncashment                0
Name of the Political Party        0
Account no. of\rPolitical Party    0
Prefix                             0
Bond\rNumber                       0
Denominations                      0
Pay Branch\rCode                   0
Pay Teller                         0
dtype: int64

In [215]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create (or reuse) the Spark session, with off-heap memory enabled and sized at 10g.
spark = (
    SparkSession.builder
    .appName("Electoral Bond Analysis")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)


In [216]:
# Convert both pandas frames into Spark DataFrames.
enchaser_df, purchaser_df = (
    spark.createDataFrame(pandas_frame) for pandas_frame in (enchasers, purchasers)
)


In [217]:
# Keep aliased handles to both DataFrames as they are at this point
# (alias() returns a DataFrame with an alias set; no data is modified).
enchaser_df_c = enchaser_df.alias('enchaser_df_c')
purchaser_df_c = purchaser_df.alias('purchaser_df_c')

### Remove unwanted columns

In [218]:
# Discard the account, branch, teller and reference columns from both tables.

enchaser_df = enchaser_df.drop(
    *['Account no. of\rPolitical Party', 'Pay Teller', 'Pay Branch\rCode']
)
purchaser_df = purchaser_df.drop(
    *['Reference No  (URN)', 'Journal Date', 'Issue Branch Code', 'Issue Teller']
)

### Rename Columns

In [219]:
# Replace the raw PDF headers (some contain carriage returns) with identifier-style names.
enchaser_df = enchaser_df.withColumnsRenamed(dict([
    ('Sr No.', 'SrNo'),
    ('Date of\rEncashment', 'EnchasedDate'),
    ('Name of the Political Party', 'EnchaserName'),
    ('Bond\rNumber', 'BondNumber'),
]))

In [220]:
# Replace the raw PDF headers (some contain carriage returns) with identifier-style names.
purchaser_df = purchaser_df.withColumnsRenamed(dict([
    ('Sr No.', 'SrNo'),
    ('Date of\rPurchase', 'PurchasedDate'),
    ('Date of Expiry', 'ExpiryDate'),
    ('Name of the Purchaser', 'PurchaserName'),
    ('Bond\rNumber', 'BondNumber'),
]))

### Make unique identifier
Merge the prefix and the existing bond number into a new unique bond identifier (`BondNo`).



In [221]:
# Build the join key "<Prefix>-<BondNumber>" in both tables.

enchaser_df = enchaser_df.withColumn(
    'BondNo',
    concat(enchaser_df['Prefix'], lit('-'), enchaser_df['BondNumber']),
)
purchaser_df = purchaser_df.withColumn(
    'BondNo',
    concat(purchaser_df['Prefix'], lit('-'), purchaser_df['BondNumber']),
)


In [222]:
# Drop the two columns that were combined into BondNo.

enchaser_df = enchaser_df.drop(*('Prefix', 'BondNumber'))
purchaser_df = purchaser_df.drop(*('Prefix', 'BondNumber'))

In [223]:
# Check the reshaped encashment table.
enchaser_df.show(2)

+----+------------+--------------------+-------------+-------+
|SrNo|EnchasedDate|        EnchaserName|Denominations| BondNo|
+----+------------+--------------------+-------------+-------+
|   1| 12/Apr/2019|ALL INDIA ANNA DR...|  1,00,00,000| OC-775|
|   2| 12/Apr/2019|ALL INDIA ANNA DR...|  1,00,00,000|OC-3975|
+----+------------+--------------------+-------------+-------+
only showing top 2 rows



### Split date

In [224]:
# Parse the "dd/MMM/yyyy" text into a date, then derive its calendar parts.
enchaser_df = enchaser_df.withColumn(
    "EnchasedDate", to_date(enchaser_df["EnchasedDate"], "dd/MMM/yyyy")
)
enchaser_df = enchaser_df.withColumns({
    "Day": dayofmonth("EnchasedDate"),
    "Month": month("EnchasedDate"),
    "Year": year("EnchasedDate"),
})

enchaser_df.show(5)

+----+------------+--------------------+-------------+--------+---+-----+----+
|SrNo|EnchasedDate|        EnchaserName|Denominations|  BondNo|Day|Month|Year|
+----+------------+--------------------+-------------+--------+---+-----+----+
|   1|  2019-04-12|ALL INDIA ANNA DR...|  1,00,00,000|  OC-775| 12|    4|2019|
|   2|  2019-04-12|ALL INDIA ANNA DR...|  1,00,00,000| OC-3975| 12|    4|2019|
|   3|  2019-04-12|ALL INDIA ANNA DR...|  1,00,00,000| OC-3967| 12|    4|2019|
|   4|  2019-04-12|ALL INDIA ANNA DR...|    10,00,000|TL-10418| 12|    4|2019|
|   5|  2019-04-12|ALL INDIA ANNA DR...|    10,00,000|  TL-126| 12|    4|2019|
+----+------------+--------------------+-------------+--------+---+-----+----+
only showing top 5 rows



In [225]:
# Parse the "dd/MMM/yyyy" text into a date, then derive its calendar parts.
# ExpiryDate is not converted here and remains text.
purchaser_df = purchaser_df.withColumn(
    "PurchasedDate", to_date(purchaser_df["PurchasedDate"], "dd/MMM/yyyy")
)
purchaser_df = purchaser_df.withColumns({
    "Day": dayofmonth("PurchasedDate"),
    "Month": month("PurchasedDate"),
    "Year": year("PurchasedDate"),
})

purchaser_df.show(5)

+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
|SrNo|PurchasedDate| ExpiryDate|      PurchaserName|Denominations|Status|  BondNo|Day|Month|Year|
+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
|   1|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    10,00,000|  Paid|TL-11448| 12|    4|2019|
|   2|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    10,00,000|  Paid|TL-11447| 12|    4|2019|
|   3|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    10,00,000|  Paid|TL-11441| 12|    4|2019|
|   4|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|     1,00,000|  Paid| OL-1113| 12|    4|2019|
|   5|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|     1,00,000|  Paid| OL-1118| 12|    4|2019|
+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
only showing top 5 rows



### Change column data types

In [226]:
# Remove the thousands separators (e.g. "1,00,00,000") and convert to a number.
# "double" is used rather than Spark's 4-byte "float": single precision only represents
# integers exactly up to 2**24, which is too tight a margin for rupee amounts.
enchaser_df = enchaser_df.withColumn(
    "Denominations",
    regexp_replace(col("Denominations").cast("string"), ",", "").cast("double"),
)

In [227]:
# Remove the thousands separators (e.g. "10,00,000") and convert to a number.
# "double" is used rather than Spark's 4-byte "float": single precision only represents
# integers exactly up to 2**24, which is too tight a margin for rupee amounts.
purchaser_df = purchaser_df.withColumn(
    "Denominations",
    regexp_replace(col("Denominations").cast("string"), ",", "").cast("double"),
)

## Analysis on Enchasers

In [228]:
from pyspark.sql.functions import sum as _sum

In [229]:
# Confirm that Denominations is now numeric.
enchaser_df.show(5)

+----+------------+--------------------+-------------+--------+---+-----+----+
|SrNo|EnchasedDate|        EnchaserName|Denominations|  BondNo|Day|Month|Year|
+----+------------+--------------------+-------------+--------+---+-----+----+
|   1|  2019-04-12|ALL INDIA ANNA DR...|        1.0E7|  OC-775| 12|    4|2019|
|   2|  2019-04-12|ALL INDIA ANNA DR...|        1.0E7| OC-3975| 12|    4|2019|
|   3|  2019-04-12|ALL INDIA ANNA DR...|        1.0E7| OC-3967| 12|    4|2019|
|   4|  2019-04-12|ALL INDIA ANNA DR...|    1000000.0|TL-10418| 12|    4|2019|
|   5|  2019-04-12|ALL INDIA ANNA DR...|    1000000.0|  TL-126| 12|    4|2019|
+----+------------+--------------------+-------------+--------+---+-----+----+
only showing top 5 rows



In [230]:
# Total encashed amount per party, largest first.
topEnchaser = (
    enchaser_df
    .groupBy('EnchaserName')
    .agg(_sum('Denominations').alias('TotalAmount'))
    .orderBy(col('TotalAmount').desc())
)
topEnchaser.show(5, truncate=False)

+---------------------------------------+------------+
|EnchaserName                           |TotalAmount |
+---------------------------------------+------------+
|BHARATIYA JANATA PARTY                 |6.0605111E10|
|ALL INDIA TRINAMOOL CONGRESS           |1.6095314E10|
|PRESIDENT, ALL INDIA CONGRESS COMMITTEE|1.4218655E10|
|BHARAT RASHTRA SAMITHI                 |1.2147099E10|
|BIJU JANATA DAL                        |7.755E9     |
+---------------------------------------+------------+
only showing top 5 rows



In [231]:
n = 50

# Keep the n parties with the LARGEST totals. orderBy() only recognises the `ascending`
# keyword; the earlier `descending=False` was silently ignored, so the sort was ascending
# and limit(n) kept the smallest n parties whenever n was below the number of parties.
top_n_parties = topEnchaser.orderBy('TotalAmount', ascending=False).limit(n)

# Plotly draws the first row of a horizontal bar chart at the bottom, so the frame is
# re-sorted ascending to place the largest bar on top.
# NOTE(review): the title says "Crores" but TotalAmount is the plain sum of Denominations
# (no division by 1e7 is applied) - confirm the intended unit.
fig1 = px.bar(top_n_parties.orderBy('TotalAmount', ascending=True).toPandas(),
              y='EnchaserName',
              x='TotalAmount',
              color='TotalAmount',
              title=f'Top {n} Bonds Enchased Parties (Amount in Crores)',
              text_auto='.2s',
              orientation='h')

fig1.update_traces(textfont_size=12, textangle=0, cliponaxis=False, textposition='outside')
fig1.update_layout(uniformtext_minsize=8,
                   uniformtext_mode='hide',
                   height=800,
                   barmode="stack",
                   hovermode="y unified",)

fig1.show()

In [232]:
# Encashed amount per party and calendar year, sorted by party, then year.
totalByYear_enchasers = (
    enchaser_df
    .groupBy('EnchaserName', 'Year')
    .agg(_sum('Denominations').alias('TotalAmountByYear'))
    .orderBy('EnchaserName', 'Year', 'TotalAmountByYear')
)
totalByYear_enchasers.show(5, truncate=False)

+---------------+----+-----------------+
|EnchaserName   |Year|TotalAmountByYear|
+---------------+----+-----------------+
|AAM AADMI PARTY|2019|2.2E7            |
|AAM AADMI PARTY|2020|3.3E7            |
|AAM AADMI PARTY|2021|1.85E7           |
|AAM AADMI PARTY|2022|4.795E8          |
|AAM AADMI PARTY|2023|1.015E8          |
+---------------+----+-----------------+
only showing top 5 rows



In [233]:
from dash import Dash, html, dcc, callback, Output, Input
import plotly.express as px
import pandas as pd

totalByYearPD_enchasers = totalByYear_enchasers.toPandas()
# Party names come from the already-collected pandas frame, so no second Spark job is needed.
partyName = totalByYearPD_enchasers['EnchaserName'].unique().tolist()


app = Dash(__name__)

# Layout: heading, party selector, and the graph that the callback fills in.
app.layout = html.Div([
    html.H1(children='Party Enchased per Year', style={'textAlign': 'center', 'color': 'white'}),
    dcc.Dropdown(
        id='dropdown-selection',
        options=[{'label': party, 'value': party} for party in partyName],
        value='AAM AADMI PARTY'
    ),
    dcc.Graph(id='graph-content')
])

@app.callback(
    Output('graph-content', 'figure'),
    [Input('dropdown-selection', 'value')]
)
def update_graph(value):
    """Build the per-year bar chart for the party chosen in the dropdown.

    Args:
        value: Party name currently selected in the dropdown.

    Returns:
        A Plotly figure with one bar per year for that party.
    """
    df = totalByYearPD_enchasers[totalByYearPD_enchasers['EnchaserName'] == value]
    fig = px.bar(df, x='Year', y='TotalAmountByYear', color='TotalAmountByYear')
    # Treat years as labels so only years present in the data get a tick.
    fig.update_xaxes(type='category')
    fig.update_layout(bargap=0.3)

    return fig

if __name__ == '__main__':
    # Dash.run_server() and the JupyterDash-style `mode=` keyword are obsolete;
    # current Dash selects notebook rendering through app.run(jupyter_mode=...).
    app.run(jupyter_mode='inline', host="0.0.0.0", port=1005)


<IPython.core.display.Javascript object>

## Analysis on Purchasers

In [234]:
from pyspark.sql.functions import sum as _sum

In [235]:
# Preview the cleaned purchase table.
purchaser_df.show(5)

+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
|SrNo|PurchasedDate| ExpiryDate|      PurchaserName|Denominations|Status|  BondNo|Day|Month|Year|
+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
|   1|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    1000000.0|  Paid|TL-11448| 12|    4|2019|
|   2|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    1000000.0|  Paid|TL-11447| 12|    4|2019|
|   3|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|    1000000.0|  Paid|TL-11441| 12|    4|2019|
|   4|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|     100000.0|  Paid| OL-1113| 12|    4|2019|
|   5|   2019-04-12|26/Apr/2019|A B C INDIA LIMITED|     100000.0|  Paid| OL-1118| 12|    4|2019|
+----+-------------+-----------+-------------------+-------------+------+--------+---+-----+----+
only showing top 5 rows



In [236]:
# Total purchased amount per purchaser, largest first.
topPurchaser = (
    purchaser_df
    .groupBy('PurchaserName')
    .agg(_sum('Denominations').alias('TotalAmount'))
    .orderBy(col('TotalAmount').desc())
)
topPurchaser.show(5, truncate=False)

+----------------------------------------------+-----------+
|PurchaserName                                 |TotalAmount|
+----------------------------------------------+-----------+
|FUTURE GAMING AND HOTEL SERVICES PR           |1.208E10   |
|MEGHA ENGINEERING AND INFRASTRUCTURES LI MITED|8.21E9     |
|QWIKSUPPLYCHAINPRIVATELIMITED                 |4.1E9      |
|HALDIA ENERGY LIMITED                         |3.77E9     |
|VEDANTA LIMITED                               |3.7565E9   |
+----------------------------------------------+-----------+
only showing top 5 rows



In [237]:
# Show the default 20 largest purchasers (names truncated by show()).
topPurchaser.orderBy('TotalAmount', ascending=False).show()

+--------------------+-----------+
|       PurchaserName|TotalAmount|
+--------------------+-----------+
|FUTURE GAMING AND...|   1.208E10|
|MEGHA ENGINEERING...|     8.21E9|
|QWIKSUPPLYCHAINPR...|      4.1E9|
|HALDIA ENERGY LIM...|     3.77E9|
|     VEDANTA LIMITED|   3.7565E9|
|ESSEL MINING AND ...|    2.245E9|
|WESTERN UP POWER ...|      2.2E9|
|KEVENTER FOODPARK...|     1.95E9|
|       MADANLAL LTD.|    1.855E9|
|BHARTI AIRTEL LIM...|     1.83E9|
|YASHODA SUPER SPE...|     1.62E9|
|UTKAL ALUMINA INT...|    1.353E9|
|DLF COMMERCIAL DE...|      1.3E9|
|MKJ ENTERPRISES L...|   1.2835E9|
|JINDAL STEEL AND ...|     1.23E9|
|B G SHIRKE CONSTR...|     1.17E9|
|DHARIWAL INFRASTR...|     1.15E9|
|CHENNAI GREEN WOO...|     1.05E9|
|BIRLACARBONINDIAP...|     1.05E9|
|   RUNGTA SONS P LTD|      1.0E9|
+--------------------+-----------+
only showing top 20 rows



In [238]:
n = 50
# The n purchasers with the largest totals.
top_n_purchasers = topPurchaser.orderBy(col('TotalAmount').desc()).limit(n)

# Sorted ascending for plotting, which puts the largest bar at the top of a horizontal chart.
fig1 = px.bar(
    top_n_purchasers.orderBy(col('TotalAmount').asc()).toPandas(),
    y='PurchaserName',
    x='TotalAmount',
    color='TotalAmount',
    title=f'Top {n} Bonds Purchased Parties (Amount in Crores)',
    text_auto='.2s',
    orientation='h',
)

fig1.update_traces(
    textfont_size=12,
    textangle=0,
    cliponaxis=False,
    textposition='outside',
)
fig1.update_layout(
    uniformtext_minsize=8,
    uniformtext_mode='hide',
    height=800,
    barmode="stack",
    hovermode="y unified",
)

fig1.show()

In [239]:
# Purchased amount per purchaser and calendar year, sorted by purchaser, then year.
totalByYear_purchaser = (
    purchaser_df
    .groupBy('PurchaserName', 'Year')
    .agg(_sum('Denominations').alias('TotalAmountByYear'))
    .orderBy('PurchaserName', 'Year', 'TotalAmountByYear')
)
totalByYear_purchaser.show(5, truncate=False)

+----------------------------------+----+-----------------+
|PurchaserName                     |Year|TotalAmountByYear|
+----------------------------------+----+-----------------+
|14 REELS PLUS LLP                 |2022|1.0E7            |
|A B C INDIA LIMITED               |2019|4000000.0        |
|AAKANKSHA BAHETY                  |2022|1000000.0        |
|AALAYA CONSTRUCTIONS              |2023|1.0E7            |
|AARISH SOLAR POWER PRIVATE LIMITED|2023|2.0E7            |
+----------------------------------+----+-----------------+
only showing top 5 rows



In [240]:
from dash import Dash, html, dcc, callback, Output, Input
import plotly.express as px
import pandas as pd

totalByYearPD_purchasers = totalByYear_purchaser.toPandas()
# Purchaser names come from the already-collected pandas frame, so no second Spark job is needed.
purchasrName = totalByYearPD_purchasers['PurchaserName'].unique().tolist()


app = Dash(__name__)

# Layout: heading, purchaser selector, and the graph that the callback fills in.
app.layout = html.Div([
    html.H1(children='Total Purchased per Year', style={'textAlign': 'center', 'color': 'white'}),
    dcc.Dropdown(
        id='dropdown-selection',
        options=[{'label': party, 'value': party} for party in purchasrName],
        value='FUTURE GAMING AND HOTEL SERVICES PR'
    ),
    dcc.Graph(id='graph-content')
])

@app.callback(
    Output('graph-content', 'figure'),
    [Input('dropdown-selection', 'value')]
)
def update_graph(value):
    """Build the per-year bar chart for the purchaser chosen in the dropdown.

    Args:
        value: Purchaser name currently selected in the dropdown.

    Returns:
        A Plotly figure with one bar per year for that purchaser.
    """
    df = totalByYearPD_purchasers[totalByYearPD_purchasers['PurchaserName'] == value]
    fig = px.bar(df, x='Year', y='TotalAmountByYear', color='TotalAmountByYear')
    # Treat years as labels so only years present in the data get a tick.
    fig.update_xaxes(type='category')
    fig.update_layout(bargap=0.3)

    return fig

if __name__ == '__main__':
    # Dash.run_server() and the JupyterDash-style `mode=` keyword are obsolete;
    # current Dash selects notebook rendering through app.run(jupyter_mode=...).
    app.run(jupyter_mode='inline', host="0.0.0.0", port=1005)


<IPython.core.display.Javascript object>

## Create combined data

Join data based on bond number of purchaser information and enchaser information

In [241]:
# Register the DataFrames as temp views so they can be queried with Spark SQL.
for _view_name, _frame in (
    ('edf', enchaser_df),
    ('pdf', purchaser_df),
    ('totalPruch', totalByYear_purchaser),
    ('totalEnchas', totalByYear_enchasers),
):
    _frame.createOrReplaceTempView(_view_name)

### Enchaser with no purchaser data

In [242]:
# Left join keeps every encashment; rows without a matching purchase have NULL purchaser columns.
enchaserWithNoPurch = spark.sql(
    """select *
                        -- edf.SrNo,
                        -- EnchasedDate,
                        -- EnchaserName,
                        -- edf.Denominations,
                        -- PurchasedDate,
                        -- ExpiryDate,
                        -- PurchaserName,
                        -- Status,
                        -- edf.BondNo
                        from edf
                        left join pdf on edf.bondno=pdf.bondno"""
)

# Encashed amount per party and year for bonds that have no purchase record.
(
    enchaserWithNoPurch
    .filter('PurchasedDate is NULL')
    .groupBy('EnchaserName', 'edf.Year')
    .agg(_sum('edf.Denominations'))
    .show(truncate=False)
)

+------------------------------------------------------------+----+----------------------+
|EnchaserName                                                |Year|sum(edf.Denominations)|
+------------------------------------------------------------+----+----------------------+
|BHARATIYA JANATA PARTY                                      |2019|4.6631E9              |
|ADYAKSHA SAMAJVADI PARTY                                    |2019|8400000.0             |
|YSR  CONGRESS PARTY  (YUVAJANA SRAMIKA RYTHU CONGRESS PARTY)|2019|8.25E7                |
|AAM AADMI PARTY                                             |2019|2000000.0             |
|RASHTRIYA JANTA DAL                                         |2019|1.0E7                 |
|BHARAT RASHTRA SAMITHI                                      |2019|2.355E8               |
|BIHAR PRADESH JANTA DAL(UNITED)                             |2019|2.0E7                 |
|ALL INDIA TRINAMOOL CONGRESS                                |2019|1.701E8               |

### Purchaser with no enchaser data

In [243]:
# Right join keeps every purchase; rows without a matching encashment have NULL encashment columns.
# The variable name is carried over from the left-join cell even though the direction is reversed.
enchaserWithNoPurch = spark.sql(
    """select *
                        -- edf.SrNo,
                        -- EnchasedDate,
                        -- EnchaserName,
                        -- edf.Denominations,
                        -- PurchasedDate,
                        -- ExpiryDate,
                        -- PurchaserName,
                        -- Status,
                        -- edf.BondNo
                        from edf
                        right join pdf on edf.bondno=pdf.bondno"""
)

# Purchased amount per purchaser and year for bonds that have no encashment record, largest first.
(
    enchaserWithNoPurch
    .filter('EnchasedDate is NULL')
    .groupBy('PurchaserName', 'pdf.Year')
    .agg(_sum('pdf.Denominations').alias("TotalAmount"))
    .orderBy(col("TotalAmount").desc())
    .show(truncate=False)
)

+------------------------------------------+----+-----------+
|PurchaserName                             |Year|TotalAmount|
+------------------------------------------+----+-----------+
|FUTURE GAMING AND HOTEL SERVICES PR       |2022|3.0E7      |
|ECL FINANCE LTD                           |2019|2.0E7      |
|EDELWEISS RURAL   CORPORATE SERVICE       |2019|1.0E7      |
|PACIFICA DEVELOPERS P LTD REFLECTIO       |2024|7500000.0  |
|SYLVANUS BUILDERS AND DEVELOPERS LI       |2024|5000000.0  |
|PACIFICA  INDIA  PROJECTS PVT LTD         |2024|5000000.0  |
|C MACKERTICH PVT LTD                      |2019|4000000.0  |
|VEDANTA LTD                               |2022|3000000.0  |
|SUDHA COMMERCIAL COMPANY LTD              |2019|3000000.0  |
|JUPITER MERCANTILES PVT LTD               |2019|2500000.0  |
|UTKAL ALUMINA INTERNATIONAL LIMITED       |2019|2000000.0  |
|V M SALGAOCAR CORPORATION PVT LTD         |2022|1000000.0  |
|KEYSTONE REALTORS PVT LTD                 |2019|1000000.0  |
|CHENNAM

In [244]:
# Inner join: only bonds present in both the encashment and the purchase tables.
joinData = spark.sql(
    """select *
                        -- edf.SrNo,
                        -- EnchasedDate,
                        -- EnchaserName,
                        -- edf.Denominations,
                        -- PurchasedDate,
                        -- ExpiryDate,
                        -- PurchaserName,
                        -- Status,
                        -- edf.BondNo
                        from edf
                        join pdf on edf.bondno=pdf.bondno"""
)

joinData.show(5)

+----+------------+--------------------+-------------+-------+---+-----+----+----+-------------+-----------+--------------------+-------------+------+-------+---+-----+----+
|SrNo|EnchasedDate|        EnchaserName|Denominations| BondNo|Day|Month|Year|SrNo|PurchasedDate| ExpiryDate|       PurchaserName|Denominations|Status| BondNo|Day|Month|Year|
+----+------------+--------------------+-------------+-------+---+-----+----+----+-------------+-----------+--------------------+-------------+------+-------+---+-----+----+
| 874|  2019-04-16|BHARATIYA JANATA ...|        1.0E7|OC-5482| 16|    4|2019| 131|   2019-04-12|26/Apr/2019|MEGHA ENGINEERING...|        1.0E7|  Paid|OC-5482| 12|    4|2019|
|1196|  2019-04-18|BHARATIYA JANATA ...|        1.0E7|OC-7052| 18|    4|2019| 188|   2019-04-12|26/Apr/2019|PHL FINIVEST PVT LTD|        1.0E7|  Paid|OC-7052| 12|    4|2019|
|1209|  2019-04-18|BHARATIYA JANATA ...|        1.0E7|OC-6700| 18|    4|2019| 201|   2019-04-12|26/Apr/2019|PHL FINIVEST PVT LTD| 

In [245]:
# Add "datadiff": the number of days from the purchase date to the encashment date of each bond.

joinData = joinData.withColumn("datadiff", datediff('EnchasedDate', 'PurchasedDate'))
joinData.show(5)

+----+------------+--------------------+-------------+-------+---+-----+----+----+-------------+-----------+--------------------+-------------+------+-------+---+-----+----+--------+
|SrNo|EnchasedDate|        EnchaserName|Denominations| BondNo|Day|Month|Year|SrNo|PurchasedDate| ExpiryDate|       PurchaserName|Denominations|Status| BondNo|Day|Month|Year|datadiff|
+----+------------+--------------------+-------------+-------+---+-----+----+----+-------------+-----------+--------------------+-------------+------+-------+---+-----+----+--------+
| 874|  2019-04-16|BHARATIYA JANATA ...|        1.0E7|OC-5482| 16|    4|2019| 131|   2019-04-12|26/Apr/2019|MEGHA ENGINEERING...|        1.0E7|  Paid|OC-5482| 12|    4|2019|       4|
|1196|  2019-04-18|BHARATIYA JANATA ...|        1.0E7|OC-7052| 18|    4|2019| 188|   2019-04-12|26/Apr/2019|PHL FINIVEST PVT LTD|        1.0E7|  Paid|OC-7052| 12|    4|2019|       6|
|1209|  2019-04-18|BHARATIYA JANATA ...|        1.0E7|OC-6700| 18|    4|2019| 201|   

### Total amounts for bonds that have both enchaser and purchaser details

In [246]:
# Encashed amount per party and encashment year, restricted to bonds with a matching purchase.
joinData_party = (
    joinData
    .groupBy('EnchaserName', 'edf.Year')
    .agg(_sum('edf.Denominations').alias('TotalAmount'))
    .orderBy(col('TotalAmount').desc())
)
joinData_party.show(5, truncate=False)

+---------------------------------------+----+------------+
|EnchaserName                           |Year|TotalAmount |
+---------------------------------------+----+------------+
|BHARATIYA JANATA PARTY                 |2022|1.7635499E10|
|BHARATIYA JANATA PARTY                 |2023|1.6763261E10|
|BHARATIYA JANATA PARTY                 |2019|1.5054401E10|
|PRESIDENT, ALL INDIA CONGRESS COMMITTEE|2023|7.9337E9    |
|ALL INDIA TRINAMOOL CONGRESS           |2023|5.624738E9  |
+---------------------------------------+----+------------+
only showing top 5 rows



In [247]:
# Purchased amount per purchaser and purchase year, restricted to bonds with a matching encashment.
joinData_purch = (
    joinData
    .groupBy('PurchaserName', 'pdf.Year')
    .agg(_sum('pdf.Denominations').alias('TotalAmount'))
    .orderBy(col('TotalAmount').desc())
)
joinData_purch.show(5, truncate=False)


+----------------------------------------------+----+-----------+
|PurchaserName                                 |Year|TotalAmount|
+----------------------------------------------+----+-----------+
|MEGHA ENGINEERING AND INFRASTRUCTURES LI MITED|2023|4.15E9     |
|FUTURE GAMING AND HOTEL SERVICES PR           |2022|4.02E9     |
|QWIKSUPPLYCHAINPRIVATELIMITED                 |2022|3.6E9      |
|FUTURE GAMING AND HOTEL SERVICES PR           |2021|3.34E9     |
|FUTURE GAMING AND HOTEL SERVICES PR           |2023|2.56E9     |
+----------------------------------------------+----+-----------+
only showing top 5 rows



### Amount given and received information
Let's check how much each purchaser gave to each party.

In [248]:
# Amount that flowed from each purchaser to each party, largest pair first.
joinData_party_purch = (
    joinData
    .groupBy('EnchaserName', 'PurchaserName')
    .agg(_sum('edf.Denominations').alias('TotalAmount'))
    .orderBy(col('TotalAmount').desc())
)
joinData_party_purch.show(truncate=False)


+------------------------------------------------------------+----------------------------------------------+-----------+
|EnchaserName                                                |PurchaserName                                 |TotalAmount|
+------------------------------------------------------------+----------------------------------------------+-----------+
|BHARATIYA JANATA PARTY                                      |MEGHA ENGINEERING AND INFRASTRUCTURES LI MITED|5.19E9     |
|DRAVIDA MUNNETRA KAZHAGAM (DMK)                             |FUTURE GAMING AND HOTEL SERVICES PR           |4.53E9     |
|ALL INDIA TRINAMOOL CONGRESS                                |FUTURE GAMING AND HOTEL SERVICES PR           |4.35E9     |
|BHARATIYA JANATA PARTY                                      |QWIKSUPPLYCHAINPRIVATELIMITED                 |3.75E9     |
|ALL INDIA TRINAMOOL CONGRESS                                |HALDIA ENERGY LIMITED                         |2.81E9     |
|BHARATIYA JANATA PARTY 

The enchased and purchased amounts computed from the joined data do not match the overall totals, so let's check, for each year and each purchaser or enchaser, how much is missing.

### Comparing Actual vs. Total Enchased Data

We'll compare the actual enchased amounts with the total enchased amounts to identify discrepancies. This comparison helps detect enchased amounts that have no purchaser information and helps verify data integrity.

In [249]:
# Pair each (party, year) total computed from matched bonds (joinData_party) with the
# total computed from all encashments (totalByYear_enchasers).
# The join is inner, so a (party, year) that is absent from joinData_party does not
# appear in the result.
diffData_party = joinData_party.join(totalByYear_enchasers,
                    (totalByYear_enchasers.EnchaserName == joinData_party.EnchaserName) &
                    (totalByYear_enchasers.Year == joinData_party.Year),
                    how="inner")

# Give the two totals self-describing names before comparing them.
diffData_party = diffData_party.withColumnRenamed('TotalAmountByYear', 'TotalAmountActual') \
                               .withColumnRenamed('TotalAmount', 'TotalAmountWithPurchaserData')
# NOTE(review): abs is applied to a Column, so it presumably resolves to
# pyspark.sql.functions.abs through the notebook's wildcard import - confirm.
diffData_party = diffData_party.withColumn("DifferenceAmount", abs(diffData_party.TotalAmountWithPurchaserData-diffData_party.TotalAmountActual))

In [250]:
# List only the (party, year) rows where the two totals disagree.
diffData_party.select('edf.EnchaserName', 'edf.Year', 'TotalAmountWithPurchaserData', 'TotalAmountActual', 'DifferenceAmount').where('DifferenceAmount>0').show(truncate=False)


+----------------------------------------------+----+----------------------------+-----------------+----------------+
|EnchaserName                                  |Year|TotalAmountWithPurchaserData|TotalAmountActual|DifferenceAmount|
+----------------------------------------------+----+----------------------------+-----------------+----------------+
|BHARATIYA JANATA PARTY                        |2019|1.5054401E10                |1.9717501E10     |4.6631E9        |
|ADYAKSHA SAMAJVADI PARTY                      |2019|1.0E8                       |1.084E8          |8400000.0       |
|AAM AADMI PARTY                               |2019|2.0E7                       |2.2E7            |2000000.0       |
|RASHTRIYA JANTA DAL                           |2019|1.5E7                       |2.5E7            |1.0E7           |
|BHARAT RASHTRA SAMITHI                        |2019|1.36029E8                   |3.71529E8        |2.355E8         |
|BIHAR PRADESH JANTA DAL(UNITED)               |2019|1.0

### Comparing Actual vs. Total Purchased Data

We'll compare the actual purchased amounts with the total purchased amounts to identify discrepancies. This comparison helps detect purchased amounts that have no enchaser information and helps verify data integrity.

In [251]:
# Pair each (purchaser, year) total computed from matched bonds (joinData_purch) with the
# total computed from all purchases (totalByYear_purchaser).
# The join is inner, so a (purchaser, year) that is absent from joinData_purch does not
# appear in the result.
diffData_purch = joinData_purch.join(totalByYear_purchaser,
                    (totalByYear_purchaser.PurchaserName == joinData_purch.PurchaserName) &
                    (totalByYear_purchaser.Year == joinData_purch.Year),
                    how="inner")

# Give the two totals self-describing names before comparing them.
diffData_purch = diffData_purch.withColumnRenamed('TotalAmountByYear', 'TotalAmountActual') \
                               .withColumnRenamed('TotalAmount', 'TotalAmountWithEnchaserData')
# NOTE(review): abs is applied to a Column, so it presumably resolves to
# pyspark.sql.functions.abs through the notebook's wildcard import - confirm.
diffData_purch = diffData_purch.withColumn("DifferenceAmount", abs(diffData_purch.TotalAmountWithEnchaserData-diffData_purch.TotalAmountActual))

In [252]:
# List only the (purchaser, year) rows where the two totals disagree, sorted by purchaser name.
diffData_purch.select('pdf.PurchaserName', 'pdf.Year', 'TotalAmountWithEnchaserData', 'TotalAmountActual', 'DifferenceAmount').where('DifferenceAmount>0').orderBy("PurchaserName").show(truncate=False)

+------------------------------------------+----+---------------------------+-----------------+----------------+
|PurchaserName                             |Year|TotalAmountWithEnchaserData|TotalAmountActual|DifferenceAmount|
+------------------------------------------+----+---------------------------+-----------------+----------------+
|CHENNAMANAGATHIHALLI SOLAR POWER PROJECT  |2023|150000.0                   |750000.0         |600000.0        |
|DIPAN  P  SHAH                            |2023|8500000.0                  |8800000.0        |300000.0        |
|FUTURE GAMING AND HOTEL SERVICES PR       |2022|4.02E9                     |4.05E9           |3.0E7           |
|KEYSTONE REALTORS PVT LTD                 |2019|4.9E7                      |5.0E7            |1000000.0       |
|PRAKASH DISTILLERY & CHEMICAL CO (P) LT  D|2021|4780000.0                  |5080000.0        |300000.0        |
|RAKESH PRAVINCHANDRA SHAH                 |2023|1.86E7                     |1.87E7           |1

### Enchasers and their fund sources

#### TreeMap Chart

In [253]:
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql import Window

# Window aggregates keep every input row while exposing two totals:
# one per (enchaser, purchaser) pair and one per enchaser.
pair_window = Window.partitionBy("EnchaserName", "PurchaserName")
enchaser_window = Window.partitionBy("EnchaserName")

# Share of an enchaser's total that comes from a given purchaser, in percent.
share_pct = _sum('TotalAmount').over(pair_window) / _sum('TotalAmount').over(enchaser_window) * 100

joinData_party_purch = (
    joinData_party_purch
    .withColumn("PercentageShare", share_pct)
    .orderBy(col("EnchaserName").asc(), col("PercentageShare").desc())
)

joinData_party_purch.show(5, truncate=False)

+---------------+------------------------------+-----------+------------------+
|EnchaserName   |PurchaserName                 |TotalAmount|PercentageShare   |
+---------------+------------------------------+-----------+------------------+
|AAM AADMI PARTY|AVEES TRADING FINANCE PVT LTD |1.0E8      |15.32567049808429 |
|AAM AADMI PARTY|BAJAJ AUTO LTD                |8.0E7      |12.260536398467432|
|AAM AADMI PARTY|TRANSWAYS EXIM PRIVATE LIMITED|7.0E7      |10.727969348659004|
|AAM AADMI PARTY|MKJ ENTERPRISES LIMITED       |7.0E7      |10.727969348659004|
|AAM AADMI PARTY|TORRENT  POWER  LIMITED       |5.0E7      |7.662835249042145 |
+---------------+------------------------------+-----------+------------------+
only showing top 5 rows



Treemap of enchasers and their purchasers

In [254]:
import plotly.express as px

# Treemap hierarchy: a constant "Parties" root -> enchaser -> purchaser,
# with both tile area and tile colour driven by TotalAmount.
party_purchaser_pdf = joinData_party_purch.toPandas()

fig = px.treemap(
    party_purchaser_pdf,
    path=[px.Constant("Parties"), 'EnchaserName', 'PurchaserName'],
    values='TotalAmount',
    color='TotalAmount',
    color_continuous_scale='RdBu',
)

treemap_trace_style = {
    "marker": {"cornerradius": 5},
    "textinfo": 'label+value',
    "hoverinfo": 'label+value+percent entry',
    "textfont_size": 10,
}
fig.update_traces(**treemap_trace_style)

fig.update_layout(margin={"t": 50, "l": 25, "r": 25, "b": 25})

fig.show()


Let's simplify the plot by keeping only the top purchasers of each enchaser and grouping the rest under "Other".

In [255]:
from pyspark.sql.functions import when, rank

# Rank cut-off: purchasers ranked above it (per enchaser) are merged into "Other".
top_n = 5

# Rank purchasers inside each enchaser, largest TotalAmount first.
# rank() gives tied amounts the same rank, so more than `top_n` purchasers
# can survive the cut-off when amounts are tied.
windowSpec = Window.partitionBy("EnchaserName").orderBy(col("TotalAmount").desc())
jDPPN = joinData_party_purch.alias("jDPPN").withColumn("rank", rank().over(windowSpec))

# Per-enchaser sum of every purchaser ranked below the cut-off.
other_total_amounts = (
    jDPPN
    .filter(col("rank") > top_n)
    .groupBy('EnchaserName')
    .agg(_sum('TotalAmount').alias('OtherTotalAmount'))
)

# Relabel the low-ranked rows as 'Other' and give them the summed amount.
# After that they are identical per enchaser, so drop_duplicates() collapses
# them into a single 'Other' row. The order of the two withColumn calls
# matters: the amount swap tests the already relabelled PurchaserName.
relabelled_purchaser = when(col('rank') > top_n, 'Other').otherwise(col('PurchaserName'))
df_with_other = (
    jDPPN
    .join(other_total_amounts, on='EnchaserName', how='left')
    .withColumn('PurchaserName', relabelled_purchaser)
    .withColumn(
        'TotalAmount',
        when(col('PurchaserName') == 'Other', col('OtherTotalAmount')).otherwise(col('TotalAmount')),
    )
    .drop('OtherTotalAmount', "PercentageShare", "rank")
    .drop_duplicates()
)

df_with_other.show(5, truncate=False)


+---------------+---------------------------------+-----------+
|EnchaserName   |PurchaserName                    |TotalAmount|
+---------------+---------------------------------+-----------+
|AAM AADMI PARTY|AVEES TRADING FINANCE PVT LTD    |1.0E8      |
|AAM AADMI PARTY|BAJAJ AUTO LTD                   |8.0E7      |
|AAM AADMI PARTY|MKJ ENTERPRISES LIMITED          |7.0E7      |
|AAM AADMI PARTY|TRANSWAYS EXIM PRIVATE LIMITED   |7.0E7      |
|AAM AADMI PARTY|ASIAN TRADING CORPORATION LIMITED|5.0E7      |
+---------------+---------------------------------+-----------+
only showing top 5 rows



In [256]:
# Convert the Spark DataFrame to pandas once and reuse it: every toPandas()
# call triggers a separate Spark job, and the original cell ran three of them.
treemap_pdf = df_with_other.toPandas()
treemap_amounts = treemap_pdf['TotalAmount']

# Cap the colour range at the 99th percentile so a few very large amounts
# do not compress the colour scale for everything else.
max_value_threshold = treemap_amounts.quantile(0.99)

# Hierarchy: constant "Parties" root -> enchaser -> purchaser (top purchasers
# plus one "Other" bucket); tile area and colour both follow TotalAmount.
fig = px.treemap(treemap_pdf,
                 path=[px.Constant("Parties"), 'EnchaserName', 'PurchaserName'],
                 values='TotalAmount',
                 color="TotalAmount",
                 color_continuous_scale='earth',
                 color_continuous_midpoint=treemap_amounts.quantile(0.75),
                 range_color=[0, max_value_threshold],
                 title="Enchasers and their fund sources")

fig.update_traces(marker=dict(cornerradius=5),
                  textinfo='label+value',
                  hoverinfo='label+value+percent entry',
                  textfont_size=15,
                  textfont_color="white",
                  textposition='top left')

fig.update_layout(margin=dict(t=50, l=25, r=25, b=25), uniformtext_minsize=8, uniformtext_mode=False, paper_bgcolor="LightSteelBlue")
fig.show()


#### Sunburst Chart

In [257]:
# Convert the Spark DataFrame to pandas once and reuse it: every toPandas()
# call triggers a separate Spark job, and the original cell ran three of them.
sunburst_pdf = df_with_other.toPandas()
sunburst_amounts = sunburst_pdf['TotalAmount']

# Cap the colour range at the 99th percentile so a few very large amounts
# do not compress the colour scale for everything else.
max_value_threshold = sunburst_amounts.quantile(0.99)

# Inner ring: enchasers; outer ring: their purchasers (incl. the "Other" bucket).
fig = px.sunburst(
    sunburst_pdf,
    path=['EnchaserName', 'PurchaserName'],
    values='TotalAmount',
    color="TotalAmount",
    color_continuous_scale='earth',
    color_continuous_midpoint=sunburst_amounts.quantile(0.75),
    range_color=[0, max_value_threshold]
)

fig.update_traces(textinfo='label',
                  hoverinfo='label+value+percent entry',
                  textfont_size=25,
                  textfont_color="black",
                  insidetextorientation='radial')

fig.update_layout(uniformtext=dict(minsize=8, mode='hide'),
                  margin=dict(t=50, l=25, r=25, b=25),
                  title_text="Enchasers and their fund sources",
                  title_font_size=15,
                  autosize=False,
                  width=1000,
                  height=1000,
                  paper_bgcolor="LightSteelBlue")
fig.show()

In [258]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Prepare the node ids and link lists consumed by the Sankey chart.

# Get distinct EnchaserName and PurchaserName values
enchaser_names = df_with_other.select("EnchaserName").distinct()
purchaser_names = df_with_other.select("PurchaserName").distinct()

# Create a combined list of unique names (a name used in both roles gets one node)
combined_names = enchaser_names.union(purchaser_names).distinct()

# Assign a 0-based index to each unique name: {name: node_id}
name_dict = combined_names.rdd.zipWithIndex().map(lambda x: (x[0][0], x[1])).collectAsMap()

# Look up a node id by name. The explicit "bigint" return type matters:
# without it F.udf defaults to StringType and the ids come back as strings.
to_node_id = F.udf(lambda name: name_dict[name], "bigint")

# Create DataFrame with indices
df_sankey = df_with_other.select(
    to_node_id(col("EnchaserName")).alias("EN_id"),
    to_node_id(col("PurchaserName")).alias("PN_id"),
    "TotalAmount",
)

# Collect the rows ONCE and split them into parallel lists. Collecting each
# column in its own Spark job (as before) gives no guarantee that the three
# lists are in the same row order, because df_with_other has no defined order.
sankey_rows = df_sankey.collect()
en = [row["EN_id"] for row in sankey_rows]
pn = [row["PN_id"] for row in sankey_rows]
ta = [row["TotalAmount"] for row in sankey_rows]


#### Sankey Chart

In [259]:
import plotly.graph_objects as go

# Sankey links refer to nodes by position in the label list, so order the
# labels explicitly by their assigned node id instead of relying on the
# dict's insertion order happening to match the ids.
node_labels = [name for name, _ in sorted(name_dict.items(), key=lambda item: item[1])]

# One link per (enchaser, purchaser) row, sized by TotalAmount.
fig = go.Figure(data=[go.Sankey(
    node = dict(
      pad = 15,
      thickness = 20,
      line = dict(color = "black", width = 0.5),
      label = node_labels,
      color = "blue"
    ),
    link = dict(
      source = en,
      target = pn,
      value = ta,
      color = "aliceblue"
  ))])

fig.update_layout(uniformtext=dict(minsize=8, mode='hide'),
                  margin=dict(t=50, l=25, r=25, b=25),
                  title_text="Enchasers and their fund sources",
                  title_font_size=15,
                  autosize=False,
                  width=2000,
                  height=1000,
                  paper_bgcolor="LightSteelBlue",
                  font_size=10)
fig.show()

#### Icicle Chart

In [260]:
# Icicle hierarchy: constant "Parties" root -> enchaser -> purchaser,
# with block size proportional to TotalAmount.
icicle_pdf = df_with_other.toPandas()
fig = px.icicle(
    icicle_pdf,
    path=[px.Constant("Parties"), 'EnchaserName', 'PurchaserName'],
    values='TotalAmount',
)

# Vertical orientation, flipped along y.
icicle_tiling = {"orientation": 'v', "flip": 'y'}
fig.update_traces(root_color="lightgrey", tiling=icicle_tiling)

fig.update_layout(
    margin={"t": 50, "l": 25, "r": 25, "b": 25},
    autosize=False,
    width=1500,
    height=500,
)
fig.show()

#### Network Graph

In [319]:
import networkx as nx
from pyvis.network import Network
from IPython.display import IFrame, HTML, display

# Build an enchaser <-> purchaser graph and render it interactively with pyvis.
df_with_other_pandas = df_with_other.toPandas()
G = nx.from_pandas_edgelist(df_with_other_pandas, source='EnchaserName', target='PurchaserName', edge_attr='TotalAmount')

net = Network(height='500px', width='100%', notebook=True, cdn_resources='remote', select_menu=True)

# Build the membership sets once instead of re-creating a list from the
# DataFrame column for every node.
enchaser_nodes = set(df_with_other_pandas['EnchaserName'])
purchaser_nodes = set(df_with_other_pandas['PurchaserName'])

# Enchasers are blue, purchasers red; a name present in both roles stays blue.
for node in G.nodes():
    if node in enchaser_nodes:
        net.add_node(node, color='blue')
    elif node in purchaser_nodes:
        net.add_node(node, color='red')

# `value` lets vis.js scale edge thickness relative to the other edges.
# The raw amount is no longer also passed as `width`: width is an absolute
# size in pixels, which makes no sense for amounts of this magnitude.
for source, target, data in G.edges(data=True):
    net.add_edge(source, target, value=data['TotalAmount'])

name = "enchaser_purchaser_graph.html"

# Write the graph to an HTML file.
net.show(name)

# HTML(name) would render the file *name* as literal text; pass it as
# `filename` so the generated page itself is embedded in the output.
display(HTML(filename=name))

enchaser_purchaser_graph.html
