In [1]:
import numpy as np  

In [2]:
import pandas as pd

In [4]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 52.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=824b1ab7bc69b003a94a3dfad1cf610313f8c1fe97690e7f6124f8c91fd6d499
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [5]:
import pyspark

In [6]:
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder.getOrCreate()

In [8]:
data = spark.read.json('donation_np.json').limit(100)

In [9]:
data.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



In [10]:
data = data.withColumnRenamed('Contribution Mode','mode_of_payment')

In [11]:
data = data.withColumnRenamed('Financial Year','fin_year')

In [12]:
data = data.withColumnRenamed('PAN Given','pan_given')

In [13]:
data.columns

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 '_corrupt_record',
 'field10',
 'field11',
 'field12',
 'field13',
 'field14',
 'field9']

In [14]:
from pyspark.sql.functions import col
data = data.where(col("Amount").isNotNull())

In [15]:
type(data)

pyspark.sql.dataframe.DataFrame

In [16]:
from pyspark.sql.functions import *
# data = data.withColumn("Address", hash("Address"))

In [17]:
data = data.withColumn('Address',md5(col('Address')))

In [18]:
data.select("Address").show(10)

+--------------------+
|             Address|
+--------------------+
|44bca5383d6dae229...|
|3d5fc135c2defdd47...|
|425cb8edfacd38d97...|
|39532b270ae265a0e...|
|e5eaf382bb92f67dd...|
|aebd8d41127096039...|
|a6bdeec2ba637e946...|
|cfab1ba8c67c7c838...|
|5ccb472bc1ee506b7...|
|58c899d47e71fefc0...|
+--------------------+
only showing top 10 rows



In [19]:
data.select("mode_of_payment").show(10)

+--------------------+
|     mode_of_payment|
+--------------------+
|                CASH|
|000037, HDFC Bank...|
|Cheque, State Ban...|
|Through Bank Tran...|
|Through Bank Tran...|
|          146865 SBI|
|          994966 SBI|
|14021,  Union Ban...|
|          168278 SBI|
|       Bank Transfer|
+--------------------+
only showing top 10 rows



In [20]:
from pyspark.sql.functions import when

In [21]:
data = data.withColumn("mode_of_payment",when(lower(col("mode_of_payment")).contains("cash"),"Cash").when(lower(col("mode_of_payment")).contains("cheque"),"Cheque").when(lower(col("mode_of_payment")).contains("bank"),"Bank").otherwise("Others"))

In [22]:
data.select("mode_of_payment").show(100)

+---------------+
|mode_of_payment|
+---------------+
|           Cash|
|           Bank|
|         Cheque|
|           Bank|
|           Bank|
|         Others|
|         Others|
|           Bank|
|         Others|
|           Bank|
|           Bank|
|         Others|
|         Cheque|
|         Cheque|
|         Cheque|
|           Bank|
|           Bank|
|           Bank|
|         Others|
|           Bank|
|           Bank|
|           Bank|
|         Cheque|
|           Bank|
|           Bank|
|           Bank|
|         Others|
|           Cash|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|           Bank|
|         Others|
|           Bank|
|           Bank|
|         Others|
|         Others|
|           Bank|
|           Bank|
|         Others|
|           Cash|
|           Cash|
|           Cash|
|           Cash|
|           Cash|
|           Cash|
|           Cash|
|         

In [23]:
data=data.drop('_corrupt_record','field10','field11','field12','field13','field14','field9')

In [24]:
from pyspark.sql.types import IntegerType

In [25]:
data = data.withColumn("Amount",data["Amount"].cast(IntegerType()))

In [26]:
grouped_data = data.groupBy("Party").sum("Amount")

In [27]:
for k in grouped_data.collect():
    party,amount = k["Party"],k["sum(Amount)"]
    data = data.withColumn(party+"_SUM_LTD",when(data.Party.contains(party),amount).otherwise(0))

In [28]:
data.select("BJP_SUM_LTD").show()

+-----------+
|BJP_SUM_LTD|
+-----------+
|          0|
|          0|
|          0|
|          0|
|          0|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|          0|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|   12602000|
|          0|
+-----------+
only showing top 20 rows



In [29]:
data.select("CPI_SUM_LTD").show()

+-----------+
|CPI_SUM_LTD|
+-----------+
|   15374024|
|   15374024|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
+-----------+
only showing top 20 rows



In [30]:
[d[0] for d in data.groupBy("Party").agg(max("Amount")).select("max(Amount)").collect()]

[3000000, 10000000, 108000, 8400000]

