In [None]:
# Best to use google colab for Pyspark during learning stage as installation of PySpark in local machine is tedious
# Import Pyspark package
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 32 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=5e89028c6b2cffdecfe1e99ca7e11699f6d0f0e10bcc6a1563231674c5fd573b
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Import Spark Session
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Create Spark Session
spark = SparkSession.builder.master("local[1]").appName("Sparkbasics").getOrCreate()

In [None]:
# Create a RDD from parallelize
# 1. From list
list_ = [("Java", 2000), ("Python", 10000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(list_)
# 2. From txtfile
# rdd2 = spark.SparkContext.textFile("/path_to_txt_file")

### Once you have an RDD, you can perform transformation and action operations. 
##### Any operation you perform on RDD runs in parallel.

In [None]:
# PySpark DataFrames can be created from existing RDD or datasets
# Create DataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df

DataFrame[firstname: string, middlename: string, lastname: string, dob: string, gender: string, salary: bigint]

In [None]:
# Similar to printing df.head(20)
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [None]:
df.printSchema() # same as df.types()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
# Like RDD, DataFrame also has operations like Transformations and Actions.

In [None]:
# df from csv
df = spark.read.csv("zipcodes.csv") 
# RDD to df
df = rdd.toDF() # by default col names would be _1, _2, _3, ..etc
columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)

# Otherwise
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromData2 = spark.createDataFrame(data).toDF(*columns)

In [None]:
# Read and write Parquet file
# df1 = spark.read.parquet("file.parquet")
data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.write.parquet("file.parquet") # Write

In [None]:
parDF = spark.read.parquet('file.parquet')
parDF.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [None]:
parDF.createOrReplaceTempView("ParquetTable")
sSql = spark.sql("select * from ParquetTable where salary >= 4000 ")
sSql.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+



# Ways of creating dataframe

#### 1. Create DataFrame from RDD

In [None]:
rdd = spark.sparkContext.parallelize(data)
rdd

ParallelCollectionRDD[24] at readRDDFromFile at PythonRDD.scala:274

##### 1.1 Using toDF() method

In [None]:
df1 = rdd.toDF()
df1.show()

+--------+----+--------+-----+---+----+
|      _1|  _2|      _3|   _4| _5|  _6|
+--------+----+--------+-----+---+----+
|  James |    |   Smith|36636|  M|3000|
|Michael |Rose|        |40288|  M|4000|
| Robert |    |Williams|42114|  M|4000|
|  Maria |Anne|   Jones|39192|  F|4000|
|     Jen|Mary|   Brown|     |  F|  -1|
+--------+----+--------+-----+---+----+



In [None]:
columns = ['a', 'b', 'c', 'd', 'e', 'f']
df2 = rdd.toDF(schema = columns)
df2.printSchema()

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- e: string (nullable = true)
 |-- f: long (nullable = true)



##### 1.2 Using createDataFrame() from SparkSession

In [None]:
df = spark.createDataFrame(rdd).toDF(*columns)
df.show()

+--------+----+--------+-----+---+----+
|       a|   b|       c|    d|  e|   f|
+--------+----+--------+-----+---+----+
|  James |    |   Smith|36636|  M|3000|
|Michael |Rose|        |40288|  M|4000|
| Robert |    |Williams|42114|  M|4000|
|  Maria |Anne|   Jones|39192|  F|4000|
|     Jen|Mary|   Brown|     |  F|  -1|
+--------+----+--------+-----+---+----+



#### 2. **Create DataFrame from List Collection**

##### 2.1 Using createDataFrame() from spark session

In [None]:
df = spark.createDataFrame(data).toDF(*columns)

##### 2.2 Using createDataFrame() with the Row type

In [None]:
from pyspark.sql.types import Row
rowData = map(lambda x: Row(*x), data)
df = spark.createDataFrame(rowData)
df.show()

+--------+----+--------+-----+---+----+
|      _1|  _2|      _3|   _4| _5|  _6|
+--------+----+--------+-----+---+----+
|  James |    |   Smith|36636|  M|3000|
|Michael |Rose|        |40288|  M|4000|
| Robert |    |Williams|42114|  M|4000|
|  Maria |Anne|   Jones|39192|  F|4000|
|     Jen|Mary|   Brown|     |  F|  -1|
+--------+----+--------+-----+---+----+



##### **2.3 Create DataFrame with schema**

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) 
  ])

df = spark.createDataFrame(data=data2, schema=schema)
df.show()
df.printSchema()
df.show(truncate=False)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|

#### 3. **Create DataFrame from Data sources**

df2 = spark.read.csv("/src/resources/file.csv") \
df2 = spark.read.text("/src/resources/file.txt")\
df2 = spark.read.json("/src/resources/file.json")


# **Create Empty Dataframes**

In [None]:
# First create an empty rdd
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)
emptyRDD1 = spark.sparkContext.parallelize([])
print(emptyRDD1)

# 1. Create Empty DataFrame with Schema (StructType)
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])
dfEmpty = spark.createDataFrame(data=emptyRDD, schema=schema)
dfEmpty.printSchema()
dfEmpty.show()

# 2. Convert Empty RDD to DataFrame
df1 = emptyRDD.toDF(schema)
df1.show()

# 3. Create Empty DataFrame with Schema
df2 = spark.createDataFrame([], schema)
df2.show()

EmptyRDD[197] at emptyRDD at NativeMethodAccessorImpl.java:0
ParallelCollectionRDD[198] at readRDDFromFile at PythonRDD.scala:274
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
+---------+----------+--------+



**After processing data in PySpark, we would need to convert it back to Pandas DataFrame for a further procession with Machine Learning application or any Python applications.**

In [None]:
# 1. Convert PySpark Dataframe to Pandas DataFrame
pySparkDF = df
pandasDF = pySparkDF.toPandas()
pandasDF

Unnamed: 0,firstname,middlename,lastname,id,gender,salary
0,James,,Smith,36636.0,M,3000
1,Michael,Rose,,40288.0,M,4000
2,Robert,,Williams,42114.0,M,4000
3,Maria,Anne,Jones,39192.0,F,4000
4,Jen,Mary,Brown,,F,-1


In [None]:
pySparkDF.show(2, truncate=2) # Limits to 2 rows each with max 25 characters

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname| id|gender|salary|
+---------+----------+--------+---+------+------+
|       Ja|          |      Sm| 36|     M|    30|
|       Mi|        Ro|        | 40|     M|    40|
+---------+----------+--------+---+------+------+
only showing top 2 rows



In [None]:
# Nested StructType
structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



In [None]:
print(df.schema)

StructType(List(StructField(firstname,StringType,true),StructField(middlename,StringType,true),StructField(lastname,StringType,true),StructField(id,StringType,true),StructField(gender,StringType,true),StructField(salary,IntegerType,true)))


# Operations in Pyspark DF

#### **Select**

In [None]:
# Selecting columns of Pyspark df
df.select('salary', 'id').show()
#By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()

+------+-----+
|salary|   id|
+------+-----+
|  3000|36636|
|  4000|40288|
|  4000|42114|
|  4000|39192|
|    -1|     |
+------+-----+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|        |
|   Robert|Williams|
|    Maria|   Jones|
|      Jen|   Brown|
+---------+--------+



In [None]:
df.select([col for col in df.columns]).show()
# or
df.select("*").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [None]:
df.select(df.columns[:3]).show(3) # top 3 rows alone

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|    James|          |   Smith|
|  Michael|      Rose|        |
|   Robert|          |Williams|
+---------+----------+--------+
only showing top 3 rows



In [None]:
# For nested ds:
df2.select("name.firstname","name.lastname").show(truncate=False)
df2.select("name.*").show(truncate=False) # whuchever has 'name' in the col name

+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Michael  |        |
|Robert   |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
+---------+--------+

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |          |Smith   |
|Michael  |Rose      |        |
|Robert   |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
+---------+----------+--------+



#### **Collect**

In [None]:
# Collect is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node
# select() is a transformation that returns a new DataFrame and holds the columns that are selected
# whereas collect() is an action that returns the entire data set in an Array to the driver.

In [None]:
df.collect() # All in row form, action returning data in an array to the driver

[Row(firstname='James', middlename='', lastname='Smith', id='36636', gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', id='40288', gender='M', salary=4000),
 Row(firstname='Robert', middlename='', lastname='Williams', id='42114', gender='M', salary=4000),
 Row(firstname='Maria', middlename='Anne', lastname='Jones', id='39192', gender='F', salary=4000),
 Row(firstname='Jen', middlename='Mary', lastname='Brown', id='', gender='F', salary=-1)]

In [None]:
# Looping over the data
for row in df.collect(): # similar to iterrows()
  print(row['firstname'] + ' gets salary of', str(row['salary']))

James gets salary of 3000
Michael gets salary of 4000
Robert gets salary of 4000
Maria gets salary of 4000
Jen gets salary of -1


In [None]:
print(df.collect()[0])
print(df.collect()[0][0])

Row(firstname='James', middlename='', lastname='Smith', id='36636', gender='M', salary=3000)
James


## **withColumn**()

transformation function of DataFrame which is used to:
change the value \
convert the datatype of an existing column \
create a new column \

In [None]:
# Change DataType using PySpark withColumn()
df.withColumn('salary', col('salary').cast("Integer")).show()  # same as df['salary] = df['salary].astype(int)

# Update The Value of an Existing Column
df.withColumn('salary', col('salary')*10).show()  # same as df['salary'] = df['salary'] * 10

# Create a new column based on existing one
df.withColumn('newSalary', col('salary') * 12).show() # same as df['newSalary'] = df['salary] * 12 but all does not save to df!

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M| 30000|
|  Michael|      Rose|        |40288|     M| 40000|
|   Robert|          |Williams|42114|     M| 40000|
|    Maria|      Anne|   Jones|39192|     F| 40000|
|      Jen|      Mary|   Brown|     |     F|   -10|
+---------+----------+--------+-----+------+------+

+---------+----------+--------+-----+------+------+---------+


In [None]:
#  PySpark lit() function is used to add a constant value to a DataFrame column
from pyspark.sql.functions import lit
df.withColumn('Country', lit('USA')).show() # same as df['Country'] = 'USA'

df.withColumn('County', lit('Champaign')) \
  .withColumn('PIN', lit('61820')).show()

# Rename column name
df.withColumnRenamed('gender', 'sex').show() # similar to df.rename(columns={'gender' : 'sex'})

df.withColumnRenamed('id', 'ID') \
  .withColumnRenamed('gender', 'sex').show()
# Dropping a column
df.drop('salary').show() # df.drop('salary', axis=1)

+---------+----------+--------+-----+------+------+-------+
|firstname|middlename|lastname|   id|gender|salary|Country|
+---------+----------+--------+-----+------+------+-------+
|    James|          |   Smith|36636|     M|  3000|    USA|
|  Michael|      Rose|        |40288|     M|  4000|    USA|
|   Robert|          |Williams|42114|     M|  4000|    USA|
|    Maria|      Anne|   Jones|39192|     F|  4000|    USA|
|      Jen|      Mary|   Brown|     |     F|    -1|    USA|
+---------+----------+--------+-----+------+------+-------+

+---------+----------+--------+-----+------+------+---------+-----+
|firstname|middlename|lastname|   id|gender|salary|   County|  PIN|
+---------+----------+--------+-----+------+------+---------+-----+
|    James|          |   Smith|36636|     M|  3000|Champaign|61820|
|  Michael|      Rose|        |40288|     M|  4000|Champaign|61820|
|   Robert|          |Williams|42114|     M|  4000|Champaign|61820|
|    Maria|      Anne|   Jones|39192|     F|  4000|

In [None]:
newColumns = ["newCol1","newCol2","newCol3","newCol4", 'newCol5', 'newCol6']
df.toDF(*newColumns).printSchema() # same as df.columns = newColumns

root
 |-- newCol1: string (nullable = true)
 |-- newCol2: string (nullable = true)
 |-- newCol3: string (nullable = true)
 |-- newCol4: string (nullable = true)
 |-- newCol5: string (nullable = true)
 |-- newCol6: integer (nullable = true)



# **Where & Filter**

1. PySpark filter() function is used to filter the rows from RDD/DataFrame based on the given condition or SQL expression. \
2. You can also use where() clause instead of the filter() if you are coming from an SQL background, both these functions operate exactly the same

In [None]:
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"), (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"), (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"), (("Mike","Mary","Williams"),["Python","VB"],"OH","M") ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [None]:
df.filter(df.state=='OH').show(truncate=False)
df.filter(df.state!='OH').show(truncate=False)
df.filter(~(df.state=='OH')).show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, VB]      |NY   |M     |
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F 

In [None]:
from pyspark.sql.functions import col
df.filter(col('state')=='OH').show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [None]:
# Filter with SQL
query_like = "gender <> 'M'"
df.filter(query_like).show()

+-------------------+------------------+-----+------+
|               name|         languages|state|gender|
+-------------------+------------------+-----+------+
|     {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Julia, , Williams}|      [CSharp, VB]|   OH|     F|
+-------------------+------------------+-----+------+



In [None]:
# Filter with mutiple conditions
df.filter((df.state=='OH' )& (df.gender=='F')).show()

states = ['OH', 'NY']
df.filter(df.state.isin(states)).show()

+-------------------+------------+-----+------+
|               name|   languages|state|gender|
+-------------------+------------+-----+------+
|{Julia, , Williams}|[CSharp, VB]|   OH|     F|
+-------------------+------------+-----+------+

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [None]:
# Filter based on startswith, endswith and contains
df.filter(df.state.startswith('N') & (df.gender.endswith('F'))).show() 
df.filter(df.state.contains('H')).show() # same as df[df.state.contains('H)]

+--------------+------------------+-----+------+
|          name|         languages|state|gender|
+--------------+------------------+-----+------+
|{Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+--------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [None]:
data2 = [(2,"Michael Rose"),(3,"Robert Williams"),
     (4,"Rames Rose"),(5,"Rames rose")
  ]
df2 = spark.createDataFrame(data = data2, schema = ["id","name"])

In [None]:
# Filter using like and rlike
df2.filter(df2.name.like('%ert%')).show()

# rlike - SQL RLIKE pattern (LIKE with Regex)
df2.filter(df2.name.rlike("(?i)^*rose$")).show()

+---+---------------+
| id|           name|
+---+---------------+
|  3|Robert Williams|
+---+---------------+

+---+------------+
| id|        name|
+---+------------+
|  2|Michael Rose|
|  4|  Rames Rose|
|  5|  Rames rose|
+---+------------+



In [None]:
# Filter on an Array column
df.show(1)

from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages, 'Java')).show()

# Filtering on Nested Struct columns
df.filter(df.name.firstname == 'Anna').show()

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|   OH|     M|
+----------------+------------------+-----+------+
only showing top 1 row

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|   OH|     M|
|  {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+----------------+------------------+-----+------+

+--------------+------------------+-----+------+
|          name|         languages|state|gender|
+--------------+------------------+-----+------+
|{Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+--------------+------------------+-----+------+



## **Read and Write files in Pyspark**

In [None]:
dff = spark.read.csv('./sample_data/california_housing_train.csv')
# OR
dff = spark.read.format("csv").load('./sample_data/california_housing_train.csv')
dff.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)



##### 1.1 Using Header Record For Column Names

In [None]:
# Explicitly have to mention header = True 
dff = spark.read.options(header = True).csv('./sample_data/california_housing_train.csv')
dff.show(2)
# PySpark reads all columns as a string (StringType) by default

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
only showing top 2 rows



##### 1.2 Read Multiple csv

In [None]:
# df = spark.read.csv("path1,path2,path3")
dff = spark.read.csv('./sample_data') # ALL CSV FILES IN THE SAMPLE DATA
dff.show(2)

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----

In [None]:
# Options While Reading CSV File
path = './sample_data/california_housing_train.csv'

# delimiter
spark.read.options(delimiter = ',').csv(path)

# header
spark.read.options(header=True).csv(path)

# inferSchema (by def, all cols are infered as str types. Set this to True and you get )
df = spark.read.options(inferSchema=True, delimiter=',', header=True).csv(path)
df.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 2 rows



# Read csv files using user defined custom schema 
spark.read.format("csv").options(header=True).schema(schema).load(path)

where schema is a userdefined schema with types of columns mentioned.

In [None]:
# Write
df.write.options(header=True, delimiter=',').csv('dummy_write_file.csv')

#### 2.1 **Saving modes**

PySpark DataFrameWriter also has a method mode() to specify saving mode.

**overwrite** – mode is used to overwrite the existing file.

**append** – To add the data to the existing file.

**ignore** – Ignores write operation when the file already exists.

**error** – This is a default option when the file already exists, it returns an error

In [None]:
df.write.mode('overwrite').csv('dummy_write_file.csv')
# OR
df.write.format("csv").mode('overwrite').save(path)

In [None]:
# Apache Parquet file is a columnar storage format available to any project in the Hadoop ecosystem, 
# regardless of the choice of data processing framework, data model, or programming language.

# While querying columnar storage, it skips the nonrelevant data very quickly, making faster query execution. 
# As a result aggregation queries consume less time compared to row-oriented databases.

# Efficient compression of data by 75% on average

In [None]:
# FOR JSON, read.json("path") or read.format("json").load("path")

In [None]:
path = 'multiline-zipcode.json'
spark.read.options(multiline=True).json(path).show()

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



## Reading multiple json files at a time
spark.read.json( [path_file1, path_file2] )

df3 = spark.read.json("/*.json") # All json files in directory

### **Distinct to Drop duplicate functions**

In [None]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
# 1. Get distinct rows
df.distinct().show()
# OR
df.drop_duplicates().show() # Returns a new dataframe

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+



In [None]:
df.drop_duplicates(['department','salary']).show() # same as df.drop_duplicates(subset = ['col1', 'col2'])

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        Maria|   Finance|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|        Kumar| Marketing|  2000|
|         Jeff| Marketing|  3000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|      Michael|     Sales|  4600|
+-------------+----------+------+



### **OrderBy and Sort**

In [None]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [None]:
df.sort('department', 'salary').show() # By default, ascending order sort

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        James|     Sales|   NY| 90000| 34|10000|
+-------------+----------+-----+------+---+-----+



In [None]:
df.orderBy('department', 'salary').show() # By def, asc

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        James|     Sales|   NY| 90000| 34|10000|
+-------------+----------+-----+------+---+-----+



In [None]:
# Sort in desc order
df.sort(df.department.desc(), df.salary.desc())
# OR
df.sort(col('department').desc(), col('salary').desc()).show()
df.orderBy(col('department').desc(), col('salary').desc()).show()

NameError: ignored

###### **BY SQL VIEWS**

In [None]:
df.createOrReplaceTempView('view')

query = """
        SELECT 
            employee_name, state, age, salary 
        FROM view
        ORDER BY department ASC
        """
spark.sql(query).show()

+-------------+-----+---+------+
|employee_name|state|age|salary|
+-------------+-----+---+------+
|        Raman|   CA| 40| 99000|
|        Scott|   NY| 36| 83000|
|          Jen|   NY| 53| 79000|
|        Maria|   CA| 24| 90000|
|         Jeff|   CA| 25| 80000|
|        Kumar|   NY| 50| 91000|
|        James|   NY| 34| 90000|
|      Michael|   NY| 56| 86000|
|       Robert|   CA| 30| 81000|
+-------------+-----+---+------+



# **Group By**

In [None]:
df.groupBy('department').sum('salary').show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|     351000|
| Marketing|     171000|
+----------+-----------+



In [None]:
from pyspark.sql.functions import col
df.groupBy('department').count().show() # Total num of employees in each department

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [None]:
df.groupBy('department').min('salary').show()

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|     Sales|      81000|
|   Finance|      79000|
| Marketing|      80000|
+----------+-----------+



In [None]:
df.groupBy('department').avg('salary').show()  # or mean you can use any one

+----------+-----------------+
|department|      avg(salary)|
+----------+-----------------+
|     Sales|85666.66666666667|
|   Finance|          87750.0|
| Marketing|          85500.0|
+----------+-----------------+



#### **3.1 PySpark groupBy and aggregate on multiple columns**

In [None]:
df.groupBy('department', 'state').max('salary', 'bonus').show()

+----------+-----+-----------+----------+
|department|state|max(salary)|max(bonus)|
+----------+-----+-----------+----------+
|   Finance|   NY|      83000|     19000|
| Marketing|   NY|      91000|     21000|
|     Sales|   CA|      81000|     23000|
| Marketing|   CA|      80000|     18000|
|   Finance|   CA|      99000|     24000|
|     Sales|   NY|      90000|     20000|
+----------+-----+-----------+----------+



#### **3.2 Running more aggregates at a time**

In [None]:
from pyspark.sql.functions import sum, avg, max, min
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)


