<a href="https://colab.research.google.com/github/candido05/PySpark_repo/blob/main/Spark_foundation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark import SparkContext

sc = SparkContext(appName="My First Spark Application")

In [3]:
sc

In [4]:
sc.stop()

# Criando SparkContext no Apache Spark nas versões 2.x

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("My First Spark Application") \
    .getOrCreate()

sc = spark.sparkContext

In [6]:
sc

In [7]:
sc.stop()

# Criando uma SparkSession

In [8]:
spark_ss = SparkSession.builder \
    .appName("Second Spark Application") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

In [9]:
spark_ss

In [10]:
spark_ss.stop()

# RDDs

In [11]:
spark = SparkSession.builder.appName("RDD_Demo").getOrCreate()

In [12]:
spark

In [13]:
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = spark.sparkContext.parallelize(numbers)

In [14]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [15]:
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [16]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("David", 28)]
rdd = spark.sparkContext.parallelize(data)

In [17]:
print("Todos os elementos de rdd: ",rdd.collect())

Todos os elementos de rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('David', 28)]


### RDDs Operações: Actions

In [18]:
count = rdd.count()
print("Número de elementos no RDD:", count)

Número de elementos no RDD: 4


In [19]:
firsr_element = rdd.first()
print("Primeiro elemento do RDD:", firsr_element)

Primeiro elemento do RDD: ('Alice', 25)


In [20]:
taken_elements = rdd.take(3)
print("Os três primeiros elementos do RDD:", taken_elements)

Os três primeiros elementos do RDD: [('Alice', 25), ('Bob', 30), ('Charlie', 35)]


### RDDS Operações: Transformations

In [21]:
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))
result = mapped_rdd.collect()
print("RDD após mapeamento:", result)

RDD após mapeamento: [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('DAVID', 28)]


In [22]:
filtered_rdd = rdd.filter(lambda x: x[1] > 25)
result = filtered_rdd.collect()
print("RDD após filtro:", result)

RDD após filtro: [('Bob', 30), ('Charlie', 35), ('David', 28)]


In [23]:
reduced_rdd = rdd.reduceByKey(lambda x, y: x - y)
reduced_rdd.collect()

[('Alice', 25), ('Bob', 30), ('Charlie', 35), ('David', 28)]

In [24]:
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending=False)
sorted_rdd.collect()

[('Charlie', 35), ('Bob', 30), ('David', 28), ('Alice', 25)]

# Salver RDDs para arquivo de texto

In [25]:
rdd.saveAsTextFile("rdd_output.txt")

In [26]:
rdd_text = spark.sparkContext.textFile("rdd_output.txt")
rdd_text.collect()

["('Charlie', 35)", "('David', 28)", "('Alice', 25)", "('Bob', 30)"]

In [27]:
spark.stop()

# Spark DataFrames

In [28]:
from pyspark.sql.functions import desc

In [29]:
spark = SparkSession.builder.appName("DataFrame_Demo").getOrCreate()

In [32]:
rdd = spark.sparkContext.textFile("/content/texto.txt")
result_rdd = rdd.flatMap(lambda line: line.split(" ")) \
                .map(lambda word: (word, 1)) \
                .reduceByKey(lambda a, b: a + b) \
                .sortBy(lambda x: x[1], ascending=False)

In [33]:
result_rdd.take(10)

[('the', 12),
 ('of', 7),
 ('a', 7),
 ('distributed', 5),
 ('in', 5),
 ('Spark', 4),
 ('as', 3),
 ('is', 3),
 ('API', 3),
 ('on', 3)]

In [34]:
df = spark.read.text("/content/texto.txt")
result_df = df.selectExpr("explode(split(value, ' ')) as word") \
              .groupBy("word") \
              .count() \
              .orderBy(desc("count"))

In [35]:
result_df.take(10)

[Row(word='the', count=12),
 Row(word='of', count=7),
 Row(word='a', count=7),
 Row(word='in', count=5),
 Row(word='distributed', count=5),
 Row(word='Spark', count=4),
 Row(word='API', count=3),
 Row(word='RDD', count=3),
 Row(word='is', count=3),
 Row(word='on', count=3)]

In [36]:
spark.stop()

## Lendo arquivos CSVs

In [40]:
spark = SparkSession.builder.appName("DataFrame_read").getOrCreate()

In [44]:
csv_file_path = "/content/sample_data/california_housing_train.csv"
df = spark.read.csv(csv_file_path, header=True)

In [45]:
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



In [46]:
df.show(10)

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

