In [None]:
import os       as os
import numpy    as np
import datetime as dt
import sys      as sys

##
## Graphical libs and setup
import matplotlib.pyplot as plt
plt.ioff()
 
#print(plt.style.available)
plt.style.use(['ggplot', 'fast'])
plt.rc('image', cmap='cubehelix') # See https://www.mrao.cam.ac.uk/~dag/CUBEHELIX/
 
import seaborn  as sns
import pandas   as pd
pd.options.display.width = 250
 
from bnzds.utilities import CommonNotebook as cn
from bnzds.utilities import CommonIO       as cio
from bnzds.utilities import CommonGraphs   as cg
from bnzds.utilities import CommonSpark    as cs
 
# Not yet guaranteed to work under phttp://pxlbig03:10020/notebooks/DS_Python_Utilities/bnzds/examples/Jupyter/Lab_Test_BoxGraph.ipynb#ython2
# ML libs should be on python3 anyway
#if sys.version_info[0] >= 3:
#    from bnzds.utilities import CommonML       as cml
#    from bnzds.utilities import CommonTA       as cta
 
## Set OUTPUT paths
mainpath = os.environ.get('DSOUTPUT')
if mainpath is None:
    from os.path import expanduser
    mainpath = expanduser("~") + '/output/'

dpath = mainpath + '/data/'
gpath = mainpath + '/graphs/'
mpath = mainpath + '/model/'
                
## Platform Init
cn.platformInit(libs=[cn, cio], gpath=gpath)
#print = cn.aprint

In [None]:
pip install duckdb

In [None]:
import duckdb
con1 = duckdb.connect(':memory:', config={'allow_unsigned_extensions' : 'true'})

# Edge node has 504G
con1.sql("SET memory_limit = '100GB'")  

# See https://stackoverflow.com/questions/71952623/reading-partitioned-parquet-files-in-duckdb
con1.sql("SET temp_directory = '/data/disk1/tmp/duckdbcaches/" + os.environ.get('USER') + "'")

# Use progress bar (if possible)
print(con1.sql("PRAGMA enable_print_progress_bar"))


## Setting up data

In [None]:
df_fin = pd.read_csv('business-financial-data-march-2024-csv.csv')
df_fin.columns = [str(col).lower().replace(' ','_') for col in df_fin.columns]
df_fin['year'] = df_fin['period'].astype(str).str.slice(0,4).astype(int)
df_fin.rename(columns={'group':'ind_group'}, inplace='True')
df_fin.head()

In [None]:
df_emp = pd.read_csv('machine-readable-business-employment-data-mar-2024-quarter.csv')
df_emp.columns = [str(col).lower().replace(' ','_') for col in df_emp.columns]
df_emp.rename(columns={'group':'ind_group'}, inplace='True')
df_emp.head()

In [None]:
duckdb.register('fin',df_fin)
duckdb.register('emp',df_emp)

In [None]:
print(df_fin.dtypes)

## Question One

In [None]:
Q1_query = """
with sal_wgs as
(--Get all data where series_title_1 = salaries and wages
select 
    *
from fin 

where series_title_1 = 'Salaries and wages'
      
)

, first_year as
(--All industries where first year for salaries and wages after 2016
select
    series_title_2
    , ind_group
    , min(year) as first_year
from sal_wgs

group by series_title_2, ind_group

having min(year) > 2016
)

select
    series_title_2 as industry
    ,avg_filled_jobs as overall_filled_jobs
from 
(--Average actual filled jobs per industry
    select
        emp.series_title_2
        ,avg(emp.data_value) as avg_filled_jobs
        ,emp.series_title_1
    from emp 
    
    inner join first_year fst on fst.series_title_2 = emp.series_title_2 --Noted that emp dataset only includes NZSIOC Level 2
    --Actual filled jobs only
    where emp.series_title_1 = 'Filled jobs'
          and emp.series_title_3 = 'Actual'

    group by emp.series_title_2
            , emp.series_title_1
) mx

order by avg_filled_jobs desc

limit 1
;
"""

result = duckdb.sql(Q1_query).df()
result

## Question Two

In [None]:
Q2_query = """
with base as 
(--Get all data for business industry is level 2, and variables are seasonally adjusted operating income
select 
    *
from fin
where series_title_1 = 'Sales (operating income)'
      and ind_group = 'Industry by financial variable (NZSIOC Level 2)'
      and series_title_4 = 'Seasonally adjusted'
)


select 
    period
    ,series_title_2 as industry
    ,data_value as operating_income
from
    (--Rank seasonally adjusted operating income largest to smallest across all periods and industries
    select
        period
        ,series_title_2
        ,data_value
        ,dense_rank() over (order by data_value desc) as operating_inc_rnk
    from base
    ) rnk
where operating_inc_rnk = 2
"""

