# Step 0: Create spark context

In [18]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [19]:
spark

In [20]:
sc

## Step 1: Get data with which we are familiar



### Dataset: Green practices across DC
[documentation](https://opendata.dc.gov/datasets/best-management-practices/data)
![rainbarrel](https://cdn11.bigcommerce.com/s-j602wc6a/products/7096/images/21615/great-american-rainbarrel-trio__96479.1502809069.500.750.jpg?c=2)

We want to know:
- what are the most common BMP_Type by neighborhood? 
- How many instances are on private vs public land?

### Step 2: Create a pandas dataframe

In [21]:
import pandas as pd
pandaDf = pd.read_csv('https://opendata.arcgis.com/datasets/a973c2c7b7c14a918859f3e38bdffdd2_42.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [6]:
pandaDf.head()

Unnamed: 0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
0,-77.036773,38.958677,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-14T00:00:00.000Z,,0,...,#R15760-1-2,,,,MS4,True,True,4.0,No,Yes
1,-76.938687,38.888273,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-20T00:00:00.000Z,,0,...,#R11333-1-12,,,,MS4,True,True,7.0,No,Yes
2,-76.989573,38.928465,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-07T00:00:00.000Z,,220,...,#R12966-1-2,,,,CSS,True,True,5.0,No,Yes
3,-77.031645,38.954937,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-09-01T00:00:00.000Z,,0,...,#R13157-1-6,,,,CSS,True,True,4.0,No,Yes
4,-77.0079,38.8276,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-21T00:00:00.000Z,,9,...,#4719-1-5,4719.0,,2016-06-21T00:00:00.000Z,MS4,True,True,8.0,No,No


In [7]:
pd.crosstab(pandaDf.WARD, pandaDf.BMP_GROUP)

BMP_GROUP,Bayscaping,Bioretention,CDA to a Shared BMP,Filtering System,Green Roof,Impervious Surface Disconnection,Infiltration,Land Cover Change,Open Channel,Permeable Pavement,Ponds,Proprietary Practice,Rainwater Harvesting,Storage,Stream Restoration,Tree Planting and Preservation,Wetlands
WARD,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
1.0,89,87,0,50,125,5,37,0,4,78,0,77,202,29,0,248,0
2.0,8,150,0,121,359,9,11,8,5,59,0,106,73,32,0,73,0
3.0,153,250,2,57,72,94,247,0,21,103,7,155,665,19,2,886,1
4.0,393,440,0,23,64,185,62,1,17,177,0,70,1572,17,3,1520,0
5.0,307,496,1,58,109,55,40,3,24,187,4,135,1240,40,0,1336,2
6.0,102,436,1,116,432,27,53,2,26,171,0,147,386,29,0,473,1
7.0,229,256,0,55,50,40,81,0,21,69,2,93,548,11,3,426,0
8.0,108,365,1,38,48,19,109,0,20,105,11,138,210,12,1,286,0


In [10]:
! ls

Intro-to-data-with-spark.ipynb     PySpark_SQL_Cheat_Sheet_Python.pdf
PySpark_Cheat_Sheet_Python.pdf     cheatSheet_pyspark.pdf


### Step 3: Create spark dataframe

In [22]:
myrdd = sc.textFile('Best_Management_Practices.csv').map(lambda line: line.split(","))
# need to make this a df

In [23]:
type(myrdd)

pyspark.rdd.PipelinedRDD

In [25]:
ds = spark.read.csv(path='Best_Management_Practices.csv',
sep=',',encoding='UTF-8',
header=True,inferSchema=True)
ds.head()

Row(X=-77.03677300762558, Y=38.958676906847835, BMP_ID_NUMBER='R15760-1-2', BMP_TYPE='Shade Tree', BMP_SUB_TYPE=None, UNDERDRAIN=False, BMP_GROUP='Tree Planting and Preservation', INSTALLATION_DATE=datetime.datetime(2014, 5, 13, 20, 0), DATE_REMOVED=None, CONTRIBUTING_DRAINAGE_AREA_FT2=0, POST_PROJECT_NATURAL_FT2=None, POST_PROJECT_COMPACTED_FT2=None, POST_PROJECT_IMPERVIOUS_FT2=0.0, POST_PROJECT_BMP_AREA_FT2=None, POST_PROJECT_VEHICULAR_FT2=None, PRE_PROJECT_NATURAL_FT2=None, PRE_PROJECT_COMPACTED_FT2=None, PRE_PROJECT_IMPERVIOUS_FT2=None, PRE_PROJECT_BMP_AREA_FT2=None, PRE_PROJECT_VEHICULAR_FT2=None, STORAGE_VOLUME_FT3=None, RETENTION_VOLUME_FT3=10, ADDITIONAL_VOLUME_TREATED_FT3=0.0, X_COORDINATE=396812.99, Y_COORDINATE=143418.72, MAJOR_DRAINAGE_BASIN='Rock Creek', MINOR_DRAINAGE_BASIN='Rock Creek', NUMBER_OF_TREES=1, PROPRIETARY_PRACTICE_NAME=None, DESCRIBE_PROPRIETARY_PRACTICE=None, PROJECT_TYPE=None, GIS_LAST_MOD_DTTM=datetime.datetime(2019, 6, 17, 2, 0, 18), LATITUDE=38.958669108

In [27]:
ds.persist()

DataFrame[X: double, Y: double, BMP_ID_NUMBER: string, BMP_TYPE: string, BMP_SUB_TYPE: string, UNDERDRAIN: boolean, BMP_GROUP: string, INSTALLATION_DATE: timestamp, DATE_REMOVED: timestamp, CONTRIBUTING_DRAINAGE_AREA_FT2: int, POST_PROJECT_NATURAL_FT2: double, POST_PROJECT_COMPACTED_FT2: double, POST_PROJECT_IMPERVIOUS_FT2: double, POST_PROJECT_BMP_AREA_FT2: int, POST_PROJECT_VEHICULAR_FT2: double, PRE_PROJECT_NATURAL_FT2: int, PRE_PROJECT_COMPACTED_FT2: double, PRE_PROJECT_IMPERVIOUS_FT2: double, PRE_PROJECT_BMP_AREA_FT2: double, PRE_PROJECT_VEHICULAR_FT2: double, STORAGE_VOLUME_FT3: double, RETENTION_VOLUME_FT3: int, ADDITIONAL_VOLUME_TREATED_FT3: double, X_COORDINATE: double, Y_COORDINATE: double, MAJOR_DRAINAGE_BASIN: string, MINOR_DRAINAGE_BASIN: string, NUMBER_OF_TREES: int, PROPRIETARY_PRACTICE_NAME: string, DESCRIBE_PROPRIETARY_PRACTICE: string, PROJECT_TYPE: string, GIS_LAST_MOD_DTTM: timestamp, LATITUDE: double, LONGITUDE: double, OBJECTID: int, BMP_GROUP_ABBREVIATION: string

In [None]:
# Select columns using DF format

In [36]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [37]:
ds.limit(5).toPandas()
# ds.head()
# ds.show(5)

Unnamed: 0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
0,-77.036773,38.958677,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-13 20:00:00,,0,...,#R15760-1-2,,,NaT,MS4,True,True,4,No,Yes
1,-76.938687,38.888273,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-19 20:00:00,,0,...,#R11333-1-12,,,NaT,MS4,True,True,7,No,Yes
2,-76.989573,38.928465,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-06 20:00:00,,220,...,#R12966-1-2,,,NaT,CSS,True,True,5,No,Yes
3,-77.031645,38.954937,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-08-31 20:00:00,,0,...,#R13157-1-6,,,NaT,CSS,True,True,4,No,Yes
4,-77.0079,38.8276,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-20 20:00:00,,9,...,#4719-1-5,4719.0,,2016-06-20 20:00:00,MS4,True,True,8,No,No


In [38]:
# Change code to relevant names and variables

import pyspark.sql.functions as F
counts = ds.agg(F.countDistinct('BPM_Type'))

AnalysisException: "cannot resolve '`BPM_Type`' given input columns: [NUMBER_OF_TREES, BMP_TYPE_ACCESS, PUBLIC_RIGHT_OF_WAY, OBJECTID, INSTALLATION_DATE, MAJOR_REGULATED_ACTIVITY, RETENTION_VOLUME_FT3, SRC_GENERATION, POST_PROJECT_NATURAL_FT2, CONTRIBUTING_DRAINAGE_AREA_FT2, GIS_LAST_MOD_DTTM, INCLUDE_ALL, RIVERSMART_HOMES, PRE_PROJECT_VEHICULAR_FT2, BMP_ID_NUMBER, X_COORDINATE, Y_COORDINATE, POST_PROJECT_IMPERVIOUS_FT2, PRE_PROJECT_COMPACTED_FT2, PLAN_NUMBER, POST_PROJECT_BMP_AREA_FT2, PROPRIETARY_PRACTICE_NAME, STORAGE_VOLUME_FT3, ADDITIONAL_VOLUME_TREATED_FT3, SEWERSHED, LATITUDE, MAJOR_DRAINAGE_BASIN, POST_PROJECT_VEHICULAR_FT2, LONGITUDE, UNDERDRAIN, DATE_REMOVED, PRE_PROJECT_NATURAL_FT2, Y, DESCRIBE_PROPRIETARY_PRACTICE, MINOR_DRAINAGE_BASIN, BMP_NUMBER, X, POST_PROJECT_COMPACTED_FT2, GRAY_OR_GREEN_INFRASTRUCTURE, BMP_SUB_TYPE, PRE_PROJECT_IMPERVIOUS_FT2, BMP_GROUP, WARD, GREEN_ROOF_REBATE, BMP_GROUP_ABBREVIATION, DATE_APPROVED, PROJECT_TYPE, PRE_PROJECT_BMP_AREA_FT2, INCLUDE_BUILT, BMP_TYPE, RIVERSMART_REWARDS];;\n'Aggregate [count(distinct 'BPM_Type) AS count(DISTINCT BPM_Type)#2566]\n+- Relation[X#10,Y#11,BMP_ID_NUMBER#12,BMP_TYPE#13,BMP_SUB_TYPE#14,UNDERDRAIN#15,BMP_GROUP#16,INSTALLATION_DATE#17,DATE_REMOVED#18,CONTRIBUTING_DRAINAGE_AREA_FT2#19,POST_PROJECT_NATURAL_FT2#20,POST_PROJECT_COMPACTED_FT2#21,POST_PROJECT_IMPERVIOUS_FT2#22,POST_PROJECT_BMP_AREA_FT2#23,POST_PROJECT_VEHICULAR_FT2#24,PRE_PROJECT_NATURAL_FT2#25,PRE_PROJECT_COMPACTED_FT2#26,PRE_PROJECT_IMPERVIOUS_FT2#27,PRE_PROJECT_BMP_AREA_FT2#28,PRE_PROJECT_VEHICULAR_FT2#29,STORAGE_VOLUME_FT3#30,RETENTION_VOLUME_FT3#31,ADDITIONAL_VOLUME_TREATED_FT3#32,X_COORDINATE#33,... 27 more fields] csv\n"

In [39]:
# query = """
# SELECT overall, COUNT(*)
# FROM reviews
# GROUP BY overall
# ORDER BY overall
# """
# reviews_df.createOrReplaceTempView('reviews')
# output = spark.sql(query)
# show(output, n=1000)

In [42]:
query = """SELECT BMP_TYPE, WARD, COUNT(*)
          FROM water
          GROUP BY BMP_TYPE, WARD
          ORDER BY BMP_TYPE, WARD
          """
ds.createOrReplaceTempView('water')
output = spark.sql(query)
show(output, n=1000)

Unnamed: 0,BMP_TYPE,WARD,count(1)
0,,1.0,1
1,Bayscaping,,15
2,Bayscaping,1.0,89
3,Bayscaping,2.0,8
4,Bayscaping,3.0,153
5,Bayscaping,4.0,393
6,Bayscaping,5.0,307
7,Bayscaping,6.0,102
8,Bayscaping,7.0,229
9,Bayscaping,8.0,108


## Step 4: Practice with some more complex data
Documentation for data can be found [here](http://jmcauley.ucsd.edu/data/amazon/)

In [None]:
# Download data (run this only once)
#!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
#!gunzip reviews_Toys_and_Games_5.json.gz