In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
import seaborn as sns

from utils.config import load_config

In [None]:
config = load_config("config.yaml")

In [None]:
config

In [None]:
sh_conf = config["spark_hadoop"]

spark_master = sh_conf["spark_master"]
hdfs_namenode = sh_conf["hdfs_namenode"]

hdfs_input = f"{hdfs_namenode}/input"

hdfs_output_v1 = f"{hdfs_namenode}/output/data.parquet"
hdfs_output_v2 = f"{hdfs_namenode}/output/data_v2.parquet"
hdfs_output_orc = f"{hdfs_namenode}/output/data_v2.orc"


In [None]:
# inicjalizacja sesji Sparka
spark = (
    SparkSession
    .builder
    .appName("JSON to Parquet")
    .master(spark_master)
    .getOrCreate()
)

In [None]:
spark

In [None]:
hdfs_input

In [None]:
%%time

# wczytanie wszystkich plików JSON z folderu 'hdfs_input'
df = spark.read.json(hdfs_input)

In [None]:
%%time

# ile rekordów mamy wczytyanych?
df.count()

In [None]:
%%time

# jak wygląda schemat wczytanych danych?
df.printSchema()

In [None]:
%%time

# jak wyglądają wczytane dane?
df.show()

In [None]:
%%time

# trochę "wydłubywania" zagnieżdżonych danych
df_flat = df \
    .withColumn("clouds_all", F.col("clouds.all")) \
    .withColumn("coord_lat", F.col("coord.lat")) \
    .withColumn("coord_lon", F.col("coord.lon")) \
    .withColumn("main_feels_like", F.col("main.feels_like")) \
    .withColumn("main_grnd_level", F.col("main.grnd_level")) \
    .withColumn("main_humidity", F.col("main.humidity")) \
    .withColumn("main_pressure", F.col("main.pressure")) \
    .withColumn("main_sea_level", F.col("main.sea_level")) \
    .withColumn("main_temp", F.col("main.temp")) \
    .withColumn("main_temp_min", F.col("main.temp_min")) \
    .withColumn("main_temp_max", F.col("main.temp_max")) \
    .withColumn("rain_1h", F.col("rain.`1h`")) \
    .withColumn("sys_sunrise", F.col("sys.sunrise")) \
    .withColumn("sys_sunset", F.col("sys.sunset")) \
    .withColumn("wind_deg", F.col("wind.deg")) \
    .withColumn("wind_speed", F.col("wind.speed")) \
    .withColumn("weather_exploded", F.explode_outer("weather")) \
    .withColumn("weather_id", F.col("weather_exploded.id")) \
    .withColumn("weather_main", F.col("weather_exploded.main")) \
    .withColumn("weather_description", F.col("weather_exploded.description")) \
    .drop("clouds", "coord", "main", "rain", "sys", "wind", "weather", "weather_exploded", "base", "cod", "timezone")


In [None]:
df_flat.printSchema()

In [None]:
%%time

# jak wyglądają dane po zmianie?
df_flat.show()

In [None]:
%%time

# feature engineering - tworzymy cechy z innych cech
df_flat = df_flat \
    .withColumn("dt_ts", F.to_timestamp(F.col("dt"))) \
    .withColumn("year", F.year(F.col("dt_ts"))) \
    .withColumn("month", F.month(F.col("dt_ts"))) \
    .withColumn("day", F.day(F.col("dt_ts"))) \
    .withColumn("hour", F.hour(F.col("dt_ts"))) \
    .withColumn("minute", F.minute(F.col("dt_ts"))) \
    .withColumn("sys_sunrise_ts", F.to_timestamp(F.col("sys_sunrise"))) \
    .withColumn("sys_sunset_ts", F.to_timestamp(F.col("sys_sunset"))) \
    .drop("dt", "sys_sunrise", "sys_sunset")

In [None]:
%%time
# jak wyglądają dane po zmianie, w nowym data frame?
df_flat.show()

In [None]:
# jak wygląda schemat danych po zmianie, w nowym data frame?
df_flat.printSchema()

In [None]:
%%time

# agregacja - średnia temperatura w danej godzinie
df_flat \
    .groupBy(['year', 'month', 'day', 'hour']) \
    .agg(F.avg('main_temp').alias('mean_temp')) \
    .orderBy(['year', 'month', 'day', 'hour']) \
    .show()

In [None]:
# pobranie danych ze Sparka do Pandas

df_plot = df_flat \
    .groupBy('hour') \
    .agg(F.avg('main_temp').alias('mean_temp'),
         F.min('main_temp').alias('min_temp'),
         F.max('main_temp').alias('max_temp'),) \
    .orderBy('hour') \
    .toPandas()

df_plot

In [None]:
# narysowanie wykresu z danych w Pandas
plt.figure(figsize=(10,6))

sns.scatterplot(data=df_plot,
                x='hour',
                y='mean_temp',
                color='black')

sns.scatterplot(data=df_plot,
                x='hour',
                y='min_temp',
                color='red')

sns.scatterplot(data=df_plot,
                x='hour',
                y='max_temp',
                color='green')

plt.show()

In [None]:
# ścieżki docelowe
print(" hdfs_output_v1:", hdfs_output_v1)
print(" hdfs_output_v2:", hdfs_output_v2)
print("hdfs_output_orc:", hdfs_output_orc)

In [None]:
%%time

# zapisanie plików jako parquet - tak, jak Spark je rozłoży
df_flat.write.mode("overwrite").parquet(hdfs_output_v1)

In [None]:
%%time

# zapisanie plików jako parquet - jako jeden plik
df_flat.coalesce(1).write.mode("overwrite").parquet(hdfs_output_v2)

In [None]:
%%time

# zapisanie plików jako ORC, jako jeden plik
df_flat.coalesce(1).write.mode("overwrite").format("orc").save(hdfs_output_orc)

In [None]:
%%time

# wczytanie danych z parqueta (wielo-plikowego)
df_read = spark.read.parquet(hdfs_output_v1)
df_read.show(5, truncate=False)

In [None]:
%%time

# jak wyglądają dane po zmianie, w nowym data frame?
df_read.printSchema()

In [None]:
%%time

# wczytanie danych z parqueta (jedno-plikowego)
df_read = spark.read.parquet(hdfs_output_v2)
df_read.show(5, truncate=False)

In [None]:
%%time

df_read.printSchema()

In [None]:
%%time

# wczytanie danych z ORC
df_read = spark.read.orc(hdfs_output_orc)
df_read.show(5, truncate=False)

In [None]:
%%time

df_read.printSchema()

In [None]:
spark.stop()