In [6]:
import pyspark

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/

# 1. PySpark Read CSV File into DataFrame

In [8]:
val spark = SparkSession.builder().master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
df = spark.read.csv("/tmp/resources/zipcodes.csv")
df.printSchema()

SyntaxError: invalid syntax (<ipython-input-8-3acdd7f77853>, line 1)

In [None]:
df = spark.read.format("csv")
                  .load("enter your path/tmp/resources/zipcodes.csv")
df.printSchema()

In [None]:
df = spark.read.format("org.apache.spark.sql.csv")
                  .load("/tmp/resources/zipcodes.csv")
df.printSchema()

#you can also specify the Data sources by their fully qualified name, 
#but for built-in sources, you can simply use their short names (csv,json, parquet, jdbc, text e.t.c). 

# 2. Options While Reading CSV File

PySpark CSV dataset provides multiple options to work with CSV files

1. option(self, key, value)
2. options(self, **options)

Note - inferschema (2.2) me example dekho smjhne ke liye

In [None]:
#Read Multiple CSV Files

Using the read.csv() method you can also read multiple csv files, 
just pass all file names by separating comma as a path, for example : 

In [None]:
df = spark.read.csv("path1,path2,path3")

2.1 delimiter

delimiter option is used to specify the column delimiter of the CSV file. By default, it is comma (,) character, but can be set to any character like pipe(|), tab (\t), space using this option.



In [None]:
df3 = spark.read.options(delimiter=',') \
  .csv("C:/apps/sparkbyexamples/src/pyspark-examples/resources/zipcodes.csv")


2.2 inferSchema

The default value set to this option is False when setting to true it automatically infers (parinam nikalna) column types based on the data. 

Note:-  it requires reading the data one more time to infer the schema.

In [None]:
df4 = spark.read.options(inferSchema='True',delimiter=',') \
  .csv("src/main/resources/zipcodes.csv")

or

In [None]:
df4 = spark.read.option("inferSchema",True) \
                .option("delimiter",",") \
  .csv("src/main/resources/zipcodes.csv")

2.3 header

This option is used to read the first line of the CSV file as column names. By default the value of this option is False , and all column types are assumed to be a string.

In [None]:
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("/tmp/resources/zipcodes.csv")

2.4 quotes

When you have a column with a delimiter that used to split the columns, use quotes option to specify the quote character, by default it is ” and delimiters inside quotes are ignored. but using this option you can set any character.

2.5 nullValues

Using nullValues option you can specify the string in a CSV to consider as null. For example, if you want to consider a date column with a value "1900-01-01" set null on DataFrame.

# 3. Reading CSV files with a user-specified custom schema

In [None]:
agar tum inferschema use nhi karna chahte ho. 
then use user-defined custom coulmn names and type using schema options.

In [None]:
schema = StructType() \     #yha par maine apna schema bna liya hai. now ab isko mai use kar sakta hun df ko read karte samay.
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("/tmp/resources/zipcodes.csv")

# 4. Write PySpark DataFrame to CSV file

In [None]:
df.write.option("header",True) \
  .csv("/tmp/spark_output/zipcodes")

4.1 Options

While writing a CSV file you can use several options. 

for example:-

header :- to output the DataFrame column names as header record.

delimiter :- to specify the delimiter on the CSV output file.

In [None]:
df2.write.options(header='True', delimiter=',') \
   .csv("/tmp/spark_output/zipcodes")

4.2 Saving modes

PySpark DataFrameWriter also has a method 'mode()' to specify saving mode.

overwrite – mode is used to overwrite the existing file.

append – To add the data to the existing file.

ignore – Ignores write operation when the file already exists.

error – This is a default option when the file already exists, it returns an error.

In [None]:
df2.write.format("csv").mode('overwrite').save("/tmp/spark_output/zipcodes")

# PySpark – Create DataFrame

In order to create a DataFrame from a list 
we need the data hence, 
first, let’s create the "data" and the "columns" that are needed.

In [4]:
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

1. Create DataFrame from RDD

Create PySpark DataFrame is from an existing RDD. 

First, let’s create a Spark RDD from a collection List by calling parallelize() function from SparkContext . 

We would need this rdd object for all our examples below.

In [5]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)

1.1 Using "toDF()" function

-PySpark RDD’s toDF() method is used to create a DataFrame from existing RDD. 

-Since RDD doesn’t have columns, 
the DataFrame is created with default column names :-

“_1” 

and “_2” 

as we have two columns.

In [7]:
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



If you wanted to provide column names to the DataFrame use 

---toDF() method with column names as arguments as shown below.

In [8]:
columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



1.2 Using "createDataFrame()" from SparkSession

In [10]:
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



2.3 Create DataFrame with schema


StructType schema==To specify the column names along with their data types, 
you should create the StructType schema first and then 
assign this while creating a DataFrame.

In [14]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \   #To specify the column names with their data types
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



3. Create DataFrame from Data sources

Create DataFrame from data source files like CSV, Text, JSON, XML e.t.c.

3.1 Creating DataFrame from CSV


In [None]:
df2 = spark.read.csv("/src/resources/file.csv")

3.2. Creating from text (TXT) file

In [None]:
df2 = spark.read.text("/src/resources/file.txt")

3.3. Creating from JSON file

In [None]:
df2 = spark.read.json("/src/resources/file.json")

# HANDS ON PARQUET FILE

