In [2]:
pip install pyspark


[notice] A new release of pip available: 22.1.2 -> 22.2
[notice] To update, run: python.exe -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.


In [94]:
import pyspark
from pyspark.sql.functions import hash,lower,col,lit,when,max
from pyspark.sql import SparkSession

In [95]:
# Creating a spark session
spark = SparkSession.builder.getOrCreate()

In [96]:
# Reading Json File
donation_df = spark.read.option("multiline","true") \
      .json("donation_np.json")
      
donation_df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



In [97]:
# Converting the JSON File to Pandas DataFrame
donation_df.toPandas()

Unnamed: 0,Address,Amount,Contribution Mode,Financial Year,Name,PAN Given,Party,Type,field10,field11,field12,field13,field14,field9
0,"16-B, Ferozeshah Road New Delhi-1",3000000,CASH,2010-11,Aziz Pasha,Y,CPI,Others,,,,,,
1,"No.1, First Floor Main Road Domlur Second Stag...",10000000,"000037, HDFC Bank Bengaluru, Dt. 16.03.2015, 5...",2014-15,V K Ramachandran,Y,CPI(M),Others,,,,,,
2,"3, Motilal Nehru Place New Delhi",108000,"Cheque, State Bank of India Parliament House N...",2014-15,Dr. Manmohan Singh,N,INC,Others,,,,,,
3,"9,Firozshah Road New Delhi",54000,Through Bank Transfer,2011-12,Dr. Manda Jagnathan,N,INC,Others,,,,,,
4,"17,Dr.B.R.Mehta Lane New Delhi",54000,Through Bank Transfer,2011-12,Prof. K.V.Thomas,N,INC,Others,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13568,"Zydus Tower, Satellite Cross Road, Ahmedabad-3...",1000000,"854, HDFC Bank",2014-15,Zydus Healthcare,Y,BJP,Others,,,,,,
13569,"Zydaus Towere, Satellite Cross Roads, Ahmedaba...",5000000,250506BANK OF BARODA,2010-11,Zydus Wellness Limited,Y,BJP,Others,,,,,,
13570,"Zydus Tower, Satellite Cross Roads, Ahmedabad",5000000,250507BANK OF BARODA,2010-11,Zydus Wellness Limited,Y,BJP,Others,,,,,,
13571,"Zydus Wellneww Ltd, Zydus Tower Satellite Cros...",10000000,"901983, Bank of Baroda",2012-13,Zydus Wellness Limited,Y,BJP,Others,,,,,,


In [98]:
# Renaming Columns
donation_df = donation_df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given", "pan_given")

In [99]:
donation_df.columns

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 'field10',
 'field11',
 'field12',
 'field13',
 'field14',
 'field9']

In [100]:
# Encryting the Address Field
donation_df = donation_df.withColumn('Address', hash('Address'))

In [101]:
# Transforming the mode_of_payment to lower case 
donation_df = donation_df.withColumn('mode_of_payment', lower(col('mode_of_payment')))

In [102]:
# Transforming into Categorical Data
donation_df = donation_df.withColumn( "mode_of_payment", when(col("mode_of_payment").contains("cash"),"Cash").when(col("mode_of_payment").contains("cheque"),"Cheque").when(col("mode_of_payment").contains("bank"),"Bank").otherwise("Others"))

In [103]:
donation_df.show(10)

+-----------+--------+---------------+--------+--------------------+---------+------+------+-------+-------+-------+-------+-------+------+
|    Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|field10|field11|field12|field13|field14|field9|
+-----------+--------+---------------+--------+--------------------+---------+------+------+-------+-------+-------+-------+-------+------+
| 1308690849| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|       |       |       |       |       |      |
|  244956150|10000000|           Bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|       |       |       |       |       |      |
|  -18372129|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|       |       |       |       |       |      |
| 2092735068|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|       |       |       |       |       |      |
|-1762953337|   5400

In [104]:
donation_df = donation_df.drop('_corrupt_record', 'field9', 'field10', 'field11', 'field12', 'field13', 'field14')

