In [1]:
import findspark
findspark.init()

import builtins
import math
import datetime as dt
import holidays
import geohash
import os.path as osp

from pyspark import keyword_only, StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Transformer, Pipeline

from utils import TimestampTransformer, GeohashTransformer

In [2]:
# Local Spark session on master "local[*]"; the driver memory is configured
# to 32g. getOrCreate() reuses an already running session if there is one.
sess = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "32g")
    .getOrCreate()
)

In [3]:
# Load the raw sensor measurements, then report their schema and row count.
df = sess.read.parquet('./luftdaten.info.20180117.parquet/')
df.printSchema()
total_rows = df.count()
print(total_rows)

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- P1: double (nullable = true)
 |-- durP1: double (nullable = true)
 |-- ratioP1: double (nullable = true)
 |-- P2: double (nullable = true)
 |-- durP2: double (nullable = true)
 |-- ratioP2: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- altitude: double (nullable = true)
 |-- pressure_sealevel: double (nullable = true)

649471354


In [4]:
# Preview a random sample targeting about 100 rows as a pandas frame.
# The sampling fraction is derived from the row count; guard against an empty
# frame, which would otherwise raise ZeroDivisionError.
# A fixed seed keeps the preview stable between runs over the same input
# (NOTE(review): stability also assumes unchanged partitioning - confirm).
sample_row_count = df.count()
sample_fraction = 100 / sample_row_count if sample_row_count else 0.0
df.sample(withReplacement=True, fraction=sample_fraction, seed=42).toPandas()

Unnamed: 0,sensor_id,sensor_type,location,lat,lon,timestamp,P1,durP1,ratioP1,P2,durP2,ratioP2,temperature,humidity,pressure,altitude,pressure_sealevel
0,49,PPD42NS,22,48.585,8.832,2015-11-29 22:15:59.094,137.35,79026.0,0.26,0.62,0.0,0.0,,,,,
1,48,DHT22,19,48.722,9.209,2016-05-22 17:29:47.251,,,,,,,27.20,38.20,,,
2,5520,DHT22,2782,52.361,10.011,2017-12-13 03:24:33.000,,,,,,,17.50,47.70,,,
3,374,DHT22,174,23.669,120.178,2017-01-15 10:04:04.000,,,,,,,20.60,50.10,,,
4,337,SDS011,157,52.380,9.670,2017-12-07 09:05:09.000,10.60,,,9.77,,,,,,,
5,257,SDS011,112,48.791,8.964,2017-04-19 18:30:28.000,11.67,,,9.67,,,,,,,
6,263,SDS011,115,47.738,9.117,2017-01-14 15:11:08.000,4.78,,,2.37,,,,,,,
7,553,SDS011,59,48.777,9.157,2017-04-07 10:57:57.000,41.47,,,24.76,,,,,,,
8,554,SDS011,263,48.490,9.201,2017-11-02 10:46:36.000,8.69,,,4.81,,,,,,,
9,731,SDS011,350,48.058,12.570,2017-03-05 18:20:22.000,26.25,,,12.50,,,,,,,


In [5]:
# Summary statistics of the temperature column, rendered as a pandas frame.
temperature_summary = df.select("temperature").describe()
temperature_summary.toPandas()

Unnamed: 0,summary,temperature
0,count,302815692.0
1,mean,15.84527873837482
2,stddev,359.22878473666987
3,min,-147.6
4,max,65536.0


In [6]:
# Stage 1: derive the "datedim" column from the measurement timestamp.
ts_trans = TimestampTransformer(inputCol="timestamp", outputCol="datedim")

# Stage 2: derive the "geohash" column from the latitude/longitude pair.
gh_trans = GeohashTransformer(inputCols=["lat", "lon"], outputCol="geohash")

# Fit and apply both stages to the raw measurements.
pipeline_stages = [ts_trans, gh_trans]
pipeline = Pipeline(stages=pipeline_stages)
model = pipeline.fit(df)
transformed_data = model.transform(df)

