In [79]:
# Initialise findspark before importing pyspark, then start (or reuse) a session.
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pysparkdemo")
    .getOrCreate()
)

# Sample rows used to build the first DataFrame; each tuple has six fields.
data = [
    ('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

In [80]:
# Column names, listed in the same order as the fields of each tuple in `data`.
cols = ["fname", "mname", "lname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=cols)

In [81]:
# Print the column names and types of df (only names were supplied, so the
# types shown are the ones Spark derived from the Python values).
df.printSchema()

root
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [82]:
# withColumnRenamed returns a new DataFrame; df itself keeps its "dob" column.
# Bind the result to its own name so the renamed frame is not thrown away,
# then leave it as the last expression so the cell still displays it.
df_renamed = df.withColumnRenamed("dob", "dateofbirth")
df_renamed

DataFrame[fname: string, mname: string, lname: string, dateofbirth: string, gender: string, salary: bigint]

In [83]:
# df still has the original "dob" column: the rename was never assigned back to df.
df.show()

+-------+-----+--------+----------+------+------+
|  fname|mname|   lname|       dob|gender|salary|
+-------+-----+--------+----------+------+------+
|  James|     |   Smith|1991-04-01|     M|  3000|
|Michael| Rose|        |2000-05-19|     M|  4000|
| Robert|     |Williams|1978-09-05|     M|  4000|
|  Maria| Anne|   Jones|1967-12-01|     F|  4000|
|    Jen| Mary|   Brown|1980-02-17|     F|    -1|
+-------+-----+--------+----------+------+------+



In [84]:
# Import only what is used: a wildcard import of pyspark.sql.functions would
# rebind Python builtins such as sum, min and max to the Spark functions.
from pyspark.sql.functions import col

# Project two columns and rename them in the same select.
df1 = df.select(
    col("fname").alias("firstname"),
    col("lname").alias("lastname"),
)

In [85]:
# Display the two aliased columns.
df1.show()

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|        |
|   Robert|Williams|
|    Maria|   Jones|
|      Jen|   Brown|
+---------+--------+



In [86]:
# The schema of df1 carries the alias names given in the select.
df1.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)



In [87]:
# Select by plain column-name strings; without alias() the original names are kept.
df.select("fname","lname").show()

+-------+--------+
|  fname|   lname|
+-------+--------+
|  James|   Smith|
|Michael|        |
| Robert|Williams|
|  Maria|   Jones|
|    Jen|   Brown|
+-------+--------+



In [None]:
# Display the full DataFrame again before the select examples below.
df.show()

In [None]:
# Three equivalent ways to select every column.
# 1. Unpack the list of column names.
df.select(*cols).show()
# 2. Pass a list of names; the loop variable is named so that it does not
#    shadow the col() function imported from pyspark.sql.functions.
df.select([column_name for column_name in df.columns]).show()
# 3. Use the "*" wildcard.
df.select("*").show()

In [None]:
# Select columns by position by slicing df.columns; show(2) limits the rows printed.
first_three_columns = df.columns[:3]
third_column_only = df.columns[2:3]
df.select(first_three_columns).show(2)
df.select(third_column_only).show(2)

In [None]:
# Rows with a nested name tuple followed by two plain string fields.
# Note: this rebinds `data`, replacing the flat rows defined earlier.
data = [
    (('James', '', 'Smith'), "OH", "M"),
    (('Michael', 'Rose', ''), 'IN', 'M'),
    (('Robert', '', 'Williams'), 'GA', 'M'),
    (('Maria', 'Anne', 'Jones'), 'CU', 'F'),
    (('Jen', 'Mary', 'Brown'), 'NY', 'F'),
]



In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Struct type for the nested `name` column: three nullable string fields.
name_type = StructType([
    StructField('firstname', StringType(), True),
    StructField('middlename', StringType(), True),
    StructField('lastname', StringType(), True),
])

# Top-level schema: the nested name struct plus two nullable string columns.
schema = StructType([
    StructField('name', name_type),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True),
])

In [None]:
# Build a DataFrame from the nested rows using the explicit schema, then
# print the resulting schema.
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()


In [None]:
df2.show(truncate=False) # truncate=False prints cell values in full instead of cutting long ones short



In [None]:
# Select the whole nested `name` struct as a single column.
df2.select("name").show(truncate=False)


In [None]:
# Select individual fields of the struct using dotted paths.
df2.select("name.firstname","name.lastname").show(truncate=False)


In [None]:
# "name.*" selects every field of the `name` struct as its own column.
df2.select("name.*").show(truncate=False)

In [None]:
# Keep only the rows whose gender column equals "M".
df.filter(df.gender=="M").show()

In [None]:
# Equality filter, then inequality filter, on the state column.
for state_condition in (df2.state == "OH", df2.state != "OH"):
    df2.filter(state_condition).show()

In [None]:
# String-matching filters on the state column: prefix, suffix, substring.
for state_condition in (
    df2.state.startswith("O"),
    df2.state.endswith("N"),
    df2.state.contains("H"),
):
    df2.filter(state_condition).show()

In [None]:
# SQL LIKE pattern on a nested field: first names containing "ame".
df2.filter(df2.name.firstname.like("%ame%")).show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    ArrayType,
)

