# 1. Create Column Class Object


In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("SP").getOrCreate()

In [5]:
from pyspark.sql.functions import lit
colObj=lit("SP.com")
colObj

Column<'SP.com'>

In [6]:
data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name","gender")
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: long (nullable = true)



In [9]:
#df.select(df["gender"]).show()

#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

In [11]:
# using SQL col() function
from pyspark.sql.functions import col 
df.select(col("gender")).show()

#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

In [12]:
# create DataFrame with struct using Row Class

from pyspark.sql import Row
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
     Row(name="Ann",prop=Row(hair="gray",eye="black"))]

df=spark.createDataFrame(data)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)



In [14]:
# Access struct column
df.select(df.prop.hair).show()
df.select(df["prop.hair"]).show()
df.select(col("prop.hair")).show()

# Access all columns from struct
df.select(col("prop.*")).show()

# 2. Pyspark column Operators

In [2]:
data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")
df.printSchema()

root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)



In [5]:
# read csv file
file_path = "C:\\Users\\pcc\\Desktop\\daily-website-visitors.csv"
df=spark.read.csv(file_path,header=True,inferSchema=True)
df

DataFrame[Row: int, Day: string, Day.Of.Week: int, Date: string, Page.Loads: int, Unique.Visits: int, First.Time.Visits: int, Returning.Visits: int]

In [7]:
df=df.withColumnsRenamed({"Day.Of.Week":"Day_Of_Week","Page.Loads":"Page_Loads",
                          "Unique.Visits":"Unique_Visits","First.Time.Visits":"First_Time_Visits",
                          "Returning.Visits":"Returning_Visits"})

In [8]:
df.show()

+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|      Day|Day_Of_Week|      Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|  1|   Sunday|          1| 9/14/2014|      2146|         1582|             1430|             152|
|  2|   Monday|          2| 9/15/2014|      3621|         2528|             2297|             231|
|  3|  Tuesday|          3| 9/16/2014|      3698|         2630|             2352|             278|
|  4|Wednesday|          4| 9/17/2014|      3667|         2614|             2327|             287|
|  5| Thursday|          5| 9/18/2014|      3316|         2366|             2130|             236|
|  6|   Friday|          6| 9/19/2014|      2815|         1863|             1622|             241|
|  7| Saturday|          7| 9/20/2014|      1658|         1118|              985|             133|
|  8|   Su

# 3. PySpark column functions

In [9]:
df.select(df.Page_Loads+df.Returning_Visits).show()

+-------------------------------+
|(Page_Loads + Returning_Visits)|
+-------------------------------+
|                           2298|
|                           3852|
|                           3976|
|                           3954|
|                           3552|
|                           3056|
|                           1791|
|                           2463|
|                           3912|
|                           4730|
|                           4698|
|                           4601|
|                           3539|
|                           1796|
|                           2658|
|                           4392|
|                           4786|
|                           4432|
|                           3764|
|                           3246|
+-------------------------------+
only showing top 20 rows



In [10]:
df.select(df.Page_Loads-df.Returning_Visits).show()

+-------------------------------+
|(Page_Loads - Returning_Visits)|
+-------------------------------+
|                           1994|
|                           3390|
|                           3420|
|                           3380|
|                           3080|
|                           2574|
|                           1525|
|                           2113|
|                           3364|
|                           4194|
|                           4130|
|                           4029|
|                           3107|
|                           1516|
|                           2272|
|                           3800|
|                           4162|
|                           3816|
|                           3264|
|                           2764|
+-------------------------------+
only showing top 20 rows



In [11]:
df.select(df.Page_Loads*df.Returning_Visits).show()

+-------------------------------+
|(Page_Loads * Returning_Visits)|
+-------------------------------+
|                         326192|
|                         836451|
|                        1028044|
|                        1052429|
|                         782576|
|                         678415|
|                         220514|
|                         400400|
|                         996812|
|                        1195816|
|                        1253576|
|                        1234090|
|                         717768|
|                         231840|
|                         475745|
|                        1212416|
|                        1395888|
|                        1270192|
|                         878500|
|                         724205|
+-------------------------------+
only showing top 20 rows



In [12]:
df.select(df.Page_Loads/df.Returning_Visits).show()

+-------------------------------+
|(Page_Loads / Returning_Visits)|
+-------------------------------+
|             14.118421052631579|
|             15.675324675324676|
|             13.302158273381295|
|             12.777003484320558|
|              14.05084745762712|
|             11.680497925311203|
|             12.466165413533835|
|             13.074285714285715|
|             13.277372262773723|
|             16.649253731343283|
|              15.54225352112676|
|             15.087412587412587|
|              15.38425925925926|
|              11.82857142857143|
|               12.7720207253886|
|             13.837837837837839|
|              14.33974358974359|
|              13.38961038961039|
|                         14.056|
|             12.468879668049793|
+-------------------------------+
only showing top 20 rows



