In [1]:
from typing import Callable

#### Publish a  message to a topic

In [4]:
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
project_id = "elated-cathode-343517"
# topic_id = "projects/elated-cathode-343517/topics/SEB-data-engineering-task"
topic_id = "SEB-data-engineering-task"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(f"Test-{i}")
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")


4515158904691133
4515158904691134
4515158904691135
4515158904691136
4515158904691137
4515158904691138
4515158904691139
4515158904691140
4515158904691141
4515158904691142
Published messages with error handler to projects/elated-cathode-343517/topics/SEB-data-engineering-task.


#### Pull messages from a subscription

In [5]:
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
project_id = "elated-cathode-343517"
subscription_id = "SEB-data-engineering-task-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Listening for messages on projects/elated-cathode-343517/subscriptions/SEB-data-engineering-task-sub..

Received Message {
  data: b'Test-0'
  ordering_key: ''
  attributes: {}
}.Received Message {
  data: b'Test-1'
  ordering_key: ''
  attributes: {}
}.

Received Message {
  data: b'Test-2'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-3'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-4'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-5'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-6'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-7'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-8'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Test-9'
  ordering_key: ''
  attributes: {}
}.


### Data Analysis

In [1]:
import pandas as pd
pd.options.display.max_columns=999

In [2]:
df1 = pd.read_csv("Waterbase_v2018_1_T_WISE4_AggregatedData.csv", low_memory = False)

In [3]:
df1.head(2)

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,parameterWaterBodyCategory,observedPropertyDeterminandCode,procedureAnalysedFraction,procedureAnalysedMedia,resultUom,phenomenonTimeReferenceYear,parameterSamplingPeriod,procedureLOQValue,resultNumberOfSamples,resultQualityNumberOfSamplesBelowLOQ,resultQualityMinimumBelowLOQ,resultMinimumValue,resultQualityMeanBelowLOQ,resultMeanValue,resultQualityMaximumBelowLOQ,resultMaximumValue,resultQualityMedianBelowLOQ,resultMedianValue,resultStandardDeviationValue,procedureAnalyticalMethod,parameterSampleDepth,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,UK25034780,eionetMonitoringSiteCode,RW,CAS_35065-28-2,total,water,ug/L,2012,2012-01--2012-12,0.001,4.0,4.0,1.0,0.001,1.0,0.001,1.0,0.001,,,0.0,,0.0,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,9233859
1,UK25034780,eionetMonitoringSiteCode,RW,CAS_7012-37-5,total,water,ug/L,2012,2012-01--2012-12,0.001,4.0,4.0,1.0,0.001,1.0,0.001,1.0,0.001,,,0.0,,0.0,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,9233860


In [16]:
df.dtypes

monitoringSiteIdentifier                 object
monitoringSiteIdentifierScheme           object
parameterWaterBodyCategory               object
observedPropertyDeterminandCode          object
procedureAnalysedFraction                object
procedureAnalysedMedia                   object
resultUom                                object
phenomenonTimeReferenceYear               int64
parameterSamplingPeriod                  object
procedureLOQValue                       float64
resultNumberOfSamples                   float64
resultQualityNumberOfSamplesBelowLOQ    float64
resultQualityMinimumBelowLOQ            float64
resultMinimumValue                      float64
resultQualityMeanBelowLOQ               float64
resultMeanValue                         float64
resultQualityMaximumBelowLOQ            float64
resultMaximumValue                      float64
resultQualityMedianBelowLOQ             float64
resultMedianValue                       float64
resultStandardDeviationValue            

In [17]:
df.shape

(3211183, 31)

In [4]:
df2 = pd.read_csv("Waterbase_v2018_1_T_WISE4_AggregatedDataByWaterBody.csv", low_memory = False)
df2.head(2)

Unnamed: 0,waterBodyIdentifier,waterBodyIdentifierScheme,parameterWaterBodyCategory,observedPropertyDeterminandCode,procedureAnalysedFraction,procedureAnalysedMedia,resultUom,phenomenonTimeReferenceYear,parameterSamplingPeriod,procedureLOQValue,resultNumberOfSamples,resultQualityNumberOfSamplesBelowLOQ,resultQualityMinimumBelowLOQ,resultMinimumValue,resultQualityMeanBelowLOQ,resultMeanValue,resultQualityMaximumBelowLOQ,resultMaximumValue,resultQualityMedianBelowLOQ,resultMedianValue,resultStandardDeviationValue,resultNumberOfSitesClass1,resultNumberOfSitesClass2,resultNumberOfSitesClass3,resultNumberOfSitesClass4,resultNumberOfSitesClass5,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,RODL06,euGroundWaterBodyCode,GW,CAS_14797-55-8,total,water,mg{NO3}/L,2007,2007-01-01--2007-12-31,0.0,17,0.0,,0.035,,8.97,,16.7,,,,5.0,4.0,0.0,0.0,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,17431
1,ROIL05,euGroundWaterBodyCode,GW,EEA_3132-01-2,total,water,mg/L,2007,2007-01-01--2007-12-31,0.0,15,0.0,,0.57,,4.266,,8.5,,,,1.0,6.0,2.0,,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,17472


In [19]:
df.dtypes

monitoringSiteIdentifier                 object
monitoringSiteIdentifierScheme           object
parameterWaterBodyCategory               object
observedPropertyDeterminandCode          object
procedureAnalysedFraction                object
procedureAnalysedMedia                   object
resultUom                                object
phenomenonTimeReferenceYear               int64
parameterSamplingPeriod                  object
procedureLOQValue                       float64
resultNumberOfSamples                   float64
resultQualityNumberOfSamplesBelowLOQ    float64
resultQualityMinimumBelowLOQ            float64
resultMinimumValue                      float64
resultQualityMeanBelowLOQ               float64
resultMeanValue                         float64
resultQualityMaximumBelowLOQ            float64
resultMaximumValue                      float64
resultQualityMedianBelowLOQ             float64
resultMedianValue                       float64
resultStandardDeviationValue            

In [20]:

df.shape

(3211183, 31)

In [58]:
BiologyEQRClassificationProcedure = pd.read_csv("Waterbase_v2018_1_T_WISE4_BiologyEQRClassificationProcedure.csv", low_memory = False)
BiologyEQRClassificationProcedure.head(2)