create a Pyspark DataFrame from a list of data using 

spark.createDataFrame() method.

In [16]:
data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df=spark.createDataFrame(data,columns)

In [None]:
/Users/ankitnayan/Downloads

Pyspark Write DataFrame to Parquet file format(ye automatically file bna dega parquet file)

In [18]:
df.write.parquet("/Users/ankitnayan/Downloads/sample1.parquet")

Pyspark Read Parquet file into DataFrame

In [19]:
parDF=spark.read.parquet("/Users/ankitnayan/Downloads/sample1.parquet")

Append or Overwrite an existing Parquet file

In [22]:
df.write.mode('append').parquet("/Users/ankitnayan/Downloads/sample1.parquet")
df.write.mode('overwrite').parquet("/Users/ankitnayan/Downloads/sample1.parquet")

In [24]:
parDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

In [25]:
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/tmp/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   Maria |      Anne|   Jones|39192|     F|  4000|
|  Robert |          |Williams|42114|     M|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [26]:
df.write.partitionBy("gender","salary").mode("overwrite").parquet("/Users/ankitnayan/Downloads/sample1.parquet")

# PySpark Select Columns From DataFrame

In [27]:
import pyspark
from pyspark.sql import SparkSession

In [34]:
# create a Dataframe.
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



1. Select Single & Multiple Columns From PySpark

select the single or multiple columns of the DataFrame by passing the column names you wanted to select to the select() function

show() function is used to show the Dataframe contents.

In [35]:
df.select("firstname","lastname").show()

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+



In [36]:
df.select(df.firstname,df.lastname).show()
df.select(df["firstname"],df["lastname"]).show()

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+



In [37]:
#By using col() function
from pyspark.sql.functions import col
df.select(col("firstname"),col("lastname")).show()


+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+



2. Select All Columns From List

In [38]:
# Select All columns from List
df.select(*columns).show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [39]:
# Select All columns
df.select([col for col in df.columns]).show()
df.select("*").show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



3. Select Columns by Index

In [40]:
#Selects first 3 columns and top 3 rows
df.select(df.columns[:3]).show(3)

#Selects columns 2 to 4  and top 3 rows
df.select(df.columns[2:4]).show(3)

+---------+--------+-------+
|firstname|lastname|country|
+---------+--------+-------+
|    James|   Smith|    USA|
|  Michael|    Rose|    USA|
|   Robert|Williams|    USA|
+---------+--------+-------+
only showing top 3 rows

+-------+-----+
|country|state|
+-------+-----+
|    USA|   CA|
|    USA|   NY|
|    USA|   CA|
+-------+-----+
only showing top 3 rows



4. Select Nested Struct Columns from PySpark

In [41]:
data = [
        (("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

from pyspark.sql.types import StructType,StructField, StringType        
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True)
     ])
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+



In [44]:
df2.select("name").show(truncate=False) #let’s select struct column.

+----------------------+
|name                  |
+----------------------+
|{James, null, Smith}  |
|{Anna, Rose, }        |
|{Julia, , Williams}   |
|{Maria, Anne, Jones}  |
|{Jen, Mary, Brown}    |
|{Mike, Mary, Williams}|
+----------------------+



In [45]:
#to select the specific column from a nested struct, 
#you need to explicitly qualify the nested struct column name.
df2.select("name.firstname","name.lastname").show(truncate=False)

+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+



In [46]:
df2.select("name.*").show(truncate=False) #to get all columns from struct column.

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+



# PySpark withColumn() 

In [48]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.createDataFrame(data=data, schema = columns)

1. Change DataType using PySpark withColumn()

In [49]:
df.withColumn("salary",col("salary").cast("Integer")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



2. Update The Value of an Existing Column

In [50]:
df.withColumn("salary",col("salary")*100).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+



3. Create a Column from an Existing

In [51]:
df.withColumn("CopiedColumn",col("salary")* -1).show()
#creates a new column “CopiedColumn” by multiplying “salary” column with value -1.

+---------+----------+--------+----------+------+------+------------+
|firstname|middlename|lastname|       dob|gender|salary|CopiedColumn|
+---------+----------+--------+----------+------+------+------------+
|    James|          |   Smith|1991-04-01|     M|  3000|       -3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|       -4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|       -4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|       -4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|           1|
+---------+----------+--------+----------+------+------+------------+



4. Add a New Column using withColumn()

In [53]:
from pyspark.sql.functions import col,lit

In [54]:
#PySpark "lit()"" function is used to add a constant value to a DataFrame column. 
df.withColumn("Country", lit("USA")).show()
df.withColumn("Country", lit("USA")) \
  .withColumn("anotherColumn",lit("anotherValue")) \
  .show()

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA|
|   Robert|          |Williams|1978-09-05|     M|  4000|    USA|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    USA|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    USA|
+---------+----------+--------+----------+------+------+-------+

+---------+----------+--------+----------+------+------+-------+-------------+
|firstname|middlename|lastname|       dob|gender|salary|Country|anotherColumn|
+---------+----------+--------+----------+------+------+-------+-------------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA| anotherValue|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA| anotherValue|
|   Robert|        

5. Rename Column Name

To rename an existing column use "withColumnRenamed()"" function on DataFrame.

In [57]:
df.withColumnRenamed("gender","sex") \
  .show(truncate=False) 

+---------+----------+--------+----------+---+------+
|firstname|middlename|lastname|dob       |sex|salary|
+---------+----------+--------+----------+---+------+
|James    |          |Smith   |1991-04-01|M  |3000  |
|Michael  |Rose      |        |2000-05-19|M  |4000  |
|Robert   |          |Williams|1978-09-05|M  |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F  |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F  |-1    |
+---------+----------+--------+----------+---+------+



6. Drop Column From PySpark DataFrame

Use “drop” function to drop a specific column from the DataFrame.

In [58]:
df.drop("salary").show() 

+---------+----------+--------+----------+------+
|firstname|middlename|lastname|       dob|gender|
+---------+----------+--------+----------+------+
|    James|          |   Smith|1991-04-01|     M|
|  Michael|      Rose|        |2000-05-19|     M|
|   Robert|          |Williams|1978-09-05|     M|
|    Maria|      Anne|   Jones|1967-12-01|     F|
|      Jen|      Mary|   Brown|1980-02-17|     F|
+---------+----------+--------+----------+------+



# PySpark "withColumnRenamed" to Rename Column on DataFrame

In [None]:
# data

In [59]:
dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

In [None]:
# schema

In [68]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('gender', IntegerType(), True)
         ])

