In [121]:
import logging
from pyspark.sql import SparkSession

Here are some considerations for when to create an RDD first and when to create a DataFrame directly:

**Creating an RDD first (as in the provided code):**
1. **Custom Transformation:** If you have custom transformation logic that you want to apply to the data before creating a DataFrame, you might choose to work with RDDs first. This lets you process the data with RDD transformations such as `map` and `filter`, and with actions such as `reduce`.

2. **Unstructured Data:** If your data is unstructured or doesn't have a well-defined schema, working with RDDs can be more flexible because, unlike DataFrames, RDDs do not require a schema.

3. **Fine-Grained Control:** RDDs provide fine-grained control over data partitioning and processing, which can be beneficial in specific use cases where you need to optimize performance at a low level.

**Creating a DataFrame directly:**
1. **Structured Data:** If your data is structured, such as CSV, JSON, or Parquet files, you can create a DataFrame directly from those sources using built-in Spark methods. For example, you can use `spark.read.csv("file.csv")` to create a DataFrame from a CSV file.

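2. **Schema Inference:** DataFrame readers can automatically infer the schema from the data source, which simplifies getting started. Note that for formats such as CSV and JSON, inference requires an extra pass over the data, so for large datasets supplying an explicit schema is usually faster.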

3. **Optimization:** DataFrames benefit from Spark's built-in optimizations, which can result in better performance for common operations like filtering, grouping, and aggregating.

##### Create a Spark session. The SparkSession is the entry point for DataFrames and Datasets; RDDs are created through its underlying SparkContext (`spark.sparkContext`).

In [122]:
def rdd_to_dataframe(data, schema):
  """
  Create a Spark RDD from local ``data`` and convert it into a Spark DataFrame.

  The SparkSession is obtained with ``getOrCreate()``, so every call shares the same
  session. It is deliberately NOT stopped here: the returned DataFrame (and any other
  DataFrame created from this session) depends on it staying alive.

  :param data: local collection of rows (e.g. a list of tuples) to distribute as an RDD.
  :param schema: schema passed to ``SparkSession.createDataFrame`` (e.g. a list of column names).
  :return: Spark DataFrame built from the RDD.
  :raises Exception: re-raised after logging if RDD or DataFrame creation fails.
  """
  # create (or reuse the already-running) SparkSession
  spark = SparkSession.builder.appName('RDDToDataFrame').getOrCreate()

  try:
    # create an RDD from the input data, using the Spark Context, not the Session
    rdd = spark.sparkContext.parallelize(data)

    # convert RDD to DataFrame
    return spark.createDataFrame(rdd, schema)

  except Exception as e:
    # Do not stop the SparkSession here: it is shared (getOrCreate), and stopping it
    # would invalidate every other DataFrame in the notebook. Log and propagate the
    # error instead of silently returning None.
    logging.error('Error while transforming RDD to DF: {}'.format(e))
    raise

##### Create some sample data

In [123]:
# Sample departments, one (department_id, department_name) tuple per row
dept_data = [
  (1, 'Big Data'),
  (2, 'Finance'),
  (3, 'Marketing'),
]
# Column names, in the same order as the tuple fields above
dept_schema = [
  'department_id',
  'department_name',
]

In [124]:
# Sample employees, one (department_id, employee_name, age) tuple per row
emp_data = [
  (1, 'Carlos', 17),
  (1, 'Bob', 30),
  (2, 'Jasmin', 26),
]
# Column names, in the same order as the tuple fields above
emp_schema = [
  'department_id',
  'employee_name',
  'age',
]

#### Convert the sample data into Spark DataFrames

In [125]:
# Build one DataFrame per sample dataset; both calls reuse the same SparkSession
df_emp, df_dept = (
  rdd_to_dataframe(emp_data, emp_schema),
  rdd_to_dataframe(dept_data, dept_schema),
)

In [126]:
# Show the department rows (use df_dept.printSchema() to inspect the schema instead)
df_dept.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|       Big Data|
|            2|        Finance|
|            3|      Marketing|
+-------------+---------------+



In [127]:
# The DataFrame's string form lists its column names and Spark types
print(str(df_dept))

DataFrame[department_id: bigint, department_name: string]


### Use Spark SQL to join two datasets

In [128]:
# Get a handle on the notebook's SparkSession: getOrCreate() returns the session that
# rdd_to_dataframe() already started rather than creating a new one
spark = (
  SparkSession.builder
  .appName('RDDToDataFrame')
  .getOrCreate()
)

In [129]:
# Register both DataFrames as temporary views so Spark SQL can query them by name
for view_df, view_name in ((df_emp, 'employees'), (df_dept, 'departments')):
  view_df.createOrReplaceTempView(view_name)

In [130]:
# Keep only employees aged 18 or over
(spark
  .sql('''select * from employees where age >= 18 ''')
  .show())

+-------------+-------------+---+
|department_id|employee_name|age|
+-------------+-------------+---+
|            1|          Bob| 30|
|            2|       Jasmin| 26|
+-------------+-------------+---+



In [131]:
# Spark SQL example: inner join of adult employees with their department
(spark
  .sql('''select emp.*, dept.*
          from employees as emp
          inner join departments as dept on (emp.department_id = dept.department_id)
          where age >= 18
          ''')
  .show())

+-------------+-------------+---+-------------+---------------+
|department_id|employee_name|age|department_id|department_name|
+-------------+-------------+---+-------------+---------------+
|            1|          Bob| 30|            1|       Big Data|
|            2|       Jasmin| 26|            2|        Finance|
+-------------+-------------+---+-------------+---------------+



In [132]:
# Save the JOINED result into a new temporary view `dept_employees` for later cells.
# Note: this query DOES filter (`where age >= 18`), so the view holds adult employees only.
spark.sql('''
          select emp.employee_name, emp.age, emp.department_id, dept.department_name
          from employees as emp
            inner join departments as dept on (emp.department_id = dept.department_id)
             where age >= 18
          ''').createOrReplaceTempView('dept_employees')

In [133]:
# Show the joined result stored in the `dept_employees` view.
# The `department_id is not null` filter is redundant (the inner equi-join that built the
# view cannot produce null join keys) but harmless.
spark.sql('''
        select * from dept_employees where department_id is not null
          ''').show()

+-------------+---+-------------+---------------+
|employee_name|age|department_id|department_name|
+-------------+---+-------------+---------------+
|          Bob| 30|            1|       Big Data|
|       Jasmin| 26|            2|        Finance|
+-------------+---+-------------+---------------+



#### Save the output for our Business Data Consumers

In [134]:
# Define output location (a local relative path here; could equally be an S3 or other object-store URI)
output_location = 'output/dept_employees/'

# Save the joined result set as Parquet. 'overwrite' (instead of 'append') keeps this cell
# idempotent: re-running the notebook replaces the output rather than duplicating every row.
spark.sql('''
        select * from dept_employees where department_id is not null
          ''').write.mode('overwrite').parquet(output_location)
          

#### Join a third dataset stored in a different format (JSON)

In [135]:
# Read the budgets JSON. The 'multiline' option lets a single JSON record span several
# lines (presumably the file is pretty-printed rather than JSON Lines - confirm).
df_budgets = (
  spark.read
  .option('multiline', 'true')
  .json('./data/json/department_budgets.json')
)

In [136]:
# Print the schema Spark inferred for the budgets JSON, as a tree of nested fields
df_budgets.printSchema()

root
 |-- budget: long (nullable = true)
 |-- budget_authorizer: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cto: struct (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)
 |-- budget_period: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- offices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cost_center: struct (nullable = true)
 |    |    |    |-- budget_status: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)



In [137]:
# Inspect the raw nested `offices` array for department 1, without truncating cell values
(df_budgets
  .filter('department_id == 1')
  .select('offices')
  .show(truncate=False))

+-------------------------------------------------------------------------+
|offices                                                                  |
+-------------------------------------------------------------------------+
|[{{denied, new york}}, {{approved, mumbai}}, {{approved, san francisco}}]|
+-------------------------------------------------------------------------+



#### Querying nested JSON without flattening

In [138]:
# Expose the nested (unflattened) budgets DataFrame to Spark SQL
df_budgets.createOrReplaceTempView(name='budgets_json')

In [139]:
# Join employees with the nested budgets JSON, reading nested values by array position/path.
# Only offices[0], offices[1] and budget_authorizer[0] are read, so any further array
# elements (e.g. a third office) do not appear in the result.
(spark
  .sql('''
      select emp.employee_name,
          emp.department_id,
          bud.budget,
          bud.budget_period,
          bud.offices[0].cost_center.office as office_1,
          bud.offices[0].cost_center.budget_status as budget_status_1,
          bud.offices[1].cost_center.office as office_2,
          bud.offices[1].cost_center.budget_status as budget_status_2,
          nvl(bud.budget_authorizer[0].cto.name, 'no CTO registered') as cto_name,
          nvl(bud.budget_authorizer[0].cto.last_name, 'no CTO registered') as cto_last_name
      from dept_employees as emp
        inner join budgets_json as bud on (emp.department_id = bud.department_id)
          ''')
  .show())

+-------------+-------------+------+-------------+--------+---------------+--------+---------------+-----------------+-----------------+
|employee_name|department_id|budget|budget_period|office_1|budget_status_1|office_2|budget_status_2|         cto_name|    cto_last_name|
+-------------+-------------+------+-------------+--------+---------------+--------+---------------+-----------------+-----------------+
|          Bob|            1| 16000|         year|new york|         denied|  mumbai|       approved|no CTO registered|no CTO registered|
|       Jasmin|            2| 23000|         year|    null|           null|    null|           null|              joe|              doe|
+-------------+-------------+------+-------------+--------+---------------+--------+---------------+-----------------+-----------------+



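#### Flattening JSON into a columnar format is usually easier, clearer and more scalable

- Positional JSON-path access (e.g. `offices[0]`, `offices[1]`) only returns the elements you name explicitly: in the query above, department 1's third office (`san francisco`) is silently left out.
- Suggestion: always test and benchmark performance to compare JSON-path access against flattening.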

####

In [140]:
import logging
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col

In [144]:
def flatten_dataframe(df):
  """
  Flatten a Spark DataFrame by expanding nested structs and exploding arrays.

  Repeats until no top-level column is a StructType or ArrayType:
    * a StructType column ``c`` is replaced by one column per sub-field, named ``c_<field>``;
    * an ArrayType column is exploded with ``explode_outer`` (one row per element; rows
      whose array is null or empty are kept with a null value).

  Note: each exploded array multiplies the row count, so several arrays in the same
  record yield the cross product of their elements.

  :param df: Spark DataFrame, possibly containing nested struct/array columns.
  :return: Spark dataframe with no struct or array top-level columns.
  :raises Exception: re-raised after logging if any Spark operation fails.
  """
  def _complex_fields(frame):
    # Top-level columns that still need flattening, keyed by name (schema order preserved)
    return {field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))}

  def _quoted(name):
    # Back-tick quote a column name so dots/spaces in it are not parsed as a nested path
    return '`' + name.replace('`', '``') + '`'

  try:
    complex_fields = _complex_fields(df)
    while complex_fields:
      col_name, data_type = next(iter(complex_fields.items()))

      if isinstance(data_type, StructType):
        # flatten structs: promote every sub-field to its own top-level column
        expanded = [col(_quoted(col_name)).getField(sub.name).alias(col_name + '_' + sub.name)
                    for sub in data_type.fields]
        df = df.select('*', *expanded).drop(col_name)
      else:
        # ArrayType: add the array elements as rows (explode_outer keeps null/empty arrays)
        df = df.withColumn(col_name, explode_outer(col(_quoted(col_name))))

      # recompute remaining complex fields: expansion can surface new nested columns
      complex_fields = _complex_fields(df)

    return df

  except Exception as e:
    logging.error('Error while flattening JSON data: {}'.format(e))
    # propagate instead of silently returning None (which only fails later, obscurely)
    raise

In [145]:
# Expand every nested struct/array column of the budgets DataFrame into flat columns/rows
df_budgets_flat = flatten_dataframe(df=df_budgets)

In [146]:
# First 20 rows with long values truncated (Spark's show() defaults, spelled out)
df_budgets_flat.show(n=20, truncate=True)

+------+-------------+-------------+-------------------------------+--------------------------+----------------------------+---------------------------------+--------------------------+
|budget|budget_period|department_id|budget_authorizer_cto_last_name|budget_authorizer_cto_name|budget_authorizer_cto_office|offices_cost_center_budget_status|offices_cost_center_office|
+------+-------------+-------------+-------------------------------+--------------------------+----------------------------+---------------------------------+--------------------------+
| 16000|         year|            1|                           null|                      null|                        null|                           denied|                  new york|
| 16000|         year|            1|                           null|                      null|                        null|                         approved|                    mumbai|
| 16000|         year|            1|                           null|  

In [153]:
# Schema after flattening: no struct/array columns remain; nested names are joined with '_'
df_budgets_flat.printSchema()

root
 |-- budget: long (nullable = true)
 |-- budget_period: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- budget_authorizer_cto_last_name: string (nullable = true)
 |-- budget_authorizer_cto_name: string (nullable = true)
 |-- budget_authorizer_cto_office: string (nullable = true)
 |-- offices_cost_center_budget_status: string (nullable = true)
 |-- offices_cost_center_office: string (nullable = true)



In [154]:
# Expose the flattened budgets DataFrame to Spark SQL
df_budgets_flat.createOrReplaceTempView(name='budgets_flat')

In [155]:
# Re-check the contents of the joined employee/department view
(spark
  .sql('''
          select *
          from dept_employees''')
  .show())

+-------------+---+-------------+---------------+
|employee_name|age|department_id|department_name|
+-------------+---+-------------+---------------+
|          Bob| 30|            1|       Big Data|
|       Jasmin| 26|            2|        Finance|
+-------------+---+-------------+---------------+



In [158]:
# Join employees with the flattened budgets: each office is its own row, so every office of a
# department appears. There is no ORDER BY, so row order is not guaranteed between runs.
(spark
  .sql('''
          select emp.employee_name, emp.department_name, bud.budget, bud.budget_period, bud.offices_cost_center_office
          from dept_employees as emp
            inner join budgets_flat as bud on (emp.department_id = bud.department_id)
          ''')
  .show(50))

+-------------+---------------+------+-------------+--------------------------+
|employee_name|department_name|budget|budget_period|offices_cost_center_office|
+-------------+---------------+------+-------------+--------------------------+
|          Bob|       Big Data| 16000|         year|             san francisco|
|          Bob|       Big Data| 16000|         year|                    mumbai|
|          Bob|       Big Data| 16000|         year|                  new york|
|       Jasmin|        Finance| 23000|         year|                      null|
+-------------+---------------+------+-------------+--------------------------+

