In [0]:
from pyspark.sql import functions as F
import re
import pprint as pp 
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from delta.tables import DeltaTable
from pyspark.sql.types import ArrayType, StructType, StringType

In [0]:
def set_base_volume_paths():
    """
    Fetch the volume paths published by the upstream landing task.

    Reads the ``output_paths_of_data`` task value set by the
    ``land_s3_data_to_volume_and_extract`` job task and decodes it from JSON.
    (For interactive runs, an alternative is to invoke that notebook with
    ``dbutils.notebook.run`` and decode its exit value the same way.)

    Returns:
        The decoded JSON value. It is iterated as a collection of path
        strings by ``build_bronze_ingestion_config_dict``.
    """
    encoded_paths = dbutils.jobs.taskValues.get(
        taskKey="land_s3_data_to_volume_and_extract",
        key="output_paths_of_data",
    )
    return json.loads(encoded_paths)

In [0]:
# Resolve the landed volume paths once; build_bronze_ingestion_config_dict() reads this global.
VOLUME_PATHS = set_base_volume_paths()

In [0]:
# Static bronze ingestion settings, keyed by the lowercase entity name that is
# searched for inside each landed volume path.
#   file_type  -> format handed to spark.read.format(...)
#   table_name -> fully qualified bronze target table
#   vol_paths  -> starts empty; build_bronze_ingestion_config_dict() populates it from VOLUME_PATHS
base_config_dict = {
    "activities": {
        "file_type": "xml",
        "table_name": "google_fit.bronze.activities",
        "vol_paths": [],
    },
    "all sessions": {
        "file_type": "json",
        "table_name": "google_fit.bronze.all_sessions",
        "vol_paths": [],
    },
    "daily activity metrics": {
        "file_type": "csv",
        "table_name": "google_fit.bronze.daily_activity_metrics",
        "vol_paths": [],
    },
}

In [0]:
def build_bronze_ingestion_config_dict():
    """
    Build the bronze ingestion config by attaching the landed volume paths to each entity.

    Starts from a copy of ``base_config_dict`` and, for every path in
    ``VOLUME_PATHS``, appends the path to the ``vol_paths`` list of each entity
    whose key occurs as a substring of the lower-cased path.

    Every entity's settings dict and its ``vol_paths`` list are copied, so
    ``base_config_dict`` is never mutated. Re-running the cell therefore gives
    the same result instead of accumulating duplicate paths.

    Returns:
        dict: entity key -> {"file_type", "table_name", "vol_paths"}.
    """
    bronze_ingestion_config = {
        entity: {**settings, "vol_paths": list(settings["vol_paths"])}
        for entity, settings in base_config_dict.items()
    }
    for path in VOLUME_PATHS:
        lowered_path = path.lower()
        for entity, settings in bronze_ingestion_config.items():
            if entity in lowered_path:
                settings["vol_paths"].append(path)
    return bronze_ingestion_config

In [0]:
# Per-entity ingestion config (file type, target table, matched volume paths) passed to the ingestion functions.
bronze_ingestion_config_dict = build_bronze_ingestion_config_dict()

In [0]:
# Show the resolved config so the matched volume paths can be checked in the run output.
pp.pprint(bronze_ingestion_config_dict)

Functions

In [0]:
def flatten_df(df):
    """
    Recursively flatten every array and struct column of ``df``.

    One pass over the current schema:
      * an array column ``a`` becomes ``a_exploded`` via ``explode_outer``
        (one output row per element);
      * a struct column ``s`` becomes one column ``s_<field>`` per sub-field;
      * the original nested columns are then dropped.
    The pass is repeated until the schema has no array or struct columns left.
    Other types, including maps, are treated as flat and left untouched.

    A struct with no sub-fields is simply dropped. It used to be neither
    expanded nor dropped, which made the recursion loop forever.

    Args:
        df: DataFrame whose schema may contain nested array/struct columns.

    Returns:
        DataFrame with flat columns only. ``df`` itself is returned when
        nothing needed flattening.
    """
    nested_cols = []
    # Iterate over the schema as it was at the start of the pass; columns added
    # below are picked up by the next recursive pass.
    for field_info in df.schema.fields:
        if isinstance(field_info.dataType, ArrayType):
            df = df.withColumn(field_info.name + "_exploded", F.explode_outer(F.col(field_info.name)))
            nested_cols.append(field_info.name)
        elif isinstance(field_info.dataType, StructType):
            for finfo in field_info.dataType.fields:
                df = df.withColumn(f"{field_info.name}_{finfo.name}", F.col(f"{field_info.name}.{finfo.name}"))
            # Recorded once per struct (even an empty one) so it is always dropped.
            nested_cols.append(field_info.name)
    if not nested_cols:
        return df
    return flatten_df(df.drop(*nested_cols))
            

