In [66]:
from datetime import datetime

from pyspark import Row
from pyspark.sql import SparkSession, Window, functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

In [67]:
spark = (SparkSession
         .builder
         .master("local[*]")
         .appName("data_exploration")
         .config("spark.driver.memory", "2g")
         .getOrCreate())

sc = spark.sparkContext

In [68]:
sc.uiWebUrl

'http://192.168.0.16:4040'

To start the exploration, we use a few month worth of data from 2003 that know to contain a heatwave from the assignment description.

Each file has several lines of headers that we need to strip.



In [69]:
def parse_float(string_to_parse: str):
    string = string_to_parse.rstrip()
    if string == "":
        return None
    else:
        return float(string_to_parse)


def parse_timestamp(string_to_parse: str):
    string = string_to_parse.rstrip()
    return datetime.strptime(string, "%Y-%m-%d %H:%M:%S")


def parse_row(row: str):
    return Row(
        timestamp=parse_timestamp(row[:21]),
        location=row[21:41].rstrip(),
        location_name=row[41:89].rstrip(),
        latitude=parse_float(row[89:109]),
        longitude=parse_float(row[109:129]),
        altitude=parse_float(row[129:149]),
        u_bool_10=parse_float(row[149:169]),
        t_dryb_10=parse_float(row[169:189]),
        tn_10cm_past_6h_10=parse_float(row[189:209]),
        t_dewp_10=parse_float(row[209:229]),
        t_dewp_sea_10=parse_float(row[229:249]),
        t_dryb_sea_10=parse_float(row[249:269]),
        tn_dryb_10=parse_float(row[269:289]),
        t_wetb_10=parse_float(row[289:309]),
        tx_dryb_10=parse_float(row[309:329]),
        u_10=parse_float(row[329:349]),
        u_sea_10=parse_float(row[349:]),
    )

In [70]:
parse_input_rdd = (sc
                   .textFile("../data/kis_tot_20030*", 3)
                   .map(lambda x: x.split(",")[0])
                   .filter(lambda x: not x.startswith("#"))
                   .map(lambda x: parse_row(x)))

Create a schema based on the files' schema.

| Field                | Description                                                         |
|----------------------|---------------------------------------------------------------------|
| `DTG`                | date of measurement                                                 |
| `LOCATION`           | location of the meteorological station                              |
| `NAME`               | name of the meteorological station                                  |
| `LATITUDE`           | in degrees (WGS84)                                                  |
| `LONGITUDE`          | in degrees (WGS84)                                                  |
| `ALTITUDE`           | in 0.1 m relative to Mean Sea Level (MSL)                           |
| `U_BOOL_10`          | air humidity code boolean 10' unit                                  |
| `T_DRYB_10`          | air temperature 10' unit Celcius degrees                            |
| `TN_10CM_PAST_6H_10` | air temperature minimum 0.1m 10' unit Celcius degrees               |
| `T_DEWP_10`          | air temperature derived dewpoint - 10' unit Celcius degrees         |
| `T_DEWP_SEA_10`      | air temperature derived dewpoint- sea 10' unit Celcius degrees      |
| `T_DRYB_SEA_10`      | air temperature height oil platform 10 minutes unit Celcius degrees |
| `TN_DRYB_10`         | air temperature minimum 10' unit Celcius degrees                    |
| `T_WETB_10`          | air temperature derived wet bulb- 10' unit Celcius degrees          |
| `TX_DRYB_10`         | air temperature maximum 10' unit Celcius degrees                    |
| `U_10`               | relative air humidity 10' unit %                                    |
| `U_SEA_10`           | is relative sea air humidity 10' unit %                             |

In [71]:
schema = StructType([
    StructField("timestamp", TimestampType(), False),
    StructField("location", StringType(), True),
    StructField("location_name", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("altitude", DoubleType(), True),
    StructField("u_bool_10", DoubleType(), True),
    StructField("t_dryb_10", DoubleType(), True),
    StructField("tn_10cm_past_6h_10", DoubleType(), True),
    StructField("t_dewp_10", DoubleType(), True),
    StructField("t_dewp_sea_10", DoubleType(), True),
    StructField("t_dryb_sea_10", DoubleType(), True),
    StructField("tn_dryb_10", DoubleType(), True),
    StructField("t_wetb_10", DoubleType(), True),
    StructField("tx_dryb_10", DoubleType(), True),
    StructField("u_10", DoubleType(), True),
    StructField("u_sea_10", DoubleType(), True),
])

In [72]:
df = (spark
      .createDataFrame(parse_input_rdd, schema=schema)
      .where(f.col("location") == "260_T_a")
      .select("timestamp",
              "location_name",
              f.col("T_DRYB_10").alias("temperature"),
              f.col("TN_DRYB_10").alias("temperature_minimum"),
              f.col("TX_DRYB_10").alias("temperature_maximum"),
              )
      )

In [75]:
max_df = df.groupBy(f.to_date("timestamp").alias("date")).agg(
    f.max(f.coalesce("temperature_maximum", "temperature")).alias("temperature"))

In [76]:
max_df.count()

                                                                                

184

In [79]:
max_df.cache()

DataFrame[date: date, temperature: double]

In [82]:
filtered_df = max_df.where(f.col("temperature") >= 25).orderBy("date")

In [123]:

window_spec = Window.orderBy("date")

window_df = (filtered_df
             .withColumn("date_lag", f.lag("date").over(window_spec))
             .withColumn("date_lag_delta", f.datediff("date", "date_lag"))
             .withColumn("date_lead", f.lead("date").over(window_spec))
             .withColumn("date_lead_delta", f.datediff("date_lead", "date"))
             .withColumn("sequence_start", f.when(f.col("date_lag_delta") > 1, "start"))
             .withColumn("sequence_end", f.when(f.col("date_lead_delta") > 1, "end"))
             # .withColumn("sequence_partition", f.when(f.col("sequence_start").isNotNull(), f.col("date")).otherwise(f.lag("date").over(window_spec)))
             .withColumn("sequence_partition_2",
                         f.when(f.col("sequence_start").isNotNull(), f.col("date")).otherwise(f.last(
                             f.when(f.col('sequence_start').isNull(), f.col('date')), True).over(window_spec.rowsBetween(Window.currentRow, Window.unboundedFollowing))
                                                                                              )
                         )

#              .withColumn('grp', f.first(
#     f.when(f.col('sequence_start').isNotNull(), f.col('date')), True).over(
#     window_spec.rowsBetween(-1, 0)
# )
#                          )

             )

In [None]:
win = Window.orderBy("X")
# Condition : if preceeding row in column "Flag" is not 0
condition = F.lag(F.col("Flag"), 1).over(win) != 0
# Add a new column : if condition is true, value is value of column "X" at the previous row
a = a.withColumn("Flag_X", F.when(condition, F.col("X") - 1))

a = a.withColumn("Flag_X",
                 F.last(F.col("Flag_X"), ignorenulls=True)\
     .over(win))

In [130]:

# Use a window function
win = Window.orderBy("date")
# Condition : if preceeding row in column "Flag" is not 0
condition = f.lag(f.col("sequence_start"), 1).over(win) != ''
# Add a new column : if condition is true, value is value of column "X" at the previous row
a = window_df.withColumn("part", f.when(condition, f.col("date") - 1))

a = a.withColumn("part",
                 f.last(f.col("part"), ignorenulls=True)\
     .over(win))

In [140]:
# Use a window function
win = Window.orderBy("date")
# Condition : if preceeding row in column "Flag" is not 0
condition = f.lag(f.col("sequence_start"), 1).over(win) != ''
start_condition = f.col("sequence_start") == 'start'
# Add a new column : if condition is true, value is value of column "X" at the previous row
a = window_df.withColumn("part", f.when(start_condition, f.col("date")))

a = a.withColumn("part",
                 f.last(f.col("part"), ignorenulls=True)\
     .over(win))

In [142]:
a.show(200)

23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 20:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/07/15 2

+----------+-----------+----------+--------------+----------+---------------+--------------+------------+--------------------+----------+
|      date|temperature|  date_lag|date_lag_delta| date_lead|date_lead_delta|sequence_start|sequence_end|sequence_partition_2|      part|
+----------+-----------+----------+--------------+----------+---------------+--------------+------------+--------------------+----------+
|2003-05-29|       25.9|      null|          null|2003-05-30|              1|          null|        null|          2003-09-18|      null|
|2003-05-30|       27.5|2003-05-29|             1|2003-05-31|              1|          null|        null|          2003-09-18|      null|
|2003-05-31|       25.1|2003-05-30|             1|2003-06-01|              1|          null|        null|          2003-09-18|      null|
|2003-06-01|       28.1|2003-05-31|             1|2003-06-02|              1|          null|        null|          2003-09-18|      null|
|2003-06-02|       27.2|2003-06-01

In [None]:
a.withColumn("partition", f.when(f.col("sequence_start").isNotNull(), f.col("sequence_start")))