# Direct Line Weather : ETL Workflow Demo

In [1]:
from py4j.java_gateway import java_import
from datetime import datetime, time
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession, SQLContext, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import date_format
from functools import reduce
import pandas as pd
import os
import zipfile

In [2]:
source_dir = 'C:\DLTestFiles\\'
zipped_dir = 'C:\\Users\malee\\Downloads\\Data Engineer Test.zip'
landing_path = [source_dir]
clean_dir = 'clean\weather\\'
raw_folder = 'raw\weather\\'
base_dir = 'C:\SparkDemo\\'
temp_dir = 'tempdir\\'
tempFiles, extracted_date = [], []
df_dict = {}

# ok lets create a spark session
# you can set additional configs, however this would be dependent on the local resources you have available
def createSparkSession(app_name, master='local'):
    try:
        session = SparkSession.builder.master(master).appName(app_name)\
                  .getOrCreate()
    except Py4JJavaError as e:
        raise e
    return session


sc = createSparkSession('HelloWorld') 

# this function allows us to visualise the file tree root structure 
def print_path_files(printpath):
    import os
    for root, dirs, files in os.walk(printpath):
        level = root.replace(printpath, '').count(os.sep)
        indent = ' ' * 4 * (level)
        print('{}{}/'.format(indent, os.path.basename(root)))
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print('{}{}'.format(subindent, f))

In [3]:
# extracting data from the zipped folder into the ingestion directory
with zipfile.ZipFile(zipped_dir, "r") as zipObj:
    try:
        FileList = zipObj.namelist()
        for file in FileList:
            if file.startswith('weather'):
                zipObj.extract(file, source_dir)
        print("files have been successfully extracted!")
    except OSError as o:
        print(str(o))
        raise o
        
print_path_files(source_dir)

files have been successfully extracted!
/
    weather.20160201.csv
    weather.20160301.csv


In [4]:
# setting hadoop properties with key value pairs
# The aim of setting these properties is to build a bridge between pyspark and hdfs via java
java_import(sc._jvm, 'org.apache.hadoop.fs.Path')
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sc._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata.level", "false")
sc._jsc.hadoopConfiguration().set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
sc._jsc.hadoopConfiguration().set("spark.sql.execution.arrow.enabled","true")

# ok, lets read the csv files from source and put them in the ingestion folder
def split_file_date(sourcedir):
    """
    This function is specifically used to extract the date from the
    filenames and converting them to date format. The extracted
    dates are then appended to a list, which is then returned from the
    function.
    """
    import glob
    file_list = []
    try:
        for root, dirs, files in os.walk(sourcedir):
            file_list += glob.glob(os.path.join(root, '*.csv'))
            for i in file_list:
                get_date = i.split(".")[1]
                convert_to_date = datetime.strptime(get_date, '%Y%m%d').strftime('%Y/%m/%d').replace('/', '\\')
                extracted_date.append(convert_to_date)
    except OSError as e:
        print(str(e))
        raise e

    return file_list


# the for loop uses python's built-in enumerate function, with the udf defined above to create a dicitonary,
# which stores the name of the dataframe associated with file that's being loaded into it.
for f, df in enumerate(split_file_date(source_dir)):
    df_dict['df_' + str(df).split(".")[1]] = sc.read.csv(df, header=True)

# another for-loop is then used to iterate over the dict, from which the dataframe is extracted with it's associated name.
# Remember, the name of the dataframe has been extracted from the date port of the filename in form of date so df_20160201
for i, j in df_dict.items():
    try:
        date_split = datetime.strptime(i.split("_")[1], '%Y%m%d').strftime('%Y/%m/%d').replace('/', '\\')
        base_path = base_dir + raw_folder
        full_path = base_path + date_split + '\\' + temp_dir
        # ok, so spark by its nature as an MPP engine will partition files (partitions are dependent on the number of
        # of executors defined- each executor is responsible for a task), which is usefui for a large workload
        # however since, I am only working with a small dataset, I will merge all partitions into a single output.
        j.repartition(1).write.mode("overwrite").option("header", "true").parquet(full_path) 

    except SystemError as e:
        print(str(e))
        raise e

# list comprehension to save the raw_dir file paths to a list
file_process_path = [base_dir+raw_folder + x for x in extracted_date]