Unnamed: 0,CountryCode,observedPropertyDeterminandBiologyEQRCode,parameterWaterBodyCategory,parameterNCSWaterBodyType,parameterWFDIntercalibrationWaterBodyType,parameterNaturalAWBHMWB,parameterICStatusOfDeterminandBiologyEQR,parameterBoundaryValueClasses12,parameterBoundaryValueClasses23,parameterBoundaryValueClasses34,parameterBoundaryValueClasses45,procedureBiologicalAnalyticalMethodDescription,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,AT,EEA_11-04-1,LW,B1,,Natural,0,0.8,0.6,0.4,0.2,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1
1,AT,EEA_11-04-1,LW,B2,,Natural,0,0.8,0.6,0.4,0.2,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2


In [24]:
df3.dtypes

CountryCode                                        object
observedPropertyDeterminandBiologyEQRCode          object
parameterWaterBodyCategory                         object
parameterNCSWaterBodyType                          object
parameterWFDIntercalibrationWaterBodyType          object
parameterNaturalAWBHMWB                            object
parameterICStatusOfDeterminandBiologyEQR            int64
parameterBoundaryValueClasses12                   float64
parameterBoundaryValueClasses23                   float64
parameterBoundaryValueClasses34                   float64
parameterBoundaryValueClasses45                   float64
procedureBiologicalAnalyticalMethodDescription     object
resultObservationStatus                            object
Remarks                                            object
metadata_versionId                                 object
metadata_beginLifeSpanVersion                      object
metadata_statusCode                                object
metadata_obser

In [27]:
df3.shape


(2553, 20)

In [44]:
BiologyEQRData = pd.read_csv("Waterbase_v2018_1_T_WISE4_BiologyEQRData.csv", low_memory = False)
#print(df4.head(),df4.dtypes, df4.shape)

In [45]:
BiologyEQRData.head(2)

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,parameterWaterBodyCategory,parameterNCSWaterBodyType,observedPropertyDeterminandBiologyEQRCode,phenomenonTimeReferenceYear,parameterSamplingPeriod,resultEcologicalStatusClassValue,resultNumberOfSamples,resultEQRValue,resultNormalisedEQRValue,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,NL94_OUDMS_A,euMonitoringSiteCode,RW,R8,EEA_123-01-3,2010,2010-06--2010-08,345,1,0.509,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,U,QC_LEGACY_182: resultNormalisedEQRValue cannot...,24044
1,NL94_OUDMS_A,euMonitoringSiteCode,RW,R8,EEA_13-01-4,2007,2007-09--2007-11,345,5,0.518,,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,U,QC_LEGACY_182: resultNormalisedEQRValue cannot...,24045


In [15]:
df5 = pd.read_csv("Waterbase_v2018_1_T_WISE4_DisaggregatedData.csv")
#print(df5.head(),df5.dtypes, df5.shape)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [16]:
df5.head()

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,parameterWaterBodyCategory,observedPropertyDeterminandCode,procedureAnalysedFraction,procedureAnalysedMedia,resultUom,phenomenonTimeSamplingDate,resultObservedValue,resultQualityObservedValueBelowLOQ,procedureLOQValue,procedureAnalyticalMethod,parameterSampleDepth,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,IT01044005,euMonitoringSiteCode,RW,CAS_7440-38-2,total,water,ug/L,2011-06-21 00:00:00.000,3.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118968
1,IT01044005,euMonitoringSiteCode,RW,CAS_7439-89-6,total,water,ug/L,2011-06-21 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118969
2,IT01044005,euMonitoringSiteCode,RW,CAS_7440-66-6,total,water,ug/L,2011-08-25 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118970
3,IT01044005,euMonitoringSiteCode,RW,CAS_7440-66-6,total,water,ug/L,2011-07-27 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118971
4,IT01044005,euMonitoringSiteCode,RW,CAS_7440-02-0,total,water,ug/L,2011-09-28 00:00:00.000,4.5,0,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118972


In [42]:
MonitoringSite_DerivedData = pd.read_csv("Waterbase_v2018_1_WISE4_MonitoringSite_DerivedData.csv")

In [43]:
MonitoringSite_DerivedData.head(2)

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,waterBodyIdentifier,waterBodyIdentifierScheme,confidentialityStatus,lon,lat
0,ALRV_201,eionetMonitoringSiteCode,,,F,19.8817,41.6808
1,IT20-0241-CF2000103-ST01,eionetMonitoringSiteCode,,,F,,


In [18]:
df3.CountryCode.unique()

array(['AT', 'BG', 'NL', 'SI', 'IE', 'IT', 'BE', 'GB', 'HR', 'RO', 'ES',
       'SE', 'SK', 'FR', 'FI', 'EE', 'PL', 'CY', 'DE', 'LT', 'NO', 'PT',
       'LU', 'CH', 'LV'], dtype=object)

In [17]:
df5.head()

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,parameterWaterBodyCategory,observedPropertyDeterminandCode,procedureAnalysedFraction,procedureAnalysedMedia,resultUom,phenomenonTimeSamplingDate,resultObservedValue,resultQualityObservedValueBelowLOQ,procedureLOQValue,procedureAnalyticalMethod,parameterSampleDepth,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,IT01044005,euMonitoringSiteCode,RW,CAS_7440-38-2,total,water,ug/L,2011-06-21 00:00:00.000,3.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118968
1,IT01044005,euMonitoringSiteCode,RW,CAS_7439-89-6,total,water,ug/L,2011-06-21 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118969
2,IT01044005,euMonitoringSiteCode,RW,CAS_7440-66-6,total,water,ug/L,2011-08-25 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118970
3,IT01044005,euMonitoringSiteCode,RW,CAS_7440-66-6,total,water,ug/L,2011-07-27 00:00:00.000,50.0,1,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118971
4,IT01044005,euMonitoringSiteCode,RW,CAS_7440-02-0,total,water,ug/L,2011-09-28 00:00:00.000,4.5,0,,"APHA Standard Methods, Ed. 21th 2005, 3120",,,,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,1118972


In [25]:
df5[df5.parameterWaterBodyCategory == 'LW'].monitoringSiteIdentifier.unique()

array(['LTL65', 'LTL343', 'LTL312', ..., 'UK0CYMW50262', 'UK0CYMW67007',
       'UK0CYMW23193'], dtype=object)

In [21]:
df3[df3.CountryCode == 'SE']

