## 1. Data Ingestion and 2. Data Processing:

In [23]:
!pip install avro-python3



In [24]:
# Importing necessary libraries
import json
import csv
import random
import string
import datetime
from avro.io import DatumWriter
from avro.datafile import DataFileWriter

# Define the DataIngester class
class DataIngester:
    def __init__(self):
        # Initialize any necessary resources or connections here
        pass

    def ingest_ad_impressions(self, json_data):
        # Process ad impressions JSON data
        for impression in json_data:
            # Perform ingestion operations (e.g., store in database, queue for further processing)
            print("Ingesting ad impression:", impression)

    def ingest_clicks_conversions(self, data):
        # Process clicks/conversions data
        for conversion in data:
            # Perform ingestion operations
            print("Ingesting click/conversion:", conversion)

    def ingest_bid_requests(self, data):
        # Process bid requests data
        for bid_request in data:
            # Perform ingestion operations
            print("Ingesting bid request:", bid_request)

    def generate_sample_clicks_conversions(self, num_records):
        # Generate sample clicks/conversions data
        conversions = []
        for _ in range(num_records):
            conversion = {
                "user_id": ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)),
                "timestamp": str(datetime.datetime.now() - datetime.timedelta(minutes=random.randint(1, 60))),
                "ad_campaign_id": random.randint(1, 5),
                "conversion_type": random.choice(["sign-up", "purchase", "download"])
            }
            conversions.append(conversion)

        return conversions

    def generate_sample_bid_requests(self, num_requests):
        # Generate sample bid requests data
        bid_requests = []
        for _ in range(num_requests):
            bid_request = {
                "user_id": ''.join(random.choices(string.ascii_uppercase + string.digits, k=5)),
                "auction_id": ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)),
                "ad_targeting_criteria": {
                    "age": random.randint(18, 65),
                    "gender": random.choice(["male", "female", "other"]),
                    "location": random.choice(["USA", "UK", "Canada", "Australia"])
                }
            }
            bid_requests.append(bid_request)

        return bid_requests

# Define the DataProcessor class
class DataProcessor:
    def __init__(self):
        pass

    def standardize_timestamp(self, timestamp):
        return datetime.datetime.fromisoformat(timestamp)

    def enrich_data(self, data):
        # Placeholder for data enrichment logic
        return data

    def validate_data(self, data):
        for item in data:
            if 'user_id' not in item:
                raise ValueError("Missing required field: user_id")
            if 'timestamp' not in item:
                raise ValueError("Missing required field: timestamp")
        return data

    def filter_data(self, data):
        # Placeholder for data filtering logic
        return data

    def deduplicate_data(self, data):
        # Placeholder for data deduplication logic
        return data

    def correlate_data(self, impressions, clicks_conversions, time_window=3600):
        correlated_data = []
        for impression in impressions:
            impression_user_id = impression['user_id']
            impression_timestamp = self.standardize_timestamp(impression['timestamp'])
            for click_conversion in clicks_conversions:
                click_conversion_user_id = click_conversion['user_id']
                click_conversion_timestamp = self.standardize_timestamp(click_conversion['timestamp'])
                if impression_user_id == click_conversion_user_id:
                    time_difference = abs((impression_timestamp - click_conversion_timestamp).total_seconds())
                    if time_difference <= time_window:
                        correlated_data.append({'impression': impression, 'click_conversion': click_conversion})
                        break  # Once a match is found, break out of the inner loop
        return correlated_data


