This notebook holds all the shared utility functions.

In [0]:

%run ../includes/configuration

Out[9]: DataFrame[]

In [0]:
import requests
import json
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, DataFrameWriter
from pyspark.sql.functions import (
    col,
    current_timestamp,
    from_json,
    from_unixtime,
    lit,
    explode
)
from pyspark.sql.session import SparkSession
from pyspark.sql.utils import AnalysisException

In [0]:
def create_dataframe_writer(
    dataframe: DataFrame,
    partition_column: str = None,
    mode: str = "append",
    format: str = "delta"
) -> DataFrameWriter:
  """
  Build a configured, not-yet-executed writer for a dataframe.

  Parameters:
    dataframe (DataFrame): spark dataframe to be written.
    partition_column (string): optional column to partition the output by;
      when None the writer is left unpartitioned.
    mode (string): save mode handed to ``DataFrameWriter.mode`` (default "append").
    format (string): output format handed to ``DataFrameWriter.format`` (default "delta").

  Returns:
    DataFrameWriter: writer with format, mode and (optionally) partitioning
    applied. Nothing is written yet; the caller still has to invoke a terminal
    method such as ``save``.
  """
  writer = dataframe.write.format(format).mode(mode)
  return writer if partition_column is None else writer.partitionBy(partition_column)

In [0]:
def read_api_raw(spark: SparkSession, url: str, timeout: float = 60) -> DataFrame:
  """
  Read data from given REST API url.

  The response body is staged as a JSON file at
  ``ingestionPipelinePath + 'temp/api_raw_data.json'`` (replacing any copy left
  by a previous run) and then loaded with Spark's multiline JSON reader. The
  ``bikes`` column of the loaded data is renamed to ``raw_data``.

  Parameters:
    spark (SparkSession): Running spark session object.
    url (string): REST API endpoint.
    timeout (float): seconds to wait for the API before giving up.

  Returns:
    DataFrame: Spark dataframe having raw api data.

  Raises:
    requests.HTTPError: if the API answers with a 4xx/5xx status.
    requests.Timeout: if the API does not answer within ``timeout`` seconds.
  """
  temp_path = ingestionPipelinePath + 'temp/api_raw_data.json'

  # requests has no default timeout; without one a hung endpoint blocks the job forever.
  response = requests.get(url, timeout=timeout)
  # Fail loudly instead of staging an error page as if it were API data.
  response.raise_for_status()

  # dbutils.fs.put refuses to replace an existing file unless overwrite=True, so
  # without it every run after the first would fail on the leftover temp file.
  dbutils.fs.put(temp_path, response.text, overwrite=True)

  raw_df = spark.read.option('multiline', 'true').json(temp_path)
  return raw_df.withColumnRenamed('bikes', 'raw_data')

In [0]:
def add_metadata_raw(
    raw: DataFrame,
    datasource: str = "https://bikeindex.org/api/v3/search"
) -> DataFrame:
  """
  Adds metadata information to RAW data.

  Only the ``raw_data`` column of ``raw`` is kept; it is surrounded by:
    datasource   - the ``datasource`` string as a literal column
    ingesttime   - the current timestamp
    p_ingestdate - the date part of the current timestamp

  Parameters:
    raw (DataFrame): spark dataframe with a ``raw_data`` column.
    datasource (string): identifier of where the data came from. Defaults to the
      bikeindex search endpoint; pass the actual URL when reading from another one
      so the recorded lineage stays correct.

  Returns:
    DataFrame: Spark dataframe with metadata columns added.
  """
  return raw.select(
    lit(datasource).alias("datasource"),
    current_timestamp().alias("ingesttime"),
    "raw_data",
    current_timestamp().cast("date").alias("p_ingestdate"),
  )

In [0]:
def transform_landing(landing: DataFrame) -> DataFrame:
  """
  Flatten landing data into one row per element of ``raw_data``.

  Each element of the ``raw_data`` array column is exploded into its own row, its
  struct fields are promoted to top-level columns, and a ``p_incident_date``
  column is appended.

  NOTE(review): ``p_incident_date`` is filled with the current date (i.e. the
  load date), not a date read from the record itself — confirm this is intended.

  Parameters:
    landing (DataFrame): spark dataframe with an array column ``raw_data``.

  Returns:
    DataFrame: Spark dataframe with one row per exploded element.
  """
  exploded = landing.select(explode('raw_data').alias('data'))
  flattened = exploded.select(col('data.*'))
  load_date = current_timestamp().cast("date")
  return flattened.withColumn('p_incident_date', load_date)


In [0]:
def add_to_metastore(table_name: str, path: str) -> None:
  """
  Register a Delta table in the metastore, replacing any existing definition.

  Any table already registered under ``table_name`` is dropped first. The name is
  then re-created as a Delta table whose data lives at ``path``. The two
  statements run one after the other, not atomically.

  Parameters:
    table_name (string): name under which the table is registered.
    path (string): location of the underlying Delta data files.

  Returns:
    None
  """
  drop_sql = f"""
      DROP TABLE IF EXISTS {table_name}
    """
  create_sql = f"""
      CREATE TABLE {table_name}
      USING DELTA 
      LOCATION "{path}"
    """

  # Order matters: the old definition must be gone before the new one is created.
  for statement in (drop_sql, create_sql):
    spark.sql(statement)

In [0]:
def insert_update_processed(stage: DataFrame) -> None:
  """
  Upsert the stage dataframe into the processed Delta table at ``processedPath``.

  If no Delta table exists at ``processedPath`` yet (very first load), ``stage``
  is written there as a new Delta table partitioned by ``p_incident_date``.
  Otherwise ``stage`` is merged on ``id``: rows whose id already exists are
  updated with all columns, rows with a new id are inserted.

  Parameters:
    stage (DataFrame): spark stage dataframe; must contain ``id`` and
      ``p_incident_date`` columns.

  Returns:
    None

  Raises:
    Any error raised by the MERGE itself (e.g. a schema mismatch) is propagated
    rather than being silently turned into a duplicate-producing append.
  """
  if not DeltaTable.isDeltaTable(spark, processedPath):
    # First load: there is nothing to merge into yet.
    (
      stage
      .write.format("delta")
      .mode("append")
      .partitionBy("p_incident_date")
      .save(processedPath)
    )
    return

  processed_table = DeltaTable.forPath(spark, path=processedPath)
  (
    processed_table.alias('oldData')
    .merge(stage.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )
