# New PySpark Tutorial

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

https://sparkbyexamples.com/pyspark/pyspark-row-using-rdd-dataframe-2/?expand_article=1

## Create a SparkSession

In [2]:
spark = (SparkSession.builder
                     .appName("Example-3_6")
                     .getOrCreate())

23/07/04 17:48:39 WARN Utils: Your hostname, myThinkPad resolves to a loopback address: 127.0.1.1; using 192.168.1.107 instead (on interface wlp3s0)
23/07/04 17:48:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/04 17:48:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
##Create Raw Data

data = [(1,"Vin"), (2,"Shweta")]
schema = ["id", "name"]

df = spark.createDataFrame(data,schema)

df.printSchema()
df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



                                                                                

+---+------+
| id|  name|
+---+------+
|  1|   Vin|
|  2|Shweta|
+---+------+



In [4]:
#Properties, functions and methods of any object(spark in this example)
#dir(spark)
#help(spark.createDataFrame)

In [83]:
#schema = StructType(StructField)

schema = StructType([StructField("id", IntegerType()), 
                     StructField("Name", StringType())])

df = spark.createDataFrame(data,schema)

df.printSchema()
df.show()


root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)

+---+------+
| id|  Name|
+---+------+
|  1|   Vin|
|  2|Shweta|
+---+------+



In [84]:
#help(StructType)

# Read csv files

In [13]:
# Don't change this file path
file_path = "rawdata/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)
print('-----Method 1--------')
airports.limit(5).show(truncate=False)

#Method 2
airports = spark.read.format("csv").load("rawdata/planes.csv", header=True)
print('-----Method 2--------')
airports.limit(5).show(truncate=False)

#Method3 Read list of csv inside a folder
airports = spark.read.csv(path=["rawdata/books/part-00001-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00002-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00003-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00000-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00002-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00003-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00000-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv",\
                                "rawdata/books/part-00002-39e30f06-99da-46e9-8d8c-52b65a889529-c000.csv"],\
                          header=True)


#df = spark.read.csv("path1,path2,path3")

# Show the data
print('-----Method 3--------')
airports.limit(10).show()

-----Method 1--------
+---+-----------------------------+----------+-----------+----+---+---+
|faa|name                         |lat       |lon        |alt |tz |dst|
+---+-----------------------------+----------+-----------+----+---+---+
|04G|Lansdowne Airport            |41.1304722|-80.6195833|1044|-5 |A  |
|06A|Moton Field Municipal Airport|32.4605722|-85.6800278|264 |-5 |A  |
|06C|Schaumburg Regional          |41.9893408|-88.1012428|801 |-6 |A  |
|06N|Randall Airport              |41.431912 |-74.3915611|523 |-5 |A  |
|09J|Jekyll Island Airport        |31.0744722|-81.4277778|11  |-4 |A  |
+---+-----------------------------+----------+-----------+----+---+---+

-----Method 2--------
+-------+----+-----------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|type                   |manufacturer    |model   |engines|seats|speed|engine   |
+-------+----+-----------------------+----------------+--------+-------+-----+-----+---------+
|N102UW |1998|Fixed wi

## Write into csv file

In [19]:
data = [['Charles Duhigg', 'The Power of Habit', 300],
        ['Stephen Covey', '7 habits of highly effective people', 500],
        ['Eckhart Tolle', 'The Power of Now', 600],
        ['George Orwell', '1984', 245],
        ['Napolean Hill', 'Think N Grow Rich', 345]]
schema = "author STRING, title STRING, pages INT"
books_df = spark.createDataFrame(data, schema)
# Show the DataFrame; it should reflect our table above
#books_df.show()
# Print the schema used by Spark to process the DataFrame
#print(books_df.printSchema())

#Save the dataframe

#This will save csv in prtitions(Doesn't overwrite)
#books_df.write.csv(path="rawdata/books", header=True)

books_df.write.mode('overwrite').csv("rawdata/books_df")
#you can also use this
books_df.write.format("csv").mode('overwrite').save("rawdata/books_df")

#Method for single partition
#-------

books_df.coalesce(1).write.csv("rawdata/books_df/Single_Part")
books_df.repartition(1).write.mode('overwrite').csv("rawdata/books_df/Single_Part")


In [20]:
df = spark.read.csv("rawdata/books",header=True)
df.show()

+--------------+--------------------+-----+
|        author|               title|pages|
+--------------+--------------------+-----+
| George Orwell|                1984|  245|
| Napolean Hill|   Think N Grow Rich|  345|
| Stephen Covey|7 habits of highl...|  500|
|Charles Duhigg|  The Power of Habit|  300|
| Eckhart Tolle|    The Power of Now|  600|
+--------------+--------------------+-----+



## Read and Write json and parquet files

In [22]:
# Don't change this file path
file_path = "rawdata/userdata1.parquet"

# Read in the raw parquet data

df1 = spark.read.parquet(file_path)
#df1 = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json(file_path)
df1.limit(3).show()

#Method 2
df2 = spark.read.format("parquet").load(file_path)
df2.limit(3).show()

#Method3 Read list of csv inside a folder

df = spark.read.load(["rawdata/userdata1.parquet", "rawdata/userdata2.parquet"],format='parquet')

# Show the data

df.limit(3).show()

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+---------+---------+---------+-------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|  country|birthdate|   salary|              title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+---------+---------+---------+-------------------+--------+
|2016-02-03 13:25:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|   Internal Auditor|   1E+02|
|2016-02-03 22:34:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|      Accountant IV|        |
|2016-02-03 06:39:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|   Russia| 2/1/1960|144972.51|Structural Engineer|        |
+---

