
**1. A table named “famous” has two columns called user id and follower id. It represents each user ID has a particular follower ID. These follower IDs are also users of Facebook /Meta. Then, find the famous percentage of each user.Famous Percentage = number of followers a user has / total number of users on the platform**


In [0]:
%sql
CREATE TABLE famous (user_id INT, follower_id INT);
INSERT INTO famous VALUES
(1, 2), (1, 3), (2, 4), (5, 1), (5, 3), 
(11, 7), (12, 8), (13, 5), (13, 10), 
(14, 12), (14, 3), (15, 14), (15, 13);

num_affected_rows,num_inserted_rows
13,13


In [0]:
%sql
WITH users AS (
  SELECT user_id as user FROM famous
  UNION
  SELECT follower_id as user FROM famous
),
follwer_cnt AS (
  SELECT user_id, count(follower_id) as follower_cnt FROM famous GROUP BY user_id
)
SELECT u.user, round((f.follower_cnt*100.00)/(SELECT count(user) FROM users), 2) as follower_perc FROM users u JOIN follwer_cnt f 
ON u.`user`=f.user_id order by u.user 


user,follower_perc
1,15.38
2,7.69
5,15.38
11,7.69
12,7.69
13,15.38
14,15.38
15,15.38


In [0]:

#-----------------------PySpark---------------------------------------------------------------

import datetime
from pyspark.sql.functions import col, round
famous_df_columns = ["user_id", "follower_id"]
famous_data = [(1, 2), (1, 3), (2, 4), (5, 1), (5, 3), 
(11, 7), (12, 8), (13, 5), (13, 10), 
(14, 12), (14, 3), (15, 14), (15, 13)]

famous_df = spark.createDataFrame(data = famous_data, schema = famous_df_columns)
df1 = famous_df.select('user_id').alias('user')
df2 = famous_df.select('follower_id').alias('user')
users_df = df1.union(df2).distinct()
total_users = users_df.count()
follower_cnt_df = famous_df.groupBy("user_id").count().withColumnRenamed('count', 'follower_cnt')
famous_perc_df = follower_cnt_df.select("user_id","follower_cnt").withColumn("famous_perc", round((follower_cnt_df.follower_cnt*100)/total_users, 2)).drop("follower_cnt")
famous_perc_df.show()

#---------------------------- Pandas ----------------------------------------------------------
import pandas as pd
famous_df_columns = ["user_id", "follower_id"]
famous_data = [(1, 2), (1, 3), (2, 4), (5, 1), (5, 3), 
(11, 7), (12, 8), (13, 5), (13, 10), 
(14, 12), (14, 3), (15, 14), (15, 13)]

famous_df = pd.DataFrame(data = famous_data, columns = famous_df_columns)
df1 = famous_df[['user_id']].rename(columns = {'user_id': 'user'})
df2 = famous_df[['follower_id']].rename(columns = {'follower_id': 'user'})
users_df = pd.concat([df1, df2])[['user']].drop_duplicates()
total_user_count = users_df.shape[0]
follower_cnt_df = famous_df.groupby('user_id')['user_id'].count().reset_index(name = 'follower_cnt')
follower_cnt_df['famous_perc'] = ((follower_cnt_df.follower_cnt * 100)/total_user_count).round(2)
famous_perc_df = follower_cnt_df.drop(columns = ['follower_cnt'])
famous_perc_df


+-------+-----------+
|user_id|famous_perc|
+-------+-----------+
|      1|      15.38|
|      2|       7.69|
|      5|      15.38|
|     11|       7.69|
|     12|       7.69|
|     13|      15.38|
|     14|      15.38|
|     15|      15.38|
+-------+-----------+



Unnamed: 0,user_id,famous_perc
0,1,15.38
1,2,7.69
2,5,15.38
3,11,7.69
4,12,7.69
5,13,15.38
6,14,15.38
7,15,15.38