if __name__ == "__main__":
    ingester = DataIngester()

    # Example ingesting ad impressions data
    ad_impressions_json = [
        {"ad_creative_id": 1, "user_id": "101", "timestamp": "2024-05-02T10:00:00", "website": "example.com"},
        {"ad_creative_id": 2, "user_id": "102", "timestamp": "2024-05-02T11:00:00", "website": "example.net"}
    ]
    ingester.ingest_ad_impressions(ad_impressions_json)

    # Example ingesting sample clicks/conversions data
    sample_clicks_conversions = ingester.generate_sample_clicks_conversions(10)  # Increased to 10 records
    ingester.ingest_clicks_conversions(sample_clicks_conversions)

    # Example ingesting sample bid request data
    sample_bid_requests = ingester.generate_sample_bid_requests(3)
    ingester.ingest_bid_requests(sample_bid_requests)

    # Data processing
    processor = DataProcessor()

    # Sample data
    ad_impressions_json = [
        {"ad_creative_id": 1, "user_id": "101", "timestamp": "2024-05-02T10:00:00", "website": "example.com"},
        {"ad_creative_id": 2, "user_id": "102", "timestamp": "2024-05-02T11:00:00", "website": "example.net"}
    ]
    clicks_conversions = [
        {"user_id": "101", "timestamp": "2024-05-02T10:05:00", "ad_campaign_id": 1, "conversion_type": "sign-up"},
        {"user_id": "102", "timestamp": "2024-05-02T11:15:00", "ad_campaign_id": 2, "conversion_type": "purchase"}
    ]

    # Data processing
    standardized_impressions = [processor.standardize_timestamp(impression['timestamp']) for impression in ad_impressions_json]
    enriched_impressions = processor.enrich_data(ad_impressions_json)
    validated_impressions = processor.validate_data(enriched_impressions)
    filtered_impressions = processor.filter_data(validated_impressions)
    deduplicated_impressions = processor.deduplicate_data(filtered_impressions)

    standardized_clicks_conversions = [processor.standardize_timestamp(cc['timestamp']) for cc in clicks_conversions]
    enriched_clicks_conversions = processor.enrich_data(clicks_conversions)
    validated_clicks_conversions = processor.validate_data(enriched_clicks_conversions)
    filtered_clicks_conversions = processor.filter_data(validated_clicks_conversions)
    deduplicated_clicks_conversions = processor.deduplicate_data(filtered_clicks_conversions)

    # Correlation
    correlated_data = processor.correlate_data(deduplicated_impressions, deduplicated_clicks_conversions)
    print("Correlated data:", correlated_data)


Ingesting ad impression: {'ad_creative_id': 1, 'user_id': '101', 'timestamp': '2024-05-02T10:00:00', 'website': 'example.com'}
Ingesting ad impression: {'ad_creative_id': 2, 'user_id': '102', 'timestamp': '2024-05-02T11:00:00', 'website': 'example.net'}
Ingesting click/conversion: {'user_id': 'HOSIV', 'timestamp': '2024-05-02 18:16:53.786989', 'ad_campaign_id': 5, 'conversion_type': 'download'}
Ingesting click/conversion: {'user_id': 'VS50G', 'timestamp': '2024-05-02 18:19:53.787059', 'ad_campaign_id': 2, 'conversion_type': 'download'}
Ingesting click/conversion: {'user_id': 'JWW2S', 'timestamp': '2024-05-02 17:55:53.787071', 'ad_campaign_id': 1, 'conversion_type': 'sign-up'}
Ingesting click/conversion: {'user_id': '1KGLL', 'timestamp': '2024-05-02 18:27:53.787081', 'ad_campaign_id': 4, 'conversion_type': 'sign-up'}
Ingesting click/conversion: {'user_id': '6P7MH', 'timestamp': '2024-05-02 18:06:53.787090', 'ad_campaign_id': 3, 'conversion_type': 'purchase'}
Ingesting click/conversion: 

## 3.Data Storage and Query Performance


In [25]:
!apt install postgresql postgresql-contrib

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
postgresql is already the newest version (14+238).
postgresql-contrib is already the newest version (14+238).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [26]:
!service postgresql start

 * Starting PostgreSQL 14 database server
   ...done.


In [28]:
!pip install psycopg2-binary



In [13]:
!sudo -u postgres psql -c "CREATE DATABASE pesto_techi;"
!sudo -u postgres psql -c "CREATE USER jebin12 WITH PASSWORD 'jebin@055';"
!sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE pesto_tech TO jebin12;"

ERROR:  database "pesto_techi" already exists
ERROR:  role "jebin12" already exists
GRANT