In [13]:
df.select(df.Page_Loads % df.Returning_Visits).show()

+-------------------------------+
|(Page_Loads % Returning_Visits)|
+-------------------------------+
|                             18|
|                            156|
|                             84|
|                            223|
|                             12|
|                            164|
|                             62|
|                             13|
|                             76|
|                            174|
|                            154|
|                             25|
|                             83|
|                            116|
|                            149|
|                            248|
|                            106|
|                            120|
|                             14|
|                            113|
+-------------------------------+
only showing top 20 rows



In [14]:
df.select(df.Page_Loads>df.Unique_Visits).show()

+----------------------------+
|(Page_Loads > Unique_Visits)|
+----------------------------+
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
|                        true|
+----------------------------+
only showing top 20 rows



In [15]:
df.select(df.Page_Loads<df.Unique_Visits).show()

+----------------------------+
|(Page_Loads < Unique_Visits)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+
only showing top 20 rows



In [16]:
df.select(df.Page_Loads==df.Unique_Visits).show()

+----------------------------+
|(Page_Loads = Unique_Visits)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+
only showing top 20 rows



# 4. PySpark column functions

In [17]:
# 4.1 alias()- set's name to column
from pyspark.sql.functions import expr
df.select(df.Day_Of_Week.alias("day_of_week"),
         df.Date.alias("date")).show()

+-----------+----------+
|day_of_week|      date|
+-----------+----------+
|          1| 9/14/2014|
|          2| 9/15/2014|
|          3| 9/16/2014|
|          4| 9/17/2014|
|          5| 9/18/2014|
|          6| 9/19/2014|
|          7| 9/20/2014|
|          1| 9/21/2014|
|          2| 9/22/2014|
|          3| 9/23/2014|
|          4| 9/24/2014|
|          5| 9/25/2014|
|          6| 9/26/2014|
|          7| 9/27/2014|
|          1| 9/28/2014|
|          2| 9/29/2014|
|          3| 9/30/2014|
|          4|10-01-2014|
|          5|10-02-2014|
|          6|10-03-2014|
+-----------+----------+
only showing top 20 rows



In [18]:
df.select(expr("Day || '-' || Day_Of_Week").alias("Properday")).show()

+-----------+
|  Properday|
+-----------+
|   Sunday-1|
|   Monday-2|
|  Tuesday-3|
|Wednesday-4|
| Thursday-5|
|   Friday-6|
| Saturday-7|
|   Sunday-1|
|   Monday-2|
|  Tuesday-3|
|Wednesday-4|
| Thursday-5|
|   Friday-6|
| Saturday-7|
|   Sunday-1|
|   Monday-2|
|  Tuesday-3|
|Wednesday-4|
| Thursday-5|
|   Friday-6|
+-----------+
only showing top 20 rows



In [19]:
# 4.2 asc() and desc() - sort the df columns by ascending or descendig order
df.sort(df.Day.asc()).show()
df.sort(df.Day.desc()).show()

+---+------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|   Day|Day_Of_Week|      Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+------+-----------+----------+----------+-------------+-----------------+----------------+
| 97|Friday|          6|12/19/2014|      2748|         1879|             1559|             320|
|237|Friday|          6|05-08-2015|      4696|         3366|             2824|             542|
|104|Friday|          6|12/26/2014|      1486|         1005|              808|             197|
|  6|Friday|          6| 9/19/2014|      2815|         1863|             1622|             241|
|111|Friday|          6|01-02-2015|      1948|         1288|             1030|             258|
| 20|Friday|          6|10-03-2014|      3005|         2097|             1856|             241|
|118|Friday|          6|01-09-2015|      2783|         1941|             1663|             278|
| 34|Friday|          6|10/17/2014|     

In [22]:
# 4.3 cast() and astype() - used to convert the data type.
df.select(df.Day,df.Row.cast("int")).printSchema()

root
 |-- Day: string (nullable = true)
 |-- Row: integer (nullable = true)



In [28]:
# 4.4 between() - Return a boolean expression when a column values in between lower and upper bound.
df.filter(df.Row.between(100,300)).show()

