# Section 1

In [2]:
# findspark.init() makes the local Spark installation's pyspark package importable,
# so it must run before `import pyspark`.
# NOTE(review): cell In[56] below repeats this import and call; one is enough.
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [56]:
import findspark
findspark.init()
import pyspark # Call this only after findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.conf import SparkConf
from pyspark.sql.types import *
# Create (or reuse) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    # NOTE(review): this looks like the placeholder option from the Spark docs example;
    # nothing in the cells shown reads it, so it can probably be removed.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# NOTE: with header=False (the default) Spark names the columns _c0, _c1, ...;
# with header=True the first line of the file is used as the column names.
# inferSchema=True lets Spark guess each column's type from the data.

df = spark.read.csv(r"C:\Users\u\Desktop\intern\territories.csv", inferSchema=True, header=True)
df.show(5)

+-----------+--------------------+--------+
|TerritoryID|TerritoryDescription|RegionID|
+-----------+--------------------+--------+
|       1581|            Westboro|       1|
|       1730|             Bedford|       1|
|       1833|           Georgetow|       1|
|       2116|              Boston|       1|
|       2139|           Cambridge|       1|
+-----------+--------------------+--------+
only showing top 5 rows



These are the data types available in PySpark, with their SQL type names:

In [5]:
from pyspark.sql import types 

# Instantiate each PySpark SQL type by name and print its SQL-style simple string.
type_names = ['BinaryType', 'BooleanType', 'ByteType', 'DateType',
              'DecimalType', 'DoubleType', 'FloatType', 'IntegerType',
              'LongType', 'ShortType', 'StringType', 'TimestampType']
for type_name in type_names:
    spark_type = getattr(types, type_name)()
    print(f"{type_name}: {spark_type.simpleString()}")

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp


We change each column's data type to the one given in the ER diagram, using the `cast` function.

In [6]:
# df is the DataFrame for our territories table; cast its columns to the ER-diagram types.

territory_casts = [("RegionID", IntegerType()), ("TerritoryID", StringType())]
for column_name, target_type in territory_casts:
    df = df.withColumn(column_name, df[column_name].cast(target_type))

The `printSchema()` function shows the data type of each column.

In [7]:
# Print each column's name, Spark data type and nullability for the territories DataFrame.
df.printSchema()

root
 |-- TerritoryID: string (nullable = true)
 |-- TerritoryDescription: string (nullable = true)
 |-- RegionID: integer (nullable = true)



In [8]:
# df2 is the DataFrame for our regions table.
# NOTE(review): the output suggests RegionDescription values carry trailing padding; consider trimming.

df2 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\regions.csv")
df2.show()

+--------+--------------------+
|RegionID|   RegionDescription|
+--------+--------------------+
|       1|Eastern          ...|
|       2|Western          ...|
|       3|Northern         ...|
|       4|            Southern|
+--------+--------------------+



In [9]:
# Cast RegionID to the integer type given in the ER diagram.
df2 = df2.withColumn("RegionID", df2.RegionID.cast(IntegerType()))

In [10]:
# Print the column names and data types of the regions DataFrame after the cast.
df2.printSchema()

root
 |-- RegionID: integer (nullable = true)
 |-- RegionDescription: string (nullable = true)



In [11]:
# df3 is the DataFrame for our employee-territories table.

df3 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\employee-territories.csv")
df3.show()

+----------+-----------+
|EmployeeID|TerritoryID|
+----------+-----------+
|         1|       6897|
|         1|      19713|
|         2|       1581|
|         2|       1730|
|         2|       1833|
|         2|       2116|
|         2|       2139|
|         2|       2184|
|         2|      40222|
|         3|      30346|
|         3|      31406|
|         3|      32859|
|         3|      33607|
|         4|      20852|
|         4|      27403|
|         4|      27511|
|         5|       2903|
|         5|       7960|
|         5|       8837|
|         5|      10019|
+----------+-----------+
only showing top 20 rows



In [12]:
# Cast the employee-territories columns to the ER-diagram types, then print the schema.
for column_name, target_type in [("EmployeeID", IntegerType()), ("TerritoryID", StringType())]:
    df3 = df3.withColumn(column_name, df3[column_name].cast(target_type))
df3.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- TerritoryID: string (nullable = true)



In [55]:
# df4 is the DataFrame for our employees table.

df4 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\employees.csv")
df4.show(3)

+----------+---------+---------+--------------------+---------------+-------------------+-------------------+--------------------+--------+------+----------+-------+--------------+---------+--------------------+--------------------+---------+--------------------+
|EmployeeID| LastName|FirstName|               Title|TitleOfCourtesy|          BirthDate|           HireDate|             Address|    City|Region|PostalCode|Country|     HomePhone|Extension|               Photo|               Notes|ReportsTo|           PhotoPath|
+----------+---------+---------+--------------------+---------------+-------------------+-------------------+--------------------+--------+------+----------+-------+--------------+---------+--------------------+--------------------+---------+--------------------+
|         1|  Davolio|    Nancy|Sales Representative|            Ms.|1948-12-08 00:00:00|1992-05-01 00:00:00|507 - 20th Ave. E...| Seattle|    WA|     98122|    USA|(206) 555-9857|     5467|0x151C2F000200000.

In [14]:
# Cast the employees columns to their ER-diagram types (Photo is left as read).
# BirthDate/HireDate become dates, so the 00:00:00 time part seen in the raw data is dropped.
employee_casts = [
    ("EmployeeID", IntegerType()),
    ("LastName", StringType()),
    ("FirstName", StringType()),
    ("Title", StringType()),
    ("TitleOfCourtesy", StringType()),
    ("BirthDate", DateType()),
    ("HireDate", DateType()),
    ("Address", StringType()),
    ("City", StringType()),
    ("Region", StringType()),
    ("PostalCode", StringType()),
    ("Country", StringType()),
    ("HomePhone", StringType()),
    ("Extension", StringType()),
    ("Notes", StringType()),
    ("ReportsTo", IntegerType()),
    ("PhotoPath", StringType()),
]
for column_name, target_type in employee_casts:
    df4 = df4.withColumn(column_name, df4[column_name].cast(target_type))

In [15]:
# Print the column names and data types of the employees DataFrame after the casts.
df4.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- TitleOfCourtesy: string (nullable = true)
 |-- BirthDate: date (nullable = true)
 |-- HireDate: date (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- HomePhone: string (nullable = true)
 |-- Extension: string (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- ReportsTo: integer (nullable = true)
 |-- PhotoPath: string (nullable = true)



In [16]:
# df5 is the DataFrame for our shippers table.

df5 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\shippers.csv")
df5.show()

+---------+----------------+--------------+
|ShipperID|     CompanyName|         Phone|
+---------+----------------+--------------+
|        1|  Speedy Express|(503) 555-9831|
|        2|  United Package|(503) 555-3199|
|        3|Federal Shipping|(503) 555-9931|
+---------+----------------+--------------+



In [17]:
# Cast the shippers columns to their ER-diagram types.
shipper_casts = [
    ("ShipperID", IntegerType()),
    ("CompanyName", StringType()),
    ("Phone", StringType()),
]
for column_name, target_type in shipper_casts:
    df5 = df5.withColumn(column_name, df5[column_name].cast(target_type))

In [18]:
# Print the column names and data types of the shippers DataFrame after the casts.
df5.printSchema()

root
 |-- ShipperID: integer (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- Phone: string (nullable = true)



In [19]:
# df6 is the DataFrame for our orders table.

df6 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\Orders.csv")

In [20]:
# Cast the orders columns to their ER-diagram types; Freight becomes a fixed-point decimal(10,4).
order_casts = [
    ("OrderID", IntegerType()),
    ("EmployeeID", IntegerType()),
    ("OrderDate", DateType()),
    ("RequiredDate", DateType()),
    ("ShippedDate", DateType()),
    ("ShipVia", IntegerType()),
    ("Freight", DecimalType(10,4)),
]
for column_name, target_type in order_casts:
    df6 = df6.withColumn(column_name, df6[column_name].cast(target_type))

In [54]:
# Display the first 3 rows of the orders DataFrame after the casts.
df6.show(3)

+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+------------------+--------+--------------+--------------+-----------+
|OrderID|CustomerID|EmployeeID| OrderDate|RequiredDate|ShippedDate|ShipVia|Freight|            ShipName|       ShipAddress|ShipCity|    ShipRegion|ShipPostalCode|ShipCountry|
+-------+----------+----------+----------+------------+-----------+-------+-------+--------------------+------------------+--------+--------------+--------------+-----------+
|  10248|     VINET|         5|1996-07-04|  1996-08-01| 1996-07-16|      3|32.3800|Vins et alcools C...|59 rue de l'Abbaye|   Reims|          NULL|         51100|     France|
|  10249|     TOMSP|         6|1996-07-05|  1996-08-16| 1996-07-10|      1|11.6100|  Toms Spezialitäten|     Luisenstr. 48| Münster|          NULL|         44087|    Germany|
|  10250|     HANAR|         4|1996-07-08|  1996-08-05| 1996-07-12|      2|65.8300|       Hanari Carnes|       Rua do Paço|  

In [22]:
# Print the column names and data types of the orders DataFrame after the casts.
df6.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- RequiredDate: date (nullable = true)
 |-- ShippedDate: date (nullable = true)
 |-- ShipVia: integer (nullable = true)
 |-- Freight: decimal(10,4) (nullable = true)
 |-- ShipName: string (nullable = true)
 |-- ShipAddress: string (nullable = true)
 |-- ShipCity: string (nullable = true)
 |-- ShipRegion: string (nullable = true)
 |-- ShipPostalCode: string (nullable = true)
 |-- ShipCountry: string (nullable = true)



In [53]:
# df7 is the DataFrame for our customers table.

df7 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\Customers.csv")
df7.show(3)

+----------+--------------------+--------------+--------------------+--------------------+-----------+------+----------+-------+------------+------------+
|CustomerID|         CompanyName|   ContactName|        ContactTitle|             Address|       City|Region|PostalCode|Country|       Phone|         Fax|
+----------+--------------------+--------------+--------------------+--------------------+-----------+------+----------+-------+------------+------------+
|     ALFKI| Alfreds Futterkiste|  Maria Anders|Sales Representative|       Obere Str. 57|     Berlin|  NULL|     12209|Germany| 030-0074321| 030-0076545|
|     ANATR|Ana Trujillo Empa...|  Ana Trujillo|               Owner|Avda. de la Const...|México D.F.|  NULL|     05021| Mexico|(5) 555-4729|(5) 555-3745|
|     ANTON|Antonio Moreno Ta...|Antonio Moreno|               Owner|     Mataderos  2312|México D.F.|  NULL|     05023| Mexico|(5) 555-3932|        NULL|
+----------+--------------------+--------------+--------------------+-

In [24]:
# Print the inferred schema of the customers DataFrame (no casts are applied to this table).
df7.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- ContactName: string (nullable = true)
 |-- ContactTitle: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Fax: string (nullable = true)



In [25]:
# df8 is the DataFrame for our order-details table.

df8 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\order-details.csv")
df8.show()

+-------+---------+---------+--------+--------+
|OrderID|ProductID|UnitPrice|Quantity|Discount|
+-------+---------+---------+--------+--------+
|  10248|       11|     14.0|      12|     0.0|
|  10248|       42|      9.8|      10|     0.0|
|  10248|       72|     34.8|       5|     0.0|
|  10249|       14|     18.6|       9|     0.0|
|  10249|       51|     42.4|      40|     0.0|
|  10250|       41|      7.7|      10|     0.0|
|  10250|       51|     42.4|      35|    0.15|
|  10250|       65|     16.8|      15|    0.15|
|  10251|       22|     16.8|       6|    0.05|
|  10251|       57|     15.6|      15|    0.05|
|  10251|       65|     16.8|      20|     0.0|
|  10252|       20|     64.8|      40|    0.05|
|  10252|       33|      2.0|      25|    0.05|
|  10252|       60|     27.2|      40|     0.0|
|  10253|       31|     10.0|      20|     0.0|
|  10253|       39|     14.4|      42|     0.0|
|  10253|       49|     16.0|      40|     0.0|
|  10254|       24|      3.6|      15|  

In [26]:
# Cast the order-details price and discount columns, then print the resulting schema.
for column_name, target_type in [("UnitPrice", DecimalType(10,4)), ("Discount", DoubleType())]:
    df8 = df8.withColumn(column_name, df8[column_name].cast(target_type))
df8.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- UnitPrice: decimal(10,4) (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)



In [52]:
# df9 is the DataFrame for our products table.

df9 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\products.csv")
df9.show(3)

+---------+-------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|ProductID|  ProductName|SupplierID|CategoryID|    QuantityPerUnit|UnitPrice|UnitsInStock|UnitsOnOrder|ReorderLevel|Discontinued|
+---------+-------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
|        1|         Chai|         1|         1| 10 boxes x 20 bags|     18.0|          39|           0|          10|           0|
|        2|        Chang|         1|         1| 24 - 12 oz bottles|     19.0|          17|          40|          25|           0|
|        3|Aniseed Syrup|         1|         2|12 - 550 ml bottles|     10.0|          13|          70|          25|           0|
+---------+-------------+----------+----------+-------------------+---------+------------+------------+------------+------------+
only showing top 3 rows



In [28]:
# Cast UnitPrice to decimal(10,4) and the Discontinued flag to a byte.
for column_name, target_type in [("UnitPrice", DecimalType(10,4)), ("Discontinued", ByteType())]:
    df9 = df9.withColumn(column_name, df9[column_name].cast(target_type))


In [29]:
# Print the column names and data types of the products DataFrame after the casts.
df9.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- SupplierID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- QuantityPerUnit: string (nullable = true)
 |-- UnitPrice: decimal(10,4) (nullable = true)
 |-- UnitsInStock: integer (nullable = true)
 |-- UnitsOnOrder: integer (nullable = true)
 |-- ReorderLevel: integer (nullable = true)
 |-- Discontinued: byte (nullable = true)



In [30]:
# df10 is the DataFrame for our suppliers table.

df10 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\suppliers.csv")
df10.printSchema()

root
 |-- SupplierID: integer (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- ContactName: string (nullable = true)
 |-- ContactTitle: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Fax: string (nullable = true)
 |-- HomePage: string (nullable = true)



In [31]:
# df11 is the DataFrame for our categories table.

df11 = spark.read.options(header=True, inferSchema=True).csv(r"C:\Users\u\Desktop\intern\categories.csv")
df11.printSchema()

root
 |-- CategoryID: integer (nullable = true)
 |-- CategoryName: string (nullable = true)
 |-- Description: string (nullable = true)



# The name registered for each DataFrame is the table name used to refer to it in the SQL queries

In [32]:
# Register each DataFrame as a temporary view so the SQL queries below can refer to it by name.
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
for view_name, frame in [
    ('tabel1', df),     # territories
    ('tabel2', df2),    # regions
    ('tabel3', df3),    # employee-territories
    ('tabel4', df4),    # employees
    ('tabel5', df5),    # shippers
    ('tabel6', df6),    # orders
    ('tabel7', df7),    # customers
    ('tabel8', df8),    # order-details
    ('tabel9', df9),    # products
    ('tabel10', df10),  # suppliers
    ('tabel11', df11),  # categories
]:
    frame.createOrReplaceTempView(view_name)

# Section 2

# First Query

In [33]:
# Products that are still sold (Discontinued = 0) and cost at most 20 per unit.
cheap_active_products = spark.sql('select ProductID,ProductName,UnitPrice from tabel9 where UnitPrice <= 20 and Discontinued = 0')
cheap_active_products.show()

+---------+--------------------+---------+
|ProductID|         ProductName|UnitPrice|
+---------+--------------------+---------+
|        1|                Chai|  18.0000|
|        2|               Chang|  19.0000|
|        3|       Aniseed Syrup|  10.0000|
|       13|               Konbu|   6.0000|
|       15|        Genen Shouyu|  15.5000|
|       16|             Pavlova|  17.4500|
|       19|Teatime Chocolate...|   9.2000|
|       21| Sir Rodney's Scones|  10.0000|
|       23|            Tunnbröd|   9.0000|
|       25|NuNuCa Nuß-Nougat...|  14.0000|
|       31|   Gorgonzola Telino|  12.5000|
|       33|             Geitost|   2.5000|
|       34|       Sasquatch Ale|  14.0000|
|       35|      Steeleye Stout|  18.0000|
|       36|         Inlagd Sill|  19.0000|
|       39|    Chartreuse verte|  18.0000|
|       40|    Boston Crab Meat|  18.4000|
|       41|Jack's New Englan...|   9.6500|
|       44|        Gula Malacca|  19.4500|
|       45|         Rogede sild|   9.5000|
+---------+

# Second Query

Subquery
> This is the subquery that we will use in our final query.

In [34]:
# Discontinued = 0 selects the products that are not discontinued.
# GROUP BY collapses the products of each category into one row so SUM can add up their
# UnitsInStock; the categories are then listed from the largest to the smallest total stock.
spark.sql('select t11.CategoryName,SUM(t9.UnitsInStock) AS sumofstock from tabel9 t9 INNER JOIN tabel11 t11 on t11.CategoryID = t9.CategoryID where t9.Discontinued = 0 GROUP BY t11.CategoryName order by sumofstock DESC').show()

+--------------+----------+
|  CategoryName|sumofstock|
+--------------+----------+
|       Seafood|       701|
|     Beverages|       539|
|    Condiments|       507|
|Dairy Products|       393|
|   Confections|       386|
|Grains/Cereals|       282|
|  Meat/Poultry|       136|
|       Produce|        74|
+--------------+----------+



Final Query

> We select CategoryName from the table produced by our subquery.

In [35]:
# Category names ordered by total units in stock of non-discontinued products.
# The ORDER BY is placed on the outer query: an ORDER BY inside a derived table does not
# guarantee the order of the rows returned by the enclosing SELECT.
spark.sql('''
    select CategoryName
    from (
        select t11.CategoryName, SUM(t9.UnitsInStock) AS sumofstock
        from tabel9 t9
        INNER JOIN tabel11 t11 on t11.CategoryID = t9.CategoryID
        where t9.Discontinued = 0
        GROUP BY t11.CategoryName
    ) t99
    order by sumofstock DESC
''').show()

+--------------+
|  CategoryName|
+--------------+
|       Seafood|
|     Beverages|
|    Condiments|
|Dairy Products|
|   Confections|
|Grains/Cereals|
|  Meat/Poultry|
|       Produce|
+--------------+



# Third Query

subquery

> First we select the IDs of customers who placed an order between July and September.

In [37]:
# Customers who placed an order in July, August or September (months 7 to 9, inclusive).
# The upper bound is 9 so that October orders are not counted.
spark.sql('select CustomerID from tabel6 WHERE MONTH(OrderDate) BETWEEN 7 AND 9').show()

+----------+
|CustomerID|
+----------+
|     VINET|
|     TOMSP|
|     HANAR|
|     VICTE|
|     SUPRD|
|     HANAR|
|     CHOPS|
|     RICSU|
|     WELLI|
|     HILAA|
|     ERNSH|
|     CENTC|
|     OTTIK|
|     QUEDE|
|     RATTC|
|     ERNSH|
|     FOLKO|
|     BLONP|
|     WARTH|
|     FRANK|
+----------+
only showing top 20 rows



Final query

> Then we select the IDs of customers who are not in that list.

In [38]:
# Customers who placed no order in July-September (months 7 to 9, inclusive).
# The subquery drops NULL CustomerIDs: if it returned a NULL, `NOT IN` would evaluate to
# NULL for every customer and the query would silently return no rows.
spark.sql('''
    select CustomerID
    from tabel7
    where CustomerID not in (
        select CustomerID
        from tabel6
        where MONTH(OrderDate) BETWEEN 7 AND 9
          and CustomerID is not null
    )
''').show()

+----------+
|CustomerID|
+----------+
|     BOTTM|
|     CACTU|
|     CONSH|
|     DRACD|
|     EASTC|
|     FISSA|
|     GALED|
|     LACOR|
|     LAZYK|
|     NORTS|
|     OCEAN|
|     PARIS|
|     SEVES|
|     SPECD|
|     TRAIH|
+----------+



# section 3  

# query 1

> Here we use the Spark DataFrame API to select from our products table.\
  The `filter` function works like a `WHERE` clause in SQL.\
  `dfspark1` is our DataFrame for this query.

In [39]:
# Products whose unit price lies between 15 and 25 (both inclusive); filter first, then project.
dfspark1 = df9.filter('UnitPrice >= 15 and UnitPrice <= 25').select('ProductID', 'ProductName', 'UnitPrice')

In [40]:
# Display the products in the 15-25 price range (show() prints at most 20 rows by default).
dfspark1.show()

+---------+--------------------+---------+
|ProductID|         ProductName|UnitPrice|
+---------+--------------------+---------+
|        1|                Chai|  18.0000|
|        2|               Chang|  19.0000|
|        4|Chef Anton's Caju...|  22.0000|
|        5|Chef Anton's Gumb...|  21.3500|
|        6|Grandma's Boysenb...|  25.0000|
|       11|      Queso Cabrales|  21.0000|
|       14|                Tofu|  23.2500|
|       15|        Genen Shouyu|  15.5000|
|       16|             Pavlova|  17.4500|
|       22| Gustaf's Knäckebröd|  21.0000|
|       35|      Steeleye Stout|  18.0000|
|       36|         Inlagd Sill|  19.0000|
|       39|    Chartreuse verte|  18.0000|
|       40|    Boston Crab Meat|  18.4000|
|       44|        Gula Malacca|  19.4500|
|       49|            Maxilaku|  20.0000|
|       50|    Valkoinen suklaa|  16.2500|
|       55|        Pâté chinois|  24.0000|
|       57|      Ravioli Angelo|  19.5000|
|       65|Louisiana Fiery H...|  21.0500|
+---------+

# query 2

Sorting by `UnitPrice` in descending order puts the most expensive products first.\
The `limit` function then keeps only the first ten rows, i.e. the ten most expensive products.

In [41]:
# Ten most expensive products: sort by UnitPrice, highest first, and keep the first 10 rows.
dfspark2 = (
    df9.select('ProductName', 'UnitPrice')
       .orderBy(df9.UnitPrice.desc())
       .limit(10)
)

In [42]:
# Display the ten most expensive products.
dfspark2.show()

+--------------------+---------+
|         ProductName|UnitPrice|
+--------------------+---------+
|       Côte de Blaye| 263.5000|
|Thüringer Rostbra...| 123.7900|
|     Mishi Kobe Niku|  97.0000|
|Sir Rodney's Marm...|  81.0000|
|    Carnarvon Tigers|  62.5000|
|Raclette Courdavault|  55.0000|
|Manjimup Dried Ap...|  53.0000|
|      Tarte au sucre|  49.3000|
|         Ipoh Coffee|  46.0000|
|   Rössle Sauerkraut|  45.6000|
+--------------------+---------+



# query 3

We import the functions from the `pyspark.sql.functions` module and compute the average unit price.\
Then, using `filter`, we select the products whose unit price is greater than that average.

In [43]:
# NOTE(review): the wildcard import shadows Python built-ins with the same names (e.g. sum, max, min);
# later cells rely on it for `col`, so consider importing only the names actually used.
from pyspark.sql.functions import *

# Mean unit price across all products, shown as a one-row table with column avg(UnitPrice).
df9.agg(avg("UnitPrice")).show()

+--------------+
|avg(UnitPrice)|
+--------------+
|   28.86636364|
+--------------+



In [44]:
# Products priced above the average unit price. The average is computed here rather than
# copied by hand from the previous cell's output, so the filter stays correct if the data changes.
from pyspark.sql import functions as F

avg_unit_price = df9.agg(F.avg("UnitPrice")).first()[0]
dfspark3 = df9.select('ProductName','ProductID').filter(df9['UnitPrice'] > avg_unit_price)

In [45]:
# Display the products priced above the average (show() prints at most 20 rows by default).
dfspark3.show()

+--------------------+---------+
|         ProductName|ProductID|
+--------------------+---------+
|Uncle Bob's Organ...|        7|
|Northwoods Cranbe...|        8|
|     Mishi Kobe Niku|        9|
|               Ikura|       10|
|Queso Manchego La...|       12|
|        Alice Mutton|       17|
|    Carnarvon Tigers|       18|
|Sir Rodney's Marm...|       20|
| Gumbär Gummibärchen|       26|
|  Schoggi Schokolade|       27|
|   Rössle Sauerkraut|       28|
|Thüringer Rostbra...|       29|
|  Mascarpone Fabioli|       32|
|       Côte de Blaye|       38|
|         Ipoh Coffee|       43|
|Manjimup Dried Ap...|       51|
|       Perth Pasties|       53|
|Gnocchi di nonna ...|       56|
|Raclette Courdavault|       59|
|   Camembert Pierrot|       60|
+--------------------+---------+
only showing top 20 rows



# query 4

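 Note: `alias` cannot be called directly on a plain string column name, so the column is first turned into a Column object.\
 1 means the product is discontinued\
 0 means the product is not discontinued\
 We use `groupBy` followed by `count` to count the products in each group.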

In [46]:
# Count products per Discontinued flag; the grouping column is renamed for display.
discontinued_flag = df9["Discontinued"].alias('current(0) and discontiued product(1)')
dfspark4 = df9.groupBy(discontinued_flag).count()

In [47]:
# Display the number of current (0) and discontinued (1) products.
dfspark4.show()

+-------------------------------------+-----+
|current(0) and discontiued product(1)|count|
+-------------------------------------+-----+
|                                    1|    8|
|                                    0|   69|
+-------------------------------------+-----+



# query 5

Here we join 4 tables with inner joins:\
df4 : employees table\
df6 : orders table\
df8 : order-details table\
df9 : products table\
To count distinct supplier IDs we import the `countDistinct` function from the `pyspark.sql.functions` module.\
Finally, we keep the employees who have sold products from more than 7 distinct suppliers.

In [51]:
from pyspark.sql.functions import countDistinct

# Link each order line to its product (for SupplierID), its order (for EmployeeID)
# and the employee who took the order (for the name columns).
joined = (
    df8.join(df9, df8.ProductID == df9.ProductID)
       .join(df6, df8.OrderID == df6.OrderID)
       .join(df4, df6.EmployeeID == df4.EmployeeID)
)
# Count the distinct suppliers per employee and keep only employees with more than 7.
x = (
    joined.groupBy(df4.EmployeeID, df4.FirstName, df4.LastName)
          .agg(countDistinct(df9.SupplierID).alias("UniqueSupplierID"))
          .filter('UniqueSupplierID > 7')
          .select(df4.FirstName, df4.LastName)
)
x.show()

+---------+---------+
|FirstName| LastName|
+---------+---------+
|     Anne|Dodsworth|
|  Michael|   Suyama|
|    Laura| Callahan|
|    Janet|Leverling|
|   Steven| Buchanan|
|   Robert|     King|
| Margaret|  Peacock|
|   Andrew|   Fuller|
|    Nancy|  Davolio|
+---------+---------+

