In [11]:
#install required modules and libraries
#! pip install pyarrow

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os

#set directory to folder containing underlying .csv's

os.chdir("/Users/walea/Downloads/Data_Engineer_Test_Green_Flag")

#iterate over csv files into pandas selecting only the required columns
#create a compressed parquet dataset by year>month>region for a smaller footprint and faster querying

for file in os.listdir():
    if file.endswith(".csv"):
        file1 = file
        df = pd.read_csv(os.path.abspath(file1))


        df = df[['ForecastSiteCode','ObservationTime','ObservationDate','ScreenTemperature','SiteName','Region']]
        df = df.sort_values(by=['ForecastSiteCode','ObservationDate','ObservationTime'])
        df = df.reset_index(drop=True)
        df['ObsYear'] = pd.DatetimeIndex(df['ObservationDate']).year
        df['ObsMonth'] = pd.DatetimeIndex(df['ObservationDate']).month
        df['ObsDay'] = pd.DatetimeIndex(df['ObservationDate']).day
        
        table = pa.Table.from_pandas(df)
        
        #create files for testing 
        
        file1 = file1.replace(".csv",".")
        file2 = file1 + 'parquet.snappy'
        pq.write_table(table, file2,compression='snappy')
        
        pq.write_to_dataset(table,root_path='weather_results',partition_cols=['ObsYear','ObsMonth','Region'])
        

#read parquet dataset into pandas dataframe and filter for max temp - print back required columns

weather_data = pq.ParquetDataset('weather_results/')
table = weather_data.read()
weather_table_df = table.to_pandas()

weather_result = weather_table_df.loc[weather_table_df['ScreenTemperature'].idxmax()]

temp = weather_result['ScreenTemperature']
date = weather_result['ObservationDate'][:9]
region = weather_result['Region']

print("Which date was the hottest day? ",date)
print("") 
print("What was the temperature on that day? ",temp)
print("")        
print("In which region was the hottest day? ",region) 


Which date was the hottest day?  2016-03-1

What was the temperature on that day?  15.8

In which region was the hottest day?  Highland & Eilean Siar


In [37]:
#Testing


#open files in excel note down row details for maximum temps
df1 = pd.read_csv(os.path.abspath('weather.20160201.csv'))
df2 = pd.read_csv(os.path.abspath('weather.20160301.csv'))

print(df1['ScreenTemperature'].max(),df2['ScreenTemperature'].max())

15.6 15.8


In [38]:
#check schema of dataset as table
table.schema

ForecastSiteCode: int64
  -- field metadata --
  PARQUET:field_id: '1'
ObservationTime: int64
  -- field metadata --
  PARQUET:field_id: '2'
ObservationDate: string
  -- field metadata --
  PARQUET:field_id: '3'
ScreenTemperature: double
  -- field metadata --
  PARQUET:field_id: '4'
SiteName: string
  -- field metadata --
  PARQUET:field_id: '5'
ObsDay: int64
  -- field metadata --
  PARQUET:field_id: '6'
ObsYear: dictionary<values=int64, indices=int32, ordered=0>
ObsMonth: dictionary<values=int64, indices=int32, ordered=0>
Region: dictionary<values=string, indices=int32, ordered=0>
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 822

In [21]:
#check parquet file metadata and schema

wather_20160301_file = pq.ParquetFile(os.path.abspath('weather.20160301.parquet.snappy'))
wather_20160201_file = pq.ParquetFile(os.path.abspath('weather.20160201.parquet.snappy'))

In [19]:
wather_20160301_file.metadata

<pyarrow._parquet.FileMetaData object at 0x00000196BB3F7D68>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 9
  num_rows: 101442
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 5435

In [22]:
wather_20160201_file.metadata

<pyarrow._parquet.FileMetaData object at 0x00000196BB3C5688>
  created_by: parquet-cpp version 1.5.1-SNAPSHOT
  num_columns: 9
  num_rows: 93255
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 5435

In [23]:
wather_20160301_file.schema

<pyarrow._parquet.ParquetSchema object at 0x00000196B85964A8>
required group field_id=0 schema {
  optional int64 field_id=1 ForecastSiteCode;
  optional int64 field_id=2 ObservationTime;
  optional binary field_id=3 ObservationDate (String);
  optional double field_id=4 ScreenTemperature;
  optional binary field_id=5 SiteName (String);
  optional binary field_id=6 Region (String);
  optional int64 field_id=7 ObsYear;
  optional int64 field_id=8 ObsMonth;
  optional int64 field_id=9 ObsDay;
}

In [24]:
wather_20160201_file.schema

<pyarrow._parquet.ParquetSchema object at 0x00000196B8559860>
required group field_id=0 schema {
  optional int64 field_id=1 ForecastSiteCode;
  optional int64 field_id=2 ObservationTime;
  optional binary field_id=3 ObservationDate (String);
  optional double field_id=4 ScreenTemperature;
  optional binary field_id=5 SiteName (String);
  optional binary field_id=6 Region (String);
  optional int64 field_id=7 ObsYear;
  optional int64 field_id=8 ObsMonth;
  optional int64 field_id=9 ObsDay;
}