# Manual Live BusObservatory Data Lake Compaction

In [None]:
# Top-level data-lake folder (S3 prefix) of the feed to compact; swap in the
# commented alternative to compact a different system instead.
system_id: str = "TEST_tfnsw_bus"
# system_id: str = "TEST_njtransit_bus"

In [None]:
import os

# Select the named AWS profile for credential lookups made from this process.
# NOTE(review): presumably also what the ProfileCredentialsProvider configured
# for S3A relies on, since the Spark JVM is launched after this cell — confirm.
os.environ.update({"AWS_PROFILE": "remote_notebook_user"})

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Target size of each compacted output file: 128 MiB. Kept as a string
# because SparkConf values are strings.
block_size = str(1024 * 1024 * 128)

conf = (SparkConf()
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.2')
        # Authenticate S3A requests through the local AWS profile (AWS_PROFILE).
        .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.profile.ProfileCredentialsProvider')
        # `dfs.blocksize` is the current Hadoop key name (`dfs.block.size` is
        # its deprecated alias) and is the key this notebook reads back via
        # hadoopConfiguration().get("dfs.blocksize"). Setting only the alias
        # reaches that key just when Hadoop's HDFS deprecation table is loaded.
        .set('spark.hadoop.dfs.blocksize', block_size)
        .set('spark.driver.port', '7077')
        .set('spark.driver.cores', '2')
        .set('spark.driver.memory', '4g')
        .set('spark.executor.memory', '2g')
       )

spark = (SparkSession
         .builder
         .appName('BusObservatory-manual-compaction')
         .config(conf=conf)
         .getOrCreate()
        )
# Last expression of the cell: shows the Spark UI URL as cell output.
spark.sparkContext.uiWebUrl

In [None]:
# TODO: update for the main data lake.
# bucket = "busobservatory"
bucket = "busobservatory-migration"

# Glob over everything directly under the system's folder.
in_path = f"s3a://{bucket}/{system_id}/*"

# NOTE: the active out_path lies *inside* the in_path glob, so on a re-run the
# previous compacted output is matched as input while "overwrite" mode replaces
# it. The commented sibling layout keeps input and output disjoint.
# out_path = f"s3a://{bucket}/{system_id}-compacted/"
out_path = f"s3a://{bucket}/{system_id}/compacted/"

In [None]:
import boto3
import math

def get_folder_size(bucket, path):
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(bucket)
    total_size = 0
    count_obj = 0
    for obj in my_bucket.objects.filter(Prefix=path):
        total_size = total_size + obj.size
        count_obj = count_obj + 1

    return total_size, count_obj

def get_repartition_factor(dir_size, block_size=None):
    """Return the number of output files needed to hold *dir_size* bytes.

    Args:
        dir_size: total input size in bytes.
        block_size: target size of each output file in bytes. When omitted, it
            is read from the active Spark session's Hadoop configuration key
            ``dfs.blocksize``.

    Returns:
        ``ceil(dir_size / block_size)``, but never less than 1. Spark's
        ``repartition`` rejects a partition count of 0, which an empty input
        folder would otherwise produce.
    """
    if block_size is None:
        block_size = int(spark._jsc.hadoopConfiguration().get("dfs.blocksize"))
    return max(1, math.ceil(dir_size / block_size))

In [None]:
# Measure the input folder and work out how many block-sized files it should
# be compacted into.
dir_size, count_obj = get_folder_size(bucket, system_id)
num_files = get_repartition_factor(dir_size)
block_size = int(
    spark._jsc.hadoopConfiguration().get("dfs.blocksize")
)

print(
    f'Compacting {dir_size/1024/1024:.1f} Mb from {count_obj} files '
    f'into {num_files} files of {block_size/1024/1024:.1f} Mb each.'
)

In [None]:
# Tag every input row with the object it was read from. Presumably intended
# for the "delete files read in" TODO at the end of the notebook — confirm.
# The lake is parquet at in_path (as in the compaction cell), not CSV at a
# placeholder HDFS directory.
from pyspark.sql.functions import input_file_name

input_df = (spark.read
            .parquet(in_path)
            .withColumn("filename", input_file_name()))

In [None]:
# Guard against self-overwrite. If out_path lies inside the in_path glob and
# already holds output from a previous run, that output would be read back in
# as input while "overwrite" mode replaces it.
_in_root = in_path.rstrip("*")
_out_hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(out_path)
_out_fs = _out_hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())
if out_path.startswith(_in_root) and _out_fs.exists(_out_hadoop_path):
    raise RuntimeError(
        f"{out_path} already exists inside the input glob {in_path}; "
        "move or delete it before re-running the compaction."
    )

# Rewrite the input as num_files parquet files. No `dataChange` option here:
# that is a Delta Lake writer option and is not used by the plain parquet
# writer. save() returns None, so nothing is bound to a variable.
(spark.read
    .format("parquet")
    .load(in_path)
    .repartition(num_files)
    .write
    .format("parquet")
    .mode("overwrite")
    .save(out_path))

In [None]:
# Derive the output's S3 key prefix from out_path itself, so this check looks
# where the compaction actually wrote, whichever out_path layout is active.
out_prefix = out_path.split(f"s3a://{bucket}/", 1)[-1]
out_dir_size, out_count_obj = get_folder_size(bucket, out_prefix)
in_dir_size, in_count_obj = get_folder_size(bucket, f"{system_id}/")
# When the output folder is nested under the input folder, the input listing
# now also contains the compacted files; exclude them from the input totals.
if out_prefix.startswith(f"{system_id}/"):
    in_dir_size -= out_dir_size
    in_count_obj -= out_count_obj
num_files = get_repartition_factor(in_dir_size)
block_size = int(spark._jsc.hadoopConfiguration().get("dfs.blocksize"))
# The "- 1" presumably discounts Spark's `_SUCCESS` marker object (TODO confirm);
# clamp at 0 so an empty output folder doesn't report -1 files.
out_data_files = max(out_count_obj - 1, 0)
print(f'Compaction complete: {out_dir_size/1024/1024:.1f} Mb in {out_data_files} files is near optimal ({num_files} files of {block_size/1024/1024:.1f} Mb each.)')

In [None]:
# TODO: delete the original input files once the compacted output has been verified?