In [23]:
# Don't change this file path
file_path = "rawdata/zipcodes.json"

# Read in the raw json data

df1 = spark.read.json(file_path)
#df1 = spark.read.option("multiLine", "true").option("mode", "PERMISSIVE").json(file_path)
df.limit(3).show()

#Method 2
df2 = spark.read.format("json").load(file_path)
df.limit(3).show()

#Method3 Read list of csv inside a folder

df = spark.read.json(path=["rawdata/zipcode1.json", "rawdata/zipcode2.json"])


# Show the data
#airports.show()
df.limit(3).show()
df.printSchema()

+-------------------+---+----------+---------+--------------------+------+--------------+----------------+---------+---------+---------+-------------------+--------+
|  registration_dttm| id|first_name|last_name|               email|gender|    ip_address|              cc|  country|birthdate|   salary|              title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+---------+---------+---------+-------------------+--------+
|2016-02-03 13:25:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|   Internal Auditor|   1E+02|
|2016-02-03 22:34:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|      Accountant IV|        |
|2016-02-03 06:39:31|  3|    Evelyn|   Morgan|emorgan2@altervis...|Female|  7.161.136.94|6767119071901597|   Russia| 2/1/1960|144972.51|Structural Engineer|        |
+---

In [25]:
df.write.mode('overwrite').json(path="rawdata/json_write")
df.write.mode('overwrite').parquet(path="rawdata/parquet_write")
df.limit(7).show()

+-------------------+-------+-------------+-----+--------------------+--------------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|  Lat|            Location|        LocationText|  LocationType|  Long|RecordNumber|State|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-----+--------------------+--------------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|PASEO COSTA DEL SUR|     US|        false|17.96|NA-US-PR-PASEO CO...|Paseo Costa Del S...|NOT ACCEPTABLE|-66.22|           2|   PR|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|       BDA SAN LUIS|     US|        false|18.14|NA-US-PR-BDA SAN ...|    Bda San Luis, PR|NOT ACCEPTABLE|-66.26|          10|   PR|         NA| 0.38|-0.86| 0.31|   STANDARD|    709|
|        PARC PARQUE|     US|        false|17.96|NA-US-PR-PARC PARQUE|     Parc Parqu

## Usage of withColumn() and withColumnRenamed()

In [27]:
#help(df.withColumn)
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

schema = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data,schema)

df.show()
df.printSchema()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [29]:
from pyspark.sql.functions import col, lit

#1. Change DataType using PySpark withColumn()

df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.printSchema()
df2.show(truncate=False)

#2. Update The Value of an Existing Column

df3 = df.withColumn("salary",col("salary")*100)
df3.printSchema()
df3.show(truncate=False) 

#3. Create a Column from an Existing

df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df4.printSchema()

#4. Add a New Column using withColumn()
print('------------Add New Column--------')
df5 = df.withColumn("Country", lit("USA"))
df5.show()
df5.printSchema()
print('------------Add New Column twice--------')

df6 = df.withColumn("Country", lit("USA")).withColumn("anotherColumn",lit("anotherValue"))
df6.printSchema()
df6.show()

#5. Rename Column Name
df.withColumnRenamed("gender","sex").show(truncate=False) 
  
#6. Drop Column From PySpark DataFrame
df4.drop("CopiedColumn").show(truncate=False) 

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------

## Usage of StructType() and StructField()

