TASK 1: Folder Hierarchy

data-lake-bucket/
│
├── raw/
│   ├── On-premise-Data-Warehouse/
│   │   └── Line-of-Business/
│   │       └── Database-Name/
│   │           └── Table-Name/
│   │               └── Partition1/
│   │                   └── Partition2/
│   │                       └── .../
│   │                           └── Data-Files
│   └── Projects-Data/
│       └── Line-of-Business/
│           └── Database-Name/
│               └── Table-Name/
│                   └── Partition1/
│                       └── Partition2/
│                           └── .../
│                               └── Data-Files
│
├── transformed/
│   ├── On-premise-Data-Warehouse/
│   │   └── Line-of-Business/
│   │       └── Database-Name/
│   │           └── Table-Name/
│   │               └── Partition1/
│   │                   └── Partition2/
│   │                       └── .../
│   │                           └── Data-Files
│   └── Projects-Data/
│       └── Line-of-Business/
│           └── Database-Name/
│               └── Table-Name/
│                   └── Partition1/
│                       └── Partition2/
│                           └── .../
│                               └── Data-Files
│
├── curated/
│   ├── Business-Unit-1/
│   │   └── Database-Name/
│   │       └── Table-Name/
│   │           └── Partition1/
│   │               └── Partition2/
│   │                   └── .../
│   │                       └── Data-Files
│   ├── Business-Unit-2/
│   │   └── Database-Name/
│   │       └── Table-Name/
│   │           └── Partition1/
│   │               └── Partition2/
│   │                   └── .../
│   │                       └── Data-Files
│   └── ...
│
└── logs/
    ├── s3-access-logs/
    ├── cloudwatch-logs/
    └── cloudtrail-logs/
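
The layout above can be turned into S3 key prefixes programmatically. The following is a minimal sketch, not part of the original design: the helper name build_prefix and its arguments are hypothetical, and the folder names are the placeholders from the tree.

```python
from typing import Sequence

# Zone names taken from the hierarchy above.
ZONES = ("raw", "transformed", "curated", "logs")


def build_prefix(zone: str, *levels: str, partitions: Sequence[str] = ()) -> str:
    """Join a zone, its hierarchy levels, and partition folders into a key prefix."""
    if zone not in ZONES:
        raise ValueError(f"unknown zone: {zone!r}")
    parts = [zone, *levels, *partitions]
    # Reject empty segments and embedded slashes so the depth stays predictable.
    if any(not p or "/" in p for p in parts):
        raise ValueError("path segments must be non-empty and must not contain '/'")
    return "/".join(parts) + "/"


# raw/ and transformed/ are organised by source system, then line of business.
raw_prefix = build_prefix(
    "raw",
    "On-premise-Data-Warehouse",
    "Line-of-Business",
    "Database-Name",
    "Table-Name",
    partitions=("Partition1", "Partition2"),
)

# curated/ is organised by business unit instead.
curated_prefix = build_prefix("curated", "Business-Unit-1", "Database-Name", "Table-Name")

print(raw_prefix)
print(curated_prefix)
```

Building prefixes in one place keeps every job writing to the same depth and order of folders, which is what lets crawlers and partition pruning work consistently across zones.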


TASK 2: Designing the Fact and Dimension Tables

In [None]:
Fact Tables:
    Sales Transactions Fact Table:
        Primary Key: Transaction ID
        Foreign Keys:
            Customer ID (links to Customer dimension)
            Product ID (links to Product dimension)
            Time ID (links to Time dimension)
        Measures: Quantity Sold, Total Sales Amount, Discounts, Taxes, etc.
    Financial Summary Fact Table:
        Primary Key: Time ID (one row per time period)
        Foreign Keys:
            Time ID (links to Time dimension)
        Measures: Total Revenue, Total Expenses, Net Profit, etc.
Dimension Tables:
    Customer Dimension Table:
        Primary Key: Customer ID
        Attributes: Customer Name, Customer Segment, Customer Location, Contact Information, etc.
    Product Dimension Table:
        Primary Key: Product ID
        Attributes: Product Name, Category, Subcategory, Brand, Price, etc.
    Time Dimension Table:
        Primary Key: Time ID
        Attributes: Date, Day of Week, Month, Quarter, Year, Fiscal Period, etc.
Primary and Foreign Keys:
    In the Sales Transactions Fact Table:
        Primary Key: Transaction ID
        Foreign Keys:
            Customer ID (from Customer Dimension Table)
            Product ID (from Product Dimension Table)
            Time ID (from Time Dimension Table)
    In the Financial Summary Fact Table:
        Primary Key: Time ID (if aggregated over time)
        Foreign Keys:
            Time ID (from Time Dimension Table)
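
The key relationships above can be tried out with an in-memory SQLite database. This is only a sketch under assumptions: the table and column names (dim_customer, fact_sales, and so on) are hypothetical renderings of the tables listed above, only a few attributes are included, and the inserted rows are made-up sample data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite does not enforce foreign keys by default

conn.executescript("""
CREATE TABLE dim_customer (customer_id INTEGER PRIMARY KEY, customer_name TEXT, customer_segment TEXT);
CREATE TABLE dim_product  (product_id  INTEGER PRIMARY KEY, product_name TEXT, category TEXT);
CREATE TABLE dim_time     (time_id     INTEGER PRIMARY KEY, date TEXT, month INTEGER, year INTEGER);
CREATE TABLE fact_sales (
    transaction_id     INTEGER PRIMARY KEY,
    customer_id        INTEGER NOT NULL REFERENCES dim_customer(customer_id),
    product_id         INTEGER NOT NULL REFERENCES dim_product(product_id),
    time_id            INTEGER NOT NULL REFERENCES dim_time(time_id),
    quantity_sold      INTEGER,
    total_sales_amount REAL
);
""")

# Made-up sample rows for illustration only.
conn.execute("INSERT INTO dim_customer VALUES (1, 'Sample Customer', 'Retail')")
conn.execute("INSERT INTO dim_product VALUES (10, 'Sample Product', 'Sample Category')")
conn.execute("INSERT INTO dim_time VALUES (20240101, '2024-01-01', 1, 2024)")
conn.executemany(
    "INSERT INTO fact_sales VALUES (?, ?, ?, ?, ?, ?)",
    [(1000, 1, 10, 20240101, 2, 50.0), (1001, 1, 10, 20240101, 1, 25.0)],
)
conn.commit()

# A typical star-schema query: aggregate a measure, grouped by a dimension attribute.
sales_by_category = conn.execute("""
    SELECT p.category, SUM(f.total_sales_amount)
    FROM fact_sales AS f
    JOIN dim_product AS p ON p.product_id = f.product_id
    GROUP BY p.category
""").fetchall()
print(sales_by_category)

# A fact row that points at a missing customer is rejected by the foreign key.
try:
    conn.execute("INSERT INTO fact_sales VALUES (1002, 999, 10, 20240101, 1, 5.0)")
    fk_enforced = False
except sqlite3.IntegrityError:
    fk_enforced = True
print(fk_enforced)
```

The fact table carries only keys and measures, while the descriptive attributes stay in the dimension tables, so reports join outward from the fact table as in the query above.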

TASK 3: Scripting

In [None]:
import logging
import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame  # needed for DynamicFrame.fromDF
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col



In [None]:
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# getOrCreate() reuses an existing context, so re-running this cell does not fail.
contx = SparkContext.getOrCreate()
glueContx = GlueContext(contx)
spark = glueContx.spark_session

In [None]:
# Glue passes the job name as --JOB_NAME when the script runs as a job.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

In [None]:
raw_data = 's3:url'
refined_data = 's3:url'

In [35]:
raw_db = 'raw_db'
raw_tb = 'raw_table'
refined_db = 'refined_db'
refined_table = 'ref_tb'

In [None]:
partition_col = ['monthly','daily']

In [None]:
schema_evolution_strat = 'merge'

In [None]:
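try:
    # Read the raw table registered in the Glue Data Catalog.
    raw_dynamic_frame = glueContx.create_dynamic_frame.from_catalog(database=raw_db, table_name=raw_tb)
    raw_data_frame = raw_dynamic_frame.toDF()

    # Keep only the required columns and the rows with a positive col1.
    refined_data_frame = raw_data_frame.select('col1', 'col2', 'col3', 'col4').filter(col('col1') > 0)

    # Run the data-quality check before writing, so bad data never reaches the refined zone.
    total_records = refined_data_frame.count()
    null_records = refined_data_frame.filter(
        col('col1').isNull() | col('col2').isNull() | col('col3').isNull() | col('col4').isNull()
    ).count()

    # Guard against division by zero when the filter leaves no rows.
    if total_records > 0 and null_records / total_records > 0.8:
        raise ValueError('data quality check failed: more than 80% of records contain nulls')

    refined_dynamic_frame = DynamicFrame.fromDF(refined_data_frame, glueContx, 'refined_data')

    # Write the refined data to S3 as Parquet.
    glueContx.write_dynamic_frame.from_options(
        frame=refined_dynamic_frame,
        connection_type='s3',
        connection_options={'path': refined_data},
        format='parquet'
    )

    # Assumption: the 'merge' strategy is mapped onto Glue's documented catalog-update
    # options, which let the job update the table schema in the Data Catalog.
    additional_options = {'enableUpdateCatalog': True}
    if schema_evolution_strat == 'merge':
        additional_options['updateBehavior'] = 'UPDATE_IN_DATABASE'

    # Write to the refined catalog table (the database name, not the S3 path).
    glueContx.write_dynamic_frame.from_catalog(
        frame=refined_dynamic_frame,
        database=refined_db,
        table_name=refined_table,
        transformation_ctx='refined_data_cat',
        additional_options=additional_options
    )

    logger.info('job complete')

except Exception as e:
    logger.error(f'error: {e}')
    raise  # re-raise with the original traceback so the Glue job is marked as failed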
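
The data-quality rule used in the job (fail when more than 80% of records have a null in any checked column) can be tried out without a Spark cluster. The following is a plain-Python sketch of the same rule, not the Glue implementation: the function names null_ratio and check_quality are hypothetical, and the sample records are made up.

```python
from typing import Iterable, Mapping, Sequence

CHECKED_COLUMNS = ("col1", "col2", "col3", "col4")  # column names from the job above


def null_ratio(rows: Iterable[Mapping[str, object]],
               columns: Sequence[str] = CHECKED_COLUMNS) -> float:
    """Return the fraction of rows with None in at least one checked column."""
    total = 0
    with_nulls = 0
    for row in rows:
        total += 1
        if any(row.get(c) is None for c in columns):
            with_nulls += 1
    # An empty input has no bad rows; this also avoids dividing by zero.
    return with_nulls / total if total else 0.0


def check_quality(rows: Iterable[Mapping[str, object]],
                  columns: Sequence[str] = CHECKED_COLUMNS,
                  max_null_ratio: float = 0.8) -> float:
    """Raise ValueError when the null ratio exceeds the threshold, else return it."""
    ratio = null_ratio(rows, columns)
    if ratio > max_null_ratio:
        raise ValueError(f"data quality check failed: null ratio {ratio:.2f}")
    return ratio


# Made-up sample records: one of the four rows contains a null.
sample_rows = [
    {"col1": 1, "col2": "a", "col3": 2.0, "col4": "x"},
    {"col1": 2, "col2": None, "col3": 3.0, "col4": "y"},
    {"col1": 3, "col2": "c", "col3": 4.0, "col4": "z"},
    {"col1": 4, "col2": "d", "col3": 5.0, "col4": "w"},
]
ratio = check_quality(sample_rows)
print(ratio)
```

Keeping the threshold as a parameter makes it easy to tighten the rule later; 0.8 is the value used in the job and is quite permissive.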