In [6]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col,to_date,split,regexp_replace,year

# Create (or reuse) a local Spark session named "ElectoralBond".
# The MySQL connector jar is registered via spark.jars so that the JDBC
# driver class used by the database write later in this script can be loaded.
spark = (
    SparkSession.builder
    .config("spark.jars", "mysql-connector-j-8.3.0.jar")
    .master("local")
    .appName("ElectoralBond")
    .getOrCreate()
)

# --- Working with the Purchaser dataset ---

# Read the purchaser CSV, treating the first row as the header.
# A forward slash is used as the separator: a backslash here would form the
# invalid string escape "\P" and would only act as a path separator on Windows.
purchaser_data = spark.read.option("header", True).csv('DataSet/PurchaserList.csv')

# Rename the columns to identifiers without spaces.
purchaser_data = (
    purchaser_data
    .withColumnRenamed('Date Of Purchase', 'DateOfPurchase')
    .withColumnRenamed('Name of the Purchaser', 'NameOfPurchaser')
    .withColumnRenamed('Bond Number', 'BondNumber')
    .withColumnRenamed('Denominations', 'Amount')
)

# Keep only the columns that are required.
purchaser_data = purchaser_data.select(
    'DateOfPurchase', 'NameOfPurchaser', 'BondNumber', 'Amount', 'Status'
)

# Normalise the column types:
#   * DateOfPurchase: parsed from dd/MMM/yyyy text into a date
#   * Amount: commas removed, then cast to a long
#   * BondNumber: cast to a long
#   * Year: derived from the parsed DateOfPurchase
purchaser_data = (
    purchaser_data
    .withColumn('DateOfPurchase', to_date(col('DateOfPurchase'), 'dd/MMM/yyyy'))
    .withColumn('Amount', regexp_replace('Amount', ',', ''))
    .withColumn('Amount', col('Amount').cast(LongType()))
    .withColumn('BondNumber', col('BondNumber').cast(LongType()))
    .withColumn('Year', year('DateOfPurchase'))
)

# Keep only the rows whose Status is 'Paid'.
purchaser_data = purchaser_data.filter(purchaser_data.Status == 'Paid')


# --- Working with the Party dataset ---

# Read the party CSV, treating the first row as the header.
# A forward slash is used as the separator: a backslash here would form the
# invalid string escape "\P" and would only act as a path separator on Windows.
party_data = spark.read.option("header", True).csv('DataSet/PartyList.csv')

# Select only the columns that are needed and rename them to identifiers
# without spaces.
party_data = (
    party_data
    .select('Name of Political Party', 'Bond Number')
    .withColumnRenamed('Name of Political Party', 'PoliticalParty')
    .withColumnRenamed('Bond Number', 'BondNumber')
)


# --- Joining both datasets to build the final dataset ---

# Inner join: a purchaser row is paired with every party row that carries
# the same BondNumber.
same_bond = purchaser_data.BondNumber == party_data.BondNumber
final_dataset = purchaser_data.join(party_data, same_bond, 'inner')

# Project the output columns, exposing PoliticalParty under the name PartyName.
final_dataset = final_dataset.select(
    'DateOfPurchase',
    'NameOfPurchaser',
    col('PoliticalParty').alias('PartyName'),
    'Amount',
    'Status',
    'Year',
)

# Print a preview of the joined rows.
final_dataset.show()

# --- Inserting the dataframe into the database ---
import os

# Connection settings may be overridden through environment variables so that
# real credentials do not have to be committed to source. The defaults are the
# values this script has always used, so behaviour is unchanged when the
# variables are not set.
jdbc_url = os.environ.get("ELECTORAL_DB_URL", "jdbc:mysql://localhost:3306/analytical")
jdbc_user = os.environ.get("ELECTORAL_DB_USER", "root")
jdbc_password = os.environ.get("ELECTORAL_DB_PASSWORD", "admin")

try:
    # No save mode is set, so the writer's default applies.
    # NOTE(review): per the Spark docs the default mode raises if the target
    # table already exists -- confirm that is intended for re-runs.
    (
        final_dataset.write.format("jdbc")
        .option("url", jdbc_url)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "electoralbonds")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .save()
    )
finally:
    # Always release the Spark session, even when the JDBC write fails.
    spark.stop()

+--------------+-------------------+--------------------+-------+------+----+
|DateOfPurchase|    NameOfPurchaser|           PartyName| Amount|Status|Year|
+--------------+-------------------+--------------------+-------+------+----+
|    2019-04-12|A B C INDIA LIMITED|PRESIDENT, ALL IN...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|BHARATIYA JANATA ...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|ALL INDIA TRINAMO...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|BHARATIYA JANATA ...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|ALL INDIA TRINAMO...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|BHARATIYA JANATA ...|1000000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|     AAM AADMI PARTY| 100000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|BHARATIYA JANATA ...| 100000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|     AAM AADMI PARTY| 100000|  Paid|2019|
|    2019-04-12|A B C INDIA LIMITED|BHARATIYA JANATA ...| 100000

NameError: name 'e' is not defined