In [6]:
import logging
import os
import re
import shutil
from datetime import datetime
from typing import Optional

from pyspark.sql import SparkSession, functions as F, Window


"""
ETL application for counting impressions and clicks by date/hour for a specific user agent.
Uses PySpark, environment variables (DB credentials, etc.), and writes CSV outputs.
Follows best practices: DRY, PEP-8, logging, error handling, missing data handling.
"""


# Database settings are read only from the environment. Non-secret settings fall back
# to local-development defaults; the password has no fallback (it is None when
# DB_PASSWORD is unset), so no credential is hardcoded in the notebook.
# NOTE(review): none of the DB_* values are used by the ETL code in this cell.
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_USER = os.getenv("DB_USER", "user")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

def parse_filename_datetime(filename: str) -> Optional[str]:
    """
    Extract the timestamp embedded in an input file name, truncated to minutes.

    The name must contain ``_dk_`` followed by at least 12 digits laid out as
    YYYYMMDDHHMM; any further digits (seconds, milliseconds) are ignored. For example
    'impressions_processed_dk_20220526113212045_172845633-172845636_1.parquet'
    yields '2022-05-26 11:32'.

    Args:
        filename: File name (or path) to inspect.

    Returns:
        The date/time formatted as 'YYYY-MM-DD HH:MM', or ``None`` when the name has
        no ``_dk_<12 digits>`` segment or the digits are not a valid date/time
        (e.g. month 99), so callers can skip the file instead of crashing.
    """
    match = re.search(r'_dk_(\d{12})', filename)
    if not match:
        return None
    try:
        parsed_dt = datetime.strptime(match.group(1), "%Y%m%d%H%M")
    except ValueError:
        # Digits are present but do not form a real calendar date/time.
        return None
    return parsed_dt.strftime("%Y-%m-%d %H:%M")

def create_date_hour_df(spark, date_str):
    """
    Build a 24-row DataFrame, one row per hour 0..23 of ``date_str``.

    ``date_str`` is parsed with the 'YYYY-MM-DD' format and written back in that same
    format, so the ``date`` column is always zero-padded. An unparseable string raises
    ``ValueError`` before any DataFrame is created. Columns: ``date``, ``hour``.
    """
    normalized_date = datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m-%d")
    hourly_rows = []
    for hour_of_day in range(24):
        hourly_rows.append((normalized_date, hour_of_day))
    return spark.createDataFrame(hourly_rows, ["date", "hour"])

def _collect_input_files(input_dir):
    """
    Group the ``.parquet`` files in ``input_dir`` by the date encoded in their names.

    Returns:
        dict mapping 'YYYY-MM-DD' to a list of ``(file_path, 'YYYY-MM-DD HH:MM')``
        tuples. Files whose name cannot be parsed are logged and skipped.
    """
    files_by_date = {}
    # os.listdir() returns entries in arbitrary order; sort so every run processes
    # (and logs) files in the same order.
    for file_name in sorted(os.listdir(input_dir)):
        if not file_name.endswith(".parquet"):
            continue
        date_time_str = parse_filename_datetime(file_name)
        if not date_time_str:
            logging.warning("Skipping file %s, unable to parse date/time.", file_name)
            continue
        extracted_date = date_time_str.split(" ")[0]
        files_by_date.setdefault(extracted_date, []).append(
            (os.path.join(input_dir, file_name), date_time_str)
        )
    return files_by_date


def _aggregate_file(spark, file_path, date_time_str, target_user_agent):
    """
    Sum impressions and clicks of one parquet file for ``target_user_agent``.

    Returns a one-row DataFrame with columns impressions_sum, clicks_sum, date, hour.
    The sums are null when no record matches the user agent.

    NOTE(review): date and hour come from the file name (``date_time_str``), so every
    record in a file is counted in that single hour; confirm whether a per-record
    timestamp should be used instead.
    """
    df = spark.read.parquet(file_path)
    # NOTE(review): assumes a nested `device_settings.user_agent` field and numeric
    # `impressions` / `clicks` columns. The ParquetGenerator schema in this notebook
    # has none of them; confirm against the real input files.
    filtered_df = df.filter(F.col("device_settings.user_agent") == target_user_agent)
    extracted_date, time_part = date_time_str.split(" ")
    hour_val = int(time_part.split(":")[0])
    return (
        filtered_df.agg(
            F.sum("impressions").alias("impressions_sum"),
            F.sum("clicks").alias("clicks_sum"),
        )
        .withColumn("date", F.lit(extracted_date))
        .withColumn("hour", F.lit(hour_val))
    )


