In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from datetime import datetime

## 1. Monthly Percentage Difference

Q. Given a table of purchases by date, calculate the month-over-month percentage change in revenue. The output should include the year-month date (YYYY-MM) and percentage change, rounded to the 2nd decimal point, and sorted from the beginning of the year to the end of the year.
The percentage change column will be populated from the 2nd month forward and can be calculated as ((this month's revenue - last month's revenue) / last month's revenue)*100.    
[link](https://platform.stratascratch.com/coding/10319-monthly-percentage-difference?code_type=3)

#### Spark-Dataframe API

In [0]:
# Read the purchases CSV; column types are inferred from the data.
sf_transactions = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/one.csv')
)

# Quick look at the first rows and the inferred schema.
sf_transactions.show(5)
sf_transactions.printSchema()

+---+----------+------+-----------+
| id|created_at| value|purchase_id|
+---+----------+------+-----------+
|  1|2019-01-01|172692|         43|
|  2|2019-01-05|177194|         36|
|  3|2019-01-09|109513|         30|
|  4|2019-01-13|164911|         30|
|  5|2019-01-17|198872|         39|
+---+----------+------+-----------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- created_at: date (nullable = true)
 |-- value: integer (nullable = true)
 |-- purchase_id: integer (nullable = true)



In [0]:
# Month-over-month revenue change.
# 1. Bucket every purchase into its 'yyyy-MM' month and total the revenue.
# 2. Fetch the previous month's total with lag() over the months in order.
# 3. Percentage change = (this month - last month) / last month * 100.
# A single un-partitioned window is used because the lag has to look across
# all months; the first month has no predecessor and therefore stays null.
monthly_window = Window.orderBy('ym')

result_df = (
    sf_transactions
    .select(date_format(col('created_at'), 'yyyy-MM').alias('ym'), col('value'))
    .groupBy('ym')
    .agg(sum('value').alias('sum(value)'))
    .withColumn('pre', lag('sum(value)', 1).over(monthly_window))
    .withColumn(
        'revenue_diff_pct',
        round(((col('sum(value)') - col('pre')) / col('pre')) * 100, 2),
    )
)

# Sort explicitly: the task asks for chronological output, and row order is
# only guaranteed by an orderBy on the final result.
result_df.select('ym', 'revenue_diff_pct').orderBy('ym').show()

+-------+----------------+
|     ym|revenue_diff_pct|
+-------+----------------+
|2019-01|            null|
|2019-02|          -28.56|
|2019-03|           23.35|
|2019-04|          -13.84|
|2019-05|           13.49|
|2019-06|           -2.78|
|2019-07|            -6.0|
|2019-08|           28.36|
|2019-09|           -4.97|
|2019-10|          -12.68|
|2019-11|            1.71|
|2019-12|           -2.11|
+-------+----------------+



#### Spark-SQL

In [0]:
# Register the transactions DataFrame as a temp view so it can be queried with spark.sql.
sf_transactions.createOrReplaceTempView('sf_transactions_table')

In [0]:
# Month-over-month revenue change in SQL:
#   T - revenue per 'yyyy-MM' month
#   U - T plus the previous month's revenue (lag over months in order)
# The outer ORDER BY makes the chronological output explicit instead of
# relying on the order the window happens to produce.
spark.sql("""
select ym, round(((value - pre) / pre) * 100, 2) as revenue_diff_pct
from
    (select T.*, lag(T.value) over(order by ym) as pre
    from
        (select
        date_format(created_at, 'yyyy-MM') as ym,
        sum(value) as value
        from
        sf_transactions_table
        group by ym
        ) as T
    ) as U
order by ym
          """).show()

+-------+----------------+
|     ym|revenue_diff_pct|
+-------+----------------+
|2019-01|            null|
|2019-02|          -28.56|
|2019-03|           23.35|
|2019-04|          -13.84|
|2019-05|           13.49|
|2019-06|           -2.78|
|2019-07|            -6.0|
|2019-08|           28.36|
|2019-09|           -4.97|
|2019-10|          -12.68|
|2019-11|            1.71|
|2019-12|           -2.11|
+-------+----------------+



## 2. Top Percentile Fraud

Q. ABC Corp is a mid-sized insurer in the US and in the recent past their fraudulent claims have increased significantly for their personal auto insurance portfolio. They have developed a ML based predictive model to identify propensity of fraudulent claims. Now, they assign highly experienced claim adjusters for top 5 percentile of claims identified by the model.
Your objective is to identify the top 5 percentile of claims from each state. Your output should be policy number, state, claim cost, and fraud score.  
[link](https://platform.stratascratch.com/coding/10303-top-percentile-fraud?code_type=3)


#### Spark-Dataframe API

In [0]:
# Read the fraud-score CSV; column types are inferred from the data.
fraud_score_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/two.csv')
)

# Quick look at the first rows and the inferred schema.
fraud_score_df.show(5)
fraud_score_df.printSchema()

+----------+-----+----------+-----------+
|policy_num|state|claim_cost|fraud_score|
+----------+-----+----------+-----------+
|  ABCD1001|   CA|      4113|      0.613|
|  ABCD1002|   CA|      3946|      0.156|
|  ABCD1003|   CA|      4335|      0.014|
|  ABCD1004|   CA|      3967|      0.142|
|  ABCD1005|   CA|      1599|      0.889|
+----------+-----+----------+-----------+
only showing top 5 rows

root
 |-- policy_num: string (nullable = true)
 |-- state: string (nullable = true)
 |-- claim_cost: integer (nullable = true)
 |-- fraud_score: double (nullable = true)



In [0]:
# Top 5 percent of claims per state by fraud score.
# The cut-off is 5% of the number of claims in the state, rounded up. It is
# derived from the row count (as in the SQL version of this exercise) rather
# than from max(rank): rank() leaves gaps on ties, so max(rank) can be smaller
# than the number of claims when the lowest scores are tied.
state_window = Window.partitionBy(col('state'))

(
    fraud_score_df
    .withColumn('rank', rank().over(state_window.orderBy(col('fraud_score').desc())))
    .withColumn('rnk_filter', ceil(count(lit(1)).over(state_window) * 0.05))
    .filter(col('rank') <= col('rnk_filter'))
    .select('policy_num', 'state', 'claim_cost', 'fraud_score')
    .show()
)

+----------+-----+----------+-----------+
|policy_num|state|claim_cost|fraud_score|
+----------+-----+----------+-----------+
|  ABCD1027|   CA|      2663|      0.988|
|  ABCD1016|   CA|      1639|      0.964|
|  ABCD1079|   CA|      4224|      0.963|
|  ABCD1081|   CA|      1080|      0.951|
|  ABCD1069|   CA|      1426|      0.948|
|  ABCD1222|   FL|      2392|      0.988|
|  ABCD1218|   FL|      1419|      0.961|
|  ABCD1291|   FL|      2581|      0.939|
|  ABCD1230|   FL|      2560|      0.923|
|  ABCD1277|   FL|      2057|      0.923|
|  ABCD1189|   NY|      3577|      0.982|
|  ABCD1117|   NY|      4903|      0.978|
|  ABCD1187|   NY|      3722|      0.976|
|  ABCD1196|   NY|      2994|      0.973|
|  ABCD1121|   NY|      4009|      0.969|
|  ABCD1361|   TX|      4950|      0.999|
|  ABCD1304|   TX|      1407|      0.996|
|  ABCD1398|   TX|      3191|      0.978|
|  ABCD1366|   TX|      2453|      0.968|
|  ABCD1386|   TX|      4311|      0.963|
+----------+-----+----------+-----

#### Spark_SQL

In [0]:
# Register the fraud-score DataFrame as a temp view so it can be queried with spark.sql.
fraud_score_df.createOrReplaceTempView('fraud_score_Table')

In [0]:
# For every claim compute (a) the per-state cut-off, 5% of the state's row
# count rounded up, and (b) its rank by descending fraud score within the
# state; keep the claims whose rank is within the cut-off.
top_fraud_query = """
        select 
        policy_num,
        state,
        claim_cost,
        fraud_score
        from (  select * ,
            ceil(((count(*) over(partition by state))*0.05)) as select_rnk ,
            rank() over(partition by state order by fraud_score desc) as rnk
            from fraud_score_Table
            ) as T
        where rnk <= select_rnk 
          """

spark.sql(top_fraud_query).show()

+----------+-----+----------+-----------+
|policy_num|state|claim_cost|fraud_score|
+----------+-----+----------+-----------+
|  ABCD1027|   CA|      2663|      0.988|
|  ABCD1016|   CA|      1639|      0.964|
|  ABCD1079|   CA|      4224|      0.963|
|  ABCD1081|   CA|      1080|      0.951|
|  ABCD1069|   CA|      1426|      0.948|
|  ABCD1222|   FL|      2392|      0.988|
|  ABCD1218|   FL|      1419|      0.961|
|  ABCD1291|   FL|      2581|      0.939|
|  ABCD1230|   FL|      2560|      0.923|
|  ABCD1277|   FL|      2057|      0.923|
|  ABCD1189|   NY|      3577|      0.982|
|  ABCD1117|   NY|      4903|      0.978|
|  ABCD1187|   NY|      3722|      0.976|
|  ABCD1196|   NY|      2994|      0.973|
|  ABCD1121|   NY|      4009|      0.969|
|  ABCD1361|   TX|      4950|      0.999|
|  ABCD1304|   TX|      1407|      0.996|
|  ABCD1398|   TX|      3191|      0.978|
|  ABCD1366|   TX|      2453|      0.968|
|  ABCD1386|   TX|      4311|      0.963|
+----------+-----+----------+-----

## 3. Premium vs Freemium

Q. Find the total number of downloads for paying and non-paying users by date. Include only records where non-paying customers have more downloads than paying customers. The output should be sorted by earliest date first and contain 3 columns date, non-paying downloads, paying downloads.    
[link](https://platform.stratascratch.com/coding/10300-premium-vs-freemium?code_type=3)


#### Spark-Dataframe API

In [0]:
# Load the three CSVs used by this exercise: the user -> account mapping,
# the account -> paying flag mapping, and the per-user download facts.
ms_user_dimension_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/3_one.csv')
)

ms_acc_dimension_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/3_two.csv')
)

