In [1]:
from pyspark.sql import SparkSession,SQLContext

In [2]:
# Init a Session
spark=SparkSession.builder.appName('PySpark_DataFrame').getOrCreate()

In [63]:
# Create a DataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',5000),
  ('Jen','Mary','Brown','1980-02-17','F',-1),
  ('Robert','Nguyen','Minh','1999-11-05','M',10000)
]
columns = ["firstname","middlename","lastname","dob","gender","salary"]

df=spark.createDataFrame(data,columns)

In [None]:
# Set alias to column
df.select(df.firstname.alias("first_name")).show()


In [26]:
# Sort the column
df=df.sort(df.salary)

In [37]:
# Change DataType
df.salary.cast("Float")
df.salary.astype("Float")

In [44]:
# Check value beetween lower and upper bound
df.filter(df.salary.between(3000,4000)).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
+---------+----------+--------+----------+------+------+



In [45]:
# Check contain
df.filter(df.firstname.contains("Robert")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|   Robert|          |Williams|1978-09-05|     M|  4000|
+---------+----------+--------+----------+------+------+



In [48]:
# Check start and end with value
df.filter(df.firstname.startswith("M")).show()
df.filter(df.firstname.endswith("M")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|
+---------+----------+--------+----------+------+------+



In [50]:
# Check Null or not Null values
df.filter(df.firstname.isNotNull()).show()
df.filter(df.firstname.isNull()).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname|dob|gender|salary|
+---------+----------+--------+---+------+------+
+---------+----------+--------+---+------+------+



In [51]:
# Similar to SQL LIKE expression
df.filter(df.firstname.like("%ria")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|
+---------+----------+--------+----------+------+------+



In [None]:
# Returns a Column after getting sub string from the Column
df.select(df.firstname.substr(1,2).alias("substr")).show()

In [57]:
# When and Otherwise
from pyspark.sql.functions import when 

df.select(df.firstname,df.lastname,df.gender, when(df.gender == "M","Male") \
                                   .when(df.gender == "F","Female") \
                                   .when(df.gender == None,"") \
                                   .otherwise(df.gender).alias("new_gender") \
         ).show()

+---------+--------+------+----------+
|firstname|lastname|gender|new_gender|
+---------+--------+------+----------+
|    James|   Smith|     M|      Male|
|  Michael|        |     M|      Male|
|   Robert|Williams|     M|      Male|
|    Maria|   Jones|     F|    Female|
|      Jen|   Brown|     F|    Female|
+---------+--------+------+----------+



In [59]:
# Update Values of an Existing Column
df.withColumn("salary",df.salary*1.5).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|4500.0|
|  Michael|      Rose|        |2000-05-19|     M|6000.0|
|   Robert|          |Williams|1978-09-05|     M|6000.0|
|    Maria|      Anne|   Jones|1967-12-01|     F|7500.0|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -1.5|
+---------+----------+--------+----------+------+------+



In [60]:
# Create a Column from an Existing Column
df.withColumn("new_salry",df.salary*2).show()

+---------+----------+--------+----------+------+------+---------+
|firstname|middlename|lastname|       dob|gender|salary|new_salry|
+---------+----------+--------+----------+------+------+---------+
|    James|          |   Smith|1991-04-01|     M|  3000|     6000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|     8000|
|   Robert|          |Williams|1978-09-05|     M|  4000|     8000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|    10000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|       -2|
+---------+----------+--------+----------+------+------+---------+



In [62]:
# Rename Column name
df.withColumnRenamed("gender","sex").show()

+---------+----------+--------+----------+---+------+
|firstname|middlename|lastname|       dob|sex|salary|
+---------+----------+--------+----------+---+------+
|    James|          |   Smith|1991-04-01|  M|  3000|
|  Michael|      Rose|        |2000-05-19|  M|  4000|
|   Robert|          |Williams|1978-09-05|  M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|  F|  5000|
|      Jen|      Mary|   Brown|1980-02-17|  F|    -1|
+---------+----------+--------+----------+---+------+



In [67]:
# Drop Duplicates
df.dropDuplicates().show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    James|          |   Smith|1991-04-01|     M|  3000|
|   Robert|    Nguyen|    Minh|1999-11-05|     M| 10000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [73]:
#Running more aggregates at a time
from pyspark.sql.functions import sum,avg,max
df.groupBy("gender").agg(sum("salary").alias("sum_salary"), \
                         avg("salary").alias("avg_salary"), \
                         max("salary").alias("max_salary")       
                         ) \
                    .show()

+------+----------+----------+----------+
|gender|sum_salary|avg_salary|max_salary|
+------+----------+----------+----------+
|     F|      4999|    2499.5|      5000|
|     M|     21000|    5250.0|     10000|
+------+----------+----------+----------+



In [76]:
# Create a DataFrame
data = [('James','','Smith','1991-04-01','M',3000,'NY'),
  ('Michael','Rose','','2000-05-19','M',4000,'WA'),
  ('Robert','','Williams','1978-09-05','M',4000,'NY'),
  ('Maria','Anne','Jones','1967-12-01','F',5000,'LA'),
  ('Jen','Mary','Brown','1980-02-17','F',-1,'FL'),
  ('Robert','Nguyen','Minh','1999-11-05','M',10000,'AL')
]
columns = ["firstname","middlename","lastname","dob","gender","salary","address"]

df=spark.createDataFrame(data,columns)

data2 = [("'NY'","New York"), \
    ("WA"," West Arizona"), \
    ("FL","Florida"), \
    ("AL","Alaska") \
  ]
colums2 = ["name","fullname"]
df2=spark.createDataFrame(data2,colums2)

In [81]:
# Join DataFrames
df.join(df2,df.address==df2.name,"left").select(["firstname","middlename","lastname","dob","gender","salary","address","fullname"]).show()

+---------+----------+--------+----------+------+------+-------+-------------+
|firstname|middlename|lastname|       dob|gender|salary|address|     fullname|
+---------+----------+--------+----------+------+------+-------+-------------+
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|     LA|         null|
|  Michael|      Rose|        |2000-05-19|     M|  4000|     WA| West Arizona|
|   Robert|    Nguyen|    Minh|1999-11-05|     M| 10000|     AL|       Alaska|
|    James|          |   Smith|1991-04-01|     M|  3000|     NY|         null|
|   Robert|          |Williams|1978-09-05|     M|  4000|     NY|         null|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|     FL|      Florida|
+---------+----------+--------+----------+------+------+-------+-------------+



In [96]:
# UDF
from pyspark.sql.functions import udf

def nhan(str):
    return str.upper()

convertUDF=udf(lambda a:nhan(a))

df.withColumn("test",convertUDF("lastname")).show()

+---------+----------+--------+----------+------+------+-------+--------+
|firstname|middlename|lastname|       dob|gender|salary|address|    test|
+---------+----------+--------+----------+------+------+-------+--------+
|    James|          |   Smith|1991-04-01|     M|  3000|     NY|   SMITH|
|  Michael|      Rose|        |2000-05-19|     M|  4000|     WA|        |
|   Robert|          |Williams|1978-09-05|     M|  4000|     NY|WILLIAMS|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|     LA|   JONES|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|     FL|   BROWN|
|   Robert|    Nguyen|    Minh|1999-11-05|     M| 10000|     AL|    MINH|
+---------+----------+--------+----------+------+------+-------+--------+



+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|address|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|     NY|
|  Michael|      Rose|        |2000-05-19|     M|  4000|     WA|
|   Robert|          |Williams|1978-09-05|     M|  4000|     NY|
|    Maria|      Anne|   Jones|1967-12-01|     F|  5000|     LA|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|     FL|
|   Robert|    Nguyen|    Minh|1999-11-05|     M| 10000|     AL|
+---------+----------+--------+----------+------+------+-------+

