In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('database').getOrCreate()

24/11/21 17:41:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.sql.functions import (col,expr,count,countDistinct,to_date,date_add,year,month,lag,lead,rank,max,min,round,
                                   sum,when,lit,desc,coalesce,abs
                                  )
from pyspark.sql.types import (StructField,StructType,
                    IntegerType,StringType,DateType )
from pyspark.sql import Window

path  = '/Users/francispaulraj/Training/interview'

#### 01 Find Customers With Positive Revenue this Year EP

In [3]:
customer_schema = StructType([ 
                             StructField('customer_id',IntegerType()),
                             StructField('year',DateType()),
                             StructField('revenue',IntegerType())
                            ])

In [55]:
customer_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/1_customer.csv')
                )

In [20]:
customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- revenue: integer (nullable = true)



In [24]:
customer_df.filter( (col(('year')) == lit('2021')) & (col('revenue') >= 0 )).select('customer_id').show()

+-----------+
|customer_id|
+-----------+
|          1|
|          4|
+-----------+



In [26]:
customer_df.createOrReplaceTempView('customerdf')

spark.sql('select customer_id from customerdf where year == "2021" and revenue >= 0').show()

+-----------+
|customer_id|
+-----------+
|          1|
|          4|
+-----------+



#### 02 Customers Who Never Order E

In [56]:
customers_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/2_customers.csv')
                )

In [29]:
orders_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/2_orders.csv')
                )

In [30]:
customers_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [31]:
orders_df.printSchema() 

root
 |-- id: integer (nullable = true)
 |-- customerid: integer (nullable = true)



In [37]:
customers_df.join(orders_df,customers_df.id ==orders_df.customerid,'leftanti' ).select('name').show()

+-----+
| name|
+-----+
|Henry|
|  Max|
+-----+



In [39]:
customers_df.createOrReplaceTempView('customersdf')
orders_df.createOrReplaceTempView('ordersdf')

spark.sql('select name from customersdf left anti join ordersdf on customersdf.id == ordersdf.customerid').show()

+-----+
| name|
+-----+
|Henry|
|  Max|
+-----+



#### 03 Calculate Special Bonus E

In [42]:
employees_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/3_employees.csv')
                )

In [56]:
employees_df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)



In [55]:
employees_df.withColumn('bonus',when(col('name').rlike('^M'),0)\
                        .when(col('employee_id')%2==1,col('salary')).otherwise(0)).select('employee_id','bonus').show()

+-----------+-----+
|employee_id|bonus|
+-----------+-----+
|          2|    0|
|          3|    0|
|          7| 7400|
|          8|    0|
|          9| 7700|
+-----------+-----+



In [71]:
employees_df.createOrReplaceTempView('employeesdf')

spark.sql('select employee_id,case when name rlike "^M" then 0 \
                                   when employee_id %2 == 1 then salary \
                                   else 0 end as bonus from employeesdf').show()

+-----------+-----+
|employee_id|bonus|
+-----------+-----+
|          2|    0|
|          3|    0|
|          7| 7400|
|          8|    0|
|          9| 7700|
+-----------+-----+



#### 04 Combine Two Tables E

In [72]:
person_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/4_person.csv')
                )

In [73]:
address_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/4_address.csv')
                )

In [74]:
person_df.printSchema()

root
 |-- personid: integer (nullable = true)
 |-- lastname: string (nullable = true)
 |-- firstname: string (nullable = true)



In [75]:
address_df.printSchema()

root
 |-- addressid: integer (nullable = true)
 |-- personid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



In [78]:
person_df.join(address_df,person_df.personid == address_df.personid,'left').select('firstname','lastname','city','state').show()

+---------+--------+-------------+--------+
|firstname|lastname|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         null|    null|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+



In [83]:
person_df.createOrReplaceTempView('persondf')
address_df.createOrReplaceTempView('addressdf')

spark.sql('select firstname,lastname,city,state  from addressdf \
                  right join persondf on persondf.personid == addressdf.personid').show()