+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+



#### **3.3 Using filter on aggregate data**

In [None]:
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ).where(col('avg_salary')>85500.0).show(truncate=False)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
+----------+----------+-----------------+---------+---------+



# **JOIN Operations**

In [None]:

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)


root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

#### **4.1 Inner Join**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how='inner').show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



#### **4.2 Outer Join**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how='outer').show() # or 'full', 'fullouter'

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



#### **4.3 Left Join**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id==deptDF.dept_id, how='left').show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
|     6|   Brown|              2|       2010|         50|      |    -1|     null|   null|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



#### **4.4 Right Join**

In [None]:
empDF.join(deptDF, empDF.emp_dept_id==deptDF.dept_id, how='right').show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|  null|    null|           null|       null|       null|  null|  null|    Sales|     30|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



#### **4.5 Left-Semi Join**

In [None]:
# leftsemi join is similar to inner join 
# difference being leftsemi join returns all columns from the left dataset and
# ignores all columns from the right dataset.

In [None]:
empDF.join(deptDF, empDF.emp_dept_id==deptDF.dept_id, how='leftsemi').show()

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     5|   Brown|              2|       2010|         40|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+



#### **4.6 Left-Anti Join**

In [None]:
# leftanti join does the exact opposite of the leftsemi (whichever cant be joined with inner)
empDF.join(deptDF, empDF.emp_dept_id==deptDF.dept_id, how='leftanti').show()

+------+-----+---------------+-----------+-----------+------+------+
|emp_id| name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|     6|Brown|              2|       2010|         50|      |    -1|
+------+-----+---------------+-----------+-----------+------+------+



#### **4.7 Self Join**

In [None]:
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)


+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+



#### **4.8 Using SQL Expressions Join**

In [None]:
empDF.createOrReplaceTempView('EMP')
deptDF.createOrReplaceTempView('DEPT')

query1 = "SELECT * FROM EMP e, DEPT d WHERE e.emp_dept_id == d.dept_id"
spark.sql(query1).show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [None]:
query2 = "SELECT * FROM EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id"
spark.sql(query2).show()

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|  Finance|     10|
|     3|Williams|              1|       2010|         10|     M|  1000|  Finance|     10|
|     4|   Jones|              2|       2005|         10|     F|  2000|  Finance|     10|
|     2|    Rose|              1|       2010|         20|     M|  4000|Marketing|     20|
|     5|   Brown|              2|       2010|         40|      |    -1|       IT|     40|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



#### **4.9 PySpark SQL Join on multiple DataFrames**

In [None]:
# df1.join(df2, df1.id==df2.id, how='inner').join(df3, df1.id==df3.id, how='inner')

# **UNION VS UNION ALL**

In [None]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000)]
columns= ["employee_name","department","state","salary","age","bonus"]
df1 = spark.createDataFrame(data = simpleData, schema = columns)
simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000)]
columns2= ["employee_name","department","state","salary","age","bonus"]
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

In [None]:
unionDF = df1.union(df2)  # df1.unionAll(df2) returns the same O/P
unionDF.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [None]:
# Merge without Duplicates
df1.union(df2).distinct().show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
+-------------+----------+-----+------+---+-----+



In [None]:
# Merge Two DataFrames with Different Columns or Schema

df1.unionByName(df2, allowMissingColumns=True).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        James|     Sales|   NY| 90000| 34|10000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [None]:
#Create DataFrame df1 with columns --- name, dept & age
data = [("James","Sales",34), ("Michael","Sales",56),("Robert","Sales",30), ("Maria","Finance",24) ]
columns= ["name","dept","age"]
df1 = spark.createDataFrame(data = data, schema = columns)

#Create DataFrame df1 with columns --- name, dept, state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000),("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)

In [None]:
# Create new columns state and salary in df1 (as they are missing in df2) and fill with NULL
# DO THE SAME FOR df2 too!
from pyspark.sql.functions import lit
for column in [column for column in df2.columns if column not in df1.columns]:
  df1 = df1.withColumn(column, lit(None))
for column in [column for column in df1.columns if column not in df2.columns]:
  df2 = df2.withColumn(column, lit(None))

In [None]:
df1.unionByName(df2).show()

+-------+---------+----+-----+------+
|   name|     dept| age|state|salary|
+-------+---------+----+-----+------+
|  James|    Sales|  34| null|  null|
|Michael|    Sales|  56| null|  null|
| Robert|    Sales|  30| null|  null|
|  Maria|  Finance|  24| null|  null|
|  James|    Sales|null|   NY|  9000|
|  Maria|  Finance|null|   CA|  9000|
|    Jen|  Finance|null|   NY|  7900|
|   Jeff|Marketing|null|   CA|  8000|
+-------+---------+----+-----+------+



# **Fillna and Fill**

In [None]:
df1.fillna(value=0, subset=None).show()

+-------+-------+---+-----+------+
|   name|   dept|age|state|salary|
+-------+-------+---+-----+------+
|  James|  Sales| 34| null|  null|
|Michael|  Sales| 56| null|  null|
| Robert|  Sales| 30| null|  null|
|  Maria|Finance| 24| null|  null|
+-------+-------+---+-----+------+



In [None]:
df1.na.fill(value=0, subset=['state']).show()

+-------+-------+---+-----+------+
|   name|   dept|age|state|salary|
+-------+-------+---+-----+------+
|  James|  Sales| 34| null|  null|
|Michael|  Sales| 56| null|  null|
| Robert|  Sales| 30| null|  null|
|  Maria|Finance| 24| null|  null|
+-------+-------+---+-----+------+



In [None]:
# df1.na.fill({'state' : 'IL', 'salary':0}).show()

# **Map** **Transformations**

In [None]:
# DataFrame doesn’t have map() transformation to use with DataFrame 
# hence you need to transform DataFrame to RDD first.

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd = spark.sparkContext.parallelize(data)
rdd2 = rdd.map(lambda x: (x,1))
for element in rdd2.collect():
  print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [None]:
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [None]:
rdd2 = df.rdd.map(lambda x: (x[0] + ' ' + x[1], x[2], x[3]*2))
rdd2.toDF(schema = ['name','sex','new_sal']).show()

+---------------+---+-------+
|           name|sex|new_sal|
+---------------+---+-------+
|    James Smith|  M|     60|
|      Anna Rose|  F|     82|
|Robert Williams|  M|    124|
+---------------+---+-------+



In [None]:
# By Calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=firstName+", "+lastName
    gender=x.gender.lower()
    salary=x.salary*2
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))
rdd2.toDF().show()

+----------------+---+---+
|              _1| _2| _3|
+----------------+---+---+
|    James, Smith|  m| 60|
|      Anna, Rose|  f| 82|
|Robert, Williams|  m|124|
+----------------+---+---+



# **5. Date & Time Functions**

https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

In [None]:
# 1. DateType default format is yyyy-MM-dd 
# 2. TimestampType default format is yyyy-MM-dd HH:mm:ss.SSSS
# 3. Returns null if the input is a string that can not be cast to Date or Timestamp.

In [None]:
from pyspark.sql.functions import *