## Lendo CSV com a definição explicita de schema

In [48]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [54]:
 schema = StructType([
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("housing_median_age", DoubleType(), True),
    StructField("total_rooms", DoubleType(), True),
    StructField("total_bedrooms", DoubleType(), True),
    StructField("population", DoubleType(), True),
    StructField("households", DoubleType(), True),
    StructField("median_income", DoubleType(), True),
    StructField("median_house_value", DoubleType(), True),
])

In [55]:
csv_file_path_schema = "/content/sample_data/california_housing_train.csv"
df = spark.read.csv(csv_file_path_schema, header=True, schema=schema)

In [56]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [57]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

## InferSchema

In [58]:
csv_file_path_inferschema = "/content/sample_data/california_housing_train.csv"
df = spark.read.csv(csv_file_path_schema, header=True, inferSchema=True)

In [59]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



# Lendo arquivos JSON

In [67]:
json_file_path = "/content/sample_data/anscombe.json"
df_json = spark.read.json(json_file_path, multiLine=True)

In [68]:
df_json.printSchema()

root
 |-- Series: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)



In [69]:
df_json.show(10)

+------+----+-----+
|Series|   X|    Y|
+------+----+-----+
|     I|10.0| 8.04|
|     I| 8.0| 6.95|
|     I|13.0| 7.58|
|     I| 9.0| 8.81|
|     I|11.0| 8.33|
|     I|14.0| 9.96|
|     I| 6.0| 7.24|
|     I| 4.0| 4.26|
|     I|12.0|10.84|
|     I| 7.0| 4.81|
+------+----+-----+
only showing top 10 rows



# Operações com DataFrames

In [72]:
data_file_path = "/content/stocks.txt"
df = spark.read.csv(data_file_path, header=True, inferSchema=True)
df.show(10)

+---+----------------+-----------+--------+-------+
| id|            name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



In [73]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



### Seleção de colunas

In [74]:
select_columns =  df.select("id", "name", "price")
select_columns.show(10)

+---+----------------+-------+
| id|            name|  price|
+---+----------------+-------+
|  1|          iPhone| 899.99|
|  2|         Macbook|1299.99|
|  3|            iPad| 499.99|
|  4|      Samsung TV| 799.99|
|  5|           LG TV| 699.99|
|  6|      Nike Shoes|  99.99|
|  7|    Adidas Shoes|  89.99|
|  8| Sony Headphones| 149.99|
|  9|Beats Headphones| 199.99|
| 10|    Dining Table| 249.99|
+---+----------------+-------+
only showing top 10 rows



### Filtragem de linhas

In [75]:
filtered_data = df.filter(df.quantity > 20)
print("Dados filtrados: ", filtered_data.count())
filtered_data.show(10)

Dados filtrados:  44
+---+--------------+-----------+--------+-----+
| id|          name|   category|quantity|price|
+---+--------------+-----------+--------+-----+
|  6|    Nike Shoes|   Clothing|      30|99.99|
|  7|  Adidas Shoes|   Clothing|      25|89.99|
| 12|        Apples|       Food|     100|  0.5|
| 13|       Bananas|       Food|     150| 0.25|
| 14|       Oranges|       Food|     120| 0.75|
| 15|Chicken Breast|       Food|      50| 3.99|
| 16| Salmon Fillet|       Food|      30| 5.99|
| 24|    Laptop Bag|Accessories|      25|29.99|
| 25|      Backpack|Accessories|      30|24.99|
| 28|         Jeans|   Clothing|      30|59.99|
+---+--------------+-----------+--------+-----+
only showing top 10 rows



### Agrupamento e Agregações

In [78]:
grouped_data = df.groupBy("category").agg({"price": "avg", "quantity": "sum"})
grouped_data.show()

+-----------+-------------+------------------+
|   category|sum(quantity)|        avg(price)|
+-----------+-------------+------------------+
|       Food|         1035|  3.06923076923077|
|     Sports|          127|264.43444444444435|
|Electronics|          446| 369.6869696969695|
|   Clothing|          540| 63.99000000000001|
|  Furniture|          116|  193.066923076923|
|Accessories|          935| 21.63705882352942|
+-----------+-------------+------------------+



### Join

In [79]:
df2 = df.select("id", "category").limit(10)
join_data = df.join(df2, "id", "inner")
join_data.show()