Unnamed: 0,CountryCode,observedPropertyDeterminandBiologyEQRCode,parameterWaterBodyCategory,parameterNCSWaterBodyType,parameterWFDIntercalibrationWaterBodyType,parameterNaturalAWBHMWB,parameterICStatusOfDeterminandBiologyEQR,parameterBoundaryValueClasses12,parameterBoundaryValueClasses23,parameterBoundaryValueClasses34,parameterBoundaryValueClasses45,procedureBiologicalAnalyticalMethodDescription,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
143,SE,EEA_11-03-0,LW,"North of Limes norrlandicus, colour>30 mg Pt/l",,Natural,0,0.89,0.67,0.44,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2460
144,SE,EEA_11-03-0,LW,"South of L. norrlandicus, Water colour<30 mg Pt/l",,Natural,0,0.9,0.7,0.4,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2461
145,SE,EEA_11-03-0,LW,"South of L. norrlandicus, Water colour>30 mg Pt/l",,Natural,0,0.88,0.67,0.33,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2462
146,SE,EEA_11-03-0,LW,"South of Limes norrlandicus, colour<30 mg Pt/l",,Natural,0,0.9,0.7,0.4,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2463
147,SE,EEA_11-03-0,LW,"South of Limes norrlandicus, colour>30 mg Pt/l",,Natural,0,0.88,0.67,0.33,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2464
151,SE,EEA_11-04-1,LW,"North of Limes norrlandicus, colour<30 mg Pt/l",,Natural,0,0.5,0.33,0.2,,SS-EN 15204:2006,,NOTE! Only 4 classes: No of species incl in in...,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2468
152,SE,EEA_11-04-1,LW,"North of Limes norrlandicus, colour>30 mg Pt/l",,Natural,0,0.5,0.33,0.2,,SS-EN 15204:2006,,NOTE! Only 4 classes: No of species incl in in...,http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2469
479,SE,EEA_11-03-0,LW,Alpine,,Natural,0,0.8,0.6,0.4,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2456
480,SE,EEA_11-03-0,LW,"North of L. norrlandicus, Water colour<30 mg Pt/l",,Natural,0,0.67,0.56,0.44,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2457
481,SE,EEA_11-03-0,LW,"North of L. norrlandicus, Water colour>30 mg Pt/l",,Natural,0,0.89,0.67,0.44,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2458


In [10]:
df1.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'observedPropertyDeterminandCode',
       'procedureAnalysedFraction', 'procedureAnalysedMedia', 'resultUom',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'procedureLOQValue', 'resultNumberOfSamples',
       'resultQualityNumberOfSamplesBelowLOQ', 'resultQualityMinimumBelowLOQ',
       'resultMinimumValue', 'resultQualityMeanBelowLOQ', 'resultMeanValue',
       'resultQualityMaximumBelowLOQ', 'resultMaximumValue',
       'resultQualityMedianBelowLOQ', 'resultMedianValue',
       'resultStandardDeviationValue', 'procedureAnalyticalMethod',
       'parameterSampleDepth', 'resultObservationStatus', 'Remarks',
       'metadata_versionId', 'metadata_beginLifeSpanVersion',
       'metadata_statusCode', 'metadata_observationStatus',
       'metadata_statements', 'UID'],
      dtype='object')

In [11]:
df2.columns

Index(['waterBodyIdentifier', 'waterBodyIdentifierScheme',
       'parameterWaterBodyCategory', 'observedPropertyDeterminandCode',
       'procedureAnalysedFraction', 'procedureAnalysedMedia', 'resultUom',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'procedureLOQValue', 'resultNumberOfSamples',
       'resultQualityNumberOfSamplesBelowLOQ', 'resultQualityMinimumBelowLOQ',
       'resultMinimumValue', 'resultQualityMeanBelowLOQ', 'resultMeanValue',
       'resultQualityMaximumBelowLOQ', 'resultMaximumValue',
       'resultQualityMedianBelowLOQ', 'resultMedianValue',
       'resultStandardDeviationValue', 'resultNumberOfSitesClass1',
       'resultNumberOfSitesClass2', 'resultNumberOfSitesClass3',
       'resultNumberOfSitesClass4', 'resultNumberOfSitesClass5',
       'resultObservationStatus', 'Remarks', 'metadata_versionId',
       'metadata_beginLifeSpanVersion', 'metadata_statusCode',
       'metadata_observationStatus', 'metadata_statements', 'UID'],
    

In [18]:
df5.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'observedPropertyDeterminandCode',
       'procedureAnalysedFraction', 'procedureAnalysedMedia', 'resultUom',
       'phenomenonTimeSamplingDate', 'resultObservedValue',
       'resultQualityObservedValueBelowLOQ', 'procedureLOQValue',
       'procedureAnalyticalMethod', 'parameterSampleDepth',
       'resultObservationStatus', 'Remarks', 'metadata_versionId',
       'metadata_beginLifeSpanVersion', 'metadata_statusCode',
       'metadata_observationStatus', 'metadata_statements', 'UID'],
      dtype='object')

In [19]:
df6.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'waterBodyIdentifier', 'waterBodyIdentifierScheme',
       'confidentialityStatus', 'lon', 'lat'],
      dtype='object')

In [12]:
df3.columns

Index(['CountryCode', 'observedPropertyDeterminandBiologyEQRCode',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'parameterWFDIntercalibrationWaterBodyType', 'parameterNaturalAWBHMWB',
       'parameterICStatusOfDeterminandBiologyEQR',
       'parameterBoundaryValueClasses12', 'parameterBoundaryValueClasses23',
       'parameterBoundaryValueClasses34', 'parameterBoundaryValueClasses45',
       'procedureBiologicalAnalyticalMethodDescription',
       'resultObservationStatus', 'Remarks', 'metadata_versionId',
       'metadata_beginLifeSpanVersion', 'metadata_statusCode',
       'metadata_observationStatus', 'metadata_statements', 'UID'],
      dtype='object')

In [13]:
df4.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'observedPropertyDeterminandBiologyEQRCode',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'resultEcologicalStatusClassValue', 'resultNumberOfSamples',
       'resultEQRValue', 'resultNormalisedEQRValue', 'resultObservationStatus',
       'Remarks', 'metadata_versionId', 'metadata_beginLifeSpanVersion',
       'metadata_statusCode', 'metadata_observationStatus',
       'metadata_statements', 'UID'],
      dtype='object')

In [20]:
(df4.monitoringSiteIdentifier.isin(df6.monitoringSiteIdentifier.unique())).sum()

29716

In [23]:
(df6.monitoringSiteIdentifier.isin(df4.monitoringSiteIdentifier.unique())).sum()

9776

In [24]:
df6.shape

(56464, 7)

In [21]:
df4.shape

(29741, 19)

In [22]:
29741 - 29716

25

In [25]:
print(df2.shape, df6.shape)

(20251, 34) (56464, 7)


In [26]:
(df2.waterBodyIdentifier.isin(df6.waterBodyIdentifier.unique())).sum()

7026

In [27]:
(df6.waterBodyIdentifier.isin(df2.waterBodyIdentifier.unique())).sum()

