In [42]:
import findspark
findspark.init()

In [43]:

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("Spark_Project").getOrCreate()

# Verify SparkContext is available
sc = spark.sparkContext

In [44]:
#file location and type
file_location="C:\\Users\\16478\\Desktop\\term 2\\1034\\Major_Contract_Awards.csv"
file_type="csv"

#csv options
infer_schema="true"   #to detect the column datatype
first_row_header="true"  #to take the header row
delimiter=","

#creating df
df=spark.read.format(file_type).option("inferSchema",infer_schema).option("header",first_row_header).load(file_location)
df.printSchema()
df.show(5)
select_df = df.select("Fiscal Year", "Region","Borrower Country","Borrower Country Code","Project ID","Project Name","Supplier Country","Supplier Country Code","Total Contract Amount (USD)")
print(select_df.count())

root
 |-- As of Date: string (nullable = true)
 |-- Fiscal Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Borrower Country: string (nullable = true)
 |-- Borrower Country Code: string (nullable = true)
 |-- Project ID: string (nullable = true)
 |-- Project Name: string (nullable = true)
 |-- Procurement Type: string (nullable = true)
 |-- Procurement Category: string (nullable = true)
 |-- Procurement Method: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- WB Contract Number: integer (nullable = true)
 |-- Contract Signing Date: string (nullable = true)
 |-- Supplier: string (nullable = true)
 |-- Supplier Country: string (nullable = true)
 |-- Supplier Country Code: string (nullable = true)
 |-- Total Contract Amount (USD): string (nullable = true)

+-------------+-----------+--------------------+----------------+---------------------+----------+--------------------+-------------------+--------------------+--------------------+----

In [45]:
from pyspark.sql.functions import isnan, when, count, col


#finding null values in columns
null_df=select_df.select([count(when(col(c).isNull(), 0)).alias(c) for c in select_df.columns])
print(null_df)
null_df.show()
# creating new df without null values
filter_df=select_df.filter(select_df['Region'].isNotNull() & select_df['Total Contract Amount (USD)'].isNull())
filter_df.select("*").show()
# verification for no of null values removed
required_df = select_df.filter(select_df['Total Contract Amount (USD)'].isNotNull())
print(select_df.count()-required_df.count())
required_df.show()

DataFrame[Fiscal Year: bigint, Region: bigint, Borrower Country: bigint, Borrower Country Code: bigint, Project ID: bigint, Project Name: bigint, Supplier Country: bigint, Supplier Country Code: bigint, Total Contract Amount (USD): bigint]
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+
|Fiscal Year|Region|Borrower Country|Borrower Country Code|Project ID|Project Name|Supplier Country|Supplier Country Code|Total Contract Amount (USD)|
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+
|          0|     0|               0|                    0|         0|           0|               0|                    0|                        413|
+-----------+------+----------------+---------------------+----------+------------+----------------+---------------------+---------------------------+

+---

In [46]:
#removing the $ character from amount
#from pyspark.sql.functions import translate
newDf_1=select_df.withColumn('Total Contract Amount (USD)', translate('Total Contract Amount (USD)', '$', ''))    #replace
newDf=newDf_1.withColumn('Total Contract Amount (USD)', translate('Total Contract Amount (USD)', ',', ''))    #replace
newDf.show()
# changing the data type of amount to Float
#from pyspark.sql.types import StringType, DateType, FloatType
datatype_change_df = newDf.withColumn("Total Contract Amount (USD)" , newDf["Total Contract Amount (USD)"].cast(FloatType()))
datatype_change_df.printSchema()