In [69]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.createDataFrame(data = dataDF, schema = schema)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



1. PySpark withColumnRenamed – To rename DataFrame column name

 PySpark withColumnRenamed() Syntax:


""withColumnRenamed(existingName, newNam)""

 existingName – The existing column name you want to change
 
 newName – New name of the column

In [71]:
df.withColumnRenamed("dob","DateOfBirth").printSchema()# Returns a new DataFrame with a column renamed.
# above statement changes column “dob” to “DateOfBirth”

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



2. PySpark withColumnRenamed – To rename multiple columns

In [72]:
df2 = df.withColumnRenamed("dob","DateOfBirth") \
        .withColumnRenamed("salary","salary_amount")
df2.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



3. Using PySpark StructType – To rename a nested column in Dataframe

In [66]:
# Changing a column name on nested data is not straight forward
# 1.by creating a new schema
# 2.use it using cast function 

In [64]:
schema2 = StructType([
    StructField("fname",StringType()),
    StructField("middlename",StringType()),
    StructField("lname",StringType())])

In [None]:
df.select(col("name").cast(schema2), \ #name me hi change mara hai iske liye, cast me upar wala schema2 daal diya hai
     col("dob"), col("gender"),col("salary")) \ # dob,gender,salary me koi change nhi hai
   .printSchema()  
# This statement renames firstname to fname and lastname 
# to lname within name structure.

4. Using Select – To rename nested elements.

In [None]:
from pyspark.sql.functions import *
df.select(col("name.firstname").alias("fname"), \
  col("name.middlename").alias("mname"), \
  col("name.lastname").alias("lname"), \
  col("dob"),col("gender"),col("salary")) \
  .printSchema()

5. Using PySpark DataFrame withColumn – To rename nested columns

In [75]:
from pyspark.sql.functions import *
df4 = df.withColumn("fname",col("name.firstname")) \
      .withColumn("mname",col("name.middlename")) \
      .withColumn("lname",col("name.lastname")) \
      .drop("name")
df4.printSchema()

root
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)



# PySpark Where Filter Function

In [76]:
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



2. DataFrame filter() with Column Condition

In [77]:
# Using equals condition
df.filter(df.state == "OH").show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [78]:
# not equals condition
df.filter(df.state != "OH").show(truncate=False) 
df.filter(~(df.state == "OH")).show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, VB]      |NY   |M     |
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, VB]      |NY   |M     |
+--------------------+------------------+-----+------+



5. Filter Based on List Values