In [0]:
def check_for_composite_keys(input_df):
    """
    Find the narrowest run of adjacent columns that uniquely identifies every row.

    Candidates are contiguous windows of ``input_df.columns`` (``[b, c]`` is
    tried, ``[a, c]`` is not). They are tested from narrowest to widest and,
    within one width, from left to right. Windows that reach the last column
    are included; they were previously skipped by an off-by-one in the slice
    bounds.

    A window is a key when the largest group produced by grouping on it has
    exactly one row. Each candidate runs one Spark aggregation, so this is an
    exploratory helper rather than something to call inside a pipeline.

    Args:
        input_df: DataFrame to inspect.

    Returns:
        list[str] | None: the first unique window found (also printed), or
        ``None`` when no contiguous window is unique.
    """
    columns = input_df.columns
    total = len(columns)
    for width in range(1, total + 1):
        for start in range(total - width + 1):
            cols = columns[start:start + width]
            # Collect once: every collect() re-runs the aggregation.
            largest_group = (
                input_df.groupBy(*cols)
                .agg(F.count(F.lit(1)).alias('grouped_count'))
                .agg(F.max('grouped_count'))
                .collect()[0][0]
            )
            # Group sizes are >= 1, so max == 1 means every group has one row.
            if largest_group == 1:
                print("Found unique combination of columns : ", cols)
                return cols
    return None


In [0]:
def all_sessions_ingestion(bronze_ingestion_config_dict):
    """
    Load the "all sessions" JSON files and upsert them into the bronze Delta table.

    Steps:
      1. Pick the config entry whose key matches "all sessions" and read its
         ``vol_paths``, ``file_type`` and ``table_name``.
      2. Read the files as multiline JSON, explode the ``aggregate`` and
         ``segment`` arrays and expand their struct fields into flat columns
         (one row per aggregate/segment pair of a session).
      3. Add ``file_path`` (from ``_metadata.file_path``, with ``%20`` decoded
         and the ``dbfs:`` prefix removed), ``etl_timestamp`` and ``entity``
         (captured from the ``takeout-..._<entity>/extracted`` part of the path).
      4. If the target table exists, run an insert-only merge on the composite
         key; otherwise create the table with an overwrite write.

    Composite key for upsert operation -> ['segment_exploded_endTime', 'segment_exploded_startTime', 'aggregate_exploded_metricName', 'fitnessActivity', 'entity']

    The key columns are compared with the null-safe ``<=>`` operator. With a
    plain ``=``, a row holding NULL in any key column never matches its
    existing copy and is inserted again on every run.

    Args:
        bronze_ingestion_config_dict: entity key -> settings dict with
            ``vol_paths``, ``file_type`` and ``table_name``.

    Raises:
        KeyError: if no config key matches "all sessions".

    Note:
        Errors raised by the merge/write are caught and printed, not re-raised.
    """
    matching_keys = [
        name for name in bronze_ingestion_config_dict
        if re.search(r"all\s*sessions", name.lower())
    ]
    if not matching_keys:
        raise KeyError("no 'all sessions' entry found in bronze_ingestion_config_dict")
    # If several keys match, the last one wins.
    settings = bronze_ingestion_config_dict[matching_keys[-1]]
    load_path = settings['vol_paths']
    file_format = settings["file_type"]
    table_name = settings["table_name"]
    print(f"load_path:  {load_path}")
    print(f"file_format:  {file_format}")
    print(f"table_name:  {table_name}")
    print()

    df = (
        spark.read.format(file_format)
        .option("multiline", "True")
        .option("inferSchema", True)
        .load(load_path)
        # aggregate: array of structs -> one row per element, fields flattened
        .withColumn("aggregate_exploded", F.explode("aggregate"))
        .drop("aggregate")
        .withColumn("aggregate_exploded_floatValue", F.col("aggregate_exploded.floatValue"))
        .withColumn("aggregate_exploded_intValue", F.col("aggregate_exploded.intValue"))
        .withColumn("aggregate_exploded_metricName", F.col("aggregate_exploded.metricName"))
        .drop("aggregate_exploded")
        # segment: array of structs -> one row per element, fields flattened
        .withColumn("segment_exploded", F.explode("segment"))
        .drop("segment")
        .withColumn("segment_exploded_endTime", F.col("segment_exploded.endTime"))
        .withColumn("segment_exploded_fitnessActivity", F.col("segment_exploded.fitnessActivity"))
        .withColumn("segment_exploded_startTime", F.col("segment_exploded.startTime"))
        .drop('segment_exploded')
        # lineage columns
        .withColumn("file_path", F.col("_metadata.file_path"))
        .withColumn("etl_timestamp", F.current_timestamp())
        .withColumn("entity", F.regexp_extract(F.col('file_path'), r"[/]takeout-.*_(.*)[\/]extracted", 1))
        .withColumn("file_path", F.replace(F.replace(F.col('file_path'), F.lit("%20"), F.lit(" ")), F.lit("dbfs:"), F.lit("")))
    )

    merge_keys = [
        "entity",
        "segment_exploded_endTime",
        "segment_exploded_startTime",
        "aggregate_exploded_metricName",
        "fitnessActivity",
    ]
    merge_condition = " AND ".join(f"t.{key} <=> s.{key}" for key in merge_keys)

    try:
        if spark.catalog.tableExists(table_name):
            print(f"Starting upsert operation on the table :{table_name}")
            (
                DeltaTable.forName(spark, table_name).alias("t")
                .merge(df.alias("s"), merge_condition)
                # Insert-only: rows that already exist in the target are left untouched.
                .whenNotMatchedInsertAll()
                .execute()
            )
            print(f"Upsert operation was successful on the table :{table_name}")
        else:
            print(f"Starting overwrite operation on the table :{table_name}")
            df.write.mode("overwrite").saveAsTable(table_name)
            print(f"Overwrite operation was successful on the table :{table_name}")
    except Exception as e:
        # NOTE(review): the failure is only printed, so the caller (and the job) still
        # sees success — confirm this best-effort behaviour is intended.
        print(f"Some error occured while ingesting the data for the table {table_name}. Error details: {e}")


In [0]:
def daily_activity_metrics_ingestion(bronze_ingestion_config_dict):
    """
    Load the "daily activity metrics" CSV files and upsert them into the bronze Delta table.

    Steps:
      1. Pick the config entry whose key matches "daily activity metrics" and
         read its ``vol_paths``, ``file_type`` and ``table_name``.
      2. Read the files with a header row and add ``file_path`` (from
         ``_metadata.file_path``, with ``%20`` decoded and the ``dbfs:`` prefix
         removed), ``etl_timestamp`` and ``entity`` (captured from the
         ``takeout-..._<entity>/extracted`` part of the path).
      3. Keep only rows whose ``Date`` looks like ``YYYY-MM-DD``.
      4. Replace every run of non-word characters in column names with ``_``.
      5. If the target table exists, run an insert-only merge on the composite
         key; otherwise create the table with an overwrite write.

    Composite key for upsert operation -> ['Date', 'entity']

    The key columns are compared with the null-safe ``<=>`` operator so that a
    NULL key part cannot cause the same row to be inserted on every run.

    Args:
        bronze_ingestion_config_dict: entity key -> settings dict with
            ``vol_paths``, ``file_type`` and ``table_name``.

    Raises:
        KeyError: if no config key matches "daily activity metrics".

    Note:
        Errors raised by the merge/write are caught and printed, not re-raised.
    """
    matching_keys = [
        name for name in bronze_ingestion_config_dict
        if re.search(r"daily\s*activity\s*metrics", name.lower())
    ]
    if not matching_keys:
        raise KeyError("no 'daily activity metrics' entry found in bronze_ingestion_config_dict")
    # If several keys match, the last one wins.
    settings = bronze_ingestion_config_dict[matching_keys[-1]]
    load_path = settings['vol_paths']
    file_format = settings["file_type"]
    table_name = settings["table_name"]
    print(f"load_path:  {load_path}")
    print(f"file_format:  {file_format}")
    print(f"table_name:  {table_name}")
    print()

    df = (
        spark.read.format(file_format)
        .option("header", True)
        .option("inferSchema", True)
        .option("mergeSchema", "true")
        .load(load_path)
        .withColumn("file_path", F.col("_metadata.file_path"))
        .withColumn("etl_timestamp", F.current_timestamp())
        .withColumn("entity", F.regexp_extract(F.col('file_path'), r"[/]takeout-.*_(.*)[\/]extracted", 1))
        .withColumn("file_path", F.replace(F.replace(F.col('file_path'), F.lit("%20"), F.lit(" ")), F.lit("dbfs:"), F.lit("")))
    )
    # Keep date-only rows; a NULL Date evaluates to NULL here and is filtered out too.
    df = df.filter(F.col('Date').rlike("^\\d{4}-\\d{2}-\\d{2}$"))
    clean_cols = [re.sub(r"\W+", "_", c) for c in df.columns]
    df = df.toDF(*clean_cols)

    merge_keys = ["entity", "Date"]
    merge_condition = " AND ".join(f"t.{key} <=> s.{key}" for key in merge_keys)

    try:
        if spark.catalog.tableExists(table_name):
            print(f"Starting upsert operation on the table :{table_name}")
            (
                DeltaTable.forName(spark, table_name).alias("t")
                .merge(df.alias("s"), merge_condition)
                # Insert-only: rows that already exist in the target are left untouched.
                .whenNotMatchedInsertAll()
                .execute()
            )
            print(f"Upsert operation was successful on the table :{table_name}")
        else:
            print(f"Starting overwrite operation on the table :{table_name}")
            df.write.mode("overwrite").saveAsTable(table_name)
            print(f"Overwrite operation was successful on the table :{table_name}")
    except Exception as e:
        # NOTE(review): the failure is only printed, so the caller (and the job) still
        # sees success — confirm this best-effort behaviour is intended.
        print(f"Some error occured while ingesting the data for the table {table_name}. Error details: {e}")