+---+----------------+-----------+--------+-------+-----------+
| id|            name|   category|quantity|  price|   category|
+---+----------------+-----------+--------+-------+-----------+
|  1|          iPhone|Electronics|      10| 899.99|Electronics|
|  2|         Macbook|Electronics|       5|1299.99|Electronics|
|  3|            iPad|Electronics|      15| 499.99|Electronics|
|  4|      Samsung TV|Electronics|       8| 799.99|Electronics|
|  5|           LG TV|Electronics|      10| 699.99|Electronics|
|  6|      Nike Shoes|   Clothing|      30|  99.99|   Clothing|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|   Clothing|
|  8| Sony Headphones|Electronics|      12| 149.99|Electronics|
|  9|Beats Headphones|Electronics|      20| 199.99|Electronics|
| 10|    Dining Table|  Furniture|      10| 249.99|  Furniture|
+---+----------------+-----------+--------+-------+-----------+



### Sorting

In [80]:
sorted_data = df.orderBy("price")
sorted_data.show(10)

+---+------------+-----------+--------+-----+
| id|        name|   category|quantity|price|
+---+------------+-----------+--------+-----+
| 13|     Bananas|       Food|     150| 0.25|
| 12|      Apples|       Food|     100|  0.5|
| 14|     Oranges|       Food|     120| 0.75|
| 87|      Eraser|Accessories|     150| 0.99|
| 46|      Yogurt|       Food|      70| 1.49|
| 88|       Ruler|Accessories|     120| 1.49|
| 41|        Milk|       Food|     100| 1.99|
| 85|    Notebook|Accessories|     100| 1.99|
| 42|       Bread|       Food|      95| 2.49|
| 90|Sticky Notes|Accessories|      70| 2.49|
+---+------------+-----------+--------+-----+
only showing top 10 rows



In [82]:
sorted_data = df.orderBy("price", ascending=False)
sorted_data.show(10)

+---+--------------+-----------+--------+-------+
| id|          name|   category|quantity|  price|
+---+--------------+-----------+--------+-------+
|100|  Laser Cutter|Electronics|       1|1499.99|
|  2|       Macbook|Electronics|       5|1299.99|
| 99|    3D Printer|Electronics|       2|1199.99|
| 96| Smart Glasses|Electronics|       2| 999.99|
|  1|        iPhone|Electronics|      10| 899.99|
|  4|    Samsung TV|Electronics|       8| 799.99|
| 57|Rowing Machine|     Sports|       3| 799.99|
| 77|          Sofa|  Furniture|       3| 799.99|
| 95|         Drone|Electronics|       3| 799.99|
|  5|         LG TV|Electronics|      10| 699.99|
+---+--------------+-----------+--------+-------+
only showing top 10 rows



### Linhas unicas

In [83]:
distinct_data = df.select("category").distinct()
distinct_data.show()

+-----------+
|   category|
+-----------+
|       Food|
|     Sports|
|Electronics|
|   Clothing|
|  Furniture|
|Accessories|
+-----------+



### Drop

In [84]:
drop_columns = df.drop("category")
drop_columns.show(10)

+---+----------------+--------+-------+
| id|            name|quantity|  price|
+---+----------------+--------+-------+
|  1|          iPhone|      10| 899.99|
|  2|         Macbook|       5|1299.99|
|  3|            iPad|      15| 499.99|
|  4|      Samsung TV|       8| 799.99|
|  5|           LG TV|      10| 699.99|
|  6|      Nike Shoes|      30|  99.99|
|  7|    Adidas Shoes|      25|  89.99|
|  8| Sony Headphones|      12| 149.99|
|  9|Beats Headphones|      20| 199.99|
| 10|    Dining Table|      10| 249.99|
+---+----------------+--------+-------+
only showing top 10 rows



### Adicionando novas colunas

In [85]:
df_with_new_columns = df.withColumn("revenue", df.quantity * df.price)
df_with_new_columns.show(10)

+---+----------------+-----------+--------+-------+-------+
| id|            name|   category|quantity|  price|revenue|
+---+----------------+-----------+--------+-------+-------+
|  1|          iPhone|Electronics|      10| 899.99| 8999.9|
|  2|         Macbook|Electronics|       5|1299.99|6499.95|
|  3|            iPad|Electronics|      15| 499.99|7499.85|
|  4|      Samsung TV|Electronics|       8| 799.99|6399.92|
|  5|           LG TV|Electronics|      10| 699.99| 6999.9|
|  6|      Nike Shoes|   Clothing|      30|  99.99| 2999.7|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|2249.75|
|  8| Sony Headphones|Electronics|      12| 149.99|1799.88|
|  9|Beats Headphones|Electronics|      20| 199.99| 3999.8|
| 10|    Dining Table|  Furniture|      10| 249.99| 2499.9|
+---+----------------+-----------+--------+-------+-------+
only showing top 10 rows