In [79]:
#Filter IS IN List values
li=["OH","CA","DE"]
df.filter(df.state.isin(li)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [80]:
# Filter NOT IS IN List values
#These show all records with NY (NY is not part of the list)
df.filter(~df.state.isin(li)).show()
df.filter(df.state.isin(li)==False).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [None]:
# is null or null

In [81]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

data = [
    ("James",None,"M"),
    ("Anna","NY","F"),
    ("Julia",None,None)
  ]

columns = ["name","state","gender"]
df = spark.createDataFrame(data,columns)
df.show()

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
| Anna|   NY|     F|
|Julia| null|  null|
+-----+-----+------+



In [82]:
df.filter("state is NULL").show()
df.filter(df.state.isNull()).show()
df.filter(col("state").isNull()).show()

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
|Julia| null|  null|
+-----+-----+------+

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
|Julia| null|  null|
+-----+-----+------+

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
|Julia| null|  null|
+-----+-----+------+



In [None]:
# Filter Rows with IS NOT NULL or isNotNull

#isNotNull() is used to filter rows that are NOT NULL in DataFrame 
#columns.

In [83]:
from pyspark.sql.functions import col
df.filter("state IS NOT NULL").show()
df.filter(df.state.isNotNull()).show()
df.filter(col("state").isNotNull()).show()

+----+-----+------+
|name|state|gender|
+----+-----+------+
|Anna|   NY|     F|
+----+-----+------+

+----+-----+------+
|name|state|gender|
+----+-----+------+
|Anna|   NY|     F|
+----+-----+------+

+----+-----+------+
|name|state|gender|
+----+-----+------+
|Anna|   NY|     F|
+----+-----+------+



In [84]:
# PySpark SQL Filter Rows with NULL Values

In [85]:
df.createOrReplaceTempView("DATA")
spark.sql("SELECT * FROM DATA where STATE IS NULL").show()
spark.sql("SELECT * FROM DATA where STATE IS NULL AND GENDER IS NULL").show()
spark.sql("SELECT * FROM DATA where STATE IS NOT NULL").show()

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
|Julia| null|  null|
+-----+-----+------+

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|Julia| null|  null|
+-----+-----+------+

+----+-----+------+
|name|state|gender|
+----+-----+------+
|Anna|   NY|     F|
+----+-----+------+



In [None]:
# Similar to SQL GROUP BY clause, 
# PySpark groupBy() function is used to collect the identical data into groups on 
# DataFrame and perform aggregate functions on the grouped data. 

we perform groupBy() on PySpark Dataframe, it returns GroupedData object which contains below aggregate functions.

1.count() - Returns the count of rows for each group.

2.mean() - Returns the mean of values for each group.

3.max() - Returns the maximum of values for each group.

4.min() - Returns the minimum of values for each group.

5.sum() - Returns the total for values for each group.

6.avg() - Returns the average for values for each group.

In [86]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



PySpark groupBy and aggregate on DataFrame columns

In [87]:
df.groupBy("department").sum("salary").show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



In [88]:
df.groupBy("department").count()

DataFrame[department: string, count: bigint]

In [89]:
df.groupBy("department").min("salary")

DataFrame[department: string, min(salary): bigint]

In [90]:
df.groupBy("department").avg( "salary")

DataFrame[department: string, avg(salary): double]

PySpark groupBy and aggregate on multiple columns

In [94]:
#//GroupBy on multiple columns
df.groupBy("department","state") \
    .sum("salary","bonus") \
    .show(truncate=False)

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Sales     |CA   |81000      |23000     |
|Marketing |CA   |80000      |18000     |
|Finance   |CA   |189000     |47000     |
|Sales     |NY   |176000     |30000     |
+----------+-----+-----------+----------+



# PySpark Join Types | Join Two DataFrames

In [None]:
1. PySpark Join Syntax

join(self, other, on=None, how=None)

In [None]:
Join String	Equivalent SQL Join
inner	INNER JOIN
outer, full, fullouter, full_outer	FULL OUTER JOIN
left, leftouter, left_outer	LEFT JOIN
right, rightouter, right_outer	RIGHT JOIN
cross	
anti, leftanti, left_anti	
semi, leftsemi, left_semi	

In [95]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

3. PySpark Inner Join DataFrame

In [96]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



4. PySpark Full Outer Join

In [97]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+-

5. PySpark Left Outer Join

In [106]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|n

In [None]:
6.PySpark right Outer Join

In [108]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right") \
    .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [None]:
7. Left Semi Join

In [109]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



In [None]:
8. Left Anti Join

In [110]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti") \
   .show(truncate=False)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



In [None]:
9. PySpark Self Join

In [111]:
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+



In [None]:
# Using SQL Expression

In [112]:
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

In [None]:
# PySpark SQL Join on multiple DataFrames

In [None]:
df1.join(df2,df1.id1 == df2.id2,"inner") \
   .join(df3,df1.id1 == df3.id3,"inner")

# PySpark UDF Introduction

UDF’s a.k.a User Defined Functions

PySpark UDF’s are similar to UDF on traditional databases.

In PySpark, you create a function in a Python syntax and wrap it with PySpark SQL "udf()" or register it as udf and use it on DataFrame and SQL respectively.

Create PySpark UDF

2.1 Create a DataFrame

In [115]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



2.2 Create a Python Function

In [117]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

2.3 Convert a Python function to PySpark UDF

In [118]:
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())

In [119]:
""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z)) 

3. Using UDF with DataFrame

3 .1 Using UDF with PySpark DataFrame select()

In [120]:
df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



3.2 Using UDF with PySpark DataFrame withColumn()

In [121]:
def upperCase(str):
    return str.upper()

In [122]:
upperCaseUDF = udf(lambda z:upperCase(z),StringType())   

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



# PySpark When Otherwise | SQL Case When Usage

PySpark When Otherwise – when() is a SQL function that returns a Column type and otherwise() is a function of Column, if otherwise() is not used, it returns a None/NULL value.

PySpark SQL Case When – This is similar to SQL expression, Usage: CASE WHEN cond1 THEN result WHEN cond2 THEN result... ELSE result END.

In [123]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]

