<h3> Amazon Prime Subscription Rate SQL Logic | Amazon Music | Complex SQL 14 </h3>

In [0]:
%sql
-- Fixture: one row per user with the date they joined.
drop table if exists users;

create table users (
    user_id   integer,
    name      varchar(20),
    join_date date
);

insert into users (user_id, name, join_date)
values
    (1, 'Jon', '2020-02-14'),
    (2, 'Jane', '2020-02-14'),
    (3, 'Jill', '2020-02-15'),
    (4, 'Josh', '2020-02-15'),
    (5, 'Jean', '2020-02-16'),
    (6, 'Justin', '2020-02-17'),
    (7, 'Jeremy', '2020-02-18');

-- Fixture: one row per access event; the solution queries filter on the
-- type values 'Music' and 'P'.
drop table if exists events;

create table events (
    user_id     integer,
    type        varchar(10),
    access_date date
);

insert into events (user_id, type, access_date)
values
    (1, 'Pay', '2020-03-01'),
    (2, 'Music', '2020-03-02'),
    (2, 'P', '2020-03-12'),
    (3, 'Music', '2020-03-15'),
    (4, 'Music', '2020-03-15'),
    (1, 'P', '2020-03-16'),
    (3, 'P', '2020-03-22');

num_affected_rows,num_inserted_rows
7,7


In [0]:
%sql
-- Preview the users fixture (all columns, table order).
select
    user_id,
    name,
    join_date
from users

user_id,name,join_date
1,Jon,2020-02-14
2,Jane,2020-02-14
3,Jill,2020-02-15
4,Josh,2020-02-15
5,Jean,2020-02-16
6,Justin,2020-02-17
7,Jeremy,2020-02-18


In [0]:
%sql
-- Preview the events fixture (all columns, table order).
select
    user_id,
    type,
    access_date
from events

user_id,type,access_date
1,Pay,2020-03-01
2,Music,2020-03-02
2,P,2020-03-12
3,Music,2020-03-15
4,Music,2020-03-15
1,P,2020-03-16
3,P,2020-03-22


<h3> Solution in SQL (Hive) </h3>

In [0]:
%sql 
-- Subscription rate: of the users who have at least one 'Music' event, what
-- percentage have a 'P' event dated within 30 days of their join_date?
--
-- Users are counted with count(distinct ...) rather than count(*) so that a
-- user with several 'P' events is still counted once, and so that
-- enroll_users is 0 (not NULL) when nobody qualifies.
with cte as (
    select
        count(distinct u.user_id) as total_users,
        -- The case yields the user_id only for qualifying 'P' events; users
        -- without a 'P' event have a NULL access_date and are never counted.
        count(distinct
            case
                when datediff(p.access_date, u.join_date) <= 30 then u.user_id
            end
        ) as enroll_users
    from users as u
    -- The type filter lives in the ON clause so Music users without any 'P'
    -- event are kept (with NULL event columns) and still count in total_users.
    left join events as p
        on p.user_id = u.user_id
        and p.type = 'P'
    where u.user_id in (
        select m.user_id
        from events as m
        where m.type = 'Music'
    )
)
select
    total_users,
    enroll_users,
    -- nullif makes the rate NULL instead of dividing by zero when there are
    -- no Music users at all.
    round(enroll_users * 100 / nullif(total_users, 0), 2) as per_of_enroll
from cte

total_users,enroll_users,per_of_enroll
3,1,33.33


<h3> Solution in PySpark </h3>

In [0]:
from pyspark.sql.functions import col,concat,datediff,sum,when,lit,round


In [0]:
# Load both fixture tables as DataFrames and print them for a quick visual check.
df_u = spark.sql("select user_id, name, join_date from users")
df_e = spark.sql("select user_id, type, access_date from events")

df_u.show()
df_e.show()

+-------+------+----------+
|user_id|  name| join_date|
+-------+------+----------+
|      1|   Jon|2020-02-14|
|      2|  Jane|2020-02-14|
|      3|  Jill|2020-02-15|
|      4|  Josh|2020-02-15|
|      5|  Jean|2020-02-16|
|      6|Justin|2020-02-17|
|      7|Jeremy|2020-02-18|
+-------+------+----------+

+-------+-----+-----------+
|user_id| type|access_date|
+-------+-----+-----------+
|      1|  Pay| 2020-03-01|
|      2|Music| 2020-03-02|
|      2|    P| 2020-03-12|
|      3|Music| 2020-03-15|
|      4|Music| 2020-03-15|
|      1|    P| 2020-03-16|
|      3|    P| 2020-03-22|
+-------+-----+-----------+



In [0]:
# Subscription rate: of the users with at least one 'Music' event, what
# percentage have a 'P' event dated within 30 days of their join_date?

# Users that have a 'Music' event. Kept as a DataFrame so the membership test
# below is a distributed join rather than a driver-side collect() + isin().
df_music = df_e.filter(col("type") == "Music").select("user_id")

# 'P' events only; just the columns the date comparison needs.
df_e1 = df_e.filter(col("type") == "P").select("user_id", "access_date")

# One row per Music user. The left join keeps users without any 'P' event
# (access_date is null, so their flag is 0); grouping by user_id means a user
# with several 'P' events is still counted once.
df_f = (
    df_u.join(df_music, "user_id", "left_semi")
    .join(df_e1, "user_id", "left")
    .withColumn("date_diff", datediff(col("access_date"), col("join_date")))
    .withColumn("within_30", when(col("date_diff") <= 30, 1).otherwise(0))
    .groupBy("user_id")
    .agg(sum("within_30").alias("prime_hits"))
    .withColumn("total_users", lit(1))
    .withColumn("enroll_user", when(col("prime_hits") > 0, 1).otherwise(0))
)

# Collapse to a single summary row and derive the (unrounded) percentage.
df_final = df_f.agg(
    sum("total_users").alias("total_users"),
    sum("enroll_user").alias("enroll_user"),
).withColumn("per_of_enroll", col("enroll_user") * 100 / col("total_users"))

# Display with the percentage rounded to two decimals.
df_final.select(
    "total_users",
    "enroll_user",
    round(df_final["per_of_enroll"], 2).alias("per_of_enroll"),
).show()

+-----------+-----------+-------------+
|total_users|enroll_user|per_of_enroll|
+-----------+-----------+-------------+
|          3|          1|        33.33|
+-----------+-----------+-------------+