# Get a SparkSession for the filter examples.
# NOTE(review): a session was already created earlier in this notebook, so
# getOrCreate() presumably returns that one and the new appName may not take
# effect - confirm.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Rows with a nested name tuple, a list of strings, and two plain string fields.
# Note: this rebinds `data` again.
data = [
    (("James", "", "Smith"), ["Java", "Scala", "C++"], "OH", "M"),
    (("Anna", "Rose", ""), ["Spark", "Java", "C++"], "NY", "F"),
    (("Julia", "", "Williams"), ["CSharp", "VB"], "OH", "F"),
    (("Maria", "Anne", "Jones"), ["CSharp", "VB"], "NY", "M"),
    (("Jen", "Mary", "Brown"), ["CSharp", "VB"], "NY", "M"),
    (("Mike", "Mary", "Williams"), ["Python", "VB"], "OH", "M"),
]

In [None]:
# Schema for the rows above: a nested name struct, an array of strings,
# and two nullable string columns.
name_fields = [
    StructField('firstname', StringType(), True),
    StructField('middlename', StringType(), True),
    StructField('lastname', StringType(), True),
]
schema = StructType([
    StructField('name', StructType(name_fields)),
    StructField('languages', ArrayType(StringType()), True),
    StructField('state', StringType(), True),
    StructField('gender', StringType(), True),
])

In [None]:
# Create the DataFrame used by the filter examples below.
# Note: this rebinds `df`; the flat six-column DataFrame created at the top of
# the notebook is no longer reachable through this name.
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

In [None]:
# Rows whose state equals "OH".
df.filter(df.state=="OH").show()

In [None]:
# Rows whose state is anything other than "OH".
df.filter(df.state!="OH").show()

In [None]:
from pyspark.sql.functions import col

# Same equality filter, built with col() instead of attribute access on df.
state_is_ohio = col("state") == "OH"
df.filter(state_is_ohio).show(truncate=False)


In [None]:
# Filter with SQL expression strings: one equality test followed by two
# spellings of "not equal" (!= and <>).
for sql_condition in ("gender == 'M'", "gender != 'M'", "gender <> 'M'"):
    df.filter(sql_condition).show()

In [None]:
# Combine two conditions with AND (&): both must hold for a row to be kept.
state_is_ohio = df.state == "OH"
gender_is_male = df.gender == "M"
df.filter(state_is_ohio & gender_is_male).show(truncate=False)

In [None]:
# Combine two conditions with OR (|): a row is kept if either one holds.
state_is_ohio = df.state == "OH"
gender_is_male = df.gender == "M"
df.filter(state_is_ohio | gender_is_male).show(truncate=False)

In [None]:
# Keep rows whose state appears in a list of allowed values.
li = ["OH", "CA", "DE"]
state_in_list = df.state.isin(li)
df.filter(state_in_list).show()

In [None]:
# String-matching filters on the state column: prefix, suffix, substring.
for state_condition in (
    df.state.startswith("N"),
    df.state.endswith("H"),
    df.state.contains("H"),
):
    df.filter(state_condition).show()

In [None]:

# Re-print the schema as a reminder of the nested `name` fields used next.
df.printSchema()