In [31]:
max_donors = data[data.Amount.isin([d[0] for d in data.groupBy("Party").agg(max("Amount")).select("max(Amount)").collect()])].select("Name","Party")

In [32]:
max_donors.collect()[0]["Name"]

'Aziz Pasha'

In [33]:
for m in max_donors.collect():

    data = data.withColumn(m["Party"]+"_TOP_DONOR",when(col("Party").contains(m["Party"]),m["Name"]).otherwise("---"))

In [34]:
data.columns

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 'CPI_SUM_LTD',
 'CPI(M)_SUM_LTD',
 'INC_SUM_LTD',
 'BJP_SUM_LTD',
 'CPI_TOP_DONOR',
 'CPI(M)_TOP_DONOR',
 'INC_TOP_DONOR',
 'BJP_TOP_DONOR']

In [35]:
data.select("CPI_TOP_DONOR").show()
    

+-------------+
|CPI_TOP_DONOR|
+-------------+
|   Aziz Pasha|
|   Aziz Pasha|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
|          ---|
+-------------+
only showing top 20 rows



In [36]:
per_fin_year = data.groupBy("Party","fin_year").sum("Amount").collect()

In [37]:
per_fin_year[0]

Row(Party='CPI', fin_year='2010-11', sum(Amount)=5240000)

In [38]:
for p in per_fin_year:
    data = data.withColumn(p["fin_year"]+"_"+p["Party"]+"_SUM",when(col("fin_year").contains(p["fin_year"]) & col("Party").contains(p["Party"]),lit(p["sum(Amount)"])).otherwise("---"))

In [39]:
data.select("2011-12_BJP_SUM").show()

+---------------+
|2011-12_BJP_SUM|
+---------------+
|            ---|
|            ---|
|            ---|
|            ---|
|            ---|
|        1000000|
|        1000000|
|            ---|
|        1000000|
|            ---|
|            ---|
|            ---|
|            ---|
|            ---|
|            ---|
|            ---|
|        1000000|
|            ---|
|            ---|
|            ---|
+---------------+
only showing top 20 rows



In [40]:
data.select("mode_of_payment").show()

+---------------+
|mode_of_payment|
+---------------+
|           Cash|
|           Bank|
|         Cheque|
|           Bank|
|           Bank|
|         Others|
|         Others|
|           Bank|
|         Others|
|           Bank|
|           Bank|
|         Others|
|         Cheque|
|         Cheque|
|         Cheque|
|           Bank|
|           Bank|
|           Bank|
|         Others|
|           Bank|
+---------------+
only showing top 20 rows



In [41]:
data.groupBy("mode_of_payment").count().collect()

[Row(mode_of_payment='Cash', count=33),
 Row(mode_of_payment='Bank', count=34),
 Row(mode_of_payment='Cheque', count=14),
 Row(mode_of_payment='Others', count=17)]

In [42]:
for d in data.groupBy("mode_of_payment").count().collect():
    data = data.withColumn(d["mode_of_payment"]+"_count_LTD",when(col("mode_of_payment").contains(d["mode_of_payment"]),lit(d["count"])).otherwise("---"))

In [43]:
data.select("Bank_count_LTD").show()

+--------------+
|Bank_count_LTD|
+--------------+
|           ---|
|            34|
|           ---|
|            34|
|            34|
|           ---|
|           ---|
|            34|
|           ---|
|            34|
|            34|
|           ---|
|           ---|
|           ---|
|           ---|
|            34|
|            34|
|            34|
|           ---|
|            34|
+--------------+
only showing top 20 rows



In [44]:
for d in data.groupBy("Party").count().collect():
    data = data.withColumn(d["Party"]+"_COUNT_LTD",when(col("Party").contains(d["Party"]),d["count"]).otherwise(0))

In [45]:
data.select("BJP_COUNT_LTD").show()

+-------------+
|BJP_COUNT_LTD|
+-------------+
|            0|
|            0|
|            0|
|            0|
|            0|
|           28|
|           28|
|           28|
|           28|
|            0|
|           28|
|           28|
|           28|
|           28|
|           28|
|           28|
|           28|
|           28|
|           28|
|            0|
+-------------+
only showing top 20 rows



In [46]:
data.groupBy("Party").avg("Amount")

DataFrame[Party: string, avg(Amount): double]

In [47]:
for d in data.groupBy("Party").avg("Amount").collect():
    data = data.withColumn(d["Party"]+"_AVG_LTD",when(col("Party").contains(d["Party"]),d["avg(Amount)"]).otherwise(0))

