In [None]:
#Task
#Test Project: Data Engineer

In [None]:
#Step 1: Data Pipeline Creation
#To create a data pipeline, we'll use Python with the pandas library for data manipulation and sqlalchemy for database interactions.

#Extract
#First, we'll extract the data from the CSV files. Let's assume we have three CSV files: social_media.csv, email_campaigns.csv, and web_traffic.csv.
import pandas as pd

# Read the three source CSVs (paths are relative to the working directory) into
# one DataFrame per marketing channel.
social_media_df, email_campaigns_df, web_traffic_df = (
    pd.read_csv(csv_name)
    for csv_name in ('social_media.csv', 'email_campaigns.csv', 'web_traffic.csv')
)

#Transform
#Next, we'll transform the data by cleaning up missing or incorrect values, normalizing the data, and aggregating some data.
# Clean up missing values.
# Only numeric columns are zero-filled. A blanket fillna(0) would also replace a
# missing date with the integer 0 (which then fails to parse, or parses as
# 1970-01-01) and a missing text value with the number 0.
for frame in (social_media_df, email_campaigns_df, web_traffic_df):
    numeric_cols = frame.select_dtypes(include='number').columns
    frame[numeric_cols] = frame[numeric_cols].fillna(0)

# Normalize date formats: every frame gets a datetime64 'date' column.
# Missing dates become NaT instead of a bogus epoch timestamp.
for frame in (social_media_df, email_campaigns_df, web_traffic_df):
    frame['date'] = pd.to_datetime(frame['date'])

# Aggregate daily traffic metrics by week.
# Series.dt.week was removed in pandas 2.0; isocalendar().week is its replacement.
# NOTE: the key is the ISO week number only, so the same week number from two
# different years lands in the same group.
web_traffic_df['week'] = web_traffic_df['date'].dt.isocalendar().week

# Sessions are counts, so they are summed. Bounce and conversion rates are ratios,
# so they are averaged (summing daily rates yields values with no meaning).
# The result stays indexed by 'week', as before.
web_traffic_df_weekly = web_traffic_df.groupby('week').agg(
    sessions=('sessions', 'sum'),
    bounce_rate=('bounce_rate', 'mean'),
    conversion_rate=('conversion_rate', 'mean'),
)

#Load
#Finally, we'll load the cleaned and transformed data into a database. For this example, we'll use a SQLite database.
import sqlalchemy as db

# Create the database engine (SQLite file 'marketing_data.db' in the working directory)
engine = db.create_engine('sqlite:///marketing_data.db')

# Load data into the database.
# engine.begin() opens a connection inside a transaction: it commits when the block
# finishes, issues a rollback if a write raises, and always closes the connection,
# so no explicit conn.close() is needed.
with engine.begin() as conn:
    social_media_df.to_sql('social_media', conn, if_exists='replace', index=False)
    email_campaigns_df.to_sql('email_campaigns', conn, if_exists='replace', index=False)
    # The weekly frame carries the week number in its index (it comes from
    # groupby('week')), so the index must be written; with index=False the
    # table would contain the metrics but not the week they belong to.
    web_traffic_df_weekly.to_sql(
        'web_traffic_weekly',
        conn,
        if_exists='replace',
        index=True,
        index_label='week',
    )

In [None]:
#Step 2: Database Design
#For the database design, we'll create three tables: social_media, email_campaigns, and web_traffic_weekly. Here's the schema:
-- Intended shape of the table the pipeline fills via to_sql('social_media', ...).
-- `date` is the column the pipeline normalizes to a datetime; `engagement` is the
-- metric the sample queries aggregate.
-- NOTE(review): the pipeline writes with if_exists='replace', so pandas recreates the
-- table from the DataFrame's own columns; confirm the CSV really supplies `id`.
CREATE TABLE social_media (
    id INTEGER PRIMARY KEY,
    date DATE,
    platform TEXT,
    engagement INTEGER
);

-- Intended shape of the table the pipeline fills via to_sql('email_campaigns', ...).
-- `click_through_rate` is the column the "top 3 campaigns" sample query sorts on.
-- NOTE(review): presumably open_rate and click_through_rate are fractions or
-- percentages; the unit is not shown in this file, so confirm it.
CREATE TABLE email_campaigns (
    id INTEGER PRIMARY KEY,
    date DATE,
    campaign_name TEXT,
    open_rate REAL,
    click_through_rate REAL
);

-- Weekly roll-up of web traffic: one row per week number (the primary key), holding
-- the per-week aggregates the pipeline computes with groupby('week').
CREATE TABLE web_traffic_weekly (
    week INTEGER PRIMARY KEY,
    sessions INTEGER,
    bounce_rate REAL,
    conversion_rate REAL
);

