In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
# Build the local SparkSession with the Kafka connector and Delta Lake on the classpath.
# The Kafka connector version is derived from the installed PySpark so it matches the
# running Spark. The previous pin (spark-sql-kafka 3.0.0) did not match delta-core 2.0.1,
# which targets Spark 3.2.x.
# NOTE(review): confirm the Spark version shipped in this image matches delta-core 2.0.1.
import pyspark

SPARK_PACKAGES = ",".join([
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}",
    "io.delta:delta-core_2.12:2.0.1",
])
# Set before getOrCreate() so the --packages list is picked up when the session starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {SPARK_PACKAGES} pyspark-shell'
spark = (
    SparkSession.builder
    .appName("water_test")
    # Register Delta's SQL extension and catalog so format('delta') reads/writes work.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .master("local[*]")
    .getOrCreate()
)

# Read the data, then check its schema and some records

In [6]:
# Waterbase T_WISE6 disaggregated data. The first line is the header row. No schema
# inference is requested, so every column is loaded as a string.
DISAGGREGATED_CSV = '/home/jovyan/dags/input/data/Waterbase_v2021_1_T_WISE6_DisaggregatedData.csv'
df = spark.read.csv(DISAGGREGATED_CSV, header=True)

In [12]:
# Print the schema of the measurements table as a tree. Columns are all strings
# because the CSV was read without a schema or inferSchema.
df.printSchema()

root
 |-- monitoringSiteIdentifier: string (nullable = true)
 |-- monitoringSiteIdentifierScheme: string (nullable = true)
 |-- parameterWaterBodyCategory: string (nullable = true)
 |-- observedPropertyDeterminandCode: string (nullable = true)
 |-- observedPropertyDeterminandLabel: string (nullable = true)
 |-- procedureAnalysedMatrix: string (nullable = true)
 |-- resultUom: string (nullable = true)
 |-- phenomenonTimeSamplingDate: string (nullable = true)
 |-- sampleIdentifier: string (nullable = true)
 |-- resultObservedValue: string (nullable = true)
 |-- resultQualityObservedValueBelowLOQ: string (nullable = true)
 |-- procedureLOQValue: string (nullable = true)
 |-- parameterSampleDepth: string (nullable = true)
 |-- parameterSedimentDepthSampled: string (nullable = true)
 |-- parameterSpecies: string (nullable = true)
 |-- resultMoisture: string (nullable = true)
 |-- resultFat: string (nullable = true)
 |-- resultExtractableLipid: string (nullable = true)
 |-- resultLipid: stri

In [21]:
# Peek at a few records. The vertical layout keeps the many columns readable.
# (This cell previously held the incomplete expression `df.`, which is a SyntaxError.)
df.show(5, vertical=True)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [6]:
# Total number of rows in the measurements table. This is an action that runs a Spark job.
df.count()

60013752

In [9]:
# Row counts per monitoring site, split by identifier scheme and water-body category.
site_key_cols = [
    "monitoringSiteIdentifier",
    "monitoringSiteIdentifierScheme",
    "parameterWaterBodyCategory",
]
rows_per_site = df.groupBy(*site_key_cols).count()
rows_per_site.show()

+------------------------+------------------------------+--------------------------+-----+
|monitoringSiteIdentifier|monitoringSiteIdentifierScheme|parameterWaterBodyCategory|count|
+------------------------+------------------------------+--------------------------+-----+
|             DK18-310-08|          eionetMonitoringS...|                        GW|  719|
|             DK18-380-01|          eionetMonitoringS...|                        GW|  335|
|            DK238-971-01|          euMonitoringSiteCode|                        GW|  537|
|            DK34-1650-01|          euMonitoringSiteCode|                        GW| 1525|
|           DK206-1684-01|          euMonitoringSiteCode|                        GW|  470|
|            DK66-1523-11|          eionetMonitoringS...|                        GW|  224|
|            DK86-2079-03|          euMonitoringSiteCode|                        GW|  443|
|            DK66-1527-05|          eionetMonitoringS...|                        GW|  227|

In [10]:
# Number of distinct (site id, id scheme, water-body category) combinations.
(
    df.select("monitoringSiteIdentifier", "monitoringSiteIdentifierScheme", "parameterWaterBodyCategory")
    .distinct()
    .count()
)

57454

# Check which columns can be used for this project

