In [23]:
import pyspark


In [24]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession(sc)

In [25]:
df=spark.read.json("donation_np.json")
df.show()

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|          null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH|       2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|       2014-15|    V K Ramachandran|        Y|CPI(M)|Others|        

In [26]:
df1=df.withColumnRenamed("contribution mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year ").withColumnRenamed("PAN Given ","pan_given")

In [27]:
df1.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|                null|    null|                null|     null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|16-B, Ferozeshah ...| 3000000|                CASH|  2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|  2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |      

In [28]:
import hashlib

def encrypt_val(address):

    sha_value=hashlib.sha256(address.encode()).hexdigest()

    return sha_value

In [29]:
from pyspark.sql.functions import *

from pyspark.sql.types import *



spark_udf = udf(encrypt_val, StringType())

df2=df1.withColumn('encrypted_value',md5(col("address")))

In [30]:
df2.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+
|                null|    null|                null|     null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|                null|
|16-B, Ferozeshah ...| 3000000|                CASH|  2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |44bca5383d6dae229...|
|No.1, First Floor...|10000000|000037, H

In [31]:
df2.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+
|                null|    null|                null|     null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|                null|
|16-B, Ferozeshah ...| 3000000|                CASH|  2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |44bca5383d6dae229...|
|No.1, First Floor...|10000000|000037, H

In [32]:
from pyspark.sql.functions import when

from pyspark.sql.functions import lower, col

df3 = df2.withColumn("payment_seperated", when(lower(df2["mode_of_payment"]).contains('cash'), 'Cash').when(lower(df2["mode_of_payment"]).contains('cheque') | lower(df2["mode_of_payment"]).contains('ch.') | lower(df2["mode_of_payment"]).contains('ch no'), 'Cheque').when(lower(df2["mode_of_payment"]).contains('bank'), 'Bank').otherwise('Others'))

In [33]:
df3.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|payment_seperated|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+
|                null|    null|                null|     null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|                null|           Others|
|16-B, Ferozeshah ...| 3000000|                CASH|  2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |     

In [34]:
# from pyspark.sql.types import IntegerType
# df4=df3.withColumn("Amount",col("Amount").cast("int"))
# tmp=df4.groupBy("Party").sum("Amount")
# cnt=df4.groupBy("Party").count()
# avgv=df4.groupBy("Party").avg("Amount")
# mx=df4.groupBy("Party").max("Amount")

from pyspark.sql.types import IntegerType

df3 = df3.withColumn ("Amount", col ("Amount").cast(IntegerType())) 
sm =df3.groupBy('Party').sum("Amount")

cnt = df3.groupBy('Party').count() 
avgv= df3.groupBy('Party').avg("Amount")

mx = df3.groupBy('Party').max("Amount")


In [35]:
mx.show()

+------+-----------+
| Party|max(Amount)|
+------+-----------+
|   INC|  500000000|
|   BJP|  500000000|
|  null|       null|
|   NCP|   50000000|
|   CPI|    3000000|
|CPI(M)|   10000000|
+------+-----------+



In [36]:
df3=df3.withColumn("INC_SUM_LTD",\
when((df3.Party=="INC"),sm.collect()[0][1]).otherwise(0))
df3=df3.withColumn("BJP_SUM_LTD",\
when((df3.Party=="BJP"),sm.collect()[1][1]).otherwise(0))
df3=df3.withColumn("NCP_SUM_LTD",\
when((df3.Party=="NCP"),sm.collect()[3][1]).otherwise(0))
df3=df3.withColumn("CPI_SUM_LTD",\
when((df3.Party=="CPI"),sm.collect()[4][1]).otherwise(0))
df3=df3.withColumn("CPI(M)_SUM_LTD",\
when((df3.Party=="CPI(M)"),sm.collect()[5][1]).otherwise(0))





In [37]:
df3.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+-----------+-----------+-----------+-----------+--------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|payment_seperated|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+-----------+-----------+-----------+-----------+--------------+
|                null|    null|                null|     null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|           