In [None]:
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data, schema = ["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



#### **5.1 current_date()**

In [None]:
df.select(current_date()).show() # no of rows equal to that of df

+--------------+
|current_date()|
+--------------+
|    2022-05-26|
|    2022-05-26|
|    2022-05-26|
+--------------+



#### **5.2 date_format()**

In [None]:
# date_format() to parses the date and converts from yyyy-dd-mm to MM-dd-yyyy
df.withColumn('input', date_format(col('input'),'MM-dd-yyyy')).show()

+---+----------+
| id|     input|
+---+----------+
|  1|02-01-2020|
|  2|03-01-2019|
|  3|03-01-2021|
+---+----------+



#### **5.3 to_date()**

In [None]:
df.withColumn('input', to_date(col('input'))).show() # by default to 'yyyy-MM-dd'

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



#### **5.4 datediff()**

In [None]:
df.select('input', datediff(current_date(), col('input'))).show()

+----------+-------------------------------+
|     input|datediff(current_date(), input)|
+----------+-------------------------------+
|2020-02-01|                            846|
|2019-03-01|                           1183|
|2021-03-01|                            452|
+----------+-------------------------------+



#### **5.5 months_between()**

In [None]:
df.select('input', months_between(current_date(), col('input'))).show()

+----------+-------------------------------------------+
|     input|months_between(current_date(), input, true)|
+----------+-------------------------------------------+
|2020-02-01|                                27.83870968|
|2019-03-01|                                38.83870968|
|2021-03-01|                                14.83870968|
+----------+-------------------------------------------+



#### **5.6 trunc()**

In [None]:
# Truncates the date at a specified unit using Trunc()
df.select('input',
          trunc(col('input'), 'Month').alias('Month_trunc'),
          trunc(col('input'), 'Year').alias('Year_trunc'),
          trunc(col('input'), 'Day').alias('Day_trunc')).show()

+----------+-----------+----------+---------+
|     input|Month_trunc|Year_trunc|Day_trunc|
+----------+-----------+----------+---------+
|2020-02-01| 2020-02-01|2020-01-01|     null|
|2019-03-01| 2019-03-01|2019-01-01|     null|
|2021-03-01| 2021-03-01|2021-01-01|     null|
+----------+-----------+----------+---------+



#### **5.6 add_months() , date_add(), date_sub()**

In [None]:
df.select('input', 
          add_months(col('input'), 2).alias('Add_months'),
          date_add(col('input'), 3).alias('Date_add'),
          date_sub(col('input'), 3).alias('Date_sub')
          ).show()

+----------+----------+----------+----------+
|     input|Add_months|  Date_add|  Date_sub|
+----------+----------+----------+----------+
|2020-02-01|2020-04-01|2020-02-04|2020-01-29|
|2019-03-01|2019-05-01|2019-03-04|2019-02-26|
|2021-03-01|2021-05-01|2021-03-04|2021-02-26|
+----------+----------+----------+----------+



#### **5.7 year(), month(), month(),next_day(), weekofyear(), dayofweek(), dayofmonth(), dayofyear()**

In [None]:
df.select('input',
          year('input').alias('Year'),
          month('input').alias('Month'),
          next_day('input', 'Sunday'), # next sunday
          weekofyear('input'),
          dayofyear('input'),
          dayofweek('input'),
          dayofmonth('input')
          ).show()

+----------+----+-----+-----------------------+-----------------+----------------+----------------+-----------------+
|     input|Year|Month|next_day(input, Sunday)|weekofyear(input)|dayofyear(input)|dayofweek(input)|dayofmonth(input)|
+----------+----+-----+-----------------------+-----------------+----------------+----------------+-----------------+
|2020-02-01|2020|    2|             2020-02-02|                5|              32|               7|                1|
|2019-03-01|2019|    3|             2019-03-03|                9|              60|               6|                1|
|2021-03-01|2021|    3|             2021-03-07|                9|              60|               2|                1|
+----------+----+-----+-----------------------+-----------------+----------------+----------------+-----------------+



#### **5.8 current_timestamp(), to_timestamp**

In [None]:
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+



In [None]:
df2.select(current_timestamp()).show(truncate=False)

+--------------------------+
|current_timestamp()       |
+--------------------------+
|2022-05-27 03:06:54.360562|
|2022-05-27 03:06:54.360562|
|2022-05-27 03:06:54.360562|
+--------------------------+



In [None]:
df2.select('input', to_timestamp(col('input'), "MM-dd-yyyy HH:m:ss SSS")).show()

+--------------------+-------------------------------------------+
|               input|to_timestamp(input, MM-dd-yyyy HH:m:ss SSS)|
+--------------------+-------------------------------------------+
|02-01-2020 11 01 ...|                                       null|
|03-01-2019 12 01 ...|                                       null|
|03-01-2021 12 01 ...|                                       null|
+--------------------+-------------------------------------------+



In [None]:
df2.select('input', 
          hour('input'),
          minute('input'),
          second('input')
          ).show()

+--------------------+-----------+-------------+-------------+
|               input|hour(input)|minute(input)|second(input)|
+--------------------+-----------+-------------+-------------+
|02-01-2020 11 01 ...|       null|         null|         null|
|03-01-2019 12 01 ...|       null|         null|         null|
|03-01-2021 12 01 ...|       null|         null|         null|
+--------------------+-----------+-------------+-------------+



# **6. Aggregate Functions**

In [None]:

simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.show(2, truncate=False)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
+-------------+----------+------+
only showing top 2 rows



In [None]:
print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

approx_count_distinct: 6


In [None]:
print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

avg: 3400.0


In [None]:
df.select(collect_list('salary')).show(truncate=False)

+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+



In [None]:
df.select(collect_set('salary')).show(truncate=False)

+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+



In [None]:
df.select(countDistinct('department', 'salary')).show()

+----------------------------------+
|count(DISTINCT department, salary)|
+----------------------------------+
|                                 8|
+----------------------------------+



In [None]:
df.select(count('department')).show()

+-----------------+
|count(department)|
+-----------------+
|               10|
+-----------------+



In [None]:
df.select(first('salary')).show()
df.select(last('salary')).show()
df.select(kurtosis('salary')).show()
df.select(skewness('salary')).show()

+-------------+
|first(salary)|
+-------------+
|         3000|
+-------------+

+------------+
|last(salary)|
+------------+
|        4100|
+------------+

+-------------------+
|   kurtosis(salary)|
+-------------------+
|-0.6467803030303032|
+-------------------+

+--------------------+
|    skewness(salary)|
+--------------------+
|-0.12041791181069571|
+--------------------+



In [None]:
df.select(stddev('salary'), 
          stddev_samp('salary'),
          stddev_pop('salary')).show()

+-------------------+-------------------+------------------+
|stddev_samp(salary)|stddev_samp(salary)|stddev_pop(salary)|
+-------------------+-------------------+------------------+
|  765.9416862050705|  765.9416862050705|  726.636084983398|
+-------------------+-------------------+------------------+



In [None]:
df.select(sumDistinct('salary')).show()



+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|               20900|
+--------------------+



In [None]:
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+



# **7. Window Functions**

#### **7.1 row_number()**

In [None]:
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [None]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy('department').orderBy('salary')
display(windowSpec)

<pyspark.sql.window.WindowSpec at 0x7f1240721510>

In [None]:
df.withColumn('row_number',
              row_number().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [None]:
df.createOrReplaceTempView('temp')

query = """
          SELECT *, ROW_NUMBER() OVER(PARTITION BY(department) ORDER BY salary ASC) AS row_num
          FROM temp;
        """
spark.sql(query).show()

# select challenge_id, h_id, h_name, score, 
#    dense_rank() over ( partition by challenge_id order by score desc ) 
#        as "rank", from hacker;

+-------------+----------+------+-------+
|employee_name|department|salary|row_num|
+-------------+----------+------+-------+
|        Maria|   Finance|  3000|      1|
|        Scott|   Finance|  3300|      2|
|          Jen|   Finance|  3900|      3|
|        Kumar| Marketing|  2000|      1|
|         Jeff| Marketing|  3000|      2|
|        James|     Sales|  3000|      1|
|        James|     Sales|  3000|      2|
|       Robert|     Sales|  4100|      3|
|         Saif|     Sales|  4100|      4|
|      Michael|     Sales|  4600|      5|
+-------------+----------+------+-------+



#### **7.2 rank()**

In [None]:
df.withColumn('rank', rank().over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



In [None]:
query = """ 
SELECT *, RANK() OVER (PARTITION BY department ORDER BY salary ASC) AS rank
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



#### **7.3 dense_rank()**

In [None]:
df.withColumn('dense_rank', dense_rank().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [None]:
query = """ 
SELECT *, DENSE_RANK() OVER (PARTITION BY department ORDER BY salary ASC) AS rank
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   2|
|         Saif|     Sales|  4100|   2|
|      Michael|     Sales|  4600|   3|
+-------------+----------+------+----+



In [None]:
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



In [None]:
query = """ 
SELECT *, PERCENT_RANK() OVER (PARTITION BY department ORDER BY salary ASC) AS percent_rank
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



#### **7.4 ntile Window Function**

In [None]:
df.withColumn('ntile', ntile(2).over(windowSpec)).show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



In [None]:
query = """ 
SELECT *, NTILE() OVER (PARTITION BY department ORDER BY salary ASC) AS ntile
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    1|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    1|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    1|
|      Michael|     Sales|  4600|    1|
+-------------+----------+------+-----+



#### **7.5 PySpark Window Analytic functions**

In [None]:
# cume_dist() same as DENSE_RANK()
df.withColumn('cume_dist', cume_dist().over(windowSpec)).show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+



In [None]:
# lag
df.withColumn('lag', lag('salary', 2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [None]:
query = """ 
SELECT *, LAG(salary, 2) OVER (PARTITION BY department ORDER BY salary ASC) AS lag
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [None]:
query = """ 
SELECT *, LEAD(salary, 2) OVER (PARTITION BY department ORDER BY salary ASC) AS lead
FROM temp;
"""
spark.sql(query).show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+



In [None]:
windowSpecAgg  = Window.partitionBy("department")

from pyspark.sql.functions import col,avg,sum,min,max,row_number 

df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg("salary").over(windowSpecAgg)) \
  .withColumn("sum", sum("salary").over(windowSpecAgg)) \
  .withColumn("min", min("salary").over(windowSpecAgg)) \
  .withColumn("max", max("salary").over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



https://towardsdatascience.com/easy-fixes-for-sparksql-performance-ad4166792e6e

# **8. ArrayType()**

In [None]:
from pyspark.sql.types import ArrayType, StringType

In [None]:
arrayCol = ArrayType(StringType(),False)
# Above example creates string array and doesn’t not accept null values.

In [None]:

data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")]
from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)])

df = spark.createDataFrame(data=data,schema=schema)
df.show()

+----------------+------------------+---------------+------------+-------------+
|            name| languagesAtSchool|languagesAtWork|currentState|previousState|
+----------------+------------------+---------------+------------+-------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+----------------+------------------+---------------+------------+-------------+



#### **8.1 Explode**

In [None]:
df.select('languagesAtSchool', explode('languagesAtSchool')).show()

+------------------+------+
| languagesAtSchool|   col|
+------------------+------+
|[Java, Scala, C++]|  Java|
|[Java, Scala, C++]| Scala|
|[Java, Scala, C++]|   C++|
|[Spark, Java, C++]| Spark|
|[Spark, Java, C++]|  Java|
|[Spark, Java, C++]|   C++|
|      [CSharp, VB]|CSharp|
|      [CSharp, VB]|    VB|
+------------------+------+



#### **8.2 Split**

In [None]:
df.select('name', split('name', ',')).show()

+----------------+--------------------+
|            name|  split(name, ,, -1)|
+----------------+--------------------+
|    James,,Smith|    [James, , Smith]|
|   Michael,Rose,|   [Michael, Rose, ]|
|Robert,,Williams|[Robert, , Williams]|
+----------------+--------------------+



#### **8.3 array() - Creates a new array**

In [None]:
(df.withColumn('states', array('currentState', 'previousState'))
  .select('currentState', 'previousState', 'states').show()
)

+------------+-------------+--------+
|currentState|previousState|  states|
+------------+-------------+--------+
|          OH|           CA|[OH, CA]|
|          NY|           NJ|[NY, NJ]|
|          UT|           NV|[UT, NV]|
+------------+-------------+--------+



#### **8.3 array_contains() - Creates a new array**

In [None]:
df.select('languagesAtSchool', array_contains('languagesAtSchool', 'Java')).show()

+------------------+---------------------------------------+
| languagesAtSchool|array_contains(languagesAtSchool, Java)|
+------------------+---------------------------------------+
|[Java, Scala, C++]|                                   true|
|[Spark, Java, C++]|                                   true|
|      [CSharp, VB]|                                  false|
+------------------+---------------------------------------+