columns = ["name","gender","salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.show()

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M| 60000|
|Michael|     M| 70000|
| Robert|  null|400000|
|  Maria|     F|500000|
|    Jen|      |  null|
+-------+------+------+



1. Using when() otherwise() on PySpark DataFrame.

In [None]:
# when(condition).otherwise(default)

when() function take 2 parameters, first param takes a condition and second takes a literal value or Column, if condition evaluates to true then it returns a value from second param.

In [125]:
from pyspark.sql.functions import when 
df2 = df.withColumn("new_gender", when(df.gender == "M","Male")
                                 .when(df.gender == "F","Female")
                                 .when(df.gender.isNull() ,"")
                                 .otherwise(df.gender))
df2.show()

+-------+------+------+----------+
|   name|gender|salary|new_gender|
+-------+------+------+----------+
|  James|     M| 60000|      Male|
|Michael|     M| 70000|      Male|
| Robert|  null|400000|          |
|  Maria|     F|500000|    Female|
|    Jen|      |  null|          |
+-------+------+------+----------+



In [126]:
# Using with select()
df2=df.select(col("*"),when(df.gender == "M","Male")
                  .when(df.gender == "F","Female")
                  .when(df.gender.isNull() ,"")
                  .otherwise(df.gender).alias("new_gender"))

2. PySpark SQL Case When on DataFrame.

Syntax of SQL CASE WHEN ELSE END


CASE
    WHEN condition1 THEN result_value1
    WHEN condition2 THEN result_value2
    -----
    -----
    ELSE result
END;

In [None]:
-"CASE" is the start of the expression
-Clause "WHEN" takes a condition, if condition true it returns a value from THEN
-If the condition is false it goes to the next condition and so on.
-If none of the condition matches, it returns a value from the "ELSE" clause.
-END is to end the expression

2.1 Using Case When Else on DataFrame using withColumn() & select()

In [None]:
from pyspark.sql.functions import expr, col

#Using Case When on withColumn()
df3 = df.withColumn("new_gender", expr("CASE WHEN gender = 'M' THEN 'Male' " + 
               "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
               "ELSE gender END"))
df3.show(truncate=False)

In [127]:
#Using Case When on select()
df4 = df.select(col("*"), expr("CASE WHEN gender = 'M' THEN 'Male' " +
           "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
           "ELSE gender END").alias("new_gender"))

2.2 Using Case When on SQL Expression

In [128]:
df.createOrReplaceTempView("EMP")
spark.sql("select name, CASE WHEN gender = 'M' THEN 'Male' " + 
               "WHEN gender = 'F' THEN 'Female' WHEN gender IS NULL THEN ''" +
              "ELSE gender END as new_gender from EMP").show()

+-------+----------+
|   name|new_gender|
+-------+----------+
|  James|      Male|
|Michael|      Male|
| Robert|          |
|  Maria|    Female|
|    Jen|          |
+-------+----------+



2.3. Multiple Conditions using & and | operator

In [None]:
df5.withColumn("new_column", when(col("code") == "a" | col("code") == "d", "A")
      .when(col("code") == "b" & col("amt") == "4", "B")
      .otherwise("A1")).show()

In [None]:
+---+----+---+----------+
| id|code|amt|new_column|
+---+----+---+----------+
| 66|   a|  4|         A|
| 67|   a|  0|         A|
| 70|   b|  4|         B|
| 71|   d|  4|         A|
+---+----+---+----------+

# PySpark Create DataFrame from List

In [131]:
dept = [("Finance",10), 
        ("Marketing",20), 
        ("Sales",30), 
        ("IT",40) 
      ]
dept

[('Finance', 10), ('Marketing', 20), ('Sales', 30), ('IT', 40)]

In [132]:
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [None]:
Now, let’s add a columns using Schema.

In [None]:
from pyspark.sql.types import StructType,StructField, StringType
deptSchema = StructType([       
    StructField('firstname', StringType(), True),
    StructField('middlename', StringType(), True),
    StructField('lastname', StringType(), True)
])

deptDF = spark.createDataFrame(data=dept, schema = deptSchema)
deptDF.printSchema()
deptDF.show(truncate=False)

# PySpark Create DataFrame From Dictionary (Dict)

In [134]:
dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'red','eye':'grey'}),
        ('Jefferson',{'hair':'red','eye':''})
        ]

In [135]:
df = spark.createDataFrame(data=dataDictionary, schema = ["name","properties"])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> red}   |
|Jefferson |{eye -> , hair -> red}       |
+----------+-----------------------------+



Create a DataFrame Dictionary Column Using StructType

PySpark doesn’t have a Dictionary type instead it uses MapType to store the dictionary object.

MapType(StringType(),StringType()) – Here both key and value is a StringType.

In [136]:
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
  StructField('name', StringType(), True),
  StructField('properties', MapType(StringType(),StringType()),True)
])
df2 = spark.createDataFrame(data=dataDictionary, schema = schema)

Extract Values from DataFrame Dictionary Column

In [137]:
df.rdd.map(lambda x: 
    (x.name,x.properties["hair"],x.properties["eye"])
  ).toDF(["hair","eye"]).show()

+----------+-----+-----+
|      hair|  eye|   _3|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington|  red| grey|
| Jefferson|  red|     |
+----------+-----+-----+



PySpark – Cast Column Type With Examples

In [None]:
# 1. Cast Column Type With Example

In [138]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com') \
                    .getOrCreate()

simpleData = [("James",34,"2006-01-01","true","M",3000.60),
    ("Michael",33,"1980-01-10","true","F",3300.80),
    ("Robert",37,"06-01-1992","false","M",5000.50)
  ]