result = duckdb.sql(Q2_query).df()
result

## Question Three

In [None]:
Q3_query = """
with 
base as 
(--Get base data for territorial authorities and filled jobs
select
      series_title_2
      ,data_value
      ,period
from emp
where ind_group = 'Territorial authority by employment variable'
      and series_title_1 = 'Filled jobs'
)

,highest_avg as
(--Get the territorial authority with the highest average value for filled jobs 
select
      series_title_2 as territorial_authority
      ,avg_filled_jobs
from      
      (
      select
            series_title_2
            ,avg(data_value) as avg_filled_jobs
      from base
      group by series_title_2
      ) avrg
order by avg_filled_jobs desc
limit 1      
)

, sorted_ta as
(
select
      base.period
      ,ha.territorial_authority
      ,base.data_value as filled_jobs
from highest_avg as ha
inner join base on base.series_title_2 = ha.territorial_authority
)

--Calculate quarterly cumulative number of filled jobs over time
select      
      sta.period
      ,sta.filled_jobs
      ,sum(prd.filled_jobs) as cumulative_filled_jobs
from sorted_ta sta
inner join sorted_ta prd on prd.period <= sta.period
group by sta.period, sta.filled_jobs
order by sta.period

;
"""

result = duckdb.sql(Q3_query).df()
result

## Question Four

We could use libraries like Pandas to check the data for such aberrations, and that they only include values we expect. <br>

For duplicates, we should first check if there are any duplicates, and drop them if so. <br>

e.g. 


In [None]:
df_fin_dedupe = df_fin
df_fin_dedupe[df_fin_dedupe.duplicated() == True] # to check for duplicates <br>
df_fin_dedupe.drop_duplicates # to drop duplicates

To identify incorrect data types, we would first want to see what data types the datasets contain, and if they are what we expected, using df.dtypes.
Since the datasets are part of a pipeline, we should know what columns and data types we expect to see for each. <br>
Thus, we can first define a dictionary showing the columns and their expected data types.
For example, using the financial data csv, we may define something like the below, and then check the actual data types of our dataset against this dictionary.

In [None]:
expected_schema = {
    'series_reference': 'object',
    'period': 'object',
     'data_value': 'float64',
     'suppressed': 'object',
     'status': 'object',
     'units': 'object',
     'magnitude': 'int64',
     'subject': 'object',
     'ind_group': 'object',
     'series_title_1': 'object',
     'series_title_2': 'object',
     'series_title_3': 'object',
     'series_title_4': 'object',
     'series_title_5': 'object',
     'year': 'int64'
}

#Then, we would want to check the actual data types of our dataset against this dictionary using:

for col, dtype in expected_schema.items():
    assert df_fin_dedupe[col].dtype == dtype

When we run this, we will see an AssertionError because period and series_title_5 are not our expected data types of object. <br>
To improve the pipeline, a function could then be written to compare the expected data types against the actual, and if they are different, then cast the actual to the expected. <br>
e.g. The function could be something like:

In [None]:
def enforce_schema(df_fin_dedupe, expected_schema):
    for col, expected_dtype in expected_schema.items():
        actual_dtype = df_fin[col].dtype

        # Skip if it's already correct
        if str(actual_dtype) == expected_dtype:
            continue

        print(f"[INFO] Column '{col}' is {actual_dtype}, expected {expected_dtype}. Attempting to cast...")

        try:
            if 'float' in expected_dtype:
                df_fin[col] = pd.to_numeric(df_fin[col], errors='coerce')
            elif 'int' in expected_dtype:
                df_fin[col] = pd.to_numeric(df_fin[col], errors='coerce').astype('Int64')  # nullable int
            elif expected_dtype == 'object':
                df_fin[col] = df_fin[col].fillna('').astype(str)
            elif 'datetime' in expected_dtype:
                df_fin[col] = pd.to_datetime(df_fin[col], errors='coerce')
            else:
                print(f"[WARN] Unhandled type: {expected_dtype}")
        except Exception as e:
            print(f"[ERROR] Failed to cast column '{col}' to {expected_dtype}: {e}")

    return df_fin


In [None]:
df_fin_stg = enforce_schema(df_fin_dedupe,expected_schema)
print(df_fin_stg.dtypes)

Similarly for missing dates, we may want to define the range of dates we are expecting for each dataset.
From there, we can check if the dataset is missing any of these dates.

In [None]:
#Define the date range we are expecting
expected_periods = pd.date_range('2016-01-01','2024-03-31', freq = 'Q')
#Define missing dates as ones not within the expected_periods
missing_periods = set(expected_periods) - set(pd.to_datetime(df_fin['period'], format = '%Y.%m')+ pd.offsets.MonthEnd(0))

missing_periods

