In [1]:
# Trivial cell: presumably executed only so the sparkmagic/Livy kernel starts
# the Spark session (the output below reports "SparkSession available as 'spark'").
pass

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1606858440744_0005,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [5]:
import logging

import os
import sys
import json
from pyspark.sql import SparkSession

import boto3
from datetime import datetime
from urllib.parse import urlparse

# https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns
from pyspark.sql.functions import split
from pyspark.sql.functions import collect_set, collect_list # collect_set eliminates duplicates, collect_list does not


# Module-level logger used by CookieMapUpdate.
# NOTE(review): no handler is attached to LOG (both handlers below are commented
# out). Unless the root logger is configured by the environment, Python falls
# back to logging.lastResort, which only emits WARNING and above, so the
# LOG.info(...) messages in this notebook are probably not shown — confirm on
# EMR/Livy. If a StreamHandler is re-enabled, note that it binds sys.stderr at
# construction time, which may not be the stream the kernel displays later.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)

# TODO: log file does not get saved due to permission issues on EMR
# formatter_file = logging.Formatter('%(filename)s:%(name)s:%(asctime)s:%(levelname)s:%(message)s')
# file_handler = logging.FileHandler('ai_audience.log')
# file_handler.setLevel(logging.INFO)
# file_handler.setFormatter(formatter_file)

# formatter_stream = logging.Formatter('%(levelname)s:%(message)s')
# stream_handler = logging.StreamHandler()
# stream_handler.setLevel(logging.INFO)
# stream_handler.setFormatter(formatter_stream)

# LOG.addHandler(file_handler)
# LOG.addHandler(stream_handler)

# Default root prefix under which CookieMapUpdate writes all of its outputs.
# Must end with '/': sub-paths are appended to it by string concatenation.
PATH_ROOT = 's3://zeta-dc-ml/ai-audiences/'
# Default source: ORC export partitioned as dt=YYYY-MM-DD that CookieMapUpdate reads.
PATH_COOKIE_COOKIES_RELATIONS = 's3://zeta-dcp-prod-private-tables/datacloud_cookie_cookies_relations_links_export/'


