### Let's load the basic Spark and Glue libraries.

In [1]:
import boto3
import botocore
import sys
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1648483062748_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Overview:
<img src="imgs/overview.png" width=1000 height=1000 />

### Let's initialize a Spark context and a Spark session for our big data processing

In [2]:
# Reuse (or create) the SparkContext and wrap it in a GlueContext so the Glue APIs are available.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# getOrCreate() returns the already-running session when one exists (the kernel reported
# "SparkSession available as 'spark'"), so the appName here is likely not applied — confirm.
spark = SparkSession.builder.appName("index_create").getOrCreate()
job = Job(glueContext)
# DYNAMIC mode only replaces the partitions present in the written data when overwriting.
# It only matters for partitioned writes; the CSV writes in this notebook do not use partitionBy.
spark.conf.set("spark.sql.sources.partitionOverwriteMode","DYNAMIC")
# NOTE(review): s3_client is not referenced by the other cells in this notebook;
# get_most_recent_s3_object creates its own client.
s3_client = boto3.client('s3')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Define some useful variables

In [6]:
# --- Notebook configuration --------------------------------------------------
# S3 bucket holding both the raw DP drops and our output. Please edit this to your own bucket.
bucket_name = 'datascience-ml-workshop-prep'

# Folder inside the bucket that the raw upserts/deletes are read from.
source = 'data_prep_component'

# Folder inside the bucket that processed/enriched data is written to.
destination = 'labeling_data_component/data_prep_output'

# Assume an incremental run; the load cell switches this to "first" when no processed data exists.
run = "incremental"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Let's download the data

Open a terminal in a new tab and download the data we will be using by running the following commands one by one:

`` sudo su``

``curl --location --remote-header-name --remote-name https://github.com/ConcurDataScience/ConcurMLWorkshop/raw/main/01_Data_Prep/data_prep_component.zip``

``unzip data_prep_component.zip -d data_prep_component``

In [7]:
# Display the command that uploads the unzipped data to S3 (copy it into the terminal).
# The S3 prefix uses `source` so the upload lands exactly where load_latest() reads from;
# the local folder name matches the `unzip -d data_prep_component` step.
f"aws s3 cp data_prep_component/ s3://{bucket_name}/{source}/ --recursive"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'aws s3 cp data_prep_component/ s3://datascience-ml-workshop-prep/data_prep_component/ --recursive'

### Below are some of the utility functions that we will be using

In [4]:

def load_latest(spark, bucket_name, source, upsert_or_delete):
    """
        Description: Finds the most recently written object under
        <source>/<upsert_or_delete>/ in the bucket. It then reads every file in that
        object's folder as a tab-separated CSV with a header row.
        Input: spark session, bucket_name, source folder, 'upserts' or 'deletes'
        Output: the loaded dataframe with the '_c0' column dropped
    """
    # Trailing "" produces the trailing slash: "<source>/<upsert_or_delete>/".
    listing_prefix = "/".join((source, upsert_or_delete, ""))
    latest_folder = get_most_recent_s3_object(bucket_name, listing_prefix)
    print("Currently Reading", latest_folder)
    # NOTE(review): '_c0' is presumably an unnamed index column in the source files — confirm.
    return (
        spark.read.csv(latest_folder, header=True, sep='\t')
        .drop('_c0')
    )


def get_most_recent_s3_object(bucket_name, prefix, s3_client=None):
    """
        Description: Finds the most recently modified object under `prefix` in
        `bucket_name`. It returns the S3 URI of the "folder" containing that object
        (its key with the last path component removed), so Spark can read every
        file written alongside it.
        Input: bucket_name, prefix, and an optional S3 client
               (a new boto3 client is created when none is given)
        Output: 's3://<bucket_name>/<folder of the latest object>'
        Raises: FileNotFoundError if no object exists under the prefix
    """
    s3 = s3_client if s3_client is not None else boto3.client('s3')
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
    # Pages with no matching keys have no "Contents" entry at all.
    all_objects = (obj for page in pages for obj in page.get('Contents', []))
    latest = max(all_objects, key=lambda obj: obj['LastModified'], default=None)
    if latest is None:
        # Previously this surfaced as an opaque "max() arg is an empty sequence".
        raise FileNotFoundError(
            "No objects found under s3://" + bucket_name + "/" + prefix)
    # Drop the file name: "a/b/part-0.csv" -> "a/b", "a/b/" -> "a/b", "file.csv" -> "".
    folder = latest['Key'].rpartition('/')[0]
    return 's3://' + bucket_name + '/' + folder


