In [None]:
import os
import boto3
import hashlib
import uuid
import json
import pandas as pd
from datetime import datetime
import requests
from tenacity import retry, wait_exponential, stop_after_attempt

# AWS Clients
# Module-level boto3 handles, created once at import time and shared by the
# functions below.
s3 = boto3.client('s3')
secrets = boto3.client('secretsmanager')
# Table handle used to record which (source, year_month) pairs were extracted.
dynamodb = boto3.resource('dynamodb').Table('extraction_state')

# Constants
# Bucket that receives both the raw files and the metadata CSVs.
BUCKET = 'your-data-bucket'
# Source identifiers iterated by main(); process_source has one branch per entry.
SOURCES = ['0_0', '0_1', 'ctgov']

def get_secret(secret_name):
    """Return the string payload of a Secrets Manager secret.

    Args:
        secret_name: Value passed to Secrets Manager as ``SecretId``.

    Returns:
        The ``SecretString`` field of the response.

    Raises:
        RuntimeError: If the lookup fails for any reason. The underlying
            exception is attached as ``__cause__``.
    """
    try:
        return secrets.get_secret_value(SecretId=secret_name)['SecretString']
    except Exception as e:
        # Chain explicitly so the original error is reported as the direct
        # cause rather than as an incidental "during handling" context.
        raise RuntimeError(f"Secret {secret_name} retrieval failed: {str(e)}") from e

def check_existing_extraction(source, year_month):
    """Tell whether the state table already holds a record for this pair.

    Args:
        source: Source identifier (partition part of the key).
        year_month: Month string (second part of the key).

    Returns:
        True when ``get_item`` returned an ``Item``, otherwise False.
    """
    lookup_key = {'source': source, 'year_month': year_month}
    result = dynamodb.get_item(Key=lookup_key)
    return 'Item' in result

@retry(wait=wait_exponential(), stop=stop_after_attempt(3))
def fetch_0_0(api_key, year_month):
    """Fetch 0_0 articles for a specific month.

    Runs one esearch request for the month, then downloads the matching
    records with efetch in batches of 500 IDs.

    Args:
        api_key: API key appended to every request.
        year_month: Month to query, formatted ``YYYY-MM``.

    Returns:
        A list of strings, one per article: the text that follows each
        ``<0_0Article>`` opening tag in the efetch responses.

    Raises:
        Any exception raised in the body (bad ``year_month`` format, network
        error, HTTP error status) triggers a retry via the decorator; after
        the third failed attempt tenacity surfaces the failure.
    """
    # Seconds; prevents a stalled connection from hanging the task forever.
    request_timeout = 60
    articles = []
    query = f"{datetime.strptime(year_month, '%Y-%m').strftime('%Y/%m')}[dp]"
    base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"

    search_url = f"{base_url}esearch.fcgi?db=0_0&term={query}&api_key={api_key}&retmax=100000"
    search_res = requests.get(search_url, timeout=request_timeout)
    # Fail loudly on 4xx/5xx so the retry decorator sees the error instead of
    # an error page being parsed as an empty result.
    search_res.raise_for_status()

    # Every chunk after the first begins with an ID followed by "</Id>".
    # Keep all of them: slicing off the final chunk would drop the last ID.
    id_list = [chunk.split('<')[0] for chunk in search_res.text.split('<Id>')[1:]]

    for i in range(0, len(id_list), 500):
        batch_ids = id_list[i:i+500]
        fetch_url = f"{base_url}efetch.fcgi?db=0_0&id={','.join(batch_ids)}&api_key={api_key}&retmode=xml"
        fetch_res = requests.get(fetch_url, timeout=request_timeout)
        fetch_res.raise_for_status()
        # The first chunk is the XML preamble, not an article.
        articles.extend(fetch_res.text.split('<0_0Article>')[1:])

    return articles

@retry(wait=wait_exponential(), stop=stop_after_attempt(3))
def fetch_ctgov(year_month):
    """Fetch ClinicalTrials.gov studies whose LastUpdatePostDate is in a month.

    Pages through the ``full_studies`` endpoint 100 ranks at a time until a
    short page is returned.

    Args:
        year_month: Month to query, formatted ``YYYY-MM``.

    Returns:
        A list with every entry of ``FullStudiesResponse.FullStudies``
        collected across all pages.

    Raises:
        Any exception raised in the body (malformed ``year_month``, network
        error, HTTP error status, invalid JSON) triggers a retry of the whole
        pagination via the decorator; after the third failed attempt tenacity
        surfaces the failure.
    """
    import calendar  # local import: only this function needs it

    # Use the real last day of the month; "-31" is not a valid date for
    # months with fewer than 31 days.
    year, month = (int(part) for part in year_month.split('-'))
    last_day = calendar.monthrange(year, month)[1]

    request_timeout = 60  # seconds
    trials = []
    page_size = 100
    skip = 0

    while True:
        url = f"https://clinicaltrials.gov/api/query/full_studies?expr=AREA[LastUpdatePostDate]RANGE[{year_month}-01,{year_month}-{last_day:02d}]&min_rnk={skip+1}&max_rnk={skip+page_size}&fmt=json"
        response = requests.get(url, timeout=request_timeout)
        # Surface HTTP errors instead of trying to decode an error page.
        response.raise_for_status()
        page = response.json().get("FullStudiesResponse", {}).get("FullStudies", [])
        trials.extend(page)

        # A short (or empty) page means there is nothing left to fetch.
        if len(page) < page_size:
            break
        skip += page_size

    return trials

@retry(wait=wait_exponential(), stop=stop_after_attempt(3))
def fetch_0_1(api_key):
    """Fetch 0_1 data.

    Args:
        api_key: Value sent in the ``X-API-Key`` request header.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Any exception raised in the body (network error, HTTP error status,
        invalid JSON) triggers a retry via the decorator; after the third
        failed attempt tenacity surfaces the failure.
    """
    headers = {"X-API-Key": api_key}
    response = requests.get(
        "https://api.0_1.com/v1/drugs?limit=1000",
        headers=headers,
        timeout=60,  # seconds; avoid hanging on a stalled connection
    )
    # Without this an error payload would be parsed as data, and the caller
    # would find no 'results' and record the month as processed.
    response.raise_for_status()
    return response.json()

