This notebook re-partitions the historical SOS report data and writes it to its final destination bucket, `DH-SECURE-SOSREPORTS`.

In [None]:
!pip install pyspark==2.4.3

In [None]:
import datetime
import os
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
import socket
import random

In [None]:
from source_buckets import source_buckets

In [None]:
# Runtime configuration. The cluster URL and S3 endpoint can be overridden through the
# SPARK_CLUSTER / S3_ENDPOINT environment variables; the defaults are the values this
# notebook was written against.

# Python interpreter Spark uses for Python worker processes.
os.environ['PYSPARK_PYTHON'] = '/opt/app-root/bin/python3'
SPARK_CLUSTER = os.environ.get('SPARK_CLUSTER', 'spark://172.44.13.154:7077')
S3_ENDPOINT = os.environ.get('S3_ENDPOINT', 'https://s3.upshift.redhat.com/')
# Timestamped so that separate runs are distinguishable by application name.
SPARK_APP_NAME = f'repartition-multiple-buckets-{datetime.datetime.now().strftime("%Y-%m-%d %H:%M")}'
# Despite the name, this is the IP address resolved from the local hostname; it is
# used as spark.driver.host when building the Spark config.
HOSTNAME = socket.gethostbyname(socket.gethostname())
print('Spark Cluster: {}'.format(SPARK_CLUSTER))
print('S3 endpoint: {}'.format(S3_ENDPOINT))
print('Spark App Name: {}'.format(SPARK_APP_NAME))
print('Hostname: {}'.format(HOSTNAME))

In [None]:
def create_spark_config(spark_cluster, executor_memory='4500M', executor_cores='3', max_cores=None):
    """Build the SparkConf used to connect this notebook to a Spark cluster.

    Relies on the module-level ``HOSTNAME`` (the driver's IP address).

    Args:
        spark_cluster: Spark master URL, e.g. ``'spark://host:7077'``.
        executor_memory: Memory per executor (``spark.executor.memory``).
        executor_cores: Cores per executor (``spark.executor.cores``).
        max_cores: Upper bound on the total cores the application may take
            (``spark.cores.max``). Left unset when ``None``.

    Returns:
        The configured ``pyspark.SparkConf``.
    """
    print('Spark cluster is: {}'.format(spark_cluster))
    # Inside parentheses no backslash continuations are needed; a comment after a
    # trailing backslash is a SyntaxError.
    sc_conf = (
        pyspark.SparkConf()
        .setMaster(spark_cluster)
        # Advertise a fixed driver address and ports so executors can connect back to
        # the driver, while listening on all local interfaces.
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.cores', str(executor_cores))
        .set('spark.executor.memory', str(executor_memory))
        .set('spark.driver.memory', '4G')
        # Set to 'false' if running low on memory or facing OOM issues.
        .set('spark.sql.parquet.enableVectorizedReader', 'true')
        # Kubernetes-specific overhead factor; presumably has no effect with a
        # standalone spark:// master — confirm before relying on it.
        .set('spark.kubernetes.memoryOverheadFactor', '0.20')
    )
    if max_cores is not None:
        sc_conf.set('spark.cores.max', str(max_cores))
    return sc_conf

In [None]:
def setup_spark():
    """Create the SparkSession for this notebook and configure S3A object-storage access.

    Uses the module-level ``SPARK_CLUSTER``, ``SPARK_APP_NAME`` and ``S3_ENDPOINT``
    settings. S3 credentials are read from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.

    Raises:
        RuntimeError: If either credential environment variable is missing or empty.
            This is checked before the session is created; passing ``None`` into the
            Hadoop configuration would otherwise surface later as an opaque JVM error.
    """
    access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    missing = [
        name
        for name, value in (('AWS_ACCESS_KEY_ID', access_key), ('AWS_SECRET_ACCESS_KEY', secret_key))
        if not value
    ]
    if missing:
        raise RuntimeError('Missing S3 credentials in environment: {}'.format(', '.join(missing)))

    spark_config = create_spark_config(SPARK_CLUSTER)
    # toDebugString() lists the key=value settings; printing the object only shows its repr.
    print('spark_config is: {}'.format(spark_config.toDebugString()))
    print("Creating Spark Session at cluster: {}".format(SPARK_CLUSTER))
    spark = SparkSession.builder.appName(SPARK_APP_NAME).enableHiveSupport().config(conf=spark_config).getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    # _jsc is PySpark's (private) handle to the JVM SparkContext; it exposes the Hadoop
    # configuration consulted by the s3a:// filesystem.
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set('fs.s3a.endpoint', S3_ENDPOINT)
    # Bucket name in the URL path rather than the hostname; presumably required by this
    # non-AWS endpoint — confirm.
    hadoop_conf.set('fs.s3a.path.style.access', 'true')
    hadoop_conf.set('fs.s3a.access.key', access_key)
    hadoop_conf.set('fs.s3a.secret.key', secret_key)
    hadoop_conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    print("hadoop is configured!")
    return spark

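To change where the source tables are read from, edit the source path built by `form_path_string` (the `repartitioned_data=False` case); to change where the repartitioned tables are written, edit its destination bucket (the `repartitioned_data=True` case).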

In [None]:
def form_path_string(bucket_name, repartitioned_data=False, dest_bucket='DH-SECURE-SOSREPORTS'):
    """Build the s3a:// path of a bucket's SOS parquet table.

    The table name is derived from the bucket name by dropping its first 3 and last 4
    characters, lower-casing, and replacing '-' with '_'
    (e.g. ``'DH-SOS-CPU-PRD'`` -> ``'sos_cpu'``).

    Args:
        bucket_name: Source bucket name.
        repartitioned_data: If False, return the table path inside ``bucket_name``
            (the source). If True, return the table path inside ``dest_bucket``
            (the destination).
        dest_bucket: Bucket that holds the repartitioned data.

    Returns:
        The table path, ending in ``/``.

    Raises:
        ValueError: If ``bucket_name`` is too short to yield a table name. Without this
            check the path would point at the shared ``parquet/`` directory itself.
    """
    # NOTE(review): assumes bucket names carry a 3-character prefix and a 4-character
    # suffix (e.g. 'DH-' ... '-PRD') — confirm against source_buckets.
    table_name = bucket_name[3:-4].lower().replace('-', '_')
    print('table_name: {}'.format(table_name))
    if not table_name:
        raise ValueError('Cannot derive a table name from bucket {!r}'.format(bucket_name))
    bucket = dest_bucket if repartitioned_data else bucket_name
    return 's3a://{}/extraction/sos/parquet/{}/'.format(bucket, table_name)

In [None]:
def read_dataframe_from_bucket(bucket_name, repartitioned_data=False, compute_counts=False):
    """Load a bucket's SOS parquet table and report basic size statistics.

    Relies on the module-level ``spark`` session.

    Args:
        bucket_name: Source bucket name; it also determines the table name via
            ``form_path_string``.
        repartitioned_data: If True, read the repartitioned copy in the destination
            bucket instead of the source table.
        compute_counts: If True, compute the row count and the distinct-row count. Both
            run Spark jobs over the full table, so they are off by default.

    Returns:
        Tuple ``(df, count, distinct_count, num_partitions)``. ``count`` and
        ``distinct_count`` are 0 placeholders unless ``compute_counts`` is True.
    """
    src_path = form_path_string(bucket_name, repartitioned_data)
    print('src_path: {}'.format(src_path))
    df = spark.read.parquet(src_path)
    if compute_counts:
        count = df.count()
        distinct_count = df.distinct().count()
    else:
        # Placeholders: comparisons between these values carry no information.
        count = 0
        distinct_count = 0
    num_partitions = df.rdd.getNumPartitions()
    print('count: {}'.format(count))
    print('unique_count: {}'.format(distinct_count))
    print('partitions: {}'.format(num_partitions))
    return (df, count, distinct_count, num_partitions)

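The function below takes the source DataFrame and re-partitions it by creation date (`created_year`, `created_month`, `created_day`), so that each date partition is written as a single object (parquet file). It then saves the result at the destination. Set the write mode to `'overwrite'` or `'append'` depending on the use case; for incoming new data, use `'append'`. Note that re-running with `'append'` on data that was already written duplicates it.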

In [None]:
def save_repartitioned_dataframe(bucket_name, df, mode='append'):
    """Write ``df`` to the destination bucket partitioned by creation date, then re-read it.

    Rows are hash-partitioned on the date columns before the write. All rows for a given
    (created_year, created_month, created_day) therefore land in the same task, and each
    write produces a single parquet file per date directory.

    Args:
        bucket_name: Source bucket; its table name determines the destination path.
        df: DataFrame with ``created_year``, ``created_month`` and ``created_day`` columns.
        mode: Spark write mode. ``'append'`` (default) adds to whatever is already at
            the destination, so re-running it duplicates data. ``'overwrite'`` replaces
            the destination contents.

    Returns:
        ``(count, distinct_count, num_partitions)`` of the data re-read from the
        destination, as reported by ``read_dataframe_from_bucket``.
    """
    date_columns = ('created_year', 'created_month', 'created_day')
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    # Hash-partition by the date columns rather than into a single partition. Each date
    # directory still gets one file, but the write is spread across tasks instead of
    # pushing the whole table through one.
    (
        df.repartition(*date_columns)
        .write
        .partitionBy(*date_columns)
        .mode(mode)
        .parquet(dest_path)
    )
    print('Data repartitioning complete at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

In [None]:
# (Re)create the Spark session. On a re-run, stop the session left over from the
# previous run first; on the first run no `spark` variable exists yet.
if 'spark' in globals():
    try:
        spark.stop()
    except Exception as exc:  # best effort: a dead session must not block a new one
        print('Could not stop previous Spark session: {}'.format(exc))
spark = setup_spark()

In [None]:
def save_failed_bucket_to_file(bucket_name, error):
    """Append a failure record for ``bucket_name`` to ``<bucket_name>.txt`` in the working directory.

    Each record consists of a blank line, a ``----------<bucket_name>----------`` header,
    and the error details. The details are the formatted traceback when ``error`` is an
    exception, and ``str(error)`` otherwise. The file is opened in append mode, so
    records from repeated failures accumulate.

    Args:
        bucket_name: Name of the bucket that failed; also used as the file name stem.
        error: The exception (or message) describing the failure.
    """
    import traceback

    print(bucket_name)
    print(error)
    if isinstance(error, BaseException):
        details = ''.join(traceback.format_exception(type(error), error, error.__traceback__))
    else:
        details = '{}\n'.format(error)
    with open(bucket_name + '.txt', 'a') as file_:
        file_.write('\n')
        file_.write('----------{}----------\n'.format(bucket_name))
        file_.write(details)

In [None]:
# Repartition every source bucket into the destination bucket. A failure in one bucket
# is recorded via save_failed_bucket_to_file and the loop moves on to the next bucket.
failed_buckets = []
for bucket_number, bucket_name in enumerate(source_buckets, start=1):
    try:
        print('trying to partition bucket #{}: {}'.format(bucket_number, bucket_name))
        initial_df, initial_count, initial_distinct_count, initial_num_partitions = read_dataframe_from_bucket(bucket_name)
        final_count, final_distinct_count, final_num_partitions = save_repartitioned_dataframe(bucket_name, initial_df)
        print('partition count difference: {}'.format(final_num_partitions - initial_num_partitions))
        # NOTE(review): when read_dataframe_from_bucket reports placeholder 0 counts,
        # this check can never fire. In 'append' mode the re-read destination may also
        # contain rows from earlier runs, which makes the difference negative.
        if final_distinct_count != initial_distinct_count:
            data_lost = initial_distinct_count - final_distinct_count
            print('Data lost: {}'.format(data_lost))
            with open('{}_data_list.txt'.format(bucket_name), 'w') as file_:
                file_.write('\n')
                file_.write('{}'.format(data_lost))
    except Exception as e:
        print('Bucket failed: {}'.format(bucket_name))
        save_failed_bucket_to_file(bucket_name, e)
        failed_buckets.append(bucket_name)

if failed_buckets:
    print('{} bucket(s) failed: {}'.format(len(failed_buckets), failed_buckets))
else:
    print('All buckets processed without errors.')