In [46]:
import re, os
import shutil
from itertools import chain
from datetime import datetime
from pyspark.sql import SparkSession
from spark_schemas import *
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, ArrayType

In [47]:
# instance pyspark session
pyspark = SparkSession.builder.appName('OpenWeather').master('local[*]').getOrCreate()

# create dataframe schema
schema = StructType([
    StructField('_id', StringType(), True),
    StructField('created_at', TimestampType(), True),
    StructField('city_id', IntegerType(), True),
    StructField('lat', DoubleType(), True),
    StructField('lon', DoubleType(), True),
    StructField('country', StringType(), True),
    StructField('temp', DoubleType(), True),
    StructField('max_temp', DoubleType(), True),
    StructField('min_temp', DoubleType(), True),
    StructField('feels_like', DoubleType(), True),
    StructField('humidity', IntegerType(), True)]
)

today = f'{datetime.today().date()}'.replace('-', '')

# read json file
# df_pyspark_schema = pyspark.read.schema(schema).json(f'../data/openweather_{today}.json')
df_pyspark_schema = pyspark.read.schema(schema).json(f'../data/openweather_*.json')

print(df_pyspark_schema.printSchema())
print(df_pyspark_schema.show())

root
 |-- _id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- country: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- max_temp: double (nullable = true)
 |-- min_temp: double (nullable = true)
 |-- feels_like: double (nullable = true)
 |-- humidity: integer (nullable = true)

