# Data Engineering Case Study 
*Carla Mota Leal*

Imagine you are a data engineer working for AdvertiseX, a digital advertising technology company. AdvertiseX specializes in programmatic advertising and manages multiple online advertising campaigns for its clients. The company handles vast amounts of data generated by ad impressions, clicks, conversions, and more. Your role as a data engineer is to address the following challenges:

### Data Sources and Formats:

#### 1- Ad Impressions:

- Data Source: AdvertiseX serves digital ads to various online platforms and websites.
- Data Format: Ad impressions data is generated in JSON format, containing information such as ad creative ID, user ID, timestamp, and the website where the ad was displayed.

#### 2- Clicks and Conversions:

- Data Source: AdvertiseX tracks user interactions with ads, including clicks and conversions (e.g., sign-ups, purchases).
- Data Format: Click and conversion data is logged in CSV format and includes event timestamps, user IDs, ad campaign IDs, and conversion type.

#### 3- Bid Requests:

- Data Source: AdvertiseX participates in real-time bidding (RTB) auctions to serve ads to users.
- Data Format: Bid request data is received in a semi-structured format, mostly in Avro, and includes user information, auction details, and ad targeting criteria.

### Requirements: 

#### 1- Data Ingestion:

- Implement a scalable data ingestion system capable of collecting and processing ad impressions (JSON), clicks/conversions (CSV), and bid requests (Avro) data.
- Ensure that the ingestion system can handle high data volumes generated in real-time and batch modes.
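
One way to serve both real-time and batch modes with the same code path is to group incoming records into fixed-size micro-batches before writing them. The sketch below is a minimal illustration under that assumption; the helper name `batched` and the batch size are hypothetical and not part of the original design.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(records: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        # islice pulls records lazily, so the source may be a finite
        # batch file or an unbounded stream.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical event source: a generator standing in for a stream.
events = ({"user_id": i} for i in range(5))
batches = list(batched(events, batch_size=2))
print([len(b) for b in batches])  # [2, 2, 1]
```

Each batch could then be passed to one of the ingestion functions, so that a commit happens once per batch rather than once per record.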


In [2]:
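import json
from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


# ORM models for the three sources. The table names are illustrative;
# the columns mirror the fields used by the ingestion functions below.
class AdImpression(Base):
    __tablename__ = "ad_impressions"
    id = Column(Integer, primary_key=True)
    creative_id = Column(Integer)
    user_id = Column(Integer)
    timestamp = Column(DateTime)
    website = Column(String)


class ClickConversion(Base):
    __tablename__ = "click_conversions"
    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime)
    user_id = Column(Integer)
    campaign_id = Column(Integer)
    conversion_type = Column(String)


class BidRequest(Base):
    __tablename__ = "bid_requests"
    id = Column(Integer, primary_key=True)
    # Stored as serialized JSON text, since ingest_bid_requests calls json.dumps.
    user_info = Column(String)
    auction_details = Column(String)
    ad_targeting = Column(String)


def ingest_ad_impressions(data):
    print("Ingesting ad impressions data:")
    for impression in data:
        ad_impression = AdImpression(
            creative_id=impression["creative_id"],
            user_id=impression["user_id"],
            # Parse the string timestamp so it matches the DateTime column.
            timestamp=datetime.strptime(impression["timestamp"], "%Y-%m-%d %H:%M:%S"),
            website=impression["website"]
        )
        session.add(ad_impression)
    session.commit()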


def ingest_clicks_conversions(data):
    print("Ingesting clicks and conversions data:")
    for click_conversion in data:
        click_conversion["timestamp"] = datetime.strptime(click_conversion["timestamp"], "%Y-%m-%d %H:%M:%S")
        click_conversion = ClickConversion(
            timestamp=click_conversion["timestamp"],
            user_id=click_conversion["user_id"],
            campaign_id=click_conversion["campaign_id"],
            conversion_type=click_conversion["conversion_type"]
        )
        session.add(click_conversion)
    session.commit()


def ingest_bid_requests(data):
    print("Ingesting bid requests data:")
    for bid_request in data:
        user_info = json.dumps(bid_request["user_info"])
        auction_details = json.dumps(bid_request["auction_details"])
        ad_targeting = json.dumps(bid_request["ad_targeting"])
        bid_request = BidRequest(
            user_info=user_info,
            auction_details=auction_details,
            ad_targeting=ad_targeting
        )
        session.add(bid_request)
    session.commit()

if __name__ == "__main__":
    ad_impressions_data = [
        {
            "creative_id": 1,
            "user_id": 101,
            "timestamp": "2023-11-01 10:00:00",
            "website": "example.com"
        },
        {
            "creative_id": 2,
            "user_id": 102,
            "timestamp": "2023-11-01 10:05:00",
            "website": "example.org"
        }
    ]

    clicks_conversions_data = [
        {
            "timestamp": "2023-11-01 10:02:00",
            "user_id": 101,
            "campaign_id": 1,
            "conversion_type": "signup"
        },
        {
            "timestamp": "2023-11-01 10:06:00",
            "user_id": 102,
            "campaign_id": 2,
            "conversion_type": "purchase"
        }
    ]

    bid_requests_data = [
        {
            "user_info": {"user_id": 201, "location": "USA"},
            "auction_details": {"auction_id": 1, "bid_amount": 0.1},
            "ad_targeting": {"creative_id": 3, "website": "example.net"}
        },
        {
            "user_info": {"user_id": 202, "location": "Canada"},
            "auction_details": {"auction_id": 2, "bid_amount": 0.15},
            "ad_targeting": {"creative_id": 4, "website": "example.org"}
        }
    ]

    # connection to the PostgreSQL database
    engine = create_engine('postgresql://your_username:your_password@your_host:your_port/your_database')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()


    ingest_ad_impressions(ad_impressions_data)
    ingest_clicks_conversions(clicks_conversions_data)
    ingest_bid_requests(bid_requests_data)



The ingestion functions above each take a list of already-parsed records (dictionaries), one function per source:

1- `ingest_ad_impressions` handles records parsed from JSON.

2- `ingest_clicks_conversions` handles rows parsed from CSV.

3- `ingest_bid_requests` handles records decoded from Avro.

Each function maps its records onto the corresponding table model and commits them to the database (a PostgreSQL database on a cloud platform such as Amazon RDS).
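
Because the ingestion functions expect dictionaries, the raw JSON and CSV payloads have to be parsed first. The following is a minimal sketch of that step using only the standard library. The helper names `parse_json_lines` and `parse_csv`, the newline-delimited JSON layout, and the CSV header row are assumptions made for illustration. Avro files could be read in a similar way with `fastavro.reader`, which is left out here to keep the sketch dependency-free.

```python
import csv
import io
import json


def parse_json_lines(text):
    """Parse newline-delimited JSON into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_csv(text):
    """Parse CSV text with a header row into a list of dicts."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        # The csv module yields strings only, so cast the ID columns explicitly.
        row["user_id"] = int(row["user_id"])
        row["campaign_id"] = int(row["campaign_id"])
        rows.append(row)
    return rows


# Sample payloads shaped like the records used in this case study.
raw_impressions = (
    '{"creative_id": 1, "user_id": 101, '
    '"timestamp": "2023-11-01 10:00:00", "website": "example.com"}\n'
)
raw_clicks = (
    "timestamp,user_id,campaign_id,conversion_type\n"
    "2023-11-01 10:02:00,101,1,signup\n"
)

impressions = parse_json_lines(raw_impressions)
clicks = parse_csv(raw_clicks)
print(impressions[0]["website"], clicks[0]["conversion_type"])
```

The resulting lists have the same shape as `ad_impressions_data` and `clicks_conversions_data`, so they could be passed straight to the ingestion functions.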

#### 2- Data Processing:

- Develop data transformation processes to standardize and enrich the data. Handle data validation, filtering, and deduplication.
- Implement logic to correlate ad impressions with clicks and conversions to provide meaningful insights.
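
The validation and deduplication requirements can be illustrated without a database. The sketch below works on plain dictionaries and assumes that an event counts as a duplicate only when all four of its fields match an earlier event; that identity rule, and the names `REQUIRED_FIELDS`, `is_valid`, and `deduplicate`, are assumptions for illustration.

```python
from datetime import datetime

REQUIRED_FIELDS = ("timestamp", "user_id", "campaign_id", "conversion_type")


def is_valid(event):
    """Return True if every required field is present and the timestamp parses."""
    if any(event.get(field) in (None, "") for field in REQUIRED_FIELDS):
        return False
    try:
        datetime.strptime(event["timestamp"], "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return False
    return True


def deduplicate(events):
    """Keep the first occurrence of each event, identified by all required fields."""
    seen = set()
    unique = []
    for event in events:
        key = tuple(event[field] for field in REQUIRED_FIELDS)
        if key not in seen:
            seen.add(key)
            unique.append(event)
    return unique


events = [
    {"timestamp": "2023-11-01 10:02:00", "user_id": 101, "campaign_id": 1, "conversion_type": "signup"},
    # Exact duplicate of the first event.
    {"timestamp": "2023-11-01 10:02:00", "user_id": 101, "campaign_id": 1, "conversion_type": "signup"},
    # Invalid: the timestamp does not parse.
    {"timestamp": "not a date", "user_id": 102, "campaign_id": 2, "conversion_type": "purchase"},
    # Same user and campaign, but a different event.
    {"timestamp": "2023-11-01 10:06:00", "user_id": 101, "campaign_id": 1, "conversion_type": "purchase"},
]

clean = deduplicate([e for e in events if is_valid(e)])
print(len(clean))  # 2
```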

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from your_data_models import AdImpression, ClickConversion

engine = create_engine('postgresql://your_username:your_password@your_host:your_port/your_database')
Session = sessionmaker(bind=engine)
session = Session()

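def standardize_and_enrich_data():
    ad_impressions = session.query(AdImpression).all()
    for ad_impression in ad_impressions:
        # ORM objects do not support the `in` operator; check the attribute instead.
        if getattr(ad_impression, "additional_info", None) is None:
            ad_impression.additional_info = "N/A"

        # Placeholder enrichment; assumes the model defines an enriched_field column.
        ad_impression.enriched_field = ad_impression.creative_id * 2

    session.commit()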

def handle_data_validation_and_filtering():
    click_conversions = session.query(ClickConversion).filter(ClickConversion.timestamp > '2023-11-01 10:04:00').all()
    for click_conversion in click_conversions:
        if click_conversion.campaign_id < 2:
            click_conversion.filtered = True

    session.commit()

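def deduplicate_data():
    click_conversions = session.query(ClickConversion).all()
    # Identify an event by all of its fields; keying on user_id alone would
    # flag every repeat interaction by the same user as a duplicate.
    seen_events = set()
    for click_conversion in click_conversions:
        key = (
            click_conversion.user_id,
            click_conversion.campaign_id,
            click_conversion.timestamp,
            click_conversion.conversion_type,
        )
        if key in seen_events:
            click_conversion.is_duplicate = True
        else:
            seen_events.add(key)

    session.commit()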

def correlate_data():
    ad_impressions = session.query(AdImpression).all()
    click_conversions = session.query(ClickConversion).all()

    for ad_impression in ad_impressions:
        for click_conversion in click_conversions:
            if ad_impression.user_id == click_conversion.user_id:
                ad_impression.click_conversion = click_conversion

    session.commit()

if __name__ == "__main__":
    standardize_and_enrich_data()
    handle_data_validation_and_filtering()
    deduplicate_data()
    correlate_data()


> In summary, this code standardizes and enriches ad impressions, validates and filters click conversions, flags duplicates, and links ad impressions to click conversions by user ID, committing the result of each step to the database.
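
Matching on user ID alone links every impression to every conversion by the same user, regardless of order or elapsed time. A common refinement is to attribute each conversion to the most recent impression that precedes it within a time window. The sketch below shows that idea on plain dictionaries; the 30-minute window and the function name `correlate` are assumptions, and user ID remains the only join key because impressions carry a creative ID while conversions carry a campaign ID.

```python
from collections import defaultdict
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def correlate(impressions, conversions, window=timedelta(minutes=30)):
    """Pair each conversion with the latest earlier impression for the same user."""
    # Index impressions by user so each conversion only scans its own user's events.
    by_user = defaultdict(list)
    for imp in impressions:
        by_user[imp["user_id"]].append((datetime.strptime(imp["timestamp"], FMT), imp))

    pairs = []
    for conv in conversions:
        conv_time = datetime.strptime(conv["timestamp"], FMT)
        candidates = [
            (imp_time, imp)
            for imp_time, imp in by_user.get(conv["user_id"], [])
            if timedelta(0) <= conv_time - imp_time <= window
        ]
        if candidates:
            _, latest = max(candidates, key=lambda item: item[0])
            pairs.append((latest, conv))
    return pairs


impressions = [
    {"creative_id": 1, "user_id": 101, "timestamp": "2023-11-01 10:00:00", "website": "example.com"},
    {"creative_id": 2, "user_id": 102, "timestamp": "2023-11-01 10:05:00", "website": "example.org"},
]
conversions = [
    {"timestamp": "2023-11-01 10:02:00", "user_id": 101, "campaign_id": 1, "conversion_type": "signup"},
    {"timestamp": "2023-11-01 10:06:00", "user_id": 102, "campaign_id": 2, "conversion_type": "purchase"},
]

pairs = correlate(impressions, conversions)
for imp, conv in pairs:
    print(imp["creative_id"], conv["conversion_type"])
```

Indexing impressions by user also avoids comparing every impression with every conversion, which matters once the tables grow.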

#### 3- Data Storage and Query Performance:

- Select an appropriate data storage solution for storing processed data efficiently, enabling fast querying for campaign performance analysis.
- Optimize the storage system for analytical queries and aggregations of ad campaign data.

> The choice of the storage system may depend on factors like the volume of data, query patterns, and the technology stack you're using. 

> I have chosen to store structured data (ad impressions and click conversions) in the PostgreSQL database and semi-structured data (bid requests) in MongoDB.
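
To make the point about analytical queries concrete, the sketch below builds a small conversions table, adds a composite index on the grouping columns, and runs a per-campaign aggregation. It uses the standard-library `sqlite3` module as a stand-in for PostgreSQL so that it runs anywhere; the table name, index name, and sample rows are assumptions, and whether such an index helps on a real workload depends on data volume and query patterns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE click_conversions ("
    "id INTEGER PRIMARY KEY, timestamp TEXT, user_id INTEGER, "
    "campaign_id INTEGER, conversion_type TEXT)"
)
# Composite index on the columns the report below groups by.
conn.execute(
    "CREATE INDEX idx_cc_campaign_type "
    "ON click_conversions (campaign_id, conversion_type)"
)
conn.executemany(
    "INSERT INTO click_conversions "
    "(timestamp, user_id, campaign_id, conversion_type) VALUES (?, ?, ?, ?)",
    [
        ("2023-11-01 10:02:00", 101, 1, "signup"),
        ("2023-11-01 10:06:00", 102, 2, "purchase"),
        ("2023-11-01 10:09:00", 103, 2, "purchase"),
    ],
)

# Conversions per campaign and type: a typical campaign performance aggregation.
report = conn.execute(
    "SELECT campaign_id, conversion_type, COUNT(*) "
    "FROM click_conversions "
    "GROUP BY campaign_id, conversion_type "
    "ORDER BY campaign_id, conversion_type"
).fetchall()
print(report)  # [(1, 'signup', 1), (2, 'purchase', 2)]
```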

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from your_data_models import AdImpression, ClickConversion
from pymongo import MongoClient


pg_engine = create_engine('postgresql://your_username:your_password@your_host:your_port/your_database')
PgSession = sessionmaker(bind=pg_engine)
pg_session = PgSession()


mongo_conn_params = {
    "host": "your_mongodb_host",
    "port": 27017,  # MongoDB's default port; replace with your own
}
mongo_client = MongoClient(**mongo_conn_params)
mongo_db = mongo_client["your_mongodb_database"]


def store_ad_impressions(data):
    for impression in data:
        ad_impression = AdImpression(
            creative_id=impression["creative_id"],
            user_id=impression["user_id"],
            timestamp=impression["timestamp"],
            website=impression["website"]
        )
        pg_session.add(ad_impression)
    pg_session.commit()

def store_click_conversions(data):
    for click_conversion in data:
        click_conversion = ClickConversion(
            timestamp=click_conversion["timestamp"],
            user_id=click_conversion["user_id"],
            campaign_id=click_conversion["campaign_id"],
            conversion_type=click_conversion["conversion_type"]
        )
        pg_session.add(click_conversion)
    pg_session.commit()


def store_bid_requests(data):
    bid_requests_collection = mongo_db["bid_requests"]
    bid_requests_collection.insert_many(data)

if __name__ == "__main__":
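    # Placeholders: replace each [...] with a list of record dicts,
    # shaped like the sample data in the ingestion cell.
    ad_impressions_data = [...]
    click_conversions_data = [...]
    bid_requests_data = [...]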

    store_ad_impressions(ad_impressions_data)
    store_click_conversions(click_conversions_data)
    store_bid_requests(bid_requests_data)


#### 4- Error Handling and Monitoring:

- Create an error handling and monitoring system to detect data anomalies, discrepancies, or delays.
- Implement alerting mechanisms to address data quality issues in real-time, ensuring that discrepancies are resolved promptly to maintain ad campaign effectiveness.

In [None]:
import logging
import time
from prometheus_client import start_http_server, Summary, Counter


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


DATA_PROCESSING_TIME = Summary('data_processing_time', 'Time taken for data processing')
ERROR_COUNT = Counter('data_processing_errors', 'Number of data processing errors')


@DATA_PROCESSING_TIME.time()
def process_data(data):
    try:
        if data < 0:
            raise ValueError("Negative value detected")


    except Exception as e:
        logger.error(f"Data processing error: {str(e)}")
        ERROR_COUNT.inc()


if __name__ == '__main__':
    start_http_server(8000)  

    while True:
        try:
            data = 10

            process_data(data)

            time.sleep(1)
        except KeyboardInterrupt:
            break
        except Exception as e:
            logger.error(f"Unhandled error: {str(e)}")


    logger.info("Shutting down...")


> With this setup, Prometheus can scrape the processing-time and error-count metrics exposed on port 8000, and alerts defined on those metrics can flag data quality issues in near real time.
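
The metrics above cover processing time and errors; detecting delays and anomalies needs checks on the data itself. The sketch below shows two simple ones: a freshness check that flags a source whose newest event is too old, and a volume check that flags a batch whose size deviates strongly from recent history. The function names, the 5-minute lag limit, and the 3-standard-deviation threshold are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta
from statistics import mean, stdev


def is_delayed(latest_event_time, now, max_lag=timedelta(minutes=5)):
    """Return True if the newest event is older than max_lag."""
    return now - latest_event_time > max_lag


def is_volume_anomaly(history, current, threshold=3.0):
    """Return True if current is more than threshold standard deviations from the mean of history."""
    if len(history) < 2:
        return False  # not enough history to estimate spread
    sigma = stdev(history)
    if sigma == 0:
        return current != history[0]
    return abs(current - mean(history)) / sigma > threshold


now = datetime(2023, 11, 1, 10, 10, 0)
print(is_delayed(datetime(2023, 11, 1, 10, 2, 0), now))  # True: 8 minutes behind

recent_counts = [100, 104, 98, 101, 97]  # events per minute in recent batches
print(is_volume_anomaly(recent_counts, 102))  # False
print(is_volume_anomaly(recent_counts, 40))   # True
```

Either check could increment a counter such as `ERROR_COUNT` when it fires, so that the existing Prometheus setup picks it up.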