In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,ArrayType

In [4]:
# Build (or reuse) the SparkSession for this notebook under the app name "Filter".
spark = (
    SparkSession.builder
    .appName("Filter")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/04 18:45:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/04 18:45:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/06/04 18:45:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Sample rows: (name, list of known languages, state code, gender).
data = [
    ("john doe", ["Java", "C++", "Python"], "OH", "M"),
    ("mary doe", ["C++", "Python"], "NY", "F"),
    ("mark antony", ["Python"], "OH", "M"),
    ("Jinesh", ["Java", "C++", "Python"], "NY", "M"),
]

In [6]:
# Explicit schema for `data`: three nullable string columns plus a nullable
# array-of-strings column holding each person's languages.
myschema = StructType(
    [
        StructField("name", StringType(), nullable=True),
        StructField("languages", ArrayType(StringType()), nullable=True),
        StructField("State", StringType(), nullable=True),
        StructField("gender", StringType(), nullable=True),
    ]
)

In [7]:
# Materialise the sample rows as a DataFrame typed by the explicit schema.
df = spark.createDataFrame(data, schema=myschema)

In [8]:
# Print the rows of df as a text table to confirm the data loaded as expected.
df.show()

                                                                                

+-----------+-------------------+-----+------+
|       name|          languages|State|gender|
+-----------+-------------------+-----+------+
|   john doe|[Java, C++, Python]|   OH|     M|
|   mary doe|      [C++, Python]|   NY|     F|
|mark antony|           [Python]|   OH|     M|
|     Jinesh|[Java, C++, Python]|   NY|     M|
+-----------+-------------------+-----+------+



In [9]:
# Print df's schema as a tree: column names, types and nullability.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- State: string (nullable = true)
 |-- gender: string (nullable = true)



In [10]:
# Keep only the rows whose State column equals "OH" and print them.
df.where(df["State"] == "OH").show()

+-----------+-------------------+-----+------+
|       name|          languages|State|gender|
+-----------+-------------------+-----+------+
|   john doe|[Java, C++, Python]|   OH|     M|
|mark antony|           [Python]|   OH|     M|
+-----------+-------------------+-----+------+



In [11]:
# Keep only the rows whose State column differs from "OH" and print them.
df.where(df["State"] != "OH").show()

+--------+-------------------+-----+------+
|    name|          languages|State|gender|
+--------+-------------------+-----+------+
|mary doe|      [C++, Python]|   NY|     F|
|  Jinesh|[Java, C++, Python]|   NY|     M|
+--------+-------------------+-----+------+



24/06/04 18:45:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [12]:
# The condition can also be given as a SQL expression string instead of a
# Column expression; here it selects the rows with gender 'M'.
df.where("gender == 'M'").show()

+-----------+-------------------+-----+------+
|       name|          languages|State|gender|
+-----------+-------------------+-----+------+
|   john doe|[Java, C++, Python]|   OH|     M|
|mark antony|           [Python]|   OH|     M|
|     Jinesh|[Java, C++, Python]|   NY|     M|
+-----------+-------------------+-----+------+



In [13]:
# SQL-expression form again: select the row whose name is exactly 'john doe'.
df.where("name == 'john doe'").show()

+--------+-------------------+-----+------+
|    name|          languages|State|gender|
+--------+-------------------+-----+------+
|john doe|[Java, C++, Python]|   OH|     M|
+--------+-------------------+-----+------+



In [14]:
# Combine two conditions with `&` (logical AND). Each comparison is wrapped in
# parentheses because `&` binds tighter than `==` in Python.
df.where(
    (df["State"] == "OH") & (df["name"] == "john doe")
).show()

+--------+-------------------+-----+------+
|    name|          languages|State|gender|
+--------+-------------------+-----+------+
|john doe|[Java, C++, Python]|   OH|     M|
+--------+-------------------+-----+------+



In [15]:
# Combine two conditions with `|` (logical OR): rows in "OH" or named
# "john doe". The parentheses around each comparison are required.
df.where(
    (df["State"] == "OH") | (df["name"] == "john doe")
).show()

+-----------+-------------------+-----+------+
|       name|          languages|State|gender|
+-----------+-------------------+-----+------+
|   john doe|[Java, C++, Python]|   OH|     M|
|mark antony|           [Python]|   OH|     M|
+-----------+-------------------+-----+------+



In [16]:
# Rows whose name begins with a lowercase "j". The match is case-sensitive,
# so "Jinesh" is not part of the result.
df.where(df["name"].startswith("j")).show()

+--------+-------------------+-----+------+
|    name|          languages|State|gender|
+--------+-------------------+-----+------+
|john doe|[Java, C++, Python]|   OH|     M|
+--------+-------------------+-----+------+



In [17]:
# Rows whose name ends with the letter "y".
df.where(df["name"].endswith("y")).show()

+-----------+---------+-----+------+
|       name|languages|State|gender|
+-----------+---------+-----+------+
|mark antony| [Python]|   OH|     M|
+-----------+---------+-----+------+



In [18]:
from pyspark.sql.functions import array_contains

In [19]:
# Keep only the rows whose `languages` array contains "Java".
# DataFrame.show() prints the table and returns None, so chaining it onto the
# assignment would bind `test_df` to None. Bind the filtered DataFrame first,
# then display it as a separate step.
test_df = df.filter(array_contains(df.languages, "Java"))
test_df.show()

+--------+-------------------+-----+------+
|    name|          languages|State|gender|
+--------+-------------------+-----+------+
|john doe|[Java, C++, Python]|   OH|     M|
|  Jinesh|[Java, C++, Python]|   NY|     M|
+--------+-------------------+-----+------+



In [20]:
# Keep only the rows whose `languages` array contains "Python".
# DataFrame.show() prints the table and returns None, so chaining it onto the
# assignment would bind `test_df` to None. Bind the filtered DataFrame first,
# then display it as a separate step.
test_df = df.filter(array_contains(df.languages, "Python"))
test_df.show()

+-----------+-------------------+-----+------+
|       name|          languages|State|gender|
+-----------+-------------------+-----+------+
|   john doe|[Java, C++, Python]|   OH|     M|
|   mary doe|      [C++, Python]|   NY|     F|
|mark antony|           [Python]|   OH|     M|
|     Jinesh|[Java, C++, Python]|   NY|     M|
+-----------+-------------------+-----+------+



In [32]:
# Request a session for the sorting examples under the app name "test".
# NOTE(review): a session was already created earlier in this notebook, and
# getOrCreate() presumably returns that running session rather than starting
# a new one - confirm whether the "test" app name actually takes effect.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

In [33]:
# Employee rows: (employee_name, department, state, salary, age, bonus).
simpledata = [
    ("james", "sales", "OH", 34000, 23, 10000),
    ("John", "finance", "NY", 44000, 24, 20000),
    ("mary", "health", "HO", 54000, 23, 30000),
    ("arun", "stocks", "IN", 64000, 23, 40000),
    ("arjun", "shares", "RM", 74000, 23, 50000),
    ("mac", "vegetables", "OH", 84000, 23, 60000),
    ("cameron", "fruits", "OH", 94000, 23, 70000),
]

In [34]:
# Column names only, one per field of each `simpledata` row.
# NOTE: this rebinds `myschema`, which earlier in the notebook held a
# StructType; from here on it is a plain list of strings.
myschema = [
    "employee_name",
    "department",
    "state",
    "salary",
    "age",
    "bonus",
]

In [35]:
# Build the employee DataFrame. Only column names are supplied, so the column
# types are left for Spark to derive from the Python values.
employee_df = spark.createDataFrame(simpledata, schema=myschema)

In [36]:
# Print the employee rows as a text table to confirm the data loaded as expected.
employee_df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        james|     sales|   OH| 34000| 23|10000|
|         John|   finance|   NY| 44000| 24|20000|
|         mary|    health|   HO| 54000| 23|30000|
|         arun|    stocks|   IN| 64000| 23|40000|
|        arjun|    shares|   RM| 74000| 23|50000|
|          mac|vegetables|   OH| 84000| 23|60000|
|      cameron|    fruits|   OH| 94000| 23|70000|
+-------------+----------+-----+------+---+-----+



In [28]:
# Print employee_df's schema as a tree: column names, types and nullability.
employee_df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)



In [37]:
# Order by employee_name, breaking ties by age (both ascending), then print.
employee_df.orderBy("employee_name", "age").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|         John|   finance|   NY| 44000| 24|20000|
|        arjun|    shares|   RM| 74000| 23|50000|
|         arun|    stocks|   IN| 64000| 23|40000|
|      cameron|    fruits|   OH| 94000| 23|70000|
|        james|     sales|   OH| 34000| 23|10000|
|          mac|vegetables|   OH| 84000| 23|60000|
|         mary|    health|   HO| 54000| 23|30000|
+-------------+----------+-----+------+---+-----+



In [38]:
# Order by department, breaking ties by state (both ascending), then print.
employee_df.orderBy("department", "state").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|         John|   finance|   NY| 44000| 24|20000|
|      cameron|    fruits|   OH| 94000| 23|70000|
|         mary|    health|   HO| 54000| 23|30000|
|        james|     sales|   OH| 34000| 23|10000|
|        arjun|    shares|   RM| 74000| 23|50000|
|         arun|    stocks|   IN| 64000| 23|40000|
|          mac|vegetables|   OH| 84000| 23|60000|
+-------------+----------+-----+------+---+-----+



In [41]:
# Order by employee_name with the direction given explicitly through the
# `ascending` flag list (one flag per sort column). `truncate=False` prints
# the cell values in full.
employee_df.orderBy(
    "employee_name",
    ascending=[True],
).show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|John         |finance   |NY   |44000 |24 |20000|
|arjun        |shares    |RM   |74000 |23 |50000|
|arun         |stocks    |IN   |64000 |23 |40000|
|cameron      |fruits    |OH   |94000 |23 |70000|
|james        |sales     |OH   |34000 |23 |10000|
|mac          |vegetables|OH   |84000 |23 |60000|
|mary         |health    |HO   |54000 |23 |30000|
+-------------+----------+-----+------+---+-----+