None
+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+
|                 _id|         created_at|city_id|    lat|    lon|country|  temp|max_temp|min_temp|feels_like|humidity|
+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+
|{"$oid":"61bf0e36...|2021-12-19 11:49:26|2950159|52.5244|13.4105|     DE|279.39|  280.79|  278.76|    275.21|      91|
|{"$oid":"61bf0e3b...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     FR|277.89|

### Cleaning data

In [48]:
# _id - remove ('{"$oid":') from _id string, drop create id column, drop _id column and reorder columns
def extract(col):
    if col: return re.findall(r'"\d+\w+"', col)
    else: return None

extract_udf = F.udf(lambda x: extract(x), ArrayType(StringType()))

df_pyspark = df_pyspark_schema.withColumn('id', extract_udf(df_pyspark_schema._id)[0]).drop('_id')\
                .select('id', 'created_at', 'city_id', 'lat', 'lon', 'country', 'temp', 'max_temp', 'min_temp', 'feels_like', 'humidity')
df_pyspark.show()

+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+
|                  id|         created_at|city_id|    lat|    lon|country|  temp|max_temp|min_temp|feels_like|humidity|
+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|     DE|279.39|  280.79|  278.76|    275.21|      91|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     FR|277.89|  278.62|  277.19|    276.44|      97|
|"61bf0e40c0963c89...|2021-12-19 11:49:36|3128760|41.3888|  2.159|     ES|284.19|  286.78|  281.78|    283.36|      77|
|"61bf0e45c0963c89...|2021-12-19 11:49:41|2759794| 52.374| 4.8897|     NL| 280.2|   281.4|  279.19|    278.34|      94|
|"61bf0e4ac0963c89...|2021-12-19 11:49:46|3094802|50.0833|19.9167|     PL| 278.4|  279.01|  276.56|     278.4|      89|
|"61bf0e50c0963c89...|2021-12-19 11:49:5

In [49]:
# create city column based on city_id column
cities_map = {
        "2950159": "Berlin",
        "2988507": "Paris",
        "3128760": "Barcelona",
        "2759794": "Amsterdam",
        "3094802": "Krakow",
        "2761369": "Vienna",
        "2643743": "London"
    }

mapping_expr = F.create_map([F.lit(x) for x in chain(*cities_map.items())])
df_cities = df_pyspark.withColumn('city', mapping_expr.getItem(F.col("city_id")))
df_cities.show()



+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+---------+
|                  id|         created_at|city_id|    lat|    lon|country|  temp|max_temp|min_temp|feels_like|humidity|     city|
+--------------------+-------------------+-------+-------+-------+-------+------+--------+--------+----------+--------+---------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|     DE|279.39|  280.79|  278.76|    275.21|      91|   Berlin|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     FR|277.89|  278.62|  277.19|    276.44|      97|    Paris|
|"61bf0e40c0963c89...|2021-12-19 11:49:36|3128760|41.3888|  2.159|     ES|284.19|  286.78|  281.78|    283.36|      77|Barcelona|
|"61bf0e45c0963c89...|2021-12-19 11:49:41|2759794| 52.374| 4.8897|     NL| 280.2|   281.4|  279.19|    278.34|      94|Amsterdam|
|"61bf0e4ac0963c89...|2021-12-19 11:49:46|3094802|50.0833|19.9167|     PL| 278.4|  279.01|

In [50]:
# replace country column with country names
country_map = {
   "NL": "Netherlands",
   "PL": "Poland",
   "AT": "Austria",
   "GB": "England",
   "DE": "Germany",
   "ES": "Spain",
   "FR": "France"
}
mapping_expr = F.create_map([F.lit(x) for x in chain(*country_map.items())])
df_countries = df_cities.withColumn('country', mapping_expr.getItem(F.col("country")))
df_countries.show()

+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+
|                  id|         created_at|city_id|    lat|    lon|    country|  temp|max_temp|min_temp|feels_like|humidity|     city|
+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|    Germany|279.39|  280.79|  278.76|    275.21|      91|   Berlin|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     France|277.89|  278.62|  277.19|    276.44|      97|    Paris|
|"61bf0e40c0963c89...|2021-12-19 11:49:36|3128760|41.3888|  2.159|      Spain|284.19|  286.78|  281.78|    283.36|      77|Barcelona|
|"61bf0e45c0963c89...|2021-12-19 11:49:41|2759794| 52.374| 4.8897|Netherlands| 280.2|   281.4|  279.19|    278.34|      94|Amsterdam|
|"61bf0e4ac0963c89...|2021-12-19 11:49:46|3094802|50.0833|19.9

In [51]:
# temp/max_temp/min_temp/feels_like from Kelvin to Fahreheint(F) and Ceucius(C)
# Fahreheint (K − 273,15) × 9/5 + 32
F_func = lambda x: (x - 273.15) * 9/5 + 32
F_udf = F.udf(F_func, DoubleType())
df_fahrehenint = df_countries.withColumn('temp_F', F.round(F_udf(df_countries.temp), 2))
df_fahrehenint = df_fahrehenint.withColumn('max_temp_F', F.round(F_udf(df_countries.max_temp), 2))
df_fahrehenint = df_fahrehenint.withColumn('min_temp_F', F.round(F_udf(df_countries.min_temp), 2))
df_fahrehenint = df_fahrehenint.withColumn('feels_like_F', F.round(F_udf(df_countries.feels_like), 2))

df_fahrehenint.show()

+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+
|                  id|         created_at|city_id|    lat|    lon|    country|  temp|max_temp|min_temp|feels_like|humidity|     city|temp_F|max_temp_F|min_temp_F|feels_like_F|
+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|    Germany|279.39|  280.79|  278.76|    275.21|      91|   Berlin| 43.23|     45.75|      42.1|       35.71|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     France|277.89|  278.62|  277.19|    276.44|      97|    Paris| 40.53|     41.85|     39.27|       37.92|
|"61bf0e40c0963c89...|2021-12-19 11:49:36|3128760|41.3888|  2.159|      Spain|284.19|  286.78|  281.78|    283.36|      

In [52]:
# Ceucius (K − 273,15)
C_func = lambda x: x - 273.15
C_udf = F.udf(C_func, DoubleType())
df_celcius = df_fahrehenint.withColumn('temp_C', F.round(C_udf(df_countries.temp), 2))
df_celcius = df_celcius.withColumn('max_temp_C', F.round(C_udf(df_countries.max_temp), 2))
df_celcius = df_celcius.withColumn('min_temp_C', F.round(C_udf(df_countries.min_temp), 2))
df_celcius = df_celcius.withColumn('feels_like_C', F.round(C_udf(df_countries.feels_like), 2))
df_celcius.show()

+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+
|                  id|         created_at|city_id|    lat|    lon|    country|  temp|max_temp|min_temp|feels_like|humidity|     city|temp_F|max_temp_F|min_temp_F|feels_like_F|temp_C|max_temp_C|min_temp_C|feels_like_C|
+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|    Germany|279.39|  280.79|  278.76|    275.21|      91|   Berlin| 43.23|     45.75|      42.1|       35.71|  6.24|      7.64|      5.61|        2.06|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     France|277.89|  278.62|  277.19|    276.44|      97|    P

In [53]:
# drop Kelvin columns
df_drop = df_celcius.drop('temp', 'max_temp', 'min_temp', 'feels_like')

df_drop.show()

+--------------------+-------------------+-------+-------+-------+-----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+
|                  id|         created_at|city_id|    lat|    lon|    country|humidity|     city|temp_F|max_temp_F|min_temp_F|feels_like_F|temp_C|max_temp_C|min_temp_C|feels_like_C|
+--------------------+-------------------+-------+-------+-------+-----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|    Germany|      91|   Berlin| 43.23|     45.75|      42.1|       35.71|  6.24|      7.64|      5.61|        2.06|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|     France|      97|    Paris| 40.53|     41.85|     39.27|       37.92|  4.74|      5.47|      4.04|        3.29|
|"61bf0e40c0963c89...|2021-12-19 11:49:36|3128760|41.3888|  2.159|      Spain|      77|Bar

In [54]:
print(df_final.dtypes)

[('id', 'string'), ('created_at', 'timestamp'), ('month', 'int'), ('day', 'int'), ('hour', 'int'), ('city_id', 'int'), ('city', 'string'), ('country', 'string'), ('lat', 'double'), ('lon', 'double'), ('humidity', 'int'), ('min_temp_F', 'double'), ('temp_F', 'double'), ('max_temp_F', 'double'), ('feels_like_F', 'double'), ('min_temp_C', 'double'), ('temp_C', 'double'), ('max_temp_C', 'double'), ('feels_like_C', 'double')]


In [55]:
# separate month, day and hour
# MONTH
df_month = df_ceucius.withColumn("month", F.month(df_ceucius.created_at))

# DAY
df_day = df_month.withColumn("day", F.dayofmonth(df_month.created_at))

# HOUR
df_hour = df_day.withColumn("hour", F.hour(df_day.created_at))
df_hour.show()

+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+-----+---+----+
|                  id|         created_at|city_id|    lat|    lon|    country|  temp|max_temp|min_temp|feels_like|humidity|     city|temp_F|max_temp_F|min_temp_F|feels_like_F|temp_C|max_temp_C|min_temp_C|feels_like_C|month|day|hour|
+--------------------+-------------------+-------+-------+-------+-----------+------+--------+--------+----------+--------+---------+------+----------+----------+------------+------+----------+----------+------------+-----+---+----+
|"61bf0e36c0963c89...|2021-12-19 11:49:26|2950159|52.5244|13.4105|    Germany|279.39|  280.79|  278.76|    275.21|      91|   Berlin| 43.23|     45.75|      42.1|       35.71|  6.24|      7.64|      5.61|        2.06|   12| 19|  11|
|"61bf0e3bc0963c89...|2021-12-19 11:49:31|2988507|48.8534| 2.3488|  

In [56]:
# reorder columns
df_final = df_hour.select(
    'id', 'created_at', 'month', 'day', 'hour', 'city_id', 'city', 'country', 'lat', 'lon', 'humidity',
    'min_temp_F', 'temp_F', 'max_temp_F', 'feels_like_F',
    'min_temp_C', 'temp_C', 'max_temp_C', 'feels_like_C',
)

df_final.printSchema()
df_final.show(10)

root
 |-- id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- min_temp_F: double (nullable = true)
 |-- temp_F: double (nullable = true)
 |-- max_temp_F: double (nullable = true)
 |-- feels_like_F: double (nullable = true)
 |-- min_temp_C: double (nullable = true)
 |-- temp_C: double (nullable = true)
 |-- max_temp_C: double (nullable = true)
 |-- feels_like_C: double (nullable = true)

+--------------------+-------------------+-----+---+----+-------+---------+-----------+-------+-------+--------+----------+------+----------+------------+----------+------+----------+------------+
|                  id|         created_at|month|d

In [79]:
# save as parquet
output_dir = '../data/'
filename = f'openweather_{today}'
file_path = f"{output_dir}{filename}.parquet"

# check if outputdir exists
if not os.path.exists(output_dir):
        os.makedirs(output_dir)

# check if daily parquet file already exists and delete it to save a new one
# case the file runs more than once a day.
if os.path.exists(file_path):
    shutil.rmtree(file_path, ignore_errors=True)
    
df_final.write.parquet(file_path)