In [4]:
# findspark has to run before pyspark is imported or a session is started:
# it locates the Spark installation and puts its Python libraries on sys.path.
import findspark
findspark.init()

import pyspark
# NOTE: the wildcard imports are kept because later cells use names such as
# Row, col and explode unqualified. Be aware that pyspark.sql.functions also
# exports sum/min/max/abs/round, which shadow the Python builtins.
from pyspark.sql.functions import *
from pyspark.sql import *

# Entry point for the DataFrame API; getOrCreate() reuses a running session.
spark = SparkSession.builder.getOrCreate()
sc = pyspark.SparkContext.getOrCreate()
# Legacy SQL handle, kept because the SQL cells further down reference it.
sqlContext = SQLContext(sc)

#### DataFrame creation

In [5]:
# Row factory for employee records: calling Employee(...) with four positional
# values produces a Row carrying these field names.
# NOTE(review): "fisrtname" is a typo for "firstname". It is left unchanged
# because a later cell selects "Emp.fisrtname"; both must be renamed together.
Employee = Row("fisrtname", "lastname", "email", "salary")

In [6]:
# Display the Row factory to confirm its field names.
Employee

<Row(fisrtname, lastname, email, salary)>

In [7]:
# Sample employee records. Each tuple follows the Employee field order
# (first name, last name, email, salary); Muriel's last name is None.
employee1, employee2, employee3, employee4, employee5 = (
    Employee(*record)
    for record in [
        ('Basher', 'armbrust', 'bash@edureka.co', 100000),
        ('Daniel', 'meng', 'daniel@stanford.edu', 120000),
        ('Muriel', None, 'muriel@waterloo.edu', 140000),
        ('Rachel', 'wendell', 'rach_3@edureka.co', 160000),
        ('Zach', 'galifianakis', 'zach_g@edureka.co', 160000),
    ]
)

In [8]:
# Print one record to check that the values line up with the field names.
print(employee4)

Row(fisrtname='Rachel', lastname='wendell', email='rach_3@edureka.co', salary=160000)


In [9]:
# Department rows are built directly with keyword arguments, so no separate
# Row factory is needed. The factory-based alternative (as used for Employee)
# would look like this:
#
#   department = Row("id", "name")
#   department1 = department('123456', 'HR')


department1 = Row(id='123456', name='HR')
department2 = Row(id='789012', name='OPS')
department3 = Row(id='345678', name='FN')
department4 = Row(id='901234', name='DEV')

In [10]:
# Print one department Row to verify its id/name fields.
print(department4)

Row(id='901234', name='DEV')


In [11]:
# Row factory pairing one department Row with a list of employee Rows.
departmentWithEmployee = Row("department", "Employees")

# The same employee may be listed under several departments
# (e.g. employee2 appears in both department1 and department2).
departmentWithEmployee_1 = departmentWithEmployee(department1, [employee1,employee2])
departmentWithEmployee_2 = departmentWithEmployee(department2, [employee2,employee3,employee4])
departmentWithEmployee_3 = departmentWithEmployee(department3, [employee4,employee1])
departmentWithEmployee_4 = departmentWithEmployee(department4, [employee3])

In [12]:
# Print one nested record: a department Row plus its list of employee Rows.
print(departmentWithEmployee_2)

Row(department=Row(id='789012', name='OPS'), Employees=[Row(fisrtname='Daniel', lastname='meng', email='daniel@stanford.edu', salary=120000), Row(fisrtname='Muriel', lastname=None, email='muriel@waterloo.edu', salary=140000), Row(fisrtname='Rachel', lastname='wendell', email='rach_3@edureka.co', salary=160000)])


In [13]:
# Build two DataFrames from lists of nested Rows. Both lists contain
# departmentWithEmployee_2, so df1 and df2 share exactly one row.
# NOTE(review): despite its "_df" suffix, departmentWithEmployee_df is a plain
# Python list of Rows, and it is reassigned for the second DataFrame.
departmentWithEmployee_df = [departmentWithEmployee_1, departmentWithEmployee_2]
df1 = spark.createDataFrame(departmentWithEmployee_df)