In [49]:
data = [("James","","Smith","36","M",3000),
    ("Michael","Rose","","40","M",4000),
    ("Robert","","Williams","42","M",4000),
    ("Maria","Anne","Jones","39","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ 
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True), 
    StructField("lastname",StringType(),True), 
    StructField("age", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("salary", IntegerType(), True) 
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname|age|gender|salary|
+---------+----------+--------+---+------+------+
|James    |          |Smith   |36 |M     |3000  |
|Michael  |Rose      |        |40 |M     |4000  |
|Robert   |          |Williams|42 |M     |4000  |
|Maria    |Anne      |Jones   |39 |F     |4000  |
|Jen      |Mary      |Brown   |   |F     |-1    |
+---------+----------+--------+---+------+------+



## ArrayType column

In [35]:
data = [
 ("James,Smith",["Java","Scala","C++"],["Spark","Java"],"OH","CA"),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"],"NY","NJ"),
 ("Robert,Williams",["CSharp","VB"],["Spark","Python"],"UT","NV")
]

from pyspark.sql.types import StringType, ArrayType,StructType,StructField
schema = StructType([ 
    StructField("name",StringType(),True), 
    StructField("languagesAtSchool",ArrayType(StringType()),True), 
    StructField("languagesAtWork",ArrayType(StringType()),True), 
    StructField("currentState", StringType(), True), 
    StructField("previousState", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languagesAtWork: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)
 |-- previousState: string (nullable = true)

+---------------+------------------+---------------+------------+-------------+
|           name| languagesAtSchool|languagesAtWork|currentState|previousState|
+---------------+------------------+---------------+------------+-------------+
|    James,Smith|[Java, Scala, C++]|  [Spark, Java]|          OH|           CA|
|  Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|          NY|           NJ|
|Robert,Williams|      [CSharp, VB]|[Spark, Python]|          UT|           NV|
+---------------+------------------+---------------+------------+-------------+



## explode(), split() and arrayContains() function

In [36]:
'''df.withColumn()
df.withColumn()
df.withColumn()
df.withColumn()'''

'df.withColumn()\ndf.withColumn()\ndf.withColumn()\ndf.withColumn()'

In [37]:

from pyspark.sql.functions import explode, split, array, array_contains
print('------------------------------------------->  EXPLODE')
df.select(df.name,explode(df.languagesAtSchool)).show()

print('------------------------------------------->  SPLIT')
df.select(split(df.name,",").alias("nameAsArray")).show()

print('------------------------------------------->  ARRAY')
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()

print('------------------------------------------->  ARRAY CONTAINS')
df.select(df.name,array_contains(df.languagesAtSchool,"Java")
    .alias("array_contains")).show()

------------------------------------------->  EXPLODE
+---------------+------+
|           name|   col|
+---------------+------+
|    James,Smith|  Java|
|    James,Smith| Scala|
|    James,Smith|   C++|
|  Michael,Rose,| Spark|
|  Michael,Rose,|  Java|
|  Michael,Rose,|   C++|
|Robert,Williams|CSharp|
|Robert,Williams|    VB|
+---------------+------+

------------------------------------------->  SPLIT
+------------------+
|       nameAsArray|
+------------------+
|    [James, Smith]|
| [Michael, Rose, ]|
|[Robert, Williams]|
+------------------+

------------------------------------------->  ALIAS
+---------------+--------+
|           name|  States|
+---------------+--------+
|    James,Smith|[OH, CA]|
|  Michael,Rose,|[NY, NJ]|
|Robert,Williams|[UT, NV]|
+---------------+--------+

------------------------------------------->  ARRAY CONTAINS
+---------------+--------------+
|           name|array_contains|
+---------------+--------------+
|    James,Smith|          true|
|  Michael

## MapType column

In [38]:
#df.withColumn()
#df.withColumn()
#df.withColumn()
#df.withColumn()

In [39]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [ ("36636","Finance",3000,"USA"), 
    ("40288","Finance",5000,"IND"), 
    ("42114","Sales",3900,"USA"), 
    ("39192","Marketing",2500,"CAN"), 
    ("34534","Sales",6500,"USA") ]
schema = StructType([
     StructField('id', StringType(), True),
     StructField('dept', StringType(), True),
     StructField('salary', IntegerType(), True),
     StructField('location', StringType(), True)
     ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)


root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- location: string (nullable = true)

+-----+---------+------+--------+
|id   |dept     |salary|location|
+-----+---------+------+--------+
|36636|Finance  |3000  |USA     |
|40288|Finance  |5000  |IND     |
|42114|Sales    |3900  |USA     |
|39192|Marketing|2500  |CAN     |
|34534|Sales    |6500  |USA     |
+-----+---------+------+--------+



In [40]:

#Convert columns to Map
from pyspark.sql.functions import col,lit,create_map
df = df.withColumn("propertiesMap",create_map(
        lit("salary"),col("salary"),
        lit("location"),col("location")
        )).drop("salary","location")
df.printSchema()
df.show(truncate=False)


root
 |-- id: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- propertiesMap: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+-----+---------+---------------------------------+
|id   |dept     |propertiesMap                    |
+-----+---------+---------------------------------+
|36636|Finance  |{salary -> 3000, location -> USA}|
|40288|Finance  |{salary -> 5000, location -> IND}|
|42114|Sales    |{salary -> 3900, location -> USA}|
|39192|Marketing|{salary -> 2500, location -> CAN}|
|34534|Sales    |{salary -> 6500, location -> USA}|
+-----+---------+---------------------------------+



## Row and Column

### ------------------Row-----------------------

In [58]:

from pyspark.sql import SparkSession, Row

row=Row("James",40)
print(row[0] +","+str(row[1]))
row2=Row(name="Alice", age=11)
print(row2.name)

Person = Row("name", "age")
p1=Person("James", 40)
p2=Person("Alice", 35)
print(p1.name +","+p2.name)

#PySpark Example
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]

#RDD Example 1
rdd=spark.sparkContext.parallelize(data)
collData=rdd.collect()
print(collData)
for row in collData:
    print(row.name + "," +str(row.lang))

# RDD Example 2
Person=Row("name","lang","state")
data = [Person("James,,Smith",["Java","Scala","C++"],"CA"), 
    Person("Michael,Rose,",["Spark","Java","C++"],"NJ"),
    Person("Robert,,Williams",["CSharp","VB"],"NV")]
rdd=spark.sparkContext.parallelize(data)
collData=rdd.collect()
print(collData)
for person in collData:
    print(person.name + "," +str(person.lang))

#DataFrame Example 1
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data)
df.printSchema()
df.show()

collData=df.collect()
print(collData)
for row in collData:
    print(row.name + "," +str(row.lang))
    
#DataFrame Example 2
columns = ["name","languagesAtSchool","currentState"]
df=spark.createDataFrame(data).toDF(*columns)
df.printSchema()


James,40
Alice
James,Alice
[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', lang=['CSharp', 'VB'], state='NV')]
James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']
[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', lang=['CSharp', 'VB'], state='NV')]
James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']
root
 |-- name: string (nullable = true)
 |-- lang: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)



23/07/04 01:05:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------------+------------------+-----+
|            name|              lang|state|
+----------------+------------------+-----+
|    James,,Smith|[Java, Scala, C++]|   CA|
|   Michael,Rose,|[Spark, Java, C++]|   NJ|
|Robert,,Williams|      [CSharp, VB]|   NV|
+----------------+------------------+-----+

[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', lang=['CSharp', 'VB'], state='NV')]
James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']
root
 |-- name: string (nullable = true)
 |-- languagesAtSchool: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- currentState: string (nullable = true)



### ------------------------Column-------------------------

In [41]:

from pyspark.sql.functions import lit
colObj = lit("sparkbyexamples.com")



data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()


# Using DataFrame object (df)
df.select(df.gender).show()
df.select(df["gender"]).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()
#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()


root
 |-- name.fname: string (nullable = true)
 |-- gender: long (nullable = true)

+------+
|gender|
+------+
|    23|
|    40|
+------+

+------+
|gender|
+------+
|    23|
|    40|
+------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+

+------+
|gender|
+------+
|    23|
|    40|
+------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+



### ------------------Column Operations------------------

In [61]:
#PySpark Column Operators
data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")

#Arthmetic operations
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()


+-------------+
|(col1 + col2)|
+-------------+
|          102|
|          203|
|          304|
+-------------+

+-------------+
|(col1 - col2)|
+-------------+
|           98|
|          197|
|          296|
+-------------+

+-------------+
|(col1 * col2)|
+-------------+
|          200|
|          600|
|         1200|
+-------------+

+-----------------+
|    (col1 / col2)|
+-----------------+
|             50.0|
|66.66666666666667|
|             75.0|
+-----------------+

+-------------+
|(col1 % col2)|
+-------------+
|            0|
|            2|
|            0|
+-------------+

+-------------+
|(col2 > col3)|
+-------------+
|         true|
|        false|
|        false|
+-------------+

+-------------+
|(col2 < col3)|
+-------------+
|        false|
|         true|
|        false|
+-------------+

+-------------+
|(col2 = col3)|
+-------------+
|        false|
|        false|
|         true|
+-------------+



# alias(), asc(), desc(), cast() and like()

In [None]:
#Column Functions Examples for alias, ascx, desc, cast, like, 

data=[("James","Bond","100",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","XXX","400",''),
      ("Tom Brand",None,"400",'M')] 
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)


In [69]:

#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name")
   ).show()

#Another example
df.select(expr(" fname ||','|| lname").alias("fullName") \
   ).show()


#asc, desc to sort ascending and descending order repsectively.
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()


#cast
df.select(df.fname,df.id.cast("int")).printSchema()


#between
df.filter(df.id.between(100,300)).show()


#contains
df.filter(df.fname.contains("Cruise")).show()


#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()


#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()


#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om")) 

#substr()
df.select(df.fname.substr(1,2).alias("substr")).show()


#when & otherwise
from pyspark.sql.functions import when
df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender==None ,"") \
              .otherwise(df.gender).alias("new_gender") \
    ).show()


#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()



#Create DataFrame with struct, array & map
from pyspark.sql.types import StructType,StructField,StringType,ArrayType,MapType
data=[(("James","Bond"),["Java","C#"],{'hair':'black','eye':'brown'}),
      (("Ann","Varsa"),[".NET","Python"],{'hair':'brown','eye':'black'}),
      (("Tom Cruise",""),["Python","Scala"],{'hair':'red','eye':'grey'}),
      (("Tom Brand",None),["Perl","Ruby"],{'hair':'black','eye':'blue'})]

schema = StructType([
        StructField('name', StructType([
            StructField('fname', StringType(), True),
            StructField('lname', StringType(), True)])),
        StructField('languages', ArrayType(StringType()),True),
        StructField('properties', MapType(StringType(),StringType()),True)
     ])
df=spark.createDataFrame(data,schema)
df.printSchema()


#getField from MapType
df.select(df.properties.getField("hair")).show()

#getField from Struct
df.select(df.name.getField("fname")).show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|     James|     Bond|
|       Ann|    Varsa|
|Tom Cruise|      XXX|
| Tom Brand|     null|
+----------+---------+

+--------------+
|      fullName|
+--------------+
|    James,Bond|
|     Ann,Varsa|
|Tom Cruise,XXX|
|          null|
+--------------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|       Ann|Varsa|200|     F|
|     James| Bond|100|  null|
| Tom Brand| null|400|     M|
|Tom Cruise|  XXX|400|      |
+----------+-----+---+------+

+----------+-----+---+------+
|     fname|lname| id|gender|
+----------+-----+---+------+
|Tom Cruise|  XXX|400|      |
| Tom Brand| null|400|     M|
|     James| Bond|100|  null|
|       Ann|Varsa|200|     F|
+----------+-----+---+------+

root
 |-- fname: string (nullable = true)
 |-- id: integer (nullable = true)

+-----+-----+---+------+
|fname|lname| id|gender|
+-----+-----+---+------+
|James| Bond|100|  null|
|  Ann|Varsa|200| 

## filter() and where()

In [42]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col,array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

arrayStructureData = [
        (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
        (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
        (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
        (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
        (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
        (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
        ]
        
arrayStructureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('languages', ArrayType(StringType()), True),
         StructField('state', StringType(), True),
         StructField('gender', StringType(), True)
         ])


df = spark.createDataFrame(data = arrayStructureData, schema = arrayStructureSchema)
df.printSchema()
df.show(truncate=False)

df.filter(df.state == "OH") \
    .show(truncate=False)

df.filter(col("state") == "OH") \
    .show(truncate=False)    
    
df.filter("gender  == 'M'") \
    .show(truncate=False)    

df.filter( (df.state  == "OH") & (df.gender  == "M") ) \
    .show(truncate=False)        

df.filter(array_contains(df.languages,"Java")) \
    .show(truncate=False)        

df.filter(df.name.lastname == "Williams") \
    .show(truncate=False) 


23/07/04 18:51:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

+----------------------+------------------+-----+------+
|name                  |langu

In [70]:
# Using where()


## distinct() and dropDuplicates()

In [43]:

# Import pySpark
from pyspark.sql.functions import expr
#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

#Distinct
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)

#Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)

#Drop duplicates on selected columns
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Michael      |Sales     |4600  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Kumar        |Marketing |2000  |
|Jeff         |Marketing |3000  |
|S

In [44]:
df.dropDuplicates()

DataFrame[employee_name: string, department: string, salary: bigint]

## orderBy() and  sort()

In [45]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, asc,desc

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]

df = spark.createDataFrame(data = simpleData, schema = columns)

df.printSchema()
df.show(truncate=False)

df.sort("department","state").show(truncate=False)
df.sort(col("department"),col("state")).show(truncate=False)

df.orderBy("department","state").show(truncate=False)
df.orderBy(col("department"),col("state")).show(truncate=False)

df.sort(df.department.asc(),df.state.asc()).show(truncate=False)
df.sort(col("department").asc(),col("state").asc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").asc()).show(truncate=False)

df.sort(df.department.asc(),df.state.desc()).show(truncate=False)
df.sort(col("department").asc(),col("state").desc()).show(truncate=False)
df.orderBy(col("department").asc(),col("state").desc()).show(truncate=False)

df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc").show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+

## union() and unionAll()

In [46]:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate=False)

unionDF = df.union(df2)
unionDF.show(truncate=False)
disDF = df.union(df2).distinct()
disDF.show(truncate=False)

unionAllDF = df.unionAll(df2)
unionAllDF.show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----



+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|

In [47]:
df.union()

TypeError: DataFrame.union() missing 1 required positional argument: 'other'

## groupBy()

In [48]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,avg,max

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

df.groupBy("department").sum("salary").show(truncate=False)

df.groupBy("department").count().show(truncate=False)


df.groupBy("department","state") \
    .sum("salary","bonus") \
   .show(truncate=False)

df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)
    
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
      avg("salary").alias("avg_salary"), \
      sum("bonus").alias("sum_bonus"), \
      max("bonus").alias("max_bonus")) \
    .where(col("sum_bonus") >= 50000) \
    .show(truncate=False)


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351



+----------+-----+
|department|count|
+----------+-----+
|Sales     |3    |
|Finance   |4    |
|Marketing |2    |
+----------+-----+

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Sales     |NY   |176000     |30000     |
|Sales     |CA   |81000      |23000     |
|Finance   |CA   |189000     |47000     |
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Marketing |CA   |80000      |18000     |
+----------+-----+-----------+----------+

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+

+---------

## groupBy aggregate

In [51]:

# Import
from pyspark.sql import SparkSession

# Create SparkSession
#spark = SparkSession.builder.appName('SparkByExamples.com').master("local[5]").getOrCreate()

# Create DataFrame
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

from pyspark.sql.functions import sum,avg,max,count

# Example 1 - PySpark groupby agg
print("Example 1 - PySpark groupby agg")
df.groupBy("department") \
    .agg(count("*").alias("count")
     ) \
    .show(truncate=False)

# Example 2 - groupby multiple columns & agg
print("Example 2 - groupby multiple columns & agg")
df.groupBy("department","state") \
    .agg(count("*").alias("count")
     ) \
    .show(truncate=False)
    
# Example 3 - Multiple Aggregates
print("Example 3 - Multiple Aggregates")
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)

# Example 4 - Using where on Aggregates
print("Example 4 - Using where on Aggregates")
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
      avg("salary").alias("avg_salary"), \
      sum("bonus").alias("sum_bonus"), \
      max("bonus").alias("max_bonus")) \
    .where(col("sum_bonus") >= 50000) \
    .show(truncate=False)