class CookieMapUpdate:
    """Build the cookie -> cookie mapping tables used by the AI-audiences pipeline.

    ``run_step`` runs three stages:

    1. ``prep_cookie_cookie_map_table`` reads the newest readable ``dt=YYYY-MM-DD``
       partition of the ORC relations export, splits its ``cookie`` and
       ``relation`` columns on ``'::'`` into ``source``/``cookie_source`` and
       ``destination``/``cookie_destination``, and writes the result as parquet
       to ``<path_root>datacloud_cookie_cookie_relations/cookie_cookie_staging/dt=<date>``.
    2. ``prep_disqus_sizmek_map_table`` and
    3. ``prep_zync_sizmek_map_table`` read the newest staging partition, keep the
       ``disqus -> sizmek`` / ``zync -> sizmek`` relations, and write one row per
       source cookie together with the set of related sizmek cookies.

    NOTE: all Spark I/O goes through the notebook's global ``spark`` session
    (provided by the kernel); it is not injected.
    """

    def __init__(self,
                 path_root=PATH_ROOT,
                 path_cookie_cookies_relations=PATH_COOKIE_COOKIES_RELATIONS,
                 ):
        """Derive all input/output locations; performs no I/O.

        Args:
            path_root: S3 prefix for all outputs. Must end with '/' because the
                sub-paths below are appended by plain string concatenation.
            path_cookie_cookies_relations: S3 prefix of the ``dt=YYYY-MM-DD``
                partitioned ORC export to read.
        """
        self.path_root = path_root
        self.path_cookie_relations_dest_root = self.path_root + 'datacloud_cookie_cookie_relations/'
        self.path_cookie_cookie_staging = self.path_cookie_relations_dest_root + 'cookie_cookie_staging/'
        self.path_disqus_to_sizmek = self.path_cookie_relations_dest_root + 'disqus_to_sizmek/'
        self.path_zync_to_sizmek = self.path_cookie_relations_dest_root + 'zync_to_sizmek/'

        self.path_cookie_cookies_relations = path_cookie_cookies_relations

        # Ascending datetimes of the source partitions; filled by
        # get_list_dates_that_can_be_processed().
        self.list_of_all_dates_that_can_be_processed = None
        # 'YYYY-MM-DD' of the source partition currently selected.
        self.date_processed_latest = None
        # Number of partitions to step back from the newest one.
        self.date_processed_latest_counter = 0

    @staticmethod
    def _split_s3_path(path):
        """Split 's3://bucket/a/b/' into ('bucket', 'a/b/') for boto3 calls."""
        parsed_url = urlparse(path, allow_fragments=False)
        return parsed_url.netloc, parsed_url.path.lstrip('/')

    @staticmethod
    def get_list_of_all_dates_present(bucket, prefix):
        """Return the dates of the ``<key>=YYYY-MM-DD/`` partitions directly under a prefix.

        Lists the immediate "sub-directories" of ``s3://bucket/prefix`` (S3 common
        prefixes with ``Delimiter='/'``), takes the text after the last ``'='`` of
        each one and parses it with ``'%Y-%m-%d'``. Partitions whose value does not
        parse are logged and skipped.

        Args:
            bucket: S3 bucket name.
            prefix: key prefix, expected to end with '/'.

        Returns:
            list[datetime]: parsed partition dates in ascending order (may be empty).
        """
        s3 = boto3.client("s3")
        # A single list_objects call returns at most 1000 entries, so paginate to
        # avoid silently dropping partitions. A page with no sub-prefixes has no
        # 'CommonPrefixes' key at all.
        paginator = s3.get_paginator('list_objects')
        list_date_buckets = [
            common_prefix.get('Prefix')
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/')
            for common_prefix in page.get('CommonPrefixes', [])
        ]

        LOG.info(f'number of buckets in s3://{bucket}/{prefix} are {len(list_date_buckets)}')

        list_dates_that_can_be_processed = []
        count = 0
        for el in list_date_buckets:
            # 'a/b/dt=2020-11-17/' -> 'dt=2020-11-17' -> '2020-11-17'
            date_str = el.split('/')[-2].split('=')[-1]
            try:
                list_dates_that_can_be_processed.append(datetime.strptime(date_str, '%Y-%m-%d'))
            except ValueError:
                count += 1
                LOG.warning(f'date partition that cannot be processed: {date_str}')

        LOG.info(f'number of buckets that cannot be processed in s3://{bucket}/{prefix} are {count}')
        LOG.info(f'number of buckets that can be processed in s3://{bucket}/{prefix} are {len(list_dates_that_can_be_processed)}')

        list_dates_that_can_be_processed.sort()

        return list_dates_that_can_be_processed

    def get_list_dates_that_can_be_processed(self):
        """Populate ``list_of_all_dates_that_can_be_processed`` from the source export prefix."""
        bucket, rest_of_path = self._split_s3_path(self.path_cookie_cookies_relations)
        self.list_of_all_dates_that_can_be_processed = self.get_list_of_all_dates_present(bucket=bucket,
                                                                                          prefix=rest_of_path)

    def get_date_processed_latest(self):
        """Select the partition ``date_processed_latest_counter`` steps back from the newest one.

        Sets ``date_processed_latest`` to its 'YYYY-MM-DD' string.

        Raises:
            IndexError: when the counter is past the oldest available partition
                (including when no partitions exist).
        """
        self.date_processed_latest = self.list_of_all_dates_that_can_be_processed[-1 - self.date_processed_latest_counter].strftime('%Y-%m-%d')
        LOG.info(f'Latest Date to process is {self.date_processed_latest}')

    def prep_cookie_cookie_map_table(self, num_read_attempts=10):
        """Stage the newest readable partition of the raw cookie-cookie relations export.

        Tries the newest ``dt=`` partition first and walks back one partition per
        failed read, trying at most ``num_read_attempts`` partitions. The chosen
        partition's ``cookie``/``relation`` columns are split on ``'::'``, and the
        result is written as parquet to ``cookie_cookie_staging/dt=<date>``.

        Args:
            num_read_attempts: maximum number of partitions to try (default 10).

        Returns:
            None. When no partition can be read, an error is logged and nothing is written.
        """
        self.get_list_dates_that_can_be_processed()
        # Always start from the newest partition, even if an earlier call in this
        # session had to walk back.
        self.date_processed_latest_counter = 0

        df = None
        for _ in range(num_read_attempts):
            try:
                self.get_date_processed_latest()
            except IndexError:
                # Fewer partitions exist than read attempts; nothing older to try.
                break
            try:
                df = (spark.read.option("inferSchema", True)
                      .orc(os.path.join(self.path_cookie_cookies_relations,
                                        f'dt={self.date_processed_latest}',
                                        '*'
                                        )
                           )
                      )
                LOG.info(f'Source read successful for date: {self.date_processed_latest}')
                break
            except Exception:
                # TODO: Alert AI-ML team as this could mean a change in table/schema
                LOG.warning(f'Source read failed for date: {self.date_processed_latest}', exc_info=True)
                self.date_processed_latest_counter += 1

        # Test the result itself rather than the retry counter, so that every way
        # of not reading anything ends here instead of at an unbound `df`.
        if df is None:
            LOG.error('Cookie - Cookie Mapping table has issues')
            return None

        split_col = split(df['cookie'], '::')
        df = df.withColumn('source', split_col.getItem(0))
        df = df.withColumn('cookie_source', split_col.getItem(1))

        split_col = split(df['relation'], '::')
        df = df.withColumn('destination', split_col.getItem(0))
        df = df.withColumn('cookie_destination', split_col.getItem(1))

        df = df.select(df['source'], df['cookie_source'], df['destination'], df['cookie_destination'], df['last_updated'])

        try:
            df.write.parquet(os.path.join(self.path_cookie_cookie_staging,
                                          f'dt={self.date_processed_latest}'
                                          )
                             )
            LOG.info(f'processed cookie_cookie_mappings with latest date processed: {self.date_processed_latest}')
        except Exception:
            # The default save mode fails when the target already exists (the
            # expected case on a re-run); log the actual error so that other
            # failures are not hidden behind this message.
            LOG.warning(f'data of cookie_cookie_mappings for date {self.date_processed_latest} might already exist',
                        exc_info=True)

    def _latest_staging_date(self):
        """Return the newest ``dt=`` partition under the staging path as 'YYYY-MM-DD', or None if there is none."""
        bucket, rest_of_path = self._split_s3_path(self.path_cookie_cookie_staging)
        temp_list_of_dates = self.get_list_of_all_dates_present(bucket=bucket,
                                                                prefix=rest_of_path)
        if not temp_list_of_dates:
            LOG.error(f'no date partitions found under {self.path_cookie_cookie_staging}')
            return None
        return temp_list_of_dates[-1].strftime('%Y-%m-%d')

    def _prep_source_to_sizmek_map_table(self, source, path_dest):
        """Write the ``<source> -> sizmek`` cookie map for the newest staging partition.

        Output columns: ``cookie_<source>`` (one row per distinct source cookie) and
        ``cookie_sizmek`` (the set of related sizmek cookies, built with
        ``collect_set``). The output goes to ``<path_dest>dt=<staging date>``.

        Args:
            source: value of the staging ``source`` column to keep, e.g. 'disqus'.
            path_dest: S3 prefix of the output table.
        """
        date_processed_latest_ = self._latest_staging_date()
        if date_processed_latest_ is None:
            return None

        df = (spark.read.option("inferSchema", True)
              .parquet(os.path.join(self.path_cookie_cookie_staging,
                                    f'dt={date_processed_latest_}',
                                    '*.parquet')
                       )
              )

        source_cookie_col = f'cookie_{source}'
        df_map = (df.filter((df.source == source) & (df.destination == 'sizmek'))
                  .selectExpr(f'cookie_source as {source_cookie_col}',
                              'cookie_destination as cookie_dest')
                  .drop_duplicates()
                  .groupBy(source_cookie_col)
                  .agg(collect_set('cookie_dest').alias('cookie_sizmek'))
                  )

        try:
            df_map.write.parquet(os.path.join(path_dest,
                                              f'dt={date_processed_latest_}'
                                              )
                                 )
            LOG.info(f'processed {source}_to_sizmek_cookie_mappings with latest date processed: {date_processed_latest_}')
        except Exception:
            # See prep_cookie_cookie_map_table: an existing target is the expected
            # failure, but the real cause is logged so that other errors stay visible.
            LOG.warning(f'data of {source}_to_sizmek for date {date_processed_latest_} might already exist',
                        exc_info=True)

    def prep_disqus_sizmek_map_table(self):
        """Write the disqus -> sizmek cookie map (columns cookie_disqus, cookie_sizmek)."""
        self._prep_source_to_sizmek_map_table('disqus', self.path_disqus_to_sizmek)

    def prep_zync_sizmek_map_table(self):
        """Write the zync -> sizmek cookie map (columns cookie_zync, cookie_sizmek)."""
        self._prep_source_to_sizmek_map_table('zync', self.path_zync_to_sizmek)

    def run_step(self):
        """Run the staging stage, then build both sizmek maps from the newest staging partition.

        A failure in the staging stage is logged and does not stop the map stages;
        those then use whatever staging partition is newest.
        """
        try:
            self.prep_cookie_cookie_map_table()
        except Exception:
            # TODO: Alert AI-ML team about a potential change in table/schema
            LOG.warning('difficulty reading cookie cookie mapping table', exc_info=True)
        self.prep_disqus_sizmek_map_table()
        self.prep_zync_sizmek_map_table()
        LOG.info('finished step CookieMapUpdate')

