### Przetwarzanie danych z sondaży aerologicznych

Źródło danych: http://weather.uwyo.edu/upperair/sounding.html

Dokonano ekstrakcji danych pomiarowych dla zakresu dat: 2013.06.01 - 2017.05.31

Ilość rekordów wynosi: 273 497

Każdy wiersz danych składa się z 6 pól:
* data i godzina pomiaru
* ciśnienie
* wysokość balonu
* temperatura powietrza
* temperatura punktu rosy
* wilgotność względna

##### Załadowanie zewnętrznych danych

Pierwszym krokiem jest załadowanie zewnętrznych danych, aby utworzyć RDD (Resilient Distributed Dataset). Jest to abstrakcyjny typ danych używany w Apache Spark jako podstawowy.

In [3]:
meteo_rdd = sc.textFile("/FileStore/tables/el11t8q01497432769031/data_csv-1aa57.gz")
print('Number of raw records: {}'.format(meteo_rdd.count()))

# podziel kazda linie na poszczegolne pola 
splitted_lines = meteo_rdd.map(lambda l: l.split(";"))
def group_height(h):
  if h <= 250:
    return(1)
  elif h > 250 and h <= 2000:
    return(2)
  elif h > 2000:
    return(int(h / 6000) + 3)
  else:
    return(0)

# zrzutuj podzielone pola na odpowiednie typy i stworz krotke (tuple)
measures = splitted_lines.map(lambda p: (int("20" + p[0][:2]), int(p[0][2:4]), int(p[0][4:6]), float(p[1]), int(p[2]), float(p[3]), float(p[4]), int(p[5]), group_height(int(p[2])), str(p[0][:6])))


##### Wykonanie agregacji na RDD

Początkowym etapem jest wyodrębnienie osobnych części RDD ze zbioru danych za pomocą operacji filter(). Do sprawdzenia warunku zadeklarowano prostą funkcję, która sprawdza czy wysokość mieści się w zadanym przedziale. Za pomocą operacji map() pobieramy pomiary ciśnienia atmosferycznego. Następnie za pomocą funkcji reduce() możemy policzyć całkowite ciśnienie na danej wysokości i podzielić tą sumę, aby otrzymać uśrednione wartości.

###### 1) za pomocą reduce()

In [5]:
# funkcja do filtrowania wysokosci
def h_is_near(h, hp, r):
  if h >= hp - r and h <= hp + r:
    return(True)
  else:
    return(False)

# wyodrebnienie danych RDD i pobranie cisnienia
pressure_near_ground = measures.filter(lambda x: h_is_near(int(x[4]), 100, 50)).map(lambda x: int(x[3]))
pressure_near_1km = measures.filter(lambda x: h_is_near(int(x[4]), 1000, 100)).map(lambda x: int(x[3]))
pressure_near_5km = measures.filter(lambda x: h_is_near(int(x[4]), 5000, 500)).map(lambda x: int(x[3]))
pressure_near_15km = measures.filter(lambda x: h_is_near(int(x[4]), 15000, 1500)).map(lambda x: int(x[3]))
pressure_near_30km = measures.filter(lambda x: h_is_near(int(x[4]), 30000, 3000)).map(lambda x: int(x[3]))

# obliczenie calkowitej liczby probek
near_ground_count = pressure_near_ground.count()
near_1km_count = pressure_near_1km.count()
near_5km_count = pressure_near_5km.count()
near_15km_count = pressure_near_15km.count()
near_30km_count = pressure_near_30km.count()

# agregacje
total_pressure_near_ground = pressure_near_ground.reduce(lambda x, y: x+y)
total_pressure_near_1km = pressure_near_1km.reduce(lambda x, y: x+y)
total_pressure_near_5km = pressure_near_5km.reduce(lambda x, y: x+y)
total_pressure_near_15km = pressure_near_15km.reduce(lambda x, y: x+y)
total_pressure_near_30km = pressure_near_30km.reduce(lambda x, y: x+y)

# wyniki
print("Average pressure near the ground:\t{}".format(total_pressure_near_ground/near_ground_count))
print("Average pressure at 1 km:\t{}".format(total_pressure_near_1km/near_1km_count))
print("Average pressure at 5 km:\t{}".format(total_pressure_near_5km/near_5km_count))
print("Average pressure at 15 km:\t{}".format(total_pressure_near_15km/near_15km_count))
print("Average pressure at 30 km:\t{}".format(total_pressure_near_30km/near_30km_count))


###### 2) za pomocą aggregate()

Krótszy kod, wykonanie kilku operacji w każdej iteracji co jest około 2x szybsze.

In [7]:
# funkcja do filtrowania wysokosci
def h_is_near(h, hp, r):
  if h >= hp - r and h <= hp + r:
    return(True)
  else:
    return(False)

# funkcja dodajaca wartosc do akumulatora
def add2acc(acc, value):
  return(acc[0] + value, acc[1] + 1)

# funkcja dodajaca dwa akumulatory
def addaccs(acc1, acc2):
  return(acc1[0] + acc2[0], acc1[1] + acc2[1])

