In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *

In [3]:
sc = SparkContext()

#creating a sparkcontext instance

In [4]:
sqlContext = SQLContext(sc)

#creating a sqlcontext instance 

In [73]:
data = sqlContext.read.json("world_bank.json")

#reading world bank data and creating a DataFrame



In [7]:
data.printSchema()

#exploring the data schema

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- approvalfy: string (nullable = true)
 |-- board_approval_month: string (nullable = true)
 |-- boardapprovaldate: string (nullable = true)
 |-- borrower: string (nullable = true)
 |-- closingdate: string (nullable = true)
 |-- country_namecode: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- countryshortname: string (nullable = true)
 |-- docty: string (nullable = true)
 |-- envassesmentcategorycode: string (nullable = true)
 |-- grantamt: long (nullable = true)
 |-- ibrdcommamt: long (nullable = true)
 |-- id: string (nullable = true)
 |-- idacommamt: long (nullable = true)
 |-- impagency: string (nullable = true)
 |-- lendinginstr: string (nullable = true)
 |-- lendinginstrtype: string (nullable = true)
 |-- lendprojectcost: long (nullable = true)
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (containsNu

In [8]:
data.select(data['countryshortname'],data['grantamt'],data['status']).groupBy('status').count().show()

#number of active and closed grants

+------+-----+
|status|count|
+------+-----+
|Active|  438|
|Closed|   62|
+------+-----+



In [9]:
data.select(data['countryshortname'],data['grantamt'],data['status']).orderBy(desc('grantamt')).show()
#grant amount by country and their status

+--------------------+---------+------+
|    countryshortname| grantamt|status|
+--------------------+---------+------+
|               China|365000000|Active|
|         Afghanistan|100000000|Active|
|Congo, Democratic...|100000000|Active|
|          Madagascar| 85400000|Active|
|             Vietnam| 84600000|Active|
|               Sudan| 76500000|Active|
|               Ghana| 75500000|Active|
|         Afghanistan| 74730000|Active|
|             Morocco| 70000000|Active|
|           Nicaragua| 51900000|Active|
|              Jordan| 50000000|Active|
|               Nepal| 46500000|Active|
|                Mali| 41700000|Active|
|  West Bank and Gaza| 40000000|Closed|
|              Zambia| 36000000|Active|
|          Bangladesh| 33800000|Active|
|           Indonesia| 31700000|Active|
|               Nepal| 31000000|Active|
|               China| 27280000|Active|
|          Kazakhstan| 21760000|Active|
+--------------------+---------+------+
only showing top 20 rows



In [10]:
data.select('borrower','countryshortname','regionname','grantamt','status').orderBy(desc('grantamt')).show()

#region wise borrowers and their borrowed amounts

+--------------------+--------------------+--------------------+---------+------+
|            borrower|    countryshortname|          regionname| grantamt|status|
+--------------------+--------------------+--------------------+---------+------+
|PEOPLE'S REPULIC ...|               China|East Asia and Pac...|365000000|Active|
|ISLAMIC REPUBLIC ...|         Afghanistan|          South Asia|100000000|Active|
|MINISTERE DE L'ED...|Congo, Democratic...|              Africa|100000000|Active|
|MINISTRY OF FINAN...|          Madagascar|              Africa| 85400000|Active|
|SOCIALIST REPUBLI...|             Vietnam|East Asia and Pac...| 84600000|Active|
| GOVERNMENT OF SUDAN|               Sudan|              Africa| 76500000|Active|
|GHANA MINISTRY OF...|               Ghana|              Africa| 75500000|Active|
|GOVERNMENT OF AFG...|         Afghanistan|          South Asia| 74730000|Active|
|FONDS D'EQUIPMENT...|             Morocco|Middle East and N...| 70000000|Active|
|REPUBLIC OF NIC

In [11]:
data.select('borrower','countryshortname','regionname','grantamt','status').groupBy('regionname').count().show()

#number of borrowers by region

+--------------------+-----+
|          regionname|count|
+--------------------+-----+
|          South Asia|   65|
|Middle East and N...|   54|
|              Africa|  152|
|East Asia and Pac...|  100|
|               Other|    2|
|Europe and Centra...|   74|
|Latin America and...|   53|
+--------------------+-----+



In [12]:
data.select('borrower','countryshortname','regionname','grantamt','status').groupBy('regionname').avg('grantamt').show()

#each region's avg grant amt

+--------------------+------------------+
|          regionname|     avg(grantamt)|
+--------------------+------------------+
|          South Asia| 6092461.538461538|
|Middle East and N...| 5311296.296296297|
|              Africa|4037039.4736842103|
|East Asia and Pac...|         7000400.0|
|               Other|         3060000.0|
|Europe and Centra...|1695540.5405405406|
|Latin America and...|1662641.5094339622|
+--------------------+------------------+



In [19]:
bank_rdd = data.select('boardapprovaldate','borrower','project_name','closingdate','projectstatusdisplay','totalamt','url','countrycode','countryshortname','regionname','grantamt','status').rdd

#creating a rdd from the dataframe

In [21]:
bank_rdd.filter(lambda x: 'ET' in x[7]).collect()

#filter all rows that corresponds to Ethiopia

[Row(boardapprovaldate=u'2013-11-12T00:00:00Z', borrower=u'FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA', project_name=u'Ethiopia General Education Quality Improvement Project II', closingdate=u'2018-07-07T00:00:00Z', projectstatusdisplay=u'Active', totalamt=130000000, url=u'http://www.worldbank.org/projects/P129828/ethiopia-general-education-quality-improvement-project-ii?lang=en', countrycode=u'ET', countryshortname=u'Ethiopia', regionname=u'Africa', grantamt=0, status=u'Active'),
 Row(boardapprovaldate=u'2013-02-28T00:00:00Z', borrower=u'FEDERAL DEMOCRATIC REP. OF ETHIOPIA', project_name=u'Ethiopia Health MDG Support Operation', closingdate=u'2018-06-30T00:00:00Z', projectstatusdisplay=u'Active', totalamt=100000000, url=u'http://www.worldbank.org/projects/P123531/ethiopia-health-mdg-support-operation?lang=en', countrycode=u'ET', countryshortname=u'Ethiopia', regionname=u'Africa', grantamt=0, status=u'Active'),
 Row(boardapprovaldate=u'2012-09-25T00:00:00Z', borrower=u'FEDERAL DEMOCRATIC 

In [75]:
pairs = bank_rdd.map(lambda x: (x[8],x[10]))
pairs1 = bank_rdd.map(lambda x: (x[0],x[10]))


#creating pipelinedRDD transformations to select country, grant_amt and boardapprovaldata,grant_amts respectively

In [76]:
import pandas as pd
from operator import add


df = pd.DataFrame(pairs.reduceByKey(add).collect())

#collecting all the countries by reducing on the countryshortname key while adding their grant amounts

df.columns = ['country','grant_amt']

#give names to the columns

In [38]:
sorted_data = df.sort_values('grant_amt',ascending=False)
#sort the dataframe on grant_amt in descending order

In [46]:
from bokeh.charts import Bar, show
from bokeh.charts.attributes import ColorAttr, CatAttr
from bokeh.models import NumeralTickFormatter
from bokeh.io import output_notebook
output_notebook()
p = Bar(sorted_data.head(10), values='grant_amt', title="Top 10 Countries funded by the World Bank ", label=CatAttr(columns=['country'], sort=False),legend=False)
p.yaxis.formatter=NumeralTickFormatter(format="($ 0 a)")
show(p, notebook_handle=True)

#creating a bokeh visualization to show the top 10 grant receiving countries 



In [71]:
from bokeh.charts import TimeSeries
from bokeh.models import DatetimeTickFormatter
output_notebook()


timedata = pd.DataFrame(pairs1.reduceByKey(add).collect())

timedata.columns = ['boardapprovaldate','grant_amt']

timedata['boardapprovaldate'] = pd.to_datetime(timedata['boardapprovaldate'], format=('%Y-%m-%dT%H:%M:%SZ'))

p = TimeSeries(timedata, x='boardapprovaldate',y='grant_amt')

p.yaxis.formatter=NumeralTickFormatter(format="($ 0 a)")
p.xaxis.formatter=DatetimeTickFormatter(
        days=["%d %B %Y"],
        months=["%d %B %Y"],
        years=["%d %B %Y"])


show(p)
