<a href="https://colab.research.google.com/github/adrian-ja-projects/train-prediction-project/blob/fea_data_analisys/pl_transf_staging_to_raw.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Pipeline to write staging JSON files into the raw layer, ready for DB use
1. The pipeline flattens each JSON file.
2. It loops through the files in the staging folder.
3. It creates the Parquet table, or appends the data to it if the table already exists.
4. It prints a warning when a JSON file yields no data (only when `debug` is enabled).

## Importing the dependencies the pipeline needs

In [None]:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import *
from copy import deepcopy
from collections import Counter
from pyspark.sql import DataFrame as SparkDataFrame

In [None]:
%run /content/train-prediction-project/AutoFlatten_py.ipynb

## Create the Spark application session

In [None]:
from pyspark.sql import SparkSession

# Local single-node Spark session for this notebook; the UI is moved to port
# 4050 (presumably to avoid clashing with another session on the default port).
spark = (
    SparkSession.builder
    .master("local")
    .appName("pl_transf_staging_to_raw")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
def auto_flatting_json(file_path: str)-> SparkDataFrame:
  """
  Read a multiline JSON file with Spark and flatten it into a DataFrame of
  non-nested columns.

  Struct fields are selected out as their own columns, and array columns are
  exploded with `explode_outer`. That gives one output row per array element,
  and rows whose array is null or empty are kept with a null. The
  path -> column mapping comes from `AutoFlatten`, presumably defined by the
  notebook loaded with `%run` above. The meaning of its attributes (all_fields,
  rest, order, bottom_to_top) is inferred from how they are used here.

  A field keeps its plain target name (its value in `af.all_fields`) unless
  several paths share that name. In that case it is named after its full path,
  with '.' replaced by '>' (e.g. path `.a.b.c` -> column `a>b>c`).

  Uses the module-level `spark` session.

  Args:
    file_path: path of the JSON file to read.

  Returns:
    The flattened Spark DataFrame, with columns in `af.all_fields` order.
  """

  # multiline=True: a JSON document may span several lines (not JSON Lines).
  json_df = spark.read.format("json").options(multiline="True").load(file_path)
  json_schema = json_df.schema

  # Analyse the schema; compute() fills the attributes used below.
  af = AutoFlatten(json_schema)
  af.compute()

  df1 = json_df
  
  ### Source of the code: https://towardsdatascience.com/flattening-json-records-using-pyspark-b83137669def
  # af paths start with '.' (they are sliced with [1:] below), so top-level
  # columns are seeded into `visited` in the same form.
  visited = set([f'.{column}' for column in df1.columns])
  # Number of paths mapping to each target name; > 1 means a name collision.
  duplicate_target_counter = Counter(af.all_fields.values())
  # NOTE(review): extended in place with += below; DataFrame.columns returns a
  # new list each call, so df1 itself is not affected.
  cols_to_select = df1.columns
  # First pass: pull out the af.rest fields (presumably nested non-array
  # fields) that are not already top-level columns.
  for rest_col in af.rest:
      if rest_col not in visited:
          # Plain path when the target name is unique and not already a column;
          # otherwise alias the column to its '>'-joined path.
          cols_to_select += [rest_col[1:]] if (duplicate_target_counter[af.all_fields[rest_col]]==1 and af.all_fields[rest_col] not in df1.columns) else [col(rest_col[1:]).alias(f"{rest_col[1:].replace('.', '>')}")]
          visited.add(rest_col)

  df1 = df1.select(cols_to_select)

  
  # Second pass: open nested columns one key (a '.'-prefixed path) at a time,
  # in the order given by af.order.
  if af.order:
      for key in af.order:
          # The nested value is expected to be a top-level column named by the
          # key's last path segment at this point.
          column = key.split('.')[-1]
          if af.bottom_to_top[key]:
              #########
              #values for the column in bottom_to_top dict exists if it is an array type
              #########
              # explode_outer keeps rows whose array is null/empty (emitting a
              # null element) instead of dropping them.
              df1 = df1.select('*', explode_outer(col(column)).alias(f"{column}_exploded")).drop(column)
              data_type = df1.select(f"{column}_exploded").schema.fields[0].dataType
              if not (isinstance(data_type, StructType) or isinstance(data_type, ArrayType)):
                  # Scalar elements: the exploded column is the final column;
                  # use the '>'-path name only if the target name is duplicated.
                  df1 = df1.withColumnRenamed(f"{column}_exploded", column if duplicate_target_counter[af.all_fields[key]]<=1 else key[1:].replace('.', '>'))
                  visited.add(key)
              else:
                  #grabbing all paths to columns after explode
                  cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.select(f'{column}_exploded.*').columns))
                  #retrieving unvisited columns
                  cols_to_select_set = cols_in_array_col.difference(visited)
                  all_cols_to_select_set = set(af.bottom_to_top[key])
                  #check done for duplicate column name & path
                  cols_to_select_list = list(map(lambda x: f"{column}_exploded{'.'.join(x.split(key)[1:])}" if (duplicate_target_counter[af.all_fields[x]]<=1 and x.split('.')[-1] not in df1.columns) else col(f"{column}_exploded{'.'.join(x.split(key)[1:])}").alias(f"{x[1:].replace('.', '>')}"), list(all_cols_to_select_set)))
                  #updating visited set
                  visited.update(cols_to_select_set)
                  # Unvisited sub-fields not listed in bottom_to_top[key]; they
                  # are selected under their own field name (presumably to be
                  # opened by a later key in af.order).
                  rem = list(map(lambda x: f"{column}_exploded{'.'.join(x.split(key)[1:])}", list(cols_to_select_set.difference(all_cols_to_select_set))))
                  # Drop the intermediate exploded struct once its fields are out.
                  df1 = df1.select(df1.columns + cols_to_select_list + rem).drop(f"{column}_exploded")        
          else:
              #########
              #values for the column in bottom_to_top dict do not exist if it is a struct type / array type containing a string type
              #########
              #grabbing all paths to columns after opening
              cols_in_array_col = set(map(lambda x: f'{key}.{x}', df1.selectExpr(f'{column}.*').columns))
              #retrieving unvisited columns
              cols_to_select_set = cols_in_array_col.difference(visited)
              #check done for duplicate column name & path
              # NOTE(review): this branch indexes duplicate_target_counter by the
              # leaf name, whereas the branches above index it by
              # af.all_fields[x]. Equivalent only if all_fields values are leaf
              # names; confirm.
              cols_to_select_list = list(map(lambda x: f"{column}.{x.split('.')[-1]}" if (duplicate_target_counter[x.split('.')[-1]]<=1 and x.split('.')[-1] not in df1.columns) else col(f"{column}.{x.split('.')[-1]}").alias(f"{x[1:].replace('.', '>')}"), list(cols_to_select_set)))
              #updating visited set
              visited.update(cols_to_select_set)
              # Replace the struct column with its selected fields.
              df1 = df1.select(df1.columns + cols_to_select_list).drop(f"{column}")

  
  # Final projection, in af.all_fields order: '>'-path names for colliding
  # targets, plain target names otherwise.
  return df1.select([field[1:].replace('.', '>') if duplicate_target_counter[af.all_fields[field]]>1 else af.all_fields[field] for field in af.all_fields])