In [3]:
# Waterbase S_WISE6 spatial-object table (site metadata: country, water body, lat/lon, ...).
# The header row supplies the column names; all columns are read as strings.
SPATIAL_CSV = '/home/jovyan/dags/input/data/Waterbase_v2021_1_S_WISE6_SpatialObject_DerivedData.csv'
s_df = spark.read.csv(SPATIAL_CSV, header=True)

In [12]:
# Total number of spatial-object rows.
# (Was `s_df.filter().count()`: filter() needs a condition and raises TypeError without one.)
s_df.count()

70815

In [13]:
# Print the schema of the spatial-object table. All columns are strings, because the
# CSV was read without a schema.
s_df.printSchema()

root
 |-- countryCode: string (nullable = true)
 |-- thematicIdIdentifier: string (nullable = true)
 |-- thematicIdIdentifierScheme: string (nullable = true)
 |-- monitoringSiteIdentifier: string (nullable = true)
 |-- monitoringSiteIdentifierScheme: string (nullable = true)
 |-- monitoringSiteName: string (nullable = true)
 |-- waterBodyIdentifier: string (nullable = true)
 |-- waterBodyIdentifierScheme: string (nullable = true)
 |-- waterBodyName: string (nullable = true)
 |-- specialisedZoneType: string (nullable = true)
 |-- naturalAWBHMWB: string (nullable = true)
 |-- reservoir: string (nullable = true)
 |-- surfaceWaterBodyTypeCode: string (nullable = true)
 |-- subUnitIdentifier: string (nullable = true)
 |-- subUnitIdentifierScheme: string (nullable = true)
 |-- subUnitName: string (nullable = true)
 |-- rbdIdentifier: string (nullable = true)
 |-- rbdIdentifierScheme: string (nullable = true)
 |-- rbdName: string (nullable = true)
 |-- confidentialityStatus: string (nullable 

In [5]:
# Spatial objects whose countryCode is 'LT'.
lt_df = s_df.filter(s_df.countryCode == "LT")

In [6]:
# Spatial objects whose countryCode is 'SE'.
se_df = s_df.filter(s_df.countryCode == "SE")

In [21]:
# Number of spatial objects with countryCode 'SE'.
se_df.count()

760

In [5]:
from pyspark.sql.functions import col

# Spatial objects that carry a monitoring-site id. That column is the key later used to
# join against the measurements. The cell now actually uses the `col` import, which was
# previously unused.
s_df.filter(col("monitoringSiteIdentifier").isNotNull()).count()

69184

In [8]:
# Rebind s_df to a fresh read of the spatial-object CSV (header row, all-string columns).
# NOTE(review): this repeats an earlier load of the same file and can likely be dropped.
SPATIAL_CSV = '/home/jovyan/dags/input/data/Waterbase_v2021_1_S_WISE6_SpatialObject_DerivedData.csv'
s_df = spark.read.csv(SPATIAL_CSV, header=True)

In [19]:
from pyspark.sql.functions import to_json, struct, from_json

In [6]:
# Serialize each spatial-object row into a JSON string in the `value` column and publish a
# 10-row sample to Kafka.
# NOTE(review): this writes to topic "test_air", but the read cell subscribes to
# "water_spatial_info". Confirm which topic is intended.
sample_json = s_df.select(to_json(struct('*')).alias('value')).limit(10)
(
    sample_json.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "test_air")
    .save()
)

In [15]:
# Batch (non-streaming) read of the water_spatial_info topic from the local broker.
kafka_read = (
    spark.read
    .format('kafka')
    .option('kafka.bootstrap.servers', 'broker:29092')
    .option('subscribe', 'water_spatial_info')
    .load()
)

In [23]:
# Decode the Kafka payloads (JSON strings produced with to_json(struct('*')) from s_df)
# back into typed columns and persist them as a Delta table.
# The schema is taken straight from s_df: this cell used `k`, which is only assigned in a
# later cell, so it raised NameError on a fresh kernel.
# NOTE(review): no .mode() is set, so re-running against an existing output path will
# likely fail. Pick an explicit save mode if this cell must be re-runnable.
spatial_schema = s_df.schema
(
    kafka_read
    .selectExpr('cast(value as string)')
    .select(from_json('value', spatial_schema).alias("d"))
    .select("d.*")
    .write.format('delta')
    .save('/home/jovyan/dags/deployment/output/ra_test/')
)

In [18]:
# Dict (JSON) form of the spatial-object schema. StructType.fromJson, used in a following
# cell, takes this dict form rather than a StructType.
k = s_df.schema.jsonValue()

