# Big Data Project for Computing Infrastructures course by Francesco Picciotti (matr.854021)
---
## Task: Data Analysis and Exploration of Hertfordshire and North London Water Quality  

In [1]:
import ibmos2spark
from pyspark.sql import SparkSession
from pyspark.mllib.fpm import FPGrowth

In [2]:
# The code was removed by DSX for sharing.

[Row(_c0=u'0', @id=u'http://environment.data.gov.uk/water-quality/data/measurement/TH-1111137-0092', sample.samplingPoint=u'http://environment.data.gov.uk/water-quality/id/sampling-point/TH-PBRE9999', sample.samplingPoint.notation=u'TH-PBRE9999', sample.samplingPoint.label=u'BRENT CATCHMENT EFFLUENT N.C.R.S', sample.sampleDateTime=u'2013-04-22T17:30:00', determinand.label=u'COD as O2', determinand.definition=u'Chemical Oxygen Demand :- {COD}', determinand.notation=u'92', resultQualifier.notation=None, result=u'138.0', codedResultInterpretation.interpretation=None, determinand.unit.label=u'mg/l', sample.sampledMaterialType.label=u'RIVER / RUNNING SURFACE WATER', sample.isComplianceSample=u'False', sample.purpose.label=u'UNPLANNED REACTIVE MONITORING FORMAL (POLLUTION INCIDENTS)', sample.samplingPoint.easting=u'500000', sample.samplingPoint.northing=u'7'),
 Row(_c0=u'1', @id=u'http://environment.data.gov.uk/water-quality/data/measurement/TH-1111137-0085', sample.samplingPoint=u'http://en

In [3]:
# Inspect the schema of the loaded DataFrame; every column is read in as a string.
df_data_1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- @id: string (nullable = true)
 |-- sample.samplingPoint: string (nullable = true)
 |-- sample.samplingPoint.notation: string (nullable = true)
 |-- sample.samplingPoint.label: string (nullable = true)
 |-- sample.sampleDateTime: string (nullable = true)
 |-- determinand.label: string (nullable = true)
 |-- determinand.definition: string (nullable = true)
 |-- determinand.notation: string (nullable = true)
 |-- resultQualifier.notation: string (nullable = true)
 |-- result: string (nullable = true)
 |-- codedResultInterpretation.interpretation: string (nullable = true)
 |-- determinand.unit.label: string (nullable = true)
 |-- sample.sampledMaterialType.label: string (nullable = true)
 |-- sample.isComplianceSample: string (nullable = true)
 |-- sample.purpose.label: string (nullable = true)
 |-- sample.samplingPoint.easting: string (nullable = true)
 |-- sample.samplingPoint.northing: string (nullable = true)



- **_c0**: It's the row index, thus a progressive number
- **@id**: URI identifier
- **sample.samplingPoint**: The URI for making reference to a sampling point
- **sample.samplingPoint.notation**: A short string identifying each sampling point, e.g. TH-PBRE9999
- **sample.samplingPoint.label**: The full name of the sampling point
- **sample.sampleDateTime**: The date and time when a sample was collected.
- **determinand.label**: A brief string identifying the sampled determinand, i.e. the property being measured
- **determinand.definition**: A string describing the determinand meaning, its definition
- **determinand.notation**: A string which uniquely identifies the determinand
- **resultQualifier.notation**: Either empty or one of "<", ">", stating that the true value lies below or above the reported **result** (typically a detection/reporting limit)
- **result**: The amount of the measured determinand
- **codedResultInterpretation.interpretation**: Always empty (null) in this dataset
- **determinand.unit.label**: The unit measure that expresses the **result** field
- **sample.sampledMaterialType.label**: The kind of material (stretch of water, matter, etc.) from which the determinand is sampled
- **sample.isComplianceSample**: It is a boolean to indicate whether the sample has been collected for a compliance purpose
- **sample.purpose.label**: The string describing the kind of the sampling purpose
- **sample.samplingPoint.easting**: The easting of the point on the British National Grid
- **sample.samplingPoint.northing**: The northing of the point on the British National Grid

In [4]:
# Work on the underlying RDD of Rows and keep it in memory, because the later
# cells repeatedly scan it.
dataRDD = df_data_1.rdd.cache()
print('The total number of sampling is %d' % dataRDD.count())

The total number of sampling is 361101


## Columns exploration

In [5]:
# Distinct sampling-point labels. The distinct values are collected once and
# reused for both the count and the printout: calling count() and then
# collect() on the same RDD launches two separate Spark jobs.
sampling_points = dataRDD.map(lambda x: x['sample.samplingPoint.label']).distinct()
sampling_point_labels = sampling_points.collect()
print('The number of distinct sampling points is %d' % len(sampling_point_labels))
print(sampling_point_labels)

The number of distinct sampling points is 597
[u'BOURNE BROOK AT RYE STREET', u'HILL HALL STW :THEYDON MOUNT', u'BOBBINGWORTH MILL C/H STW :BOVINGER', u'G.U.C. AT LOT MEAD LOCK', u'HATFIELD QUARRY WM18 LOWER P3 0031/0123', u'NAZEING BROOK ABOVE HOE LANE TRIBUTARY', u'HOUGHTON BROOK BELOW M1, LUTON', u'STREAM FROM ARKLEY SPRING ABOVE LEE CONF', u'BATHING WATER - HAMPSTEAD MIXED POND', u'LEE AT HOLWELL BRIDGE', u'WHITE RODING STW', u'MOGDEN STW ( 24 HR COMPOSITE )', u'RIVERSIDE STW SPOT SAMPLE', u'VER ABOVE COLNE', u'DE SALIS HOTEL STW :ELSENHAM', u'RODING AT WOODFORD BRIDGE', u'HIPPO POOL REED BED T/E', u'BROOKHOUSE BROOK ABOVE FIDDLERS HAMLET S', u'MILL GREEN B/H M7, HATFIELD', u'WALTHAM ABBEY WTW T/E :WALTHAM ABBEY', u'LEE ABOVE STORT', u'LEE NAV SUBSIDIARY B AT KEIDES WEIR', u'RIVERSIDE GARDEN CENTRE STW :HERTFORD', u'SYMONDSHYDE FARM W36 HATFIELD QUARRY', u'CARTERS POND BH, HATFIELD ESTATE', u'BRENT AT UXBRIDGE ROAD, HANWELL', u'LEE NAV AT MARKFIELD RECREATION GROUND', u'FURNEUX PEL

In [6]:
# Distinct sampled-material types, collected once (a single Spark job) and
# reused for both the count and the printout.
materials = dataRDD.map(lambda x: x['sample.sampledMaterialType.label']).distinct()
material_labels = materials.collect()
print('The number of distinct materials is %d' % len(material_labels))
print(material_labels)

The number of distinct materials is 26
[u'RUNNING SURFACE WATER SEDIMENT', u'ANY SEWAGE', u'ANY WATER', u'SURFACE DRAINAGE', u'ANY FISH - NOT INCLUDING FLATFISH - WHOLE ANIMAL', u'ESTUARINE WATER', u'ESTUARY SEDIMENT', u'TRADE EFFLUENT - FRESHWATER RETURNED ABSTRACTED', u'PRECIPITATION', u'FINAL SEWAGE EFFLUENT', u'ANY SOLID/SEDIMENT - UNSPECIFIED', u'ANY TRADE EFFLUENT', u'WASTE - BULK MATERIAL', u'UNCODED', u'ESTUARY SEDIMENT - INTER TIDAL', u'MYTILUS EDULIS - MUSSEL - WHOLE ANIMAL', u'ANY NON-AQUEOUS LIQUID', u'POND / LAKE / RESERVOIR WATER SEDIMENT', u'POND / LAKE / RESERVOIR WATER', u'CANAL WATER', u'GROUNDWATER', u'BOREHOLE GAS', u'ANY LEACHATE', u'RIVER / RUNNING SURFACE WATER', u'STORM SEWER OVERFLOW DISCHARGE', u'CRUDE SEWAGE']


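Some of these categories overlap or are nested (e.g. `ANY WATER` vs `RIVER / RUNNING SURFACE WATER`, or `ESTUARY SEDIMENT` vs `ESTUARY SEDIMENT - INTER TIDAL`)!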

In [7]:
# Distinct determinand labels, collected once (a single Spark job) and reused
# for both the count and the printout.
determinands_lab = dataRDD.map(lambda x: x['determinand.label']).distinct()
determinand_labels = determinands_lab.collect()
print('The number of distinct labels is %d' % len(determinand_labels))
print(determinand_labels)

The number of distinct labels is 1043
[u'Phosphorus-P', u'2-CHLORO-4-N', u'PCB 008 DW', u'nButylbenzne', u'SiO2 Rv', u'OilTypeQual', u'11DiClPropen', u'4MePhen DW', u'Xylene Tot', u'24ClPhen DW', u'2,4-D', u'2,4-D e e', u'1,2 DCP (dw)', u'Dicofol WW', u'PCB 149 DW', u'13DiClBenzDW', u'IsoPropylBen', u'PCB 114 DW', u'phi 2.0-2.5', u'S1 Leach Dil', u'2,4-D ibe', u'Ni BLM Bio', u'Pb Filtered', u'g -HBCDD WW', u'PirimphosEtD', u'Arsenic - As', u'Ethenylbenzn', u'Dtrgt NncSyn', u'BrClMethane', u'Cu Filtered', u'Ind123pyrWWt', u'phi 9.0-9.5', u'Sld Filt@105', u'4Cl3MePhenol', u'12457-3 DW', u'C>10-40', u'NO2 N DW', u'Dry Method', u'Rotenone', u'PBDE 153', u'PCB Con 156', u'Tin - as Sn', u'Num Bathers', u'Drins Dry Wt', u'Co- Filtered', u'1,3,5TCB D', u'Phorate', u'Uranium - U', u'B-[ghi]-pery', u'HCHGamma WWt', u'SecButylbenz', u'Butan1,3Diol', u'PFnonncAcid', u'B-[a]-pyrene', u'phi 4.0-4.5', u'Dimethoate', u'2,4-D bge', u'As LeachDW', u'PropyzamideD', u'B[a]Pyre Lch', u'ColfmF PMF', u'PCB 1

In [8]:
# Distinct values of the coded-interpretation column, collected once (a single
# Spark job) and reused for both the count and the printout.
interpretations = dataRDD.map(lambda x: x['codedResultInterpretation.interpretation']).distinct()
interpretation_values = interpretations.collect()
print('The number of distinct interpretations is %d' % len(interpretation_values))
print(interpretation_values)

The number of distinct interpretations is 1
[None]


In [9]:
# Distinct result qualifiers (None, '<', '>'), collected once (a single Spark
# job) and reused for both the count and the printout. This column qualifies
# the result; it is not the result itself, so the message says "qualifiers".
results_not = dataRDD.map(lambda x: x['resultQualifier.notation']).distinct()
qualifier_values = results_not.collect()
print('The number of distinct result qualifiers is %d' % len(qualifier_values))
print(qualifier_values)

The number of distinct results is 3
[None, u'<', u'>']


In [10]:
# Distinct sampling purposes, collected once (a single Spark job) and reused
# for both the count and the printout.
purposes = dataRDD.map(lambda x: x['sample.purpose.label']).distinct()
purpose_labels = purposes.collect()
print('The number of distinct purposes is %d' % len(purpose_labels))
print(purpose_labels)

The number of distinct purposes is 19
[u'STATUTORY FAILURES (FOLLOW UPS AT DESIGNATED POINTS)', u'MONITORING  (NATIONAL AGENCY POLICY)', u'PLANNED INVESTIGATION (NATIONAL AGENCY POLICY)', u'ENVIRONMENTAL MONITORING STATUTORY (EU DIRECTIVES)', u'PLANNED INVESTIGATION (LOCAL MONITORING)', u'WASTE MONITORING (FORMAL SAMPLE)', u'COMPLIANCE FORMAL (PERMIT)', u'WATER QUALITY UWWTD MONITORING DATA', u'IPPC/IPC MONITORING (FORMAL SAMPLE)', u'MONITORING  (UK GOVT POLICY - NOT GQA OR RE)', u'STATUTORY FAILURES (FOLLOW UPS AT NON-DESIGNATED POINTS)', u'UNPLANNED REACTIVE MONITORING (POLLUTION INCIDENTS)', u'ENVIRONMENTAL MONITORING (GQA & RE ONLY)', u'WASTE MONITORING (AGENCY AUDIT - PERMIT)', u'WASTE MONITORING (AGENCY INVESTIGATION)', u'COMPLIANCE AUDIT (PERMIT)', u'WATER QUALITY OPERATOR SELF MONITORING COMPLIANCE DATA', u'UNPLANNED REACTIVE MONITORING FORMAL (POLLUTION INCIDENTS)', u'PLANNED FORMAL NON-STATUTORY (PERMIT/ENV MON)']


### Frequent pollutants
Let's find the most frequently sampled determinands, in order to focus the time-series forecast on a single pollutant.

In [11]:
# Group the measurements by sampling event (sampling point + timestamp). Each
# event maps to the list of (determinand notation, result) pairs measured in it.
SamplesDet = (
    dataRDD
    .map(lambda row: (
        (row['sample.samplingPoint.notation'], row['sample.sampleDateTime']),
        (row['determinand.notation'], row['result']),
    ))
    .groupByKey()
    .mapValues(list)
)

In [12]:
# One FP-Growth transaction per sampling event: the distinct determinand
# notations measured in it (FPGrowth expects unique items per transaction).
transactions = SamplesDet.map(lambda event: list({notation for notation, _ in event[1]}))

In [13]:
# Mine frequent determinand combinations with Spark MLlib's FP-Growth.
# minSupport=0.4 keeps only itemsets present in at least 40% of the sampling
# events (transactions).
model = FPGrowth.train(
    transactions,
    minSupport=0.4,
    numPartitions=70
)
result = model.freqItemsets().collect()

In [14]:
# Most frequent itemsets (determinand notations) with a minimum support of 40%.
# `freq` is the number of sampling events in which every item of the set was measured.
print(result)

[FreqItemset(items=[u'111'], freq=17108), FreqItemset(items=[u'61'], freq=12864), FreqItemset(items=[u'61', u'111'], freq=11315), FreqItemset(items=[u'76'], freq=12218), FreqItemset(items=[u'76', u'61'], freq=11685), FreqItemset(items=[u'76', u'111'], freq=11373), FreqItemset(items=[u'9901'], freq=11912), FreqItemset(items=[u'9901', u'76'], freq=11707), FreqItemset(items=[u'9901', u'76', u'61'], freq=11510), FreqItemset(items=[u'9901', u'61'], freq=11600), FreqItemset(items=[u'9901', u'111'], freq=11143), FreqItemset(items=[u'116'], freq=11695), FreqItemset(items=[u'116', u'111'], freq=11224), FreqItemset(items=[u'180'], freq=11129), FreqItemset(items=[u'180', u'116'], freq=11129), FreqItemset(items=[u'180', u'116', u'111'], freq=11129), FreqItemset(items=[u'180', u'111'], freq=11129), FreqItemset(items=[u'9924'], freq=11110), FreqItemset(items=[u'9924', u'76'], freq=11110), FreqItemset(items=[u'9924', u'9901'], freq=11110), FreqItemset(items=[u'9924', u'9901', u'76'], freq=11110)]


In [15]:
# Descriptive alias for the collected FreqItemset list; its notations are translated to labels below.
frequentDeterminands = result

In [16]:
def find(name, attr, default=None, rdd=None):
    """Return the label that corresponds to a notation code.

    Searches for the first row whose ``<attr>.notation`` column equals
    ``str(name)`` and returns its ``<attr>.label`` value, e.g.
    ``find(u'111', "determinand")`` gives ``u'Ammonia(N)'``.

    Args:
        name: notation code to look up; compared after ``str()`` conversion.
        attr: column prefix, e.g. ``"determinand"``.
        default: value returned when no row carries that notation.
        rdd: RDD of rows to search; defaults to the notebook-level ``dataRDD``.

    Each call launches a Spark job over the whole RDD, so look up each distinct
    notation only once when translating many codes.
    """
    source = dataRDD if rdd is None else rdd
    # Column names and the target value are loop-invariant: compute them once
    # instead of inside every row lambda.
    notation_col = attr + ".notation"
    label_col = attr + ".label"
    target = str(name)
    found = (source
             .filter(lambda row: row[notation_col] == target)
             .map(lambda row: row[label_col])
             .take(1))
    return found[0] if found else default

In [17]:
# Translate the itemsets' determinand notations into readable labels.
# Every find() call is a full Spark job, and the same notations recur across
# itemsets, so each distinct notation is looked up only once.
distinct_notations = set(n for itemset in frequentDeterminands for n in itemset.items)
determinand_names = dict((n, find(n, "determinand")) for n in distinct_notations)
# A list comprehension (not map) so the result prints as a list under Python 3 too.
print([([determinand_names[n] for n in itemset.items], itemset.freq)
       for itemset in frequentDeterminands])

[([u'Ammonia(N)'], 17108), ([u'pH'], 12864), ([u'pH', u'Ammonia(N)'], 11315), ([u'Temp Water'], 12218), ([u'Temp Water', u'pH'], 11685), ([u'Temp Water', u'Ammonia(N)'], 11373), ([u'O Diss %sat'], 11912), ([u'O Diss %sat', u'Temp Water'], 11707), ([u'O Diss %sat', u'Temp Water', u'pH'], 11510), ([u'O Diss %sat', u'pH'], 11600), ([u'O Diss %sat', u'Ammonia(N)'], 11143), ([u'N Oxidised'], 11695), ([u'N Oxidised', u'Ammonia(N)'], 11224), ([u'Orthophospht'], 11129), ([u'Orthophospht', u'N Oxidised'], 11129), ([u'Orthophospht', u'N Oxidised', u'Ammonia(N)'], 11129), ([u'Orthophospht', u'Ammonia(N)'], 11129), ([u'Oxygen Diss'], 11110), ([u'Oxygen Diss', u'Temp Water'], 11110), ([u'Oxygen Diss', u'O Diss %sat'], 11110), ([u'Oxygen Diss', u'O Diss %sat', u'Temp Water'], 11110)]


Therefore the most frequently sampled pollutant is Ammoniacal Nitrogen as N (Ammonia), which can come from food-processing waste and is harmful to living organisms. So the goal is to create a model to forecast the daily quantity of Ammonia.

In [18]:
# Keep only the Ammonia(N) measurements and cache them, since the following
# cells aggregate over this subset more than once.
rdd_ammonia = (dataRDD
               .filter(lambda row: row['determinand.label'] == "Ammonia(N)")
               .cache())

## Where has Ammonia been sampled most? What a coincidence!

In [19]:
# Distinct material types in which Ammonia was sampled, collected once (a
# single Spark job) and reused for both the count and the printout.
ammonia_locations = rdd_ammonia.map(lambda x: x['sample.sampledMaterialType.label']).distinct()
ammonia_location_labels = ammonia_locations.collect()
print(len(ammonia_location_labels))
print(ammonia_location_labels)

15
[u'SURFACE DRAINAGE', u'ANY WATER', u'ANY SEWAGE', u'ESTUARINE WATER', u'TRADE EFFLUENT - FRESHWATER RETURNED ABSTRACTED', u'FINAL SEWAGE EFFLUENT', u'ANY SOLID/SEDIMENT - UNSPECIFIED', u'ANY TRADE EFFLUENT', u'POND / LAKE / RESERVOIR WATER', u'CANAL WATER', u'GROUNDWATER', u'ANY LEACHATE', u'RIVER / RUNNING SURFACE WATER', u'STORM SEWER OVERFLOW DISCHARGE', u'CRUDE SEWAGE']


In [20]:
def add(x, y):
    """Return ``x + y``; used below as the merge function when summing counts."""
    total = x + y
    return total
# Number of Ammonia measurements per sampled material type: every row
# contributes a 1, and the 1s are summed per key.
rdd_ammonia.map(lambda row: (row['sample.sampledMaterialType.label'], 1)).reduceByKey(add).collect()

[(u'SURFACE DRAINAGE', 215),
 (u'ANY WATER', 14),
 (u'ANY SEWAGE', 20),
 (u'ESTUARINE WATER', 40),
 (u'TRADE EFFLUENT - FRESHWATER RETURNED ABSTRACTED', 23),
 (u'FINAL SEWAGE EFFLUENT', 5129),
 (u'ANY SOLID/SEDIMENT - UNSPECIFIED', 3),
 (u'ANY TRADE EFFLUENT', 229),
 (u'POND / LAKE / RESERVOIR WATER', 134),
 (u'CANAL WATER', 6),
 (u'GROUNDWATER', 178),
 (u'ANY LEACHATE', 20),
 (u'RIVER / RUNNING SURFACE WATER', 11080),
 (u'STORM SEWER OVERFLOW DISCHARGE', 6),
 (u'CRUDE SEWAGE', 12)]

The last thing to note is where Ammonia has been sampled most.

Ammonia was mostly sampled in:
- Final sewage effluent (5129 times)
- River / running surface water (11080 times)

Together these account for about 95% (16209 of the 17109 Ammonia measurements counted above) of the samples!

The presence in river/running surface water is especially meaningful, since ammonia traces in water may come from:
- fertilizers
- food processing waste
- Industrial wastewater as non-conventional pollutants

The last two are both plausible causes, since they imply the presence of industrial sewage, while the first would mean a high concentration of ammonia in the underground water.