***2. Given a table 'sf_transactions' of purchases by date, calculate the month-over-month percentage change in revenue. The output should include the year-month date (YYYY-MM) and percentage change, rounded to the 2nd decimal point, and sorted from the beginning of the year to the end of the year. The percentage change column will be populated from the 2nd month forward and calculated as ((this month’s revenue — last month’s revenue) / last month’s revenue)100***


In [0]:

#------PySpark------------------------------------------------------------------------------------------
sf_transactions_columns = ['id', 'created_at', 'value', 'purchased_id']
sf_transactions_data = [(1, '2019-01-01 00:00:00',  172692, 43), (2,'2019-01-05 00:00:00',  177194, 36),(3, '2019-01-09 00:00:00',  109513, 30),(4, '2019-01-13 00:00:00',  164911, 30),(5, '2019-01-17 00:00:00',  198872, 39), (6, '2019-01-21 00:00:00',  184853, 31),(7, '2019-01-25 00:00:00',  186817, 26), (8, '2019-01-29 00:00:00',  137784, 22),(9, '2019-02-02 00:00:00',  140032, 25), (10, '2019-02-06 00:00:00', 116948, 43), (11, '2019-02-10 00:00:00', 162515, 25), (12, '2019-02-14 00:00:00', 114256, 12), (13, '2019-02-18 00:00:00', 197465, 48), (14, '2019-02-22 00:00:00', 120741, 20), (15, '2019-02-26 00:00:00', 100074, 49), (16, '2019-03-02 00:00:00', 157548, 19), (17, '2019-03-06 00:00:00', 105506, 16), (18, '2019-03-10 00:00:00', 189351, 46), (19, '2019-03-14 00:00:00', 191231, 29), (20, '2019-03-18 00:00:00', 120575, 44), (21, '2019-03-22 00:00:00', 151688, 47), (22, '2019-03-26 00:00:00', 102327, 18), (23, '2019-03-30 00:00:00', 156147, 25)]

from pyspark.sql.functions import cast, date_format
sf_transactions_df = spark.createDataFrame(data=sf_transactions_data, schema=sf_transactions_columns)
sf_transactions_df = sf_transactions_df.withColumn("created_at", sf_transactions_df.created_at.cast("timestamp")).withColumn('year_month', date_format('created_at', 'yyyy-MM'))

In [0]:
from pyspark.sql.functions import sum, lead, lag, round
from pyspark.sql.window import Window
monthly_transations_df = sf_transactions_df.groupBy('year_month').agg(sum('value').alias('total_value'))
windowSpec = Window.orderBy('year_month')
monthly_transations_df = monthly_transations_df.withColumn('previous_value', lag('total_value', 1).over(windowSpec))
perc_change_df = monthly_transations_df.withColumn('perc_change', round((((col('total_value')-col('previous_value'))*100)/col('previous_value')),2))
perc_change_df.show()

+----------+-----------+--------------+-----------+
|year_month|total_value|previous_value|perc_change|
+----------+-----------+--------------+-----------+
|   2019-01|    1332636|          null|       null|
|   2019-02|     952031|       1332636|     -28.56|
|   2019-03|    1174373|        952031|      23.35|
+----------+-----------+--------------+-----------+



In [0]:
#------Pandas------------------------------------------------------------------------------------------
import pandas as pd