In [43]:
# Same ordering expressed through the Column API: ascending by employee_name.
employee_df.sort(employee_df["employee_name"].asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|         John|   finance|   NY| 44000| 24|20000|
|        arjun|    shares|   RM| 74000| 23|50000|
|         arun|    stocks|   IN| 64000| 23|40000|
|      cameron|    fruits|   OH| 94000| 23|70000|
|        james|     sales|   OH| 34000| 23|10000|
|          mac|vegetables|   OH| 84000| 23|60000|
|         mary|    health|   HO| 54000| 23|30000|
+-------------+----------+-----+------+---+-----+



In [45]:
# Column API ordering: ascending by department.
employee_df.sort(employee_df["department"].asc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|         John|   finance|   NY| 44000| 24|20000|
|      cameron|    fruits|   OH| 94000| 23|70000|
|         mary|    health|   HO| 54000| 23|30000|
|        james|     sales|   OH| 34000| 23|10000|
|        arjun|    shares|   RM| 74000| 23|50000|
|         arun|    stocks|   IN| 64000| 23|40000|
|          mac|vegetables|   OH| 84000| 23|60000|
+-------------+----------+-----+------+---+-----+



In [46]:
# Register employee_df as a temporary view named "EMP" so it can be queried
# by name through spark.sql.
employee_df.createOrReplaceTempView("EMP")

In [49]:
# Query the "EMP" temp view with SQL: project three columns and order the
# result by department, ascending.
spark.sql(
    """select employee_name,department,state from EMP order by department asc"""
).show()

+-------------+----------+-----+
|employee_name|department|state|
+-------------+----------+-----+
|         John|   finance|   NY|
|      cameron|    fruits|   OH|
|         mary|    health|   HO|
|        james|     sales|   OH|
|        arjun|    shares|   RM|
|         arun|    stocks|   IN|
|          mac|vegetables|   OH|
+-------------+----------+-----+