#transformed_data.select("datedim.holiday").where("datedim.holiday = 1").show(10, False)
transformed_data.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- P1: double (nullable = true)
 |-- durP1: double (nullable = true)
 |-- ratioP1: double (nullable = true)
 |-- P2: double (nullable = true)
 |-- durP2: double (nullable = true)
 |-- ratioP2: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- altitude: double (nullable = true)
 |-- pressure_sealevel: double (nullable = true)
 |-- datedim: struct (nullable = true)
 |    |-- year: integer (nullable = true)
 |    |-- month: integer (nullable = true)
 |    |-- day: integer (nullable = true)
 |    |-- day_of_week: integer (nullable = true)
 |    |-- weekend: integer (nullable = true)
 |    |-- holiday: integer (nullable = true)
 |    |-- day_cat: 

In [7]:
# Row count after the pipeline. The recorded result equals the row count
# printed for the raw frame, i.e. the transformers did not add or drop rows.
transformed_data.count()

649471354

In [8]:
# Bounding box (exclusive on every edge) used to select the Stuttgart area.
lat_bounds = (48.3, 49.1)
lon_bounds = (8.9, 9.5)
lat_min, lat_max = lat_bounds
lon_min, lon_max = lon_bounds

# Keep only measurements strictly inside the box, then count them.
stuttgart_data = (
    transformed_data
    .where("lat > %s" % lat_min)
    .where("lat < %s" % lat_max)
    .where("lon > %s" % lon_min)
    .where("lon < %s" % lon_max)
)
stuttgart_data.count()

174865832

In [9]:
# Earliest year present in the Stuttgart subset. The aggregation yields a
# single row whose only field is the minimum of datedim.year.
min_year_row = stuttgart_data.agg({"datedim.year": "min"}).collect()[0]
min_stuttgart_year = min_year_row[0]
min_stuttgart_year

2015

In [10]:
# Display the distinct geohash strings that occur in the Stuttgart subset.
stuttgart_hashes = stuttgart_data.select("geohash.hash").distinct()
stuttgart_hashes.show()

+------------+
|        hash|
+------------+
|u0wtfh1hdd6g|
|u0wkvnf6dvf1|
|u0ww1qw0sezv|
|u0wkuezmmr1m|
|u0wmqg0byrqr|
|u0w7yqu96m40|
|u0ws16g1xd15|
|u0wwrpgrwc7t|
|u0wt6pn1jsw6|
|u0wt2z3hbcrv|
|u0wt2kejqm0q|
|u0wmr23xqmsq|
|u0wmxmzw8b39|
|u0wkuz0nkde5|
|u0wx5rb6svs5|
|u0wth1wzzs3s|
|u0wt2p5nvzn0|
|u0wkvq714qw1|
|u0ww10826h23|
|u0wkvnehf57g|
+------------+
only showing top 20 rows



In [19]:
def find_closest_hash(haystack, needle):
    """Return the entry of ``haystack`` sharing the longest prefix with ``needle``.

    Args:
        haystack: Collection of candidate hash strings.
        needle: Hash string to match, or ``None``.

    Returns:
        The candidate with the longest common prefix; on a tie the candidate
        that appears first in ``haystack`` wins. ``None`` is returned when
        ``needle`` is ``None`` or ``haystack`` is empty, instead of raising
        (``TypeError`` / ``ValueError``) as the unguarded computation would.

    NOTE(review): the longest shared prefix is used as a stand-in for spatial
    closeness; it is not a distance computation - confirm this is acceptable.
    """
    if needle is None or not haystack:
        return None
    # builtins.max is used explicitly because the notebook star-imports
    # pyspark.sql.functions, which shadows the built-in ``max``.
    return builtins.max(
        haystack,
        key=lambda candidate: len(osp.commonprefix([needle, candidate])),
    )

