#  <p style = "text-align: center;">AdvertiseX Case Study</p>
### <p style = "text-align: center;"> By Hetansh Patel</p>

## -----------------------------------------------------------------------------------------------------------------------

## Solutions by Order:
### Data Ingestion
### Data Processing
### Data Storage and Query Performance
### Error Handling and Monitoring

## ---------------------------------------------------------------------------------------------------------------------------------

# Data Ingestion

#### Goal: To build a scalable system capable of handling real-time and batch data (in JSON, CSV, and Avro formats) efficiently.
#### Possible Approaches:
#### 1) Use Apache Kafka for real-time data streaming and Apache NiFi or Fluentd for flexible data routing and ingestion. Kafka treats message payloads as opaque bytes, so it can carry JSON, CSV, and Avro alike, and it scales horizontally to support real-time processing.
#### 2) Use cloud-native services such as Amazon Kinesis for streaming data and AWS Glue for data cataloging and ETL. These services scale on demand and reduce the operational overhead of managing infrastructure.
#### 3) Use serverless functions (e.g., AWS Lambda, Azure Functions) to ingest data. These can be triggered by event sources (such as new file uploads to cloud storage) and can process data formats on the fly before passing them to a streaming or storage service.
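
Whichever ingestion path is chosen, the incoming formats need to be normalized into a common record shape before they are passed downstream. The following is a minimal sketch of that step using only the Python standard library. `parse_payload` and the `ad_id` / `user_id` fields are hypothetical names chosen for illustration, and Avro is left out because decoding it requires a third-party library such as `fastavro`.

```python
import csv
import io
import json
from typing import Any, Dict, List


def parse_payload(payload: bytes, fmt: str) -> List[Dict[str, Any]]:
    """Decode a raw payload into a list of record dicts (hypothetical helper)."""
    text = payload.decode("utf-8")
    if fmt == "json":
        parsed = json.loads(text)
        # Accept either a single JSON object or an array of objects.
        return parsed if isinstance(parsed, list) else [parsed]
    if fmt == "csv":
        # DictReader treats the first row as the header.
        return list(csv.DictReader(io.StringIO(text)))
    raise ValueError(f"Unsupported format: {fmt}")


# Hypothetical sample payloads; the field names are assumptions.
json_records = parse_payload(b'{"ad_id": "a1", "user_id": "u1"}', "json")
csv_records = parse_payload(b"ad_id,user_id\na1,u1\n", "csv")
print(json_records)
print(csv_records)
```

The same function could be called from a serverless handler or from a Kafka consumer; only the source of `payload` changes.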

In [None]:
# Fetching Data with Requests and Parsing with Pandas
import requests
import pandas as pd

# Fetching JSON data from an API (fail fast on timeouts and HTTP errors)
response = requests.get('https://api.example.com/data', timeout=10)
response.raise_for_status()
data = response.json()

# Converting JSON data to a Pandas DataFrame
df = pd.DataFrame(data)

# Reading a CSV file directly into a Pandas DataFrame
csv_df = pd.read_csv('path/to/your/file.csv')

In [None]:
# Producing Messages to Kafka
import json

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'your.kafka.broker:9092'})

def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the delivery result
    if err is not None:
        print('Message delivery failed:', err)
    else:
        print('Message delivered to', msg.topic(), msg.partition())

# Sending a message to Kafka, serialized as JSON
# (str(data) would produce a Python repr, which is not valid JSON)
data = {'key': 'value'}
p.produce('your_topic', key='your_key', value=json.dumps(data), callback=delivery_report)

# Wait for any outstanding messages to be delivered
p.flush()

# Data Processing

#### Goal: Standardize, enrich, and correlate data (impressions with clicks and conversions) for insights.
#### Possible Approaches:
#### 1) A distributed data processing engine such as Apache Spark, which handles both batch and streaming data. It is well suited to complex transformations, aggregations, and data enrichment, and it supports both SQL-like queries and machine learning algorithms.
#### 2) Google Cloud Dataflow for processing and transforming streaming and batch data efficiently, with managed services reducing operational load.
#### Leverage Google BigQuery for its serverless, highly scalable, and cost-effective data warehouse capabilities, enabling advanced analytics directly on ingested data.
#### 3) Apache Flink offers robust stream processing capabilities with a focus on event time processing and state management.
#### Apache Beam provides an abstraction layer that allows for defining and executing data processing pipelines in a way that's independent of the backend technology, making it possible to switch between Flink, Spark, and others as needed.
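
Independent of the engine, the correlation step in the goal above comes down to a join between event streams. The following is a minimal in-memory sketch of that idea in plain Python. It assumes that every click and conversion carries the `impression_id` of the impression it belongs to; that key, the `ad_id` field, and the `correlate` helper are all hypothetical. A production pipeline would express the same join in Spark, Flink, or Beam, typically with time windows to handle late events.

```python
from collections import defaultdict

# Hypothetical sample events; the field names are assumptions for illustration.
impressions = [
    {"impression_id": "i1", "ad_id": "a1"},
    {"impression_id": "i2", "ad_id": "a1"},
    {"impression_id": "i3", "ad_id": "a2"},
]
clicks = [{"impression_id": "i1"}, {"impression_id": "i3"}]
conversions = [{"impression_id": "i1"}]


def correlate(impressions, clicks, conversions):
    """Count impressions, clicks, and conversions per ad, joined on impression_id."""
    clicked = {c["impression_id"] for c in clicks}
    converted = {c["impression_id"] for c in conversions}
    stats = defaultdict(lambda: {"impressions": 0, "clicks": 0, "conversions": 0})
    for imp in impressions:
        row = stats[imp["ad_id"]]
        row["impressions"] += 1
        # An impression counts as clicked or converted if its id appears in the matching set.
        row["clicks"] += int(imp["impression_id"] in clicked)
        row["conversions"] += int(imp["impression_id"] in converted)
    return dict(stats)


summary = correlate(impressions, clicks, conversions)
print(summary)
```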

In [None]:
# Parallel Data Processing with Dask
import dask.dataframe as dd

# Reading a large CSV file with Dask
dask_df = dd.read_csv('path/to/large/file.csv')

# Performing a transformation
transformed_dask_df = dask_df.groupby('column').sum()

# Compute results
result = transformed_dask_df.compute()

In [None]:
# Data Processing with PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('app').getOrCreate()

# Reading JSON data into a Spark DataFrame
# (named spark_df so it does not overwrite the Pandas df used in other cells)
spark_df = spark.read.json('path/to/your/json')

# Performing transformations
transformed_df = spark_df.groupBy('column').count()

# Show results
transformed_df.show()

# Data Storage and Query Performance

#### Goal: Efficiently store processed data for fast querying and analysis of ad campaign performance.
#### Possible Approaches:
#### 1) Apache Cassandra for high availability and scalability, particularly for write-heavy workloads.
#### Use Apache Parquet on a distributed file system (like HDFS or cloud storage) for efficient columnar storage, reducing storage costs and improving analytical query performance.
#### 2) Services like Amazon Redshift, Google BigQuery, or Snowflake offer scalable and fully managed data warehouse solutions, with built-in capabilities for data management and optimization for analytics.
#### 3) Implement a data lake using Amazon S3, Azure Data Lake Storage, or Google Cloud Storage to store raw data in various formats.
#### Integrate with data lake analytics tools (such as Amazon Athena or Azure Synapse Analytics) to query and analyze data directly in the data lake, without first transforming or loading it into a separate data warehouse.
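
Whichever store is chosen, query speed depends largely on laying the data out around the columns that reports filter and group by. The sketch below illustrates this with SQLite from the standard library, used purely as a local stand-in for the stores listed above. The `ad_events` table, its columns, and the sample rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ad_events (campaign_id TEXT, event_type TEXT, event_date TEXT)"
)
# Index the columns that the report filters and groups on.
conn.execute("CREATE INDEX idx_campaign_date ON ad_events (campaign_id, event_date)")

# Hypothetical sample rows.
rows = [
    ("c1", "impression", "2024-01-01"),
    ("c1", "impression", "2024-01-01"),
    ("c1", "click", "2024-01-01"),
    ("c2", "impression", "2024-01-01"),
]
conn.executemany("INSERT INTO ad_events VALUES (?, ?, ?)", rows)

# Impressions and clicks per campaign for a single day.
query = """
    SELECT campaign_id,
           SUM(CASE WHEN event_type = 'impression' THEN 1 ELSE 0 END) AS impressions,
           SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks
    FROM ad_events
    WHERE event_date = ?
    GROUP BY campaign_id
    ORDER BY campaign_id
"""
report = conn.execute(query, ("2024-01-01",)).fetchall()
print(report)
```

In a columnar warehouse or a Parquet-based data lake, the analogous choice would be the partitioning or clustering key rather than an index.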

In [None]:
# Using SQLAlchemy for Database Operations
import pandas as pd
from sqlalchemy import create_engine

# Create engine
engine = create_engine('sqlite:///example.db')

# Write data to database (assuming df is a Pandas DataFrame)
df.to_sql('table_name', con=engine, if_exists='replace', index=False)

# Read data from database
loaded_df = pd.read_sql('SELECT * FROM table_name', con=engine)

In [None]:
# Storing and Reading Parquet Files with Pandas and PyArrow
import pandas as pd

# Assuming df is your DataFrame
df.to_parquet('path/to/store/data.parquet', engine='pyarrow')

# Reading the parquet file
loaded_df = pd.read_parquet('path/to/store/data.parquet', engine='pyarrow')

# Error Handling and Monitoring

#### Goal: Detect and address data anomalies, discrepancies, or delays in real time.
#### Possible Approaches:
#### 1) ELK Stack (Elasticsearch, Logstash, Kibana) for logging, searching, and visualizing logs and data flows in real time.
#### Prometheus for monitoring metrics and setting up alerts based on thresholds or anomalies detected in the data pipeline performance.
#### 2) Use cloud provider solutions such as Amazon CloudWatch, AWS X-Ray, Google Cloud Operations Suite (formerly Stackdriver), or Azure Monitor for comprehensive monitoring, logging, and diagnostics across all cloud resources and services.
#### 3) Apache Airflow for workflow management, where you can define, schedule, and monitor data pipelines using directed acyclic graphs (DAGs). It includes the ability to alert on task failures or retries.
#### Grafana for advanced analytics and monitoring with support for Prometheus as a data source, offering customizable dashboards for visualizing and analyzing metrics.
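
The threshold-based alerts mentioned above can be illustrated with a small check that runs over per-batch statistics. This is a minimal sketch, not a replacement for the tools listed: `check_batch`, the shape of the `stats` dictionary, and the five-minute lag threshold are all assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone


def check_batch(stats, now, max_lag=timedelta(minutes=5)):
    """Return a list of alert messages for one batch of pipeline statistics."""
    alerts = []
    # Discrepancy: clicks should never outnumber the impressions they came from.
    if stats["clicks"] > stats["impressions"]:
        alerts.append("discrepancy: more clicks than impressions")
    # Delay: the newest event seen should not be older than max_lag.
    lag = now - stats["latest_event_time"]
    if lag > max_lag:
        alerts.append(f"delay: newest event is {int(lag.total_seconds())}s old")
    return alerts


# Hypothetical batches: one healthy, one with a discrepancy and a delay.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
healthy = {"impressions": 100, "clicks": 7, "latest_event_time": now - timedelta(minutes=1)}
suspect = {"impressions": 10, "clicks": 25, "latest_event_time": now - timedelta(minutes=30)}

healthy_alerts = check_batch(healthy, now)
suspect_alerts = check_batch(suspect, now)
print(healthy_alerts)
print(suspect_alerts)
```

In practice the returned messages would be forwarded to whichever alerting channel is in use, for example as log records or as metrics that Prometheus scrapes.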

In [None]:
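# Using Python's Logging Module
import logging

# Configure logging; level=logging.INFO is required because the default
# level (WARNING) would silently drop the info message below
logging.basicConfig(filename='app.log', filemode='w', level=logging.INFO,
                    format='%(name)s - %(levelname)s - %(message)s')

try:
    # Your code here
    logging.info("Attempting to perform an operation")
except Exception:
    # logging.exception logs at ERROR level and includes the traceback
    logging.exception("An error occurred")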

In [None]:
# Integration with Sentry for Real-time Monitoring
import sentry_sdk

sentry_sdk.init(
    dsn="your_sentry_dsn_here",
    traces_sample_rate=1.0
)

try:
    # Your code here that might fail
    1 / 0
except Exception as e:
    sentry_sdk.capture_exception(e)