def _write_daily_output(spark, extracted_date, aggregates, output_dir):
    """
    Combine the per-file aggregates of one date and write one 24-hour CSV for it.

    Args:
        spark: Active SparkSession.
        extracted_date: Date in 'YYYY-MM-DD' form.
        aggregates: Non-empty list of DataFrames produced by ``_aggregate_file``.
        output_dir: Directory that receives ``impressions_clicks_<date>.csv``.

    Returns:
        The output path that was written (overwritten if it already existed).
    """
    combined_df = aggregates[0]
    for aggregate_df in aggregates[1:]:
        combined_df = combined_df.unionByName(aggregate_df)

    # Several files can map to the same hour; add them up so each hour appears once.
    hourly_df = combined_df.groupBy("date", "hour").agg(
        F.sum("impressions_sum").alias("impressions_sum"),
        F.sum("clicks_sum").alias("clicks_sum"),
    )

    # Left-join onto the full 00-23 grid so hours without data are reported as zero.
    result_df = (
        create_date_hour_df(spark, extracted_date)
        .join(hourly_df, on=["date", "hour"], how="left")
        .na.fill({"impressions_sum": 0, "clicks_sum": 0})
        .select(
            "date",
            "hour",
            F.col("impressions_sum").alias("impression_count"),
            F.col("clicks_sum").alias("click_count"),
        )
    )

    output_file = os.path.join(output_dir, f"impressions_clicks_{extracted_date}.csv")
    result_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_file)
    return output_file


def main():
    """
    Aggregate every parquet file in ``./input_parquet`` into one CSV per date.

    All files that share a date are combined before writing, so
    ``./output_csv/impressions_clicks_<date>.csv`` reflects every input file of that
    date. Writing once per file with mode("overwrite") would keep only the last one.
    Input files are deleted only after their date's CSV has been written. The Spark
    session is stopped even if processing fails.

    NOTE(review): a later run that receives more files for an already-processed date
    overwrites that date's CSV, and the earlier inputs are already deleted.
    """
    spark = SparkSession.builder.appName("ImpressionsClicksETL").getOrCreate()

    # Example: input directory, output directory
    input_dir = "./input_parquet"
    output_dir = "./output_csv"
    target_user_agent = "some user agent"

    # Force creation of the directory if it doesn't exist
    os.makedirs(input_dir, exist_ok=True)

    try:
        files_by_date = _collect_input_files(input_dir)
        for extracted_date in sorted(files_by_date):
            files = files_by_date[extracted_date]
            aggregates = [
                _aggregate_file(spark, file_path, date_time_str, target_user_agent)
                for file_path, date_time_str in files
            ]
            output_file = _write_daily_output(spark, extracted_date, aggregates, output_dir)

            # Delete inputs only once their combined output exists.
            for file_path, _ in files:
                logging.info("Processed file %s -> %s", file_path, output_file)
                os.remove(file_path)
                logging.info("Removed input file: %s", file_path)
    finally:
        spark.stop()

# Entry-point guard for the ETL. Inside a notebook kernel the guard is true (the
# generator cell's recorded output shows its own `main()` ran), so executing this
# cell starts the ETL immediately.
# NOTE(review): a later cell also defines `main`; this call uses the `main` defined
# just above only because this cell redefines it before calling.
if __name__ == "__main__":
    main()

In [19]:
import os
import psycopg2
from psycopg2 import OperationalError

