Read data into a dataframe from the north-east station.

In [0]:
# Folder that holds the raw CSV files.
raw_path = "/Volumes/workspace/default/environmental-data-analysis/raw-data/"

# raw_path already ends with "/", so strip it before appending the file name;
# otherwise the resulting path contains a double slash ("raw-data//northeast.csv").
north_east_csv_path = raw_path.rstrip("/") + "/northeast.csv"

# Read the CSV with a header row and let Spark infer the column types.
north_east_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(north_east_csv_path)
)

In [0]:
# Preview the raw data as loaded, to spot what needs cleaning.
display(north_east_df)

Now I need to inspect the dataframe and determine what needs to be cleaned.

From the display above, I can see the following issues:
- Column names are not in English, and they contain spaces and special characters (done)
- There are a lot of -9999 values; I need to investigate why
- There is an index column starting at 0 (done - removed)
- The hour column uses the current date, and only the hours differ; it needs to be combined with the date into a proper timestamp (done)
- A lot of the station data is duplicated from another table, stations.csv; I will keep the station code and remove the rest (done)

Before I investigate, I need to rename the columns, because the spaces and special characters in the current names can disrupt some of the DataFrame functions.

After that, I can continue inspecting the dataframe to determine what else needs to be cleaned.




In [0]:
# import dependencies
from pyspark.sql.functions import col, date_format, to_timestamp, concat_ws
from pyspark.sql.types import TimestampType

In [0]:
# Render the time-of-day part of 'Hora' as an "HH:mm:ss" string and store it
# in a new column called 'formatted_time'.
hora_as_clock_text = date_format(col("Hora"), "HH:mm:ss")
df_with_formatted_time = north_east_df.withColumn("formatted_time", hora_as_clock_text)

In [0]:
# Join 'Data' and 'formatted_time' with a single space and store the resulting
# text in a column named 'date'.
# NOTE(review): with Spark's default case-insensitive column resolution,
# withColumn("date", ...) presumably replaces the existing 'Data' column rather
# than adding a second one - confirm against the session's caseSensitive setting.
date_and_time_text = concat_ws(" ", col("Data"), col("formatted_time"))
df_with_formatted_time = df_with_formatted_time.withColumn("date", date_and_time_text)


In [0]:
# Cast the combined date/time text in 'date' to a Spark timestamp, overwriting
# the text column under the same name.
date_as_timestamp = col("date").cast(TimestampType())
df_with_timestamp = df_with_formatted_time.withColumn("date", date_as_timestamp)

In [0]:
# drop unused columns
# With Spark's default case-insensitive column resolution, the earlier
# withColumn("date", ...) call replaces the raw "Data" column, and drop("Data")
# would then resolve to - and remove - the newly built "date" timestamp.
# "Data" is therefore only dropped when it still exists as a column of its own
# (i.e. in a case-sensitive session).
unused_columns = ["index", "Hora", "formatted_time", "region", "state", "station", "latitude", "longitude", "height"]
if "Data" in df_with_timestamp.columns:
    unused_columns.append("Data")
df_with_dropped_columns = df_with_timestamp.drop(*unused_columns)

# rename column names: original (Portuguese) heading -> short code
column_renames = {
    "PRECIPITAÇÃO TOTAL, HORÁRIO (mm)": "prcp",
    "PRESSAO ATMOSFERICA AO NIVEL DA ESTACAO, HORARIA (mB)": "stp",
    "PRESSÃO ATMOSFERICA MAX.NA HORA ANT. (AUT) (mB)": "smax",
    "PRESSÃO ATMOSFERICA MIN. NA HORA ANT. (AUT) (mB)": "smin",
    "RADIACAO GLOBAL (Kj/m²)": "gbrd",
    "TEMPERATURA DO AR - BULBO SECO, HORARIA (°C)": "temp",
    "TEMPERATURA DO PONTO DE ORVALHO (°C)": "dewp",
    "TEMPERATURA MÁXIMA NA HORA ANT. (AUT) (°C)": "tmax",
    "TEMPERATURA MÍNIMA NA HORA ANT. (AUT) (°C)": "tmin",
    "TEMPERATURA ORVALHO MAX. NA HORA ANT. (AUT) (°C)": "dmax",
    "TEMPERATURA ORVALHO MIN. NA HORA ANT. (AUT) (°C)": "dmin",
    "UMIDADE REL. MAX. NA HORA ANT. (AUT) (%)": "hmax",
    "UMIDADE REL. MIN. NA HORA ANT. (AUT) (%)": "hmin",
    "UMIDADE RELATIVA DO AR, HORARIA (%)": "hmdy",
    "VENTO, DIREÇÃO HORARIA (gr) (° (gr))": "wdct",
    "VENTO, RAJADA MAXIMA (m/s)": "gust",
    "VENTO, VELOCIDADE HORARIA (m/s)": "wdsp",
    "station_code": "inme",
}

# Apply the renames one by one, in the order listed above.
df_with_renamed_columns = df_with_dropped_columns
for original_heading, short_code in column_renames.items():
    df_with_renamed_columns = df_with_renamed_columns.withColumnRenamed(original_heading, short_code)

# The inspection and profiling cells further down use the name `north_eng_df`,
# which is not assigned anywhere in the visible notebook, so a fresh run would
# fail with a NameError. Expose the renamed frame under that name.
# NOTE(review): assumes `north_eng_df` is meant to be this renamed frame - confirm.
north_eng_df = df_with_renamed_columns

