<a href="https://colab.research.google.com/github/PendlimarriSivasankar/PS/blob/main/Pyspark_Scenario_001.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("colab pyspark").getOrCreate()


In [3]:
from pyspark.sql.functions import lead, col, expr
from pyspark.sql.window import Window

data = [(1111, "2021-01-15", 10),
        (1111, "2021-01-16", 15),
        (1111, "2021-01-17", 30),
        (1112, "2021-01-15", 10),
        (1112, "2021-01-15", 20),
        (1112, "2021-01-15", 30)]

myschema = ["sensorid", "timestamp", "values"]

df = spark.createDataFrame(data, schema=myschema)
df.show()
d1 = Window.partitionBy("sensorid").orderBy("values")

finaldf = df.withColumn("nextvalues", lead("values", 1).over(d1)) \
    .filter(col("nextvalues").isNotNull()) \
    .withColumn("values", expr("nextvalues-values")) \
    .drop("nextvalues") \
    .orderBy(col("sensorid")).show()

+--------+----------+------+
|sensorid| timestamp|values|
+--------+----------+------+
|    1111|2021-01-15|    10|
|    1111|2021-01-16|    15|
|    1111|2021-01-17|    30|
|    1112|2021-01-15|    10|
|    1112|2021-01-15|    20|
|    1112|2021-01-15|    30|
+--------+----------+------+

+--------+----------+------+
|sensorid| timestamp|values|
+--------+----------+------+
|    1111|2021-01-15|     5|
|    1111|2021-01-16|    15|
|    1112|2021-01-15|    10|
|    1112|2021-01-15|    10|
+--------+----------+------+



In [11]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

Person = [
    (1, "Wang", "Allen"),
    (2, "Alice", "Bob")
]

Person_columns = ["personId", "lastName", "firstName"]
Person_df = spark.createDataFrame(Person, Person_columns)
Person_df.show()

address = [
    (1, 2, "New York City", "New York"),
    (2, 3, "Leetcode", "California")
]

address_columns = ["addressId", "personId", "city", "state"]
address_df = spark.createDataFrame(address, address_columns)
address_df.show()

Person = [
    (1, "Wang", "Allen"),
    (2, "Alice", "Bob")
]

Person_columns = ["personId", "lastName", "firstName"]
Person_df = spark.createDataFrame(Person, Person_columns)
Person_df.show()

address = [
    (1, 2, "New York City", "New York"),
    (2, 3, "Leetcode", "California")
]

address_columns = ["addressId", "personId", "city", "state"]
address_df = spark.createDataFrame(address, address_columns)
address_df.show()



+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+

+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+



In [21]:
customer_data = [
    (1, 5),
    (2, 6),
    (3, 5),
    (3, 6),
    (1, 6)
]

customer_columns = ["customer_id", "product_key"]
customer_df = spark.createDataFrame(customer_data, customer_columns)
customer_df.show()


product_data = [
    (5,),
    (6,)
]
product_columns = ["product_key"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()
total_products = product_df.select(countDistinct("product_key")).collect()[0][0]
customer_df=customer_df\
    .groupBy("customer_id").agg(
    countDistinct("product_key").alias("num_products_bought"))\
    .filter(
        col("num_products_bought") == total_products
    ).select("customer_id").show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          2|          6|
|          3|          5|
|          3|          6|
|          1|          6|
+-----------+-----------+

+-----------+
|product_key|
+-----------+
|          5|
|          6|
+-----------+

+-----------+
|customer_id|
+-----------+
|          1|
|          3|
+-----------+



In [20]:
actor_director_data = [
    (1, 1, 0),
    (1, 1, 1),
    (1, 1, 2),
    (1, 2, 3),
    (1, 2, 4),
    (2, 1, 5),
    (2, 1, 6)
]

actor_director_columns = ["actor_id", "director_id", "timestamp"]
actor_director_df = spark.createDataFrame(actor_director_data, actor_director_columns)
actor_director_df.show()
actor_director_df=actor_director_df\
    .groupBy("actor_id", "director_id")\
    .agg(count("*").alias("cooperations"))\
    .filter(col("cooperations") >= 3)\
    .select("actor_id", "director_id").show()

+--------+-----------+---------+
|actor_id|director_id|timestamp|
+--------+-----------+---------+
|       1|          1|        0|
|       1|          1|        1|
|       1|          1|        2|
|       1|          2|        3|
|       1|          2|        4|
|       2|          1|        5|
|       2|          1|        6|
+--------+-----------+---------+

+--------+-----------+
|actor_id|director_id|
+--------+-----------+
|       1|          1|
+--------+-----------+



In [19]:
sales_data = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000)
]

sales_columns = ["sale_id", "product_id", "year", "quantity", "price"]
sales_df = spark.createDataFrame(sales_data, sales_columns)
sales_df.show()

product_data = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung")
]

product_columns = ["product_id", "product_name"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()

sales_df=sales_df\
    .join(product_df, on="product_id", how="inner")\
    .select("product_name", "year", "price").show()

+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+

+------------+----+-----+
|product_name|year|price|
+------------+----+-----+
|       Nokia|2008| 5000|
|       Nokia|2009| 5000|
|       Apple|2011| 9000|
+------------+----+-----+



In [18]:
sales_data = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000),
]

sales_columns = ["sale_id", "product_id", "year", "quantity", "price"]
sales_df = spark.createDataFrame(sales_data, sales_columns)
sales_df.show()

product_data = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung"),
]