sf_transactions_columns = ['id', 'created_at', 'value', 'purchased_id']
sf_transactions_data = [(1, '2019-01-01 00:00:00',  172692, 43), (2,'2019-01-05 00:00:00',  177194, 36),(3, '2019-01-09 00:00:00',  109513, 30),(4, '2019-01-13 00:00:00',  164911, 30),(5, '2019-01-17 00:00:00',  198872, 39), (6, '2019-01-21 00:00:00',  184853, 31),(7, '2019-01-25 00:00:00',  186817, 26), (8, '2019-01-29 00:00:00',  137784, 22),(9, '2019-02-02 00:00:00',  140032, 25), (10, '2019-02-06 00:00:00', 116948, 43), (11, '2019-02-10 00:00:00', 162515, 25), (12, '2019-02-14 00:00:00', 114256, 12), (13, '2019-02-18 00:00:00', 197465, 48), (14, '2019-02-22 00:00:00', 120741, 20), (15, '2019-02-26 00:00:00', 100074, 49), (16, '2019-03-02 00:00:00', 157548, 19), (17, '2019-03-06 00:00:00', 105506, 16), (18, '2019-03-10 00:00:00', 189351, 46), (19, '2019-03-14 00:00:00', 191231, 29), (20, '2019-03-18 00:00:00', 120575, 44), (21, '2019-03-22 00:00:00', 151688, 47), (22, '2019-03-26 00:00:00', 102327, 18), (23, '2019-03-30 00:00:00', 156147, 25)]

pd_sf_transactions_df = pd.DataFrame(data = sf_transactions_data, columns = sf_transactions_columns)
pd_sf_transactions_df['created_at'] = pd.to_datetime(pd_sf_transactions_df.created_at)
pd_sf_transactions_df['year_month'] = pd_sf_transactions_df['created_at'].dt.to_period('M')
pd_month_transactions_df = pd_sf_transactions_df.groupby('year_month')['value'].sum().reset_index()
pd_month_transactions_df['previous_value'] = pd_month_transactions_df['value'].shift(1)
pd_month_transactions_df['mom_perc_change'] = round((((pd_month_transactions_df.value - pd_month_transactions_df.previous_value)*100)/pd_month_transactions_df.previous_value),2)
pd_month_transactions_df.head()


Unnamed: 0,year_month,value,previous_value,mom_perc_change
0,2019-01,1332636,,
1,2019-02,952031,1332636.0,-28.56
2,2019-03,1174373,952031.0,23.35


In [0]:
dbutils.fs.rm('dbfs:/user/hive/warehouse/sf_transactions', True)

Out[17]: True

In [0]:
%sql
DROP TABLE IF EXISTS sf_transactions;
CREATE TABLE sf_transactions(id INT, created_at TIMESTAMP, value INT, purchase_id INT);

INSERT INTO sf_transactions VALUES
(1, '2019-01-01 00:00:00',  172692, 43), (2,'2019-01-05 00:00:00',  177194, 36),(3, '2019-01-09 00:00:00',  109513, 30),(4, '2019-01-13 00:00:00',  164911, 30),(5, '2019-01-17 00:00:00',  198872, 39), (6, '2019-01-21 00:00:00',  184853, 31),(7, '2019-01-25 00:00:00',  186817, 26), (8, '2019-01-29 00:00:00',  137784, 22),(9, '2019-02-02 00:00:00',  140032, 25), (10, '2019-02-06 00:00:00', 116948, 43), (11, '2019-02-10 00:00:00', 162515, 25), (12, '2019-02-14 00:00:00', 114256, 12), (13, '2019-02-18 00:00:00', 197465, 48), (14, '2019-02-22 00:00:00', 120741, 20), (15, '2019-02-26 00:00:00', 100074, 49), (16, '2019-03-02 00:00:00', 157548, 19), (17, '2019-03-06 00:00:00', 105506, 16), (18, '2019-03-10 00:00:00', 189351, 46), (19, '2019-03-14 00:00:00', 191231, 29), (20, '2019-03-18 00:00:00', 120575, 44), (21, '2019-03-22 00:00:00', 151688, 47), (22, '2019-03-26 00:00:00', 102327, 18), (23, '2019-03-30 00:00:00', 156147, 25);

num_affected_rows,num_inserted_rows
23,23