# Connectivity smoke test. Connection settings come from environment variables so no
# credentials live in the notebook. The fallbacks match the values this cell used to
# hardcode, except the password, which must be provided via DB_PASSWORD.
db_params = {
    "dbname": os.getenv("DB_NAME", "adform_db"),
    "user": os.getenv("DB_USER", "adform_user"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": os.getenv("DB_PORT", "5433"),
}

try:
    conn = psycopg2.connect(**db_params)
except OperationalError as e:
    # Expected failure mode (server down, wrong credentials): report it and continue.
    print(f"Error connecting to database: {e}")
else:
    print("Connected OK!")
    conn.close()


Connected OK!


In [6]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, to_date, count, when

import os

# Create (or reuse) a Spark session.
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

# Sample file to inspect; set RAW_PARQUET_PATH instead of editing this cell.
# NOTE(review): the fallback is an absolute path on one developer's machine;
# replace it with a path relative to the project.
RAW_PARQUET_PATH = os.getenv(
    "RAW_PARQUET_PATH",
    "C:/Users/Rokas/Documents/Airidas/adform/adform_spark_app/raw_data/"
    "impressions_processed_dk_20220527113212045_172845634-172845637_1.parquet",
)

# Read the Parquet file and inspect it. show() and printSchema() print directly and
# return None, so they are not wrapped in print().
df = spark.read.parquet(RAW_PARQUET_PATH)
df.show()
df.printSchema()

# Draft of the aggregation, kept for reference (not executed):
# # Filter by user agent
# filtered_df = df.filter(col("device_settings.user_agent") == "some user agent")

# # Add date and hour derived from the timestamp
# processed_df = filtered_df.withColumn("date", to_date(col("timestamp"))) \
#                           .withColumn("hour", hour(col("timestamp")))

# # Group by date and hour
# # NOTE(review): `sum` below would be Python's builtin (pyspark's sum is not imported
# # in this cell) and fails on a Column; import pyspark.sql.functions.sum under
# # another name and use that instead.
# aggregated_df = processed_df.groupBy("date", "hour") \
#                             .agg(
#                                 count("*").alias("impression_count"),
#                                 sum(when(col("event_type") == "click", 1).otherwise(0)).alias("click_count")
#                             )

# # Fill missing hours with zeros
# hours_df = spark.range(0, 24).toDF("hour")
# final_df = hours_df.join(aggregated_df, on="hour", how="left").fillna(0)

# # Convert to a Pandas DataFrame and save as CSV
# final_pandas_df = final_df.toPandas()
# final_pandas_df.to_csv("path/to/output/task1_output_2022-05-26.csv", index=False)


+--------------+--------------------+------------------------------------+------------------------------------+--------------------------------+--------------------------------+------------------------------------+----------------------------------+-----------------------+--------------------------+---------------------------------+-------------------------------+-----------------------------------+-----------------------------------+----------------+------------------+---------------+-------------+--------------------------+--------------------------------+--------------------------------+---------------------------------------+
|interaction_id|            page_url|transaction_header_transaction_id_lo|transaction_header_transaction_id_hi|transaction_header_creation_time|transaction_header_producer_time|transaction_header_original_producer|transaction_header_recent_producer|user_identity_cookie_id|user_identity_is_opted_out|user_identity_cookie_id_origin_id|user_identity_browser_cookie

In [2]:
import pandas as pd
import random
import string
from datetime import datetime, timedelta

class TestDataGenerator:
    """
    Generate in-memory impression/click records for testing the ETL.

    Impression timestamps start at 2022-05-27 00:00 and advance one minute per
    record. Roughly 20% of clicks receive a fresh random ``impression_id`` (and
    ``campaign_id``) so they match no impression. The other clicks copy the ids of
    the impression they are derived from.
    """

    def __init__(self, seed=None):
        """
        Args:
            seed: Optional seed. When given, every random value is drawn from a
                private ``random.Random(seed)``, which makes the output reproducible.
                When ``None`` (the default), the global ``random`` module is used,
                as before.
        """
        self.base_timestamp = datetime.strptime('2022-05-27', '%Y-%m-%d')
        # The `random` module and `random.Random` expose the same methods used below.
        self._rng = random if seed is None else random.Random(seed)

    def generate_test_files(self, file_count=7, impressions_per_file=1000, clicks_per_file=800):
        """
        Generate ``file_count`` impression/click file pairs, numbered from 1.

        Returns:
            A list of dicts. Each has ``impressions_file`` and ``clicks_file`` (file
            names only; nothing is written to disk) plus ``impressions_data`` and
            ``clicks_data`` (lists of record dicts).
        """
        files = []
        for i in range(1, file_count + 1):
            impressions_data = self.generate_impressions(impressions_per_file)
            clicks_data = self.generate_clicks(clicks_per_file, impressions_data)
            files.append({
                'impressions_file': f"impressions_processed_dk_20220527113212045_{172845633 + i}-{172845636 + i}_{i}.parquet",
                'clicks_file': f"clicks_processed_dk_20220527113145108_{163644805 + i}-{163644809 + i}_{i}.parquet",
                'impressions_data': impressions_data,
                'clicks_data': clicks_data,
            })
        return files

    def generate_impressions(self, count):
        """Return ``count`` impression dicts spaced one minute apart from ``base_timestamp``."""
        impressions = []
        for i in range(count):
            impressions.append({
                # NOTE(review): base_timestamp is naive, so .timestamp() uses the host's
                # local time zone and the epoch values differ between machines.
                'timestamp': (self.base_timestamp + timedelta(minutes=i)).timestamp(),
                'impression_id': f"imp_{self._random_string(7)}",
                'campaign_id': self._rng.randint(1000, 9999),
                'banner_id': self._rng.randint(10000, 99999),
                'interaction_id': self._rng.randint(100000, 999999),
                'cookie_id': f"cookie_{self._rng.randint(1000, 9999)}",
                'winning_price_dkk': round(self._rng.uniform(1, 100) / 100, 2),
                'country': 'DK',
                'device_id': f"device_{self._rng.randint(1000, 9999)}",
                'page_url': f"https://test{i}.example.com"
            })
        return impressions

    def generate_clicks(self, count, impressions):
        """
        Generate ``count`` clicks derived from ``impressions``.

        Click ``i`` copies impression ``i``, or impression 0 once ``i`` runs past the
        list, and is timestamped 0-3600 s after it. With probability 0.2 the click gets
        a new random impression_id and campaign_id, producing a deliberate mismatch.

        Raises:
            ValueError: if ``count > 0`` but ``impressions`` is empty.
        """
        if count > 0 and not impressions:
            raise ValueError("generate_clicks needs at least one impression to derive clicks from")
        clicks = []
        for i in range(count):
            base_impression = impressions[i] if i < len(impressions) else impressions[0]
            use_mismatched = self._rng.random() > 0.8

            clicks.append({
                'timestamp': base_impression['timestamp'] + self._rng.randint(0, 3600),
                'impression_id': f"imp_{self._random_string(7)}" if use_mismatched else base_impression['impression_id'],
                'campaign_id': self._rng.randint(1000, 9999) if use_mismatched else base_impression['campaign_id'],
                'banner_id': base_impression['banner_id'],
                'interaction_id': base_impression['interaction_id'],
                'cookie_id': base_impression['cookie_id'],
                'landing_url': f"https://landing{i}.example.com",
                'country': 'DK',
                'device_id': base_impression['device_id']
            })
        return clicks

    def calculate_mismatch_statistics(self, impressions, clicks):
        """
        Count clicks whose ``impression_id`` does not appear among ``impressions``.

        Returns:
            dict with total_impressions, total_clicks, mismatched_clicks and
            mismatch_percentage (mismatched / total clicks * 100, rounded to 2
            decimals; 0.0 when there are no clicks).
        """
        impression_ids = {imp['impression_id'] for imp in impressions}
        mismatched_clicks = sum(1 for click in clicks if click['impression_id'] not in impression_ids)

        total_clicks = len(clicks)
        mismatch_percentage = round((mismatched_clicks / total_clicks) * 100, 2) if total_clicks > 0 else 0.0

        return {
            'total_impressions': len(impressions),
            'total_clicks': total_clicks,
            'mismatched_clicks': mismatched_clicks,
            'mismatch_percentage': mismatch_percentage
        }

    def _random_string(self, length):
        """Return ``length`` random characters drawn from lowercase letters and digits."""
        return ''.join(self._rng.choices(string.ascii_lowercase + string.digits, k=length))

# Usage: build seven synthetic file pairs, then report for each pair how many clicks
# reference an impression_id that is missing from that pair's impressions.
generator = TestDataGenerator()
test_files = generator.generate_test_files(7)

for index, file_pair in enumerate(test_files):
    impressions_data = file_pair['impressions_data']
    clicks_data = file_pair['clicks_data']
    pair_stats = generator.calculate_mismatch_statistics(impressions_data, clicks_data)
    print(f"\nFile pair {index + 1} statistics:")
    print(pair_stats)


File pair 1 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 154, 'mismatch_percentage': 19.25}