def process_source(source, year_month):
    """Extract one source for one month, upload each record, and record the run.

    Args:
        source: One of '0_0', 'ctgov' or '0_1'; any other value uploads
            nothing but is still marked as processed.
        year_month: Month string used for the state key and S3 paths.

    Returns:
        A list of metadata dicts, one per uploaded record. Empty when the
        (source, year_month) pair was already recorded in the state table.

    Raises:
        Any exception from fetching, parsing or uploading is printed and
        re-raised; in that case the state table is not updated.
    """
    print(f"Processing {source} for {year_month}")
    collected = []

    def _source_items():
        # Lazily yields (source_id, file_type, content) so each record is
        # uploaded before the next one is parsed.
        if source == '0_0':
            api_key = get_secret('0_0/api_key')
            for article in fetch_0_0(api_key, year_month):
                wrapped = f"<0_0Article>{article}</0_0Article>"
                pmid = article.split('<PMID Version="1">')[1].split('<')[0]
                yield pmid, 'xml', wrapped
        elif source == 'ctgov':
            for trial in fetch_ctgov(year_month):
                nct_id = trial['Study']['ProtocolSection']['IdentificationModule']['NCTId']
                yield nct_id, 'json', json.dumps(trial)
        elif source == '0_1':
            api_key = get_secret('0_1/api_key')
            for drug in fetch_0_1(api_key).get('results', []):
                yield drug['drugId'], 'json', json.dumps(drug)

    try:
        # Nothing to do when this pair is already in the state table.
        if check_existing_extraction(source, year_month):
            print(f"Already processed {source} for {year_month}")
            return collected

        for source_id, file_type, content in _source_items():
            collected.append(
                create_metadata_entry(source, source_id, file_type, content, year_month)
            )

        # Record completion only after every upload succeeded.
        completion_record = {
            'source': source,
            'year_month': year_month,
            'processed_at': datetime.utcnow().isoformat()
        }
        dynamodb.put_item(Item=completion_record)

    except Exception as exc:
        print(f"Error processing {source}: {str(exc)}")
        raise

    return collected

def create_metadata_entry(source, source_id, file_type, content, year_month):
    """Upload one raw record to S3 and build its metadata row.

    Args:
        source: Source identifier; used in the S3 path and object metadata.
        source_id: Record identifier; becomes the file name stem.
        file_type: File extension without the dot (e.g. 'xml', 'json').
        content: Record body as text.
        year_month: ``YYYY-MM`` string; the dash becomes a path separator.

    Returns:
        A dict describing the uploaded file. ``checksum`` and ``file_size``
        both refer to the UTF-8 encoded bytes that were uploaded.
    """
    file_name = f"{source_id}.{file_type}"
    s3_path = f"raw/{source}/{year_month.replace('-', '/')}/{file_name}"

    # Encode once so checksum, size and the uploaded object all describe the
    # same bytes. len(content) would count characters, which differs from the
    # byte size for non-ASCII text.
    payload = content.encode()
    checksum = hashlib.md5(payload).hexdigest()

    # Upload to S3
    s3.put_object(
        Bucket=BUCKET,
        Key=s3_path,
        Body=payload,
        Metadata={'source': source}
    )

    # One timestamp for both fields so a new entry is internally consistent.
    timestamp = datetime.utcnow().isoformat()
    return {
        'file_id': str(uuid.uuid4()),
        'source_system': source,
        'source_id': source_id,
        'file_name': file_name,
        'extraction_date': timestamp,
        's3_raw_path': s3_path,
        'checksum': checksum,
        'file_size': len(payload),
        'transformation_required': True,
        'transformation_status': 'PENDING',
        'transformed_s3_path': '',
        'last_updated': timestamp
    }

def main():
    """Run the extraction for the current UTC month across every source.

    Each source is processed independently; a failure is printed and the
    remaining sources still run. When at least one metadata row was produced,
    the rows are written to a timestamped CSV and to ``metadata/latest.csv``,
    and the transformation Lambda is invoked asynchronously if any row
    requires transformation.
    """
    current_month = datetime.utcnow().strftime('%Y-%m')
    all_metadata = []

    for source in SOURCES:
        try:
            all_metadata.extend(process_source(source, current_month))
        except Exception as e:
            print(f"Failed to process {source}: {str(e)}")
            continue

    if not all_metadata:
        return

    # Save metadata. Serialise once and reuse the same body for both objects,
    # which also guarantees the snapshot and the pointer are identical.
    df = pd.DataFrame(all_metadata)
    csv_body = df.to_csv(index=False)
    metadata_key = f"metadata/{current_month}/metadata_{datetime.utcnow().strftime('%Y%m%dT%H%M%S')}.csv"
    s3.put_object(Bucket=BUCKET, Key=metadata_key, Body=csv_body)

    # Update latest pointer
    s3.put_object(Bucket=BUCKET, Key="metadata/latest.csv", Body=csv_body)

    # Trigger Lambda if needed
    if df['transformation_required'].any():
        lambda_client = boto3.client('lambda')
        lambda_client.invoke(
            FunctionName='data-transformation-lambda',
            InvocationType='Event',  # asynchronous: do not wait for the result
            Payload=json.dumps({'metadata_key': metadata_key})
        )

if __name__ == "__main__":
    main()

In [2]:
## Lambda

import os
import json
import boto3
import pandas as pd
from datetime import datetime

# S3 client created at import time and reused by every handler call in this cell.
s3 = boto3.client('s3')
# Bucket holding the metadata CSVs, raw files and transformed output.
BUCKET = 'your-data-bucket'

def transform_0_0(content):
    """Extract PMID, title and abstract text from a 0_0 XML fragment.

    Args:
        content: XML text; it is wrapped in a synthetic ``<root>`` element
            before parsing, so it may contain several top-level elements.

    Returns:
        A dict with keys 'pmid', 'title' and 'abstract'. Each value is the
        text of the first matching element anywhere in the fragment, or None
        when no such element exists.

    Raises:
        xml.etree.ElementTree.ParseError: If the wrapped text is not
            well-formed XML.
    """
    from xml.etree import ElementTree as ET

    tree = ET.fromstring(f"<root>{content}</root>")
    wanted = (
        ('pmid', './/PMID'),
        ('title', './/ArticleTitle'),
        ('abstract', './/AbstractText'),
    )
    return {field_name: tree.findtext(xpath) for field_name, xpath in wanted}

def transform_ctgov(content):
    """Flatten one ClinicalTrials.gov study JSON document into a small dict.

    Args:
        content: JSON text with a ``Study.ProtocolSection`` object.

    Returns:
        A dict with keys 'nct_id', 'title' and 'phase'. 'title' is the
        official title when present, otherwise the brief title (None when
        neither exists). 'phase' is 'N/A' when the design module or its
        ``Phase`` entry is missing.

    Raises:
        KeyError: If ``Study``, ``ProtocolSection``, ``IdentificationModule``
            or ``NCTId`` is missing.
        json.JSONDecodeError: If ``content`` is not valid JSON.
    """
    trial = json.loads(content)
    protocol = trial['Study']['ProtocolSection']
    identification = protocol['IdentificationModule']
    # Not every record carries a design module; treat it as empty so the
    # phase falls back to the existing 'N/A' default instead of raising.
    design = protocol.get('DesignModule') or {}
    return {
        'nct_id': identification['NCTId'],
        # The official title can be absent; fall back to the brief title.
        'title': identification.get('OfficialTitle', identification.get('BriefTitle')),
        'phase': design.get('Phase', 'N/A')
    }

