In [1]:
import pyspark


In [2]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

In [3]:
import json

In [4]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:

# Read JSON file into dataframe
df = spark.read.json("donation_np.json")
df.printSchema()
df.show()
# remove non-essential attribute
df = df.na.drop(subset=["Name"])
df.show()


root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

In [6]:
#change in column names
df = df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given","pan_given")
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [7]:
import hashlib

In [8]:
#encrypt address column
from pyspark.sql.functions import sha2, concat_ws
df = df.withColumn("Address", sha2(concat_ws("||", df.Address), 256))


In [9]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |    

In [10]:
df = df.drop("_corrupt_record","field10","field11","field12","field13","field14","field9")

In [11]:
df.show()

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

In [12]:
#categorize mode_of_payment into 4 categories
from pyspark.sql.functions import when
df2 = df.withColumn("mode_of_payment",
                                 when(df.mode_of_payment.like("%D"),"others")\
                                 .when(df.mode_of_payment.like("%CASH"),"Cash")\
                                 .when(df.mode_of_payment.like("%Bank%"),"Bank")\
                                .otherwise("Cheque"))
df2.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|           Bank| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Cheque| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Cheque| 2011-12|   Uma Shankar Gupta|     

In [13]:
from pyspark.sql.functions import sum

In [14]:
#typecasting string to integer
from pyspark.sql.types import IntegerType
df2 = df2.withColumn("Amount", df2["Amount"].cast(IntegerType()))

In [15]:
df2.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|           Bank| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Cheque| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Cheque| 2011-12|   Uma Shankar Gupta|     

In [16]:
df2.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: integer (nullable = true)
 |-- mode_of_payment: string (nullable = false)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)



In [17]:
cf = df2.groupBy("Party").sum("Amount")
cf.show()

+------+-----------+
| Party|sum(Amount)|
+------+-----------+
|   INC| 4031487349|
|   BJP| 9295525996|
|   NCP|  647911419|
|   CPI|   68123698|
|CPI(M)|  150622128|
+------+-----------+



In [18]:
#	Calculate aggregates per party - SUM
from pyspark.sql.functions import col,lit