In [38]:
df3=df3.withColumn("INC_COUNT_LTD",\
when((df3.Party=="INC"),cnt.collect()[0][1]).otherwise(0))
df3=df3.withColumn("BJP_COUNT_LTD",\
when((df3.Party=="BJP"),cnt.collect()[1][1]).otherwise(0))
df3=df3.withColumn("NCP_COUNT_LTD",\
when((df3.Party=="NCP"),cnt.collect()[3][1]).otherwise(0))
df3=df3.withColumn("CPI_COUNT_LTD",\
when((df3.Party=="CPI"),cnt.collect()[4][1]).otherwise(0))
df3=df3.withColumn("CPI(M)_COUNT_LTD",\
when((df3.Party=="CPI(M)"),cnt.collect()[5][1]).otherwise(0))


  

In [39]:
df3.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|payment_seperated|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|
+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+--------

In [40]:
df3=df3.withColumn("INC_AVG_LTD",\
when((df3.Party=="INC"),avgv.collect()[0][1]).otherwise(0))
df3=df3.withColumn("BJP_AVG_LTD",\
when((df3.Party=="BJP"),avgv.collect()[1][1]).otherwise(0))
df3=df3.withColumn("NCP_AVG_LTD",\
when((df3.Party=="NCP"),avgv.collect()[3][1]).otherwise(0))
df3=df3.withColumn("CPI_AVG_LTD",\
when((df3.Party=="CPI"),avgv.collect()[4][1]).otherwise(0))
df3=df3.withColumn("CPI(M)_AVG_LTD",\
when((df3.Party=="CPI(M)"),avgv.collect()[5][1]).otherwise(0))



   

In [41]:
df3=df3.withColumn("INC_MAX_LTD",\
when((df3.Party=="INC"),mx.collect()[0][1]).otherwise(0))
df3=df3.withColumn("BJP_MAX_LTD",\
when((df3.Party=="BJP"),mx.collect()[1][1]).otherwise(0))
df3=df3.withColumn("NCP_MAX_LTD",\
when((df3.Party=="NCP"),mx.collect()[3][1]).otherwise(0))
df3=df3.withColumn("CPI_MAX_LTD",\
when((df3.Party=="CPI"),mx.collect()[4][1]).otherwise(0))
df3=df3.withColumn("CPI(M)_MAX_LTD",\
when((df3.Party=="CPI(M)"),mx.collect()[5][1]).otherwise(0))





In [42]:

inc_top_donor=df3.select("Name").filter(df3.Amount==mx.collect()[0][1])
bjp_top_donor=df3.select("Name").filter(df3.Amount==mx.collect()[1][1])
ncp_top_donor=df3.select("Name").filter(df3.Amount==mx.collect()[3][1])
cpi_top_donor=df3.select("Name").filter(df3.Amount==mx.collect()[4][1])
cpim_top_donor=df3.select("Name").filter(df3.Amount==mx.collect()[5][1])

In [43]:


df3=df3.withColumn("INC_TOP_DONOR",\
when((df3.Party=="INC"),inc_top_donor.collect()[0][0]).otherwise('-'))
df3=df3.withColumn("BJP_TOP_DONOR",\
when((df3.Party=="BJP"),bjp_top_donor.collect()[0][0]).otherwise('-'))
df3=df3.withColumn("NCP_TOP_DONOR",\
when((df3.Party=="NCP"),ncp_top_donor.collect()[0][0]).otherwise('-'))
df3=df3.withColumn("CPI_TOP_DONOR",\
when((df3.Party=="CPI"),cpi_top_donor.collect()[0][0]).otherwise('-'))
df3=df3.withColumn("CPI(M)_TOP_DONOR",\
when((df3.Party=="CPI(M)"),cpim_top_donor.collect()[0][0]).otherwise('-'))





In [45]:
df3.show()

+--------------------+--------+--------------------+---------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+--------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+----------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+--------------------+--------------------+-------------+-------------+----------------+
|             Address|  Amount|     mode_of_payment|fin_year |                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|     encrypted_value|payment_seperated|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   

In [None]:
new = df.groupBy('fin_year','Party').sum("Amount")