In [1]:
%additional_python_modules awswrangler

import boto3
import datetime
import awswrangler as wr
from pyspark.sql import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.utils import AnalysisException
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue import DynamicFrame

# Regional AWS clients (eu-west-1). `s3` and `glue` are not used by the visible
# cells; presumably kept for ad-hoc inspection — TODO confirm before removing.
session = boto3.session.Session(region_name='eu-west-1')
s3 = session.client('s3')
glue = session.client('glue')

# Spark session with Kryo serialization and Hive parquet conversion disabled.
# NOTE(review): `spark.serializer` is a static Spark setting; if the Glue kernel
# already started a SparkContext, getOrCreate() cannot apply it — verify with
# spark.conf.get('spark.serializer').
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)
sc = spark.sparkContext
# SparkContext.getOrCreate() hands back the context that is already active.
glueContext = GlueContext(SparkContext.getOrCreate())


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Additional python modules to be included:
awswrangler
Trying to create a Glue session for the kernel.
Session Type: etl
Session ID: 8b2ca828-7d66-4144-89ec-aa6504abd124
Applying the following default arguments:
--glue_kernel_version 1.0.6
--enable-glue-datacatalog true
--additional-python-modules awswrangler
Waiting for session 8b2ca828-7d66-4144-89ec-aa6504abd124 to get into ready status...
Session 8b2ca828-7d66-4144-89ec-aa6504abd124 has been created.



In [2]:
env = 'dev'

# Root of the environment-specific DWH transform bucket. Every output path is
# derived from it so that changing `env` redirects all writes consistently
# (previously the paths hard-coded "sds-dev-" regardless of `env`).
output_root = f's3://sds-{env}-transform-dwh-etl'
output_full_path = f'{output_root}/ym_fct_competitor_prices_14_day_como_full/'
output_full_path_tmp = f'{output_root}/ym_fct_competitor_prices_14_day_tmp_como_full/'
output_limited_path = f'{output_root}/ym_fct_competitor_prices_14_day_como_limited/'
output_limited_path_tmp = f'{output_root}/ym_fct_competitor_prices_14_day_tmp_como_limited/'
# The trailing `{}` is filled with a handover country code via str.format() later.
output_limited_country_path = output_root + '/ym_fct_competitor_prices_14_day_como_limited_{}/'
output_limited_country_path_tmp = output_root + '/ym_fct_competitor_prices_14_day_tmp_como_limited_{}/'




In [4]:

SqlQuery = """
WITH base as (
 SELECT
    rank() OVER (PARTITION BY protection, competitor, CASE WHEN substr(shop_detail,6,3) = 'GDS' THEN 'GDS' ELSE 'No GDS' END, broker, mapped_scrs, mapped_fmod, mapped_crs, length_of_rent, pickup_date_time, pickup_station, return_date_time, return_station, tariff_type, payment_type, customer_no, mapped_mileage, point_of_sales, original_vehicle_category, original_crs, original_mileage ORDER BY created_at DESC, shop_date_time DESC, (CASE WHEN (COALESCE(converted_price, 0) = 0) THEN original_price ELSE converted_price END) ASC, rand() ASC) row_num
   , competitor
   , broker
   , shop_date_time
   , shop_detail
   , CASE WHEN substr(shop_detail,6,3) = 'GDS' THEN 'GDS' ELSE 'No GDS' END AS GDS
   , converted_price
   , original_price
   , converted_currency
   , original_currency
   , original_vehicle_category
   , original_crs
   , original_sample_vehicle
   , mapped_scrs
   , mapped_fmod
   , mapped_crs
   , length_of_rent
   , pickup_date_time
   , pickup_station
   , return_date_time
   , return_station
   , tariff_type
   , payment_type
   , customer_no
   , original_mileage
   , mapped_mileage
   , point_of_sales
   , rate_level
   , sixt_plan
   , protection
   FROM competitorprices)
SELECT base.protection AS YCPR_PROTECTION_FLG_STRING
    , (CASE WHEN (base.competitor <> 'Sixt') THEN 'Competitors' WHEN (CAST(base.protection AS integer) = 1) THEN 'Inclusive' ELSE 'Exclusive' END) AS YCPR_PROTECTION_SIXT
    , (CASE WHEN (base.competitor = 'Sixt') THEN 'Sixt' WHEN (CAST(base.protection AS integer) = 1) THEN 'Inclusive' ELSE 'Exclusive' END) AS YCPR_PROTECTION_COMPETITORS
    , base.competitor AS YCPR_COMPETITOR
    , base.broker AS YCPR_BROKER
    , CAST(base.shop_date_time AS timestamp) AS YCPR_SHOP_DATM
    , concat(date_format(CAST(base.shop_date_time AS timestamp), 'y'), '_', extract(week FROM shop_date_time)) AS YCPR_SHOP_WEEK
    , CAST(date_format(CAST(base.shop_date_time AS timestamp), 'yMMdd') AS integer) YCPR_SHOP_DTID
    , datediff(current_date, date(CAST(base.shop_date_time AS timestamp))) YCPR_DATE_DIFF
    , base.shop_detail YCPR_SHOP_DETAIL
    , base.GDS YCPR_GDS
    , CAST((CASE WHEN (COALESCE(converted_price, 0) = 0) THEN original_price ELSE converted_price END) AS decimal(10,2)) YCPR_FINAL_PRICE
    , (CASE WHEN (base.converted_currency IS NULL) THEN base.original_currency ELSE CAST(converted_currency AS string) END) YCPR_FINAL_CURRENCY
    , base.original_vehicle_category YCPR_VEHICLE_CATEGORY_ORIGINAL
    , base.original_crs YCPR_CRS_ORIGINAL
    , base.original_sample_vehicle YCPR_SAMPLE_VEHICLE_ORIGINAL
    , base.mapped_scrs YCPR_SCRS_MAPPED
    , base.mapped_fmod YCPR_FMOD_MAPPED
    , base.mapped_crs YCPR_CRS_MAPPED
    , (CASE WHEN base.mapped_crs like '%E' THEN 'E'
                WHEN substr(base.mapped_crs,4,1) = 'H' THEN 'H'
                WHEN base.mapped_crs in ('CCCC', 'FCCC', 'PCCC', 'CDHC', 'SFAC', 'SPAC', 'LFAC', 'XFAC') THEN 'O'
                WHEN substr(base.mapped_crs,4,1) = 'C' THEN 'E'
                ELSE 'O'
            END) AS VHCL_ELECTRIC
   
    , CASE WHEN vg.vhgr_type_code in ('P', 'B', 'M')  then 'C&B' when (vg.vhgr_type_code='T' and base.mapped_crs<>'P') then 'C&B' else 'V&T' end as YCPR_VEHICLE_TYPE    
    , CAST(base.length_of_rent AS integer) YCPR_RENTAL_DAYS
    , CAST(base.length_of_rent AS string) YCPR_RENTAL_DAYS_STRING
    , CAST(base.pickup_date_time AS timestamp) YCPR_HANDOVER_DATM
    , CAST(date(CAST(base.pickup_date_time AS timestamp)) AS string) YCPR_HANDOVER_DATE
    , date_trunc('week', CAST(base.pickup_date_time AS timestamp)) YCPR_HANDOVER_WEEK_DATE
    , concat(date_format(CAST(base.pickup_date_time AS timestamp), 'y'), '_', extract(week FROM base.pickup_date_time)) YCPR_HANDOVER_WEEK
    , date_format(CAST(base.pickup_date_time AS timestamp), 'y_MM') YCPR_HANDOVER_MONTH
    , date_format(CAST(base.pickup_date_time AS timestamp), 'EEEE') YCPR_HANDOVER_DAY
    , CAST(date_format(CAST(base.pickup_date_time AS timestamp), 'yMMdd') AS integer) YCPR_HANDOVER_DTID
    , CAST(base.return_date_time AS timestamp) YCPR_RETURN_DATM
    , CAST(date_format(CAST(base.return_date_time AS timestamp), 'yMMdd') AS integer) YCPR_RETURN_DTID
    , CAST(base.pickup_station AS integer) BRNC_HANDOVER_CODE
    , concat(CAST(br.brnc_code AS string), ' ', br.brnc_name) BRNC_HANDOVER_NAME
    , br.brnc_country_code_iso BRNC_HANDOVER_COUNTRY
    , br.brnc_region_code BRNC_HANDOVER_REGION_CODE
    , concat(concat(CAST(br.brnc_region_code AS string), ' '), br.brnc_region) BRNC_HANDOVER_REGION
    , br.brnc_pool_code BRNC_HANDOVER_POOL_CODE
    , concat(concat(CAST(br.brnc_pool_code AS string), ' '), br.brnc_pool_name) BRNC_HANDOVER_POOL
    , CAST(base.return_station AS string) BRNC_RETURN_CODE
    , (CASE WHEN (base.pickup_station = CAST(base.return_station AS integer)) THEN 'N' ELSE 'Y' END) BRNC_ONE_WAY
    , base.tariff_type YCPR_TARIFF_TYPE
    , base.payment_type YCPR_PAYMENT_TYPE
    , base.customer_no YCPR_CUSTOMER_NUMBER
    , CAST(base.original_mileage AS bigint) YCPR_MILEAGE_ORIGINAL
    , CAST(base.mapped_mileage AS string) YCPR_MILEAGE_MAPPED
    , base.point_of_sales YCPR_POSL_COUNTRY_CODE
    , CASE WHEN (br.brnc_country_code_iso = 'MC' and base.point_of_sales='FR') THEN 'Domestic'
        WHEN base.point_of_sales=br.brnc_country_code_iso THEN 'Domestic'
        else 'Inbound' end as YCPR_DOMESTIC_INBOUND
    , base.rate_level YCPR_RATE_LEVEL
    , cast(base.sixt_plan as string) YCPR_SIXT_PLAN_STRING
    , base.row_num YCPR_RECORD_SEQUENCE
    , current_timestamp() as YCPR_CURRENT_TIMESTAMP
FROM base
LEFT JOIN br_dim_branches br ON ((CAST(base.pickup_station AS integer) = br.brnc_code) AND (br.sys_deleted_flg = 0))
LEFT JOIN ve_dim_vehicle_groups vg ON ((base.mapped_crs = vg.vhgr_crs) AND (vg.sys_deleted_flg = 0))
WHERE (row_num IN (1)) AND ( case when br.brnc_country_code_iso in ('US', 'CA') then CAST(base.pickup_date_time AS timestamp) >= date_add(current_date, -1) else CAST(base.pickup_date_time AS timestamp) >= current_date end )
"""

