# Load data to Delta Lake from GCS with EXTERNAL TABLE and ETL the data.

This notebook shows you how to create a Delta Lake from by using `CREATE TABLE` to load data from Google Cloud Storage (GCS) as External Tables.

## Creating our database and tables

In [0]:
%sql
CREATE DATABASE test_database

In [0]:
%sql
CREATE TABLE test_database.planned_data
USING csv
OPTIONS (path "gs://databricks_data_example/table_planned.csv", header "true", inferSchema "true", delimiter ",")

In [0]:
%sql
CREATE TABLE test_database.googleads_data
USING csv
OPTIONS (path "gs://databricks_data_example/table_googleads_ads.csv", header "true", inferSchema "true", delimiter ",")

In [0]:
%sql
CREATE TABLE test_database.facebookads_data
USING csv
OPTIONS (path "gs://databricks_data_example/table_fbmarketing_platform.csv", header "true", inferSchema "true", delimiter ",")

In [0]:
%sql
CREATE TABLE test_database.linkedinads_data
USING csv
OPTIONS (path "gs://databricks_data_example/table_linkedinads_ads.csv", header "true", inferSchema "true", delimiter ",")

## Data ETL

Now that our tables are extractiong data from GCP Google Storage, we can extract and process them.

In [0]:
import pandas as pd
from pyspark.sql.functions import lit, col, when

In [0]:
df_planned = spark.read.table("test_database.planned_data")
df_facebook = spark.read.table('test_database.facebookads_data')
df_linkedin = spark.read.table('test_database.linkedinads_data')
df_google = spark.read.table('test_database.googleads_data')


### Renaming fields

We're going to union all 4 tables into one DF, so let's make sure that their fields have the same name.

In [0]:
# Correcting column names of planned table
new_columns = [
            'publisher',
            'date',
            'campaign_name',
            'spend_planned',
            'impressions_planned',
            'clicks_planned',
            'video_views_planned',
            'video_views_100p_planned',
            'leads_planned',
            'sessions_planned',
            'goal1_planned',
            'goal2_planned',
      ]

df_planned = df_planned.toDF(*new_columns)
df_planned = df_planned.drop('sessions_planned', 'goal1_planned', 'goal2_planned')

In [0]:
# Correcting column names of facebook table
new_columns = [
            'publisher',
            'account_id',
            'date',
            'campaign_name',
            'adset_name',
            'ad_name',
            'spend',
            'impressions',
            'clicks',
            'device',
            'video_views',
            'video_views_25p',
            'video_views_50p',
            'video_views_75p',
            'video_views_100p',
            'reactions',
            'comments',
            'shares',
            'engagements',
            'leads',
            'video_views_95p',
      ]

df_facebook = df_facebook.toDF(*new_columns)


In [0]:
# Correcting column names of googleads table
new_columns = [
            'account_id',
            'date',
            'campaign_name',
            'adset_name',
            'ad_name',
            'device',
            'segments_adNetworkType',
            'publisher',
            'spend',
            'impressions',
            'clicks',
            'active_views_impressions',
            'video_views',
            'video_views_25p',
            'video_views_50p',
            'video_views_75p',
            'video_views_100p',
            'engagements',
      ]

df_google = df_google.toDF(*new_columns)
df_google = df_google.withColumn('spend', col('spend') / 1000000)
df_google = df_google.withColumn('publisher', lit('googleads'))


In [0]:
# Correcting column names of googleads table
new_columns = [
            'account_id',
            'date',
            'campaign_name',
            'spend',
            'impressions',
            'clicks',
            'video_views',
            'video_views_25p',
            'video_views_50p',
            'video_views_75p',
            'video_views_100p',
            'reactions',
            'comments',
            'shares',
      ]

df_linkedin = df_linkedin.toDF(*new_columns)
df_linkedin = df_linkedin.withColumn('publisher', lit('linkedinads'))


### Unite all platforms data

In [0]:
all_media_data = df_facebook.unionByName(df_linkedin, allowMissingColumns=True).unionByName(df_google, allowMissingColumns=True)

all_media_data = all_media_data.drop('device', 'adset_name', 'ad_name', 'device', 'video_views_25p', 'video_views_50p', 'video_views_75p', 'video_views_95p', 'reactions', 'comments', 'shares', 'engagements', 'active_views_impressions', 'segments_adNetworkType')

### Unite with planned data

In [0]:
consolidated_data = all_media_data.unionByName(df_planned, allowMissingColumns=True)

