In [0]:
# Default parallelism reported by the SparkContext.
# NOTE(review): the original note reads this value as the number of cores in the cluster — confirm for this cluster setup.
# Alternative spelling kept from the original cell: spark.sparkContext.defaultParallelism
sc.defaultParallelism



In [0]:
# Load the person_store table and check how many partitions back its RDD.
df = spark.table('dev_waypoint_customer_cluster.person_store')

# Last expression of the cell, so the partition count is shown as the cell output.
df.rdd.getNumPartitions()

In [0]:
# Label the jobs triggered from here on so they are easy to find in the Spark UI.
# The description has to be attached through SparkContext.setJobDescription; writing an arbitrary
# "JobDescription" key with spark.conf.set only stores a config value and labels nothing.
spark.sparkContext.setJobDescription("Step A-S: Basic initialization")

## Delete partitions from a Data Lake table

In [0]:
def delete_partitions(table_name, container="bronze"):
  """
    description:
      Recursively deletes every partition folder of a data lake table except the one loaded today
      (raw_date=<today>). Nothing is deleted when today's partition is missing, so a late or failed
      load cannot wipe the whole table.

    parameters:
      table_name (the name of the table whose partitions will be deleted)
      container (bronze, silver or gold)

    notes:
      storage_name and cluster_path are read from the notebook scope.
  """
  from datetime import date                                                                                     # local import: the notebook only imports date in a later cell

  folder_to_keep = "raw_date=" + date.today().strftime("%Y-%m-%d") + "/"                                        # most recent data load

  files = dbutils.fs.ls(f"abfss://{container}@{storage_name}.dfs.core.windows.net/{cluster_path}/{table_name}/") # returns path, partition name and size
  partition_paths = [partition[0] for partition in files]                                                        # returns list of paths

  # Safety guard: without today's partition the loop below would remove every partition of the table.
  if not any(folder_to_keep in path for path in partition_paths):
    print(folder_to_keep + " NOT FOUND, NOTHING DELETED")
    return

  for path in partition_paths:
    if folder_to_keep not in path:
      dbutils.fs.rm(path, recurse=True)                                                                          # if the path does not contain the most recent partition, delete it
      print(path + " SUCCESSFULLY DELETED")

In [0]:
# Show the current value of the spark.sql.shuffle.partitions setting.
spark.conf.get("spark.sql.shuffle.partitions")

In [0]:
# Set spark.sql.shuffle.partitions to 8 for this session.
spark.conf.set("spark.sql.shuffle.partitions", 8)
# Read the value back to confirm the change.
spark.conf.get("spark.sql.shuffle.partitions")

In [0]:
%run /helpers/GdprUtils

In [0]:
# Expose the target environment as a notebook widget (default 'dev') and read its current value.
dbutils.widgets.text('environment', 'dev')
environment = dbutils.widgets.get('environment')

print(f'The current environment is: {environment}')

In [0]:
# Fully qualified table names, prefixed with the selected environment.
person_events_table = f'{environment}_waypoint_customer_cluster.person_events'
person_store_table = f'{environment}_waypoint_customer_cluster.person_store'

In [0]:
# Load both tables as DataFrames.
person_events = spark.table(person_events_table)
person_store = spark.table(person_store_table)

## Obtain the records that differ between the silver and bronze data

In [0]:
# Rows found in only one of silver_data / bronze_data: everything from both frames minus the rows they share.
# Temporary version: depends on whether duplicate records with a different TimeStamp must be kept.
new_silver_data = silver_data.union(bronze_data).subtract(silver_data.intersect(bronze_data))

### Changing a schema based on another DataFrame

In [0]:
# Rebuild person_store from its own rows, applying the schema of person_events.
# NOTE(review): this assumes the rows of person_store fit person_events.schema — confirm before running.
# NOTE(review): the person_store name is rebound here, so re-running the cell starts from the converted frame.
person_store = spark.createDataFrame(person_store.rdd, person_events.schema)
# Overwrite the table (schema included), repartitioned by CountryCode.
person_store.repartition("CountryCode").write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(person_store_table)

## SQL: create a table based on a storage location

In [0]:
%sql
-- Register a Delta table over the given storage location, unless the table already exists.
-- NOTE(review): the database name is prefixed raw_ while the LOCATION points at the silver container — confirm this is intended.
CREATE TABLE IF NOT EXISTS raw_dev_waypoint_customer_cluster.customer_epol_events USING DELTA LOCATION "abfss://silver@aisoisidatalakedev.dfs.core.windows.net/waypoint/customer_cluster/customer_epol_events"

### Getting the difference between two DataFrames' schemas

In [0]:
def diff(l1, l2):
  """Return the items of l1 that are not found in l2, in their original order."""
  missing = []
  for item in l1:
    if item not in l2:
      missing.append(item)
  return missing
# Schema fields of person_events_std_old that are absent from the schema of person_events_std.
diff(person_events_std_old.schema.fields, person_events_std.schema.fields)