SqlQueryLimited = """
SELECT a.* 
FROM competitorprices_full a
LEFT JOIN ym_ref_como_lor_monitoring b 
  ON a.brnc_handover_country=b.ycpr_country AND a.ycpr_rental_days=b.ycpr_lor
LEFT JOIN ym_ref_como_crs_monitoring c 
  ON a.brnc_handover_country=c.ycpr_country AND a.ycpr_crs_mapped=c.ycpr_crs
WHERE a.ycpr_rental_days=b.ycpr_lor AND a.ycpr_crs_mapped=c.ycpr_crs
"""




In [7]:
def path_exists(path: str) -> bool:
    """Return True when awswrangler lists at least one S3 object under ``path``.

    NOTE(review): ``MaxKeys`` is forwarded to awswrangler's listing. Whether it
    actually limits the listing to a single object depends on awswrangler's
    paginator handling — confirm if large prefixes make this slow.
    """
    listed_objects = wr.s3.list_objects(path=path, s3_additional_kwargs={'MaxKeys': 1})
    return bool(listed_objects)

def safe_write_df_to_s3(df: DataFrame, temp_path: str, final_path: str, partition_keys=None):
    """Write ``df`` as parquet to a staging prefix, then replace ``final_path`` with a copy.

    The frame is fully written to ``temp_path`` before ``final_path`` is touched,
    so a failure during the Spark/Glue write leaves the existing data at
    ``final_path`` intact. The swap itself (delete, then copy) is not atomic:
    readers may briefly see an empty or partially copied ``final_path``.

    Args:
        df: Spark DataFrame to persist.
        temp_path: S3 staging prefix; everything under it is deleted first.
        final_path: S3 destination prefix; everything under it is replaced.
        partition_keys: Optional list of column names to partition by. When
            given, the write goes through Glue's DynamicFrame writer; otherwise
            a plain Spark parquet write is used.
    """
    # Deletion and copying operate on key prefixes. Without a trailing slash,
    # ".../como_limited" would also match sibling prefixes such as
    # ".../como_limited_DE/" and silently wipe other datasets.
    temp_path = temp_path if temp_path.endswith('/') else temp_path + '/'
    final_path = final_path if final_path.endswith('/') else final_path + '/'

    wr.s3.delete_objects(temp_path)
    if partition_keys is None:
        df.write.parquet(temp_path)
    else:
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(df, glueContext, 'partition_writing'),
            connection_type="s3",
            format="parquet",
            connection_options={
                "path": temp_path,
                "partitionKeys": partition_keys,
            },
        )

    wr.s3.delete_objects(final_path)
    staged_objects = wr.s3.list_objects(path=temp_path)
    # If nothing was staged, leave final_path empty rather than issuing an empty copy.
    if staged_objects:
        wr.s3.copy_objects(paths=staged_objects, source_path=temp_path, target_path=final_path)
    
def create_spark_view(database: str, table_name: str):
    """Expose a Glue Data Catalog table to Spark SQL as a temp view.

    The view is registered under ``table_name`` alone (the database name is not
    part of the view name) and replaces any temp view with the same name.
    """
    catalog_frame = glueContext.create_dynamic_frame.from_catalog(database=database, table_name=table_name)
    catalog_frame.toDF().createOrReplaceTempView(table_name)

def drop_spark_view(table_name: str):
    """Remove the Spark temp view named ``table_name`` (counterpart of create_spark_view)."""
    catalog = spark.catalog
    catalog.dropTempView(table_name)




