In [None]:
# Creating a script to get messages for creating a keyword mapping.
# Then creating a ranking system based on number of keywords found.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.functions import collect_list, concat_ws, split, explode, \
array_intersect, col, count, size, expr
from pyspark.ml.feature import Tokenizer
import requests
import json

In [3]:
spark = SparkSession.builder.appName("floods").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 00:47:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
data = spark.read.option("multiline", "true").json(
    "/Users/siddp278/Desktop/projects/dummy data/spark-data/flood_data.json"
)
data.printSchema()

root
 |-- @context: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- @id: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- eaAreaName: string (nullable = true)
 |    |    |-- eaRegionName: string (nullable = true)
 |    |    |-- floodArea: struct (nullable = true)
 |    |    |    |-- @id: string (nullable = true)
 |    |    |    |-- county: string (nullable = true)
 |    |    |    |-- notation: string (nullable = true)
 |    |    |    |-- polygon: string (nullable = true)
 |    |    |    |-- riverOrSea: string (nullable = true)
 |    |    |-- floodAreaID: string (nullable = true)
 |    |    |-- isTidal: boolean (nullable = true)
 |    |    |-- message: string (nullable = true)
 |    |    |-- severity: string (nullable = true)
 |    |    |-- severityLevel: long (nullable = true)
 |    |    |-- timeMessageChanged: string (nullable = true)
 |    |    |-- timeRaised: strin

In [5]:
df_exploded = data.withColumn("items", sf.explode(sf.col("items")))
df_exploded.repartition(25)
df_exploded.take(2)

[Row(@context='http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld', items=Row(@id='http://environment.data.gov.uk/flood-monitoring/id/floods/112WAFTUBA', description='Upper Bristol Avon area', eaAreaName='Wessex', eaRegionName='No longer used', floodArea=Row(@id='http://environment.data.gov.uk/flood-monitoring/id/floodAreas/112WAFTUBA', county='Gloucestershire, South Gloucestershire, Wiltshire', notation='112WAFTUBA', polygon='http://environment.data.gov.uk/flood-monitoring/id/floodAreas/112WAFTUBA/polygon', riverOrSea='Bristol River Avon'), floodAreaID='112WAFTUBA', isTidal=False, message='The levels on the River Avon upstream of Malmesbury and at Great Somerford are now falling. Flooding possible at  locations near the Bristol Avon, Tetbury Avon, Sherston Avon, Dauntsey Brook, with low lying land and roads expected to be most affected, particularly low lying areas in Tetbury, Badminton, Brook End at Luckington, Corsham, Lacock and the road at Reybridge. \n\nThe weath

In [6]:
# Combine all messages into a single string distributedly
# messages_df = (
#     df_exploded.select("items.message").agg(concat_ws(" ", collect_list("message")).alias("all_messages"))
# )

# messages_df.coalesce(1)
# messages_df.write.mode("overwrite").text("data/message.txt")

In [7]:
keywords = [
    "River", "Flooding", "Water", "Levels", "Land", "Roads", "Low-lying", "Footpaths", "Bridges", "Flood water",
    "Monitoring", "Dry", "Rainfall", "High", "Falling", "Storm", "Darragh", "Warning", "Areas", "Discharge",
    "Causeway", "Risk", "Danger", "Tributaries", "Floodplain", "Roads", "Tides", "Overflow", "Channels", "Rain",
    "Reservoir", "Protection", "Diversion", "Communities", "Rising", "Drains", "Brook", "Gauge", "Upstream",
    "Downstream", "Stagnant", "Sewage", "Evacuation", "Property", "Damage", "Environment", "Agency", "Hydrology",
    "Submergence", "Drainage", "Weedscreens", "Obstruction", "Pathways", "Culverts", "Debris", "Saturation",
    "Overflows", "Sediment", "Washes", "Tide lock", "Backwater", "Mooring", "Sluices", "Spillway", "Levees",
    "Bank", "Overflowing", "Crest", "Flash", "Drought", "Precipitation", "Watershed", "Erosion", "Subsidence",
    "Riverbanks", "Aquifers", "Gradient", "Streams", "Surface", "Inundation", "Pumping", "Dykes", "Dams",
    "Groundwater", "Floodgate", "Seepage", "Retention", "Detention", "Backflow", "Saturated", "Basin", "Channels",
    "Barriers", "Overflow", "Surge", "Scouring", "Flash flood", "Overflowing", "Alert"
]

In [8]:
df_exploded.count()

112

In [9]:
# Ranking system - # of keywords found/ total # of words in message

# Filter for the specific words and count their occurrences
df_with_word_counts = df_exploded.withColumn("words", split(col("items.message"), " ")) \
    .withColumn("total_word_count", size(col("words"))) \
    .withColumn(
        "certain_word_count",
        expr(
            f"size(filter(words, word -> word IN ({', '.join([f'\'{word}\'' for word in keywords])})))"
        ),
    )

# Select required columns
result = df_with_word_counts.select("total_word_count", "certain_word_count")

In [10]:
df_exploded.count()

112

In [11]:
result.count()

112

In [13]:
result.take(5)

[Row(total_word_count=115, certain_word_count=3),
 Row(total_word_count=115, certain_word_count=2),
 Row(total_word_count=129, certain_word_count=1),
 Row(total_word_count=82, certain_word_count=3),
 Row(total_word_count=77, certain_word_count=3)]