# Step 0: Create spark context

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
sc

## Step 1: Get data with which we are familiar



### Dataset: Green practices across DC
[documentation](https://opendata.dc.gov/datasets/best-management-practices/data)
![rainbarrel](https://cdn11.bigcommerce.com/s-j602wc6a/products/7096/images/21615/great-american-rainbarrel-trio__96479.1502809069.500.750.jpg?c=2)

We want to know:
- what are the most common BMP_Type by neighborhood? 
- How many instances are on private vs public land?

### Step 2: Create a pandas dataframe

In [4]:
import pandas as pd
pandaDf = pd.read_csv('https://opendata.arcgis.com/datasets/a973c2c7b7c14a918859f3e38bdffdd2_42.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [7]:
pandaDf.columns

Index(['X', 'Y', 'BMP_ID_NUMBER', 'BMP_TYPE', 'BMP_SUB_TYPE', 'UNDERDRAIN',
       'BMP_GROUP', 'INSTALLATION_DATE', 'DATE_REMOVED',
       'CONTRIBUTING_DRAINAGE_AREA_FT2', 'POST_PROJECT_NATURAL_FT2',
       'POST_PROJECT_COMPACTED_FT2', 'POST_PROJECT_IMPERVIOUS_FT2',
       'POST_PROJECT_BMP_AREA_FT2', 'POST_PROJECT_VEHICULAR_FT2',
       'PRE_PROJECT_NATURAL_FT2', 'PRE_PROJECT_COMPACTED_FT2',
       'PRE_PROJECT_IMPERVIOUS_FT2', 'PRE_PROJECT_BMP_AREA_FT2',
       'PRE_PROJECT_VEHICULAR_FT2', 'STORAGE_VOLUME_FT3',
       'RETENTION_VOLUME_FT3', 'ADDITIONAL_VOLUME_TREATED_FT3', 'X_COORDINATE',
       'Y_COORDINATE', 'MAJOR_DRAINAGE_BASIN', 'MINOR_DRAINAGE_BASIN',
       'NUMBER_OF_TREES', 'PROPRIETARY_PRACTICE_NAME',
       'DESCRIBE_PROPRIETARY_PRACTICE', 'PROJECT_TYPE', 'GIS_LAST_MOD_DTTM',
       'LATITUDE', 'LONGITUDE', 'OBJECTID', 'BMP_GROUP_ABBREVIATION',
       'MAJOR_REGULATED_ACTIVITY', 'SRC_GENERATION', 'RIVERSMART_REWARDS',
       'GRAY_OR_GREEN_INFRASTRUCTURE', 'PUBLIC_RIG

In [263]:
counts=pandaDf.groupby(['WARD'])['BMP_TYPE'].value_counts()
for i in counts.index.levels[0]:
    print(counts[i], counts[i].max())


BMP_TYPE
Shade Tree                                 224
Rain Barrel                                154
Bayscaping                                  89
Proprietary practice                        70
Filtering Systems                           48
Rainwater harvesting                        48
Intensive green roof                        46
Green roof                                  45
Permeable pavement                          42
Extensive green roof                        32
Traditional bioretention                    30
Storage                                     29
Bioretention                                26
Infiltration                                23
Permeable pavers                            19
Rain Garden                                 18
Tree planting                               18
Infiltration trench                         13
Stormwater planters                         10
Porous asphalt                               9
Pervious concrete                            8
Prop

In [252]:
counts=pandaDf.groupby(['WARD', 'BMP_TYPE']).agg({'BMP_TYPE':'count'})
counts.rename(columns = {'BMP_TYPE':'Total'}, inplace=True)
counts.reset_index(inplace=True)
counts.groupby('WARD')['BMP_TYPE'].transform(max)==counts['Total']

0

In [210]:
counts.index.levels[0][counts.index.levels[0]==2.0]

2.0    1
Name: WARD, dtype: int64

In [69]:
counts.groupby('WARD')

<pandas.core.groupby.groupby.SeriesGroupBy object at 0x11ff51400>

### Step 3: Create spark dataframe

In [85]:
#myrdd = sc.textFile('Best_Management_Practices.csv').map(lambda line: line.split(","))
# need to make this a df

sparkDf = spark.read.csv('Best_Management_Practices.csv', sep=',', encoding='UTF-8', header=True, inferSchema=True)

In [87]:
sparkDf.persist()

Row(X=-77.03677300762558, Y=38.958676906847835, BMP_ID_NUMBER='R15760-1-2', BMP_TYPE='Shade Tree', BMP_SUB_TYPE=None, UNDERDRAIN=False, BMP_GROUP='Tree Planting and Preservation', INSTALLATION_DATE=datetime.datetime(2014, 5, 13, 20, 0), DATE_REMOVED=None, CONTRIBUTING_DRAINAGE_AREA_FT2=0, POST_PROJECT_NATURAL_FT2=None, POST_PROJECT_COMPACTED_FT2=None, POST_PROJECT_IMPERVIOUS_FT2=0.0, POST_PROJECT_BMP_AREA_FT2=None, POST_PROJECT_VEHICULAR_FT2=None, PRE_PROJECT_NATURAL_FT2=None, PRE_PROJECT_COMPACTED_FT2=None, PRE_PROJECT_IMPERVIOUS_FT2=None, PRE_PROJECT_BMP_AREA_FT2=None, PRE_PROJECT_VEHICULAR_FT2=None, STORAGE_VOLUME_FT3=None, RETENTION_VOLUME_FT3=10, ADDITIONAL_VOLUME_TREATED_FT3=0.0, X_COORDINATE=396812.99, Y_COORDINATE=143418.72, MAJOR_DRAINAGE_BASIN='Rock Creek', MINOR_DRAINAGE_BASIN='Rock Creek', NUMBER_OF_TREES=1, PROPRIETARY_PRACTICE_NAME=None, DESCRIBE_PROPRIETARY_PRACTICE=None, PROJECT_TYPE=None, GIS_LAST_MOD_DTTM=datetime.datetime(2019, 6, 17, 2, 0, 18), LATITUDE=38.958669108

In [90]:
sparkDf.limit(5).toPandas()
#sparkDf.head()
#sparkDf.show(5)

Unnamed: 0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
0,-77.036773,38.958677,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-13 20:00:00,,0,...,#R15760-1-2,,,NaT,MS4,True,True,4,No,Yes
1,-76.938687,38.888273,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-19 20:00:00,,0,...,#R11333-1-12,,,NaT,MS4,True,True,7,No,Yes
2,-76.989573,38.928465,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-06 20:00:00,,220,...,#R12966-1-2,,,NaT,CSS,True,True,5,No,Yes
3,-77.031645,38.954937,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-08-31 20:00:00,,0,...,#R13157-1-6,,,NaT,CSS,True,True,4,No,Yes
4,-77.0079,38.8276,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-20 20:00:00,,9,...,#4719-1-5,4719.0,,2016-06-20 20:00:00,MS4,True,True,8,No,No


In [91]:
# Select columns using DF format

In [92]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [102]:
show(sparkDf, 7)

Unnamed: 0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
0,-77.036773,38.958677,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-13 20:00:00,,0,...,#R15760-1-2,,,NaT,MS4,True,True,4,No,Yes
1,-76.938687,38.888273,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-19 20:00:00,,0,...,#R11333-1-12,,,NaT,MS4,True,True,7,No,Yes
2,-76.989573,38.928465,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-06 20:00:00,,220,...,#R12966-1-2,,,NaT,CSS,True,True,5,No,Yes
3,-77.031645,38.954937,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-08-31 20:00:00,,0,...,#R13157-1-6,,,NaT,CSS,True,True,4,No,Yes
4,-77.0079,38.8276,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-20 20:00:00,,9,...,#4719-1-5,4719.0,,2016-06-20 20:00:00,MS4,True,True,8,No,No
5,-77.088417,38.939089,R1346-1-12,Simple disconnection to amended soils,,False,Impervious Surface Disconnection,2013-06-30 20:00:00,,7133,...,#R1346-1-12,,,NaT,MS4,True,True,3,No,No
6,-77.027018,38.943163,5552-3-3,Permeable pavers,Standard,True,Permeable Pavement,2019-05-29 20:00:00,,35872,...,#5552-3-3,5552.0,,2017-08-31 20:00:00,CSS,True,True,4,No,No


In [109]:
# Change code to relevant names and variables

import pyspark.sql.functions as F
counts = sparkDf.agg(F.countDistinct('BMP_Type'))

In [119]:
sparkDf.columns

['X',
 'Y',
 'BMP_ID_NUMBER',
 'BMP_TYPE',
 'BMP_SUB_TYPE',
 'UNDERDRAIN',
 'BMP_GROUP',
 'INSTALLATION_DATE',
 'DATE_REMOVED',
 'CONTRIBUTING_DRAINAGE_AREA_FT2',
 'POST_PROJECT_NATURAL_FT2',
 'POST_PROJECT_COMPACTED_FT2',
 'POST_PROJECT_IMPERVIOUS_FT2',
 'POST_PROJECT_BMP_AREA_FT2',
 'POST_PROJECT_VEHICULAR_FT2',
 'PRE_PROJECT_NATURAL_FT2',
 'PRE_PROJECT_COMPACTED_FT2',
 'PRE_PROJECT_IMPERVIOUS_FT2',
 'PRE_PROJECT_BMP_AREA_FT2',
 'PRE_PROJECT_VEHICULAR_FT2',
 'STORAGE_VOLUME_FT3',
 'RETENTION_VOLUME_FT3',
 'ADDITIONAL_VOLUME_TREATED_FT3',
 'X_COORDINATE',
 'Y_COORDINATE',
 'MAJOR_DRAINAGE_BASIN',
 'MINOR_DRAINAGE_BASIN',
 'NUMBER_OF_TREES',
 'PROPRIETARY_PRACTICE_NAME',
 'DESCRIBE_PROPRIETARY_PRACTICE',
 'PROJECT_TYPE',
 'GIS_LAST_MOD_DTTM',
 'LATITUDE',
 'LONGITUDE',
 'OBJECTID',
 'BMP_GROUP_ABBREVIATION',
 'MAJOR_REGULATED_ACTIVITY',
 'SRC_GENERATION',
 'RIVERSMART_REWARDS',
 'GRAY_OR_GREEN_INFRASTRUCTURE',
 'PUBLIC_RIGHT_OF_WAY',
 'BMP_NUMBER',
 'PLAN_NUMBER',
 'BMP_TYPE_ACCESS',
 

In [116]:
query = """
SELECT WARD, BMP_TYPE, COUNT(*)
FROM water
GROUP BY BMP_TYPE, WARD
ORDER BY BMP_TYPE, WARD
"""
sparkDf.createOrReplaceTempView('water')
output = spark.sql(query)
show(output, n=1000)

Unnamed: 0,WARD,BMP_TYPE,count(1)
0,1.0,,1
1,,Bayscaping,15
2,1.0,Bayscaping,89
3,2.0,Bayscaping,8
4,3.0,Bayscaping,153
5,4.0,Bayscaping,393
6,5.0,Bayscaping,307
7,6.0,Bayscaping,102
8,7.0,Bayscaping,229
9,8.0,Bayscaping,108


## Step 4: Practice with some more complex data
Documentation for data can be found [here](http://jmcauley.ucsd.edu/data/amazon/)

In [None]:
# Download data (run this only once)
#!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
#!gunzip reviews_Toys_and_Games_5.json.gz