In [0]:
def activities_ingestion(bronze_ingestion_config_dict):
    """
    Load the "activities" XML files, flatten them and upsert into the bronze Delta table.

    Steps:
      1. Pick the config entry whose key contains "activities" and read its
         ``vol_paths``, ``file_type`` and ``table_name``.
      2. Read the files with ``rowTag`` "Activities" and flatten every nested
         array/struct column with ``flatten_df``.
      3. Add ``file_path`` (from ``_metadata.file_path``, with ``%20`` decoded
         and the ``dbfs:`` prefix removed), ``etl_timestamp`` and ``entity``
         (captured from the ``takeout-..._<entity>/extracted`` part of the path).
      4. If the target table exists, run an insert-only merge on the composite
         key; otherwise create the table with an overwrite write.

    Composite key for upsert operation ->
    ['Activity_exploded_Id',
    'Activity_exploded__Sport',
    'Activity_exploded_Lap_Calories',
    'Activity_exploded_Lap_DistanceMeters',
    'Activity_exploded_Lap_Intensity',
    'Activity_exploded_Lap_TotalTimeSeconds',
    'Activity_exploded_Lap_TriggerMethod',
    'Activity_exploded_Lap__StartTime',
    'Activity_exploded_Lap_Track_Trackpoint_exploded_DistanceMeters',
    'Activity_exploded_Lap_Track_Trackpoint_exploded_Time',
    'entity' ]

    The key columns are compared with the null-safe ``<=>`` operator. The
    flattened data can hold NULLs in key columns (``flatten_df`` uses
    ``explode_outer`` and the reader sets ``treatEmptyValuesAsNulls``). With a
    plain ``=`` such rows never match their existing copy and are inserted
    again on every run.

    Args:
        bronze_ingestion_config_dict: entity key -> settings dict with
            ``vol_paths``, ``file_type`` and ``table_name``.

    Raises:
        KeyError: if no config key contains "activities".

    Note:
        Errors raised by the merge/write are caught and printed, not re-raised.
    """
    matching_keys = [
        name for name in bronze_ingestion_config_dict
        if re.search(r"activities", name.lower())
    ]
    if not matching_keys:
        raise KeyError("no 'activities' entry found in bronze_ingestion_config_dict")
    # If several keys match, the last one wins.
    settings = bronze_ingestion_config_dict[matching_keys[-1]]
    load_path = settings['vol_paths']
    file_format = settings["file_type"]
    table_name = settings["table_name"]
    print(f"load_path:  {load_path}")
    print(f"file_format:  {file_format}")
    print(f"table_name:  {table_name}")
    print()

    df = (
        spark.read.format(file_format)
        .option("rootTag", "Activity")
        .option("rowTag", "Activities")
        .option("inferSchema", True)
        .option("treatEmptyValuesAsNulls", "true")
        .load(load_path)
    )
    df = flatten_df(df)
    df = (
        df
        .withColumn("file_path", F.col("_metadata.file_path"))
        .withColumn("etl_timestamp", F.current_timestamp())
        .withColumn("entity", F.regexp_extract(F.col('file_path'), r"[/]takeout-.*_(.*)[\/]extracted", 1))
        .withColumn("file_path", F.replace(F.replace(F.col('file_path'), F.lit("%20"), F.lit(" ")), F.lit("dbfs:"), F.lit("")))
    )

    merge_keys = [
        "entity",
        "Activity_exploded_Id",
        "Activity_exploded__Sport",
        "Activity_exploded_Lap_Calories",
        "Activity_exploded_Lap_DistanceMeters",
        "Activity_exploded_Lap_Intensity",
        "Activity_exploded_Lap_TotalTimeSeconds",
        "Activity_exploded_Lap_TriggerMethod",
        "Activity_exploded_Lap__StartTime",
        "Activity_exploded_Lap_Track_Trackpoint_exploded_DistanceMeters",
        "Activity_exploded_Lap_Track_Trackpoint_exploded_Time",
    ]
    merge_condition = " AND ".join(f"t.{key} <=> s.{key}" for key in merge_keys)

    try:
        if spark.catalog.tableExists(table_name):
            print(f"Starting upsert operation on the table :{table_name}")
            (
                DeltaTable.forName(spark, table_name).alias("t")
                .merge(df.alias("s"), merge_condition)
                # Insert-only: rows that already exist in the target are left untouched.
                .whenNotMatchedInsertAll()
                .execute()
            )
            print(f"Upsert operation was successful on the table :{table_name}")
        else:
            print(f"Starting overwrite operation on the table :{table_name}")
            df.write.mode("overwrite").saveAsTable(table_name)
            print(f"Overwrite operation was successful on the table :{table_name}")
    except Exception as e:
        # NOTE(review): the failure is only printed, so the caller (and the job) still
        # sees success — confirm this best-effort behaviour is intended.
        print(f"Some error occured while ingesting the data for the table {table_name}. Error details: {e}")

In [0]:
def bronze_ingestion(bronze_ingestion_config_dict):
    """
    Run the three bronze ingestion functions concurrently.

    Each ingestion function is submitted to a thread pool sized to the number
    of config entries and receives the full config dict. Once the pool has
    shut down, every future's result is requested so that an exception raised
    inside a worker is re-raised here.

    Args:
        bronze_ingestion_config_dict: entity key -> ingestion settings.
    """
    with ThreadPoolExecutor(len(bronze_ingestion_config_dict)) as pool:
        submitted = [
            pool.submit(ingest, bronze_ingestion_config_dict)
            for ingest in (
                all_sessions_ingestion,
                daily_activity_metrics_ingestion,
                activities_ingestion,
            )
        ]
    # All workers have finished by now; result() only surfaces their exceptions.
    for finished in as_completed(submitted):
        finished.result()
    

In [0]:
# Run the three bronze ingestions; each creates or upserts its target table as a side effect.
bronze_ingestion(bronze_ingestion_config_dict)