departmentWithEmployee_df = [departmentWithEmployee_2, departmentWithEmployee_4]
df2 = spark.createDataFrame(departmentWithEmployee_df)

In [14]:
# Preview df1; long nested values are truncated in the printed table.
df1.show()

+-------------+--------------------+
|   department|           Employees|
+-------------+--------------------+
| [123456, HR]|[[Basher, armbrus...|
|[789012, OPS]|[[Daniel, meng, d...|
+-------------+--------------------+



In [15]:
# In this environment display() renders only the DataFrame's schema summary
# (see the output below), not its rows; use show() to see the data.
display(df2)

DataFrame[department: struct<id:string,name:string>, Employees: array<struct<fisrtname:string,lastname:string,email:string,salary:bigint>>]

In [16]:
# Preview df2 (the OPS and DEV departments).
df2.show()

+-------------+--------------------+
|   department|           Employees|
+-------------+--------------------+
|[789012, OPS]|[[Daniel, meng, d...|
|[901234, DEV]|[[Muriel,, muriel...|
+-------------+--------------------+



In [17]:
# collect() returns every row of df1 as a Python list of Row objects;
# fine here because df1 holds only two rows.
df1.collect()

[Row(department=Row(id='123456', name='HR'), Employees=[Row(fisrtname='Basher', lastname='armbrust', email='bash@edureka.co', salary=100000), Row(fisrtname='Daniel', lastname='meng', email='daniel@stanford.edu', salary=120000)]),
 Row(department=Row(id='789012', name='OPS'), Employees=[Row(fisrtname='Daniel', lastname='meng', email='daniel@stanford.edu', salary=120000), Row(fisrtname='Muriel', lastname=None, email='muriel@waterloo.edu', salary=140000), Row(fisrtname='Rachel', lastname='wendell', email='rach_3@edureka.co', salary=160000)])]

In [18]:
# union: append the rows of df2 to those of df1. Duplicates are kept: the OPS
# row, present in both inputs, appears twice in the output below.
unionDf = df1.union(df2)

# display(unionDf)

unionDf.show()

+-------------+--------------------+
|   department|           Employees|
+-------------+--------------------+
| [123456, HR]|[[Basher, armbrus...|
|[789012, OPS]|[[Daniel, meng, d...|
|[789012, OPS]|[[Daniel, meng, d...|
|[901234, DEV]|[[Muriel,, muriel...|
+-------------+--------------------+



In [19]:
# intersect: keep only the rows present in both df1 and df2
# (here just the OPS department, as the output below shows).
intersectionDf = df1.intersect(df2)

# display(intersectionDf)

intersectionDf.show()

+-------------+--------------------+
|   department|           Employees|
+-------------+--------------------+
|[789012, OPS]|[[Daniel, meng, d...|
+-------------+--------------------+



In [20]:
# Project only the Employees array column of the unioned data.
unionDf.select(col('Employees')).show()

+--------------------+
|           Employees|
+--------------------+
|[[Basher, armbrus...|
|[[Daniel, meng, d...|
|[[Daniel, meng, d...|
|[[Muriel,, muriel...|
+--------------------+



In [21]:
# explode() emits one output row per element of the Employees array;
# each element (an employee struct) lands in a column named Emp.
explodeDf = unionDf.select(explode(unionDf.Employees).alias('Emp'))

In [22]:
# One row per employee entry; employees listed under several departments repeat.
explodeDf.show()

+--------------------+
|                 Emp|
+--------------------+
|[Basher, armbrust...|
|[Daniel, meng, da...|
|[Daniel, meng, da...|
|[Muriel,, muriel@...|
|[Rachel, wendell,...|
|[Daniel, meng, da...|
|[Muriel,, muriel@...|
|[Rachel, wendell,...|
|[Muriel,, muriel@...|
+--------------------+



In [23]:
# selectExpr is a variant of select that accepts SQL expression strings; here
# it lifts the nested fields of the Emp struct into top-level columns.
# NOTE(review): "fisrtname" mirrors the misspelled field name declared on the
# Employee Row factory; the two must be renamed together.
flatDf = explodeDf.selectExpr("Emp.fisrtname", "Emp.lastname", "Emp.email","Emp.salary")

In [24]:
# Show the flattened employee table (one column per former struct field).
flatDf.show()

+---------+--------+-------------------+------+
|fisrtname|lastname|              email|salary|
+---------+--------+-------------------+------+
|   Basher|armbrust|    bash@edureka.co|100000|
|   Daniel|    meng|daniel@stanford.edu|120000|
|   Daniel|    meng|daniel@stanford.edu|120000|
|   Muriel|    null|muriel@waterloo.edu|140000|
|   Rachel| wendell|  rach_3@edureka.co|160000|
|   Daniel|    meng|daniel@stanford.edu|120000|
|   Muriel|    null|muriel@waterloo.edu|140000|
|   Rachel| wendell|  rach_3@edureka.co|160000|
|   Muriel|    null|muriel@waterloo.edu|140000|
+---------+--------+-------------------+------+



#### Reading Data from CSV file and performing actions

In [25]:
# Load the house listings from CSV: the first line supplies the column names
# and the column types are inferred from the data.
house_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("house_data.csv")
)

In [26]:
# Preview the first three rows as a formatted table.
house_df.show(3)

+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+----------

In [27]:
# take(3) returns the first three rows as a Python list of Row objects
# (unlike show(), which only prints a table).
house_df.take(3)

[Row(build_year=1990, lat=47.00980759, living_area=110, long=8.483789444, municipality_name='Vitznau', zipcode=6354, num_rooms=3, object_type_name='Wohnung', price=815000, water_percentage_1000=0.400565383, travel_time_private_transport=35, travel_time_public_transport=102, number_of_buildings_in_hectare=10, number_of_apartments_in_hectare=26, number_of_workplaces_in_hectare=7, number_of_workplaces_sector_1_in_hectare=0, number_of_workplaces_sector_2_in_hectare=0, number_of_workplaces_sector_3_in_hectare=7, population_in_hectare=34),
 Row(build_year=2017, lat=46.87217712, living_area=120, long=9.88018322, municipality_name='Klosters-Serneus', zipcode=7250, num_rooms=3, object_type_name='Wohnung', price=890000, water_percentage_1000=0.008272204, travel_time_private_transport=85, travel_time_public_transport=112, number_of_buildings_in_hectare=7, number_of_apartments_in_hectare=54, number_of_workplaces_in_hectare=3, number_of_workplaces_sector_1_in_hectare=0, number_of_workplaces_sector_

In [28]:
# Print the column names, inferred types and nullability of the DataFrame.
house_df.printSchema() 

root
 |-- build_year: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- living_area: integer (nullable = true)
 |-- long: double (nullable = true)
 |-- municipality_name: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- num_rooms: integer (nullable = true)
 |-- object_type_name: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- water_percentage_1000: double (nullable = true)
 |-- travel_time_private_transport: integer (nullable = true)
 |-- travel_time_public_transport: integer (nullable = true)
 |-- number_of_buildings_in_hectare: integer (nullable = true)
 |-- number_of_apartments_in_hectare: integer (nullable = true)
 |-- number_of_workplaces_in_hectare: integer (nullable = true)
 |-- number_of_workplaces_sector_1_in_hectare: integer (nullable = true)
 |-- number_of_workplaces_sector_2_in_hectare: integer (nullable = true)
 |-- number_of_workplaces_sector_3_in_hectare: integer (nullable = true)
 |-- population_in_hectare: intege

In [29]:
# List the column names as plain Python strings.
house_df.columns  

['build_year',
 'lat',
 'living_area',
 'long',
 'municipality_name',
 'zipcode',
 'num_rooms',
 'object_type_name',
 'price',
 'water_percentage_1000',
 'travel_time_private_transport',
 'travel_time_public_transport',
 'number_of_buildings_in_hectare',
 'number_of_apartments_in_hectare',
 'number_of_workplaces_in_hectare',
 'number_of_workplaces_sector_1_in_hectare',
 'number_of_workplaces_sector_2_in_hectare',
 'number_of_workplaces_sector_3_in_hectare',
 'population_in_hectare']

In [30]:
# Number of rows in the DataFrame.
house_df.count()

998

In [31]:
# Number of columns (length of the column-name list).
len(house_df.columns)

19

In [32]:
# Summary statistics (count, mean, stddev, min, max) for num_rooms.
house_df.select('num_rooms').describe().show()

+-------+------------------+
|summary|         num_rooms|
+-------+------------------+
|  count|               998|
|   mean| 4.079158316633267|
| stddev|0.9073125929965935|
|    min|                 2|
|    max|                 9|
+-------+------------------+



In [33]:
# Confirm that house_df is a Spark SQL DataFrame.
type(house_df)

pyspark.sql.dataframe.DataFrame

In [34]:
# Project the coordinate columns and preview five rows.
house_df.select(col('lat'), col('long')).show(5)

+-----------+-----------+
|        lat|       long|
+-----------+-----------+
|47.00980759|8.483789444|
|46.87217712| 9.88018322|
|46.51917267|6.525881767|
|47.52134323|8.536971092|
|46.51021576|9.852426529|
+-----------+-----------+
only showing top 5 rows



In [35]:
# Listings priced at exactly 999000.
house_df.where(col('price') == 999000).show()

+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+----------

In [36]:
# Listings built in either 2010 or 2018 (first three matches).
house_df.filter(col('build_year').isin(2010, 2018)).show(3)

+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+------+---------------------+----------

In [37]:
# Rows whose num_rooms value is missing.
house_df.filter(house_df.num_rooms.isNull()).show()

+----------+---+-----------+----+-----------------+-------+---------+----------------+-----+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|lat|living_area|long|municipality_name|zipcode|num_rooms|object_type_name|price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+---+-----------+----+-----------------+-------+---------+----------------+-----+---------------------+-----------------------------+----------------------------

In [38]:
# Build years of all four-room listings. Filter before projecting: num_rooms
# is not part of the projected output, so the predicate is applied while the
# column is still available.
house_df.where(col('num_rooms') == 4).select('build_year').show()

+----------+
|build_year|
+----------+
|      2010|
|      2015|
|      2016|
|      2008|
|      2018|
|      2015|
|      2017|
|      2005|
|      2017|
|      2001|
|      2019|
|      2005|
|      2019|
|      2007|
|      1594|
|      2002|
|      2015|
|      2015|
|      1999|
|      2018|
+----------+
only showing top 20 rows



In [39]:
# Preview the first ten num_rooms values.
house_df.select(col('num_rooms')).show(10)

+---------+
|num_rooms|
+---------+
|        3|
|        3|
|        4|
|        3|
|        3|
|        4|
|        4|
|        3|
|        3|
|        4|
+---------+
only showing top 10 rows



In [40]:
# Distinct room counts present in the data (output order is not sorted).
house_df.select(col('num_rooms')).distinct().show()

+---------+
|num_rooms|
+---------+
|        6|
|        3|
|        5|
|        9|
|        4|
|        8|
|        7|
|        2|
+---------+



In [41]:
# How many listings have exactly seven rooms.
house_df.where(col('num_rooms') == 7).count()

8

In [42]:
# Listings with 3 to 6 rooms; between() includes both bounds.
house_df.filter(col('num_rooms').between(3, 6)).show(5)

+----------+-----------+-----------+-----------+-------------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|  municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-------------------+-------+---------+----------------+------+---------------------+----

In [43]:
# Sorting is ascending by default; request descending order explicitly to
# list the listings with the most rooms first.
house_df.orderBy(col('num_rooms').desc()).show(5)

+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+-------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|municipality_name|zipcode|num_rooms|object_type_name|  price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+-------+---------------------+-------

In [44]:
# Listings built in 2015, largest room count first.
house_df.where(col('build_year') == 2015).sort(col('num_rooms').desc()).show(7)

+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+-------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|municipality_name|zipcode|num_rooms|object_type_name|  price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-----------------+-------+---------+----------------+-------+---------------------+-------

In [45]:
# Default (ascending) sort: listings with the fewest rooms first.
house_df.sort('num_rooms').show(5)

+----------+-----------+-----------+-----------+--------------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|   municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+--------------------+-------+---------+----------------+------+---------------------+-

In [46]:
# Number of listings built in 2015.
house_df.where(col('build_year') == 2015).count()

41

In [47]:
# Group the listings by build year and count the rows in each group.
house_df.groupBy('build_year').count().show()

+----------+-----+
|build_year|count|
+----------+-----+
|      1829|    1|
|      1990|   17|
|      1975|    6|
|      1977|    4|
|      2003|   13|
|      2007|   24|
|      2018|  142|
|      1974|    8|
|      2015|   41|
|      1871|    2|
|      1019|    1|
|      2006|   20|
|      1978|    4|
|      1925|    1|
|      2013|   38|
|      1785|    1|
|      1956|    1|
|      1997|    7|
|      1988|    6|
|      1994|    5|
+----------+-----+
only showing top 20 rows



In [48]:
# Count listings per (build_year, num_rooms), newest years first. num_rooms is
# a secondary sort key so the top-5 preview is deterministic: sorting on
# build_year alone leaves the order of rows within one year undefined.
(
    house_df
    .groupBy(col('build_year'), col('num_rooms'))
    .count()
    .orderBy(col('build_year').desc(), col('num_rooms').asc())
    .show(5)
)

+----------+---------+-----+
|build_year|num_rooms|count|
+----------+---------+-----+
|      2020|        2|    1|
|      2020|        3|    7|
|      2020|        4|    7|
|      2020|        5|    2|
|      2019|        3|   17|
+----------+---------+-----+
only showing top 5 rows



In [49]:
# Total population_in_hectare across all listings built in 2020.
house_df.where(house_df.build_year == 2020).agg({'population_in_hectare': 'sum'}).show()

+--------------------------+
|sum(population_in_hectare)|
+--------------------------+
|                       393|
+--------------------------+



#### Performing SQL Queries

In [50]:
# Register the DataFrame as a temporary view named house_table so that it can
# be queried with SQL. createOrReplaceTempView is the supported replacement
# for the deprecated registerTempTable.
house_df.createOrReplaceTempView('house_table')

In [51]:
# Run a SQL query against the temp view. The query goes through the
# SparkSession (the session house_df was created from) rather than the legacy
# SQLContext handle.
spark.sql('select * from house_table').show(5)

+----------+-----------+-----------+-----------+-------------------+-------+---------+----------------+------+---------------------+-----------------------------+----------------------------+------------------------------+-------------------------------+-------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+---------------------+
|build_year|        lat|living_area|       long|  municipality_name|zipcode|num_rooms|object_type_name| price|water_percentage_1000|travel_time_private_transport|travel_time_public_transport|number_of_buildings_in_hectare|number_of_apartments_in_hectare|number_of_workplaces_in_hectare|number_of_workplaces_sector_1_in_hectare|number_of_workplaces_sector_2_in_hectare|number_of_workplaces_sector_3_in_hectare|population_in_hectare|
+----------+-----------+-----------+-----------+-------------------+-------+---------+----------------+------+---------------------+----

In [52]:
# Distinct room counts via SQL, issued through the SparkSession instead of
# the legacy SQLContext handle.
spark.sql('select distinct(num_rooms) from house_table').show()

+---------+
|num_rooms|
+---------+
|        6|
|        3|
|        5|
|        9|
|        4|
|        8|
|        7|
|        2|
+---------+



In [53]:
# Most recent build year via SQL, issued through the SparkSession instead of
# the legacy SQLContext handle.
spark.sql('select max(build_year) from house_table').show()

+---------------+
|max(build_year)|
+---------------+
|           2020|
+---------------+