columns = ["firstname","age","jobStartDate","isGraduated","gender","salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.show(truncate=False)

+---------+---+------------+-----------+------+------+
|firstname|age|jobStartDate|isGraduated|gender|salary|
+---------+---+------------+-----------+------+------+
|James    |34 |2006-01-01  |true       |M     |3000.6|
|Michael  |33 |1980-01-10  |true       |F     |3300.8|
|Robert   |37 |06-01-1992  |false      |M     |5000.5|
+---------+---+------------+-----------+------+------+



In [140]:
# Convert String to Integer Type
df.withColumn("age",df.age.cast(IntegerType()))
df.withColumn("age",df.age.cast('int'))
df.withColumn("age",df.age.cast('integer'))

# Using select
#df.select(col("age").cast('int').alias("age"))

#Using selectExpr()
#df.selectExpr("cast(age as int) age")

#Using with spark.sql()
#spark.sql("SELECT INT(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")

DataFrame[firstname: string, age: int, jobStartDate: string, isGraduated: string, gender: string, salary: double]

In [141]:
pip list 

Package                            Version
---------------------------------- -------------------
alabaster                          0.7.12
anaconda-client                    1.7.2
anaconda-navigator                 2.0.3
anaconda-project                   0.9.1
anyio                              2.2.0
appdirs                            1.4.4
applaunchservices                  0.2.1
appnope                            0.1.2
appscript                          1.1.2
argh                               0.26.2
argon2-cffi                        20.1.0
asn1crypto                         1.4.0
astroid                            2.5
astropy                            4.2.1
async-generator                    1.10
atomicwrites                       1.4.0
attrs                              20.3.0
autopep8                           1.5.6
Babel                              2.9.0
backcall                           0.2.0
backports.functools-lru-cache      1.6.4
backports.shutil-

Note: you may need to restart the kernel to use updated packages.


In [142]:
pip freeze >requirement.txt

Note: you may need to restart the kernel to use updated packages.


# PySpark SQL Date and Timestamp Functions

https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

Date and Time are very important if you are using PySpark for ETL. Most of all these functions accept input as, Date type, Timestamp type, or String. 

DateType default format is yyyy-MM-dd 
TimestampType default format is yyyy-MM-dd HH:mm:ss.SSSS
Returns null if the input is a string that can not be cast to Date or Timestamp.

In [144]:
# PySpark SQL Date and Timestamp Functions Examples

In [143]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create SparkSession
spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



In [None]:
# current_date()

In [145]:
#current_date()
df.select(current_date().alias("current_date")).show(1)

+------------+
|current_date|
+------------+
|  2021-09-07|
+------------+
only showing top 1 row



In [146]:
# date_format()

In [147]:
#date_format()
df.select(col("input"), 
    date_format(col("input"), "MM-dd-yyyy").alias("date_format") 
  ).show()

+----------+-----------+
|     input|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+



In [148]:
# to_date()

In [149]:
#to_date()
# Below example converts string in date format yyyy-MM-dd to 
#a DateType yyyy-MM-dd using to_date()
df.select(col("input"), 
    to_date(col("input"), "yyy-MM-dd").alias("to_date") 
  ).show()

+----------+----------+
|     input|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+



In [None]:
# datediff()

In [150]:
#datediff()
# The below example returns the difference between 
#two dates using datediff()
df.select(col("input"), 
    datediff(current_date(),col("input")).alias("datediff")  
  ).show()

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|     584|
|2019-03-01|     921|
|2021-03-01|     190|
+----------+--------+



In [None]:
# months_between()

In [151]:
#months_between()
# The below example returns the months between two dates 
# using months_between().
df.select(col("input"), 
    months_between(current_date(),col("input")).alias("months_between")  
  ).show()

+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   19.19354839|
|2019-03-01|   30.19354839|
|2021-03-01|    6.19354839|
+----------+--------------+



In [152]:
# trunc()

In [None]:
#trunc()
# The below example truncates the date at a specified unit 
#using trunc().
df.select(col("input"), 
    trunc(col("input"),"Month").alias("Month_Trunc"), 
    trunc(col("input"),"Year").alias("Month_Year"), 
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()

In [None]:
# add_months() , date_add(), date_sub()
#  we are adding and subtracting date and month from a given input.

In [153]:
#add_months() , date_add(), date_sub()
df.select(col("input"), 
    add_months(col("input"),3).alias("add_months"), 
    add_months(col("input"),-3).alias("sub_months"), 
    date_add(col("input"),4).alias("date_add"), 
    date_sub(col("input"),4).alias("date_sub") 
  ).show()


+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+



In [None]:
# year(), month(), month(),next_day(), weekofyear()

In [154]:
df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Sunday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show() 

+----------+----+-----+----------+----------+
|     input|year|month|  next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020|    2|2020-02-02|         5|
|2019-03-01|2019|    3|2019-03-03|         9|
|2021-03-01|2021|    3|2021-03-07|         9|
+----------+----+-----+----------+----------+



In [None]:
# dayofweek(), dayofmonth(), dayofyear()

In [155]:
df.select(col("input"),  
     dayofweek(col("input")).alias("dayofweek"), 
     dayofmonth(col("input")).alias("dayofmonth"), 
     dayofyear(col("input")).alias("dayofyear"), 
  ).show()


+----------+---------+----------+---------+
|     input|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01|        7|         1|       32|
|2019-03-01|        6|         1|       60|
|2021-03-01|        2|         1|       60|
+----------+---------+----------+---------+



In [None]:
# current_timestamp()
# Following are the Timestamp Functions that you can use on SQL 
# and on DataFrame

In [156]:
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+



In [None]:
# Below example returns the current timestamp in spark default 
# format yyyy-MM-dd HH:mm:ss

In [157]:
#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)


+--------------------------+
|current_timestamp         |
+--------------------------+
|2021-09-07 04:38:00.292697|
+--------------------------+
only showing top 1 row



In [None]:
# to_timestamp()

In [None]:
# Converts string timestamp to Timestamp type format.

In [158]:
#to_timestamp()
df2.select(col("input"), 
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp") 
  ).show(truncate=False)


+-----------------------+-----------------------+
|input                  |to_timestamp           |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+



# PySpark Concatenate Columns
https://sparkbyexamples.com/pyspark/pyspark-concatenate-columns/

In [None]:
# PySpark Concatenate Using concat()

In [None]:
# concat() function of Pyspark SQL is used to concatenate multiple
# DataFrame columns into a single column.

pyspark.sql.functions.concat(*cols)

In [160]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat,col
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df2=df.select(concat(df.firstname,df.middlename,df.lastname)
              .alias("FullName"),"dob","gender","salary") # concatenated three input string 
# columns(firstname, middlename, lastname) into a single string column(FullName)
df2.show(truncate=False)


+--------------+----------+------+------+
|FullName      |dob       |gender|salary|
+--------------+----------+------+------+
|JamesSmith    |1991-04-01|M     |3000  |
|MichaelRose   |2000-05-19|M     |4000  |
|RobertWilliams|1978-09-05|M     |4000  |
|MariaAnneJones|1967-12-01|F     |4000  |
|JenMaryBrown  |1980-02-17|F     |-1    |
+--------------+----------+------+------+



# PySpark split() Column into Multiple Columns

Syntax:

pyspark.sql.functions.split(str, pattern, limit=-1)

Parameters:

str – a string expression to split

pattern – a string representing a regular expression.

limit –an integer that controls the number of times pattern is applied.

In [163]:
spark=SparkSession.builder.appName("sparkbyexamples").getOrCreate()
data=data = [('James','','Smith','1991-04-01'),
  ('Michael','Rose','','2000-05-19'),
  ('Robert','','Williams','1978-09-05'),
  ('Maria','Anne','Jones','1967-12-01'),
  ('Jen','Mary','Brown','1980-02-17')
]

In [None]:
# Split Column using withColumn()

In [164]:
df1 = df.withColumn('year', split(df['dob'], '-').getItem(0)) \
       .withColumn('month', split(df['dob'], '-').getItem(1)) \
       .withColumn('day', split(df['dob'], '-').getItem(2))
df1.show(truncate=False)

+---------+----------+--------+----------+------+------+----+-----+---+
|firstname|middlename|lastname|dob       |gender|salary|year|month|day|
+---------+----------+--------+----------+------+------+----+-----+---+
|James    |          |Smith   |1991-04-01|M     |3000  |1991|04   |01 |
|Michael  |Rose      |        |2000-05-19|M     |4000  |2000|05   |19 |
|Robert   |          |Williams|1978-09-05|M     |4000  |1978|09   |05 |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |1967|12   |01 |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |1980|02   |17 |
+---------+----------+--------+----------+------+------+----+-----+---+



In [165]:
rdd = spark.sparkContext.parallelize((0,20))
print("From local[5]"+str(rdd.getNumPartitions()))

rdd1 = spark.sparkContext.parallelize((0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

From local[5]8
parallelize : 6


spark.sparkContext.parallelize(Range(0,20),6)

Partition 1 : 0 1 2
Partition 2 : 3 4 5
Partition 3 : 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15
Partition 6 : 16 17 18 19

In [None]:
# 1.1 RDD repartition()

Spark RDD repartition() method is used to increase or decrease the partitions. The below example decreases the partitions from 10 to 4 by moving data from all partitions.

In [167]:
rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("/tmp/re-partition")

Repartition size : 4


Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18

In [None]:
# 1.2 RDD coalesce()

Spark RDD coalesce() is used only to reduce the number of partitions. This is optimized or improved version of repartition() where the movement of the data across the partitions is lower using coalesce.

In [168]:
rdd3 = rdd1.coalesce(4)
print("Repartition size : "+str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("/tmp/coalesce")

Repartition size : 4


Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12 
Partition 5 : 13 14 15 16 17 18 19

# PySpark RDD Benefits
PySpark is widely adapted in Machine learning and Data science community due to it’s advantages compared with traditional python programming.

# In-Memory Processing
PySpark loads the data from disk and process in memory and keeps the data in memory, this is the main difference between PySpark and Mapreduce (I/O intensive). In between the transformations, we can also cache/persists the RDD in memory to reuse the previous computations.

# Immutability
PySpark RDD’s are immutable in nature meaning, once RDDs are created you cannot modify. When we apply transformations on RDD, PySpark creates a new RDD and maintains the RDD Lineage.

# Fault Tolerance
PySpark operates on fault-tolerant data stores on HDFS, S3 e.t.c hence any RDD operation fails, it automatically reloads the data from other partitions. Also, When PySpark applications running on a cluster, PySpark task failures are automatically recovered for a certain number of times (as per the configuration) and finish the application seamlessly.

# Lazy Evolution
PySpark does not evaluate the RDD transformations as they appear/encountered by Driver instead it keeps the all transformations as it encounters(DAG) and evaluates the all transformation when it sees the first RDD action.

# Partitioning
When you create RDD from a data, It by default partitions the elements in a RDD. By default it partitions to the number of cores available.

#PySpark RDD Limitations
PySpark RDDs are not much suitable for applications that make updates to the state store such as storage systems for a web application. For these applications, it is more efficient to use systems that perform traditional update logging and data checkpointing, such as databases. The goal of RDD is to provide an efficient programming model for batch analytics and leave these asynchronous applications.



# RDD Transformations

flatMap – flatMap() transformation flattens the RDD after applying the function and returns a new RDD.

In [None]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))

map – map() transformation is used the apply any complex operations like adding a column, updating a column 

In [None]:
rdd3 = rdd2.map(lambda x: (x,1))

reduceByKey – reduceByKey() merges the values for each key with the function specified. 

In [None]:
rdd5 = rdd3.reduceByKey(lambda a,b: a+b)

sortByKey – sortByKey() transformation is used to sort RDD elements on key. 

In [None]:
rdd6 = rdd5.map(lambda x: (x[1],x[0])).sortByKey()
#Print rdd6 result to console
print(rdd6.collect())

filter – filter() transformation is used to filter the records in an RDD. In our example we are filtering all words starts with “a”.

In [None]:
rdd4 = rdd3.filter(lambda x : 'an' in x[1])
print(rdd4.collect())

In [169]:
# Convert PySpark RDD to DataFrame

In [None]:
1. Create PySpark RDD

In [170]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)

In [None]:
2. Convert PySpark RDD to DataFrame

Converting PySpark RDD to DataFrame can be done using 

# 1.toDF(), 

# 2.createDataFrame(). 

https://sparkbyexamples.com/pyspark/convert-pyspark-rdd-to-dataframe/

2.1 Using rdd.toDF() function
PySpark provides toDF() function in RDD which can be used to convert RDD into Dataframe

In [171]:
df = rdd.toDF()
df.printSchema()
df.show(truncate=False)

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+



toDF() has another signature that takes arguments to define column names as shown below.

In [172]:
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



# 2.2 Using PySpark createDataFrame() function

SparkSession class provides createDataFrame() method to create DataFrame and it takes rdd object as an argument.

In [174]:
deptDF = spark.createDataFrame(rdd, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [175]:
# 2.3 Using createDataFrame() with StructType schema

In [176]:
from pyspark.sql.types import StructType,StructField, StringType
deptSchema = StructType([       
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: string (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [None]:
# 1. Window Functions

PySpark Window functions operate on a group of rows (like frame, partition) and return a single value for every input row. 

PySpark SQL supports three kinds of window functions:-

ranking functions

analytic functions

aggregate functions

In [2]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
# 2. PySpark Window Ranking functions

# row_number() 
window function is used to give the sequential row number starting from 1 to the result of each window partition.

In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



# rank Window Function

rank() window function is used to provide a rank to the result within a window partition. This function leaves gaps in rank when there are ties.

In [5]:
"""rank"""
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+



# dense_rank Window Function


dense_rank() window function is used to get the result with rank of rows within a window partition without any gaps. 

This is similar to rank() function difference being rank function leaves gaps in rank when there are ties.

2.4 percent_rank Window Function

In [8]:
""" percent_rank """
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
+-------------+----------+------+------------+



2.5 ntile Window Function

ntile() window function returns the relative rank of result rows within a window partition. In below example we have used 2 as an argument to ntile hence it returns ranking between 2 values (1 and 2)

In [10]:
"""ntile"""
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
+-------------+----------+------+-----+



4. PySpark Window Aggregate Functions

In this section, I will explain how to calculate sum, min, max for each department using PySpark SQL Aggregate window functions and WindowSpec. When working with Aggregate functions, we don’t need to use order by clause.

In [11]:
windowSpecAgg  = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number 
df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+



In [None]:
ask row number 
pehle where, phir select kaise sequence pta chalega


In [14]:
order_payment_orders_join=df_order.select('order_id','order_purchase_timestamp')\
.join(df_order_payments.select('order_id'),on='order_id',how='inner')

NameError: name 'df_order' is not defined

In [None]:
seller_orderreview_join=df_items.select('order_id','product_id','seller_id')\
.join(df_order_reviews.select('review_score','order_id'),on='order_id',how='inner')
.join(df_sellers.select('seller_id'),on='seller_id',how='inner')

In [None]:
# Trim

In [14]:
from pyspark.sql import SparkSession

In [15]:
val mySpark = SparkSession\
  .builder()\
  .appName("Spark SQL basic example")\
  .config("spark.some.config.option", "some-value")\
  .getOrCreate()\

// For implicit conversions like converting RDDs to DataFrames
import mySpark.implicits._

SyntaxError: invalid syntax (<ipython-input-15-aa5c07896ec8>, line 1)

In [10]:
import mySpark.implicits._
data = Seq((1,"ABC    "), (2,"     DEF"),(3,"        GHI    ") )
df = data.toDF("col1","col2")
df.show()

ModuleNotFoundError: No module named 'spark'

In [9]:


import org.apache.spark.sql.functions.{trim,ltrim,rtrim,col}

//using withColumn to remove white spaces
df.withColumn("col2",trim(col("col2"))).show()
df.withColumn("col2",ltrim(col("col2"))).show()
df.withColumn("col2",rtrim(col("col2"))).show()

//Using select to remove white spaces
df.select(col("col1"),trim(col("col2")).as("col2")).show()

//Using SQL Expression to remove white spaces
df.createOrReplaceTempView("TAB")
spark.sql("select col1,trim(col2) as col2 from TAB").show()

SyntaxError: invalid syntax (<ipython-input-9-c9db8c39472e>, line 6)