In [0]:
# Column list used to shape the standardized person events
clb = ColumnListBuilder("customer_person_events", True)
cols = clb.BuildColumnListCL1()

# Raw events with enqueuedTime derived from EnqueuedTimeUtc and the 'paritition' column renamed
raw_person_events = (
  spark.table(raw_person_events_table)
       .withColumn('enqueuedTime', unix_timestamp(col("EnqueuedTimeUtc"), "M/d/y h:m:s a"))
       .withColumnRenamed('paritition', 'partition')
)

# Events parsed with the current schema
new_person_events_std = (
  parseJSONEvents(raw_person_events, personSchema)
    .selectExpr(cols)
    .where("Id IS NOT NULL")
)

# Events parsed with the old schema, with CurrentAddress and Name cast before the column list is applied
old_person_events_std = (
  parseJSONEvents(raw_person_events, personSchemaOld)
    .where("Id IS NOT NULL")
    .withColumn("CurrentAddress", col('currentAddress').cast(currentAddressSchema))
    .withColumn("Name", col('Name').cast(nameSchema))
    .selectExpr(cols)
)

person_events_std = new_person_events_std.union(old_person_events_std)

# Overwrite the target table (schema included), repartitioned by CountryCode
(
  person_events_std
    .repartition("CountryCode")
    .write.format('delta')
    .option('overwriteSchema', True)
    .mode('overwrite')
    .saveAsTable(person_store_table)
)

In [0]:
# Column list used to shape the standardized company events
clb = ColumnListBuilder("customer_company_events", True)
cols = clb.BuildColumnListCL1()

# Raw events with enqueuedTime derived from EnqueuedTimeUtc and the 'paritition' column renamed
raw_company_events = (
  spark.table(raw_company_events_table)
       .withColumn('enqueuedTime', unix_timestamp(col("EnqueuedTimeUtc"), "M/d/y h:m:s a"))
       .withColumnRenamed('paritition', 'partition')
)
company_events_std = parseJSONEvents(raw_company_events, companySchema).selectExpr(cols)

# Overwrite the target table, repartitioned by CountryCode
(
  company_events_std
    .repartition("CountryCode")
    .write.format('delta')
    .mode('overwrite')
    .saveAsTable(company_store_table)
)

In [0]:
def convert_raw_to_standardized(dataset_name, raw_table_name, customer):
  """
    description:
      Parses the raw JSON events of one customer type and overwrites the matching Delta table
      (person_store_table or company_store_table, both read from the notebook scope)

    parameters:
      dataset_name (ignored: the dataset name is always derived as 'customer_<customer>_events'; kept so existing calls still work)
      raw_table_name (name of the raw events table to read)
      customer ("person" or "company")

    raises:
      ValueError when customer is neither "person" nor "company"
  """
  # Validate first so a wrong customer type fails fast, before a column list is built or a table is read.
  if customer not in ("person", "company"):
    raise ValueError("Please, specify the customer type: person or company.")

  dataset_name = f'customer_{customer}_events'
  clb = ColumnListBuilder(dataset_name, True)
  cols = clb.BuildColumnListCL1()

  # Both customer types read the raw table the same way.
  raw_events = (
    spark.table(raw_table_name)
         .withColumn('enqueuedTime', unix_timestamp(col("EnqueuedTimeUtc"), "M/d/y h:m:s a"))
         .withColumnRenamed('paritition', 'partition')
  )

  if customer == "person":
    # Person events exist in two schema versions; both are parsed and unioned.
    new_person_events_std = parseJSONEvents(raw_events, personSchema).selectExpr(cols).where("Id IS NOT NULL")
    old_person_events_std = (
      parseJSONEvents(raw_events, personSchemaOld)
        .where("Id IS NOT NULL")
        .withColumn("CurrentAddress", col('currentAddress').cast(currentAddressSchema))
        .withColumn("Name", col('Name').cast(nameSchema))
        .selectExpr(cols)
    )
    person_events_std = new_person_events_std.union(old_person_events_std)
    person_events_std.repartition("CountryCode").write.format('delta').option('overwriteSchema', True).mode('overwrite').saveAsTable(person_store_table)

  else:
    company_events_std = parseJSONEvents(raw_events, companySchema).selectExpr(cols)
    company_events_std.repartition("CountryCode").write.format('delta').mode('overwrite').saveAsTable(company_store_table)

In [0]:
# Negative check: "petya" is neither "person" nor "company", the only customer types the function handles;
# any other value is meant to end in its ValueError.
convert_raw_to_standardized("a", "b", "petya")

## Get the size of a parquet table

In [0]:
def get_table_size(path):
  """
    description:
      Sums the sizes of the files found one level below path and reports the total in MB

    parameters:
      path (folder of the table, with or without a trailing slash; each of its entries is listed once)

    returns:
      a message with the total size divided by 1,000,000 (sizes as reported by dbutils.fs.ls, presumably bytes)
  """
  total_size = 0

  for folder in dbutils.fs.ls(path):
    # Listing entries start with path, name, size (newer runtimes append modificationTime), so index
    # from the front: counting from the end picks the wrong field as soon as a fourth one exists.
    # Using the entry's own full path also avoids gluing path and name together by hand.
    for file in dbutils.fs.ls(folder[0]):
      total_size += file[2]
  return f"The parquet table is {total_size / 1000000} MB"