In [0]:
display(consolidated_data)

publisher,account_id,date,campaign_name,spend,impressions,clicks,video_views,video_views_100p,leads,spend_planned,impressions_planned,clicks_planned,video_views_planned,video_views_100p_planned,leads_planned
facebook,123456789101112.0,2022-01-01,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1872.0,468000.0,23400.0,28080.0,4492.8,898.56,,,,,,
facebook,123456789101112.0,2022-01-02,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1853.28,468000.0,23166.0,28360.8,4447.872,907.5456,,,,,,
facebook,123456789101112.0,2022-01-03,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1834.7472,472680.0,22934.34,28077.192,4492.35072,907.5456,,,,,,
facebook,123456789101112.0,2022-01-04,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1853.094672,477406.8,22934.34,28077.192,4447.427213,907.5456,,,,,,
facebook,123456789101112.0,2022-01-05,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1853.094672,477406.8,22704.9966,28357.96392,4402.952941,898.470144,,,,,,
facebook,123456789101112.0,2022-01-06,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1834.563725,482180.868,22704.9966,28641.54356,4358.923411,889.4854426,,,,,,
facebook,123456789101112.0,2022-01-07,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1852.909363,477359.0593,22704.9966,28641.54356,4358.923411,898.380297,,,,,,
facebook,123456789101112.0,2022-01-08,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1852.909363,482132.6499,22932.04657,28355.12812,4315.334177,889.396494,,,,,,
facebook,123456789101112.0,2022-01-09,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1852.909363,482132.6499,23161.36703,28071.57684,4272.180835,898.290459,,,,,,
facebook,123456789101112.0,2022-01-10,agency3_customer9_campaign-type_facebookads_campaign-objective_fiscal-year,1852.909363,482132.6499,22929.75336,28071.57684,4314.902644,898.290459,,,,,,


### Data cleansing

Let's make sure that our metrics have no NA in their values

In [0]:
fillna_parameters = {
          'spend' : 0,
          'impressions' : 0,
          'clicks' : 0,
          'video_views' : 0,
          'video_views_100p' : 0,
          'leads' : 0,
          'spend_planned' : 0,
          'impressions_planned' : 0,
          'clicks_planned' : 0,
          'video_views_planned' : 0,
          'video_views_100p_planned' : 0,
          'leads_planned' : 0
          }
consolidated_data = consolidated_data.fillna(fillna_parameters)

### Data Aggregation

Now that we have all metrics in order, we can aggregate them and make our Pacing visualization.

In [0]:
aggregated_data = consolidated_data.groupBy('publisher').agg({'spend': 'sum', 'spend_planned' : 'sum', 'clicks': 'sum', 'clicks_planned' : 'sum', 'video_views': 'sum', 'video_views_planned' : 'sum'})
aggregated_data = aggregated_data.withColumn('investment_difference', when(col('sum(spend_planned)') - col('sum(spend)') < 0, 0).when(col('sum(spend_planned)') - col('sum(spend)') >= 0, col('sum(spend_planned)') - col('sum(spend)')))
aggregated_data = aggregated_data.withColumn('clicks_difference', when(col('sum(spend_planned)') - col('sum(spend)') < 0, 0).when(col('sum(spend_planned)') - col('sum(spend)') >= 0, col('sum(spend_planned)') - col('sum(spend)')))
aggregated_data = aggregated_data.withColumn('video_views_difference', when(col('sum(spend_planned)') - col('sum(spend)') < 0, 0).when(col('sum(spend_planned)') - col('sum(spend)') >= 0, col('sum(spend_planned)') - col('sum(spend)')))
display(aggregated_data)

publisher,sum(spend),sum(video_views),sum(clicks_planned),sum(spend_planned),sum(video_views_planned),sum(clicks),investment_difference,clicks_difference,video_views_difference
facebook,617551.9403330008,9780169.541779991,7675297.961000004,646902.9930000012,22689993.33500001,7309807.60466,29351.05266700033,29351.05266700033,29351.05266700033
linkedinads,367907.1102572001,3358468.6046829973,75423.55099999995,374265.2539999999,3358468.596999999,71831.94895129999,6358.143742799817,6358.143742799817,6358.143742799817
googleads,1630001.4633670012,37587583.49546998,22033801.655999992,1662601.493,37587583.494000025,20984572.99752,32600.029632998863,32600.029632998863,32600.029632998863


Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks

Output can only be rendered in Databricks