In [0]:
%sql
WITH MonthlyRevenue AS (
  SELECT date_format(created_at, 'yyyy-MM') as year_month,
  sum(value) as total_revenue
  FROM sf_transactions
  GROUP BY date_format(created_at, 'yyyy-MM')
),
RevenueChange AS (
  SELECT year_month,
  total_revenue,
  LAG(total_revenue) OVER(ORDER BY year_month) as previous_revenue
  FROM MonthlyRevenue
)
SELECT 
    year_month,
    total_revenue,
    ROUND(
      CASE When previous_revenue IS NULL then NULL
      ELSE ((total_revenue-previous_revenue)/CAST(previous_revenue AS FLOAT))*100 END, 2
    ) as perc_change
    FROM RevenueChange
    ORDER BY year_month

year_month,total_revenue,perc_change
2019-01,1332636,
2019-02,952031,-28.56
2019-03,1174373,23.35


In [0]:
%sql
--- With out CTE
SELECT year_month, total_revenue,  round((total_revenue-previous_revenue)*100/previous_revenue, 2) as perc_change FROM (
SELECT year_month, total_revenue, lag(total_revenue) over(order by year_month) as previous_revenue FROM (
SELECT date_format(created_at, 'yyyy-MM') as year_month, sum(value) as total_revenue FROM sf_transactions group by date_format(created_at, 'yyyy-MM')  order by date_format(created_at, 'yyyy-MM')
)
)


year_month,total_revenue,perc_change
2019-01,1332636,
2019-02,952031,-28.56
2019-03,1174373,23.35


**3. You are analyzing a social network dataset at Google. Your task is to find mutual friends between two users, Karl and Hans. There is only one user named Karl and one named Hans in the dataset.The output should contain 'user_id' and 'user_name' columns.**

Google


In [0]:
dbutils.fs.rm('dbfs:/user/hive/warehouse/gusers', True)
dbutils.fs.rm('dbfs:/user/hive/warehouse/friends', True)

Out[16]: False

In [0]:
%sql
CREATE TABLE IF NOT EXISTS gusers (user_id INT, user_name varchar(30)) ;
INSERT INTO gusers VALUES (1, 'Karl'), (2, 'Hans'), (3, 'Emma'), (4, 'Emma'), (5, 'Mike'), (6, 'Lucas'), (7, 'Sarah'), (8, 'Lucas'), (9, 'Anna'), (10, 'John');

CREATE TABLE IF NOT EXISTS friends(user_id INT, friend_id INT);
INSERT INTO friends VALUES (1,3),(1,5),(2,3),(2,4),(3,1),(3,2),(3,6),(4,7),(5,8),(6,9),(7,10),(8,6),(9,10),(10,7),(10,9);


num_affected_rows,num_inserted_rows
15,15


In [0]:
%sql
WITH hans_friends AS (
  SELECT f.friend_id as friend FROM gusers u JOIN friends f ON u.user_id == f.user_id AND u.user_name = 'Hans'
),
karl_friends AS (
  SELECT f.friend_id as friend FROM gusers u JOIN friends f ON u.user_id == f.user_id AND u.user_name = 'Karl'
)
SELECT u.user_id, u.user_name FROM gusers u JOIN hans_friends hf ON u.user_id = hf.friend 
JOIN karl_friends kf ON u.user_id = kf.friend

user_id,user_name
3,Emma


In [0]:
#------PySpark------------------------------------------------------------------------------------------
gusers_data = [(1, 'Karl'), (2, 'Hans'), (3, 'Emma'), (4, 'Emma'), (5, 'Mike'), (6, 'Lucas'), (7, 'Sarah'), (8, 'Lucas'), (9, 'Anna'), (10, 'John')]
friends_data = [(1,3),(1,5),(2,3),(2,4),(3,1),(3,2),(3,6),(4,7),(5,8),(6,9),(7,10),(8,6),(9,10),(10,7),(10,9)]
gusers_columns = ["user_id", "user_name"]
friends_columns = ["user_id", "friend_id"]

gusers_df = spark.createDataFrame(data=gusers_data, schema=gusers_columns)
friends_df = spark.createDataFrame(data=friends_data, schema=friends_columns)