# please note that the function below is very similar to the one used to manage files in databricks, with Azure data lake
def delete_from_temp(fullpath):
    """
    param input: raw file path
    When you originally write the parquet files to the fs, the naming convention
    of the file remains undefined. Subsequently, you usually have a filename
    that references the partition output id from the spark conversion.
    The function below is used to leverage hdfs (spark pretty much sits on top of hdfs)
    methods to create a temp dir, renaming the file and then deleting the temp folder.
    """
    try:
        for l in fullpath:
            fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
            file = fs.globStatus(sc._jvm.Path(l + '\\' + temp_dir + 'part*'))[0].getPath().getName()
            fs.rename(sc._jvm.Path(l + '\\' + temp_dir + file),
                      sc._jvm.Path(l + '\\' + l[-19:].replace('\\', '') + '.parquet'))
            fs.delete(sc._jvm.Path(l + '\\' + temp_dir), True)
            fs.delete(sc._jvm.Path(l + '\\' + '.' + l[-19:].replace('\\', '') + '.parquet.crc'), True)
    except IOError as e:
        print("failed to delete the temp folder" + str(e))
        raise e

    return fullpath


ingestion_to_raw = delete_from_temp(file_process_path)

print_path_files(base_dir+raw_folder)

/
2016/
    02/
        01/
            weather20160201.parquet
    03/
        01/
            weather20160301.parquet


### Data Quality Checks & Validation

In [5]:
# sqlCtx = SQLContext(sc)
# ok, so I have done a explicitly define both a spark and sql context, primarily for spark df to pandas df conversion.
sqlCtx = SQLContext(sparkContext=sc.sparkContext, sparkSession=sc)

clean_df = sqlCtx.read.option("inferSchema", "true").parquet(*file_process_path)

In [6]:
# check to ensure that source file is not empty
try:
    if len(clean_df.head(1)) != 0:
        print("The source file is not empty")
except IOError as e:
    raise e

The source file is not empty


In [7]:
# row_count comparison between source and target files, to ensure that we haven't lost any data between the file conversion
# from csv to parquet
def get_row_count(path, file_format, file_type, input):
    """
    param input path: path to read file
    param input file_format: file extension i.e. '.csv' '.parquet'
    param input file_type: pyspark args; type of file
    param input [input]: boolean True/False header property
    row count is saved in a list of dicts, which is then
    used to create a pandas & spark df. The function returns the
    spark df
    """
    new_row = []
    try:
        for i in path:
            for root, dirs, files in os.walk(i):
                for file in files:
                    if file.endswith(file_format):
                        f = os.path.join(root, file)
                        row_count = sc.read.option("header", input).format(file_type).load(f).count()
                        new_row.append({'filename': file, 'RowCount': row_count})
        row_df = pd.DataFrame(new_row)
        sp_df = sqlCtx.createDataFrame(row_df)
    except TypeError as t:
        print(str(t))
        raise AssertionError(t)

    return sp_df


def create_row_output(original_file, modified_file, source_file = [], target_file = []):
    """
    param input original_file: df for original file
    param input modified_file: df for modified file
    param input source_file: list of paths for source files
    param input target_file: list of paths for target files
    The funciton from above is used to create the multiple source/target
    df's, which are then merged for comparison purposes.
    """
    try:
        df_original = get_row_count(*source_file).withColumn("Type", F.format_string(original_file))
        df_modified = get_row_count(*target_file).withColumn("Type", F.format_string(modified_file))
        combined = df_original.union(df_modified)
        cols = list(combined.columns)
        cols = [cols[-1]] + cols[:-1]
        rowComparisonDf = combined[cols]
    except LookupError as l:
        print(str(l))
        raise l

    return rowComparisonDf

# the functions above can be used to specify various formats for comparison purposes
array = []
csv_file_rows = ['.csv', 'csv', 'true']  # use as source/target
parquet_file_rows = ['.parquet', 'parquet', 'false']  # use as source/target

create_row_output('SourceFile', 'TargetFile', [landing_path, *csv_file_rows], [file_process_path, *parquet_file_rows]).show()

+----------+--------+--------------------+
|      Type|RowCount|            filename|
+----------+--------+--------------------+
|SourceFile|   93255|weather.20160201.csv|
|SourceFile|  101442|weather.20160301.csv|
|TargetFile|   93255|weather20160201.p...|
|TargetFile|  101442|weather20160301.p...|
+----------+--------+--------------------+



In [8]:
# Data Type Check
# ok, typically when you infer schema from a pyspark dataframe, spark automatically gives you the best-fit data types 
# for the cols.
# However, if you want to custom specify a schema, you can use the approach below. A benefit for the below approach is that
# you might have, through your pre-processing data validation stage identified data types most suited to your dataset.
# Additionally, you can pass in the lists below from various sources including as parameters to your pipeline. 