In [30]:
import datetime

class DataProcessor:
    def __init__(self):
        pass

    def standardize_timestamp(self, timestamp):
        return datetime.datetime.fromisoformat(timestamp)

    def enrich_data(self, data):
        # Placeholder for data enrichment logic
        return data

    def validate_data(self, data):
        for item in data:
            if 'user_id' not in item:
                raise ValueError("Missing required field: user_id")
            if 'timestamp' not in item:
                raise ValueError("Missing required field: timestamp")
        return data

    def filter_data(self, data):
        # Placeholder for data filtering logic
        return data

    def deduplicate_data(self, data):
        # Placeholder for data deduplication logic
        return data

    def correlate_data(self, impressions, clicks_conversions, time_window=3600):
        correlated_data = []
        for impression in impressions:
            impression_user_id = impression['user_id']
            impression_timestamp = self.standardize_timestamp(impression['timestamp'])
            for click_conversion in clicks_conversions:
                click_conversion_user_id = click_conversion['user_id']
                click_conversion_timestamp = self.standardize_timestamp(click_conversion['timestamp'])
                if impression_user_id == click_conversion_user_id:
                    time_difference = abs((impression_timestamp - click_conversion_timestamp).total_seconds())
                    if time_difference <= time_window:
                        correlated_data.append({'impression': impression, 'click_conversion': click_conversion})
                        break  # Once a match is found, break out of the inner loop
        return correlated_data

# Creating an instance of DataProcessor
processor = DataProcessor()

# Sample data
ad_impressions_json = [
    {"ad_creative_id": 1, "user_id": "101", "timestamp": "2024-05-02T10:00:00", "website": "example.com"},
    {"ad_creative_id": 2, "user_id": "102", "timestamp": "2024-05-02T11:00:00", "website": "example.net"}
]
clicks_conversions = [
    {"user_id": "101", "timestamp": "2024-05-02T10:05:00", "ad_campaign_id": 1, "conversion_type": "sign-up"},
    {"user_id": "102", "timestamp": "2024-05-02T11:15:00", "ad_campaign_id": 2, "conversion_type": "purchase"}
]

# Data processing
standardized_impressions = [processor.standardize_timestamp(impression['timestamp']) for impression in ad_impressions_json]
enriched_impressions = processor.enrich_data(ad_impressions_json)
validated_impressions = processor.validate_data(enriched_impressions)
filtered_impressions = processor.filter_data(validated_impressions)
deduplicated_impressions = processor.deduplicate_data(filtered_impressions)

standardized_clicks_conversions = [processor.standardize_timestamp(cc['timestamp']) for cc in clicks_conversions]
enriched_clicks_conversions = processor.enrich_data(clicks_conversions)
validated_clicks_conversions = processor.validate_data(enriched_clicks_conversions)
filtered_clicks_conversions = processor.filter_data(validated_clicks_conversions)
deduplicated_clicks_conversions = processor.deduplicate_data(filtered_clicks_conversions)

# Correlation
correlated_data = processor.correlate_data(deduplicated_impressions, deduplicated_clicks_conversions)
print("Correlated data:", correlated_data)


Correlated data: [{'impression': {'ad_creative_id': 1, 'user_id': '101', 'timestamp': '2024-05-02T10:00:00', 'website': 'example.com'}, 'click_conversion': {'user_id': '101', 'timestamp': '2024-05-02T10:05:00', 'ad_campaign_id': 1, 'conversion_type': 'sign-up'}}, {'impression': {'ad_creative_id': 2, 'user_id': '102', 'timestamp': '2024-05-02T11:00:00', 'website': 'example.net'}, 'click_conversion': {'user_id': '102', 'timestamp': '2024-05-02T11:15:00', 'ad_campaign_id': 2, 'conversion_type': 'purchase'}}]


In [31]:
# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Execute SQL command to create the click_conversions table
cur.execute("""
    CREATE TABLE IF NOT EXISTS click_conversions (
        user_id VARCHAR(50),
        timestamp TIMESTAMP,
        ad_campaign_id INT,
        conversion_type VARCHAR(50)
    );
""")