In [0]:
hans_friends_df = gusers_df.join(friends_df, on="user_id", how="inner").filter(gusers_df.user_name == 'Hans')
Karl_friends_df = gusers_df.join(friends_df, on="user_id", how="inner").filter(gusers_df.user_name == 'Karl')

hans_friends_df.show()
Karl_friends_df.show()

+-------+---------+---------+
|user_id|user_name|friend_id|
+-------+---------+---------+
|      2|     Hans|        3|
|      2|     Hans|        4|
+-------+---------+---------+

+-------+---------+---------+
|user_id|user_name|friend_id|
+-------+---------+---------+
|      1|     Karl|        3|
|      1|     Karl|        5|
+-------+---------+---------+



In [0]:
from pyspark.sql.functions import col
g =  gusers_df.select(col('user_name').alias('user'), col("user_id").alias('u_id'))
common_friends_df = g.join(hans_friends_df, hans_friends_df.friend_id == g.u_id, "inner")\
    .join(Karl_friends_df, Karl_friends_df.friend_id == g.u_id, "inner")
common_friends_df.select('user', 'u_id').show()

+----+----+
|user|u_id|
+----+----+
|Emma|   3|
+----+----+



In [0]:
#------Pandas------------------------------------------------------------------------------------------
import pandas as pd
gusers_data = [(1, 'Karl'), (2, 'Hans'), (3, 'Emma'), (4, 'Emma'), (5, 'Mike'), (6, 'Lucas'), (7, 'Sarah'), (8, 'Lucas'), (9, 'Anna'), (10, 'John')]
friends_data = [(1,3),(1,5),(2,3),(2,4),(3,1),(3,2),(3,6),(4,7),(5,8),(6,9),(7,10),(8,6),(9,10),(10,7),(10,9)]
gusers_columns = ["user_id", "user_name"]
friends_columns = ["user_id", "friend_id"]

pd_gusers_df = pd.DataFrame(
                    data=gusers_data,
                    columns=gusers_columns
                    )

pd_friends_df = pd.DataFrame(
                    data=friends_data,
                    columns=friends_columns
)
karl_id = pd_gusers_df[pd_gusers_df.user_name == 'Karl']['user_id'].iloc[0]
hans_id = pd_gusers_df[pd_gusers_df.user_name == 'Hans']['user_id'].iloc[0]
karl_friends = set(pd_friends_df[pd_friends_df.user_id == karl_id]['friend_id'])
hans_friends = set(pd_friends_df[pd_friends_df.user_id == hans_id]['friend_id'])
common_friends = karl_friends & hans_friends
common_friends_df = pd_gusers_df[pd_gusers_df['user_id'].isin(common_friends)]
common_friends_df.head()

Unnamed: 0,user_id,user_name
2,3,Emma


**4. Some forecasting methods are extremely simple and surprisingly effective. Naïve forecast is one of them. To create a naïve forecast for "distance per dollar" (defined as distance_to_travel/monetary_cost), first sum the "distance to travel" and "monetary cost" values monthly. This gives the actual value for the current month. For the forecasted value, use the previous month's value. After obtaining both actual and forecasted values, calculate the root mean squared error (RMSE) using the formula RMSE = sqrt(mean(square(actual - forecast))). Report the RMSE rounded to two decimal places.**


In [0]:
dbutils.fs.rm('dbfs:/user/hive/warehouse/uber_request_logs', True)

Out[32]: True

In [0]:
%sql
CREATE TABLE uber_request_logs(request_id int, request_date Timestamp, request_status varchar(10), distance_to_travel float, monetary_cost float, driver_to_client_distance float);