def find_hashes(geo_df):
    """Collect the distinct ``geohash.hash`` values of ``geo_df`` into a list.

    The distinct values are gathered with ``collect()`` and the first field of
    every returned row is extracted.
    """
    hash_rows = geo_df.select("geohash.hash").distinct().collect()
    hashes = []
    for row in hash_rows:
        hashes.append(row[0])
    return hashes

In [20]:
def _read_dwd_since_min_year(path):
    """Read one DWD parquet dataset, keeping rows from min_stuttgart_year on."""
    return sess.read.parquet(path).where("datedim.year >= %d" % min_stuttgart_year)


def _closest_station_udf(station_hashes):
    """Build a UDF that maps a hash to its closest entry in station_hashes."""
    return udf(lambda needle: find_closest_hash(station_hashes, needle))


# Weather datasets, restricted to the years covered by the sensor data.
temp_df = _read_dwd_since_min_year("./dwd/temp.parquet")
pert_df = _read_dwd_since_min_year("./dwd/precipitation.parquet")
cloud_df = _read_dwd_since_min_year("./dwd/cloudiness.parquet")
sun_df = _read_dwd_since_min_year("./dwd/sun.parquet")
wind_df = _read_dwd_since_min_year("./dwd/wind.parquet")

# Distinct station geohashes per dataset, collected to the driver.
temp_hashes = find_hashes(temp_df)
pert_hashes = find_hashes(pert_df)
cloud_hashes = find_hashes(cloud_df)
sun_hashes = find_hashes(sun_df)
wind_hashes = find_hashes(wind_df)

# One lookup UDF per dataset, each bound to that dataset's station hashes.
closest_temp_station_udf = _closest_station_udf(temp_hashes)
closest_pert_station_udf = _closest_station_udf(pert_hashes)
closest_cloud_station_udf = _closest_station_udf(cloud_hashes)
closest_sun_station_udf = _closest_station_udf(sun_hashes)
closest_wind_station_udf = _closest_station_udf(wind_hashes)

# Number of distinct station hashes per dataset, one per line.
for station_hashes in (temp_hashes, pert_hashes, cloud_hashes, sun_hashes, wind_hashes):
    print(len(station_hashes))

8
9
2
6
3


In [21]:
# Attach, for every weather dataset, the station hash closest to the sensor's
# geohash. Each column is produced by the matching lookup UDF.
for station_column, station_udf in [
    ("closest_temp_station", closest_temp_station_udf),
    ("closest_pert_station", closest_pert_station_udf),
    ("closest_cloud_station", closest_cloud_station_udf),
    ("closest_sun_station", closest_sun_station_udf),
    ("closest_wind_station", closest_wind_station_udf),
]:
    stuttgart_data = stuttgart_data.withColumn(
        station_column, station_udf(col("geohash.hash"))
    )

In [14]:
# Single-day slice: rows of sensor 4741 for year 2017, month 2, day 1.
_4741_data = (
    stuttgart_data
    .where("sensor_id = 4741")
    .where("datedim.year = 2017")
    .where("datedim.month = 2")
    .where("datedim.day = 1")
)

In [22]:
def join_dwd_data(data_df, dwd_df, closest_station_column):
    """Left-join one weather dataset onto the sensor rows.

    Rows match when year, month, day and hour_bin of ``datedim`` agree and the
    value in ``closest_station_column`` equals the weather row's
    ``geohash.hash``. The weather frame is broadcast for the join.

    Args:
        data_df: Sensor-side frame (aliased "d" in the join).
        dwd_df: Weather-side frame (aliased "w" in the join).
        closest_station_column: Name of the ``data_df`` column holding the
            station hash to join on.

    Returns:
        The joined frame with the weather side's station, date, geohash and
        timestamp columns dropped.
    """
    calendar_fields = ("datedim.year", "datedim.month",
                       "datedim.day", "datedim.hour_bin")
    join_cond = [data_df[field] == dwd_df[field] for field in calendar_fields]
    join_cond.append(data_df[closest_station_column] == dwd_df["geohash.hash"])

    joined = data_df.alias("d").join(
        broadcast(dwd_df).alias("w"), join_cond, how='left'
    )

    # Remove the weather-side columns that only served the join or describe
    # the station itself.
    for weather_column in ("w.STATIONS_ID", "w.datedim", "w.geohash",
                           "w.MESS_DATUM", "w.TIMESTAMP", "w.geoBreite",
                           "w.geoLaenge", "w.Stationshoehe", "w.Stationsname"):
        joined = joined.drop(col(weather_column))
    return joined