In [35]:
# StructType is defined in pyspark.sql.types. Importing it from pyspark.sql.dataframe only
# worked because that module happens to import it itself.
from pyspark.sql.types import StructType

In [22]:
# Round-trip check: rebuild a StructType from the dict form held in `k` and display it.
rebuilt_schema = StructType.fromJson(k)
rebuilt_schema

StructType(List(StructField(countryCode,StringType,true),StructField(thematicIdIdentifier,StringType,true),StructField(thematicIdIdentifierScheme,StringType,true),StructField(monitoringSiteIdentifier,StringType,true),StructField(monitoringSiteIdentifierScheme,StringType,true),StructField(monitoringSiteName,StringType,true),StructField(waterBodyIdentifier,StringType,true),StructField(waterBodyIdentifierScheme,StringType,true),StructField(waterBodyName,StringType,true),StructField(specialisedZoneType,StringType,true),StructField(naturalAWBHMWB,StringType,true),StructField(reservoir,StringType,true),StructField(surfaceWaterBodyTypeCode,StringType,true),StructField(subUnitIdentifier,StringType,true),StructField(subUnitIdentifierScheme,StringType,true),StructField(subUnitName,StringType,true),StructField(rbdIdentifier,StringType,true),StructField(rbdIdentifierScheme,StringType,true),StructField(rbdName,StringType,true),StructField(confidentialityStatus,StringType,true),StructField(lat,Strin

In [23]:
# Inspect `k`. The recorded output below shows its JSON/dict form.
k

{'type': 'struct',
 'fields': [{'name': 'countryCode',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'thematicIdIdentifier',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'thematicIdIdentifierScheme',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'monitoringSiteIdentifier',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'monitoringSiteIdentifierScheme',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'monitoringSiteName',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'waterBodyIdentifier',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'waterBodyIdentifierScheme',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'waterBodyName',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'specialisedZoneType',
   'type': 'string',
   'nullable': True,
   'metadata': {

In [25]:
import json

In [37]:
# Load the spatial-object schema from its JSON config file and rebuild it as a StructType.
# - The file is decoded explicitly as UTF-8 instead of the platform default.
# - The handle is named schema_file rather than `sc`, which is the conventional
#   SparkContext name.
with open("/home/jovyan/dags/config/schemas/spatial_obj.json", encoding="utf-8") as schema_file:
    j = json.load(schema_file)
k = StructType.fromJson(j)

In [38]:
# Display the schema loaded from the JSON config.
# (Was `s`, an undefined name; the recorded StructType output shows `k` was intended.)
k

StructType(List(StructField(countryCode,StringType,true),StructField(thematicIdIdentifier,StringType,true),StructField(thematicIdIdentifierScheme,StringType,true),StructField(monitoringSiteIdentifier,StringType,true),StructField(monitoringSiteIdentifierScheme,StringType,true),StructField(monitoringSiteName,StringType,true),StructField(waterBodyIdentifier,StringType,true),StructField(waterBodyIdentifierScheme,StringType,true),StructField(waterBodyName,StringType,true),StructField(specialisedZoneType,StringType,true),StructField(naturalAWBHMWB,StringType,true),StructField(reservoir,StringType,true),StructField(surfaceWaterBodyTypeCode,StringType,true),StructField(subUnitIdentifier,StringType,true),StructField(subUnitIdentifierScheme,StringType,true),StructField(subUnitName,StringType,true),StructField(rbdIdentifier,StringType,true),StructField(rbdIdentifierScheme,StringType,true),StructField(rbdName,StringType,true),StructField(confidentialityStatus,StringType,true),StructField(lat,Strin

In [10]:
# Spatial objects that have a monitoring-site id and whose countryCode is 'LT' or 'SE'.
f_df = (
    s_df
    .filter(s_df.monitoringSiteIdentifier.isNotNull())
    .filter(s_df.countryCode.isin("LT", "SE"))
)

In [25]:
# Scratch check: iterating a dict yields its keys in insertion order ('g', then 'a').
# Unrelated to the analysis; a candidate for removal.
g = {'g': 't', 'a': 'l'}
print('\n'.join(g))

g
a


In [14]:
# Inner-join the measurements to the filtered LT/SE monitoring sites and count matching rows.
# Note: both sides also carry monitoringSiteIdentifierScheme, so the joined frame has that
# column twice. Harmless for count(), but select explicitly before reusing the result.
df.join(f_df, on='monitoringSiteIdentifier', how='inner').count()

941248