In [None]:
# SQL LIKE pattern on a nested field: last names containing "il".
df.filter(df.name.lastname.like("%il%")).show()



In [None]:
from pyspark.sql.functions import array_contains

# Keep rows whose `languages` array includes "Java".
knows_java = array_contains(df.languages, "Java")
df.filter(knows_java).show(truncate=False)

In [None]:
# Employee rows; the field order matches `columns` below.
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]

# Build the employee DataFrame on the existing `spark` session.
# Note: this rebinds `df` once more.
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)

In [None]:
# Sort by department, then state - first with name strings, then with col() objects.
df.sort("department", "state").show(truncate=False)
department_col = col("department")
state_col = col("state")
df.sort(department_col, state_col).show(truncate=False)

In [None]:
# Same ordering through orderBy() - first with name strings, then with col() objects.
df.orderBy("department", "state").show(truncate=False)
department_col = col("department")
state_col = col("state")
df.orderBy(department_col, state_col).show(truncate=False)

In [None]:
# Explicit direction per key: department ascending, state descending.
# 1. Column objects obtained from df attributes.
df.sort(df.department.asc(), df.state.desc()).show(truncate=False)

# 2. Column objects built with col(), passed to sort() and then to orderBy().
department_ascending = col("department").asc()
state_descending = col("state").desc()
df.sort(department_ascending, state_descending).show(truncate=False)
df.orderBy(col("department").asc(), col("state").desc()).show(truncate=False)

In [None]:
# Register df as the temporary view "EMP" so it can be queried with SQL,
# then sort by department in the query itself.
df.createOrReplaceTempView("EMP")
emp_by_department = spark.sql(
    "select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc"
)
emp_by_department.show(truncate=False)

In [None]:
# Total salary per department.
df.groupBy("department").sum("salary").show()

In [None]:
# Per-department row count, then minimum, maximum and mean salary.
df.groupBy("department").count().show()
for aggregate_name in ("min", "max", "mean"):
    salary_by_department = getattr(df.groupBy("department"), aggregate_name)("salary")
    salary_by_department.show()

In [None]:
# Group on two keys and sum two columns in one call.
salary_bonus_totals = df.groupBy("department", "state").sum("salary", "bonus")
salary_bonus_totals.show()

In [None]:

# Import the functions module under an alias instead of importing sum/avg/max
# by name, which would rebind Python's builtin sum and max in the notebook.
from pyspark.sql import functions as F

# Several aggregations per department in a single agg() call.
(
    df.groupBy("department")
    .agg(
        F.sum("salary").alias("sum_salary"),
        F.avg("salary").alias("avg_salary"),
        F.sum("bonus").alias("sum_bonus"),
        F.max("bonus").alias("max_bonus"),
    )
    .show(truncate=False)
)

In [None]:
# Import the functions module under an alias instead of importing sum/avg/max
# by name, which would rebind Python's builtin sum and max in the notebook.
# F.col is used as well, so this cell does not depend on `col` having been
# imported by some other cell.
from pyspark.sql import functions as F

# Aggregate per department, then keep only the groups whose total bonus is
# at least 50000 (a filter applied to the aggregated result).
(
    df.groupBy("department")
    .agg(
        F.sum("salary").alias("sum_salary"),
        F.avg("salary").alias("avg_salary"),
        F.sum("bonus").alias("sum_bonus"),
        F.max("bonus").alias("max_bonus"),
    )
    .where(F.col("sum_bonus") >= 50000)
    .show(truncate=False)
)

In [None]:

# Register df as the temporary view "employees" so it can be queried with SQL.
df.createOrReplaceTempView("employees")

# SQL version of the grouped aggregation: the same four aggregates per
# department, with HAVING keeping groups whose total bonus is at least 50000.
sql_string = """SELECT department,
       SUM(salary) AS sum_salary,
       AVG(salary) AS avg_salary,
       SUM(bonus) AS sum_bonus,
       MAX(bonus) AS max_bonus
FROM employees
GROUP BY department
HAVING SUM(bonus) >= 50000"""

# Run the query against the temporary view.
# Note: this rebinds `df2`, which earlier held the nested-name DataFrame.
df2 = spark.sql(sql_string)
df2.show()