+---------+--------+-------------+--------+
|firstname|lastname|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         null|    null|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+



#### 05 Sellers With No Sales EP

In [89]:
orders_schema = StructType([ 
                             StructField('order_id',IntegerType()),
                             StructField('sale_date',DateType()),
                             StructField('order_cost',IntegerType()),
                             StructField('customer_id',IntegerType()),
                             StructField('seller_id',IntegerType())
                            ])

In [84]:
customer_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/5_customer.csv')
                )

In [85]:
orders_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/5_orders.csv')
                )

In [101]:
seller_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/5_seller.csv'))

In [102]:
customer_df.printSchema()

root
 |-- customerid: integer (nullable = true)
 |-- customername: string (nullable = true)



In [103]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- order_cost: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)



In [104]:
seller_df.printSchema() 

root
 |-- seller_iid: integer (nullable = true)
 |-- seller_name: string (nullable = true)



In [105]:
orders_df.show()

+--------+----------+----------+-----------+---------+
|order_id| sale_date|order_cost|customer_id|seller_id|
+--------+----------+----------+-----------+---------+
|       1|2020-03-01|      1500|        101|        1|
|       2|2020-05-25|      2400|        102|        2|
|       3|2019-05-25|       800|        101|        3|
|       4|2020-09-13|      1000|        103|        2|
|       5|2019-02-11|       700|        101|        2|
+--------+----------+----------+-----------+---------+



In [106]:
seller_df.show()

+----------+-----------+
|seller_iid|seller_name|
+----------+-----------+
|         1|     Daniel|
|         2|  Elizabeth|
|         3|      Frank|
+----------+-----------+



In [137]:
order20 = orders_df.filter(year('sale_date') == lit('2020')).select('seller_id')
year20 = [order20.collect()[i][0] for i in range(0,len(order20.collect()))]
seller_df.filter(~col('seller_iid').isin(year20)).select('seller_name').show()

+-----------+
|seller_name|
+-----------+
|      Frank|
+-----------+



In [138]:
orders_df.createOrReplaceTempView('ordersdf')
seller_df.createOrReplaceTempView('sellerdf')

spark.sql('select seller_name from sellerdf where seller_iid not in ( \
             select seller_id from ordersdf where year(sale_date) == "2020"  )').show()

+-----------+
|seller_name|
+-----------+
|      Frank|
+-----------+



#### 06 Top Travellers E

In [139]:
users_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/6_users.csv')
                )

In [140]:
riders_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/6_riders.csv')
                )

In [141]:
users_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [142]:
riders_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- distance: integer (nullable = true)



In [157]:
users_df.join(riders_df,users_df.id == riders_df.user_id,'left').groupBy('name')\
            .agg(sum(col('distance')).alias('usr_distanct'))\
            .select('name',coalesce(col('usr_distanct'),lit('0')).alias('usrdis')).orderBy(desc('usrdis')).show()

+--------+------+
|    name|usrdis|
+--------+------+
|   Elvis|   450|
|     Lee|   450|
|     Bob|   317|
|Jonathan|   312|
|    Alex|   222|
|   Alice|   120|
|  Donald|     0|
+--------+------+



In [168]:
users_df.createOrReplaceTempView('usersdf')
riders_df.createOrReplaceTempView('ridersdf')

spark.sql('select name, coalesce(sum(distance),0 ) as distance from usersdf left  \
              join ridersdf on usersdf.id == ridersdf.user_id group by name order by distance desc').show()

+--------+--------+
|    name|distance|
+--------+--------+
|   Elvis|     450|
|     Lee|     450|
|     Bob|     317|
|Jonathan|     312|
|    Alex|     222|
|   Alice|     120|
|  Donald|       0|
+--------+--------+



#### 07 Sales Person E

In [176]:
salesperson_schema = StructType([ 
                 StructField('sales_id',IntegerType()),
                 StructField('name',StringType()),
                 StructField('salary',IntegerType()),
                 StructField('commission_rate',IntegerType()),
                 StructField('hire_date',DateType())
                ])