sum_inc_ltd = df2.groupBy("Party").sum("Amount").filter(df.Party=="INC").select("sum(Amount)").collect()
sum_bjp_ltd = df2.groupBy("Party").sum("Amount").filter(df.Party=="BJP").select("sum(Amount)").collect()
sum_ncp_ltd = df2.groupBy("Party").sum("Amount").filter(df.Party=="NCP").select("sum(Amount)").collect()
sum_cpi_ltd = df2.groupBy("Party").sum("Amount").filter(df.Party=="CPI").select("sum(Amount)").collect()
sum_cpim_ltd = df2.groupBy("Party").sum("Amount").filter(df.Party=="CPI(M)").select("sum(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
sumList = ['INC_SUM_LTD', 'BJP_SUM_LTD', 'NCP_SUM_LTD', 'CPI_SUM_LTD', 'CPI(M)_SUM_LTD']
sumValue = [sum_inc_ltd[0][0], sum_bjp_ltd[0][0], sum_ncp_ltd[0][0], sum_cpi_ltd[0][0], sum_cpim_ltd[0][0]]

for i in range(0, len(sumList)):

    df2 = df2.withColumn(sumList[i], lit(0))
    df2 = df2.withColumn(sumList[i], when(df2["Party"].contains(partyList[i]), sumValue[i]).otherwise(0))




In [19]:
"""df2 = df2.withColumn("sum_inc_ltd",when((df.Party == "INC"),sum_inc_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("sum_bjp_ltd",when((df.Party == "BJP"),sum_bjp_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("sum_ncp_ltd",when((df.Party == "NCP"),sum_ncp_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("sum_cpi_ltd",when((df.Party == "CPI"),sum_cpi_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("sum_cpim_ltd",when((df.Party == "CPIM"),sum_cpim_ltd[0][0]).otherwise(0))
df2.show()"""

'df2 = df2.withColumn("sum_inc_ltd",when((df.Party == "INC"),sum_inc_ltd[0][0]).otherwise(0))\ndf2 = df2.withColumn("sum_bjp_ltd",when((df.Party == "BJP"),sum_bjp_ltd[0][0]).otherwise(0))\ndf2 = df2.withColumn("sum_ncp_ltd",when((df.Party == "NCP"),sum_ncp_ltd[0][0]).otherwise(0))\ndf2 = df2.withColumn("sum_cpi_ltd",when((df.Party == "CPI"),sum_cpi_ltd[0][0]).otherwise(0))\ndf2 = df2.withColumn("sum_cpim_ltd",when((df.Party == "CPIM"),sum_cpim_ltd[0][0]).otherwise(0))\ndf2.show()'

In [20]:
"""count_inc_ltd = df2.groupBy("Party").count("Party").filter(df.Party=="INC").select("sum(Amount)").collect()
count_bjp_ltd = df2.groupBy("Party").count("Party").filter(df.Party=="BJP").select("sum(Amount)").collect()
count_ncp_ltd = df2.groupBy("Party").count("Party").filter(df.Party=="NCP").select("sum(Amount)").collect()
count_cpi_ltd = df2.groupBy("Party").count("Party").filter(df.Party=="CPI").select("sum(Amount)").collect()
count_cpim_ltd = df2.groupBy("Party").count("Party").filter(df.Party=="CPI(M)").select("sum(Amount)").collect()

df2 = df2.withColumn("INC_COUNT_LTD",when((df.Party == "INC"),count_inc_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("BJP_COUNT_LTD",when((df.Party == "BJP"),count_bjp_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("NCP_COUNT_LTD",when((df.Party == "NCP"),count_ncp_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("CPI_COUNT_LTD",when((df.Party == "CPI"),count_cpi_ltd[0][0]).otherwise(0))
df2 = df2.withColumn("CPI(M)_COUNT_LTD",when((df.Party == "CPIM"),count_cpim_ltd[0][0]).otherwise(0))
df2.show() """
#•	Calculate aggregates per party - COUNT
from pyspark.sql.functions import col,lit

INC = df2.groupBy("Party").count().filter(df.Party=="INC").select("count").collect()
BJP = df2.groupBy("Party").count().filter(df.Party=="BJP").select("count").collect()
NCP = df2.groupBy("Party").count().filter(df.Party=="NCP").select("count").collect()
CPI = df2.groupBy("Party").count().filter(df.Party=="CPI").select("count").collect()
CPI_M = df2.groupBy("Party").count().filter(df.Party=="CPI(M)").select("count").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
countList = ['INC_COUNT_LTD', 'BJP_COUNT_LTD', 'NCP_COUNT_LTD', 'CPI_COUNT_LTD', 'CPI_M_COUNT_LTD']
countValue = [INC[0][0], BJP[0][0],NCP[0][0], CPI[0][0], CPI_M[0][0]]

for i in range(0, len(countList)):

    df2 = df2.withColumn(countList[i], lit(0))
    df2 = df2.withColumn(countList[i], when(df2["Party"].contains(partyList[i]), countValue[i]).otherwise(0))



In [21]:

#	Calculate aggregates per party - AVERAGE
from pyspark.sql.functions import col,lit

avg_inc_ltd = df2.groupBy("Party").avg("Amount").filter(df.Party=="INC").select("avg(Amount)").collect()
avg_bjp_ltd = df2.groupBy("Party").avg("Amount").filter(df.Party=="BJP").select("avg(Amount)").collect()
avg_ncp_ltd = df2.groupBy("Party").avg("Amount").filter(df.Party=="NCP").select("avg(Amount)").collect()
avg_cpi_ltd = df2.groupBy("Party").avg("Amount").filter(df.Party=="CPI").select("avg(Amount)").collect()
avg_cpim_ltd = df2.groupBy("Party").avg("Amount").filter(df.Party=="CPI(M)").select("avg(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
avgList = ['INC_AVG_LTD', 'BJP_AVG_LTD', 'NCP_AVG_LTD', 'CPI_AVG_LTD', 'CPI(M)_AVG_LTD']
avgValue = [avg_inc_ltd[0][0], avg_bjp_ltd[0][0], avg_ncp_ltd[0][0], avg_cpi_ltd[0][0], avg_cpim_ltd[0][0]]

for i in range(0, len(avgList)):

    df2 = df2.withColumn(avgList[i], lit(0))
    df2 = df2.withColumn(avgList[i], when(df2["Party"].contains(partyList[i]), avgValue[i]).otherwise(0))

In [22]:
#•	Calculate aggregates per party - MAX
from pyspark.sql.functions import col,lit

max_inc_ltd = df2.groupBy("Party").max("Amount").filter(df.Party=="INC").select("max(Amount)").collect()
max_bjp_ltd = df2.groupBy("Party").max("Amount").filter(df.Party=="BJP").select("max(Amount)").collect()
max_ncp_ltd = df2.groupBy("Party").max("Amount").filter(df.Party=="NCP").select("max(Amount)").collect()
max_cpi_ltd = df2.groupBy("Party").max("Amount").filter(df.Party=="CPI").select("max(Amount)").collect()
max_cpim_ltd = df2.groupBy("Party").max("Amount").filter(df.Party=="CPI(M)").select("max(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
maxList = ['INC_MAX_LTD', 'BJP_MAX_LTD', 'NCP_MAX_LTD', 'CPI_MAX_LTD', 'CPI(M)_MAX_LTD']
maxValue = [max_inc_ltd[0][0], max_bjp_ltd[0][0], max_ncp_ltd[0][0], max_cpi_ltd[0][0], max_cpim_ltd[0][0]]

for i in range(0, len(avgList)):

    df2 = df2.withColumn(maxList[i], lit(0))
    df2 = df2.withColumn(maxList[i], when(df2["Party"].contains(partyList[i]), maxValue[i]).otherwise(0))

In [29]:
df2 = df2.withColumn("BJP_TOP_DONOR",when((df2.Amount==max_bjp_ltd[0][0]),df2.Name).otherwise(0))

df2 = df2.withColumn("CPI_TOP_DONOR",when((df2.Amount==max_cpi_ltd[0][0]),df2.Name).otherwise(0))

df2 = df2.withColumn("INC_TOP_DONOR",when((df2.Amount==max_inc_ltd[0][0]),df2.Name).otherwise(0))

df2 = df2.withColumn("NCP_TOP_DONOR",when((df2.Amount==max_ncp_ltd[0][0]),df2.Name).otherwise(0))

df2 = df2.withColumn("CPI(M)_TOP_DONOR",when((df2.Amount==max_cpim_ltd[0][0]),df2.Name).otherwise(0))

df2.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+--------------+----------------+--------------+---------------+---------------+---------------+------------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+---------------+----

In [24]:
bank_count_ltd = df2.groupBy("mode_of_payment").count().filter(df2.mode_of_payment == 'Bank').select("count").collect()
df2 = df2.withColumn("BANK_COUNT_LTD", when(df2.mode_of_payment.contains("Bank"),bank_count_ltd[0][0]).otherwise(0))


cheque_count_ltd = df2.groupBy("mode_of_payment").count().filter(df2.mode_of_payment == 'Cheque').select("count").collect()
df2 = df2.withColumn("CHEQUE_COUNT_LTD", when(df2.mode_of_payment.contains("Cheque"),cheque_count_ltd[0][0]).otherwise(0))


cash_count_ltd = df2.groupBy("mode_of_payment").count().filter(df2.mode_of_payment == 'Bank').select("count").collect()
df2 = df2.withColumn("CASH_COUNT_LTD", when(df2.mode_of_payment.contains("Cash"),cash_count_ltd[0][0]).otherwise(0))


others_count_ltd = df2.groupBy("mode_of_payment").count().filter(df2.mode_of_payment == 'Bank').select("count").collect()
df2 = df2.withColumn("BANK_COUNT_LTD", when(df2.mode_of_payment.contains("Others"),others_count_ltd[0][0]).otherwise(0))
df2.show()



+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+--------------+----------------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|BJP_TOP_DONOR|BANK_COUNT_LTD|CHEQUE_COUNT_LTD|CASH_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---

In [25]:
df2.select(sum("Amount"))

DataFrame[sum(Amount): bigint]

In [26]:
df2.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+-----------+-----------+--------------+-------------+-------------+-------------+-------------+---------------+------------------+------------------+-----------+------------------+-----------------+-----------+-----------+-----------+-----------+--------------+-------------+--------------+----------------+--------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|INC_SUM_LTD|BJP_SUM_LTD|NCP_SUM_LTD|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|NCP_COUNT_LTD|CPI_COUNT_LTD|CPI_M_COUNT_LTD|       INC_AVG_LTD|       BJP_AVG_LTD|NCP_AVG_LTD|       CPI_AVG_LTD|   CPI(M)_AVG_LTD|INC_MAX_LTD|BJP_MAX_LTD|NCP_MAX_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|BJP_TOP_DONOR|BANK_COUNT_LTD|CHEQUE_COUNT_LTD|CASH_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---

In [27]:
sdon = df2.groupBy("Party","fin_year").sum("Amount").sort("fin_year")  
sdon.show() 

for row in sdon.collect():  
    name = row[1] + "_" + row[0] + "_" + "SUM"  
    df2 = df2.withColumn(name,lit('-'))  
    df2 = df2.withColumn(name, when(col("Party") == row[0], row[2])  
    .otherwise('-'))  




+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   INC| 2003-04|   28301101|
|CPI(M)| 2003-04|     200000|
|   BJP| 2003-04|  116881973|
|   CPI| 2003-04|     779148|
|   INC| 2004-05|  320555643|
|CPI(M)| 2004-05|     896355|
|   CPI| 2004-05|     630000|
|   BJP| 2004-05|  339521289|
|   CPI| 2005-06|    3988690|
|   BJP| 2005-06|   36156111|
|CPI(M)| 2005-06|     550000|
|   INC| 2005-06|   59212492|
|CPI(M)| 2006-07|    1124719|
|   BJP| 2006-07|   29550672|
|   INC| 2006-07|  121273513|
|   CPI| 2006-07|    1229400|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|   BJP| 2007-08|  249623653|
|   NCP| 2007-08|   10225000|
+------+--------+-----------+
only showing top 20 rows



In [28]:
df2.write.parquet("sparkfile.parquet")

AnalysisException: path file:/c:/Users/asawan/Documents/spark/sparkfile.parquet already exists.