In [0]:
#file location and type
file_location="/FileStore/tables/Major_Contract_Awards-2.csv"
file_type="csv"

#csv options
infer_schema="true"   #to detect the column datatype
first_row_header="true"  #to take the header row
delimiter=","

#creating df
df=spark.read.format(file_type).option("inferSchema",infer_schema).option("header",first_row_header).load(file_location)
df.printSchema()
df.show(5)
select_df = df.select("Fiscal Year", "Region","Borrower Country","Borrower Country Code","Project ID","Project Name","Supplier Country","Supplier Country Code","Total Contract Amount (USD)")
print(select_df.count())

root
 |-- As of Date: string (nullable = true)
 |-- Fiscal Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Borrower Country: string (nullable = true)
 |-- Borrower Country Code: string (nullable = true)
 |-- Project ID: string (nullable = true)
 |-- Project Name: string (nullable = true)
 |-- Procurement Type: string (nullable = true)
 |-- Procurement Category: string (nullable = true)
 |-- Procurement Method: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- WB Contract Number: integer (nullable = true)
 |-- Contract Signing Date: string (nullable = true)
 |-- Supplier: string (nullable = true)
 |-- Supplier Country: string (nullable = true)
 |-- Supplier Country Code: string (nullable = true)
 |-- Total Contract Amount (USD): string (nullable = true)

+-------------+-----------+--------------------+----------------+---------------------+----------+--------------------+-------------------+--------------------+--------------------+----

In [0]:
from pyspark.sql.functions import isnan, when, count, col


#finding null values in columns
null_df=select_df.select([count(when(col(c).isNull(), 0)).alias(c) for c in select_df.columns])
print(null_df)
null_df.show()
# creating new df without null values
filter_df=select_df.filter(select_df['Region'].isNotNull() & select_df['Total Contract Amount (USD)'].isNull())
filter_df.select("*").show()
# verification for no of null values removed
required_df = select_df.filter(select_df['Total Contract Amount (USD)'].isNotNull())
print(select_df.count()-required_df.count())
required_df.show()

DataFrame[Fiscal Year: bigint, Region: bigint, Borrower Country: bigint, Borrower Country Code: bigint, Project ID: bigint, Project Name: bigint, Supplier Country: bigint, Supplier Country Code: bigint, Total Contract Amount (USD): bigint]
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+
|Fiscal Year|Region|Borrower Country|Borrower Country Code|Project ID|Project Name|Supplier Country|Supplier Country Code|Total Contract Amount (USD)|
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+
|          0|     0|               0|                    0|         0|           0|               0|                    0|                        413|
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+

+---

In [0]:
#removing the $ character from amount
from pyspark.sql.functions import translate
newDf_1=select_df.withColumn('Total Contract Amount (USD)', translate('Total Contract Amount (USD)', '$', ''))    #replace
newDf=newDf_1.withColumn('Total Contract Amount (USD)', translate('Total Contract Amount (USD)', ',', ''))    #replace
newDf.show()
# changing the data type of amount to Float
from pyspark.sql.types import StringType, DateType, FloatType
datatype_change_df = newDf.withColumn("Total Contract Amount (USD)" , newDf["Total Contract Amount (USD)"].cast(FloatType()))
datatype_change_df.printSchema()

+-----------+--------------------+--------------------+---------------------+----------+--------------------+--------------------+---------------------+---------------------------+
|Fiscal Year|              Region|    Borrower Country|Borrower Country Code|Project ID|        Project Name|    Supplier Country|Supplier Country Code|Total Contract Amount (USD)|
+-----------+--------------------+--------------------+---------------------+----------+--------------------+--------------------+---------------------+---------------------------+
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural Infrastr (F...|                Mali|                   ML|                 176578.47 |
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural Infrastr (F...|                Mali|                   ML|                  38386.63 |
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural I

In [0]:
#query 1-Amount spent year Wise
from pyspark.sql.types import *
from pyspark.sql.functions import sum
grouped_df=datatype_change_df.groupBy('Fiscal Year').agg(sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Total_Amt_spent'))
grouped_df.sort(["Fiscal Year"],ascending = [True])
grouped_df.display()


#query 2-Country wise Amount spent
grouped_df=datatype_change_df.groupBy('Borrower Country').agg(sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Total_Amt_spent'))
#grouped_df.show(5)
grouped_df.sort(["Total_Amt_spent"],ascending = [False])
grouped_df.display()


#query3
#No of projects undertaken in Each segment
grouped_df=datatype_change_df.groupBy('Project Name').agg(count('Project Name').alias('Projects_count')).sort(["Projects_count"],ascending = [False])
grouped_df.display()

#Query 4
#No of projects undertaken in India by each segment
grouped_df=datatype_change_df.filter(datatype_change_df['Borrower Country']=='India').groupBy('Project Name').agg(count('Project Name').alias('No_of_projects')).sort(["No_of_projects"],ascending = [False])
type(grouped_df)
grouped_df.display()

#query 5-No of projects undertaken Region wise by each segment
grouped_df=datatype_change_df.groupBy('Region').agg(count('region').alias('No_of_projects'),sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Amount_spent')).sort(['Amount_spent'],ascending = [False])
type(grouped_df)
grouped_df.display()


Fiscal Year,Total_Amt_spent
2007,9946680178.06
2006,8058946978.08
2004,8802666003.21
2005,9455225201.82
2009,11107510212.79
2008,12509365465.06
2010,14258776029.2
2011,16555182762.57
2015,11843495996.41
2013,15405735943.3


Borrower Country,Total_Amt_spent
Chad,223601095.99
Paraguay,312654005.4
Senegal,1129419229.25
Cabo Verde,132216973.75
Western Balkans,29219242.19
Kiribati,77448472.76
Guyana,47683002.24
Philippines,1174666389.6
Eritrea,182287685.1
Djibouti,109043008.38


Project Name,Projects_count
COMM DEVT,1097
VN-RURAL ENERGY 2,1015
Proj. in Support of Restruc. of Health,775
DRC-Emerg MS Rehab & Recovery ERL (FY03),652
CO Consolidation of Nat. Publ Mgmt Inf.,638
GT Support Rural Econ.Dev. Program,601
SIEP,539
BD: Municipal Services,522
SV Income Support and Employability,513
AF: Edu. Qlty. Improvement Program II,499


Project Name,No_of_projects
Gujarat Emergency Earthquake Reconstruct,403
AP Econ Restructuring,300
IN: UP WSRP,245
Rajasthan Power I,241
KAR WSHD DEVELOPMENT,195
IN: AP RURAL POV REDUCTION,190
UP Sodic Lands II,185
IN: KARNATAKA RWSS II,143
IN: TN IAM WARM,124
IN: Integrated Coastal Zone Mgmt Project,119


Region,No_of_projects,Amount_spent
AFRICA,39926,43063393092.96
SOUTH ASIA,19253,33684480315.77
EAST ASIA AND PACIFIC,23617,30962263272.27
EUROPE AND CENTRAL ASIA,30458,29028129118.84
LATIN AMERICA AND CARIBBEAN,30088,25304724454.49
MIDDLE EAST AND NORTH AFRICA,8206,12566615938.2
OTHER,75,8468563.31