def _check_object_exists(file_path: str)-> bool:
  """
  Return True if `file_path` exists on the local filesystem (file or directory).

  `os` is imported locally because no `exists` is imported in this notebook's
  own cells. Relying on a name leaked from another notebook via `%run` would
  raise NameError if that notebook stopped providing it.
  """
  import os

  return os.path.exists(file_path)

def write_data_into_table(table_name: str, parquet_table_location: str, inputDF: SparkDataFrame, write_mode: str, origin_file_path: str)-> None:
  """
  Write `inputDF` as Parquet to `parquet_table_location`.

  If nothing exists at `parquet_table_location` yet, the table is created there
  and "Table created" is printed. Otherwise the data is written with
  `write_mode`: "overwrite" replaces the existing data and "append" adds to it.

  Args:
    table_name: logical table name (kept for the caller's interface; not used).
    parquet_table_location: directory the Parquet files are written to.
    inputDF: Spark DataFrame to write.
    write_mode: "append" or "overwrite".
    origin_file_path: file the data came from (kept for the caller's
      interface; not used).

  Raises:
    ValueError: if `write_mode` is neither "append" nor "overwrite".
  """
  supported_modes = ("append", "overwrite")
  if write_mode not in supported_modes:
    # Fail loudly instead of silently writing nothing.
    raise ValueError(f"Unsupported write_mode {write_mode!r}; expected one of {supported_modes}")

  # Call the helper; a bare function object is always truthy, so referencing
  # it without calling would never detect a missing table.
  table_exists = _check_object_exists(parquet_table_location)

  # repartition(1) makes each write produce a single Parquet part file.
  (inputDF.repartition(1)
     .write
     .mode(write_mode)
     .format("parquet")
     .save(parquet_table_location))

  if not table_exists:
    print("Table created")

In [None]:
# Loop over every file in the staging folder (year-month / year-month-day / file),
# flatten it, and append the non-empty results to the raw Parquet table.
import os

table_name = "27_schedule"
parquet_table_location = "/content/raw/digitraffic/27_schedule"
staging_location = "/content/staging/digitraffic/stg_27_schedule"
debug= False  # when True, report files that yield no rows


def _visible_entries(parent: str, is_wanted) -> list:
  """Return sorted (name, path) pairs for non-hidden entries of `parent` where `is_wanted(path)` is true."""
  # Sorting gives a deterministic processing order. Hidden entries
  # (e.g. ".ipynb_checkpoints") and entries of the wrong kind are skipped, so
  # os.listdir / spark.read never receive a file where a folder is expected
  # (or vice versa).
  return [
      (name, os.path.join(parent, name))
      for name in sorted(os.listdir(parent))
      if not name.startswith(".") and is_wanted(os.path.join(parent, name))
  ]


print("INFO: Starting transformation of json files...")
for folderYearMonth, year_month_path in _visible_entries(staging_location, os.path.isdir):
  for folderYearMonthDay, day_path in _visible_entries(year_month_path, os.path.isdir):
    for fileName, file_path in _visible_entries(day_path, os.path.isfile):
      #Auto flatten json file for db
      df_flat_json = auto_flatting_json(file_path)

      # Only write non-empty frames. head(1) fetches at most one row, whereas
      # count() would scan the whole DataFrame just to test emptiness.
      if df_flat_json.head(1):
        write_data_into_table(table_name, parquet_table_location, df_flat_json, "append", file_path)
      elif debug:
        print(f"WARNING: {file_path} file doesn't contain data")

In [None]:
# Release the Spark session created at the top of the notebook; later cells
# must not use `spark` after this.
spark.stop()

In [None]:
# Completion message for the notebook run.
print("Data in raw ready to upload to a db")