ms_download_facts = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/3_three.csv')
)

# Preview each table and its inferred schema, in load order.
for frame in (ms_user_dimension_df, ms_acc_dimension_df, ms_download_facts):
    frame.show(5)
    frame.printSchema()

+-------+------+
|user_id|acc_id|
+-------+------+
|      1|   716|
|      2|   749|
|      3|   713|
|      4|   744|
|      5|   726|
+-------+------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- acc_id: integer (nullable = true)

+------+---------------+
|acc_id|paying_customer|
+------+---------------+
|   700|             no|
|   701|             no|
|   702|             no|
|   703|             no|
|   704|             no|
+------+---------------+
only showing top 5 rows

root
 |-- acc_id: integer (nullable = true)
 |-- paying_customer: string (nullable = true)

+----------+-------+---------+
|      date|user_id|downloads|
+----------+-------+---------+
|2020-08-24|      1|        6|
|2020-08-22|      2|        6|
|2020-08-18|      3|        2|
|2020-08-24|      4|        4|
|2020-08-19|      5|        7|
+----------+-------+---------+
only showing top 5 rows

root
 |-- date: date (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- downloa

In [0]:
# Downloads per date for non-paying vs paying customers, keeping only the
# dates on which non-paying customers downloaded more.
#
# Step 1: total downloads per (date, paying_customer).
downloads_by_type = (
    ms_download_facts
    .join(ms_user_dimension_df,
          ms_download_facts.user_id == ms_user_dimension_df.user_id, 'left')
    .join(ms_acc_dimension_df,
          ms_user_dimension_df.acc_id == ms_acc_dimension_df.acc_id, 'left')
    .groupBy('date', 'paying_customer')
    .agg(sum('downloads').alias('non_paying'))
)

# Step 2: within each date, pair the first category's total with the next
# one via lead(). The window must be ordered by paying_customer (as the SQL
# version does); ordering by the partition column `date` leaves the row order
# inside a partition undefined, so the two totals could be swapped.
# NOTE(review): this relies on the non-paying label sorting before the paying
# label ('no' is visible in the data; the paying label is presumably 'yes').
per_date = Window.partitionBy('date').orderBy('paying_customer')

(
    downloads_by_type
    .withColumn('paying', lead('non_paying').over(per_date))
    .filter(col('paying').isNotNull())
    .select('date', 'non_paying', 'paying')
    .filter(col('non_paying') > col('paying'))
    # the task asks for earliest date first; sort the final result explicitly
    .orderBy(col('date').asc())
    .show()
)


+----------+----------+------+
|      date|non_paying|paying|
+----------+----------+------+
|2020-08-16|        15|    14|
|2020-08-17|        45|     9|
|2020-08-18|        10|     7|
|2020-08-21|        32|    17|
+----------+----------+------+



#### Spark_SQL

In [0]:
# Register the three DataFrames as temp views so they can be joined with spark.sql.
ms_user_dimension_df.createOrReplaceTempView('ms_user_dimension_table')
ms_acc_dimension_df.createOrReplaceTempView('ms_acc_dimension_table')
ms_download_facts.createOrReplaceTempView('ms_download_facts_table')

In [0]:
# Downloads per date for non-paying vs paying customers (SQL version):
#   T - total downloads per (date, paying_customer)
#   U - per date, the first category's total next to the following one (lead)
# Rows without a following category are dropped, and only dates where the
# first total exceeds the second are kept. The outer ORDER BY delivers the
# "earliest date first" ordering the task asks for.
# NOTE(review): relies on the non-paying label sorting before the paying
# label ('no' is visible in the data; the paying label is presumably 'yes').
spark.sql("""
select date, non_paying, paying
from
    (select date,
    total_downloads as non_paying,
    lead(total_downloads) over(partition by date order by paying_customer) as paying
    from
        (select a.date,
        c.paying_customer,
        sum(a.downloads) as total_downloads
        from ms_download_facts_table as a
        left join ms_user_dimension_table as b
        on a.user_id = b.user_id
        left join ms_acc_dimension_table as c
        on b.acc_id = c.acc_id
        group by a.date, c.paying_customer
        ) as T
    ) as U
where paying is not null
and non_paying > paying
order by date
          """).show()

+----------+----------+------+
|      date|non_paying|paying|
+----------+----------+------+
|2020-08-16|        15|    14|
|2020-08-17|        45|     9|
|2020-08-18|        10|     7|
|2020-08-21|        32|    17|
+----------+----------+------+



## 4. Popularity Percentage

Q. Find the popularity percentage for each user on Meta/Facebook. The popularity percentage is defined as the total number of friends the user has divided by the total number of users on the platform, then converted into a percentage by multiplying by 100.
Output each user along with their popularity percentage. Order records in ascending order by user id.
The 'user1' and 'user2' column are pairs of friends.    
[link](https://platform.stratascratch.com/coding/10284-popularity-percentage?code_type=3)



#### Spark-Dataframe API

In [0]:
# Each tuple is one friendship: (user1, user2).
my_data = [
    (2, 1),
    (1, 3),
    (4, 1),
    (1, 5),
    (1, 6),
    (2, 6),
    (7, 2),
    (8, 3),
    (3, 9),
]

# Two nullable integer columns, one per side of the friendship.
my_schema = (
    StructType()
    .add(StructField('user1', IntegerType()))
    .add(StructField('user2', IntegerType()))
)

facebook_friends_df = spark.createDataFrame(my_data, my_schema)

facebook_friends_df.show(5)
facebook_friends_df.printSchema()

+-----+-----+
|user1|user2|
+-----+-----+
|    2|    1|
|    1|    3|
|    4|    1|
|    1|    5|
|    1|    6|
+-----+-----+
only showing top 5 rows

root
 |-- user1: integer (nullable = true)
 |-- user2: integer (nullable = true)



In [0]:
# Popularity percentage = friends of the user / total users * 100.
#
# Stack both sides of every friendship into one column: a user then appears
# once per friend. union() matches columns by position, so the stacked column
# keeps the name 'user1'.
all_users_df = facebook_friends_df.select('user1').union(
    facebook_friends_df.select('user2')
)

# Total number of distinct users on the platform. This is computed from the
# stacked frame built above; the original cell read it from a variable named
# `result` that is not defined at this point of the notebook.
total_users = all_users_df.distinct().count()

(
    all_users_df
    .groupBy('user1')
    .agg(count('user1').alias('user_pop'))
    .withColumn('popularity_percent',
                round(col('user_pop') / lit(total_users) * 100, 3))
    # drop by name: passing several Column objects to drop() is not accepted
    # by every PySpark version
    .drop('user_pop')
    # the task asks for ascending user id
    .orderBy('user1')
    .show()
)

+-----+------------------+
|user1|popularity_percent|
+-----+------------------+
|    1|            55.556|
|    2|            33.333|
|    3|            33.333|
|    4|            11.111|
|    5|            11.111|
|    6|            22.222|
|    7|            11.111|
|    8|            11.111|
|    9|            11.111|
+-----+------------------+



#### Spark_SQL

In [0]:
# Register the friendships DataFrame as a temp view so it can be queried with spark.sql.
facebook_friends_df.createOrReplaceTempView('facebook_friends_df_table')

In [0]:
# Popularity percentage in SQL.
#   friends - every friendship listed in both directions (UNION de-duplicates),
#             so each user has one row per friend
#   totals  - number of distinct users on the platform
# The denominator must be the number of users. The previous version divided
# by (row count of the symmetric list / 2), which is the number of
# friendships and only happens to equal the number of users on this sample.
spark.sql("""
with friends as (
    select user1, user2
    from facebook_friends_df_table
    union
    select user2 as user1, user1 as user2
    from facebook_friends_df_table
),
totals as (
    select count(distinct user1) as total_count
    from friends
)
select f.user1,
       round((count(*) / max(t.total_count)) * 100, 3) as popularity_percent
from friends as f
cross join totals as t
group by f.user1
order by f.user1
""").show()

+-----+------------------+
|user1|popularity_percent|
+-----+------------------+
|    1|            55.556|
|    2|            33.333|
|    3|            33.333|
|    4|            11.111|
|    5|            11.111|
|    6|            22.222|
|    7|            11.111|
|    8|            11.111|
|    9|            11.111|
+-----+------------------+



## 5. Top 5 States With 5 Star Businesses

Q. Find the top 5 states with the most 5 star businesses. Output the state name along with the number of 5-star businesses and order records by the number of 5-star businesses in descending order. In case there are ties in the number of businesses, return all the unique states. If two states have the same result, sort them in alphabetical order.     
[Link](https://platform.stratascratch.com/coding/10046-top-5-states-with-5-star-businesses?code_type=3)



#### Spark-Dataframe API

In [0]:
# Read the Yelp business CSV; column types are inferred from the data.
yelp_business_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('/FileStore/tables/five-1.csv')
)

# Quick look at the first rows and the inferred schema.
yelp_business_df.show(5)
yelp_business_df.printSchema()

+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+--------+---------+-----+------------+-------+--------------------+
|         business_id|                name|neighborhood|             address|          city|state|postal_code|latitude|longitude|stars|review_count|is_open|          categories|
+--------------------+--------------------+------------+--------------------+--------------+-----+-----------+--------+---------+-----+------------+-------+--------------------+
|G5ERFWvPfHy7IDAUY...|All Colors Mobile...|        null|     7137 N 28th Ave|       Phoenix|   AZ|      85051|  33.448| -112.074|  1.0|           4|      1|Auto Detailing;Au...|
|0jDvRJS-z9zdMgOUX...|             Sunfare|        null|811 W Deer Valley Rd|       Phoenix|   AZ|      85027|  33.683| -112.085|  5.0|          27|      1|Personal Chefs;Fo...|
|6HmDqeNNZtHMK0t2g...|     Dry Clean Vegas|   Southeast|2550 Windmill Ln,...|     Las Vegas|   NV|      89123|

In [0]:
# Top 5 states by number of 5-star businesses, ties included.
# rank() gives tied counts the same rank, so filtering on rnk <= 5 keeps every
# state that ties for a top-5 position. The sort is applied once, as the last
# transformation, so the displayed order (count descending, then state
# alphabetically) is guaranteed; the earlier intermediate sort was redundant.
(
    yelp_business_df
    .filter(col('stars') == 5)
    .groupBy('state')
    .agg(count('state').alias('five_star_counts'))
    .withColumn('rnk', rank().over(Window.orderBy(col('five_star_counts').desc())))
    .filter(col('rnk') <= 5)
    .orderBy(col('five_star_counts').desc(), 'state')
    .drop('rnk')
    .show()
)

+-----+----------------+
|state|five_star_counts|
+-----+----------------+
|   AZ|              10|
|   ON|               5|
|   NV|               4|
|   IL|               3|
|   OH|               3|
|   WI|               3|
+-----+----------------+



#### Spark_SQL

In [0]:
# Register the Yelp DataFrame as a temp view so it can be queried with spark.sql.
yelp_business_df.createOrReplaceTempView('yelp_business_df_table')

In [0]:
# Top 5 states by number of 5-star businesses, ties included (SQL version):
#   T - 5-star business count per state
#   U - T plus rank() by that count, so tied states share a rank
# ORDER BY belongs on the outermost query: an ORDER BY inside a subquery does
# not guarantee the order of the final result.
spark.sql("""
select state, five_star_counts
from
    (select T.*, rank() over(order by five_star_counts desc) as rnk
    from
        (select state, count(state) as five_star_counts
        from yelp_business_df_table
        where stars = 5
        group by state
        ) as T
    ) as U
where rnk <= 5
order by five_star_counts desc, state
          """).show()

+-----+----------------+
|state|five_star_counts|
+-----+----------------+
|   AZ|              10|
|   ON|               5|
|   NV|               4|
|   IL|               3|
|   OH|               3|
|   WI|               3|
+-----+----------------+



## 6. Host Popularity Rental Prices

Q. You’re given a table of rental property searches by users. The table consists of search results and outputs host information for searchers. Find the minimum, average, maximum rental prices for each host’s popularity rating. The host’s popularity rating is defined as below:  
0 reviews: New  
1 to 5 reviews: Rising  
6 to 15 reviews: Trending Up  
16 to 40 reviews: Popular  
more than 40 reviews: Hot 


Tip: The id column in the table refers to the search ID. You'll need to create your own host_id by concatenating price, room_type, host_since, zipcode, and number_of_reviews.


Output host popularity rating and their minimum, average and maximum rental prices.    
[Link](https://platform.stratascratch.com/coding/9632-host-popularity-rental-prices?code_type=3)


#### Spark-Dataframe API

In [0]:
# Read the Airbnb search-results CSV; column types are inferred from the data.
airbnb_host_searches_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('/FileStore/tables/six_modified-1.csv')
)

# Quick look at the first rows and the inferred schema.
airbnb_host_searches_df.show(5)
airbnb_host_searches_df.printSchema()

+--------+------+-------------+---------------+------------+---------+--------+-------------------+------------+----+----------------------+------------------+----------+-----------------+-----------------+--------------------+-------+--------+----+
|      id| price|property_type|      room_type|accommodates|bathrooms|bed_type|cancellation_policy|cleaning_fee|city|host_identity_verified|host_response_rate|host_since|    neighbourhood|number_of_reviews|review_scores_rating|zipcode|bedrooms|beds|
+--------+------+-------------+---------------+------------+---------+--------+-------------------+------------+----+----------------------+------------------+----------+-----------------+-----------------+--------------------+-------+--------+----+
| 8284881|621.46|        House|Entire home/apt|           8|        3|Real Bed|             strict|           1|  LA|                     f|              100%|2016-11-01|Pacific Palisades|                1|                null|  90272|       4|   6|


In [0]:
# Min / average / max price per host popularity bucket.

# One row per host: host_id is built by concatenating the identifying columns,
# and distinct() collapses repeated search results for the same host.
hosts_df = airbnb_host_searches_df.select(
    concat('price', 'room_type', 'host_since', 'zipcode', 'number_of_reviews').alias('host_id'),
    'price',
    'number_of_reviews',
).distinct()

# Popularity bucket derived from the review count (between() is inclusive on
# both ends). Rows matching no branch get a null bucket.
reviews = col('number_of_reviews')
popularity = (
    when(reviews == 0, 'New')
    .when(reviews.between(1, 5), 'Rising')
    .when(reviews.between(6, 15), 'Trending Up')
    .when(reviews.between(16, 40), 'Popular')
    .when(reviews > 40, 'Hot')
)

(
    hosts_df
    .withColumn('host_popularity', popularity)
    .drop(col('host_id'), col('number_of_reviews'))
    .groupBy(col('host_popularity'))
    .agg(
        round(min(col('price')), 2).alias('min_price'),
        round(avg(col('price')), 2).alias('avg_price'),
        round(max(col('price')), 2).alias('max_price'),
    )
    .show()
)
 

+---------------+---------+---------+---------+
|host_popularity|min_price|avg_price|max_price|
+---------------+---------+---------+---------+
|    Trending Up|   361.09|   476.28|   685.65|
|        Popular|   270.81|   472.81|   667.83|
|            Hot|   340.12|   464.23|   633.51|
|         Rising|   355.53|   503.85|   717.01|
|            New|   313.55|   515.92|   741.76|
+---------------+---------+---------+---------+



#### Spark_SQL

In [0]:
# Register the Airbnb DataFrame as a temp view so it can be queried with spark.sql.
airbnb_host_searches_df.createOrReplaceTempView('airbnb_host_searches_df_table')

In [0]:
# Inner query: one row per host (host_id is the concatenation of the
# identifying columns, DISTINCT removes repeated search results).
# Outer query: bucket hosts by review count and report min / avg / max price
# per bucket.
host_popularity_query = """
select 
    case 
        when number_of_reviews = 0 then 'New' 
        when (number_of_reviews>=1 and number_of_reviews<=5) then 'Rising'
        when (number_of_reviews>=6 and number_of_reviews<=15) then 'Trending Up'
        when (number_of_reviews>=16 and number_of_reviews<=40) then 'Popular'
        when number_of_reviews > 40 then 'Hot' 
    end as host_popularity , 
    round(min(price),2) as min_price,
    round(avg(price),2) as avg_price,
    round(max(price),2) as max_price
from
    (
    select 
    distinct concat(price,room_type,host_since,zipcode,number_of_reviews) as host_id,
    price , 
    number_of_reviews
    from airbnb_host_searches_df_table
    ) as U
group by host_popularity
          """

spark.sql(host_popularity_query).show()

+---------------+---------+---------+---------+
|host_popularity|min_price|avg_price|max_price|
+---------------+---------+---------+---------+
|    Trending Up|   361.09|   476.28|   685.65|
|        Popular|   270.81|   472.81|   667.83|
|            Hot|   340.12|   464.23|   633.51|
|         Rising|   355.53|   503.85|   717.01|
|            New|   313.55|   515.92|   741.76|
+---------------+---------+---------+---------+



## 7. Cookbook Recipes

Q. You are given the table with titles of recipes from a cookbook and their page numbers. You are asked to represent how the recipes will be distributed in the book.
Produce a table consisting of three columns: left_page_number, left_title and right_title. The k-th row (counting from 0), should contain the number and the title of the page with the number (2 x K) in the first and second columns respectively, and the title of the page with the number (2 x k)+1 in the third column.
Each page contains at most 1 recipe. If the page does not contain a recipe, the appropriate cell should remain empty (NULL value). Page 0 (the internal side of the front cover) is guaranteed to be empty.     
[Link](https://platform.stratascratch.com/coding/2089-cookbook-recipes?code_type=3)


#### Spark-Dataframe API

In [0]:
# (page_number, title) for every page that holds a recipe.
my_data = [
    (1, 'Scrambled eggs'),
    (2, 'Fondue'),
    (3, 'Sandwich'),
    (4, 'Tomato soup'),
    (6, 'Liver'),
    (11, 'Fried duck'),
    (12, 'Boiled duck'),
    (15, 'Baked chicken'),
]

# Nullable integer page number plus nullable string title.
my_schema = (
    StructType()
    .add(StructField('page_number', IntegerType()))
    .add(StructField('title', StringType()))
)

cookbook_titles_df = spark.createDataFrame(my_data, my_schema)

cookbook_titles_df.show()
cookbook_titles_df.printSchema()

+-----------+--------------+
|page_number|         title|
+-----------+--------------+
|          1|Scrambled eggs|
|          2|        Fondue|
|          3|      Sandwich|
|          4|   Tomato soup|
|          6|         Liver|
|         11|    Fried duck|
|         12|   Boiled duck|
|         15| Baked chicken|
+-----------+--------------+

root
 |-- page_number: integer (nullable = true)
 |-- title: string (nullable = true)



In [0]:
# Lay the recipes out as two-page spreads: left page 2k, right page 2k + 1.
#
# The list of left pages must run up to the highest page that holds a recipe.
# The previous version produced one spread per recipe row
# ((row_number - 1) * 2), which only covers the whole book when the number of
# recipes happens to be about half the last page number, as in this sample.

# Highest page with a recipe (`max` here is the Spark aggregate brought in by
# the notebook's functions import); an empty table still yields page 0.
last_page = cookbook_titles_df.agg(max('page_number')).first()[0]
if last_page is None:
    last_page = 0

# 0, 2, 4, ... up to and including the spread that contains last_page.
spreads_df = spark.range(0, last_page + 1, 2).select(
    col('id').cast('int').alias('left_page_number')
)

# Titles keyed by the left page number of the spread they belong to.
left_titles_df = cookbook_titles_df.select(
    col('page_number').alias('left_page_number'),
    col('title').alias('left_title'),
)
right_titles_df = cookbook_titles_df.select(
    (col('page_number') - 1).alias('left_page_number'),
    col('title').alias('right_title'),
)

# Left joins keep spreads whose pages are empty; their titles stay null.
result = (
    spreads_df
    .join(left_titles_df, 'left_page_number', 'left')
    .join(right_titles_df, 'left_page_number', 'left')
    .orderBy('left_page_number')
)

result.show()

+----------------+-----------+--------------+
|left_page_number| left_title|   right_title|
+----------------+-----------+--------------+
|               0|       null|Scrambled eggs|
|               2|     Fondue|      Sandwich|
|               4|Tomato soup|          null|
|               6|      Liver|          null|
|               8|       null|          null|
|              10|       null|    Fried duck|
|              12|Boiled duck|          null|
|              14|       null| Baked chicken|
+----------------+-----------+--------------+



#### Spark_SQL

In [0]:
# Register the cookbook DataFrame as a temp view so it can be queried with spark.sql.
cookbook_titles_df.createOrReplaceTempView('cookbook_titles_df_table')

In [0]:
# Two-page spreads in SQL: left page 2k, right page 2k + 1.
#   spreads - every even page number from 0 up to the highest page that holds
#             a recipe, generated with sequence() + explode()
# The previous version derived the spreads from row_number() over the recipe
# rows, so the number of spreads equalled the number of recipes rather than
# following the last page number; it matched only by coincidence on this data.
spark.sql("""
with spreads as (
    select explode(sequence(0, max_page, 2)) as left_page_number
    from (select coalesce(max(page_number), 0) as max_page
          from cookbook_titles_df_table) as m
)
select a.left_page_number,
       b.title as left_title,
       c.title as right_title
from spreads as a
left join cookbook_titles_df_table as b
on a.left_page_number = b.page_number
left join cookbook_titles_df_table as c
on a.left_page_number + 1 = c.page_number
order by a.left_page_number
          """).show()

+----------------+-----------+--------------+
|left_page_number| left_title|   right_title|
+----------------+-----------+--------------+
|               0|       null|Scrambled eggs|
|               2|     Fondue|      Sandwich|
|               4|Tomato soup|          null|
|               6|      Liver|          null|
|               8|       null|          null|
|              10|       null|    Fried duck|
|              12|Boiled duck|          null|
|              14|       null| Baked chicken|
+----------------+-----------+--------------+



## 8. Retention Rate

Q. Find the monthly retention rate of users for each account separately for Dec 2020 and Jan 2021. Retention rate is the percentage of active users an account retains over a given period of time. In this case, assume the user is retained if he/she stays with the app in any future months. For example, if a user was active in Dec 2020 and has activity in any future month, consider them retained for Dec. You can assume all accounts are present in Dec 2020 and Jan 2021. Your output should have the account ID and the Jan 2021 retention rate divided by Dec 2020 retention rate.           
[Link](https://platform.stratascratch.com/coding/2053-retention-rate?code_type=3)


#### Spark-Dataframe API

In [0]:
 sf_events_df = spark.read.format('csv')\
                        .option('header','true')\
                        .option('inferschema','true')\
                        .option('mode','PERMISSIVE')\
                        .load('/FileStore/tables/eight.csv')

# Preview the events and the inferred schema.
# NOTE: the schema printed below lists the first column as ' date' (with a
# leading space), and the queries in this section refer to it by that exact name.
sf_events_df.show(5)
sf_events_df.printSchema()

+----------+----------+-------+
|      date|account_id|user_id|
+----------+----------+-------+
|2021-01-01|        A1|     U1|
|2021-01-01|        A1|     U2|
|2021-01-06|        A1|     U3|
|2021-01-02|        A1|     U1|
|2020-12-24|        A1|     U2|
+----------+----------+-------+
only showing top 5 rows

root
 |--  date: date (nullable = true)
 |-- account_id: string (nullable = true)
 |-- user_id: string (nullable = true)



In [0]:
# Jan 2021 retention rate divided by Dec 2020 retention rate, per account.
#
# A user counts as retained for a month when the same user has activity on the
# same account in any later month. The previous version only checked whether
# the *account* had rows in the following months (chained lead() over distinct
# account/month pairs), which ignores individual users and gives the right
# answer only when the data contains exactly Dec, Jan and Feb.

# One row per (account, user, month of activity). The source column is named
# ' date' with a leading space, as shown by printSchema above.
activity_df = sf_events_df.select(
    'account_id',
    'user_id',
    date_format(col(' date'), 'yyyy-MM').alias('mnth'),
).distinct()

# Last month in which each user was active on each account. 'yyyy-MM' strings
# compare chronologically, so max() and '>' work on them directly.
last_active_df = activity_df.groupBy('account_id', 'user_id').agg(
    max('mnth').alias('last_mnth')
)

(
    activity_df
    .filter(col('mnth').isin('2020-12', '2021-01'))
    .join(last_active_df, ['account_id', 'user_id'])
    # 1 when the user shows up again after this month, else 0
    .withColumn('retained', (col('last_mnth') > col('mnth')).cast('int'))
    .groupBy('account_id')
    .agg(
        # avg() of the 0/1 flag over the month's active users = retention rate
        avg(when(col('mnth') == '2020-12', col('retained'))).alias('dec_retension'),
        avg(when(col('mnth') == '2021-01', col('retained'))).alias('jan_retension'),
    )
    # a missing month or a zero Dec rate makes the ratio null; report that as 0,
    # matching the coalesce the original applied
    .withColumn('retention_rate',
                coalesce(col('jan_retension') / col('dec_retension'), lit(0)))
    .select('account_id', 'retention_rate')
    .orderBy('account_id')
    .show()
)

+----------+--------------+
|account_id|retention_rate|
+----------+--------------+
|        A1|           1.0|
|        A2|           1.0|
|        A3|           0.0|
+----------+--------------+



#### Spark_SQL

In [0]:
# Register the events DataFrame as a temp view so it can be queried with spark.sql.
sf_events_df.createOrReplaceTempView('sf_events_df_table')

In [0]:
# Jan 2021 retention rate divided by Dec 2020 retention rate, per account (SQL).
#   activity    - one row per (account, user, month of activity); the source
#                 column is named ' date' with a leading space
#   last_active - last active month per (account, user)
#   flagged     - Dec 2020 / Jan 2021 activity with retained = 1 when the user
#                 is active again in a later month
#   rates       - per account, share of retained users for each of the months
# The previous version only looked at which months an *account* had rows for
# (chained lead() plus hard-coded month exclusions) and ignored users.
spark.sql("""
with activity as (
    select distinct
           account_id,
           user_id,
           date_format(` date`, 'yyyy-MM') as mnth
    from sf_events_df_table
),
last_active as (
    select account_id, user_id, max(mnth) as last_mnth
    from activity
    group by account_id, user_id
),
flagged as (
    select a.account_id,
           a.mnth,
           case when l.last_mnth > a.mnth then 1 else 0 end as retained
    from activity as a
    join last_active as l
    on a.account_id = l.account_id and a.user_id = l.user_id
    where a.mnth in ('2020-12', '2021-01')
),
rates as (
    select account_id,
           avg(case when mnth = '2020-12' then retained end) as dec_retension,
           avg(case when mnth = '2021-01' then retained end) as jan_retension
    from flagged
    group by account_id
)
select account_id, coalesce((jan_retension / dec_retension), 0) as retention_rate
from rates
order by account_id
          """).show()

+----------+--------------+
|account_id|retention_rate|
+----------+--------------+
|        A1|           1.0|
|        A2|           1.0|
|        A3|           0.0|
+----------+--------------+



## 9. The Most Popular Client_Id Among Users Using Video and Voice Calls

Q. Select the most popular client_id based on a count of the number of users who have at least 50% of their events from the following list: 'video call received', 'video call sent', 'voice call received', 'voice call sent'.   
[Link](https://platform.stratascratch.com/coding/2029-the-most-popular-client_id-among-users-using-video-and-voice-calls?code_type=3)


#### Spark-Dataframe API

In [0]:
# Read the fact-events CSV into a DataFrame: first row is the header, column
# types are inferred, and malformed records are kept (PERMISSIVE mode).
fact_events_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/nine.csv')
)

# Preview the first rows and print the inferred schema.
fact_events_df.show(5)
fact_events_df.printSchema()

+---+----------+----------+-----------+---------+-------------------+--------+
| id|   time_id|   user_id|customer_id|client_id|         event_type|event_id|
+---+----------+----------+-----------+---------+-------------------+--------+
|  1|2020-02-28|3668-QPYBK|     Sendit|  desktop|       message sent|       3|
|  2|2020-02-28|7892-POOKP|  Connectix|   mobile|      file received|       2|
|  3|2020-04-03|9763-GRSKD|     Zoomit|  desktop|video call received|       7|
|  4|2020-04-02|9763-GRSKD|  Connectix|  desktop|video call received|       7|
|  5|2020-02-06|9237-HQITU|     Sendit|  desktop|video call received|       7|
+---+----------+----------+-----------+---------+-------------------+--------+
only showing top 5 rows

root
 |--  id: integer (nullable = true)
 |-- time_id: date (nullable = true)
 |-- user_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- client_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_id: integer (nullable = true)

In [0]:
# Most popular client among users whose events are at least 50% video/voice calls.
#   1. Flag every event as a call event (1) or not (0).
#   2. Average the flag per user to get that user's share of call events.
#      The share is computed per user, not per (user, client) pair, so a user
#      whose call events are spread over several clients is still evaluated on
#      all of their events.
#   3. Keep the events of users with a share >= 0.5 and count the distinct
#      qualifying users seen on each client.
#   4. Return the client with the highest user count; ties are broken by
#      client_id so the result is deterministic.
(
    fact_events_df
    .select('client_id', 'user_id', 'event_type')
    .withColumn(
        'is_call_event',
        when(
            col('event_type').isin(
                'video call received',
                'video call sent',
                'voice call received',
                'voice call sent',
            ),
            1,
        ).otherwise(0),
    )
    .withColumn('prcnt_event', avg('is_call_event').over(Window.partitionBy('user_id')))
    .filter(col('prcnt_event') >= 0.5)
    .groupBy('client_id')
    .agg(countDistinct('user_id').alias('cnt'))
    .sort(col('cnt').desc(), col('client_id'))
    .limit(1)
    .select('client_id')
    .show()
)

+---------+
|client_id|
+---------+
|  desktop|
+---------+



#### Spark_SQL

In [0]:
# Register the fact-events DataFrame as a temporary view so Spark SQL can query it.
fact_events_df.createOrReplaceTempView(name='fact_events_df_table')

In [0]:
# Spark SQL version of the most-popular-client query.
#   qualifying_users : users whose share of video/voice call events is >= 50%.
#                      The unrounded average is compared with the threshold so
#                      a share just below 50% cannot be rounded up past it.
#   client_users     : number of distinct qualifying users seen on each client
#                      (users are counted, not event rows).
# The client with the most qualifying users is returned; ties are broken by
# client_id so the result is deterministic.
spark.sql("""
with qualifying_users as (
    select user_id
    from fact_events_df_table
    group by user_id
    having avg(case when event_type in ('video call received',
                                        'video call sent',
                                        'voice call received',
                                        'voice call sent')
                    then 1
                    else 0
               end) >= 0.5
),
client_users as (
    select
        b.client_id,
        count(distinct b.user_id) as user_cnt
    from qualifying_users a
    join fact_events_df_table b
      on a.user_id = b.user_id
    group by b.client_id
)
select client_id
from client_users
order by user_cnt desc, client_id
limit 1
""").show()

+---------+
|client_id|
+---------+
|  desktop|
+---------+



## 10. Marketing Campaign Success [Advanced]

Q. You have a table of in-app purchases by user. Users that make their first in-app purchase are placed in a marketing campaign where they see call-to-actions for more in-app purchases. Find the number of users that made additional in-app purchases due to the success of the marketing campaign.
The marketing campaign doesn't start until one day after the initial in-app purchase so users that only made one or multiple purchases on the first day do not count, nor do we count users that over time purchase only the products they purchased on the first day.   
[Link](https://platform.stratascratch.com/coding/514-marketing-campaign-success-advanced?code_type=3)


#### Spark-Dataframe API

In [0]:

# Read the in-app purchases CSV into a DataFrame: first row is the header,
# column types are inferred, and malformed records are kept (PERMISSIVE mode).
marketing_campaign_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .option('mode', 'PERMISSIVE')
    .load('/FileStore/tables/ten.csv')
)

# Preview the first rows and print the inferred schema.
marketing_campaign_df.show(5)
marketing_campaign_df.printSchema()

+--------+----------+----------+--------+-----+
| user_id|created_at|product_id|quantity|price|
+--------+----------+----------+--------+-----+
|      10|2019-01-01|       101|       3|   55|
|      10|2019-01-02|       119|       5|   29|
|      10|2019-03-31|       111|       2|  149|
|      11|2019-01-02|       105|       3|  234|
|      11|2019-03-31|       120|       3|   99|
+--------+----------+----------+--------+-----+
only showing top 5 rows

root
 |--  user_id: integer (nullable = true)
 |-- created_at: date (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: integer (nullable = true)



In [0]:
# Per-user window over purchase dates: dense rank 1 marks the first purchase day.
window_spec_1 = Window.partitionBy(" user_id").orderBy("created_at")
# Per-(user, product) window over purchase dates: row number 1 marks the
# earliest purchase of that product by that user.
window_spec_2 = Window.partitionBy(" user_id", "product_id").orderBy("created_at")

# Count the users having at least one purchase that is both after their first
# purchase day (date_rank > 1) and the earliest purchase of its product
# (prod_rank == 1).
(
    marketing_campaign_df
    .withColumn("date_rank", dense_rank().over(window_spec_1))
    .withColumn("prod_rank", row_number().over(window_spec_2))
    .where((col('prod_rank') == 1) & (col('date_rank') > 1))
    .select(' user_id')
    .dropDuplicates()
    .count()
)


Out[28]: 23

#### Spark_SQL

In [0]:
# Register the purchases DataFrame as a temporary view so Spark SQL can query it.
marketing_campaign_df.createOrReplaceTempView(name='marketing_campaign_df_table')

In [0]:
# Spark SQL version of the marketing-campaign count.
# The inner query U adds two dense ranks to every purchase row:
#   date_rank - rank of created_at within the user (1 = first purchase day)
#   prod_rank - rank of created_at within the (user, product) pair
#               (1 = earliest purchase date of that product by that user)
# The outer query counts the distinct users that have at least one row with
# date_rank > 1 and prod_rank == 1, i.e. a product first bought after the
# user's first purchase day.
# The column is referenced as ` user_id` (with a leading space) because that
# is the column name shown in the printed schema of marketing_campaign_df.
spark.sql("""
select count(distinct `U`.` user_id`)
from          
    (select *,
    (dense_rank() over(partition by `marketing_campaign_df_table`.` user_id` order by created_at)) as date_rank ,
    (dense_rank() over(partition by `marketing_campaign_df_table`.` user_id`, product_id order by created_at)) as prod_rank
    from
    marketing_campaign_df_table
    ) as U
where (date_rank>1) 
and (prod_rank==1)
          """).show()

+------------------------+
|count(DISTINCT  user_id)|
+------------------------+
|                      23|
+------------------------+