2823

### Data Quality on Total WaterBodies

In [29]:
df6.head(2)

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,waterBodyIdentifier,waterBodyIdentifierScheme,confidentialityStatus,lon,lat
0,ALRV_201,eionetMonitoringSiteCode,,,F,19.8817,41.6808
1,IT20-0241-CF2000103-ST01,eionetMonitoringSiteCode,,,F,,


#### 4.3% of waterbodies have null waterBodyIdentifier

In [32]:
df6.waterBodyIdentifier.isna().sum() / df6.shape[0]


0.043514451686030034

#### 23.4% of waterbodies have missing geographical coordinates (latitude and longitude)

In [36]:
round(100 * df6.lon.isna().sum() / df6.shape[0], 2)

23.39

#### 1.4% of data has duplicate monitoringSiteIdentifier

In [57]:
round(100 * MonitoringSite_DerivedData[MonitoringSite_DerivedData.monitoringSiteIdentifier.duplicated()].shape[0] / MonitoringSite_DerivedData.shape[0], 2)

1.46

In [55]:
MonitoringSite_DerivedData[MonitoringSite_DerivedData.monitoringSiteIdentifier == "BEVL_VMM_581000"]

Unnamed: 0,monitoringSiteIdentifier,monitoringSiteIdentifierScheme,waterBodyIdentifier,waterBodyIdentifierScheme,confidentialityStatus,lon,lat
4580,BEVL_VMM_581000,euMonitoringSiteCode,BEVL08_48,euSurfaceWaterBodyCode,F,3.18859,50.8011
4581,BEVL_VMM_581000,euMonitoringSiteCode,BEVL08_48,euSurfaceWaterBodyCode,F,3.18859,50.8011


## For country Sweden (SE)

In [63]:
Sweden = BiologyEQRClassificationProcedure[BiologyEQRClassificationProcedure.CountryCode == "SE"].reset_index(drop=True)

In [None]:
Sweden.sort_values('observedPropertyDeterminandBiologyEQRCode').head()

Unnamed: 0,CountryCode,observedPropertyDeterminandBiologyEQRCode,parameterWaterBodyCategory,parameterNCSWaterBodyType,parameterWFDIntercalibrationWaterBodyType,parameterNaturalAWBHMWB,parameterICStatusOfDeterminandBiologyEQR,parameterBoundaryValueClasses12,parameterBoundaryValueClasses23,parameterBoundaryValueClasses34,parameterBoundaryValueClasses45,procedureBiologicalAnalyticalMethodDescription,resultObservationStatus,Remarks,metadata_versionId,metadata_beginLifeSpanVersion,metadata_statusCode,metadata_observationStatus,metadata_statements,UID
0,SE,EEA_11-03-0,LW,"North of Limes norrlandicus, colour>30 mg Pt/l",,Natural,0,0.89,0.67,0.44,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2460
1,SE,EEA_11-03-0,LW,"South of L. norrlandicus, Water colour<30 mg Pt/l",,Natural,0,0.9,0.7,0.4,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2461
2,SE,EEA_11-03-0,LW,"South of L. norrlandicus, Water colour>30 mg Pt/l",,Natural,0,0.88,0.67,0.33,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2462
3,SE,EEA_11-03-0,LW,"South of Limes norrlandicus, colour<30 mg Pt/l",,Natural,0,0.9,0.7,0.4,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2463
4,SE,EEA_11-03-0,LW,"South of Limes norrlandicus, colour>30 mg Pt/l",,Natural,0,0.88,0.67,0.33,,SS-EN 15204:2006,,"NOTE! Only 4 classes: Approx neutral, Moderate...",http://discomap.eea.europa.eu/data/wisesoe/der...,2015-11-30 00:00:00.000,experimental,A,,2464


In [82]:
cols = list(set(Sweden.columns) - set(['UID', 'parameterNCSWaterBodyType']))

In [85]:
Sweden[Sweden[cols].duplicated()].shape

(9, 20)

In [84]:
Sweden.shape

(58, 20)

In [86]:
Sweden.isna().sum()

CountryCode                                        0
observedPropertyDeterminandBiologyEQRCode          0
parameterWaterBodyCategory                         0
parameterNCSWaterBodyType                          0
parameterWFDIntercalibrationWaterBodyType         30
parameterNaturalAWBHMWB                            0
parameterICStatusOfDeterminandBiologyEQR           0
parameterBoundaryValueClasses12                    0
parameterBoundaryValueClasses23                    0
parameterBoundaryValueClasses34                    0
parameterBoundaryValueClasses45                   28
procedureBiologicalAnalyticalMethodDescription     2
resultObservationStatus                           46
Remarks                                            4
metadata_versionId                                 0
metadata_beginLifeSpanVersion                      0
metadata_statusCode                                0
metadata_observationStatus                         0
metadata_statements                           

In [115]:
Sweden_EQRData = pd.merge(
    Sweden,
    df4,
    how='inner',
    on='observedPropertyDeterminandBiologyEQRCode'
)

In [113]:
Merged_BiologyEQRData = pd.merge(
    MonitoringSite_DerivedData,
    df4,
    how='inner',
    on='monitoringSiteIdentifier'
)

In [114]:
Sweden_BiologyEQRData = pd.merge(
    Sweden_EQRData,
    Merged_BiologyEQRData,
    on='monitoringSiteIdentifier'
)

In [119]:
Sweden_BiologyEQRData.waterBodyIdentifier.unique().shape

(7263,)

In [120]:
MonitoringSite_DerivedData.waterBodyIdentifier.unique().shape

(22289,)

#### We do have monitoring data but that is not associated to any waterbody

In [123]:
Sweden_BiologyEQRData.waterBodyIdentifier.isna().sum()

5201

In [121]:
7263/22289

0.3258558033110503

In [93]:
print(df6.shape, df4.shape)

(56464, 7) (29741, 19)


In [103]:
Merged_BiologyEQRData.dtypes

monitoringSiteIdentifier                      object
monitoringSiteIdentifierScheme_x              object
waterBodyIdentifier                           object
waterBodyIdentifierScheme                     object
confidentialityStatus                         object
lon                                          float64
lat                                          float64
monitoringSiteIdentifierScheme_y              object
parameterWaterBodyCategory                    object
parameterNCSWaterBodyType                     object
observedPropertyDeterminandBiologyEQRCode     object
phenomenonTimeReferenceYear                    int64
parameterSamplingPeriod                       object
resultEcologicalStatusClassValue               int64
resultNumberOfSamples                          int64
resultEQRValue                               float64
resultNormalisedEQRValue                     float64
resultObservationStatus                       object
Remarks                                       

