In [1]:
import findspark
findspark.init()
import os
import hbspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from datetime import datetime, timedelta

In [32]:
hbase_host_name = 'vl41.mini.pw.edu.pl'
spark_session = SparkSession.builder.appName('my-spark-app').master('local[1]').getOrCreate()
hbspark.connect('vl41.mini.pw.edu.pl', spark_session)

In [33]:
forecasts = hbspark.table('forecasts')

Skanujemy z ostatnich 24 godzin

In [34]:
curr_tstamp = time.time_ns() // 1000
yesterday_tstamp = curr_tstamp - (24 * 60 * 60 * 1000)

In [35]:
df_orig = forecasts.scan(row_prefix=bytes(str(yesterday_tstamp)[:5], 'utf-8'))

In [36]:
df_orig.count()

23/01/12 17:15:38 WARN scheduler.TaskSetManager: Stage 49 contains a task of very large size (4894 KiB). The maximum recommended task size is 1000 KiB.


31931

In [37]:
df_orig.head()

23/01/12 17:15:57 WARN scheduler.TaskSetManager: Stage 51 contains a task of very large size (4894 KiB). The maximum recommended task size is 1000 KiB.


Row(airport:id_airport='123', airport:latitude_deg='63.985001', airport:longitude_deg='-22.6056', airport:name='Keflavik International Airport', date:dt='1673546400', date:tstamp='1673538600', rowkey='16735386001231673546400', weather:pop='0.2', weather:temp='275.01', weather:weather_description='light rain', weather:wind_speed='10.16')

In [8]:
df_renamed = df_orig.withColumnRenamed("airport:id_airport","id_airport")\
.withColumnRenamed("airport:name","name").withColumnRenamed("airport:latitude_deg","latitude_deg")\
.withColumnRenamed("airport:longitude_deg","longitude_deg").withColumnRenamed("weather:temp","temp")\
.withColumnRenamed("weather:weather_description","description").withColumnRenamed("weather:wind_speed","wind_speed")\
.withColumnRenamed("weather:pop","pop").withColumnRenamed("date:dt","dt").withColumnRenamed("date:tstamp","tstamp")

In [9]:
df = df_renamed.withColumn("id_airport", df_renamed.id_airport.cast(IntegerType()))\
.withColumn("latitude_deg",df_renamed.latitude_deg.cast(FloatType()))\
.withColumn("longitude_deg",df_renamed.longitude_deg.cast(FloatType()))\
.withColumn("temp",df_renamed.temp.cast(FloatType()))\
.withColumn("wind_speed",df_renamed.wind_speed.cast(FloatType()))\
.withColumn("pop", df_renamed.pop.cast(FloatType()))\
.withColumn("dt",df_renamed.dt.cast(LongType()))\
.withColumn("tstamp",df_renamed.tstamp.cast(LongType()))\
.withColumn("api_date",from_unixtime("tstamp"))\
.withColumn("forecast_date",from_unixtime("dt"))

In [10]:
df.head()

23/01/12 16:28:37 WARN scheduler.TaskSetManager: Stage 2 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.


Row(id_airport=123, latitude_deg=63.98500061035156, longitude_deg=-22.605600357055664, name='Keflavik International Airport', dt=1673546400, tstamp=1673538600, rowkey='16735386001231673546400', pop=0.20000000298023224, temp=275.010009765625, description='light rain', wind_speed=10.15999984741211, api_date='2023-01-12 15:50:00', forecast_date='2023-01-12 18:00:00')

Filtrowanie rekordów z ostatnich 24h.

In [11]:
yesterday = datetime.today() - timedelta(days=1)
yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S')

In [12]:
df = df.filter(df.api_date > yesterday)

In [20]:
df.count()

23/01/12 16:29:56 WARN scheduler.TaskSetManager: Stage 10 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.


13920

Wyciągamy najświeższą prognozę dla każdego dt.

In [13]:
max_tstamp_df = df.groupBy(['id_airport', 'dt']).max('tstamp').withColumnRenamed('max(tstamp)', 'tstamp')

In [14]:
cond = [df.id_airport == max_tstamp_df.id_airport, df.tstamp == max_tstamp_df.tstamp, df.dt == max_tstamp_df.dt]
max_tstamp_df = max_tstamp_df.alias('max_tstamp_df')
df = df.alias('df')
fresh_forecast_df = df.join(max_tstamp_df, on=cond).select('df.*')