def transform_0_1(content):
    """Reduce one 0_1 drug JSON document to id, name and a target string.

    Args:
        content: JSON text for a single drug record.

    Returns:
        A dict with keys 'drug_id', 'name' and 'targets', where 'targets' is
        the record's ``targets`` list joined with ', ' (empty string when the
        key is absent).

    Raises:
        KeyError: If ``drugId`` or ``name`` is missing.
    """
    record = json.loads(content)
    target_names = record.get('targets', [])
    flattened = {'drug_id': record['drugId'], 'name': record['name']}
    flattened['targets'] = ', '.join(target_names)
    return flattened

def lambda_handler(event, context):
    """Transform every pending raw file listed in a metadata CSV.

    Args:
        event: Invocation payload; must contain ``metadata_key``, the S3 key
            of the metadata CSV to process.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200}`` once the updated metadata has been written.
        Individual file failures are recorded as ``FAILED`` in the metadata
        and do not abort the run.

    Raises:
        Any exception from loading or saving the metadata is printed and
        re-raised.
    """
    metadata_key = event['metadata_key']

    try:
        # Get metadata
        obj = s3.get_object(Bucket=BUCKET, Key=metadata_key)
        df = pd.read_csv(obj['Body'])
        # An all-empty column is read back as float NaN; make it an object
        # column so string keys can be assigned into it below.
        df['transformed_s3_path'] = df['transformed_s3_path'].astype(object)

        # Process pending transformations
        pending = df[(df['transformation_required']) & (df['transformation_status'] == 'PENDING')]
        for index, row in pending.iterrows():
            try:
                # Download raw file
                raw_obj = s3.get_object(Bucket=BUCKET, Key=row['s3_raw_path'])
                content = raw_obj['Body'].read().decode()

                # Source-specific transformation
                if row['source_system'] == '0_0':
                    transformed = transform_0_0(content)
                elif row['source_system'] == 'ctgov':
                    transformed = transform_ctgov(content)
                elif row['source_system'] == '0_1':
                    transformed = transform_0_1(content)
                else:
                    # Unknown source: leave the row untouched (still PENDING).
                    continue

                # Upload transformed file. Raw keys look like
                # raw/<source>/<YYYY>/<MM>/<file>; keep both year and month so
                # the same month of different years cannot overwrite each other.
                period = '/'.join(row['s3_raw_path'].split('/')[2:4])
                stem = row['file_name'].split('.')[0]
                transformed_key = f"transformed/{row['source_system']}/{period}/{stem}.csv"
                csv_data = pd.DataFrame([transformed]).to_csv(index=False)
                s3.put_object(Bucket=BUCKET, Key=transformed_key, Body=csv_data)

                # Update metadata
                df.at[index, 'transformation_status'] = 'COMPLETED'
                df.at[index, 'transformed_s3_path'] = transformed_key
                df.at[index, 'last_updated'] = datetime.utcnow().isoformat()

            except Exception as e:
                df.at[index, 'transformation_status'] = 'FAILED'
                print(f"Failed to transform {row['file_id']}: {str(e)}")

        # Save updated metadata. Keys normally start with "metadata/" (no
        # leading slash), so a replace on "/metadata/" would leave the key
        # unchanged and overwrite the input file.
        if metadata_key.startswith('metadata/'):
            updated_key = 'metadata/processed/' + metadata_key[len('metadata/'):]
        else:
            updated_key = metadata_key.replace('/metadata/', '/metadata/processed/')
        s3.put_object(Bucket=BUCKET, Key=updated_key, Body=df.to_csv(index=False))

    except Exception as e:
        print(f"Critical error: {str(e)}")
        raise

    return {'statusCode': 200}

In [3]:
## cloudwatch

import os
import boto3
import hashlib
import uuid
import json
import pandas as pd
import logging
from datetime import datetime
import requests
from tenacity import retry, wait_exponential, stop_after_attempt

# Configure logging
# Root logger at INFO. The formatter below is only attached to a handler inside
# the __main__ guard at the end of this cell.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# AWS Clients
# Created once at import time and shared by the functions below.
s3 = boto3.client('s3')
secrets = boto3.client('secretsmanager')
cloudwatch = boto3.client('cloudwatch')

# Constants
# Raises KeyError at import time when DATA_BUCKET is not set.
BUCKET = os.environ['DATA_BUCKET']
SOURCES = ['0_0', '0_1', 'ctgov']
# Namespace under which put_metric publishes every datapoint.
METRIC_NAMESPACE = 'DataPipeline'

def put_metric(metric_name, value, dimensions):
    """Publish a single count-valued datapoint to CloudWatch.

    Args:
        metric_name: Name of the metric.
        value: Numeric value of the datapoint.
        dimensions: List of ``{'Name': ..., 'Value': ...}`` dicts; may be empty.
    """
    datapoint = {
        'MetricName': metric_name,
        'Dimensions': dimensions,
        'Value': value,
        'Unit': 'Count'
    }
    cloudwatch.put_metric_data(Namespace=METRIC_NAMESPACE, MetricData=[datapoint])

def get_secret(secret_name):
    """Return the ``SecretString`` of a Secrets Manager secret.

    Any failure is logged at ERROR level and the original exception is
    re-raised unchanged.
    """
    try:
        response = secrets.get_secret_value(SecretId=secret_name)
        return response['SecretString']
    except Exception as e:
        logger.error(f"Secret {secret_name} retrieval failed: {str(e)}")
        raise

@retry(wait=wait_exponential(), stop=stop_after_attempt(3))
def fetch_0_0(api_key, year_month):
    """Fetch 0_0 articles with enhanced logging.

    Sketch only: the actual fetch logic is elided behind the placeholder
    comment below. Logs the start, the number of fetched articles and any
    failure; exceptions are re-raised so the retry decorator can act on them.
    """
    logger.info(f"Starting 0_0 fetch for {year_month}")
    try:
        # ... (keep previous fetch_0_0 implementation)
        # NOTE: `articles` is not assigned anywhere in this sketch; the elided
        # implementation must define it or the next line raises NameError.
        logger.info(f"Fetched {len(articles)} 0_0 articles")
        return articles
    except Exception as e:
        logger.error(f"0_0 fetch failed: {str(e)}")
        raise