In [None]:
{
    "CountryCode": "SE",
    "WaterBodies": [
        {
            waterbody 1 ,
            monitoringInfo: [
                {
                    monitoring info 1
                },
                {
                    monitoring info 2
                }
            ]
        },
        {
            waterbody 2,            
            monitoringInfo: [
                {
                    monitoring info 1
                },
                {
                    monitoring info 2
                }
            ]
        }
    ]
}

### Export to AVRO

In [132]:
from fastavro import writer, reader, parse_schema

In [128]:
Sweden.dtypes

CountryCode                                        object
observedPropertyDeterminandBiologyEQRCode          object
parameterWaterBodyCategory                         object
parameterNCSWaterBodyType                          object
parameterWFDIntercalibrationWaterBodyType          object
parameterNaturalAWBHMWB                            object
parameterICStatusOfDeterminandBiologyEQR            int64
parameterBoundaryValueClasses12                   float64
parameterBoundaryValueClasses23                   float64
parameterBoundaryValueClasses34                   float64
parameterBoundaryValueClasses45                   float64
procedureBiologicalAnalyticalMethodDescription     object
resultObservationStatus                            object
Remarks                                            object
metadata_versionId                                 object
metadata_beginLifeSpanVersion                      object
metadata_statusCode                                object
metadata_obser

In [141]:
Sweden.shape

(58, 20)

In [142]:
Sweden.isna().sum()

CountryCode                                        0
observedPropertyDeterminandBiologyEQRCode          0
parameterWaterBodyCategory                         0
parameterNCSWaterBodyType                          0
parameterWFDIntercalibrationWaterBodyType          0
parameterNaturalAWBHMWB                            0
parameterICStatusOfDeterminandBiologyEQR           0
parameterBoundaryValueClasses12                    0
parameterBoundaryValueClasses23                    0
parameterBoundaryValueClasses34                    0
parameterBoundaryValueClasses45                   28
procedureBiologicalAnalyticalMethodDescription     2
resultObservationStatus                           46
Remarks                                            4
metadata_versionId                                 0
metadata_beginLifeSpanVersion                      0
metadata_statusCode                                0
metadata_observationStatus                         0
metadata_statements                           

In [146]:
Sweden.parameterWFDIntercalibrationWaterBodyType.fillna('', inplace=True)
Sweden.resultObservationStatus.fillna('', inplace=True)
Sweden.Remarks.fillna('', inplace=True)
Sweden.procedureBiologicalAnalyticalMethodDescription.fillna('', inplace=True)

In [147]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality',
    'namespace': 'Quality',
    'type': 'record',
    'fields': [
        {'name': 'CountryCode', 'type': 'string'},
        {'name': 'observedPropertyDeterminandBiologyEQRCode', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'parameterNCSWaterBodyType', 'type': 'string'},
        {'name': 'parameterWFDIntercalibrationWaterBodyType', 'type': 'string'},
        {'name': 'parameterNaturalAWBHMWB', 'type': 'string'},
        {'name': 'parameterICStatusOfDeterminandBiologyEQR', 'type': 'int'},
        {'name': 'parameterBoundaryValueClasses12', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses23', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses34', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses45', 'type': 'float'},
        {'name': 'procedureBiologicalAnalyticalMethodDescription', 'type': 'string'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'float'},
        {'name': 'UID', 'type': 'int'},   
    ]
}
parsed_schema = parse_schema(schema)

In [148]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_WISE4_MonitoringSite_DerivedData.avro', 'wb') as out:
    writer(out, parsed_schema, records)

In [150]:
df4.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'observedPropertyDeterminandBiologyEQRCode',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'resultEcologicalStatusClassValue', 'resultNumberOfSamples',
       'resultEQRValue', 'resultNormalisedEQRValue', 'resultObservationStatus',
       'Remarks', 'metadata_versionId', 'metadata_beginLifeSpanVersion',
       'metadata_statusCode', 'metadata_observationStatus',
       'metadata_statements', 'UID'],
      dtype='object')

#### Returns only the observedPropertyDeterminandBiologyEQRCode which are in Sweden DF

In [226]:
Sweden_BiologyEQRData = df4[
    df4.observedPropertyDeterminandBiologyEQRCode.isin(
        Sweden.observedPropertyDeterminandBiologyEQRCode.unique()
    )
]

In [227]:
df4.shape

(29741, 19)

In [228]:
for k,v in Sweden_BiologyEQRData.dtypes.to_dict().items():
    print({"name": k, "type":'string' if v == object else 'int' if v == 'int' else 'float'})

{'name': 'monitoringSiteIdentifier', 'type': 'string'}
{'name': 'monitoringSiteIdentifierScheme', 'type': 'string'}
{'name': 'parameterWaterBodyCategory', 'type': 'string'}
{'name': 'parameterNCSWaterBodyType', 'type': 'string'}
{'name': 'observedPropertyDeterminandBiologyEQRCode', 'type': 'string'}
{'name': 'phenomenonTimeReferenceYear', 'type': 'int'}
{'name': 'parameterSamplingPeriod', 'type': 'string'}
{'name': 'resultEcologicalStatusClassValue', 'type': 'int'}
{'name': 'resultNumberOfSamples', 'type': 'int'}
{'name': 'resultEQRValue', 'type': 'float'}
{'name': 'resultNormalisedEQRValue', 'type': 'float'}
{'name': 'resultObservationStatus', 'type': 'string'}
{'name': 'Remarks', 'type': 'string'}
{'name': 'metadata_versionId', 'type': 'string'}
{'name': 'metadata_beginLifeSpanVersion', 'type': 'string'}
{'name': 'metadata_statusCode', 'type': 'string'}
{'name': 'metadata_observationStatus', 'type': 'string'}
{'name': 'metadata_statements', 'type': 'string'}
{'name': 'UID', 'type': '

In [229]:
for k,v in Sweden_BiologyEQRData.dtypes.to_dict().items():
    if v == object and Sweden_BiologyEQRData[k].isna().sum():
        print(v)
        Sweden_BiologyEQRData[k].fillna('', inplace=True)

object
object
object
object
object


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return self._update_inplace(result)


In [230]:
Sweden_BiologyEQRData.isna().sum()

monitoringSiteIdentifier                        0
monitoringSiteIdentifierScheme                  0
parameterWaterBodyCategory                      0
parameterNCSWaterBodyType                       0
observedPropertyDeterminandBiologyEQRCode       0
phenomenonTimeReferenceYear                     0
parameterSamplingPeriod                         0
resultEcologicalStatusClassValue                0
resultNumberOfSamples                           0
resultEQRValue                               1896
resultNormalisedEQRValue                     8616
resultObservationStatus                         0
Remarks                                         0
metadata_versionId                              0
metadata_beginLifeSpanVersion                   0
metadata_statusCode                             0
metadata_observationStatus                      0
metadata_statements                             0
UID                                             0
dtype: int64

In [231]:
Sweden_BiologyEQRData.parameterNCSWaterBodyType.fillna('', inplace=True)
Sweden_BiologyEQRData.parameterSamplingPeriod.fillna('', inplace=True)
Sweden_BiologyEQRData.resultObservationStatus.fillna('', inplace=True)
Sweden_BiologyEQRData.Remarks.fillna('', inplace=True)
Sweden_BiologyEQRData.metadata_statements.fillna('', inplace=True)

In [232]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality - BiologyEQRData',
    'namespace': 'BiologyEQRData',
    'type': 'record',
    'fields': [
        {'name': 'monitoringSiteIdentifier', 'type': 'string'},
        {'name': 'monitoringSiteIdentifierScheme', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'parameterNCSWaterBodyType', 'type': 'string'},
        {'name': 'observedPropertyDeterminandBiologyEQRCode', 'type': 'string'},
        {'name': 'phenomenonTimeReferenceYear', 'type': 'int'},
        {'name': 'parameterSamplingPeriod', 'type': 'string'},
        {'name': 'resultEcologicalStatusClassValue', 'type': 'int'},
        {'name': 'resultNumberOfSamples', 'type': 'int'},
        {'name': 'resultEQRValue', 'type': 'float'},
        {'name': 'resultNormalisedEQRValue', 'type': 'float'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'string'},
        {'name': 'UID', 'type': 'int'},
    ]
}
parsed_schema = parse_schema(schema)

