
SparkSQL


In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType


schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("bonus", IntegerType(), True),
    StructField("skills", StringType(), True),
    StructField("roles", ArrayType(StringType()), True),
    StructField("properties", MapType(StringType(), StringType()), True),
    StructField("joining_date", StringType(), True)
])

# Creating a DataFrame with the schema
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSchemaExample").getOrCreate()

data = [
    (1, "Alice", "Smith", "alice@gmail.com", "IT", 6000, 1000, "Python,Java", ["Manager", "Developer"], {"key1": "value1", "key2": "value2"}, "2021-06-15"),
    (2, "Bob", "Brown", "bob@gmail.com", "HR", 5000, None, "Recruitment,HR", ["HR", "Manager"], {"key3": "value3"}, "2022-07-20"),
    (3, "Charlie", "Davis", "charlie@gmail.com", "IT", 7000, 1200, "Python,Scala", ["Developer"], {"key4": "value4"}, "2019-03-10"),
    (4, "David", "Wilson", "david@gmail.com", "Finance", 4500, 800, "Accounting,Excel", ["Analyst"], {}, "2020-11-25"),
    (5, "Emma", "Johnson", "emma@gmail.com", "IT", 5500, None, "Python,SQL", ["Developer"], {"key5": "value5", "key6": "value6"}, "2023-01-05")
]

df = spark.createDataFrame(data, schema)


In [6]:
df.show()

+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+
| id|first_name|last_name|            email|department|salary|bonus|          skills|               roles|          properties|joining_date|
+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+
|  1|     Alice|    Smith|  alice@gmail.com|        IT|  6000| 1000|     Python,Java|[Manager, Developer]|{key1 -> value1, ...|  2021-06-15|
|  2|       Bob|    Brown|    bob@gmail.com|        HR|  5000| NULL|  Recruitment,HR|       [HR, Manager]|    {key3 -> value3}|  2022-07-20|
|  3|   Charlie|    Davis|charlie@gmail.com|        IT|  7000| 1200|    Python,Scala|         [Developer]|    {key4 -> value4}|  2019-03-10|
|  4|     David|   Wilson|  david@gmail.com|   Finance|  4500|  800|Accounting,Excel|           [Analyst]|                  {}|  2020-11-25|
|  5|      Em

Register the Spark DataFrame as a temporary view so that it can be queried with SQL

In [8]:
df.createOrReplaceTempView("employees")

In [10]:
spark.sql("select * from employees").show()

+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+
| id|first_name|last_name|            email|department|salary|bonus|          skills|               roles|          properties|joining_date|
+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+
|  1|     Alice|    Smith|  alice@gmail.com|        IT|  6000| 1000|     Python,Java|[Manager, Developer]|{key1 -> value1, ...|  2021-06-15|
|  2|       Bob|    Brown|    bob@gmail.com|        HR|  5000| NULL|  Recruitment,HR|       [HR, Manager]|    {key3 -> value3}|  2022-07-20|
|  3|   Charlie|    Davis|charlie@gmail.com|        IT|  7000| 1200|    Python,Scala|         [Developer]|    {key4 -> value4}|  2019-03-10|
|  4|     David|   Wilson|  david@gmail.com|   Finance|  4500|  800|Accounting,Excel|           [Analyst]|                  {}|  2020-11-25|
|  5|      Em

In [15]:
from pyspark.sql.functions import sum, avg, count, max, min, col

# Note: importing sum, max, and min by name shadows Python's built-in
# functions of the same name for the rest of the notebook.

In [20]:
from pyspark.sql.functions import sum, lit

total_salary = df.agg(sum("salary")).collect()[0][0]  # Calculate total salary
df = df.withColumn("total_salary", lit(total_salary))  # Add as a new column
df.show()

+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+------------+
| id|first_name|last_name|            email|department|salary|bonus|          skills|               roles|          properties|joining_date|total_salary|
+---+----------+---------+-----------------+----------+------+-----+----------------+--------------------+--------------------+------------+------------+
|  1|     Alice|    Smith|  alice@gmail.com|        IT|  6000| 1000|     Python,Java|[Manager, Developer]|{key1 -> value1, ...|  2021-06-15|       28000|
|  2|       Bob|    Brown|    bob@gmail.com|        HR|  5000| NULL|  Recruitment,HR|       [HR, Manager]|    {key3 -> value3}|  2022-07-20|       28000|
|  3|   Charlie|    Davis|charlie@gmail.com|        IT|  7000| 1200|    Python,Scala|         [Developer]|    {key4 -> value4}|  2019-03-10|       28000|
|  4|     David|   Wilson|  david@gmail.com|   Finance|  4500|  800|Accounti

The aggregates can also be computed with a SQL query against the temporary view


In [22]:
spark.sql(""" select count(*), sum(salary), avg(salary), min(salary), max(salary) from employees""").show()

+--------+-----------+-----------+-----------+-----------+
|count(1)|sum(salary)|avg(salary)|min(salary)|max(salary)|
+--------+-----------+-----------+-----------+-----------+
|       5|      28000|     5600.0|       4500|       7000|
+--------+-----------+-----------+-----------+-----------+



COLLECT_LIST gathers the values from all rows into a single array, so the result is one row. Passing truncate=False to show() prints the whole list instead of cutting each cell off at 20 characters.

COLLECT_SET = removes duplicates,
COLLECT_LIST = keeps duplicates

In [25]:
spark.sql("SELECT COLLECT_LIST(first_name) from employees").show(truncate = False)

+----------------------------------+
|collect_list(first_name)          |
+----------------------------------+
|[Alice, Bob, Charlie, David, Emma]|
+----------------------------------+



In [26]:
spark.sql("SELECT first_name from employees").show(truncate = False)

+----------+
|first_name|
+----------+
|Alice     |
|Bob       |
|Charlie   |
|David     |
|Emma      |
+----------+



array_contains checks whether an array column contains a given value and returns a boolean for each row


In [27]:
spark.sql("select roles, array_contains(roles, 'Manager') from employees").show(truncate = False)

+--------------------+------------------------------+
|roles               |array_contains(roles, Manager)|
+--------------------+------------------------------+
|[Manager, Developer]|true                          |
|[HR, Manager]       |true                          |
|[Developer]         |false                         |
|[Analyst]           |false                         |
|[Developer]         |false                         |
+--------------------+------------------------------+