In [179]:
order_schema = StructType([ 
                 StructField('order_id',IntegerType()),
                 StructField('order_date',DateType()),
                 StructField('com_id',IntegerType()),
                 StructField('sales_id',IntegerType()),
                 StructField('amount',IntegerType())
                ])

In [192]:
salesperson_df = (spark.read
               .option('header',True)
               #.schema(salesperson_schema) 
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/7_salesperson.csv'))
               #.withColumn('hire_date',to_date('hire_date','MM-dd-yyyy')))

In [178]:
salesperson_df.printSchema()

root
 |-- sales_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- commission_rate: integer (nullable = true)
 |-- hire_date: date (nullable = true)



In [195]:
orders_df = (spark.read
               .option('header',True)
               #.schema(order_schema)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/7_orders.csv'))
               #.withColumn('order_date',to_date('order_date','MM-dd-yyyy')))

In [196]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- com_id: integer (nullable = true)
 |-- sales_id: integer (nullable = true)
 |-- amount: integer (nullable = true)



In [174]:
company_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/7_company.csv')
                )

In [175]:
company_df.printSchema()

root
 |-- com_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



In [240]:
orders = orders_df.join(salesperson_df,orders_df.sales_id == salesperson_df.sales_id,'right') \
         .join(company_df,orders_df.com_id == company_df.com_id,'left')\
         .select(coalesce(orders_df.com_id,lit('0')).alias('com_id'),salesperson_df.name)

red = orders.filter(col('com_id').isin([1])).select('name').collect()
x = [red[i][0]   for i in range(0,len(red))]
orders.filter(~col('name').isin(x)).select('name').show()

+----+
|name|
+----+
| Amy|
|Mark|
|Alex|
+----+



In [250]:
salesperson_df.createOrReplaceTempView('salespersondf')
orders_df.createOrReplaceTempView('ordersdf')
company_df.createOrReplaceTempView('companydf')

spark.sql('with main as (select coalesce(ordersdf.com_id,0) as com_id,salespersondf.name  \
                         from ordersdf right join salespersondf on ordersdf.sales_id == salespersondf.sales_id),\
                 red as (select main.name from main join companydf on main.com_id == companydf.com_id where companydf.name == "RED") \
                         select main.name from main where name not in (select name from red) \
                        ').show()

+----+
|name|
+----+
| Amy|
|Mark|
|Alex|
+----+



#### 08 The Latest Login in 2020 E

In [251]:
logins_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/8_logins.csv'))

In [252]:
logins_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- time_stamp: string (nullable = true)



In [256]:
logins_df.filter(year('time_stamp') == lit('2020')).groupBy('user_id').agg(max(col('time_stamp'))).show()

+-------+-------------------+
|user_id|    max(time_stamp)|
+-------+-------------------+
|      6|2020-06-30 15:06:07|
|      8|2020-12-30 00:46:50|
|      2|2020-01-16 02:49:50|
+-------+-------------------+



In [258]:
logins_df.createOrReplaceTempView('loginsdf')

spark.sql('select user_id,max(time_stamp) as time_stamp from \
          loginsdf where year(time_stamp) == "2020" group by user_id').show()

+-------+-------------------+
|user_id|         time_stamp|
+-------+-------------------+
|      6|2020-06-30 15:06:07|
|      8|2020-12-30 00:46:50|
|      2|2020-01-16 02:49:50|
+-------+-------------------+



#### 09 Game Play Analysis E

In [264]:
logins_schema = StructType([ 
                 StructField('player_id',IntegerType()),
                 StructField('device_id',IntegerType()),
                 StructField('event_date',DateType()),
                 StructField('games_played',IntegerType()),
                ])

In [265]:
activity_df = (spark.read
               .option('header',True)
               .schema(logins_schema)
               .format('csv')
               .load('../../data/advsql/9_activity.csv')
               .withColumn('event_date',to_date('event_date','MM-dd--yyyy')))

In [266]:
activity_df.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- event_date: date (nullable = true)
 |-- games_played: integer (nullable = true)



In [271]:
activity_df.groupBy('player_id').agg(min(col('event_date')).alias('event_date')).orderBy('player_id').show()

+---------+----------+
|player_id|event_date|
+---------+----------+
|        1|2016-03-01|
|        2|2017-06-25|
|        3|2016-03-02|
+---------+----------+



In [274]:
activity_df.createOrReplaceTempView('activitydf')

spark.sql('select player_id,min(event_date) as event_date from activitydf group by player_id order by player_id').show()

+---------+----------+
|player_id|event_date|
+---------+----------+
|        1|2016-03-01|
|        2|2017-06-25|
|        3|2016-03-02|
+---------+----------+



#### 10 Warehouse Manager EP

In [275]:
warehouse_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/10_warehouse.csv'))

In [276]:
products_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/10_products.csv'))

In [277]:
warehouse_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- units: integer (nullable = true)



In [278]:
products_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- width: integer (nullable = true)
 |-- length: integer (nullable = true)
 |-- height: integer (nullable = true)



In [279]:
products_df.join(warehouse_df,products_df.product_id == warehouse_df.product_id,'inner').show()

+----------+------------+-----+------+------+--------+----------+-----+
|product_id|product_name|width|length|height|    name|product_id|units|
+----------+------------+-----+------+------+--------+----------+-----+
|         1|       LC-TV|    5|    50|    40|LCHouse2|         1|    2|
|         1|       LC-TV|    5|    50|    40|LCHouse1|         1|    1|
|         2| LC-KeyChain|    5|     5|     5|LCHouse2|         2|    2|
|         2| LC-KeyChain|    5|     5|     5|LCHouse1|         2|   10|
|         3|    LC-Phone|    2|    10|    10|LCHouse1|         3|    5|
|         4|  LC-T-Shirt|    4|    10|    20|LCHouse3|         4|    1|
+----------+------------+-----+------+------+--------+----------+-----+



In [287]:
products_df.withColumn('size',col('width') * col('length') * col('height') )\
        .join(warehouse_df,products_df.product_id == warehouse_df.product_id,'inner')\
        .withColumn('tot_spacee',col('size')*col('units'))\
        .groupby('name').agg(sum(col('tot_spacee')).alias('space'))\
        .select(col('name').alias('warehouse_name'),'space').orderBy('name').show()

+--------------+-----+
|warehouse_name|space|
+--------------+-----+
|      LCHouse1|12250|
|      LCHouse2|20250|
|      LCHouse3|  800|
+--------------+-----+



In [296]:
warehouse_df.createOrReplaceTempView('warehousedf')
products_df.createOrReplaceTempView('productsdf')

spark.sql('select productsdf.product_id,product_name,(width * length * height)as size,units from productsdf join \
              warehousedf on productsdf.product_id == warehousedf.product_id \
          ').show()

+----------+------------+-----+-----+
|product_id|product_name| size|units|
+----------+------------+-----+-----+
|         1|       LC-TV|10000|    2|
|         1|       LC-TV|10000|    1|
|         2| LC-KeyChain|  125|    2|
|         2| LC-KeyChain|  125|   10|
|         3|    LC-Phone|  200|    5|
|         4|  LC-T-Shirt|  800|    1|
+----------+------------+-----+-----+



In [301]:
spark.sql('select name as warehouse_name,sum(tot_size) as size from \
          (select name,((width * length * height) * units) as tot_size from productsdf join \
              warehousedf on productsdf.product_id == warehousedf.product_id) group by name  order by name\
          ').show()

+--------------+-----+
|warehouse_name| size|
+--------------+-----+
|      LCHouse1|12250|
|      LCHouse2|20250|
|      LCHouse3|  800|
+--------------+-----+



#### 11 Customer Placing the Largest Number of Orders E

In [26]:
orders_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/11_orders.csv'))

In [27]:
orders_df.printSchema()

root
 |-- oder_number: integer (nullable = true)
 |-- customer_number: integer (nullable = true)



In [28]:
orders_df.groupBy('customer_number').agg(count(col('oder_number')).alias('order'))\
                .orderBy(desc('order')).limit(1).select('customer_number').show()

+---------------+
|customer_number|
+---------------+
|              3|
+---------------+



In [43]:
orders_df.createOrReplaceTempView('ordersdf')

spark.sql('select customer_number from (select customer_number,count("order_number") as order from ordersdf \
                    group by customer_number order by order desc limit 1)').show()

+---------------+
|customer_number|
+---------------+
|              3|
+---------------+



#### 12 Find Total Time Spent by Each Employee E

In [44]:
employees_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/12_employees.csv'))

In [45]:
employees_df.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- event_day: string (nullable = true)
 |-- in_time: integer (nullable = true)
 |-- out_time: integer (nullable = true)



In [65]:
employees_df.select(col('event_day').alias('day'),'emp_id',abs(col('in_time') - col('out_time')).alias('minutes') )\
            .groupBy('emp_id','day').agg(sum(col('minutes'))).show()

+------+----------+------------+
|emp_id|       day|sum(minutes)|
+------+----------+------------+
|     1|2020-11-28|         173|
|     2|2020-11-28|          30|
|     1|2020-12-03|          41|
|     2|2020-12-09|          27|
+------+----------+------------+



In [69]:
employees_df.createOrReplaceTempView('employeesdf')

spark.sql(' select day,emp_id,sum(minutes) as minutes from  \
          (select emp_id,event_day as day, abs(in_time - out_time) as minutes from employeesdf) group by emp_id,day ').show()

+----------+------+-------+
|       day|emp_id|minutes|
+----------+------+-------+
|2020-11-28|     1|    173|
|2020-11-28|     2|     30|
|2020-12-03|     1|     41|
|2020-12-09|     2|     27|
+----------+------+-------+



#### 13 Immediate Food Delivery I EP

In [70]:
delivery_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/13_delivery.csv'))

In [71]:
delivery_df.printSchema()

root
 |-- delivery_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_pref_delivery_date: string (nullable = true)



In [126]:
delivery_df.filter(col('order_date') == col('customer_pref_delivery_date'))\
        .select( round(count(col('delivery_id'))/delivery_df.selectExpr("count(delivery_id)").collect()[0][0],2)\
        .alias('immediate_percentage')).show()

+--------------------+
|immediate_percentage|
+--------------------+
|                0.33|
+--------------------+



In [163]:
delivery_df.createOrReplaceTempView('deliverydf')
spark.sql('select round(a.two/b.one,2) as immediate_percentage from \
          (select count("delivery_id") as one from deliverydf) b, \
           (select count("delivery_id") as two from deliverydf \
                   where order_date == customer_pref_delivery_date) a').show()

+--------------------+
|immediate_percentage|
+--------------------+
|                0.33|
+--------------------+



#### 14 Bank Account Summary II E

In [155]:
users_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/14_users.csv'))

In [156]:
transaction_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/14_transaction.csv'))

In [157]:
users_df.printSchema()

root
 |-- account: integer (nullable = true)
 |-- name: string (nullable = true)



In [158]:
transaction_df.printSchema()

root
 |-- trans_id: integer (nullable = true)
 |-- account: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- transacted_on: string (nullable = true)



In [162]:
transaction_df.groupBy('account').agg(sum(col(('amount'))).alias('balance'))\
        .join(users_df,transaction_df.account == users_df.account,'inner') \
        .filter(col('balance') >= lit('10000')).select('name','balance').show()

+-----+-------+
| name|balance|
+-----+-------+
|Alice|  11000|
+-----+-------+



In [184]:
users_df.createOrReplaceTempView('usersdf')
transaction_df.createOrReplaceTempView('transactiondf')

spark.sql(' select name, sum(amount) as balance from transactiondf \
            join usersdf on usersdf.account == transactiondf.account group by name having balance >= 10000 \
         ').show()

+-----+-------+
| name|balance|
+-----+-------+
|Alice|  11000|
+-----+-------+



#### 15 Duplicate Emails E

In [186]:
person_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/15_person.csv'))

In [187]:
person_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- email: string (nullable = true)



In [205]:
person_df.filter(col('email') == person_df.groupBy('email').agg(count(col('email')).alias('cnt_email'))\
    .orderBy(desc(col('cnt_email'))).select('email').first()[:][0]).select('email').distinct().show()

+-------+
|  email|
+-------+
|a@b.com|
+-------+



In [211]:
person_df.createOrReplaceTempView('persondf')

spark.sql('select email from persondf group by email having count(email) >=2 ').show()

+-------+
|  email|
+-------+
|a@b.com|
+-------+



#### 16 Actors and Directors Who Cooperated At Least Three Times E

In [212]:
actordirector_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/16_actordirector.csv'))

In [213]:
actordirector_df.printSchema()

root
 |-- actor_id: integer (nullable = true)
 |-- director_id: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [228]:
x = actordirector_df.groupBy('actor_id','director_id').agg(count(col('director_id')).alias('cnt')).orderBy(desc('cnt')).first()[2]

actordirector_df.groupBy('actor_id','director_id').agg(count(col('director_id')).alias('cnt'))\
        .filter(col('cnt') == x).select('actor_id','director_id').show()

+--------+-----------+
|actor_id|director_id|
+--------+-----------+
|       1|          1|
+--------+-----------+



In [237]:
actordirector_df.createOrReplaceTempView('actordirectordf')

spark.sql(' select actor_id,director_id from  \
              (select actor_id,director_id,count(director_id) as cnt \
              from actordirectordf group by actor_id,director_id order by cnt desc limit 1) ').show()

+--------+-----------+
|actor_id|director_id|
+--------+-----------+
|       1|          1|
+--------+-----------+



#### 17  Customer Order Frequency EP

In [54]:
customers_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/17_customers.csv')
               .withColumnRenamed('customer_id','cus_id'))