+-----------+--------------------+--------------------+---------------------+----------+--------------------+--------------------+---------------------+---------------------------+
|Fiscal Year|              Region|    Borrower Country|Borrower Country Code|Project ID|        Project Name|    Supplier Country|Supplier Country Code|Total Contract Amount (USD)|
+-----------+--------------------+--------------------+---------------------+----------+--------------------+--------------------+---------------------+---------------------------+
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural Infrastr (F...|                Mali|                   ML|                 176578.47 |
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural Infrastr (F...|                Mali|                   ML|                  38386.63 |
|       2004|              AFRICA|                Mali|                   ML|   P041723|Rural I

In [47]:
#Overall amount spent for all projects year Wise
grouped_df=datatype_change_df.groupBy('Fiscal Year').agg(sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Total_Amt_spent'))
sorted_df=grouped_df.sort("Fiscal Year",ascending = [True])
sorted_df.show()


+-----------+---------------+
|Fiscal Year|Total_Amt_spent|
+-----------+---------------+
|       2004|  8802666003.21|
|       2005|  9455225201.82|
|       2006|  8058946978.08|
|       2007|  9946680178.06|
|       2008| 12509365465.06|
|       2009| 11107510212.79|
|       2010| 14258776029.20|
|       2011| 16555182762.57|
|       2012| 16522654897.56|
|       2013| 15405735943.30|
|       2014| 14467766184.78|
|       2015| 11843495996.41|
|       2016| 14087775010.02|
|       2017| 10783231557.83|
|       2018|   813062335.13|
+-----------+---------------+



In [48]:

#details of Amount spent Country wise 
grouped_df=datatype_change_df.groupBy('Borrower Country').agg(sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Total_Amt_spent'))
sorted_df=grouped_df.sort(["Total_Amt_spent"],ascending = [False])
sorted_df.show()


+--------------------+---------------+
|    Borrower Country|Total_Amt_spent|
+--------------------+---------------+
|               India| 18803255974.53|
|               China| 17471407861.32|
|              Brazil|  9149722836.44|
|           Argentina|  7227734355.89|
|             Vietnam|  6764613349.50|
|Egypt, Arab Repub...|  5499074063.37|
|            Pakistan|  4997625017.26|
|            Ethiopia|  4612266885.07|
|              Africa|  4241284631.58|
|          Bangladesh|  4237774397.43|
|         Afghanistan|  4038329869.76|
|          Azerbaijan|  3888039333.95|
|             Nigeria|  3628668399.11|
|          Kazakhstan|  3564597973.96|
|           Indonesia|  3296791275.48|
|  Russian Federation|  3292233554.32|
|Congo, Democratic...|  3099829838.55|
|        South Africa|  3072498194.63|
|               Kenya|  2856797790.69|
|              Turkey|  2839801399.44|
+--------------------+---------------+
only showing top 20 rows



In [49]:
#No of projects undertaken in Each segment
grouped_df=datatype_change_df.groupBy('Project Name').agg(count('Project Name').alias('Projects_count')).sort(["Projects_count"],ascending = [False])
grouped_df.show()

+--------------------+--------------+
|        Project Name|Projects_count|
+--------------------+--------------+
|           COMM DEVT|          1097|
|   VN-RURAL ENERGY 2|          1015|
|Proj. in Support ...|           775|
|DRC-Emerg MS Reha...|           652|
|CO Consolidation ...|           638|
|GT Support Rural ...|           601|
|                SIEP|           539|
|BD: Municipal Ser...|           522|
|SV Income Support...|           513|
|AF: Edu. Qlty. Im...|           499|
|NI PFM Modernizat...|           493|
|VIETNAM WATER RES...|           491|
|VN-SYSTEM ENERGY,...|           487|
|BD: HNP Sector Pr...|           464|
|NI (CRL) Roads Re...|           455|
|HN Trade Facilita...|           422|
|BI-Pub Works & Em...|           413|
|BD: Primary Educa...|           404|
|Gujarat Emergency...|           403|
|GT (APL2)LAND ADM...|           402|
+--------------------+--------------+
only showing top 20 rows



In [50]:
#No of projects undertaken in India by each segment
grouped_df=datatype_change_df.filter(datatype_change_df['Borrower Country']=='India').groupBy('Project Name').agg(count('Project Name').alias('No_of_projects')).sort(["No_of_projects"],ascending = [False])
#type(grouped_df)
grouped_df.show()

+--------------------+--------------+
|        Project Name|No_of_projects|
+--------------------+--------------+
|Gujarat Emergency...|           403|
|AP Econ Restructu...|           300|
|         IN: UP WSRP|           245|
|   Rajasthan Power I|           241|
|KAR WSHD DEVELOPMENT|           195|
|IN: AP RURAL POV ...|           190|
|   UP Sodic Lands II|           185|
|IN: KARNATAKA RWS...|           143|
|     IN: TN IAM WARM|           124|
|IN: Integrated Co...|           119|
|        IN: RAJ WSRP|            98|
|IN: TN Health Sys...|            96|
|IN: AP and Telang...|            84|
|        IN: UP Roads|            82|
|      IN: MAHAR WSIP|            82|
|     IN: POWERGRID V|            82|
|IN: Haryana Power...|            82|
|IN: MUMBAI URBAN ...|            77|
|IN: Assam Agric C...|            74|
|IN: Third Nationa...|            72|
+--------------------+--------------+
only showing top 20 rows



In [51]:
#No of projects undertaken Region wise by each segment
grouped_df=datatype_change_df.groupBy('Region').agg(count('region').alias('No_of_projects'),sum('Total Contract Amount (USD)').cast(DecimalType(20, 2)).alias('Amount_spent')).sort(['Amount_spent'],ascending = [False])
grouped_df.show()


+--------------------+--------------+--------------+
|              Region|No_of_projects|  Amount_spent|
+--------------------+--------------+--------------+
|              AFRICA|         39926|43063393092.96|
|          SOUTH ASIA|         19253|33684480315.77|
|EAST ASIA AND PAC...|         23617|30962263272.27|
|EUROPE AND CENTRA...|         30458|29028129118.84|
|LATIN AMERICA AND...|         30088|25304724454.49|
|MIDDLE EAST AND N...|          8206|12566615938.20|
|               OTHER|            75|    8468563.31|
+--------------------+--------------+--------------+