# Example 5 - SQL group by agg
# Create Temporary table in PySpark
df.createOrReplaceTempView("EMP")
print("Example 5 - SQL group by agg")
# PySpark SQL
sql_str="select department, sum(salary) as sum_salary," \
"avg(salary) as avg_salary," \
"sum(bonus) as sum_bonus," \
"max(bonus) as max_bonus" \
" from EMP "  \
" group by department having sum_bonus >= 50000"
spark.sql(sql_str).show()


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

Example 1 - PySpark groupby agg
+----------+-----+
|department|count|
+----------+-----+
|Sales     |3    |
|Financ

In [53]:

from pyspark.sql import SparkSession
#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Create DataFrame df1 with columns name, and id
data = [("James",34), ("Michael",56), \
        ("Robert",30), ("Maria",24) ]

df1 = spark.createDataFrame(data = data, schema=["name","id"])
df1.printSchema()

# Create DataFrame df2 with columns name and id
data2=[(34,"James"),(45,"Maria"), \
       (45,"Jen"),(34,"Jeff")]

df2 = spark.createDataFrame(data = data2, schema = ["id","name"])
df2.printSchema()

# Using unionByName()
print("Using unionByName()")
df3 = df1.unionByName(df2)
#df3.printSchema()
df3.show()

# Using allowMissingColumns
print("Using allowMissingColumns")
df1 = spark.createDataFrame([[5, 2, 6]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[6, 7, 3]], ["col1", "col2", "col3"])
df3 = df1.unionByName(df2, allowMissingColumns=True)
#df3.printSchema()
df3.show()


