In [1]:
import pyspark as spark
import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.json("donation_np.json").limit(1000)
df.printSchema()


# df.show()


root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



In [2]:
#Renaming the columns

df = df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given","pan_given")
df.printSchema()



root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- mode_of_payment: string (nullable = true)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



In [3]:
#Encryting the columns

from pyspark import SparkConf, SparkContext, SQLContext

import hashlib

def encrypt_address(Address):
    sha_value = hashlib.sha256(Address.encode()).hexdigest()
    return sha_value

In [4]:
from pyspark.sql.functions import sha2, concat_ws
df = df.withColumn("Address", sha2(concat_ws("||",df.Address),256))

# df.show(truncate=False)



In [5]:
df = df.na.drop(subset = ["Name"])
# df.show()

In [6]:
#categorizing 
from pyspark.sql.functions import when,col

df = df.withColumn(
    "mode_of_payment",
    when(col("mode_of_payment").like("%CASH%"),"CASH")\
    .when(col("mode_of_payment").like("%Bank%"),"BANK")\
    .when(col("mode_of_payment").like("Ch%"),"CHEQUE").otherwise("Other")
)



In [7]:
#Typecasting amount to int
from pyspark.sql.types import IntegerType


df = df.withColumn("Amount",df.Amount.cast(IntegerType()))




In [8]:
from pyspark.sql.functions import *

#SUM
CPI_sum = df.where(df.Party == "CPI").select(sum("Amount")).first()["sum(Amount)"]
BJP_sum = df.where(df.Party == "BJP").select(sum("Amount")).first()["sum(Amount)"]
INC_sum = df.where(df.Party == "INC").select(sum("Amount")).first()["sum(Amount)"]
NCP_sum = df.where(df.Party == "NCP").select(sum("Amount")).first()["sum(Amount)"]
CPIM_sum = df.where(df.Party == "CPI(M)").select(sum("Amount")).first()["sum(Amount)"]

df = df.withColumn("INC_CPI_SUM",when(df.Party == "CPI",CPI_sum).otherwise(0))\
.withColumn("INC_BJP_SUM",when(df.Party == "BJP",BJP_sum).otherwise(0))\
.withColumn("INC_INC_SUM",when(df.Party == "INC",INC_sum).otherwise(0))\
.withColumn("INC_NCP_SUM",when(df.Party == "NCP",NCP_sum).otherwise(0))\
.withColumn("INC_NCP_SUM",when(df.Party == "CPI(M)",CPIM_sum).otherwise(0))




# #AVERAGE
# CPI_avg = df.where(df.Party == "CPI").select(avg("Amount")).collect()
# BJP_avg = df.where(df.Party == "BJP").select(avg("Amount")).collect()
# INC_avg = df.where(df.Party == "INC").select(avg("Amount")).collect()
# NCP_avg = df.where(df.Party == "NCP").select(avg("Amount")).collect()
# CPIM_avg = df.where(df.Party == "CPI(M)").select(avg("Amount")).collect()
# print(CPI_avg)
# print(BJP_avg)
# print(INC_avg)
# print(NCP_avg)
# print(CPIM_avg)

# #MAX

#
# print(df.where(df.Party == "CPI").select(max("Amount")).collect())
# print(df.where(df.Party == "BJP").select(max("Amount")).collect())
# print(df.where(df.Party == "INC").select(max("Amount")).collect())
# print(df.where(df.Party == "NCP").select(max("Amount")).collect())
# print(df.where(df.Party == "CPI(M)").select(max("Amount")).collect())


In [9]:

#COUNT

CPI_count = df.where(df.Party == "CPI").count()
BJP_count = df.where(df.Party == "BJP").count()
INC_count = df.where(df.Party == "INC").count()
NCP_count = df.where(df.Party == "NCP").count()
CPIM_count = df.where(df.Party == "CPI(M)").count()
df = df.withColumn("INC_CPI_count",when(df.Party == "CPI",CPI_count).otherwise(0))\
.withColumn("INC_BJP_count",when(df.Party == "BJP",BJP_count).otherwise(0))\
.withColumn("INC_INC_count",when(df.Party == "INC",INC_count).otherwise(0))\
.withColumn("INC_NCP_count",when(df.Party == "NCP",NCP_count).otherwise(0))\
.withColumn("INC_NCP_count",when(df.Party == "CPI(M)",CPIM_count).otherwise(0))

In [10]:
aggregate_values = df.groupby("Party").agg(max("Amount"),count("Amount"),sum("Amount"),avg("Amount"))

In [14]:
top_donor_bjp = df.filter((df.Party == "BJP") & (df.Amount == aggregate_values.collect()[3][1]))
top_donor_inc = df.filter((df.Party == "INC") & (df.Amount == aggregate_values.collect()[2][1]))
top_donor_ncp = df.filter((df.Party == "NCP") & (df.Amount == aggregate_values.collect()[4][1]))
top_donor_cpi = df.filter((df.Party == "CPI") & (df.Amount == aggregate_values.collect()[0][1]))
top_donor_cpim = df.filter((df.Party == "CPI(M)") & (df.Amount == aggregate_values.collect()[1][1]))
print(top_donor_bjp.first()["Name"])
print(top_donor_inc.first()["Name"])
print(top_donor_ncp.first()["Name"])
print(top_donor_cpi.first()["Name"])
print(top_donor_cpim.first()["Name"])

party_fin_year = df.groupBy("Party","fin_year").agg(sum("Amount"))

# 





Ashok Commercial Enterprises
A.V.Patil Foundation
A 2 Z online  Services Pvt.Ltd
Aziz Pasha
V K Ramachandran


In [16]:


for i,j,k in party_fin_year.select("Party","fin_year","sum(Amount)").collect():
     df = df.withColumn("{}_{}_SUM".format(j,i),when( (df.Party == "{}".format(i)) & (df.fin_year == "{}".format(j)) ,lit(k) ).otherwise(lit(0)))
    
# df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+-----------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+------------------+----

In [17]:
df.groupBy('mode_of_payment').agg(count("mode_of_payment")).show()

+---------------+----------------------+
|mode_of_payment|count(mode_of_payment)|
+---------------+----------------------+
|           CASH|                     5|
|           BANK|                   573|
|          Other|                   322|
|         CHEQUE|                    91|
+---------------+----------------------+