File pair 2 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 148, 'mismatch_percentage': 18.5}

File pair 3 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 147, 'mismatch_percentage': 18.38}

File pair 4 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 160, 'mismatch_percentage': 20.0}

File pair 5 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 159, 'mismatch_percentage': 19.88}

File pair 6 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 171, 'mismatch_percentage': 21.38}

File pair 7 statistics:
{'total_impressions': 1000, 'total_clicks': 800, 'mismatched_clicks': 152, 'mismatch_percentage': 19.0}


In [9]:
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from datetime import datetime, timedelta
import random
import uuid

class ParquetGenerator:
    """
    Write synthetic parquet files that follow one fixed, flattened schema.

    Each record's creation time falls on the given date at one of a small set of
    hours (11 or 19 by default). As a result, the ETL only sees data in those hours.
    """

    def __init__(self, seed=None):
        """
        Args:
            seed: Optional seed. When given, every random value, including the
                UUID-based ids, comes from a private ``random.Random(seed)``, so
                output is reproducible. When ``None`` (the default), the global
                ``random`` module and ``uuid.uuid4`` are used, as before.
        """
        self.base_date = datetime(2022, 5, 27)
        # The `random` module and `random.Random` expose the same methods used below.
        self._rng = random if seed is None else random.Random(seed)
        # Schema exactly matching the provided structure
        self.schema = pa.schema([
            ('interaction_id', pa.int64()),
            ('page_url', pa.string()),
            ('transaction_header_transaction_id_lo', pa.int64()),
            ('transaction_header_transaction_id_hi', pa.int64()),
            ('transaction_header_creation_time', pa.int64()),
            ('transaction_header_producer_time', pa.int64()),
            ('transaction_header_original_producer', pa.string()),
            ('transaction_header_recent_producer', pa.string()),
            ('user_identity_cookie_id', pa.string()),
            ('user_identity_is_opted_out', pa.bool_()),
            ('user_identity_cookie_id_origin_id', pa.int64()),
            ('user_identity_browser_cookie_id', pa.string()),
            ('user_identity_browser_cookie_status', pa.string()),
            ('user_identity_device_advertising_id', pa.string()),
            ('banner_banner_id', pa.int64()),
            ('banner_campaign_id', pa.int64()),
            ('banner_media_id', pa.int64()),
            ('banner_tag_id', pa.int64()),
            ('banner_banner_placement_id', pa.int64()),
            ('rtb_vars_winning_price_in_dkk_lo', pa.int64()),
            ('rtb_vars_winning_price_in_dkk_hi', pa.int64()),
            ('rtb_vars_winning_price_in_dkk_signScale', pa.int64())
        ])

    def _new_uuid(self):
        """Return a random version-4 UUID, drawn from the seeded generator when one is set."""
        if self._rng is random:
            return uuid.uuid4()
        return uuid.UUID(int=self._rng.getrandbits(128), version=4)

    def generate_record(self, date=None, hours=(11, 19)):
        """
        Generate one record dict whose keys match ``self.schema``.

        Args:
            date: Day of the record (defaults to ``self.base_date``).
            hours: Hours of the day to pick from for the creation time.

        The creation time is epoch milliseconds computed from a naive datetime, i.e.
        interpreted in the host's local time zone. The producer time lies 0-1000 ms
        after it.
        """
        if date is None:
            date = self.base_date

        timestamp = int((date + timedelta(
            hours=self._rng.choice(hours),
            minutes=self._rng.randint(0, 59),
            seconds=self._rng.randint(0, 59)
        )).timestamp() * 1000)

        return {
            'interaction_id': self._rng.randint(100000, 999999),
            'page_url': f"https://example{self._rng.randint(1,999):02d}...",
            'transaction_header_transaction_id_lo': self._rng.randint(100000, 999999),
            'transaction_header_transaction_id_hi': self._rng.randint(100000, 999999),
            'transaction_header_creation_time': timestamp,
            'transaction_header_producer_time': timestamp + self._rng.randint(0, 1000),
            'transaction_header_original_producer': 'test',
            'transaction_header_recent_producer': 'test',
            'user_identity_cookie_id': str(self._new_uuid()),
            'user_identity_is_opted_out': self._rng.choice([True, False]),
            'user_identity_cookie_id_origin_id': self._rng.randint(1, 100),
            'user_identity_browser_cookie_id': f"browser_{self._new_uuid().hex[:8]}",
            'user_identity_browser_cookie_status': 'Enabled',
            'user_identity_device_advertising_id': f"device_{self._new_uuid().hex[:8]}",
            'banner_banner_id': self._rng.randint(10000, 99999),
            'banner_campaign_id': self._rng.randint(1000, 9999),
            'banner_media_id': self._rng.randint(1000, 9999),
            'banner_tag_id': self._rng.randint(1000, 9999),
            'banner_banner_placement_id': self._rng.randint(1000, 9999),
            'rtb_vars_winning_price_in_dkk_lo': self._rng.randint(100, 999),
            'rtb_vars_winning_price_in_dkk_hi': 0,
            'rtb_vars_winning_price_in_dkk_signScale': 0
        }

    def generate_parquet_file(self, filename, base_date, record_count=1000):
        """Write ``record_count`` records dated ``base_date`` to ``filename``; return ``filename``."""
        records = [self.generate_record(base_date) for _ in range(record_count)]
        df = pd.DataFrame(records)
        table = pa.Table.from_pandas(df, schema=self.schema)
        pq.write_table(table, filename)
        return filename

    def generate_files_for_dates(self, output_dir=".", files_per_group=3):
        """
        Write the sample file set and return the list of paths written.

        The set contains May 26 impressions, May 27 impressions and May 27 clicks,
        with ``files_per_group`` files each (suffixes ``_0`` .. ``_{n-1}``). Click files
        hold 800 records (20% fewer than the 1000 impression records).
        """
        import os

        os.makedirs(output_dir, exist_ok=True)
        may_26 = datetime(2022, 5, 26)
        may_27 = datetime(2022, 5, 27)
        # (file name template, record date, records per file), generated in this order.
        groups = [
            ("impressions_processed_dk_20220526113212045_172845633-172845636_{}.parquet", may_26, 1000),
            ("impressions_processed_dk_20220527113212045_172845633-172845636_{}.parquet", may_27, 1000),
            ("clicks_processed_dk_20220527113145108_163644805-163644809_{}.parquet", may_27, 800),
        ]
        written = []
        for template, record_date, record_count in groups:
            for i in range(files_per_group):
                path = os.path.join(output_dir, template.format(i))
                written.append(self.generate_parquet_file(path, record_date, record_count))
        return written

