In [1]:
# import necessary libraries
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession
# create sparksession
spark = SparkSession \
    .builder \
    .appName("Pysparkexample") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
%%timeit
df1=spark.read.csv('Vermont_Vendor_Payments.csv',header='true')

The slowest run took 4.25 times longer than the fastest. This could mean that an intermediate result is being cached.
843 ms ± 467 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
%%timeit
df_pandas=pd.read_csv('Vermont_Vendor_Payments.csv',low_memory=False)

The slowest run took 10.30 times longer than the fastest. This could mean that an intermediate result is being cached.
19.1 s ± 18.3 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


#For this analysis I will read in the data using the inferSchema option and cast the Amount column to a double.

In [7]:
df = spark.read.csv('Vermont_Vendor_Payments.csv', header='true', inferSchema = True)

In [9]:
df = df.withColumn("Amount", df["Amount"].cast("double"))

In [10]:
df.columns

['Quarter Ending',
 'Department',
 'UnitNo',
 'Vendor Number',
 'Vendor',
 'City',
 'State',
 'DeptID Description',
 'DeptID',
 'Amount',
 'Account',
 'AcctNo',
 'Fund Description',
 'Fund']

Basic Spark Methods:

In [11]:
df.columns

['Quarter Ending',
 'Department',
 'UnitNo',
 'Vendor Number',
 'Vendor',
 'City',
 'State',
 'DeptID Description',
 'DeptID',
 'Amount',
 'Account',
 'AcctNo',
 'Fund Description',
 'Fund']

In [12]:
print('The total number of rows:',df.count(),'\n The total number of column is:',len(df.columns))

The total number of rows: 1680170 
 The total number of column is: 14


show() - prints the first 20 rows of the dataframe by default. I chose to only print 5 in this article.

In [13]:
df.show()

+--------------+--------------------+------+-------------+--------------------+----------+-----+--------------------+----------+--------+--------------------+------+--------------------+-----+
|Quarter Ending|          Department|UnitNo|Vendor Number|              Vendor|      City|State|  DeptID Description|    DeptID|  Amount|             Account|AcctNo|    Fund Description| Fund|
+--------------+--------------------+------+-------------+--------------------+----------+-----+--------------------+----------+--------+--------------------+------+--------------------+-----+
|    09/30/2009|Environmental Con...|  6140|   0000276016|1st Run Computer ...|      null|   NY|     WQD - Waterbury|6140040206|   930.0|Rep&Maint-Info Te...|513000|Environmental Per...|21295|
|    09/30/2009|Environmental Con...|  6140|   0000276016|1st Run Computer ...|      null|   NY|Water Supply Divi...|6140040406|   930.0|Rep&Maint-Info Te...|513000|Environmental Per...|21295|
|    09/30/2009|Vermont Veterans'..

In [14]:
df.head

<bound method DataFrame.head of DataFrame[Quarter Ending: string, Department: string, UnitNo: int, Vendor Number: string, Vendor: string, City: string, State: string, DeptID Description: string, DeptID: string, Amount: double, Account: string, AcctNo: string, Fund Description: string, Fund: string]>

Like in pandas, we can call the describe method to get basic numerical summaries of the data. We need to use the show method to print it to the notebook.This does not print very nicely in the notebook

In [15]:
df.describe().show()

+-------+--------------+--------------------+------------------+------------------+--------------------+--------+------------------+------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+
|summary|Quarter Ending|          Department|            UnitNo|     Vendor Number|              Vendor|    City|             State|DeptID Description|              DeptID|              Amount|             Account|           AcctNo|    Fund Description|                Fund|
+-------+--------------+--------------------+------------------+------------------+--------------------+--------+------------------+------------------+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+
|  count|       1680169|             1680169|           1680169|           1680169|             1680169|  937846|           1680121|           1679632|             1680169|   

Querying the data:
One of the strengths of Spark is that it can be queried with each language’s respective Spark library or with Spark SQL. following will demonstrate a few queries using both the pythonic and SQL options.
The following code registers temporary table and selects a few columns using SQL syntax:

In [16]:
# Creating a temporary table query with SQL
df.createOrReplaceTempView('VermontVendor')
spark.sql(
'''
SELECT `Quarter Ending`, Department, Amount, State FROM VermontVendor
LIMIT 10
'''
).show()

+--------------+--------------------+------+-----+
|Quarter Ending|          Department|Amount|State|
+--------------+--------------------+------+-----+
|    09/30/2009|Environmental Con...| 930.0|   NY|
|    09/30/2009|Environmental Con...| 930.0|   NY|
|    09/30/2009|Vermont Veterans'...|  24.0|   CT|
|    09/30/2009|Vermont Veterans'...| 420.0|   CT|
|    09/30/2009|         Corrections| 270.8|   PA|
|    09/30/2009|         Corrections|  35.0|   PA|
|    09/30/2009|       Public Safety| 971.4|   PA|
|    09/30/2009|Agriculture, Food...| 60.59|   TX|
|    09/30/2009|Agriculture, Food...|541.62|   TX|
|    09/30/2009|              Health|283.98|   PA|
+--------------+--------------------+------+-----+



In [18]:
#This code performs pretty much the same operation using pythonic syntax:
df.select('Quarter Ending', 'Department', 'Amount', 'State').show(10)