In [233]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden_BiologyEQRData.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_T_WISE4_BiologyEQRData.avro', 'wb') as out:
    writer(out, parsed_schema, records)

In [188]:
Sweden_BiologyEQRData.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'observedPropertyDeterminandBiologyEQRCode',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'resultEcologicalStatusClassValue', 'resultNumberOfSamples',
       'resultEQRValue', 'resultNormalisedEQRValue', 'resultObservationStatus',
       'Remarks', 'metadata_versionId', 'metadata_beginLifeSpanVersion',
       'metadata_statusCode', 'metadata_observationStatus',
       'metadata_statements', 'UID'],
      dtype='object')

In [192]:
Sweden_df1 = df1[
    df1.monitoringSiteIdentifier.isin(Sweden_BiologyEQRData.monitoringSiteIdentifier.unique())
]

In [194]:
for k,v in Sweden_df1.dtypes.to_dict().items():
    if v == object and Sweden_df1[k].isna().sum():
        print(k)
        Sweden_df1[k].fillna('', inplace=True)

In [195]:
for k,v in Sweden_df1.dtypes.to_dict().items():
    print({"name": k, "type":'string' if v == object else 'int' if v == 'int' else 'float'})

{'name': 'monitoringSiteIdentifier', 'type': 'string'}
{'name': 'monitoringSiteIdentifierScheme', 'type': 'string'}
{'name': 'parameterWaterBodyCategory', 'type': 'string'}
{'name': 'observedPropertyDeterminandCode', 'type': 'string'}
{'name': 'procedureAnalysedFraction', 'type': 'string'}
{'name': 'procedureAnalysedMedia', 'type': 'string'}
{'name': 'resultUom', 'type': 'string'}
{'name': 'phenomenonTimeReferenceYear', 'type': 'int'}
{'name': 'parameterSamplingPeriod', 'type': 'string'}
{'name': 'procedureLOQValue', 'type': 'float'}
{'name': 'resultNumberOfSamples', 'type': 'float'}
{'name': 'resultQualityNumberOfSamplesBelowLOQ', 'type': 'float'}
{'name': 'resultQualityMinimumBelowLOQ', 'type': 'float'}
{'name': 'resultMinimumValue', 'type': 'float'}
{'name': 'resultQualityMeanBelowLOQ', 'type': 'float'}
{'name': 'resultMeanValue', 'type': 'float'}
{'name': 'resultQualityMaximumBelowLOQ', 'type': 'float'}
{'name': 'resultMaximumValue', 'type': 'float'}
{'name': 'resultQualityMedianBe

In [196]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality - BiologyEQRData',
    'namespace': 'BiologyEQRData',
    'type': 'record',
    'fields': [
        {'name': 'monitoringSiteIdentifier', 'type': 'string'},
        {'name': 'monitoringSiteIdentifierScheme', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'observedPropertyDeterminandCode', 'type': 'string'},
        {'name': 'procedureAnalysedFraction', 'type': 'string'},
        {'name': 'procedureAnalysedMedia', 'type': 'string'},
        {'name': 'resultUom', 'type': 'string'},
        {'name': 'phenomenonTimeReferenceYear', 'type': 'int'},
        {'name': 'parameterSamplingPeriod', 'type': 'string'},
        {'name': 'procedureLOQValue', 'type': 'float'},
        {'name': 'resultNumberOfSamples', 'type': 'float'},
        {'name': 'resultQualityNumberOfSamplesBelowLOQ', 'type': 'float'},
        {'name': 'resultQualityMinimumBelowLOQ', 'type': 'float'},
        {'name': 'resultMinimumValue', 'type': 'float'},
        {'name': 'resultQualityMeanBelowLOQ', 'type': 'float'},
        {'name': 'resultMeanValue', 'type': 'float'},
        {'name': 'resultQualityMaximumBelowLOQ', 'type': 'float'},
        {'name': 'resultMaximumValue', 'type': 'float'},
        {'name': 'resultQualityMedianBelowLOQ', 'type': 'float'},
        {'name': 'resultMedianValue', 'type': 'float'},
        {'name': 'resultStandardDeviationValue', 'type': 'float'},
        {'name': 'procedureAnalyticalMethod', 'type': 'string'},
        {'name': 'parameterSampleDepth', 'type': 'float'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'string'},
        {'name': 'UID', 'type': 'int'},
    ]
}
parsed_schema = parse_schema(schema)

In [197]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden_df1.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_T_WISE4_AggregatedData.avro', 'wb') as out:
    writer(out, parsed_schema, records)

In [200]:
Sweden_df2 = df2[
    df2.observedPropertyDeterminandCode.isin(Sweden_df1.observedPropertyDeterminandCode.unique())
]

