# Spark Demo

### Loading data

In [5]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
import databricks.koalas as ks
from pyspark.sql.types import IntegerType
spark = SparkSession \
    .builder \
    .appName("Spark demo time!") \
    .getOrCreate()


#Vermont Vendor payments dataset can be found here:
# https://data.vermont.gov/Finance/Vermont-Vendor-Payments/786x-sbp3
spark_df = spark.read.csv('Vermont_Vendor_Payments.csv', header=True) 
# cast amount col to integer as mixed datatype
spark_df = spark_df.withColumn("Amount", spark_df["Amount"].cast(IntegerType()))
spark_df.show(5)

21/07/15 14:57:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


+--------------+--------------------+------+-------------+--------------------+-----------+-----+-------------------+----------+-------+--------------------+------+--------------------+-----+
|Quarter Ending|          Department|UnitNo|Vendor Number|              Vendor|       City|State| DeptID Description|    DeptID| Amount|             Account|AcctNo|    Fund Description| Fund|
+--------------+--------------------+------+-------------+--------------------+-----------+-----+-------------------+----------+-------+--------------------+------+--------------------+-----+
|    12/31/2019|Vt Housing & Cons...| 09150|   0000002188|Vermont Housing &...| Montpelier|   VT|              Trust|9150120000|1075000|Transfer Out - Co...|720010|Housing & Conserv...|90610|
|    12/31/2019|Vt Housing & Cons...| 09150|   0000375660|Wagner Developmen...|Brattleboro|   VT|            VT REDI|9150293000|   4612|Other Direct Gran...|552990|Housing & Conserv...|90610|
|    12/31/2019|Vt Housing & Cons...| 09

### Exploring Data

In [29]:
spark_df.printSchema()

root
 |-- Quarter Ending: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- UnitNo: string (nullable = true)
 |-- Vendor Number: string (nullable = true)
 |-- Vendor: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- DeptID Description: string (nullable = true)
 |-- DeptID: string (nullable = true)
 |-- Amount: integer (nullable = true)
 |-- Account: string (nullable = true)
 |-- AcctNo: string (nullable = true)
 |-- Fund Description: string (nullable = true)
 |-- Fund: string (nullable = true)



In [30]:
# Avail stats = count, mean, stddev, min, max, percentiles specified as a percentage (e.g., 75%)
stats_spark_df = spark_df.summary("count", "mean", "min", "25%", "75%", "max")
stats_spark_df.show()

                                                                                

+-------+--------------+--------------------+-----------------+------------------+--------------------+--------+------------------+------------------+--------------------+------------------+--------------------+----------------+--------------------+--------------------+
|summary|Quarter Ending|          Department|           UnitNo|     Vendor Number|              Vendor|    City|             State|DeptID Description|              DeptID|            Amount|             Account|          AcctNo|    Fund Description|                Fund|
+-------+--------------+--------------------+-----------------+------------------+--------------------+--------+------------------+------------------+--------------------+------------------+--------------------+----------------+--------------------+--------------------+
|  count|       1714538|             1714538|          1714538|           1714538|             1714538|  972215|           1714490|           1714001|             1714538|           17141

In [31]:
# Small dataset, switch to Pandas dataframe for easier formatting / use
stats_df = spark_df.summary("count", "min", "25%", "75%", "max").toPandas()
stats_df

                                                                                

Unnamed: 0,summary,Quarter Ending,Department,UnitNo,Vendor Number,Vendor,City,State,DeptID Description,DeptID,Amount,Account,AcctNo,Fund Description,Fund
0,count,1714538,1714538,1714538.0,1714538,1714538,972215,1714490,1714001,1714538,1714186,1714538,1714538,1714536,1714537
1,min,03/31/2010,AOT Proprietary Funds,1100.0,0000000002,"""Jewett,Martin A """"Shorty"""" Inc""",0,0,"""""""Admin.","CCV""""""",-2880183,-294.00,-294.00,507200,10000
2,25%,,,2150.0,13095.0,,0.0,0.0,,2.15002E9,68,469.26,516652.0,512015.0,10000.0
3,75%,,,6120.0,207476.0,,0.0,0.0,,6.1200501E9,2281,1.160550263E9,550020.0,520211.0,22005.0
4,max,12/31/2019,Women's Commission,9150.0,SINGLE,"xAd, Inc.",w Berlin,ZZ,Youth at Risk,Seg,1160550371,Youth Development - GF,Water/Sewer,Youth Substance Abuse Safety P,Facilities Operations Fund