def process_source(source, year_month):
    """Process a source with CloudWatch metrics and logging.

    Sketch only: the per-source extraction is elided behind a placeholder
    comment, so as written ``metadata`` is always returned empty.

    Emits 'SkippedExtractions', 'ProcessedFiles' or 'ProcessingErrors' with a
    ``Source`` dimension depending on the outcome. Exceptions are logged with
    a traceback and re-raised.
    """
    logger.info(f"Processing source: {source} for {year_month}")
    metadata = []
    try:
        # Check if metadata already exists for this month
        # NOTE(review): the prefix contains no source component, so once any
        # object exists under metadata/<year_month>/ every source is skipped.
        # Confirm that a month-level (not per-source) check is intended.
        metadata_prefix = f"metadata/{year_month}/"
        existing = s3.list_objects_v2(Bucket=BUCKET, Prefix=metadata_prefix)
        if existing.get('KeyCount', 0) > 0:
            logger.warning(f"Metadata already exists for {year_month}, skipping")
            put_metric('SkippedExtractions', 1, [{'Name': 'Source', 'Value': source}])
            return metadata

        # ... (keep previous source processing logic)

        logger.info(f"Processed {len(metadata)} files from {source}")
        put_metric('ProcessedFiles', len(metadata), [{'Name': 'Source', 'Value': source}])

    except Exception as e:
        logger.error(f"Failed processing {source}: {str(e)}", exc_info=True)
        put_metric('ProcessingErrors', 1, [{'Name': 'Source', 'Value': source}])
        raise

    return metadata

def main():
    """Main execution with structured logging.

    Sketch only: the metadata upload is elided behind a placeholder comment.
    Runs process_source for every entry of SOURCES for the current UTC month,
    invokes the transformation Lambda when any metadata was produced, and
    publishes 'SuccessfulExtractions' or 'FailedExtractions'.
    """
    try:
        current_month = datetime.utcnow().strftime('%Y-%m')
        logger.info(f"Starting monthly extraction for {current_month}")

        all_metadata = []
        for source in SOURCES:
            try:
                all_metadata.extend(process_source(source, current_month))
            except Exception as e:
                # A failing source is logged and skipped; the loop continues.
                logger.error(f"Critical error processing {source}: {str(e)}", exc_info=True)
                continue

        if all_metadata:
            # ... (keep previous metadata handling)
            # NOTE: `metadata_key` is expected to be set by the elided code
            # above; it is not defined anywhere in the visible sketch.
            logger.info(f"Uploaded metadata with {len(all_metadata)} entries")

            # Trigger Lambda
            lambda_client = boto3.client('lambda')
            lambda_client.invoke(
                FunctionName=os.environ['TRANSFORMATION_LAMBDA'],
                InvocationType='Event',
                Payload=json.dumps({'metadata_key': metadata_key})
            )
            logger.info("Triggered transformation lambda")

        # Reached even when individual sources failed above, because those
        # exceptions are caught inside the loop.
        logger.info("Extraction completed successfully")
        put_metric('SuccessfulExtractions', 1, [])

    except Exception as e:
        logger.error(f"Fatal error in main execution: {str(e)}", exc_info=True)
        put_metric('FailedExtractions', 1, [])
        raise

if __name__ == "__main__":
    # Initialize logging handler
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    main()

In [None]:
## lambda cloud

import os
import json
import boto3
import pandas as pd
import logging
from datetime import datetime

# Configure logging
# Root logger at INFO with a stream handler attached at import time.
# NOTE(review): if the runtime has already installed a handler on the root
# logger, adding another one here would emit each record twice - confirm.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

# AWS Clients
# Created once at import time and shared by every handler call.
s3 = boto3.client('s3')
cloudwatch = boto3.client('cloudwatch')

# Constants
# Raises KeyError at import time when DATA_BUCKET is not set.
BUCKET = os.environ['DATA_BUCKET']
# Namespace under which put_metric publishes every datapoint.
METRIC_NAMESPACE = 'DataPipeline'

def put_metric(metric_name, value, dimensions):
    """Send one datapoint with unit 'Count' to the pipeline's metric namespace.

    Args:
        metric_name: Name of the metric.
        value: Numeric value of the datapoint.
        dimensions: List of ``{'Name': ..., 'Value': ...}`` dicts; may be empty.
    """
    metric_data = [
        dict(MetricName=metric_name, Dimensions=dimensions, Value=value, Unit='Count')
    ]
    cloudwatch.put_metric_data(Namespace=METRIC_NAMESPACE, MetricData=metric_data)

def lambda_handler(event, context):
    """Lambda handler with enhanced observability.

    Sketch only: the transformation and metadata-update steps are elided
    behind placeholder comments. Reads the metadata CSV named by
    ``event['metadata_key']``, iterates its rows, and publishes per-file and
    per-run CloudWatch metrics.

    Returns:
        ``{'statusCode': 200}`` when the run completes.

    Raises:
        Any exception outside the per-row ``try`` is logged, reported as a
        failed 'TransformationRuns' datapoint, and re-raised.
    """
    logger.info("Starting transformation process")
    # Running counters for this invocation; only 'success' and 'failures'
    # appear in the final log line.
    metrics = {
        'processed': 0,
        'success': 0,
        'failures': 0
    }

    try:
        metadata_key = event['metadata_key']
        logger.info(f"Processing metadata file: {metadata_key}")

        # Get metadata
        obj = s3.get_object(Bucket=BUCKET, Key=metadata_key)
        df = pd.read_csv(obj['Body'])

        # Process files
        for index, row in df.iterrows():
            # Incremented for every row, including rows skipped just below.
            metrics['processed'] += 1
            if not row['transformation_required'] or row['transformation_status'] != 'PENDING':
                continue

            try:
                # ... (keep previous transformation logic)
                metrics['success'] += 1
                logger.info(f"Transformed {row['file_id']} successfully")
                put_metric('TransformationSuccess', 1, [{'Name': 'Source', 'Value': row['source_system']}])

            except Exception as e:
                # A failing row is counted and reported; the loop continues.
                metrics['failures'] += 1
                logger.error(f"Failed to transform {row['file_id']}: {str(e)}", exc_info=True)
                put_metric('TransformationFailures', 1, [{'Name': 'Source', 'Value': row['source_system']}])

        # ... (keep metadata update logic)

        logger.info(f"Transformation complete. Success: {metrics['success']}, Failures: {metrics['failures']}")
        put_metric('TransformationRuns', 1, [{'Name': 'Status', 'Value': 'Completed'}])
        return {'statusCode': 200}

    except Exception as e:
        logger.error(f"Critical transformation error: {str(e)}", exc_info=True)
        put_metric('TransformationRuns', 1, [{'Name': 'Status', 'Value': 'Failed'}])
        raise

In [None]:
3. CloudWatch Strategy
Log Groups:

/aws/ecs/extraction-task for ECS logs

/aws/lambda/data-transformation for Lambda logs

Key Metrics:

python
Metrics = [
    'ProcessedFiles',            # Per source
    'SuccessfulExtractions',     # Overall status
    'FailedExtractions',
    'TransformationSuccess',    # Per source
    'TransformationFailures',
    'TransformationRuns'         # With Status dimension
]
Alarms:

FailedExtractions > 0 for 1 consecutive period

TransformationFailures > 5% of ProcessedFiles for 15 minutes

TransformationRuns.Status=Failed > 0 for 1 period

Dashboards:

Extraction Success Rate: 100 * (SuccessfulExtractions / (SuccessfulExtractions + FailedExtractions))

Transformation Failure Rate: 100 * (TransformationFailures / ProcessedFiles)

Data Volume Trends: ProcessedFiles by Source

In [4]:
#Infra

# Add CloudWatch Alarms
# NOTE(review): `self`, `cloudwatch`, `core`, `iam`, `lambda_role` and
# `ecs_task_role` are not defined in this cell; presumably this fragment lives
# inside a CDK construct where they are in scope - confirm.
# The alarm compares the 5-minute sum of TransformationFailures against an
# absolute count of 5 (strictly greater than), evaluated over a single period.
transformation_errors_alarm = cloudwatch.Alarm(
    self, "TransformationErrors",
    metric=cloudwatch.Metric(
        namespace=METRIC_NAMESPACE,
        metric_name="TransformationFailures",
        statistic="sum",
        period=core.Duration.minutes(5)
    ),
    threshold=5,
    evaluation_periods=1,
    comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD
)

# Add logging permissions
# Both statements grant their actions on every resource ("*").
lambda_role.add_to_policy(iam.PolicyStatement(
    actions=[
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "cloudwatch:PutMetricData"
    ],
    resources=["*"]
))

# The ECS task role gets the same actions except logs:CreateLogGroup.
ecs_task_role.add_to_policy(iam.PolicyStatement(
    actions=[
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "cloudwatch:PutMetricData"
    ],
    resources=["*"]
))

In [None]:
data-pipeline/
│
├── metadata/
│   ├── __init__.py
│   ├── schema.py
│   └── storage.py
│
├── extraction/
│   ├── __init__.py
│   ├── extractors.py
│   └── task.py
│
├── transformation/
│   ├── __init__.py
│   └── lambda_handler.py
│
├── requirements.txt
└── README.md

In [None]:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import uuid
import pandas as pd

@dataclass
class MetadataEntry:
    """Metadata record describing one extracted raw file."""
    source_system: str
    source_id: str
    file_name: str
    extraction_date: datetime
    s3_raw_path: str
    checksum: str
    file_size: int
    transformation_required: bool
    transformation_status: str = "PENDING"
    transformed_s3_path: Optional[str] = None
    file_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    last_updated: datetime = field(default_factory=datetime.utcnow)

    def validate(self):
        """Check the status value and the type of the transformation flag.

        Raises:
            ValueError: If ``transformation_status`` is not one of PENDING,
                COMPLETED or FAILED.
            TypeError: If ``transformation_required`` is not a ``bool``.
        """
        allowed_statuses = ("PENDING", "COMPLETED", "FAILED")
        status_is_known = self.transformation_status in allowed_statuses
        if not status_is_known:
            raise ValueError(f"Invalid status: {self.transformation_status}")
        if isinstance(self.transformation_required, bool):
            return
        raise TypeError("transformation_required must be boolean")

    @classmethod
    def from_raw_file(cls, content: bytes, source: str, source_id: str, s3_path: str):
        """Build an entry for a freshly extracted file.

        The file name is the last path segment of ``s3_path``; checksum (MD5)
        and size are computed from ``content``. The entry is always flagged
        as requiring transformation.
        """
        leaf_name = s3_path.rsplit("/", 1)[-1]
        extracted_at = datetime.utcnow()
        digest = hashlib.md5(content).hexdigest()
        size_in_bytes = len(content)
        return cls(
            source_system=source,
            source_id=source_id,
            file_name=leaf_name,
            extraction_date=extracted_at,
            s3_raw_path=s3_path,
            checksum=digest,
            file_size=size_in_bytes,
            transformation_required=True,
        )

@dataclass
class MetadataFile:
    """Ordered list of MetadataEntry objects with DataFrame conversion helpers."""
    entries: list[MetadataEntry] = field(default_factory=list)

    def add_entry(self, entry: MetadataEntry):
        """Validate ``entry`` and append it; validation errors propagate."""
        entry.validate()
        self.entries.append(entry)

    def get_pending_transformations(self):
        """Return entries that require transformation and are still PENDING."""
        pending = []
        for candidate in self.entries:
            if not candidate.transformation_required:
                continue
            if candidate.transformation_status == "PENDING":
                pending.append(candidate)
        return pending

    def to_dataframe(self):
        """Return a DataFrame with one row per entry, built from instance attributes."""
        records = [vars(item) for item in self.entries]
        return pd.DataFrame(records)

    @classmethod
    def from_dataframe(cls, df: pd.DataFrame):
        """Rebuild a collection by passing each row's columns as keyword arguments."""
        rebuilt = []
        for _, record in df.iterrows():
            rebuilt.append(MetadataEntry(**record))
        return cls(rebuilt)

In [None]:
## Storage module

import boto3
from datetime import datetime
from io import StringIO
import logging

logger = logging.getLogger(__name__)

class MetadataStorage:
    """Reads and writes metadata CSV files in a single S3 bucket."""

    def __init__(self, bucket_name: str):
        self.bucket = bucket_name
        self.s3 = boto3.client('s3')

    def save_metadata(self, metadata, year_month: str, versioned=False):
        """Write the metadata as CSV and return the S3 key that was used.

        The key is ``metadata/<year_month with '-' as '/'>/metadata.csv``, or
        ``.../versions/<UTC timestamp>.csv`` when ``versioned`` is true.
        Failures are logged and re-raised.
        """
        try:
            buffer = StringIO()
            metadata.to_dataframe().to_csv(buffer, index=False)

            folder = f"metadata/{year_month.replace('-', '/')}"
            if versioned:
                stamp = datetime.utcnow().strftime('%Y%m%dT%H%M%S')
                target_key = f"{folder}/versions/{stamp}.csv"
            else:
                target_key = f"{folder}/metadata.csv"

            self.s3.put_object(Bucket=self.bucket, Key=target_key, Body=buffer.getvalue())
            logger.info(f"Saved metadata to s3://{self.bucket}/{target_key}")
            return target_key

        except Exception as e:
            logger.error(f"Failed to save metadata: {str(e)}")
            raise

    def load_metadata(self, key: str):
        """Read the CSV at ``key`` and return it as a MetadataFile.

        Failures are logged and re-raised.
        """
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=key)
            frame = pd.read_csv(response['Body'])
            return MetadataFile.from_dataframe(frame)
        except Exception as e:
            logger.error(f"Failed to load metadata: {str(e)}")
            raise

    def metadata_exists(self, year_month: str):
        """Report whether any object key starts with the month's metadata.csv path.

        Returns False (after logging) when the listing itself fails.
        """
        try:
            wanted_prefix = f"metadata/{year_month.replace('-', '/')}/metadata.csv"
            listing = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=wanted_prefix)
            found = listing.get('KeyCount', 0)
            return found > 0
        except Exception as e:
            logger.error(f"Metadata check failed: {str(e)}")
            return False