In [None]:
#Step 3: Data Querying
#Here are some sample queries to demonstrate how the data can be retrieved and analyzed:
#-- Fetch weekly trends in social media engagement
-- social_media has no `week` column (only id, date, platform, engagement), so the
-- week bucket is derived from `date`. '%Y-%W' yields 'YYYY-WW' (week of year with
-- Monday as the first day), which keeps weeks from different years apart and
-- sorts chronologically as text.
SELECT strftime('%Y-%W', date) AS week, SUM(engagement) AS total_engagement
FROM social_media
GROUP BY week
ORDER BY week;

#-- Retrieve the top 3 email campaigns by click-through rate
-- Three rows with the highest click-through rate, best first.
SELECT ec.campaign_name,
       ec.click_through_rate
FROM email_campaigns AS ec
ORDER BY ec.click_through_rate DESC
LIMIT 3;

#-- Summarize web traffic by traffic source
-- Total sessions per traffic source.
-- NOTE(review): web_traffic_weekly as defined in the schema has only week, sessions,
-- bounce_rate and conversion_rate, with no traffic_source column, so this query needs
-- a table that still carries traffic_source. Confirm the intended source table.
SELECT traffic_source, SUM(sessions) AS total_sessions
FROM web_traffic_weekly
GROUP BY traffic_source;

In [None]:
#Step 4: Bonus (Optional)
#For the bonus task, we can set up a simple job scheduler using the `schedule` library to automate the pipeline.

#Automation
#Create a new Python script, scheduler.py, to schedule the pipeline to run daily at midnight:
import schedule
import time

def run_pipeline(script_path='data_pipeline.py'):
    """Execute the data pipeline script as if it were run with ``python <script>``.

    Args:
        script_path: Path of the pipeline script to run. Defaults to
            ``'data_pipeline.py'``, resolved against the current working directory.

    Returns:
        None.

    Raises:
        OSError: If the script file cannot be read.
        Exception: Whatever the pipeline script itself raises propagates unchanged.
    """
    # Local import so this function is self-contained.
    import runpy

    # runpy reads and closes the file itself and runs it in a fresh module namespace.
    # The previous exec(open(...).read()) left the file handle open and, because it
    # ran inside this function, split the script's globals and locals, so functions
    # defined in the script could not see its other top-level names.
    runpy.run_path(script_path, run_name='__main__')

POLL_SECONDS = 1  # pause between checks for due jobs

# Register the pipeline as a job that fires once a day at 00:00.
midnight_job = schedule.every().day.at("00:00")
midnight_job.do(run_pipeline)

# Poll forever for due jobs. This loop never returns, so any code placed after it
# in the same script or cell will not execute.
while True:
    schedule.run_pending()
    time.sleep(POLL_SECONDS)
#Alternatively, we can use Airflow or Cron for job scheduling.

#Data Visualization
#For data visualization, we can use a tool like Tableau or Google Data Studio to create a dashboard. Here's an example using Google Data Studio:

#Create a new Python script, dashboard.py, to create a dashboard:
import pandas as pd
from google.oauth2 import service_account
from googleapiclient.discovery import build

# Authenticate with the service-account key file and build the API client.
# NOTE(review): this script assumes the 'datastudio' v1 service exposes
# projects().dashboards() and .charts() resources with create() methods;
# confirm against the official API reference before relying on it.
creds = service_account.Credentials.from_service_account_file('credentials.json')
service = build('datastudio', 'v1', credentials=creds)

# Request 1: create an empty dashboard carrying only a title.
dashboard_payload = {
    'dashboard': {
        'title': 'Marketing Data Dashboard'
    }
}
create_dashboard_request = service.projects().dashboards().create(body=dashboard_payload)
dashboard = create_dashboard_request.execute()

# Request 2: attach a line chart of engagement over date to that dashboard.
# The dashboard id is taken from the response of request 1.
chart_payload = {
    'chart': {
        'title': 'Social Media Engagement',
        'type': 'LINE_CHART',
        'data': {
            'dataSourceId': 'social_media',
            'dimensions': ['date'],
            'metrics': ['engagement']
        }
    }
}
create_chart_request = service.projects().dashboards().charts().create(
    dashboardId=dashboard['dashboard']['id'],
    body=chart_payload,
)
chart = create_chart_request.execute()
# Add more charts and widgets to the dashboard as needed

#Create a credentials.json file containing the Google service-account key that dashboard.py loads (keep this file out of version control):
{
  "type": "service_account",
  "project_id": "your-project-id",
  "private_key_id": "your-private-key-id",
  "private_key": "your-private-key",
  "client_email": "your-client-email",
  "client_id": "your-client-id",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-service-account-email"
}
#Run the dashboard.py script to create the dashboard. You can then access the dashboard through the Google Data Studio interface.