In [0]:
# Preview the frame after dropping the unused columns and renaming the headings.
display(df_with_renamed_columns)

In [0]:
# Column names and Spark data types of north_eng_df.
# Summary statistics, the row count and the distinct-row count are computed
# in the cells that follow.
north_eng_df.dtypes

In [0]:
# Show the summary statistics table produced by describe() for north_eng_df.
display(north_eng_df.describe())

In [0]:
# Total number of rows in north_eng_df.
north_eng_df.count()

In [0]:
# Number of distinct rows; a value lower than the total row count would mean
# the frame contains exact duplicate rows.
north_eng_df.distinct().count()

In [0]:
pip install ydata-profiling[pyspark]

In [0]:
# Restart the notebook's Python process so the newly installed library can be imported.
# NOTE(review): a restart presumably clears Python variables defined in earlier
# cells - confirm that later cells do not depend on them.
dbutils.library.restartPython()

In [0]:
# Save north_eng_df as the Delta table 'north_eng' (overwriting any existing
# table of that name), then read the table back into `df` and preview it.
# NOTE(review): this cell runs right after a Python restart, which presumably
# clears notebook variables such as north_eng_df - confirm it still works when
# the notebook is run top to bottom.
delta_writer = north_eng_df.write.format('delta').mode('overwrite')
delta_writer.saveAsTable('north_eng')

df = spark.table('north_eng')
display(df)


In [0]:
from ydata_profiling import ProfileReport

# Correlation settings: "auto" is switched off, Pearson and Spearman are switched on.
correlation_settings = {
    "auto": {"calculate": False},
    "pearson": {"calculate": True},
    "spearman": {"calculate": True},
}

# Build the profiling report for the table loaded into `df`.
# infer_dtypes is off, and interactions / missing_diagrams are passed as None.
report = ProfileReport(
    df,
    title="env analysis",
    infer_dtypes=False,
    interactions=None,
    missing_diagrams=None,
    correlations=correlation_settings,
)

In [0]:
# Render the profiling report to HTML and keep the result in report_html
# (nothing is displayed by this cell).
report_html = report.to_html()


Creating a metadata dictionary:

In [0]:
# Metadata dictionary keyed by column abbreviation, since the abbreviations are
# used as the column names. Every entry carries the same four fields.
_METADATA_FIELDS = ("abbreviation", "full_heading", "description", "data_type")

# One row per column: (abbreviation, full_heading, description, data_type).
_METADATA_ROWS = [
    ("date", "date", "date (YYYY-MM-DD)", "date"),
    ("hr", "hour", "hour (HH:00)", "timestamp"),
    ("prcp", "total precipitation (mm)", "Amount of precipitation in millimetres (last hour)", "numerical"),
    ("stp", "atmospheric pressure at station height (mb)", "Atmospheric pressure at station level (mb)", "numerical"),
    ("smax", "atmospheric pressure max. in the previous hour (mb)", "Maximum air pressure for the last hour in hPa to tenths", "numerical"),
    ("smin", "atmospheric pressure min. in the previous hour (mb)", "Minimum air pressure for the last hour in hPa to tenths", "numerical"),
    ("gbrd", "radiation (kj/m2)", "Solar radiation KJ/m2", "numerical"),
    ("temp", "air temperature - dry bulb (°c)", "Air temperature (instant) in celsius degrees", "numerical"),
    ("dewp", "dew point temperature (°c)", "Dew point temperature (instant) in celsius degrees", "numerical"),
    ("tmax", "max. temperature in the previous hour (°c)", "Maximum temperature for the last hour in celsius degrees", "numerical"),
    ("tmin", "min. temperature in the previous hour (°c)", "Minimum temperature for the last hour in celsius degrees", "numerical"),
    ("dmax", "dew temperature max. in the previous hour (°c)", "Maximum dew point temperature for the last hour in celsius degrees", "numerical"),
    ("dmin", "dew temperature min. in the previous hour (°c)", "Minimum dew point temperature for the last hour in celsius degrees", "numerical"),
    ("hmax", "relative humidity max. in the previous hour (%)", "Maximum relative humid temperature for the last hour in %", "numerical"),
    ("hmin", "relative humidity min. in the previous hour (%)", "Minimum relative humid temperature for the last hour in %", "numerical"),
    ("hmdy", "air relative humidity (%)", "Relative humid in % (instant)", "numerical"),
    ("wdct", "wind direction (° (gr))", "Wind direction in radius degrees (0-360)", "numerical"),
    ("gust", "wind rajada maxima (m/s)", "Wind gust in metres per second", "numerical"),
    ("wdsp", "wind speed (m/s)", "Wind speed in metres per second", "numerical"),
    ("region", "region", "Brazilian geopolitical regions", "string"),
    ("stateprov", "state", "State (Province)", "string"),
    ("wsnm", "station", "Name station (usually city location or nickname)", "string"),
    ("inme", "station_code", "Station number (INMET number) for the location", "string"),
    ("lat", "latitude", "Latitude", "numerical"),
    ("lon", "longitude", "Longitude", "numerical"),
    ("elvt", "height", "Elevation", "numerical"),
]

# Expand each row into a {field: value} entry, keyed by its abbreviation and
# kept in the order listed above.
metadata_dict = {row[0]: dict(zip(_METADATA_FIELDS, row)) for row in _METADATA_ROWS}