# In [30]:
import polars as pl
from pathlib import Path
import ipynbname
from math import radians


# Polars table display options (column count, inline dtypes, row limit).
pl.Config.set_tbl_cols(-1)
pl.Config.set_tbl_column_data_type_inline(True)
pl.Config.set_tbl_rows(50)

# Input CSV files are resolved relative to the folder containing this notebook.
week_11_path = ipynbname.path().parent
dsb_branches_file = week_11_path / 'DSB Branches.csv'
dsb_customer_locations_file = week_11_path / 'DSB Customer Locations.csv'


# ------------------------------------------------------------------------------------------------
# load and transform the data
# ------------------------------------------------------------------------------------------------

# Sphere radius (in miles) that scales the central angle into 'distance_in_miles'.
EARTH_RADIUS_MILES = 3963

# Cosine of the central angle between a branch and a customer address
# (spherical law of cosines). The coordinate columns are expected to be in
# radians by the time this expression is evaluated.
cos_central_angle = (
    pl.col('Branch Lat').sin() * pl.col('Address Lat').sin()
    + pl.col('Branch Lat').cos() * pl.col('Address Lat').cos()
    * (pl.col('Branch Long') - pl.col('Address Long')).cos()
)

df_comb = (
    pl.scan_csv(dsb_branches_file)
    # pair every branch with every customer location
    .join(pl.scan_csv(dsb_customer_locations_file), how='cross')
    # Apply math.radians to every column whose name ends in " Long" or " Lat".
    # The pattern is a raw string because '\s' is not a valid Python string escape.
    # NOTE(review): this overwrites the coordinate columns, so any later output
    # carries radians rather than the values read from the CSVs - confirm intended.
    .with_columns(pl.col(r'^.*\s+?(Long|Lat)$').apply(radians))
    # Calculate the distance. The cosine is clamped to [-1, 1] first: rounding
    # can push it marginally outside that range (e.g. identical coordinates),
    # where arccos would yield NaN.
    .with_columns(
        (EARTH_RADIUS_MILES * cos_central_angle.clip(-1.0, 1.0).arccos())
        .round(2)
        .alias('distance_in_miles')
    )
    .collect()
)

# rank the stores by distance for each customer and keep only the closest store per customer (i.e., rank=1)
df = (
    df_comb
    # Keep only the rows whose dense distance rank within the customer is 1
    # (ties are all kept). The rank is compared directly inside the filter, so
    # no temporary rank column is created and no narrow integer cast can overflow.
    .filter(pl.col('distance_in_miles').rank('dense', reverse=False).over('Customer') == 1)
    .sort('Customer', reverse=False)
    # rank the remaining customers by distance within each branch to set the customer priority
    .with_columns(
        pl.col('distance_in_miles')
        .rank('dense', reverse=False)
        .over('Branch')
        .cast(pl.UInt16)
        .alias('customer priority')
    )
)

# Tidy the column names (first letter upper-cased, the rest lower-cased,
# underscores turned into spaces) and order the rows by branch and priority.
# The result is assigned back to `df`; as a bare expression it would be
# discarded and never reach any later use of `df`.
df = (
    df.rename({col: col.capitalize().replace('_', ' ') for col in df.columns})
    .sort(['Branch', 'Customer priority'], reverse=[False, False])
)



# ------------------------------------------------------------------------------------------------
# output the data
# ------------------------------------------------------------------------------------------------

output_dir = week_11_path / 'output'

# exist_ok=True makes re-runs safe without a separate exists() check (which
# could race with another process). parents=False still requires the notebook
# folder itself to exist.
output_dir.mkdir(parents=False, exist_ok=True)

# Write the same frame in three formats; the targets are built with pathlib
# rather than string concatenation.
df.write_parquet(output_dir / 'output-py.parquet', compression='zstd')
df.write_csv(output_dir / 'output-py.csv', sep=',', has_header=True)
df.write_json(output_dir / 'output-py.json', pretty=True, row_oriented=False)

