In [1]:
import analytics_util as au
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from config import MOCKAROO_END_POINT, CAMPAIGN_SCHEMA_NAME, DIM_CAMPAIGN_STAGE_PATH, CAMPAIGN_DETAILS_LANDING_ZONE_PATH

In [3]:
# Application name handed to the project's Spark session helper.
SPARK_APP_NAME = "MOCKAROO_CAMPAIGN_DATA_GEN"
spark = au.create_spark_session(SPARK_APP_NAME)

## Extract data from the Mockaroo endpoint

In [4]:
# Number of campaign records requested from the Mockaroo endpoint per run.
NO_OF_MOCKAROO_RECORDS = 25
data = au.fetch_data_from_mockaroo(
    MOCKAROO_END_POINT,
    schema=CAMPAIGN_SCHEMA_NAME,
    no_of_records=NO_OF_MOCKAROO_RECORDS,
)

In [10]:
# Explicit schema for the raw Mockaroo campaign records. The metric and marketing
# columns arrive as arrays of string-keyed maps.
# course_campaign_start_date is nullable so that rows with a missing start date reach
# the later null-handling step. With nullable=False, createDataFrame's default schema
# verification (verifySchema=True) raises on such rows instead.
mockaroo_campaign_schema = StructType([
    StructField('campaign_agenda', StringType()),
    StructField('campaign_agenda_metrics',
                ArrayType(MapType(StringType(), IntegerType())),
                nullable=False),
    StructField("campaign_category", StringType()),
    StructField("course_campaign_end_date", StringType(), nullable=True),
    StructField("course_campaign_name", StringType(), nullable=True),
    StructField("course_campaign_start_date", StringType(), nullable=True),
    StructField("marketing_info",
                ArrayType(MapType(StringType(), StringType())),
                nullable=False),
])

In [11]:
# Build the raw DataFrame from the fetched records using the explicit Mockaroo schema.
# (The misspelled variable name is kept because later cells reference it.)
campaing_data_raw = spark.createDataFrame(data=data, schema=mockaroo_campaign_schema)

## Transformation

In [12]:
# Flatten the first map of each array-of-maps column into scalar columns,
# then drop the nested source columns.
agenda_metrics_first = col("campaign_agenda_metrics")[0]
marketing_info_first = col("marketing_info")[0]
flattened_columns = [
    ("campaign_agenda_sent", agenda_metrics_first, 'sent'),
    ("campaign_agenda_open", agenda_metrics_first, 'open'),
    ("campaign_agenda_click", agenda_metrics_first, 'click'),
    ("campaign_agenda_unsubscribe", agenda_metrics_first, 'unsubscribe'),
    ("digital_marketing_team", marketing_info_first, 'team'),
    ("marketing_product", marketing_info_first, "product"),
]

campaign_data = campaing_data_raw
for target_name, source_map, map_key in flattened_columns:
    campaign_data = campaign_data.withColumn(target_name, source_map.getField(map_key))
campaign_data = campaign_data.drop('campaign_agenda_metrics', 'marketing_info')


##### Handle nulls in key fields

In [14]:
# Fill nulls in key fields with fixed defaults or estimates derived from 'sent'.
# Default send volume when the metrics map has no 'sent' entry.
DEFAULT_CAMPAIGN_SENT = 100
# Campaign length (days) assumed when the end date is missing.
DEFAULT_CAMPAIGN_DURATION_DAYS = 14
# Fallback engagement counts, as a fraction of 'sent'.
ENGAGEMENT_FALLBACK_RATIOS = {
    "campaign_agenda_open": 0.6,
    "campaign_agenda_click": 0.3,
    "campaign_agenda_unsubscribe": 0.02,
}

campaign_data = campaign_data.fillna(DEFAULT_CAMPAIGN_SENT, subset='campaign_agenda_sent')

campaign_data = (
    campaign_data
    .withColumn("course_campaign_start_date",
                coalesce(col('course_campaign_start_date'), current_date()))
    # Derived from the start date filled on the previous line.
    .withColumn("course_campaign_end_date",
                coalesce(col('course_campaign_end_date'),
                         date_add(col('course_campaign_start_date'), DEFAULT_CAMPAIGN_DURATION_DAYS)))
)

# round() of a fractional product is a double. Mixing it with the int column would
# make every value in the column a double (e.g. 60.0), so cast back to int.
for metric_col, ratio in ENGAGEMENT_FALLBACK_RATIOS.items():
    estimated_metric = round(col('campaign_agenda_sent') * ratio, 0)
    campaign_data = campaign_data.withColumn(
        metric_col, coalesce(col(metric_col), estimated_metric).cast(IntegerType())
    )

# NOTE(review): this replaces every marketing_product value taken from marketing_info,
# not only nulls. Names without an "email_<word>" part get an empty string.
# Confirm the overwrite is intended.
campaign_data = campaign_data.withColumn(
    'marketing_product', regexp_extract(col('course_campaign_name'), r"email_(\w+)", 1)
)



##### Check the percentage of nulls remaining after null handling

In [15]:
# Report the remaining null percentage per column.
# All null counts are gathered in a single aggregation instead of two Spark jobs per column.
total_rows = campaign_data.count()
null_counts = campaign_data.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in campaign_data.columns]
).first().asDict()

fields_with_null_values = 0
for c in campaign_data.columns:
    nulls = null_counts[c]
    if nulls > 0:
        fields_with_null_values += 1
        # nulls > 0 implies total_rows > 0, so the division is safe.
        nulls_perc = (nulls / total_rows) * 100
        print("Field {0} has {1}% nulls".format(c, nulls_perc))

                                                                                

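##### Create campaign IDs
New IDs continue from the highest `campaign_id` already present in the campaign dimension stage data.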


In [16]:
# Expected shape of the campaign dimension stage data: only campaign_id is needed here.
# NOTE(review): this schema is NOT applied to the read below. DataFrameReader.parquet()
# silently ignores `schema=`/`header=` keyword options (header is not a Parquet option),
# so passing them had no effect. The stored type of campaign_id is unverified. Confirm
# it before enforcing this schema via spark.read.schema(...), because a type mismatch
# makes the Parquet read fail.
dim_campaign_schema = StructType([
    StructField("campaign_id", IntegerType(), nullable=True)
])
dim_campaign_data = spark.read.parquet(DIM_CAMPAIGN_STAGE_PATH + "*/*")

In [17]:
# New ids continue after the highest campaign_id already staged.
# Cast before aggregating so the max is numeric even if campaign_id is stored as a
# string (a string max is lexicographic: "9" > "10"). This also removes the unsafe
# eval(). An empty stage aggregates to NULL (None), so start numbering from 0 then.
max_campaign_id = (
    dim_campaign_data
    .agg(max(col('campaign_id').cast('long')).alias('max_id'))
    .first()[0]
)
if max_campaign_id is None:
    max_campaign_id = 0

# Consecutive ids require a single global ordering. Spark warns that an unpartitioned
# window moves all rows to one partition, which is acceptable only for small batches.
window_spec = Window.orderBy('course_campaign_name')
campaign_data = campaign_data.withColumn('campaign_id', row_number().over(window_spec) + max_campaign_id)

                                                                                

## Load

In [20]:
# Final column layout of the landing-zone CSV.
col_order = [
    'campaign_id', 'course_campaign_name', 'campaign_agenda', 'campaign_category',
    'campaign_agenda_sent', 'campaign_agenda_open', 'campaign_agenda_click',
    'campaign_agenda_unsubscribe', 'digital_marketing_team',
    'course_campaign_start_date', 'course_campaign_end_date', 'marketing_product',
]

campaign_data_prcessed = campaign_data.select(*col_order)

In [23]:
# Append the processed batch to the landing zone as CSV with a header row.
# coalesce(1) collapses the data to one partition, so each run writes a single part file.
(
    campaign_data_prcessed
    .coalesce(1)
    .write
    .format("csv")
    .mode("append")
    .option("header", True)
    .save(CAMPAIGN_DETAILS_LANDING_ZONE_PATH)
)

24/04/21 23:49:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [262]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()