In [0]:
# Report the size of the co_Cover table stored in the bronze container.
get_table_size("abfss://bronze@aisoisidatalakedev.dfs.core.windows.net/waypoint/policy/co_Cover/")

In [0]:
# NOTE(review): path is assigned here but not used in this cell.
path = "dbfs:/dev_waypoint_policy/ex_Exposure"
# List the root of DBFS.
dbutils.fs.ls("dbfs:/")

In [0]:
# List the contents of dbfs:/user/hive/warehouse/.
dbutils.fs.ls("dbfs:/user/hive/warehouse/")

In [0]:
# Look up a single file under dbfs:/databricks-datasets/.
dbutils.fs.ls("dbfs:/databricks-datasets/learning-spark-v2/us_population.json")

In [0]:
# List the folders under waypoint/ in the bronze container and render them as a table.
files = dbutils.fs.ls("abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/")
display(files)

path,name,size
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/accounting/,accounting/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/agreements/,agreements/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/claims/,claims/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/customer/,customer/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/customer_cluster/,customer_cluster/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/polaris/,polaris/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/policy/,policy/,0
abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/rating/,rating/,0


In [0]:
# List the root of the bronze container on the aisoisidatalanding account (dfs endpoint).
files = dbutils.fs.ls("abfss://bronze@aisoisidatalanding.dfs.core.windows.net/")
display(files)

In [0]:
# 5e8 written out as an integer, i.e. 500000000.
print(int(5e8))


In [0]:
# Set spark.sql.autoBroadcastJoinThreshold to "500m" for this session.
# NOTE(review): confirm that the running Spark version accepts a size string with a unit suffix for this key.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold","500m")

In [0]:
# Same container listing, this time through the blob endpoint instead of dfs.
# NOTE(review): the other abfss URIs in this notebook all use the dfs endpoint — confirm this URI is valid.
files = dbutils.fs.ls("abfss://bronze@aisoisidatalanding.blob.core.windows.net/")
display(files)

In [0]:
%fs ls 

In [0]:
# List a path given without a scheme or leading slash.
# NOTE(review): how such a path is resolved is not visible here — confirm which file system it targets.
files = dbutils.fs.ls("waypoint-customer-cluster/customer_epol/")
display(files)

In [0]:
# Peek at five rows of the agents table.
# NOTE(review): the rendered output of this cell shows personal data (names, e-mail addresses) — clear it before sharing the notebook.
test = spark.table("dev_puzzel.agents").limit(5)
test.display()

agent_id,customer_key,user_name,user_num,full_name,usergroup_id,usergroup_name,email,mobile,dte_updated,chat_role,chat_master_user_id,unblockable_role,unblockable_group,deleted,ExtractDate
16598,10076,A8Y,20968,Anita Elvbakken,25279,Trondheim Outbound 4,,,2018-08-30T17:00:01.170+0000,,,,,False,2018-08-31T01:00:12.053+0000
18333,100761,MN7,95557,Margareth Nordgård,10898,MO Sarpsborg,margareth.nordgard@if.no,,2018-08-30T16:00:05.677+0000,,,,,False,2018-08-31T01:00:12.053+0000
18338,100761,RIJ,95562,Rigmor Jensen,10885,ED Skadesenter Nord,rigmor.jensen@if.no,,2018-08-30T10:00:00.973+0000,0.0,18338.0,,,False,2018-08-31T01:00:12.053+0000
18366,100761,B5L,95594,Berit Linga,10889,ED Skadesenter Sør,berit.linga@if.no,,2018-08-30T15:00:01.790+0000,,,,,False,2018-08-31T01:00:12.053+0000
18370,100761,SNK,95598,Sigrun Kjellevand,10889,ED Skadesenter Sør,sigrun.kjellevand@if.no,,2018-08-30T15:00:01.790+0000,,,,,False,2018-08-31T01:00:12.053+0000


In [0]:
# Split full_name into its space-separated parts.
# Column objects have no split() method (attribute access on a Column is treated as a field lookup,
# so calling the result fails); the SQL split function does the job without needing an extra import.
test.selectExpr("split(full_name, ' ')").limit(5).display()

In [0]:
# Collect the first column of every SHOW PARTITIONS row and print each value on its own line
partition_list = []
for row in spark.sql("SHOW PARTITIONS RAW_DEV_WAYPOINT_POLICY.CO_COVER").collect():
  partition_list.append(row[0])
  print(row[0])

In [0]:
# List the contents of the customer_epol folder in the bronze container.
dbutils.fs.ls("abfss://bronze@aisoisidatalake.dfs.core.windows.net/waypoint/customer_cluster/customer_epol")

In [0]:
from datetime import date, timedelta

# The date two days before today, shown as the cell output
lag_date = date.today() - timedelta(days=2)
lag_date