INSERT INTO uber_request_logs VALUES (1,'2020-01-09','success', 70.59, 6.56,14.36), (2,'2020-01-24','success', 93.36, 22.68,19.9), (3,'2020-02-08','fail', 51.24, 11.39,21.32), (4,'2020-02-23','success', 61.58,8.04,44.26), (5,'2020-03-09','success', 25.04,7.19,1.74), (6,'2020-03-24','fail', 45.57, 4.68,24.19), (7,'2020-04-08','success', 24.45,12.69,15.91), (8,'2020-04-23','success', 48.22,11.2,48.82), (9,'2020-05-08','success', 56.63,4.04,16.08), (10,'2020-05-23','fail', 19.03,16.65,11.22), (11,'2020-06-07','fail', 81,6.56,26.6), (12,'2020-06-22','fail', 21.32,8.86,28.57), (13,'2020-07-07','fail', 14.74,17.76,19.33), (14,'2020-07-22','success',66.73,13.68,14.07), (15,'2020-08-06','success',32.98,16.17,25.34), (16,'2020-08-21','success',46.49,1.84,41.9), (17,'2020-09-05','fail', 45.98,12.2,2.46), (18,'2020-09-20','success',3.14,24.8,36.6), (19,'2020-10-05','success',75.33,23.04,29.99), (20,'2020-10-20','success', 53.76,22.94,18.74);

num_affected_rows,num_inserted_rows
20,20


In [0]:
%sql
SELECT * FROM uber_request_logs

request_id,request_date,request_status,distance_to_travel,monetary_cost,driver_to_client_distance
1,2020-01-09T00:00:00.000+0000,success,70.59,6.56,14.36
2,2020-01-24T00:00:00.000+0000,success,93.36,22.68,19.9
3,2020-02-08T00:00:00.000+0000,fail,51.24,11.39,21.32
4,2020-02-23T00:00:00.000+0000,success,61.58,8.04,44.26
5,2020-03-09T00:00:00.000+0000,success,25.04,7.19,1.74
6,2020-03-24T00:00:00.000+0000,fail,45.57,4.68,24.19
7,2020-04-08T00:00:00.000+0000,success,24.45,12.69,15.91
8,2020-04-23T00:00:00.000+0000,success,48.22,11.2,48.82
9,2020-05-08T00:00:00.000+0000,success,56.63,4.04,16.08
10,2020-05-23T00:00:00.000+0000,fail,19.03,16.65,11.22


In [0]:
%sql
SELECT 
    round(sqrt(avg(power((actual - forecast), 2))), 2) AS rmse 
    FROM (
        SELECT 
            year_month, 
            distance_per_doller as actual, 
            lag(distance_per_doller, 1) over (order by year_month) as forecast 
            FROM (
                SELECT year_month, 
                    total_distance/total_cost as distance_per_doller 
                    FROM (
                        SELECT date_format(request_date, 'yyyy-M') as year_month, 
                            sum(distance_to_travel) as total_distance, 
                            sum(monetary_cost) as total_cost 
                            FROM uber_request_logs 
                                GROUP BY date_format(request_date, 'yyyy-M')))
            )
        

rmse
2.66


In [0]:
%sql
WITH monthly_uber_logs AS (
  SELECT
    date_format(request_date, 'yyyy-MM') as year_month,
    sum(distance_to_travel) as total_distance,
    sum(monetary_cost) as total_cost
    FROM uber_request_logs
    GROUP BY date_format(request_date, 'yyyy-MM')
),
distance_per_doller AS (
  SELECT 
    year_month,
    total_distance/total_cost as distance_per_doller
    FROM monthly_uber_logs
),
navie_foracast AS (
  SELECT 
    year_month,
    distance_per_doller as actual,
    lag(distance_per_doller, 1) over(order by year_month) as forecast
    FROM distance_per_doller
)
SELECT ROUND(sqrt(avg(power((actual-forecast),2))), 2) as rmse FROM navie_foracast

rmse
2.34