In [55]:
product_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/17_product.csv'))

In [56]:
orders_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/17_orders.csv'))

In [57]:
customers_df.printSchema()

root
 |-- cus_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- country: string (nullable = true)



In [10]:
product_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- price: integer (nullable = true)



In [11]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- quantity: integer (nullable = true)



In [96]:
orders_df.filter(month(col('order_date')).isin([6,7])) \
    .join(product_df,orders_df.product_id == product_df.product_id,'inner') \
    .select('customer_id',month('order_date').alias('month'),(col('quantity') * col('price')).alias('amount')) \
    .join(customers_df,orders_df.customer_id == customers_df.cus_id,'inner')\
    .groupBy('cus_id','month','name').agg(sum(col('amount')).alias('amount')).filter(col('amount') >= 100) \
    .groupBy('cus_id','name').agg(count(col('month')).alias('cnt_month')).filter(col('cnt_month') >= 2)\
    .select('cus_id','name').show()

+------+-------+
|cus_id|   name|
+------+-------+
|     1|Winston|
+------+-------+



In [95]:
orders_df.createOrReplaceTempView('ordersdf')
product_df.createOrReplaceTempView('productdf')
customers_df.createOrReplaceTempView('customersdf')

spark.sql('with one as (select customer_id,(quantity * price) as amount,month(order_date) as month from ordersdf \
                        join productdf on ordersdf.product_id == productdf.product_id where month(order_date) in (6,7)), \
                two as (select cus_id,name,month,sum(amount) as amount from customersdf \
                        join one on customersdf.cus_id == one.customer_id  group by cus_id,name,month having amount >= 100) \
                       select cus_id,name from two group by cus_id,name having count(amount) >= 2 \
          ').show()


