In [143]:
import psycopg2 as pg2
import json
import pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import col, lit, current_date,current_timestamp,monotonically_increasing_id,desc,expr,split
from pyspark.sql.types import StringType, BooleanType, IntegerType, FloatType, DateType
from pyspark.sql import SparkSession

In [144]:
# Load the pipeline configuration before anything else: the extraction cells
# below read aws_config['inputpath'][...], so the config must already exist
# when the notebook is run top-to-bottom on a fresh kernel.
with open('aws_cofig.json') as config_file:
    aws_config = json.load(config_file)

# Single Spark session shared by every cell in this notebook.
spark = SparkSession.builder.appName('ETL Pipeline').getOrCreate()

In [None]:
# # Alternative SparkSession configured for reading from the S3 bucket (kept for reference, not executed)
# spark=SparkSession.builder\
#             .appName("ETL")\
#             .config("spark.hadoop.fs.s3a.access.key", "****************")\
#             .config("spark.hadoop.fs.s3a.secret.key", '*****************')\
#             .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
#             .config("spark.jars", 'hadoop-aws-3.3.1.jar,aws-java-sdk-bundle-1.11.901.jar','spark-postgres-jar')\
#             .getOrCreate()

# Data Extraction

<h4>These files are extracted from an AWS S3 bucket. A local input file path is also provided as a fallback in case the S3 read fails because of <b>Hadoop JAR version compatibility issues</b>, so either the S3 or the local input path can be chosen. The files are the raw CSV data files loaded to S3 from the given URL.</h4>

In [146]:
# Read the raw campaigns CSV; the header row supplies the column names and no
# schema inference is requested, so campaign_id is cast explicitly below.
campaigns_df = spark.read.option('header', True).csv(aws_config['inputpath']['local_campaigns'])

# Cast the key to bigint and attach the audit columns. Each audit column is set
# exactly once (the earlier version repeated created_by / modified_timestamp).
campaigns_df1 = (
    campaigns_df
    .withColumn("created_timestamp", current_timestamp())
    .withColumn("campaign_id", col('campaign_id').cast('bigint'))
    .withColumn("created_by", lit('abspark'))
    .withColumn("modified_timestamp", current_timestamp())
)
campaigns_df1.show()
campaigns_df1.printSchema()


+-----------+---------------+-------+--------------------+----------+--------------------+
|campaign_id|structure_value| status|   created_timestamp|created_by|  modified_timestamp|
+-----------+---------------+-------+--------------------+----------+--------------------+
| 1578451881|          venum|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451584|        ellesse|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451386|       converse|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578412457|         wilson|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 9872103720|         wham-o|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 9872103720|         wham-o|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451386|       converse|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451623|       spalding|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|

In [147]:
# Read the raw adgroups CSV; the header row supplies the column names.
adgroups_df = spark.read.option('header', True).csv(aws_config['inputpath']['local_adgroups'])

# Cast both id columns to bigint and attach the audit columns. Each audit column
# is set exactly once (the earlier version repeated created_by / modified_timestamp).
adgroups_df1 = (
    adgroups_df
    .withColumn("created_timestamp", current_timestamp())
    .withColumn("created_by", lit('abspark'))
    .withColumn("ad_group_id", col('ad_group_id').cast('bigint'))
    .withColumn("campaign_id", col('campaign_id').cast('bigint'))
    .withColumn("modified_timestamp", current_timestamp())
)
adgroups_df1.show()
adgroups_df1.printSchema()

+------------+-----------+--------------------+-------+--------------------+----------+--------------------+
| ad_group_id|campaign_id|               alias| status|   created_timestamp|created_by|  modified_timestamp|
+------------+-----------+--------------------+-------+--------------------+----------+--------------------+
| 66372665454| 1578451881|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 84481260174| 1578451584|Shift - Shopping ...|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 59624654596| 1578451386|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 59977520149| 1578412457|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
|102171970298| 9872103720|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
|102171970298| 9872103720|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 58309319903| 1578

In [148]:
# Read the raw search-terms CSV; the header row supplies the column names.
search_terms_df = spark.read.option('header', True).csv(aws_config['inputpath']['local_search_terms'])