+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|      Day|Day_Of_Week|      Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|100|   Monday|          2|12/22/2014|      2452|         1715|             1394|             321|
|101|  Tuesday|          3|12/23/2014|      2298|         1440|             1159|             281|
|102|Wednesday|          4|12/24/2014|      1430|          947|              772|             175|
|103| Thursday|          5|12/25/2014|      1002|          667|              522|             145|
|104|   Friday|          6|12/26/2014|      1486|         1005|              808|             197|
|105| Saturday|          7|12/27/2014|      1345|          941|              781|             160|
|106|   Sunday|          1|12/28/2014|      1453|          948|              765|             183|
|107|   Mo

In [31]:
# 4.5 contain() -check if a df column value contain a value specified in this function.
df.filter(df.Date.contains("9/24/2014")).show()

+---+---------+-----------+---------+----------+-------------+-----------------+----------------+
|Row|      Day|Day_Of_Week|     Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+---------+-----------+---------+----------+-------------+-----------------+----------------+
| 11|Wednesday|          4|9/24/2014|      4414|         3175|             2891|             284|
+---+---------+-----------+---------+----------+-------------+-----------------+----------------+



In [32]:
# 4.6 startswith() & endswith() - check if the value of df column starts and ends with a string resp.
df.filter(df.Day.startswith("F")).show()
df.filter(df.Day.endswith("a")).show()

+---+------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|   Day|Day_Of_Week|      Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+------+-----------+----------+----------+-------------+-----------------+----------------+
|  6|Friday|          6| 9/19/2014|      2815|         1863|             1622|             241|
| 13|Friday|          6| 9/26/2014|      3323|         2249|             2033|             216|
| 20|Friday|          6|10-03-2014|      3005|         2097|             1856|             241|
| 27|Friday|          6|10-10-2014|      3565|         2382|             2100|             282|
| 34|Friday|          6|10/17/2014|      3624|         2477|             2148|             329|
| 41|Friday|          6|10/24/2014|      3571|         2498|             2170|             328|
| 48|Friday|          6|10/31/2014|      2933|         2007|             1728|             279|
| 55|Friday|          6|11-07-2014|     

In [35]:
# 4.7 eqNullSafe() - equality test that is safe for null values.
df.select(df.First_Time_Visits.eqNullSafe(None)).show()
df.select(df.First_Time_Visits.eqNullSafe(985)).show()

+----------------------------+
|(First_Time_Visits <=> NULL)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+
only showing top 20 rows

+---------------------------+
|(First_Time_Visits <=> 985)|
+---------------------------+
|                      false|
|                      false|
|                      false|
|                      false|
|                   

In [36]:
# 4.8 isNull() & isNotNull() - check if the df column has NULL or non NULL values.
df.filter(df.Page_Loads.isNull()).show()
df.filter(df.Page_Loads.isNotNull()).show()

+---+---+-----------+----+----------+-------------+-----------------+----------------+
|Row|Day|Day_Of_Week|Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+---+-----------+----+----------+-------------+-----------------+----------------+
+---+---+-----------+----+----------+-------------+-----------------+----------------+

+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|Row|      Day|Day_Of_Week|      Date|Page_Loads|Unique_Visits|First_Time_Visits|Returning_Visits|
+---+---------+-----------+----------+----------+-------------+-----------------+----------------+
|  1|   Sunday|          1| 9/14/2014|      2146|         1582|             1430|             152|
|  2|   Monday|          2| 9/15/2014|      3621|         2528|             2297|             231|
|  3|  Tuesday|          3| 9/16/2014|      3698|         2630|             2352|             278|
|  4|Wednesday|          4| 9/17/2014|      3667|        

In [44]:
# 4.9 like() & rlike() - similar to SQL LIKE Exepression
df.select(df.Day,df.Date,df.Row).filter(df.Day.like("Sun%")).show()
df.select(df.Day,df.Date,df.Row).filter(df.Day.like("%ues%")).show()

+------+----------+---+
|   Day|      Date|Row|
+------+----------+---+
|Sunday| 9/14/2014|  1|
|Sunday| 9/21/2014|  8|
|Sunday| 9/28/2014| 15|
|Sunday|10-05-2014| 22|
|Sunday|10-12-2014| 29|
|Sunday|10/19/2014| 36|
|Sunday|10/26/2014| 43|
|Sunday|11-02-2014| 50|
|Sunday|11-09-2014| 57|
|Sunday|11/16/2014| 64|
|Sunday|11/23/2014| 71|
|Sunday|11/30/2014| 78|
|Sunday|12-07-2014| 85|
|Sunday|12/14/2014| 92|
|Sunday|12/21/2014| 99|
|Sunday|12/28/2014|106|
|Sunday|01-04-2015|113|
|Sunday|01-11-2015|120|
|Sunday| 1/18/2015|127|
|Sunday| 1/25/2015|134|
+------+----------+---+
only showing top 20 rows