In [23]:
# Join the five weather datasets one after another, each on its own
# closest-station column.
ext_df = stuttgart_data
for dwd_frame, station_column in [
    (temp_df, "closest_temp_station"),
    (pert_df, "closest_pert_station"),
    (cloud_df, "closest_cloud_station"),
    (sun_df, "closest_sun_station"),
    (wind_df, "closest_wind_station"),
]:
    ext_df = join_dwd_data(ext_df, dwd_frame, station_column)

# Weather value columns that are bundled into the "dwddim" struct.
dwd_value_columns = ["TT_TU", "RF_TU", "R1", "RS_IND", "WRTR",
                     "V_N_I", "V_N", "SD_SO", "F", "D"]

# Pack the weather values into one struct, then drop the QN_* columns, the
# now-redundant flat value columns and the helper station columns.
ext_df = (
    ext_df
    .withColumn("dwddim", struct(*dwd_value_columns))
    .drop("QN_9", "QN_8", "QN_7", "QN_3")
    .drop(*dwd_value_columns)
    .drop("closest_temp_station")
    .drop("closest_pert_station")
    .drop("closest_sun_station")
    .drop("closest_cloud_station")
    .drop("closest_wind_station")
)

ext_df.printSchema()

#select("datedim.year", "datedim.month", "datedim.day", "datedim.hour_bin").show()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- location: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- P1: double (nullable = true)
 |-- durP1: double (nullable = true)
 |-- ratioP1: double (nullable = true)
 |-- P2: double (nullable = true)
 |-- durP2: double (nullable = true)
 |-- ratioP2: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- altitude: double (nullable = true)
 |-- pressure_sealevel: double (nullable = true)
 |-- datedim: struct (nullable = true)
 |    |-- year: integer (nullable = true)
 |    |-- month: integer (nullable = true)
 |    |-- day: integer (nullable = true)
 |    |-- day_of_week: integer (nullable = true)
 |    |-- weekend: integer (nullable = true)
 |    |-- holiday: integer (nullable = true)
 |    |-- day_cat: 

In [25]:
# Peek at ten rows of the joined result as a pandas frame.
ext_preview = ext_df.limit(10)
ext_preview.toPandas()

Unnamed: 0,sensor_id,sensor_type,location,lat,lon,timestamp,P1,durP1,ratioP1,P2,durP2,ratioP2,temperature,humidity,pressure,altitude,pressure_sealevel,datedim,geohash,dwddim
0,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:02,26.5,,,18.0,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
1,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:05,27.35,,,17.8,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
2,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:09,27.25,,,17.95,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
3,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:13,27.3,,,18.1,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
4,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:17,26.9,,,18.0,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
5,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:21,26.77,,,18.0,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
6,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:24,26.6,,,18.0,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
7,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:28,26.45,,,17.9,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
8,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:32,26.55,,,17.9,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."
9,4740,SDS011,2384,48.745,9.118,2017-09-21 00:00:36,25.9,,,17.7,,,,,,,,"(2017, 9, 21, 3, 0, 0, Thursday, 0.42931547619...","(u0wmr3z80y9e, 15004089122820690642, u0wmr3z8)","(4.5, 100.0, 0.0, 0, -999, P, 0, None, 0.6,..."


In [26]:
# Persist the joined result as parquet; "overwrite" replaces any existing
# output at this path.
ext_writer = ext_df.write.mode("overwrite")
ext_writer.parquet('./stgt_sensors_with_date_geo_dwd.parquet')