In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=9a88221bc6492938ba4330c2834a1dbd708c7ed07c583ccae4ead733c2c549df
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [None]:
import pyspark as spark
import json
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one.
# Note that this rebinds the name `spark`, which the import above had bound to the pyspark module.
spark = SparkSession.builder.getOrCreate()

# Load the donations file, then print the inferred schema and a preview of the rows.
# NOTE(review): the printed schema contains a `_corrupt_record` column, which suggests
# that some input lines may not have parsed as JSON — worth confirming.
df = spark.read.format("json").load("donation_np.json")
df.printSchema()
df.show()


root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

In [None]:
# Keep only the rows that carry a donor name; rows whose Name is null are discarded.
df = df.dropna(subset=['Name'])
df.show()

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH|       2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|       2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...|       2014-15|  Dr. Manmohan Singh|        N|   INC|Others|        

In [None]:
# Replace the space-separated source headers with snake_case column names.
column_renames = {
    "Contribution Mode": "mode_of_payment",
    "Financial Year": "fin_year",
    "PAN Given": "pan_given",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [None]:
from pyspark.sql.functions import sha2, concat_ws
# Overwrite Address with the hex SHA-256 digest of its text, so the raw address
# is no longer present in the frame.
# Only one column is passed to concat_ws, so the "||" separator never appears in the hashed text.
address_text = concat_ws("||", df.Address)
df = df.withColumn("Address", sha2(address_text, 256))
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [None]:
# Categorize mode_of_payment into below 4 categories 

from pyspark.sql.functions import *

# Map the free-text payment description onto CHEQUE / CASH / BANK / OTHERS.
# Rules are tried top to bottom and the first match wins, so their order matters
# (e.g. a description containing both 'Cheque' and 'Bank' becomes CHEQUE).
# NOTE(review): `like` is case-sensitive; spellings such as 'CHEQUE' or 'bank'
# would fall through to OTHERS — confirm whether that is intended.
payment_text = df.mode_of_payment
mentions_cash = payment_text.like('%CASH%') | payment_text.like('%Cash%')
payment_category = (
    when(payment_text.like('%Cheque%'), 'CHEQUE')
    .when(mentions_cash, 'CASH')
    .when(payment_text.like('%Bank%'), 'BANK')
    .otherwise('OTHERS')
)
df = df.withColumn('mode_of_payment', payment_category)

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|           CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|           BANK| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|         CHEQUE| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |       |       |      |
|5f04f4013

In [None]:
# Drop the unused field9..field14 columns.
# The list is deliberately not named `col`: that would shadow
# pyspark.sql.functions.col, which the wildcard import brings into scope.
empty_field_cols = ["field9","field10","field11","field12","field13","field14"]
df = df.drop(*empty_field_cols)

In [None]:
# Amount is loaded as a string column; convert it to an integer column so that
# it can be aggregated numerically in the cells below.
amount_as_int = df.Amount.cast('int')
df = df.withColumn("Amount", amount_as_int)

In [None]:
# Per-party summary: number of donations, mean amount and largest amount.
# count / avg / max are the Spark aggregate functions from the wildcard import
# (so `max` here is not the Python builtin).
party_stats = [count('Amount'), avg('Amount'), max('Amount')]
agg_values = df.groupBy('Party').agg(*party_stats)

agg_values.show()
# Spot check: column 1 (the donation count) of the third collected row.
agg_values.collect()[2][1]

+------+-------------+------------------+-----------+
| Party|count(Amount)|       avg(Amount)|max(Amount)|
+------+-------------+------------------+-----------+
|   INC|         3785|1065122.1529722589|  500000000|
|   BJP|         8782|1058474.8344340697|  500000000|
|   NCP|          107| 6055246.906542056|   50000000|
|   CPI|          384|177405.46354166666|    3000000|
|CPI(M)|          515| 292470.1514563107|   10000000|
+------+-------------+------------------+-----------+



107

In [None]:
# Single-column views of the per-party summary: the party names, and the largest
# donation amount recorded for each party.
partyNameList, maxAmountList = (
    agg_values.select(column_name) for column_name in ('Party', 'max(Amount)')
)

In [None]:
def add_new_feature_col(featureName, partyName, i, j, df):
    """Append a per-party statistic column to ``df``.

    The statistic is read from the notebook-level ``agg_values`` frame: it is the
    value at row ``i``, column ``j`` of that frame's collected rows.

    Args:
        featureName: name of the column to add.
        partyName: rows whose ``Party`` equals this value receive the statistic;
            every other row receives 0.
        i: row index into ``agg_values.collect()``.
        j: column index within that row.
        df: DataFrame to extend.

    Returns:
        A new DataFrame with the ``featureName`` column added.
    """
    is_target_party = df.Party == partyName
    # Each call collects agg_values again (one Spark job per call).
    stat_value = agg_values.collect()[i][j]
    feature = when(is_target_party, stat_value).otherwise(0)
    return df.withColumn(featureName, feature)
   

In [None]:
# Add one <party>_COUNT_LTD column per party.
# The party names are collected once (instead of one Spark job per iteration) and
# the loop covers however many parties exist rather than a hard-coded 5.
j = 1  # column 1 of agg_values holds count(Amount)
for i, partyRow in enumerate(partyNameList.collect()):
    partyName = partyRow[0]
    featureName = partyName + '_COUNT_LTD'
    print("FeatureName: ",featureName)
    df = add_new_feature_col(featureName, partyName, i, j, df)

FeatureName:  INC_COUNT_LTD
FeatureName:  BJP_COUNT_LTD
FeatureName:  NCP_COUNT_LTD
FeatureName:  CPI_COUNT_LTD
FeatureName:  CPI(M)_COUNT_LTD


In [None]:
# Add one <party>_MAX_LTD column per party.
# The party names are collected once (instead of one Spark job per iteration) and
# the loop covers however many parties exist rather than a hard-coded 5.
j = 3  # column 3 of agg_values holds max(Amount)
for i, partyRow in enumerate(partyNameList.collect()):
    partyName = partyRow[0]
    featureName = partyName + '_MAX_LTD'
    print("FeatureName: ",featureName)
    df = add_new_feature_col(featureName, partyName, i, j, df)

FeatureName:  INC_MAX_LTD
FeatureName:  BJP_MAX_LTD
FeatureName:  NCP_MAX_LTD
FeatureName:  CPI_MAX_LTD
FeatureName:  CPI(M)_MAX_LTD


In [None]:
# Add one <party>_AVG_LTD column per party.
# The party names are collected once (instead of one Spark job per iteration) and
# the loop covers however many parties exist rather than a hard-coded 5.
j = 2  # column 2 of agg_values holds avg(Amount)
for i, partyRow in enumerate(partyNameList.collect()):
    partyName = partyRow[0]
    featureName = partyName + '_AVG_LTD'
    print("FeatureName: ",featureName)
    df = add_new_feature_col(featureName, partyName, i, j, df)

FeatureName:  INC_AVG_LTD
FeatureName:  BJP_AVG_LTD
FeatureName:  NCP_AVG_LTD
FeatureName:  CPI_AVG_LTD
FeatureName:  CPI(M)_AVG_LTD


In [None]:
from pyspark.sql.functions import *
# Lifetime (LTD) donation total for every party, computed with a single grouped
# aggregation instead of one filtered Spark job per party.
# `sum` is the Spark aggregate from the wildcard import, not the Python builtin.
party_totals = {
    row['Party']: row['total_amount']
    for row in df.groupBy('Party').agg(sum('Amount').alias('total_amount')).collect()
}
# A party with no rows yields None, as the per-party filtered sum did.
INC_sum = party_totals.get("INC")
BJP_sum = party_totals.get("BJP")
NCP_sum = party_totals.get("NCP")
CPI_sum = party_totals.get("CPI")
CPI_M_sum = party_totals.get("CPI(M)")

# One column per party: the party's total on its own rows, 0 on every other row.
# The party is stored as "CPI(M)" in the data; only the column name uses CPI_M.
# Comparing against "CPI_M" never matched, which left CPI_M_SUM_LTD at 0 everywhere.
df = df.withColumn('INC_SUM_LTD', when(df.Party == "INC",INC_sum).otherwise(0))\
    .withColumn('BJP_SUM_LTD', when(df.Party == "BJP",BJP_sum).otherwise(0))\
    .withColumn('NCP_SUM_LTD', when(df.Party == "NCP",NCP_sum).otherwise(0))\
    .withColumn('CPI_SUM_LTD', when(df.Party == "CPI",CPI_sum).otherwise(0))\
    .withColumn('CPI_M_SUM_LTD', when(df.Party == "CPI(M)",CPI_M_sum).otherwise(0))


In [None]:
# Preview the frame with the per-party COUNT / MAX / AVG / SUM columns (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------------+-------------+-------------+-------------+----------------+-----------+-----------+-----------+-----------+--------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------------+-------------+-------------+-------------+----------------+----

In [None]:
def add_top_donor_col(featuerName, partyName, maxValue,df):
    """Append a column holding the name of a party's largest donor.

    The donor is taken from the first row whose ``Party`` equals ``partyName`` and
    whose ``Amount`` equals ``maxValue``. If several rows match, whichever one
    Spark returns first is used.

    Args:
        featuerName: name of the column to add. The misspelling is kept so that
            existing keyword callers keep working.
        partyName: party whose rows receive the donor name; all other rows
            receive an empty string.
        maxValue: the donation amount that identifies the top donor.
        df: DataFrame to extend.

    Returns:
        A new DataFrame with the ``featuerName`` column added.

    Raises:
        ValueError: if no row matches ``partyName`` and ``maxValue``.
    """
    topDonorRow = df.filter((df.Party == partyName) & (df.Amount == maxValue)).select('Name').first()
    if topDonorRow is None:
        # first() returns None on an empty result; fail with a readable message
        # instead of an opaque "NoneType is not subscriptable".
        raise ValueError("no donation of amount %r found for party %r" % (maxValue, partyName))
    maxDonorName = topDonorRow['Name']
    print("maxdonorName : ",maxDonorName)
    # Use the function's own parameter here. The previous body read the notebook
    # global `featureName`, which only worked when the caller happened to set it.
    df = df.withColumn(featuerName, when(df.Party == partyName, maxDonorName).otherwise(''))

    return df
   

In [None]:
# Add one <party>_TOP_DONOR column per party.
# Party and maximum amount are read from the same collected rows, so the two
# values can never be mispaired, and only one Spark job is launched for the
# lookup instead of three per iteration.
for partyName, maxAmount in agg_values.select('Party', 'max(Amount)').collect():
    featureName =  partyName + '_TOP_DONOR'
    print("FeatureName: ",featureName)
    df = add_top_donor_col(featureName, partyName, maxAmount, df)


FeatureName:  INC_TOP_DONOR
maxdonorName :  General Electoral Trust
FeatureName:  BJP_TOP_DONOR
maxdonorName :  General Electoral Trust
FeatureName:  NCP_TOP_DONOR
maxdonorName :  Lodha Dwellers Pvt. Ltd.
FeatureName:  CPI_TOP_DONOR
maxdonorName :  Aziz Pasha
FeatureName:  CPI(M)_TOP_DONOR
maxdonorName :  V K Ramachandran


In [None]:
# Preview the frame with the <party>_TOP_DONOR columns (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------------+-------------+-------------+-------------+----------------+-----------+-----------+-----------+-----------+--------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+--------------------+--------------------+-------------+-------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|       INC_TOP_DONOR|       BJP_TOP_DONOR|NCP_TOP_DONOR|CPI_TOP_DONOR|CPI(M)_TOP_DONOR|
+-------------------

In [None]:
# Distinct financial years in ascending order.
# show() returns None, so the DataFrame is stored first and displayed separately;
# chaining .show() onto the assignment left finYearList set to None.
finYearList = df.select('fin_year').distinct().sort('fin_year')
finYearList.show()

+--------+
|fin_year|
+--------+
| 2003-04|
| 2004-05|
| 2005-06|
| 2006-07|
| 2007-08|
| 2008-09|
| 2009-10|
| 2010-11|
| 2011-12|
| 2012-13|
| 2013-14|
| 2014-15|
+--------+



In [None]:
def add_fin_year_col(featureName, partyName, maxAmount, df):
    """Append a numeric per-party column to ``df``.

    Args:
        featureName: name of the column to add.
        partyName: rows whose ``Party`` equals this value receive ``maxAmount``.
        maxAmount: numeric value to store on the matching rows. The caller in
            this notebook passes the party's donation total for one financial year.
        df: DataFrame to extend.

    Returns:
        A new DataFrame with the ``featureName`` column added.
    """
    # Non-matching rows get 0 rather than '': mixing a number with an empty
    # string makes Spark coerce the whole column to string, unlike the other
    # numeric feature columns in this notebook, which fall back to 0.
    df = df.withColumn(featureName, when(df.Party == partyName, maxAmount).otherwise(0))
    return df


In [None]:
# Add one <fin_year>_<party>_SUM column for every (party, financial year) pair
# that has donations.
#
# The yearly totals are aggregated and collected once, up front. This avoids a
# Spark job per inner-loop step and avoids re-aggregating a frame that keeps
# gaining columns.
yearlyTotals = df.groupBy('Party','fin_year').sum('Amount').sort('fin_year').collect()

# The party name comes straight from the party list. It used to be read from row
# `i` of the party's own yearly rows, which raises IndexError as soon as a party
# has fewer than i+1 financial years.
for partyRow in partyNameList.collect():
    partyName = partyRow[0]
    # Rows for this party, still ordered by financial year.
    yearlyList = [row for row in yearlyTotals if row['Party'] == partyName]
    finYearCount = len(yearlyList)
    print("partyName: ",partyName)
    print("finyearcount: ",finYearCount)
    for yearRow in yearlyList:
        featureName = yearRow['fin_year'] + '_' + partyName + '_SUM'
        maxAmount = yearRow['sum(Amount)']  # total for that year, despite the name
        df = add_fin_year_col(featureName, partyName, maxAmount,df)

partyName:  INC
finyearcount:  12
partyName:  BJP
finyearcount:  12
partyName:  NCP
finyearcount:  8
partyName:  CPI
finyearcount:  12
partyName:  CPI(M)
finyearcount:  12


In [None]:
# Preview the frame with the per-year, per-party SUM columns (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------------+-------------+-------------+-------------+----------------+-----------+-----------+-----------+-----------+--------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+--------------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------

In [None]:
# The distinct payment categories present in the data, kept as a one-column DataFrame.
payment_mode_column = df.select('mode_of_payment')
paymentModeList = payment_mode_column.distinct()
paymentModeList.show()

+---------------+
|mode_of_payment|
+---------------+
|         OTHERS|
|         CHEQUE|
|           BANK|
|           CASH|
+---------------+



In [None]:
def add_no_of_donation_per_mode_col(featureName, paymentMode, no_of_donations_val, df):
    """Append a column carrying the donation count of one payment mode.

    Prints the payment mode and the count before adding the column.

    Args:
        featureName: name of the column to add.
        paymentMode: rows whose ``mode_of_payment`` equals this value receive
            the count; every other row receives 0.
        no_of_donations_val: the count to store on the matching rows.
        df: DataFrame to extend.

    Returns:
        A new DataFrame with the ``featureName`` column added.
    """
    print("payment mode : ",paymentMode)
    print("no_of_donations_val: ",no_of_donations_val)
    uses_mode = df.mode_of_payment == paymentMode
    count_or_zero = when(uses_mode, no_of_donations_val).otherwise(0)
    return df.withColumn(featureName, count_or_zero)

In [None]:
# Add one <mode>_count_LTD column per payment mode.
# The per-mode counts are computed with a single aggregation up front instead of
# one groupBy job per iteration.
countsByMode = {
    row[0]: row[1]
    for row in df.groupBy('mode_of_payment').agg(count('mode_of_payment')).collect()
}

# Iterate over every mode that exists (not a hard-coded 4), in the order
# paymentModeList yields them, so the new columns keep that order.
for modeRow in paymentModeList.collect():
    paymentMode = modeRow[0]
    print()
    featureName = paymentMode + '_count_LTD'
    donation_val = countsByMode[paymentMode]
    df = add_no_of_donation_per_mode_col(featureName, paymentMode, donation_val, df)


payment mode :  OTHERS
no_of_donations_val:  3498

payment mode :  CHEQUE
no_of_donations_val:  1993

payment mode :  BANK
no_of_donations_val:  7279

payment mode :  CASH
no_of_donations_val:  803


In [None]:
# Preview the final feature frame (first 20 rows, long cells truncated).
df.show(n=20, truncate=True)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------------+-------------+-------------+-------------+----------------+-----------+-----------+-----------+-----------+--------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+--------------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------

In [None]:
# Persist the feature frame as Parquet.
# "overwrite" lets the notebook be re-run: the default save mode raises an error
# when the output path already exists.
df.write.mode("overwrite").parquet("final_sheet.parquet")

In [None]:
# Persist the feature frame as CSV.
# header=True writes the column names; without it the many feature columns are
# unlabeled in the output. "overwrite" lets the notebook be re-run, since the
# default save mode raises an error when the output path already exists.
df.write.mode("overwrite").csv("final_csv.csv", header=True)