Aggregates per party 

In [105]:
from pyspark.sql.types import IntegerType

# Typecasting Amount field
donation_df = donation_df.withColumn("Amount",donation_df.Amount.cast(IntegerType()))

In [106]:
# Calculating Sum Amount for each party
donation_df_sum = donation_df.groupBy('Party').sum('Amount')

In [107]:
# Creating Sum column for each party
for sum in donation_df_sum.collect():
    Party, Amount = sum["Party"], sum["sum(Amount)"]
    donation_df = donation_df.withColumn(Party+"_SUM_LTD", when(donation_df.Party.contains(Party), Amount).otherwise(0))

In [108]:
# Calculating Count of Each party
donation_df_count = donation_df.groupBy('Party').count()

In [109]:
for count in donation_df_count.collect():
    Party, Amount = count["Party"], count["count"]
    donation_df = donation_df.withColumn(Party+"_COUNT_LTD", when(donation_df.Party.contains(Party), Amount).otherwise(0))

In [110]:
donation_df_Avg = donation_df.groupBy('Party').avg('Amount')

In [111]:
for avg in donation_df_Avg.collect():
    Party, Amount = avg["Party"], avg["avg(Amount)"]
    donation_df = donation_df.withColumn(Party+"_AVG_LTD", when(donation_df.Party.contains(Party), Amount).otherwise(0))

In [112]:
donation_df_Max = donation_df.groupBy('Party').max('Amount')

In [113]:
for Max in donation_df_Max.collect():
    Party, Amount = Max["Party"], Max["max(Amount)"]
    donation_df = donation_df.withColumn(Party+"_MAX_LTD", when(donation_df.Party.contains(Party), Amount).otherwise(0))

Top donor per party 

In [114]:
donation_df_MaxDonors = donation_df[donation_df.Amount.isin([donor[0] for donor in donation_df.groupBy("Party").agg(max("Amount")).select("max(Amount)").collect()])].select("Name", "Party")

In [115]:
donation_df_MaxDonors.collect()[0]["Name"]

'Aziz Pasha'

In [116]:
for donor in donation_df_MaxDonors.collect():
    donation_df = donation_df.withColumn(donor["Party"]+"_TOP_DONOR", when(col("Party").contains(donor["Party"]), donor["Name"]).otherwise("---"))

Donations in a financial year for each party

In [77]:
grouped_multiple = donation_df.groupby(['Party', 'fin_year']).sum('Amount').collect()

In [117]:
for year in grouped_multiple:
    donation_df = donation_df.withColumn(year["fin_year"] + "_"+year["Party"]+"_SUM",when(col("fin_year").contains(year["fin_year"]) & col("Party").contains(year["Party"]),lit(year["sum(Amount)"])).otherwise("---"))

Number of donation till date per mode_of_payment 

In [118]:
donation_df_mode = donation_df.groupBy('mode_of_payment').count().show()

+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|           Bank| 7788|
|         Cheque| 2080|
|           Cash|  918|
|         Others| 2787|
+---------------+-----+



In [119]:
for d in donation_df.groupBy("Party").count().collect():
    donation_df = donation_df.withColumn(d["Party"] + "_COUNT_LTD", when(col("Party").contains(d["Party"]), d["count"]).otherwise(0))

In [127]:
donation_df.toPandas().to_csv('mycsv.csv')

Writing Using a parquet File

In [125]:
pip install pyarrow

Collecting pyarrow
  Downloading pyarrow-8.0.0-cp39-cp39-win_amd64.whl (17.9 MB)
     ---------------------------------------- 17.9/17.9 MB 1.2 MB/s eta 0:00:00
Installing collected packages: pyarrow
Successfully installed pyarrow-8.0.0

[notice] A new release of pip available: 22.1.2 -> 22.2
[notice] To update, run: python.exe -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.


In [128]:
import pyarrow.csv as pv
import pyarrow.parquet as pq

In [130]:
table = pv.read_csv('mycsv.csv')
pq.write_table(table, 'donation.parquet')