In [1]:
%%time
import os
#os.environ['JAVA_HOME'] = 'C:\Progr~2\Java\jre-1.8'
#os.environ['SPARK_HOME'] = 'C:\Spark\spark-3.4.0-bin-hadoop3'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sync_task").getOrCreate()

CPU times: total: 156 ms
Wall time: 5.94 s


In [2]:
spark

In [3]:
## to read a file in any format ("".csv"  here), use below method-

first_table = spark.read.format("csv").load(r"C:\Users\Rahul\Desktop\Project\train.csv", header = True)

In [4]:
#df.show(5)

In [5]:
## Convert range into DataFrame with 'x' column name using "toDF(x)" method. 

## Eg.
myRange = spark.range(1000).toDF("number")

myRange.show(10)  # use .show(x) to print top x records of "myRange" dataframe

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows



In [6]:
myRange.explain()       ## "explain" method is used to see the DataFrame’s lineage (or how Spark will execute this query)

== Physical Plan ==
*(1) Project [id#127L AS number#129L]
+- *(1) Range (0, 1000, step=1, splits=8)




In [7]:
# To make any DataFrame into a table or view, call this: 


first_table.createOrReplaceTempView("Table_DF") 

Remember, Spark can run the same transformations, regardless of the language, in the exact same way. You can express your
business logic in SQL or DataFrames. Spark will compile that logic down to an underlying plan (that you can see in the explain plan) before
actually executing your code. With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. There is no performance difference between writing SQL queries or writing DataFrame code, they both “compile” to the same underlying plan that we specify in DataFrame code.

In [8]:
# Method 1 - Using Spark SQL
sqlWay = spark.sql("""
SELECT Cover_Type, max(`Elevation(meters)`), min(`Aspect(Degrees)`)
FROM Table_DF
GROUP BY Cover_Type
""")

sqlWay.show(10)

+----------+----------------------+--------------------+
|Cover_Type|max(Elevation(meters))|min(Aspect(Degrees))|
+----------+----------------------+--------------------+
|         1|                  3844|                   0|
|         2|                  3001|                   0|
|         3|                  2875|                   0|
|         4|                  2425|                 101|
|         5|                  3426|                   0|
|         6|                  2885|                   0|
|         7|                  3677|                   0|
+----------+----------------------+--------------------+



In [9]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[Cover_Type#71], functions=[max(Elevation(meters)#17), min(Aspect(Degrees)#18)])
   +- Sort [Cover_Type#71 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(Cover_Type#71, 200), ENSURE_REQUIREMENTS, [plan_id=109]
         +- SortAggregate(key=[Cover_Type#71], functions=[partial_max(Elevation(meters)#17), partial_min(Aspect(Degrees)#18)])
            +- Sort [Cover_Type#71 ASC NULLS FIRST], false, 0
               +- FileScan csv [Elevation(meters)#17,Aspect(degrees)#18,Cover_Type#71] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/Rahul/Desktop/Project/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Elevation(meters):string,Aspect(degrees):string,Cover_Type:string>




In [10]:
## Method 2 - Using spark DataFrame

### NOT ABLE TO RUN FOR NOW, WILL TAKE CARE LATER

dataFrameWay = first_table\
.groupBy("Cover_Type")\
.count()
dataFrameWay.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Cover_Type#71], functions=[count(1)])
   +- Exchange hashpartitioning(Cover_Type#71, 200), ENSURE_REQUIREMENTS, [plan_id=123]
      +- HashAggregate(keys=[Cover_Type#71], functions=[partial_count(1)])
         +- FileScan csv [Cover_Type#71] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/Rahul/Desktop/Project/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Cover_Type:string>




In [11]:
# "take(x)" can be used to extract "xth" value from the dataframe

## Using spark SQL
spark.sql("SELECT max(`Elevation(meters)`) from Table_DF").take(1)

[Row(max(Elevation(meters))='3844')]

In [12]:
# Using spark dataframe

from pyspark.sql.functions import max       ## importing "max" function


first_table.select(max("Elevation(meters)")).take(1)

[Row(max(Elevation(meters))='3844')]

In [13]:
## Reading one more csv file for understanding

data_table = spark.read.format("csv").load(r"C:\Users\Rahul\Desktop\Project\Final_Train.csv", header = True)

In [14]:
data_table.show(10)

data_table.createOrReplaceTempView("Table_new") 

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda| 

In [15]:
# Using Spark SQL
maxSql = spark.sql("""
SELECT Profile, sum(Fees) as Total_Fees
FROM Table_new
GROUP BY Profile
ORDER BY sum(Fees) DESC
LIMIT 5 """)

maxSql.show()

+----------------+----------+
|         Profile|Total_Fees|
+----------------+----------+
|  Dermatologists|   10350.0|
|         Dentist|    9650.0|
|General Medicine|    8200.0|
|  ENT Specialist|    7050.0|
|       Homeopath|    5100.0|
+----------------+----------+



In [16]:
# Using spark DataFrame

from pyspark.sql.functions import desc, col, sum
from pyspark.sql.types import IntegerType

data_table.groupby('Profile').agg(sum(col("Fees").cast(IntegerType()))).show()



+----------------+----------------------+
|         Profile|sum(CAST(Fees AS INT))|
+----------------+----------------------+
|  Dermatologists|                 10350|
|        Ayurveda|                  4100|
|       Homeopath|                  5100|
|  ENT Specialist|                  7050|
|General Medicine|                  8200|
|         Dentist|                  9650|
+----------------+----------------------+



DataFrames and Datasets are (distributed) table-like collections with well-defined rows and columns. Each column must have the same number of rows as all the other columns (although you can use null to specify the absence of a value) and each column has type information that must be consistent for every row in the collection. To Spark, DataFrames and Datasets represent immutable, lazily evaluated plans that specify what operations to apply to data residing at a
location to generate some output. When we perform an action on a DataFrame, we instruct Spark to perform the actual transformations and return the result. These represent plans of how to manipulate rows and columns to compute the user’s desired result.

In [17]:
avg_exp_fee = spark.sql("""

select Experience, avg(Fees), min(Fees), max(Fees)
from Table_new
group by Experience
order by Experience
""")

avg_exp_fee.show(5)

+-------------------+------------------+---------+---------+
|         Experience|         avg(Fees)|min(Fees)|max(Fees)|
+-------------------+------------------+---------+---------+
| 0 years experience|             100.0|      100|      100|
|10 years experience|254.54545454545453|      100|      800|
|11 years experience|342.85714285714283|      100|      500|
|12 years experience| 376.6666666666667|      100|      800|
|13 years experience|428.57142857142856|      100|      750|
+-------------------+------------------+---------+---------+
only showing top 5 rows



In [18]:
## to convert pyspark dataframes into pandas dataframes, use ".toPandas()" method as used below-

x = avg_exp_fee.toPandas()

x.head()

Unnamed: 0,Experience,avg(Fees),min(Fees),max(Fees)
0,0 years experience,100.0,100,100
1,10 years experience,254.545455,100,800
2,11 years experience,342.857143,100,500
3,12 years experience,376.666667,100,800
4,13 years experience,428.571429,100,750


A schema defines the column names and types of a DataFrame. You can define schemas manually or read a schema from a data source (often called schema on read). Schemas consist of types, meaning that you need a way of specifying what lies where.

In [19]:
#Below code actually performs addition purely in Spark:

df_new = spark.range(500).toDF("number")
#df_new.select(df_new["number"] + 10)

In [20]:
df_new.select(df_new["number"]+25).show()

+-------------+
|(number + 25)|
+-------------+
|           25|
|           26|
|           27|
|           28|
|           29|
|           30|
|           31|
|           32|
|           33|
|           34|
|           35|
|           36|
|           37|
|           38|
|           39|
|           40|
|           41|
|           42|
|           43|
|           44|
+-------------+
only showing top 20 rows



Spark Types - Spark has a large number of internal type representations

In [21]:
# Use below method to import spark types- 

from pyspark.sql.types import *
#b = ByteType()

In [22]:
data_table.show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda| 

In [23]:
data_table.printSchema()    # print all schemas using "printSchema()" method

root
 |-- Qualification: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Place: string (nullable = true)
 |-- Profile: string (nullable = true)
 |-- Miscellaneous_Info: string (nullable = true)
 |-- Fees: string (nullable = true)



In [24]:
data_table.schema       # print schema info using "schema" method

StructType([StructField('Qualification', StringType(), True), StructField('Experience', StringType(), True), StructField('Rating', StringType(), True), StructField('Place', StringType(), True), StructField('Profile', StringType(), True), StructField('Miscellaneous_Info', StringType(), True), StructField('Fees', StringType(), True)])

A schema is a StructType made up of a number of fields, StructFields, that have a name,
type, a Boolean flag which specifies whether that column can contain missing or null values,
and, finally, users can optionally specify associated metadata with that column. The metadata is a
way of storing information about this column.
Schemas can contain other StructTypes (Spark’s complex types).

In [25]:
# The example that follows shows how to create and enforce a specific schema on a DataFrame-

from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("Qualification", StringType(), True),
StructField("Experience", StringType(), True),
StructField("Rating", StringType(), True),
StructField("Place", StringType(), True),
StructField("Profile", StringType(), True),
StructField("Miscellaneous_Info", StringType(), True),

StructField("Fees", LongType(), True, metadata={"hello":"world"}) ])

new_df = spark.read.format("json").schema(myManualSchema).load(r"C:\Users\Rahul\Desktop\Project\Final_Train.csv", header = True)
new_df.show(5)

+-------------+----------+------+-----+-------+------------------+----+
|Qualification|Experience|Rating|Place|Profile|Miscellaneous_Info|Fees|
+-------------+----------+------+-----+-------+------------------+----+
|         null|      null|  null| null|   null|              null|null|
|         null|      null|  null| null|   null|              null|null|
|         null|      null|  null| null|   null|              null|null|
|         null|      null|  null| null|   null|              null|null|
|         null|      null|  null| null|   null|              null|null|
+-------------+----------+------+-----+-------+------------------+----+
only showing top 5 rows



# Columns-

There are a lot of different ways to construct and refer to columns but the two simplest ways are
by using the "col" or "column" functions.

In [26]:
from pyspark.sql.functions import col, column, concat
col("Experience")
column("Experience")

Column<'Experience'>

If you need to refer to a specific DataFrame’s column, you can use the "col" method on the
specific DataFrame. This can be useful when you are performing a join and need to refer to a
specific column in one DataFrame that might share a name with another column in the joined
DataFrame.

In [27]:
# Below is the way by which you can look inside any column of a spark dataframe

#data_table.select(col("Fees")).show(15)
#data_table.select(col("Experience"),col("Fees")).show(15)
data_table.select(col("Experience"), col("Profile"),col("Rating"),col("Fees")).show(15)

+-------------------+----------------+------+----+
|         Experience|         Profile|Rating|Fees|
+-------------------+----------------+------+----+
|24 years experience|       Homeopath|  100%| 100|
|12 years experience|        Ayurveda|   98%| 350|
| 9 years experience|  ENT Specialist|  null| 300|
|12 years experience|        Ayurveda|  null| 250|
|20 years experience|        Ayurveda|  100%| 250|
| 8 years experience|        Ayurveda|  null| 100|
|42 years experience|       Homeopath|  null| 200|
|10 years experience|         Dentist|   99%| 200|
|14 years experience|General Medicine|  null| 100|
|23 years experience|         Dentist|  null| 100|
| 5 years experience|  ENT Specialist|  null| 700|
| 7 years experience|        Ayurveda|  null| 100|
| 9 years experience|         Dentist|   98%| 200|
|21 years experience|         Dentist|  null| 350|
|12 years experience|  ENT Specialist|  null| 500|
+-------------------+----------------+------+----+
only showing top 15 rows



# Expressions-

An expression is a set of transformations on one or more values in a record in a DataFrame. Think of it like a
function that takes as input one or more column names, resolves them, and then potentially
applies more expressions to create a single value for each record in the dataset. Importantly, this
“single value” can actually be a complex type like a Map or Array.

In [28]:
from pyspark.sql.functions import expr
#data_table.select(expr("(((Fees + 5) * 200) - 6)"), "Fees").show()
data_table.select(expr("(((Fees + 5) * 200) - 6)"),expr("(((Fees + 5) * 200) - 6) < Fees") , "Fees").show()

#data_table.select(expr("(((Fees + 5) * 200) - 6) > Fees")).show()

+------------------------+---------------------------------+----+
|(((Fees + 5) * 200) - 6)|((((Fees + 5) * 200) - 6) < Fees)|Fees|
+------------------------+---------------------------------+----+
|                 20994.0|                            false| 100|
|                 70994.0|                            false| 350|
|                 60994.0|                            false| 300|
|                 50994.0|                            false| 250|
|                 50994.0|                            false| 250|
|                 20994.0|                            false| 100|
|                 40994.0|                            false| 200|
|                 40994.0|                            false| 200|
|                 20994.0|                            false| 100|
|                 20994.0|                            false| 100|
|                140994.0|                            false| 700|
|                 20994.0|                            false| 100|
|         

In [29]:
data_table.first()   # print first row of a spark dataframe

Row(Qualification='BHMS, MD - Homeopathy', Experience='24 years experience', Rating='100%', Place='Kakkanad, Ernakulam', Profile='Homeopath', Miscellaneous_Info='100% 16 Feedback Kakkanad, Ernakulam', Fees='100')

# Creating Rows-
You can create rows by manually instantiating a "Row" object with the values that belong in each
column. It’s important to note that only DataFrames have schemas. Rows themselves do not have
schemas. This means that if you create a Row manually, you must specify the values in the same
order as the schema of the DataFrame to which they might be appended.

In [30]:
## Creating rows for spark dataframes

from pyspark.sql import Row

myRow = Row("MBBS","105 years experience","90%", "Charbagh, Lucknow","Ayurvedic", "Rest in Piece GUCCI MANE","AB3")

In [31]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
StructField("Qualification", StringType(), True),
StructField("Experience", StringType(), True),
StructField("Rating", StringType(), True),
StructField("Place", StringType(), True),
StructField("Profile", StringType(), True),
StructField("Miscellaneous_Info", StringType(), True),
StructField("Fees", StringType(), True)])


#my_df = spark.createDataFrame([myRow],myManualSchema)

In [32]:
x = spark.createDataFrame(myRow)

TypeError: Can not infer schema for type: <class 'str'>

In [34]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("some", LongType(), True),
StructField("col", LongType(), True),
StructField("names", LongType(), False)
])
myRow = (Row(1, 1, 1))
myDf = spark.sqlContext.createDataFrame(myRow, myManualSchema)
myDf.show()

AttributeError: 'SparkSession' object has no attribute 'sqlContext'

In [35]:
data_table.show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda| 

In [36]:
data_table.select('Rating', 'Fees').show(10)

+------+----+
|Rating|Fees|
+------+----+
|  100%| 100|
|   98%| 350|
|  null| 300|
|  null| 250|
|  100%| 250|
|  null| 100|
|  null| 200|
|   99%| 200|
|  null| 100|
|  null| 100|
+------+----+
only showing top 10 rows



You can refer to columns in a number of different ways; all you need to keep in mind is that you can use them interchangeably:

In [37]:
## Four ways to refer columns of a spark dataframe

from pyspark.sql.functions import expr, col, column
data_table.select(
expr("Rating"),    # Method 1: using "expr()" method
    
col("Rating"),    # Method 2: Using "col()" method
    
column("Rating"),  # Method 3: Using "Column()" method
    
   "Rating").show(2)    # Method 4: Generic method


+------+------+------+------+
|Rating|Rating|Rating|Rating|
+------+------+------+------+
|  100%|  100%|  100%|  100%|
|   98%|   98%|   98%|   98%|
+------+------+------+------+
only showing top 2 rows



"expr" is the most flexible reference that we can use. It can refer to a plain
column or a string manipulation of a column. To illustrate, let’s change the column name, and
then change it back by using the AS keyword and then the alias method on the column:

In [38]:
# Renaming a column name-

# Method 1 -
data_table.select(expr("Profile AS Expertize")).show(10)

# Method 2 -
data_table.select(expr("Profile").alias("Expertize")).show(10)

# Combining both methods together-
data_table.select(expr("Profile AS Expertize").alias("Medical Domain")).show(10)

+----------------+
|       Expertize|
+----------------+
|       Homeopath|
|        Ayurveda|
|  ENT Specialist|
|        Ayurveda|
|        Ayurveda|
|        Ayurveda|
|       Homeopath|
|         Dentist|
|General Medicine|
|         Dentist|
+----------------+
only showing top 10 rows

+----------------+
|       Expertize|
+----------------+
|       Homeopath|
|        Ayurveda|
|  ENT Specialist|
|        Ayurveda|
|        Ayurveda|
|        Ayurveda|
|       Homeopath|
|         Dentist|
|General Medicine|
|         Dentist|
+----------------+
only showing top 10 rows

+----------------+
|  Medical Domain|
+----------------+
|       Homeopath|
|        Ayurveda|
|  ENT Specialist|
|        Ayurveda|
|        Ayurveda|
|        Ayurveda|
|       Homeopath|
|         Dentist|
|General Medicine|
|         Dentist|
+----------------+
only showing top 10 rows



Because "select" followed by a series of "expr" is such a common pattern, Spark has a shorthand
for doing this efficiently: "selectExpr". This is probably the most convenient interface for
everyday use-

Here’s a simple example that adds a new column "withinCountry" to our DataFrame that specifies whether the destination and origin are the same:

In [39]:
data_table.selectExpr(
"*", # all original columns
"(Qualification <> Profile) as Compared_Columns").show(2)

+--------------------+-------------------+------+--------------------+---------+--------------------+----+----------------+
|       Qualification|         Experience|Rating|               Place|  Profile|  Miscellaneous_Info|Fees|Compared_Columns|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+----------------+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|Homeopath|100% 16 Feedback ...| 100|            true|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...| Ayurveda|98% 76 Feedback W...| 350|            true|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+----------------+
only showing top 2 rows



In [40]:
first_table.select('Hillshade_3pm','Slope(degrees)','Aspect(degrees)').show(10)

+-------------+--------------+---------------+
|Hillshade_3pm|Slope(degrees)|Aspect(degrees)|
+-------------+--------------+---------------+
|          158|            14|            186|
|          198|            15|            243|
|          142|            12|            162|
|          164|            17|            345|
|          151|             9|              4|
|          241|            33|            270|
|           92|            25|             34|
|          151|             3|            153|
|           28|            32|             92|
|          205|            26|            226|
+-------------+--------------+---------------+
only showing top 10 rows



In [41]:
first_table.selectExpr("avg(Hillshade_3pm) AS AVG_Hillshade", "max(Hillshade_3pm)","min(Hillshade_3pm)",\
              "count(distinct(`Slope(degrees)`)) AS distinct_count_slope", "count(`Aspect(degrees)`)", "sum(Hillshade_3pm)",\
             "max(`Aspect(degrees)`)").show()


+-----------------+------------------+------------------+--------------------+----------------------+------------------+--------------------+
|    AVG_Hillshade|max(Hillshade_3pm)|min(Hillshade_3pm)|distinct_count_slope|count(Aspect(degrees))|sum(Hillshade_3pm)|max(Aspect(degrees))|
+-----------------+------------------+------------------+--------------------+----------------------+------------------+--------------------+
|142.8546299483649|                99|                 0|                  52|                 29050|         4149927.0|                  99|
+-----------------+------------------+------------------+--------------------+----------------------+------------------+--------------------+



# Converting to Spark Types (Literals)-
Sometimes, we need to pass explicit values into Spark that are just a value (rather than a new
column). This might be a constant value or something we’ll need to compare to later on. The
way we do this is through literals. This is basically a translation from a given programming
language’s literal value to one that Spark understands. Literals are expressions and you can use
them in the same way:

In [42]:
from pyspark.sql.functions import lit
first_table.select("*",'Hillshade_3pm', lit(1).alias("One")).show(2)

+-----------------+---------------+--------------+----------------------------------------+--------------------------------------+---------------------------------------+-------------+--------------+-------------+------------------------------------------+-----------------+-----------------+-----------------+-----------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+----------+-------------+---+
|Elevation(meters)|Aspect(degrees)|Slope(degrees)|Horizontal_Distance_To_Hydrology(meters)|Vertical_Distance_To_Hydrology(meters)|H

# Adding Columns-
There’s also a more formal way of adding a new column to a DataFrame, and that’s by using the
"withColumn" method on our DataFrame. For example,


In [43]:
## Use "lit()" method if we want to enter either integer or string as used below-

#first_table.withColumn("numberOne", lit(1)).show(2)
data_table.withColumn("NumberOne",lit('AMKXA')).show(7)

+--------------------+-------------------+------+--------------------+--------------+--------------------+----+---------+
|       Qualification|         Experience|Rating|               Place|       Profile|  Miscellaneous_Info|Fees|NumberOne|
+--------------------+-------------------+------+--------------------+--------------+--------------------+----+---------+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|     Homeopath|100% 16 Feedback ...| 100|    AMKXA|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|      Ayurveda|98% 76 Feedback W...| 350|    AMKXA|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|ENT Specialist|                null| 300|    AMKXA|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|      Ayurveda|Bannerghatta Road...| 250|    AMKXA|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|      Ayurveda|100% 4 Feedback K...| 250|    AMKXA|
|                BAMS| 8

we’ll set a Boolean flag for when profile is the same as the Qualification:

In [44]:
data_table.withColumn("Profile_Check", expr("Profile == Qualification")).show(2)

+--------------------+-------------------+------+--------------------+---------+--------------------+----+-------------+
|       Qualification|         Experience|Rating|               Place|  Profile|  Miscellaneous_Info|Fees|Profile_Check|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+-------------+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|Homeopath|100% 16 Feedback ...| 100|        false|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...| Ayurveda|98% 76 Feedback W...| 350|        false|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+-------------+
only showing top 2 rows



# Renaming Columns-
Although we can rename a column in the manner that we just described, another alternative is to
use the "withColumnRenamed" method.

In [45]:
data_table.withColumnRenamed("Profile", "Expertize").columns

['Qualification',
 'Experience',
 'Rating',
 'Place',
 'Expertize',
 'Miscellaneous_Info',
 'Fees']

# Case Sensitivity-
By default Spark is case insensitive; however, you can make Spark case sensitive by setting the
configuration:

set spark.sql.caseSensitive true

# Removing Columns-
Let’s take a look at how we can remove columns from
DataFrames. You likely already noticed that we can do this by using select. However, there is
also a dedicated method called "drop"

In [46]:
#first_table.drop("Soil_Type_1").columns

#We can drop multiple columns by passing in multiple columns as arguments:

first_table.drop("Soil_Type_1", "Soil_Type_2","Soil_Type_3", "Soil_Type_4").columns

['Elevation(meters)',
 'Aspect(degrees)',
 'Slope(degrees)',
 'Horizontal_Distance_To_Hydrology(meters)',
 'Vertical_Distance_To_Hydrology(meters)',
 'Horizontal_Distance_To_Roadways(meters)',
 'Hillshade_9am',
 'Hillshade_Noon',
 'Hillshade_3pm',
 'Horizontal_Distance_To_Fire_Points(meters)',
 'Wilderness_Area_1',
 'Wilderness_Area_2',
 'Wilderness_Area_3',
 'Wilderness_Area_4',
 'Soil_Type_5',
 'Soil_Type_6',
 'Soil_Type_7',
 'Soil_Type_8',
 'Soil_Type_9',
 'Soil_Type_10',
 'Soil_Type_11',
 'Soil_Type_12',
 'Soil_Type_13',
 'Soil_Type_14',
 'Soil_Type_15',
 'Soil_Type_16',
 'Soil_Type_17',
 'Soil_Type_18',
 'Soil_Type_19',
 'Soil_Type_20',
 'Soil_Type_21',
 'Soil_Type_22',
 'Soil_Type_23',
 'Soil_Type_24',
 'Soil_Type_25',
 'Soil_Type_26',
 'Soil_Type_27',
 'Soil_Type_28',
 'Soil_Type_29',
 'Soil_Type_30',
 'Soil_Type_31',
 'Soil_Type_32',
 'Soil_Type_33',
 'Soil_Type_34',
 'Soil_Type_35',
 'Soil_Type_36',
 'Soil_Type_37',
 'Soil_Type_38',
 'Soil_Type_39',
 'Soil_Type_40',
 'Cover_Ty

# Changing a Column’s Type (cast)-
Sometimes, we might need to convert from one type to another; for example, if we have a set of
StringType that should be integers. We can convert columns from one type to another by
casting the column from one type to another

In [47]:
data_table.withColumn("Extra Taxes", col("Fees").cast("float"))

DataFrame[Qualification: string, Experience: string, Rating: string, Place: string, Profile: string, Miscellaneous_Info: string, Fees: string, Extra Taxes: float]

In [48]:
data_table.withColumn("Fees in long", col("Fees").cast("long"))

DataFrame[Qualification: string, Experience: string, Rating: string, Place: string, Profile: string, Miscellaneous_Info: string, Fees: string, Fees in long: bigint]

# Filtering Rows-
To filter rows, we create an expression that evaluates to true or false. You then filter out the rows
with an expression that is equal to false.

There are two methods to perform this operation: you can use "where" or "filter"

In [49]:
data_table.filter(col("Fees")== 100).show(10)       # using "filter()" method

data_table.filter(col("Fees")>= 350).filter(col("Profile")=='Ayurveda').filter(col('Rating')=='98%').show()     ### using multiple "filter()" method togerther

#data_table.where(col("Fees")== 100).show(10)     

data_table.where("Fees = 100").show(10)	     ### using "where" method

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda|                null| 100|
|MBBS, MD - Genera...|14 years experience|  null| Old City, Hyderabad|General Medicine|                null| 100|
|            BSc, BDS|23 years experience|  null|   Athani, Ernakulam|         Dentist|                null| 100|
|                BAMS| 7 years experience|  null|Somajiguda, Hyder...|        Ayurveda|                null| 100|
|MBBS, Diploma in ...|24 years experience|  null|Tambaram West, Ch...|  ENT Specialist| 

In [50]:
data_table.show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda| 

# Getting Unique Rows- 
The way we do this is by using the distinct method on a
DataFrame, which allows us to deduplicate any rows that are in that DataFrame. 

In [51]:
data_table.select("Experience","Profile").distinct().count()    # Distinct count of "Experience", "Profile"
data_table.select("Experience","Profile").count()     # Total count of "Experience", "Profile"

149

# Random Samples-
Sometimes, you might just want to sample some random records from your DataFrame. You can
do this by using the "sample" method on a DataFrame and whether you’d like to sample with or without
replacement:

In [52]:
# in Python
seed = 8
withReplacement = False
fraction = 0.45
data_table.sample(withReplacement, fraction, seed).count()

76

# Random Splits-
Random splits can be helpful when you need to break up your DataFrame into a random “splits”
of the original DataFrame. 

In [53]:
dataFrames = data_table.randomSplit([0.25, 0.75], seed)
#dataFrames[0].count() > dataFrames[1].count()
dataFrames[0].count() > dataFrames[1].count()

False

# Concatenating and Appending Rows (Union)- 
To append to a DataFrame, you must
union the original DataFrame along with the new DataFrame. This just concatenates the two
DataFramess. To union two DataFrames, you must be sure that they have the same schema and
number of columns; otherwise, the union will fail.

In [54]:
from pyspark.sql import Row
schema = data_table.schema
newRows = [
Row("B.Tech", "2994 years experience", "8918%", "Janakpuri, Lucknow", "Software Engineer", "No such info.", 3450),
Row("B.Tech", "9282 years experience", "1111%", "Dubagga, Lucknow", "Senior Software Engineer", "No info.", 1928)
]
 
#print(newRows)
parallelizedRows = spark.sparkContext.parallelize(newRows)
#print(parallelizedRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
#print(newDF)


df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

Traceback (most recent call last):
  File "C:\Users\Rahul\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Rahul\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\Rahul\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Rahul\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Rahul\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 565, in _fun

PicklingError: Could not serialize object: IndexError: tuple index out of range

# Sorting Rows-
When we sort the values in a DataFrame, we always want to sort with either the largest or
smallest values at the top of a DataFrame. There are two equivalent operations to do this "sort"
and "orderBy" that work the exact same way. 

They accept both column expressions and strings as
well as multiple columns. The default is to sort in ascending order:

In [None]:
data_table.sort("Fees").show(5)
data_table.orderBy("Fees", "Rating","Experience").show(5)
data_table.orderBy(col("Fees"), col("Rating"), col("Experience")).show(5)

In [55]:
#To more explicitly specify sort direction, you need to use the asc and desc functions if operating
#on a column. These allow you to specify the order in which a given column should be sorted:

from pyspark.sql.functions import desc, asc
#data_table.orderBy(expr("Fees desc")).show(5)
#data_table.orderBy(expr("Fees desc"), expr("Rating desc"),expr("Experience desc")).show(5)

data_table.orderBy(col("Fees").desc(), col("Rating").asc()).show(10)
#df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)


+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|                MBBS| 5 years experience|  null|Greater Kailash P...|General Medicine|Diabetes Manageme...| 800|
|MBBS, MD - Dermat...|12 years experience|  null|Panchsheel Park, ...|  Dermatologists|Panchsheel Park, ...| 800|
|      MBBS, MS - ENT|39 years experience|   86%|    Pitampura, Delhi|  ENT Specialist|86% 15 Feedback P...| 800|
|          MBBS, DDVL|10 years experience|   89%| Vadapalani, Chennai|  Dermatologists|Dermabrasion Chem...| 800|
|                MBBS|13 years experience|   96%|Jubilee Hills, Hy...|General Medicine|96% 20 Feedback J...| 750|
|MBBS, MD - Dermat...|33 years experience|  null|   Tuglakabad, Delhi|  Dermatologists| 

An advanced tip is to use "asc_nulls_first()", "desc_nulls_first()", "asc_nulls_last()", or
"desc_nulls_last()" method to specify where you would like your null values to appear in an ordered
DataFrame.


In [56]:
data_table.orderBy(col("Fees").asc(), col("Rating").asc_nulls_last()).show(10)

data_table.orderBy(col("Fees").asc(), col("Rating").desc_nulls_last(), col("Experience").asc_nulls_last()).show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|                 BDS|10 years experience|  100%|Vileparle East, M...|         Dentist|Ceramic Veneers /...| 100|
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BDS, MDS - Period...|20 years experience|  100%|Mogappair East, C...|         Dentist|100% 51 Feedback ...| 100|
|       MDS, DNB, BDS|21 years experience|  100%|Pollachi, Coimbatore|         Dentist|100% 7 Feedback P...| 100|
|MD - General Medi...|34 years experience|  100%|Borivali West, Mu...|General Medicine|100% 2 Feedback B...| 100|
|MBBS, MS - ENT, D...|31 years experience|   80%|        Saket, Delhi|  ENT Specialist|8

For optimization purposes, it’s sometimes advisable to sort within each partition before another
set of transformations. You can use the "sortWithinPartitions()" method to do this:


In [57]:
data_table.sortWithinPartitions("Profile").show()

+--------------------+-------------------+------+--------------------+--------+--------------------+----+
|       Qualification|         Experience|Rating|               Place| Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+--------+--------------------+----+
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|Ayurveda|98% 76 Feedback W...| 350|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|Ayurveda|                null| 100|
|                BAMS| 7 years experience|  null|Somajiguda, Hyder...|Ayurveda|                null| 100|
|                BAMS| 9 years experience|  null| IP Extension, Delhi|Ayurveda|                null| 300|
|BAMS, Diploma in ...|31 years experience|  10

# Limit-
Oftentimes, you might want to restrict what you extract from a DataFrame; 

In [58]:
data_table.limit(15).count()
data_table.orderBy(col("Fees").desc_nulls_last()).limit(20).show()

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|      MBBS, MS - ENT|39 years experience|   86%|    Pitampura, Delhi|  ENT Specialist|86% 15 Feedback P...| 800|
|          MBBS, DDVL|10 years experience|   89%| Vadapalani, Chennai|  Dermatologists|Dermabrasion Chem...| 800|
|                MBBS| 5 years experience|  null|Greater Kailash P...|General Medicine|Diabetes Manageme...| 800|
|MBBS, MD - Dermat...|12 years experience|  null|Panchsheel Park, ...|  Dermatologists|Panchsheel Park, ...| 800|
|                MBBS|13 years experience|   96%|Jubilee Hills, Hy...|General Medicine|96% 20 Feedback J...| 750|
| MBBS, MS, DNB - ENT| 5 years experience|  null|Thousand Lights, ...|  ENT Specialist| 

# Converting to Spark Types-

One thing you’ll see us do throughout this chapter is convert native types to Spark types. We do
this by using the "lit" function. This function converts a type in another language to its correspnding Spark representation.

In [59]:
data_table.select(lit(237),lit('SLS'),lit(87.48))

DataFrame[237: int, SLS: string, 87.48: double]

# Working with Booleans-

Booleans are essential when it comes to data analysis because they are the foundation for all
filtering. 
Boolean statements consist of four elements: "and", "or", "true", and "false". We can specify equality as well as
less-than or greater-than

In [60]:
from pyspark.sql.functions import col
data_table.where(col("Fees") >= 500)\
.select("Rating", "Place", "Qualification","Fees")\
.show(10, False)     #### here, True/False is showing the trucation of records in the spark table. In case if length of record becomes very large, then 
                        # spark displays is with dots(....). In order to see complete word of the entry, keep it "False" else "True".

+------+------------------------+------------------------------------------------------------------+----+
|Rating|Place                   |Qualification                                                     |Fees|
+------+------------------------+------------------------------------------------------------------+----+
|null  |Thousand Lights, Chennai|MBBS, MS, DNB - ENT                                               |700 |
|null  |Kondli, Delhi           |MBBS, Diploma in Otorhinolaryngology (DLO), DNB - ENT             |500 |
|null  |Vasundhra Enclave, Delhi|MBBS, DNB - ENT                                                   |500 |
|100%  |HSR Layout, Bangalore   |MBBS, IBCLC (USA)                                                 |500 |
|97%   |Defence Colony, Delhi   |Diploma in Dermatology, MBBS                                      |500 |
|97%   |Banjara Hills, Hyderabad|MBBS, DDVL, Fellowship in Aesthetic Medicine                      |500 |
|null  |Sion West, Mumbai       |BDS          

In [61]:
# One more example

from pyspark.sql.functions import col
data_table.where(col("Rating") == '100%')\
.select("Rating","Place" , "Qualification","Fees")\
.show(10, True) 

+------+--------------------+--------------------+----+
|Rating|               Place|       Qualification|Fees|
+------+--------------------+--------------------+----+
|  100%| Kakkanad, Ernakulam|BHMS, MD - Homeop...| 100|
|  100%|Keelkattalai, Che...|                BAMS| 250|
|  100%|HSR Layout, Banga...|                MBBS| 150|
|  100%|Pollachi, Coimbatore|       MDS, DNB, BDS| 100|
|  100%|HSR Layout, Banga...|   MBBS, IBCLC (USA)| 500|
|  100%|Safdarjung Enclav...|BDS, MDS - Oral &...| 400|
|  100%|Hyder Nagar, Hyde...|BDS, MDS - Prosth...| 250|
|  100%|Dahisar West, Mumbai|BHMS, M. D. Hom. ...| 500|
|  100%| RT Nagar, Bangalore|BAMS, Diploma in ...| 500|
|  100%|Musheerabad, Hyde...|BDS, MDS - Oral a...| 200|
+------+--------------------+--------------------+----+
only showing top 10 rows



In Spark, you should always chain together and filters as a sequential filter.
The reason is that even if Boolean statements are expressed serially (one after the other),
Spark will flatten all of these filters into one statement and perform the filter at the same time,
creating the "and" statement for us.
Although you can specify your statements explicitly by using
"and" if you like, they’re often easier to understand and to read if you specify them serially. "or"
statements need to be specified in the same statement:

In [62]:
from pyspark.sql.functions import instr
priceFilter = data_table.Fees < 600
print(priceFilter)
descripFilter = instr(data_table.Qualification, "MBBS") >=1           # "instr(x,y)" returns 1st index of character 'y' from string x for each record
print(descripFilter)

data_table.where(data_table.Profile.isin("General Medicine")).where(priceFilter & descripFilter).show(truncate = True)

Column<'(Fees < 600)'>
Column<'(instr(Qualification, MBBS) >= 1)'>
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|MBBS, MD - Genera...|14 years experience|  null| Old City, Hyderabad|General Medicine|                null| 100|
|MBBS, MD - Genera...|10 years experience|  null|Saroor Nagar, Hyd...|General Medicine|                null| 200|
|                MBBS|19 years experience|  100%|HSR Layout, Banga...|General Medicine|100% 4 Feedback H...| 150|
|                MBBS|41 years experience|  null|     Chembur, Mumbai|General Medicine|                null| 200|
|MBBS, Fellowship ...|31 years experience|  null|Thammanam, Ernakulam|General Medicine|                null| 100|
|   MBBS, IBCLC (USA)

In [63]:
from pyspark.sql.functions import instr
priceFilter = data_table.Fees < 600
print(priceFilter)
descripFilter = instr(data_table.Qualification, "BHMS") == 1     # "instr(x,y)" returns 1st index of character 'y' from string x for each record
print(descripFilter)

data_table.where(data_table.Rating.isin("100%")).where(priceFilter & descripFilter).show(truncate = True)


## Here, we did not need to specify our filter as an expression and we used a column name without any extra work.

Column<'(Fees < 600)'>
Column<'(instr(Qualification, BHMS) = 1)'>
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|  Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|Homeopath|100% 16 Feedback ...| 100|
|BHMS, M. D. Hom. ...|12 years experience|  100%|Dahisar West, Mumbai|Homeopath|100% 21 Feedback ...| 500|
|BHMS, MS - Psycho...|14 years experience|  100%|HSR Layout, Banga...|Homeopath|100% 43 Feedback ...| 200|
|                BHMS|12 years experience|  100%|Kanakpura Road, B...|Homeopath|Pediaterics Skin ...| 500|
|                BHMS| 8 years experience|  100%|South Extension 2...|Homeopath|100% 9 Feedback S...| 350|
+--------------------+-------------------+------+--------------------+--------

Boolean expressions are not just reserved to filters. To filter a DataFrame, you can also just
specify a Boolean column:

In [64]:
# in Python
from pyspark.sql.functions import instr
DOTCodeFilter = col("Rating") == "100%"
priceFilter = col("Fees") > 200
descripFilter = instr(col("Experience"), "12") == 1
data_table.withColumn("isExpensive", DOTCodeFilter | (priceFilter & descripFilter))\
.where(col("isExpensive") == 'true')\
.select("Fees", "Qualification","Rating","isExpensive").show(5)

# Here, we did not need to specify our filter as an expression and we used a column name without any extra work.

+----+--------------------+------+-----------+
|Fees|       Qualification|Rating|isExpensive|
+----+--------------------+------+-----------+
| 100|BHMS, MD - Homeop...|  100%|       true|
| 350|BAMS, MD - Ayurve...|   98%|       true|
| 250| BSc - Zoology, BAMS|  null|       true|
| 250|                BAMS|  100%|       true|
| 500|MBBS, Diploma in ...|  null|       true|
+----+--------------------+------+-----------+
only showing top 5 rows



I’s often easier to just express
filters as SQL statements than using the programmatic DataFrame interface and Spark SQL
allows us to do this without paying any performance penalty. Like this-

In [65]:
# in Python
from pyspark.sql.functions import expr
data_table.withColumn("New_Column_added_as_Expensive", expr("NOT Fees <= 250"))\
.where("New_Column_added_as_Expensive = 'true'")\
.select("Qualification", "Rating","Fees","New_Column_added_as_Expensive").show(5,False)

+-----------------------------------------------------+------+----+-----------------------------+
|Qualification                                        |Rating|Fees|New_Column_added_as_Expensive|
+-----------------------------------------------------+------+----+-----------------------------+
|BAMS, MD - Ayurveda Medicine                         |98%   |350 |true                         |
|MBBS, MS - Otorhinolaryngology                       |null  |300 |true                         |
|MBBS, MS, DNB - ENT                                  |null  |700 |true                         |
|BDS, MDS - Oral & Maxillofacial Surgery              |null  |350 |true                         |
|MBBS, Diploma in Otorhinolaryngology (DLO), DNB - ENT|null  |500 |true                         |
+-----------------------------------------------------+------+----+-----------------------------+
only showing top 5 rows



# Working with Numbers-

In [66]:
from pyspark.sql.functions import expr, pow
fabricatedQuantity = pow(col("Fees"), 2)/10
data_table.select(expr("Fees"), fabricatedQuantity.alias("Calculated Fee")).show(10)

+----+--------------+
|Fees|Calculated Fee|
+----+--------------+
| 100|        1000.0|
| 350|       12250.0|
| 300|        9000.0|
| 250|        6250.0|
| 250|        6250.0|
| 100|        1000.0|
| 200|        4000.0|
| 200|        4000.0|
| 100|        1000.0|
| 100|        1000.0|
+----+--------------+
only showing top 10 rows



In [67]:
data_table.selectExpr(
"Place",
"(POWER((Fees * 1.15), 2.0) + 50) as Calculated_New_fee").show(10)


+--------------------+------------------+
|               Place|Calculated_New_fee|
+--------------------+------------------+
| Kakkanad, Ernakulam|13274.999999999996|
|Whitefield, Banga...|162056.24999999994|
|Mathikere - BEL, ...|          119075.0|
|Bannerghatta Road...|          82706.25|
|Keelkattalai, Che...|          82706.25|
|      Porur, Chennai|13274.999999999996|
|   Karol Bagh, Delhi|52949.999999999985|
|  Arekere, Bangalore|52949.999999999985|
| Old City, Hyderabad|13274.999999999996|
|   Athani, Ernakulam|13274.999999999996|
+--------------------+------------------+
only showing top 10 rows



Another common numerical task is rounding. By default, the "round" function rounds up if you’re exactly in between two numbers. You can
round down by using the "bround":

In [68]:
from pyspark.sql.functions import lit, round, bround
data_table.select(round(lit("2.5")), bround(lit("2.5"))).show(10)


+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 10 rows



Another numerical task is to compute the correlation of two columns. For example, we can see
the Pearson correlation coefficient for two columns to see if cheaper things are typically bought
in greater quantities.

In [69]:
first_table.withColumn('Wilderness_Area_1', col('Wilderness_Area_1').cast('float'))

DataFrame[Elevation(meters): string, Aspect(degrees): string, Slope(degrees): string, Horizontal_Distance_To_Hydrology(meters): string, Vertical_Distance_To_Hydrology(meters): string, Horizontal_Distance_To_Roadways(meters): string, Hillshade_9am: string, Hillshade_Noon: string, Hillshade_3pm: string, Horizontal_Distance_To_Fire_Points(meters): string, Wilderness_Area_1: float, Wilderness_Area_2: string, Wilderness_Area_3: string, Wilderness_Area_4: string, Soil_Type_1: string, Soil_Type_2: string, Soil_Type_3: string, Soil_Type_4: string, Soil_Type_5: string, Soil_Type_6: string, Soil_Type_7: string, Soil_Type_8: string, Soil_Type_9: string, Soil_Type_10: string, Soil_Type_11: string, Soil_Type_12: string, Soil_Type_13: string, Soil_Type_14: string, Soil_Type_15: string, Soil_Type_16: string, Soil_Type_17: string, Soil_Type_18: string, Soil_Type_19: string, Soil_Type_20: string, Soil_Type_21: string, Soil_Type_22: string, Soil_Type_23: string, Soil_Type_24: string, Soil_Type_25: strin

In [70]:
first_table.Wilderness_Area_1

Column<'Wilderness_Area_1'>

In [71]:
# in Python
from pyspark.sql.functions import corr
#first_table.stat.corr(col("Wilderness_Area_1"), col("Elevation(meters)").cast('long'))		## Method 1
first_table.select(corr(col("Wilderness_Area_1").cast('long'), col("Elevation(meters)").cast('long'))).show()		## Method 2

#df.select(corr("Quantity", "UnitPrice")).show()		## Method 2


+--------------------------------------------------------------------------+
|corr(CAST(Wilderness_Area_1 AS BIGINT), CAST(Elevation(meters) AS BIGINT))|
+--------------------------------------------------------------------------+
|                                                        0.1301108910010127|
+--------------------------------------------------------------------------+



Another common task is to compute summary statistics for a column or set of columns. We can
use the "describe" method to achieve exactly this.

In [72]:
data_table.describe().show()

+-------+--------------------+------------------+------+--------------------+---------+--------------------+------------------+
|summary|       Qualification|        Experience|Rating|               Place|  Profile|  Miscellaneous_Info|              Fees|
+-------+--------------------+------------------+------+--------------------+---------+--------------------+------------------+
|  count|                 149|               149|    75|                 148|      149|                  92|               149|
|   mean|                null|              null|  null|                null|     null|                null|298.32214765100673|
| stddev|                null|              null|  null|                null|     null|                null|187.12043630197886|
|    min|                BAMS|0 years experience|  100%|AS Rao Nagar, Hyd...| Ayurveda|1 Feedback Pallik...|               100|
|    max|PhD - Orthodontic...|9 years experience|   99%|Yelahanka, Bangalore|Homeopath|Viral Fever Treat

If you need these exact numbers, you can also perform this as an aggregation yourself by
importing the functions and applying them to the columns that you need:


In [73]:
# in Python
from pyspark.sql.functions import count, mean, stddev_pop, min, max

first_table.describe('Slope(degrees)','Elevation(meters)').show()
#first_table.select(mean(col("Slope(degrees)").cast('long'))).show()		## Method 2


+-------+-----------------+------------------+
|summary|   Slope(degrees)| Elevation(meters)|
+-------+-----------------+------------------+
|  count|            29050|             29050|
|   mean|14.02853700516351| 2959.328330464716|
| stddev|7.458200139042338|277.57822696412705|
|    min|                0|              1879|
|    max|                9|              3844|
+-------+-----------------+------------------+



There are a number of statistical functions available in the StatFunctions Package (accessible
using "stat" as we see in the code block below). These are DataFrame methods that you can use
to calculate a variety of different things. For instance, you can calculate either exact or
approximate quantiles of your data using the "approxQuantile" method:


colName = "Fees"
quantileProbs = [0.5]
relError = 0.05
first_table.stat.approxQuantile("Elevation(meters)", quantileProbs, relError)

You also can use this to see a cross-tabulation or frequent item pairs

In [74]:
# in Python
data_table.stat.crosstab("Qualification", "Fees").show()
data_table.stat.crosstab("Rating", "Profile").show()

+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  Qualification_Fees|100|150|200|250|300|350|400| 50|500|600|650|700|750|800|
+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|BDS, MDS - Oral &...|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|MDS-Oral Patholog...|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|MBBS, MD - Dermat...|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  1|
|BHMS, Diploma In ...|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|          MBBS, AFIH|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|MBBS, DDVL, MD - ...|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|BHMS, MD - Homeop...|  1|  0|  0|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|                BHMS|  2|  1|  3|  0|  0|  1|  0|  0|  2|  0|  0|  0|  0|  0|
|                 BDS|  2|  0|  4|  4|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|MD - Ayurveda Med...|  0|  0|  0|  0|  0|  0|  0|  

In [75]:
# in Python
data_table.stat.freqItems(["Rating"]).show(20,False)
data_table.stat.freqItems(["Rating","Profile"]).show(20,False)
data_table.stat.freqItems(["Rating","Profile","Fees"]).show(20,False)

+------------------------------------------------------------------------------------------------------+
|Rating_freqItems                                                                                      |
+------------------------------------------------------------------------------------------------------+
|[79%, 80%, 97%, 82%, 94%, 99%, 78%, 96%, 36%, 87%, 93%, 90%, 89%, 95%, 100%, 98%, 74%, 86%, 88%, null]|
+------------------------------------------------------------------------------------------------------+

+------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|Rating_freqItems                                                                                      |Profile_freqItems                                                               |
+-----------------------------------------------------------------------------------------------------

we can also add a unique ID to each row by using the function
"monotonically_increasing_id". This function generates a unique value for each row, starting
with 0:


In [76]:
# in Python
from pyspark.sql.functions import monotonically_increasing_id
data_table.select('*',monotonically_increasing_id()).show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+-----------------------------+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|monotonically_increasing_id()|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+-----------------------------+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|                            0|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|                            1|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|                            2|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|                       

# Working with Strings-

String manipulation shows up in nearly every data flow, and it’s worth explaining what you can
do with strings. You might be manipulating log files performing regular expression extraction or
substitution, or checking for simple string existence, or making all strings uppercase or
lowercase.

The "initcap" function will capitalize every word in a given string when that word is separated from another by a space.


In [77]:
from pyspark.sql.functions import initcap
data_table.select(col("Qualification"),initcap(col("Qualification")),col('Experience'),initcap(col("Experience"))).show(15)


+--------------------+----------------------+-------------------+-------------------+
|       Qualification|initcap(Qualification)|         Experience|initcap(Experience)|
+--------------------+----------------------+-------------------+-------------------+
|BHMS, MD - Homeop...|  Bhms, Md - Homeop...|24 years experience|24 Years Experience|
|BAMS, MD - Ayurve...|  Bams, Md - Ayurve...|12 years experience|12 Years Experience|
|MBBS, MS - Otorhi...|  Mbbs, Ms - Otorhi...| 9 years experience| 9 Years Experience|
| BSc - Zoology, BAMS|   Bsc - Zoology, Bams|12 years experience|12 Years Experience|
|                BAMS|                  Bams|20 years experience|20 Years Experience|
|                BAMS|                  Bams| 8 years experience| 8 Years Experience|
|                BHMS|                  Bhms|42 years experience|42 Years Experience|
|                 BDS|                   Bds|10 years experience|10 Years Experience|
|MBBS, MD - Genera...|  Mbbs, Md - Genera...|14 years 

As just mentioned, you can cast strings in uppercase and lowercase, as well:

In [78]:
from pyspark.sql.functions import lower, upper
data_table.select(col("Qualification"),lower(col("Qualification")),upper(lower(col("Qualification")))).show(5,False)

+------------------------------+------------------------------+------------------------------+
|Qualification                 |lower(Qualification)          |upper(lower(Qualification))   |
+------------------------------+------------------------------+------------------------------+
|BHMS, MD - Homeopathy         |bhms, md - homeopathy         |BHMS, MD - HOMEOPATHY         |
|BAMS, MD - Ayurveda Medicine  |bams, md - ayurveda medicine  |BAMS, MD - AYURVEDA MEDICINE  |
|MBBS, MS - Otorhinolaryngology|mbbs, ms - otorhinolaryngology|MBBS, MS - OTORHINOLARYNGOLOGY|
|BSc - Zoology, BAMS           |bsc - zoology, bams           |BSC - ZOOLOGY, BAMS           |
|BAMS                          |bams                          |BAMS                          |
+------------------------------+------------------------------+------------------------------+
only showing top 5 rows



Another trivial task is adding or removing spaces around a string. 
You can do this by using "lpad", "ltrim", "rpad" and "rtrim", "trim"

In [79]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
data_table.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(10)


+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 10 rows



## Regular Expressions-

Regular Expressions are most frequently performed tasks for searching the existence of one string
in another or replacing all mentions of a string with another value. It gives
the user an ability to specify a set of rules to use to either extract values from a string or replace
them with some other values.

There are two key functions in Spark that you’ll need in
order to perform regular expression tasks: "regexp_extract()" and "regexp_replace()". These functions extract values and replace values, respectively.

In [80]:
# Search for "MBBS", "MD" and "BDS" word from column "Qualification" and replacing them with "GUCCI MANE" word

from pyspark.sql.functions import regexp_replace,regexp_extract
regex_string = "MBBS|MD|BDS"          
data_table.select(
regexp_replace(col("Qualification"), regex_string, "GUCCI MANE").alias("New_one_Qualification"),col("Qualification")).show(20,False)

+-----------------------------------------------------------+-----------------------------------------------------+
|New_one_Qualification                                      |Qualification                                        |
+-----------------------------------------------------------+-----------------------------------------------------+
|BHMS, GUCCI MANE - Homeopathy                              |BHMS, MD - Homeopathy                                |
|BAMS, GUCCI MANE - Ayurveda Medicine                       |BAMS, MD - Ayurveda Medicine                         |
|GUCCI MANE, MS - Otorhinolaryngology                       |MBBS, MS - Otorhinolaryngology                       |
|BSc - Zoology, BAMS                                        |BSc - Zoology, BAMS                                  |
|BAMS                                                       |BAMS                                                 |
|BAMS                                                       |BAMS       

In [81]:
#Replace string column value conditionally
from pyspark.sql.functions import when
data_table.withColumn('Qualificationnew', 
    when(data_table.Qualification.endswith('Homeopathy'),regexp_replace(data_table.Qualification,'Homeopathy','NONONHOEOPATHY')) \
   .when(data_table.Qualification.endswith('Medicine'),regexp_replace(data_table.Qualification,'Medicine','COMPOUNDER')) \
   .otherwise(data_table.Qualification)) \
   .select('Rating','Fees' ,'Qualification','Qualificationnew').show(20,truncate=False)

+------+----+-----------------------------------------------------+-----------------------------------------------------+
|Rating|Fees|Qualification                                        |Qualificationnew                                     |
+------+----+-----------------------------------------------------+-----------------------------------------------------+
|100%  |100 |BHMS, MD - Homeopathy                                |BHMS, MD - NONONHOEOPATHY                            |
|98%   |350 |BAMS, MD - Ayurveda Medicine                         |BAMS, MD - Ayurveda COMPOUNDER                       |
|null  |300 |MBBS, MS - Otorhinolaryngology                       |MBBS, MS - Otorhinolaryngology                       |
|null  |250 |BSc - Zoology, BAMS                                  |BSc - Zoology, BAMS                                  |
|100%  |250 |BAMS                                                 |BAMS                                                 |
|null  |100 |BAMS       

Another task might be to replace given characters with other characters. Building this as a
regular expression could be tedious, so Spark also provides the "translate()" function to replace these
values. This is done at the character level and will replace all instances of a character with the
indexed character in the replacement string:

In [82]:
## Replacing "-" with "&" from Qualification column

from pyspark.sql.functions import translate
data_table.select(translate(col("Qualification"), "-", "&"),col("Qualification")).show(20,False)

+-----------------------------------------------------+-----------------------------------------------------+
|translate(Qualification, -, &)                       |Qualification                                        |
+-----------------------------------------------------+-----------------------------------------------------+
|BHMS, MD & Homeopathy                                |BHMS, MD - Homeopathy                                |
|BAMS, MD & Ayurveda Medicine                         |BAMS, MD - Ayurveda Medicine                         |
|MBBS, MS & Otorhinolaryngology                       |MBBS, MS - Otorhinolaryngology                       |
|BSc & Zoology, BAMS                                  |BSc - Zoology, BAMS                                  |
|BAMS                                                 |BAMS                                                 |
|BAMS                                                 |BAMS                                                 |
|BHMS     

We can also perform something similar, like pulling out the first mentioned color:

In [83]:
from pyspark.sql.functions import regexp_extract
extract_str = "(Diploma|MS|BDS)"
data_table.select(regexp_extract(col("Qualification"), extract_str, 1).alias("New_extra_Qualification"),col("Qualification")).show(20,False)


+-----------------------+--------------------+
|New_extra_Qualification|       Qualification|
+-----------------------+--------------------+
|                     MS|BHMS, MD - Homeop...|
|                     MS|BAMS, MD - Ayurve...|
|                     MS|MBBS, MS - Otorhi...|
|                     MS| BSc - Zoology, BAMS|
|                     MS|                BAMS|
|                     MS|                BAMS|
|                     MS|                BHMS|
|                    BDS|                 BDS|
|                       |MBBS, MD - Genera...|
|                    BDS|            BSc, BDS|
|                     MS| MBBS, MS, DNB - ENT|
|                     MS|                BAMS|
|                    BDS|            BDS, MDS|
|                    BDS|BDS, MDS - Oral &...|
|                Diploma|MBBS, Diploma in ...|
|                       |MBBS, MD - Genera...|
|                Diploma|MBBS, Diploma in ...|
|                       |MBBS, MF- Homeopathy|
|            

# Working with Dates and Timestamps-

The key to understanding the transformations that you are going to need to apply is to ensure that you know
exactly what type and format you have at each given step of the way. Spark’s "TimestampType" class supports only second-level precision, which means that if
you’re going to be working with milliseconds or microseconds, you’ll need to work around this
problem by potentially operating on them as "longs". Any more precision when coercing to a
TimestampType will be removed.

Let’s begin with the basics and get the current date and the current timestamps:

In [84]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10).withColumn("today", current_date()).withColumn("now", current_timestamp())         

#"current_date()" method is used to get current date
#"current_timestamp()" method is used to get current time alongwith date

dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
dateDF.show()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2023-05-02|2023-05-02 23:42:...|
|  1|2023-05-02|2023-05-02 23:42:...|
|  2|2023-05-02|2023-05-02 23:42:...|
|  3|2023-05-02|2023-05-02 23:42:...|
|  4|2023-05-02|2023-05-02 23:42:...|
|  5|2023-05-02|2023-05-02 23:42:...|
|  6|2023-05-02|2023-05-02 23:42:...|
|  7|2023-05-02|2023-05-02 23:42:...|
|  8|2023-05-02|2023-05-02 23:42:...|
|  9|2023-05-02|2023-05-02 23:42:...|
+---+----------+--------------------+



In [85]:
dateDF.createOrReplaceTempView("Table_NEW")
spark.sql("""select * from Table_NEW""").show()

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2023-05-02|2023-05-02 23:42:...|
|  1|2023-05-02|2023-05-02 23:42:...|
|  2|2023-05-02|2023-05-02 23:42:...|
|  3|2023-05-02|2023-05-02 23:42:...|
|  4|2023-05-02|2023-05-02 23:42:...|
|  5|2023-05-02|2023-05-02 23:42:...|
|  6|2023-05-02|2023-05-02 23:42:...|
|  7|2023-05-02|2023-05-02 23:42:...|
|  8|2023-05-02|2023-05-02 23:42:...|
|  9|2023-05-02|2023-05-02 23:42:...|
+---+----------+--------------------+



Now that we have a simple DataFrame to work with, let’s add and subtract five days from today.
These functions take a column and then the number of days to either add or subtract as the
arguments:

In [86]:
from pyspark.sql.functions import date_add, date_sub
dateDF.select(col("today"),date_sub(col("today"), 5), date_add(col("today"), 5)).show(10)

+----------+------------------+------------------+
|     today|date_sub(today, 5)|date_add(today, 5)|
+----------+------------------+------------------+
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
+----------+------------------+------------------+



In [87]:
dateDF.createOrReplaceTempView("Table_NEW")
spark.sql("""select today, today - 5, today + 5  from Table_NEW""").show()

+----------+------------------+------------------+
|     today|date_sub(today, 5)|date_add(today, 5)|
+----------+------------------+------------------+
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
|2023-05-02|        2023-04-27|        2023-05-07|
+----------+------------------+------------------+



"datediff()" function that will return the number of days in between two dates. "months_between()" gives you the number of months between two dates.

In [88]:
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)        # "datediff()" method is used to get no. of days difference b/w two dates

dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)       # "months_between()" method is used to get no. of months in b/w two dates provided

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row



The "to_date()" function allows you to convert a string to a date.

##### NOTE - Spark will not throw an error if it cannot parse the date; rather, it will just return null.

In [89]:
dateDF.createOrReplaceTempView("dateTable")
spark.sql("""SELECT to_date('2016-01-01'), months_between('2016-01-01', '2017-01-01'),
datediff('2016-01-01', '2017-01-01')
FROM dateTable""").show()

+-------------------+--------------------------------------------+--------------------------------+
|to_date(2016-01-01)|months_between(2016-01-01, 2017-01-01, true)|datediff(2016-01-01, 2017-01-01)|
+-------------------+--------------------------------------------+--------------------------------+
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|
|         2016-01-01|                                       -12.0|                            -366|


In [90]:
from pyspark.sql.functions import to_date, lit
spark.range(5).withColumn("date", lit("2017-01-01")).printSchema()
spark.range(5).withColumn("date", lit("2017-01-01")).select(to_date(col("date"))).printSchema()
spark.range(5).withColumn("date", lit("2017-01-01")).select(to_date(col("date"))).show(1)


spark.range(5).withColumn("date_1", lit("2017-01-01")).withColumn("date_2", lit("2017-99-99")).\
select(to_date(col("date_1")), to_date(col("date_2"))).show(1)     # Since "2017-99-99" is an incorrect date, therefore system is displaying value as "null"

root
 |-- id: long (nullable = false)
 |-- date: string (nullable = false)

root
 |-- to_date(date): date (nullable = true)

+-------------+
|to_date(date)|
+-------------+
|   2017-01-01|
+-------------+
only showing top 1 row

+---------------+---------------+
|to_date(date_1)|to_date(date_2)|
+---------------+---------------+
|     2017-01-01|           null|
+---------------+---------------+
only showing top 1 row



In [97]:
cleanDateDF.createOrReplaceTempView("dateTable2")

spark.sql("""SELECT to_date(date, 'yyyy-dd-MM'), to_date(date2, 'yyyy-dd-MM'), to_date(date)
FROM dateTable2""").show()

+-------------------------+--------------------------+-------------+
|to_date(date, yyyy-dd-MM)|to_date(date2, yyyy-dd-MM)|to_date(date)|
+-------------------------+--------------------------+-------------+
|               2017-11-12|                2017-12-20|   2017-11-12|
+-------------------------+--------------------------+-------------+



We find this to be an especially tricky situation for bugs because some dates might match the
correct format, whereas others do not.

We will use two functions to fix this: "to_date()" and "to_timestamp()"  method.


In [98]:
from pyspark.sql.functions import to_date
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))

cleanDateDF.show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [99]:
cleanDateDF.createOrReplaceTempView("dateTable2")

spark.sql("""SELECT to_date(date, 'yyyy-dd-MM'), to_date(date2, 'yyyy-dd-MM'), to_date(date)

FROM dateTable2""").show()

+-------------------------+--------------------------+-------------+
|to_date(date, yyyy-dd-MM)|to_date(date2, yyyy-dd-MM)|to_date(date)|
+-------------------------+--------------------------+-------------+
|               2017-11-12|                2017-12-20|   2017-11-12|
+-------------------------+--------------------------+-------------+



Now let’s use an example of "to_timestamp()", which always requires a format to be specified:

In [100]:
from pyspark.sql.functions import to_timestamp
dateFormat = "yyyy-dd-MM"
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

+------------------------------+
|to_timestamp(date, yyyy-dd-MM)|
+------------------------------+
|           2017-11-12 00:00:00|
+------------------------------+



In [101]:
## BELOW CODE NOT FUNCTIONING AS REQUIRED.....NEED SOME RCA

cleanDateDF.createOrReplaceTempView("dateTable2")

spark.sql("""SELECT date, to_timestamp(date, 'yyyy-MM-dd'), date2, to_timestamp(date2, 'yyyy-dd-MM') 

FROM dateTable2
""").show()

+----------+------------------------------+----------+-------------------------------+
|      date|to_timestamp(date, yyyy-MM-dd)|     date2|to_timestamp(date2, yyyy-dd-MM)|
+----------+------------------------------+----------+-------------------------------+
|2017-11-12|           2017-11-12 00:00:00|2017-12-20|            2017-12-20 00:00:00|
+----------+------------------------------+----------+-------------------------------+



After we have our date or timestamp in the correct format and type, comparing between them is
actually quite easy. We just need to be sure to either use a date/timestamp type or specify our
string according to the right format of yyyy-MM-dd if we’re comparing a date:

In [102]:
cleanDateDF.filter(col("date2") > to_date(lit("2017-12-12"))).show()

cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()

#One minor point is that we can also set this as a string, which Spark parses to a literal:

cleanDateDF.filter(col("date2") > "2017-12-12").show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



# Working with Nulls in Data-

You should always use nulls to represent missing or empty data in your
DataFrames. Spark can optimize working with null values more than it can if you use empty
strings or other values.

Use the ".na" subpackage on a DataFrame for interacting with null values.


There are two things you can do with null values: you can explicitly drop nulls or you can fill
them with a value (globally or on a per-column basis). Let’s experiment with each of these now.

Coalesce()-
Spark includes a function to allow you to select the first non-null value from a set of columns by
using the coalesce function. 


In [108]:
from pyspark.sql.functions import coalesce
data_table.select(col("Rating"), col("Fees"),coalesce(col("Rating"), col("Fees"))).show(10)    

+------+----+----------------------+
|Rating|Fees|coalesce(Rating, Fees)|
+------+----+----------------------+
|  100%| 100|                  100%|
|   98%| 350|                   98%|
|  null| 300|                   300|
|  null| 250|                   250|
|  100%| 250|                  100%|
|  null| 100|                   100|
|  null| 200|                   200|
|   99%| 200|                   99%|
|  null| 100|                   100|
|  null| 100|                   100|
+------+----+----------------------+
only showing top 10 rows



##### ifnull, nullif, nvl, and nvl2-

There are several other SQL functions that you can use to achieve similar things.

"ifnull" allows you to select the second value if the first is null, and defaults to the first.

"nullif" returns null if the two values are equal or else returns the second if they are not.

"nvl" returns the second value if the first is null, but defaults to the first.

"nvl2" returns the second value if the first is not null; otherwise, it will return the last specified value.


data_table.select(col("Rating"), col("Fees"),ifnull(col("Rating"), col("Fees"))).show(10)
data_table.select(col("Rating"), col("Fees"),nullif(col("Rating"), col("Fees"))).show(10)    
data_table.select(col("Rating"), col("Fees"),nvl(col("Rating"), col("Fees"))).show(10)    
data_table.select(col("Rating"), col("Fees"),nvl2(col("Rating"), col("Fees"))).show(10)    

In [149]:
data_table.createOrReplaceTempView("dfTable")

spark.sql(""" select ifnull(null, 'return_value') a,
nullif('value', 'value') b,
nvl(null, 'return_value') c,
nvl2('not_null', 'return_value', "else_value") d
FROM dfTable LIMIT 1""").show()

+------------+----+------------+------------+
|           a|   b|           c|           d|
+------------+----+------------+------------+
|return_value|null|return_value|return_value|
+------------+----+------------+------------+



drop-
"drop()" removes rows that contain nulls. The default is to drop row in which any value is null:

In [150]:
data_table.na.drop().show(5)      ## ".na()" drops all rows which has null in any column.

data_table.na.drop("any").show(5)  	# Specifying "any" as an argument drops a row if any of the values are null

+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|  Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...| Ayurveda|98% 76 Feedback W...| 350|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...| Ayurveda|100% 4 Feedback K...| 250|
|                 BDS|10 years experience|   99%|  Arekere, Bangalore|  Dentist|Dental Fillings C...| 200|
|            BDS, MDS| 9 years experience|   98%|Coimbatore Raceco...|  Dentist|98% 14 Feedback C...| 200|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
only showing top 5 rows

+-----------

In [151]:
data_table.createOrReplaceTempView("dfTable")
spark.sql("""SELECT * FROM dfTable WHERE Qualification IS NOT NULL 
and Experience IS NOT NULL 
and Rating IS NOT NULL 
and Profile IS NOT NULL 
and Place IS NOT NULL 
and Miscellaneous_Info IS NOT NULL""").show(5)

+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|  Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...| Ayurveda|98% 76 Feedback W...| 350|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...| Ayurveda|100% 4 Feedback K...| 250|
|                 BDS|10 years experience|   99%|  Arekere, Bangalore|  Dentist|Dental Fillings C...| 200|
|            BDS, MDS| 9 years experience|   98%|Coimbatore Raceco...|  Dentist|98% 14 Feedback C...| 200|
+--------------------+-------------------+------+--------------------+---------+--------------------+----+
only showing top 5 rows



In [152]:
data_table.na.drop("all").show(10) 		# Using “all” drops the row only if all values are null or NaN for that row

#We can also apply this to certain sets of columns by passing in an array of columns:
data_table.na.drop("all", subset=["Rating", "Miscellaneous_Info"]).show(10)

+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|  100%| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|   98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|  null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|  null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|  100%|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8 years experience|  null|      Porur, Chennai|        Ayurveda| 

fill-
Using the "fill()" function, you can fill one or more columns with a set of values.

For example, to fill all null values in columns of type String, you might specify the following:


In [153]:
data_table.na.fill("GUCCI MANE GUCCI MANE GUCCI MANE").show(5)

+--------------------+-------------------+--------------------+--------------------+--------------+--------------------+----+
|       Qualification|         Experience|              Rating|               Place|       Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+--------------------+--------------------+--------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|                100%| Kakkanad, Ernakulam|     Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|                 98%|Whitefield, Banga...|      Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|GUCCI MANE GUCCI ...|Mathikere - BEL, ...|ENT Specialist|GUCCI MANE GUCCI ...| 300|
| BSc - Zoology, BAMS|12 years experience|GUCCI MANE GUCCI ...|Bannerghatta Road...|      Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|                100%|Keelkattalai, Che...|      Ayurveda|100% 4 Feedback K..


We could do the same for columns of type Integer by using df.na.fill(5:Integer), or for
Doubles df.na.fill(5:Double).To specify columns, we just pass in an array of column names like we did in the previous example:

In [154]:
data_table.na.fill("SUPER GUCCI MANE", subset=["Rating", "Miscellaneous_Info"]).show(5)

fill_cols_vals = {"Rating": "UNKNOWN RATING" ,"Miscellaneous_Info" : 9999999}
data_table.na.fill(fill_cols_vals).show(10)

+--------------------+-------------------+----------------+--------------------+--------------+--------------------+----+
|       Qualification|         Experience|          Rating|               Place|       Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+----------------+--------------------+--------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|            100%| Kakkanad, Ernakulam|     Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|             98%|Whitefield, Banga...|      Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|SUPER GUCCI MANE|Mathikere - BEL, ...|ENT Specialist|    SUPER GUCCI MANE| 300|
| BSc - Zoology, BAMS|12 years experience|SUPER GUCCI MANE|Bannerghatta Road...|      Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|            100%|Keelkattalai, Che...|      Ayurveda|100% 4 Feedback K...| 250|
+--------------------+--

replace-
The most common use case is to replace all values in a certain column according to their current value.

In [155]:
data_table.na.replace(["100%","99%"], ["UNCLEAR RATING","YOYOYOYO"], "Rating").show(10)     
x = data_table.na.replace(["100%","99%"], ["UNCLEAR RATING","YOYOYOYO"], "Rating")
# Replacing 100% with "UNCLEAR RATING" and 99% with "YOYOYOYO" in Rating column

+--------------------+-------------------+--------------+--------------------+----------------+--------------------+----+
|       Qualification|         Experience|        Rating|               Place|         Profile|  Miscellaneous_Info|Fees|
+--------------------+-------------------+--------------+--------------------+----------------+--------------------+----+
|BHMS, MD - Homeop...|24 years experience|UNCLEAR RATING| Kakkanad, Ernakulam|       Homeopath|100% 16 Feedback ...| 100|
|BAMS, MD - Ayurve...|12 years experience|           98%|Whitefield, Banga...|        Ayurveda|98% 76 Feedback W...| 350|
|MBBS, MS - Otorhi...| 9 years experience|          null|Mathikere - BEL, ...|  ENT Specialist|                null| 300|
| BSc - Zoology, BAMS|12 years experience|          null|Bannerghatta Road...|        Ayurveda|Bannerghatta Road...| 250|
|                BAMS|20 years experience|UNCLEAR RATING|Keelkattalai, Che...|        Ayurveda|100% 4 Feedback K...| 250|
|                BAMS| 8

Structs-
You can think of structs as DataFrames within DataFrames.

We can create a struct by wrapping a set of columns in parenthesis in a query:Ordering-
As we discussed in Chapter 5, you can use "asc_nulls_first", "desc_nulls_first",
"asc_nulls_last", or "desc_nulls_last" to specify where you would like your null values to
appear in an ordered DataFrame.


# Working with Complex Types-
Complex types can help you organize and structure your data in ways that make more sense for
the problem that you are hoping to solve. 

There are three kinds of complex types: structs, arrays and maps


Structs-
You can think of structs as DataFrames within DataFrames.

We can create a struct by wrapping a set of columns in parenthesis in a query:

In [156]:
first_table.selectExpr("(`Aspect(degrees)`, `Elevation(meters)`, `Slope(degrees)`) as complex").show(10)
#df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

+---------------+
|        complex|
+---------------+
|{186, 2982, 14}|
|{243, 2929, 15}|
|{162, 3051, 12}|
|{345, 3090, 17}|
|   {4, 3023, 9}|
|{270, 3174, 33}|
| {34, 3306, 25}|
| {153, 3257, 3}|
| {92, 2180, 32}|
|{226, 2857, 26}|
+---------------+
only showing top 10 rows



In [157]:
from pyspark.sql.functions import struct
complexDF = data_table.select(struct("Profile", "Place", "Rating").alias("complex"))
complexDF.show(10,False)

+--------------------------------------------------+
|complex                                           |
+--------------------------------------------------+
|{Homeopath, Kakkanad, Ernakulam, 100%}            |
|{Ayurveda, Whitefield, Bangalore, 98%}            |
|{ENT Specialist, Mathikere - BEL, Bangalore, null}|
|{Ayurveda, Bannerghatta Road, Bangalore, null}    |
|{Ayurveda, Keelkattalai, Chennai, 100%}           |
|{Ayurveda, Porur, Chennai, null}                  |
|{Homeopath, Karol Bagh, Delhi, null}              |
|{Dentist, Arekere, Bangalore, 99%}                |
|{General Medicine, Old City, Hyderabad, null}     |
|{Dentist, Athani, Ernakulam, null}                |
+--------------------------------------------------+
only showing top 10 rows



In [158]:
from pyspark.sql.functions import struct
complexDF = data_table.select(struct("Place", "Rating","Profile").alias("complex_columns"))
complexDF.show(10)

+--------------------+
|     complex_columns|
+--------------------+
|{Kakkanad, Ernaku...|
|{Whitefield, Bang...|
|{Mathikere - BEL,...|
|{Bannerghatta Roa...|
|{Keelkattalai, Ch...|
|{Porur, Chennai, ...|
|{Karol Bagh, Delh...|
|{Arekere, Bangalo...|
|{Old City, Hydera...|
|{Athani, Ernakula...|
+--------------------+
only showing top 10 rows



In [159]:
complexDF.createOrReplaceTempView("complexDF")
#We now have a DataFrame with a column complex. We can query it just as we might another DataFrame, the only difference is that we use a dot syntax to do so, or the column method "getField":
complexDF.select("complex_columns.Place").show(10)
complexDF.select("complex_columns.Place", "complex_columns.Profile").show(10)

complexDF.select(col("complex_columns").getField("Place")).show(10)
complexDF.select(col("complex_columns").getField("Rating"), col("complex_columns").getField("Profile")).show(10)

+--------------------+
|               Place|
+--------------------+
| Kakkanad, Ernakulam|
|Whitefield, Banga...|
|Mathikere - BEL, ...|
|Bannerghatta Road...|
|Keelkattalai, Che...|
|      Porur, Chennai|
|   Karol Bagh, Delhi|
|  Arekere, Bangalore|
| Old City, Hyderabad|
|   Athani, Ernakulam|
+--------------------+
only showing top 10 rows

+--------------------+----------------+
|               Place|         Profile|
+--------------------+----------------+
| Kakkanad, Ernakulam|       Homeopath|
|Whitefield, Banga...|        Ayurveda|
|Mathikere - BEL, ...|  ENT Specialist|
|Bannerghatta Road...|        Ayurveda|
|Keelkattalai, Che...|        Ayurveda|
|      Porur, Chennai|        Ayurveda|
|   Karol Bagh, Delhi|       Homeopath|
|  Arekere, Bangalore|         Dentist|
| Old City, Hyderabad|General Medicine|
|   Athani, Ernakulam|         Dentist|
+--------------------+----------------+
only showing top 10 rows

+---------------------+
|complex_columns.Place|
+-----------------

###### Arrays-

To define arrays, let’s work through a use case. With our current data, our objective is to take
every single word in our [Description] column and convert that into a row in our DataFrame.
The first task is to turn our [Description] column into a complex type, an array.


split-
We do this by using the "split()" function and specify the delimiter:

In [160]:
# in Python
from pyspark.sql.functions import split
data_table.select(split(col("Place"), ",")[0]).show(10)
data_table.select(split(col("Place"), ",")[1]).show(10)
data_table.select(split(col("Place"), ",")[2]).show(10)

+----------------------+
|split(Place, ,, -1)[0]|
+----------------------+
|              Kakkanad|
|            Whitefield|
|       Mathikere - BEL|
|     Bannerghatta Road|
|          Keelkattalai|
|                 Porur|
|            Karol Bagh|
|               Arekere|
|              Old City|
|                Athani|
+----------------------+
only showing top 10 rows

+----------------------+
|split(Place, ,, -1)[1]|
+----------------------+
|             Ernakulam|
|             Bangalore|
|             Bangalore|
|             Bangalore|
|               Chennai|
|               Chennai|
|                 Delhi|
|             Bangalore|
|             Hyderabad|
|             Ernakulam|
+----------------------+
only showing top 10 rows

+----------------------+
|split(Place, ,, -1)[2]|
+----------------------+
|                  null|
|                  null|
|                  null|
|                  null|
|                  null|
|                  null|
|                  null

In [161]:
data_table.createOrReplaceTempView("dfTable")
spark.sql("""SELECT split(Place, ',')[0] FROM dfTable""").show(5)
spark.sql("""SELECT split(Place, ',')[1] FROM dfTable""").show(5)
spark.sql("""SELECT split(Qualification, ',')[0],split(Qualification, ',')[1],split(Qualification, ',')[2]  FROM dfTable""").show(5)

+----------------------+
|split(Place, ,, -1)[0]|
+----------------------+
|              Kakkanad|
|            Whitefield|
|       Mathikere - BEL|
|     Bannerghatta Road|
|          Keelkattalai|
+----------------------+
only showing top 5 rows

+----------------------+
|split(Place, ,, -1)[1]|
+----------------------+
|             Ernakulam|
|             Bangalore|
|             Bangalore|
|             Bangalore|
|               Chennai|
+----------------------+
only showing top 5 rows

+------------------------------+------------------------------+------------------------------+
|split(Qualification, ,, -1)[0]|split(Qualification, ,, -1)[1]|split(Qualification, ,, -1)[2]|
+------------------------------+------------------------------+------------------------------+
|                          BHMS|               MD - Homeopathy|                          null|
|                          BAMS|           MD - Ayurveda Me...|                          null|
|                        

In [162]:
data_table.select(split(col("Place"), ",").alias("array_col"),split(col("Qualification"), ",").alias("array_qua_col"))\
.selectExpr("array_col[1]","array_qua_col[0]").show(10,False)

## Splitting wrt. "," only. In case if we want splits wrt. more symbols like "-", "_", "+", then we can use OR ("|") in between.
data_table.select(split(col("Place"), ",").alias("array_col"),split(col("Qualification"), ",|-").alias("array_qua_col"))\
.selectExpr("array_col[1]","array_qua_col","array_qua_col[0]").show(10,False)

+------------+----------------+
|array_col[1]|array_qua_col[0]|
+------------+----------------+
| Ernakulam  |BHMS            |
| Bangalore  |BAMS            |
| Bangalore  |MBBS            |
| Bangalore  |BSc - Zoology   |
| Chennai    |BAMS            |
| Chennai    |BAMS            |
| Delhi      |BHMS            |
| Bangalore  |BDS             |
| Hyderabad  |MBBS            |
| Ernakulam  |BSc             |
+------------+----------------+
only showing top 10 rows

+------------+----------------------------------+----------------+
|array_col[1]|array_qua_col                     |array_qua_col[0]|
+------------+----------------------------------+----------------+
| Ernakulam  |[BHMS,  MD ,  Homeopathy]         |BHMS            |
| Bangalore  |[BAMS,  MD ,  Ayurveda Medicine]  |BAMS            |
| Bangalore  |[MBBS,  MS ,  Otorhinolaryngology]|MBBS            |
| Bangalore  |[BSc ,  Zoology,  BAMS]           |BSc             |
| Chennai    |[BAMS]                            |BAMS    

Array Length-
We can determine the array’s length by querying for its size:

In [163]:
from pyspark.sql.functions import size
data_table.select(col("Qualification"),split(col("Qualification"),",|-"),\
                  size(split(col("Qualification"), ",|-")), \
                  split(col("Miscellaneous_Info" ),",|-| ")).show(5,False)

+------------------------------+----------------------------------+-----------------------------------+----------------------------------------------------------------------+
|Qualification                 |split(Qualification, ,|-, -1)     |size(split(Qualification, ,|-, -1))|split(Miscellaneous_Info, ,|-| , -1)                                  |
+------------------------------+----------------------------------+-----------------------------------+----------------------------------------------------------------------+
|BHMS, MD - Homeopathy         |[BHMS,  MD ,  Homeopathy]         |3                                  |[100%, 16, Feedback, Kakkanad, , Ernakulam]                           |
|BAMS, MD - Ayurveda Medicine  |[BAMS,  MD ,  Ayurveda Medicine]  |3                                  |[98%, 76, Feedback, Whitefield, , Bangalore]                          |
|MBBS, MS - Otorhinolaryngology|[MBBS,  MS ,  Otorhinolaryngology]|3                                  |null                  

In [164]:
data_table.show(10,False)

+------------------------------+-------------------+------+----------------------------+----------------+-------------------------------------------------------------------------------+----+
|Qualification                 |Experience         |Rating|Place                       |Profile         |Miscellaneous_Info                                                             |Fees|
+------------------------------+-------------------+------+----------------------------+----------------+-------------------------------------------------------------------------------+----+
|BHMS, MD - Homeopathy         |24 years experience|100%  |Kakkanad, Ernakulam         |Homeopath       |100% 16 Feedback Kakkanad, Ernakulam                                           |100 |
|BAMS, MD - Ayurveda Medicine  |12 years experience|98%   |Whitefield, Bangalore       |Ayurveda        |98% 76 Feedback Whitefield, Bangalore                                          |350 |
|MBBS, MS - Otorhinolaryngology|9 years exper

In [165]:
from pyspark.sql.functions import size
## Selecting 1st element of the row after the split. 
## Selecting 3rd element of the row after the split.

data_table.select(col("Qualification"),split(col("Qualification"),",|-")[0],
                  size(split(col("Qualification"), ",|-")),
                  split(col("Miscellaneous_Info" ),",|-| ")[0],   
                  split(col("Miscellaneous_Info" ),",|-| ")[2]).show(5,False)

+------------------------------+--------------------------------+-----------------------------------+---------------------------------------+---------------------------------------+
|Qualification                 |split(Qualification, ,|-, -1)[0]|size(split(Qualification, ,|-, -1))|split(Miscellaneous_Info, ,|-| , -1)[0]|split(Miscellaneous_Info, ,|-| , -1)[2]|
+------------------------------+--------------------------------+-----------------------------------+---------------------------------------+---------------------------------------+
|BHMS, MD - Homeopathy         |BHMS                            |3                                  |100%                                   |Feedback                               |
|BAMS, MD - Ayurveda Medicine  |BAMS                            |3                                  |98%                                    |Feedback                               |
|MBBS, MS - Otorhinolaryngology|MBBS                            |3                        

array_contains-
We can also check whether this array contains a value:

In [166]:
from pyspark.sql.functions import array_contains

data_table.select(col("Qualification"),split(col("Qualification"), ","),array_contains(split(col("Qualification"), ","), "BAMS")).show(20,False)

+-----------------------------------------------------+---------------------------------------------------------+-------------------------------------------------+
|Qualification                                        |split(Qualification, ,, -1)                              |array_contains(split(Qualification, ,, -1), BAMS)|
+-----------------------------------------------------+---------------------------------------------------------+-------------------------------------------------+
|BHMS, MD - Homeopathy                                |[BHMS,  MD - Homeopathy]                                 |false                                            |
|BAMS, MD - Ayurveda Medicine                         |[BAMS,  MD - Ayurveda Medicine]                          |true                                             |
|MBBS, MS - Otorhinolaryngology                       |[MBBS,  MS - Otorhinolaryngology]                        |false                                            |
|BSc - Zoology, 

In [167]:
data_table.createOrReplaceTempView("dfTable")
spark.sql("""SELECT array_contains(split(Qualification, ','), 'BAMS') FROM dfTable""").show(10)

+-------------------------------------------------+
|array_contains(split(Qualification, ,, -1), BAMS)|
+-------------------------------------------------+
|                                            false|
|                                             true|
|                                            false|
|                                            false|
|                                             true|
|                                             true|
|                                            false|
|                                            false|
|                                            false|
|                                            false|
+-------------------------------------------------+
only showing top 10 rows



explode-
The "explode()" function takes a column that consists of arrays and creates one row (with the rest of
the values duplicated) per value in the array.

"Hello World", "Other Col" ---->(split)---->["Hello", "World"],"Other Col" -----> (explode) ----->"Hello", "Other Col", "World", "Other Col"

In [168]:
## Working not understood

from pyspark.sql.functions import split, explode
data_table.withColumn("splitted", split(col("Qualification"), ","))\
.withColumn("exploded", explode(col("splitted")))\
.select("Qualification", "exploded").show(10,False)

+------------------------------+-------------------------+
|Qualification                 |exploded                 |
+------------------------------+-------------------------+
|BHMS, MD - Homeopathy         |BHMS                     |
|BHMS, MD - Homeopathy         | MD - Homeopathy         |
|BAMS, MD - Ayurveda Medicine  |BAMS                     |
|BAMS, MD - Ayurveda Medicine  | MD - Ayurveda Medicine  |
|MBBS, MS - Otorhinolaryngology|MBBS                     |
|MBBS, MS - Otorhinolaryngology| MS - Otorhinolaryngology|
|BSc - Zoology, BAMS           |BSc - Zoology            |
|BSc - Zoology, BAMS           | BAMS                    |
|BAMS                          |BAMS                     |
|BAMS                          |BAMS                     |
+------------------------------+-------------------------+
only showing top 10 rows



In [169]:
data_table.createOrReplaceTempView("dfTable")
spark.sql(
"""SELECT Qualification, Profile, exploded
FROM (SELECT *, split(Qualification, ",") as splitted FROM dfTable)
LATERAL VIEW explode(splitted) as exploded""").show(10,False)

+------------------------------+--------------+-------------------------+
|Qualification                 |Profile       |exploded                 |
+------------------------------+--------------+-------------------------+
|BHMS, MD - Homeopathy         |Homeopath     |BHMS                     |
|BHMS, MD - Homeopathy         |Homeopath     | MD - Homeopathy         |
|BAMS, MD - Ayurveda Medicine  |Ayurveda      |BAMS                     |
|BAMS, MD - Ayurveda Medicine  |Ayurveda      | MD - Ayurveda Medicine  |
|MBBS, MS - Otorhinolaryngology|ENT Specialist|MBBS                     |
|MBBS, MS - Otorhinolaryngology|ENT Specialist| MS - Otorhinolaryngology|
|BSc - Zoology, BAMS           |Ayurveda      |BSc - Zoology            |
|BSc - Zoology, BAMS           |Ayurveda      | BAMS                    |
|BAMS                          |Ayurveda      |BAMS                     |
|BAMS                          |Ayurveda      |BAMS                     |
+------------------------------+------

Maps-

Maps are created by using the map function and key-value pairs of columns. You then can select
them just like you might select from an array.

In [173]:
from pyspark.sql.functions import create_map
data_table.select(create_map(col("Profile"), col("Rating")).alias("complex_map")).show(10, False)

+--------------------------+
|complex_map               |
+--------------------------+
|{Homeopath -> 100%}       |
|{Ayurveda -> 98%}         |
|{ENT Specialist -> null}  |
|{Ayurveda -> null}        |
|{Ayurveda -> 100%}        |
|{Ayurveda -> null}        |
|{Homeopath -> null}       |
|{Dentist -> 99%}          |
|{General Medicine -> null}|
|{Dentist -> null}         |
+--------------------------+
only showing top 10 rows



In [174]:
data_table.createOrReplaceTempView("dfTable")
spark.sql(
"""SELECT map(Profile, Rating) as complex_map FROM dfTable
""").show(10,False)

+--------------------------+
|complex_map               |
+--------------------------+
|{Homeopath -> 100%}       |
|{Ayurveda -> 98%}         |
|{ENT Specialist -> null}  |
|{Ayurveda -> null}        |
|{Ayurveda -> 100%}        |
|{Ayurveda -> null}        |
|{Homeopath -> null}       |
|{Dentist -> 99%}          |
|{General Medicine -> null}|
|{Dentist -> null}         |
+--------------------------+
only showing top 10 rows



In [179]:
data_table.select(map(col("Profile"),col("Rating")).alias("complex_map")).selectExpr("complex_map['Ayurveda 100%']").show(2)

TypeError: Column is not iterable

You can also explode map types, which will turn them into columns:

df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).selectExpr("explode(complex_map)").show(2)


# Working with JSON-

Spark has some unique support for working with JSON data. You can operate directly on strings
of JSON in Spark and parse from JSON or extract JSON objects. Let’s begin by creating a JSON
column:

In [180]:
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

In [181]:
jsonDF.show(5,False)

+-------------------------------------------+
|jsonString                                 |
+-------------------------------------------+
|{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}|
+-------------------------------------------+



You can use the "get_json_object" to inline query a JSON object, be it a dictionary or array.
You can use "json_tuple" if this object has only one level of nesting:

In [182]:
from pyspark.sql.functions import get_json_object, json_tuple
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),json_tuple(col("jsonString"), "myJSONKey")).show(2,False)

+------+-----------------------+
|column|c0                     |
+------+-----------------------+
|2     |{"myJSONValue":[1,2,3]}|
+------+-----------------------+



In [183]:
jsonDF.selectExpr(
"json_tuple(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)

+------+
|column|
+------+
|  null|
+------+



In [184]:
from pyspark.sql.functions import to_json
data_table.selectExpr("(Profile, Place, Rating) as myStruct")\
.select(to_json(col("myStruct"))).show(10,False)

+----------------------------------------------------------------------+
|to_json(myStruct)                                                     |
+----------------------------------------------------------------------+
|{"Profile":"Homeopath","Place":"Kakkanad, Ernakulam","Rating":"100%"} |
|{"Profile":"Ayurveda","Place":"Whitefield, Bangalore","Rating":"98%"} |
|{"Profile":"ENT Specialist","Place":"Mathikere - BEL, Bangalore"}     |
|{"Profile":"Ayurveda","Place":"Bannerghatta Road, Bangalore"}         |
|{"Profile":"Ayurveda","Place":"Keelkattalai, Chennai","Rating":"100%"}|
|{"Profile":"Ayurveda","Place":"Porur, Chennai"}                       |
|{"Profile":"Homeopath","Place":"Karol Bagh, Delhi"}                   |
|{"Profile":"Dentist","Place":"Arekere, Bangalore","Rating":"99%"}     |
|{"Profile":"General Medicine","Place":"Old City, Hyderabad"}          |
|{"Profile":"Dentist","Place":"Athani, Ernakulam"}                     |
+--------------------------------------------------

In [185]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
StructField("Profile",StringType(),True),
StructField("Place",StringType(),True)))
data_table.selectExpr("(Profile, Place) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(5,False)

+--------------------------------------------+-----------------------------------------------------------------+
|from_json(newJSON)                          |newJSON                                                          |
+--------------------------------------------+-----------------------------------------------------------------+
|{Homeopath, Kakkanad, Ernakulam}            |{"Profile":"Homeopath","Place":"Kakkanad, Ernakulam"}            |
|{Ayurveda, Whitefield, Bangalore}           |{"Profile":"Ayurveda","Place":"Whitefield, Bangalore"}           |
|{ENT Specialist, Mathikere - BEL, Bangalore}|{"Profile":"ENT Specialist","Place":"Mathikere - BEL, Bangalore"}|
|{Ayurveda, Bannerghatta Road, Bangalore}    |{"Profile":"Ayurveda","Place":"Bannerghatta Road, Bangalore"}    |
|{Ayurveda, Keelkattalai, Chennai}           |{"Profile":"Ayurveda","Place":"Keelkattalai, Chennai"}           |
+--------------------------------------------+--------------------------------------------------

# User-Defined Functions-

These user-defined functions (UDFs) make it possible for you to write your own custom
transformations using Python or Scala and even use external libraries.
They’re just functions that operate on the data, record by record. By default, these functions 
are registered as temporary functions to be used in that specific SparkSession or Context.

The first step is the actual function. We’ll create a simple one for this example. Let’s write a
"power3" function that takes a number and raises it to a power of three:


In [186]:
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
    return double_value ** 3
power3(2.0)

8.0

Now that we’ve created these functions and tested them, we need to register them with Spark so
that we can use them on all of our worker machines. Spark will serialize the function on the
driver and transfer it over the network to all executor processes. This happens regardless of
language.


If the function is written in Python, Spark starts a Python
process on the worker, serializes all of the data to a format that Python can understand
(remember, it was in the JVM earlier), executes the function row by row on that data in the
Python process, and then finally returns the results of the row operations to the JVM and Spark.

First, we need to register the function to make it available as a DataFrame function:


In [187]:
from pyspark.sql.functions import udf,col
power3udf = udf(power3)

## Below code is not working hence commented
#udfExampleDF.select(power3udf(col("num"))).show(2)

At this juncture, we can use this only as a DataFrame function. That is to say, we can’t use it
within a string expression, only on an expression. However, we can also register this UDF as a
Spark SQL function.


#### NOTE- NEED MORE INPUTS FOR UNDERSTANDING UDF

In [188]:
#udfExampleDF.selectExpr("power3(num)").show(2)

If you specify the type that doesn’t align with the actual type returned by the function, Spark will
not throw an error but will just return "null" to designate a failure. You can see this if you were to
switch the return type in the following function to be a DoubleType:

In [189]:
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, DoubleType())

<function __main__.power3(double_value)>

In [190]:
#udfExampleDF.selectExpr("power3py(num)").show(2)

#### NOTE- NEED MORE INPUTS FOR UNDERSTANDING UDF

# Aggregations-

The simplest grouping is to just summarize a complete DataFrame by performing an
aggregation in a select statement.

A “group by” allows you to specify one or more keys as well as one or more
aggregation functions to transform the value columns.

A “window” gives you the ability to specify one or more keys as well as one or more
aggregation functions to transform the value columns. However, the rows input to the
function are somehow related to the current row.

A “grouping set,” which you can use to aggregate at multiple different levels. Grouping
sets are available as a primitive in SQL and via rollups and cubes in DataFrames.

A “rollup” makes it possible for you to specify one or more keys as well as one or more
aggregation functions to transform the value columns, which will be summarized
hierarchically.

A “cube” allows you to specify one or more keys as well as one or more aggregation
functions to transform the value columns, which will be summarized across all
combinations of columns.

Each grouping returns a "RelationalGroupedDataset" on which we specify our aggregations.


In [211]:


from pyspark.sql.functions import expr, count, countDistinct, approx_count_distinct,first, last,min, max,sumDistinct,sum,\
avg, var_pop, stddev_pop,var_samp, stddev_samp, skewness, kurtosis, covar_pop, covar_samp

first_table.select(count("*"), 
                   countDistinct("Cover_Type"),\
                   approx_count_distinct("Slope(Degrees)"),\
                   first("Slope(Degrees)"),\
                   last("Slope(Degrees)"),\
                  min("Slope(Degrees)"),\
                  max("Slope(Degrees)"),\
                  sumDistinct("Slope(Degrees)"),\
                  sum("Slope(Degrees)"),\
                  avg("Slope(Degrees)"),\
                  var_pop("Aspect(Degrees)"),\
                  stddev_pop("Aspect(Degrees)"),\
                  var_samp("Aspect(Degrees)"),\
                  stddev_samp("Aspect(Degrees)"),\
                  skewness("Aspect(Degrees)"),\
                  kurtosis("Aspect(Degrees)"),\
                  covar_pop("Aspect(Degrees)","Cover_Type"),\
                  covar_samp("Aspect(Degrees)","Cover_Type")).show(5)

+--------+--------------------------+-------------------------------------+---------------------+--------------------+-------------------+-------------------+----------------------------+-------------------+-------------------+------------------------+---------------------------+-------------------------+----------------------------+-------------------------+-------------------------+--------------------------------------+---------------------------------------+
|count(1)|count(DISTINCT Cover_Type)|approx_count_distinct(Slope(Degrees))|first(Slope(Degrees))|last(Slope(Degrees))|min(Slope(Degrees))|max(Slope(Degrees))|sum(DISTINCT Slope(Degrees))|sum(Slope(Degrees))|avg(Slope(Degrees))|var_pop(Aspect(Degrees))|stddev_pop(Aspect(Degrees))|var_samp(Aspect(Degrees))|stddev_samp(Aspect(Degrees))|skewness(Aspect(Degrees))|kurtosis(Aspect(Degrees))|covar_pop(Aspect(Degrees), Cover_Type)|covar_samp(Aspect(Degrees), Cover_Type)|
+--------+--------------------------+-----------------------------

Aggregating to Complex Types-
In Spark, you can perform aggregations not just of numerical values using formulas, you can also
perform them on complex types. For example, we can collect a list of values present in a given
column or only the unique values by collecting to a set.


In [217]:
from pyspark.sql.functions import collect_set, collect_list
data_table.agg(collect_set("Rating"), collect_list("Rating")).show(1,False)

+------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(Rating)                                                                             |collect_list(Rating)                                                                                                                                                                                                                                                                                                                                                                                          

In [220]:
data_table.createOrReplaceTempView("dfTable")
spark.sql("""SELECT collect_set(Rating), collect_list(Rating) FROM dfTable""").show(1,False)

+------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(Rating)                                                                             |collect_list(Rating)                                                                                                                                                                                                                                                                                                                                                                                          

#### Grouping-

A more common task is to perform calculations based on groups in the data. 
This is typically done on categorical data for
which we group our data on one column and perform some calculations on the other columns
that end up in that group.

In [235]:
data_table.groupBy("Profile", "Experience").count().show(5)

+--------------+-------------------+-----+
|       Profile|         Experience|count|
+--------------+-------------------+-----+
|       Dentist|10 years experience|    7|
|      Ayurveda|15 years experience|    1|
|       Dentist| 9 years experience|    3|
|ENT Specialist|35 years experience|    1|
|Dermatologists|33 years experience|    1|
+--------------+-------------------+-----+
only showing top 5 rows



In [236]:
data_table.createOrReplaceTempView("dfTable")
spark.sql("""SELECT Profile, Experience,count(*) FROM dfTable GROUP BY Profile, Experience""").show(5)

+--------------+-------------------+--------+
|       Profile|         Experience|count(1)|
+--------------+-------------------+--------+
|       Dentist|10 years experience|       7|
|      Ayurveda|15 years experience|       1|
|       Dentist| 9 years experience|       3|
|ENT Specialist|35 years experience|       1|
|Dermatologists|33 years experience|       1|
+--------------+-------------------+--------+
only showing top 5 rows



#### Grouping with Expressions-

Rather than passing that function as an expression
into a "select" statement, we specify it as within "agg". This makes it possible for you to pass-in
arbitrary expressions that just need to have some aggregation specified.


In [240]:
from pyspark.sql.functions import count
data_table.groupBy("Profile","Rating").agg(count("Experience").alias("quan"),\
                                           expr("count(Experience)")).show()

+----------------+------+----+-----------------+
|         Profile|Rating|quan|count(Experience)|
+----------------+------+----+-----------------+
|  Dermatologists|  null|   9|                9|
|  ENT Specialist|   36%|   1|                1|
|        Ayurveda|  null|  12|               12|
|General Medicine|   93%|   1|                1|
|  Dermatologists|   97%|   5|                5|
|        Ayurveda|   98%|   1|                1|
|       Homeopath|   82%|   1|                1|
|  ENT Specialist|   94%|   1|                1|
|General Medicine|   88%|   1|                1|
|General Medicine|   99%|   1|                1|
|  ENT Specialist|  100%|   2|                2|
|  ENT Specialist|   80%|   1|                1|
|       Homeopath|  null|  10|               10|
|  Dermatologists|   94%|   1|                1|
|  ENT Specialist|  null|   9|                9|
|  ENT Specialist|   74%|   1|                1|
|General Medicine|   96%|   1|                1|
|  Dermatologists|  

In [245]:
data_table.groupBy("Place","Profile").agg(max("Fees").alias("quan"),\
                                           expr("max(Fees)")).show()

+--------------------+----------------+----+---------+
|               Place|         Profile|quan|max(Fees)|
+--------------------+----------------+----+---------+
|                null|  Dermatologists| 100|      100|
|AS Rao Nagar, Hyd...|         Dentist| 200|      200|
|AS Rao Nagar, Hyd...|General Medicine| 200|      200|
| Adambakkam, Chennai|       Homeopath| 150|      150|
|   Ambattur, Chennai|         Dentist| 100|      100|
|Andheri West, Mumbai|         Dentist| 500|      500|
|Andheri West, Mumbai|General Medicine| 200|      200|
|     Andheri, Mumbai|  Dermatologists| 100|      100|
|Anna Nagar East, ...|        Ayurveda| 250|      250|
|  Arekere, Bangalore|         Dentist| 250|      250|
|Ashok Nagar, Chennai|  ENT Specialist| 200|      200|
|   Athani, Ernakulam|         Dentist| 100|      100|
|  Attapur, Hyderabad|  Dermatologists| 350|      350|
|BTM Layout 2nd St...|       Homeopath| 200|      200|
|   Bakkarwala, Delhi|        Ayurveda| 150|      150|
|Banashank

Grouping with Maps-
Sometimes, it can be easier to specify your transformations as a series of Maps for which the key
is the column, and the value is the aggregation function (as a string) that you would like to
perform. You can reuse multiple column names if you specify them inline, as well:

In [253]:
data_table.groupBy("Profile").agg(expr("avg(Fees)"),expr("stddev_pop(Fees)"), expr("max(Fees)")).show()

## in the below line of code, we are extracting city name from "Place" column for better groupby operation
data_table.groupBy("Profile",split(col("Place"),",")[1]).agg(expr("avg(Fees)"),expr("stddev_pop(Fees)"), expr("max(Fees)")).show(10)

+----------------+------------------+------------------+---------+
|         Profile|         avg(Fees)|  stddev_pop(Fees)|max(Fees)|
+----------------+------------------+------------------+---------+
|        Ayurveda|227.77777777777777| 134.6004604398413|      500|
|         Dentist|            241.25|110.61617196413913|      500|
|  Dermatologists|             414.0|229.13751329714648|      800|
|  ENT Specialist|371.05263157894734|194.20268134205358|      800|
|General Medicine| 282.7586206896552|208.96950036035432|      800|
|       Homeopath| 283.3333333333333|             150.0|      500|
+----------------+------------------+------------------+---------+

+--------+----------------------+------------------+------------------+---------+
| Profile|split(Place, ,, -1)[1]|         avg(Fees)|  stddev_pop(Fees)|max(Fees)|
+--------+----------------------+------------------+------------------+---------+
|Ayurveda|             Bangalore|             350.0| 93.54143466934853|      500|
|

In [258]:
spark.sql("""SELECT Profile, split(Place, ",")[1] ,avg(Fees), stddev_pop(Fees), max(Fees) FROM dfTable
GROUP BY Profile, split(Place, ",")[1]""").show(10)

+--------+----------------------+------------------+------------------+---------+
| Profile|split(Place, ,, -1)[1]|         avg(Fees)|  stddev_pop(Fees)|max(Fees)|
+--------+----------------------+------------------+------------------+---------+
|Ayurveda|             Bangalore|             350.0| 93.54143466934853|      500|
|Ayurveda|               Chennai|             187.5|  64.9519052838329|      250|
|Ayurveda|                 Delhi|166.66666666666666|102.74023338281629|       50|
|Ayurveda|             Hyderabad|233.33333333333334| 94.28090415820634|      300|
|Ayurveda|                Mumbai|             187.5|181.57298807917437|      500|
| Dentist|             Bangalore|             210.0|              20.0|      250|
| Dentist|               Chennai|             180.0| 74.83314773547882|      300|
| Dentist|            Coimbatore|             180.0| 74.83314773547883|      300|
| Dentist|                 Delhi| 314.2857142857143| 95.29760045804524|      500|
| Dentist|      

## Window Functions-

Spark supports three kinds of window functions: ranking functions, analytic functions,
and aggregate functions.

The first step to a window function is to create a window specification. Note that the "partition
by" is unrelated to the partitioning scheme concept that we have covered thus far. It’s just a
similar concept that describes how we will be breaking up our group. The ordering determines
the ordering within a given partition, and, finally, the frame specification (the "rowsBetween"
statement) states which rows will be included in the frame based on its reference to the current
input row. In the following example, we look at all previous rows up to the current row:


In [318]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy(split(col("Place"), ",")[1], "Profile")\
.orderBy(desc("Fees"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [319]:

#Now we want to use an aggregation function to learn more about each specific customer. 
#For now, establishing the maximum Fees over all time.

from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Fees")).over(windowSpec)


In [320]:
#You will notice that this returns a column (or expressions). We can now use this in a DataFrame select statement

from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [334]:

# This also returns a column that we can use in select statements. Now we can perform a select to view the calculated window values:

from pyspark.sql.functions import col
data_table.where("Rating IS NOT NULL").orderBy("Fees")\
.select(
split(col("Place"), ",")[1],col("Profile"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity"),col("Fees")).show()

+----------------------+----------------+------------+-----------------+-------------------+----+
|split(Place, ,, -1)[1]|         Profile|quantityRank|quantityDenseRank|maxPurchaseQuantity|Fees|
+----------------------+----------------+------------+-----------------+-------------------+----+
|             Bangalore|        Ayurveda|           1|                1|                500| 500|
|             Bangalore|        Ayurveda|           2|                2|                500| 350|
|             Bangalore|        Ayurveda|           3|                3|                500| 300|
|             Bangalore|         Dentist|           1|                1|                250| 250|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangal

In [348]:
spark.sql(""" select split(Place , ",")[1], Profile, rank(Fees) over(partition by split(Place,",")[1], `Profile` ORDER BY Fees DESC )as quantityRank , 
                                                    dense_rank(Fees) over(partition by split(Place,",")[1], Profile order by Fees desc) as quantityDenseRank,
                                                    max(Fees) over(partition by split(Place,",")[1], Profile order by Fees desc) as maxPurchaseQuantity,Fees
                            
                from dfTable--(select * from dfTable where Rating is not NULL order by Fees) 
                where Rating is not NULL
                order by split(Place , ",")[1], Profile, Fees desc  """).show()

+----------------------+----------------+------------+-----------------+-------------------+----+
|split(Place, ,, -1)[1]|         Profile|quantityRank|quantityDenseRank|maxPurchaseQuantity|Fees|
+----------------------+----------------+------------+-----------------+-------------------+----+
|             Bangalore|        Ayurveda|           1|                1|                500| 500|
|             Bangalore|        Ayurveda|           2|                2|                500| 350|
|             Bangalore|        Ayurveda|           3|                3|                500| 300|
|             Bangalore|         Dentist|           1|                1|                250| 250|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangalore|         Dentist|           2|                2|                250| 200|
|             Bangal

#### Grouping Sets-

Sometimes we want something a bit
more complete—an aggregation across multiple groups. We achieve this by using grouping sets.
Grouping sets are a low-level tool for combining sets of aggregations together. They give you the
ability to create arbitrary aggregation in their group-by statements.

In below eg., we would like to get the
total quantity of all stock codes and customers. To do so, we’ll use the following SQL
expression:

In [361]:
spark.sql("""SELECT Profile, split(Place,",")[1], sum(Fees) FROM dfTable
GROUP BY Profile, split(Place,",")[1] GROUPING SETS((Profile, split(Place,",")[1]),())
ORDER BY sum(Fees) DESC
""").show(10)

+----------------+----------------------+---------+
|         Profile|split(Place, ,, -1)[1]|sum(Fees)|
+----------------+----------------------+---------+
|            null|                  null|  44450.0|
|  Dermatologists|                 Delhi|   3400.0|
|General Medicine|             Hyderabad|   3050.0|
|  Dermatologists|             Hyderabad|   2900.0|
|  ENT Specialist|                 Delhi|   2800.0|
|         Dentist|                 Delhi|   2200.0|
|       Homeopath|             Bangalore|   1900.0|
|  ENT Specialist|               Chennai|   1800.0|
|         Dentist|                Mumbai|   1800.0|
|  ENT Specialist|             Bangalore|   1750.0|
+----------------+----------------------+---------+
only showing top 10 rows



The GROUPING SETS operator is only available in SQL. To perform the same in DataFrames, you
use the "rollup" and "cube" operators—which allow us to get the same results.


Rollups-
A rollup is a multidimensional aggregation that performs a variety of group-by style calculations
for us.


In [367]:
rolledUpDF = data_table.rollup("Profile", split(col("Place"),",")[1]).agg(sum("Fees"))\
.selectExpr("Profile", split(col("Place"),",")[1], "`sum(Fees)` as total_Fees")\
.orderBy(split(col("Place"),",")[1])
rolledUpDF.show()

TypeError: Column is not iterable

#### Cube-
A "cube" takes the "rollup" to a level deeper. Rather than treating elements hierarchically, a "cube"
does the same thing across all dimensions.

The method call is quite similar, but instead of calling rollup, we call cube:

In [376]:
from pyspark.sql.functions import sum
data_table.cube("Profile", col("Place")).agg("sum(Fees)")\
.select("Profile", col("Place"), "sum(Fees)").orderBy("Profile").show()

AssertionError: all exprs should be Column