# Add a generated surrogate key (search_terms_id), cast ids/metrics to their
# target types and attach the audit columns. modified_timestamp is set once
# (the earlier version assigned it twice with the same expression).
search_terms_df1 = (
    search_terms_df
    .withColumn("created_timestamp", current_timestamp())
    .withColumn("search_terms_id", monotonically_increasing_id())
    .withColumn("ad_group_id", col('ad_group_id').cast('bigint'))
    .withColumn("campaign_id", col('campaign_id').cast('bigint'))
    .withColumn('date', col('date').cast('date'))
    .withColumn("clicks", col('clicks').cast('int'))
    .withColumn("cost", col('cost').cast('double'))
    .withColumn("conversion_value", col('conversion_value').cast('double'))
    .withColumn("conversions", col('conversions').cast('int'))
    .withColumn("modified_timestamp", current_timestamp())
    .withColumn("created_by", lit('abspark'))
)
# search_terms_df1.show()
search_terms_df1.printSchema()

root
 |-- date: date (nullable = true)
 |-- ad_group_id: long (nullable = true)
 |-- campaign_id: long (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- conversion_value: double (nullable = true)
 |-- conversions: integer (nullable = true)
 |-- search_term: string (nullable = true)
 |-- created_timestamp: timestamp (nullable = false)
 |-- search_terms_id: long (nullable = false)
 |-- modified_timestamp: timestamp (nullable = false)
 |-- created_by: string (nullable = false)



## Data cleaning (Removing Duplicates) for loading data into STORE data  schema

###############################################################
### "DISTINCT data of Campaigns"
###############################################################

In [149]:
print('The DUPLICATE and DISTINCT data of Campaigns')
# One row per distinct campaign record.
campaigns_unique = campaigns_df1.dropDuplicates()
# exceptAll keeps multiplicity, so this leaves exactly the surplus copies
# (every occurrence beyond the first) of each duplicated row.
campaigns_duplicate = campaigns_df1.exceptAll(campaigns_unique)
campaigns_unique.show()
# The labels now name the Campaigns frame (they previously said "Search Terms DF").
print(f"Total number of ACTUAL rows in Campaigns DF: {campaigns_df1.count()}")
print(f"Total number of DUPLICATE rows in Campaigns DF: {campaigns_duplicate.count()}")
print(f"Total number of UNIQUE rows in Campaigns DF: {campaigns_unique.count()}")

The DUPLICATE and DISTINCT data of Campaigns
+-----------+---------------+-------+--------------------+----------+--------------------+
|campaign_id|structure_value| status|   created_timestamp|created_by|  modified_timestamp|
+-----------+---------------+-------+--------------------+----------+--------------------+
| 9903945998|           nerf|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 8677993568|      smellwell|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1585315601|        arsenal|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451362|           beco|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578451356|       converse|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578411749|     cartasport|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 9564222913|  england rugby|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 1578629923|    fitness mad|ENABLED|2022-02-

###############################################################
### "DISTINCT data of Adgroups"
###############################################################

In [150]:
print('The DUPLICATE and DISTINCT data of Adgroups')
# One row per distinct adgroup record.
adgroups_unique = adgroups_df1.dropDuplicates()
# exceptAll keeps multiplicity, so this leaves exactly the surplus copies
# (every occurrence beyond the first) of each duplicated row.
adgroups_duplicate = adgroups_df1.exceptAll(adgroups_unique)
adgroups_unique.show()

# The labels now name the Adgroups frame (they previously said "Search Terms DF").
print(f"Total number of ACTUAL rows in Adgroups DF: {adgroups_df1.count()}")
print(f"Total number of DUPLICATE rows in Adgroups DF: {adgroups_duplicate.count()}")
print(f"Total number of UNIQUE rows in Adgroups DF: {adgroups_unique.count()}")

The DUPLICATE and DISTINCT data of Adgroups
+------------+-----------+--------------------+-------+--------------------+----------+--------------------+
| ad_group_id|campaign_id|               alias| status|   created_timestamp|created_by|  modified_timestamp|
+------------+-----------+--------------------+-------+--------------------+----------+--------------------+
| 61108919258| 1578412214|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
|124352384626| 1578630361|Shift - Shopping ...|ENABLED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
|103621538761| 1578411800|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 64246021402| 1578630391|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 60221413330| 1578630223|Shift - Shopping ...|REMOVED|2022-02-03 00:48:...|   abspark|2022-02-03 00:48:...|
| 79415425560| 1578630238|Shift - Shopping ...|ENABLED|2022-02-03 00:48:...|   abspa

###############################################################
### "DISTINCT data of Search Terms"
###############################################################

In [151]:
print('The duplicate and distinct data of Search Terms')
# search_terms_id comes from monotonically_increasing_id(), which is unique per
# row, so a dropDuplicates() over *all* columns can never remove anything.
# Deduplicate on every column except the generated surrogate key instead.
dedupe_columns = [name for name in search_terms_df1.columns if name != 'search_terms_id']
search_terms_unique = search_terms_df1.dropDuplicates(dedupe_columns)
# Rows of the input that were not kept as the representative of their group,
# i.e. the surplus copies of each duplicated record.
search_terms_duplicate = search_terms_df1.exceptAll(search_terms_unique)
search_terms_unique.show()
print(f"Total number of ACTUAL row's in Search Terms DF: {search_terms_df1.count()}")
print(f"Total number of DUPLICATE row's in Search Terms DF: {search_terms_duplicate.count()}")
print(f"Total number of UNIQUE row's in Search Terms DF: {search_terms_unique.count()}")

The duplicate and distinct data of Search Terms
+----------+------------+-----------+------+----+----------------+-----------+--------------------+--------------------+---------------+--------------------+----------+
|      date| ad_group_id|campaign_id|clicks|cost|conversion_value|conversions|         search_term|   created_timestamp|search_terms_id|  modified_timestamp|created_by|
+----------+------------+-----------+------+----+----------------+-----------+--------------------+--------------------+---------------+--------------------+----------+
|2019-01-31| 65082925368| 1578630361|     1|0.01|             0.0|          0|puma archive t7 t...|2022-02-03 00:48:...|            129|2022-02-03 00:48:...|   abspark|
|2018-10-25| 59410043037| 1578411797|     1|0.06|             0.0|          0|nike sunray prote...|2022-02-03 00:48:...|            797|2022-02-03 00:48:...|   abspark|
|2021-10-28|112825649726| 1645615867|     1|0.25|             0.0|          0|     berghaus jacket|2022-02-

In [152]:
# Print the schema of each de-duplicated frame, in the order campaigns, adgroups, search terms.
for deduplicated_frame in (campaigns_unique, adgroups_unique, search_terms_unique):
    deduplicated_frame.printSchema()

root
 |-- campaign_id: long (nullable = true)
 |-- structure_value: string (nullable = true)
 |-- status: string (nullable = true)
 |-- created_timestamp: timestamp (nullable = false)
 |-- created_by: string (nullable = false)
 |-- modified_timestamp: timestamp (nullable = false)

root
 |-- ad_group_id: long (nullable = true)
 |-- campaign_id: long (nullable = true)
 |-- alias: string (nullable = true)
 |-- status: string (nullable = true)
 |-- created_timestamp: timestamp (nullable = false)
 |-- created_by: string (nullable = false)
 |-- modified_timestamp: timestamp (nullable = false)

root
 |-- date: date (nullable = true)
 |-- ad_group_id: long (nullable = true)
 |-- campaign_id: long (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- conversion_value: double (nullable = true)
 |-- conversions: integer (nullable = true)
 |-- search_term: string (nullable = true)
 |-- created_timestamp: timestamp (nullable = false)
 |-- search_terms_id:

## Transformations for REPORT schema data

<h4>These transformed dataframes will be loaded into the report data schema to meet the business requirement</h4>

In [128]:
# The alias is a " - "-separated label; split it once and reuse the result.
alias_parts = split(col("alias"), " - ")

# Build the report-layer adgroups frame: refresh the audit columns (each set
# once), derive country (3rd alias segment) and priority (5th alias segment),
# then drop the raw alias. ad_group_id is already bigint in adgroups_unique, so
# no re-cast is needed.
adgroups_split = (
    adgroups_unique
    .withColumn("created_timestamp", current_timestamp())
    .withColumn("created_by", lit('abspark'))
    .withColumn("country", alias_parts.getItem(2))
    .withColumn("priority", alias_parts.getItem(4))
    .withColumn("modified_timestamp", current_timestamp())
    .drop('alias')
)
adgroups_split.printSchema()

root
 |-- ad_group_id: long (nullable = true)
 |-- campaign_id: long (nullable = true)
 |-- status: string (nullable = true)
 |-- created_timestamp: timestamp (nullable = false)
 |-- created_by: string (nullable = false)
 |-- modified_timestamp: timestamp (nullable = false)
 |-- country: string (nullable = true)
 |-- priority: string (nullable = true)



### The "final_agg" data frame is created according to the task brief .md file in the git repo, where it is the end requirement for this challenge

In [182]:
# Join each adgroup to its search-term rows and keep only rows that produced
# conversion value. The filter is applied BEFORE the projection because
# conversion_value is not part of the selected columns.
adgroup_search_terms = adgroups_split.join(
    search_terms_unique,
    adgroups_split.ad_group_id == search_terms_unique.ad_group_id,
    'inner',
).filter(search_terms_unique.conversion_value > 0)

# ROAS = conversion value / cost. A zero cost yields NULL instead of relying on
# the session's divide-by-zero behaviour.
roas = f.when(
    search_terms_unique.cost != 0,
    search_terms_unique.conversion_value / search_terms_unique.cost,
).alias('roas')

# NOTE: the variable name (sic) is kept because later cells reference it.
fianl_agg_df = adgroup_search_terms.select(
    adgroups_split.ad_group_id,
    adgroups_split.campaign_id,
    adgroups_split.country,
    adgroups_split.priority,
    roas,
    search_terms_unique.date,
    adgroups_split.status,
    search_terms_unique.search_term,
).orderBy(adgroups_split.ad_group_id)
print("Note: 'roas = return on add spent'")

Note: 'roas = return on add spent'


In [226]:
# Number of rows in the report frame (runs the join/filter pipeline).
fianl_agg_df.count()

4918

In [177]:
# Preview the report frame (show() prints the first rows only).
fianl_agg_df.show()

+-----------+-----------+-------+--------+------------------+----------+-------+--------------------+
|ad_group_id|campaign_id|country|priority|              roas|      date| status|         search_term|
+-----------+-----------+-------+--------+------------------+----------+-------+--------------------+
|55067386730| 1578630361|     GB|    HIGH|229.79999999999998|2019-01-27|REMOVED|best green footba...|
|55067402050| 1578412016|     GB|    HIGH| 832.6666666666667|2019-04-16|ENABLED|rose gold kicker ...|
|55067982370| 1578630361|     GB|    HIGH|474.74999999999994|2019-02-18|REMOVED|   dortmund home top|
|55068003730| 1578411800|     GB|    HIGH|            1598.0|2020-05-21|REMOVED|kids nike sweat s...|
|55068004690| 1578411800|     GB|    HIGH|            1298.0|2019-03-30|REMOVED|  nike jersey shorts|
|55070131090| 1578629881|     GB|     LOW|266.33333333333337|2018-11-18|REMOVED|   adidas crib shoes|
|55070262370| 1578451158|     GB|  MEDIUM|30.965517241379313|2019-04-25|REMOVED|ba

In [167]:
# Number of de-duplicated adgroup rows feeding the report join.
adgroups_split.count()

19271

# Data Loading into PostgreSQL 

<h2> Create three levels of data loading schemas</h2>
<ul>
    <li>Stage Data schema</li>
    <li>Store Data schema</li>
    <li>Report Data schema</li>
</ul>

In [215]:
# Load the pipeline configuration (input paths and PostgreSQL connection
# settings). A context manager closes the file handle deterministically.
with open('aws_cofig.json') as config_file:
    aws_config = json.load(config_file)

In [162]:
# Open a psycopg2 connection to PostgreSQL using the settings stored under
# aws_config['connection'].
pg_settings = aws_config['connection']
conn = pg2.connect(
    user=pg_settings['user'],
    password=pg_settings['password'],
    host=pg_settings['host'],
    port=pg_settings['port'],
    database=pg_settings['db'],
)

In [163]:
# Autocommit: each statement executed through this connection takes effect
# immediately, without an explicit conn.commit().
conn.autocommit = True
# Cursor used by the CREATE SCHEMA cells below.
cur = conn.cursor()

<h3>Creating  stage data schema</h3>

In [164]:
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises an
# error once the schema already exists.
cur.execute(""" CREATE SCHEMA IF NOT EXISTS landingdataS3""")

In [196]:
# JDBC driver settings for loading Spark DataFrames into PostgreSQL

In [216]:
# Location of the PostgreSQL JDBC jar. The jar must be on Spark's classpath
# (e.g. via the spark.jars setting when the session is created).
# NOTE(review): machine-specific absolute path — consider moving it to the config file.
driver_jar = r"C:\Users\Abhishikth\OneDrive\Desktop\Data Engineering\ProjectBidnamic\postgresql-42.3.2.jar"

# The JDBC "driver" option expects the driver's class name, not the jar path.
driver = "org.postgresql.Driver"
url = aws_config['connection']['url']
user = aws_config['connection']['user']
# Name kept as-is (sic) so existing references continue to resolve.
pasword = aws_config['connection']['password']

In [218]:
table1 = "landingdataS3.campaigns"
table2 = "landingdataS3.adgroups"
table3 = "landingdataS3.search_terms"

# Stage layer: load the typed-but-not-deduplicated frames as they are.
# The save mode is set with .mode(...); "mode" passed through .option(...) is
# not a save mode, which left the writer on its default (fail if the table
# exists). The credentials read from the config are passed explicitly.
for stage_frame, stage_table in (
    (campaigns_df1, table1),
    (adgroups_df1, table2),
    (search_terms_df1, table3),
):
    (
        stage_frame.write.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("user", user)
        .option("password", pasword)
        .option("dbtable", stage_table)
        .mode("append")
        .save()
    )

## Creating schema for store data
<h4>At this schema level duplicates are removed and data types are assigned accordingly</h4>

In [221]:
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises an
# error once the schema already exists.
cur.execute(""" CREATE SCHEMA IF NOT EXISTS storedata""")

In [220]:
table4 = "storedata.campaigns"
table5 = "storedata.adgroups"
table6 = "storedata.search_terms"

# Store layer: load the de-duplicated frames.
# The save mode is set with .mode(...); "mode" passed through .option(...) is
# not a save mode, which left the writer on its default (fail if the table
# exists). The credentials read from the config are passed explicitly.
for store_frame, store_table in (
    (campaigns_unique, table4),
    (adgroups_unique, table5),
    (search_terms_unique, table6),
):
    (
        store_frame.write.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("user", user)
        .option("password", pasword)
        .option("dbtable", store_table)
        .mode("append")
        .save()
    )

## Creating schema for Report Data
<h4>This schema holds the transformed, business-ready data table. The table is created to expose ROAS (return on ad spend) so that it can be aggregated by country and/or by priority, and to enable flexible querying of the table according to the business requirement</h4>

In [222]:
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises an
# error once the schema already exists.
cur.execute(""" CREATE SCHEMA IF NOT EXISTS reportdata""")

In [227]:
table7 = "reportdata.reporting"

# Report layer: replace the reporting table with the freshly computed frame.
# The save mode is set with .mode("overwrite"); "mode" passed through
# .option(...) is not a save mode, so the intended overwrite never happened.
# The credentials read from the config are passed explicitly.
(
    fianl_agg_df.write.format("jdbc")
    .option("driver", driver)
    .option("url", url)
    .option("user", user)
    .option("password", pasword)
    .option("dbtable", table7)
    .mode("overwrite")
    .save()
)