+------+-------+
|cus_id|   name|
+------+-------+
|     1|Winston|
+------+-------+



#### 18 Daily Leads and Partners E

In [98]:
dailysaless_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/18_dailysales.csv'))

In [99]:
dailysaless_df.printSchema()

root
 |-- date_id: string (nullable = true)
 |-- make_name: string (nullable = true)
 |-- lead_id: integer (nullable = true)
 |-- partner_id: integer (nullable = true)



In [107]:
dailysaless_df.groupBy('date_id','make_name')\
              .agg(countDistinct(col('lead_id')).alias('lead_id'),countDistinct(col('partner_id')).alias('partne_id'))\
              .orderBy(desc('make_name')).show()

+---------+---------+-------+---------+
|  date_id|make_name|lead_id|partne_id|
+---------+---------+-------+---------+
|2020-12-7|   toyota|      1|        2|
|2020-12-8|   toyota|      2|        3|
|2020-12-8|    honda|      2|        2|
|2020-12-7|    honda|      3|        2|
+---------+---------+-------+---------+



In [111]:
dailysaless_df.createOrReplaceTempView('dailysalessdf')

spark.sql('select date_id,make_name,count(distinct lead_id) as lead_id,count(distinct partner_id) as partner_id \
           from dailysalessdf group by date_id,make_name order by make_name desc').show()