to_int_cols = ['WindSpeed', 'WindDirection', 'WindGust', 'Pressure', 'SignificantWeatherCode']
to_long_cols = ['ForecastSiteCode', 'Visibility']
to_date_cols = ['ObservationDate']
to_double_cols = ['ScreenTemperature', 'Latitude', 'Longitude']

# the assumption was made that the time fields in the weather datasets were of int type, and required formatting to 
# a time format
clean_df = clean_df.withColumn('ObservationTime', F.lpad(clean_df['ObservationTime'], 4, '0').substr(3, 4))
clean_df = clean_df.withColumn('ObservationTime', F.rpad(clean_df['ObservationTime'], 6, '0')).\
                    withColumn("ObservationTime", (F.regexp_replace('ObservationTime',"""(\d\d)""", "$1:")).substr(0,8))

# using a cast function from spark to modify the data types
for col in clean_df.columns:
    try:
        if col in to_int_cols:
            clean_df = clean_df.withColumn(col, F.col(col).cast('int'))
        elif col in to_long_cols:
            clean_df = clean_df.withColumn(col, F.col(col).cast('long'))
        elif col in to_date_cols:
            clean_df = clean_df.withColumn(col, F.col(col).cast('date'))
        elif col in to_double_cols:
            clean_df = clean_df.withColumn(col, F.col(col).cast('double'))
        else:
            pass
    except AttributeError as ae:
        print(str(ae))
        raise ae

# the df schema should now reflect the changes in the df data types
clean_df.printSchema()

root
 |-- ForecastSiteCode: long (nullable = true)
 |-- ObservationTime: string (nullable = true)
 |-- ObservationDate: date (nullable = true)
 |-- WindDirection: integer (nullable = true)
 |-- WindSpeed: integer (nullable = true)
 |-- WindGust: integer (nullable = true)
 |-- Visibility: long (nullable = true)
 |-- ScreenTemperature: double (nullable = true)
 |-- Pressure: integer (nullable = true)
 |-- SignificantWeatherCode: integer (nullable = true)
 |-- SiteName: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
# handling NULLS
# It's good practice to handle Nulls and blanks in your dataset.
# Below, using list comprehension all categorical and continous fields have been added to lists
all_str_cols = [item[0] for item in clean_df.dtypes if item[1].startswith('string')]
all_continuous_cols =[item[0] for item in clean_df.dtypes if item[1].startswith(('int', 'long', 'double'))]
try:
    clean_df = clean_df.na.fill("Unknown", all_str_cols)
    clean_df = clean_df.na.fill(0, all_continuous_cols)
    default_time = '00:00:00'
    clean_df = clean_df.fillna({'ObservationTime': default_time})
    print("Null values have been handled")
except Exception as e:
    print(str(e))
    raise e

clean_df.show(n=3, truncate=False, vertical=True)

Null values have been handled
-RECORD 0--------------------------------------------
 ForecastSiteCode       | 3002                       
 ObservationTime        | 00:00:00                   
 ObservationDate        | 2016-03-01                 
 WindDirection          | 8                          
 WindSpeed              | 23                         
 WindGust               | 30                         
 Visibility             | 16000                      
 ScreenTemperature      | -99.0                      
 Pressure               | 0                          
 SignificantWeatherCode | 8                          
 SiteName               | BALTASOUND (3002)          
 Latitude               | 60.749                     
 Longitude              | -0.854                     
 Region                 | Orkney & Shetland          
 Country                | SCOTLAND                   
-RECORD 1--------------------------------------------
 ForecastSiteCode       | 3005                      

In [10]:
# ok, I am going to revert to pandas for a bit
# The observation date is converted into datetime, so we can set an index on the pandas dataframe,
# the date column is then formatted to a date from datetime
pd_df = clean_df.toPandas()
pd_df['ObservationDate'] = pd.to_datetime(pd_df['ObservationDate'], format='%Y-%m-%d')
pd_df['ObservedDate'] = pd_df['ObservationDate'].dt.date

# we could've also used the pyspark approach defined above.
def create_file_paritions(file_prefix='weather'):
    """
    This function is used to create daily file partitions
    from date taken from the filename. This is a pandas approach,
    where we group data from the date column in the dataframe. The
    for-loop is then to used to write the files to target dir based
    on the distinct yyyy/mm/dd specified in the date column. This 
    could also be adjusted for a higher level categorization i.e. YYYY/mm
    """
    partitioned_files = pd_df.groupby([pd_df.ObservationDate.dt.date])
    processed_df_list = []
    try:
        for (ObservationDate), group in partitioned_files:
            fname = file_prefix + ObservationDate.strftime('%Y%m%d')
            clean_process_path = base_dir + clean_dir + str(ObservationDate).replace('-', '\\')
            group.iloc[:, 0:].to_parquet(clean_process_path + '\\' + f'{fname}.parquet')
            # the files are then read back into a pandas df and df's ar appended to a list
            processed_df_list.append(pd.read_parquet(clean_process_path))
    except RuntimeError as re:
        print(str(re))
        raise re

    return processed_df_list