In [6]:
# Configuration for step z12 (cookie map update). The keys are exactly the
# keyword arguments of CookieMapUpdate, so the dict is passed through as-is.
path_root = 's3://zeta-dc-ml/ai-audiences/'

config_step_z12 = dict(
    path_root=path_root,
    path_cookie_cookies_relations='s3://zeta-dcp-prod-private-tables/datacloud_cookie_cookies_relations_links_export/',
)

step_cookie_map_update = CookieMapUpdate(**config_step_z12)


In [7]:
# Builds the disqus -> sizmek map from the newest partition already present in
# cookie_cookie_staging. prep_cookie_cookie_map_table() is not run in this
# notebook, so the input is whatever an earlier run staged.
step_cookie_map_update.prep_disqus_sizmek_map_table()

In [8]:
# Same as the disqus cell, for zync -> sizmek, reading the newest staging partition.
step_cookie_map_update.prep_zync_sizmek_map_table()

In [3]:
# Per-source user-item interaction tables of the dt=2020-11-17 modeling run.
path_sizmek, path_zync, path_disqus = (
    f's3://zeta-dc-ml/ai-audiences/modeling/dt=2020-11-17/{table}/'
    for table in ('z21_user_item_interactions_sizmek',
                  'z22_user_item_interactions_zync',
                  'z23_user_item_interactions_disqus')
)

In [4]:
# Load the sizmek user-item interactions.
# NOTE(review): inferSchema is documented as a CSV option; parquet takes its
# schema from the file metadata, so it likely has no effect here.
df = (spark.read
      .option("inferSchema", True)
      .parquet(f'{path_sizmek}*.parquet'))

In [5]:
# Inspect the schema of the sizmek interactions (user_id, zcodes, count per the output below).
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- zcodes: string (nullable = true)
 |-- count: long (nullable = true)

In [6]:
# Load the zync user-item interactions and compare their schema with sizmek's.
df2 = (spark.read
       .option("inferSchema", True)
       .parquet(f'{path_zync}*.parquet'))
df2.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- zcodes: string (nullable = true)
 |-- count: long (nullable = true)

In [7]:
# Load the disqus user-item interactions and compare their schema with sizmek's.
df3 = (spark.read
       .option("inferSchema", True)
       .parquet(f'{path_disqus}*.parquet'))
df3.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- zcodes: string (nullable = true)
 |-- count: long (nullable = true)