In [None]:
## Extractors 

import requests
import logging
from tenacity import retry, wait_exponential, stop_after_attempt

logger = logging.getLogger(__name__)

class BaseExtractor:
    """Common base for source extractors.

    Subclasses implement ``fetch_data``. A retry policy (three attempts,
    exponential wait) is applied automatically to each subclass's own
    ``fetch_data``; decorating only the base method would have no effect,
    because an overriding method replaces the decorated one.
    """

    def __init__(self, source_name: str):
        self.source_name = source_name

    def __init_subclass__(cls, **kwargs):
        """Wrap the subclass's own ``fetch_data`` with the retry policy."""
        super().__init_subclass__(**kwargs)
        own_fetch = cls.__dict__.get("fetch_data")
        if callable(own_fetch):
            # reraise=True: after the last attempt the original exception is
            # raised, so callers keep seeing the real error type.
            cls.fetch_data = retry(
                wait=wait_exponential(),
                stop=stop_after_attempt(3),
                reraise=True,
            )(own_fetch)

    def fetch_data(self, *args, **kwargs):
        """Fetch raw items for the source; must be overridden.

        Raises:
            NotImplementedError: Always, in the base class. Not retried,
                since repeating the call cannot succeed.
        """
        raise NotImplementedError

# NOTE(review): as written the class name starts with a digit, which is not a
# valid Python identifier; it looks like a redacted placeholder - confirm the
# real name before using this module.
class 0_0Extractor(BaseExtractor):
    """Extractor stub for the 0_0 source; holds the API key for later requests."""

    def __init__(self, api_key: str):
        super().__init__("0_0")
        self.api_key = api_key

    def fetch_data(self, year_month: str):
        """Placeholder: logs the request and returns a dummy one-element list."""
        logger.info(f"Fetching 0_0 data for {year_month}")
        # Implement 0_0 API logic
        # `[...]` is a list containing Ellipsis, standing in for real items.
        return [...]  # List of raw data items

class CTGovExtractor(BaseExtractor):
    """Extractor stub for ClinicalTrials.gov; needs no API key."""

    def __init__(self):
        super().__init__("ctgov")

    def fetch_data(self, year_month: str):
        """Placeholder: logs the request and returns a dummy one-element list."""
        logger.info(f"Fetching ClinicalTrials.gov data for {year_month}")
        # Implement 0_2 API logic
        # `[...]` is a list containing Ellipsis, standing in for real items.
        return [...]  # List of raw data items

# NOTE(review): as written the class name starts with a digit, which is not a
# valid Python identifier; it looks like a redacted placeholder - confirm the
# real name before using this module.
class 0_1Extractor(BaseExtractor):
    """Extractor stub for the 0_1 source; holds the API key for later requests."""

    def __init__(self, api_key: str):
        super().__init__("0_1")
        self.api_key = api_key

    def fetch_data(self, year_month: str):
        """Placeholder: logs the request and returns a dummy one-element list."""
        logger.info(f"Fetching 0_1 data for {year_month}")
        # Implement 0_1 API logic
        # `[...]` is a list containing Ellipsis, standing in for real items.
        return [...]  # List of raw data items

In [None]:
## Extraction task

import os
import logging
from datetime import datetime
from metadata.schema import MetadataEntry, MetadataFile
from metadata.storage import MetadataStorage
from extraction.extractors import (
    0_0Extractor,
    CTGovExtractor,
    0_1Extractor
)

logger = logging.getLogger(__name__)

class ExtractionPipeline:
    def __init__(self):
        self.bucket = os.environ['DATA_BUCKET']
        self.storage = MetadataStorage(self.bucket)
        self.extractors = {
            '0_0': 0_0Extractor(os.environ['0_0_API_KEY']),
            'ctgov': CTGovExtractor(),
            '0_1': 0_1Extractor(os.environ['0_1_API_KEY'])
        }

    def process_source(self, source: str, year_month: str):
        logger.info(f"Processing {source} for {year_month}")
        
        if self.storage.metadata_exists(year_month):
            logger.warning(f"Metadata already exists for {year_month}, skipping")
            return MetadataFile()

        extractor = self.extractors[source]
        raw_items = extractor.fetch_data(year_month)
        
        metadata = MetadataFile()
        for item in raw_items:
            content = item['content'].encode()
            s3_path = f"raw/{source}/{year_month.replace('-', '/')}/{item['id']}.{item['format']}"
            
            entry = MetadataEntry.from_raw_file(
                content=content,
                source=source,
                source_id=item['id'],
                s3_path=s3_path
            )
            metadata.add_entry(entry)
            
            # Upload to S3
            self.storage.s3.put_object(
                Bucket=self.bucket,
                Key=s3_path,
                Body=content
            )

        return metadata

    def run(self):
        current_month = datetime.utcnow().strftime('%Y-%m')
        logger.info(f"Starting extraction for {current_month}")
        
        all_metadata = MetadataFile()
        
        for source in ['0_0', 'ctgov', '0_1']:
            try:
                source_metadata = self.process_source(source, current_month)
                all_metadata.entries.extend(source_metadata.entries)
            except Exception as e:
                logger.error(f"Failed to process {source}: {str(e)}")
                continue

        if all_metadata.entries:
            metadata_key = self.storage.save_metadata(all_metadata, current_month)
            self.trigger_transformation(metadata_key)

        logger.info("Extraction completed")

    def trigger_transformation(self, metadata_key: str):
        logger.info("Triggering transformation lambda")
        lambda_client = boto3.client('lambda')
        lambda_client.invoke(
            FunctionName=os.environ['TRANSFORMATION_LAMBDA_NAME'],
            InvocationType='Event',
            Payload=json.dumps({'metadata_key': metadata_key})
        )

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    pipeline = ExtractionPipeline()
    pipeline.run()

In [None]:
# Lambda
import os
import logging
from datetime import datetime
from metadata.schema import MetadataFile
from metadata.storage import MetadataStorage

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def transform_0_0(content: str) -> dict:
    """Placeholder for the 0_0 transformation; not implemented yet."""
    # Implementation for 0_0 transformation
    # NOTE: `{...}` is a set containing Ellipsis, not a dict, so the stub does
    # not honour the annotated return type until it is implemented.
    return {...}

def transform_ctgov(content: str) -> dict:
    """Placeholder for the ClinicalTrials.gov transformation; not implemented yet."""
    # Implementation for 0_2 transformation
    # NOTE: `{...}` is a set containing Ellipsis, not a dict, so the stub does
    # not honour the annotated return type until it is implemented.
    return {...}

