#### Setting the Credentials

In [0]:
%run /Workspace/ProjectMedallion/nb_setup_cred

#### Import Libs

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

#### Reading the data

In [0]:

# Load the raw CSV: the first row supplies the column names and Spark infers
# each column's type from the data.
# NOTE(review): `ds_path` is not defined in the visible cells — presumably it is
# set by the nb_setup_cred notebook run at the top; confirm.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(ds_path)
)

In [0]:
# df = df.withColumn("ID",monotonically_increasing_id())

In [0]:
# df.count()

111394

#### Transformation

In [0]:
def transform(df):
    """Clean, de-duplicate and enrich the raw policy DataFrame.

    Steps, in order:
      1. Drop every row that has a null in any column.
      2. Trim leading/trailing whitespace from ``MOP``.
      3. Add an ``ID`` column and keep only the lowest-``ID`` row for each
         distinct combination of the key columns listed in ``dedup_keys``.
      4. Add the derived columns ``Gross Written Premium``
         (``Estimated Premium Income`` minus ``Total Acquisition Cost``),
         ``Binder`` (1 when ``MOP`` is ``'B'`` or ``'L'``, otherwise 0) and
         ``Acquisition Costs`` (a copy of ``Total Acquisition Cost``).
      5. Remove spaces from every column name.

    Args:
        df: Spark DataFrame that contains at least the columns named in
            ``dedup_keys`` (names with spaces, as read from the source file).

    Returns:
        A new Spark DataFrame with the input columns, ``ID`` and the three
        derived columns, all with space-free names.
    """
    # Columns whose combined values define a duplicate record.
    dedup_keys = [
        'Policy Reference',
        'Inception Date',
        'Expiry Date',
        'Underwriter Name',
        'Department',
        'TriFocus Group',
        'Policy YOA',
        'Stats Code',
        'Policy Settlement Currency',
        'Syndicate Number',
        'Total Acquisition Cost',
        'Estimated Premium Income',
        'MOP',
        'report_date',
    ]

    # MOP is part of the de-duplication key, so it must be trimmed BEFORE the
    # window is applied; otherwise rows that differ only by stray whitespace in
    # MOP would both survive and become exact duplicates after the trim.
    prepared = (
        df.dropna(how='any')
        .withColumn('MOP', trim(col('MOP')))
        .withColumn('ID', monotonically_increasing_id())
    )

    # Number the rows of each duplicate group by ID and keep the first one.
    first_in_group = Window.partitionBy(*dedup_keys).orderBy('ID')
    deduped = (
        prepared.withColumn('rownumber', row_number().over(first_in_group))
        .filter(col('rownumber') == 1)
        .drop('rownumber')
    )

    enriched = (
        deduped
        .withColumn('Gross Written Premium',
                    col('Estimated Premium Income') - col('Total Acquisition Cost'))
        .withColumn('Binder',
                    when((col('MOP') == 'B') | (col('MOP') == 'L'), 1).otherwise(0))
        .withColumn('Acquisition Costs', col('Total Acquisition Cost'))
    )

    # Strip spaces from the column names (e.g. 'Policy Reference' -> 'PolicyReference').
    return enriched.select(
        [col(name).alias(name.replace(' ', '')) for name in enriched.columns]
    )

In [0]:
# df.explain(extended=True)

== Parsed Logical Plan ==
Project [PolicyReference#51, InceptionDate#67, ExpiryDate#82, UnderwriterName#97, Department#112, TriFocusGroup#127, PolicyYOA#142, StatsCode#157, PolicySettlementCurrency#172, SyndicateNumber#187, TotalAcquisitionCost#202, EstimatedPremiumIncome#217, MOP#232, report_date#36 AS report_date#247]
+- Project [PolicyReference#51, InceptionDate#67, ExpiryDate#82, UnderwriterName#97, Department#112, TriFocusGroup#127, PolicyYOA#142, StatsCode#157, PolicySettlementCurrency#172, SyndicateNumber#187, TotalAcquisitionCost#202, EstimatedPremiumIncome#217, MOP#35 AS MOP#232, report_date#36]
   +- Project [PolicyReference#51, InceptionDate#67, ExpiryDate#82, UnderwriterName#97, Department#112, TriFocusGroup#127, PolicyYOA#142, StatsCode#157, PolicySettlementCurrency#172, SyndicateNumber#187, TotalAcquisitionCost#202, Estimated Premium Income#34 AS EstimatedPremiumIncome#217, MOP#35, report_date#36]
      +- Project [PolicyReference#51, InceptionDate#67, ExpiryDate#82, Unde

In [0]:
# df = df.select([col(x).alias(x.replace(' ', '')) for x in df.columns
# ])

In [0]:
# Run the cleaning / de-duplication pipeline defined in transform().
# NOTE: this rebinds `df` to the transformed frame. transform() strips spaces
# from the column names, so re-running this cell without first re-running the
# read cell will fail to resolve columns such as 'Policy Reference'.
df = transform(df)

In [0]:
# col_names = df.columns


In [0]:
# col_names

In [0]:
# df = df.filter(~(col('Total Acquisition Cost')== 0) & ~(col('Estimated Premium Income') == 0))

In [0]:
# window_spec = Window.partitionBy('Policy Reference',
#  'Inception Date',
#  'Expiry Date',
#  'Underwriter Name',
#  'Department',
#  'TriFocus Group',
#  'Policy YOA',
#  'Stats Code',
#  'Policy Settlement Currency',
#  'Syndicate Number',
#  'Total Acquisition Cost',
#  'Estimated Premium Income',
#  'MOP',
#  'report_date').orderBy('ID')

# df_with_row_number = df.withColumn("row_number", row_number().over(window_spec))

# df = df_with_row_number.filter(df_with_row_number.row_number == 1).drop("row_number")

In [0]:
# df = df.withColumn('Gross Written Premium',(col('Estimated Premium Income') - col('Total Acquisition Cost')))

In [0]:

# # Remove leading/trailing spaces
# df = df.withColumn('MOP', trim(col('MOP')))

In [0]:
# df = df.withColumn('Binder', when((col('MOP') == 'B') | (col('MOP') == 'L'), 1).otherwise(0))

In [0]:
# df = df.withColumn('Acquisition Costs', col('Total Acquisition Cost'))

In [0]:
# columns_new = df.columns
# columns_new

In [0]:
# for columns in columns_new:
#     df = df.withColumnRenamed(columns, columns.replace(' ', ''))

#### Saving as a Delta table (Parquet-backed)

In [0]:
# Persist the transformed frame in Delta format at the root of the `silver`
# container; 'overwrite' mode replaces whatever is already stored at that path.
(
    df.write.format('delta')
    .mode('overwrite')
    .option('path', 'abfss://silver@azureblobstorage404.dfs.core.windows.net/')
    .save()
)