In [71]:
import pyspark

In [72]:
from pyspark.sql import *
from pyspark.sql import SparkSession

In [73]:
import json

In [74]:
import hashlib

In [75]:
f = open('donation_np.json')
data = json.load(f)

In [76]:
spark = SparkSession \
    .builder \
    .appName("Python Spark Donation ") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [77]:
df = spark.read.json("donation_np.json")
df.printSchema()
df.show()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

In [78]:
df = df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given","pan_given")

In [79]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [80]:
from pyspark.sql.functions import sha2, concat_ws
df = df.withColumn("Address", sha2(concat_ws("||", "Address"), 256))
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|e3b0c44298fc1c149...|    null|                null|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |    

In [81]:
df = df.drop("_corrupt_record","field10","field11","field12","field13","field14","field9")

In [82]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|e3b0c44298fc1c149...|    null|                null|    null|                null|     null|  null|  null|
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|      

In [83]:
df = df.where(df.Name.isNotNull())
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [84]:
from pyspark.sql.functions import lower, col, when,lit
df = df.withColumn("mode_of_payment", lower(col("mode_of_payment")))

In [85]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, hdfc bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|cheque, state ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|through bank tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|through bank tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 sbi| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [86]:
df = df.withColumn("mode_of_payment", when(col("mode_of_payment").contains("cash"),"Cash")
      .when(col("mode_of_payment").contains("cheque"),"Cheque")
      .when(col("mode_of_payment").contains("ch."),"Cheque")
      .when(col("mode_of_payment").contains("bank"),"Bank")
      .otherwise("Others"))

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Others| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Others| 2011-12|   Uma Shankar Gupta|     

In [91]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions
df = df.withColumn("Amount",col("Amount").cast(IntegerType()))
cf=df.groupBy("Party").agg(functions.sum("Amount"),functions.count("Party"),functions.avg("Amount"),functions.max("Amount"))
cf.show()

+------+-----------+------------+------------------+-----------+
| Party|sum(Amount)|count(Party)|       avg(Amount)|max(Amount)|
+------+-----------+------------+------------------+-----------+
|   INC| 4031487349|        3785|1065122.1529722589|  500000000|
|   BJP| 9295525996|        8782|1058474.8344340697|  500000000|
|   NCP|  647911419|         107| 6055246.906542056|   50000000|
|   CPI|   68123698|         384|177405.46354166666|    3000000|
|CPI(M)|  150622128|         515| 292470.1514563107|   10000000|
+------+-----------+------------+------------------+-----------+



In [88]:
def GetValueFromDataFrame(df,ColumnName):
    a = []
    for row in df.rdd.collect():
        a.append(row[ColumnName])
    return a

sum_arr=GetValueFromDataFrame(cf,"sum(Amount)")
print(sum_arr[0])


4031487349


In [89]:
def GetValueFromDataframe(df,columnName):
    a = []
    for row in df.rdd.collect():      
        a.append(row[columnName])
    return a

sum_arr = GetValueFromDataframe(cf,"sum(Amount)")
party_arr = GetValueFromDataframe(cf,"Party")
print(sum_arr[0])
i = 0

while i < len(party_arr):
    col_name = party_arr[i] + "_SUM_LTD"
    df = df.withColumn(col_name,lit(0))
    df = df.withColumn(col_name, when(col("Party") == party_arr[i],sum_arr[i])
    .otherwise(0))
    i += 1
df.show()

4031487349
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|          0|          0|          0|   68123698|             0|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|          0|          0|          0|          0|     150622128|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others| 4031487349|          0|          0|          0|             0|

In [92]:
def GetValueFromDataframe(df,columnName):
    a = []
    for row in df.rdd.collect():      
        a.append(row[columnName])
    return a

sum_arr = GetValueFromDataframe(cf,"sum(Amount)")
count_arr = GetValueFromDataframe(cf,"count(Party)")
avg_arr = GetValueFromDataframe(cf,"avg(Amount)")
max_arr = GetValueFromDataframe(cf,"max(Amount)")
party_arr = GetValueFromDataframe(cf,"Party")

i = 0
j = 0

party_arr_end = ["_SUM_LTD","_COUNT_LTD","_AVG_LTD","_MAX_LTD"]
agg_values = [sum_arr,count_arr,avg_arr,max_arr]

while i < 4:
    while j < len(party_arr):
        col_name = party_arr[j] + party_arr_end[i]
        df = df.withColumn(col_name,lit(0))
        df = df.withColumn(col_name, when(col("Party") == party_arr[j],agg_values[i][j])
        .otherwise(0))
        j += 1
    j = 0
    i += 1
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----