def process_incrememtal_upserts(spark, delta_upserts, processed_data):
    """
        Description: Merges a batch of upserts into the processed data. The two
        dataframes are unioned by column name, and exactly one row is kept per
        dp_unique_key: the one with the greatest updated_date. When a delta row
        and an existing row tie on updated_date, the delta row wins.
        Input: spark (unused), delta_upserts and processed_data; both must have
               the same column names (required by unionByName)
        Output: the final updated dataset, with processed_data's columns
    """
    # Tag each row with its origin so updated_date ties are broken in favour of the new batch.
    existing = processed_data.withColumn('_from_delta', F.lit(0))
    incoming = delta_upserts.withColumn('_from_delta', F.lit(1))
    combined = existing.unionByName(incoming)
    # NOTE(review): both inputs are read from CSV without inferSchema, so updated_date is
    # a string and is ordered lexicographically — confirm its format sorts chronologically.
    w = Window.partitionBy('dp_unique_key').orderBy(
        F.desc('updated_date'), F.desc('_from_delta'))
    # row_number (unlike dense_rank) gives rank 1 to exactly one row per key, so rows
    # that tie on updated_date can no longer leave duplicate keys behind.
    return (
        combined
        .withColumn('Rank', F.row_number().over(w))
        .filter(F.col('Rank') == 1)
        .drop('Rank', '_from_delta')
    )

def process_first_upserts(spark, delta_upserts):
    """
        Description: On the very first run there is no processed data to merge
        into, so the upserts batch is taken as-is.
        Input: spark (unused), delta_upserts
        Output: delta_upserts itself (the same object, unchanged)
    """
    first_batch = delta_upserts
    return first_batch
    

def process_incrememtal_deletes(spark, delta_deletes, processed_data):
    """
        Description: Removes from processed_data every row whose dp_unique_key
        appears in delta_deletes (a left anti join on dp_unique_key).
        Input: spark (unused), delta_deletes and processed_data
        Output: the filtered dataframe, or None when delta_deletes is empty
                (callers use None to skip rewriting the processed data)
    """
    # head(1) stops after finding one row, whereas count() scans the whole batch.
    if not delta_deletes.head(1):
        return None
    return processed_data.join(delta_deletes, 'dp_unique_key', 'left_anti')
    
def write_files(data, bucket_name, destination, script_type='processed_data'):
    """
        Description: Writes `data` as tab-separated CSV (with a header) to
        s3://<bucket_name>/<destination>/<script_type>, overwriting what is there.
        The data is first written to <destination>/tmp/<script_type>_tmp and
        re-read from there. This way the final overwrite does not depend on the
        lazy plan of `data`, e.g. when `data` was itself read from the output folder.
        Uses the notebook-level `spark` session.
        Input: data, bucket_name, destination and script_type (output folder name)
        Output: None
    """
    base_uri = "s3://" + bucket_name + "/" + destination
    staging_uri = base_uri + "/tmp/" + script_type + "_tmp"
    final_uri = base_uri + "/" + script_type
    csv_options = dict(header=True, sep='\t')

    data.write.mode("overwrite").csv(staging_uri, **csv_options)
    staged = spark.read.csv(staging_uri, **csv_options)
    staged.write.mode("overwrite").csv(final_uri, **csv_options)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 1: Let's load the DP data we received today (March 21) and process the upserts

In [11]:
# We first read our processed folder to see what has been processed so far.
# If there is no processed data yet, Spark raises AnalysisException
# (e.g. "Path does not exist"), which means this is the first run.
from pyspark.sql.utils import AnalysisException

run = "incremental"  # reset so re-running this cell gives the same result
try:
    processed_data = spark.read.csv("s3://"+bucket_name+ "/"+destination+"/processed_data/", header=True, sep='\t')
except AnalysisException:
    # Only a missing/unreadable processed_data location means "first run".
    # Any other failure (credentials, network, ...) must surface; otherwise a
    # "first run" would overwrite all processed data with just this batch.
    run = "first"

# Then we will load the latest upsert data
delta_upserts = load_latest(spark, bucket_name, source, 'upserts')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Currently Reading s3://datascience-ml-workshop-prep/data_prep_component/upserts/03-21-2022

