## DaskS3CDC

Manages change data capture (CDC) operations using Dask DataFrames, with AWS S3 as the storage layer.

### Functionality:
- **Data Management:** Handles insertion, updating, deletion, and validation of data.
- **Storage and Retrieval:** Saves and retrieves data to/from an AWS S3 bucket as Parquet files.

### Tools Used:
- **Dask:** Handles large datasets efficiently through partitioned, parallel computation.
- **Faker:** Generates fake data for testing purposes.
- **AWS S3:** Storage service for saving and retrieving Parquet files.

### Optimizations:
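- **Parallelization:** Operations are built lazily on partitioned Dask DataFrames and executed in parallel by Dask's scheduler.
- **Delayed Tasks:** Wraps the two `dd.read_parquet` calls in `dask.delayed` tasks so the live and historical datasets are opened concurrently during initialization.
- **Partitioning:** Writes the Parquet data to S3 partitioned by `year` and `month` (Hive-style `year=<year>/month=<month>` directories), so reads can be limited to specific periods.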
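To illustrate the year/month layout, the following pandas-only sketch derives the same `year` and `month` columns that `save_s3` computes and shows the Hive-style directory each row would fall into when written with `partition_on=['year', 'month']`. The helper `partition_dirs` and the sample timestamps are hypothetical, and no S3 access is involved.

```python
import pandas as pd


def partition_dirs(df: pd.DataFrame) -> pd.Series:
    """Return the year=/month= directory each row would be written to (hypothetical helper)."""
    ts = pd.to_datetime(df["timestamp"])  # string timestamps -> datetime64
    return "year=" + ts.dt.year.astype(str) + "/month=" + ts.dt.month.astype(str)


rows = pd.DataFrame({
    "id": ["a", "b", "c"],
    "timestamp": ["2023-11-30 23:59:59", "2023-12-01 00:00:00", "2023-12-05 23:48:19"],
})
print(partition_dirs(rows).tolist())
# ['year=2023/month=11', 'year=2023/month=12', 'year=2023/month=12']
```

Rows from the same month share a directory, so a reader that filters on `year` and `month` only needs to open the matching folders.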

### Methods:
- `insert_new_data(new_data)`: Appends new rows (an `id` column is required) to the live Dask DataFrame and logs them as inserts.
- `create_dummy_data(num_records)`: Creates fake records; used when no data can be read from the S3 bucket.
- `update_records(id_column_values)`: Updates the records listed under the `'id'` key with the column values provided alongside it.
- `delete_record(target_ids)`: Deletes the specified records by ID and logs each deletion in the history.
- `validate_data()`: Sets `validity_flag` to 1 on the latest row (maximum timestamp) of each ID and to 0 on older rows.
- `save_s3()`: Saves the live and historical DataFrames to S3 as Parquet files.
- `read_s3()`: Reads both datasets from S3 and initializes the live and historical DataFrames.
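The validity rule used by `validate_data()` can be checked on a small in-memory frame. This sketch applies the same `groupby(...).transform('max')` logic with plain pandas instead of Dask, on made-up rows. It relies on the fixed-width `%Y-%m-%d %H:%M:%S` strings sorting chronologically; two rows of one ID sharing the same maximum timestamp would both be flagged 1.

```python
import pandas as pd


def define_validity(df: pd.DataFrame) -> pd.DataFrame:
    """Flag the latest row per id with validity_flag=1 and older rows with 0."""
    df = df.copy()
    # Latest timestamp per id, broadcast back to every row of that id
    latest = df.groupby("id")["timestamp"].transform("max")
    df["validity_flag"] = (df["timestamp"] == latest).astype(int)
    return df


history = pd.DataFrame({
    "id": ["x", "x", "y"],
    "email": ["old@example.com", "new@example.com", "y@example.com"],
    "timestamp": ["2023-12-05 23:48:19", "2023-12-05 23:52:21", "2023-12-05 23:48:19"],
    "operation": ["insert", "update", "insert"],
})
print(define_validity(history)["validity_flag"].tolist())  # [0, 1, 1]
```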


In [4]:
import dask.dataframe as dd
from faker import Faker
import pandas as pd
from datetime import datetime
from dask import delayed

In [8]:
class DaskS3CDC:
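    def __init__(self, new_data: "pd.DataFrame | None" = None):
        """
        Initializes the DaskS3CDC class.
        Holds a live Dask DataFrame (`table`) and a change history (`historical_df`)
        for change data capture operations.

        Args:
        - new_data: Optional pandas DataFrame with initial data. If omitted, data is
          read from S3, falling back to dummy data if the read fails.
        """

        # AWS credentials and S3 bucket information (placeholders; avoid hard-coding
        # real credentials, e.g. read them from environment variables instead).
        # Note: self.region is currently not passed to S3.
        self.aws_access_key_id = 'AWS_ACCESS_KEY_ID'
        self.aws_secret_access_key = 'AWS_SECRET_ACCESS_KEY'
        self.bucket = 'SELECTED_BUCKET'
        self.region = 'SELECTED_REGION'

        # Initializing Dask DataFrames: `table` holds the live records,
        # `historical_df` the full change log
        self.table = None
        self.historical_df = dd.from_pandas(pd.DataFrame(), npartitions=2)

        # Use new_data if provided; otherwise read from S3, or create dummy data
        if new_data is not None and not new_data.empty:
            self.insert_new_data(new_data)
        else:
            try:
                self.read_s3()
            except Exception:  # e.g. missing files, wrong credentials, bad path
                self.create_dummy_data(5)
                print('Files not found, created dummy data.')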

    def insert_new_data(self, new_data):
        """
        Inserts new data into the Dask DataFrame.
        Handles parallelization to enhance data processing.

        Args:
        - new_data: DataFrame to be inserted.
        """

        # Convert provided data into Dask DataFrame with parallelized partitions
        new_data_dask = dd.from_pandas(new_data, npartitions=2)

        # Check if 'id' column is present in the new data
        if 'id' not in new_data_dask.columns:
            raise ValueError("The 'id' column is required in the provided data.")
        
        # Add timestamp if not present in the new data
        if 'timestamp' not in new_data_dask.columns:
            new_data_dask['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Update dataframes with the new data
        if self.historical_df is not None and len(self.historical_df) != 0:
            self.table = dd.concat([self.table, new_data_dask])
            self.historical_df = dd.concat([self.historical_df, new_data_dask.assign(validity_flag=1, operation='insert')])
        else:
            self.table = new_data_dask
            self.historical_df = new_data_dask.assign(validity_flag=1, operation='insert')

        self.save_s3()


    def create_dummy_data(self, num_records):
        """
        Creates dummy data if no data is found in the specified S3 bucket.

        Args:
        - num_records: Number of records to generate.
        """
        # Generate fake data using Faker library
        fake = Faker()
        table = {
            'id': [fake.uuid4() for _ in range(num_records)],
            'name': [fake.name() for _ in range(num_records)],
            'email': [fake.email() for _ in range(num_records)],
            'address': [fake.address() for _ in range(num_records)],
            'timestamp': [datetime.now().strftime("%Y-%m-%d %H:%M:%S") for _ in range(num_records)]
        }
        dtypes = {
            'id': 'str',
            'name': 'str',
            'email': 'str',
            'address': 'str',
            'timestamp': 'str'
        }
        new_df_pandas = pd.DataFrame(table).astype(dtypes)
        new_df_dask = dd.from_pandas(new_df_pandas, npartitions=2)

        if self.historical_df is not None and len(self.historical_df) != 0:
            self.table = dd.concat([self.table, new_df_dask])
            self.historical_df = dd.concat([self.historical_df, new_df_dask.assign(validity_flag=1, operation='insert')])
        else:
            self.table = new_df_dask
            self.historical_df = new_df_dask.assign(validity_flag=1, operation='insert')

        self.save_s3()

    def update_records(self, id_column_values=None):
        """
        Updates specific records in the DataFrame based on provided ID and column values.
        Marks updated records in the historical dataframe.

        Args:
        - id_column_values: Dictionary containing columns and their respective updated values.
        """
        if id_column_values is None:
            id_column_values = {
                'id': [
                    self.table['id'].compute().iloc[0],
                    self.table['id'].compute().iloc[1]
                ],
                'email': [
                    'new_email_value_1',
                    'new_value_2'
                ],
                # Add other columns and their respective updated values here
                # 'column_name': ['new_value_1', 'new_value_2']
            }

        def batch_update(partition):
            updates = {col: {id_value: new_value for id_value, new_value in zip(id_column_values['id'], values)}
                   for col, values in id_column_values.items() if col != 'id'}

            for column, values in updates.items():
                self.table[column] = self.table['id'].map(values).fillna(self.table[column])
                    
            return partition

        # Apply the batch update to the dataframe
        self.table = self.table.map_partitions(batch_update)
        

        # Update timestamp once for all modified records
        self.table['timestamp'] = self.table['timestamp'].mask(
            self.table['id'].isin(id_column_values['id']),
            datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )

        updated = self.table[self.table['id'].isin(id_column_values['id'])]
        updated['operation'] = 'update'

        # Concatenate with historical_df
        self.historical_df = dd.concat([updated, self.historical_df])

        self.validate_data()

    def delete_record(self, target_ids: list = None):
        """
        Deletes specified records by ID, marking them as 'delete' in the historical dataframe.

        Args:
        - target_ids: List of IDs to be deleted.
        """
        if target_ids is not None and not isinstance(target_ids, list):
            raise TypeError("target_ids must be a list")
    
        if target_ids is None:
            target_ids = self.table['id'].compute().iloc[:2].tolist()

        for target_id in target_ids:
            specific_row = self.historical_df[
                (self.historical_df['id'] == target_id) & 
                (self.historical_df['validity_flag'] == 1)
            ]
            
            current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            specific_row['timestamp'] = current_timestamp
            specific_row['operation'] = 'delete'

            self.historical_df = dd.concat([specific_row, self.historical_df])
            self.table = self.table[self.table['id'] != target_id]

        self.validate_data()

    def validate_data(self):
        """
        Validates the data by setting a 'validity_flag' based on the maximum timestamp per ID.
        """
       
        def define_validity(df):
            # Work on a copy so the input partition is not mutated in place
            df = df.copy()

            # Group by 'id' (the index after set_index below) and find the maximum timestamp per ID
            max_timestamp_per_id = df.groupby('id')['timestamp'].transform('max')

            # Set 'validity_flag' to 1 for rows that have the maximum timestamp for their respective ID
            df['validity_flag'] = (df['timestamp'] == max_timestamp_per_id).astype(int)

            return df

        # set_index('id') shuffles the data so all rows of an ID land in the same
        # partition, which keeps the per-partition groupby in define_validity correct
        self.historical_df = self.historical_df.sort_values(by=['id', 'timestamp'], ascending=False).set_index('id')
        self.historical_df = self.historical_df.map_partitions(define_validity).reset_index()

        self.save_s3()

    def save_s3(self):
        """
        Saves dataframes to S3 bucket as Parquet files.
        Implements parallel computation for faster data storage.

        Uses partitioning to organize data by year and month.
        """
        
        duplicated_table = self.table.copy()
        duplicated_table['timestamp'] = dd.to_datetime(duplicated_table['timestamp'])
        duplicated_table['year'] = duplicated_table['timestamp'].dt.year
        duplicated_table['month'] = duplicated_table['timestamp'].dt.month

        duplicated_historic = self.historical_df.copy()
        duplicated_historic['timestamp'] = dd.to_datetime(duplicated_historic['timestamp'])
        duplicated_historic['year'] = duplicated_historic['timestamp'].dt.year
        duplicated_historic['month'] = duplicated_historic['timestamp'].dt.month
        
        # Build a lazy write task for one dataframe, partitioned by year and month
        def _save_s3(data_frame, folder_name):
            return data_frame.to_parquet(
                f"s3://{self.bucket}/{folder_name}/",
                partition_on=['year', 'month'],
                compression="SNAPPY",
                storage_options={
                    'key': self.aws_access_key_id,
                    'secret': self.aws_secret_access_key
                },
                compute=False  # return a lazy task instead of writing immediately
            )

        # Write self.table and self.historical_df to S3 in a single parallel compute
        dd.compute(
            _save_s3(duplicated_table, 'table'),
            _save_s3(duplicated_historic, 'historical')
        )

    def read_s3(self):
        """
        Reads data from S3 bucket and initializes dataframes.
        Utilizes delayed tasks for parallel reading and faster initialization.
        """
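        # Define function for reading from S3 (default engine, matching save_s3).
        # dd.read_parquet discovers the Hive-style year/month partition columns
        # automatically, so they are dropped after reading.
        @delayed
        def _read_s3(path):
            df = dd.read_parquet(
                path,
                storage_options={
                    'key': self.aws_access_key_id,
                    'secret': self.aws_secret_access_key
                }
            ).drop(['year', 'month'], axis=1)
            # save_s3 stores timestamps as datetimes; convert them back to the string
            # format the other methods use so comparisons stay consistent
            df['timestamp'] = df['timestamp'].dt.strftime("%Y-%m-%d %H:%M:%S")
            return df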

        # Create delayed tasks for reading from S3
        read_table = _read_s3(f"s3://{self.bucket}/table/")
        read_historical = _read_s3(f"s3://{self.bucket}/historical/")

        # Trigger the computation of delayed tasks
        delayed_tasks = [read_table, read_historical]
        self.table, self.historical_df = delayed(delayed_tasks).compute()
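`map_partitions` calls the given function once per pandas partition, so an update function should transform and return the partition it receives rather than touch `self.table`. The sketch below emulates that contract with plain pandas by splitting a frame into two chunks and applying a hypothetical `apply_updates` helper to each; in the class, the equivalent function is what gets passed to `self.table.map_partitions(...)`.

```python
import pandas as pd


def apply_updates(partition: pd.DataFrame, id_column_values: dict) -> pd.DataFrame:
    """Overwrite columns for the listed ids within one partition (hypothetical helper)."""
    updates = {
        col: dict(zip(id_column_values["id"], values))
        for col, values in id_column_values.items() if col != "id"
    }
    partition = partition.copy()
    for column, mapping in updates.items():
        # Rows whose id is not in the mapping get NaN from map(), then keep their old value
        partition[column] = partition["id"].map(mapping).fillna(partition[column])
    return partition


table = pd.DataFrame({
    "id": ["a", "b", "c", "d"],
    "email": ["a@x.gr", "b@x.gr", "c@x.gr", "d@x.gr"],
})
# Emulate two Dask partitions and process each one independently
chunks = [table.iloc[:2], table.iloc[2:]]
changes = {"id": ["b", "d"], "email": ["b@new.gr", "d@new.gr"]}
result = pd.concat([apply_updates(chunk, changes) for chunk in chunks])
print(result["email"].tolist())  # ['a@x.gr', 'b@new.gr', 'c@x.gr', 'd@new.gr']
```

Because each chunk is processed in isolation and only needs the small lookup dictionaries, the same function works unchanged whether Dask splits the table into two partitions or two hundred.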

In [10]:
cdc = DaskS3CDC()

Files not found, created dummy data.


In [11]:
cdc.table.compute()

| | id | name | email | address | timestamp |
|---|---|---|---|---|---|
| 0 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | dlyons@example.com | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:48:19 |
| 1 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 |
| 2 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 |
| 3 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 |
| 4 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 |


In [12]:
cdc.historical_df.compute()

| | id | name | email | address | timestamp | validity_flag | operation |
|---|---|---|---|---|---|---|---|
| 0 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | dlyons@example.com | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:48:19 | 1 | insert |
| 1 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 | 1 | insert |
| 2 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 | 1 | insert |
| 3 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 | 1 | insert |
| 4 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 | 1 | insert |


In [13]:
updates = {
    'id': ['e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa'],
    'email': ['new_email@test.gr'],
    # Add other columns and their respective updated values here
    # 'column_name': ['new_value_1'],
}

cdc.update_records(updates)

In [14]:
cdc.table.compute()

| | id | name | email | address | timestamp |
|---|---|---|---|---|---|
| 0 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 |
| 1 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 |
| 2 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 |
| 3 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 |
| 4 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 |


In [15]:
cdc.historical_df.compute()

| | id | name | email | address | timestamp | operation | validity_flag |
|---|---|---|---|---|---|---|---|
| 0 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 | insert | 1 |
| 1 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 | update | 1 |
| 2 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | dlyons@example.com | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:48:19 | insert | 0 |


In [16]:
deletes = ['d244e12c-7378-4e60-96a7-92f9696854ee']

cdc.delete_record(target_ids=deletes)

In [17]:
cdc.table.compute()

| | id | name | email | address | timestamp |
|---|---|---|---|---|---|
| 0 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 |
| 1 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 |
| 3 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 |
| 4 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 |


In [18]:
cdc.historical_df.compute()

| | id | name | email | address | timestamp | operation | validity_flag |
|---|---|---|---|---|---|---|---|
| 0 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:54:00 | delete | 1 |
| 1 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 | insert | 0 |
| 2 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 | update | 1 |
| 3 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | dlyons@example.com | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:48:19 | insert | 0 |
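The delete shown above is a soft delete: the row leaves the live table, while a copy of its last valid version is logged with `operation='delete'` and a newer timestamp, so it becomes the valid row for that ID. A pandas-only sketch of the same sequence, using a hypothetical `soft_delete` helper and a fixed timestamp argument (instead of `datetime.now()`) so the result is reproducible:

```python
import pandas as pd


def soft_delete(table, history, target_id, ts):
    """Drop target_id from the live table and log a 'delete' row (hypothetical helper)."""
    # Copy the currently valid version of the record as the deletion marker
    tombstone = history[(history["id"] == target_id) & (history["validity_flag"] == 1)].copy()
    tombstone["timestamp"] = ts
    tombstone["operation"] = "delete"
    history = pd.concat([tombstone, history], ignore_index=True)
    # Re-flag: only the latest row per id stays valid
    latest = history.groupby("id")["timestamp"].transform("max")
    history["validity_flag"] = (history["timestamp"] == latest).astype(int)
    table = table[table["id"] != target_id]
    return table, history


table = pd.DataFrame({"id": ["a", "b"], "timestamp": ["2023-12-05 23:48:19"] * 2})
history = table.assign(validity_flag=1, operation="insert")
table, history = soft_delete(table, history, "b", "2023-12-05 23:54:00")
print(table["id"].tolist())  # ['a']
print(history[["id", "operation", "validity_flag"]].values.tolist())
# [['b', 'delete', 1], ['a', 'insert', 1], ['b', 'insert', 0]]
```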


In [19]:
inserts = {
    'id': ['insert_test'],
    'email': ['insert@test.gr'],
    # Add other columns and their respective values here
    # 'column_name': ['value_1'],
}

cdc.insert_new_data(pd.DataFrame(inserts))

In [20]:
cdc.table.compute()

| | id | name | email | address | timestamp |
|---|---|---|---|---|---|
| 0 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 |
| 1 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 |
| 3 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 |
| 4 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 |
| 0 | insert_test | | insert@test.gr | | 2023-12-05 23:57:02 |
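The inserted record supplied only `id` and `email`, which is why `name` and `address` are empty: concatenating frames with different columns fills the missing ones with NaN. The following pandas-only sketch mirrors the insert logic; `insert_rows` and its fixed `now` argument are hypothetical stand-ins for the class's `dd.concat` calls and `datetime.now()`.

```python
import pandas as pd


def insert_rows(table, history, new_rows, now):
    """Append new_rows to the live table and log them as inserts (hypothetical helper)."""
    if "id" not in new_rows.columns:
        raise ValueError("The 'id' column is required in the provided data.")
    new_rows = new_rows.copy()
    if "timestamp" not in new_rows.columns:
        new_rows["timestamp"] = now  # same default as insert_new_data, but fixed here
    # Columns missing from new_rows (e.g. 'name') are filled with NaN by concat
    table = pd.concat([table, new_rows], ignore_index=True)
    logged = new_rows.assign(validity_flag=1, operation="insert")
    history = pd.concat([history, logged], ignore_index=True)
    return table, history


table = pd.DataFrame({"id": ["a"], "name": ["Ann"], "email": ["a@x.gr"],
                      "timestamp": ["2023-12-05 23:48:19"]})
history = table.assign(validity_flag=1, operation="insert")
new = pd.DataFrame({"id": ["insert_test"], "email": ["insert@test.gr"]})
table, history = insert_rows(table, history, new, "2023-12-05 23:57:02")
print(table["name"].isna().tolist())  # [False, True]
print(table[["id", "email", "timestamp"]].iloc[-1].tolist())
# ['insert_test', 'insert@test.gr', '2023-12-05 23:57:02']
```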


In [21]:
cdc.historical_df.compute()

| | id | name | email | address | timestamp | operation | validity_flag |
|---|---|---|---|---|---|---|---|
| 0 | 0f0898d4-920a-4c5e-940c-9d793d6e2c0f | Patrick Payne | stevefisher@example.net | 469 Carrillo Mill Suite 215\nNew Julie, KS 95119 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 27175f13-69cd-4d86-b6ab-0e059d3f1414 | Martha Adams | solomondarlene@example.com | 414 Lamb Roads Apt. 976\nNew Aaronmouth, VT 00512 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | 3c0e671b-3be4-4709-bcab-2aa0f5b626ac | Tyler Bailey | aaronscott@example.net | PSC 3539, Box 8887\nAPO AP 47593 | 2023-12-05 23:48:19 | insert | 1 |
| 0 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:54:00 | delete | 1 |
| 1 | d244e12c-7378-4e60-96a7-92f9696854ee | Nathan Goodwin | dwright@example.net | 9282 Ortega Wall\nDianaton, TX 42652 | 2023-12-05 23:48:19 | insert | 0 |
| 2 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | new_email@test.gr | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:52:21 | update | 1 |
| 3 | e48fff8e-7172-4d79-a3d5-ad9d7e1bf1aa | Michelle Green | dlyons@example.com | 441 Kathleen Dam Suite 470\nLake Robertborough... | 2023-12-05 23:48:19 | insert | 0 |
| 0 | insert_test | | insert@test.gr | | 2023-12-05 23:57:02 | insert | 1 |
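Since every change is logged, the live table can in principle be rebuilt from the history alone: keep the rows flagged valid and drop IDs whose latest operation is a delete. This pandas-only sketch assumes the `validity_flag` and `operation` columns shown above; `current_state` and the sample history are hypothetical.

```python
import pandas as pd


def current_state(history: pd.DataFrame) -> pd.DataFrame:
    """Latest non-deleted version of every id, without the CDC bookkeeping columns."""
    live = history[(history["validity_flag"] == 1) & (history["operation"] != "delete")]
    return (live.drop(columns=["validity_flag", "operation"])
                .sort_values("id")
                .reset_index(drop=True))


history = pd.DataFrame({
    "id": ["a", "a", "b", "b", "c"],
    "email": ["a@old.gr", "a@new.gr", "b@x.gr", "b@x.gr", "c@x.gr"],
    "timestamp": ["2023-12-05 23:48:19", "2023-12-05 23:52:21",
                  "2023-12-05 23:48:19", "2023-12-05 23:54:00", "2023-12-05 23:57:02"],
    "validity_flag": [0, 1, 0, 1, 1],
    "operation": ["insert", "update", "insert", "delete", "insert"],
})
print(current_state(history)[["id", "email"]].values.tolist())
# [['a', 'a@new.gr'], ['c', 'c@x.gr']]
```

Comparing such a rebuild against `cdc.table.compute()` is one way to check that the live table and the history have not drifted apart.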
