In [1]:
!pip install pandas fastavro sqlalchemy psycopg2

Collecting fastavro
  Downloading fastavro-1.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: fastavro
Successfully installed fastavro-1.9.4


In [2]:
import json
import pandas as pd
from sqlalchemy import create_engine

def ingest_ad_impressions(json_file_path, db_engine):
    """Load ad impressions from a JSON file and append them to ``ad_impressions``.

    The document is parsed with ``json.load`` and flattened with
    ``pandas.json_normalize`` (nested objects become dotted column names). The
    rows are appended to the ``ad_impressions`` table without the DataFrame index.

    Args:
        json_file_path: Path of the JSON document to ingest.
        db_engine: Connection target accepted by ``DataFrame.to_sql``
            (e.g. the engine returned by ``create_database_connection``).
    """
    # JSON text is UTF-8 (RFC 8259). Without an explicit encoding, open() falls
    # back to the platform locale encoding and can mis-decode non-ASCII data.
    with open(json_file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    impressions_df = pd.json_normalize(data)
    impressions_df.to_sql('ad_impressions', db_engine, if_exists='append', index=False)


In [3]:
def ingest_clicks_conversions(csv_file_path, db_engine):
    """Append the rows of a clicks/conversions CSV file to ``clicks_conversions``.

    Args:
        csv_file_path: Path of the CSV file to load with ``pandas.read_csv``.
        db_engine: Connection target passed straight to ``DataFrame.to_sql``.
    """
    pd.read_csv(csv_file_path).to_sql(
        name='clicks_conversions',
        con=db_engine,
        if_exists='append',
        index=False,
    )


In [4]:
import fastavro

def ingest_bid_requests(avro_file_path, db_engine):
    """Append every record of an Avro file to the ``bid_requests`` table.

    Args:
        avro_file_path: Path of the Avro file to read with ``fastavro.reader``.
        db_engine: Connection target passed straight to ``DataFrame.to_sql``.
    """
    # Collect all records while the file is still open, since the reader
    # iterates over the open file object.
    with open(avro_file_path, 'rb') as avro_file:
        bid_records = list(fastavro.reader(avro_file))

    frame = pd.DataFrame(bid_records)
    frame.to_sql('bid_requests', db_engine, if_exists='append', index=False)


In [5]:
def process_data(db_engine):
    """Clean the raw impression and click/conversion tables and join them per user.

    Rows missing any required field are dropped and exact duplicate rows are
    removed. The remaining impressions are then inner-joined with the remaining
    clicks/conversions on ``user_id``. The joined result replaces the
    ``processed_data`` table.

    Args:
        db_engine: Connection target accepted by ``pandas.read_sql`` and
            ``DataFrame.to_sql`` (a SQLAlchemy engine or a ``sqlite3`` connection).

    Returns:
        pandas.DataFrame: The joined frame that was written to ``processed_data``.
    """
    # Use explicit SELECTs rather than bare table names. pandas resolves a bare
    # table name only through SQLAlchemy, whereas a query works with any
    # supported connection.
    impressions_raw = pd.read_sql('SELECT * FROM ad_impressions', db_engine)
    clicks_conversions_raw = pd.read_sql('SELECT * FROM clicks_conversions', db_engine)

    # Validation (required fields present) and deduplication. Each step
    # produces a new frame instead of mutating in place.
    impressions_clean = (
        impressions_raw
        .dropna(subset=['ad_creative_id', 'user_id', 'timestamp'])
        .drop_duplicates()
    )
    clicks_conversions_clean = (
        clicks_conversions_raw
        .dropna(subset=['event_timestamp', 'user_id', 'ad_campaign_id', 'conversion_type'])
        .drop_duplicates()
    )

    # NOTE(review): joining on user_id alone pairs every impression of a user
    # with every click/conversion of that same user (many-to-many), regardless
    # of creative, campaign or event order. Confirm this is the intended correlation.
    merged_df = pd.merge(impressions_clean, clicks_conversions_clean, on='user_id', how='inner')
    merged_df.to_sql('processed_data', db_engine, if_exists='replace', index=False)
    return merged_df


In [6]:
def create_database_connection(db_url):
    """Build and return a SQLAlchemy engine for ``db_url``.

    Args:
        db_url: Database URL, e.g. ``postgresql://user:pass@host:5432/dbname``.

    Returns:
        The engine produced by ``sqlalchemy.create_engine``.
    """
    return create_engine(db_url)


In [7]:
import logging

# Logging setup: configure the root logger at INFO level and obtain a logger
# named after this module for the pipeline functions below.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

def monitor_data_quality(db_engine):
    """Log the row count of each raw ingestion table and flag empty ones.

    A single query returns one ``(count, table_name)`` row for each of
    ``ad_impressions``, ``clicks_conversions`` and ``bid_requests``. An empty
    table is logged at ERROR level; any other table is logged at INFO level.

    Args:
        db_engine: Connection target accepted by ``pandas.read_sql``.

    Returns:
        pandas.DataFrame: The ``count``/``table_name`` rows that were logged.

    Raises:
        Whatever ``pandas.read_sql`` raises if the query fails. Because all
        three tables are read in one statement, a single missing table fails
        the whole check.
    """
    # UNION ALL: each row carries a different table_name literal, so rows are
    # already distinct and UNION's duplicate-elimination step is wasted work.
    query = """
    SELECT COUNT(*) as count, 'ad_impressions' as table_name FROM ad_impressions
    UNION ALL
    SELECT COUNT(*) as count, 'clicks_conversions' as table_name FROM clicks_conversions
    UNION ALL
    SELECT COUNT(*) as count, 'bid_requests' as table_name FROM bid_requests
    """
    result = pd.read_sql(query, db_engine)

    for table_name, count in zip(result['table_name'], result['count']):
        if count == 0:
            logger.error("No data found in %s table.", table_name)
        else:
            logger.info("Table %s has %s records.", table_name, count)

    return result


In [None]:
def main(db_url=None):
    """Run the pipeline end to end: ingest raw files, process them, then monitor.

    Args:
        db_url: Database URL to connect to. When omitted, the
            ``ADVERTISEX_DB_URL`` environment variable is used. If that is also
            unset, the local placeholder URL is used, so a bare ``main()`` call
            behaves as before.
    """
    import os

    # Keep real credentials out of the notebook: pass them in or export
    # ADVERTISEX_DB_URL instead of editing the placeholder below.
    if db_url is None:
        db_url = os.environ.get(
            'ADVERTISEX_DB_URL',
            'postgresql://username:password@localhost:5432/advertisex',
        )

    # Create database connection
    db_engine = create_database_connection(db_url)
    try:
        # Ingest data
        ingest_ad_impressions('ad_impressions.json', db_engine)
        ingest_clicks_conversions('clicks_conversions.csv', db_engine)
        ingest_bid_requests('bid_requests.avro', db_engine)

        # Process data
        process_data(db_engine)

        # Monitor data quality
        monitor_data_quality(db_engine)
    finally:
        # Release the engine's pooled connections. Otherwise they stay open
        # for as long as the engine object lives in the kernel.
        db_engine.dispose()


if __name__ == "__main__":
    main()
