## Testing Notebook

In [1]:
# Base directory containing the datasets this notebook reads
# (joined with "s3logs" when the parquet data is loaded).
path = '/opt/spark-data'

In [2]:
from __future__ import print_function
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, split, udf, size, element_at, explode

In [3]:
# Build (or reuse) the SparkSession for this analysis, connected to the
# cluster master at spark-master:7077 with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("S3_Analysis")
    .master("spark://spark-master:7077")
    .config("spark.executor.cores", "2")
    # "spark.num.executors" is not a Spark property and was silently ignored;
    # the documented key for a static executor count is spark.executor.instances.
    # NOTE(review): on a standalone master the executor count may also be
    # bounded by spark.cores.max / available worker cores -- confirm.
    .config("spark.executor.instances", "6")
    .config("spark.executor.memory", "2g")
    .enableHiveSupport()
    .getOrCreate()
)

In [18]:
## Load the "s3logs" parquet dataset found under `path`
s3_stats = spark.read.format("parquet").load(os.path.join(path, "s3logs"))

In [19]:
# Register the DataFrame as a temp view so it can be queried through
# spark.sql under the name "s3_stats".
s3_stats.createOrReplaceTempView("s3_stats")

In [21]:
# List the distinct values of the requesthour column.
spark.sql("SELECT distinct requesthour from s3_stats").collect()

[Row(requesthour=15), Row(requesthour=14)]

In [22]:
# Keep only the `key` column (the object path); the backticks quote the
# column name inside the SQL statement.
key_data = spark.sql("SELECT `key` FROM s3_stats")

In [23]:
# Peek at the first 10 keys.
key_data.head(10)

[Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/edhfs/edh_fs_config_v1_13/hivemanaged/dmp/edhfs/edh_fs_config_v1_13/hivemanaged/interface/part-03081-d77a40ae-33cb-4b74-aff5-11e09c567c25-c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-06-20/part-00144-2c60988b-8140-47bd-b2d3-75e8d9559a92.c000'),
 Row(key='dmp/edhfs/edh_fs_config_v1_13/hivemanaged/dmp/edhfs/edh_fs_config_v1_13/hivemanaged/interface/part-03073-d77a40ae-33cb-4b74-aff5-11e09c567c25-c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-06-20/part-00144-2c60988b-8140-47bd-b2d3-75e8d9559a92.c000'),
 Row(key='dmp/edhfs/edh_fs_config_v1_13/hivemanaged/job_dmp_dv1/part-00000-6f743b8f-4b02-4fc3-a1b1-8032d2b88cfe-c000.snappy.parquet'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-06-18/part-0014

In [24]:
## Create the parent/child pairs we need to build our structure
def zip_pairs(value):
  """Pair every path component with the component that precedes it.

  Args:
    value: sequence of path components (e.g. the result of splitting a key
      on "/"), or None.

  Returns:
    A list of (parent, child) tuples, one per component, in order. The first
    component has parent None. Returns None when `value` is None (so a null
    input stays null) and an empty list when `value` is empty.
  """
  if value is None:
    # A null key yields a null split array; propagate null instead of crashing.
    return None

  parts = list(value)
  # Shift the components right by one so each lines up with its predecessor.
  # Slicing (rather than pop) also makes the empty-list case safe.
  lead_list = [None] + parts[:-1]
  return list(zip(lead_list, parts))

# Expose zip_pairs to Spark as a UDF whose result is an array of
# [parent, child] string arrays.
pairZip = udf(zip_pairs, ArrayType(ArrayType(StringType())))

# Split every key on "/", record its depth and final component, then explode
# the (parent, child) pairs so each pair becomes its own row.
df2 = (
    key_data.select("key")
    .withColumn("key_split", split("key", "/"))
    .withColumn("depth", size("key_split"))
    .withColumn("file", element_at("key_split", -1))
    .withColumn("pairs", pairZip("key_split"))
    .withColumn("_tmp", explode("pairs"))
    .withColumn("parent", col("_tmp").getItem(0))
    .withColumn("child", col("_tmp").getItem(1))
    .drop("_tmp")
)

In [25]:
# Preview the key column of the exploded frame; after the explode each key
# appears once per (parent, child) pair.
df2.select('key').take(10)

[Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-93cd-4d71-8c05-d2dc38f19972.c000'),
 Row(key='dmp/conformed/cim/cim_int_models/int_flat_delay/loaddate_partition%253D2020-05-28/part-00018-210bd56d-

## Exploring Entities

In [13]:
# Unique (parent, child) pairs, also registered as the temp view
# "relationship_table" for use from spark.sql.
relationship_table = df2.select(df2["parent"], df2["child"]).distinct()
relationship_table.createOrReplaceTempView("relationship_table")
# Optional persistence to Hive, left disabled:
#relationship_table.write.format("hive").saveAsTable("logging_demo.relationship_table")

## Exploring just the files
# Keep the rows whose child component equals the last component of the key.
storage_files = df2.select("key", "parent", "child", "file").filter( df2.child == df2.file )
# Show files whose parent looks like a partition folder (name=value).
# The keys in this dataset carry the "=" URL-encoded (for example
# "loaddate_partition%253D2020-05-28"), so a bare "=" test misses them;
# match the literal character and its single/double encoded forms.
storage_files.filter(
    storage_files.parent.contains("=")
    | storage_files.parent.contains("%3D")
    | storage_files.parent.contains("%253D")
).show(40, truncate=False)

## Exploring the folders in the tree
# Rows whose child component is not the last component of the key.
folders = df2.select("key", "parent", "child", "file").where(df2["child"] != df2["file"])
folders.select("parent", "child").show(10, truncate=False)

# Unique child names among those rows.
distinct_folders = folders.select("child").distinct()

# Every distinct name that occurs as either a parent or a child in
# relationship_table (SQL UNION de-duplicates). The first component of each
# key has a None parent, so a NULL stage can be part of the result.
entities = spark.sql("SELECT parent as stage from relationship_table \
                     UNION SELECT child as stage from relationship_table")

In [14]:
# Preview 10 of the entity names.
entities.head(10)

[Row(stage='part_key%253D2019276'),
 Row(stage='onboard_file_version_av'),
 Row(stage='part_key%253D202076'),
 Row(stage='.hive-staging_hive_2020-05-20_16-03-31_084_6366862164164857064-504'),
 Row(stage='bhp_mcoe_cv_iw69_notification_item_av'),
 Row(stage='int_flat_timeusage'),
 Row(stage='part_key%253D2019177'),
 Row(stage='attempt_20200711074124_0027_m_000070_0'),
 Row(stage='part_key%253D19681'),
 Row(stage='attempt_20200705090358_0338_m_000075_0')]

## End

In [18]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()