In [0]:
# Publish the latest etl_timestamp of every bronze table (as a string) under the
# task value key "bronze_max_ingestion_timestamps_dict".
dbutils.jobs.taskValues.set(
    "bronze_max_ingestion_timestamps_dict",
    json.dumps(
        {
            cfg["table_name"]: (
                spark.table(cfg["table_name"])
                .agg(F.max(F.col("etl_timestamp")).cast(StringType()))
                .collect()[0][0]
            )
            for cfg in bronze_ingestion_config_dict.values()
        }
    ),
)

In [0]:
# End the notebook, returning the latest etl_timestamp of every bronze table
# (as a string) as a JSON object keyed by table name.
dbutils.notebook.exit(
    json.dumps(
        {
            cfg["table_name"]: (
                spark.table(cfg["table_name"])
                .agg(F.max(F.col("etl_timestamp")).cast(StringType()))
                .collect()[0][0]
            )
            for cfg in bronze_ingestion_config_dict.values()
        }
    )
)

In [0]:
# Notebook exited: {"google_fit.bronze.activities": "2025-09-12 11:03:56.009982", "google_fit.bronze.all_sessions": "2025-09-12 11:03:59.785936", "google_fit.bronze.daily_activity_metrics": "2025-09-12 11:02:53.288683"} 

In [0]:
# df = spark.sql(" show tables in google_fit.bronze").selectExpr("concat_ws('.', database, tableName) as table")

# tables = [t['table'] for t in df.collect()]

# for t in tables:
#     spark.sql(f"drop table google_fit.{t}")

In [0]:
# def all_data_ingestion(bronze_ingestion_dict):
#     for k, v in bronze_ingestion_dict.items():
#         if(re.search(r"all\s*data", k.lower())):
#             load_path = k
#             file_format = bronze_ingestion_dict[k]["file_type"]
#             table_name = bronze_ingestion_dict[k]["table_name"]
#             print(f"load_path:  {load_path}")
#             print(f"file_format:  {file_format}")
#             print(f"table_name:  {table_name}")

#     df = (
#     spark.read.format(file_format)
#     .option("inferSchema", "true")
#     .load("/Volumes/google_fit/bronze/landing_zone/takeout-20250906T131146Z-1-001_20250907173921/extracted/Takeout/Fit/All data/derived_com.google.calories.bmr_com.google.and.json")
#     .withColumn("fitValue_exploded", F.explode("fitValue"))
#     .drop("fitValue")
#     .withColumn("fitValue_exploded_value", F.col("fitValue_exploded.value"))
#     .drop("fitValue_exploded")
#     .withColumn("fitValue_exploded_value_intVal", F.col("fitValue_exploded_value.intVal"))
#     .drop("fitValue_exploded_value")
# )
    
#     return df

In [0]:
# display(all_data_ingestion(bronze_ingestion_dict))

In [0]:
# nested_df = (
#      spark.read.format("xml")
#         .option("rootTag", "Activity")
#         .option("rowTag", "Activities")
#         .option("inferSchema", True)
#         .option("treatEmptyValuesAsNulls", "true") 
#         .load("/Volumes/google_fit/bronze/landing_zone/takeout-20250906T131146Z-1-001_20250907173921_rishabh/extracted/Takeout/Fit/Activities/")
# )

In [0]:
# for field in nested_df.schema.fields:
#         print(field.name)
#         print(field.dataType)

In [0]:
# check = flatten_df(nested_df)

In [0]:
# display(check
#         .groupBy(*check.columns)
#         .count()
#         .orderBy(F.desc("count"))
# )

In [0]:
# check.columns

In [0]:
# check_for_composite_keys(check)

In [0]:
# check.dropDuplicates().count()

In [0]:
# check.count() == check.dropDuplicates().count()