In [8]:
# Reference time for this run (a Python value, unrelated to Spark SQL's
# `current_date`) and the S3 day-partition folders to read: today plus the
# 15 preceding days, newest first.
# NOTE(review): 16 folders although outputs are named "14_day"; presumably a
# safety margin — confirm.
current_date = datetime.datetime.now()
folder_template = 's3://sds-{}-ingest-external-sftp-files-out/yield-sftp-user/CompetitorPrice/year={}/month={:02}/day={:02}/'
source_folders = [
    folder_template.format(env, day.year, day.month, day.day)
    for day in (current_date - datetime.timedelta(days=offset) for offset in range(16))
]




In [10]:

# Only read day folders that actually exist. spark.read.parquet() handed zero
# paths fails with an opaque schema-inference error, so fail explicitly instead.
existing_folders = [f for f in source_folders if path_exists(f)]
if not existing_folders:
    raise FileNotFoundError(
        'No CompetitorPrice input folder exists; checked: ' + ', '.join(source_folders)
    )

source_df = spark.read.parquet(*existing_folders)
# NOTE(review): 'source_data' is not queried by any visible cell.
source_df.createOrReplaceTempView('source_data')

# Reuse the reference time from the folder-listing cell so both cells agree on
# "today", even if they are run at different times.
prev_date_str = (current_date - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
source_competitor_prices = source_df.filter(f"pickup_date_time >= date '{prev_date_str}'")




In [12]:
# Register the lookup tables and the filtered input as temp views for SqlQuery.
create_spark_view(database='common_shop', table_name='br_dim_branches')
create_spark_view(database='fleet_shop', table_name='ve_dim_vehicle_groups')
source_competitor_prices.createOrReplaceTempView('competitorprices')

# Persisted because the result is written out and then reused to build the
# limited outputs. SqlQuery breaks ranking ties with rand() and stamps
# current_timestamp(), so each recomputation could select different rows or
# timestamps, and the full and limited outputs would disagree. Caching also
# avoids re-running the window query.
# NOTE(review): persist() is a best-effort cache; evicted partitions are recomputed.
competitor_prices_full = spark.sql(SqlQuery).persist()

# Lookup views are not referenced by later cells.
# NOTE(review): the 'competitorprices' view is left registered.
drop_spark_view('br_dim_branches')
drop_spark_view('ve_dim_vehicle_groups')



In [14]:
# Publish the full dataset: stage under output_full_path_tmp, then copy to output_full_path.
safe_write_df_to_s3(
    df=competitor_prices_full,
    temp_path=output_full_path_tmp,
    final_path=output_full_path,
)




In [16]:
# Register the monitoring reference tables and the full result for SqlQueryLimited.
create_spark_view(database='yield_shop', table_name='ym_ref_como_crs_monitoring')
create_spark_view(database='yield_shop', table_name='ym_ref_como_lor_monitoring')
competitor_prices_full.createOrReplaceTempView('competitorprices_full')

# Persisted because the limited frame is written several times below (overall,
# per country, partitioned). Without caching, each write recomputes the whole
# lineage, including the rand()-based ranking in SqlQuery, so the outputs
# could disagree with each other.
competitor_prices_limited = spark.sql(SqlQueryLimited).persist()

drop_spark_view('ym_ref_como_crs_monitoring')
drop_spark_view('ym_ref_como_lor_monitoring')
drop_spark_view('competitorprices_full')




# NOTE(review): the partitioned write in the next cell targets the same prefixes
# and replaces this unpartitioned output — confirm which layout is intended.
safe_write_df_to_s3(competitor_prices_limited, output_limited_path_tmp, output_limited_path)

# `COUNTRIES` is not defined in any visible cell, so this loop raised NameError on
# a fresh kernel. Honour it if a config cell defines it; otherwise fall back to
# every non-null handover country present in the limited data.
try:
    countries = COUNTRIES
except NameError:
    countries = sorted(
        row[0]  # positional access: the column is named BRNC_HANDOVER_COUNTRY in the schema
        for row in competitor_prices_limited.select('brnc_handover_country').distinct().collect()
        if row[0] is not None
    )

for country in countries:
    # Column comparison instead of an interpolated SQL string: no quoting issues.
    competitor_prices_limited_filtered = competitor_prices_limited.filter(
        competitor_prices_limited['brnc_handover_country'] == country
    )
    safe_write_df_to_s3(
        competitor_prices_limited_filtered,
        output_limited_country_path_tmp.format(country),
        output_limited_country_path.format(country),
    )

In [18]:
# Re-publish the limited dataset partitioned by handover country via Glue's writer.
# NOTE(review): this targets the same temp/final prefixes as the earlier
# unpartitioned write of competitor_prices_limited and replaces it.
# NOTE(review): the DataFrame column is BRNC_HANDOVER_COUNTRY (upper case);
# confirm that Glue resolves the lower-case partition key.
safe_write_df_to_s3(
    competitor_prices_limited,
    output_limited_path_tmp,
    output_limited_path,
    partition_keys=['brnc_handover_country'],
)