+--------------+--------------------+------+-----+
|Quarter Ending|          Department|Amount|State|
+--------------+--------------------+------+-----+
|    09/30/2009|Environmental Con...| 930.0|   NY|
|    09/30/2009|Environmental Con...| 930.0|   NY|
|    09/30/2009|Vermont Veterans'...|  24.0|   CT|
|    09/30/2009|Vermont Veterans'...| 420.0|   CT|
|    09/30/2009|         Corrections| 270.8|   PA|
|    09/30/2009|         Corrections|  35.0|   PA|
|    09/30/2009|       Public Safety| 971.4|   PA|
|    09/30/2009|Agriculture, Food...| 60.59|   TX|
|    09/30/2009|Agriculture, Food...|541.62|   TX|
|    09/30/2009|              Health|283.98|   PA|
+--------------+--------------------+------+-----+
only showing top 10 rows



#One thing to note is that the pythonic solution is significantly less code. I like SQL and it’s syntax, so I prefer the SQL interface over the pythonic one.
#I can filter the columns selected in my query using the SQL WHERE clause

Following is the code for filtering the columns selected in my query using the SQL WHERE clause

In [19]:
spark.sql(
'''

SELECT `Quarter Ending`, Department, Amount, State FROM VermontVendor 
WHERE Department = 'Education'
LIMIT 10

'''
).show()

+--------------+----------+-------+-----+
|Quarter Ending|Department| Amount|State|
+--------------+----------+-------+-----+
|    09/30/2009| Education|9423.36|   VT|
|    09/30/2009| Education| 110.03|   IL|
|    09/30/2009| Education| 332.58|   IL|
|    09/30/2009| Education|  60.08|   IL|
|    09/30/2009| Education| 284.83|   IL|
|    09/30/2009| Education| 377.15|   IL|
|    09/30/2009| Education| 114.74|   IL|
|    09/30/2009| Education| 129.72|   IL|
|    09/30/2009| Education| 114.54|   IL|
|    09/30/2009| Education|  375.6|   IL|
+--------------+----------+-------+-----+



A similar result can be achieved with the .filter() method in the python API.

In [20]:
df.select('Quarter Ending', 'Department', 'Amount', 'State').filter(df['Department'] == 'Education').show(10)

+--------------+----------+-------+-----+
|Quarter Ending|Department| Amount|State|
+--------------+----------+-------+-----+
|    09/30/2009| Education|9423.36|   VT|
|    09/30/2009| Education| 110.03|   IL|
|    09/30/2009| Education| 332.58|   IL|
|    09/30/2009| Education|  60.08|   IL|
|    09/30/2009| Education| 284.83|   IL|
|    09/30/2009| Education| 377.15|   IL|
|    09/30/2009| Education| 114.74|   IL|
|    09/30/2009| Education| 129.72|   IL|
|    09/30/2009| Education| 114.54|   IL|
|    09/30/2009| Education|  375.6|   IL|
+--------------+----------+-------+-----+
only showing top 10 rows



In [22]:
spark.conf.get('spark.some.config.option')

'some-value'

#above is an overview of using SQL in spark. Following will have all the important contents and function of PySpark
#creating Spark Dataframe from an RDD, a list or a pandas.DataFrame.

In [26]:
#createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)[source]
#from list
l=[('john','40')]
spark.createDataFrame(l).collect()

[Row(_1='john', _2='40')]

In [27]:
#guving schema
spark.createDataFrame(l,['name','age']).collect()

[Row(name='john', age='40')]

In [28]:
#dictionary
d=[{'name':'John','age':'40'}]
spark.createDataFrame(d).collect()



[Row(age='40', name='John')]

In [29]:
#dataframe from rdd
rdd = sc.parallelize(l)


NameError: name 'sc' is not defined

In [34]:
#Note: rdds can be created only with sparkContext but not with sparkSession
#In Spark 2+, Spark Context is available via Spark Session, so all you need to do is:
#spark.sparkContext().parallelize(l)
rdd=spark.sparkContext.parallelize(l)

In [35]:
#creating dataframe from rdd now:
spark.createDataFrame(rdd).collect()

[Row(_1='john', _2='40')]

In [36]:
#adding schema parameter-list of columns
spark.createDataFrame(rdd,['name','age']).collect()

[Row(name='john', age='40')]

In [37]:
from pyspark.sql import Row

In [43]:
#creating schema from Row method of pyspark.sql
#observe the P,p in persons carefully - map funtion is used to match the schmea to each row in the rdd created above
Person=Row('name','age')
person=rdd.map(lambda r: Person(*r))
df2=spark.createDataFrame(person)
df2.collect()

[Row(name='john', age='40')]

In [46]:
#creating spark dataframe from pandas dataframe
spark.createDataFrame(df2.toPandas()).collect()

[Row(name='john', age='40')]

In [47]:
#getting current active spark session
spark.getActiveSession()

In [48]:
spark.read()

TypeError: 'DataFrameReader' object is not callable

In [49]:
#returns the underlying psarkContext
spark.sparkContext

In [54]:
#Data frame created from SQLContext similar to SparkSession like  above -it is actually replaced by SparkSession in spark 2.0
#class pyspark.sql.SQLContext(sparkContext, sparkSession=None, jsqlContext=None)[source]
from pyspark.sql.SQLContext(none,sparkSession=spark)
sqlContext.createDataFrame(l).collect()



SyntaxError: invalid syntax (<ipython-input-54-cf46469d4728>, line 2)

In [56]:
import pyspark.sql.HiveContext(sparkContext=spark)

SyntaxError: invalid syntax (<ipython-input-56-978da5640423>, line 1)