product_columns = ["product_id", "product_name"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()

sales_df=sales_df\
    .groupBy("product_id").agg(sum("quantity").alias("total_quantity")).show()

+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+

+----------+--------------+
|product_id|total_quantity|
+----------+--------------+
|       100|            22|
|       200|            15|
+----------+--------------+



In [17]:
sales_data = [
    (1, 100, 2008, 10, 5000),
    (2, 100, 2009, 12, 5000),
    (7, 200, 2011, 15, 9000)
]

sales_columns = ["sale_id", "product_id", "year", "quantity", "price"]
sales_df = spark.createDataFrame(sales_data, sales_columns)
sales_df.show()

product_data = [
    (100, "Nokia"),
    (200, "Apple"),
    (300, "Samsung")
]

product_columns = ["product_id", "product_name"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()

windowSpec = Window.partitionBy("product_id").orderBy("year")

sales_with_rank_df = sales_df.withColumn("rn", row_number().over(windowSpec))

sales_with_rank_df=sales_with_rank_df\
    .join(product_df, on="product_id", how="inner")\
        .filter(col("rn") == 1)\
            .select("product_name",col("year").alias("first_year"),"quantity","price").show()

+-------+----------+----+--------+-----+
|sale_id|product_id|year|quantity|price|
+-------+----------+----+--------+-----+
|      1|       100|2008|      10| 5000|
|      2|       100|2009|      12| 5000|
|      7|       200|2011|      15| 9000|
+-------+----------+----+--------+-----+

+----------+------------+
|product_id|product_name|
+----------+------------+
|       100|       Nokia|
|       200|       Apple|
|       300|     Samsung|
+----------+------------+

+------------+----------+--------+-----+
|product_name|first_year|quantity|price|
+------------+----------+--------+-----+
|       Nokia|      2008|      10| 5000|
|       Apple|      2011|      15| 9000|
+------------+----------+--------+-----+



In [16]:
project_data = [
    (1, 1),
    (1, 2),
    (1, 3),
    (2, 1),
    (2, 4)
]

project_columns = ["project_id", "employee_id"]
project_df = spark.createDataFrame(project_data, project_columns)
project_df.show()

employee_data = [
    (1, "Khaled", 3),
    (2, "Ali", 2),
    (3, "John", 1),
    (4, "Doe", 2)
]

employee_columns = ["employee_id", "name", "experience_years"]
employee_df = spark.createDataFrame(employee_data, employee_columns)
employee_df.show()
product_df=project_df\
    .join(employee_df, on="employee_id", how="inner")\
        .groupBy("project_id") \
                  .agg(round(avg("experience_years"), 2).alias("average_years")).show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         1|          3|
|         2|          1|
|         2|          4|
+----------+-----------+

+-----------+------+----------------+
|employee_id|  name|experience_years|
+-----------+------+----------------+
|          1|Khaled|               3|
|          2|   Ali|               2|
|          3|  John|               1|
|          4|   Doe|               2|
+-----------+------+----------------+

+----------+-------------+
|project_id|average_years|
+----------+-------------+
|         1|          2.0|
|         2|          2.5|
+----------+-------------+



In [22]:
project_data_1076 = [
    (1, 1),
    (1, 2),
    (1, 3),
    (2, 1),
    (2, 4),
]
project_columns = ["project_id", "employee_id"]
project_df = spark.createDataFrame(project_data, project_columns)
project_df.show()

employee_data = [
    (1, "Khaled", 3),
    (2, "Ali", 2),
    (3, "John", 1),
    (4, "Doe", 2),
]

employee_columns = ["employee_id", "name", "experience_years"]
employee_df = spark.createDataFrame(employee_data, employee_columns)
employee_df.show()

project_counts = project_df.groupBy("project_id") \
                           .agg(count("employee_id").alias("employee_count"))

max_count = project_counts.agg(max("employee_count").alias("max_count")).collect()[0]["max_count"]

project_counts=project_counts.\
    filter(project_counts["employee_count"] == max_count) \
                       .select("project_id").show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         1|          3|
|         2|          1|
|         2|          4|
+----------+-----------+

+-----------+------+----------------+
|employee_id|  name|experience_years|
+-----------+------+----------------+
|          1|Khaled|               3|
|          2|   Ali|               2|
|          3|  John|               1|
|          4|   Doe|               2|
+-----------+------+----------------+

+----------+
|project_id|
+----------+
|         1|
+----------+



In [23]:
project_data = [
    (1, 1),
    (1, 2),
    (1, 3),
    (2, 1),
    (2, 4),
]
project_columns = ["project_id", "employee_id"]
project_df = spark.createDataFrame(project_data, project_columns)
project_df.show()

employee_data = [
    (1, "Khaled", 3),
    (2, "Ali", 2),
    (3, "John", 3),
    (4, "Doe", 2),
]

employee_columns = ["employee_id", "name", "experience_years"]
employee_df = spark.createDataFrame(employee_data, employee_columns)
employee_df.show()

joined_df = project_df.join(employee_df, on="employee_id", how="inner")
windowSpec = Window.partitionBy("project_id").orderBy(col("experience_years").desc())

joined_df=joined_df\
    .withColumn("rnk", rank().over(windowSpec))\
        .filter(col("rnk") == 1).select("project_id", "employee_id").show()

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         1|          3|
|         2|          1|
|         2|          4|
+----------+-----------+

+-----------+------+----------------+
|employee_id|  name|experience_years|
+-----------+------+----------------+
|          1|Khaled|               3|
|          2|   Ali|               2|
|          3|  John|               3|
|          4|   Doe|               2|
+-----------+------+----------------+

+----------+-----------+
|project_id|employee_id|
+----------+-----------+
|         1|          1|
|         1|          3|
|         2|          1|
+----------+-----------+