In [48]:
data.select("INC_AVG_LTD").show()

+------------------+
|       INC_AVG_LTD|
+------------------+
|               0.0|
|               0.0|
|50318.181818181816|
|50318.181818181816|
|50318.181818181816|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|50318.181818181816|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|               0.0|
|50318.181818181816|
+------------------+
only showing top 20 rows



In [49]:
data.groupBy("Party").max("Amount").show()

+------+-----------+
| Party|max(Amount)|
+------+-----------+
|   CPI|    3000000|
|CPI(M)|   10000000|
|   INC|     108000|
|   BJP|    8400000|
+------+-----------+



In [50]:
for d in data.groupBy("Party").max("Amount").collect():
    data = data.withColumn(d["Party"]+"_MAX_LTD",when(col("Party").contains(d["Party"]),d["max(Amount)"]).otherwise(0))

In [51]:
data.select("INC_MAX_LTD").show()

+-----------+
|INC_MAX_LTD|
+-----------+
|          0|
|          0|
|     108000|
|     108000|
|     108000|
|          0|
|          0|
|          0|
|          0|
|     108000|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|          0|
|     108000|
+-----------+
only showing top 20 rows



In [52]:
data.select("2011-12_INC_SUM","fin_year","Party").show()

+---------------+--------+------+
|2011-12_INC_SUM|fin_year| Party|
+---------------+--------+------+
|            ---| 2010-11|   CPI|
|            ---| 2014-15|CPI(M)|
|            ---| 2014-15|   INC|
|         162000| 2011-12|   INC|
|         162000| 2011-12|   INC|
|            ---| 2011-12|   BJP|
|            ---| 2011-12|   BJP|
|            ---| 2013-14|   BJP|
|            ---| 2011-12|   BJP|
|            ---| 2013-14|   INC|
|            ---| 2004-05|   BJP|
|            ---| 2012-13|   BJP|
|            ---| 2008-09|   BJP|
|            ---| 2008-09|   BJP|
|            ---| 2008-09|   BJP|
|            ---| 2012-13|   BJP|
|            ---| 2011-12|   BJP|
|            ---| 2014-15|   BJP|
|            ---| 2012-13|   BJP|
|            ---| 2009-10|   INC|
+---------------+--------+------+
only showing top 20 rows



In [53]:
data.select("Bank_count_LTD").show()

+--------------+
|Bank_count_LTD|
+--------------+
|           ---|
|            34|
|           ---|
|            34|
|            34|
|           ---|
|           ---|
|            34|
|           ---|
|            34|
|            34|
|           ---|
|           ---|
|           ---|
|           ---|
|            34|
|            34|
|            34|
|           ---|
|            34|
+--------------+
only showing top 20 rows



In [54]:
data.collect()[0]

Row(Address='44bca5383d6dae229cb7c43cd52d4bb1', Amount=3000000, mode_of_payment='Cash', fin_year='2010-11', Name='Aziz Pasha', pan_given='Y', Party='CPI', Type='Others', CPI_SUM_LTD=15374024, CPI(M)_SUM_LTD=0, INC_SUM_LTD=0, BJP_SUM_LTD=0, CPI_TOP_DONOR='Aziz Pasha', CPI(M)_TOP_DONOR='---', INC_TOP_DONOR='---', BJP_TOP_DONOR='---', 2010-11_CPI_SUM='5240000', 2014-15_CPI(M)_SUM='---', 2014-15_INC_SUM='---', 2011-12_INC_SUM='---', 2011-12_BJP_SUM='---', 2013-14_BJP_SUM='---', 2013-14_INC_SUM='---', 2004-05_BJP_SUM='---', 2012-13_BJP_SUM='---', 2008-09_BJP_SUM='---', 2014-15_BJP_SUM='---', 2009-10_INC_SUM='---', 2009-10_CPI_SUM='---', 2010-11_INC_SUM='---', 2013-14_CPI(M)_SUM='---', 2010-11_CPI(M)_SUM='---', 2011-12_CPI(M)_SUM='---', 2005-06_CPI_SUM='---', 2007-08_CPI_SUM='---', 2006-07_CPI_SUM='---', 2008-09_CPI_SUM='---', 2004-05_CPI_SUM='---', 2003-04_CPI_SUM='---', 2011-12_CPI_SUM='---', 2012-13_CPI_SUM='---', 2014-15_CPI_SUM='---', 2013-14_CPI_SUM='---', 2003-04_INC_SUM='---', 2012-1

In [55]:
data.write.parquet("parquetnp.json")