In [0]:
data_columns = ['request_id', 'request_date', 'request_status', 'distance_to_travel', 'monetary_cost', 'driver_to_client_distance']
data = [(1,'2020-01-09','success', 70.59, 6.56,14.36), (2,'2020-01-24','success', 93.36, 22.68,19.9), (3,'2020-02-08','fail', 51.24, 11.39,21.32), (4,'2020-02-23','success', 61.58,8.04,44.26), (5,'2020-03-09','success', 25.04,7.19,1.74), (6,'2020-03-24','fail', 45.57, 4.68,24.19), (7,'2020-04-08','success', 24.45,12.69,15.91), (8,'2020-04-23','success', 48.22,11.2,48.82), (9,'2020-05-08','success', 56.63,4.04,16.08), (10,'2020-05-23','fail', 19.03,16.65,11.22), (11,'2020-06-07','fail', 81.0,6.56,26.6), (12,'2020-06-22','fail', 21.32,8.86,28.57), (13,'2020-07-07','fail', 14.74,17.76,19.33), (14,'2020-07-22','success',66.73,13.68,14.07), (15,'2020-08-06','success',32.98,16.17,25.34), (16,'2020-08-21','success',46.49,1.84,41.9), (17,'2020-09-05','fail', 45.98,12.2,2.46), (18,'2020-09-20','success',3.14,24.8,36.6), (19,'2020-10-05','success',75.33,23.04,29.99), (20,'2020-10-20','success', 53.76,22.94,18.74)]

In [0]:
uber_request_logs_df = spark.createDataFrame(data=data, schema=data_columns)
uber_request_logs_df.show()

+----------+------------+--------------+------------------+-------------+-------------------------+
|request_id|request_date|request_status|distance_to_travel|monetary_cost|driver_to_client_distance|
+----------+------------+--------------+------------------+-------------+-------------------------+
|         1|  2020-01-09|       success|             70.59|         6.56|                    14.36|
|         2|  2020-01-24|       success|             93.36|        22.68|                     19.9|
|         3|  2020-02-08|          fail|             51.24|        11.39|                    21.32|
|         4|  2020-02-23|       success|             61.58|         8.04|                    44.26|
|         5|  2020-03-09|       success|             25.04|         7.19|                     1.74|
|         6|  2020-03-24|          fail|             45.57|         4.68|                    24.19|
|         7|  2020-04-08|       success|             24.45|        12.69|                    15.91|


In [0]:
uber_request_logs_df.printSchema()

root
 |-- request_id: long (nullable = true)
 |-- request_date: string (nullable = true)
 |-- request_status: string (nullable = true)
 |-- distance_to_travel: double (nullable = true)
 |-- monetary_cost: double (nullable = true)
 |-- driver_to_client_distance: double (nullable = true)



In [0]:
#------PySpark------------------------------------------------------------------------------------------
from pyspark.sql.functions import to_date, col, cast, date_format
from pyspark.sql import functions as F
monthly_uber_logs_df = uber_request_logs_df.withColumn('year_month', date_format(to_date('request_date', 'yyyy-MM-dd'), 'yyyy-MM'))
monthly_uber_logs_df = monthly_uber_logs_df.groupBy('year_month').agg(
                            F.sum('distance_to_travel').alias('total_distance'),
                            F.sum('monetary_cost').alias('total_cost')
)
monthly_uber_logs_df.show()

+----------+--------------+------------------+
|year_month|total_distance|        total_cost|
+----------+--------------+------------------+
|   2020-01|        163.95|             29.24|
|   2020-02|        112.82|             19.43|
|   2020-03|         70.61|11.870000000000001|
|   2020-05|         75.66|20.689999999999998|
|   2020-04|         72.67|             23.89|
|   2020-06|        102.32|15.419999999999998|
|   2020-07|         81.47|             31.44|
|   2020-08|         79.47|             18.01|
|   2020-09|         49.12|              37.0|
|   2020-10|        129.09|45.980000000000004|
+----------+--------------+------------------+



In [0]:
distance_per_doller_df = monthly_uber_logs_df.withColumn('distance_per_doller', col('total_distance')/col('total_cost'))
distance_per_doller_df.show()