In [93]:
inc_top_donor = df.filter(df["Amount"] == max_arr[0]).select("Name").collect()[0][0]
bjp_top_donor = df.filter(df["Amount"] == max_arr[1]).select("Name").collect()[0][0]
ncp_top_donor = df.filter(df["Amount"] == max_arr[2]).select("Name").collect()[0][0]
cpi_top_donor = df.filter(df["Amount"] == max_arr[3]).select("Name").collect()[0][0]
cpi_m_top_donor = df.filter(df["Amount"] == max_arr[4]).select("Name").collect()[0][0]

In [94]:
i = 0
top_donor_arr = [inc_top_donor,bjp_top_donor,ncp_top_donor,cpi_top_donor,cpi_m_top_donor]

while i < 5:
    col_name = party_arr[i] + "_TOP_DONOR"
    df = df.withColumn(col_name,lit(0))
    df = df.withColumn(col_name, when(col("Party") == party_arr[i],top_donor_arr[i])
    .otherwise(0))
    i += 1
df.select("Party","INC_TOP_DONOR","BJP_TOP_DONOR","NCP_TOP_DONOR","CPI_TOP_DONOR","CPI(M)_TOP_DONOR").show()

+------+--------------------+--------------------+-------------+-------------+----------------+
| Party|       INC_TOP_DONOR|       BJP_TOP_DONOR|NCP_TOP_DONOR|CPI_TOP_DONOR|CPI(M)_TOP_DONOR|
+------+--------------------+--------------------+-------------+-------------+----------------+
|   CPI|                   0|                   0|            0|   Aziz Pasha|               0|
|CPI(M)|                   0|                   0|            0|            0|V K Ramachandran|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   INC|General Electoral...|                   0|            0|            0|               0|
|   BJP|                   0|General Electoral...|            0|            0|               0|
|   BJP|                   0|General Electoral...|            0|            0|               0|
|   BJP|                   0|General Ele

In [97]:
x = df.groupBy('Party','fin_year').sum('Amount').sort('fin_year')
x.show()


+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   INC| 2003-04|   28301101|
|CPI(M)| 2003-04|     200000|
|   BJP| 2003-04|  116881973|
|   CPI| 2003-04|     779148|
|   INC| 2004-05|  320555643|
|CPI(M)| 2004-05|     896355|
|   CPI| 2004-05|     630000|
|   BJP| 2004-05|  339521289|
|   CPI| 2005-06|    3988690|
|   BJP| 2005-06|   36156111|
|CPI(M)| 2005-06|     550000|
|   INC| 2005-06|   59212492|
|CPI(M)| 2006-07|    1124719|
|   BJP| 2006-07|   29550672|
|   INC| 2006-07|  121273513|
|   CPI| 2006-07|    1229400|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|   BJP| 2007-08|  249623653|
|   NCP| 2007-08|   10225000|
+------+--------+-----------+
only showing top 20 rows



In [100]:
for row in x.collect():
    name = row[1] + "_" + row[0] + "_" + "SUM"
    df = df.withColumn(name,lit('-'))
    df = df.withColumn(name, when(col("Party") == row[0], row[2])
    .otherwise('-'))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+--------------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+------------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------

In [102]:
y = df.groupBy('mode_of_payment').count()
y.show()

+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|           Bank| 7338|
|         Cheque| 2781|
|           Cash|  918|
|         Others| 2536|
+---------------+-----+



In [105]:
for row in y.collect():
    name = row[0] + "_COUNT_LTD"
    df = df.withColumn(name,lit('-'))
    df = df.withColumn(name, when(col("mode_of_payment") == row[0], row[1])
    .otherwise('-'))

df.select("mode_of_payment","BANK_COUNT_LTD","CHEQUE_COUNT_LTD","CASH_COUNT_LTD","OTHERS_COUNT_LTD").show()

+---------------+--------------+----------------+--------------+----------------+
|mode_of_payment|BANK_COUNT_LTD|CHEQUE_COUNT_LTD|CASH_COUNT_LTD|OTHERS_COUNT_LTD|
+---------------+--------------+----------------+--------------+----------------+
|           Cash|             -|               -|           918|               -|
|           Bank|          7338|               -|             -|               -|
|         Cheque|             -|            2781|             -|               -|
|           Bank|          7338|               -|             -|               -|
|           Bank|          7338|               -|             -|               -|
|         Others|             -|               -|             -|            2536|
|         Others|             -|               -|             -|            2536|
|           Bank|          7338|               -|             -|               -|
|         Others|             -|               -|             -|            2536|
|           Bank