def verify_file(filename, expected_schema=None):
    """
    Read a parquet file, print its schema, and optionally check it against ``expected_schema``.

    Args:
        filename: Path of the parquet file to read.
        expected_schema: Optional ``pyarrow.Schema``. When given, the file's schema
            must have the same fields in the same order. Schema metadata, such as the
            pandas block written by ``Table.from_pandas``, is ignored. When ``None``,
            the schema is only printed.

    Returns:
        The ``pyarrow.Table`` read from ``filename``.

    Raises:
        ValueError: if ``expected_schema`` is given and the file's schema differs.
    """
    table = pq.read_table(filename)
    print(f"\nVerifying {filename}")
    print("Schema:")
    print(table.schema)
    if expected_schema is not None and not table.schema.equals(expected_schema, check_metadata=False):
        raise ValueError(
            f"Schema of {filename} does not match the expected schema:\n{expected_schema}"
        )
    return table

def main():
    """Generate the sample parquet files, then print the schema of the first file of each group."""
    ParquetGenerator().generate_files_for_dates()

    # Verify first files of each type
    print("\nVerifying generated files:")
    first_files = (
        "impressions_processed_dk_20220526113212045_172845633-172845636_0.parquet",
        "impressions_processed_dk_20220527113212045_172845633-172845636_0.parquet",
        "clicks_processed_dk_20220527113145108_163644805-163644809_0.parquet",
    )
    for parquet_name in first_files:
        verify_file(parquet_name)

# Entry-point guard. In this notebook the guard is true (the output below shows
# `main()` ran), so executing the cell writes the sample parquet files to the
# current working directory and prints their schemas.
# NOTE(review): this redefines `main` in the kernel, shadowing the ETL cell's `main`.
if __name__ == "__main__":
    main()


Verifying generated files:

Verifying impressions_processed_dk_20220526113212045_172845633-172845636_0.parquet
Schema:
interaction_id: int64
page_url: string
transaction_header_transaction_id_lo: int64
transaction_header_transaction_id_hi: int64
transaction_header_creation_time: int64
transaction_header_producer_time: int64
transaction_header_original_producer: string
transaction_header_recent_producer: string
user_identity_cookie_id: string
user_identity_is_opted_out: bool
user_identity_cookie_id_origin_id: int64
user_identity_browser_cookie_id: string
user_identity_browser_cookie_status: string
user_identity_device_advertising_id: string
banner_banner_id: int64
banner_campaign_id: int64
banner_media_id: int64
banner_tag_id: int64
banner_banner_placement_id: int64
rtb_vars_winning_price_in_dkk_lo: int64
rtb_vars_winning_price_in_dkk_hi: int64
rtb_vars_winning_price_in_dkk_signScale: int64
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n'