+---------+---------+-------+----------+
|  date_id|make_name|lead_id|partner_id|
+---------+---------+-------+----------+
|2020-12-8|   toyota|      2|         3|
|2020-12-7|   toyota|      1|         2|
|2020-12-7|    honda|      3|         2|
|2020-12-8|    honda|      2|         2|
+---------+---------+-------+----------+



#### 19 Friendly Movies Streamed Last Month EP

In [112]:
tvprogram_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/19_tvprogram.csv'))

In [113]:
content_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/19_content.csv'))

In [114]:
tvprogram_df.printSchema()

root
 |-- program_date: string (nullable = true)
 |-- content_id: integer (nullable = true)
 |-- channel: string (nullable = true)



In [115]:
content_df.printSchema()

root
 |-- content_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- kids_content: string (nullable = true)
 |-- content_type: string (nullable = true)



In [119]:
tvprogram_df.filter(month(col('program_date')) == 6)\
            .join(content_df,content_df.content_id == tvprogram_df.content_id,'inner')\
          .filter(col('kids_content') == lit('Y')).select('title').show()

+-------+
|  title|
+-------+
|Aladdin|
+-------+



In [124]:
tvprogram_df.createOrReplaceTempView('tvprogramdf')
content_df.createOrReplaceTempView('contentdf')

