## Data Lab: Load and Join Employees and Departments data

In [1]:
import logging
from pyspark.sql import SparkSession

#### Create a Spark Session. Remember: the session is our entry point for DataFrames and Datasets; RDDs are created through the SparkContext.

In [2]:

def rdd_to_dataframe(data, schema):
    """
    Example: build a Spark DataFrame from in-memory rows by way of an RDD.

    The rows are first distributed as an RDD through the SparkContext, and the
    RDD is then converted into a DataFrame through the SparkSession.

    Args:
        data: Local collection of rows to parallelize (e.g. a list of tuples).
        schema: Schema handed to ``createDataFrame`` (e.g. a list of column names).

    Returns:
        The resulting Spark DataFrame. The SparkSession is left running so the
        returned DataFrame stays usable.

    Raises:
        Exception: Any error raised while building the RDD or the DataFrame is
            logged (with traceback) and re-raised.
    """

    # Get the SparkSession; getOrCreate() reuses the active session when one exists
    spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

    try:
        # Create an RDD from the input data, using Spark Context not Session!
        rdd = spark.sparkContext.parallelize(data)

        # Convert the RDD to a DataFrame and return it, without stopping the SparkSession
        return spark.createDataFrame(rdd, schema)

    except Exception as e:
        # Log the failure and re-raise it: silently returning None would only
        # surface later as an unrelated AttributeError in the caller.
        # The session is deliberately NOT stopped here, because it is shared and
        # stopping it would invalidate DataFrames created by earlier calls.
        logging.exception('Error while transforming RDD to DF: %s', e)
        raise


----
Create some sample data

In [3]:
# Sample departments: column names first, then one tuple per row
dept_schema = ["department_id", "department_name"]
dept_data = [
    (1, "Big Data"),
    (2, "Finance"),
    (3, "Marketing"),
]

In [4]:
# Sample employees: column names first, then one tuple per row
emp_schema = ["department_id", "employee_name", "age"]
emp_data = [
    (1, "Carlos", 17),
    (1, "Bob", 30),
    (2, "Jasmin", 26),
]

---

### Let's now turn each Spark RDD into a Spark DataFrame

In [5]:
# Build one DataFrame per sample dataset; each one is loaded through an RDD first
df_emp = rdd_to_dataframe(data=emp_data, schema=emp_schema)
df_dept = rdd_to_dataframe(data=dept_data, schema=dept_schema)

24/04/25 10:13:32 WARN Utils: Your hostname, ajaisw26-C02DQ79YMD6M resolves to a loopback address: 127.0.0.1; using 192.168.1.25 instead (on interface en0)
24/04/25 10:13:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/25 10:13:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [9]:
# Show the rows of the departments DataFrame (use printSchema() for the schema)
df_dept.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|       Big Data|
|            2|        Finance|
|            3|      Marketing|
+-------------+---------------+



In [7]:
# Print the column names and types of the employees DataFrame
df_emp.printSchema()

root
 |-- department_id: long (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- age: long (nullable = true)



### Use Spark SQL to join the two datasets

In [10]:
# Grab a session handle for the SQL cells below: getOrCreate() hands back the
# session that is already running, or starts one if there is none
spark = (
    SparkSession.builder
    .appName("RDDToDataFrame")
    .getOrCreate()
)

In [11]:
# Register both DataFrames as temporary views so Spark SQL can query them by name
df_emp.createOrReplaceTempView('employees')
df_dept.createOrReplaceTempView('departments')

In [12]:
# Query sample, using Spark SQL: inner-join employees to their departments
# and keep only employees aged 18 or over.
# NOTE: selecting emp.* and dept.* returns department_id twice (once per view).
spark.sql('''
            select emp.*, dept.*
            from employees as emp
                inner join departments as dept on (emp.department_id = dept.department_id)
            where age >= 18
            ''').show()

+-------------+-------------+---+-------------+---------------+
|department_id|employee_name|age|department_id|department_name|
+-------------+-------------+---+-------------+---------------+
|            1|          Bob| 30|            1|       Big Data|
|            2|       Jasmin| 26|            2|        Finance|
+-------------+-------------+---+-------------+---------------+



In [13]:
# Let's now save the JOINED RESULTSET into a new Temporary View, 'dept_employees'.
# The WHERE clause is still applied (only employees aged 18 or over are kept);
# columns are listed explicitly so department_id appears only once.
spark.sql('''
        select emp.employee_name, emp.age, emp.department_id, dept.department_name
        from employees as emp
            inner join departments as dept on (emp.department_id = dept.department_id)
             where age >= 18
        ''').createOrReplaceTempView('dept_employees')

In [14]:
# Read the rows back from the 'dept_employees' temporary view,
# keeping only those that have a department_id
spark.sql('''
        select * from dept_employees where department_id is not null
        ''').show()

+-------------+---+-------------+---------------+
|employee_name|age|department_id|department_name|
+-------------+---+-------------+---------------+
|          Bob| 30|            1|       Big Data|
|       Jasmin| 26|            2|        Finance|
+-------------+---+-------------+---------------+



## 💾 Let's save this output for our Business Data Consumers

In [22]:
# Define output location
output_location = 'output/dept_employees/'

# Let's now save the JOINED RESULTSET to local storage. This could be Amazon S3 or other.
# mode('overwrite') replaces any previous output, so the cell can be re-run safely.
# header=True writes the column names as the first row of each CSV file; without
# it the exported files carry no column names for the data consumers.
spark.sql('''
        select * from dept_employees where department_id is not null
        ''').write.mode('overwrite').csv(output_location, header=True)