def transform_0_1(content: str) -> dict:
    """Placeholder for the 0_1 transformation; not implemented yet."""
    # Implementation for 0_1 transformation
    # NOTE: `{...}` is a set containing Ellipsis, not a dict, so the stub does
    # not honour the annotated return type until it is implemented.
    return {...}

class TransformationPipeline:
    """Transforms pending raw files listed in a metadata file and records the result."""

    def __init__(self):
        """Read ``DATA_BUCKET`` from the environment and set up storage access.

        Raises:
            KeyError: If ``DATA_BUCKET`` is unset.
        """
        self.bucket = os.environ['DATA_BUCKET']
        self.storage = MetadataStorage(self.bucket)

    def process_entry(self, entry):
        """Download, transform and upload one entry, updating it in place.

        On success the entry becomes COMPLETED with its transformed key set.
        Any exception is logged and the entry becomes FAILED. Entries from an
        unknown source are left unchanged.
        """
        # Imported here because this module's import block does not bring in
        # pandas; without it the DataFrame call below raises NameError.
        import pandas as pd

        try:
            # Download raw file
            obj = self.storage.s3.get_object(Bucket=self.bucket, Key=entry.s3_raw_path)
            content = obj['Body'].read().decode()

            # Transform based on source
            if entry.source_system == '0_0':
                transformed = transform_0_0(content)
            elif entry.source_system == 'ctgov':
                transformed = transform_ctgov(content)
            elif entry.source_system == '0_1':
                transformed = transform_0_1(content)
            else:
                return

            # Upload transformed file. Raw keys look like
            # raw/<source>/<YYYY>/<MM>/<file>; keep both year and month so the
            # same month of different years cannot overwrite each other.
            period = '/'.join(entry.s3_raw_path.split('/')[2:4])
            stem = entry.file_name.split('.')[0]
            transformed_key = f"transformed/{entry.source_system}/{period}/{stem}.csv"
            self.storage.s3.put_object(
                Bucket=self.bucket,
                Key=transformed_key,
                Body=pd.DataFrame([transformed]).to_csv(index=False)
            )

            # Update metadata
            entry.transformation_status = "COMPLETED"
            entry.transformed_s3_path = transformed_key
            entry.last_updated = datetime.utcnow()

        except Exception as e:
            logger.error(f"Transformation failed for {entry.file_id}: {str(e)}")
            entry.transformation_status = "FAILED"

    def handle(self, metadata_key: str):
        """Process every pending entry of a metadata file and save a new version.

        Args:
            metadata_key: S3 key of the metadata CSV, expected in the form
                ``metadata/<YYYY>/<MM>/...``.

        Raises:
            Any exception from loading or saving is logged and re-raised.
        """
        try:
            metadata = self.storage.load_metadata(metadata_key)

            for entry in metadata.get_pending_transformations():
                self.process_entry(entry)

            # Save updated metadata as a timestamped version under the same
            # month. save_metadata expects "YYYY-MM", so rebuild it from the
            # year and month segments of the key; passing a single segment
            # would lose the month.
            key_parts = metadata_key.split('/')
            year_month = '-'.join(key_parts[1:3])
            self.storage.save_metadata(metadata, year_month, versioned=True)

        except Exception as e:
            logger.error(f"Transformation pipeline failed: {str(e)}")
            raise

def lambda_handler(event, context):
    """Lambda entry point: run the transformation pipeline for one metadata file.

    Args:
        event: Invocation payload; must contain ``metadata_key``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a body naming the processed key.
    """
    pipeline = TransformationPipeline()
    metadata_key = event['metadata_key']
    pipeline.handle(metadata_key)
    response = {'statusCode': 200}
    response['body'] = f"Processed {metadata_key}"
    return response

In [None]:
#Requirements
boto3==1.34.25
pandas==2.1.4
requests==2.31.0
tenacity==8.2.3
python-dotenv==1.0.0

In [None]:
##Env variables
DATA_BUCKET=your-data-bucket
TRANSFORMATION_LAMBDA_NAME=data-transformation-lambda
0_0_API_KEY=your-key
0_1_API_KEY=your-key

In [None]:
AWS Setup:

Create S3 bucket with versioning enabled

Store API keys in AWS Secrets Manager

Create ECS Fargate task with proper IAM permissions

Configure Lambda with appropriate timeout/memory

Monitoring:

CloudWatch Alarms for:

Failed transformations

API request errors

S3 upload failures

X-Ray tracing for Lambda functions

CloudTrail for API auditing

This implementation provides:

Centralized metadata management

Production-grade error handling

CloudWatch integration

Modular architecture

Easy schema evolution

Source-specific processing

Versioned metadata storage

To use:

Create the directory structure

Copy files to appropriate locations

Install dependencies

Configure environment variables

Deploy to AWS infrastructure



In [None]:
Let me clarify where the **transformation requirement check** exists in the code and where we need improvements:

### **Current Implementation Status** 🔍
1. **Metadata Field**  
   We have the `transformation_required` boolean field in the `MetadataEntry` class:
   ```python
   @dataclass
   class MetadataEntry:
       transformation_required: bool  # Exists in the schema
   ```

2. **Default Value**  
   Every file is currently marked as requiring transformation: the value is hardcoded inside `from_raw_file()`, so the extraction logic never decides this per file:
   ```python
   entry = MetadataEntry.from_raw_file(
       # ...
       transformation_required=True  # Hardcoded in from_raw_file()
   )
   ```

3. **Transformation Trigger**  
   The Lambda processes entries where:
   ```python
   entry.transformation_required and entry.transformation_status == "PENDING"
   ```

### **What's Missing** ❌
We need **conditional logic** to determine if transformation is actually required for each file.  
This should be based on **business rules** like:
- File type (e.g., XML vs. CSV)
- Source system (e.g., 0_0 vs. 0_1)
- Content patterns (e.g., presence of specific fields)

---

### **Enhanced Implementation** 🛠

#### 1. **Add Transformation Check Logic** (`metadata/schema.py`)
```python
@dataclass
class MetadataEntry:
    # ... existing fields ...

    @classmethod
    def from_raw_file(cls, content: bytes, source: str, source_id: str, s3_path: str):
        """Build an entry whose ``transformation_required`` flag is computed.

        The flag comes from ``_check_transformation_required`` instead of
        being a fixed value. The remaining constructor arguments are elided
        in this excerpt.

        Args:
            content: Raw file bytes, passed on to the content-based rule.
            source: Name of the source system the file came from.
            source_id: Identifier of the item within its source (not used in
                the visible part of this excerpt).
            s3_path: S3 key of the raw file; only its last ``/``-separated
                segment is used here, as the file name.
        """
        # Determine if transformation is needed
        requires_transform = cls._check_transformation_required(
            content=content,
            source=source,
            file_name=s3_path.split("/")[-1]
        )
        
        return cls(
            # ...
            transformation_required=requires_transform,  # Dynamic value
        )

    @classmethod
    def _check_transformation_required(cls, content: bytes, source: str, file_name: str) -> bool:
        """Decide whether a raw file needs the transformation step.

        The rules are checked in order and the first match wins, so the
        ``.csv`` rule takes precedence over the source and content rules:
        a ``.csv`` file is never transformed, even when ``source`` is "0_0".

        Args:
            content: Raw file bytes, searched for the ``<clinical_study>`` tag.
            source: Name of the source system.
            file_name: File name including extension (matched case-sensitively).

        Returns:
            True when the file should be transformed, False otherwise
            (including when no rule matches).
        """
        # Example 1: File type check
        if file_name.endswith(".csv"):
            return False  # Assume CSV files don't need transformation
        
        # Example 2: Source-specific logic
        if source == "0_0":
            return True  # Always transform 0_0 XML
        
        # Example 3: Content-based check
        if b"<clinical_study>" in content:
            return True  # Raw XML needs processing
        
        return False  # Default case
```

#### 2. **Update Extraction Logic** (`extraction/task.py`)
```python
def process_source(self, source: str, year_month: str):
    """Create a ``MetadataEntry`` for every raw item of one source and month.

    Excerpt only: how ``raw_items`` is obtained and what is done with each
    ``entry`` afterwards are elided.

    Args:
        source: Name of the source system; also used as an S3 path segment.
        year_month: Month being processed; its hyphens become path
            separators in the S3 key (presumably "YYYY-MM" — confirm
            against the caller).
    """
    # ...
    for item in raw_items:
        # from_raw_file() expects bytes, so the text payload is encoded first.
        content = item['content'].encode()
        # Key layout: raw/<source>/<year_month with "-" replaced by "/">/<id>.<format>
        s3_path = f"raw/{source}/{year_month.replace('-', '/')}/{item['id']}.{item['format']}"
        
        entry = MetadataEntry.from_raw_file(
            content=content,
            source=source,
            source_id=item['id'],
            s3_path=s3_path
        )
        # Now transformation_required is set based on actual checks

---

### **Key Improvements** 🚀
1. **Dynamic Requirement Check**  
   Files are now evaluated using:
   - File extension
   - Source system
   - Content patterns

2. **Business Rules Centralization**  
   All transformation logic lives in `_check_transformation_required()`

3. **Flexibility**  
   Add/remove rules without changing pipeline flow:
   ```python
   def _check_transformation_required(...):
       # New rule: Skip files smaller than 1KB
       if len(content) < 1024:
           return False
       
       # New rule: 0_1 JSON files need transformation
       if source == "0_1" and file_name.endswith(".json"):
           return True
   ```

---

### **Final Architecture** 🏛
```mermaid
graph TD
    A[Extract File] --> B{Check Transformation<br/>Required?}
    B -->|Yes| C[Mark transformation_required=True]
    B -->|No| D[Mark transformation_required=False]
    C --> E[Trigger Lambda]
    D --> F[Skip Transformation]
```

This ensures only files needing transformation are processed, optimizing costs and runtime. Would you like me to add specific business rules for your use case?

In [None]:
def process_entry(self, entry):
    """Transform one extracted file and record the outcome on its entry.

    Downloads the raw object at ``entry.s3_raw_path``, runs the transformer
    registered for ``entry.source``, uploads the JSON-serialised result to
    ``transformed/<source>/<file_id>.json`` and updates ``entry`` in place.

    Args:
        entry: Metadata record for the file. Reads ``transformation_required``,
            ``file_id``, ``source`` and ``s3_raw_path``; writes
            ``s3_transformed_path`` and ``status``.

    Returns:
        None. Entries that do not require transformation, or whose source has
        no transformer, are left unmodified. Exceptions are not propagated:
        they are logged with their traceback and ``entry.status`` is set to
        ``"error"``.
    """
    # Source name -> name of the transformer attribute on ``self``. Two of the
    # attribute names start with a digit, which makes them invalid after a
    # dot (``self.0_1_transformer`` is a syntax error), so the transformer is
    # resolved with getattr() instead.
    # NOTE(review): SOURCES at the top of the file lists "ctgov", while this
    # table expects "clinical_trials" — confirm which value entries carry.
    transformer_attrs = {
        "0_1": "0_1_transformer",
        "0_0": "0_0_transformer",
        "clinical_trials": "clinical_trials_transformer",
    }

    try:
        if not entry.transformation_required:
            logger.info(f"Skipping transformation for {entry.file_id} as it's not required.")
            return

        logger.info(f"Starting transformation for {entry.file_id} from source: {entry.source}")

        # Step 1: Download raw file content
        obj = self.storage.s3.get_object(Bucket=self.bucket, Key=entry.s3_raw_path)
        content = obj['Body'].read().decode()

        # Step 2: Transform based on the source
        attr_name = transformer_attrs.get(entry.source)
        if attr_name is None:
            logger.warning(f"Unknown source '{entry.source}' for {entry.file_id}. Skipping transformation.")
            return
        # Only the transformer for the matched source is looked up, so the
        # other transformers do not need to exist on this instance.
        transformed_content = getattr(self, attr_name).transform(content)

        # Step 3: Upload transformed file to S3
        transformed_key = f"transformed/{entry.source}/{entry.file_id}.json"
        self.storage.s3.put_object(
            Bucket=self.bucket,
            Key=transformed_key,
            Body=json.dumps(transformed_content)
        )

        # Step 4: Update metadata
        entry.s3_transformed_path = transformed_key
        entry.status = "transformed"
        logger.info(f"Transformation complete for {entry.file_id}, uploaded to {transformed_key}")

    except Exception as e:
        # Deliberate catch-all: one bad file must not abort the batch.
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"Error transforming {entry.file_id}: {str(e)}")
        entry.status = "error"


In [None]:
{
  "file_id": "abc123",
  "source": "0_0",
  "extraction_timestamp": "2025-05-22T14:30:00Z",
  "extracted_by": "eventbridge-ecs-task",
  "s3_raw_path": "raw/0_0/abc123.json",
  "transformation_required": true,
  "transformation_timestamp": "2025-05-22T14:45:00Z",
  "transformed_by": "lambda-transformer-v1",
  "s3_transformed_path": "transformed/0_0/abc123.json",
  "status": "transformed",  // could be "extracted", "transformed", "error"
  "error_message": null,
  "file_size_bytes": 34256,
  "content_type": "application/json",
  "checksum_md5": "d41d8cd98f00b204e9800998ecf8427e",
  "version": "1.0",
  "remarks": "Transformation completed successfully"
}
