In [0]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
from pyspark.sql.types import (
    IntegerType,
    LongType,
    Row,
    StringType,
    StructField,
    StructType,
)

In [0]:
# Each list element is a Row, so the field names (name, age) travel with the data.
row_list = [
    Row(name="John", age=19),
    Row(name="Smith", age=23),
    Row(name="Sarah", age=18),
]

# Distribute the rows as an RDD, then pull them back to the driver to inspect them.
rdd = sc.parallelize(row_list)
rdd.collect()

Out[5]: [Row(name='John', age=19),
 Row(name='Smith', age=23),
 Row(name='Sarah', age=18)]

In [0]:
# Turn the RDD of Rows into a DataFrame; column names and types are inferred
# from the Row fields.
df = rdd.toDF()

# Show the inferred schema first, then the rows themselves.
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-----+---+
| name|age|
+-----+---+
| John| 19|
|Smith| 23|
|Sarah| 18|
+-----+---+



In [0]:
# Plain tuples carry no field names, so the inferred columns are labelled _1 and _2.
data = [
    ("Java", 20000),
    ("Python", 100000),
    ("Scala", 3000),
]
rdd2 = spark.sparkContext.parallelize(data)

df2 = spark.createDataFrame(rdd2)
df2.printSchema()
df2.show()

# Passing a list of names to toDF() gives the columns proper headers.
DF_With_Col_Headers = rdd2.toDF(["Language", "User Count"])
DF_With_Col_Headers.show()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+

+--------+----------+
|Language|User Count|
+--------+----------+
|    Java|     20000|
|  Python|    100000|
|   Scala|      3000|
+--------+----------+



In [0]:
# Build a DataFrame with an explicit schema instead of relying on type inference.
another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])

# Two non-nullable fields: person_name (string) and person_age (integer).
schema = StructType(
    [
        StructField("person_name", StringType(), nullable=False),
        StructField("person_age", IntegerType(), nullable=False),
    ]
)

# Apply the schema through the SparkSession entry point, as the other cells do,
# rather than through the legacy sqlContext object.
another_df = spark.createDataFrame(another_rdd, schema)
another_df.printSchema()
another_df.show()

root
 |-- person_name: string (nullable = false)
 |-- person_age: integer (nullable = false)

+-----------+----------+
|person_name|person_age|
+-----------+----------+
|       John|        19|
|      Smith|        23|
|      Sarah|        18|
+-----------+----------+



In [0]:
# JSON file previously uploaded to Databricks.
path = "/FileStore/tables/people-1.json"

# Read the file into a DataFrame; the schema is inferred from the JSON records.
people_df = spark.read.format("json").load(path)
people_df.printSchema()
people_df.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [0]:
# Project the name column, then keep only the rows whose name is "Michael".
df3 = (
    people_df
    .select("name")
    .filter(people_df["name"] == "Michael")
)
df3.show()

+-------+
|   name|
+-------+
|Michael|
+-------+



In [0]:
# Example data: four departments and five employees, combined into nested rows.

# Departments: each Row has an id and a name.
department1 = Row(id="123456", name="Computer Science")
department2 = Row(id="789012", name="Mechanical Engineering")
department3 = Row(id="345678", name="Theater and Drama")
department4 = Row(id="901234", name="Indoor Recreation")

# Employee is a Row template: calling it with positional values fills the
# firstName, lastName, email and salary fields in that order.
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000)
employee2 = Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
employee3 = Employee("matei", None, "no-reply@waterloo.edu", 140000)  # no lastName
employee4 = Employee(None, "wendell", "no-reply@berkeley.edu", 160000)  # no firstName
employee5 = Employee("michael", "jackson", "no-reply@neverla.nd", 80000)

# Pair each department with a list of its employees; an employee may appear
# under more than one department.
departmentWithEmployees1 = Row(
    department=department1,
    employees=[employee1, employee2],
)
departmentWithEmployees2 = Row(
    department=department2,
    employees=[employee3, employee4],
)
departmentWithEmployees3 = Row(
    department=department3,
    employees=[employee5, employee4],
)
departmentWithEmployees4 = Row(
    department=department4,
    employees=[employee2, employee3],
)

In [0]:
# First DataFrame: built from the first two department/employee rows.
departmentsWithEmployeesSeq1 = [
    departmentWithEmployees1,
    departmentWithEmployees2,
]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)

# Render the nested rows as a table in the notebook.
display(df1)

department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"


In [0]:
# Plain-text view of df1; long nested values are cut short (see the "..." in the output).
df1.show(n=20, truncate=True)

+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|{123456, Computer...|[{michael, armbru...|
|{789012, Mechanic...|[{matei, null, no...|
+--------------------+--------------------+



In [0]:
# Second DataFrame: built from the remaining two department/employee rows.
# NOTE: this rebinds df2, which an earlier cell used for the languages DataFrame.
departmentsWithEmployeesSeq2 = [
    departmentWithEmployees3,
    departmentWithEmployees4,
]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

# Show it twice: as a notebook table and as plain text.
display(df2)
df2.show()

department,employees
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


+--------------------+--------------------+
|          department|           employees|
+--------------------+--------------------+
|{345678, Theater ...|[{michael, jackso...|
|{901234, Indoor R...|[{xiangrui, meng,...|
+--------------------+--------------------+



In [0]:
# Append the rows of df2 to those of df1, giving one DataFrame with all four departments.
unionDF = df1.union(df2)
display(unionDF)

department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


department,employees
"List(123456, Computer Science)","List(List(michael, armbrust, no-reply@berkeley.edu, 100000), List(xiangrui, meng, no-reply@stanford.edu, 120000))"
"List(789012, Mechanical Engineering)","List(List(matei, null, no-reply@waterloo.edu, 140000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(345678, Theater and Drama)","List(List(michael, jackson, no-reply@neverla.nd, 80000), List(null, wendell, no-reply@berkeley.edu, 160000))"
"List(901234, Indoor Recreation)","List(List(xiangrui, meng, no-reply@stanford.edu, 120000), List(matei, null, no-reply@waterloo.edu, 140000))"


In [0]:
# Compare explode() with plain nested-field selection on the employees array.

# explode() produces one output row per array element; the resulting column is named "col".
explodeDF = unionDF.select(explode("employees.firstName"))
# Without explode, each department keeps its employees' first names in a single array.
unexplodeDF = unionDF.select("employees.firstName")

# Show the data: first three exploded rows, then every un-exploded row, untruncated.
explodeDF.show(n=3, truncate=False)
unexplodeDF.show(truncate=False)

# Show the schemas: the nested source, the flat string column, and the array column.
unionDF.printSchema()
explodeDF.printSchema()
unexplodeDF.printSchema()

+--------+
|col     |
+--------+
|michael |
|xiangrui|
|matei   |
+--------+
only showing top 3 rows

+-------------------+
|firstName          |
+-------------------+
|[michael, xiangrui]|
|[matei, null]      |
|[michael, null]    |
|[xiangrui, matei]  |
+-------------------+

root
 |-- department: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lastName: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- salary: long (nullable = true)

root
 |-- col: string (nullable = true)

root
 |-- firstName: array (nullable = true)
 |    |-- element: string (containsNull = true)