In [203]:
for k,v in Sweden_df2.dtypes.to_dict().items():
    if v == object and Sweden_df2[k].isna().sum():
        print(k)
        Sweden_df2[k].fillna('', inplace=True)

parameterSamplingPeriod
resultObservationStatus
Remarks
metadata_statements


In [204]:
for k,v in Sweden_df2.dtypes.to_dict().items():
    print({"name": k, "type":'string' if v == object else 'int' if v == 'int' else 'float'})

{'name': 'waterBodyIdentifier', 'type': 'string'}
{'name': 'waterBodyIdentifierScheme', 'type': 'string'}
{'name': 'parameterWaterBodyCategory', 'type': 'string'}
{'name': 'observedPropertyDeterminandCode', 'type': 'string'}
{'name': 'procedureAnalysedFraction', 'type': 'string'}
{'name': 'procedureAnalysedMedia', 'type': 'string'}
{'name': 'resultUom', 'type': 'string'}
{'name': 'phenomenonTimeReferenceYear', 'type': 'int'}
{'name': 'parameterSamplingPeriod', 'type': 'string'}
{'name': 'procedureLOQValue', 'type': 'float'}
{'name': 'resultNumberOfSamples', 'type': 'int'}
{'name': 'resultQualityNumberOfSamplesBelowLOQ', 'type': 'float'}
{'name': 'resultQualityMinimumBelowLOQ', 'type': 'float'}
{'name': 'resultMinimumValue', 'type': 'float'}
{'name': 'resultQualityMeanBelowLOQ', 'type': 'float'}
{'name': 'resultMeanValue', 'type': 'float'}
{'name': 'resultQualityMaximumBelowLOQ', 'type': 'float'}
{'name': 'resultMaximumValue', 'type': 'float'}
{'name': 'resultQualityMedianBelowLOQ', 'ty

In [205]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality - BiologyEQRData',
    'namespace': 'BiologyEQRData',
    'type': 'record',
    'fields': [
        {'name': 'waterBodyIdentifier', 'type': 'string'},
        {'name': 'waterBodyIdentifierScheme', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'observedPropertyDeterminandCode', 'type': 'string'},
        {'name': 'procedureAnalysedFraction', 'type': 'string'},
        {'name': 'procedureAnalysedMedia', 'type': 'string'},
        {'name': 'resultUom', 'type': 'string'},
        {'name': 'phenomenonTimeReferenceYear', 'type': 'int'},
        {'name': 'parameterSamplingPeriod', 'type': 'string'},
        {'name': 'procedureLOQValue', 'type': 'float'},
        {'name': 'resultNumberOfSamples', 'type': 'int'},
        {'name': 'resultQualityNumberOfSamplesBelowLOQ', 'type': 'float'},
        {'name': 'resultQualityMinimumBelowLOQ', 'type': 'float'},
        {'name': 'resultMinimumValue', 'type': 'float'},
        {'name': 'resultQualityMeanBelowLOQ', 'type': 'float'},
        {'name': 'resultMeanValue', 'type': 'float'},
        {'name': 'resultQualityMaximumBelowLOQ', 'type': 'float'},
        {'name': 'resultMaximumValue', 'type': 'float'},
        {'name': 'resultQualityMedianBelowLOQ', 'type': 'float'},
        {'name': 'resultMedianValue', 'type': 'float'},
        {'name': 'resultStandardDeviationValue', 'type': 'float'},
        {'name': 'resultNumberOfSitesClass1', 'type': 'float'},
        {'name': 'resultNumberOfSitesClass2', 'type': 'float'},
        {'name': 'resultNumberOfSitesClass3', 'type': 'float'},
        {'name': 'resultNumberOfSitesClass4', 'type': 'float'},
        {'name': 'resultNumberOfSitesClass5', 'type': 'float'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'string'},
        {'name': 'UID', 'type': 'int'},
    ]
}
parsed_schema = parse_schema(schema)

In [206]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden_df2.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_T_WISE4_AggregatedDataByWaterBody.avro', 'wb') as out:
    writer(out, parsed_schema, records)

In [207]:
df3.columns

Index(['CountryCode', 'observedPropertyDeterminandBiologyEQRCode',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'parameterWFDIntercalibrationWaterBodyType', 'parameterNaturalAWBHMWB',
       'parameterICStatusOfDeterminandBiologyEQR',
       'parameterBoundaryValueClasses12', 'parameterBoundaryValueClasses23',
       'parameterBoundaryValueClasses34', 'parameterBoundaryValueClasses45',
       'procedureBiologicalAnalyticalMethodDescription',
       'resultObservationStatus', 'Remarks', 'metadata_versionId',
       'metadata_beginLifeSpanVersion', 'metadata_statusCode',
       'metadata_observationStatus', 'metadata_statements', 'UID'],
      dtype='object')

In [209]:
Sweden_df3 = df3[
    df3.CountryCode == 'SE'
]

In [210]:
for k,v in Sweden_df3.dtypes.to_dict().items():
    if v == object and Sweden_df3[k].isna().sum():
        print(k)
        Sweden_df3[k].fillna('', inplace=True)

parameterWFDIntercalibrationWaterBodyType
procedureBiologicalAnalyticalMethodDescription
resultObservationStatus
Remarks


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return self._update_inplace(result)


In [211]:
for k,v in Sweden_df3.dtypes.to_dict().items():
    print({"name": k, "type":'string' if v == object else 'int' if v == 'int' else 'float'})

{'name': 'CountryCode', 'type': 'string'}
{'name': 'observedPropertyDeterminandBiologyEQRCode', 'type': 'string'}
{'name': 'parameterWaterBodyCategory', 'type': 'string'}
{'name': 'parameterNCSWaterBodyType', 'type': 'string'}
{'name': 'parameterWFDIntercalibrationWaterBodyType', 'type': 'string'}
{'name': 'parameterNaturalAWBHMWB', 'type': 'string'}
{'name': 'parameterICStatusOfDeterminandBiologyEQR', 'type': 'int'}
{'name': 'parameterBoundaryValueClasses12', 'type': 'float'}
{'name': 'parameterBoundaryValueClasses23', 'type': 'float'}
{'name': 'parameterBoundaryValueClasses34', 'type': 'float'}
{'name': 'parameterBoundaryValueClasses45', 'type': 'float'}
{'name': 'procedureBiologicalAnalyticalMethodDescription', 'type': 'string'}
{'name': 'resultObservationStatus', 'type': 'string'}
{'name': 'Remarks', 'type': 'string'}
{'name': 'metadata_versionId', 'type': 'string'}
{'name': 'metadata_beginLifeSpanVersion', 'type': 'string'}
{'name': 'metadata_statusCode', 'type': 'string'}
{'name'

In [212]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality - BiologyEQRData',
    'namespace': 'BiologyEQRData',
    'type': 'record',
    'fields': [
        {'name': 'CountryCode', 'type': 'string'},
        {'name': 'observedPropertyDeterminandBiologyEQRCode', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'parameterNCSWaterBodyType', 'type': 'string'},
        {'name': 'parameterWFDIntercalibrationWaterBodyType', 'type': 'string'},
        {'name': 'parameterNaturalAWBHMWB', 'type': 'string'},
        {'name': 'parameterICStatusOfDeterminandBiologyEQR', 'type': 'int'},
        {'name': 'parameterBoundaryValueClasses12', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses23', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses34', 'type': 'float'},
        {'name': 'parameterBoundaryValueClasses45', 'type': 'float'},
        {'name': 'procedureBiologicalAnalyticalMethodDescription', 'type': 'string'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'float'},
        {'name': 'UID', 'type': 'int'},
    ]
}
parsed_schema = parse_schema(schema)

In [213]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden_df3.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_T_WISE4_BiologyEQRClassificationProcedure.avro', 'wb') as out:
    writer(out, parsed_schema, records)

In [214]:
df5.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'observedPropertyDeterminandCode',
       'procedureAnalysedFraction', 'procedureAnalysedMedia', 'resultUom',
       'phenomenonTimeSamplingDate', 'resultObservedValue',
       'resultQualityObservedValueBelowLOQ', 'procedureLOQValue',
       'procedureAnalyticalMethod', 'parameterSampleDepth',
       'resultObservationStatus', 'Remarks', 'metadata_versionId',
       'metadata_beginLifeSpanVersion', 'metadata_statusCode',
       'metadata_observationStatus', 'metadata_statements', 'UID'],
      dtype='object')

In [217]:
Sweden_df4.columns

Index(['monitoringSiteIdentifier', 'monitoringSiteIdentifierScheme',
       'parameterWaterBodyCategory', 'parameterNCSWaterBodyType',
       'observedPropertyDeterminandBiologyEQRCode',
       'phenomenonTimeReferenceYear', 'parameterSamplingPeriod',
       'resultEcologicalStatusClassValue', 'resultNumberOfSamples',
       'resultEQRValue', 'resultNormalisedEQRValue', 'resultObservationStatus',
       'Remarks', 'metadata_versionId', 'metadata_beginLifeSpanVersion',
       'metadata_statusCode', 'metadata_observationStatus',
       'metadata_statements', 'UID'],
      dtype='object')

In [218]:
Sweden_df5 = df5[
    df5.monitoringSiteIdentifier.isin(Sweden_df4.monitoringSiteIdentifier.unique())
].reset_index(drop=True)

In [219]:
for k,v in Sweden_df5.dtypes.to_dict().items():
    if v == object and Sweden_df5[k].isna().sum():
        print(k)
        Sweden_df5[k].fillna('', inplace=True)

procedureAnalyticalMethod
resultObservationStatus
Remarks
metadata_statements


In [220]:
for k,v in Sweden_df5.dtypes.to_dict().items():
    print({"name": k, "type":'string' if v == object else 'int' if v == 'int' else 'float'})

{'name': 'monitoringSiteIdentifier', 'type': 'string'}
{'name': 'monitoringSiteIdentifierScheme', 'type': 'string'}
{'name': 'parameterWaterBodyCategory', 'type': 'string'}
{'name': 'observedPropertyDeterminandCode', 'type': 'string'}
{'name': 'procedureAnalysedFraction', 'type': 'string'}
{'name': 'procedureAnalysedMedia', 'type': 'string'}
{'name': 'resultUom', 'type': 'string'}
{'name': 'phenomenonTimeSamplingDate', 'type': 'string'}
{'name': 'resultObservedValue', 'type': 'float'}
{'name': 'resultQualityObservedValueBelowLOQ', 'type': 'int'}
{'name': 'procedureLOQValue', 'type': 'float'}
{'name': 'procedureAnalyticalMethod', 'type': 'string'}
{'name': 'parameterSampleDepth', 'type': 'float'}
{'name': 'resultObservationStatus', 'type': 'string'}
{'name': 'Remarks', 'type': 'string'}
{'name': 'metadata_versionId', 'type': 'string'}
{'name': 'metadata_beginLifeSpanVersion', 'type': 'string'}
{'name': 'metadata_statusCode', 'type': 'string'}
{'name': 'metadata_observationStatus', 'type

In [221]:
schema = {
    'doc': 'Waterbase - Water quality',
    'name': 'Water Quality - BiologyEQRData',
    'namespace': 'BiologyEQRData',
    'type': 'record',
    'fields': [
        {'name': 'monitoringSiteIdentifier', 'type': 'string'},
        {'name': 'monitoringSiteIdentifierScheme', 'type': 'string'},
        {'name': 'parameterWaterBodyCategory', 'type': 'string'},
        {'name': 'observedPropertyDeterminandCode', 'type': 'string'},
        {'name': 'procedureAnalysedFraction', 'type': 'string'},
        {'name': 'procedureAnalysedMedia', 'type': 'string'},
        {'name': 'resultUom', 'type': 'string'},
        {'name': 'phenomenonTimeSamplingDate', 'type': 'string'},
        {'name': 'resultObservedValue', 'type': 'float'},
        {'name': 'resultQualityObservedValueBelowLOQ', 'type': 'int'},
        {'name': 'procedureLOQValue', 'type': 'float'},
        {'name': 'procedureAnalyticalMethod', 'type': 'string'},
        {'name': 'parameterSampleDepth', 'type': 'float'},
        {'name': 'resultObservationStatus', 'type': 'string'},
        {'name': 'Remarks', 'type': 'string'},
        {'name': 'metadata_versionId', 'type': 'string'},
        {'name': 'metadata_beginLifeSpanVersion', 'type': 'string'},
        {'name': 'metadata_statusCode', 'type': 'string'},
        {'name': 'metadata_observationStatus', 'type': 'string'},
        {'name': 'metadata_statements', 'type': 'string'},
        {'name': 'UID', 'type': 'int'},
    ]
}
parsed_schema = parse_schema(schema)

In [225]:
# 2. Convert pd.DataFrame to records - list of dictionaries
records = Sweden_df5.to_dict('records')

# 3. Write to Avro file
with open('Waterbase_v2018_1_T_WISE4_DisaggregatedData.avro', 'wb') as out:
    writer(out, parsed_schema, records)