spark.sql('select title from tvprogramdf join contentdf on tvprogramdf.content_id == contentdf.content_id \
              where kids_content == "Y" and month(program_date) == 6  ').show()

+-------+
|  title|
+-------+
|Aladdin|
+-------+



#### 20 Rearrange Products Table E

In [125]:
products_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/20_products.csv'))

In [126]:
products_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- store1: integer (nullable = true)
 |-- store2: integer (nullable = true)
 |-- store3: integer (nullable = true)



In [135]:
products_df.selectExpr(
    "product_id",
    "stack(3, 'store1', store1, 'store2', store2, 'store3', store3) as (store, value)"
).filter(~col('value').isNull()).show()

+----------+------+-----+
|product_id| store|value|
+----------+------+-----+
|         0|store1|   95|
|         0|store2|  100|
|         0|store3|  105|
|         1|store1|   70|
|         1|store3|   80|
+----------+------+-----+



In [145]:
products_df.show()

+----------+------+------+------+
|product_id|store1|store2|store3|
+----------+------+------+------+
|         0|    95|   100|   105|
|         1|    70|  null|    80|
+----------+------+------+------+



In [153]:
products_df.createOrReplaceTempView('productsdf')

spark.sql(' select * from (SELECT product_id, "store1" AS store, store1 AS value FROM productsdf \
            UNION ALL \
            SELECT product_id, "store2" AS store, store2 AS value FROM productsdf \
            UNION ALL \
            SELECT product_id, "store3" AS store, store3 AS value FROM productsdf) where value is not null \
         ').show()