In [0]:
# def activities_ingestion(bronze_ingestion_config_dict): 
#     for k, v in bronze_ingestion_config_dict.items():
#         if(re.search(r"activities", k.lower())):
#             load_path = bronze_ingestion_config_dict[k]['vol_paths']
#             file_format = bronze_ingestion_config_dict[k]["file_type"]
#             table_name = bronze_ingestion_config_dict[k]["table_name"]
#             print(f"load_path:  {load_path}")
#             print(f"file_format:  {file_format}")
#             print(f"table_name:  {table_name}")
#             print()
#     df = (
#         spark.read.format(file_format)
#         .option("rootTag", "Activity")
#         .option("rowTag", "Activities")
#         .option("inferSchema", True)
#         .option("treatEmptyValuesAsNulls", "true") 
#         .load(*load_path)
#         .withColumn("Activity_Id", F.col("Activity.Id"))
#         .withColumn("Activity_Lap", F.col("Activity.Lap"))
#         .withColumn("Activity_Sport", F.col("Activity._Sport"))
#         .drop('Activity')
#         .withColumn("Activity_Lap_Calories", F.col('Activity_Lap.Calories'))
#         .withColumn("Activity_Lap_DistanceMeters", F.col('Activity_Lap.DistanceMeters'))
#         .withColumn("Activity_Lap_Intensity", F.col('Activity_Lap.Intensity'))
#         .withColumn("Activity_Lap_TotalTimeSeconds", F.col('Activity_Lap.TotalTimeSeconds'))
#         .withColumn("Activity_Lap_Track", F.col('Activity_Lap.Track'))
#         .withColumn("Activity_Lap_TriggerMethod", F.col('Activity_Lap.TriggerMethod'))
#         .withColumn("Activity_Lap_StartTime", F.col('Activity_Lap._StartTime'))
#         .drop("Activity_Lap")
#         .withColumn("Activity_Lap_Track_Trackpoint", F.col('Activity_Lap_Track.Trackpoint'))
#         .drop("Activity_Lap_Track")
#         .withColumn("Activity_Lap_Track_Trackpoint_exploded", F.explode('Activity_Lap_Track_Trackpoint'))
#         .drop("Activity_Lap_Track_Trackpoint")
#         .withColumn("Activity_Lap_Track_Trackpoint_exploded_DistanceMeters", F.col('Activity_Lap_Track_Trackpoint_exploded.DistanceMeters'))
#         .withColumn("Activity_Lap_Track_Trackpoint_exploded_Time", F.col('Activity_Lap_Track_Trackpoint_exploded.Time'))
#         .drop("Activity_Lap_Track_Trackpoint_exploded")
#     )
#     for field in df.schema.fields:
#         if isinstance(field.dataType, ArrayType):
#             df = df.withColumn(f"{field.name}_exploded", F.explode(F.col(field.name)))
#             df = df.drop(field.name)
#     return (
#         df.dropDuplicates()
#         .withColumn("file_path", F.col("_metadata.file_path"))
#         .withColumn("etl_timestamp", F.current_timestamp())
#         .withColumn("entity", F.regexp_extract(F.col('file_path'), r"[/]takeout-.*_(.*)[\/]extracted", 1))
#         .withColumn("file_path", F.replace(F.replace(F.col('file_path'), F.lit("%20"), F.lit(" ")), F.lit("dbfs:"), F.lit("")))
#     )
#     try:
#         if spark.catalog.tableExists(table_name):
#             print(f"Starting upsert operation on the table :{table_name}")
#             (
#             DeltaTable.forName(spark, table_name).alias("t")
#                 .merge(
#                 df.alias("s"),
#                 "t.entity = s.entity AND t.Activity_exploded_Id = s.Activity_exploded_Id"
#             ).whenNotMatchedInsertAll()
#             .whenMatchedUpdateAll()
#             .execute()
#             )
#             print(f"Upsert operation was successful on the table :{table_name}")
#         else:
#             print(f"Starting overwrite operation on the table :{table_name}")
#             df.write.mode("overwrite").saveAsTable(table_name)
#             print(f"Overwrite operation was successful on the table :{table_name}")
#     except Exception as e:
#         print(f"Some error occured while ingesting the data for the table {table_name}. Error details: {e}")

In [0]:
# display(
#     daily_activity_metrics_ingestion(bronze_ingestion_config_dict)
#         .withColumn('date_type', F.when(F.col('Date').rlike("^\\d{4}-\\d{2}-\\d{2}$"), F.lit('Date_only')).otherwise(F.lit('other')))
#         .filter(F.col('date_type') == 'other')
#         .groupBy('Date').count()
# )