# Insert correlated data into the ad_impressions and click_conversions tables
for pair in correlated_data:
    impression = pair['impression']
    click_conversion = pair['click_conversion']
    cur.execute("""
        INSERT INTO ad_impressions (ad_creative_id, user_id, timestamp, website)
        VALUES (%s, %s, %s, %s);
    """, (impression['ad_creative_id'], impression['user_id'], impression['timestamp'], impression['website']))
    cur.execute("""
        INSERT INTO click_conversions (user_id, timestamp, ad_campaign_id, conversion_type)
        VALUES (%s, %s, %s, %s);
    """, (click_conversion['user_id'], click_conversion['timestamp'], click_conversion['ad_campaign_id'], click_conversion['conversion_type']))

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()


In [32]:
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Execute SQL queries to fetch data from ad_impressions and click_conversions tables
cur.execute("SELECT * FROM ad_impressions;")
impression_rows = cur.fetchall()

cur.execute("SELECT * FROM click_conversions;")
conversion_rows = cur.fetchall()

# Print fetched data from ad_impressions table
print("Data from ad_impressions table:")
for row in impression_rows:
    print(row)

# Print fetched data from click_conversions table
print("\nData from click_conversions table:")
for row in conversion_rows:
    print(row)

# Close the cursor and connection
cur.close()
conn.close()


Data from ad_impressions table:
(1, '101', datetime.datetime(2024, 5, 2, 10, 0), 'example.com')
(2, '102', datetime.datetime(2024, 5, 2, 11, 0), 'example.net')
(1, '101', datetime.datetime(2024, 5, 2, 10, 0), 'example.com')
(2, '102', datetime.datetime(2024, 5, 2, 11, 0), 'example.net')
(1, '101', datetime.datetime(2024, 5, 2, 10, 0), 'example.com')
(2, '102', datetime.datetime(2024, 5, 2, 11, 0), 'example.net')
(1, '101', datetime.datetime(2024, 5, 2, 10, 0), 'example.com')
(2, '102', datetime.datetime(2024, 5, 2, 11, 0), 'example.net')

Data from click_conversions table:
('101', datetime.datetime(2024, 5, 2, 10, 5), 1, 'sign-up')
('102', datetime.datetime(2024, 5, 2, 11, 15), 2, 'purchase')
('101', datetime.datetime(2024, 5, 2, 10, 5), 1, 'sign-up')
('102', datetime.datetime(2024, 5, 2, 11, 15), 2, 'purchase')


In [34]:
!PGPASSWORD=jebin@055 psql -h localhost -U jebin12 -d pesto_techi -c "\copy (SELECT * FROM ad_impressions) TO '/content/ad_impressions.csv' CSV HEADER;"


COPY 8


In [35]:
!gcloud auth login


Go to the following link in your browser, and complete the sign-in prompts:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=IHfbkMudR191d8rrd75Q8lpkFs1VGy&prompt=consent&token_usage=remote&access_type=offline&code_challenge=ok1NVbPti-r3vx-zNftaNg41BMPVnq4cCdqn-AbVjyo&code_challenge_method=S256

Once finished, enter the verification code provided in your browser: 4/0AdLIrYfd0-cGVhhKSiHJ3dNQb-ijSb2YhsclAk-mobgnXF3NfX4NKKvOXMd93Jtdh9qQRQ

You are now logged in as [jebineabraham94@gmail.com].
Your current proj

In [40]:
!gcloud config set project jebine



Updated property [core/project].


In [42]:
!gsutil cp /content/ad_impressions.csv gs://datas_for_projects/destination/


Copying file:///content/ad_impressions.csv [Content-Type=text/csv]...
-
Operation completed over 1 objects/345.0 B.                                      


In [43]:
!gsutil ls gs://datas_for_projects/destination/


gs://datas_for_projects/destination/ad_impressions.csv


## 4. Error Handling and Monitoring:

In [45]:
class DataProcessor:
    def __init__(self):
        pass

    def enrich_data(self, data):
        # Perform data enrichment, e.g., add additional information based on user_id or ad_creative_id
        for item in data:
            # Example: Enrich with user demographics based on user_id
            item['user_demographics'] = self.get_user_demographics(item['user_id'])
        return data

    def aggregate_data(self, data):
        # Perform data aggregation, e.g., calculate total impressions or conversions per ad campaign
        aggregated_data = {}
        for item in data:
            ad_campaign_id = item['ad_campaign_id']
            if ad_campaign_id not in aggregated_data:
                aggregated_data[ad_campaign_id] = {'total_impressions': 0, 'total_conversions': 0}
            aggregated_data[ad_campaign_id]['total_impressions'] += 1
            if 'conversion_type' in item:
                aggregated_data[ad_campaign_id]['total_conversions'] += 1
        return aggregated_data

    def get_user_demographics(self, user_id):
        # Placeholder for user demographics retrieval
        # You can implement logic to fetch user demographics from a database or external API
        return {"age": 25, "gender": "male", "location": "USA"}


# Creating an instance of DataProcessor
processor = DataProcessor()

# Example data
clicks_conversions = [
    {"user_id": "101", "timestamp": "2024-05-02T10:05:00", "ad_campaign_id": 1, "conversion_type": "sign-up"},
    {"user_id": "102", "timestamp": "2024-05-02T11:15:00", "ad_campaign_id": 2, "conversion_type": "purchase"}
]

# Enriching data
enriched_clicks_conversions = processor.enrich_data(clicks_conversions)

# Aggregating data
aggregated_data = processor.aggregate_data(enriched_clicks_conversions)
print("Aggregated data:", aggregated_data)


Aggregated data: {1: {'total_impressions': 1, 'total_conversions': 1}, 2: {'total_impressions': 1, 'total_conversions': 1}}


In [47]:
# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Example: Adding index to ad_impressions table for faster retrieval
cur.execute("CREATE INDEX ON ad_impressions(ad_creative_id);")

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()


In [48]:
import subprocess

# Define pipeline tasks as shell commands
tasks = [
    "python data_processing_script.py",  # Run data processing script
    "python analysis_script.py",          # Run analysis script
    "python visualization_script.py"      # Run visualization script
]

# Execute pipeline tasks sequentially
for task in tasks:
    subprocess.run(task, shell=True)


In [54]:
# Install virtualenv
!pip install virtualenv

# Create a new directory for your project
!mkdir airflow_project

# Navigate to the project directory
%cd airflow_project

# Create a virtual environment
!virtualenv venv

# Activate the virtual environment
!source venv/bin/activate

# Install Apache Airflow in the virtual environment
!pip install apache-airflow