### Renomeação

In [86]:
df_with_alias = df.withColumnRenamed("name", "product_name")
df_with_alias.show(10)

+---+----------------+-----------+--------+-------+
| id|    product_name|   category|quantity|  price|
+---+----------------+-----------+--------+-------+
|  1|          iPhone|Electronics|      10| 899.99|
|  2|         Macbook|Electronics|       5|1299.99|
|  3|            iPad|Electronics|      15| 499.99|
|  4|      Samsung TV|Electronics|       8| 799.99|
|  5|           LG TV|Electronics|      10| 699.99|
|  6|      Nike Shoes|   Clothing|      30|  99.99|
|  7|    Adidas Shoes|   Clothing|      25|  89.99|
|  8| Sony Headphones|Electronics|      12| 149.99|
|  9|Beats Headphones|Electronics|      20| 199.99|
| 10|    Dining Table|  Furniture|      10| 249.99|
+---+----------------+-----------+--------+-------+
only showing top 10 rows



In [87]:
spark.stop()

# Spark SQL

In [88]:
spark = SparkSession.builder.appName("SparkSQL_Demo").getOrCreate()

In [89]:
file_path = "/content/employees.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show()

+-------+---+------+---------+
|   name|age|gender|   salary|
+-------+---+------+---------+
|  Yusuf| 43|Female| 86578.46|
|  Laura| 57|Female| 47389.78|
|  Julia| 44|  Male| 31511.88|
|  Alice| 46| Other| 48866.98|
|   Tina| 27| Other| 66748.92|
|Quentin| 57|Female|111466.92|
|Charlie| 32| Other| 46416.16|
|Quentin| 30| Other| 80475.83|
|    Ian| 55|  Male| 93695.64|
|  Oscar| 46|Female| 44608.38|
|   Umar| 56| Other| 73034.14|
|  Alice| 27|Female|105737.08|
|   Xena| 52|Female| 95854.65|
|   Nina| 23| Other| 53907.78|
|   Tina| 33|  Male| 37873.35|
|  Paula| 25|  Male| 37159.34|
|  Alice| 36|Female| 39459.19|
| George| 46|Female|  39155.6|
| Violet| 38|  Male| 82755.47|
|  Julia| 38|  Male|117771.75|
+-------+---+------+---------+
only showing top 20 rows



In [90]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)



## Registrando o DataFrame como uma tabela temporaria

In [91]:
df.createOrReplaceTempView("employees")

In [92]:
result = spark.sql("SELECT * FROM employees WHERE age > 30")
result.show()

+-------+---+------+---------+
|   name|age|gender|   salary|
+-------+---+------+---------+
|  Yusuf| 43|Female| 86578.46|
|  Laura| 57|Female| 47389.78|
|  Julia| 44|  Male| 31511.88|
|  Alice| 46| Other| 48866.98|
|Quentin| 57|Female|111466.92|
|Charlie| 32| Other| 46416.16|
|    Ian| 55|  Male| 93695.64|
|  Oscar| 46|Female| 44608.38|
|   Umar| 56| Other| 73034.14|
|   Xena| 52|Female| 95854.65|
|   Tina| 33|  Male| 37873.35|
|  Alice| 36|Female| 39459.19|
| George| 46|Female|  39155.6|
| Violet| 38|  Male| 82755.47|
|  Julia| 38|  Male|117771.75|
| Edward| 58|  Male|110338.49|
|  Julia| 41|  Male| 79133.32|
|Charlie| 39| Other| 41167.06|
| Edward| 56|Female| 58327.04|
|    Bob| 40|Female| 44123.49|
+-------+---+------+---------+
only showing top 20 rows



In [98]:
avg_salary_by_gender = spark.sql("SELECT gender, AVG(salary) AS avg_salary FROM employees GROUP BY gender")
avg_salary_by_gender.show()

+------+-----------------+
|gender|       avg_salary|
+------+-----------------+
|Female|72023.52731707317|
| Other|70028.68531249999|
|  Male|77439.64370370371|
+------+-----------------+



## Criando e gerenciando visualizações temporarias

In [99]:
df.createOrReplaceTempView("employees")

In [101]:
result = spark.sql("SELECT * FROM employees WHERE age > 40")
result.show()

