In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))  
spark = SparkSession.builder.getOrCreate()

# important :
<span size ='4' style="color:yellow"> ApacheSparkSQL is always preferred over usage of RDDs since the Catalyst Optimizer is doing a very good job optimizing the internal calls to the RDD API.

In [7]:
# enabling sql processing 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## unlike an rdd, a dataframe creates a schema around 
## the data which supplies the necessary structure for 
## SQL queries

# creating spark dataframe
df = sqlContext.read.json('./data/world_bank.json')

In [8]:
# showing the schema
print(df.printSchema())

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- approvalfy: string (nullable = true)
 |-- board_approval_month: string (nullable = true)
 |-- boardapprovaldate: string (nullable = true)
 |-- borrower: string (nullable = true)
 |-- closingdate: string (nullable = true)
 |-- country_namecode: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- countryshortname: string (nullable = true)
 |-- docty: string (nullable = true)
 |-- envassesmentcategorycode: string (nullable = true)
 |-- grantamt: long (nullable = true)
 |-- ibrdcommamt: long (nullable = true)
 |-- id: string (nullable = true)
 |-- idacommamt: long (nullable = true)
 |-- impagency: string (nullable = true)
 |-- lendinginstr: string (nullable = true)
 |-- lendinginstrtype: string (nullable = true)
 |-- lendprojectcost: long (nullable = true)
 |-- majorsector_percent: array (nullable = true)
 |    |-- element: struct (containsNu

In [9]:
for row in df.take(2):
    print(row)
    print('_'*20)

Row(_id=Row($oid='52b213b38594d8a2be17c780'), approvalfy='1999', board_approval_month='November', boardapprovaldate='2013-11-12T00:00:00Z', borrower='FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA', closingdate='2018-07-07T00:00:00Z', country_namecode='Federal Democratic Republic of Ethiopia!$!ET', countrycode='ET', countryname='Federal Democratic Republic of Ethiopia', countryshortname='Ethiopia', docty='Project Information Document,Indigenous Peoples Plan,Project Information Document', envassesmentcategorycode='C', grantamt=0, ibrdcommamt=0, id='P129828', idacommamt=130000000, impagency='MINISTRY OF EDUCATION', lendinginstr='Investment Project Financing', lendinginstrtype='IN', lendprojectcost=550000000, majorsector_percent=[Row(Name='Education', Percent=46), Row(Name='Education', Percent=26), Row(Name='Public Administration, Law, and Justice', Percent=16), Row(Name='Education', Percent=12)], mjsector_namecode=[Row(code='EX', name='Education'), Row(code='EX', name='Education'), Row(code='BX

In [10]:
# creating a table that's a pointer to the dataframe
# becayse SQL statements must be run against a table
df.registerTempTable('world_bank')

In [11]:
# defining a new DF for the result of the SQL query
temp_df = sqlContext.sql("select * from world_bank")
print(type(temp_df))
print('_'* 20)
print(temp_df)

<class 'pyspark.sql.dataframe.DataFrame'>
____________________
DataFrame[_id: struct<$oid:string>, approvalfy: string, board_approval_month: string, boardapprovaldate: string, borrower: string, closingdate: string, country_namecode: string, countrycode: string, countryname: string, countryshortname: string, docty: string, envassesmentcategorycode: string, grantamt: bigint, ibrdcommamt: bigint, id: string, idacommamt: bigint, impagency: string, lendinginstr: string, lendinginstrtype: string, lendprojectcost: bigint, majorsector_percent: array<struct<Name:string,Percent:bigint>>, mjsector_namecode: array<struct<code:string,name:string>>, mjtheme: array<string>, mjtheme_namecode: array<struct<code:string,name:string>>, mjthemecode: string, prodline: string, prodlinetext: string, productlinetype: string, project_abstract: struct<cdata:string>, project_name: string, projectdocs: array<struct<DocDate:string,DocType:string,DocTypeDesc:string,DocURL:string,EntityID:string>>, projectfinancialty

In [12]:
#displaying the results
import pandas as pd
sqlContext.sql('select id, borrower  from world_bank limit 2').toPandas()

Unnamed: 0,id,borrower
0,P129828,FEDERAL DEMOCRATIC REPUBLIC OF ETHIOPIA
1,P144674,GOVERNMENT OF TUNISIA


In [13]:
# group by queries 
query = """
select 
    regionname,
    count(*) as project_count
from world_bank
group by regionname
order by count(*) desc
"""
sqlContext.sql(query).toPandas()

Unnamed: 0,regionname,project_count
0,Africa,152
1,East Asia and Pacific,100
2,Europe and Central Asia,74
3,South Asia,65
4,Middle East and North Africa,54
5,Latin America and Caribbean,53
6,Other,2


In [14]:
sqlContext.sql('select sector.Name from world_bank \
limit 2').show()

+--------------------+
|                Name|
+--------------------+
|[Primary educatio...|
|[Public administr...|
+--------------------+



In [15]:
# converting RDD to DF (there are two ways for that)
# you need to converst RDDs to DFs to run SQL on them
# the main difference between the two is whether the 
# columns are named

In [16]:
import random
data = []
for x in range(1,6):
    random_int = int(random.random() * 10)
    data.append([x, random_int, random_int^2])
# creating RDD
rdd = sc.parallelize(data) 
print(rdd.collect())

[[1, 5, 7], [2, 9, 11], [3, 2, 0], [4, 9, 11], [5, 5, 7]]


In [17]:
# First way :  Applying a schema
StructField?
# used to create a schema with the StructType method
from pyspark.sql.types import *
schemaString = 'ID VAL1 VAL2'
fields = [StructField(field_name, StringType() )\
         for field_name in schemaString.split()]
schema = StructType(fields)

Object `StructField` not found.


In [18]:
schemaex = sqlContext.createDataFrame(rdd, schema)
schemaex.registerTempTable('example')
#print(schemaex.collect())
for row in schemaex.take(2):
    print(row.ID, row.VAL1)

1 5
2 9


In [19]:
sqlContext.sql('select * from example').show()

+---+----+----+
| ID|VAL1|VAL2|
+---+----+----+
|  1|   5|   7|
|  2|   9|  11|
|  3|   2|   0|
|  4|   9|  11|
|  5|   5|   7|
+---+----+----+



In [20]:
# second way : creating rows with named columns
from pyspark.sql import Row
new_rdd = rdd.map(lambda x : Row(id=x[0],val1=x[1],val2=x[2]))
df_  = new_rdd.toDF()
df_.registerTempTable('example1')
sqlContext.sql('select * from example1').toPandas()

Unnamed: 0,id,val1,val2
0,1,5,7
1,2,9,11
2,3,2,0
3,4,9,11
4,5,5,7


In [21]:
# joining tables
query = """
select *
from 
    example as e
inner join 
    example1 as e1
on e.ID = e1.id
"""
sqlContext.sql(query).toPandas() # using SQL
df_.join(schemaex,schemaex['ID']==df_['id']).toPandas() # using python

t1 = df_.toPandas()
t2 = schemaex.toPandas()
t2.join(t1,t2['ID']==t1['id']) # using pandas

Unnamed: 0,ID,VAL1,VAL2,id,val1,val2
0,1,5,7,1,5,7
1,2,9,11,1,5,7
2,3,2,0,1,5,7
3,4,9,11,1,5,7
4,5,5,7,1,5,7


In [63]:
sqlContext.sql('select min(val1) as minimum_val from \
example1').show()

+-----------+
|minimum_val|
+-----------+
|          2|
+-----------+



In [66]:
sqlContext.sql('select* from \
example1').toPandas()#.shape[1]

Unnamed: 0,id,val1,val2
0,1,5,7
1,2,9,11
2,3,2,0
3,4,9,11
4,5,5,7


In [57]:
spark.sql(query)#.show()

AnalysisException: "Table or view not found: `example`.`columns`; line 3 pos 5;\n'Aggregate [unresolvedalias(count(1), None)]\n+- 'Filter ('table_name = example)\n   +- 'UnresolvedRelation `example`.`columns`\n"