In [242]:
import pyspark

In [243]:
from pyspark.context import SparkContext

from pyspark.sql.session import SparkSession

In [244]:
from pyspark.sql import SparkSession

# Build the SparkSession used to read the donation data below
# (getOrCreate() hands back an already-active session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [245]:
# Load the donation records and drop rows that have no donor name.
# NOTE(review): the inferred schema includes `_corrupt_record` and field9..field14, which
# suggests some input lines are malformed JSON -- confirm whether those columns should be dropped.
df = spark.read.json("donation_np.json")
# Use the schema's exact column name ("Name"); the lowercase "name" only resolves while
# spark.sql.caseSensitive is false.
df = df.na.drop(subset=["Name"])
df.show()

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH|       2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|       2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...|       2014-15|  Dr. Manmohan Singh|        N|   INC|Others|        

In [246]:
# 1. Change below column names
# Old schema name -> notebook-friendly snake_case name, applied in this order.
column_renames = {
    "Financial Year": "fin_year",
    "PAN Given": "pan_given",
    "Contribution Mode": "mode_of_payment",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [247]:
# 2. Encrypt address column
# NOTE: this is a one-way SHA-256 hash (pseudonymisation), not reversible encryption.
# An unsalted hash of a low-entropy value such as a street address can be recovered by
# hashing candidate addresses, so still treat the resulting column as sensitive.
import hashlib
from pyspark.sql.functions import sha2, concat_ws

# Hash the column itself: sha2(NULL) stays NULL. Wrapping a single column in concat_ws
# returned "" for NULL, so every missing address got the same hash of the empty string.
# Non-null addresses hash exactly as before (concat_ws of one value is that value).
df = df.withColumn("Address", sha2(df.Address.cast("string"), 256))
df.show()



+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [248]:
# 3. Categorize mode_of_payment into below 4 categories
from pyspark.sql.functions import when
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import lower

# Match on a lower-cased copy so every spelling ("CASH", "Cash", "cash", "CHEQUE", ...)
# lands in the same bucket; the original only covered the "CASH"/"Cash" variants.
# Branch order matters: a value like "Cheque, State Bank ..." mentions both a cheque and a
# bank, and the first matching branch ("Cheque") wins. NULL values fall through to "Others".
mode_lc = lower(df.mode_of_payment)
df = df.withColumn("mode_of_payment", when(mode_lc.contains("cheque"), "Cheque")
                                 .when(mode_lc.contains("cash"), "Cash")
                                 .when(mode_lc.contains("bank"), "Bank")
                                 .otherwise("Others"))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |       |       |      |
|5f04f4013

In [249]:
# Calculate aggregates per party - SUM
# Adds one <PARTY>_SUM_LTD column per party: the party's total donations on that party's
# rows, 0 on every other row.
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

df = df.withColumn("Amount", df.Amount.cast(IntegerType()))

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
sumList = ['INC_SUM_LTD', 'BJP_SUM_LTD', 'NCP_SUM_LTD', 'CPI_SUM_LTD', 'CPI_M_SUM_LTD']

# One aggregation job for all parties (the original ran a separate groupBy + collect per party).
party_sums = {row["Party"]: row["sum(Amount)"]
              for row in df.groupBy("Party").sum("Amount").collect()}
# A party with no donations totals 0 instead of raising IndexError on [0][0].
sumValue = [party_sums.get(party, 0) for party in partyList]
# Keep the per-party names in the original collect() shape so x[0][0] still works downstream.
sum_inc_ltd, sum_bjp_ltd, sum_ncp_ltd, sum_cpi_ltd, sum_cpim_ltd = ([(value,)] for value in sumValue)

for party, column, value in zip(partyList, sumList, sumValue):
    # Exact match: contains("CPI") is also true for "CPI(M)", which gave CPI(M) rows the CPI total.
    df = df.withColumn(column, when(df.Party == party, value).otherwise(0))

In [250]:
# Calculate aggregates per party - COUNT
# Adds one <PARTY>_COUNT_LTD column per party: the party's number of donation rows on that
# party's rows, 0 on every other row.
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

df = df.withColumn("Amount", df.Amount.cast(IntegerType()))

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
countList = ['INC_COUNT_LTD', 'BJP_COUNT_LTD', 'NCP_COUNT_LTD', 'CPI_COUNT_LTD', 'CPI_M_COUNT_LTD']

# One aggregation job for all parties (the original ran a separate groupBy + collect per party).
party_counts = {row["Party"]: row["count"]
                for row in df.groupBy("Party").count().collect()}
# A party with no rows counts as 0 instead of raising IndexError on [0][0].
countValue = [party_counts.get(party, 0) for party in partyList]
# Keep the per-party names in the original collect() shape so x[0][0] still works downstream.
INC, BJP, NCP, CPI, CPI_M = ([(value,)] for value in countValue)

for party, column, value in zip(partyList, countList, countValue):
    # Exact match: contains("CPI") is also true for "CPI(M)", which gave CPI(M) rows the CPI count.
    df = df.withColumn(column, when(df["Party"] == party, value).otherwise(0))

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+---------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+---------------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|  

In [251]:
# Calculate aggregates per party - avg
# Adds one <PARTY>_AVG_LTD column per party: the party's mean donation on that party's
# rows, 0 on every other row.
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

df = df.withColumn("Amount", df.Amount.cast(IntegerType()))

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
avgList = ['INC_AVG_LTD', 'BJP_AVG_LTD', 'NCP_AVG_LTD', 'CPI_AVG_LTD', 'CPI_M_AVG_LTD']

# One aggregation job for all parties (the original ran a separate groupBy + collect per party).
party_avgs = {row["Party"]: row["avg(Amount)"]
              for row in df.groupBy("Party").avg("Amount").collect()}
# An absent party has no meaningful average, so it becomes null instead of an IndexError.
avgValue = [party_avgs.get(party) for party in partyList]
# Keep the per-party names in the original collect() shape so x[0][0] still works downstream.
avg_inc_ltd, avg_bjp_ltd, avg_ncp_ltd, avg_cpi_ltd, avg_cpim_ltd = ([(value,)] for value in avgValue)

for party, column, value in zip(partyList, avgList, avgValue):
    # Exact match: contains("CPI") is also true for "CPI(M)", which gave CPI(M) rows the CPI average.
    df = df.withColumn(column, when(df.Party == party, value).otherwise(0))

df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|    CPI_M_AVG_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+--

In [252]:
# Calculate aggregates per party - max
# Adds one <PARTY>_MAX_LTD column per party: the party's largest single donation on that
# party's rows, 0 on every other row. max_*_ltd is also read by the top-donor cell.
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

df = df.withColumn("Amount", df.Amount.cast(IntegerType()))

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
maxList = ['INC_MAX_LTD', 'BJP_MAX_LTD', 'NCP_MAX_LTD', 'CPI_MAX_LTD', 'CPI_M_MAX_LTD']

# One aggregation job for all parties (the original ran a separate groupBy + collect per party).
party_maxes = {row["Party"]: row["max(Amount)"]
               for row in df.groupBy("Party").max("Amount").collect()}
# An absent party has no maximum; null (not 0) so a 0-amount row is never mistaken for its top donation.
maxValue = [party_maxes.get(party) for party in partyList]
# Keep the per-party names in the original collect() shape: the top-donor cell reads x[0][0].
max_inc_ltd, max_bjp_ltd, max_ncp_ltd, max_cpi_ltd, max_cpim_ltd = ([(value,)] for value in maxValue)

for party, column, value in zip(partyList, maxList, maxValue):
    # Exact match: contains("CPI") is also true for "CPI(M)", which gave CPI(M) rows the CPI maximum.
    df = df.withColumn(column, when(df.Party == party, value).otherwise(0))

In [253]:
# Calculate top donor per party - "<PARTY> TOP DONOR"
# Each column holds the donor Name on rows where that party received its largest single
# donation (max_*_ltd from the MAX cell) and 0 on every other row.
# The party check is required: matching on Amount alone labelled a donor to ANY party whose
# amount happened to equal, say, BJP's maximum as BJP's top donor.
# NOTE(review): otherwise(0) mixes an integer with the string Name; consider otherwise(None).
party_max_amount = {
    "BJP": max_bjp_ltd[0][0],
    "CPI": max_cpi_ltd[0][0],
    "INC": max_inc_ltd[0][0],
    "NCP": max_ncp_ltd[0][0],
    "CPI(M)": max_cpim_ltd[0][0],
}
for party, max_amount in party_max_amount.items():
    is_top_donation = (df.Party == party) & (df.Amount == max_amount)
    df = df.withColumn(party + " TOP DONOR", when(is_top_donation, df.Name).otherwise(0))
df.show()


+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|    CPI_M_AVG_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI_M_MAX_LTD|BJP TOP DONOR|CPI TOP DONOR|INC 

In [263]:
# Count donations per payment category and put each category's total on that category's rows
# (<CATEGORY>_COUNT_LTD), 0 elsewhere. Expects mode_of_payment to already be one of
# Bank/Cheque/Cash/Others (see the categorisation cell).
mode_counts = {row["mode_of_payment"]: row["count"]
               for row in df.groupBy("mode_of_payment").count().collect()}

# Keep the original per-category names (indexed as x[0][0]); a category with no rows counts
# as 0 instead of raising IndexError.
bank_count_ltd = [(mode_counts.get('Bank', 0),)]
cheque_count_ltd = [(mode_counts.get('Cheque', 0),)]
cash_count_ltd = [(mode_counts.get('Cash', 0),)]
others_count_ltd = [(mode_counts.get('Others', 0),)]

for category, column, count_ltd in [
    ("Bank", "BANK_COUNT_LTD", bank_count_ltd),
    ("Cheque", "CHEQUE_COUNT_LTD", cheque_count_ltd),
    ("Cash", "CASH_COUNT_LTD", cash_count_ltd),
    ("Others", "OTHERS_COUNT_LTD", others_count_ltd),
]:
    print(count_ltd[0][0])
    # Exact match, the same test used to count the category (the original column used contains()).
    df = df.withColumn(column, when(df.mode_of_payment == category, count_ltd[0][0]).otherwise(0))

df.show()

7279
1993
803
3498
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+-------------+----------------+--------------+----------------+--------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI_M_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|    CPI_M_AVG_LTD|INC_MAX_LTD

In [266]:
# Total donations per (Party, fin_year), ordered by year; then one "<fin_year>_<Party>_SUM"
# column per pair holding that total on the party's rows and '-' on every other row.
sdon = df.groupBy("Party","fin_year").sum("Amount").sort("fin_year")
sdon.show()

for party, year, total in sdon.collect():
    column_name = year + "_" + party + "_" + "SUM"
    df = df.withColumn(column_name, when(df.Party == party, total).otherwise('-'))

df.show()

+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   INC| 2003-04|   28301101|
|CPI(M)| 2003-04|     200000|
|   BJP| 2003-04|  116881973|
|   CPI| 2003-04|     779148|
|   INC| 2004-05|  320555643|
|CPI(M)| 2004-05|     896355|
|   CPI| 2004-05|     630000|
|   BJP| 2004-05|  339521289|
|   CPI| 2005-06|    3988690|
|   BJP| 2005-06|   36156111|
|CPI(M)| 2005-06|     550000|
|   INC| 2005-06|   59212492|
|CPI(M)| 2006-07|    1124719|
|   BJP| 2006-07|   29550672|
|   INC| 2006-07|  121273513|
|   CPI| 2006-07|    1229400|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|   BJP| 2007-08|  249623653|
|   NCP| 2007-08|   10225000|
+------+--------+-----------+
only showing top 20 rows

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+