+-------+---+------+---------+
|   name|age|gender|   salary|
+-------+---+------+---------+
|  Yusuf| 43|Female| 86578.46|
|  Laura| 57|Female| 47389.78|
|  Julia| 44|  Male| 31511.88|
|  Alice| 46| Other| 48866.98|
|Quentin| 57|Female|111466.92|
|    Ian| 55|  Male| 93695.64|
|  Oscar| 46|Female| 44608.38|
|   Umar| 56| Other| 73034.14|
|   Xena| 52|Female| 95854.65|
| George| 46|Female|  39155.6|
| Edward| 58|  Male|110338.49|
|  Julia| 41|  Male| 79133.32|
| Edward| 56|Female| 58327.04|
|  Fiona| 42|Female| 96926.92|
| Violet| 58| Other|107608.74|
| Edward| 54|Female|  71366.5|
|  Yusuf| 53| Other| 53739.83|
|Quentin| 47|Female|115149.73|
|   Nina| 56|Female|117634.09|
| Violet| 41|  Male| 36420.98|
+-------+---+------+---------+
only showing top 20 rows



In [102]:
view_exists = spark.catalog.tableExists("employees")
view_exists

True

In [103]:
spark.catalog.dropTempView("employees")

True

## Subquries

In [104]:
employee_data = [
    (1, "John"), (2, "Alice"), (3, "Bob"), (4, "Emily"),
    (5, "David"), (6, "Sarah"), (7, "Michael"), (8, "Lisa"),
    (9, "William")
]

employees = spark.createDataFrame(employee_data, ["id", "name"])

salary_data = [
    ("HR", 1, 60000), ("HR", 2, 55000), ("HR", 3, 58000),
    ("IT", 4, 70000), ("IT", 5, 72000), ("IT", 6, 68000),
    ("Sales", 7, 75000), ("Sales", 8, 78000), ("Sales", 9, 77000)
]

salaries = spark.createDataFrame(salary_data, ["department", "id", "salary"])

In [107]:
employees.show()

+---+-------+
| id|   name|
+---+-------+
|  1|   John|
|  2|  Alice|
|  3|    Bob|
|  4|  Emily|
|  5|  David|
|  6|  Sarah|
|  7|Michael|
|  8|   Lisa|
|  9|William|
+---+-------+



In [108]:
salaries.show()

+----------+---+------+
|department| id|salary|
+----------+---+------+
|        HR|  1| 60000|
|        HR|  2| 55000|
|        HR|  3| 58000|
|        IT|  4| 70000|
|        IT|  5| 72000|
|        IT|  6| 68000|
|     Sales|  7| 75000|
|     Sales|  8| 78000|
|     Sales|  9| 77000|
+----------+---+------+



In [105]:
employees.createOrReplaceTempView("employees")
salaries.createOrReplaceTempView("salaries")

In [109]:
result = spark.sql("""
    SELECT name
    FROM employees
    WHERE id IN (
        SELECT id
        FROM salaries
        WHERE salary > (SELECT AVG(salary) FROM salaries)
    )
""")

result.show()

+-------+
|   name|
+-------+
|  Emily|
|  David|
|Michael|
|   Lisa|
|William|
+-------+



## Window Function

In [112]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [114]:
employees_salary = spark.sql("""
    SELECT salaries.*, employees.name
    FROM salaries
    LEFT JOIN employees
    ON salaries.id = employees.id

""")

employees_salary.show()

+----------+---+------+-------+
|department| id|salary|   name|
+----------+---+------+-------+
|        HR|  1| 60000|   John|
|        HR|  3| 58000|    Bob|
|        HR|  2| 55000|  Alice|
|        IT|  4| 70000|  Emily|
|     Sales|  7| 75000|Michael|
|        IT|  6| 68000|  Sarah|
|     Sales|  9| 77000|William|
|        IT|  5| 72000|  David|
|     Sales|  8| 78000|   Lisa|
+----------+---+------+-------+



In [117]:
window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))

In [118]:
employees_salary.withColumn("rank", F.rank().over(window_spec)).show()

+----------+---+------+-------+----+
|department| id|salary|   name|rank|
+----------+---+------+-------+----+
|        HR|  1| 60000|   John|   1|
|        HR|  3| 58000|    Bob|   2|
|        HR|  2| 55000|  Alice|   3|
|        IT|  5| 72000|  David|   1|
|        IT|  4| 70000|  Emily|   2|
|        IT|  6| 68000|  Sarah|   3|
|     Sales|  8| 78000|   Lisa|   1|
|     Sales|  9| 77000|William|   2|
|     Sales|  7| 75000|Michael|   3|
+----------+---+------+-------+----+



In [119]:
spark.stop()