root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

Using unionByName()
+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+

Using allowMissingColumns
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+



## select function

In [54]:

import pyspark
from pyspark.sql import SparkSession

#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

print("Select one Column")
df.select("firstname").show()

print("Select multiple Column")
df.select("firstname","lastname").show()

#Using Dataframe object name
print("Using Dataframe object name")
df.select(df.firstname,df.lastname).show()

# Using col function
from pyspark.sql.functions import col
print("Using col function")
df.select(col("firstname"),col("lastname")).show()

data = [(("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

from pyspark.sql.types import StructType,StructField, StringType        
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
     ])

df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns

df2.select("name").show(truncate=False)
df2.select("name.firstname","name.lastname").show(truncate=False)
df2.select("name.*").show(truncate=False)


+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

Select one Column
+---------+
|firstname|
+---------+
|    James|
|  Michael|
|   Robert|
|    Maria|
+---------+

Select multiple Column
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Using Dataframe object name
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

Using col function
+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

root
 |-- name: struct (nul

# JOINS

In [56]:

from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
          .appName('SparkByExamples.com') \
          .getOrCreate()
#EMP DataFrame
print("EMP DataFrame")
empData = [(1,"Smith",10), (2,"Rose",20),
    (3,"Williams",10), (4,"Jones",30)
  ]
empColumns = ["emp_id","name","emp_dept_id"]
empDF = spark.createDataFrame(empData,empColumns)
empDF.show()

#DEPT DataFrame
print("DEPT DataFrame")
deptData = [("Finance",10), ("Marketing",20),
    ("Sales",30),("IT",40)
  ]
deptColumns = ["dept_name","dept_id"]
deptDF=spark.createDataFrame(deptData,deptColumns)  
deptDF.show()

#Address DataFrame
print("ADDRESS DataFrame")
addData=[(1,"1523 Main St","SFO","CA"),
    (2,"3453 Orange St","SFO","NY"),
    (3,"34 Warner St","Jersey","NJ"),
    (4,"221 Cavalier St","Newark","DE"),
    (5,"789 Walnut St","Sandiago","CA")
  ]
addColumns = ["emp_id","addline1","city","state"]
addDF = spark.createDataFrame(addData,addColumns)
addDF.show()

#Join two DataFrames
print("Join two DataFrames- ADD & EMP")
empDF.join(addDF,empDF["emp_id"] == addDF["emp_id"]).show()

#Drop duplicate column
print("Drop duplicate column")
empDF.join(addDF,["emp_id"]).show()

#Join Multiple DataFrames
print("Join Multiple DataFrames-  DEPT, ADD & EMP ")
empDF.join(addDF,["emp_id"]) \
     .join(deptDF,empDF["emp_dept_id"] == deptDF["dept_id"]) \
     .show()

#Using Where for Join Condition
print("Using Where for Join Condition- DEPT, ADD & EMP")
empDF.join(deptDF).where(empDF["emp_dept_id"] == deptDF["dept_id"]) \
    .join(addDF).where(empDF["emp_id"] == addDF["emp_id"]) \
    .show()
    
#SQL
print("SQL JOIN")
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
addDF.createOrReplaceTempView("ADD")

spark.sql("select * from EMP e, DEPT d, ADD a " + \
    "where e.emp_dept_id == d.dept_id and e.emp_id == a.emp_id") \
    .show()


EMP DataFrame
+------+--------+-----------+
|emp_id|    name|emp_dept_id|
+------+--------+-----------+
|     1|   Smith|         10|
|     2|    Rose|         20|
|     3|Williams|         10|
|     4|   Jones|         30|
+------+--------+-----------+

DEPT DataFrame
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

ADDRESS DataFrame
+------+---------------+--------+-----+
|emp_id|       addline1|    city|state|
+------+---------------+--------+-----+
|     1|   1523 Main St|     SFO|   CA|
|     2| 3453 Orange St|     SFO|   NY|
|     3|   34 Warner St|  Jersey|   NJ|
|     4|221 Cavalier St|  Newark|   DE|
|     5|  789 Walnut St|Sandiago|   CA|
+------+---------------+--------+-----+

Join two DataFrames- ADD & EMP




+------+--------+-----------+------+---------------+------+-----+
|emp_id|    name|emp_dept_id|emp_id|       addline1|  city|state|
+------+--------+-----------+------+---------------+------+-----+
|     1|   Smith|         10|     1|   1523 Main St|   SFO|   CA|
|     2|    Rose|         20|     2| 3453 Orange St|   SFO|   NY|
|     3|Williams|         10|     3|   34 Warner St|Jersey|   NJ|
|     4|   Jones|         30|     4|221 Cavalier St|Newark|   DE|
+------+--------+-----------+------+---------------+------+-----+

Drop duplicate column
+------+--------+-----------+---------------+------+-----+
|emp_id|    name|emp_dept_id|       addline1|  city|state|
+------+--------+-----------+---------------+------+-----+
|     1|   Smith|         10|   1523 Main St|   SFO|   CA|
|     2|    Rose|         20| 3453 Orange St|   SFO|   NY|
|     3|Williams|         10|   34 Warner St|Jersey|   NJ|
|     4|   Jones|         30|221 Cavalier St|Newark|   DE|
+------+--------+-----------+-------

                                                                                

+------+--------+-----------+---------------+------+-----+---------+-------+
|emp_id|    name|emp_dept_id|       addline1|  city|state|dept_name|dept_id|
+------+--------+-----------+---------------+------+-----+---------+-------+
|     3|Williams|         10|   34 Warner St|Jersey|   NJ|  Finance|     10|
|     1|   Smith|         10|   1523 Main St|   SFO|   CA|  Finance|     10|
|     2|    Rose|         20| 3453 Orange St|   SFO|   NY|Marketing|     20|
|     4|   Jones|         30|221 Cavalier St|Newark|   DE|    Sales|     30|
+------+--------+-----------+---------------+------+-----+---------+-------+

Using Where for Join Condition- DEPT, ADD & EMP


[Stage 388:>                                                        (0 + 4) / 4]                                                                                

+------+--------+-----------+---------+-------+------+---------------+------+-----+
|emp_id|    name|emp_dept_id|dept_name|dept_id|emp_id|       addline1|  city|state|
+------+--------+-----------+---------+-------+------+---------------+------+-----+
|     1|   Smith|         10|  Finance|     10|     1|   1523 Main St|   SFO|   CA|
|     2|    Rose|         20|Marketing|     20|     2| 3453 Orange St|   SFO|   NY|
|     3|Williams|         10|  Finance|     10|     3|   34 Warner St|Jersey|   NJ|
|     4|   Jones|         30|    Sales|     30|     4|221 Cavalier St|Newark|   DE|
+------+--------+-----------+---------+-------+------+---------------+------+-----+

SQL JOIN




+------+--------+-----------+---------+-------+------+---------------+------+-----+
|emp_id|    name|emp_dept_id|dept_name|dept_id|emp_id|       addline1|  city|state|
+------+--------+-----------+---------+-------+------+---------------+------+-----+
|     1|   Smith|         10|  Finance|     10|     1|   1523 Main St|   SFO|   CA|
|     2|    Rose|         20|Marketing|     20|     2| 3453 Orange St|   SFO|   NY|
|     3|Williams|         10|  Finance|     10|     3|   34 Warner St|Jersey|   NJ|
|     4|   Jones|         30|    Sales|     30|     4|221 Cavalier St|Newark|   DE|
+------+--------+-----------+---------+-------+------+---------------+------+-----+



                                                                                

# TYPES of JOIN

In [57]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
print("Emp DF")
empDF.show(truncate=False)


dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
print("Dept DF")
deptDF.show(truncate=False)

print("---------INNER JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

print("---------OUTER JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer") \
    .show(truncate=False)

print("---------FULL JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full") \
    .show(truncate=False)

print("---------FULL_OUTER JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)
    
print("---------LEFT JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left") \
    .show(truncate=False)

print("---------LEFT_OUTER JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter") \
   .show(truncate=False)

print("---------RIGHT JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right") \
   .show(truncate=False)

print("---------RIGHT_OUTER JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter") \
   .show(truncate=False)

print("---------LEFT_SEMI JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)

print("---------LEFT_ANTI JOIN------------")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti") \
   .show(truncate=False)
   
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")
print('XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
print("---------SQL JOIN------------")
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

Emp DF
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+-----



+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

---------OUTER JOIN------------


                                                                                

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

---------

[Stage 417:>                                                        (0 + 4) / 4]                                                                                

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

---------



+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

---------LEFT_OUTER JOIN------------




+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

---------RIGHT JOIN------------
+------+--------+---------------+-----------+-----------+------+---



+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



                                                                                

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+

XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
---------SQL JOIN------------
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  

## Pivot and Unpivot

In [58]:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
print("Fruits DF")
df.show(truncate=False)

pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
print("Pivoted Fruits DF")
pivotDF.show(truncate=False)

pivotDF = df.groupBy("Product","Country") \
      .sum("Amount") \
      .groupBy("Product") \
      .pivot("Country") \
      .sum("sum(Amount)")
pivotDF.printSchema()
print("Pivoted GroupBy Fruits DF")
pivotDF.show(truncate=False)

""" unpivot """

unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"
unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \
    .where("Total is not null")
print("UnPivot")
unPivotDF.show(truncate=False)


root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

Fruits DF
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

Pivoted Fruits DF
+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----

## fill & fillNA

In [59]:

from pyspark.sql import SparkSession


filePath="rawdata/small_zipcode.csv"
df = spark.read.options(header='true', inferSchema='true') \
          .csv(filePath)

df.printSchema()
print("RAW data")
df.show(truncate=False)


print("fillna data")
df.fillna(value=0).show()
print("fillna subset population data")
df.fillna(value=0,subset=["population"]).show()
df.na.fill(value=0).show()
df.na.fill(value=0,subset=["population"]).show()

print("fillna '__' data")
df.fillna(value="").show()
print("na.fill '__' data")
df.na.fill(value="").show()

df.fillna("unknown",["city"]) \
    .fillna("",["type"]).show()

df.fillna({"city": "unknown", "type": ""}) \
    .show()

df.na.fill("unknown",["city"]) \
    .na.fill("",["type"]).show()

df.na.fill({"city": "unknown", "type": ""}) \
    .show()


root
 |-- id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)

RAW data
+---+-------+--------+-------------------+-----+----------+
|id |zipcode|type    |city               |state|population|
+---+-------+--------+-------------------+-----+----------+
|1  |704    |STANDARD|null               |PR   |30100     |
|2  |704    |null    |PASEO COSTA DEL SUR|PR   |null      |
|3  |709    |null    |BDA SAN LUIS       |PR   |3700      |
|4  |76166  |UNIQUE  |CINGULAR WIRELESS  |TX   |84000     |
|5  |76177  |STANDARD|null               |TX   |null      |
+---+-------+--------+-------------------+-----+----------+

fillna data
+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|   

## sample example 

In [63]:
df = spark.range(start=1, end=101)
df1 = df.sample(fraction=0.1, seed=123).show()
df2 = df.sample(fraction=0.1, seed=123).show()

+---+
| id|
+---+
| 45|
| 61|
| 65|
| 66|
| 67|
| 77|
| 80|
| 84|
| 94|
|100|
+---+

+---+
| id|
+---+
| 45|
| 61|
| 65|
| 66|
| 67|
| 77|
| 80|
| 84|
| 94|
|100|
+---+



## collect()

In [65]:

import pyspark
from pyspark.sql import SparkSession

#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
print("----------------------------DEPT data")
deptDF.show(truncate=False)

dataCollect = deptDF.collect()

print("----------------------------DEPT data collect")
print(dataCollect)
dataCollect2 = deptDF.select("dept_name").collect()
print("----------------------------DEPT data collect2")
print(dataCollect2)

print("----------------------------DEPT datacollect iterate over each row")
for row in dataCollect:
    print(row['dept_name'] + "," +str(row['dept_id']))


root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

----------------------------DEPT data
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

----------------------------DEPT data collect
[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]
----------------------------DEPT data collect2
[Row(dept_name='Finance'), Row(dept_name='Marketing'), Row(dept_name='Sales'), Row(dept_name='IT')]
----------------------------DEPT datacollect iterate over each row
Finance,10
Marketing,20
Sales,30
IT,40


## transform function

In [67]:
# Prepare Data
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

# Create DataFrame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
print('Raw DATA')
df.show(truncate=False)

# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

# transform() usage
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount) 

print("Customized Transformations---> UpperCase, ReducePrice and Discount")
df2.show()

# Create DataFrame with Array
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
print('Raw data for SQL transform')
df.show()

# using transform() SQL function
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
print("using transform() SQL lambda function")
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")) \
  .show()

root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

Raw DATA
+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+

Customized Transformations---> UpperCase, ReducePrice and Discount
+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      JAVA|4000|       5|   3000|        2850.0|
|    PYTHON|4600|      10|   3600|        3240.0|
|     SCALA|4100|      15|   3100|        2635.0|
|     SCALA|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (c

# date functions

In [68]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create SparkSession
spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



In [69]:

#to_date()
print("to_date function")
df.select(col("input"), 
    to_date(col("input"), "yyy-MM-dd").alias("to_date") 
  ).show()

#datediff()
print("date_diff function")
df.select(col("input"), 
    datediff(current_date(),col("input")).alias("datediff")  
  ).show()

print("date month between function")
#months_between()
df.select(col("input"), 
    months_between(current_date(),col("input")).alias("months_between")  
  ).show()

print("date trunc function")

#trunc()
df.select(col("input"), 
    trunc(col("input"),"Month").alias("Month_Trunc"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()

print("add_months , date_add, date_sub function")

#add_months() , date_add(), date_sub()
df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()


print("year, month, month, next_day, weekofyear function")

df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Sunday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show()


print("new date data")

data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

print("current_timestamp function")

#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

print("to_timestamp function")

#to_timestamp()
df2.select(col("input"), 
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
  ).show(truncate=False)

print("hour, Minute and second")

#hour, minute,second
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

df3.select(col("input"), 
    hour(col("input")).alias("hour"), 
    minute(col("input")).alias("minute"),
    second(col("input")).alias("second") 
  ).show(truncate=False)


print("THE END")


to_date function
+----------+----------+
|     input|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+

date_diff function
+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|    1249|
|2019-03-01|    1586|
|2021-03-01|     855|
+----------+--------+

date month between function
+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   41.09677419|
|2019-03-01|   52.09677419|
|2021-03-01|   28.09677419|
+----------+--------------+

date trunc function
+----------+-----------+----------+-----------+
|     input|Month_Trunc|Month_Year|Month_Trunc|
+----------+-----------+----------+-----------+
|2020-02-01| 2020-02-01|2020-01-01| 2020-02-01|
|2019-03-01| 2019-03-01|2019-01-01| 2019-03-01|
|2021-03-01| 2021-03-01|2021-01-01| 2021-03-01|
+----------+-----------+----------+-----------+

add_months , date_add, date_sub function
+----------+-----

## User Defined Functions

In [72]:

# Import
from pyspark.sql import SparkSession

# Create SparkSession
#spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Prepare Data
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

# Create DataFrame
df = spark.createDataFrame(data=data,schema=columns)
print("Raw Data")
df.show()

# imports
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

# create pandas_udf
@pandas_udf(StringType())
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

# Using UDF with select()
print("Using UDF with select")
df.select("Seqno","Name",to_upper("Name")).show()

# Using UDF with withColumn()
print("Using UDF with withColumn")
df.withColumn("upper_col",to_upper("Name")).show()


Raw Data
+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+

Using UDF with select


                                                                                

+-----+------------+--------------+
|Seqno|        Name|to_upper(Name)|
+-----+------------+--------------+
|    1|  john jones|    JOHN JONES|
|    2|tracey smith|  TRACEY SMITH|
|    3| amy sanders|   AMY SANDERS|
+-----+------------+--------------+

Using UDF with withColumn
+-----+------------+------------+
|Seqno|        Name|   upper_col|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+