Collecting virtualenv
  Downloading virtualenv-20.26.1-py3-none-any.whl (3.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting distlib<1,>=0.3.7 (from virtualenv)
  Downloading distlib-0.3.8-py2.py3-none-any.whl (468 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m468.9/468.9 kB[0m [31m38.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: distlib, virtualenv
Successfully installed distlib-0.3.8 virtualenv-20.26.1
/content/airflow_project
created virtual environment CPython3.10.12.final.0-64 in 1268ms
  creator CPython3Posix(dest=/content/airflow_project/venv, clear=False, no_vcs_ignore=False, global=False)
  seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
    added seed packages: pip==24.0, setuptools==69.5.1, wheel==0.43.0
  activators BashActivator,CShellActivator,FishActivator

In [55]:
!pip install apache-airflow --ignore-installed blinker



Collecting apache-airflow
  Using cached apache_airflow-2.9.0-py3-none-any.whl (13.3 MB)
Collecting blinker
  Using cached blinker-1.8.1-py3-none-any.whl (9.5 kB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Using cached alembic-1.13.1-py3-none-any.whl (233 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Using cached argcomplete-3.3.0-py3-none-any.whl (42 kB)
Collecting asgiref (from apache-airflow)
  Using cached asgiref-3.8.1-py3-none-any.whl (23 kB)
Collecting attrs>=22.1.0 (from apache-airflow)
  Downloading attrs-23.2.0-py3-none-any.whl (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.8/60.8 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting colorlog<5.0,>=4.0.2 (from apache-airflow)
  Using cached colorlog-4.8.0-py2.py3-none-any.whl (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Using cached ConfigUpdater-3.2-py2.py3-none-any.whl (34 kB)
Collecting connexion[flask]<3.0,>=2.10.0 (from apache-airflow)
  Usin

In [5]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define your DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule='@daily',  # Use schedule instead of schedule_interval
)

# Define your tasks
def my_python_function():
    print("Running Python function")

task1 = PythonOperator(
    task_id='python_task',
    python_callable=my_python_function,
    dag=dag,
)

task2 = BashOperator(
    task_id='bash_task',
    bash_command='echo "Running bash command"',
    dag=dag,
)

task1 >> task2  # Define task dependencies


<Task(BashOperator): bash_task>

In [6]:
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Execute SQL command to create the ad_impressions table
cur.execute("""
    CREATE TABLE IF NOT EXISTS ad_impressions (
        ad_creative_id INT,
        user_id VARCHAR(50),
        timestamp TIMESTAMP,
        website VARCHAR(100)
    );
""")

# Execute SQL command to create the click_conversions table
cur.execute("""
    CREATE TABLE IF NOT EXISTS click_conversions (
        user_id VARCHAR(50),
        timestamp TIMESTAMP,
        ad_campaign_id INT,
        conversion_type VARCHAR(50)
    );
""")

# Create indexes for faster retrieval
cur.execute("CREATE INDEX ON ad_impressions(ad_creative_id);")
cur.execute("CREATE INDEX ON click_conversions(ad_campaign_id);")

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()


In [9]:
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Clear existing data from the click_conversions table
cur.execute("DELETE FROM click_conversions;")

# Commit the transaction
conn.commit()

# Execute SQL command to insert data into the click_conversions table if it doesn't exist
cur.execute("""
    INSERT INTO click_conversions (user_id, timestamp, ad_campaign_id, conversion_type)
    SELECT * FROM (VALUES
        ('101', '2024-05-02T10:05:00'::timestamp, 1, 'sign-up'),
        ('102', '2024-05-02T11:15:00'::timestamp, 2, 'purchase')
    ) AS data (user_id, timestamp, ad_campaign_id, conversion_type)
    WHERE NOT EXISTS (
        SELECT 1 FROM click_conversions WHERE user_id = data.user_id AND timestamp = data.timestamp::timestamp
    );
""")

# Commit the transaction
conn.commit()

# Execute SQL query to fetch data from the click_conversions table
cur.execute("SELECT * FROM click_conversions;")
rows = cur.fetchall()

# Print fetched data from the click_conversions table
print("Data from click_conversions table:")
for row in rows:
    print(row)

# Close the cursor and connection
cur.close()
conn.close()


Data from click_conversions table:
('101', datetime.datetime(2024, 5, 2, 10, 5), 1, 'sign-up')
('102', datetime.datetime(2024, 5, 2, 11, 15), 2, 'purchase')


In [10]:
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Define SQL commands to create tables for ad impressions and clicks/conversions
create_ad_impressions_table = """
    CREATE TABLE IF NOT EXISTS ad_impressions (
        ad_creative_id INT,
        user_id VARCHAR(50),
        impression_timestamp TIMESTAMP,
        website VARCHAR(100)
    );
"""

create_clicks_conversions_table = """
    CREATE TABLE IF NOT EXISTS clicks_conversions (
        user_id VARCHAR(50),
        click_conversion_timestamp TIMESTAMP,
        ad_campaign_id INT,
        conversion_type VARCHAR(50)
    );
"""

# Execute SQL commands to create tables
cur.execute(create_ad_impressions_table)
cur.execute(create_clicks_conversions_table)

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()


In [11]:
import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname='pesto_techi',
    user='jebin12',
    password='jebin@055',
    host='localhost',
    port='5432'
)

# Create a cursor object
cur = conn.cursor()

# Define SQL commands to create indexes on ad_impressions and clicks_conversions tables
create_ad_impressions_index = """
    CREATE INDEX IF NOT EXISTS ad_impressions_user_id_idx ON ad_impressions(user_id);
"""

create_clicks_conversions_index = """
    CREATE INDEX IF NOT EXISTS clicks_conversions_user_id_idx ON clicks_conversions(user_id);
"""

# Execute SQL commands to create indexes
cur.execute(create_ad_impressions_index)
cur.execute(create_clicks_conversions_index)

# Commit the transaction
conn.commit()

# Close the cursor and connection
cur.close()
conn.close()


In [12]:
import logging
import sys

# Define a custom logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Create a file handler and set its level to DEBUG
file_handler = logging.FileHandler('data_processing.log')
file_handler.setLevel(logging.DEBUG)

# Create a console handler and set its level to ERROR
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.ERROR)

# Create a formatter and set it for both handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# Add the handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)


# Error Alerting
def send_slack_notification(message):
    # Placeholder for sending Slack notifications
    logger.error(f"Error occurred: {message}")
    # Add code to send Slack notification here


# Data Quality Checks
def perform_data_quality_checks(data):
    # Placeholder for data quality checks
    for item in data:
        if not item.get('user_id'):
            send_slack_notification("Missing user_id in data")
            logger.error("Missing user_id in data")
        if not item.get('timestamp'):
            send_slack_notification("Missing timestamp in data")
            logger.error("Missing timestamp in data")
        # Add more data quality checks as needed


# Example of using the logger and error alerting in the data processing pipeline
def process_data(data):
    try:
        # Perform data processing operations
        # Example: Enrich data
        enriched_data = enrich_data(data)

        # Example: Validate data
        validated_data = validate_data(enriched_data)

        # Example: Perform data quality checks
        perform_data_quality_checks(validated_data)

        # Example: Store processed data in database
        store_data(validated_data)
    except Exception as e:
        # Handle exceptions and send error alert
        send_slack_notification(f"An error occurred during data processing: {str(e)}")
        logger.exception("An error occurred during data processing", exc_info=True)


# Example function to simulate data processing operations
def enrich_data(data):
    # Placeholder for data enrichment logic
    return data


# Example function to simulate data validation
def validate_data(data):
    # Placeholder for data validation logic
    return data


# Example function to store processed data in database
def store_data(data):
    # Placeholder for storing data in database
    pass


# Example usage of the data processing pipeline
sample_data = [
    {"user_id": "101", "timestamp": "2024-05-02T10:00:00", "ad_campaign_id": 1},
    {"user_id": "", "timestamp": "2024-05-02T11:00:00", "ad_campaign_id": 2},
    {"user_id": "103", "timestamp": "", "ad_campaign_id": 3}
]

process_data(sample_data)


2024-05-02 19:55:50,684 - __main__ - ERROR - Error occurred: Missing user_id in data
[[34m2024-05-02T19:55:50.684+0000[0m] {[34m<ipython-input-12-255a1aa61a42>:[0m29} ERROR[0m - Error occurred: Missing user_id in data[0m
2024-05-02 19:55:50,690 - __main__ - ERROR - Missing user_id in data
[[34m2024-05-02T19:55:50.690+0000[0m] {[34m<ipython-input-12-255a1aa61a42>:[0m39} ERROR[0m - Missing user_id in data[0m
2024-05-02 19:55:50,694 - __main__ - ERROR - Error occurred: Missing timestamp in data
[[34m2024-05-02T19:55:50.694+0000[0m] {[34m<ipython-input-12-255a1aa61a42>:[0m29} ERROR[0m - Error occurred: Missing timestamp in data[0m
2024-05-02 19:55:50,699 - __main__ - ERROR - Missing timestamp in data
[[34m2024-05-02T19:55:50.699+0000[0m] {[34m<ipython-input-12-255a1aa61a42>:[0m42} ERROR[0m - Missing timestamp in data[0m