+-------+----------+---+
|    Day|      Date|Row|
+-------+----------+---+
|Tuesday| 9/16/2014|  3|
|Tuesday| 9/23/2014| 10|
|Tuesday| 9/30/2014| 17|
|Tuesday|10-07-2014| 24|
|Tuesday|10/14/2014| 31|
|Tuesday|10/21/2014| 38|
|Tuesday|10/28/2014| 45|
|Tuesday|11-04-2014| 52|
|Tuesday|11-11-2014| 59|
|Tuesday|11/18/2014| 66|
|Tuesday|11/25/2014| 73|
|Tuesday|12-02-2014| 80|
|Tuesday|12-09-2014| 87

In [46]:
# 4.10 substr() - Return a column after getting sub string from column
df.select(df.Day.substr(2,5).alias("Substr")).show()

+------+
|Substr|
+------+
| unday|
| onday|
| uesda|
| ednes|
| hursd|
| riday|
| aturd|
| unday|
| onday|
| uesda|
| ednes|
| hursd|
| riday|
| aturd|
| unday|
| onday|
| uesda|
| ednes|
| hursd|
| riday|
+------+
only showing top 20 rows



In [50]:
 #4.11 when() & otherwise() - It is similar to SQL Case When, executes sequence of expressios until
# it matches the condition and returns a value when match

# when & otherwise
from pyspark.sql.functions import when
df.select(df.Row, df.Unique_Visits,when(df.Day=="Sunday","Weekend")\
          .when(df.Day=="Saturday","Weekend").otherwise(df.Day).alias("Week_Status")).show()

+---+-------------+-----------+
|Row|Unique_Visits|Week_Status|
+---+-------------+-----------+
|  1|         1582|    Weekend|
|  2|         2528|     Monday|
|  3|         2630|    Tuesday|
|  4|         2614|  Wednesday|
|  5|         2366|   Thursday|
|  6|         1863|     Friday|
|  7|         1118|    Weekend|
|  8|         1656|    Weekend|
|  9|         2586|     Monday|
| 10|         3257|    Tuesday|
| 11|         3175|  Wednesday|
| 12|         3029|   Thursday|
| 13|         2249|     Friday|
| 14|         1180|    Weekend|
| 15|         1806|    Weekend|
| 16|         2873|     Monday|
| 17|         3032|    Tuesday|
| 18|         2849|  Wednesday|
| 19|         2489|   Thursday|
| 20|         2097|     Friday|
+---+-------------+-----------+
only showing top 20 rows



In [52]:
# 4.12 isin() -check if value presents in a list 
list_=["9/23/2014","9/24/2014"]
df.select(df.Row,df.Day,df.First_Time_Visits).filter(df.Date.isin(list_)).show()

+---+---------+-----------------+
|Row|      Day|First_Time_Visits|
+---+---------+-----------------+
| 10|  Tuesday|             2989|
| 11|Wednesday|             2891|
+---+---------+-----------------+



In [57]:
# 4.13 getField() - To get the value by key from MapType column and by struct child name 
# from StructType Column
from pyspark.sql.types import StringType,StructField,StructType,ArrayType, MapType

structureData = [
    (("James","Smith"),["Java","C#"],{"hair": "black","eye":"brown"}),
    (("Michael","Rose"),["python","C#"],{"hair": "white","eye":"blue"}),
    (("Robert","Williams"),["Java","python"],{"hair": "red","eye":"brown"}),
    (("Maria","Anne"),["SQL","python"],{"hair": "black","eye":"blue"}),
    (("Jen","Mary"),["Java","SQL"],{"hair": "black","eye":"brown"})
  ]

schema = StructType([
    StructField("name",StructType([
        StructField("fname",StringType(),True),
        StructField("lname",StringType(),True)])),
    StructField("langauge",ArrayType(StringType(),True)),
    StructField("properties",MapType(StringType(),StringType(),True))])

data=spark.createDataFrame(structureData,schema)
data.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |-- langauge: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [60]:
# getField from MapType
data.select(data.properties.getField("hair"))
# getField from Struct
data.select(data.name.getField("fname"))

DataFrame[name.fname: string]

In [64]:
# 4.14 getItem() - To get the value by index from MapType or ArrayType and ny key for MapType column.

# getItem() used with ArrayType
data.select(data.langauge.getItem(1))

# getItem() used with MapType 
data.select(data.properties.getItem("hair"))

DataFrame[properties[hair]: string]

In [None]:
# 4.15 dropField - used to drops fields in StructType by name
# 4.16 withField() - An expression that adds/replaces a field in StructType by name
# 4.17 over() - used with window functions