In [12]:
# On a first run the upserts become the processed data as-is (process_first_upserts);
# otherwise they are merged into the existing data (process_incrememtal_upserts).
is_first_run = run == "first"
print("processing_first_run" if is_first_run else "processing_incremental_run")
final_data = (
    process_first_upserts(spark, delta_upserts)
    if is_first_run
    else process_incrememtal_upserts(spark, delta_upserts, processed_data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

processing_first_run

In [13]:
# Compare the size of the incoming batch with the size of the merged result.
upsert_batch_count = delta_upserts.count()
print("Count of 1st batch upserts:", upsert_batch_count)
print("Count After 1st batch upserts is processed:", final_data.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count of 1st batch upserts: 72004
Count After 1st batch upserts is processed: 72004

In [16]:
# Finally, persist the merged result as the new processed data.
write_files(final_data, bucket_name, destination, script_type='processed_data')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: Now let's process the deletes

In [18]:
# Load the most recent batch of deletes.
delta_deletes = load_latest(spark, bucket_name, source, upsert_or_delete='deletes')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Currently Reading s3://datascience-ml-workshop-prep/data_prep_component/deletes/03-21-2022

In [20]:
# Re-read the processed data and remove any keys supplied in the deletes batch.
processed_path = "s3://" + bucket_name + "/" + destination + "/processed_data"
processed_data = spark.read.csv(processed_path, header=True, sep='\t')
data_post_delete_processing = process_incrememtal_deletes(spark, delta_deletes, processed_data)
# None means the deletes batch was empty, so there is nothing to rewrite.
if data_post_delete_processing is not None:
    write_files(data_post_delete_processing, bucket_name, destination)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Re-read what is now stored in S3 and report the counts.
processed_data = spark.read.csv(f"s3://{bucket_name}/{destination}/processed_data", header=True, sep='\t')
delete_batch_count = delta_deletes.count()
print("Count of 1st batch Delete:", delete_batch_count)
print("Count After 1st batch upserts & Deletes are processed:", processed_data.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count of 1st batch Delete: 0
Count After 1st batch upserts & Deletes are processed: 72004

_**These are some of the rows from the Data Processed so far**_:
<img src="imgs/un_updated.png" width=1000 height=1000 />


### Now imagine it's tomorrow, March 22: we receive another batch of upserts and deletes. Let's process them.

#### But first, let's mimic the Data Platform API by running the command below in the terminal.

In the same terminal window you opened earlier, run the command below:

In [12]:
# Display the terminal command for the script that mimics the Data Platform API.
"bash run_DP_API.sh " + bucket_name

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'bash run_DP_API.sh datascience-ml-workshop-prep'

#### Now let's repeat the same steps so that the new data gets processed

_**These rows were updated in today's run**_:
<img src="imgs/after_update.png" width=1000 height=1000 />

### Let's process the upserts first

In [29]:
from pyspark.sql.utils import AnalysisException

# Determine whether processed data already exists (incremental run) or not (first run).
run = "incremental"
try:
    processed_data = spark.read.csv("s3://"+bucket_name+ "/"+destination+"/processed_data/", header=True, sep='\t')
except AnalysisException:
    # Only a missing processed_data location means "first run". Other errors must
    # surface; otherwise a first run would overwrite all processed data with this batch.
    run = "first"
delta_upserts = load_latest(spark, bucket_name, source, 'upserts')
if run == "first":
    print("processing_first_run")
    final_data = process_first_upserts(spark, delta_upserts)
else:
    print("processing_incremental_run")
    final_data = process_incrememtal_upserts(spark, delta_upserts, processed_data)

print("Count of 2nd batch upserts:", delta_upserts.count())
print("Count After 2nd batch upserts is processed:", final_data.count())
write_files(final_data, bucket_name, destination)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Currently Reading s3://datascience-ml-workshop-prep/data_prep_component/upserts/03-22-2022
processing_incremental_run
Count of 2nd batch upserts: 2690
Count After 2nd batch upserts is processed: 74686

In [28]:
# Rows merged as updates rather than inserts = (rows before + upserts in batch) - rows after.
# NOTE(review): these numbers are hard-coded from earlier printed outputs (72004 rows after
# the first batch, 2690 upserts and 74686 rows after the second), so they must be edited
# by hand if the data changes.
(72004+2690)-74686

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8

##### These were the 8 records that were updated, out of 2,690 total upserts (2,682 inserts + 8 updates)

_**These rows came in the delete batch because they were identified as malicious and containing PII**_:
<img src="imgs/to_be_del.png" width=1000 height=1000 />

### Now let's process the deletes.

In [32]:
delta_deletes = load_latest(spark, bucket_name, source, 'deletes')
processed_uri = f"s3://{bucket_name}/{destination}/processed_data"
processed_data = spark.read.csv(processed_uri, header=True, sep='\t')
data_post_delete_processing = process_incrememtal_deletes(spark, delta_deletes, processed_data)
if data_post_delete_processing is not None:
    # Only rewrite when the deletes batch actually contained rows.
    write_files(data_post_delete_processing, bucket_name, destination)
# Re-read so the reported count reflects what is now stored in S3.
processed_data = spark.read.csv(processed_uri, header=True, sep='\t')
print("Count of 2nd batch Delete:", delta_deletes.count())
print("Count After 2nd batch upserts & Deletes are processed:", processed_data.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Currently Reading s3://datascience-ml-workshop-prep/data_prep_component/deletes/03-22-2022
Count of 2nd batch Delete: 4
Count After 2nd batch upserts & Deletes are processed: 74682

##### These were the 4 deletes that were processed (74686 - 4 = 74682)

## Enrichment Script

In [41]:
import boto3
import botocore
import sys
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# --- Enrichment configuration (same values as the data-prep section) ----------
# S3 bucket holding both the raw DP drops and our output. Please edit this to your own bucket.
bucket_name = 'datascience-ml-workshop-prep'

# Folder inside the bucket that raw inputs (including id_entity_mapper.csv) are read from.
source = 'data_prep_component'

# Folder inside the bucket that processed/enriched data is written to.
destination = 'labeling_data_component/data_prep_output'

# Kept for parity with the data-prep section; the enrichment cells do not read it.
run = "incremental"

In [None]:
def write_files(data, bucket_name, destination, script_type='processed_data'):
    """
        Description: Writes `data` as tab-separated CSV (with a header) to
        s3://<bucket_name>/<destination>/<script_type>, overwriting what is there.
        The data is first written to <destination>/tmp/<script_type>_tmp and
        re-read from there. This way the final overwrite does not depend on the
        lazy plan of `data`, e.g. when `data` was itself read from the output folder.
        Uses the notebook-level `spark` session.
        NOTE(review): this redefines the write_files defined in the data-prep
        section, which has an identical body.
        Input: data, bucket_name, destination and script_type (output folder name)
        Output: None
    """
    base_uri = "s3://" + bucket_name + "/" + destination
    staging_uri = base_uri + "/tmp/" + script_type + "_tmp"
    final_uri = base_uri + "/" + script_type
    csv_options = dict(header=True, sep='\t')

    data.write.mode("overwrite").csv(staging_uri, **csv_options)
    staged = spark.read.csv(staging_uri, **csv_options)
    staged.write.mode("overwrite").csv(final_uri, **csv_options)

In [34]:
# Second enrichment input: the mapper file at the root of the source folder.
# NOTE(review): read with Spark's default comma separator, unlike the tab-separated DP
# files — confirm id_entity_mapper.csv is comma-delimited.
mapper_path = f"s3://{bucket_name}/{source}/id_entity_mapper.csv"
data_part_2 = spark.read.csv(mapper_path, header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# First enrichment input: the processed DP data (upserts and deletes applied).
processed_uri = "s3://{}/{}/processed_data".format(bucket_name, destination)
data_part_1 = spark.read.csv(processed_uri, header=True, sep='\t')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Inner join: keep only rows whose (Id, dp_unique_key) pair exists in both inputs.
# NOTE(review): in the recorded run the count goes from 74682 processed rows to 74680
# joined rows — confirm the rows without a mapping are expected to be dropped.
# '_c0' is presumably an unnamed index column in the mapper file — confirm.
joined_data = (
    data_part_1
    .join(data_part_2, on=['Id', 'dp_unique_key'], how='inner')
    .drop('_c0')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Number of rows that survived the enrichment join (displayed as the cell output).
row_count_after_join = joined_data.count()
row_count_after_join

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

74680

In [40]:
# Persist the enriched dataset under <destination>/enriched_data.
write_files(joined_data, bucket_name, destination, script_type='enriched_data')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…