In [32]:
# note how much quick a Pandas dataframe operation is - no execution plan!
stat_new = stats_df.T
stat_new.columns =stat_new.iloc[0]
stat_new
x = stat_new[1:]
x = x.reset_index()
x['index'].unique()

array(['Quarter Ending', 'Department', 'UnitNo', 'Vendor Number',
       'Vendor', 'City', 'State', 'DeptID Description', 'DeptID',
       'Amount', 'Account', 'AcctNo', 'Fund Description', 'Fund'],
      dtype=object)

### Re-partitioning a DataFrame

In [33]:
print(f'Num partitions:{spark_df.rdd.getNumPartitions()}')

Num partitions:4


In [34]:
from pyspark.sql.types import IntegerType
from datetime import datetime

# if you are increasing the number of partitions use repartition()(performing full shuffle),
# if you are decreasing the number of partitions use coalesce() (minimizes shuffles)
# https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0
# https://techmagie.wordpress.com/2015/12/19/understanding-spark-partitioning/
start_time = datetime.now()
spark_df.where(spark_df.State =='GA')\
                      .groupBy(['State','Department'])\
                      .sum('Amount')\
                      .withColumnRenamed("sum(Amount)", "total_amount")\
                      .orderBy('Department')\
                      .show(1)
end_time = datetime.now()
total_time = end_time-start_time
print(f'total runtime:{total_time}.  Number of partitions:{4}')



+-----+--------------------+------------+
|State|          Department|total_amount|
+-----+--------------------+------------+
|   GA|AOT Proprietary F...|     7947477|
+-----+--------------------+------------+
only showing top 1 row

total runtime:0:00:08.731151.  Number of partitions:4




In [35]:
num_partitions = 8
spark_df = spark_df.repartition(num_partitions, "State")
start_time = datetime.now()
spark_df.where(spark_df.State =='GA')\
                      .groupBy(['State','Department'])\
                      .sum('Amount')\
                      .withColumnRenamed("sum(Amount)", "total_amount")\
                      .orderBy('Department')\
                      .show(1)
end_time = datetime.now()
total_time = end_time-start_time
print(f'total runtime:{total_time}.  Number of partitions:{num_partitions}')

[Stage 11:>                                                         (0 + 4) / 4]

+-----+--------------------+------------+
|State|          Department|total_amount|
+-----+--------------------+------------+
|   GA|AOT Proprietary F...|     7947477|
+-----+--------------------+------------+
only showing top 1 row

total runtime:0:00:06.433851.  Number of partitions:8


                                                                                

In [36]:
num_partitions = 32
spark_df = spark_df.repartition(num_partitions, "State")
start_time = datetime.now()
spark_df.where(spark_df.State =='GA')\
                      .groupBy(['State','Department'])\
                      .sum('Amount')\
                      .withColumnRenamed("sum(Amount)", "total_amount")\
                      .orderBy('Department')\
                      .show(1)
end_time = datetime.now()
total_time = end_time-start_time
print(f'total runtime:{total_time}.  Number of partitions:{num_partitions}')



+-----+--------------------+------------+
|State|          Department|total_amount|
+-----+--------------------+------------+
|   GA|AOT Proprietary F...|     7947477|
+-----+--------------------+------------+
only showing top 1 row

total runtime:0:00:07.044316.  Number of partitions:32


                                                                                

### PySpark Execution plan