# wyodrebnienie danych RDD i pobranie cisnienia
pressure_near_ground = measures.filter(lambda x: h_is_near(int(x[4]), 100, 50)).map(lambda x: int(x[3]))
pressure_near_1km = measures.filter(lambda x: h_is_near(int(x[4]), 1000, 100)).map(lambda x: int(x[3]))
pressure_near_5km = measures.filter(lambda x: h_is_near(int(x[4]), 5000, 500)).map(lambda x: int(x[3]))
pressure_near_15km = measures.filter(lambda x: h_is_near(int(x[4]), 15000, 1500)).map(lambda x: int(x[3]))
pressure_near_30km = measures.filter(lambda x: h_is_near(int(x[4]), 30000, 3000)).map(lambda x: int(x[3]))

# agregacje (calkowite cisnienie, calkowita liczba probek)
aggr_near_ground = pressure_near_ground.aggregate((0, 0), add2acc, addaccs)
aggr_near_1km = pressure_near_1km.aggregate((0, 0), add2acc, addaccs)
aggr_near_5km = pressure_near_5km.aggregate((0, 0), add2acc, addaccs)
aggr_near_15km = pressure_near_15km.aggregate((0, 0), add2acc, addaccs)
aggr_near_30km = pressure_near_30km.aggregate((0, 0), add2acc, addaccs)

# wyniki
print("Average pressure near the ground:\t{}".format(aggr_near_ground[0]/aggr_near_ground[1]))
print("Average pressure at 1 km:\t{}".format(aggr_near_1km[0]/aggr_near_1km[1]))
print("Average pressure at 5 km:\t{}".format(aggr_near_5km[0]/aggr_near_5km[1]))
print("Average pressure at 15 km:\t{}".format(aggr_near_15km[0]/aggr_near_15km[1]))
print("Average pressure at 30 km:\t{}".format(aggr_near_30km[0]/aggr_near_30km[1]))


##### Przekształcenie RDD do DataFrame

In [9]:
meteo_df = spark.createDataFrame(measures, ['year', 'month', 'day', 'pressure', 'height', 'temp', 'dewpoint', 'humidity', 'height_group', 'datestamp'])
meteo_df.printSchema()
meteo_df.show()


##### Wykonanie agregacji na DF

Z wykorzystaniem DSL i SQL.

In [11]:
# ilosc pomiarow w poszczegolnych latach ponizej zadanej wysokosci
meteo_df.filter(meteo_df['height'] < 150).groupBy('year').count().orderBy('year').show() 

meteo_df.createOrReplaceTempView("meteo")
spark.sql("SELECT year, count(*) FROM meteo WHERE height < 150 GROUP BY year ORDER BY year").show()

averages_in_months_and_heights = meteo_df.groupBy('month','height_group').avg('temp','pressure','humidity','dewpoint').orderBy('month','height_group', ascending=True)
maximums_in_months_and_heights = meteo_df.groupBy('month','height_group').max('temp','pressure','humidity','dewpoint').orderBy('month','height_group', ascending=True)

from pyspark.sql.functions import mean, min, max, col
aggregate = ['temp','pressure','humidity','dewpoint'] 
funs = [mean, min, max]
exprs = [f(col(c)) for f in funs for c in aggregate]

stats_in_months_and_heights = meteo_df.groupBy('month','height_group').agg(*exprs).orderBy('month','height_group', ascending=True)



##### Wizualizacja danych

In [13]:
display(stats_in_months_and_heights)


##### Wykorzystanie dowolnego algorytmu z Spark MLlib

Dokonano klasteryzacji z wykorzystaniem metody k-średnich.

In [15]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt

# bierzemy najnizszy pomiar dla kazdego dnia
reducedMeasures = measures.map(lambda x: (x[9],x,)).reduceByKey(lambda x, y: x if x[4] < y[4] else y)
#[x for x in reducedMeasures.toLocalIterator()]

# do k-srednich wrzucamy temperature i punkt rosy
measures_for_kmeans = reducedMeasures.map(lambda x: array([float(x[1][5]), float(x[1][7])]))
#[x for x in measures_for_kmeans.toLocalIterator()]

# dzielimy na 2 grupy umownie lato i zima
clusters = KMeans.train(measures_for_kmeans, 4, maxIterations=10, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = measures_for_kmeans.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# laczymy wyniki klasteryzacji z danymi (dokonujemy predykcji z modelu)
predictions = reducedMeasures.map(lambda x: (x[1][0], x[1][1], x[1][2], x[1][3], x[1][4], x[1][5], x[1][6], x[1][7], x[1][8], x[1][9], clusters.predict(array([x[1][5],x[1][7]]))))

predictionsDF = spark.createDataFrame(predictions,['year', 'month', 'day', 'pressure', 'height', 'temp', 'dewpoint', 'humidity', 'height_group', 'datestamp', 'CLUSTER'])
predictionsDF.orderBy('year', 'month', 'day', ascending=True).show()


In [16]:
# wyswietlamy rezultat dla konkretnego roku
#display(predictionsDF.filter(predictionsDF.year == 2014).orderBy('month', 'day', ascending=True))
display(predictionsDF.filter(predictionsDF.year == 2015).orderBy('month', 'day', ascending=True))
#display(predictionsDF.filter(predictionsDF.year == 2016).orderBy('month', 'day', ascending=True))

# ustawic datestamp/cluster/temp i jako wykres obszarowy