print_path_files(base_dir+clean_dir)

/
2016/
    02/
        01/
            weather20160201.parquet
        02/
            weather20160202.parquet
        03/
            weather20160203.parquet
        04/
            weather20160204.parquet
        05/
            weather20160205.parquet
        06/
            weather20160206.parquet
        07/
            weather20160207.parquet
        08/
            weather20160208.parquet
        09/
            weather20160209.parquet
        10/
            weather20160210.parquet
        11/
            weather20160211.parquet
        12/
            weather20160212.parquet
        13/
            weather20160213.parquet
        14/
            weather20160214.parquet
        15/
            weather20160215.parquet
        16/
            weather20160216.parquet
        17/
            weather20160217.parquet
        18/
            weather20160218.parquet
        19/
            weather20160219.parquet
        20/
            weather20160220.parquet
        21/
            

In [11]:
# the pandas concat function is used to merge dfs in the list into a single dataframe
processed_df = pd.concat(create_file_paritions(), ignore_index=True)
processed_df.drop('ObservationDate', axis=1, inplace=True)
processed_df.rename(columns={'ObservedDate': 'ObservationDate'}, inplace=True)


# grouped data by date
group_temp_by_day = processed_df.groupby(['ObservationDate'], sort=False)['ScreenTemperature'].max()


# DL Queries
# first get the max screentemp
maxTemp = processed_df['ScreenTemperature'].max()
# second map the maxtemp var to date column in df to extract the associated date
date_of_max_temp = processed_df[processed_df['ScreenTemperature'] == maxTemp]['ObservationDate'].values[0]
# third, map the maxtemp var to region column to extract the associated date
region_of_max_temp = processed_df[processed_df['ScreenTemperature'] == maxTemp]['Region'].values[0]
# combining the above three vars into a single statement
filtered_max_temp = processed_df[(processed_df['ObservationDate'] == date_of_max_temp) &
                                 (processed_df['Region'] == region_of_max_temp) &
                                 (processed_df['ScreenTemperature'] == maxTemp)]
# saving output to a list
output = filtered_max_temp[['ObservationDate', 'Region', 'ScreenTemperature']]
print('Maximum Temperature on date of occurrence' + " " + "(" + str(date_of_max_temp) + ")" + ':' + " " + str(maxTemp))
print('Maximum Temperature was recorded in the region' + " " + str(region_of_max_temp) + ':' + " " + str(maxTemp))

Maximum Temperature on date of occurrence (2016-03-17): 15.8
Maximum Temperature was recorded in the region Highland & Eilean Siar: 15.8


In [12]:
# revert pandas dataframe to spark dataframe
processed_spark = sqlCtx.createDataFrame(processed_df)
# processed_spark.printSchema()

"""
The spark sql context can be used to register a temp table (in-memory) to make sql queries.
The output of both the python and sql queries should be identical (and well it is!!)
"""
try:
    processed_spark.registerTempTable("Weather")
    sql_max_temp = sqlCtx.sql("select MAX(ScreenTemperature) from Weather").show()
    sql_max_dtemp_date = sqlCtx.sql("SELECT ObservationDate FROM Weather WHERE ScreenTemperature = "
                                    "(SELECT MAX(ScreenTemperature) FROM Weather)")
    sql_max_region = sqlCtx.sql("SELECT Region FROM Weather WHERE ScreenTemperature = "
                                "(SELECT MAX(ScreenTemperature) FROM Weather)").show()
    sql_max_aggreagted = sqlCtx.sql("SELECT ObservationDate, Region, ScreenTemperature "
                                    "FROM Weather WHERE ScreenTemperature = "
                                    "(SELECT MAX(ScreenTemperature) FROM Weather)").show()
    sqlCtx.sql("DROP TABLE Weather")
except Exception as e:
    print(e)
    raise e

+----------------------+
|max(ScreenTemperature)|
+----------------------+
|                  15.8|
+----------------------+

+--------------------+
|              Region|
+--------------------+
|Highland & Eilean...|
+--------------------+

+---------------+--------------------+-----------------+
|ObservationDate|              Region|ScreenTemperature|
+---------------+--------------------+-----------------+
|     2016-03-17|Highland & Eilean...|             15.8|
+---------------+--------------------+-----------------+

