# Take Home Assessment

# 1. Data Ingestion:

In [1]:
import json
import pandas as pd
from fastavro import reader

# Read JSON data
def read_json(file_path):
    with open(file_path, 'r') as json_file:
        data = json.load(json_file)
    return data

json_data = read_json('ad_impressions.json')
json_data

[{'ad_id': 82,
  'user_id': 1117,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'testsite.com'},
 {'ad_id': 77,
  'user_id': 1245,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'testsite.com'},
 {'ad_id': 58,
  'user_id': 1860,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'samplepage.com'},
 {'ad_id': 63,
  'user_id': 1546,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'samplepage.com'},
 {'ad_id': 12,
  'user_id': 1802,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'example.com'},
 {'ad_id': 69,
  'user_id': 1834,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'testsite.com'},
 {'ad_id': 3,
  'user_id': 1944,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'testsite.com'},
 {'ad_id': 63,
  'user_id': 1292,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'example.com'},
 {'ad_id': 33,
  'user_id': 1954,
  'timestamp': '2024-04-08 14:05:06',
  'website': 'example.com'},
 {'ad_id': 32,
  'user_id': 1251,
  'timestamp': '2024-04-08 14:05:06',
  'website

In [2]:
# Read CSV data
def read_csv(file_path):
    data = pd.read_csv(file_path)
    return data

csv_data = read_csv('clicks_conversions.csv')
csv_data

Unnamed: 0,user_id,campaign_id,timestamp,conversion_type
0,1237,7,2024-04-08 14:05:25,signup
1,1137,7,2024-04-08 14:05:25,signup
2,1326,10,2024-04-08 14:05:25,purchase
3,1854,6,2024-04-08 14:05:25,download
4,1016,7,2024-04-08 14:05:25,download
5,1012,7,2024-04-08 14:05:25,purchase
6,1137,5,2024-04-08 14:05:25,download
7,1153,9,2024-04-08 14:05:25,signup
8,1185,9,2024-04-08 14:05:25,purchase
9,1463,3,2024-04-08 14:05:25,download


In [3]:
# Read Avro data
def read_avro(file_path):
    with open(file_path, 'rb') as avro_file:
        reader_avro = reader(avro_file)
        data = [record for record in reader_avro]
    return data

avro_data = read_avro('bid_requests.avro')
avro_data

[{'user_id': 1442, 'auction_id': 'JGODK', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1096, 'auction_id': 'ZNRQD', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1627, 'auction_id': 'BHAKQ', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1977, 'auction_id': 'UWYOH', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1093, 'auction_id': 'VSWJB', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1342, 'auction_id': 'EDREI', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1380, 'auction_id': 'JRNPX', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1298, 'auction_id': 'AOGTY', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1538, 'auction_id': 'WUWYV', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1820, 'auction_id': 'YIEPC', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1741, 'auction_id': 'MWLRK', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1259, 'auction_id': 'UPJMN', 'timestamp': '2024-04-08 14:07:26'},
 {'user_id': 1977, 'auction_id': 'XUALZ', 'timestamp

# 2. Data Processing:

In [4]:
import pandas as pd

def convert_json_to_df(json_data):
    json_df = pd.DataFrame(json_data)
    json_df['timestamp'] = pd.to_datetime(json_df['timestamp'])
    return json_df


In [5]:
def convert_csv_to_df(csv_data):
    csv_df = pd.DataFrame(csv_data)
    csv_df['timestamp'] = pd.to_datetime(csv_df['timestamp'])
    return csv_df


In [6]:
def combine_data(json_df, csv_df):
    combined_data = pd.merge(json_df, csv_df, on='user_id', how='inner')
    return combined_data


In [7]:
def enrich_data(combined_data, avro_data):
    avro_df = pd.DataFrame(avro_data)
    combined_data = pd.merge(combined_data, avro_df, on='user_id', how='left')
    combined_data['auction_id'].fillna('N/A', inplace=True)
    return combined_data


In [8]:
def calculate_conversion_rates(combined_data):
    combined_data['conversion_rate'] = combined_data['conversion_type'].apply(lambda x: 1 if x == 'signup' else 0)
    return combined_data


In [9]:
# Assuming you have json_data, csv_data, and avro_data available
json_df = convert_json_to_df(json_data)
csv_df = convert_csv_to_df(csv_data)
combined_data = combine_data(json_df, csv_df)
enriched_data = enrich_data(combined_data, avro_data)
processed_data = calculate_conversion_rates(enriched_data)

# Use processed_data for MySQL storage
processed_data

Unnamed: 0,ad_id,user_id,timestamp_x,website,campaign_id,timestamp_y,conversion_type,auction_id,timestamp,conversion_rate
0,3,1944,2024-04-08 14:05:06,testsite.com,2,2024-04-08 14:05:25,purchase,,,0


# 3. Data Storage and Query Performance:

In [10]:
# !pip install mysql-connector-python

In [17]:
import mysql.connector
from mysql.connector import Error

# Data storage in MySQL
def store_data_mysql(processed_data, host, database, user, password, table_name):
    try:
        # Connect to MySQL database
        connection = mysql.connector.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )

        if connection.is_connected():
            cursor = connection.cursor()

            # Create the table if it doesn't exist
            create_table_query = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                user_id INT,
                ad_id INT,
                timestamp DATETIME,
                website VARCHAR(255),
                campaign_id INT,
                timestamp_y DATETIME,
                conversion_type VARCHAR(50),
                auction_id VARCHAR(50),
                timestamp_x DATETIME,
                conversion_rate FLOAT
            )
            """
            cursor.execute(create_table_query)

            # Check for NaN values and replace them with None
            processed_data = processed_data.where(pd.notnull(processed_data), None)

            # Insert data into the table
            insert_query = f"INSERT INTO {table_name} (user_id, ad_id, timestamp, website, campaign_id, timestamp_y, conversion_type, auction_id, timestamp_x, conversion_rate) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            records_to_insert = [(row['user_id'], row['ad_id'], row['timestamp'], row['website'], row['campaign_id'], row['timestamp_y'], row['conversion_type'], row['auction_id'], row['timestamp_x'], row['conversion_rate']) for index, row in processed_data.iterrows()]
            cursor.executemany(insert_query, records_to_insert)

            connection.commit()
            print("Data inserted successfully into MySQL table")
    except Error as e:
        print(f"Error while connecting to MySQL: {e}")
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()
            print("MySQL connection closed")

# Example usage with processed_data obtained from previous steps
# Replace placeholders with your MySQL details
store_data_mysql(
    processed_data, 
    host='localhost',
    database='takehomeassignment_data_engineering',
    user='datta_bodake',
    password='MyDB2505',
    table_name='campaign_data'
)


Data inserted successfully into MySQL table
MySQL connection closed


In [18]:
import logging

# Set up logging
logging.basicConfig(filename='data_engineering.log', level=logging.INFO)

# Example error handling
try:
    # Attempt data processing
    processed_data = process_data(json_data, csv_data, avro_data)
except Exception as e:
    # Log error
    logging.error(f"Error during data processing: {str(e)}")