+----------+------+-----+
|product_id| store|value|
+----------+------+-----+
|         0|store1|   95|
|         1|store1|   70|
|         0|store2|  100|
|         0|store3|  105|
|         1|store3|   80|
+----------+------+-----+



In [None]:
''' spark.sql("""
    SELECT product_id,
           store,
           value
    FROM productsdf
    LATERAL VIEW explode(array(
        struct('store1', store1),
        struct('store2', store2),
        struct('store3', store3)
    )) AS store_value
    SELECT product_id, store_value.col1 as store, store_value.col2 as value
""") '''

#### 21 Shortest Distance in a Line EP

In [156]:
point_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/21_point.csv'))

In [157]:
point_df.printSchema()

root
 |-- x: integer (nullable = true)



In [213]:
window_spec = Window.orderBy('x')
point_df.withColumn('xx',lag('x',1).over(window_spec)).select(expr('x-xx')\
                        .alias('xxx')).agg(min('xxx').alias('min_distancee')).show()

+-------------+
|min_distancee|
+-------------+
|            1|
+-------------+



24/08/15 22:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [214]:
point_df.createOrReplaceTempView('pointdf')
spark.sql('with one as (select x,lag(x) over(order by x) as xx from pointdf) \
                   select min(x - xx) as min_distance from one').show()

+------------+
|min_distance|
+------------+
|           1|
+------------+



24/08/15 22:17:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


#### 22 Employees With Missing Information E

In [224]:
employees_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/22_employees.csv'))

In [225]:
salaries_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/22_salaries.csv'))

In [226]:
employees_df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)



In [227]:
salaries_df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- salary: double (nullable = true)



In [235]:
employees_df.join(salaries_df,employees_df.employee_id == salaries_df.employee_id,'outer')\
            .filter(col('name').isNull() | col('salary').isNull())\
            .select(coalesce(employees_df.employee_id,salaries_df.employee_id).alias('employee_id')).show()

+-----------+
|employee_id|
+-----------+
|          1|
|          2|
+-----------+



In [269]:
salaries_df.createOrReplaceTempView('salariesdf')
employees_df.createOrReplaceTempView('employeesdf')

spark.sql(' with emp as (select employee_id as emp_id,name from employeesdf), \
              salary as (select employee_id,salary from salariesdf), \
                xxxx as (select emp_id,name,employee_id,salary from emp \
                         full outer join salary on emp.emp_id == salary.employee_id) \
                         select coalesce(emp_id,employee_id) as employee_id from xxxx where name is null or salary is null').show()


+-----------+
|employee_id|
+-----------+
|          1|
|          2|
+-----------+



#### 23 Find the Team Size EP

In [57]:
employee_df = (spark.read
               .option('header',True)
               .option('inferSchema',True)
               .format('csv')
               .load('../../data/advsql/23_employee.csv'))

In [271]:
employee_df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- team_id: integer (nullable = true)



In [280]:
team_df = employee_df.groupBy('team_id').agg(count(col('team_id')).alias('team'))

employee_df.join(team_df,employee_df.team_id == team_df.team_id,'inner')\
.select('employee_id','team').orderBy('employee_id').show()

+-----------+----+
|employee_id|team|
+-----------+----+
|          1|   3|
|          2|   3|
|          3|   3|
|          4|   1|
|          5|   2|
|          6|   2|
+-----------+----+



In [286]:
employee_df.createOrReplaceTempView('employeedf')

spark.sql('select employee_id,team from employee_df, \
          (select team_id,count(team_id) as team from employeedf group by team_id) as t \
          where  employee_df.team_id == t.team_id order by employee_id').show()

+-----------+----+
|employee_id|team|
+-----------+----+
|          1|   3|
|          2|   3|
|          3|   3|
|          4|   1|
|          5|   2|
|          6|   2|
+-----------+----+