In [15]:
fresh_forecast_df.columns

['id_airport',
 'latitude_deg',
 'longitude_deg',
 'name',
 'dt',
 'tstamp',
 'rowkey',
 'pop',
 'temp',
 'description',
 'wind_speed',
 'api_date',
 'forecast_date']

Dla każdej prognozy wybieramy najbliższą prognozę

In [16]:
min_dt_df = fresh_forecast_df.groupBy(['id_airport', 'tstamp']).min('dt').withColumnRenamed('min(dt)', 'dt')

In [17]:
cond = [df.id_airport == min_dt_df.id_airport, df.tstamp == min_dt_df.tstamp, df.dt == min_dt_df.dt]
min_dt_df = min_dt_df.alias('min_dt_df')
actual_df = df.join(min_dt_df, on=cond).select('df.*')

In [18]:
actual_df.count()

23/01/12 16:28:56 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
23/01/12 16:28:56 WARN scheduler.TaskSetManager: Stage 3 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

116

In [19]:
agg_df = actual_df.groupBy('id_airport').agg(avg('temp'), avg('pop'))\
.withColumnRenamed('avg(temp)', 'avg_temp')\
.withColumnRenamed('avg(pop)', 'avg_pop')

In [21]:
agg_df.collect()

23/01/12 16:30:43 WARN scheduler.TaskSetManager: Stage 16 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
23/01/12 16:30:44 WARN scheduler.TaskSetManager: Stage 12 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
23/01/12 16:30:44 WARN scheduler.TaskSetManager: Stage 13 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[Row(id_airport=3997, avg_temp=288.5, avg_pop=0.0),
 Row(id_airport=2563, avg_temp=281.69000244140625, avg_pop=0.20999999344348907),
 Row(id_airport=2542, avg_temp=279.9700012207031, avg_pop=0.7300000190734863),
 Row(id_airport=2513, avg_temp=283.9100036621094, avg_pop=1.0),
 Row(id_airport=301881, avg_temp=282.9800109863281, avg_pop=0.11999999731779099),
 Row(id_airport=4321, avg_temp=288.17999267578125, avg_pop=0.20999999344348907),
 Row(id_airport=2212, avg_temp=283.07000732421875, avg_pop=0.2800000011920929),
 Row(id_airport=4293, avg_temp=282.1400146484375, avg_pop=0.0),
 Row(id_airport=4155, avg_temp=284.3999938964844, avg_pop=0.0),
 Row(id_airport=4482, avg_temp=274.8500061035156, avg_pop=0.2199999988079071),
 Row(id_airport=317457, avg_temp=280.54998779296875, avg_pop=0.27000001072883606),
 Row(id_airport=2758, avg_temp=277.0400085449219, avg_pop=0.07999999821186066),
 Row(id_airport=4408, avg_temp=280.3299865722656, avg_pop=0.5400000214576721),
 Row(id_airport=4354, avg_temp=2

In [27]:
hdfs_dir = '/user/airports_results/'
current_date = datetime.today().strftime('%Y%m%d')
agg_df.write.parquet(hdfs_dir + current_date + '.parquet')

23/01/12 16:45:59 WARN scheduler.TaskSetManager: Stage 40 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
23/01/12 16:46:00 WARN scheduler.TaskSetManager: Stage 36 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
23/01/12 16:46:00 WARN scheduler.TaskSetManager: Stage 38 contains a task of very large size (2137 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [30]:
results_df = spark_session.read.parquet(hdfs_dir + datetime.today().strftime('%Y%m%d') + ".parquet")
results_df.count()

                                                                                

116

In [31]:
results_df.head(5)

[Row(id_airport=4368, avg_temp=281.3500061035156, avg_pop=0.0),
 Row(id_airport=3972, avg_temp=282.9200134277344, avg_pop=0.0),
 Row(id_airport=4610, avg_temp=278.6099853515625, avg_pop=0.14000000059604645),
 Row(id_airport=2222, avg_temp=282.04998779296875, avg_pop=0.0),
 Row(id_airport=4434, avg_temp=281.739990234375, avg_pop=0.0)]