+----------+--------------+------------------+-------------------+
|year_month|total_distance|        total_cost|distance_per_doller|
+----------+--------------+------------------+-------------------+
|   2020-01|        163.95|             29.24|   5.60704514363885|
|   2020-02|        112.82|             19.43|  5.806484817292846|
|   2020-03|         70.61|11.870000000000001|  5.948609941027801|
|   2020-05|         75.66|20.689999999999998| 3.6568390526824555|
|   2020-04|         72.67|             23.89|  3.041858518208455|
|   2020-06|        102.32|15.419999999999998|  6.635538261997406|
|   2020-07|         81.47|             31.44| 2.5912849872773536|
|   2020-08|         79.47|             18.01|  4.412548584119933|
|   2020-09|         49.12|              37.0| 1.3275675675675676|
|   2020-10|        129.09|45.980000000000004|  2.807525010874293|
+----------+--------------+------------------+-------------------+



In [0]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
window_spec = Window.orderBy('year_month')
navie_forecast_df = distance_per_doller_df.select(col('year_month'),col('distance_per_doller').alias('actual'), lag(col('distance_per_doller'), 1).over(window_spec).alias('forecast'))
navie_forecast_df.show()

+----------+------------------+------------------+
|year_month|            actual|          forecast|
+----------+------------------+------------------+
|   2020-01|  5.60704514363885|              null|
|   2020-02| 5.806484817292846|  5.60704514363885|
|   2020-03| 5.948609941027801| 5.806484817292846|
|   2020-04| 3.041858518208455| 5.948609941027801|
|   2020-05|3.6568390526824555| 3.041858518208455|
|   2020-06| 6.635538261997406|3.6568390526824555|
|   2020-07|2.5912849872773536| 6.635538261997406|
|   2020-08| 4.412548584119933|2.5912849872773536|
|   2020-09|1.3275675675675676| 4.412548584119933|
|   2020-10| 2.807525010874293|1.3275675675675676|
+----------+------------------+------------------+



In [0]:
from pyspark.sql.functions import sqrt, avg, pow, round
rmse = navie_forecast_df.select(round(sqrt(avg(pow((col('actual')-col('forecast')), 2))), 2).alias('rmse'))
rmse.show()

+----+
|rmse|
+----+
|2.34|
+----+



In [0]:
#------Pandas------------------------------------------------------------------------------------------
import pandas as pd
pd_uber_request_logs_df = pd.DataFrame(data=data, columns=data_columns)
pd_uber_request_logs_df['request_date'] = pd.to_datetime(pd_uber_request_logs_df.request_date)
pd_uber_request_logs_df['year_month'] = pd_uber_request_logs_df['request_date'].dt.to_period('M')
pd_monhtly_user_logs_df = pd_uber_request_logs_df.groupby('year_month').agg(total_distance = ('distance_to_travel', 'sum'), total_cost = ('monetary_cost', 'sum')).reset_index()
pd_monhtly_user_logs_df['distance_per_doller'] = pd_monhtly_user_logs_df.total_distance/pd_monhtly_user_logs_df.total_cost
pd_navie_forecast_df = pd_monhtly_user_logs_df[['year_month', 'distance_per_doller']].rename(columns = {'distance_per_doller' : 'actual'})
pd_navie_forecast_df['forecast'] = pd_navie_forecast_df['actual'].shift(1)
pd_navie_forecast_df.head()


Unnamed: 0,year_month,actual,forecast
0,2020-01,5.607045,
1,2020-02,5.806485,5.607045
2,2020-03,5.94861,5.806485
3,2020-04,3.041859,5.94861
4,2020-05,3.656839,3.041859


In [0]:
import numpy as np
rmse = np.round(np.sqrt(np.mean(np.power((pd_navie_forecast_df.actual - pd_navie_forecast_df.forecast), 2))), 2)
print(rmse)


2.34