For aberrations such as other missing values, we may want to drop any rows with missing data, or fill with another value, such 'Unknown'. This would help prevent misinterpretation of missing data.

In [None]:
#Drop any rows with NULL in data_value column
df_fin_na = df_fin_stg.dropna(subset = ['data_value'])

#Fill any NAs or missing values for suppressed and series_title_5 with 'Unknown'
df_fin_fill = df_fin_stg.replace('',pd.NA).fillna({
    'suppressed': 'Unknown',
    'series_title_5': 'Unknown'
})


It may also be worth considering why certain values are missing to assist with analysis further down the line.
For example, are they missing completely at random, missing at random or missing not at random? 

## Question Five
We will explore if there is a correlation between actual filled jobs and unadjusted sales(operating income) for the NZSIOC Level 2 industries per period. <br>
We should expect to see a positive correlation between the two, as generally more labour will result in higher productivity, and therefore higher sales. <br>
However, we should expect this to vary significantly between industries, as well as see clearly defined cut-offs between industries in number of filled jobs.

In [None]:
# Filter employment data for relevant observations
df_emp_jbs = df_emp[
    (df_emp['ind_group']=='Industry by employment variable') &
    (df_emp['series_title_1']=='Filled jobs') &
    (df_emp['series_title_3']=='Actual')
    ]

# Filter financial data for relevant observations
df_fin_inc = df_fin[
    (df_fin['ind_group']== 'Industry by financial variable (NZSIOC Level 2)') &
    (df_fin['series_title_1']=='Sales (operating income)') &
    (df_fin['series_title_4']=='Unadjusted')
]

In [None]:
df_fin_inc.head()

In [None]:
df_emp_jbs.head()

In [None]:
emp_summary = df_emp_jbs.groupby(['period','series_title_2']).agg({'data_value':'mean'}).reset_index()
emp_summary.rename(columns={'data_value':'filled_jobs', 'series_title_2': 'industry'}, inplace = True)
emp_summary.head()

In [None]:
inc_summary = df_fin_inc.groupby(['period','series_title_2']).agg({'data_value':'mean'}).reset_index()
inc_summary.rename(columns={'data_value':'sales', 'series_title_2': 'industry'}, inplace = True)
inc_summary.head()

In [None]:
df_inc_jbs = pd.merge(emp_summary, inc_summary, on =['period','industry'])
df_inc_jbs.dropna(inplace = True)
df_inc_jbs

In [None]:
# Summary statistics
df_inc_jbs.describe()
df_inc_jbs['industry'].value_counts()
df_inc_jbs['sales'].groupby(df_inc_jbs['industry']).mean()
df_inc_jbs['filled_jobs'].groupby(df_inc_jbs['industry']).mean()

In [None]:
%matplotlib inline

In [None]:
plt.figure(figsize=(10,6))
sns.scatterplot(data=df_inc_jbs, x='filled_jobs', y='sales', hue ='industry', style ='industry', palette = 'viridis')
plt.title('Filled Jobs vs Sales (Operating Income) by Industry')
plt.xlabel('Filled Jobs')
plt.ylabel('Sales (Operating Income)')
plt.legend(bbox_to_anchor=(1.05,1), loc = 'upper left')

In [None]:
# Correlation of filled jobs to sales by industry
ind_corrs = (
    df_inc_jbs.groupby('industry')[['filled_jobs','sales']].corr().unstack().iloc[:,1].reset_index(name ='correlation')
)
print(ind_corrs.sort_values(by='correlation',ascending = False))

In [None]:
# Covariance of filled jobs and sales by industry
ind_std = (
    df_inc_jbs.groupby('industry')[['filled_jobs','sales']].cov().reset_index()
)
print(ind_std.sort_values(by='sales', ascending = False))


In [None]:
# Regression plot by industry
sns.lmplot(x='filled_jobs', y='sales', data=df_inc_jbs, col ='industry', aspect = 0.9)

As expected, we see in the scatter plot that each industry's job filled and sales values are clearly defined from one another. <br>
From our correlation results, we also see that almost all industries, with the exception of one, have positive correlation between actual jobs filled and sales. <br>
Interestingly enough, Health Care and Social Assistance has the highest correlation of 0.98 (2 d.p.), which we can also visually see in the scatter plot. <br>
Information Media and Telecommunications is the only industry with negative correlation (-0.35), as well as negative covariance. This may be because many parts of this sector are more automated and capital intensive, rather than labour intensive. Thus, sales does not increase linearly with filled jobs. This may also explain why Health Care and Social Assistance has the highest correlation value, as this job will be highly labour intensive. For example, the more doctors that are employed, the more patients can be treated, and therefore healthcare providers earn more treatment fees. <br>
Another interesting point is that Construction has the highest sales covariance value. This makes sense, as the industry is highly labour intensive. Additionally, construction projects and income scales very closely with workforce size.

