In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from utils.config import load_config

In [None]:
# Load project settings from config.yaml (relative path — presumably resolved against the
# kernel's working directory; confirm in utils.config.load_config). Indexed by section name below.
config = load_config("config.yaml")

In [None]:
# Cluster endpoints from the "spark_hadoop" section of the loaded config.
sh_conf = config["spark_hadoop"]
spark_master, hdfs_namenode = sh_conf["spark_master"], sh_conf["hdfs_namenode"]

# Directory of raw JSON input files on HDFS.
hdfs_input = "{}/input/json/".format(hdfs_namenode)

# Output locations used by the write/read-back cells: Parquet v1, Parquet v2 and ORC.
hdfs_output_v1 = "{}/output/data.parquet".format(hdfs_namenode)
hdfs_output_v2 = "{}/output/data_v2.parquet".format(hdfs_namenode)
hdfs_output_orc = "{}/output/data_v2.orc".format(hdfs_namenode)


In [None]:
# Initialise the SparkSession against the configured master
# (getOrCreate returns the already-active session if one exists).
spark = SparkSession.builder.master(spark_master).appName("JSON to Parquet").getOrCreate()

In [None]:
# Show the SparkSession's notebook representation as a quick check that the session is up.
spark

In [None]:
%%time

df = spark.read.json(hdfs_input)

In [None]:
%%time

# Row count of the raw input. count() is an action, so the timing includes a full Spark job.
df.count()

In [None]:
%%time

# Inspect the inferred (nested) schema; read.json was called without an explicit schema.
df.printSchema()

In [None]:
%%time

df.show()

In [None]:
%%time

df_flat = df \
    .withColumn("clouds_all", F.col("clouds.all")) \
    .withColumn("coord_lat", F.col("coord.lat")) \
    .withColumn("coord_lon", F.col("coord.lon")) \
    .withColumn("main_feels_like", F.col("main.feels_like")) \
    .withColumn("main_grnd_level", F.col("main.grnd_level")) \
    .withColumn("main_humidity", F.col("main.humidity")) \
    .withColumn("main_pressure", F.col("main.pressure")) \
    .withColumn("main_sea_level", F.col("main.sea_level")) \
    .withColumn("main_temp", F.col("main.temp")) \
    .withColumn("main_temp_max", F.col("main.temp_max")) \
    .withColumn("main_temp_min", F.col("main.temp_min")) \
    .withColumn("rain_1h", F.col("rain.`1h`")) \
    .withColumn("sys_sunrise", F.col("sys.sunrise")) \
    .withColumn("sys_sunset", F.col("sys.sunset")) \
    .withColumn("wind_deg", F.col("wind.deg")) \
    .withColumn("wind_gust", F.col("wind.gust")) \
    .withColumn("wind_speed", F.col("wind.speed")) \
    .withColumn("weather_exploded", F.explode_outer("weather")) \
    .withColumn("weather_id", F.col("weather_exploded.id")) \
    .withColumn("weather_main", F.col("weather_exploded.main")) \
    .withColumn("weather_description", F.col("weather_exploded.description")) \
    .drop("clouds", "coord", "main", "rain", "sys", "wind", "weather", "weather_exploded", "base", "cod", "timezone")


In [None]:
# Check the flattened schema: the nested structs dropped above should now be top-level columns.
df_flat.printSchema()

In [None]:
%%time

df_flat.show()

In [None]:
%%time

df_flat = df_flat \
    .withColumn("dt_ts", F.to_timestamp(F.col("dt"))) \
    .withColumn("sys_sunrise_ts", F.to_timestamp(F.col("sys_sunrise"))) \
    .withColumn("sys_sunset_ts", F.to_timestamp(F.col("sys_sunset"))) \
    .drop("dt", "sys_sunrise", "sys_sunset")

In [None]:
%%time

df_flat.show()

In [None]:
# Confirm dt / sys_sunrise / sys_sunset were replaced by the *_ts timestamp columns.
df_flat.printSchema()

In [None]:
%%time

df_flat.write.mode("overwrite").parquet(hdfs_output_v1)

In [None]:
%%time

df_flat.coalesce(1).write.mode("overwrite").parquet(hdfs_output_v2)

In [None]:
%%time

df_flat.coalesce(1).write.mode("overwrite").format("orc").save(hdfs_output_orc)

In [None]:
%%time

df_read = spark.read.parquet(hdfs_output_v1)
df_read.show(5, truncate=False)

In [None]:
%%time

# Schema of the frame read back from hdfs_output_v1; compare with df_flat's schema.
df_read.printSchema()

In [None]:
%%time

df_read = spark.read.parquet(hdfs_output_v2)
df_read.show(5, truncate=False)

In [None]:
%%time

# Schema of the frame read back from hdfs_output_v2; compare with df_flat's schema.
df_read.printSchema()

In [None]:
%%time

df_read = spark.read.orc(hdfs_output_orc)
df_read.show(5, truncate=False)

In [None]:
%%time

# Schema of the frame read back from hdfs_output_orc; compare with df_flat's schema.
df_read.printSchema()

In [None]:
# Stop the SparkSession (and its SparkContext); any later cell that uses `spark` will fail.
spark.stop()