In [37]:
spark_df.where(spark_df.State =='GA')\
                      .groupBy(['State','Department'])\
                      .sum('Amount')\
                      .withColumnRenamed("sum(Amount)", "total_amount")\
                      .orderBy('Department')\
                      .explain()

== Physical Plan ==
*(3) Sort [Department#17 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Department#17 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#238]
   +- *(2) HashAggregate(keys=[State#22, Department#17], functions=[sum(cast(Amount#44 as bigint))])
      +- *(2) HashAggregate(keys=[State#22, Department#17], functions=[partial_sum(cast(Amount#44 as bigint))])
         +- Exchange hashpartitioning(State#22, 32), REPARTITION_WITH_NUM, [id=#233]
            +- *(1) Project [Department#17, State#22, cast(Amount#25 as int) AS Amount#44]
               +- *(1) Filter (isnotnull(State#22) AND (State#22 = GA))
                  +- FileScan csv [Department#17,State#22,Amount#25] Batched: false, DataFilters: [isnotnull(State#22), (State#22 = GA)], Format: CSV, Location: InMemoryFileIndex[file:/app/Vermont_Vendor_Payments.csv], PartitionFilters: [], PushedFilters: [IsNotNull(State), EqualTo(State,GA)], ReadSchema: struct<Department:string,State:string,Amount:string>




### Selecting Data - SQL versus pyspark

In [41]:
# switch # of partitions back to 8
spark_df = spark_df.coalesce(8)
print(f'switched back to {spark_df.rdd.getNumPartitions()} number of partitions')
# create temp table for 
spark_df.createOrReplaceTempView('vendor_data')
query = ''' SELECT State,Department, sum(cast(Amount as int)) as total_amount
            FROM vendor_data
            where State = 'GA'
            group by State, Department
            order by Department
            '''
spark.sql(query).show(25)

switched back to 8 number of partitions




+-----+--------------------+------------+
|State|          Department|total_amount|
+-----+--------------------+------------+
|   GA|AOT Proprietary F...|     7947477|
|   GA|Administration Ag...|       62945|
|   GA|Agency of Digital...|       93817|
|   GA|Agency of Transpo...|     4934655|
|   GA|Aging and Indepen...|      496304|
|   GA|Agriculture, Food...|       80765|
|   GA|Agriculture, Food...|      499928|
|   GA|Attorney General'...|       60329|
|   GA|Auditor of Accoun...|       11999|
|   GA|Banking Ins Sec H...|       70649|
|   GA|Buildings & Gen S...|      797132|
|   GA|Buildings & Gen S...|      679744|
|   GA|Buildings & Gen S...|  2327021701|
|   GA|Center of Crime V...|       62892|
|   GA|Children and Fami...|     2604481|
|   GA|Children and Fami...|     1979958|
|   GA|Commerce & Commun...|        9265|
|   GA|Commerce & Commun...|       14085|
|   GA|         Corrections|    56926403|
|   GA|Crime Victims' Se...|      121443|
|   GA|Criminal Justice ...|      



In [31]:
spark_df.where(spark_df.State =='GA')\
        .groupBy(['State','Department'])\
        .sum('Amount')\
        .withColumnRenamed("sum(Amount)", "total_amount")\
        .orderBy('Department')\
        .show()



+-----+--------------------+------------+
|State|          Department|total_amount|
+-----+--------------------+------------+
|   GA|AOT Proprietary F...|     7947477|
|   GA|Administration Ag...|       62945|
|   GA|Agency of Digital...|       93817|
|   GA|Agency of Transpo...|     4934655|
|   GA|Aging and Indepen...|      496304|
|   GA|Agriculture, Food...|       80765|
|   GA|Agriculture, Food...|      499928|
|   GA|Attorney General'...|       60329|
|   GA|Auditor of Accoun...|       11999|
|   GA|Banking Ins Sec H...|       70649|
|   GA|Buildings & Gen S...|      797132|
|   GA|Buildings & Gen S...|      679744|
|   GA|Buildings & Gen S...|  2327021701|
|   GA|Center of Crime V...|       62892|
|   GA|Children and Fami...|     2604481|
|   GA|Children and Fami...|     1979958|
|   GA|Commerce & Commun...|        9265|
|   GA|Commerce & Commun...|       14085|
|   GA|         Corrections|    56926403|
|   GA|Crime Victims' Se...|      121443|
+-----+--------------------+------

                                                                                

### Introducing Koalas 
- Pandas code conventions with Spark functionality in background
- Standard Pyspark will likely be faster
- Not all Pandas functionality is supported but does reduce learning curve
- Read more about Koalas here: https://koalas.readthedocs.io/en/latest/

### Adding an additional dataset via webscraping
- Current data just has state abbreviation, lets add state description
- Add in state information from https://www.bls.gov/respondents/mwr/electronic-data-interchange/appendix-d-usps-state-abbreviations-and-fips-codes.htm

#### step1 - get data from website & parse into HTML tags using BeautifulSoup

In [1]:
import requests
from bs4 import BeautifulSoup

url = 'https://www.bls.gov/respondents/mwr/electronic-data-interchange/appendix-d-usps-state-abbreviations-and-fips-codes.htm'
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
soup


<!DOCTYPE HTML>

<html lang="en-us">
<head>
<!-- P1 -->
<title>Appendix D ‐ USPS State Abbreviations and FIPS Codes :  U.S. Bureau of Labor Statistics</title>
<meta charset="utf-8"/>
<meta content="Appendix D ‐ USPS State Abbreviations and FIPS Codes" name="description">
<meta content="2020-04-23" name="date"/>
<meta content="" name="keywords"/>
<meta content="" name="surveys"/>
<meta content="" name="programs"/>
<meta content="0.5" name="search-priority"/>
<!-- START include/global/head_cms.stm -->
<script id="_fed_an_ua_tag" language="javascript" src="https://dap.digitalgov.gov/Universal-Federated-Analytics-Min.js?agency=DOL&amp;subagency=BLS&amp;yt=true"></script>
<script>
(function (g) {
var d = document, am = d.createElement('script'), h = d.head || d.getElementsByTagName("head")[0], fsr = 'fsReady',
aex = {
  "src": "//gateway.foresee.com/sites/bls.gov/production/gateway.min.js",
  "type": "text/javascript",
  "async": "true",
  "data-vendor": "fs",
  "data-role": "gateway"
};
f

#### step2  - find the HTML for the table

In [2]:
state_table = soup.findAll("table")[1]
state_table

<table border="1" cellpadding="2" cellspacing="0" width="602"><tbody><tr><td align="middle" height="34" valign="top" width="26%"><strong>State</strong></td><td align="middle" height="34" valign="top" width="13%"><strong>Postal<br> Abbr.</br></strong></td><td align="middle" height="34" valign="top" width="11%"><strong>FIPS<br> Code</br></strong></td><td align="middle" height="34" valign="top" width="27%"><strong>State</strong></td><td align="middle" height="34" valign="top" width="13%"><strong>Postal<br/> Abbr.</strong></td><td align="middle" height="34" valign="top" width="11%"><strong>FIPS<br/> Code</strong></td></tr><tr><td align="left" height="21" valign="top" width="26%">Alabama</td><td align="middle" height="21" valign="top" width="13%">AL</td><td align="middle" height="21" valign="top" width="11%">01</td><td align="left" height="21" valign="top" width="27%">Nebraska</td><td align="middle" height="21" valign="top" width="13%">NE</td><td align="middle" height="21" valign="top" widt

### step3 - Get data into a Structured state

In [3]:
table_body = state_table.find('tbody')
states_data = []
rows = table_body.find_all('tr')
for row in rows:
    cols = row.find_all('td')
    cols = [ele.text.strip() for ele in cols]
    states_data.append([ele for ele in cols if ele]) # Get rid of empty values
states_data

[['State', 'Postal Abbr.', 'FIPS Code', 'State', 'Postal Abbr.', 'FIPS Code'],
 ['Alabama', 'AL', '01', 'Nebraska', 'NE', '31'],
 ['Alaska', 'AK', '02', 'Nevada', 'NV', '32'],
 ['Arizona', 'AZ', '04', 'New Hampshire', 'NH', '33'],
 ['Arkansas', 'AR', '05', 'New Jersey', 'NJ', '34'],
 ['California', 'CA', '06', 'New Mexico', 'NM', '35'],
 ['Colorado', 'CO', '08', 'New York', 'NY', '36'],
 ['Connecticut', 'CT', '09', 'North Carolina', 'NC', '37'],
 ['Delaware', 'DE', '10', 'North Dakota', 'ND', '38'],
 ['District of Columbia', 'DC', '11', 'Ohio', 'OH', '39'],
 ['Florida', 'FL', '12', 'Oklahoma', 'OK', '40'],
 ['Georgia', 'GA', '13', 'Oregon', 'OR', '41'],
 ['Hawaii', 'HI', '15', 'Pennsylvania', 'PA', '42'],
 ['Idaho', 'ID', '16', 'Puerto Rico', 'PR', '72'],
 ['Illinois', 'IL', '17', 'Rhode Island', 'RI', '44'],
 ['Indiana', 'IN', '18', 'South Carolina', 'SC', '45'],
 ['Iowa', 'IA', '19', 'South Dakota', 'SD', '46'],
 ['Kansas', 'KS', '20', 'Tennessee', 'TN', '47'],
 ['Kentucky', 'KY', '2

In [25]:
cols = ['state_name','state_cd','fips_code']
# get first set of states in cols 1-3
parta = pd.DataFrame(states_data).iloc[1:,0:3]
parta.columns = cols
# get 2nd set of states in cols 4 onwards
partb = pd.DataFrame(states_data).iloc[1:,3:]
partb.columns = cols
# merge to one dataset of states
state_df = pd.concat([parta,partb])
state_df

Unnamed: 0,state_name,state_cd,fips_code
1,Alabama,AL,1.0
2,Alaska,AK,2.0
3,Arizona,AZ,4.0
4,Arkansas,AR,5.0
5,California,CA,6.0
6,Colorado,CO,8.0
7,Connecticut,CT,9.0
8,Delaware,DE,10.0
9,District of Columbia,DC,11.0
10,Florida,FL,12.0


In [46]:
state_kdf = ks.DataFrame(state_df)
# switch to Spark
state_spark_df = state_kdf.to_spark()
state_spark_df.show(5)

+----------+--------+---------+
|state_name|state_cd|fips_code|
+----------+--------+---------+
|   Alabama|      AL|       01|
|    Alaska|      AK|       02|
|   Arizona|      AZ|       04|
|  Arkansas|      AR|       05|
|California|      CA|       06|
+----------+--------+---------+
only showing top 5 rows



In [47]:
# create temp table & repartition data 20==>5
#state_spark_df = state_spark_df.repartition(2, "State")
state_spark_df.createOrReplaceTempView('state_data')
query = ''' SELECT
                v.State as state_cd
                ,s.state_name
                ,s.fips_code
                ,sum(Amount) as total_amount
            FROM vendor_data as v
            left outer join state_data as s
            on v.State = s.state_cd
            where v.State like '%A'
            group by 
                 v.State
                 ,s.State_name
                 ,fips_code
            order by v.State
            '''
spark.sql(query).show(25)

                                                                                

+--------+-------------+---------+------------+
|state_cd|   state_name|fips_code|total_amount|
+--------+-------------+---------+------------+
|      CA|   California|       06|   149397906|
|      GA|      Georgia|       13|  2525837977|
|      IA|         Iowa|       19|     7685316|
|      LA|    Louisiana|       22|    22688027|
|      MA|Massachusetts|       25| 26037848838|
|      PA| Pennsylvania|       42|  2639579806|
|      VA|     Virginia|       51|  3444284400|
|      WA|   Washington|       53|    12008666|
+--------+-------------+---------+------------+

